Add get_zarr method to context #540

Merged · 23 commits · Oct 14, 2021
Changes from 20 commits
69 changes: 69 additions & 0 deletions docs/source/advanced/out_of_core.rst
@@ -0,0 +1,69 @@
Out of core computation
=======================

Overview and motivation
------------------------
Analyses often involve computations that are not implemented by a plugin (e.g. plotting)
and that require loading more data than can fit into memory;
such tasks are commonly referred to as out-of-core computations.
Out-of-core algorithms usually involve a few repeating steps (a minimal sketch follows the list below):

1. chunk the dataset into manageable sizes
2. load the data chunk by chunk
3. perform some computation on each chunk
4. save a summary of the results for each chunk
5. combine the per-chunk results into a final result.
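
As a concrete (if simplified) illustration of these steps, the sketch below computes a mean over a dataset too large to load at once; the file name, chunk size, and per-chunk statistic are placeholders for whatever the analysis actually needs:

.. code-block:: python

    import numpy as np

    # steps 1 & 2: memory-map the file so each slice reads only one chunk
    # (hypothetical file name; any chunked, array-like source works similarly)
    data = np.load('large_dataset.npy', mmap_mode='r')
    chunk_size = 1_000_000

    partial_sums, counts = [], []
    for start in range(0, data.shape[0], chunk_size):
        # step 3: load one chunk into memory and compute on it
        chunk = np.asarray(data[start:start + chunk_size])
        # step 4: keep only a small per-chunk summary
        partial_sums.append(chunk.sum())
        counts.append(chunk.size)

    # step 5: combine the per-chunk summaries into the final result
    mean = sum(partial_sums) / sum(counts)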

While it is of course possible to implement these operations yourself, doing so is tedious and repetitive, and the code becomes rigidly tied to the specific calculation being performed.
A better approach is to use abstractions of commonly performed operations that use out-of-core algorithms under the hood to produce the same result as if the operations were performed on the entire dataset.
Code written with these abstractions runs on in-memory and out-of-core datasets alike.
More importantly, the implementations of these algorithms can be written once and packaged for everyone to use.

Data chunking
-------------
The zarr package provides the data-access API of numpy arrays for chunked and compressed data stored in memory or on disk.
A zarr array behaves like a numpy array when accessing data, but the underlying data is actually a collection of (optionally compressed) chunks.
The strax context provides a convenience method for loading data directly into zarr arrays.

.. code-block:: python

    import strax

    context = strax.Context(**CONTEXT_KWARGS)

    # you can pass the same arguments you pass to context.get_array()
    zgrp = context.get_zarr(RUN_IDs, DATA_TYPES, **GET_ARRAY_KWARGS)

    # the zarr group contains multiple arrays, one for each data type
    z = zgrp.data_type

    # individual arrays are also accessible via the __getitem__ interface
    z = zgrp['data_type']

    # numpy-like data access, abstracting away the underlying data reading,
    # which may involve reading multiple chunks from disk/memory, decompressing
    # them, and concatenating the results into an in-memory numpy array
    z[:100]
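
The runs already written to a given array are recorded in the array's ``.attrs['RUNS']`` mapping (run id to start/end index and lineage hash), which ``get_zarr`` uses to skip runs that are already persisted. A quick way to inspect it:

.. code-block:: python

    # each entry maps a run id to the index range it occupies in the array
    # (plus its lineage hash), as written by get_zarr
    print(z.attrs.get('RUNS', {}))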


Data processing
---------------
The dask package provides abstractions for most of the numpy and pandas APIs.
The dask.Array and dask.DataFrame objects implement their respective APIs
using fully distributed algorithms that load only a fraction of the total data into memory
at any given moment for a given computing partition (thread/process/HPC job).

.. code-block:: python

    import dask.array as da

    # easily convert to the dask.Array abstraction for processing
    darr = da.from_zarr(z)

    # it is recommended to rechunk to sizes more appropriate for processing;
    # rechunk() returns a new dask.Array, see the dask documentation for details
    darr = darr.rechunk(CHUNK_SIZE)

    # you can also convert the dask.Array abstraction
    # to a dask.DataFrame abstraction if you need the pandas API
    ddf = darr.to_dask_dataframe()
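
Operations on these objects only build a lazy task graph; calling ``.compute()`` triggers the actual chunked evaluation. A minimal example, using the fact that every strax data type has a ``time`` field:

.. code-block:: python

    # lazily define the computation, then evaluate it chunk by chunk
    mean_time = ddf['time'].mean().compute()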
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -32,6 +32,7 @@ You might also find these presentations useful:

advanced/chunking
advanced/superrun
advanced/out_of_core
advanced/plugin_dev
advanced/recompression
advanced/fuzzy_for
53 changes: 28 additions & 25 deletions extra_requirements/requirements-tests.txt
@@ -1,25 +1,28 @@
# File for the requirements of strax with the automated tests
blosc==1.10.4
boltons==21.0.0
dill==0.3.4
coveralls==3.2.0
coverage==5.5
flake8==3.9.2
hypothesis==6.17.4
immutabledict==2.2.0
lz4==3.1.3
mongomock==3.23.0
npshmex==0.2.1
numba==0.53.1
numexpr==2.7.3
numpy==1.21.2
pandas==1.3.2
packaging==21.0
pdmongo==0.1.0
psutil==5.8.0
pymongo==3.12.0
pytest==6.2.5
pytest-cov==2.12.1
scipy==1.7.1
tqdm==4.62.2
zstd==1.5.0.2
mongomock==3.23.0
zarr==2.10.1
fsspec
50 changes: 50 additions & 0 deletions strax/context.py
@@ -1354,6 +1354,56 @@ def get_df(self, run_id: ty.Union[str, tuple, list],
f"array fields. Please use get_array.")
raise


def get_zarr(self, run_ids, targets, storage='./strax_temp_data',
             progress_bar=False, overwrite=True, **kwargs):
    """Get persistent arrays using zarr. This is useful when
    loading large amounts of data that cannot fit into memory.
    zarr is very compatible with dask.

    Targets are loaded into separate arrays and runs are merged;
    the data is added to any existing data in the storage location.

    :param run_ids: (Iterable) Run ids you wish to load.
    :param targets: (Iterable) Targets to load.
    :param storage: (str, optional) fsspec path to store the arrays. Defaults to './strax_temp_data'.
    :param overwrite: (boolean, optional) Whether to overwrite existing arrays for targets at the given path.

    :returns zarr.Group: zarr group containing the persistent arrays available at
        the storage location after loading the requested data.
        The runs loaded into a given array can be seen in the
        array's .attrs['RUNS'] field.
    """
    import zarr
    context_hash = self._context_hash()
    kwargs_hash = strax.deterministic_hash(kwargs)
    root = zarr.open(storage, mode='w')
    group = root.create_group(context_hash + '/' + kwargs_hash, overwrite=overwrite)
    for target in strax.to_str_tuple(targets):
        idx = 0
        zarray = None
        if target in group:
            zarray = group[target]
            if not overwrite:
                idx = zarray.size
        INSERTED = {}
        for run_id in strax.to_str_tuple(run_ids):
            if zarray is not None and run_id in zarray.attrs.get('RUNS', {}):
                continue
            key = self.key_for(run_id, target)
            INSERTED[run_id] = dict(start_idx=idx, end_idx=idx, lineage_hash=key.lineage_hash)
            for chunk in self.get_iter(run_id, target, progress_bar=progress_bar, **kwargs):
                end_idx = idx + chunk.data.size
                if zarray is None:
                    dtype = [(d[0][1], ) + d[1:] for d in chunk.dtype.descr]
                    zarray = group.create_dataset(target, shape=end_idx, dtype=dtype)
                else:
                    zarray.resize(end_idx)
                zarray[idx:end_idx] = chunk.data
                idx = end_idx
            INSERTED[run_id]['end_idx'] = end_idx
        zarray.attrs['RUNS'] = dict(zarray.attrs.get('RUNS', {}), **INSERTED)
    return group

def key_for(self, run_id, target):
"""
Get the DataKey for a given run and a given target plugin. The
22 changes: 22 additions & 0 deletions tests/test_get_zarr.py
@@ -0,0 +1,22 @@

import strax
from strax.testutils import Records, Peaks, run_id
import tempfile
import numpy as np



def test_get_zarr():
    """Check that get_zarr loads the same data as get_array."""
    with tempfile.TemporaryDirectory() as temp_dir:
        context = strax.Context(storage=strax.DataDirectory(temp_dir,
                                                            deep_scan=True),
                                register=[Records, Peaks],
                                use_per_run_defaults=True,
                                )
        records = context.get_array(run_id, 'records')
        peaks = context.get_array(run_id, 'peaks')
        zgrp = context.get_zarr(run_id, ('records', 'peaks'), storage='memory://')

        assert np.all(zgrp.records['time'] == records['time'])
        assert np.all(zgrp.peaks['time'] == peaks['time'])