# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import with_statement

from celery.app import app_or_default
from celery.canvas import subtask, maybe_subtask  # noqa
from celery.state import get_current_worker_task
from celery.utils import uuid
class TaskSet(list):
    """A task containing several subtasks, making it possible
    to track how many, or when all of the tasks have been completed.

    The set is itself a :class:`list` of subtasks; every item passed in
    is normalized with :func:`maybe_subtask` on construction.

    :param tasks: A list of :class:`subtask` instances.
    :keyword app: App to bind to; resolved through :func:`app_or_default`,
        falling back to the class-level :attr:`app` attribute.
    :keyword Publisher: Callable invoked with a connection to create the
        publisher used by :meth:`apply_async`.

    Example::

        >>> urls = ("http://example.com/rss", "http://example.org/rss")
        >>> s = TaskSet(refresh_feed.s(url) for url in urls)
        >>> taskset_result = s.apply_async()
        >>> list_of_return_values = taskset_result.join()  # *expensive*

    """
    # Class-level default app; ``None`` defers the choice to
    # ``app_or_default`` when an instance is created.
    app = None

    def __init__(self, tasks=None, app=None, Publisher=None):
        super(TaskSet, self).__init__(maybe_subtask(t) for t in tasks or [])
        self.app = app_or_default(app or self.app)
        # NOTE(review): the default publisher class was lost in the damaged
        # source; ``amqp.TaskProducer`` follows the upstream Celery 3.0
        # implementation -- confirm against the installed Celery version.
        self.Publisher = Publisher or self.app.amqp.TaskProducer
        # NOTE(review): attribute name reconstructed as ``total`` (only the
        # ``= len(self)  # XXX compat`` tail survived) -- confirm.
        self.total = len(self)  # XXX compat

    def apply_async(self, connection=None, connect_timeout=None,
                    publisher=None, taskset_id=None):
        """Apply TaskSet.

        :keyword connection: Passed to ``app.default_connection``.
        :keyword connect_timeout: Passed to ``app.default_connection``.
        :keyword publisher: Publisher to send the tasks with; when omitted
            one is created by calling ``self.Publisher`` with the connection.
        :keyword taskset_id: Id shared by all tasks in the set; a fresh
            :func:`uuid` is generated when omitted.

        :returns: an ``app.TaskSetResult`` wrapping the per-task results.

        """
        app = self.app

        # NOTE(review): the guard for this early return was lost in the
        # damaged source (without it the publishing code below would be
        # unreachable).  Reconstructed as the eager-mode check used by
        # upstream Celery 3.0 -- confirm.
        if app.conf.CELERY_ALWAYS_EAGER:
            return self.apply(taskset_id=taskset_id)

        with app.default_connection(connection, connect_timeout) as conn:
            setid = taskset_id or uuid()
            pub = publisher or self.Publisher(conn)
            results = self._async_results(setid, pub)

            result = app.TaskSetResult(setid, results)
            parent = get_current_worker_task()
            if parent:
                # NOTE(review): the body of this branch was lost in the
                # damaged source; upstream Celery 3.0 records the result
                # as a child of the currently executing task -- confirm.
                parent.request.children.append(result)
            return result

    def _async_results(self, taskset_id, publisher):
        """Call ``apply_async`` on every task and return the results."""
        return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
                for task in self]

    def apply(self, taskset_id=None):
        """Applies the TaskSet locally by blocking until all tasks return."""
        setid = taskset_id or uuid()
        return self.app.TaskSetResult(setid, self._sync_results(setid))

    def _sync_results(self, taskset_id):
        """Call ``apply`` on every task and return the results."""
        return [task.apply(taskset_id=taskset_id) for task in self]

    def _get_tasks(self):
        # The set *is* the task list, so ``tasks`` is just an alias for it.
        return self

    def _set_tasks(self, tasks):
        # Replace the contents in place so the object identity is kept.
        self[:] = tasks
    tasks = property(_get_tasks, _set_tasks)