# Large data processing

The biggest limitation to GPU-processing, beside its learning curve, is the memory space. It does not go higher than `32Gb` for commercial GPU. This can easily limits the maximum image size to process to `8Gb`, even less when we want to apply more complex algorithm requiring temporary steps.

This issue already exist outside of GPU-acceleartion and the solution is to tile our image and process each tile separatly to overcome the memory bottleneck. We can rely on the `dask` library to distribute our data accross our list of device, in the same way we would do on an HPC.

In [None]:
import dask.array as da
import dask.distributed as dd

import timeit
import numpy as np
import matplotlib.pyplot as plt

import pyclesperanto as cle
try:
    import cupy as xp
except:
    import numpy as xp
    Warning("Cupy not found, using numpy instead.")
try:
    import cupyx.scipy.ndimage as xdi
except:
    import scipy.ndimage as xdi
    Warning("Cupy not found, using scipy instead.")

import zarr
from skimage.io import imread

## Define dask client

For GPU usage, we will want to have one dask worker per device to use. We can have more than one thread per worker but, depending on the memory usage of the pipeline to run this can jam the device.

In [None]:
nb_devices = len(cle.list_available_devices(device_type="gpu"))

cluster = dd.LocalCluster(n_workers=nb_devices, threads_per_worker=1, processes=False)
client = dd.Client(cluster)
print(client)

We can associate each worker of our client to the index of a device of clesperanto using a `dict`. This will allows use to switch devices based on the worker.

In [None]:
workers = client.scheduler_info()['workers']
worker_dev_map = {worker: idx for idx, worker in enumerate(workers)}
for worker in workers:
    print(worker_dev_map[worker], "-", worker, ":" ,cle.select_device(worker_dev_map[worker], device_type="gpu").name)

## Load zarr data

We use a dataset that is provided by Theresa Suckert, OncoRay, University Hospital Carl Gustav Carus, TU Dresden. The dataset is licensed License: CC-BY 4.0. We are using a cropped version here that was resaved a 8-bit image to be able to provide it with the notebook. You find the full size 16-bit image in CZI file format online.

In [None]:
darray = da.from_zarr("./data/P1_H_C3H_M004_17-cropped.zarr", chunks=(2, 1000, 1000))  # Adjust the chunk size as needed
darray


In [None]:
fig, axs = plt.subplots(1, 2, figsize=(30, 10))
axs[0].imshow(darray[0], cmap='gray')
axs[1].imshow(darray[1], cmap='gray')
for ax in axs:
    ax.axis('off')
plt.tight_layout()
plt.show()

### clesperanto mini-Pipeline

Now we can build a small pipeline that will be run for each block of the dask and compute the result.

In [None]:
def cle_operation(image, block_info=None):
    # fetch the device associate to the worker
    worker = dd.get_worker()
    gpu_index = worker_dev_map[worker.address]
    device = cle.select_device(gpu_index, "gpu")
    chunk_coord = block_info[None]['chunk-location'] if block_info is not None else None

    print(f"Processing chunk {chunk_coord} with {device.name} ({gpu_index})")

    # TODO: add processing here

    # return the results as numpy array
    return np.asarray(image)

In [None]:
processed_image = da.map_overlap(cle_operation, darray, dtype=darray.dtype)
processed_image

In [None]:
start = timeit.default_timer()
result = processed_image.compute()
end = timeit.default_timer()
print(f"Time to compute: {end - start} seconds")

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(30, 10))
axs[0].imshow(result[0], cmap='viridis')
axs[1].imshow(result[1], cmap='viridis')
plt.show()

## Exercise 1: Apply some processing on the image

Process the image to segment the nuclei and count them per image block

## Additionnal Use-Case: Data projection & overlap

An other example for large data processing, in the case of data projection. Here we have a large 3D tissue from a slide scanner (~30Gb) that need to be projected before thurther analysis. Here we will focus on a crop 

This use-case was done by Emily Haimerl ([Gaia Novarino Lab](https://ist.ac.at/en/research/novarino-group/)), Maximilian Schuster, and Marco Dalla Vecchia ([Imaging & Optics Facility](https://iof.pages.ist.ac.at/)) from ISTA (Vienna)

In [None]:
darray = da.from_zarr("./data/WT_ScanRegion4-cropped.zarr", chunks=(28, 2, 2000, 2000))  # Adjust the chunk size as needed
darray

The objectives is to project the volume with the optic to keep the nuclei in focus. For that we use maximum local variance projection to identify the best focus plan and a top hat filter to remove the background. The objectives beeing to segment the nuclei and count the number of nuclei positive in the tissue.

In [None]:
def cle_operation(image, block_info=None):
    # fetch the device associate to the worker
    worker = dd.get_worker()
    gpu_index = worker_dev_map[worker.address]
    device = cle.select_device(gpu_index, "gpu")
    chunk_coord = block_info[None]['chunk-location'] if block_info is not None else None

    # pre-processing for projecting best focus intensity in each pixel
    img_dev = cle.push(image, device=device)
    proj = cle.extended_depth_of_focus_variance_projection(img_dev, sigma=100)
    bged = cle.top_hat(proj, radius_x=30.0, radius_y=30.0, connectivity="box")

    # return the results as numpy array
    return np.asarray(bged)

processed_image = da.map_overlap(cle_operation, darray[:,0,...], dtype=darray.dtype, drop_axis=[0], depth=[0, 40, 40])
processed_image

In [None]:
start = timeit.default_timer()
result = processed_image.compute()
end = timeit.default_timer()
print(f"Time to compute: {end - start} seconds")

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(30, 10))
axs[0].imshow(result, cmap='viridis')
# draw a box around the region of interest
axs[0].add_patch(plt.Rectangle((5000, 5000), 1000, 1000, edgecolor='red', facecolor='none', lw=2))
axs[1].imshow(result[5000:6000, 5000:6000], cmap='viridis')
for ax in axs:
    ax.axis('off')
plt.tight_layout()
plt.show()