### Tensordot performance comparison between Blosc2 and Dask+Zarr with persistent storage

In [None]:
%load_ext memprofiler
from time import time
import numpy as np
import blosc2
import dask
import dask.array as da
import zarr
from numcodecs import Blosc
import h5py
import hdf5plugin
# It looks like b2h5py does not significantly accelerate this workload
# import b2h5py.auto
# assert(b2h5py.is_fast_slicing_enabled())

In [None]:
# --- Experiment Setup ---
# Edge length of the cubic operands.
N = 600
dtype = np.float64

# Geometry: two 3-D operands in, one 2-D result out, all tiled in 150-wide chunks.
shape_a = (N, N, N)
shape_b = (N, N, N)
shape_out = (N, N)
chunks = (150, 150, 150)
chunks_out = (150, 150)

# The same LZ4 / level-1 compression settings, spelled once per storage backend.
cparams = blosc2.CParams(clevel=1, codec=blosc2.Codec.LZ4)
compressor = Blosc(shuffle=Blosc.SHUFFLE, clevel=1, cname="lz4")
h5compressor = hdf5plugin.Blosc2(filters=hdf5plugin.Blosc2.SHUFFLE, clevel=1, cname="lz4")

# When True, the creation cells (re)write the on-disk arrays.
create = True

# Dask scheduler name, chosen from the blosc2 thread count.
if blosc2.nthreads == 1:
    scheduler = "single-threaded"
else:
    scheduler = "threads"

In [None]:
# --- Numpy array creation ---
# Source data for every backend: N**3 evenly spaced values in [0, 1], viewed as a cube.
if create:
    started = time()
    flat_values = np.linspace(0, 1, num=N**3)
    matrix_numpy = flat_values.reshape(shape_a)
    elapsed = time() - started
    print(f"N={N}, Numpy array creation = {elapsed:.2f} s")

In [None]:
# --- Blosc2 array creation ---
# Persist the same NumPy data twice, as the two on-disk operands.
if create:
    started = time()
    # Keyword arguments common to both operands; only the target path differs.
    store_opts = {"cparams": cparams, "chunks": chunks, "mode": "w"}
    matrix_a_blosc2 = blosc2.asarray(matrix_numpy, urlpath="a.b2nd", **store_opts)
    matrix_b_blosc2 = blosc2.asarray(matrix_numpy, urlpath="b.b2nd", **store_opts)
    elapsed = time() - started
    print(f"N={N}, Array creation = {elapsed:.2f} s")

In [None]:
# Re-open the arrays
# Both operands are opened read-only, in the order a then b.
started = time()
matrix_a_blosc2, matrix_b_blosc2 = [
    blosc2.open(path, mode="r") for path in ("a.b2nd", "b.b2nd")
]
elapsed = time() - started
print(f"N={N}, Blosc2 array opening = {elapsed:.2f} s")

# Tensordot computation with Blosc2

In [None]:
%%mprof_run 1.Blosc2::1.from_blosc2_to_blosc2
# --- Tensordot computation ---
# Contract the on-disk Blosc2 operands over three different pairs of axes,
# using the same axis pair for both operands on each pass.
# NOTE: the expression string refers to `matrix_a_blosc2`, `matrix_b_blosc2`
# and `axis` by name (they are not passed in explicitly), so those variable
# names must stay in sync with the string.
for axis in ((0, 1), (1, 2), (2, 0)):
    t0 = time()
    lexpr = blosc2.lazyexpr("tensordot(matrix_a_blosc2, matrix_b_blosc2, axes=(axis, axis))")
    # Every pass writes to the same "out.b2nd" path with mode="w"
    # (presumably replacing the previous pass's result -- confirm).
    out_blosc2 = lexpr.compute(urlpath="out.b2nd", mode="w", chunks=chunks_out)
    # The reported time covers building the lazy expression plus compute().
    print(f"axes={axis}, Blosc2 Performance = {time() - t0:.2f} s")

In [None]:
# --- HDF5 array creation ---
# Write both operands and pre-allocate the output dataset in a single HDF5
# file, all using the same Blosc2 filter settings.
if create:
    t0 = time()
    # The context manager releases the file handle even if a create_dataset
    # call raises; a handle left open in "w" mode can make re-running this
    # cell fail.
    with h5py.File("a_b_out.h5", "w") as f:
        f.create_dataset("a", data=matrix_numpy, dtype=dtype, chunks=chunks, **h5compressor)
        f.create_dataset("b", data=matrix_numpy, dtype=dtype, chunks=chunks, **h5compressor)
        # "out" is created empty (shape only); later cells fill it in.
        f.create_dataset("out", shape=shape_out, dtype=dtype, chunks=chunks_out, **h5compressor)
        # Report while the file is still open, so the measured interval
        # excludes the close, as before.
        print(f"N={N}, HDF5 array creation = {time() - t0:.2f} s")

# Re-open the HDF5 arrays
# The file is kept open in "a" (read/write) mode: the dataset handles below
# come from `f`, and "out" is written to by later cells.
t0 = time()
f = h5py.File("a_b_out.h5", "a")
matrix_a_hdf5 = f["a"]
matrix_b_hdf5 = f["b"]
out_hdf5 = f["out"]
# Report the opening time, like the Blosc2 and Zarr re-open cells do.
print(f"N={N}, HDF5 array opening = {time() - t0:.2f} s")

In [None]:
%%mprof_run 2.Blosc2::2.from_hdf5_to_hdf5
# --- Tensordot computation with HDF5 ---
# Same three contractions, evaluated by blosc2 over the h5py datasets, with
# the result stored into the pre-created "out" dataset through `out=`.
# NOTE: the expression string refers to `matrix_a_hdf5`, `matrix_b_hdf5` and
# `axis` by name, so those variable names must stay in sync with the string.
for axis in ((0, 1), (1, 2), (2, 0)):
    t0 = time()
    # The same output dataset is reused on every pass.
    blosc2.evaluate("tensordot(matrix_a_hdf5, matrix_b_hdf5, axes=(axis, axis))", out=out_hdf5)
    print(f"axes={axis}, HDF5 Performance = {time() - t0:.2f} s")

In [None]:
# --- Zarr array creation ---
# Persist the same NumPy data twice as Zarr (format 2) stores.
if create:
    started = time()
    # Keyword arguments common to both stores; only path and shape are given per call.
    zarr_opts = {
        "mode": "w",
        "chunks": chunks,
        "dtype": dtype,
        "compressor": compressor,
        "zarr_format": 2,
    }

    matrix_a_zarr = zarr.open_array("a.zarr", shape=shape_a, **zarr_opts)
    matrix_a_zarr[:] = matrix_numpy

    matrix_b_zarr = zarr.open_array("b.zarr", shape=shape_b, **zarr_opts)
    matrix_b_zarr[:] = matrix_numpy

    elapsed = time() - started
    print(f"N={N}, Zarr array creation = {elapsed:.2f} s")

In [None]:
# --- Re-open the Zarr arrays ---
# Both stores are opened read-only, in the order a then b.
started = time()
matrix_a_zarr, matrix_b_zarr = [
    zarr.open(store, mode="r") for store in ("a.zarr", "b.zarr")
]
elapsed = time() - started
print(f"N={N}, Zarr array opening = {elapsed:.2f} s")

In [None]:
%%mprof_run 2.Blosc2::1.from_zarr_to_zarr
# --- Tensordot computation with Blosc2 (Zarr operands and Zarr output) ---
# The output store is created once, before the timed loop, so its creation
# cost is not part of the per-axis timings below.
zout2 = zarr.open_array("out2.zarr", mode="w", shape=shape_out, chunks=chunks_out,
                        dtype=dtype, compressor=compressor, zarr_format=2)
# NOTE: the expression string refers to `matrix_a_zarr`, `matrix_b_zarr` and
# `axis` by name, so those variable names must stay in sync with the string.
for axis in ((0, 1), (1, 2), (2, 0)):
    t0 = time()
    # The same output array is reused on every pass.
    blosc2.evaluate("tensordot(matrix_a_zarr, matrix_b_zarr, axes=(axis, axis))", out=zout2)
    print(f"axes={axis}, Blosc2 Performance = {time() - t0:.2f} s")

# Tensordot computation with Dask

In [None]:
%%mprof_run 3.Dask::2.from_hdf5_to_hdf5
# --- Tensordot computation with Dask (to_zarr) ---
matrix_a_dask = da.from_array(matrix_a_hdf5, chunks=chunks)
matrix_b_dask = da.from_array(matrix_b_hdf5, chunks=chunks)
with dask.config.set(scheduler=scheduler, num_workers=blosc2.nthreads):
    for axis in ((0, 1), (1, 2), (2, 0)):
        t0 = time()
        dexpr = da.tensordot(matrix_a_dask, matrix_b_dask, axes=(axis, axis))
        da.to_hdf5('a_b_out.h5', '/out', dexpr, chunks=chunks_out)
        print(f"axes={axis}, Dask Performance = {time() - t0:.2f} s")
f.close()

In [None]:
%%mprof_run 3.Dask::1.from_zarr_to_zarr
# --- Tensordot computation with Dask (to_zarr) ---
matrix_a_dask = da.from_zarr(matrix_a_zarr, chunks=chunks)
matrix_b_dask = da.from_zarr(matrix_b_zarr, chunks=chunks)
zout = zarr.open_array("out.zarr", mode="w", shape=shape_out, chunks=chunks_out,
                       dtype=dtype, compressor=compressor, zarr_format=2)
with dask.config.set(scheduler=scheduler, num_workers=blosc2.nthreads):
    for axis in ((0, 1), (1, 2), (2, 0)):
        t0 = time()
        dexpr = da.tensordot(matrix_a_dask, matrix_b_dask, axes=(axis, axis))
        da.to_zarr(dexpr, zout, chunks=chunks_out)
        print(f"axes={axis}, Dask Performance = {time() - t0:.2f} s")

In [None]:
# Plot the memory profiles recorded by the %%mprof_run cells; the title carries
# the problem size and the blosc2 thread count.
# NOTE(review): ".*" is presumably a pattern selecting every recorded label -- confirm.
%mprof_plot .* -t "tensordot ({N}, {N}, {N}) -- Number of threads: {blosc2.nthreads}"