Permalink
Browse files

Tests passing

  • Loading branch information...
1 parent b4a3f9f commit c50789350dc979791113fc59309786ff8ffb19e5 @ask committed Jun 4, 2012
View
@@ -16,7 +16,6 @@
import os
import sys
-from kombu import Exchange
from kombu.utils import cached_property
from celery import current_app
@@ -168,7 +167,7 @@ class Task(object):
"""
__metaclass__ = TaskType
- __tracer__ = None
+ __trace__ = None
ErrorMail = ErrorMail
MaxRetriesExceededError = MaxRetriesExceededError
@@ -192,36 +191,6 @@ class Task(object):
#: Deprecated and scheduled for removal in v3.0.
accept_magic_kwargs = None
- #: Destination queue. The queue needs to exist
- #: in :setting:`CELERY_QUEUES`. The `routing_key`, `exchange` and
- #: `exchange_type` attributes will be ignored if this is set.
- queue = None
-
- #: Overrides the apps default `routing_key` for this task.
- routing_key = None
-
- #: Overrides the apps default `exchange` for this task.
- exchange = None
-
- #: Overrides the apps default exchange type for this task.
- exchange_type = None
-
- #: Override the apps default delivery mode for this task. Default is
- #: `"persistent"`, but you can change this to `"transient"`, which means
- #: messages will be lost if the broker is restarted. Consult your broker
- #: manual for any additional delivery modes.
- delivery_mode = None
-
- #: Mandatory message routing.
- mandatory = False
-
- #: Request immediate delivery.
- immediate = False
-
- #: Default message priority. A number between 0 to 9, where 0 is the
- #: highest. Note that RabbitMQ does not support priorities.
- priority = None
-
#: Maximum number of retries before giving up. If set to :const:`None`,
#: it will **never** stop retrying.
max_retries = 3
@@ -247,10 +216,6 @@ class Task(object):
#: If enabled an email will be sent to :setting:`ADMINS` whenever a task
#: of this type fails.
send_error_emails = False
- disable_error_emails = False # FIXME
-
- #: List of exception types to send error emails for.
- error_whitelist = ()
#: The name of a serializer that are registered with
#: :mod:`kombu.serialization.registry`. Default is `"pickle"`.
@@ -298,16 +263,10 @@ class Task(object):
#: Default task expiry time.
expires = None
- #: The type of task *(no longer used)*.
- type = "regular"
-
__bound__ = False
from_config = (
- ("exchange_type", "CELERY_DEFAULT_EXCHANGE_TYPE"),
- ("delivery_mode", "CELERY_DEFAULT_DELIVERY_MODE"),
("send_error_emails", "CELERY_SEND_TASK_ERROR_EMAILS"),
- ("error_whitelist", "CELERY_TASK_ERROR_WHITELIST"),
("serializer", "CELERY_TASK_SERIALIZER"),
("rate_limit", "CELERY_DEFAULT_RATE_LIMIT"),
("track_started", "CELERY_TRACK_STARTED"),
@@ -808,7 +767,8 @@ def after_return(self, status, retval, task_id, args, kwargs, einfo):
pass
def send_error_email(self, context, exc, **kwargs):
- if self.send_error_emails and not self.disable_error_emails:
+ if self.send_error_emails and \
+ not getattr(self, "disable_error_emails", None):
self.ErrorMail(self, **kwargs).send(context, exc)
def execute(self, request, pool, loglevel, logfile, **kwargs):
@@ -52,7 +52,7 @@ def process_initializer(app, hostname):
from celery.task.trace import build_tracer
for name, task in app.tasks.iteritems():
- task.__tracer__ = build_tracer(name, task, app.loader, hostname)
+ task.__trace__ = build_tracer(name, task, app.loader, hostname)
signals.worker_process_init.send(sender=None)
View
@@ -13,6 +13,8 @@
"""
from __future__ import absolute_import
+from kombu import Exchange
+
from celery import current_app
from celery.__compat__ import class_property, reclassmethod
from celery.app.task import Context, TaskType, Task as BaseTask # noqa
@@ -33,6 +35,26 @@ class Task(BaseTask):
abstract = True
__bound__ = False
+ #- Deprecated compat. attributes -:
+
+ queue = None
+ routing_key = None
+ exchange = None
+ exchange_type = None
+ delivery_mode = None
+ mandatory = False
+ immediate = False
+ priority = None
+ type = "regular"
+ error_whitelist = ()
+ disable_error_emails = False
+
+ from_config = BaseTask.from_config + (
+ ("exchange_type", "CELERY_DEFAULT_EXCHANGE_TYPE"),
+ ("delivery_mode", "CELERY_DEFAULT_DELIVERY_MODE"),
+ ("error_whitelist", "CELERY_TASK_ERROR_WHITELIST"),
+ )
+
# In old Celery the @task decorator didn't exist, so one would create
# classes instead and use them directly (e.g. MyTask.apply_async()).
# the use of classmethods was a hack so that it was not necessary
@@ -69,7 +91,6 @@ def establish_connection(self, connect_timeout=None):
return self._get_app().broker_connection(
connect_timeout=connect_timeout)
-
def get_publisher(self, connection=None, exchange=None,
connect_timeout=None, exchange_type=None, **options):
"""Deprecated method to get the task publisher (now called producer).
@@ -91,7 +112,6 @@ def get_publisher(self, connection=None, exchange=None,
exchange=exchange and Exchange(exchange, exchange_type),
routing_key=self.routing_key, **options)
-
@classmethod
def get_consumer(self, connection=None, queues=None, **kwargs):
"""Deprecated method used to get consumer for the queue
@@ -107,7 +127,6 @@ def get_consumer(self, connection=None, queues=None, **kwargs):
return Q.TaskConsumer(connection, queues, **kwargs)
-
class PeriodicTask(Task):
"""A periodic task is a task that adds itself to the
:setting:`CELERYBEAT_SCHEDULE` setting."""
@@ -284,15 +284,15 @@ def trace_task(uuid, args, kwargs, request=None):
def trace_task(task, uuid, args, kwargs, request=None, **opts):
try:
- if task.__tracer__ is None:
- task.__tracer__ = build_tracer(task.name, task, **opts)
- return task.__tracer__(uuid, args, kwargs, request)
+ if task.__trace__ is None:
+ task.__trace__ = build_tracer(task.name, task, **opts)
+ return task.__trace__(uuid, args, kwargs, request)
except Exception, exc:
return report_internal_error(task, exc), None
def trace_task_ret(task, uuid, args, kwargs, request):
- return _tasks[task].__tracer__(uuid, args, kwargs, request)[0]
+ return _tasks[task].__trace__(uuid, args, kwargs, request)[0]
def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
@@ -70,7 +70,7 @@ def xtask():
trace_task(xtask, "uuid", (), {})
self.assertTrue(report_internal_error.call_count)
- self.assertIs(xtask.__tracer__, tracer)
+ self.assertIs(xtask.__trace__, tracer)
class test_TraceInfo(Case):
@@ -348,7 +348,7 @@ def update_strategies(self):
hostname = self.hostname
for name, task in self.app.tasks.iteritems():
S[name] = task.start_strategy(app, self)
- task.__tracer__ = build_tracer(name, task, loader, hostname)
+ task.__trace__ = build_tracer(name, task, loader, hostname)
def start(self):
"""Start the consumer.
@@ -73,9 +73,9 @@ def execute_and_trace(name, uuid, args, kwargs, request=None, **opts):
hostname = opts.get("hostname")
setps("celeryd", name, hostname, rate_limit=True)
try:
- if task.__tracer__ is None:
- task.__tracer__ = build_tracer(name, task, **opts)
- return task.__tracer__(uuid, args, kwargs, request)[0]
+ if task.__trace__ is None:
+ task.__trace__ = build_tracer(name, task, **opts)
+ return task.__trace__(uuid, args, kwargs, request)[0]
finally:
setps("celeryd", "-idle-", hostname, rate_limit=True)
except Exception, exc:

0 comments on commit c507893

Please sign in to comment.