## Generate a collection of images, preprocess them, and save in a chunked store

Create the ingredients for an imaging dataset:

- a function that produces an image (here, a numpy array)
- a function that generates a file on disk containing the image data

(This notebook was inspired by this [video](https://www.youtube.com/watch?v=wANQkgDuTAk) produced by core Dask maintainer Matt Rocklin)

In [3]:
import numpy as np
import os

def get_tmpdir(path):
    import atexit
    import shutil
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)
    atexit.register(shutil.rmtree, path)
    return path

def get_img(z):
    import time
    y,x = np.meshgrid(np.arange(-256,256), np.arange(-256,256))
    time.sleep(.025)
    return (y ^ x ^ z).astype('uint8')

def save_img(img, fname):
    from tifffile import imsave
    try:
        imsave(fname, img)
        return 0
    except:
        return 1

### Save the images serially 

In [4]:
%%time
results = []
tmpdir = get_tmpdir('data1')

for z in range(512):
    fname = os.path.join(tmpdir, str(z).zfill(4) + '.tif')
    img = get_img(z)
    results.append(save_img(img, fname))

print(np.all(np.array(results) == 0))

True
CPU times: user 972 ms, sys: 807 ms, total: 1.78 s
Wall time: 16.5 s


### Save the images in parallel
first we set up a "cluster"

In [24]:
from distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:56707  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 16  Memory: 64.00 GiB


then we give the cluster some work to do

In [6]:
%%time
futures = []
tmpdir2 = get_tmpdir('data2')

for z in range(512):
    fname = os.path.join(tmpdir2, str(z).zfill(4) + '.tif')
    img = client.submit(get_img, z)
    futures.append(client.submit(save_img, img, fname))
    
results = client.gather(futures)

print(np.all(np.array(results) == 0))

True
CPU times: user 4.17 s, sys: 160 ms, total: 4.33 s
Wall time: 4.41 s


### Ingest image data 

In [13]:
%%time
import dask.array as da
from dask_image.imread import imread as dimread

darr = dimread(tmpdir2 + '/*.tif')
darr

CPU times: user 172 ms, sys: 7.26 ms, total: 179 ms
Wall time: 174 ms


Unnamed: 0,Array,Chunk
Bytes,128.00 MiB,256.00 kiB
Shape,"(512, 512, 512)","(1, 512, 512)"
Count,1024 Tasks,512 Chunks
Type,uint8,numpy.ndarray
"Array Chunk Bytes 128.00 MiB 256.00 kiB Shape (512, 512, 512) (1, 512, 512) Count 1024 Tasks 512 Chunks Type uint8 numpy.ndarray",512  512  512,

Unnamed: 0,Array,Chunk
Bytes,128.00 MiB,256.00 kiB
Shape,"(512, 512, 512)","(1, 512, 512)"
Count,1024 Tasks,512 Chunks
Type,uint8,numpy.ndarray


### Generate a multiresolution pyramid

In [15]:
iso_chunks = (64,) * darr.ndim 
reducer = np.mean
pyramid = {}
pyramid['s0'] = darr
pyramid['s1'] = da.coarsen(reducer, darr, {k: 2 for k in range(darr.ndim)}).astype(darr.dtype)
pyramid['s2'] = da.coarsen(reducer, darr, {k: 4 for k in range(darr.ndim)}).astype(darr.dtype)
pyramid = {k: v.rechunk(iso_chunks) for k,v in pyramid.items()}

In [16]:
pyramid

{'s0': dask.array<rechunk-merge, shape=(512, 512, 512), dtype=uint8, chunksize=(64, 64, 64), chunktype=numpy.ndarray>,
 's1': dask.array<rechunk-merge, shape=(256, 256, 256), dtype=uint8, chunksize=(64, 64, 64), chunktype=numpy.ndarray>,
 's2': dask.array<rechunk-merge, shape=(128, 128, 128), dtype=uint8, chunksize=(64, 64, 64), chunktype=numpy.ndarray>}

### Save multiresolution data to disk in a chunked format

In [17]:
import zarr
from numcodecs import GZip

n5_path = get_tmpdir( 'test.n5')
save_chunks = (64,) * darr.ndim

neuroglancer_attributes = {'axes' : ['z','y','x'], 'scales': [[1,1,1], [2,2,2], [4,4,4]], 'unit': 'nm'}

group = zarr.open(zarr.N5Store(n5_path), mode='w')
group.attrs.update(neuroglancer_attributes)

arrays = []
for k,v in pyramid.items():
    arrays.append(group.zeros(name=k, shape=v.shape, dtype=v.dtype, chunks=save_chunks, compressor=GZip(-1)))

In [23]:
!cat test.n5/attributes.json

{
    "axes": [
        "z",
        "y",
        "x"
    ],
    "n5": "2.0.0",
    "scales": [
        [
            1,
            1,
            1
        ],
        [
            2,
            2,
            2
        ],
        [
            4,
            4,
            4
        ]
    ],
    "unit": "nm"
}

In [25]:
da.store(pyramid.values(), arrays, lock=None)



In [26]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
