Fix define runs and allow storing of superuns (#472)
* Fix define runs

* Allow rechunking of superruns, add new chunk properties

Co-authored-by: Joran Angevaare <jorana@nikhef.nl>
WenzDaniel and JoranAngevaare committed Jul 15, 2021
1 parent a7d7c11 commit a43c55a
Showing 11 changed files with 583 additions and 125 deletions.
94 changes: 73 additions & 21 deletions docs/source/advanced/superrun.rst
@@ -3,51 +3,103 @@ Superruns

Overview and motivation
------------------------
A superrun is a run defined by (parts of) other runs, which are called 'subruns'.
Superrun names start with an underscore. Regular run names cannot start with an underscore.

Strax builds data for a superrun by loading (and potentially building) each of the subruns, then
slicing and concatenating them as necessary. In addition, superruns can be stored to disk as a
rechunked representation of their subruns. This currently only works for static lineages, i.e.
without default-by-run_id settings. Stored superruns have the advantage that loading data is much
faster, and different data_types of the same kind can be combined.

Superruns are useful to track common groupings of data. For example:

* 'Minimum bias' runs, consisting only of low-energy events, events passing some cuts, DM candidates, PMT flashes, or other things of interest. The low-level data of these runs is much smaller than that of the full runs and can be brought to a local analysis facility, enabling on-site low-level waveform watching.
* Grouping similar runs. For example, shifters might group good runs from a week of calibration data with some source under a single name, e.g. `_kr_feb2019`.


Superruns can be built from other superruns. Thus, _sr1_v0.2 could be built from
_background_january, _background_february, etc.

Defining superruns and making data:
-----------------------------------
Use the `define_run` context method to define a new superrun. Currently, superruns can only be
defined from a list of run_ids::

st.define_run('_awesome_superrun', ['123', '124'])

.. From a dictionary of time range tuples. The times must be 64-bit integer UTC timestamps
   since the unix epoch::

       st.define_run('_awesome_superrun', {
           '123': [(start, stop), (start, stop), ...],
           '124': [(start, stop), (start, stop), ...],})

.. From a dataframe (or record array) with strax data::

       st.define_run('_awesome_superrun', events_df)
       st.define_run('_awesome_superrun', events_df, from_run='123')

   In this case, the run will be made of the time ranges that correspond exactly to
   `events_df`. If `events_df` already has a `run_id` field (e.g. because it consists of
   data from multiple runs), you do not need to pass `from_run`; it will be read off from
   the data.
It is up to the storage frontend to process your request for defining a run. As a normal user, you
generally only have permission to create a new run in the `DataDirectory` (local files) storage
frontend, where runs are recorded in JSON files.

Making superrun data is as easy as making any other data. Once a superrun is defined, we can make,
for example, event_info via::

    st.make('_awesome_superrun', 'event_info')

For bookkeeping, each stored superrun chunk contains information about its constituents in a field
called `subruns`, e.g.::

    {'0': {'end': 10, 'start': 0},
     '1': {'end': 30, 'start': 20},
     '2': {'end': 50, 'start': 40}}

The keys are the subrun_ids, and start/end give the start and end of the corresponding first/last
chunk included in the superrun chunk. The same information can also be found in the metadata of
the individual chunks::

    {'chunk_i': 0,
     'end': 50,
     'filename': 'records-j3nd2fjbiq-000000',
     'filesize': 2343,
     'first_endtime': 1,
     'first_time': 0,
     'last_endtime': 50,
     'last_time': 49,
     'n': 300,
     'nbytes': 77100,
     'run_id': '_superrun_test',
     'start': 0,
     'subruns': {'0': {'end': 10, 'start': 0},
                 '1': {'end': 30, 'start': 20},
                 '2': {'end': 50, 'start': 40}}}
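As a consistency check, note that the chunk-level `start`/`end` in this metadata are simply the
extremes of the `subruns` field. A stand-alone sketch in plain Python (illustration only, not
strax code):

```python
# The subruns mapping from the chunk metadata above.
subruns = {'0': {'end': 10, 'start': 0},
           '1': {'end': 30, 'start': 20},
           '2': {'end': 50, 'start': 40}}

# The superrun chunk spans from the earliest subrun start to the latest subrun end.
chunk_start = min(v['start'] for v in subruns.values())
chunk_end = max(v['end'] for v in subruns.values())

print(chunk_start, chunk_end)  # 0 50
```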

After creating the data, we can load the superrun as usual and also combine it with other
data_types of the same kind.

To make working with superruns easier, all chunks also have the properties `chunk.is_superrun`,
`chunk.first_subrun`, and `chunk.last_subrun`.
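A minimal stand-in illustrating how these properties behave (a toy class consistent with the
`strax/chunk.py` changes in this commit, not an import of strax):

```python
class MiniChunk:
    """Toy stand-in mimicking the superrun-related Chunk properties."""

    def __init__(self, run_id, subruns=None):
        self.run_id = run_id
        self.subruns = subruns

    @property
    def is_superrun(self):
        # Superrun chunks carry a subruns mapping and a name starting with '_'.
        return bool(self.subruns) and self.run_id.startswith('_')

    def _get_subrun(self, index):
        # Relies on dict insertion order (guaranteed since Python 3.7).
        subrun_id = list(self.subruns.keys())[index]
        return {'run_id': subrun_id, **self.subruns[subrun_id]}

    @property
    def first_subrun(self):
        return self._get_subrun(0) if self.is_superrun else None

    @property
    def last_subrun(self):
        return self._get_subrun(-1) if self.is_superrun else None


c = MiniChunk('_superrun_test',
              {'0': {'start': 0, 'end': 10}, '2': {'start': 40, 'end': 50}})
assert c.is_superrun
assert c.first_subrun == {'run_id': '0', 'start': 0, 'end': 10}
assert c.last_subrun['end'] == 50
```

A regular chunk (no leading underscore, no subruns) returns `None` for both subrun properties.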

How superruns work
--------------------

As mentioned above, strax builds data for superruns by slicing data of the subruns. Thus, peaks
from a superrun come from the peaks of the subruns, which are built from their own records as usual.

Defaults for settings can be run_id-dependent in strax, although this is no longer preferred.
If an option specifies `default_per_run=[(run, setting), (run2, setting2)]`, then runs between
`run` and `run2` will use `setting`, and runs after `run2` will use `setting2`. Superruns store a
deterministic hash of this `default_per_run` specification for tracking purposes.
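The lookup rule described above can be sketched as follows; `setting_for_run` is a hypothetical
helper, not part of the strax API, and run ids are compared as integers purely for illustration:

```python
def setting_for_run(default_per_run, run_id):
    """Pick the setting whose anchor run is the latest one not after run_id.

    Hypothetical illustration of the default_per_run rule, not strax code.
    """
    chosen = None
    for anchor_run, setting in sorted(default_per_run, key=lambda x: int(x[0])):
        if int(run_id) >= int(anchor_run):
            chosen = setting  # keep overwriting until anchors pass run_id
    return chosen


defaults = [('100', 'gain_v1'), ('200', 'gain_v2')]
assert setting_for_run(defaults, '150') == 'gain_v1'  # between the two anchors
assert setting_for_run(defaults, '250') == 'gain_v2'  # after the second anchor
assert setting_for_run(defaults, '50') is None        # before any anchor
```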

You cannot currently go directly from the superrun's records to the superrun's peaks. This would
be tricky to implement, since even with the same settings, many plugins choose to do something
different depending on the run_id. For example, in straxen the gain model is specified by a file,
but which gains from the file are actually used depends on the run_id.

Thus, superruns won't help build data faster, but they will speed up loading data after it has
been built. This is important because strax's overhead for loading a run is larger than hax's,
due to its version and option tracking (this is only true if per-run default options are allowed).
109 changes: 101 additions & 8 deletions strax/chunk.py
@@ -21,6 +21,7 @@ class Chunk:
    # run_id is not superfluous to track:
    # this could change during the run in superruns (in the future)
    run_id: str
    subruns: dict
    start: int
    end: int
@@ -36,13 +37,15 @@ def __init__(self,
                 start,
                 end,
                 data,
                 subruns=None,
                 target_size_mb=default_chunk_size_mb):
        self.data_type = data_type
        self.data_kind = data_kind
        self.dtype = np.dtype(dtype)
        self.run_id = run_id
        self.start = start
        self.end = end
        self.subruns = subruns
        if data is None:
            data = np.empty(0, dtype)
        self.data = data
@@ -110,6 +113,34 @@ def nbytes(self):
    def duration(self):
        return self.end - self.start

    @property
    def is_superrun(self):
        return bool(self.subruns) and self.run_id.startswith('_')

    @property
    def first_subrun(self):
        _subrun = None
        if self.is_superrun:
            _subrun = self._get_subrun(0)
        return _subrun

    @property
    def last_subrun(self):
        _subrun = None
        if self.is_superrun:
            _subrun = self._get_subrun(-1)
        return _subrun

    def _get_subrun(self, index):
        """Returns the subrun at the given position in the chunk."""
        subrun_id = list(self.subruns.keys())[index]
        _subrun = {'run_id': subrun_id,
                   'start': self.subruns[subrun_id]['start'],
                   'end': self.subruns[subrun_id]['end']}
        return _subrun

    def _mbs(self):
        if self.duration:
            return (self.nbytes / 1e6) / (self.duration / 1e9)
@@ -233,11 +264,14 @@ def concatenate(cls, chunks):
        data_type = data_types[0]

        run_ids = [c.run_id for c in chunks]

        if len(set(run_ids)) != 1:
            raise ValueError(
                f"Cannot concatenate {data_type} chunks with "
                f"different run ids: {run_ids}")

        run_id = run_ids[0]
        subruns = _update_subruns_in_chunk(chunks)

        prev_end = 0
        for c in chunks:
@@ -254,6 +288,7 @@
            data_type=data_type,
            data_kind=chunks[0].data_kind,
            run_id=run_id,
            subruns=subruns,
            data=np.concatenate([c.data for c in chunks]),
            target_size_mb=max([c.target_size_mb for c in chunks]))
@@ -263,19 +298,29 @@ def continuity_check(chunk_iter):
"""Check continuity of chunks yielded by chunk_iter as they are yielded"""
last_end = None
last_runid = None
for s in chunk_iter:
if s.run_id != last_runid:
last_subrun = {'run_id': None}
for chunk in chunk_iter:
if chunk.run_id != last_runid:
# TODO: can we do better?
last_end = None
last_subrun = {'run_id': None}

if chunk.is_superrun:
_subrun = chunk.first_subrun
if _subrun['run_id'] != last_subrun['run_id']:
last_end = None
else:
last_end = last_subrun['end']

if last_end is not None:
if s.start != last_end:
if chunk.start != last_end:
raise ValueError("Data is not continuous. "
f"Chunk {s} should have started at {last_end}")
yield s

last_end = s.end
last_runid = s.run_id
f"Chunk {chunk} should have started at {last_end}")
yield chunk

last_end = chunk.end
last_runid = chunk.run_id
last_subrun = chunk.last_subrun

@export
class CannotSplit(Exception):
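The idea behind `continuity_check` can be illustrated on plain `(run_id, start, end)` tuples, a
simplified sketch of the check rather than the strax implementation:

```python
def continuity_check_simple(chunks):
    """Simplified continuity check over (run_id, start, end) tuples.

    Within one run, each chunk must start exactly where the previous one
    ended; a new run_id resets the expectation. Sketch only, not strax code.
    """
    last_end = None
    last_run = None
    for run_id, start, end in chunks:
        if run_id != last_run:
            last_end = None  # a new run resets the expectation
        if last_end is not None and start != last_end:
            raise ValueError(f"Gap in run {run_id}: chunk starts at {start}, "
                             f"expected {last_end}")
        yield run_id, start, end
        last_end, last_run = end, run_id


ok = list(continuity_check_simple([('0', 0, 10), ('0', 10, 20), ('1', 5, 9)]))
assert len(ok) == 3  # contiguous within each run, so all chunks pass through
```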
@@ -332,3 +377,51 @@ def split_array(data, t, allow_early_split=False):
        t = min(data[splittable_i]['time'], t)

    return data[:splittable_i], data[splittable_i:], t


@export
def transform_chunk_to_superrun_chunk(superrun_id, chunk):
    """
    Transforms/creates a new superrun chunk from a subrun chunk.

    :param superrun_id: id/name of the superrun.
    :param chunk: strax.Chunk of a superrun subrun.
    :return: strax.Chunk
    """
    if chunk is None:
        return chunk
    subruns = {chunk.run_id: {'start': chunk.start,
                              'end': chunk.end}}

    return Chunk(start=chunk.start,
                 end=chunk.end,
                 dtype=chunk.dtype,
                 data_type=chunk.data_type,
                 data_kind=chunk.data_kind,
                 run_id=superrun_id,
                 subruns=subruns,
                 data=chunk.data,
                 target_size_mb=chunk.target_size_mb)


def _update_subruns_in_chunk(chunks):
    """
    Updates the list of subruns in a superrun chunk during concatenation,
    including their start/end times.
    """
    subruns = None
    for c_i, c in enumerate(chunks):
        if not subruns:
            subruns = c.subruns
            continue

        for subrun_id, subrun_start_end in c.subruns.items():
            if subrun_id in subruns:
                subruns[subrun_id] = {'start': min(subruns[subrun_id]['start'],
                                                   subrun_start_end['start']),
                                      'end': max(subruns[subrun_id]['end'],
                                                 subrun_start_end['end'])}
            else:
                subruns[subrun_id] = subrun_start_end
    return subruns
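A stand-alone sketch of the merge logic implemented by `_update_subruns_in_chunk`, rewritten here
as a pure function over plain dictionaries for illustration (not strax code):

```python
def merge_subruns(subrun_dicts):
    """Merge per-chunk subrun mappings, widening start/end ranges.

    Sketch of the concatenation bookkeeping: when the same subrun appears
    in several chunks, its range grows to cover all of them.
    """
    merged = {}
    for subruns in subrun_dicts:
        if not subruns:
            continue  # regular (non-superrun) chunks carry no subruns
        for run_id, rng in subruns.items():
            if run_id in merged:
                merged[run_id] = {'start': min(merged[run_id]['start'], rng['start']),
                                  'end': max(merged[run_id]['end'], rng['end'])}
            else:
                merged[run_id] = dict(rng)
    return merged


merged = merge_subruns([{'0': {'start': 0, 'end': 10}},
                        {'0': {'start': 10, 'end': 20}, '1': {'start': 20, 'end': 30}}])
assert merged['0'] == {'start': 0, 'end': 20}  # subrun '0' widened across chunks
assert merged['1'] == {'start': 20, 'end': 30}
```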
