Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close inlined savers on exceptions in multiprocessing #390

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 13 additions & 3 deletions strax/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,12 @@ def _send_from(self, iterable):
except StopIteration:
# No need to send this yet, close will do that
break
self.send(x)
try:
self.send(x)
except Exception as e:
# Inform the source we're going down
iterable.throw(e)
raise
i += 1

except Exception as e:
Expand Down Expand Up @@ -488,8 +493,13 @@ def divide_outputs(source,
# No need to send this yet, close will do that
break

for d, x in result.items():
mailboxes[d].send(x)
try:
for d, x in result.items():
mailboxes[d].send(x)
except Exception as e:
# Inform the source we're going down
source.throw(e)
raise
i += 1

except Exception as e:
Expand Down
222 changes: 112 additions & 110 deletions strax/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,119 +302,121 @@ def iter(self, iters, executor=None):
pacemaker = d
_end = self.input_buffer[d].end

for chunk_i in itertools.count():

# Online input support
while not self.is_ready(chunk_i):
if self.source_finished():
# Chunk_i does not exist. We are done.
print("Source finished!")
self.cleanup(iters, wait_for=pending_futures)
return

if time.time() > last_input_received + self.input_timeout:
raise InputTimeoutExceeded(
f"{self.__class__.__name__}:{id(self)} waited for "
f"more than {self.input_timeout} sec for arrival of "
f"input chunk {chunk_i}, and has given up.")

print(f"{self.__class__.__name__} with object id: {id(self)} "
f"waits for chunk {chunk_i}")
time.sleep(2)
last_input_received = time.time()

if pacemaker is None:
inputs_merged = dict()
else:
if chunk_i != 0:
# Fetch the pacemaker, to figure out when this chunk ends
# (don't do it for chunk 0, for which we already fetched)
if not self._fetch_chunk(pacemaker, iters):
# Source exhausted. Cleanup will do final checks.
self.cleanup(iters, wait_for=pending_futures)
return
this_chunk_end = self.input_buffer[pacemaker].end

inputs = dict()
# Fetch other inputs (when needed)
for d in self.depends_on:
if d != pacemaker:
while (self.input_buffer[d] is None
or self.input_buffer[d].end < this_chunk_end):
self._fetch_chunk(
d, iters,
check_end_not_before=this_chunk_end)
inputs[d], self.input_buffer[d] = \
self.input_buffer[d].split(
t=this_chunk_end,
allow_early_split=True)
# If any of the inputs were trimmed due to early splits,
# trim the others too.
# In very hairy cases this can take multiple passes.
# TODO: can we optimize this, or code it more elegantly?
max_passes_left = 10
while max_passes_left > 0:
this_chunk_end = min([x.end for x in inputs.values()]
+ [this_chunk_end])
if len(set([x.end for x in inputs.values()])) <= 1:
break
# To break out of nested loops:
class IterDone(Exception):
pass

try:
for chunk_i in itertools.count():

# Online input support
while not self.is_ready(chunk_i):
if self.source_finished():
# Chunk_i does not exist. We are done.
print("Source finished!")
raise IterDone()

if time.time() > last_input_received + self.input_timeout:
raise InputTimeoutExceeded(
f"{self.__class__.__name__}:{id(self)} waited for "
f"more than {self.input_timeout} sec for arrival of "
f"input chunk {chunk_i}, and has given up.")

print(f"{self.__class__.__name__} with object id: {id(self)} "
f"waits for chunk {chunk_i}")
time.sleep(2)
last_input_received = time.time()

if pacemaker is None:
inputs_merged = dict()
else:
if chunk_i != 0:
# Fetch the pacemaker, to figure out when this chunk ends
# (don't do it for chunk 0, for which we already fetched)
if not self._fetch_chunk(pacemaker, iters):
# Source exhausted. Cleanup will do final checks.
raise IterDone()
this_chunk_end = self.input_buffer[pacemaker].end

inputs = dict()
# Fetch other inputs (when needed)
for d in self.depends_on:
inputs[d], back_to_buffer = \
inputs[d].split(
if d != pacemaker:
while (self.input_buffer[d] is None
or self.input_buffer[d].end < this_chunk_end):
self._fetch_chunk(
d, iters,
check_end_not_before=this_chunk_end)
inputs[d], self.input_buffer[d] = \
self.input_buffer[d].split(
t=this_chunk_end,
allow_early_split=True)
self.input_buffer[d] = strax.Chunk.concatenate(
[back_to_buffer, self.input_buffer[d]])
max_passes_left -= 1
# If any of the inputs were trimmed due to early splits,
# trim the others too.
# In very hairy cases this can take multiple passes.
# TODO: can we optimize this, or code it more elegantly?
max_passes_left = 10
while max_passes_left > 0:
this_chunk_end = min([x.end for x in inputs.values()]
+ [this_chunk_end])
if len(set([x.end for x in inputs.values()])) <= 1:
break
for d in self.depends_on:
inputs[d], back_to_buffer = \
inputs[d].split(
t=this_chunk_end,
allow_early_split=True)
self.input_buffer[d] = strax.Chunk.concatenate(
[back_to_buffer, self.input_buffer[d]])
max_passes_left -= 1
else:
raise RuntimeError(
f"{self} was unable to get time-consistent "
f"inputs after ten passess. Inputs: \n{inputs}\n"
f"Input buffer:\n{self.input_buffer}")

# Merge inputs of the same kind
inputs_merged = {
kind: strax.Chunk.merge([inputs[d] for d in deps_of_kind])
for kind, deps_of_kind in self.dependencies_by_kind().items()}

# Submit the computation
# print(f"{self} calling with {inputs_merged}")
if self.parallel and executor is not None:
new_future = executor.submit(
self.do_compute,
chunk_i=chunk_i,
**inputs_merged)
pending_futures.append(new_future)
pending_futures = [f for f in pending_futures if not f.done()]
yield new_future
else:
yield self.do_compute(chunk_i=chunk_i, **inputs_merged)

except IterDone:
# Check all sources are exhausted.
# This is more than a check though -- it ensure the content of
# all sources are requested all the way (including the final
# Stopiteration), as required by lazy-mode processing requires
for d in iters.keys():
if self._fetch_chunk(d, iters):
raise RuntimeError(
f"{self} was unable to get time-consistent "
f"inputs after ten passess. Inputs: \n{inputs}\n"
f"Input buffer:\n{self.input_buffer}")

# Merge inputs of the same kind
inputs_merged = {
kind: strax.Chunk.merge([inputs[d] for d in deps_of_kind])
for kind, deps_of_kind in self.dependencies_by_kind().items()}

# Submit the computation
# print(f"{self} calling with {inputs_merged}")
if self.parallel and executor is not None:
new_future = executor.submit(
self.do_compute,
chunk_i=chunk_i,
**inputs_merged)
pending_futures.append(new_future)
pending_futures = [f for f in pending_futures if not f.done()]
yield new_future
else:
yield self.do_compute(chunk_i=chunk_i, **inputs_merged)

raise RuntimeError("This cannot happen.")

def cleanup(self,
iters: typing.Dict[str, typing.Iterable],
wait_for):
# The wait_for option is only used in child classes;
# A standard plugin doesn't need to do anything with the computation
# future results.

# Check all sources are exhausted.
# This is more than a check though -- it ensure the content of
# all sources are requested all the way (including the final
# Stopiteration), as required by lazy-mode processing requires
for d in iters.keys():
if self._fetch_chunk(d, iters):
raise RuntimeError(
f"Plugin {d} terminated without fetching last {d}!")
f"Plugin {d} terminated without fetching last {d}!")

# This can happen especially in time range selections
if int(self.save_when) != strax.SaveWhen.NEVER:
for d, buffer in self.input_buffer.items():
# Check the input buffer is empty
if buffer is not None and len(buffer):
raise RuntimeError(
f"Plugin {d} terminated with leftover {d}: {buffer}")
# This can happen especially in time range selections
if int(self.save_when) != strax.SaveWhen.NEVER:
for d, buffer in self.input_buffer.items():
# Check the input buffer is empty
if buffer is not None and len(buffer):
raise RuntimeError(
f"Plugin {d} terminated with leftover {d}: {buffer}")

finally:
self.cleanup(wait_for=pending_futures)

def cleanup(self, wait_for):
pass
# A standard plugin doesn't need to do anything here

def _check_dtype(self, x, d=None):
# There is an additional 'last resort' data type check
Expand Down Expand Up @@ -981,10 +983,10 @@ def do_compute(self, chunk_i=None, **kwargs):

return self._fix_output(results, start=r0.start, end=r0.end)

def cleanup(self, iters, wait_for):
print(f"{self.__class__.__name__} exhausted. "
def cleanup(self, wait_for):
print(f"{self.__class__.__name__} terminated. "
f"Waiting for {len(wait_for)} pending futures.")
for savers in self.sub_savers.values():
for s in savers:
s.close(wait_for=wait_for)
super().cleanup(iters, wait_for)
super().cleanup(wait_for)
2 changes: 2 additions & 0 deletions strax/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ class Records(strax.Plugin):
depends_on = tuple()
dtype = strax.record_dtype()

rechunk_on_save = False

def source_finished(self):
return True

Expand Down