In [1]:
import ray
from ray.util.dask import ray_dask_get, enable_dask_on_ray, disable_dask_on_ray
import dask.array as da
import dask.dataframe as dd
import numpy as np
import os
import pandas as pd

In [2]:
# Start Ray.

service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
ray.init(f"ray://{service_host}:{service_port}")

d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))

# The Dask scheduler submits the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

df = dd.from_pandas(
    pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
    npartitions=2,
)
df.groupby(["age"]).mean().compute()

disable_dask_on_ray()

# The Dask config helper can be used as a context manager, limiting the scope
# of the Dask-on-Ray scheduler to the context.
with enable_dask_on_ray():
    print(d_arr.mean().compute())

ray.shutdown()

498.3975830078125


In [3]:
from skimage.io import imread
from skimage.io.collection import alphanumeric_key
from dask import delayed
import dask.array as da
from glob import glob

filenames = sorted(glob("/domino/datasets/local/nuclei_imgs/*.tif"), key=alphanumeric_key)
print(len(filenames))
# read the first file to get the shape and dtype
# ASSUMES THAT ALL FILES SHARE THE SAME SHAPE/TYPE
sample = imread(filenames[0])

lazy_imread = delayed(imread)  # lazy reader
lazy_arrays = [lazy_imread(fn) for fn in filenames]
dask_arrays = [
    da.from_delayed(delayed_reader, shape=sample.shape, dtype=sample.dtype)
    for delayed_reader in lazy_arrays
]
# Stack into one large dask.array
stack = da.stack(dask_arrays, axis=0)
stack.shape  # (nfiles, nz, ny, nx)

# in jupyter notebook the repr of a dask stack provides a useful visual:
stack

200


Unnamed: 0,Array,Chunk
Bytes,138.06 MiB,706.88 kiB
Shape,"(200, 520, 696)","(1, 520, 696)"
Dask graph,200 chunks in 401 graph layers,200 chunks in 401 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 138.06 MiB 706.88 kiB Shape (200, 520, 696) (1, 520, 696) Dask graph 200 chunks in 401 graph layers Data type uint16 numpy.ndarray",696  520  200,

Unnamed: 0,Array,Chunk
Bytes,138.06 MiB,706.88 kiB
Shape,"(200, 520, 696)","(1, 520, 696)"
Dask graph,200 chunks in 401 graph layers,200 chunks in 401 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray


In [4]:
! pip install tifffile
# ! pip install napari[all] PyQt5



In [34]:
import dask
import numpy as np
import matplotlib.pyplot as plt
import os
import ray


from dask_image.imread import imread
from dask_image import ndfilters, ndmorph, ndmeasure
from ray.util.dask import ray_dask_get, enable_dask_on_ray, disable_dask_on_ray


service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
ray.init(f"ray://{service_host}:{service_port}")

enable_dask_on_ray()

images = imread('/domino/datasets/local/nuclei_imgs/*.tif')
smoothed = ndfilters.gaussian_filter(images, sigma=[0, 1, 1])
thresh = ndfilters.threshold_local(smoothed, images.chunksize)
threshold_images = smoothed > thresh
structuring_element = np.array([[[0, 0, 0], [0, 0, 0], [0, 0, 0]], [[0, 1, 0], [1, 1, 1], [0, 1, 0]], [[0, 0, 0], [0, 0, 0], [0, 0, 0]]])
binary_images = ndmorph.binary_closing(threshold_images, structure=structuring_element)
label_images, _ = ndmeasure.label(binary_images[:3], structuring_element)
label_images

Unnamed: 0,Array,Chunk
Bytes,4.14 MiB,1.38 MiB
Shape,"(3, 520, 696)","(1, 520, 696)"
Dask graph,3 chunks in 79 graph layers,3 chunks in 79 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray
"Array Chunk Bytes 4.14 MiB 1.38 MiB Shape (3, 520, 696) (1, 520, 696) Dask graph 3 chunks in 79 graph layers Data type int32 numpy.ndarray",696  520  3,

Unnamed: 0,Array,Chunk
Bytes,4.14 MiB,1.38 MiB
Shape,"(3, 520, 696)","(1, 520, 696)"
Dask graph,3 chunks in 79 graph layers,3 chunks in 79 graph layers
Data type,int32 numpy.ndarray,int32 numpy.ndarray


In [None]:
fig, (ax0, ax1, ax2, ax3, ax4) = plt.subplots(nrows=1, ncols=5, figsize=(25, 15))
ax0.imshow(images[0], cmap='gray')           
ax1.imshow(smoothed[0], cmap='gray') 
ax2.imshow(threshold_images[0], cmap='gray') 
ax3.imshow(binary_images[0], cmap='gray') 
ax4.imshow(label_images[0], cmap='gray') 

ax0.title.set_text('Original')
ax1.title.set_text('Guassian Filter')
ax2.title.set_text('Local Thresholding')
ax3.title.set_text('Binary Closing')
ax4.title.set_text('Detections')

In [None]:
# print("Number of nuclei:", _.compute())

In [38]:
def n_blobs(image):
    # image2d = image[0]
    structuring_element = np.array([ [0, 1, 0], [1, 1, 1], [0, 1, 0]])
    label_image, num_features = ndmeasure.label(image, structuring_element)
    return num_features.compute()

In [42]:
n_images = 10
blob_count = []
for image in binary_images[:n_images]:
    result = dask.delayed(n_blobs)(image)
    blob_count.append(result)

futures = dask.persist(*blob_count) 
results = dask.compute(*futures)

disable_dask_on_ray()

ray.shutdown()

In [43]:
results[:5]

(89, 62, 117, 462, 105)