Skip to content
Permalink
8c5e9888ae
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
681 lines (573 sloc) 22.1 KB
"""Task request.
This module defines the :class:`Request` class, that specifies
how tasks are executed.
"""
import logging
import sys
from datetime import datetime
from time import monotonic, time
from weakref import ref
from billiard.common import TERM_SIGNAME
from kombu.utils.encoding import safe_repr, safe_str
from kombu.utils.objects import cached_property
from celery import signals
from celery.app.task import Context
from celery.app.trace import trace_task, trace_task_ret
from celery.exceptions import (Ignore, InvalidTaskError, Reject, Retry,
TaskRevokedError, Terminated,
TimeLimitExceeded, WorkerLostError)
from celery.platforms import signals as _signals
from celery.utils.functional import maybe, noop
from celery.utils.log import get_logger
from celery.utils.nodenames import gethostname
from celery.utils.serialization import get_pickled_exception
from celery.utils.time import maybe_iso8601, maybe_make_aware, timezone
from . import state
__all__ = ('Request',)

# pylint: disable=redefined-outer-name
# We cache globals and attribute lookups, so disable this warning.

# PyPy does not benefit from __slots__ the same way CPython does, so the
# class below only defines __slots__ when not running under PyPy.
IS_PYPY = hasattr(sys, 'pypy_version_info')

logger = get_logger(__name__)
# Bind the logger methods once so the hot path avoids attribute lookups.
debug, info, warn, error = (logger.debug, logger.info,
                            logger.warning, logger.error)

# Cached "is this level enabled?" flags; refreshed by __optimize__()
# (see below) so hot-path code can test a plain bool instead of calling
# logger.isEnabledFor() per message.
_does_info = False
_does_debug = False
def __optimize__():
    """Refresh the cached log-level flags.

    This is also called by ``celery.app.trace.setup_worker_optimizations``.
    """
    global _does_debug, _does_info
    enabled_for = logger.isEnabledFor
    _does_info = enabled_for(logging.INFO)
    _does_debug = enabled_for(logging.DEBUG)


__optimize__()  # noqa: E305
# Localize
# Module-level aliases for frequently-used callables, so hot paths pay a
# single global lookup instead of chained attribute lookups.
tz_or_local = timezone.tz_or_local
send_revoked = signals.task_revoked.send

task_accepted = state.task_accepted
task_ready = state.task_ready
revoked_tasks = state.revoked
class Request:
    """A request for task execution."""

    # True once the message has been acked (or rejected); guards against
    # double-acking in acknowledge()/reject().
    acknowledged = False
    # Absolute wall-clock time the task started executing (set in
    # on_accepted); also used by terminate() to tell "running" from "queued".
    time_start = None
    # PID of the pool worker process executing this task.
    worker_pid = None
    # (hard, soft) time limits taken from the message, if present.
    time_limits = (None, None)
    _already_revoked = False
    # Deferred (pool, signum) pair when terminate() is called before the
    # pool has accepted the task.
    _terminate_on_ack = None
    # Weakref to the pool AsyncResult, so terminate() can reach it.
    _apply_result = None
    _tzlocal = None

    if not IS_PYPY:  # pragma: no cover
        __slots__ = (
            '_app', '_type', 'name', 'id', '_root_id', '_parent_id',
            '_on_ack', '_body', '_hostname', '_eventer', '_connection_errors',
            '_task', '_eta', '_expires', '_request_dict', '_on_reject', '_utc',
            '_content_type', '_content_encoding', '_argsrepr', '_kwargsrepr',
            '_args', '_kwargs', '_decoded', '__payload',
            '__weakref__', '__dict__',
        )

    def __init__(self, message, on_ack=noop,
                 hostname=None, eventer=None, app=None,
                 connection_errors=None, request_dict=None,
                 task=None, on_reject=noop, body=None,
                 headers=None, decoded=False, utc=True,
                 maybe_make_aware=maybe_make_aware,
                 maybe_iso8601=maybe_iso8601, **opts):
        # ``maybe_make_aware``/``maybe_iso8601`` are passed as defaults so
        # they resolve as fast local names instead of module globals.
        self._message = message
        self._request_dict = message.headers if headers is None else headers
        self._body = message.body if body is None else body
        self._app = app
        self._utc = utc
        self._decoded = decoded
        if decoded:
            self._content_type = self._content_encoding = None
        else:
            self._content_type, self._content_encoding = (
                message.content_type, message.content_encoding,
            )
        self.__payload = self._body if self._decoded else message.payload
        self.id = self._request_dict['id']
        # ``name`` may be overridden by a 'shadow' header; ``_type`` always
        # keeps the real registered task name used for lookup/execution.
        self._type = self.name = self._request_dict['task']
        if 'shadow' in self._request_dict:
            self.name = self._request_dict['shadow'] or self.name
        self._root_id = self._request_dict.get('root_id')
        self._parent_id = self._request_dict.get('parent_id')
        timelimit = self._request_dict.get('timelimit', None)
        if timelimit:
            self.time_limits = timelimit
        self._argsrepr = self._request_dict.get('argsrepr', '')
        self._kwargsrepr = self._request_dict.get('kwargsrepr', '')
        self._on_ack = on_ack
        self._on_reject = on_reject
        self._hostname = hostname or gethostname()
        self._eventer = eventer
        self._connection_errors = connection_errors or ()
        self._task = task or self._app.tasks[self._type]

        # timezone means the message is timezone-aware, and the only timezone
        # supported at this point is UTC.
        eta = self._request_dict.get('eta')
        if eta is not None:
            try:
                eta = maybe_iso8601(eta)
            except (AttributeError, ValueError, TypeError) as exc:
                raise InvalidTaskError(
                    f'invalid ETA value {eta!r}: {exc}')
            self._eta = maybe_make_aware(eta, self.tzlocal)
        else:
            self._eta = None

        expires = self._request_dict.get('expires')
        if expires is not None:
            try:
                expires = maybe_iso8601(expires)
            except (AttributeError, ValueError, TypeError) as exc:
                raise InvalidTaskError(
                    f'invalid expires value {expires!r}: {exc}')
            self._expires = maybe_make_aware(expires, self.tzlocal)
        else:
            self._expires = None

        delivery_info = message.delivery_info or {}
        properties = message.properties or {}
        self._delivery_info = {
            'exchange': delivery_info.get('exchange'),
            'routing_key': delivery_info.get('routing_key'),
            'priority': properties.get('priority'),
            'redelivered': delivery_info.get('redelivered'),
        }
        self._request_dict.update({
            'reply_to': properties.get('reply_to'),
            'correlation_id': properties.get('correlation_id'),
            'hostname': self._hostname,
            'delivery_info': self._delivery_info
        })
        # this is a reference pass to avoid memory usage burst
        self._request_dict['args'], self._request_dict['kwargs'], _ = self.__payload
        self._args = self._request_dict['args']
        self._kwargs = self._request_dict['kwargs']

    # -- Read-only accessors over message/request state. --------------------

    @property
    def delivery_info(self):
        return self._delivery_info

    @property
    def message(self):
        return self._message

    @property
    def request_dict(self):
        return self._request_dict

    @property
    def body(self):
        return self._body

    @property
    def app(self):
        return self._app

    @property
    def utc(self):
        return self._utc

    @property
    def content_type(self):
        return self._content_type

    @property
    def content_encoding(self):
        return self._content_encoding

    @property
    def type(self):
        return self._type

    @property
    def root_id(self):
        return self._root_id

    @property
    def parent_id(self):
        return self._parent_id

    @property
    def argsrepr(self):
        return self._argsrepr

    @property
    def args(self):
        return self._args

    @property
    def kwargs(self):
        return self._kwargs

    @property
    def kwargsrepr(self):
        return self._kwargsrepr

    @property
    def on_ack(self):
        return self._on_ack

    @property
    def on_reject(self):
        return self._on_reject

    @on_reject.setter
    def on_reject(self, value):
        self._on_reject = value

    @property
    def hostname(self):
        return self._hostname

    @property
    def eventer(self):
        return self._eventer

    @eventer.setter
    def eventer(self, eventer):
        self._eventer = eventer

    @property
    def connection_errors(self):
        return self._connection_errors

    @property
    def task(self):
        return self._task

    @property
    def eta(self):
        return self._eta

    @property
    def expires(self):
        return self._expires

    @expires.setter
    def expires(self, value):
        self._expires = value

    @property
    def tzlocal(self):
        # Lazily resolved from the app configuration and cached.
        if self._tzlocal is None:
            self._tzlocal = self._app.conf.timezone
        return self._tzlocal

    @property
    def store_errors(self):
        # Whether failures should be written to the result backend.
        return (not self.task.ignore_result or
                self.task.store_errors_even_if_ignored)

    @property
    def task_id(self):
        # XXX compat
        return self.id

    @task_id.setter  # noqa
    def task_id(self, value):
        self.id = value

    @property
    def task_name(self):
        # XXX compat
        return self.name

    @task_name.setter  # noqa
    def task_name(self, value):
        self.name = value

    @property
    def reply_to(self):
        # used by rpc backend when failures reported by parent process
        return self._request_dict['reply_to']

    @property
    def correlation_id(self):
        # used similarly to reply_to
        return self._request_dict['correlation_id']

    def execute_using_pool(self, pool, **kwargs):
        """Used by the worker to send this task to the pool.

        Arguments:
            pool (~celery.concurrency.base.TaskPool): The execution pool
                used to execute this request.

        Raises:
            celery.exceptions.TaskRevokedError: if the task was revoked.
        """
        task_id = self.id
        task = self._task
        if self.revoked():
            raise TaskRevokedError(task_id)

        time_limit, soft_time_limit = self.time_limits
        result = pool.apply_async(
            trace_task_ret,
            args=(self._type, task_id, self._request_dict, self._body,
                  self._content_type, self._content_encoding),
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or task.soft_time_limit,
            timeout=time_limit or task.time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        self._apply_result = maybe(ref, result)
        return result

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :func:`~celery.app.trace.trace_task`.

        Arguments:
            loglevel (int): The loglevel used by the task.
            logfile (str): The logfile used by the task.
        """
        if self.revoked():
            return

        # acknowledge task as being processed.
        if not self.task.acks_late:
            self.acknowledge()

        _, _, embed = self._payload
        request = self._request_dict
        # pylint: disable=unpacking-non-sequence
        #    payload is a property, so pylint doesn't think it's a tuple.
        request.update({
            'loglevel': loglevel,
            'logfile': logfile,
            'is_eager': False,
        }, **embed or {})
        retval, I, _, _ = trace_task(self.task, self.id, self._args, self._kwargs, request,
                                     hostname=self._hostname, loader=self._app.loader,
                                     app=self._app)
        if I:
            # the trace reported an error (``I`` is the error info);
            # reject without requeueing.
            self.reject(requeue=False)
        else:
            self.acknowledge()
        return retval

    def maybe_expire(self):
        """If expired, mark the task as revoked."""
        if self._expires:
            now = datetime.now(self._expires.tzinfo)
            if now > self._expires:
                revoked_tasks.add(self.id)
                return True

    def terminate(self, pool, signal=None):
        # Terminate a running task, or arrange for termination once the
        # pool accepts it (see on_accepted).
        signal = _signals.signum(signal or TERM_SIGNAME)
        if self.time_start:
            # already running in a pool process: kill it now.
            pool.terminate_job(self.worker_pid, signal)
            self._announce_revoked('terminated', True, signal, False)
        else:
            # not accepted yet: defer until on_accepted fires.
            self._terminate_on_ack = pool, signal
        if self._apply_result is not None:
            obj = self._apply_result()  # is a weakref
            if obj is not None:
                obj.terminate(signal)

    def _announce_revoked(self, reason, terminated, signum, expired):
        # Record revocation: mark ready, emit event, store backend state,
        # ack the message and notify signal receivers.
        task_ready(self)
        self.send_event('task-revoked',
                        terminated=terminated, signum=signum, expired=expired)
        self.task.backend.mark_as_revoked(
            self.id, reason, request=self._context,
            store_result=self.store_errors,
        )
        self.acknowledge()
        self._already_revoked = True
        send_revoked(self.task, request=self._context,
                     terminated=terminated, signum=signum, expired=expired)

    def revoked(self):
        """If revoked, skip task and mark state."""
        expired = False
        if self._already_revoked:
            return True
        if self._expires:
            expired = self.maybe_expire()
        if self.id in revoked_tasks:
            info('Discarding revoked task: %s[%s]', self.name, self.id)
            self._announce_revoked(
                'expired' if expired else 'revoked', False, None, expired,
            )
            return True
        return False

    def send_event(self, type, **fields):
        # Only emit when an eventer is attached, enabled, and the task
        # opted in to events.
        if self._eventer and self._eventer.enabled and self.task.send_events:
            self._eventer.send(type, uuid=self.id, **fields)

    def on_accepted(self, pid, time_accepted):
        """Handler called when task is accepted by worker pool."""
        self.worker_pid = pid
        # Convert monotonic time_accepted to absolute time
        self.time_start = time() - (monotonic() - time_accepted)
        task_accepted(self)
        if not self.task.acks_late:
            self.acknowledge()
        self.send_event('task-started')
        if _does_debug:
            debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
        if self._terminate_on_ack is not None:
            # terminate() was called before the pool accepted us.
            self.terminate(*self._terminate_on_ack)

    def on_timeout(self, soft, timeout):
        """Handler called if the task times out."""
        if soft:
            # soft limit: the task gets a chance to clean up, just warn.
            warn('Soft time limit (%ss) exceeded for %s[%s]',
                 timeout, self.name, self.id)
        else:
            task_ready(self)
            error('Hard time limit (%ss) exceeded for %s[%s]',
                  timeout, self.name, self.id)
            exc = TimeLimitExceeded(timeout)

            self.task.backend.mark_as_failure(
                self.id, exc, request=self._context,
                store_result=self.store_errors,
            )

            if self.task.acks_late and self.task.acks_on_failure_or_timeout:
                self.acknowledge()

    def on_success(self, failed__retval__runtime, **kwargs):
        """Handler called if the task was successfully processed."""
        failed, retval, runtime = failed__retval__runtime
        if failed:
            # fatal interrupts propagate to take the worker down.
            if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
                raise retval.exception
            return self.on_failure(retval, return_ok=True)
        task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        self.send_event('task-succeeded', result=retval, runtime=runtime)

    def on_retry(self, exc_info):
        """Handler called if the task should be retried."""
        if self.task.acks_late:
            self.acknowledge()

        self.send_event('task-retried',
                        exception=safe_repr(exc_info.exception.exc),
                        traceback=safe_str(exc_info.traceback))

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        """Handler called if the task raised an exception."""
        task_ready(self)
        if isinstance(exc_info.exception, MemoryError):
            raise MemoryError(f'Process got: {exc_info.exception}')
        elif isinstance(exc_info.exception, Reject):
            return self.reject(requeue=exc_info.exception.requeue)
        elif isinstance(exc_info.exception, Ignore):
            return self.acknowledge()

        exc = exc_info.exception

        if isinstance(exc, Retry):
            return self.on_retry(exc_info)

        # (acks_late) acknowledge after result stored.
        requeue = False
        if self.task.acks_late:
            reject = (
                self.task.reject_on_worker_lost and
                isinstance(exc, WorkerLostError)
            )
            ack = self.task.acks_on_failure_or_timeout
            if reject:
                requeue = True
                self.reject(requeue=requeue)
                send_failed_event = False
            elif ack:
                self.acknowledge()
            else:
                # supporting the behaviour where a task failed and
                # need to be removed from prefetched local queue
                self.reject(requeue=False)

        # These are special cases where the process would not have had time
        # to write the result.
        if isinstance(exc, Terminated):
            self._announce_revoked(
                'terminated', True, str(exc), False)
            send_failed_event = False  # already sent revoked event
        elif not requeue and (isinstance(exc, WorkerLostError) or not return_ok):
            # only mark as failure if task has not been requeued
            self.task.backend.mark_as_failure(
                self.id, exc, request=self._context,
                store_result=self.store_errors,
            )

        if send_failed_event:
            self.send_event(
                'task-failed',
                exception=safe_repr(get_pickled_exception(exc_info.exception)),
                traceback=exc_info.traceback,
            )

        if not return_ok:
            error('Task handler raised error: %r', exc,
                  exc_info=exc_info.exc_info)

    def acknowledge(self):
        """Acknowledge task."""
        if not self.acknowledged:
            self._on_ack(logger, self._connection_errors)
            self.acknowledged = True

    def reject(self, requeue=False):
        # Reject the message (optionally requeueing it) and emit an event;
        # idempotent via the ``acknowledged`` flag.
        if not self.acknowledged:
            self._on_reject(logger, self._connection_errors, requeue)
            self.acknowledged = True
            self.send_event('task-rejected', requeue=requeue)

    def info(self, safe=False):
        # Snapshot of this request for inspection/monitoring commands.
        return {
            'id': self.id,
            'name': self.name,
            'args': self._args,
            'kwargs': self._kwargs,
            'type': self._type,
            'hostname': self._hostname,
            'time_start': self.time_start,
            'acknowledged': self.acknowledged,
            'delivery_info': self.delivery_info,
            'worker_pid': self.worker_pid,
        }

    def humaninfo(self):
        # Short human-readable identifier: ``name[id]``.
        return '{0.name}[{0.id}]'.format(self)

    def __str__(self):
        """``str(self)``."""
        return ' '.join([
            self.humaninfo(),
            f' ETA:[{self._eta}]' if self._eta else '',
            f' expires:[{self._expires}]' if self._expires else '',
        ])

    def __repr__(self):
        """``repr(self)``."""
        return '<{}: {} {} {}>'.format(
            type(self).__name__, self.humaninfo(),
            self._argsrepr, self._kwargsrepr,
        )

    @cached_property
    def _payload(self):
        return self.__payload

    @cached_property
    def chord(self):
        # used by backend.mark_as_failure when failure is reported
        # by parent process
        # pylint: disable=unpacking-non-sequence
        #    payload is a property, so pylint doesn't think it's a tuple.
        _, _, embed = self._payload
        return embed.get('chord')

    @cached_property
    def errbacks(self):
        # used by backend.mark_as_failure when failure is reported
        # by parent process
        # pylint: disable=unpacking-non-sequence
        #    payload is a property, so pylint doesn't think it's a tuple.
        _, _, embed = self._payload
        return embed.get('errbacks')

    @cached_property
    def group(self):
        # used by backend.on_chord_part_return when failures reported
        # by parent process
        return self._request_dict.get('group')

    @cached_property
    def _context(self):
        """Context (:class:`~celery.app.task.Context`) of this task."""
        request = self._request_dict
        # pylint: disable=unpacking-non-sequence
        #    payload is a property, so pylint doesn't think it's a tuple.
        _, _, embed = self._payload
        request.update(**embed or {})
        return Context(request)

    @cached_property
    def group_index(self):
        # used by backend.on_chord_part_return to order return values in group
        return self._request_dict.get('group_index')
def create_request_cls(base, task, pool, hostname, eventer,
                       ref=ref, revoked_tasks=revoked_tasks,
                       task_ready=task_ready, trace=trace_task_ret):
    """Return a :class:`Request` subclass specialized for *task* and *pool*.

    Per-task and per-pool values (time limits, ``acks_late``,
    ``pool.apply_async``, event enablement) are bound as closure variables
    so the hot-path methods below avoid repeated attribute lookups for
    every incoming message.
    """
    default_time_limit = task.time_limit
    default_soft_time_limit = task.soft_time_limit
    apply_async = pool.apply_async
    acks_late = task.acks_late
    events = eventer and eventer.enabled

    class Request(base):

        def execute_using_pool(self, pool, **kwargs):
            # Specialized variant of Request.execute_using_pool using the
            # closure variables captured above.
            task_id = self.task_id
            if (self.expires or task_id in revoked_tasks) and self.revoked():
                raise TaskRevokedError(task_id)

            time_limit, soft_time_limit = self.time_limits
            result = apply_async(
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result

        def on_success(self, failed__retval__runtime, **kwargs):
            # Specialized variant of Request.on_success; same semantics,
            # but using the hoisted ``acks_late``/``events`` closure values.
            failed, retval, runtime = failed__retval__runtime
            if failed:
                if isinstance(retval.exception, (
                        SystemExit, KeyboardInterrupt)):
                    raise retval.exception
                return self.on_failure(retval, return_ok=True)
            task_ready(self)

            if acks_late:
                self.acknowledge()

            if events:
                self.send_event(
                    'task-succeeded', result=retval, runtime=runtime,
                )

    return Request