https://docs.celeryproject.org/en/latest/getting-started/brokers/index.html#broker-overview  

|Name|Status|Monitoring|Remote Control|
|---|---|---|---|
|RabbitMQ|Stable|Yes|Yes|
|Redis|Stable|Yes|Yes|
|Amazon SQS|Stable|No|No|
|Zookeeper|Experimental|No|No|

https://docs.celeryproject.org/en/latest/userguide/tasks.html#task-result-backends  
built-in: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), and Redis

https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application

    celery -A tasks worker --loglevel=info  # restart if changes

https://flower.readthedocs.io/en/latest/install.html#usage

    flower -A tasks --port=5555 # http

In [138]:
!cat tasks.py

from celery import Celery

# backend: optional, supports .get()
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

import time

@app.task
def add(x, y, sleep_time=1):
    print(f"sleep {sleep_time}s") # no output with delay(), use logging instead
    time.sleep(sleep_time)
    return x + y

@app.task
def get_type(obj):
    return str(type(obj))

@app.task
def return_same(obj):
    return obj

In [139]:
%load_ext autoreload
%autoreload 2
    
from tasks import *
import sys, datetime

app.backend

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


<celery.backends.amqp.AMQPBackend at 0x7fa7827dafd0>

In [140]:
app.broker_connection()

<Connection: amqp://guest:**@localhost:5672// at 0x7fa782eb3438>

In [141]:
add(4,4) # work as normal function

sleep 1s


8

In [142]:
result = add.delay(4, 4, sleep_time=3)
try:
    print(result.ready())
    result.get(timeout=1)
except:
    print("Exception: ", sys.exc_info()[0])

False
Exception:  <class 'celery.exceptions.TimeoutError'>


In [143]:
result = add.delay(4, 4, sleep_time=5)
print(result.ready())
try:
    print(datetime.datetime.now())
    result.get()
    print(datetime.datetime.now())
    print(result.ready())
except:
    print("Exception: ", sys.exc_info()[0])

False
2019-05-02 14:17:06.197627
2019-05-02 14:17:11.252982
True


In [144]:
result.get()

8

In [145]:
app.tasks

{'celery.accumulate': <@task: celery.accumulate of tasks at 0x7fa783077ef0>,
 'celery.backend_cleanup': <@task: celery.backend_cleanup of tasks at 0x7fa783077ef0>,
 'celery.chain': <@task: celery.chain of tasks at 0x7fa783077ef0>,
 'celery.chord': <@task: celery.chord of tasks at 0x7fa783077ef0>,
 'celery.chord_unlock': <@task: celery.chord_unlock of tasks at 0x7fa783077ef0>,
 'celery.chunks': <@task: celery.chunks of tasks at 0x7fa783077ef0>,
 'celery.group': <@task: celery.group of tasks at 0x7fa783077ef0>,
 'celery.map': <@task: celery.map of tasks at 0x7fa783077ef0>,
 'celery.starmap': <@task: celery.starmap of tasks at 0x7fa783077ef0>,
 'tasks.add': <@task: tasks.add of tasks at 0x7fa783077ef0>,
 'tasks.get_type': <@task: tasks.get_type of tasks at 0x7fa783077ef0>,
 'tasks.print_obj': <@task: tasks.print_obj of tasks at 0x7fa783077ef0>,
 'tasks.return_same': <@task: tasks.return_same of tasks at 0x7fa783077ef0>}

In [146]:
print_obj

NameError: name 'print_obj' is not defined

In [147]:
app.tasks['tasks.print_obj']("still can use after delete from .py")

still can use after delete from .py


In [148]:
from pandas import DataFrame as pd
df = pd.from_records([{"a":1,"b":2},{"a":3,"b":4}])
get_type(df)

"<class 'pandas.core.frame.DataFrame'>"

In [149]:
# Exception:  <class 'kombu.exceptions.EncodeError'>
j = df
j = df.to_dict() 
j = df.to_records()

# works
j = df.to_csv() 
j = df.to_json()

try:
    r = return_same.delay(j)
    print(r.get())
except:
    print("Exception: ", sys.exc_info()[0])

{"a":{"0":1,"1":3},"b":{"0":2,"1":4}}


In [150]:
import pandas
pandas.read_json(r.get())

Unnamed: 0,a,b
0,1,2
1,3,4
