Browse files

Use ' (single quotes) instead of " (double quotes)

  • Loading branch information...
1 parent cc1ae88 commit 566c4da7b4b1f7d248e8e0548a698043ec22be7d @ask ask committed Jun 14, 2012
Showing with 2,270 additions and 2,263 deletions.
  1. +8 −0 LICENSE
  2. +35 −35 celery/__compat__.py
  3. +17 −23 celery/__init__.py
  4. +9 −9 celery/app/__init__.py
  5. +5 −5 celery/app/abstract.py
  6. +32 −32 celery/app/amqp.py
  7. +3 −3 celery/app/annotations.py
  8. +35 −35 celery/app/base.py
  9. +28 −28 celery/app/builtins.py
  10. +36 −36 celery/app/control.py
  11. +149 −149 celery/app/defaults.py
  12. +13 −13 celery/app/log.py
  13. +3 −3 celery/app/registry.py
  14. +6 −6 celery/app/routes.py
  15. +64 −64 celery/app/task.py
  16. +25 −25 celery/app/utils.py
  17. +24 −24 celery/apps/beat.py
  18. +69 −69 celery/apps/worker.py
  19. +8 −8 celery/backends/__init__.py
  20. +31 −31 celery/backends/amqp.py
  21. +49 −49 celery/backends/base.py
  22. +14 −14 celery/backends/cache.py
  23. +26 −26 celery/backends/cassandra.py
  24. +6 −6 celery/backends/database/__init__.py
  25. +15 −13 celery/backends/database/a805d4bd.py
  26. +2 −2 celery/backends/database/dfd042c7.py
  27. +16 −16 celery/backends/database/models.py
  28. +2 −2 celery/backends/database/session.py
  29. +41 −41 celery/backends/mongodb.py
  30. +10 −10 celery/backends/redis.py
  31. +38 −38 celery/beat.py
  32. +32 −32 celery/bin/base.py
  33. +72 −72 celery/bin/camqadm.py
  34. +178 −192 celery/bin/celery.py
  35. +8 −8 celery/bin/celerybeat.py
  36. +1 −1 celery/bin/celeryctl.py
  37. +30 −30 celery/bin/celeryd.py
  38. +19 −19 celery/bin/celeryd_detach.py
  39. +91 −91 celery/bin/celeryd_multi.py
  40. +15 −15 celery/bin/celeryev.py
  41. +52 −52 celery/canvas.py
  42. +5 −5 celery/concurrency/__init__.py
  43. +4 −4 celery/concurrency/base.py
  44. +1 −1 celery/concurrency/eventlet.py
  45. +1 −1 celery/concurrency/gevent.py
  46. +17 −17 celery/concurrency/processes/__init__.py
  47. +5 −5 celery/concurrency/solo.py
  48. +1 −1 celery/concurrency/threads.py
  49. +4 −4 celery/contrib/abortable.py
  50. +8 −8 celery/contrib/batches.py
  51. +31 −31 celery/contrib/bundles.py
  52. +2 −2 celery/contrib/methods.py
  53. +7 −7 celery/contrib/migrate.py
  54. +17 −17 celery/contrib/rdb.py
  55. +7 −7 celery/datastructures.py
  56. +16 −16 celery/events/__init__.py
  57. +63 −63 celery/events/cursesmon.py
  58. +25 −25 celery/events/dumper.py
  59. +6 −6 celery/events/snapshot.py
  60. +20 −20 celery/events/state.py
  61. +7 −7 celery/loaders/__init__.py
  62. +12 −12 celery/loaders/base.py
  63. +10 −10 celery/loaders/default.py
  64. +6 −6 celery/local.py
  65. +57 −57 celery/platforms.py
  66. +7 −7 celery/result.py
  67. +36 −36 celery/schedules.py
  68. +2 −2 celery/security/__init__.py
  69. +6 −6 celery/security/certificate.py
  70. +2 −2 celery/security/key.py
  71. +15 −15 celery/security/serialization.py
  72. +1 −1 celery/security/utils.py
  73. +16 −26 celery/signals.py
  74. +14 −14 celery/states.py
  75. +6 −6 celery/task/__init__.py
  76. +22 −22 celery/task/base.py
  77. +23 −23 celery/task/http.py
  78. +1 −1 celery/task/sets.py
  79. +10 −10 celery/task/trace.py
  80. +18 −18 celery/utils/__init__.py
  81. +3 −3 celery/utils/compat.py
  82. +5 −5 celery/utils/debug.py
  83. +8 −13 celery/utils/functional.py
  84. +8 −8 celery/utils/imports.py
  85. +14 −14 celery/utils/log.py
  86. +15 −15 celery/utils/mail.py
  87. +4 −4 celery/utils/serialization.py
  88. +22 −22 celery/utils/term.py
  89. +11 −11 celery/utils/text.py
  90. +9 −9 celery/utils/threads.py
  91. +19 −19 celery/utils/timer2.py
  92. +19 −19 celery/utils/timeutils.py
  93. +38 −38 celery/worker/__init__.py
  94. +10 −10 celery/worker/abstract.py
  95. +16 −16 celery/worker/autoreload.py
  96. +10 −10 celery/worker/autoscale.py
  97. +4 −4 celery/worker/buckets.py
  98. +28 −28 celery/worker/consumer.py
  99. +63 −63 celery/worker/control.py
  100. +3 −3 celery/worker/heartbeat.py
  101. +1 −1 celery/worker/hub.py
  102. +80 −80 celery/worker/job.py
  103. +7 −7 celery/worker/mediator.py
  104. +12 −12 celery/worker/state.py
  105. +27 −0 docs/copyright.rst
  106. +6 −0 docs/index.rst
  107. +0 −1 docs/userguide/signals.rst
View
8 LICENSE
@@ -34,6 +34,14 @@ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
+Documentation License
+=====================
+
+The documentation portion of Celery (the rendered contents of the
+"docs" directory of a software distribution or checkout) is supplied
+under the Creative Commons Attribution-Noncommercial-Share Alike 3.0
+United States License as described by
+http://creativecommons.org/licenses/by-nc-sa/3.0/us/
Footnotes
=========
View
70 celery/__compat__.py
@@ -23,7 +23,7 @@
The module %s is deprecated and will be removed in a future version.
"""
-DEFAULT_ATTRS = set(["__file__", "__path__", "__doc__", "__all__"])
+DEFAULT_ATTRS = set(['__file__', '__path__', '__doc__', '__all__'])
# im_func is no longer available in Py3.
# instead the unbound method itself can be used.
@@ -37,61 +37,61 @@ def fun_of_method(method): # noqa
def getappattr(path):
"""Gets attribute from the current_app recursively,
- e.g. getappattr("amqp.get_task_consumer")``."""
+ e.g. getappattr('amqp.get_task_consumer')``."""
from celery import current_app
return current_app._rgetattr(path)
def _compat_task_decorator(*args, **kwargs):
from celery import current_app
- kwargs.setdefault("accept_magic_kwargs", True)
+ kwargs.setdefault('accept_magic_kwargs', True)
return current_app.task(*args, **kwargs)
def _compat_periodic_task_decorator(*args, **kwargs):
from celery.task import periodic_task
- kwargs.setdefault("accept_magic_kwargs", True)
+ kwargs.setdefault('accept_magic_kwargs', True)
return periodic_task(*args, **kwargs)
COMPAT_MODULES = {
- "celery": {
- "execute": {
- "send_task": "send_task",
+ 'celery': {
+ 'execute': {
+ 'send_task': 'send_task',
},
- "decorators": {
- "task": _compat_task_decorator,
- "periodic_task": _compat_periodic_task_decorator,
+ 'decorators': {
+ 'task': _compat_task_decorator,
+ 'periodic_task': _compat_periodic_task_decorator,
},
- "log": {
- "get_default_logger": "log.get_default_logger",
- "setup_logger": "log.setup_logger",
- "setup_loggig_subsystem": "log.setup_logging_subsystem",
- "redirect_stdouts_to_logger": "log.redirect_stdouts_to_logger",
+ 'log': {
+ 'get_default_logger': 'log.get_default_logger',
+ 'setup_logger': 'log.setup_logger',
+ 'setup_loggig_subsystem': 'log.setup_logging_subsystem',
+ 'redirect_stdouts_to_logger': 'log.redirect_stdouts_to_logger',
},
- "messaging": {
- "TaskPublisher": "amqp.TaskPublisher",
- "TaskConsumer": "amqp.TaskConsumer",
- "establish_connection": "broker_connection",
- "with_connection": "with_default_connection",
- "get_consumer_set": "amqp.TaskConsumer",
+ 'messaging': {
+ 'TaskPublisher': 'amqp.TaskPublisher',
+ 'TaskConsumer': 'amqp.TaskConsumer',
+ 'establish_connection': 'broker_connection',
+ 'with_connection': 'with_default_connection',
+ 'get_consumer_set': 'amqp.TaskConsumer',
},
- "registry": {
- "tasks": "tasks",
+ 'registry': {
+ 'tasks': 'tasks',
},
},
- "celery.task": {
- "control": {
- "broadcast": "control.broadcast",
- "rate_limit": "control.rate_limit",
- "time_limit": "control.time_limit",
- "ping": "control.ping",
- "revoke": "control.revoke",
- "discard_all": "control.purge",
- "inspect": "control.inspect",
+ 'celery.task': {
+ 'control': {
+ 'broadcast': 'control.broadcast',
+ 'rate_limit': 'control.rate_limit',
+ 'time_limit': 'control.time_limit',
+ 'ping': 'control.ping',
+ 'revoke': 'control.revoke',
+ 'discard_all': 'control.purge',
+ 'inspect': 'control.inspect',
},
- "schedules": "celery.schedules",
- "chords": "celery.canvas",
+ 'schedules': 'celery.schedules',
+ 'chords': 'celery.canvas',
}
}
@@ -187,7 +187,7 @@ def prepare(attr):
fqdn = '.'.join([pkg.__name__, name])
module = sys.modules[fqdn] = import_module(attrs)
return module
- attrs["__all__"] = attrs.keys()
+ attrs['__all__'] = attrs.keys()
return create_module(name, dict(attrs), pkg=pkg, prepare_attr=prepare)
View
40 celery/__init__.py
@@ -7,12 +7,12 @@
from __future__ import absolute_import
-VERSION = (2, 6, 0, "rc4")
-__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
-__author__ = "Ask Solem"
-__contact__ = "ask@celeryproject.org"
-__homepage__ = "http://celeryproject.org"
-__docformat__ = "restructuredtext"
+VERSION = (2, 6, 0, 'rc4')
+__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
+__author__ = 'Ask Solem'
+__contact__ = 'ask@celeryproject.org'
+__homepage__ = 'http://celeryproject.org'
+__docformat__ = 'restructuredtext'
# -eof meta-
@@ -21,22 +21,16 @@
old_module, new_module = recreate_module(__name__, # pragma: no cover
by_module={
- "celery.app": ["Celery", "bugreport"],
- "celery.app.task": ["Task"],
- "celery.state": ["current_app", "current_task"],
- "celery.canvas": ["chain", "chord", "chunks",
- "group", "subtask", "xmap", "xstarmap"],
- "celery.utils": ["uuid"],
+ 'celery.app': ['Celery', 'bugreport'],
+ 'celery.app.task': ['Task'],
+ 'celery.state': ['current_app', 'current_task'],
+ 'celery.canvas': ['chain', 'chord', 'chunks',
+ 'group', 'subtask', 'xmap', 'xstarmap'],
+ 'celery.utils': ['uuid'],
},
- direct={"task": "celery.task"},
- __package__="celery",
- __file__=__file__,
- __path__=__path__,
- __doc__=__doc__,
- __version__=__version__,
- __author__=__author__,
- __contact__=__contact__,
- __homepage__=__homepage__,
- __docformat__=__docformat__,
- VERSION=VERSION,
+ direct={'task': 'celery.task'},
+ __package__='celery', __file__=__file__,
+ __path__=__path__, __doc__=__doc__, __version__=__version__,
+ __author__=__author__, __contact__=__contact__,
+ __homepage__=__homepage__, __docformat__=__docformat__, VERSION=VERSION,
)
View
18 celery/app/__init__.py
@@ -30,11 +30,11 @@
#: is no active app.
app_or_default = None
-#: The "default" loader is the default loader used by old applications.
-default_loader = os.environ.get("CELERY_LOADER") or "default"
+#: The 'default' loader is the default loader used by old applications.
+default_loader = os.environ.get('CELERY_LOADER') or 'default'
#: Global fallback app instance.
-set_default_app(Celery("default", loader=default_loader,
+set_default_app(Celery('default', loader=default_loader,
set_as_current=False,
accept_magic_kwargs=True))
@@ -53,13 +53,13 @@ def _app_or_default_trace(app=None): # pragma: no cover
from traceback import print_stack
from billiard import current_process
if app is None:
- if getattr(state._tls, "current_app", None):
- print("-- RETURNING TO CURRENT APP --") # noqa+
+ if getattr(state._tls, 'current_app', None):
+ print('-- RETURNING TO CURRENT APP --') # noqa+
print_stack()
return state._tls.current_app
- if current_process()._name == "MainProcess":
- raise Exception("DEFAULT APP")
- print("-- RETURNING TO DEFAULT APP --") # noqa+
+ if current_process()._name == 'MainProcess':
+ raise Exception('DEFAULT APP')
+ print('-- RETURNING TO DEFAULT APP --') # noqa+
print_stack()
return state.default_app
return app
@@ -74,7 +74,7 @@ def disable_trace():
global app_or_default
app_or_default = _app_or_default
-if os.environ.get("CELERY_TRACE_APP"): # pragma: no cover
+if os.environ.get('CELERY_TRACE_APP'): # pragma: no cover
enable_trace()
else:
disable_trace()
View
10 celery/app/abstract.py
@@ -22,17 +22,17 @@ def get_key(self, attr):
class _configurated(type):
def __new__(cls, name, bases, attrs):
- attrs["__confopts__"] = dict((attr, spec.get_key(attr))
+ attrs['__confopts__'] = dict((attr, spec.get_key(attr))
for attr, spec in attrs.iteritems()
if isinstance(spec, from_config))
- inherit_from = attrs.get("inherit_confopts", ())
+ inherit_from = attrs.get('inherit_confopts', ())
for subcls in bases:
try:
- attrs["__confopts__"].update(subcls.__confopts__)
+ attrs['__confopts__'].update(subcls.__confopts__)
except AttributeError:
pass
for subcls in inherit_from:
- attrs["__confopts__"].update(subcls.__confopts__)
+ attrs['__confopts__'].update(subcls.__confopts__)
attrs = dict((k, v if not isinstance(v, from_config) else None)
for k, v in attrs.iteritems())
return super(_configurated, cls).__new__(cls, name, bases, attrs)
@@ -41,7 +41,7 @@ def __new__(cls, name, bases, attrs):
class configurated(object):
__metaclass__ = _configurated
- def setup_defaults(self, kwargs, namespace="celery"):
+ def setup_defaults(self, kwargs, namespace='celery'):
confopts = self.__confopts__
app, find = self.app, self.app.conf.find_value_for_key
View
64 celery/app/amqp.py
@@ -90,24 +90,24 @@ def add(self, queue, **kwargs):
def add_compat(self, name, **options):
# docs used to use binding_key as routing key
- options.setdefault("routing_key", options.get("binding_key"))
+ options.setdefault('routing_key', options.get('binding_key'))
q = self[name] = entry_to_queue(name, **options)
return q
def format(self, indent=0, indent_first=True):
"""Format routing table into string for log dumps."""
active = self.consume_from
if not active:
- return ""
+ return ''
info = [QUEUE_FORMAT.strip() % {
- "name": (name + ":").ljust(12),
- "exchange": q.exchange.name,
- "exchange_type": q.exchange.type,
- "routing_key": q.routing_key}
+ 'name': (name + ':').ljust(12),
+ 'exchange': q.exchange.name,
+ 'exchange_type': q.exchange.type,
+ 'routing_key': q.routing_key}
for name, q in sorted(active.iteritems())]
if indent_first:
- return textindent("\n".join(info), indent)
- return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
+ return textindent('\n'.join(info), indent)
+ return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
def select_subset(self, wanted):
"""Sets :attr:`consume_from` by selecting a subset of the
@@ -141,8 +141,8 @@ class TaskProducer(Producer):
retry_policy = None
def __init__(self, channel=None, exchange=None, *args, **kwargs):
- self.retry = kwargs.pop("retry", self.retry)
- self.retry_policy = kwargs.pop("retry_policy",
+ self.retry = kwargs.pop('retry', self.retry)
+ self.retry_policy = kwargs.pop('retry_policy',
self.retry_policy or {})
exchange = exchange or self.exchange
self.queues = self.app.amqp.queues # shortcut
@@ -165,9 +165,9 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
task_args = task_args or []
task_kwargs = task_kwargs or {}
if not isinstance(task_args, (list, tuple)):
- raise ValueError("task args must be a list or tuple")
+ raise ValueError('task args must be a list or tuple')
if not isinstance(task_kwargs, dict):
- raise ValueError("task kwargs must be a dictionary")
+ raise ValueError('task kwargs must be a dictionary')
if countdown: # Convert countdown to ETA.
now = now or self.app.now()
eta = now + timedelta(seconds=countdown)
@@ -177,21 +177,21 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
eta = eta and eta.isoformat()
expires = expires and expires.isoformat()
- body = {"task": task_name,
- "id": task_id,
- "args": task_args,
- "kwargs": task_kwargs,
- "retries": retries or 0,
- "eta": eta,
- "expires": expires,
- "utc": self.utc,
- "callbacks": callbacks,
- "errbacks": errbacks}
+ body = {'task': task_name,
+ 'id': task_id,
+ 'args': task_args,
+ 'kwargs': task_kwargs,
+ 'retries': retries or 0,
+ 'eta': eta,
+ 'expires': expires,
+ 'utc': self.utc,
+ 'callbacks': callbacks,
+ 'errbacks': errbacks}
group_id = group_id or taskset_id
if group_id:
- body["taskset"] = group_id
+ body['taskset'] = group_id
if chord:
- body["chord"] = chord
+ body['chord'] = chord
self.publish(body, exchange=exchange, mandatory=mandatory,
immediate=immediate, routing_key=routing_key,
@@ -203,7 +203,7 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
signals.task_sent.send(sender=task_name, **body)
if event_dispatcher:
- event_dispatcher.send("task-sent", uuid=task_id,
+ event_dispatcher.send('task-sent', uuid=task_id,
name=task_name,
args=repr(task_args),
kwargs=repr(task_kwargs),
@@ -218,14 +218,14 @@ class TaskPublisher(TaskProducer):
"""Deprecated version of :class:`TaskProducer`."""
def __init__(self, channel=None, exchange=None, *args, **kwargs):
- self.app = app_or_default(kwargs.pop("app", self.app))
- self.retry = kwargs.pop("retry", self.retry)
- self.retry_policy = kwargs.pop("retry_policy",
+ self.app = app_or_default(kwargs.pop('app', self.app))
+ self.retry = kwargs.pop('retry', self.retry)
+ self.retry_policy = kwargs.pop('retry_policy',
self.retry_policy or {})
exchange = exchange or self.exchange
if not isinstance(exchange, Exchange):
exchange = Exchange(exchange,
- kwargs.pop("exchange_type", "direct"))
+ kwargs.pop('exchange_type', 'direct'))
self.queues = self.app.amqp.queues # shortcut
super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
@@ -267,15 +267,15 @@ def Queues(self, queues, create_missing=None):
def Router(self, queues=None, create_missing=None):
"""Returns the current task router."""
return _routes.Router(self.routes, queues or self.queues,
- self.app.either("CELERY_CREATE_MISSING_QUEUES",
+ self.app.either('CELERY_CREATE_MISSING_QUEUES',
create_missing), app=self.app)
@cached_property
def TaskConsumer(self):
"""Return consumer configured to consume from the queues
we are configured for (``app.amqp.queues.consume_from``)."""
return self.app.subclass_with_self(TaskConsumer,
- reverse="amqp.TaskConsumer")
+ reverse='amqp.TaskConsumer')
get_task_consumer = TaskConsumer # XXX compat
@cached_property
@@ -287,7 +287,7 @@ def TaskProducer(self):
"""
conf = self.app.conf
return self.app.subclass_with_self(TaskProducer,
- reverse="amqp.TaskProducer",
+ reverse='amqp.TaskProducer',
exchange=self.default_exchange,
routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
serializer=conf.CELERY_TASK_SERIALIZER,
View
6 celery/app/annotations.py
@@ -15,8 +15,8 @@
from celery.utils.functional import firstmethod, mpromise
from celery.utils.imports import instantiate
-_first_match = firstmethod("annotate")
-_first_match_any = firstmethod("annotate_any")
+_first_match = firstmethod('annotate')
+_first_match_any = firstmethod('annotate_any')
def resolve_all(anno, task):
@@ -27,7 +27,7 @@ class MapAnnotation(dict):
def annotate_any(self):
try:
- return dict(self["*"])
+ return dict(self['*'])
except KeyError:
pass
View
70 celery/app/base.py
@@ -47,12 +47,12 @@ class Celery(object):
SYSTEM = platforms.SYSTEM
IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
- amqp_cls = "celery.app.amqp:AMQP"
+ amqp_cls = 'celery.app.amqp:AMQP'
backend_cls = None
- events_cls = "celery.events:Events"
- loader_cls = "celery.loaders.app:AppLoader"
- log_cls = "celery.app.log:Logging"
- control_cls = "celery.app.control:Control"
+ events_cls = 'celery.events:Events'
+ loader_cls = 'celery.loaders.app:AppLoader'
+ log_cls = 'celery.app.log:Logging'
+ control_cls = 'celery.app.control:Control'
registry_cls = TaskRegistry
_pool = None
@@ -82,9 +82,9 @@ def __init__(self, main=None, loader=None, backend=None,
# simplify pickling of the app object.
self._preconf = {}
if broker:
- self._preconf["BROKER_URL"] = broker
+ self._preconf['BROKER_URL'] = broker
if include:
- self._preconf["CELERY_IMPORTS"] = include
+ self._preconf['CELERY_IMPORTS'] = include
if self.set_as_current:
self.set_current()
@@ -98,11 +98,11 @@ def on_init(self):
pass
def start(self, argv=None):
- return instantiate("celery.bin.celery:CeleryCommand", app=self) \
+ return instantiate('celery.bin.celery:CeleryCommand', app=self) \
.execute_from_commandline(argv)
def worker_main(self, argv=None):
- return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
+ return instantiate('celery.bin.celeryd:WorkerCommand', app=self) \
.execute_from_commandline(argv)
def task(self, *args, **opts):
@@ -135,14 +135,14 @@ def _create_task_cls(fun):
return inner_create_task_cls(**opts)
def _task_from_fun(self, fun, **options):
- base = options.pop("base", None) or self.Task
+ base = options.pop('base', None) or self.Task
T = type(fun.__name__, (base, ), dict({
- "app": self,
- "accept_magic_kwargs": False,
- "run": staticmethod(fun),
- "__doc__": fun.__doc__,
- "__module__": fun.__module__}, **options))()
+ 'app': self,
+ 'accept_magic_kwargs': False,
+ 'run': staticmethod(fun),
+ '__doc__': fun.__doc__,
+ '__module__': fun.__module__}, **options))()
task = self._tasks[T.name] # return global instance.
task.bind(self)
return task
@@ -167,19 +167,19 @@ def config_from_envvar(self, variable_name, silent=False):
del(self.conf)
return self.loader.config_from_envvar(variable_name, silent=silent)
- def config_from_cmdline(self, argv, namespace="celery"):
+ def config_from_cmdline(self, argv, namespace='celery'):
self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
def send_task(self, name, args=None, kwargs=None, countdown=None,
eta=None, task_id=None, publisher=None, connection=None,
result_cls=None, expires=None, queues=None, **options):
if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
warnings.warn(AlwaysEagerIgnored(
- "CELERY_ALWAYS_EAGER has no effect on send_task"))
+ 'CELERY_ALWAYS_EAGER has no effect on send_task'))
result_cls = result_cls or self.AsyncResult
router = self.amqp.Router(queues)
- options.setdefault("compression",
+ options.setdefault('compression',
self.conf.CELERY_MESSAGE_COMPRESSION)
options = router.route(options, name, args, kwargs)
with self.default_producer(publisher) as producer:
@@ -200,10 +200,10 @@ def broker_connection(self, hostname=None, userid=None,
virtual_host or conf.BROKER_VHOST,
port or conf.BROKER_PORT,
transport=transport or conf.BROKER_TRANSPORT,
- insist=self.either("BROKER_INSIST", insist),
- ssl=self.either("BROKER_USE_SSL", ssl),
+ insist=self.either('BROKER_INSIST', insist),
+ ssl=self.either('BROKER_USE_SSL', ssl),
connect_timeout=self.either(
- "BROKER_CONNECTION_TIMEOUT", connect_timeout),
+ 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
**transport_options or {}))
@@ -238,7 +238,7 @@ def with_default_connection(self, fun):
"""
@wraps(fun)
def _inner(*args, **kwargs):
- connection = kwargs.pop("connection", None)
+ connection = kwargs.pop('connection', None)
with self.default_connection(connection) as c:
return fun(*args, **dict(kwargs, connection=c))
return _inner
@@ -297,10 +297,10 @@ def _after_fork(self, obj_):
def create_task_cls(self):
"""Creates a base task class using default configuration
taken from this app."""
- return self.subclass_with_self("celery.app.task:Task", name="Task",
- attribute="_app", abstract=True)
+ return self.subclass_with_self('celery.app.task:Task', name='Task',
+ attribute='_app', abstract=True)
- def subclass_with_self(self, Class, name=None, attribute="app",
+ def subclass_with_self(self, Class, name=None, attribute='app',
reverse=None, **kw):
"""Subclass an app-compatible class by setting its app attribute
to be this app instance.
@@ -312,7 +312,7 @@ def subclass_with_self(self, Class, name=None, attribute="app",
:param Class: The app-compatible class to subclass.
:keyword name: Custom name for the target class.
:keyword attribute: Name of the attribute holding the app,
- default is "app".
+ default is 'app'.
"""
Class = symbol_by_name(Class)
@@ -330,8 +330,8 @@ def _rgetattr(self, path):
return reduce(getattr, [self] + path.split('.'))
def __repr__(self):
- return "<%s %s:0x%x>" % (self.__class__.__name__,
- self.main or "__main__", id(self), )
+ return '<%s %s:0x%x>' % (self.__class__.__name__,
+ self.main or '__main__', id(self), )
def __reduce__(self):
# Reduce only pickles the configuration changes,
@@ -347,19 +347,19 @@ def __reduce_args__(self):
@cached_property
def Worker(self):
- return self.subclass_with_self("celery.apps.worker:Worker")
+ return self.subclass_with_self('celery.apps.worker:Worker')
@cached_property
def WorkController(self, **kwargs):
- return self.subclass_with_self("celery.worker:WorkController")
+ return self.subclass_with_self('celery.worker:WorkController')
@cached_property
def Beat(self, **kwargs):
- return self.subclass_with_self("celery.apps.beat:Beat")
+ return self.subclass_with_self('celery.apps.beat:Beat')
@cached_property
def TaskSet(self):
- return self.subclass_with_self("celery.task.sets:TaskSet")
+ return self.subclass_with_self('celery.task.sets:TaskSet')
@cached_property
def Task(self):
@@ -371,15 +371,15 @@ def annotations(self):
@cached_property
def AsyncResult(self):
- return self.subclass_with_self("celery.result:AsyncResult")
+ return self.subclass_with_self('celery.result:AsyncResult')
@cached_property
def GroupResult(self):
- return self.subclass_with_self("celery.result:GroupResult")
+ return self.subclass_with_self('celery.result:GroupResult')
@cached_property
def TaskSetResult(self): # XXX compat
- return self.subclass_with_self("celery.result:TaskSetResult")
+ return self.subclass_with_self('celery.result:TaskSetResult')
@property
def pool(self):
View
56 celery/app/builtins.py
@@ -52,7 +52,7 @@ def add_backend_cleanup_task(app):
"""
- @app.task(name="celery.backend_cleanup")
+ @app.task(name='celery.backend_cleanup')
def backend_cleanup():
app.backend.cleanup()
return backend_cleanup
@@ -69,7 +69,7 @@ def add_unlock_chord_task(app):
from celery.canvas import subtask
from celery import result as _res
- @app.task(name="celery.chord_unlock", max_retries=None)
+ @app.task(name='celery.chord_unlock', max_retries=None)
def unlock_chord(group_id, callback, interval=1, propagate=False,
max_retries=None, result=None):
result = _res.GroupResult(group_id, map(_res.AsyncResult, result))
@@ -85,7 +85,7 @@ def unlock_chord(group_id, callback, interval=1, propagate=False,
def add_map_task(app):
from celery.canvas import subtask
- @app.task(name="celery.map")
+ @app.task(name='celery.map')
def xmap(task, it):
task = subtask(task).type
return list(map(task, it))
@@ -95,7 +95,7 @@ def xmap(task, it):
def add_starmap_task(app):
from celery.canvas import subtask
- @app.task(name="celery.starmap")
+ @app.task(name='celery.starmap')
def xstarmap(task, it):
task = subtask(task).type
return list(starmap(task, it))
@@ -105,7 +105,7 @@ def xstarmap(task, it):
def add_chunk_task(app):
from celery.canvas import chunks as _chunks
- @app.task(name="celery.chunks")
+ @app.task(name='celery.chunks')
def chunks(task, it, n):
return _chunks.apply_chunks(task, it, n)
@@ -118,7 +118,7 @@ def add_group_task(app):
class Group(app.Task):
app = _app
- name = "celery.group"
+ name = 'celery.group'
accept_magic_kwargs = False
def run(self, tasks, result, group_id):
@@ -139,15 +139,15 @@ def run(self, tasks, result, group_id):
def prepare(self, options, tasks, **kwargs):
r = []
- options["group_id"] = group_id = \
- options.setdefault("task_id", uuid())
+ options['group_id'] = group_id = \
+ options.setdefault('task_id', uuid())
for task in tasks:
opts = task.options
- opts["group_id"] = group_id
+ opts['group_id'] = group_id
try:
- tid = opts["task_id"]
+ tid = opts['task_id']
except KeyError:
- tid = opts["task_id"] = uuid()
+ tid = opts['task_id'] = uuid()
r.append(self.AsyncResult(tid))
return tasks, self.app.GroupResult(group_id, r), group_id
@@ -172,33 +172,33 @@ def add_chain_task(app):
class Chain(app.Task):
app = _app
- name = "celery.chain"
+ name = 'celery.chain'
accept_magic_kwargs = False
def apply_async(self, args=(), kwargs={}, **options):
if self.app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, **options)
- options.pop("publisher", None)
- group_id = options.pop("group_id", None)
- chord = options.pop("chord", None)
+ options.pop('publisher', None)
+ group_id = options.pop('group_id', None)
+ chord = options.pop('chord', None)
tasks = [maybe_subtask(t).clone(
- task_id=options.pop("task_id", uuid()),
+ task_id=options.pop('task_id', uuid()),
**options
)
- for t in kwargs["tasks"]]
+ for t in kwargs['tasks']]
reduce(lambda a, b: a.link(b), tasks)
if group_id:
tasks[-1].set(group_id=group_id)
if chord:
tasks[-1].set(chord=chord)
tasks[0].apply_async()
- results = [task.type.AsyncResult(task.options["task_id"])
+ results = [task.type.AsyncResult(task.options['task_id'])
for task in tasks]
reduce(lambda a, b: a.set_parent(b), reversed(results))
return results[-1]
def apply(self, args=(), kwargs={}, **options):
- tasks = [maybe_subtask(task).clone() for task in kwargs["tasks"]]
+ tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']]
res = prev = None
for task in tasks:
res = task.apply((prev.get(), ) if prev else ())
@@ -219,7 +219,7 @@ def add_chord_task(app):
class Chord(app.Task):
app = _app
- name = "celery.chord"
+ name = 'celery.chord'
accept_magic_kwargs = False
ignore_result = False
@@ -232,11 +232,11 @@ def run(self, header, body, interval=1, max_retries=None,
for task in header.tasks:
opts = task.options
try:
- tid = opts["task_id"]
+ tid = opts['task_id']
except KeyError:
- tid = opts["task_id"] = uuid()
- opts["chord"] = body
- opts["group_id"] = group_id
+ tid = opts['task_id'] = uuid()
+ opts['chord'] = body
+ opts['group_id'] = group_id
r.append(app.AsyncResult(tid))
if eager:
return header.apply(task_id=group_id)
@@ -250,17 +250,17 @@ def run(self, header, body, interval=1, max_retries=None,
def apply_async(self, args=(), kwargs={}, task_id=None, **options):
if self.app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, **options)
- header, body = (list(kwargs["header"]),
- maybe_subtask(kwargs["body"]))
+ header, body = (list(kwargs['header']),
+ maybe_subtask(kwargs['body']))
- callback_id = body.options.setdefault("task_id", task_id or uuid())
+ callback_id = body.options.setdefault('task_id', task_id or uuid())
parent = super(Chord, self).apply_async((header, body), **options)
body_result = self.AsyncResult(callback_id)
body_result.parent = parent
return body_result
def apply(self, args=(), kwargs={}, propagate=True, **options):
- body = kwargs["body"]
+ body = kwargs['body']
res = super(Chord, self).apply(args, dict(kwargs, eager=True),
**options)
return maybe_subtask(body).apply(
View
72 celery/app/control.py
@@ -52,44 +52,44 @@ def _request(self, command, **kwargs):
timeout=self.timeout, reply=True))
def report(self):
- return self._request("report")
+ return self._request('report')
def active(self, safe=False):
- return self._request("dump_active", safe=safe)
+ return self._request('dump_active', safe=safe)
def scheduled(self, safe=False):
- return self._request("dump_schedule", safe=safe)
+ return self._request('dump_schedule', safe=safe)
def reserved(self, safe=False):
- return self._request("dump_reserved", safe=safe)
+ return self._request('dump_reserved', safe=safe)
def stats(self):
- return self._request("stats")
+ return self._request('stats')
def revoked(self):
- return self._request("dump_revoked")
+ return self._request('dump_revoked')
def registered(self):
- return self._request("dump_tasks")
+ return self._request('dump_tasks')
registered_tasks = registered
def ping(self):
- return self._request("ping")
+ return self._request('ping')
def active_queues(self):
- return self._request("active_queues")
+ return self._request('active_queues')
class Control(object):
Mailbox = Mailbox
def __init__(self, app=None):
self.app = app_or_default(app)
- self.mailbox = self.Mailbox("celeryd", type="fanout")
+ self.mailbox = self.Mailbox('celeryd', type='fanout')
@cached_property
def inspect(self):
- return self.app.subclass_with_self(Inspect, reverse="control.inspect")
+ return self.app.subclass_with_self(Inspect, reverse='control.inspect')
def purge(self, connection=None):
"""Discard all waiting tasks.
@@ -105,7 +105,7 @@ def purge(self, connection=None):
discard_all = purge
def revoke(self, task_id, destination=None, terminate=False,
- signal="SIGTERM", **kwargs):
+ signal='SIGTERM', **kwargs):
"""Tell all (or specific) workers to revoke a task by id.
If a task is revoked, the workers will ignore the task and
@@ -120,10 +120,10 @@ def revoke(self, task_id, destination=None, terminate=False,
See :meth:`broadcast` for supported keyword arguments.
"""
- return self.broadcast("revoke", destination=destination,
- arguments={"task_id": task_id,
- "terminate": terminate,
- "signal": signal}, **kwargs)
+ return self.broadcast('revoke', destination=destination,
+ arguments={'task_id': task_id,
+ 'terminate': terminate,
+ 'signal': signal}, **kwargs)
def ping(self, destination=None, timeout=1, **kwargs):
"""Ping all (or specific) workers.
@@ -133,7 +133,7 @@ def ping(self, destination=None, timeout=1, **kwargs):
See :meth:`broadcast` for supported keyword arguments.
"""
- return self.broadcast("ping", reply=True, destination=destination,
+ return self.broadcast('ping', reply=True, destination=destination,
timeout=timeout, **kwargs)
def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
@@ -142,19 +142,19 @@ def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
:param task_name: Name of task to change rate limit for.
:param rate_limit: The rate limit as tasks per second, or a rate limit
- string (`"100/m"`, etc.
+ string (`'100/m'`, etc.
see :attr:`celery.task.base.Task.rate_limit` for
more information).
See :meth:`broadcast` for supported keyword arguments.
"""
- return self.broadcast("rate_limit", destination=destination,
- arguments={"task_name": task_name,
- "rate_limit": rate_limit},
+ return self.broadcast('rate_limit', destination=destination,
+ arguments={'task_name': task_name,
+ 'rate_limit': rate_limit},
**kwargs)
- def add_consumer(self, queue, exchange=None, exchange_type="direct",
+ def add_consumer(self, queue, exchange=None, exchange_type='direct',
routing_key=None, **options):
"""Tell all (or specific) workers to start consuming from a new queue.
@@ -169,26 +169,26 @@ def add_consumer(self, queue, exchange=None, exchange_type="direct",
:param queue: Name of queue to start consuming from.
:keyword exchange: Optional name of exchange.
- :keyword exchange_type: Type of exchange (defaults to "direct")
+ :keyword exchange_type: Type of exchange (defaults to 'direct')
:keyword destination: List of worker names to send this command to, when empty broadcast to all workers.
:keyword routing_key: Optional routing key.
See :meth:`broadcast` for supported keyword arguments.
"""
- return self.broadcast("add_consumer",
- arguments={"queue": queue, "exchange": exchange,
- "exchange_type": exchange_type,
- "routing_key": routing_key}, **options)
+ return self.broadcast('add_consumer',
+ arguments={'queue': queue, 'exchange': exchange,
+ 'exchange_type': exchange_type,
+ 'routing_key': routing_key}, **options)
def cancel_consumer(self, queue, **kwargs):
"""Tell all (or specific) workers to stop consuming from ``queue``.
Supports the same keyword arguments as :meth:`broadcast`.
"""
- return self.broadcast("cancel_consumer",
- arguments={"queue": queue}, **kwargs)
+ return self.broadcast('cancel_consumer',
+ arguments={'queue': queue}, **kwargs)
def time_limit(self, task_name, soft=None, hard=None, **kwargs):
@@ -202,33 +202,33 @@ def time_limit(self, task_name, soft=None, hard=None, **kwargs):
Any additional keyword arguments are passed on to :meth:`broadcast`.
"""
- return self.broadcast("time_limit",
- arguments={"task_name": task_name,
- "hard": hard, "soft": soft}, **kwargs)
+ return self.broadcast('time_limit',
+ arguments={'task_name': task_name,
+ 'hard': hard, 'soft': soft}, **kwargs)
def enable_events(self, destination=None, **kwargs):
"""Tell all (or specific) workers to enable events."""
- return self.broadcast("enable_events", {}, destination, **kwargs)
+ return self.broadcast('enable_events', {}, destination, **kwargs)
def disable_events(self, destination=None, **kwargs):
"""Tell all (or specific) workers to enable events."""
- return self.broadcast("disable_events", {}, destination, **kwargs)
+ return self.broadcast('disable_events', {}, destination, **kwargs)
def pool_grow(self, n=1, destination=None, **kwargs):
"""Tell all (or specific) workers to grow the pool by ``n``.
Supports the same arguments as :meth:`broadcast`.
"""
- return self.broadcast("pool_grow", {}, destination, **kwargs)
+ return self.broadcast('pool_grow', {}, destination, **kwargs)
def pool_shrink(self, n=1, destination=None, **kwargs):
"""Tell all (or specific) workers to shrink the pool by ``n``.
Supports the same arguments as :meth:`broadcast`.
"""
- return self.broadcast("pool_shrink", {}, destination, **kwargs)
+ return self.broadcast('pool_shrink', {}, destination, **kwargs)
def broadcast(self, command, arguments=None, destination=None,
connection=None, reply=False, timeout=1, limit=None,
View
298 celery/app/defaults.py
@@ -16,17 +16,17 @@
from celery.utils import strtobool
from celery.utils.functional import memoize
-is_jython = sys.platform.startswith("java")
-is_pypy = hasattr(sys, "pypy_version_info")
+is_jython = sys.platform.startswith('java')
+is_pypy = hasattr(sys, 'pypy_version_info')
-DEFAULT_POOL = "processes"
+DEFAULT_POOL = 'processes'
if is_jython:
- DEFAULT_POOL = "threads"
+ DEFAULT_POOL = 'threads'
elif is_pypy:
if sys.pypy_version_info[0:3] < (1, 5, 0):
- DEFAULT_POOL = "solo"
+ DEFAULT_POOL = 'solo'
else:
- DEFAULT_POOL = "processes"
+ DEFAULT_POOL = 'processes'
DEFAULT_PROCESS_LOG_FMT = """
@@ -36,9 +36,9 @@
DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
%(task_name)s[%(task_id)s]: %(message)s"""
-_BROKER_OLD = {"deprecate_by": "2.5", "remove_by": "3.0", "alt": "BROKER_URL"}
-_REDIS_OLD = {"deprecate_by": "2.5", "remove_by": "3.0",
- "alt": "URL form of CELERY_RESULT_BACKEND"}
+_BROKER_OLD = {'deprecate_by': '2.5', 'remove_by': '3.0', 'alt': 'BROKER_URL'}
+_REDIS_OLD = {'deprecate_by': '2.5', 'remove_by': '3.0',
+ 'alt': 'URL form of CELERY_RESULT_BACKEND'}
class Option(object):
@@ -50,7 +50,7 @@ class Option(object):
def __init__(self, default=None, *args, **kwargs):
self.default = default
- self.type = kwargs.get("type") or "string"
+ self.type = kwargs.get('type') or 'string'
for attr, value in kwargs.iteritems():
setattr(self, attr, value)
@@ -59,153 +59,153 @@ def to_python(self, value):
NAMESPACES = {
- "BROKER": {
- "URL": Option(None, type="string"),
- "CONNECTION_TIMEOUT": Option(4, type="float"),
- "CONNECTION_RETRY": Option(True, type="bool"),
- "CONNECTION_MAX_RETRIES": Option(100, type="int"),
- "POOL_LIMIT": Option(10, type="int"),
- "INSIST": Option(False, type="bool",
- deprecate_by="2.4", remove_by="3.0"),
- "USE_SSL": Option(False, type="bool"),
- "TRANSPORT": Option(type="string"),
- "TRANSPORT_OPTIONS": Option({}, type="dict"),
- "HOST": Option(type="string", **_BROKER_OLD),
- "PORT": Option(type="int", **_BROKER_OLD),
- "USER": Option(type="string", **_BROKER_OLD),
- "PASSWORD": Option(type="string", **_BROKER_OLD),
- "VHOST": Option(type="string", **_BROKER_OLD),
+ 'BROKER': {
+ 'URL': Option(None, type='string'),
+ 'CONNECTION_TIMEOUT': Option(4, type='float'),
+ 'CONNECTION_RETRY': Option(True, type='bool'),
+ 'CONNECTION_MAX_RETRIES': Option(100, type='int'),
+ 'POOL_LIMIT': Option(10, type='int'),
+ 'INSIST': Option(False, type='bool',
+ deprecate_by='2.4', remove_by='3.0'),
+ 'USE_SSL': Option(False, type='bool'),
+ 'TRANSPORT': Option(type='string'),
+ 'TRANSPORT_OPTIONS': Option({}, type='dict'),
+ 'HOST': Option(type='string', **_BROKER_OLD),
+ 'PORT': Option(type='int', **_BROKER_OLD),
+ 'USER': Option(type='string', **_BROKER_OLD),
+ 'PASSWORD': Option(type='string', **_BROKER_OLD),
+ 'VHOST': Option(type='string', **_BROKER_OLD),
},
- "CASSANDRA": {
- "COLUMN_FAMILY": Option(type="string"),
- "DETAILED_MODE": Option(False, type="bool"),
- "KEYSPACE": Option(type="string"),
- "READ_CONSISTENCY": Option(type="string"),
- "SERVERS": Option(type="list"),
- "WRITE_CONSISTENCY": Option(type="string"),
+ 'CASSANDRA': {
+ 'COLUMN_FAMILY': Option(type='string'),
+ 'DETAILED_MODE': Option(False, type='bool'),
+ 'KEYSPACE': Option(type='string'),
+ 'READ_CONSISTENCY': Option(type='string'),
+ 'SERVERS': Option(type='list'),
+ 'WRITE_CONSISTENCY': Option(type='string'),
},
- "CELERY": {
- "ACKS_LATE": Option(False, type="bool"),
- "ALWAYS_EAGER": Option(False, type="bool"),
- "AMQP_TASK_RESULT_EXPIRES": Option(type="float",
- deprecate_by="2.5", remove_by="3.0",
- alt="CELERY_TASK_RESULT_EXPIRES"),
- "AMQP_TASK_RESULT_CONNECTION_MAX": Option(1, type="int",
- remove_by="2.5", alt="BROKER_POOL_LIMIT"),
- "ANNOTATIONS": Option(type="any"),
- "BROADCAST_QUEUE": Option("celeryctl"),
- "BROADCAST_EXCHANGE": Option("celeryctl"),
- "BROADCAST_EXCHANGE_TYPE": Option("fanout"),
- "CACHE_BACKEND": Option(),
- "CACHE_BACKEND_OPTIONS": Option({}, type="dict"),
- "CREATE_MISSING_QUEUES": Option(True, type="bool"),
- "DEFAULT_RATE_LIMIT": Option(type="string"),
- "DISABLE_RATE_LIMITS": Option(False, type="bool"),
- "DEFAULT_ROUTING_KEY": Option("celery"),
- "DEFAULT_QUEUE": Option("celery"),
- "DEFAULT_EXCHANGE": Option("celery"),
- "DEFAULT_EXCHANGE_TYPE": Option("direct"),
- "DEFAULT_DELIVERY_MODE": Option(2, type="string"),
- "EAGER_PROPAGATES_EXCEPTIONS": Option(False, type="bool"),
- "ENABLE_UTC": Option(False, type="bool"),
- "EVENT_SERIALIZER": Option("json"),
- "IMPORTS": Option((), type="tuple"),
- "INCLUDE": Option((), type="tuple"),
- "IGNORE_RESULT": Option(False, type="bool"),
- "MAX_CACHED_RESULTS": Option(5000, type="int"),
- "MESSAGE_COMPRESSION": Option(type="string"),
- "MONGODB_BACKEND_SETTINGS": Option(type="dict"),
- "REDIS_HOST": Option(type="string", **_REDIS_OLD),
- "REDIS_PORT": Option(type="int", **_REDIS_OLD),
- "REDIS_DB": Option(type="int", **_REDIS_OLD),
- "REDIS_PASSWORD": Option(type="string", **_REDIS_OLD),
- "REDIS_MAX_CONNECTIONS": Option(type="int"),
- "RESULT_BACKEND": Option(type="string"),
- "RESULT_DB_SHORT_LIVED_SESSIONS": Option(False, type="bool"),
- "RESULT_DBURI": Option(),
- "RESULT_ENGINE_OPTIONS": Option(type="dict"),
- "RESULT_EXCHANGE": Option("celeryresults"),
- "RESULT_EXCHANGE_TYPE": Option("direct"),
- "RESULT_SERIALIZER": Option("pickle"),
- "RESULT_PERSISTENT": Option(False, type="bool"),
- "ROUTES": Option(type="any"),
- "SEND_EVENTS": Option(False, type="bool"),
- "SEND_TASK_ERROR_EMAILS": Option(False, type="bool"),
- "SEND_TASK_SENT_EVENT": Option(False, type="bool"),
- "STORE_ERRORS_EVEN_IF_IGNORED": Option(False, type="bool"),
- "TASK_ERROR_WHITELIST": Option((), type="tuple",
- deprecate_by="2.5", remove_by="3.0"),
- "TASK_PUBLISH_RETRY": Option(True, type="bool"),
- "TASK_PUBLISH_RETRY_POLICY": Option({
- "max_retries": 100,
- "interval_start": 0,
- "interval_max": 1,
- "interval_step": 0.2}, type="dict"),
- "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="float"),
- "TASK_SERIALIZER": Option("pickle"),
- "TIMEZONE": Option(type="string"),
- "TRACK_STARTED": Option(False, type="bool"),
- "REDIRECT_STDOUTS": Option(True, type="bool"),
- "REDIRECT_STDOUTS_LEVEL": Option("WARNING"),
- "QUEUES": Option(type="dict"),
- "SECURITY_KEY": Option(type="string"),
- "SECURITY_CERTIFICATE": Option(type="string"),
- "SECURITY_CERT_STORE": Option(type="string"),
+ 'CELERY': {
+ 'ACKS_LATE': Option(False, type='bool'),
+ 'ALWAYS_EAGER': Option(False, type='bool'),
+ 'AMQP_TASK_RESULT_EXPIRES': Option(type='float',
+ deprecate_by='2.5', remove_by='3.0',
+ alt='CELERY_TASK_RESULT_EXPIRES'),
+ 'AMQP_TASK_RESULT_CONNECTION_MAX': Option(1, type='int',
+ remove_by='2.5', alt='BROKER_POOL_LIMIT'),
+ 'ANNOTATIONS': Option(type='any'),
+ 'BROADCAST_QUEUE': Option('celeryctl'),
+ 'BROADCAST_EXCHANGE': Option('celeryctl'),
+ 'BROADCAST_EXCHANGE_TYPE': Option('fanout'),
+ 'CACHE_BACKEND': Option(),
+ 'CACHE_BACKEND_OPTIONS': Option({}, type='dict'),
+ 'CREATE_MISSING_QUEUES': Option(True, type='bool'),
+ 'DEFAULT_RATE_LIMIT': Option(type='string'),
+ 'DISABLE_RATE_LIMITS': Option(False, type='bool'),
+ 'DEFAULT_ROUTING_KEY': Option('celery'),
+ 'DEFAULT_QUEUE': Option('celery'),
+ 'DEFAULT_EXCHANGE': Option('celery'),
+ 'DEFAULT_EXCHANGE_TYPE': Option('direct'),
+ 'DEFAULT_DELIVERY_MODE': Option(2, type='string'),
+ 'EAGER_PROPAGATES_EXCEPTIONS': Option(False, type='bool'),
+ 'ENABLE_UTC': Option(False, type='bool'),
+ 'EVENT_SERIALIZER': Option('json'),
+ 'IMPORTS': Option((), type='tuple'),
+ 'INCLUDE': Option((), type='tuple'),
+ 'IGNORE_RESULT': Option(False, type='bool'),
+ 'MAX_CACHED_RESULTS': Option(5000, type='int'),
+ 'MESSAGE_COMPRESSION': Option(type='string'),
+ 'MONGODB_BACKEND_SETTINGS': Option(type='dict'),
+ 'REDIS_HOST': Option(type='string', **_REDIS_OLD),
+ 'REDIS_PORT': Option(type='int', **_REDIS_OLD),
+ 'REDIS_DB': Option(type='int', **_REDIS_OLD),
+ 'REDIS_PASSWORD': Option(type='string', **_REDIS_OLD),
+ 'REDIS_MAX_CONNECTIONS': Option(type='int'),
+ 'RESULT_BACKEND': Option(type='string'),
+ 'RESULT_DB_SHORT_LIVED_SESSIONS': Option(False, type='bool'),
+ 'RESULT_DBURI': Option(),
+ 'RESULT_ENGINE_OPTIONS': Option(type='dict'),
+ 'RESULT_EXCHANGE': Option('celeryresults'),
+ 'RESULT_EXCHANGE_TYPE': Option('direct'),
+ 'RESULT_SERIALIZER': Option('pickle'),
+ 'RESULT_PERSISTENT': Option(False, type='bool'),
+ 'ROUTES': Option(type='any'),
+ 'SEND_EVENTS': Option(False, type='bool'),
+ 'SEND_TASK_ERROR_EMAILS': Option(False, type='bool'),
+ 'SEND_TASK_SENT_EVENT': Option(False, type='bool'),
+ 'STORE_ERRORS_EVEN_IF_IGNORED': Option(False, type='bool'),
+ 'TASK_ERROR_WHITELIST': Option((), type='tuple',
+ deprecate_by='2.5', remove_by='3.0'),
+ 'TASK_PUBLISH_RETRY': Option(True, type='bool'),
+ 'TASK_PUBLISH_RETRY_POLICY': Option({
+ 'max_retries': 100,
+ 'interval_start': 0,
+ 'interval_max': 1,
+ 'interval_step': 0.2}, type='dict'),
+ 'TASK_RESULT_EXPIRES': Option(timedelta(days=1), type='float'),
+ 'TASK_SERIALIZER': Option('pickle'),
+ 'TIMEZONE': Option(type='string'),
+ 'TRACK_STARTED': Option(False, type='bool'),
+ 'REDIRECT_STDOUTS': Option(True, type='bool'),
+ 'REDIRECT_STDOUTS_LEVEL': Option('WARNING'),
+ 'QUEUES': Option(type='dict'),
+ 'SECURITY_KEY': Option(type='string'),
+ 'SECURITY_CERTIFICATE': Option(type='string'),
+ 'SECURITY_CERT_STORE': Option(type='string'),
},
- "CELERYD": {
- "AUTOSCALER": Option("celery.worker.autoscale.Autoscaler"),
- "AUTORELOADER": Option("celery.worker.autoreload.Autoreloader"),
- "BOOT_STEPS": Option((), type="tuple"),
- "CONCURRENCY": Option(0, type="int"),
- "TIMER": Option(type="string"),
- "TIMER_PRECISION": Option(1.0, type="float"),
- "FORCE_EXECV": Option(True, type="bool"),
- "HIJACK_ROOT_LOGGER": Option(True, type="bool"),
- "CONSUMER": Option(type="string"),
- "LOG_FORMAT": Option(DEFAULT_PROCESS_LOG_FMT),
- "LOG_COLOR": Option(type="bool"),
- "LOG_LEVEL": Option("WARN", deprecate_by="2.4", remove_by="3.0",
- alt="--loglevel argument"),
- "LOG_FILE": Option(deprecate_by="2.4", remove_by="3.0"),
- "MEDIATOR": Option("celery.worker.mediator.Mediator"),
- "MAX_TASKS_PER_CHILD": Option(type="int"),
- "POOL": Option(DEFAULT_POOL),
- "POOL_PUTLOCKS": Option(True, type="bool"),
- "PREFETCH_MULTIPLIER": Option(4, type="int"),
- "STATE_DB": Option(),
- "TASK_LOG_FORMAT": Option(DEFAULT_TASK_LOG_FMT),
- "TASK_SOFT_TIME_LIMIT": Option(type="float"),
- "TASK_TIME_LIMIT": Option(type="float"),
- "WORKER_LOST_WAIT": Option(10.0, type="float")
+ 'CELERYD': {
+ 'AUTOSCALER': Option('celery.worker.autoscale.Autoscaler'),
+ 'AUTORELOADER': Option('celery.worker.autoreload.Autoreloader'),
+ 'BOOT_STEPS': Option((), type='tuple'),
+ 'CONCURRENCY': Option(0, type='int'),
+ 'TIMER': Option(type='string'),
+ 'TIMER_PRECISION': Option(1.0, type='float'),
+ 'FORCE_EXECV': Option(True, type='bool'),
+ 'HIJACK_ROOT_LOGGER': Option(True, type='bool'),
+ 'CONSUMER': Option(type='string'),
+ 'LOG_FORMAT': Option(DEFAULT_PROCESS_LOG_FMT),
+ 'LOG_COLOR': Option(type='bool'),
+ 'LOG_LEVEL': Option('WARN', deprecate_by='2.4', remove_by='3.0',
+ alt='--loglevel argument'),
+ 'LOG_FILE': Option(deprecate_by='2.4', remove_by='3.0'),
+ 'MEDIATOR': Option('celery.worker.mediator.Mediator'),
+ 'MAX_TASKS_PER_CHILD': Option(type='int'),
+ 'POOL': Option(DEFAULT_POOL),
+ 'POOL_PUTLOCKS': Option(True, type='bool'),
+ 'PREFETCH_MULTIPLIER': Option(4, type='int'),
+ 'STATE_DB': Option(),
+ 'TASK_LOG_FORMAT': Option(DEFAULT_TASK_LOG_FMT),
+ 'TASK_SOFT_TIME_LIMIT': Option(type='float'),
+ 'TASK_TIME_LIMIT': Option(type='float'),
+ 'WORKER_LOST_WAIT': Option(10.0, type='float')
},
- "CELERYBEAT": {
- "SCHEDULE": Option({}, type="dict"),
- "SCHEDULER": Option("celery.beat.PersistentScheduler"),
- "SCHEDULE_FILENAME": Option("celerybeat-schedule"),
- "MAX_LOOP_INTERVAL": Option(0, type="float"),
- "LOG_LEVEL": Option("INFO", deprecate_by="2.4", remove_by="3.0"),
- "LOG_FILE": Option(deprecate_by="2.4", remove_by="3.0"),
+ 'CELERYBEAT': {
+ 'SCHEDULE': Option({}, type='dict'),
+ 'SCHEDULER': Option('celery.beat.PersistentScheduler'),
+ 'SCHEDULE_FILENAME': Option('celerybeat-schedule'),
+ 'MAX_LOOP_INTERVAL': Option(0, type='float'),
+ 'LOG_LEVEL': Option('INFO', deprecate_by='2.4', remove_by='3.0'),
+ 'LOG_FILE': Option(deprecate_by='2.4', remove_by='3.0'),
},
- "CELERYMON": {
- "LOG_LEVEL": Option("INFO", deprecate_by="2.4", remove_by="3.0"),
- "LOG_FILE": Option(deprecate_by="2.4", remove_by="3.0"),
- "LOG_FORMAT": Option(DEFAULT_LOG_FMT),
+ 'CELERYMON': {
+ 'LOG_LEVEL': Option('INFO', deprecate_by='2.4', remove_by='3.0'),
+ 'LOG_FILE': Option(deprecate_by='2.4', remove_by='3.0'),
+ 'LOG_FORMAT': Option(DEFAULT_LOG_FMT),
},
- "EMAIL": {
- "HOST": Option("localhost"),
- "PORT": Option(25, type="int"),
- "HOST_USER": Option(),
- "HOST_PASSWORD": Option(),
- "TIMEOUT": Option(2, type="float"),
- "USE_SSL": Option(False, type="bool"),
- "USE_TLS": Option(False, type="bool"),
+ 'EMAIL': {
+ 'HOST': Option('localhost'),
+ 'PORT': Option(25, type='int'),
+ 'HOST_USER': Option(),
+ 'HOST_PASSWORD': Option(),
+ 'TIMEOUT': Option(2, type='float'),
+ 'USE_SSL': Option(False, type='bool'),
+ 'USE_TLS': Option(False, type='bool'),
},
- "SERVER_EMAIL": Option("celery@localhost"),
- "ADMINS": Option((), type="tuple"),
+ 'SERVER_EMAIL': Option('celery@localhost'),
+ 'ADMINS': Option((), type='tuple'),
}
-def flatten(d, ns=""):
+def flatten(d, ns=''):
stack = deque([(ns, d)])
while stack:
name, space = stack.popleft()
@@ -221,15 +221,15 @@ def find_deprecated_settings(source):
from celery.utils import warn_deprecated
for name, opt in flatten(NAMESPACES):
if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None):
- warn_deprecated(description="The %r setting" % (name, ),
+ warn_deprecated(description='The %r setting' % (name, ),
deprecation=opt.deprecate_by,
removal=opt.remove_by,
alternative=opt.alt)
return source
@memoize(maxsize=None)
-def find(name, namespace="celery"):
+def find(name, namespace='celery'):
# - Try specified namespace first.
namespace = namespace.upper()
try:
View
26 celery/app/log.py
@@ -41,8 +41,8 @@ def format(self, record):
record.__dict__.update(task_id=task.request.id,
task_name=task.name)
else:
- record.__dict__.setdefault("task_name", "???")
- record.__dict__.setdefault("task_id", "???")
+ record.__dict__.setdefault('task_name', '???')
+ record.__dict__.setdefault('task_id', '???')
return ColorFormatter.format(self, record)
@@ -60,17 +60,17 @@ def __init__(self, app):
self.colorize = self.app.conf.CELERYD_LOG_COLOR
def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
- redirect_level="WARNING"):
+ redirect_level='WARNING'):
handled = self.setup_logging_subsystem(loglevel, logfile)
if not handled:
- logger = get_logger("celery.redirected")
+ logger = get_logger('celery.redirected')
if redirect_stdouts:
self.redirect_stdouts_to_logger(logger,
loglevel=redirect_level)
os.environ.update(
- CELERY_LOG_LEVEL=str(loglevel) if loglevel else "",
- CELERY_LOG_FILE=str(logfile) if logfile else "",
- CELERY_LOG_REDIRECT="1" if redirect_stdouts else "",
+ CELERY_LOG_LEVEL=str(loglevel) if loglevel else '',
+ CELERY_LOG_FILE=str(logfile) if logfile else '',
+ CELERY_LOG_REDIRECT='1' if redirect_stdouts else '',
CELERY_LOG_REDIRECT_LEVEL=str(redirect_level))
def setup_logging_subsystem(self, loglevel=None, logfile=None,
@@ -107,7 +107,7 @@ def setup_logging_subsystem(self, loglevel=None, logfile=None,
# This is a hack for multiprocessing's fork+exec, so that
# logging before Process.run works.
- logfile_name = logfile if isinstance(logfile, basestring) else ""
+ logfile_name = logfile if isinstance(logfile, basestring) else ''
os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
_MP_FORK_LOGFILE_=logfile_name,
_MP_FORK_LOGFORMAT_=format)
@@ -127,7 +127,7 @@ def setup_task_loggers(self, loglevel=None, logfile=None, format=None,
if colorize is None:
colorize = self.supports_color(logfile)
- logger = self.setup_handlers(get_logger("celery.task"),
+ logger = self.setup_handlers(get_logger('celery.task'),
logfile, format, colorize,
formatter=TaskFormatter, **kwargs)
logger.setLevel(loglevel)
@@ -181,7 +181,7 @@ def _detect_handler(self, logfile=None):
"""Create log handler with either a filename, an open stream
or :const:`None` (stderr)."""
logfile = sys.__stderr__ if logfile is None else logfile
- if hasattr(logfile, "write"):
+ if hasattr(logfile, 'write'):
return logging.StreamHandler(logfile)
return WatchedFileHandler(logfile)
@@ -191,12 +191,12 @@ def _has_handler(self, logger):
def _is_configured(self, logger):
return self._has_handler(logger) and not getattr(
- logger, "_rudimentary_setup", False)
+ logger, '_rudimentary_setup', False)
- def setup_logger(self, name="celery", *args, **kwargs):
+ def setup_logger(self, name='celery', *args, **kwargs):
"""Deprecated: No longer used."""
self.setup_logging_subsystem(*args, **kwargs)
return logging.root
- def get_default_logger(self, name="celery", **kwargs):
+ def get_default_logger(self, name='celery', **kwargs):
return get_logger(name)
View
6 celery/app/registry.py
@@ -39,16 +39,16 @@ def unregister(self, name):
"""
try:
- self.pop(getattr(name, "name", name))
+ self.pop(getattr(name, 'name', name))
except KeyError:
raise self.NotRegistered(name)
# -- these methods are irrelevant now and will be removed in 3.0
def regular(self):
- return self.filter_types("regular")
+ return self.filter_types('regular')
def periodic(self):
- return self.filter_types("periodic")
+ return self.filter_types('periodic')
def filter_types(self, type):
return dict((name, task) for name, task in self.iteritems()
View
12 celery/app/routes.py
@@ -14,7 +14,7 @@
from celery.utils.functional import firstmethod, mpromise
from celery.utils.imports import instantiate
-_first_route = firstmethod("route_for_task")
+_first_route = firstmethod('route_for_task')
class MapRoute(object):
@@ -45,7 +45,7 @@ def route(self, options, task, args=(), kwargs={}):
route = self.lookup_route(task, args, kwargs)
if route: # expands 'queue' in route.
return lpmerge(self.expand_destination(route), options)
- if "queue" not in options:
+ if 'queue' not in options:
options = lpmerge(self.expand_destination(
self.app.conf.CELERY_DEFAULT_QUEUE), options)
return options
@@ -57,21 +57,21 @@ def expand_destination(self, route):
else:
# can use defaults from configured queue, but override specific
# things (like the routing_key): great for topic exchanges.
- queue = route.pop("queue", None)
+ queue = route.pop('queue', None)
if queue: # expand config from configured queue.
try:
dest = self.queues[queue].as_dict()
except KeyError:
if not self.create_missing:
raise QueueNotFound(
- "Queue %r is not defined in CELERY_QUEUES" % queue)
- for key in "exchange", "routing_key":
+ 'Queue %r is not defined in CELERY_QUEUES' % queue)
+ for key in 'exchange', 'routing_key':
if route.get(key) is None:
route[key] = queue
dest = self.app.amqp.queues.add(queue, **route).as_dict()
# needs to be declared by publisher
- dest["queue"] = queue
+ dest['queue'] = queue
return lpmerge(dest, route)
return route
View
128 celery/app/task.py
@@ -32,16 +32,16 @@
#: extracts attributes related to publishing a message from an object.
extract_exec_options = mattrgetter(
- "queue", "routing_key", "exchange",
- "immediate", "mandatory", "priority", "expires",
- "serializer", "delivery_mode", "compression",
+ 'queue', 'routing_key', 'exchange',
+ 'immediate', 'mandatory', 'priority', 'expires',
+ 'serializer', 'delivery_mode', 'compression',
)
#: Billiard sets this when execv is enabled.
#: We use it to find out the name of the original ``__main__``
#: module, so that we can properly rewrite the name of the
#: task to be that of ``App.main``.
-MP_MAIN_FILE = os.environ.get("MP_MAIN_FILE") or None
+MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE') or None
class Context(object):
@@ -79,7 +79,7 @@ def get(self, key, default=None):
return default
def __repr__(self):
- return "<Context: %r>" % (vars(self, ))
+ return '<Context: %r>' % (vars(self, ))
@property
def children(self):
@@ -102,27 +102,27 @@ class TaskType(type):
def __new__(cls, name, bases, attrs):
new = super(TaskType, cls).__new__
- task_module = attrs.get("__module__") or "__main__"
+ task_module = attrs.get('__module__') or '__main__'
# - Abstract class: abstract attribute should not be inherited.
- if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
+ if attrs.pop('abstract', None) or not attrs.get('autoregister', True):
return new(cls, name, bases, attrs)
# The 'app' attribute is now a property, with the real app located
# in the '_app' attribute. Previously this was a regular attribute,
# so we should support classes defining it.
- _app1, _app2 = attrs.pop("_app", None), attrs.pop("app", None)
- app = attrs["_app"] = _app1 or _app2 or current_app
+ _app1, _app2 = attrs.pop('_app', None), attrs.pop('app', None)
+ app = attrs['_app'] = _app1 or _app2 or current_app
# - Automatically generate missing/empty name.
autoname = False
- if not attrs.get("name"):
+ if not attrs.get('name'):
try:
module_name = sys.modules[task_module].__name__
except KeyError: # pragma: no cover
# Fix for manage.py shell_plus (Issue #366).
module_name = task_module
- attrs["name"] = '.'.join(filter(None, [module_name, name]))
+ attrs['name'] = '.'.join(filter(None, [module_name, name]))
autoname = True
# - Create and register class.
@@ -137,11 +137,11 @@ def __new__(cls, name, bases, attrs):
# - to match App.main.
if MP_MAIN_FILE and sys.modules[task_module].__file__ == MP_MAIN_FILE:
# - see comment about :envvar:`MP_MAIN_FILE` above.
- task_module = "__main__"
- if autoname and task_module == "__main__" and app.main:
- attrs["name"] = '.'.join([app.main, name])
+ task_module = '__main__'
+ if autoname and task_module == '__main__' and app.main:
+ attrs['name'] = '.'.join([app.main, name])