Skip to content

Commit

Permalink
Comments and questions on celery/canvas.py
Browse files Browse the repository at this point in the history
  • Loading branch information
naomielst authored and auvipy committed Nov 17, 2021
1 parent d32356c commit 120b615
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,17 @@ def freeze(self, _id=None, group_id=None, chord=None,
# XXX chord is also a class in outer scope.
opts = self.options
try:
# if there is already an id for this task, return it
tid = opts['task_id']
except KeyError:
# otherwise, use the _id sent to this function, falling back on a generated UUID
tid = opts['task_id'] = _id or uuid()
if root_id:
opts['root_id'] = root_id
if parent_id:
opts['parent_id'] = parent_id
if 'reply_to' not in opts:
# fall back on unique ID for this thread in the app
opts['reply_to'] = self.app.thread_oid
if group_id and "group_id" not in opts:
opts['group_id'] = group_id
Expand Down Expand Up @@ -676,6 +679,8 @@ def run(self, args=None, kwargs=None, group_id=None, chord=None,
else:
return results_from_prepare[0]

# in order for a chain to be frozen, each of the members of the chain individually needs to be frozen
# TODO figure out why we are always cloning before freeze
def freeze(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None, group_index=None):
# pylint: disable=redefined-outer-name
Expand Down Expand Up @@ -703,6 +708,7 @@ def prepare_steps(self, args, kwargs, tasks,
use_link = True
steps = deque(tasks)

# optimization: now the pop func is a local variable
steps_pop = steps.pop
steps_extend = steps.extend

Expand All @@ -717,11 +723,15 @@ def prepare_steps(self, args, kwargs, tasks,
# get the next task in the chain.
while steps:
task = steps_pop()
# if steps is not empty, this is the first task - reverse order
# if i = 0, this is the last task - again, because we're reversed
is_first_task, is_last_task = not steps, not i

if not isinstance(task, abstract.CallableSignature):
task = from_dict(task, app=app)
if isinstance(task, group):
# when groups are nested, they are unrolled - all tasks within
# groups within groups should be called in parallel
task = maybe_unroll_group(task)

# first task gets partial args from chain
Expand All @@ -734,10 +744,11 @@ def prepare_steps(self, args, kwargs, tasks,
task.args = tuple(args) + tuple(task.args)

if isinstance(task, _chain):
# splice the chain
# splice (unroll) the chain
steps_extend(task.tasks)
continue

# TODO why isn't this asserting is_last_task == False?
if isinstance(task, group) and prev_task:
# automatically upgrade group(...) | s to chord(group, s)
# for chords we freeze by pretending it's a normal
Expand Down Expand Up @@ -1230,9 +1241,15 @@ def _freeze_group_tasks(self, _id=None, group_id=None, chord=None,
root_id = opts.setdefault('root_id', root_id)
parent_id = opts.setdefault('parent_id', parent_id)
if isinstance(self.tasks, _regen):
# We are draining from a geneator here.
# We are draining from a generator here.
# tasks1, tasks2 are each a clone of self.tasks
tasks1, tasks2 = itertools.tee(self._unroll_tasks(self.tasks))
# freeze each task in tasks1, results now holds AsyncResult for each task
results = regen(self._freeze_tasks(tasks1, group_id, chord, root_id, parent_id))
# TODO figure out why this makes sense -
# we freeze all tasks in the clone tasks1, and then zip the results
# with the IDs of tasks in the second clone, tasks2. and then, we build
# a generator that takes only the task IDs from tasks2.
self.tasks = regen(x[0] for x in zip(tasks2, results))
else:
new_tasks = []
Expand Down Expand Up @@ -1265,6 +1282,7 @@ def _freeze_tasks(self, tasks, group_id, chord, root_id, parent_id):
for group_index, task in enumerate(tasks))

def _unroll_tasks(self, tasks):
# should be refactored to: (maybe_signature(task, app=self._app, clone=True) for task in tasks)
yield from (maybe_signature(task, app=self._app).clone() for task in tasks)

def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
Expand All @@ -1274,6 +1292,7 @@ def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
group_index = 0
while stack:
task = maybe_signature(stack.popleft(), app=self._app).clone()
# if this is a group, flatten it by adding all of the group's tasks to the stack
if isinstance(task, group):
stack.extendleft(task.tasks)
else:
Expand Down Expand Up @@ -1364,8 +1383,10 @@ def freeze(self, _id=None, group_id=None, chord=None,
# XXX chord is also a class in outer scope.
if not isinstance(self.tasks, group):
self.tasks = group(self.tasks, app=self.app)
# first freeze all tasks in the header
header_result = self.tasks.freeze(
parent_id=parent_id, root_id=root_id, chord=self.body)
# secondly freeze all tasks in the body: those that should be called after the header
body_result = self.body.freeze(
_id, root_id=root_id, chord=chord, group_id=group_id,
group_index=group_index)
Expand Down

0 comments on commit 120b615

Please sign in to comment.