![Celery](Celery.jpg "Celery")

### What is Celery?
* an open source asynchronous task queue/job queue based on distributed message passing
* "a task queue with batteries included"
* execution units called *tasks* are executed concurrently on one or more worker nodes
* tasks can execute asynchronously (in the background) or synchronously (wait until ready)
* used by Instagram to process millions of tasks every day

### Choosing a Broker
* Celery requires a solution to send and receive messages
  * typically this is performed via a separate service called a *message broker*
* there are several choices of broker available
  * `RabbitMQ`: feature-complete, stable, durable and easy to install
    * http://www.rabbitmq.com/install-standalone-mac.html
  * `Redis`: also feature-complete, but more susceptible to data loss in the event of abrupt termination or power failures
    * http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis
  * Databases: not recommended, but can be sufficient for very small installations and testing purposes:
    * `SQLAlchemy`: http://docs.celeryproject.org/en/latest/getting-started/brokers/sqlalchemy.html#broker-sqlalchemy
    * `Django`: http://docs.celeryproject.org/en/latest/getting-started/brokers/django.html#broker-django
* we'll use `RabbitMQ`, as it's widely used and robust
 

### Installing Celery
* __`pip3 install celery`__
* to test the installation, run __`from celery import Celery`__ in a Python 3 interpreter; it should succeed without an `ImportError`
* install __`RabbitMQ`__
  * complete installation instructions at http://www.rabbitmq.com/download.html
  * on a Mac, it's easiest to use `Homebrew` to perform the installation http://www.rabbitmq.com/install-standalone-mac.html
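  * start the server with __`/usr/local/sbin/rabbitmq-server`__ (Homebrew's location on Intel Macs; on Apple Silicon it is typically under `/opt/homebrew/sbin`)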
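Before starting a worker, it can help to confirm that the broker is actually listening. This is a minimal standard-library sketch, assuming RabbitMQ's default AMQP port 5672; `broker_is_listening` is a hypothetical helper name, and it only checks that the TCP port accepts connections, not that it speaks AMQP or that the credentials work.

```python
import socket


def broker_is_listening(host='localhost', port=5672, timeout=1.0):
    """Return True if something accepts TCP connections on host:port.

    Hypothetical helper: 5672 is RabbitMQ's default AMQP port. This only proves
    the port is open; it does not authenticate or speak AMQP.
    """
    try:
        # create_connection raises OSError (refused, unreachable, or timed out)
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == '__main__':
    print('RabbitMQ port open:', broker_is_listening())
```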

### The Celery Application
* first we need a Celery *instance*, i.e., the Celery application, or "app"
* this instance is used as the entry-point for everything you want to do in Celery, e.g., creating tasks and managing workers
  * therefore it must be possible for other modules to import it
  * for now, we'll put everything in a single module called `tasks.py`

```python
# tasks.py
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
```

* the first argument to `Celery` is the name of the current module, __`tasks`__; Celery uses it to generate task names automatically (here, `tasks.add`)
* the second, the `broker` keyword argument, specifies the URL of the message broker you want to use

### Next: Running the Celery Worker Server
* in a production system, the worker would be run in the background, e.g., as a daemon (see http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html)
  * for demonstration purposes, we'll just run it by hand in a separate window
    * __`celery -A tasks worker --loglevel=info`__
    * make sure you run it from the directory which contains `tasks.py`
* for more information, try
  * __`celery worker --help`__
  * __`celery --help`__

### Calling the task!
* we will use the [__`delay()`__](http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.delay) method to invoke the task
* it's a shortcut for the fully-featured [__`apply_async()`__](http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async) method

```python
from tasks import add
r = add.delay(4, 12)
print(r)
```

```
092a31fe-d250-4c27-97c1-073b794f43f3
```

* as above, calling __`delay()`__ returns an `AsyncResult` instance (printing it shows the task's id)
  * it can be used to check the state of the task, wait for the task to finish, or get its return value (or, if the task failed, the exception and traceback)
  * however, this isn’t enabled by default
    * you must configure Celery to use a *result backend*

### Keeping Results
* if you want to keep track of the tasks’ states, Celery needs to store or send the states somewhere
* there are several built-in result backends to choose from: `SQLAlchemy/Django ORM`, `Memcached`, `Redis`, `AMQP (RabbitMQ)`, and `MongoDB`, or you can define your own
* we will use the `rpc` result backend, which sends states back as transient messages (but does not store them)
* the backend is specified via the `backend` argument to Celery
  * (or via the `result_backend` setting if you choose to use a configuration module; Celery versions before 4.0 call it `CELERY_RESULT_BACKEND`)
* so we'll update `tasks.py` to specify the backend and then try again to see the result...

```python
# tasks.py
from celery import Celery

app = Celery('tasks', backend='rpc://', broker='amqp://guest@localhost')

@app.task
def add(x, y):
    return x + y
```

```python
# before we execute this, we must restart the Celery worker so it loads the updated tasks.py
from tasks import add
result = add.delay(3.94, 5.27)
result
```

```
<AsyncResult: 41c7ec17-5334-4e37-ba32-a3a4356a72c1>
```

```python
result.ready()
```

```
True
```

```python
result.get(timeout=1)
```

```
9.209999999999999
```

### What if the task raises an exception?
* __`get()`__ will re-raise the exception (unless you pass `propagate=False`)
* let's try an example...

```python
# tasks.py (add below the add() task, then restart the worker)
@app.task
def exc():
    raise ValueError
```

```python
from tasks import exc

try:
    result = exc.delay()
    result.get()  # re-raises the exception raised inside the task
except Exception:
    print('Task threw an exception:')
```

```
Task threw an exception:
```


### Lab: Celery
* create a Celery task named __`write_file()`__ which takes two arguments, a filename and a string, and writes the string to the file
* create a second task named __`read_file()`__ which reads a file and returns its contents
  * if the file does not exist, it waits until the file exists (perhaps have it poll once a second)
  * after a successful read, the task should delete the file
* test your tasks
* incorporate the tasks into a program which gets input from the user and uses the tasks above to read and write data to a file (imagine that the files are on a remote system, even though that won't be the case for our lab)

```python
from celery_lab import write_file

write_file.delay('/tmp/test.txt', 'hello mr. celery')
```

```
<AsyncResult: 206d7c38-9e26-44a2-9acd-58040bfd5b92>
```