Commit c9a0fe4: Improved the executing user guide
Ask Solem committed Oct 12, 2010 (1 parent: af187b2)
Showing 1 changed file with 128 additions and 72 deletions: docs/userguide/executing.rst

Basics
======

Executing tasks is done with :meth:`~celery.task.Base.Task.apply_async`,
and the shortcut: :meth:`~celery.task.Base.Task.delay`.

``delay`` is simple and convenient, as it looks like calling a regular
function:
.. code-block:: python

    Task.delay(arg1, arg2, kwarg1="x", kwarg2="y")

The same using ``apply_async`` is written like this:

.. code-block:: python

    Task.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})
While ``delay`` is convenient, it doesn't give you as much control as using
``apply_async``. With ``apply_async`` you can override the execution options
available as attributes on the ``Task`` class (see :ref:`task-options`).
In addition you can set a countdown/eta or task expiry, provide a custom
broker connection, and more.
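
For instance, a sketch combining several of these options in one call (each
option is covered in detail below):

.. code-block:: python

    # Start no earlier than 10 seconds from now, discard if not executed
    # within 2 minutes, and serialize the message payload as JSON.
    add.apply_async(args=[10, 10], countdown=10,
                    expires=120, serializer="json")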

Let's go over these in more detail. All the examples use a simple task,
called ``add``, taking two positional arguments and returning their sum:

.. code-block:: python

    @task
    def add(x, y):
        return x + y
.. note::

    You can also execute a task by name using
    :func:`~celery.execute.send_task`, if you don't have access to the
    task class.
ETA and countdown
=================

The ETA (estimated time of arrival) lets you set a specific date and time that
is the earliest time at which your task will be executed. ``countdown`` is
a shortcut to set the ETA by seconds into the future.

.. code-block:: python

    >>> result = add.apply_async(args=[10, 10], countdown=3)
    >>> result.get()    # this takes at least 3 seconds to return
    20
The task is guaranteed to be executed at some time *after* the
specified date and time, but not necessarily at that exact time.
Possible reasons for broken deadlines may include many items waiting
in the queue, or heavy network latency. To make sure your tasks
are executed in a timely manner you should monitor queue lengths. Use
Munin, or similar tools, to receive alerts, so appropriate action can be
taken to ease the workload. See :ref:`monitoring-munin`.

While ``countdown`` is an integer, ``eta`` must be a :class:`~datetime.datetime`
object, specifying an exact date and time (including millisecond precision,
and timezone information):

.. code-block:: python

    >>> from datetime import datetime, timedelta

    >>> tomorrow = datetime.now() + timedelta(days=1)
    >>> add.apply_async(args=[10, 10], eta=tomorrow)
.. _executing-expiration:

Expiration
==========

The ``expires`` argument defines an optional expiry time,
either as seconds after task publish, or a specific date and time using
:class:`~datetime.datetime`:

.. code-block:: python

    >>> # Task expires after one minute from now.
    >>> add.apply_async(args=[10, 10], expires=60)

    >>> # Also supports datetime
    >>> add.apply_async(args=[10, 10],
    ...                 expires=datetime.now() + timedelta(days=1))
When a worker receives an expired task it will mark
the task as :state:`REVOKED` (:exc:`~celery.exceptions.TaskRevokedError`).
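
As an illustration, a minimal sketch of what the client may see when an
expired task is fetched (whether the revocation error propagates to
``result.get()`` depends on the result backend, so treat this as an
assumption):

.. code-block:: python

    >>> from celery.exceptions import TaskRevokedError

    >>> result = add.apply_async(args=[10, 10], expires=1)
    >>> try:
    ...     result.get()
    ... except TaskRevokedError:
    ...     print("expired before a worker could run it")
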
.. _executing-serializers:

Serializers
===========
Data transferred between clients and workers needs to be serialized.
The default serializer is :mod:`pickle`, but you can
change this globally or for each individual task.
There is built-in support for :mod:`pickle`, ``JSON``, ``YAML``
and ``msgpack``, and you can also add your own custom serializers by
registering them into the Carrot serializer registry (see
`Carrot: Serialization of Data`_).

.. _`Carrot: Serialization of Data`:
    http://packages.python.org/carrot/introduction.html#serialization-of-data
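
As a hedged sketch, registering a custom serializer could look like this,
assuming Carrot's ``SerializerRegistry.register`` interface (the
``sorted-json`` name and the encoder/decoder pair are hypothetical):

.. code-block:: python

    import json

    from carrot.serialization import registry

    # Hypothetical serializer: JSON with sorted keys, requested by
    # tasks using serializer="sorted-json".
    registry.register("sorted-json",
                      lambda obj: json.dumps(obj, sort_keys=True),
                      json.loads,
                      content_type="application/x-sorted-json",
                      content_encoding="utf-8")
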
Each option has its advantages and disadvantages.
json -- JSON is supported in many programming languages, is now
    a standard part of Python (since 2.6), and is fairly fast to decode
    using the modern Python libraries such as :mod:`cjson` or :mod:`simplejson`.

    The primary disadvantage to JSON is that it limits you to the following
    data types: strings, unicode, floats, boolean, dictionaries, and lists.
    Decimals and dates are notably missing.

    Also, binary data will be transferred using base64 encoding, which will
    cause the transferred data to be around 34% larger than an encoding which
    supports native binary types.

    However, if your data fits inside the above constraints and you need
    cross-language support, the default setting of JSON is probably your
    best choice.

    See http://json.org for more information.

pickle -- If you have no desire to support any language other than
    Python, then using the pickle encoding will gain you the support of
    all built-in Python data types (except class instances), smaller
    messages when sending binary files, and a slight speedup over JSON
    processing.

    See http://docs.python.org/library/pickle.html for more information.

yaml -- YAML has many of the same characteristics as json,
    except that it natively supports more data types (including dates,
    recursive references, etc.)

    However, the Python libraries for YAML are a good bit slower than the
    libraries for JSON.

    If you need a more expressive set of data types and need to maintain
    cross-language compatibility, then YAML may be a better fit than the above.

    See http://yaml.org/ for more information.

msgpack -- msgpack is a binary serialization format that is closer to JSON
    in features. It is very young however, and support should be considered
    experimental at this point.

    See http://msgpack.org/ for more information.

The encoding used is available as a message header, so the worker knows how to
deserialize any task. If you use a custom serializer, this serializer must
be available for the worker.

The client uses the following order to decide which serializer
to use when sending a task:

    1. The ``serializer`` argument to ``apply_async``
    2. The task's ``serializer`` attribute
    3. The default :setting:`CELERY_TASK_SERIALIZER` setting.
Using the ``serializer`` argument to ``apply_async``:
.. code-block:: python

    >>> add.apply_async(args=[10, 10], serializer="json")
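
The task's ``serializer`` attribute (rule 2 above) can be set when the task
is defined; a minimal sketch, assuming the decorator passes options on to
the task class:

.. code-block:: python

    @task(serializer="json")
    def add(x, y):
        return x + y
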
Connections and connection timeouts.
====================================
Currently there is no support for broker connection pools, so
``apply_async`` establishes and closes a new connection every time
it is called. This is something you need to be aware of when sending
more than one task at a time.

You can handle the connection manually by creating a
publisher:
.. code-block:: python

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

    results = []
    publisher = add.get_publisher()
    try:
        for args in numbers:
            res = add.apply_async(args=args, publisher=publisher)
            results.append(res)
    finally:
        publisher.close()
        publisher.connection.close()

    print([res.get() for res in results])
.. note::

    This particular example is better expressed as a task set.
    See :ref:`sets-taskset`. Task sets already reuse connections.
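
For comparison, a minimal task set sketch, assuming the ``TaskSet`` API
from ``celery.task.sets``:

.. code-block:: python

    >>> from celery.task.sets import TaskSet

    >>> ts = TaskSet(tasks=[add.subtask((n, n))
    ...                     for n in (2, 4, 8, 16)])
    >>> print(ts.apply_async().join())
    [4, 8, 16, 32]
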
The connection timeout is the number of seconds to wait before giving up
on establishing the connection. You can set this by using the
``connect_timeout`` argument to ``apply_async``:
.. code-block:: python

    add.apply_async([10, 10], connect_timeout=3)

Routing options
===============
Celery uses the AMQP routing mechanisms to route tasks to different workers.
Messages (tasks) are sent to exchanges, and a queue binds to an exchange with a
routing key. Let's look at an example:
Let's pretend we have an application with a lot of different tasks: some
process video, others process images, and some gather collective intelligence
about its users. Some of these tasks are more important, so we want to make
sure the high priority tasks get sent to dedicated nodes.
For the sake of this example we have a single exchange called ``tasks``.
There are different types of exchanges, each type interpreting the routing
key in different ways, implementing different messaging scenarios.
The most common types used with Celery are ``direct`` and ``topic``.
* direct

    Matches the routing key exactly.

* topic

    In the topic exchange the routing key is made up of words separated by
    dots (``.``). Words can be matched by the wild cards ``*`` and ``#``,
    where ``*`` matches one exact word, and ``#`` matches one or many words.

    For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and
    ``euro.stock.db`` but not ``stock.nasdaq``.

We create three queues, ``video``, ``image`` and ``lowpri`` that bind to
the ``tasks`` exchange. For the queues we use the following binding keys::

    video: video.#
    image: image.#
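
A hedged sketch of declaring these queues in the configuration, using the
``CELERY_QUEUES`` setting (the ``lowpri`` binding key below is a placeholder
assumption):

.. code-block:: python

    CELERY_QUEUES = {
        "video": {"exchange": "tasks",
                  "exchange_type": "topic",
                  "binding_key": "video.#"},
        "image": {"exchange": "tasks",
                  "exchange_type": "topic",
                  "binding_key": "image.#"},
        # The binding key for lowpri is assumed for illustration.
        "lowpri": {"exchange": "tasks",
                   "exchange_type": "topic",
                   "binding_key": "misc.#"},
    }
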
Later, if the crop task is consuming a lot of resources,
we can bind new workers to handle just the ``"image.crop"`` task,
by creating a new queue that binds to ``"image.crop"``.
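
For instance, a sketch of sending a task straight to that queue using the
``exchange`` and ``routing_key`` execution options (the ``crop`` task and
its argument are hypothetical):

.. code-block:: python

    crop.apply_async(args=[image_object],
                     exchange="tasks",
                     routing_key="image.crop")
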
.. seealso::

    To find out more about routing, please see :ref:`guide-routing`.

AMQP options
============
.. warning::

    The ``mandatory`` and ``immediate`` flags are not supported by
    :mod:`amqplib` at this point.

* mandatory

    This sets the delivery to be mandatory. An exception will be raised
    if there are no running workers able to take on the task.

* immediate

    Request immediate delivery. Will raise an exception
    if the task cannot be routed to a worker immediately.
* priority

    A number between ``0`` and ``9``, where ``0`` is the highest priority.
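
A brief sketch of setting it per call (broker support permitting):

.. code-block:: python

    add.apply_async(args=[10, 10], priority=0)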
