"""
==============
celery.signals
==============
Signals allow decoupled applications to receive notifications when
certain actions occur elsewhere in the application.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
.. contents::
:local:
.. _signal-basics:
Basics
======
Several kinds of events trigger signals; you can connect to these signals
to perform actions as they trigger.
Example connecting to the :signal:`task_sent` signal:
.. code-block:: python

    from celery.signals import task_sent

    def task_sent_handler(sender=None, task_id=None, task=None, args=None,
                          kwargs=None, **kwds):
        print("Got signal task_sent for task id %s" % (task_id, ))

    task_sent.connect(task_sent_handler)

Some signals also have a sender which you can filter by. For example, the
:signal:`task_sent` signal uses the task name as a sender, so you can
connect your handler to be called only when tasks with name `"tasks.add"`
have been sent by providing the `sender` argument to
:meth:`~celery.utils.dispatch.signal.Signal.connect`:
.. code-block:: python

    task_sent.connect(task_sent_handler, sender="tasks.add")

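As a rough illustration of how sender filtering behaves, the sketch below
uses a small stand-in class, ``MiniSignal``, which is hypothetical and is not
Celery's dispatcher. It assumes senders are compared by equality; the real
implementation may match senders differently.

```python
# MiniSignal is a hypothetical stand-in, NOT Celery's Signal class.
class MiniSignal:

    def __init__(self, providing_args=None):
        self.providing_args = list(providing_args or [])
        self.receivers = []  # list of (receiver, sender_filter) pairs

    def connect(self, receiver, sender=None):
        # sender=None means "call this receiver for every sender".
        self.receivers.append((receiver, sender))

    def send(self, sender=None, **named):
        # Call each receiver whose filter is unset or equals the sender
        # (equality matching is an assumption of this sketch).
        return [(receiver, receiver(sender=sender, **named))
                for receiver, wanted in self.receivers
                if wanted is None or wanted == sender]


task_sent = MiniSignal(providing_args=["task_id", "task", "args", "kwargs",
                                       "eta", "taskset"])

seen_all = []  # filled by the unfiltered handler
seen_add = []  # filled by the handler filtered on "tasks.add"


def log_every_task(sender=None, task_id=None, **kwds):
    seen_all.append((sender, task_id))


def log_add_only(sender=None, task_id=None, **kwds):
    seen_add.append(task_id)


task_sent.connect(log_every_task)
task_sent.connect(log_add_only, sender="tasks.add")

task_sent.send(sender="tasks.add", task_id="id-1", args=(2, 2), kwargs={})
task_sent.send(sender="tasks.mul", task_id="id-2", args=(2, 2), kwargs={})
```

After the two sends, the unfiltered handler has seen both tasks, while the
filtered handler has only seen the one sent by ``"tasks.add"``.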
.. _signal-ref:
Signals
=======
Task Signals
------------
.. signal:: task_sent
task_sent
~~~~~~~~~
Dispatched when a task has been sent to the broker.
Note that this is executed in the client process, the one sending
the task, not in the worker.
Sender is the name of the task being sent.
Provides arguments:
* task_id
Id of the task to be executed.
* task
The task being executed.
* args
The task's positional arguments.
* kwargs
The task's keyword arguments.
* eta
The time to execute the task.
* taskset
Id of the taskset this task is part of (if any).
.. signal:: task_prerun
task_prerun
~~~~~~~~~~~
Dispatched before a task is executed.
Sender is the task class being executed.
Provides arguments:
* task_id
Id of the task to be executed.
* task
The task being executed.
* args
The task's positional arguments.
* kwargs
The task's keyword arguments.
.. signal:: task_postrun
task_postrun
~~~~~~~~~~~~
Dispatched after a task has been executed.
Sender is the task class executed.
Provides arguments:
* task_id
Id of the task that was executed.
* task
The task that was executed.
* args
The task's positional arguments.
* kwargs
The task's keyword arguments.
* retval
The return value of the task.
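One possible use of :signal:`task_prerun` together with
:signal:`task_postrun` is timing tasks by ``task_id``. The sketch below is
only an illustration: the ``started`` and ``durations`` dictionaries are
hypothetical, it assumes both handlers run in the same process, and the
handlers are called directly here instead of being dispatched by Celery.

```python
import time

started = {}    # hypothetical store: task_id -> start timestamp
durations = {}  # hypothetical store: task_id -> elapsed seconds


def on_prerun(sender=None, task_id=None, task=None, args=None,
              kwargs=None, **extra):
    # Remember when the task started.
    started[task_id] = time.monotonic()


def on_postrun(sender=None, task_id=None, task=None, args=None,
               kwargs=None, retval=None, **extra):
    # Compute the elapsed time if a start was recorded for this task.
    begun = started.pop(task_id, None)
    if begun is not None:
        durations[task_id] = time.monotonic() - begun


# With Celery installed, the handlers would be registered with:
#     from celery.signals import task_prerun, task_postrun
#     task_prerun.connect(on_prerun)
#     task_postrun.connect(on_postrun)

# Simulate one task run by calling the handlers with the documented
# arguments.
on_prerun(task_id="id-1", args=(2, 2), kwargs={})
on_postrun(task_id="id-1", args=(2, 2), kwargs={}, retval=4)
```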
.. signal:: task_failure
task_failure
~~~~~~~~~~~~
Dispatched when a task fails.
Sender is the task class executed.
Provides arguments:
* task_id
Id of the task.
* exception
Exception instance raised.
* args
Positional arguments the task was called with.
* kwargs
Keyword arguments the task was called with.
* traceback
Stack trace object.
* einfo
The :class:`celery.datastructures.ExceptionInfo` instance.
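A handler for :signal:`task_failure` could record the arguments listed
above. This is a minimal sketch, not a recommended error-reporting setup:
the ``failures`` list is hypothetical, and the dispatch is simulated by
calling the handler directly.

```python
failures = []  # hypothetical in-memory store, for illustration only


def on_task_failure(sender=None, task_id=None, exception=None, args=None,
                    kwargs=None, traceback=None, einfo=None, **extra):
    # Accept **extra so the handler tolerates arguments beyond the
    # ones documented above.
    failures.append({
        "task_id": task_id,
        "error": "%s: %s" % (type(exception).__name__, exception),
        "args": args,
        "kwargs": kwargs,
    })


# With Celery installed, the handler would be registered with:
#     from celery.signals import task_failure
#     task_failure.connect(on_task_failure)

# Simulate a failing task by calling the handler with the documented
# arguments.
try:
    raise ValueError("bad input")
except ValueError as exc:
    on_task_failure(sender=None, task_id="id-1", exception=exc,
                    args=(1, 2), kwargs={}, traceback=exc.__traceback__,
                    einfo=None)
```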
Worker Signals
--------------
.. signal:: worker_init
worker_init
~~~~~~~~~~~
Dispatched before the worker is started.
.. signal:: worker_ready
worker_ready
~~~~~~~~~~~~
Dispatched when the worker is ready to accept work.
.. signal:: worker_process_init
worker_process_init
~~~~~~~~~~~~~~~~~~~
Dispatched by each new pool worker process when it starts.
.. signal:: worker_shutdown
worker_shutdown
~~~~~~~~~~~~~~~
Dispatched when the worker is about to shut down.
Celerybeat Signals
------------------
.. signal:: beat_init
beat_init
~~~~~~~~~
Dispatched when celerybeat starts (either standalone or embedded).
Sender is the :class:`celery.beat.Service` instance.
.. signal:: beat_embedded_init
beat_embedded_init
~~~~~~~~~~~~~~~~~~
Dispatched in addition to the :signal:`beat_init` signal when celerybeat is
started as an embedded process. Sender is the
:class:`celery.beat.Service` instance.
Eventlet Signals
----------------
.. signal:: eventlet_pool_started
eventlet_pool_started
~~~~~~~~~~~~~~~~~~~~~
Sent when the eventlet pool has been started.
Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
.. signal:: eventlet_pool_preshutdown
eventlet_pool_preshutdown
~~~~~~~~~~~~~~~~~~~~~~~~~
Sent when the worker shuts down, just before the eventlet pool
is requested to wait for remaining workers.
Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
.. signal:: eventlet_pool_postshutdown
eventlet_pool_postshutdown
~~~~~~~~~~~~~~~~~~~~~~~~~~
Sent when the pool has been joined and the worker is ready to shut down.
Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
.. signal:: eventlet_pool_apply
eventlet_pool_apply
~~~~~~~~~~~~~~~~~~~
Sent whenever a task is applied to the pool.
Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
Provides arguments:
* target
The target function.
* args
Positional arguments.
* kwargs
Keyword arguments.
Logging Signals
---------------
.. signal:: setup_logging
setup_logging
~~~~~~~~~~~~~
Celery won't configure the loggers if this signal is connected,
so you can use this to completely override the logging configuration
with your own.
If you would like to augment the logging configuration set up by
Celery, you can use the :signal:`after_setup_logger` and
:signal:`after_setup_task_logger` signals.
Provides arguments:
* loglevel
The level of the logging object.
* logfile
The name of the logfile.
* format
The log format string.
* colorize
Specify if log messages are colored or not.
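A handler connected to :signal:`setup_logging` receives the arguments above
and is then responsible for configuring logging itself. The sketch below
shows one way such a handler might look. The logger name ``"myapp"`` and the
fallback format string are assumptions made for illustration, and the
handler is called directly here instead of being dispatched by Celery.

```python
import logging


def configure_logging(sender=None, loglevel=None, logfile=None,
                      format=None, colorize=None, **kwargs):
    # "myapp" is a hypothetical logger name used only in this sketch.
    logger = logging.getLogger("myapp")
    if logfile:
        handler = logging.FileHandler(logfile)
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(format or "%(levelname)s %(message)s"))
    # Replace any existing handlers so the configuration is fully ours.
    logger.handlers = [handler]
    logger.setLevel(loglevel if loglevel is not None else logging.INFO)
    return logger


# With Celery installed, the handler would be registered with:
#     from celery.signals import setup_logging
#     setup_logging.connect(configure_logging)

# Simulate the dispatch with the documented arguments.
logger = configure_logging(loglevel=logging.WARNING, logfile=None,
                           format="%(message)s", colorize=False)
```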
.. signal:: after_setup_logger
after_setup_logger
~~~~~~~~~~~~~~~~~~
Sent after the setup of every global logger (not task loggers).
Used to augment logging configuration.
Provides arguments:
* logger
The logger object.
* loglevel
The level of the logging object.
* logfile
The name of the logfile.
* format
The log format string.
* colorize
Specify if log messages are colored or not.
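Because :signal:`after_setup_logger` passes the logger object, a handler can
add to the configuration without replacing it. The sketch below attaches an
extra logging handler to the logger it is given. ``ListHandler`` and the
logger name ``"demo.after_setup"`` are hypothetical, and the signal handler
is called directly instead of being dispatched by Celery.

```python
import logging


class ListHandler(logging.Handler):
    # Hypothetical logging handler that keeps formatted messages in memory.

    def __init__(self):
        logging.Handler.__init__(self)
        self.records = []

    def emit(self, record):
        self.records.append(record.getMessage())


capture = ListHandler()


def add_capture_handler(sender=None, logger=None, loglevel=None,
                        logfile=None, format=None, colorize=None, **kwargs):
    # Augment the logger that was just set up; existing handlers are kept.
    logger.addHandler(capture)


# With Celery installed, the handler would be registered with:
#     from celery.signals import after_setup_logger
#     after_setup_logger.connect(add_capture_handler)

# Simulate the dispatch using a stand-in logger.
demo_logger = logging.getLogger("demo.after_setup")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo output out of the root logger
add_capture_handler(logger=demo_logger, loglevel=logging.INFO)
demo_logger.info("augmented %s", "ok")
```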
.. signal:: after_setup_task_logger
after_setup_task_logger
~~~~~~~~~~~~~~~~~~~~~~~
Sent after the setup of every single task logger.
Used to augment logging configuration.
Provides arguments:
* logger
The logger object.
* loglevel
The level of the logging object.
* logfile
The name of the logfile.
* format
The log format string.
* colorize
Specify if log messages are colored or not.
"""
from celery.utils.dispatch import Signal
task_sent = Signal(providing_args=["task_id", "task",
                                   "args", "kwargs",
                                   "eta", "taskset"])
task_prerun = Signal(providing_args=["task_id", "task",
                                     "args", "kwargs"])
task_postrun = Signal(providing_args=["task_id", "task",
                                      "args", "kwargs", "retval"])
task_failure = Signal(providing_args=["task_id", "exception",
                                      "args", "kwargs", "traceback",
                                      "einfo"])
worker_init = Signal(providing_args=[])
worker_process_init = Signal(providing_args=[])
worker_ready = Signal(providing_args=[])
worker_shutdown = Signal(providing_args=[])
setup_logging = Signal(providing_args=["loglevel", "logfile",
                                       "format", "colorize"])
after_setup_logger = Signal(providing_args=["logger", "loglevel", "logfile",
                                            "format", "colorize"])
after_setup_task_logger = Signal(providing_args=["logger", "loglevel",
                                                 "logfile", "format",
                                                 "colorize"])
beat_init = Signal(providing_args=[])
beat_embedded_init = Signal(providing_args=[])
eventlet_pool_started = Signal(providing_args=[])
eventlet_pool_preshutdown = Signal(providing_args=[])
eventlet_pool_postshutdown = Signal(providing_args=[])
eventlet_pool_apply = Signal(providing_args=["target", "args", "kwargs"])