# Celery Workflow

In [8]:
# [8] Learning about Signatures
from workflowcelery.celery import app

@app.task
def add_weird_task(a, b, c=5):
    return a + b + c

signature = add_weird_task.s(1, b=4) # .subtask equivalent
print('Type: %s' % type(signature))
print('Args: %s Kwargs: %s Name: %s' % (signature.args, signature.kwargs, signature.task))

Type: <class 'celery.canvas.Signature'>
Args: (1,) Kwargs: {'b': 4} Name: __main__.add_weird_task


In [9]:
# Execute signature syncronously
signature()

10

# Creating Workfows from groups and chains
Workflows are always asyncronous

In [10]:
# [9] Chains
from celery import chain
from workflowcelery.tasks import add_task, multiply_task

signature = chain(add_task.s(1, 2, 3), 
                  multiply_task.s(2),
                  add_task.s(10, 1))
signature



workflowcelery.tasks.add_task(1, 2, 3) | workflowcelery.tasks.multiply_task(2) | workflowcelery.tasks.add_task(10, 1)

In [11]:
# Can be equivalently written as (no need to import chain)
signature = add_task.s(1, 2, 3) | multiply_task.s(2) | add_task.s(10, 1)
signature

workflowcelery.tasks.add_task(1, 2, 3) | workflowcelery.tasks.multiply_task(2) | workflowcelery.tasks.add_task(10, 1)

In [12]:
result = signature()

In [13]:
result.status

'PENDING'

In [14]:
# Start worker for task
! celery -A workflowcelery worker -l info

[1;36m 
 -------------- celery@f181ffb567e8 v3.1.23 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-36-generic-x86_64-with-debian-8.4
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f42cbadf400
- ** ---------- .> transport:   amqp://celery:**@172.17.0.1:5672/celerydemo
- ** ---------- .> results:     postgresql://celery:**@172.17.0.1:5432/celerydemo
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                
[0m[0m
[tasks]
  . workflowcelery.tasks.add_group_task
  . workflowcelery.tasks.add_task
  . workflowcelery.tasks.multiply_task
[0m
[2016-09-01 15:46:02,933: INFO/MainProcess] Connected to amqp://celery:**@172.17.0.1:5672/celerydemo
[2016-09-01 15:46:02,943: INFO/MainProcess] mingle: searching for neighbors
[2016-09-01 15:46:03,951: INFO/MainProcess] mingle: all alone
[2016-09-01 15:46:03,971: INFO/MainProcess] Received t

In [15]:
print('Result ((1+2+3) * 2) + 10 + 1 = %d, %s' % (result.result, result.result == ((1+2+3)*2+10+1)))

Result ((1+2+3) * 2) + 10 + 1 = 23, True


# Groups
Keep in mind that groups return a list of results

In [16]:
# [10] Chains
from celery import group
from workflowcelery.tasks import add_group_task

signature = chain(
    multiply_task.s(2, 4), 
    group(
        add_task.s(2),
        multiply_task.s(2, 7)
    ),
    add_group_task.s()
)

signature

workflowcelery.tasks.multiply_task(2, 4) | (workflowcelery.tasks.add_task(2), workflowcelery.tasks.multiply_task(2, 7)) | workflowcelery.tasks.add_group_task()

In [17]:
# Equivalently
signature = multiply_task.s(2, 4) | group(add_task.s(2), multiply_task.s(2, 7)) | add_group_task.s()

In [18]:
result = signature()

In [19]:
result.status

'PENDING'

In [20]:
# Start worker for task
! celery -A workflowcelery worker -l info

[1;36m 
 -------------- celery@f181ffb567e8 v3.1.23 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-36-generic-x86_64-with-debian-8.4
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f0604d41400
- ** ---------- .> transport:   amqp://celery:**@172.17.0.1:5672/celerydemo
- ** ---------- .> results:     postgresql://celery:**@172.17.0.1:5432/celerydemo
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                
[0m[0m
[tasks]
  . workflowcelery.tasks.add_group_task
  . workflowcelery.tasks.add_task
  . workflowcelery.tasks.multiply_task
[0m
[2016-09-01 15:46:19,792: INFO/MainProcess] Connected to amqp://celery:**@172.17.0.1:5672/celerydemo
[2016-09-01 15:46:19,801: INFO/MainProcess] mingle: searching for neighbors
[2016-09-01 15:46:20,809: INFO/MainProcess] mingle: all alone
[2016-09-01 15:46:20,833: INFO/MainProcess] Received t

In [14]:
print('Result ((2*4)+2) + ((2*4)*2*7)) = %d, %s' % (result.result, result.result == ((2*4)+2) + ((2*4)*2*7)))

Result ((2*4)+2) + ((2*4)*2*7)) = 122, True
