Skip to content

Commit

Permalink
Added new example for the stamping mechanism: examples/stamping
Browse files Browse the repository at this point in the history
  • Loading branch information
Nusnus committed Nov 30, 2022
1 parent 788dfe4 commit 05a1eeb
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 0 deletions.
7 changes: 7 additions & 0 deletions examples/stamping/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from celery import Celery

app = Celery(
'myapp',
broker='redis://',
backend='redis://',
)
52 changes: 52 additions & 0 deletions examples/stamping/myapp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""myapp.py
This is a simple example of how to use the stamping feature.
It uses a custom stamping visitor to stamp a workflow with a unique
monitoring id stamp (per task), and a different visitor to stamp the last
task in the workflow. The last task is stamped with a consistent stamp, which
is used to revoke the task by its stamped header using two different approaches:
1. Run the workflow, then revoke the last task by its stamped header.
2. Revoke the last task by its stamped header before running the workflow.
Usage::
# The worker service reacts to messages by executing tasks.
(window1)$ celery -A myapp worker -l INFO
# The shell service is used to run the example.
(window2)$ celery -A myapp shell
# Use (copy) the content of shell.py to run the workflow via the
# shell service.
# Use one of two demo runs via the shell service:
# 1) run_then_revoke(): Run the workflow and revoke the last task
# by its stamped header during its run.
# 2) revoke_then_run(): Revoke the last task by its stamped header
# before its run, then run the workflow.
#
# See worker logs for output per defined in task_received_handler().
"""
import json

# Import tasks in worker context
import tasks # noqa: F401
from config import app

from celery.signals import task_received


@task_received.connect
def task_received_handler(
sender=None,
request=None,
signal=None,
**kwargs
):
print(f'In {signal.name} for: {repr(request)}')
print(f'Found stamps: {request.stamped_headers}')
print(json.dumps(request.stamps, indent=4, sort_keys=True))


if __name__ == '__main__':
app.start()
75 changes: 75 additions & 0 deletions examples/stamping/shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from time import sleep

from tasks import identity, mul, wait_for_revoke, xsum
from visitors import MonitoringIdStampingVisitor

from celery.canvas import Signature, chain, chord, group
from celery.result import AsyncResult


def create_canvas(n: int) -> Signature:
"""Creates a canvas to calculate: n * sum(1..n) * 10
For example, if n = 3, the result is 3 * (1 + 2 + 3) * 10 = 180
"""
canvas = chain(
group(identity.s(i) for i in range(1, n+1)) | xsum.s(),
chord(group(mul.s(10) for _ in range(1, n+1)), xsum.s()),
)

return canvas


def revoke_by_headers(result: AsyncResult, terminate: bool) -> None:
"""Revokes the last task in the workflow by its stamped header
Arguments:
result (AsyncResult): Can be either a frozen or a running result
terminate (bool): If True, the revoked task will be terminated
"""
result.revoke_by_stamped_headers({'mystamp': 'I am a stamp!'}, terminate=terminate)


def prepare_workflow() -> Signature:
"""Creates a canvas that waits "n * sum(1..n) * 10" in seconds,
with n = 3.
The canvas itself is stamped with a unique monitoring id stamp per task.
The waiting task is stamped with different consistent stamp, which is used
to revoke the task by its stamped header.
"""
canvas = create_canvas(n=3)
canvas.stamp(MonitoringIdStampingVisitor())
canvas = canvas | wait_for_revoke.s()
return canvas


def run_then_revoke():
"""Runs the workflow and lets the waiting task run for a while.
Then, the waiting task is revoked by its stamped header.
The expected outcome is that the canvas will be calculated to the end,
but the waiting task will be revoked and terminated *during its run*.
See worker logs for more details.
"""
canvas = prepare_workflow()
result = canvas.delay()
print('Wait 5 seconds, then revoke the last task by its stamped header: "mystamp": "I am a stamp!"')
sleep(5)
print('Revoking the last task...')
revoke_by_headers(result, terminate=True)


def revoke_then_run():
"""Revokes the waiting task by its stamped header before it runs.
Then, run the workflow, which will not run the waiting task that was revoked.
The expected outcome is that the canvas will be calculated to the end,
but the waiting task will not run at all.
See worker logs for more details.
"""
canvas = prepare_workflow()
result = canvas.freeze()
revoke_by_headers(result, terminate=False)
result = canvas.delay()
48 changes: 48 additions & 0 deletions examples/stamping/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from time import sleep

from config import app

from celery import Task
from examples.stamping.visitors import MyStampingVisitor


class MyTask(Task):
"""Custom task for stamping on replace"""

def on_replace(self, sig):
sig.stamp(MyStampingVisitor())
return super().on_replace(sig)


@app.task
def identity(x):
"""Identity function"""
return x


@app.task
def mul(x: int, y: int) -> int:
"""Multiply two numbers"""
return x * y


@app.task
def xsum(numbers: list) -> int:
"""Sum a list of numbers"""
return sum(numbers)


@app.task
def waitfor(seconds: int) -> None:
"""Wait for "seconds" seconds, ticking every second."""
print(f'Waiting for {seconds} seconds...')
for i in range(seconds):
sleep(1)
print(f'{i+1} seconds passed')


@app.task(bind=True, base=MyTask)
def wait_for_revoke(self: MyTask, seconds: int) -> None:
"""Replace this task with a new task that waits for "seconds" seconds."""
# This will stamp waitfor with MyStampingVisitor
self.replace(waitfor.s(seconds))
14 changes: 14 additions & 0 deletions examples/stamping/visitors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from uuid import uuid4

from celery.canvas import StampingVisitor


class MyStampingVisitor(StampingVisitor):
def on_signature(self, sig, **headers) -> dict:
return {'mystamp': 'I am a stamp!'}


class MonitoringIdStampingVisitor(StampingVisitor):

def on_signature(self, sig, **headers) -> dict:
return {'monitoring_id': str(uuid4())}

0 comments on commit 05a1eeb

Please sign in to comment.