Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParallelSourcePlugin fixes and test #104

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
22 changes: 16 additions & 6 deletions strax/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,13 @@ def setup_mailboxes(self, components, mailboxes, executor):
else:
break

# Which data types are we outputting?
# Which data types should we output?
# Anything that's requested by a plugin we did not inline,
# and the final target (whether inlined or not)
self.outputs_to_send.update(set(components.targets))
for d, p in plugins.items():
self.outputs_to_send.update(
set(p.depends_on) & self.sub_plugins.keys())
self.outputs_to_send.update(set(p.depends_on))
self.outputs_to_send &= self.sub_plugins.keys() | {self.provides}

# If the savers do not require rechunking, run them in this way also
for d in list(self.sub_plugins.keys()) + [self.provides]:
Expand All @@ -316,10 +319,17 @@ def setup_mailboxes(self, components, mailboxes, executor):
self.sub_savers[d] = savers[d]
del savers[d]

mailboxes[self.provides].add_sender(self.iter(
# We need a new mailbox to collect temporary outputs in
# These will be dictionaries of stuff to send
# It can't be named after self.provides,
# maybe self.provides is requested by someone,
# in which case that mailbox needs to exist as usual
# (see also #94)
mailbox_name = self.provides + '_parallelsource'
mailboxes[mailbox_name].add_sender(self.iter(
iters={}, executor=executor))
mailboxes[self.provides].add_reader(partial(self.send_outputs,
mailboxes=mailboxes))
mailboxes[mailbox_name].add_reader(partial(self.send_outputs,
mailboxes=mailboxes))
return components

def send_outputs(self, source, mailboxes):
Expand Down
2 changes: 2 additions & 0 deletions strax/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,5 @@ def iter(self):
# which is printed for the user
if traceback is not None:
raise exc.with_traceback(traceback)

self.log.debug("Processing finished")
3 changes: 2 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def test_filestore():

# The first dir contains peaks.
# It should have one data chunk (rechunk is on) and a metadata file
assert os.listdir(data_dirs[0]) == ['000000', 'metadata.json']
assert sorted(os.listdir(data_dirs[0])) \
== ['000000', 'metadata.json']

# Check metadata got written correctly.
metadata = mystrax.get_meta(run_id, 'peaks')
Expand Down
77 changes: 77 additions & 0 deletions tests/test_parallelsourceplugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import strax
import numpy as np

# TODO: these are small modifications of the test helpers in test_core.py
# Can we avoid duplication somehow?
# How many chunks the Records source emits before it reports finished.
n_chunks = 10
# Records fabricated per chunk; total output is n_chunks * recs_per_chunk.
recs_per_chunk = 10
# Run identifier passed to the strax context in test_processing below.
run_id = '0'


class Records(strax.ParallelSourcePlugin):
    """Toy source plugin that fabricates fixed-size chunks of records.

    Each chunk holds recs_per_chunk zeroed records whose 'time' field is
    the chunk index, making downstream output easy to predict in tests.
    """
    provides = 'records'
    depends_on = tuple()
    dtype = strax.record_dtype()

    def compute(self, chunk_i):
        # Build one synthetic chunk; all records share the chunk index
        # as their timestamp so assertions downstream stay simple.
        result = np.zeros(recs_per_chunk, dtype=self.dtype)
        result['channel'] = np.arange(recs_per_chunk)
        result['dt'] = 1
        result['length'] = 1
        result['time'] = chunk_i
        return result

    def source_finished(self):
        # This synthetic source can always tell when it is done.
        return True

    def is_ready(self, chunk_i):
        # Exactly the first n_chunks chunk indices have data available.
        return chunk_i < n_chunks


class Peaks(strax.Plugin):
    """Toy plugin that converts records into zeroed peaks.

    Used to verify that plugins downstream of a ParallelSourcePlugin
    receive plain numpy arrays; the test toggles ``parallel`` directly
    on this class.
    """
    parallel = True
    provides = 'peaks'
    depends_on = ('records',)
    dtype = strax.peak_dtype()

    def compute(self, records):
        # Guard: the input must already be a plain numpy array,
        # not some other container handed over by the processing chain.
        # (Typo fix: "Recieved" -> "Received" in the failure message.)
        assert isinstance(records, np.ndarray), \
            f"Received {type(records)} instead of numpy array!"
        # Emit one zeroed peak per record, copying only the timestamp.
        p = np.zeros(len(records), self.dtype)
        p['time'] = records['time']
        return p


def test_processing():
    """Test ParallelSource plugin under several conditions"""
    # TODO: For some reason, with max_workers = 2,
    # there is a hang at the end
    # We haven't tested multiprocessing anywhere else,
    # so we should do that first
    max_workers = 1
    for request_peaks in (True, False):
        for peaks_parallel in (True, False):
            # for max_workers in (1, 2):
            Peaks.parallel = peaks_parallel
            print(f"\nTesting with request_peaks {request_peaks}, "
                  f"peaks_parallel {peaks_parallel}, "
                  f"max_workers {max_workers}")

            # Decide which data type we ask for and what dtype we
            # therefore expect back.
            if request_peaks:
                target = 'peaks'
                expected_dtype = strax.peak_dtype()
            else:
                target = 'records'
                expected_dtype = strax.record_dtype()

            context = strax.Context(storage=[],
                                    register=[Records, Peaks])
            data = context.get_array(run_id=run_id,
                                     targets=target,
                                     max_workers=max_workers)

            # All chunks must arrive, with the dtype of the target.
            assert len(data) == recs_per_chunk * n_chunks
            assert data.dtype == expected_dtype


if __name__ == '__main__':
    # When run directly (outside pytest), turn on verbose logging so the
    # processing chain's behavior is visible on the console.
    import logging
    logging.basicConfig(
        style='{',
        format='{name} in {threadName} at {asctime}: {message}',
        level=logging.DEBUG)
    test_processing()