Skip to content
Browse files

Tasks are now shared between apps by default, can be private by @task(…

…shared=False)
  • Loading branch information...
1 parent c3d3463 commit a1cc46a44bab4ca46f1e1a0805e8e06a1dd23397 @ask committed
Showing with 27 additions and 24 deletions.
  1. +11 −8 celery/app/base.py
  2. +15 −15 celery/app/builtins.py
  3. +1 −1 celery/app/registry.py
View
19 celery/app/base.py
@@ -31,8 +31,9 @@
from celery.utils.imports import instantiate, symbol_by_name
from .annotations import prepare as prepare_annotations
-from .builtins import builtin_task, load_builtin_tasks
+from .builtins import shared_task, load_shared_tasks
from .defaults import DEFAULTS, find_deprecated_settings
+from .registry import TaskRegistry
from .state import _tls, get_current_app
from .utils import AppPickler, Settings, bugreport, _unpickle_app
@@ -55,7 +56,7 @@ class Celery(object):
loader_cls = "celery.loaders.app:AppLoader"
log_cls = "celery.app.log:Logging"
control_cls = "celery.app.control:Control"
- registry_cls = "celery.app.registry:TaskRegistry"
+ registry_cls = TaskRegistry
_pool = None
def __init__(self, main=None, loader=None, backend=None,
@@ -71,12 +72,14 @@ def __init__(self, main=None, loader=None, backend=None,
self.log_cls = log or self.log_cls
self.control_cls = control or self.control_cls
self.set_as_current = set_as_current
- self.registry_cls = self.registry_cls if tasks is None else tasks
+ self.registry_cls = symbol_by_name(self.registry_cls)
self.accept_magic_kwargs = accept_magic_kwargs
self.finalized = False
self._pending = deque()
- self._tasks = instantiate(self.registry_cls)
+ self._tasks = tasks
+ if not isinstance(self._tasks, TaskRegistry):
+ self._tasks = TaskRegistry(self._tasks or {})
# these options are moved to the config to
# simplify pickling of the app object.
@@ -106,13 +109,13 @@ def worker_main(self, argv=None):
def task(self, *args, **opts):
"""Creates new task class from any callable."""
- def inner_create_task_cls(builtin=False, **opts):
+ def inner_create_task_cls(shared=True, **opts):
def _create_task_cls(fun):
- if builtin:
+ if shared:
cons = lambda app: app._task_from_fun(fun, **opts)
cons.__name__ = fun.__name__
- builtin_task(cons)
+ shared_task(cons)
if self.accept_magic_kwargs: # compat mode
return self._task_from_fun(fun, **opts)
@@ -142,7 +145,7 @@ def _task_from_fun(self, fun, **options):
def finalize(self):
if not self.finalized:
- load_builtin_tasks(self)
+ load_shared_tasks(self)
pending = self._pending
while pending:
View
30 celery/app/builtins.py
@@ -6,28 +6,28 @@
from celery.utils import uuid
-#: global list of functions defining a built-in task.
-#: these are called for every app instance to setup built-in task.
-_builtin_tasks = []
+#: global list of functions defining tasks that should be
+#: added to all apps.
+_shared_tasks = []
-def builtin_task(constructor):
+def shared_task(constructor):
"""Decorator that specifies that the decorated function is a function
that generates a built-in task.
The function will then be called for every new app instance created
(lazily, so more exactly when the task registry for that app is needed).
"""
- _builtin_tasks.append(constructor)
+ _shared_tasks.append(constructor)
return constructor
-def load_builtin_tasks(app):
+def load_shared_tasks(app):
"""Loads the built-in tasks for an app instance."""
- [constructor(app) for constructor in _builtin_tasks]
+ [constructor(app) for constructor in _shared_tasks]
-@builtin_task
+@shared_task
def add_backend_cleanup_task(app):
"""The backend cleanup task can be used to clean up the default result
backend.
@@ -48,7 +48,7 @@ def backend_cleanup():
return backend_cleanup
-@builtin_task
+@shared_task
def add_unlock_chord_task(app):
"""The unlock chord task is used by result backends that doesn't
have native chord support.
@@ -71,7 +71,7 @@ def unlock_chord(setid, callback, interval=1, propagate=False,
return unlock_chord
-@builtin_task
+@shared_task
def add_map_task(app):
from celery.canvas import subtask
@@ -81,7 +81,7 @@ def xmap(task, it):
return list(map(task, it))
-@builtin_task
+@shared_task
def add_starmap_task(app):
from celery.canvas import subtask
@@ -91,7 +91,7 @@ def xstarmap(task, it):
return list(starmap(task, it))
-@builtin_task
+@shared_task
def add_chunk_task(app):
from celery.canvas import chunks as _chunks
@@ -100,7 +100,7 @@ def chunks(task, it, n):
return _chunks.apply_chunks(task, it, n)
-@builtin_task
+@shared_task
def add_group_task(app):
from celery.canvas import subtask
from celery.app.state import get_current_task
@@ -154,7 +154,7 @@ def apply(self, args=(), kwargs={}, **options):
return Group
-@builtin_task
+@shared_task
def add_chain_task(app):
from celery.canvas import maybe_subtask
@@ -185,7 +185,7 @@ def apply(self, args=(), kwargs={}, **options):
return Chain
-@builtin_task
+@shared_task
def add_chord_task(app):
"""Every chord is executed in a dedicated task, so that the chord
can be used as a subtask, and this generates the task
View
2 celery/app/registry.py
@@ -13,7 +13,6 @@
import inspect
-from celery import current_app
from celery.exceptions import NotRegistered
@@ -60,4 +59,5 @@ def filter_types(self, type):
def _unpickle_task(name):
+ from celery import current_app
return current_app.tasks[name]

0 comments on commit a1cc46a

Please sign in to comment.
Something went wrong with that request. Please try again.