forked from celery/celery
/
mediator.py
85 lines (62 loc) · 2.36 KB
/
mediator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# -*- coding: utf-8 -*-
"""
celery.worker.mediator
~~~~~~~~~~~~~~~~~~~~~~
The mediator is an internal thread that moves tasks
from an internal :class:`Queue` to the worker pool.
This is only used if rate limits are enabled, as it moves
messages from the rate limited queue (which holds tasks
that are allowed to be processed) to the pool. Disabling
rate limits will also disable this machinery,
and can improve performance.
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import logging
from Queue import Empty
from celery.app import app_or_default
from celery.utils.threads import bgThread
from celery.utils.log import get_logger
from .abstract import StartStopComponent
logger = get_logger(__name__)
class WorkerComponent(StartStopComponent):
name = "worker.mediator"
requires = ("pool", "queues", )
def __init__(self, w, **kwargs):
w.mediator = None
def include_if(self, w):
return w.start_mediator
def create(self, w):
m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,
app=w.app, callback=w.process_task)
return m
class Mediator(bgThread):
#: The task queue, a :class:`~Queue.Queue` instance.
ready_queue = None
#: Callback called when a task is obtained.
callback = None
def __init__(self, ready_queue, callback, app=None, **kw):
self.app = app_or_default(app)
self.ready_queue = ready_queue
self.callback = callback
self._does_debug = logger.isEnabledFor(logging.DEBUG)
super(Mediator, self).__init__()
def body(self):
try:
task = self.ready_queue.get(timeout=1.0)
except Empty:
return
if task.revoked():
return
if self._does_debug:
logger.debug("Mediator: Running callback for task: %s[%s]",
task.name, task.id)
try:
self.callback(task)
except Exception, exc:
logger.error("Mediator callback raised exception %r",
exc, exc_info=True,
extra={"data": {"id": task.id,
"name": task.name,
"hostname": task.hostname}})