In [1]:
import h5py
import numpy as np
import dask.array as da
from bluesky import RunEngine
from databroker import Broker
# Broker built from the configuration named 'temp'.
# NOTE(review): presumably a throwaway, non-persistent store — confirm.
db = Broker.named('temp')
# RunEngine constructed with an empty dict as its initial argument.
RE = RunEngine({})
# Forward everything the RunEngine emits to the broker's insert method so the
# run can be read back through `db` afterwards.
RE.subscribe(db.insert)

from bluesky.plans import count
from ophyd.sim import SynSignalWithRegistry

In [2]:
def generate_data():
    """Produce one synthetic detector frame: a 10x10 float array of ones."""
    frame_shape = (10, 10)
    return np.ones(frame_shape)

def write_hdf5(filepath, arr):
    """Write ``arr`` to a new HDF5 file at ``filepath`` as the dataset 'data'.

    The file is opened in 'w' mode and closed when the ``with`` block exits.
    """
    with h5py.File(filepath, mode='w') as h5file:
        h5file.create_dataset(name='data', data=arr)

# Simulated detector: values come from generate_data, are saved with
# write_hdf5 using the 'h5' extension, and are tagged with spec 'HDF5_DEMO'.
det = SynSignalWithRegistry(
    name='det',
    func=generate_data,
    save_func=write_hdf5,
    save_ext='h5',
    save_spec='HDF5_DEMO',
)

class DaskHandler:
    """Handler that exposes HDF5-backed readings as lazy dask arrays.

    Files are expected to be named ``'<filepath>_<index>.h5'`` and to hold the
    reading under the dataset name ``'data'``; ``index`` is the value passed
    to ``__call__`` for each datum.

    Parameters
    ----------
    filepath : str
        Path stem of the resource (everything before ``'_<index>.h5'``).
    root : str, optional
        Accepted so the constructor signature stays compatible with callers
        that pass it; it is not used by this handler.
    """

    def __init__(self, filepath, root=''):
        self.filepath = filepath
        # Open files keyed by datum index. They have to stay open for as long
        # as dask arrays built on top of them may still be computed.
        self._files = {}

    def _dataset(self, index):
        """Return the h5py dataset for ``index``, opening its file only once."""
        if index not in self._files:
            path = '{}_{}.h5'.format(self.filepath, index)
            self._files[index] = h5py.File(path, 'r')
        return self._files[index]['data']  # lazy: no array data read here

    def __call__(self, index):
        """Return the reading for datum ``index`` as a single-chunk dask array.

        Array data is only read from disk when the returned array is computed.
        """
        dset = self._dataset(index)
        # One chunk covering the whole dataset, whatever its shape is.
        return da.from_array(dset, chunks=dset.shape)  # still lazy

    def close(self):
        """Close every HDF5 file opened by this handler.

        Dask arrays previously returned by this handler can no longer be
        computed once their backing file is closed. Safe to call repeatedly.
        """
        while self._files:
            _, h5file = self._files.popitem()
            h5file.close()
        
# Associate the spec string 'HDF5_DEMO' (the same value given as save_spec to
# the detector) with DaskHandler in the broker's registry.
db.reg.register_handler('HDF5_DEMO', DaskHandler)

In [3]:
# Run a count plan over the simulated detector, taking three readings.
RE(count([det], num=3))



('476d7bf5-5b6d-4848-b742-52b40b8650bd',)

In [4]:
# Last entry in the broker.
h = db[-1]
# Collect the per-event 'det' values and stack them along a new leading axis.
img = da.stack([frame for frame in h.data('det')], axis=0)

In [10]:
# No array data has been loaded yet: `img` is a lazy dask array, and
# evaluating it here only displays its repr (shape, dtype, chunk size).
img

dask.array<stack, shape=(3, 10, 10), dtype=float64, chunksize=(1, 10, 10)>

In [5]:
# Finally, .compute() makes dask actually load the data (chunk by chunk, in a
# streaming fashion) and reduce it to the minimum value.
img.min().compute()

1.0

In [7]:
# Transposing returns another dask array (see the repr in the output); there
# is no .compute() call here, so nothing is evaluated.
img.transpose()

dask.array<transpose, shape=(10, 10, 3), dtype=float64, chunksize=(10, 10, 1)>