In [None]:
from utils import load_tifstack, free_memory, chunk_generator
from tools import edge_detection
import cupy as cp
import numpy as np
from tqdm import tqdm
from multiprocessing import Manager, Process, Queue, Value, Lock
import blosc2
import os
from time import sleep

In [None]:
# One worker process per GPU; each worker is fed through its own task queue.
num_gpus = 8
gpu_queues = [Queue() for _gpu in range(num_gpus)]

# Manager-backed dict shared across processes: chunk_id -> processed chunk.
# GPU workers insert results, the writer process removes them once saved.
manager = Manager()
return_dict = manager.dict()

# Shared C-int counters: chunks handed out by the producer, and chunks
# finished by the GPU workers.
total_chunks, processed_chunks = Value('i', 0), Value('i', 0)

# Lock the GPU workers hold while publishing a result / bumping the counter.
lock = Lock()

In [None]:
# Producer function to assign chunks to GPU queues dynamically
def producer(scroll, chunk_size, gpu_queues, total_chunks, backlog_step=8, pause_seconds=20):
    """Slice ``scroll`` into chunks and hand them round-robin to ``gpu_queues``.

    Args:
        scroll: Array-like volume supporting ``.shape`` and 3-D slicing whose
            slices provide ``.astype``.
        chunk_size: Sequence of three ints giving the chunk extent per axis.
        gpu_queues: Sequence of queues, one per GPU worker. The round-robin
            target is derived from ``len(gpu_queues)`` so it always matches
            the queues actually passed in.
        total_chunks: Shared ``multiprocessing.Value`` incremented once per
            chunk that has been queued.
        backlog_step: Number of outstanding chunks that triggers one pause
            unit (defaults to the original hard-coded 8).
        pause_seconds: Length of one pause unit in seconds (originally 20).

    Raises:
        ValueError: If ``gpu_queues`` is empty.

    Notes:
        The back-pressure calculation reads the module-level
        ``processed_chunks`` counter, which is NOT a parameter. It is only
        visible in the child process if module globals are inherited.
        NOTE(review): this appears to rely on the ``fork`` start method; confirm.
    """
    num_queues = len(gpu_queues)
    if num_queues == 0:
        raise ValueError("producer needs at least one GPU queue")

    depth, height, width = chunk_size[0], chunk_size[1], chunk_size[2]
    for i, (z, y, x) in tqdm(enumerate(chunk_generator(scroll.shape, chunk_size))):
        # Workers expect float32 input (they scale it in place on the GPU).
        chunk = scroll[z:z + depth, y:y + height, x:x + width].astype(np.float32)
        gpu_queues[i % num_queues].put(((z, y, x), chunk))
        total_chunks.value += 1

        # Back-pressure: pause longer the further the workers have fallen
        # behind, so queued chunks do not pile up in memory.
        backlog = total_chunks.value - processed_chunks.value
        sleep(pause_seconds * (backlog // backlog_step))

    # Signal the end of the data with a special value (None), once per worker.
    for gpu_queue in gpu_queues:
        gpu_queue.put(None)

In [None]:
# Consumer function to process chunks on GPU
def process_chunk_on_gpu(gpu_id, task_queue, return_dict, processed_chunk, lock):
    """Run edge detection on queued chunks using the GPU ``gpu_id``.

    Pulls ``(chunk_id, chunk)`` items from ``task_queue`` until a ``None``
    sentinel arrives. Each chunk is copied to the device, divided by 65535,
    passed through ``edge_detection`` and the host copy of the result is
    stored in ``return_dict[chunk_id]``.

    Args:
        gpu_id: CUDA device index selected for this process.
        task_queue: Queue yielding ``(chunk_id, chunk)`` tuples or ``None``.
        return_dict: Shared mapping that receives ``chunk_id -> result``.
        processed_chunk: Shared counter incremented once per finished chunk.
        lock: Lock guarding the counter increment across worker processes.
    """
    cp.cuda.Device(gpu_id).use()
    while True:
        item = task_queue.get()
        if item is None:
            break
        chunk_id, chunk = item
        chunk = cp.array(chunk)
        # NOTE(review): 65535 suggests the source data is 16-bit and is being
        # scaled to [0, 1] - confirm against the loader.
        chunk /= 65535

        chunk = edge_detection(chunk, cp.float32)

        # Copy back to the host and publish WITHOUT holding the lock: the
        # transfer and the manager round-trip are the slow parts, and holding
        # the lock there would serialize every GPU worker. Keys are unique per
        # chunk, so the inserts do not conflict with each other.
        result = chunk.get()
        del chunk
        return_dict[chunk_id] = result

        # Increment only after the result is visible in return_dict; the
        # writer treats "processed == total and dict empty" as its exit
        # condition, so the opposite order could let it stop too early.
        with lock:
            processed_chunk.value += 1

        del result
        free_memory()
        

In [None]:
def writer_process(output_folder, chunk_size, return_dict, total_chunks, processed_chunks):
    """Drain ``return_dict`` to disk, one Blosc2 ``.b2nd`` file per chunk.

    Each entry ``(z, y, x) -> chunk`` is written to
    ``<output_folder>/chunk_z_y_x_{z}_{y}_{x}.b2nd`` and then removed from
    ``return_dict``. The loop ends when ``processed_chunks`` equals
    ``total_chunks`` and ``return_dict`` is empty.

    Args:
        output_folder: Destination directory; created if it does not exist.
        chunk_size: Sequence of three ints used as the Blosc2 chunk shape.
        return_dict: Shared mapping filled by the GPU workers.
        total_chunks: Shared counter of chunks queued by the producer.
        processed_chunks: Shared counter of chunks finished by the workers.

    Notes:
        A failed write is reported once per chunk and retried after a short
        pause; the chunk stays in ``return_dict`` so no data is dropped.

        NOTE(review): the exit condition is also true before the producer has
        queued anything, and whenever the workers and the writer momentarily
        catch up with the producer. In those cases the writer can exit early.
        A reliable fix needs an explicit "producer finished" signal, which
        this signature does not carry.
    """
    clevel = 1
    # NOTE(review): 200 compression threads - confirm the host has the cores.
    nthreads = 200
    idle_seconds = 0.1   # pause when there is nothing to write (no busy-spin)
    retry_seconds = 1.0  # pause before retrying chunks whose write failed
    cparams = {
            "codec": blosc2.Codec.ZSTD,
            "clevel": clevel,
            "filters": [blosc2.Filter.BITSHUFFLE, blosc2.Filter.BYTEDELTA],
            "filters_meta": [0, 0],
            "nthreads": nthreads,
    }
    storage_chunks = (chunk_size[0], chunk_size[1], chunk_size[2])

    # A missing folder would make every write fail and be retried forever.
    os.makedirs(output_folder, exist_ok=True)
    reported = set()  # chunk ids whose failure was already printed

    while True:
        if processed_chunks.value == total_chunks.value and len(return_dict) == 0:
            break

        pending = list(return_dict.items())
        if not pending:
            sleep(idle_seconds)
            continue

        had_failure = False
        for chunk_id, chunk in pending:
            z, y, x = chunk_id
            filepath = os.path.join(output_folder, f"chunk_z_y_x_{z}_{y}_{x}.b2nd")
            try:
                # mode="w" overwrites a file left behind by an earlier failed
                # attempt, so a retry is not blocked by the existing path.
                # NOTE(review): storage dtype is uint8; confirm that
                # edge_detection output fits, since the workers request float32.
                array = blosc2.empty(
                    chunk.shape,
                    dtype=np.uint8,
                    chunks=storage_chunks,
                    blocks=(100, 100, 100),
                    urlpath=filepath,
                    mode="w",
                    cparams=cparams,
                )
                array[:, :, :] = chunk
            except Exception as exc:
                # Keep the chunk for a later retry, but make the failure
                # visible instead of silently spinning on it.
                had_failure = True
                if chunk_id not in reported:
                    reported.add(chunk_id)
                    print(f"writer_process: failed to write {filepath}: {exc!r}; will retry", flush=True)
                continue
            del return_dict[chunk_id]

        if had_failure:
            sleep(retry_seconds)

In [None]:
# Load the input volume. producer() later reads its .shape and slices it along
# three axes.
# NOTE(review): load_tifstack is a project-local helper; whether it returns an
# in-memory or lazily-loaded array is not visible from this notebook.
scroll = load_tifstack("../scroll1-denoised/")

In [None]:
# Chunk extent per axis, consumed by producer() and writer_process().
chunk_size = [400] * 3
# Volume shape, kept for inspection; not used by the cells visible here.
shape = scroll.shape

In [None]:
# Launch the single producer process; it slices `scroll` into chunks and feeds
# the per-GPU task queues.
producer_process = Process(
    target=producer,
    kwargs={
        "scroll": scroll,
        "chunk_size": chunk_size,
        "gpu_queues": gpu_queues,
        "total_chunks": total_chunks,
    },
)
producer_process.start()

In [None]:
# Launch one worker process per GPU, each bound to its own task queue.
processes = []
for gpu_id in range(num_gpus):
    worker_args = (gpu_id, gpu_queues[gpu_id], return_dict, processed_chunks, lock)
    p = Process(target=process_chunk_on_gpu, args=worker_args)
    p.start()
    processes.append(p)

In [None]:
# Launch the writer process that saves finished chunks from return_dict.
# NOTE(review): the output path starts with "./" while the input volume was
# loaded from "../scroll1-denoised/" - confirm this is the intended folder.
writer = Process(
    target=writer_process,
    kwargs={
        "output_folder": "./scroll1-denoised/edges",
        "chunk_size": chunk_size,
        "return_dict": return_dict,
        "total_chunks": total_chunks,
        "processed_chunks": processed_chunks,
    },
)
writer.start()