<br />    
<br />
<br />
# Results of chunking and compression mechanisms
<br />    
            
            
Using compression in HDF5 requires chunking. Chunking is the process of storing different subsets of the dataset contiguously on disk. For example for an array of dimensions (36, 257, 167, 84), chunking with (18, 257, 167, 84) gives four chunks. Chunking can be utilized when anticipating reading subsets of data. Compared to storing each row of an array conitguously on disk, reading subset of the array that match the chunking mechanism at the time of storage increases efficiency.

There are multiple variable that can control how the compression is done:

* chunks: The shape of the chunk
* compression: The compression algorithm, it can be either of gzip, szip, lzf, or blosc compressors such as zstd, blosclz, lz4, lz4hc, zlib
* compression_opts: The options for each compression algorithm
* shuffle: rearranging bytes in the data for possibility of improved compression

Compression and write speed:

The results in this section are dependent on the hardware. We start with considering only the Y channel. The analysis is similar for both channels.

Let's consider the following chunk shape, with 'gzip', at compresion option level 3, and with shuffle turned off:

|Chunk Size | Compression | Compression Options | Shuffle | Channel | File Size (MB) | Write Time (sec) | Read Time (sec) |
| -- | -- | -- | -- | -- | -- | -- | -- |
| (18, 36, 10, 21) |	gzip |	3|	0|	Y|	419.16|	45.48|	6.72|


After compressing the Y channel only, the file size is 419.16 MB Bytes and it takes 45.48 seconds for the data to be written, and 6.72 seconds to read the data back. We try turning the shuffling on:

|Chunk Size | Compression | Compression Options | Shuffle | Channel | File Size (MB) | Write Time (sec) | Read Time (sec) |
| -- | -- | -- | -- | -- | -- | -- | -- |
|(18, 36, 10, 21)|	gzip|	3|	1|	Y|	380.97 | 34.66 |	4.93 |

The shuffling algorithms takes advantage of the fact that numerical data in nearby voxels have reasonably close values. It reorders the bytes representing the values of nearby voxels, and places the zeros together. This allows for better compression.

Further testing shows that shuffling consistently gives significantly better results for other chunking sizes and algorithms. We will keep the shuffling on.

Testing with different settings with szip shows consistent lower compression ratio and longer write time. Among blosc algorithms, zstd and lz4 performed best on the available data. gzip and zstd give the best compression size. However, gzip has significantly slower read and write times compared to blosc algorithms and lzf. 


The following shows the results for different chunking sizes.

<img src="GraphA.png" width="90%"/>
<img src="GraphB.png" width="90%"/>


In [None]:
# Setting up data for Benchmarking:

import os
from IPython.display import display
import scipy.io
import numpy as np
import hdf5plugin
import h5py


path_to_files = 'Folder/AxelLab/data'
# info file name in path_to_files folder
fname0 = 'fly2_run1_info.mat'
# .mat file containing Calcium Imaging data in path_to_files folder
fname1 = '2019_04_18_Nsyb_NLS6s_Su_walk_G_fly2_run1_8401reg.mat'


# Open info file
fpath0 = os.path.join(path_to_files, fname0)
f_info = scipy.io.loadmat(fpath0, struct_as_record=False, squeeze_me=True)
info = f_info['info']
# Open .mat file containing Calcium Imaging data
fpath1 = os.path.join(path_to_files, fname1)
file = h5py.File(fpath1, 'r')
options = file['options']
landmarkThreshold = file['landmarkThreshold']
templates = file['templates']

Y = file['Y'] 
R = file['R']
# Note: Changing axis order copies Y and R into memory
Y = np.moveaxis(Y, 1, 2) 
R = np.moveaxis(R, 1, 2)

# Convert back to float32
Y = np.array(Y, dtype=np.float32)
R = np.array(R, dtype=np.float32)

In [None]:
# Setting up compression scenarios:

# The Cartesian product of the following parameters
# will be passed for compression

# blosc compressors: 'zstd', 'blosclz','lz4','lz4hc','zlib'
# blosc compression option range: integers 1-9 
compression_opts_list = { 'lzf' : [ None],            # lzf does not take compression options
                          'gzip': [ 1, 3, 6 ] }       # gzip compression option range: integers 1-9
shuffle_list = [1] # subset of [ hdf5plugin.Blosc.NOSHUFFLE, hdf5plugin.Blosc.SHUFFLE, hdf5plugin.Blosc.BITSHUFFLE ]
                   # or [ 0, 1, 2]

chunks_list =  [(10, 257, 7, 6),
                (4, 29, 43, 7),
                (36, 36, 10, 6),
                (4, 36, 11, 21)]

channels = [ 'Y' ] # subset of [ 'Y', 'R' ]

In [None]:
# Define the function to run benchmark tests

from datetime import datetime
from dateutil.tz import tzlocal
from pynwb import NWBFile, NWBHDF5IO, ProcessingModule
from pynwb.ophys import TwoPhotonSeries, OpticalChannel, ImageSegmentation, Fluorescence, DfOverF, MotionCorrection
from pynwb.device import Device
from pynwb.base import TimeSeries
from hdmf.backends.hdf5 import H5DataIO
from IPython.display import clear_output, Markdown, update_display
import pandas as pd
import itertools
import time
from tqdm import tqdm
import warnings
warnings.simplefilter('ignore')


def RunBenchmarkTests( compression_param_product, channels, info, uncompressed_results = None, show_results = True ):

    header_str = ("Chunk Size","Compression","Compression Options",
                  "Shuffle","Write Time (sec)","Read Time (sec)","File Size (MB)")
    display_style = {h: '{:.2f}'.format for h in header_str[4:7]}
    
    if uncompressed_results is not None:
        header_str +=  ("Compressed / Uncompressed Write Time %",
                        "Compressed / Uncompressed Read Time %",
                        "Compressed / Uncompressed Size %")
        display_style.update({h: '{:.2f}%'.format for h in header_str[7:10]})
   
    data_frame = pd.DataFrame(columns = header_str)
    results_file = 'results.tsv'

    clear_output()
    display(Markdown('') , display_id="figure")
    display(Markdown(''), display_id="data frame")

    compression_param_product_withpbar = tqdm(list(compression_param_product))
    for compression_params in compression_param_product_withpbar:

        # progressbar update
        compression_param_product_withpbar.set_description("Processing %s" % str(compression_params) )
        compression_param_product_withpbar.refresh()

        # unpacking compression parameters
        compression_opts, compression, chunks, shuffle = *compression_params,
        
        # run nwb with compression parameters
        time_write, time_read, file_size = RunNwb(compression_opts, compression, chunks, shuffle, channels)
        
        # prepare benchmark results        
        output_list =  [str(chunks),
                        compression,
                        str(compression_opts),
                        str(shuffle),
                        time_write,
                        time_read,
                        file_size]

        # add comp/uncompressed ratios
        if uncompressed_results is not None: 
            output_percentage = np.array([time_write, time_read, file_size])/uncompressed_results*100
            output_list += list(output_percentage)

        # add results to data frame
        data_frame.loc[len(data_frame)] = output_list

        if show_results == True:
            update_display(data_frame.style.format(display_style).set_properties(width='100px'),
                           display_id="data frame")            

            # store results
            with open(results_file, 'a') as output_file:
                output_line = '\t'.join( str(el) for el in output_list)+'\n' 
                output_file.write( output_line )
            
            # plot results 
            if uncompressed_results is None:
                columns = header_str[4:7]
            else:
                columns = header_str[7:10]
            marker_list = ["v","d","o","X"]
            output_data_frame = data_frame.copy(deep=True) # for plotting
            PlotGridSeq(output_data_frame,columns = columns,legend_columns=["Compression","Compression Options"],
                        compression_types = compression_opts_list, markers = marker_list )
            
    return data_frame

def RunNwb(compression_opts, compression, chunks, shuffle, channels):

    #Create new NWB file
    nwb = NWBFile(session_description='my CaIm recording', 
                  identifier='EXAMPLE_ID',
                  session_start_time=datetime.now(tzlocal()),
                  experimenter='Evan Schaffer',
                  lab='Axel lab',
                  institution='Columbia University',
                  experiment_description='EXPERIMENT_DESCRIPTION',
                  session_id='IDX')

    #Create and add device
    device = Device('Device')
    nwb.add_device(device)

    # Create an Imaging Plane for Yellow
    optical_channel_Y = OpticalChannel(name='OpticalChannel_Y',
                                       description='2P Optical Channel',
                                       emission_lambda=510.)
    imaging_plane_Y = nwb.create_imaging_plane(name='ImagingPlane_Y',
                                               optical_channel=optical_channel_Y,
                                               description='Imaging plane',
                                               device=device,
                                               excitation_lambda=488., 
                                               imaging_rate=info.daq.scanRate,
                                               indicator='NLS-GCaMP6s',
                                               location='whole central brain')
    # Create an Imaging Plane for Red
    optical_channel_R = OpticalChannel(name='OpticalChannel_R',
                                       description='2P Optical Channel',
                                       emission_lambda=633.)
    imaging_plane_R = nwb.create_imaging_plane(name='ImagingPlane_R',
                                               optical_channel=optical_channel_R,
                                               description='Imaging plane',
                                               device=device,
                                               excitation_lambda=488., 
                                               imaging_rate=info.daq.scanRate,
                                               indicator='redStinger',
                                               location='whole central brain')

    # output file name
    fname_nwb = 'file_compressed.nwb'
    output_path_to_files = 'Folder/AxelLab/data'
    fpath_nwb = os.path.join(output_path_to_files, fname_nwb)
    if os.path.isfile(fpath_nwb):
        os.remove(fpath_nwb)

    # compression keywords to pass to h5py
    shuffle = bool(shuffle)
    if compression in ['zstd','blosclz','lz4','lz4hc','zlib']:
        compression_kw = hdf5plugin.Blosc(cname=compression, clevel=compression_opts, shuffle=shuffle)
    else:
        compression_kw = { 'compression' : compression, 'compression_opts' : compression_opts,
                           'shuffle' : shuffle }

       
    if 'Y' in channels:
        if chunks != None:
            Y_chunk_iterator = SparseMatrixIterator(data=Y,
                                chunk_shape=chunks)
            Y_dataio = H5DataIO(Y_chunk_iterator, chunks=chunks, fillvalue=np.nan,
                                maxshape = (None,*Y.shape[1:]), **compression_kw)        
        else:
            Y_dataio = H5DataIO(Y, **compression_kw)        
        raw_image_series_Y = TwoPhotonSeries(name='TwoPhotonSeries_Y', 
                     imaging_plane=imaging_plane_Y,
                     rate=info.daq.scanRate,
                     dimension=Y_dataio.shape,
                     unit="unit",
                     data=Y_dataio)
        nwb.add_acquisition(raw_image_series_Y)            

    if 'R' in channels:
        if chunks != None:
            R_chunk_iterator = SparseMatrixIterator(data=R,
                                chunk_shape=chunks)
            
            R_dataio = H5DataIO(R_chunk_iterator, chunks=chunks, fillvalue=np.nan,
                                maxshape = (None,*R.shape[1:]), **compression_kw)        
        else:
            R_dataio = H5DataIO(R, **compression_kw)        
            
        raw_image_series_R = TwoPhotonSeries(name='TwoPhotonSeries_R', 
                     imaging_plane=imaging_plane_R,
                     rate=info.daq.scanRate,
                     dimension=R_dataio.shape,
                     unit="unit",
                     data=R_dataio) 
        nwb.add_acquisition(raw_image_series_R)            

    # start compression write clock
    time_write_start = time.clock()        
        
    #Saves to NWB file
    with NWBHDF5IO(fpath_nwb, mode='w') as io:
        io.write(nwb)

    time_write = time.clock() - time_write_start


    # clear file buffer
    if os.name == 'posix':
        try:
            with open(fpath_nwb) as fdforfadvise:
                os.posix_fadvise(fdforfadvise.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
                # normal unix file buffer 
                os.posix_fadvise(fdforfadvise.fileno(), 0, 0, os.POSIX_FADV_NORMAL)
        except:
            pass


    
    #Loads NWB file
    time_read_start = time.clock()

    with NWBHDF5IO(fpath_nwb, mode='r') as io:
        nwb = io.read()
        if 'Y' in channels:
            Y_series = nwb.acquisition['TwoPhotonSeries_Y']
            # read data into memory
            Y_series_data = Y_series.data[()]
            del Y_series_data
        if 'R' in channels:
            R_series = nwb.acquisition['TwoPhotonSeries_R']
            # read data into memory
            R_series_data = R_series.data[()]
            del R_series_data

    time_read = time.clock() - time_read_start

    nwbfile_size = os.stat(fpath_nwb).st_size/1024/1024# in MB        

    return time_write, time_read, nwbfile_size

In [None]:
# Function to select chunks to write to disk

from hdmf.data_utils import AbstractDataChunkIterator, DataChunk
from scipy.ndimage import morphology
import functools
from sklearn.neighbors import NearestNeighbors
from scipy import signal, stats, spatial
from scipy.optimize import linprog

class SparseMatrixIterator(AbstractDataChunkIterator):

    def __init__(self, data, chunk_shape):
        """
        :param shape: 2D tuple with the shape of the matrix
        :param chunk_shape: The shape of each chunk to be created
        :return:
        """
        self.shape, self.chunk_shape = data.shape, chunk_shape
        self.data = data
        self.__chunks_created = 0
        
        self.chunk_index_list = self.BlobDetection()        

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return in each iteration a fully occupied data chunk of self.chunk_shape values at selected
        location within the matrix. Chunks are non-overlapping.
        """

        if len(self.chunk_index_list) != 0:
            chunk_index=self.chunk_index_list.pop(0)
            tmin = chunk_index[0] * self.chunk_shape[0]
            tmax = tmin + self.chunk_shape[0]
            xmin = chunk_index[1] * self.chunk_shape[1]
            xmax = xmin + self.chunk_shape[1]
            ymin = chunk_index[2] * self.chunk_shape[2]
            ymax = ymin + self.chunk_shape[2]
            zmin = chunk_index[3] * self.chunk_shape[3]
            zmax = zmin + self.chunk_shape[3]
                        
            selection = np.s_[tmin:tmax, xmin:xmax, ymin:ymax, zmin:zmax]
            data = self.data[selection]
            
            self.__chunks_created += 1
            return DataChunk(data=data, selection=selection)
        else:
            raise StopIteration

    next = __next__

    def recommended_chunk_shape(self):
        # Here we can optionally recommend what a good chunking should be.
        return self.chunk_shape

    def recommended_data_shape(self):
        # In cases where we don't know the full size this should be the minimum size.
        return (1,*self.data.shape[1:])

    @property
    def dtype(self):
        # The data type of our array
        return np.dtype(np.float32)

    @property
    def maxshape(self):
        # If we don't know the size of a dimension beforehand we can set the dimension to None instead
        return (None,*self.data.shape[1:])

    def BlobDetection(self):
        """
        :return: list of chunk indices of interest
        """

        # construct list of chunk indices
        chunk_index_shape = np.floor_divide( self.data.shape, self.chunk_shape )
        chunk_index_count = np.arange(np.prod( chunk_index_shape )).reshape( chunk_index_shape )
        chunk_index_grid = np.argwhere( chunk_index_count > -1 ).astype(int)
        
        # thresholding
        _ratio = 7
        time_len = len(self.data)
        data_downsample = self.data[:,::_ratio,::_ratio,::_ratio]
        num_val = np.prod(data_downsample.shape)
        data_downsample_morph_grad = morphology.morphological_gradient(data_downsample, size=(time_len,_ratio,_ratio,_ratio ))
        # find histogram
        relhist = stats.relfreq(data_downsample_morph_grad,
                                numbins = int(num_val/(8**3))+1, )
        relhist.lowerlimit + np.linspace(0, relhist.binsize*relhist.frequency.size, relhist.frequency.size)
        peaks, _ = signal.find_peaks(relhist.frequency, distance = 2)
        # threashold at 70% of histogram peak
        threshold = max(peaks)*0.7
        border_mask = data_downsample_morph_grad > threshold
        border_points = np.transpose(np.nonzero(border_mask))
        border_points = border_points*np.array([1,_ratio,_ratio,_ratio])

        # find chunks indices in border_points convex hull
        hull = scipy.spatial.ConvexHull( border_points )
        chunk_index_list = [chunk_index for chunk_index in chunk_index_grid
                            if self.ChunkIsInHull(hull, chunk_index*self.chunk_shape ) ]
        
        return chunk_index_list
    
    def ChunkIsInHull(self, hull, point):
        """
        :param hull: Convex hull object returned by scipy.spatial.ConvexHull
        :param point: The index point representing the chunk nD-cube
        :return: True if the chunk nd-Cube and the convex hull intersect
        """
        
        chunk_equations = np.zeros([ len(point)*2 , len(point) + 1 ])
        for dim in range(len(point)):
            chunk_equations[dim*2    , dim] = -1
            chunk_equations[dim*2 + 1, dim] =  1
            chunk_equations[dim*2    , -1]  = point[dim]
            chunk_equations[dim*2 + 1, -1]  = point[dim]

        equations = np.vstack( (hull.equations, chunk_equations) )
        A_eq = np.transpose(equations[:,0:4])
        b_eq = np.zeros(A_eq.shape[0])
        A_ub = equations[:,4].reshape(1,equations.shape[0])
        b_ub = np.array([0])
        c = np.ones(A_eq.shape[1])
        lp = linprog(c, A_ub, b_ub, A_eq, b_eq)

        return lp.success and np.sum(lp.x) > 0 and np.sum(lp.x) < 1e-5

In [None]:
# Plot function for compression results

import seaborn as sns
import matplotlib.pyplot as plt

def PlotGridSeq(df, columns, legend_columns, compression_types, ax_lims = None, markers = None):

    if ax_lims != None:
        if len(ax_lims) != len(columns):
            raise IndexError("number of ax_lims is not equal to the number of columns")
        # remove data not in axis limits
        df = df[ functools.reduce(   lambda x,y: x & ( df[y] < ax_lims[y][1] ) & ( df[y] > ax_lims[y][0] ), columns, True ) ]

    # add markers
    if markers == None:
        markers = sns.mpl.lines.Line2D.filled_markers

    df_mod = pd.concat([ df[legend_columns],
                         pd.DataFrame([{"Compression" : str(c), "Compression Options": str(el)}
                                      for c, opts in compression_types.items() for el in opts ]) ],
                       ignore_index = True)    
    df_mod = df_mod.drop_duplicates()

    _lightest_col = 0.4 # > 0 and < 1
    df_mod_group_compression = df_mod.sort_values(legend_columns).groupby(["Compression"])    
    palette_saturation = df_mod_group_compression.apply(lambda x: pd.Series(
            _lightest_col + ( 1 - _lightest_col )*(np.arange(len(x))+1)/len(x) )).tolist() # saturation > 0 and < 1
    current_palette = sns.color_palette()
    palette_colors = df_mod_group_compression.ngroup().apply(lambda x: current_palette[x] ).tolist()
    sns_ncolors_percompression = 10 # larger than total count of each algorithm's compression options
    palette_order = [sns.light_palette(color = palette_colors[i],
                       n_colors=sns_ncolors_percompression)[ int(palette_saturation[i]*sns_ncolors_percompression-1) ]
                     for i in range(len(palette_saturation)) ]

    # expand markers list if not of sufficient length
    if len(markers) < len(df_mod_group_compression):
        markers += tuple(set(matplotlib.lines.Line2D.filled_markers) - set(markers))
    marker_order = df_mod_group_compression.ngroup().apply(lambda x: markers[x] ).tolist()

    # format legends
    LegendFormatFunc = lambda x: ''.join(str(i).ljust(12) for i in x)
    legend_title = "Compression Option"
    legend_title = LegendFormatFunc(legend_title.split())
    df[legend_title] = df[legend_columns].astype(str).apply(LegendFormatFunc, axis = 1)
    legend_order = df_mod_group_compression.apply(lambda x: pd.Series(
            sorted(x.astype(str).apply(LegendFormatFunc, axis = 1))  )).tolist() # saturation > 0 and < 1
    
    # remove unused elements from legend_order, palette_order, and marker_order
    legend_present = df[legend_title].unique()    
    legend_order,palette_order,marker_order = map(list, zip(*[(i,j,k) for i,j,k in 
                                             zip( legend_order,palette_order,marker_order) if i in legend_present]) )
    
    plot_dpi = 300
    sns.set_style("darkgrid") 
    sns.set(rc={'figure.facecolor': '#F8F8F8'})
    sns.set_context(context="notebook", font_scale=1.2)
   
    # workaround to skip kde plots with single data point
    single_elements = df.groupby(legend_columns).size() == 1
    if not single_elements.any():
        try:
            sns_plot = sns.pairplot(vars=columns, hue = legend_title , hue_order = legend_order,
                                    palette = palette_order, data = df,
                                    markers= marker_order, plot_kws={ "s": 150, "linewidth": 0.05 }, height = 5 )
            # set axis limits
            if ax_lims != None:
                for i in range(len(columns)):
                    sns_plot.axes[i,i].set_xbound( ax_lims[columns[i]] )
                    sns_plot.axes[i,i].set_ybound( ax_lims[columns[i]] )

            # show plot
            update_display(sns_plot.fig, display_id="figure")
            sns_plot.savefig("figure.png", dpi=plot_dpi)
            plt.close(sns_plot.fig)
            
        except:
            pass


In [None]:
# Iterator for nested loop of compression variables
compression_param_product = itertools.chain.from_iterable(
    itertools.product(compression_opts_list[compression_alg],
                      [compression_alg],
                      chunks_list,
                      shuffle_list)
                      for compression_alg in compression_opts_list)

# adding uncompressed runs
num_uncompr_runs = 4 # 1st run is warm up, 3 for finding the average time without compression
no_compression_param_product = itertools.repeat( (None, None, None, None), num_uncompr_runs)
data_frame = RunBenchmarkTests( no_compression_param_product, channels, info, show_results = False )
uncompressed_results = data_frame.mean()

df = RunBenchmarkTests( compression_param_product, channels, info, uncompressed_results = uncompressed_results )
