## LBM : Benchmark Chunk Sizes

In [None]:
overwrite = False                                 # flag to re-extract tiffs with extracted data

datapath = Path('/data2/fpo/data/raw/high_res/')   # string pointing to directory containing your data
savepath = Path('/data2/fpo/data/extraction/')           # string pointing to directory containing your data
savepath.mkdir(exist_ok=True, parents=True)

htiffs = [x for x in datapath.glob('*.tif')]      # this accumulates a list of every filepath which contains a .tif file
reader = scanreader.read_scan(str(htiffs[0]), join_contiguous=True, lbm=True, x_cut=(6,6), y_cut=(17,0))  # this should take < 2s, no actual data is being read yet

In [None]:
import os
import sys
import time
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
import psutil

# Give this notebook access to the root package
sys.path.append('../../')  # TODO: Take this out when we upload to pypi
print(sys.path[0])

import core.io
import scanreader

import zarr
import bokeh.plotting as bpl
import holoviews as hv
import panel as pn
from IPython import get_ipython
import logging
import matplotlib.pyplot as plt

try:
    import dask.array as da
    has_dask = True
except ImportError:
    has_dask = False

try:
    cv2.setNumThreads(0)
except():
    pass

try:
    if __IPYTHON__:
        get_ipython().run_line_magic('load_ext', 'autoreload')
        get_ipython().run_line_magic('autoreload', '2')
except NameError:
    pass

bpl.output_notebook()
hv.notebook_extension('bokeh')

# logging
logging.basicConfig(format="{asctime} - {levelname} - [{filename} {funcName}() {lineno}] - pid {process} - {message}",
                    filename=None, 
                    level=logging.WARNING, style="{") # this shows you just errors that can harm your program
                    # level=logging.DEBUG, style="{") # this shows you general information that developers use to trakc their program 
                    # (be careful when playing movies, there will be a lot of debug messages)

# set env variables 
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"

# this session had output planes ordered differently, we need to reorder them
chan_order = np.array([ 1, 5, 6, 7, 8, 9, 2, 10, 11, 12, 13, 14, 15, 16, 17, 3, 18, 19, 20, 21, 22, 23, 4, 24, 25, 26, 27, 28, 29, 30])  # this is specific to our dataset
chan_order = [x-1 for x in chan_order]  # convert to 0-based indexing

# Benchmark: Chunk Sizes

The below section demonstrates how to search for the optimal data chunking/partitioning scheme for our datasets.

- Chunking by **Image**:

  The smallest chunks we have. Each image is loaded in parallel, which requires many cores 

In [12]:
# directory to save our benchmarks
benchmarks_savedir = datapath / 'benchmarks'
benchmarks_savedir.mkdir(exist_ok=True, parents=True)

In [None]:
# Helper function to process a dataset
# Data can be any numpy or numpy-like array

def benchmark_chunk_sizes(data, chunk_shape, savepath='', overwrite=True):
    savepath = Path(savepath).with_suffix('.zarr')
    
    # benchmark write
    start = time.time()
    store = zarr.DirectoryStore(savepath)  # save data to persistent disk storage
    z = zarr.zeros(data.shape, chunks=chunk_shape, dtype='int16', store=store, overwrite=overwrite)

    if hasattr(data, 'compute'):
        z[:] = data.compute()             # this will auto-chunk based on the specified chunks in 'open'
    else:
        z[:] = data

    write = time.time() - start
    formatted_write = f"{write:.2f}"

    # benchmark read
    start = time.time()
    _ = z[:]
    read = time.time() - start
    formatted_read = f"{read:.2f}"

    chunksize_nbytes = np.prod(chunk_shape) * z.dtype.itemsize  # 2 bytes per int16
    return [
        str(data.shape),
        str(z.chunks),
        z.nbytes / 1e6,
        chunksize_nbytes / 1e6,
        z.dtype,
        z.order,
        formatted_read,
        formatted_write,
        z.store.path
    ]

Use our raw dataset to get image shapes, and read/write operation on the **same dataset** with different chunk sizes.

In [None]:
# we use dimensions from our initial raw data store
zinf = zarr.open(save_str)
zinf.chunks

In [None]:
labels = [
    'Array  [x,y,z,t]',
    'Chunks [x,y,z,t]',
    'Chunk Size (Mb)',
    'Array Size (Mb)',
    'Data Type',
    'Order',
    'Read Time (s)',
    'Write Time (s)',
    'Save Path'
]

# the chunk sizes we want to benchmark
chunksizes = [
    (zinf.shape[0], zinf.shape[1], 1, zinf.shape[3]),  # [300x300x1x1750]
    (zinf.shape[0], zinf.shape[1], zinf.shape[2], 1),  # [300x300x30x1  ]
    (zinf.shape[0], zinf.shape[1], 1, 1)               # [300x300x1x1   ]
]

# give our dataset a name. this will be the column header
names = [
    ('chunked_by_plane'),  # [300x300x1x1750]
    ('chunked_by_frame'),  # [300x300x30x1  ]
    ('chunked_by_image')   # [300x300x1x1   ]
]

benchmark_chunks_dir = benchmarks_savedir / "chunks"
benchmark_chunks_dir.mkdir(exist_ok=True, parents=True)

vals = {data_name: [] for data_name in names}
for i, (chunksize, dataset) in enumerate(zip(chunksizes, names)):
    # save the same data but with different chunk sizes
    chunks_save = benchmark_chunks_dir / f'{dataset}.zarr'
    # new store 
    store = zarr.DirectoryStore(save_str)  # save data to persistent disk storage
    z = zarr.zeros(zinf.shape, chunks=(zinf.shape[0], zinf.shape[1], 1, zinf.shape[3]), dtype='int16', store=store, overwrite=True)
    vals[dataset] = benchmark_chunk_sizes(zinf, chunksize, savepath=f"{chunks_save}", overwrite=True)

df = pd.DataFrame(index=labels, columns=names, data=None)
for k, v in vals.items():
    df[k] = v


In [None]:
df