Single threaded alternative processor (#773)
* add processors field to context

* Separate rechunk logic from saver

* Minor abstract processor changes/fixes

* Single-thread processor + tests

* Track time spent per data type

* Fix final message lingering

* Multi-output and rebase fixes, add logging

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Placate our code review robot overlords

* Keep old straxen test running

* Be compatible with hyperruns and fix code style

* Code style fix

---------

Co-authored-by: Yossi Mosbacher <joe.mosbacher>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Dacheng Xu <dx2227@columbia.edu>
3 people authored Jun 10, 2024
1 parent b55b0d3 commit d0cbc1b
Showing 13 changed files with 913 additions and 397 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -16,7 +16,7 @@ repos:
args: [--safe, --line-length=100, --preview]
- id: black-jupyter
args: [--safe, --line-length=100, --preview]
language_version: python3.9
language_version: python3

- repo: https://github.com/pycqa/docformatter
rev: v1.7.5
1 change: 1 addition & 0 deletions strax/__init__.py
@@ -20,6 +20,7 @@

from .mailbox import *
from .processor import *
from .processors import *
from .context import *
from .run_selection import *
from .corrections import *
41 changes: 41 additions & 0 deletions strax/chunk.py
@@ -420,3 +420,44 @@ def _update_subruns_in_chunk(chunks):
else:
subruns[subrun_id] = subrun_start_end
return subruns


@export
class Rechunker:
"""Helper class for rechunking.
Send in chunks via receive, which returns either None (no chunk to send) or a chunk to send.
Don't forget a final call to .flush() to get any final data out!
"""

def __init__(self, rechunk=False, run_id=None):
self.rechunk = rechunk
self.is_superrun = run_id and run_id.startswith("_") and not run_id.startswith("__")
self.run_id = run_id

self.cache = None

def receive(self, chunk):
if self.is_superrun:
chunk = strax.transform_chunk_to_superrun_chunk(self.run_id, chunk)
if not self.rechunk:
# We aren't rechunking
return chunk
if self.cache:
# We have an old chunk, so we need to concatenate
chunk = strax.Chunk.concatenate([self.cache, chunk])
if chunk.data.nbytes >= chunk.target_size_mb * 1e6:
# Enough data to send a new chunk!
self.cache = None
return chunk
else:
# Not enough data yet, so we cache the chunk
self.cache = chunk
return None

def flush(self):
result = self.cache
self.cache = None
return result
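
A minimal usage sketch of the new helper; chunk_source is a hypothetical iterable of strax.Chunk objects, and only the constructor, receive() and flush() from the class above are used:

import strax

def rechunk_stream(chunk_source, run_id=None):
    # Accumulate incoming chunks until target_size_mb is reached, then emit.
    rechunker = strax.Rechunker(rechunk=True, run_id=run_id)
    for chunk in chunk_source:
        out = rechunker.receive(chunk)  # None while the cache is still filling
        if out is not None:
            yield out
    # Final flush, otherwise any cached partial chunk would be lost
    remainder = rechunker.flush()
    if remainder is not None:
        yield remainder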
54 changes: 45 additions & 9 deletions strax/context.py
@@ -191,7 +191,11 @@ class Context:
_run_defaults_cache: dict
storage: ty.List[strax.StorageFrontend]

def __init__(self, storage=None, config=None, register=None, register_all=None, **kwargs):
processors: ty.Mapping[str, strax.BaseProcessor]

def __init__(
self, storage=None, config=None, register=None, register_all=None, processors=None, **kwargs
):
"""Create a strax context.
:param storage: Storage front-ends to use. Can be:
@@ -202,7 +206,9 @@ def __init__(self, storage=None, config=None, register=None, register_all=None,
applied to plugins
:param register: plugin class or list of plugin classes to register
:param register_all: module for which all plugin classes defined in it
will be registered.
will be registered.
:param processors: A mapping of processor names to classes to use for
data processing.
Any additional kwargs are considered Context-specific options; see
Context.takes_config.
@@ -226,12 +232,32 @@ def __init__(self, storage=None, config=None, register=None, register_all=None,
if register is not None:
self.register(register)

if processors is None:
processors = strax.PROCESSORS

if isinstance(processors, str):
processors = [processors]

if isinstance(processors, (list, tuple)):
ps = {}
for processor in processors:
if isinstance(processor, str) and processor in strax.PROCESSORS:
ps[processor] = strax.PROCESSORS[processor]
elif isinstance(processor, strax.BaseProcessor):
ps[processor.__name__] = processor
else:
raise ValueError(f"Unknown processor {processor}")
processors = ps

self.processors = processors

def new_context(
self,
storage=tuple(),
config=None,
register=None,
register_all=None,
processors=None,
replace=False,
**kwargs,
):
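
The __init__ changes above make the processors argument quite permissive. A sketch of the accepted forms; the name "single_thread" is an assumption, check the keys actually registered in strax.PROCESSORS:

import strax

# Default: every processor registered in strax.PROCESSORS becomes available
st = strax.Context()

# A single name is wrapped into a one-entry mapping
st = strax.Context(processors="single_thread")  # assumed key in strax.PROCESSORS

# A list of names is looked up in strax.PROCESSORS; an explicit mapping
# of name -> processor class is passed through unchanged
st = strax.Context(processors=["single_thread"])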
@@ -255,7 +281,7 @@ def new_context(
config = strax.combine_configs(self.config, config, mode="update")
kwargs = strax.combine_configs(self.context_config, kwargs, mode="update")

new_c = Context(storage=storage, config=config, **kwargs)
new_c = Context(storage=storage, config=config, processors=processors, **kwargs)
if not replace:
new_c._plugin_class_registry = self._plugin_class_registry.copy()
new_c.register_all(register_all)
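
Because new_context now forwards the argument, a derived context can swap processor sets while keeping the plugin registry. A short sketch, again assuming "single_thread" is a registered processor name:

import strax

st = strax.Context()
# The derived context reuses st's plugin registry but a different processor mapping
st_single = st.new_context(processors="single_thread")  # assumed name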
@@ -1434,7 +1460,7 @@ def to_absolute_time_range(
def get_iter(
self,
run_id: str,
targets: ty.Union[ty.Tuple[str], ty.List[str]],
targets,
save=tuple(),
max_workers=None,
time_range=None,
@@ -1449,6 +1475,7 @@ def get_iter(
progress_bar=True,
multi_run_progress_bar=True,
_chunk_number=None,
processor=None,
**kwargs,
) -> ty.Iterator[strax.Chunk]:
"""Compute target for run_id and iterate over results.
@@ -1516,8 +1543,17 @@
if k.startswith("_temp"):
del self._plugin_class_registry[k]

if processor is None:
processor = list(self.processors)[0]

if isinstance(processor, str):
processor = self.processors[processor]

if not hasattr(processor, "iter"):
raise ValueError("Processors must implement a iter methed.")

seen_a_chunk = False
generator = strax.ThreadedMailboxProcessor(
generator = processor(
components,
max_workers=max_workers,
allow_shm=self.context_config["allow_shm"],
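
With this hunk, get_iter (and the get_* methods that forward to it) accepts a per-call processor; when it is omitted, the first entry of the context's processor mapping is used. A sketch, assuming a context whose registered plugins provide "peaks", an existing run id "012345", and a registered processor name "single_thread":

import strax

st = strax.Context()  # plugins providing "peaks" assumed to be registered here
for chunk in st.get_iter("012345", "peaks", processor="single_thread"):
    print(chunk)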
@@ -2542,8 +2578,7 @@ def add_method(cls, f):
:param multi_run_progress_bar: Display a progress bar for loading multiple runs
"""

get_docs = (
"""
get_docs = """
:param run_id: run id to get
:param targets: list/tuple of strings of data type names to get
:param ignore_errors: Return the data for the runs that successfully loaded, even if some runs
Expand All @@ -2563,9 +2598,10 @@ def add_method(cls, f):
:param run_id_as_bytes: Boolean if true uses byte string instead of an
unicode string added to a multi-run array. This can save a lot of
memory when loading many runs.
:param processor: Name of the processor to use. If not specified, the
first processor from the context's processor list is used.
"""
+ select_docs
)
get_docs += select_docs

for attr in dir(Context):
attr_val = getattr(Context, attr)