In [1]:
import os
import sys
import napari
import numpy as np
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
from tifffile import imread as tiff_read
from tifffile import imwrite as tiff_write
from dask_image.imread import imread as dask_read
import dask.array as da
from dask import distributed
import time
from yaspin import yaspin
from dask.distributed import Client
# import dask delayed
from dask.delayed import delayed

In [2]:
# Directory that is scanned for input .tif files, followed by the output
# locations (the dask_* ones are written by the benchmark cells below).
file_path = '/Volumes/bigData/kkpo_test/mp_test'
dask_max_path = '/Volumes/bigData/kkpo_test/mp_test/dask'
dask_vol_path = '/Volumes/bigData/kkpo_test/mp_test/dask_vol.zarr'
trad_max_path = '/Volumes/bigData/kkpo_test/mp_test/trad'
trad_vol_path = '/Volumes/bigData/kkpo_test/mp_test/trad_vol'

# exist_ok=True keeps the cell re-runnable without a separate exists() check,
# which would race with anything else creating the same directory.
for path in [dask_max_path, dask_vol_path, trad_max_path, trad_vol_path]:
    os.makedirs(path, exist_ok=True)

In [3]:
# Sorted names of the .tif entries directly inside file_path; names that
# start with a dot are left out.
files = sorted(
    name
    for name in os.listdir(file_path)
    if name.endswith('.tif') and not name.startswith('.')
)
files

['S000_t000000_V000_R0000_X000_Y000_C01_I0_D0_P00192.tif',
 'S000_t000001_V000_R0000_X000_Y000_C01_I0_D0_P00192.tif',
 'S000_t000002_V000_R0000_X000_Y000_C02_I0_D0_P00192.tif',
 'S000_t000003_V000_R0000_X000_Y000_C01_I0_D0_P00192.tif',
 'S000_t000004_V000_R0000_X000_Y000_C01_I0_D0_P00192.tif']

#### Current Kkpo version

In [8]:
%%time

ims = dask_read(file_path + '/' + '*.tif')
print(f'gathered {len(ims)} images and assembled them into a dask array of shape {ims.shape} with chunk size {ims.chunks}')

# save volume as zarr
with yaspin() as sp:
    sp.text = f'Converting full volume to zarr, please be patient...'
    start = time.time()
    da.to_zarr(ims[:,:,::8,::8], dask_vol_path, overwrite=True)
    end = time.time()
    print(f'Saved channel zarr in {round(end - start, 3)} seconds')

# save max projection
with tqdm(total=len(files)) as max_pbar:
    max_pbar.set_description('Saving max projections')
    for tp, tp_name in enumerate(['t000000', 't000001', 't000002', 't000003', 't000004']):
        tiff_write(dask_max_path + '/' + f'{tp_name}_Max.tiff', np.max(ims[tp,:,:,:], axis=0))
        max_pbar.update(1)

gathered 5 images and assembled them into a dask array of shape (5, 192, 2048, 2048) with chunk size ((1, 1, 1, 1, 1), (192,), (2048,), (2048,))
⠼[0m Converting full volume to zarr, please be patient...[KSaved channel zarr in 7.125 seconds
[K

Saving max projections: 100%|██████████| 5/5 [00:16<00:00,  3.27s/it]

CPU times: user 3.03 s, sys: 1.64 s, total: 4.67 s
Wall time: 24.3 s





#### Same thing, but with a distributed client

In [9]:
%%time

# make client with 16 processes
client = Client(n_workers=8)
print(client)

ims = dask_read(file_path + '/' + '*.tif')
print(f'gathered {len(ims)} images and assembled them into a dask array of shape {ims.shape} with chunk size {ims.chunks}')

# save volume as zarr
with yaspin() as sp:
    sp.text = f'Converting full volume to zarr, please be patient...'
    start = time.time()
    da.to_zarr(ims[:,:,::8,::8], dask_vol_path, overwrite=True)
    end = time.time()
    print(f'Saved channel zarr in {round(end - start, 3)} seconds')

# save max projection
with tqdm(total=len(files)) as max_pbar:
    max_pbar.set_description('Saving max projections')
    for tp, tp_name in enumerate(['t000000', 't000001', 't000002', 't000003', 't000004']):
        tiff_write(dask_max_path + '/' + f'{tp_name}_Max.tiff', np.max(ims[tp,:,:,:], axis=0))
        max_pbar.update(1)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 54359 instead


<Client: 'tcp://127.0.0.1:54360' processes=8 threads=16, memory=64.00 GiB>
gathered 5 images and assembled them into a dask array of shape (5, 192, 2048, 2048) with chunk size ((1, 1, 1, 1, 1), (192,), (2048,), (2048,))
⠴[0m Converting full volume to zarr, please be patient...[KSaved channel zarr in 8.865 seconds
[K

Saving max projections: 100%|██████████| 5/5 [00:20<00:00,  4.05s/it]

CPU times: user 5.58 s, sys: 2.58 s, total: 8.16 s
Wall time: 31.2 s





#### With a distributed client and explicit parallelization

In [7]:
%%time
import skimage

# make client with 16 processes
client = Client()
print(client)

ims = dask_read(file_path + '/' + '*.tif')
print(f'gathered {len(ims)} images and assembled them into a dask array of shape {ims.shape} with chunk size {ims.chunks}')

#for tp, tp_name in enumerate(['t000000', 't000001', 't000002', 't000003', 't000004']):
max = delayed(np.max(ims[:,:,:,:], axis=0))
    #save = delayed(tiff_write(dask_max_path + '/' + f'{tp_name}_Max.tiff', max))

def save_file(arr, block_info=None):
    """ Save file to foo-x-y.tif, where x and y are block locations """
    filename = "foo-" + "-".join(map(str, block_info[0]["chunk-location"])) + ".tif"
    skimage.io.imsave(filename, arr)
    return arr

s = delayed(max.map_blocks(save_file, dtype=max.dtype))#.compute())       # call function on every block

max.visualize()
s.visualize()
s.compute()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 54288 instead


<Client: 'tcp://127.0.0.1:54289' processes=4 threads=16, memory=64.00 GiB>
gathered 5 images and assembled them into a dask array of shape (5, 192, 2048, 2048) with chunk size ((1, 1, 1, 1, 1), (192,), (2048,), (2048,))




KeyboardInterrupt: 