Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrapper for get_iter to get accumulated results. #253

Merged
merged 14 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
100 changes: 100 additions & 0 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,106 @@ def get_array(self, run_id: ty.Union[str, tuple, list],
results = [x.data for x in source]
return np.concatenate(results)

def accumulate(self,
               run_id: ty.Union[str, tuple, list],
               targets,
               fields=None,
               function=None,
               store_first_for_others=True,
               function_takes_fields=False,
               **kwargs):
    """Return a dictionary with the chunk-by-chunk sum of the data
    produced by get_iter.

    :param function: Apply this function to the array before summing the
        results. Will be called as function(array), where array is
        the data of one chunk.
        Should return either:
          * A scalar or 1d array -> accumulated result saved under 'result'
          * A record array or dict -> fields accumulated individually
          * None -> nothing accumulated
        If not provided, the identity function is used.

    :param fields: Fields of the function output to accumulate.
        If not provided, all fields of the first chunk's data except
        'time' and 'endtime' will be accumulated.

    :param store_first_for_others: if True (default), for fields included
        in the data but not in fields, store the first value seen in the
        data (if any value is seen).

    :param function_takes_fields: If True, function will be called as
        function(data, fields) instead of function(data).
        Ignored when no function is provided.

    All other options are as for get_iter.

    :returns dictionary: Dictionary with the accumulated result;
        see function and store_first_for_others arguments.
        These bookkeeping fields are added:
          start: start time of the first processed chunk
          end: end time of the last processed chunk
          n_chunks: number of chunks in run
          n_rows: number of data entries in run
        start and end are only present if at least one chunk
        was processed.
    """
    n_chunks = 0
    seen_data = False
    result = {'n_rows': 0}
    if fields is not None:
        fields = strax.to_str_tuple(fields)
    if function is None:
        def function(arr):
            return arr
        # The identity function accepts a single argument only
        function_takes_fields = False

    for chunk in self.get_iter(run_id, targets, **kwargs):
        data = chunk.data

        if n_chunks == 0:
            result['start'] = chunk.start
            if fields is None:
                # Sum all fields except time and endtime
                fields = [x for x in data.dtype.names
                          if x not in ('time', 'endtime')]

        if store_first_for_others and not seen_data and len(data):
            # Store the first value we see for the non-accumulated fields.
            # Accumulated fields must NOT be seeded here: they start from
            # zero below, so seeding would count the first row twice.
            for name in data.dtype.names:
                if name not in fields:
                    result[name] = data[0][name]
            seen_data = True
        result['end'] = chunk.end
        # Count rows before the function is applied, so n_rows refers
        # to the input data rather than to the function output
        result['n_rows'] += len(data)

        # Run the function
        if function_takes_fields:
            data = function(data, fields)
        else:
            data = function(data)

        # Accumulate the result
        # Don't try to be clever here,
        # += doesn't work on readonly array fields;
        # .sum() doesn't work on scalars
        if data is None:
            pass

        elif (isinstance(data, dict)
              or (isinstance(data, np.ndarray)
                  and data.dtype.fields is not None)):
            # Function returned record array or dict
            for field in fields:
                result[field] = (
                    result.get(field, 0)
                    + np.sum(data[field], axis=0))
        else:
            # Function returned a scalar or flat array
            result['result'] = (
                np.sum(data, axis=0)
                + result.get('result', 0))

        n_chunks += 1

    result['n_chunks'] = n_chunks
    return result

def get_df(self, run_id: ty.Union[str, tuple, list],
targets, save=tuple(), max_workers=None,
**kwargs) -> pd.DataFrame:
Expand Down
1 change: 0 additions & 1 deletion strax/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
except ImportError:
# This is allowed to fail, it only crashes if allow_shm = True
SHMExecutor = None
pass


@export
Expand Down