# trace.py (forked from celery/celery; 271 lines, 228 loc, 9.83 KB)
# -*- coding: utf-8 -*-
"""
celery.task.trace
~~~~~~~~~~~~~~~~~~~~
This module defines how the task execution is traced:
errors are recorded, handlers are applied and so on.
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
# ## ---
# This is the heart of the worker, the inner loop so to speak.
# It used to be split up into nice little classes and methods,
# but in the end it only resulted in bad performance and horrible tracebacks,
# so instead we now use one closure per task class.
import os
import socket
import sys
import traceback
from warnings import warn
from kombu.utils import kwdict
from celery import current_app
from celery import states, signals
from celery.app.state import _tls
from celery.app.task import BaseTask
from celery.datastructures import ExceptionInfo
from celery.exceptions import RetryTaskError
from celery.utils.serialization import get_pickleable_exception
# Module-level shortcuts to the task_prerun / task_postrun signals: the bound
# ``send`` methods and the signals' ``receivers`` attributes.
# NOTE(review): ``prerun_receivers`` and ``postrun_receivers`` are not
# referenced anywhere in the visible part of this module -- confirm whether
# other modules import them before removing.
send_prerun = signals.task_prerun.send
prerun_receivers = signals.task_prerun.receivers
send_postrun = signals.task_postrun.send
postrun_receivers = signals.task_postrun.receivers

# Module-level aliases for the task states used by the tracer below.
# NOTE(review): ``EXCEPTION_STATES`` is not referenced in the visible part of
# this module -- confirm external use.
STARTED = states.STARTED
SUCCESS = states.SUCCESS
RETRY = states.RETRY
FAILURE = states.FAILURE
EXCEPTION_STATES = states.EXCEPTION_STATES
def mro_lookup(cls, attr, stop=()):
    """Find the class that defines ``attr``, walking ``cls`` in MRO order.

    Only attributes defined directly in a class body (present in that
    class' own ``__dict__``) count; inherited attributes do not.

    :param cls: The class whose MRO is searched.
    :param attr: Name of the attribute to look for.
    :keyword stop: Collection of classes; the search is abandoned as soon
        as one of them is reached, before that class itself is inspected.

    :returns: The first class in the MRO defining ``attr``, or
        :const:`None` if a ``stop`` class was reached first or no class
        defines the attribute.

    """
    for klass in cls.mro():
        if klass in stop:
            # Reached a boundary class: give up without inspecting it.
            break
        if attr in vars(klass):
            return klass
    return None
def defines_custom_call(task):
    """Tell whether the task's class overrides ``__call__``.

    The class of ``task`` is searched in MRO order for a ``__call__``
    defined before :class:`BaseTask` or :class:`object` is reached, so the
    ``__call__`` provided by those two never counts.

    :returns: The class defining the custom ``__call__`` (a true value),
        or :const:`None` if there is no such class.

    """
    task_cls = task.__class__
    boundary = (BaseTask, object)
    return mro_lookup(task_cls, "__call__", stop=boundary)
class TraceInfo(object):
    """Outcome of one traced task call, plus the error-state handlers.

    :param state: The state the call ended in.
    :keyword retval: Value associated with the state; for the error states
        handled here this is the exception instance.
    :keyword exc_info: A ``(type, value, traceback)`` triple as returned by
        :func:`sys.exc_info`, or :const:`None`.  When given it is also
        unpacked into :attr:`exc_type`, :attr:`exc_value` and :attr:`tb`;
        otherwise those three attributes are :const:`None`.

    """
    # ``strtb`` must not be listed here: it is implemented as a property
    # below, and a slot name that is also a class attribute makes class
    # creation fail on Python 3 (``ValueError: 'strtb' in __slots__
    # conflicts with class variable``) and is a dead slot on Python 2.
    __slots__ = ("state", "retval", "exc_info",
                 "exc_type", "exc_value", "tb")

    def __init__(self, state, retval=None, exc_info=None):
        self.state = state
        self.retval = retval
        self.exc_info = exc_info
        if exc_info:
            self.exc_type, self.exc_value, self.tb = exc_info
        else:
            self.exc_type = self.exc_value = self.tb = None

    def handle_error_state(self, task, eager=False):
        """Dispatch to the handler matching :attr:`state`.

        Errors are stored in the backend unless ``eager`` is set; if the
        task ignores results, ``task.store_errors_even_if_ignored``
        decides instead.

        :returns: whatever the selected handler returns.
        :raises KeyError: if :attr:`state` is neither ``RETRY`` nor
            ``FAILURE``.

        """
        store_errors = not eager
        if task.ignore_result:
            store_errors = task.store_errors_even_if_ignored

        return {
            RETRY: self.handle_retry,
            FAILURE: self.handle_failure,
        }[self.state](task, store_errors=store_errors)

    def handle_retry(self, task, store_errors=True):
        """Handle retry exception.

        Marks the task as retried in the backend (if ``store_errors``),
        calls ``task.on_retry`` and returns the :class:`ExceptionInfo`
        built for the retry.

        """
        # Create a simpler version of the RetryTaskError that stringifies
        # the original exception instead of including the exception instance.
        # This is for reporting the retry in logs, email etc, while
        # guaranteeing pickleability.
        req = task.request
        exc, type_, tb = self.retval, self.exc_type, self.tb
        # The retry exception is expected to carry exactly two args.
        message, orig_exc = self.retval.args
        if store_errors:
            task.backend.mark_as_retry(req.id, orig_exc, self.strtb)
        expanded_msg = "%s: %s" % (message, str(orig_exc))
        einfo = ExceptionInfo((type_, type_(expanded_msg, None), tb))
        task.on_retry(exc, req.id, req.args, req.kwargs, einfo)
        return einfo

    def handle_failure(self, task, store_errors=True):
        """Handle exception.

        Marks the task as failed in the backend (if ``store_errors``),
        calls ``task.on_failure``, sends the ``task_failure`` signal and
        returns the :class:`ExceptionInfo` built for the failure.

        """
        req = task.request
        exc, type_, tb = self.retval, self.exc_type, self.tb
        if store_errors:
            # The backend receives the original exception; the handler and
            # the signal below receive the pickleable replacement.
            task.backend.mark_as_failure(req.id, exc, self.strtb)
        exc = get_pickleable_exception(exc)
        einfo = ExceptionInfo((type_, exc, tb))
        task.on_failure(exc, req.id, req.args, req.kwargs, einfo)
        signals.task_failure.send(sender=task, task_id=req.id,
                                  exception=exc, args=req.args,
                                  kwargs=req.kwargs, traceback=tb,
                                  einfo=einfo)
        return einfo

    @property
    def strtb(self):
        """The formatted traceback as a string (empty if no ``exc_info``)."""
        if self.exc_info:
            # format_exception() returns lines that already end in a
            # newline, so they are concatenated with an empty separator;
            # joining on "\n" would insert a blank line after every line.
            return ''.join(traceback.format_exception(*self.exc_info))
        return ''
def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
        Info=TraceInfo, eager=False, propagate=False):
    """Build and return the ``trace_task`` closure that executes ``task``.

    Every attribute of the task, its backend and the loader that the
    closure needs is looked up once here and bound to a local name, so
    the returned function only works with those bindings.

    :param name: Not referenced in this function's body.
    :param task: The task instance to execute and trace.
    :keyword loader: Loader providing ``on_task_init`` and
        ``on_process_cleanup``; defaults to ``current_app.loader``.
    :keyword hostname: Stored together with the pid when the ``STARTED``
        state is recorded; defaults to :func:`socket.gethostname`.
    :keyword store_errors: Not referenced in this function's body
        (NOTE(review): error storing is decided inside
        ``Info.handle_error_state`` from ``eager`` and the task's own
        settings -- confirm this argument can be deprecated).
    :keyword Info: Class instantiated as ``Info(state, exc, exc_info)``
        to describe and handle error outcomes.
    :keyword eager: If true the ``STARTED`` and ``SUCCESS`` results are
        not stored, the backend/loader process cleanup is skipped, and
        exceptions escaping the traced section are re-raised instead of
        being reported as internal errors.
    :keyword propagate: If true, exceptions raised by the task body
        (other than :exc:`RetryTaskError`) are re-raised from their
        handler instead of being converted to a ``FAILURE`` outcome.
        They still pass through the outer ``except Exception`` clause of
        the closure, which only lets them out when ``eager`` is set.

    :returns: ``trace_task(uuid, args, kwargs, request=None)``.

    """
    # If the task doesn't define a custom __call__ method
    # we optimize it away by simply calling the run method directly,
    # saving the extra method call and a line less in the stack trace.
    fun = task if defines_custom_call(task) else task.run

    loader = loader or current_app.loader
    backend = task.backend
    ignore_result = task.ignore_result
    # NOTE(review): this binding is overwritten by the very next statement.
    track_started = task.track_started
    track_started = not eager and (task.track_started and not ignore_result)
    publish_result = not eager and not ignore_result
    hostname = hostname or socket.gethostname()

    loader_task_init = loader.on_task_init
    loader_cleanup = loader.on_process_cleanup

    task_on_success = task.on_success
    task_after_return = task.after_return
    task_request = task.request

    store_result = backend.store_result
    backend_cleanup = backend.process_cleanup

    # The pid is read once, when the tracer is built.
    pid = os.getpid()

    update_request = task_request.update
    clear_request = task_request.clear
    on_chord_part_return = backend.on_chord_part_return

    # Imported at call time rather than at module level
    # (NOTE(review): presumably to avoid a circular import -- confirm).
    from celery.task import sets
    subtask = sets.subtask

    def trace_task(uuid, args, kwargs, request=None):
        """Run the task once and return the tuple ``(R, I)``.

        ``R`` is the task's return value on success, the value returned
        by ``I.handle_error_state`` when the body raised, or the value
        returned by ``report_internal_error`` when something outside the
        body failed.  ``I`` is the ``Info`` instance created for an error
        outcome, otherwise :const:`None`.

        """
        R = I = None
        kwargs = kwdict(kwargs)
        try:
            _tls.current_task = task
            update_request(request or {}, args=args,
                           called_directly=False, kwargs=kwargs)
            try:
                # -*- PRE -*-
                send_prerun(sender=task, task_id=uuid, task=task,
                            args=args, kwargs=kwargs)
                loader_task_init(uuid, task)
                if track_started:
                    store_result(uuid, {"pid": pid,
                                        "hostname": hostname}, STARTED)

                # -*- TRACE -*-
                try:
                    R = retval = fun(*args, **kwargs)
                    state, einfo = SUCCESS, None
                except RetryTaskError, exc:
                    # Retries are always handled here; ``propagate`` is
                    # not consulted and no errbacks are applied.
                    I = Info(RETRY, exc, sys.exc_info())
                    state, retval, einfo = I.state, I.retval, I.exc_info
                    R = I.handle_error_state(task, eager=eager)
                except Exception, exc:
                    if propagate:
                        raise
                    I = Info(FAILURE, exc, sys.exc_info())
                    state, retval, einfo = I.state, I.retval, I.exc_info
                    R = I.handle_error_state(task, eager=eager)
                    # Apply every errback registered on the request,
                    # passing the id of the failed task.
                    [subtask(errback).apply_async((uuid, ))
                        for errback in task_request.errbacks or []]
                except BaseException, exc:
                    # Must come before the bare ``except`` below so that
                    # BaseException subclasses not caught above are
                    # re-raised untouched.
                    raise
                except:  # pragma: no cover
                    # For Python2.5 where raising strings are still allowed
                    # (but deprecated)
                    if propagate:
                        raise
                    I = Info(FAILURE, None, sys.exc_info())
                    state, retval, einfo = I.state, I.retval, I.exc_info
                    R = I.handle_error_state(task, eager=eager)
                    [subtask(errback).apply_async((uuid, ))
                        for errback in task_request.errbacks or []]
                else:
                    task_on_success(retval, uuid, args, kwargs)
                    # callback tasks must be applied before the result is
                    # stored, so that result.children is populated.
                    [subtask(callback).apply_async((retval, ))
                        for callback in task_request.callbacks or []]
                    if publish_result:
                        store_result(uuid, retval, SUCCESS)

                # -* POST *-
                # Reached after success and after handled errors alike.
                if task_request.chord:
                    on_chord_part_return(task)
                task_after_return(state, retval, uuid, args, kwargs, einfo)
                send_postrun(sender=task, task_id=uuid, task=task,
                            args=args, kwargs=kwargs, retval=retval)
            finally:
                # Always reset the current task and the request, even if
                # one of the steps above raised.
                _tls.current_task = None
                clear_request()
                if not eager:
                    try:
                        backend_cleanup()
                        loader_cleanup()
                    except (KeyboardInterrupt, SystemExit, MemoryError):
                        raise
                    except Exception, exc:
                        # Cleanup failures are logged, never raised.
                        logger = current_app.log.get_default_logger()
                        logger.error("Process cleanup failed: %r", exc,
                                     exc_info=True)
        except Exception, exc:
            # Anything that escaped the traced section (signal handlers,
            # loader/backend calls, error handlers, propagated errors).
            if eager:
                raise
            R = report_internal_error(task, exc)
        return R, I

    return trace_task
def trace_task(task, uuid, args, kwargs, request=None, **opts):
    """Execute ``task`` through its tracer, building the tracer on demand.

    The tracer is stored on ``task.__tracer__``; it is created with
    :func:`build_tracer` (receiving ``**opts``) the first time it is found
    to be :const:`None` and reused afterwards.

    :returns: the tracer's result, or the tuple
        ``(report_internal_error(task, exc), None)`` if building or
        calling the tracer raised an :exc:`Exception`.

    """
    try:
        tracer = task.__tracer__
        if tracer is None:
            tracer = task.__tracer__ = build_tracer(task.name, task, **opts)
        return tracer(uuid, args, kwargs, request)
    except Exception:
        # Fetch the active exception explicitly; this spelling parses on
        # every Python version the module may run on.
        exc = sys.exc_info()[1]
        return report_internal_error(task, exc), None
def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
    """Trace ``task`` with a freshly built tracer, eager by default.

    Unlike :func:`trace_task` a new tracer is built on every call and
    nothing is cached on the task.  ``eager`` is forced to :const:`True`
    only if the caller did not pass it explicitly.

    :returns: whatever the tracer returns.

    """
    if "eager" not in opts:
        opts["eager"] = True
    tracer = build_tracer(task.name, task, **opts)
    return tracer(uuid, args, kwargs, request)
def report_internal_error(task, exc):
    """Report an exception that was raised outside of the task body.

    Must be called while the exception is being handled, since the type
    and traceback are taken from :func:`sys.exc_info`.  The exception
    value is replaced by ``task.backend.prepare_exception(exc)``, the
    result is wrapped in an internal :class:`ExceptionInfo`, and a
    :exc:`RuntimeWarning` containing the traceback is issued.

    :returns: the :class:`ExceptionInfo` instance.

    """
    exc_type, _, exc_tb = sys.exc_info()
    try:
        prepared = task.backend.prepare_exception(exc)
        einfo = ExceptionInfo((exc_type, prepared, exc_tb), internal=True)
        message = "Exception raised outside body: %r:\n%s" % (
            exc, einfo.traceback)
        warn(RuntimeWarning(message))
        return einfo
    finally:
        # Drop the local traceback reference before leaving
        # (presumably to avoid a frame <-> traceback reference cycle).
        del exc_tb