Skip to content

Commit

Permalink
work around race in task cancellation
Browse files (browse the repository at this point in the history)
  • Loading branch information
Frens Jan Rumph committed May 12, 2017
1 parent d2d493a commit 01eaf24
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions bndl/compute/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,14 +1554,17 @@ def itake(self, num):


def _itake_parts(self):
# TODO use new scheduler to reuse old tasks in slicing the final stage
mask = slice(0, 1)
pcount = len(self.parts())
while mask.start < pcount:
masked = self.mask_partitions(lambda part: mask.start <= part.idx < mask.stop)
done = self.ctx.execute(masked._schedule())
job = masked._schedule()
done = self.ctx.execute(job)
try:
for task in done:
yield task.result[1]
if not task.dependents and not task.cancelled:
yield task.result[1]
mask = slice(mask.stop, mask.stop * 2 + 1)
finally:
done.close()
Expand Down Expand Up @@ -1603,12 +1606,8 @@ def _execute(self, ordered=True):
done = self.ctx.execute(job, order_results=ordered)
for task in done:
assert task in job.tasks
if not task.dependents:
try:
yield task.result[1]
except Exception as e:
raise Exception('something wrong with %r, %r' %
(task, (task.part, task.result, task.handle))) from e
if not task.dependents and not task.cancelled:
yield task.result[1]


def _generate_tasks(self, required, tasks, groups):
Expand Down Expand Up @@ -2273,14 +2272,19 @@ def execute(self, scheduler, executor):


def signal_stop(self):
if self.result and self.result[0]:
self.result_on[-1] = self.result[0]
if self.exception and self.dependencies:
exc = root_exc(self.exception)
if isinstance(exc, (DependenciesFailed, FailedDependency)):
for barrier in self.dependencies:
logger.debug('Marking barrier %r before %r as failed', barrier, self)
barrier.mark_failed(FailedDependency())
if isinstance(self.result, TaskCancelled):
self.cancelled = True
self.result = None
self.exception = None
else:
if self.result and self.result[0]:
self.result_on[-1] = self.result[0]
if self.exception and self.dependencies:
exc = root_exc(self.exception)
if isinstance(exc, (DependenciesFailed, FailedDependency)):
for barrier in self.dependencies:
logger.debug('Marking barrier %r before %r as failed', barrier, self)
barrier.mark_failed(FailedDependency())
super().signal_stop()


Expand Down Expand Up @@ -2316,8 +2320,8 @@ def _compute_part(part, dependency_locations):
data = list(data)
# return the result location (maybe None) and data
return task_ctx.get('result_location'), data
except TaskCancelled:
return None
except TaskCancelled as exc:
return exc
except DependenciesFailed:
raise
except Exception:
Expand Down

0 comments on commit 01eaf24

Please sign in to comment.