-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added new example for the stamping mechanism: examples/stamping (#7933)
- Loading branch information
Showing
5 changed files
with
196 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from celery import Celery | ||
|
||
app = Celery( | ||
'myapp', | ||
broker='redis://', | ||
backend='redis://', | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
"""myapp.py | ||
This is a simple example of how to use the stamping feature. | ||
It uses a custom stamping visitor to stamp a workflow with a unique | ||
monitoring id stamp (per task), and a different visitor to stamp the last | ||
task in the workflow. The last task is stamped with a consistent stamp, which | ||
is used to revoke the task by its stamped header using two different approaches: | ||
1. Run the workflow, then revoke the last task by its stamped header. | ||
2. Revoke the last task by its stamped header before running the workflow. | ||
Usage:: | ||
# The worker service reacts to messages by executing tasks. | ||
(window1)$ celery -A myapp worker -l INFO | ||
# The shell service is used to run the example. | ||
(window2)$ celery -A myapp shell | ||
# Use (copy) the content of shell.py to run the workflow via the | ||
# shell service. | ||
# Use one of two demo runs via the shell service: | ||
# 1) run_then_revoke(): Run the workflow and revoke the last task | ||
# by its stamped header during its run. | ||
# 2) revoke_then_run(): Revoke the last task by its stamped header | ||
# before its run, then run the workflow. | ||
# | ||
# See worker logs for output per defined in task_received_handler(). | ||
""" | ||
import json | ||
|
||
# Import tasks in worker context | ||
import tasks # noqa: F401 | ||
from config import app | ||
|
||
from celery.signals import task_received | ||
|
||
|
||
@task_received.connect | ||
def task_received_handler( | ||
sender=None, | ||
request=None, | ||
signal=None, | ||
**kwargs | ||
): | ||
print(f'In {signal.name} for: {repr(request)}') | ||
print(f'Found stamps: {request.stamped_headers}') | ||
print(json.dumps(request.stamps, indent=4, sort_keys=True)) | ||
|
||
|
||
if __name__ == '__main__': | ||
app.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from time import sleep | ||
|
||
from tasks import identity, mul, wait_for_revoke, xsum | ||
from visitors import MonitoringIdStampingVisitor | ||
|
||
from celery.canvas import Signature, chain, chord, group | ||
from celery.result import AsyncResult | ||
|
||
|
||
def create_canvas(n: int) -> Signature: | ||
"""Creates a canvas to calculate: n * sum(1..n) * 10 | ||
For example, if n = 3, the result is 3 * (1 + 2 + 3) * 10 = 180 | ||
""" | ||
canvas = chain( | ||
group(identity.s(i) for i in range(1, n+1)) | xsum.s(), | ||
chord(group(mul.s(10) for _ in range(1, n+1)), xsum.s()), | ||
) | ||
|
||
return canvas | ||
|
||
|
||
def revoke_by_headers(result: AsyncResult, terminate: bool) -> None: | ||
"""Revokes the last task in the workflow by its stamped header | ||
Arguments: | ||
result (AsyncResult): Can be either a frozen or a running result | ||
terminate (bool): If True, the revoked task will be terminated | ||
""" | ||
result.revoke_by_stamped_headers({'mystamp': 'I am a stamp!'}, terminate=terminate) | ||
|
||
|
||
def prepare_workflow() -> Signature: | ||
"""Creates a canvas that waits "n * sum(1..n) * 10" in seconds, | ||
with n = 3. | ||
The canvas itself is stamped with a unique monitoring id stamp per task. | ||
The waiting task is stamped with different consistent stamp, which is used | ||
to revoke the task by its stamped header. | ||
""" | ||
canvas = create_canvas(n=3) | ||
canvas.stamp(MonitoringIdStampingVisitor()) | ||
canvas = canvas | wait_for_revoke.s() | ||
return canvas | ||
|
||
|
||
def run_then_revoke(): | ||
"""Runs the workflow and lets the waiting task run for a while. | ||
Then, the waiting task is revoked by its stamped header. | ||
The expected outcome is that the canvas will be calculated to the end, | ||
but the waiting task will be revoked and terminated *during its run*. | ||
See worker logs for more details. | ||
""" | ||
canvas = prepare_workflow() | ||
result = canvas.delay() | ||
print('Wait 5 seconds, then revoke the last task by its stamped header: "mystamp": "I am a stamp!"') | ||
sleep(5) | ||
print('Revoking the last task...') | ||
revoke_by_headers(result, terminate=True) | ||
|
||
|
||
def revoke_then_run(): | ||
"""Revokes the waiting task by its stamped header before it runs. | ||
Then, run the workflow, which will not run the waiting task that was revoked. | ||
The expected outcome is that the canvas will be calculated to the end, | ||
but the waiting task will not run at all. | ||
See worker logs for more details. | ||
""" | ||
canvas = prepare_workflow() | ||
result = canvas.freeze() | ||
revoke_by_headers(result, terminate=False) | ||
result = canvas.delay() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from time import sleep | ||
|
||
from config import app | ||
|
||
from celery import Task | ||
from examples.stamping.visitors import MyStampingVisitor | ||
|
||
|
||
class MyTask(Task): | ||
"""Custom task for stamping on replace""" | ||
|
||
def on_replace(self, sig): | ||
sig.stamp(MyStampingVisitor()) | ||
return super().on_replace(sig) | ||
|
||
|
||
@app.task | ||
def identity(x): | ||
"""Identity function""" | ||
return x | ||
|
||
|
||
@app.task | ||
def mul(x: int, y: int) -> int: | ||
"""Multiply two numbers""" | ||
return x * y | ||
|
||
|
||
@app.task | ||
def xsum(numbers: list) -> int: | ||
"""Sum a list of numbers""" | ||
return sum(numbers) | ||
|
||
|
||
@app.task | ||
def waitfor(seconds: int) -> None: | ||
"""Wait for "seconds" seconds, ticking every second.""" | ||
print(f'Waiting for {seconds} seconds...') | ||
for i in range(seconds): | ||
sleep(1) | ||
print(f'{i+1} seconds passed') | ||
|
||
|
||
@app.task(bind=True, base=MyTask) | ||
def wait_for_revoke(self: MyTask, seconds: int) -> None: | ||
"""Replace this task with a new task that waits for "seconds" seconds.""" | ||
# This will stamp waitfor with MyStampingVisitor | ||
self.replace(waitfor.s(seconds)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from uuid import uuid4 | ||
|
||
from celery.canvas import StampingVisitor | ||
|
||
|
||
class MyStampingVisitor(StampingVisitor): | ||
def on_signature(self, sig, **headers) -> dict: | ||
return {'mystamp': 'I am a stamp!'} | ||
|
||
|
||
class MonitoringIdStampingVisitor(StampingVisitor): | ||
|
||
def on_signature(self, sig, **headers) -> dict: | ||
return {'monitoring_id': str(uuid4())} |