From 120b61578c4103943f5407d3cde4899626143e8c Mon Sep 17 00:00:00 2001 From: Naomi Elstein Date: Wed, 17 Nov 2021 11:15:26 +0200 Subject: [PATCH] Comments and questions on celery/canvas.py --- celery/canvas.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/celery/canvas.py b/celery/canvas.py index 8e9ac136f08..f0bcd2c5260 100644 --- a/celery/canvas.py +++ b/celery/canvas.py @@ -281,14 +281,17 @@ def freeze(self, _id=None, group_id=None, chord=None, # XXX chord is also a class in outer scope. opts = self.options try: + # if there is already an id for this task, return it tid = opts['task_id'] except KeyError: + # otherwise, use the _id sent to this function, falling back on a generated UUID tid = opts['task_id'] = _id or uuid() if root_id: opts['root_id'] = root_id if parent_id: opts['parent_id'] = parent_id if 'reply_to' not in opts: + # fall back on unique ID for this thread in the app opts['reply_to'] = self.app.thread_oid if group_id and "group_id" not in opts: opts['group_id'] = group_id @@ -676,6 +679,8 @@ def run(self, args=None, kwargs=None, group_id=None, chord=None, else: return results_from_prepare[0] + # in order for a chain to be frozen, each of the members of the chain individually needs to be frozen + # TODO figure out why we are always cloning before freeze def freeze(self, _id=None, group_id=None, chord=None, root_id=None, parent_id=None, group_index=None): # pylint: disable=redefined-outer-name @@ -703,6 +708,7 @@ def prepare_steps(self, args, kwargs, tasks, use_link = True steps = deque(tasks) + # optimization: now the pop func is a local variable steps_pop = steps.pop steps_extend = steps.extend @@ -717,11 +723,15 @@ def prepare_steps(self, args, kwargs, tasks, # get the next task in the chain. 
while steps: task = steps_pop() + # if steps is now empty, this is the first task - reverse order + # if i = 0, this is the last task - again, because we're reversed is_first_task, is_last_task = not steps, not i if not isinstance(task, abstract.CallableSignature): task = from_dict(task, app=app) if isinstance(task, group): + # when groups are nested, they are unrolled - all tasks within + # groups within groups should be called in parallel task = maybe_unroll_group(task) # first task gets partial args from chain @@ -734,10 +744,11 @@ def prepare_steps(self, args, kwargs, tasks, task.args = tuple(args) + tuple(task.args) if isinstance(task, _chain): - # splice the chain + # splice (unroll) the chain steps_extend(task.tasks) continue + # TODO why isn't this asserting is_last_task == False? if isinstance(task, group) and prev_task: # automatically upgrade group(...) | s to chord(group, s) # for chords we freeze by pretending it's a normal @@ -1230,9 +1241,15 @@ def _freeze_group_tasks(self, _id=None, group_id=None, chord=None, root_id = opts.setdefault('root_id', root_id) parent_id = opts.setdefault('parent_id', parent_id) if isinstance(self.tasks, _regen): - # We are draining from a geneator here. + # We are draining from a generator here. + # tasks1, tasks2 are each a clone of self.tasks tasks1, tasks2 = itertools.tee(self._unroll_tasks(self.tasks)) + # freeze each task in tasks1, results now holds AsyncResult for each task results = regen(self._freeze_tasks(tasks1, group_id, chord, root_id, parent_id)) + # TODO figure out why this makes sense - + # we freeze all tasks in the clone tasks1, and then zip the results + # with the IDs of tasks in the second clone, tasks2. and then, we build + # a generator that takes only the task IDs from tasks2. 
self.tasks = regen(x[0] for x in zip(tasks2, results)) else: new_tasks = [] @@ -1265,6 +1282,7 @@ def _freeze_tasks(self, tasks, group_id, chord, root_id, parent_id): for group_index, task in enumerate(tasks)) def _unroll_tasks(self, tasks): + # should be refactored to: (maybe_signature(task, app=self._app, clone=True) for task in tasks) yield from (maybe_signature(task, app=self._app).clone() for task in tasks) def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id): @@ -1274,6 +1292,7 @@ def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id): group_index = 0 while stack: task = maybe_signature(stack.popleft(), app=self._app).clone() + # if this is a group, flatten it by adding all of the group's tasks to the stack if isinstance(task, group): stack.extendleft(task.tasks) else: @@ -1364,8 +1383,10 @@ def freeze(self, _id=None, group_id=None, chord=None, # XXX chord is also a class in outer scope. if not isinstance(self.tasks, group): self.tasks = group(self.tasks, app=self.app) + # first freeze all tasks in the header header_result = self.tasks.freeze( parent_id=parent_id, root_id=root_id, chord=self.body) + # secondly freeze all tasks in the body: those that should be called after the header body_result = self.body.freeze( _id, root_id=root_id, chord=chord, group_id=group_id, group_index=group_index)