In [46]:
import pybdv
import os
try:
    import dask.array as da
    import zarr
    has_dask = True
except ImportError:
    has_dask = False
import numpy as np
from importlib import reload  
reload(pybdv) 
import pybdv
from pybdv.converter import make_bdv_from_dask_array, \
normalize_output_path_dask, handle_setup_id, validate_attributes
from pybdv.util import get_key

In [3]:
data = da.random.randint(0,1000,(10,20,30))
np_array = data.compute()

In [12]:
output_path = 'E:\\test.zarr'
setup_id=None
timepoint=0
setup_name=None
affine=None
attributes={'channel': {'id': None}}
overwrite='all'
chunks=None
downscale_factors=[[2,2,2], [2,2,2]]
downsample_chunks = ((5,5,5), (1,1,1))
downscale_func=np.mean
resolution=[1., 1., 1.]
unit='pixel'


In [13]:
if not has_dask:
    raise ImportError("Please install dask to use this function")
if not isinstance(data, da.Array):
    raise ValueError("Input needs to be dask array, got %s" % type(data))
ndim = data.ndim
if ndim != 3 or len(resolution) != ndim:
    raise ValueError("Invalid input dimensionality")
if affine is not None:
    validate_affine(affine)
is_h5 = False
data_path, xml_path, is_n5 = normalize_output_path_dask(output_path)
setup_id, overwrite_data, overwrite_metadata, skip = handle_setup_id(setup_id,
                                                                        xml_path,
                                                                        timepoint,
                                                                        overwrite,
                                                                        is_h5)

In [14]:
enforce_consistency = not (overwrite_data or overwrite_metadata)
attributes_ = validate_attributes(xml_path, attributes, setup_id, enforce_consistency)
attributes_

{'channel': {'id': 0}}

In [15]:
if chunks is not None:
        data = data.rechunk(chunks)

In [16]:
 if downscale_factors is None:
        # set single level downscale factor
        factors = [[1, 1, 1]]

In [44]:
def make_scales_dask(data, data_path, is_n5, downscale_factors, downscale_func,
                ndim, setup_id, downsample_chunks=None, timepoint=0, overwrite=False):
    if not all(isinstance(factor, (int, tuple, list)) for factor in downscale_factors):
        raise ValueError("Invalid downscale factor")
    if not all(len(factor) == 3 for factor in downscale_factors
               if isinstance(factor, (tuple, list))):
        raise ValueError("Invalid downscale factor")
    # normalize all factors to be tuple or list
    factors = [ndim*[factor] if isinstance(factor, int) else factor
               for factor in downscale_factors]
    # make sure downsample chunks are also 3 items for each downsample factor
    if downsample_chunks is not None:
        if not all(len(chunks) == 3 for chunks in downsample_chunks):
            raise ValueError("Invalid downscale chunks")
    else:
        downsample_chunks = tuple((64,64,64) for _ in range(len(downscale_factors)))
    # run single downsampling stages

    pyramid = {}
    pyramid['s0'] = data
    current_factor = np.array([1,1,1])
    for scale, (factor, chunks) in enumerate(zip(factors, downsample_chunks)):
        key_ = 's%d' % (scale + 1)
        current_factor *= factor
        factor_dict = {k: v for k, v in zip(range(ndim), current_factor)}
        print(f'key: {key_}, factor: {current_factor}, dict: {factor_dict}, chunks:{chunks}')
        pyramid[key_] = da.coarsen(downscale_func, data, factor_dict, trim_excess=True).rechunk(chunks)
    base_key = get_key(is_h5=False, timepoint=timepoint, setup_id=setup_id, scale=0)
    path_all = os.path.join(data_path, base_key[:-2])
    if is_n5:
        store = zarr.N5FSStore(path_all)
    else:
        store = zarr.DirectoryStore(path_all)
    group = zarr.open(store, mode='w')
    save_chunks_all = ((data.chunksize),) + downsample_chunks
    arrays = []
    for (k,v), save_chunks in zip(pyramid.items(), save_chunks_all):
        arrays.append(group.zeros(name=k, shape=v.shape, dtype=v.dtype, 
        chunks=save_chunks, compressor=GZip()))
    da.store(pyramid.values(), arrays, lock=None)
    # add first level to factors
    factors = [[1, 1, 1]] + factors
    return factors


In [22]:
data.chunksize

(10, 20, 30)

In [45]:
factors= make_scales_dask(data, data_path, is_n5, downscale_factors, downscale_func,
                    ndim, setup_id, downsample_chunks=None,
                    timepoint=timepoint, overwrite=overwrite_data)


key: s1, factor: [2 2 2], dict: {0: 2, 1: 2, 2: 2}, chunks:(64, 64, 64)
key: s2, factor: [4 4 4], dict: {0: 4, 1: 4, 2: 4}, chunks:(64, 64, 64)
setup0/timepoint0/s0


In [32]:
from numcodecs import GZip

In [33]:
list(downsample_chunks)

[(5, 5, 5), (1, 1, 1)]

In [35]:
factors

[[1, 1, 1], [2, 2, 2], [2, 2, 2]]

In [27]:
from_disk = zarr.open_array(r'E:\test.zarr\setup0\timepoint0\s0')
np.allclose(np_array, from_disk)

True

In [None]:
pyramid = {}
pyramid['s0'] = upscaled_45.astype(np.uint16).rechunk((128, 128,128))
pyramid['s1'] = da.coarsen(reducer, upscaled_45, {k: 2 for k in range(upscaled_45.ndim)}, 
                            trim_excess=True).astype(np.uint16).rechunk((128, 128,128))
pyramid['s2'] = da.coarsen(reducer, upscaled_45, {k: 4 for k in range(upscaled_45.ndim)}, 
                            trim_excess=True).astype(np.uint16).rechunk((128, 128,128))
pyramid['s3'] = da.coarsen(reducer, upscaled_45, {k: 8 for k in range(upscaled_45.ndim)}, 
                            trim_excess=True).astype(np.uint16).rechunk((64, 64,64))
pyramid['s4'] = da.coarsen(reducer, upscaled_45, {k: 16 for k in range(upscaled_45.ndim)}, 
                            trim_excess=True).astype(np.uint16).rechunk((64, 64,64))
pyramid['s5'] = da.coarsen(reducer, pyramid['s4'], {k: 2 for k in range(upscaled_45.ndim)}, 
                            trim_excess=True).astype(np.uint16).rechunk((64, 64,64))
n5_path = filename + f'/tile_{pos:03}_ch{view}'
group = zarr.open(zarr.N5Store(n5_path), mode='w')
group.attrs.update(neuroglancer_attributes)
save_chunks_all = [(128, 128,128), (128, 128,128), (128, 128,128), (64, 64,64), (64, 64,64), (64, 64,64)]
arrays = []
for (k,v), save_chunks in zip(pyramid.items(), save_chunks_all):
    arrays.append(group.zeros(name=k, shape=v.shape, dtype=v.dtype, chunks=save_chunks, compressor='gzip'))
da.store(pyramid.values(), arrays, lock=None)