Skip to content
Browse files

Modern Task class no longer supports: establish_connection(), get_pub…

…lisher(), get_consumer() and get_logger()

These are now only available on the compat Task class:

    from celery.task import Task

The new Task class is now avalable as:

    from celery import Task

The methods deprecated above are not very useful, it is far better
to use the Celery app methods (broker_connection, amqp.TaskProducer,
amqp.TaskConsumer instead).

(Also remember to write in changelog that the new modern Task class
no longer uses classmethods).
  • Loading branch information...
1 parent 547c071 commit b4a3f9f67e807fbf6c73e41cff7e3c4b35cfd9ed @ask committed Jun 4, 2012
View
1 celery/__init__.py
@@ -20,6 +20,7 @@
old_module, new_module = recreate_module(__name__, # pragma: no cover
by_module={
"celery.app": ["Celery", "bugreport"],
+ "celery.app.task": ["Task"],
"celery.state": ["current_app", "current_task"],
"celery.canvas": ["chain", "chord", "chunks",
"group", "subtask", "xmap", "xstarmap"],
View
2 celery/app/base.py
@@ -300,7 +300,7 @@ def _after_fork(self, obj_):
def create_task_cls(self):
"""Creates a base task class using default configuration
taken from this app."""
- return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
+ return self.subclass_with_self("celery.app.task:Task", name="Task",
attribute="_app", abstract=True)
def subclass_with_self(self, Class, name=None, attribute="app",
View
74 celery/app/task.py
@@ -159,7 +159,7 @@ def __repr__(cls):
return "<unbound %s>" % (cls.__name__, )
-class BaseTask(object):
+class Task(object):
"""Task base class.
When called tasks apply the :meth:`run` method. This method must
@@ -402,65 +402,6 @@ def run(self, *args, **kwargs):
def start_strategy(self, app, consumer):
return instantiate(self.Strategy, self, app, consumer)
- def get_logger(self, **kwargs):
- """Get task-aware logger object."""
- logger = get_logger(self.name)
- if logger.parent is logging.root:
- logger.parent = get_logger("celery.task")
- return logger
-
- def establish_connection(self, connect_timeout=None):
- """Establish a connection to the message broker."""
- return self._get_app().broker_connection(
- connect_timeout=connect_timeout)
-
- def get_publisher(self, connection=None, exchange=None,
- connect_timeout=None, exchange_type=None, **options):
- """Get a celery task message publisher.
-
- :rtype :class:`~celery.app.amqp.TaskProducer`:
-
- .. warning::
-
- If you don't specify a connection, one will automatically
- be established for you, in that case you need to close this
- connection after use::
-
- >>> publisher = self.get_publisher()
- >>> # ... do something with publisher
- >>> publisher.connection.close()
-
- """
- exchange = self.exchange if exchange is None else exchange
- if exchange_type is None:
- exchange_type = self.exchange_type
- connection = connection or self.establish_connection(connect_timeout)
- return self._get_app().amqp.TaskProducer(connection,
- exchange=exchange and Exchange(exchange, exchange_type),
- routing_key=self.routing_key, **options)
-
- def get_consumer(self, connection=None, queues=None, **kwargs):
- """Get message consumer.
-
- :rtype :class:`kombu.messaging.Consumer`:
-
- .. warning::
-
- If you don't specify a connection, one will automatically
- be established for you, in that case you need to close this
- connection after use::
-
- >>> consumer = self.get_consumer()
- >>> # do something with consumer
- >>> consumer.close()
- >>> consumer.connection.close()
-
- """
- app = self._get_app()
- connection = connection or self.establish_connection()
- return app.amqp.TaskConsumer(connection,
- queues or app.amqp.queue_or_default(self.queue), **kwargs)
-
def delay(self, *args, **kwargs):
"""Star argument version of :meth:`apply_async`.
@@ -700,7 +641,7 @@ def apply(self, args=None, kwargs=None, **options):
:rtype :class:`celery.result.EagerResult`:
"""
- # trace imports BaseTask, so need to import inline.
+ # trace imports Task, so need to import inline.
from celery.task.trace import eager_trace_task
app = self._get_app()
@@ -893,9 +834,13 @@ def __repr__(self):
"""`repr(task)`"""
return "<@task: %s>" % (self.name, )
- @cached_property
- def logger(self):
- return self.get_logger()
+ def _get_logger(self, **kwargs):
+ """Get task-aware logger object."""
+ logger = get_logger(self.name)
+ if logger.parent is logging.root:
+ logger.parent = get_logger("celery.task")
+ return logger
+ logger = cached_property(_get_logger)
@property
def request(self):
@@ -904,3 +849,4 @@ def request(self):
@property
def __name__(self):
return self.__class__.__name__
+BaseTask = Task # compat alias
View
2 celery/bin/celery.py
@@ -433,7 +433,7 @@ def run(self, force_ipython=False, force_bpython=False,
import celery.task.base
self.app.loader.import_default_modules()
self.locals = {"celery": self.app,
- "BaseTask": celery.task.base.BaseTask,
+ "Task": celery.Task,
"chord": celery.chord,
"group": celery.group,
"chain": celery.chain,
View
73 celery/task/base.py
@@ -15,17 +15,21 @@
from celery import current_app
from celery.__compat__ import class_property, reclassmethod
-from celery.app.task import Context, TaskType, BaseTask # noqa
+from celery.app.task import Context, TaskType, Task as BaseTask # noqa
from celery.schedules import maybe_schedule
#: list of methods that must be classmethods in the old API.
_COMPAT_CLASSMETHODS = (
- "get_logger", "establish_connection", "get_publisher", "get_consumer",
- "delay", "apply_async", "retry", "apply", "AsyncResult", "subtask",
- "push_request", "pop_request")
+ "delay", "apply_async", "retry", "apply",
+ "AsyncResult", "subtask", "push_request", "pop_request")
class Task(BaseTask):
+ """Deprecated Task base class.
+
+ Modern applications should use :class:`celery.Task` instead.
+
+ """
abstract = True
__bound__ = False
@@ -42,6 +46,67 @@ def _get_request(self):
return self.request_stack.top
request = class_property(_get_request)
+ #: Deprecated alias to :attr:`logger``.
+ get_logger = reclassmethod(BaseTask._get_logger)
+
+ @classmethod
+ def establish_connection(self, connect_timeout=None):
+ """Deprecated method used to get a broker connection.
+
+ Should be replaced with :meth:`@Celery.broker_connection`
+ instead, or by acquiring connections from the connection pool:
+
+ .. code-block:: python
+
+ # using the connection pool
+ with celery.pool.acquire(block=True) as conn:
+ ...
+
+ # establish fresh connection
+ with celery.broker_connection() as conn:
+ ...
+ """
+ return self._get_app().broker_connection(
+ connect_timeout=connect_timeout)
+
+
+ def get_publisher(self, connection=None, exchange=None,
+ connect_timeout=None, exchange_type=None, **options):
+ """Deprecated method to get the task publisher (now called producer).
+
+ Should be replaced with :class:`@amqp.TaskProducer`:
+
+ .. code-block:: python
+
+ with celery.broker_connection() as conn:
+ with celery.amqp.TaskProducer(conn) as prod:
+ my_task.apply_async(producer=prod)
+
+ """
+ exchange = self.exchange if exchange is None else exchange
+ if exchange_type is None:
+ exchange_type = self.exchange_type
+ connection = connection or self.establish_connection(connect_timeout)
+ return self._get_app().amqp.TaskProducer(connection,
+ exchange=exchange and Exchange(exchange, exchange_type),
+ routing_key=self.routing_key, **options)
+
+
+ @classmethod
+ def get_consumer(self, connection=None, queues=None, **kwargs):
+ """Deprecated method used to get consumer for the queue
+ this task is sent to.
+
+ Should be replaced with :class:`@amqp.TaskConsumer` instead:
+
+ """
+ Q = self._get_app().amqp
+ connection = connection or self.establish_connection()
+ if queues is None:
+ queues = Q.queues[self.queue] if self.queue else Q.default_queue
+ return Q.TaskConsumer(connection, queues, **kwargs)
+
+
class PeriodicTask(Task):
"""A periodic task is a task that adds itself to the
View
2 celery/task/trace.py
@@ -29,7 +29,7 @@
from celery import current_app
from celery import states, signals
from celery.state import _task_stack, default_app
-from celery.app.task import BaseTask, Context
+from celery.app.task import Task as BaseTask, Context
from celery.datastructures import ExceptionInfo
from celery.exceptions import RetryTaskError
from celery.utils.serialization import get_pickleable_exception
View
10 celery/tests/app/test_amqp.py
@@ -7,31 +7,31 @@
from celery.tests.utils import AppCase
-class test_TaskPublisher(AppCase):
+class test_TaskProducer(AppCase):
def test__exit__(self):
- publisher = self.app.amqp.TaskPublisher(self.app.broker_connection())
+ publisher = self.app.amqp.TaskProducer(self.app.broker_connection())
publisher.release = Mock()
with publisher:
pass
publisher.release.assert_called_with()
def test_declare(self):
- publisher = self.app.amqp.TaskPublisher(self.app.broker_connection())
+ publisher = self.app.amqp.TaskProducer(self.app.broker_connection())
publisher.exchange.name = "foo"
publisher.declare()
publisher.exchange.name = None
publisher.declare()
def test_retry_policy(self):
- pub = self.app.amqp.TaskPublisher(Mock())
+ pub = self.app.amqp.TaskProducer(Mock())
pub.channel.connection.client.declared_entities = set()
pub.delay_task("tasks.add", (2, 2), {},
retry_policy={"frobulate": 32.4})
def test_publish_no_retry(self):
- pub = self.app.amqp.TaskPublisher(Mock())
+ pub = self.app.amqp.TaskProducer(Mock())
pub.channel.connection.client.declared_entities = set()
pub.delay_task("tasks.add", (2, 2), {}, retry=False, chord=123)
self.assertFalse(pub.connection.ensure.call_count)
View
2 celery/tests/app/test_app.py
@@ -261,7 +261,7 @@ def send(self, type, **fields):
chan.close()
assert conn.transport_cls == "memory"
- pub = self.app.amqp.TaskPublisher(conn,
+ pub = self.app.amqp.TaskProducer(conn,
exchange=Exchange("foo_exchange"))
dispatcher = Dispatcher()
View
14 celery/tests/utilities/test_compat.py
@@ -2,28 +2,28 @@
import celery
-from celery.app.task import BaseTask
-from celery.task.base import Task
+from celery.app.task import Task as ModernTask
+from celery.task.base import Task as CompatTask
from celery.tests.utils import Case
class test_MagicModule(Case):
def test_class_property_set_without_type(self):
- self.assertTrue(BaseTask.__dict__["app"].__get__(Task()))
+ self.assertTrue(ModernTask.__dict__["app"].__get__(CompatTask()))
def test_class_property_set_on_class(self):
- self.assertIs(BaseTask.__dict__["app"].__set__(None, None),
- BaseTask.__dict__["app"])
+ self.assertIs(ModernTask.__dict__["app"].__set__(None, None),
+ ModernTask.__dict__["app"])
def test_class_property_set(self):
- class X(Task):
+ class X(CompatTask):
pass
app = celery.Celery(set_as_current=False)
- BaseTask.__dict__["app"].__set__(X(), app)
+ ModernTask.__dict__["app"].__set__(X(), app)
self.assertEqual(X.app, app)
def test_dir(self):

0 comments on commit b4a3f9f

Please sign in to comment.
Something went wrong with that request. Please try again.