Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

It's working but messy

  • Loading branch information...
commit 77d6a1556cc349291abdbac13756acb8adc41d9b 1 parent b6daa31
@ask authored
View
8 celery/abstract.py
def bind_component(self, name, parent, **kwargs):
    """Bind component to parent object and this namespace.

    The bound component also inherits this namespace's ``logger``
    and ``app`` attributes.
    """
    component = self[name](parent, **kwargs)
    component.namespace = self
    component.logger = self.logger
    component.app = self.app
    return component
def import_module(self, module):
@@ -169,6 +171,12 @@ class Component(object):
#: This provides the default for :meth:`include_if`.
enabled = True
+ #: The current app (set by namespace when bound)
+ app = None
+
+ #: Logger (set by namespace when bound)
+ logger = None
+
def __init__(self, parent, **kwargs):
pass
View
511 celery/worker/consumer.py
@@ -10,128 +10,30 @@
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
-
-* :meth:`~Consumer.start` is an infinite loop, which only iterates
- again if the connection is lost. For each iteration (at start, or if the
- connection is lost) it calls :meth:`~Consumer.reset_connection`,
- and starts the consumer by calling :meth:`~Consumer.consume_messages`.
-
-* :meth:`~Consumer.reset_connection`, clears the internal queues,
- establishes a new connection to the broker, sets up the task
- consumer (+ QoS), and the broadcast remote control command consumer.
-
- Also if events are enabled it configures the event dispatcher and starts
- up the heartbeat thread.
-
-* Finally it can consume messages. :meth:`~Consumer.consume_messages`
- is simply an infinite loop waiting for events on the AMQP channels.
-
- Both the task consumer and the broadcast consumer uses the same
- callback: :meth:`~Consumer.receive_message`.
-
-* So for each message received the :meth:`~Consumer.receive_message`
- method is called, this checks the payload of the message for either
- a `task` key or a `control` key.
-
- If the message is a task, it verifies the validity of the message
- converts it to a :class:`celery.worker.job.Request`, and sends
- it to :meth:`~Consumer.on_task`.
-
- If the message is a control command the message is passed to
- :meth:`~Consumer.on_control`, which in turn dispatches
- the control command using the control dispatcher.
-
- It also tries to handle malformed or invalid messages properly,
- so the worker doesn't choke on them and die. Any invalid messages
- are acknowledged immediately and logged, so the message is not resent
- again, and again.
-
-* If the task has an ETA/countdown, the task is moved to the `eta_schedule`
- so the :class:`timer2.Timer` can schedule it at its
- deadline. Tasks without an eta are moved immediately to the `ready_queue`,
- so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
- to be sent to the pool.
-
-* When a task with an ETA is received the QoS prefetch count is also
- incremented, so another message can be reserved. When the ETA is met
- the prefetch count is decremented again, though this cannot happen
- immediately because amqplib doesn't support doing broker requests
- across threads. Instead the current prefetch count is kept as a
- shared counter, so as soon as :meth:`~Consumer.consume_messages`
- detects that the value has changed it will send out the actual
- QoS event to the broker.
-
-* Notice that when the connection is lost all internal queues are cleared
- because we can no longer ack the messages reserved in memory.
- However, this is not dangerous as the broker will resend them
- to another worker when the channel is closed.
-
-* **WARNING**: :meth:`~Consumer.stop` does not close the connection!
- This is because some pre-acked messages may be in processing,
- and they need to be finished before the channel is closed.
- For celeryd this means the pool must finish the tasks it has acked
- early, *then* close the connection.
-
"""
from __future__ import absolute_import
from __future__ import with_statement
-import logging
import socket
-import threading
-import warnings
-
-from kombu.utils.encoding import safe_repr
from .. import abstract
from ..app import app_or_default
-from ..datastructures import AttributeDict
-from ..exceptions import InvalidTaskError
from ..utils import timer2
from ..utils.functional import noop
-from . import state
-from .control import Panel
-from .heartbeat import Heart
-
RUN = 0x1
CLOSE = 0x2
-
-#: Prefetch count can't exceed short.
-PREFETCH_COUNT_MAX = 0xFFFF
-
-#: Error message for when an unregistered task is received.
-UNKNOWN_TASK_ERROR = """\
-Received unregistered task of type %s.
-The message has been ignored and discarded.
-
-Did you remember to import the module containing this task?
-Or maybe you are using relative imports?
-Please see http://bit.ly/gLye1c for more information.
-
-The full contents of the message body was:
-%s
-"""
-
-#: Error message for when an invalid task message is received.
-INVALID_TASK_ERROR = """\
-Received invalid task message: %s
-The message has been ignored and discarded.
-
-Please ensure your message conforms to the task
-message protocol as described here: http://bit.ly/hYj41y
-
-The full contents of the message body was:
-%s
-"""
-
-MESSAGE_REPORT_FMT = """\
-body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
-"""
+STOP = 0x3
class Namespace(abstract.Namespace):
    """Boot-step namespace for the consumer.

    The built-in boot steps live in separate modules and register
    their components against this namespace when imported.
    """
    name = "consumer"
    builtin_boot_steps = ("celery.worker.tasks",
                          "celery.worker.pidbox",
                          "celery.worker.heartbeat")

    def modules(self):
        """Return the modules to import for this namespace."""
        return self.builtin_boot_steps
class Component(abstract.StartStopComponent):
@@ -153,7 +55,7 @@ def create(self, w):
return c
-class Events(abstract.Component):
+class Events(abstract.StartStopComponent):
name = "consumer.events"
#requires = ("connection", )
@@ -161,7 +63,9 @@ def __init__(self, c):
c.event_dispatcher = None
def create(self, c):
- print("CREATE EVENTS")
+ return self
+
+ def start(self, c):
prev = c.event_dispatcher
c.event_dispatcher = c.app.events.Dispatcher(c.connection,
hostname=c.hostname,
@@ -171,101 +75,14 @@ def create(self, c):
c.event_dispatcher.copy_buffer(prev)
c.event_dispatcher.flush()
-
-class Heartbeat(abstract.StartStopComponent):
- """The thread that sends event heartbeats at regular intervals.
-
- The heartbeats are used by monitors to detect that a worker
- went offline."""
-
- name = "consumer.heartbeat"
- requires = ("events", )
-
- def __init__(self, c):
- c.heart = None
-
- def create(self, c):
- print("CREATE HB")
- heart = c.heart = Heart(c.priority_timer, c.event_dispatcher)
- return heart
-
-
-class QoS(object):
- """Quality of Service for Channel.
-
- For thread-safe increment/decrement of a channels prefetch count value.
-
- :param consumer: A :class:`kombu.messaging.Consumer` instance.
- :param initial_value: Initial prefetch count value.
- :param logger: Logger used to log debug messages.
-
- """
- prev = None
-
- def __init__(self, consumer, initial_value, logger):
- self.consumer = consumer
- self.logger = logger
- self._mutex = threading.RLock()
- self.value = initial_value
-
- def increment(self, n=1):
- """Increment the current prefetch count value by n."""
- with self._mutex:
- if self.value:
- new_value = self.value + max(n, 0)
- self.value = self.set(new_value)
- return self.value
-
- def _sub(self, n=1):
- assert self.value - n > 1
- self.value -= n
-
- def decrement(self, n=1):
- """Decrement the current prefetch count value by n."""
- with self._mutex:
- if self.value:
- self._sub(n)
- self.set(self.value)
- return self.value
-
- def decrement_eventually(self, n=1):
- """Decrement the value, but do not update the qos.
-
- The MainThread will be responsible for calling :meth:`update`
- when necessary.
-
- """
- with self._mutex:
- if self.value:
- self._sub(n)
-
- def set(self, pcount):
- """Set channel prefetch_count setting."""
- if pcount != self.prev:
- new_value = pcount
- if pcount > PREFETCH_COUNT_MAX:
- self.logger.warning("QoS: Disabled: prefetch_count exceeds %r",
- PREFETCH_COUNT_MAX)
- new_value = 0
- self.logger.debug("basic.qos: prefetch_count->%s", new_value)
- self.consumer.qos(prefetch_count=new_value)
- self.prev = pcount
- return pcount
-
- def update(self):
- """Update prefetch count with current value."""
- with self._mutex:
- return self.set(self.value)
+ def stop(self, c):
+ if c.event_dispatcher:
+ self.logger.debug("Shutting down event dispatcher...")
+ ev, c.event_dispatcher = c.event_dispatcher, None
+ ev.close()
class Consumer(object):
- """Listen for messages received from the broker and
- move them to the ready queue for task processing.
-
- :param ready_queue: See :attr:`ready_queue`.
- :param eta_schedule: See :attr:`eta_schedule`.
-
- """
#: The queue that holds tasks ready for immediate processing.
ready_queue = None
@@ -284,29 +101,12 @@ class Consumer(object):
#: The current hostname. Defaults to the system hostname.
hostname = None
- #: Initial QoS prefetch count for the task channel.
- initial_prefetch_count = 0
-
- #: A :class:`celery.events.EventDispatcher` for sending events.
- event_dispatcher = None
-
#: The logger instance to use. Defaults to the default Celery logger.
logger = None
#: The broker connection.
connection = None
- #: The consumer used to consume task messages.
- task_consumer = None
-
- #: The consumer used to consume broadcast commands.
- broadcast_consumer = None
-
- #: The process mailbox (kombu pidbox node).
- pidbox_node = None
- _pidbox_node_shutdown = None # used for greenlets
- _pidbox_node_stopped = None # used for greenlets
-
#: The current worker pool instance.
pool = None
@@ -323,41 +123,22 @@ def __init__(self, ready_queue, eta_schedule, logger,
priority_timer=None, controller=None):
self.app = app_or_default(app)
self.connection = None
- self.task_consumer = None
self.controller = controller
- self.broadcast_consumer = None
self.ready_queue = ready_queue
self.eta_schedule = eta_schedule
self.send_events = send_events
self.init_callback = init_callback
self.logger = logger
self.hostname = hostname or socket.gethostname()
- self.initial_prefetch_count = initial_prefetch_count
self.pool = pool
self.priority_timer = priority_timer or timer2.default_timer
- pidbox_state = AttributeDict(app=self.app,
- logger=logger,
- hostname=self.hostname,
- listener=self, # pre 2.2
- consumer=self)
- self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
- state=pidbox_state,
- handlers=Panel.data)
conninfo = self.app.broker_connection()
self.connection_errors = conninfo.connection_errors
self.channel_errors = conninfo.channel_errors
- self._does_info = self.logger.isEnabledFor(logging.INFO)
- self.strategies = {}
-
self.components = []
self.namespace = None
- def update_strategies(self):
- S = self.strategies
- for task in self.app.tasks.itervalues():
- S[task.name] = task.start_strategy(self.app, self)
-
def start(self):
"""Start the consumer.
@@ -366,132 +147,35 @@ def start(self):
consuming messages.
"""
+ reset = self.reset_connection
self.init_callback(self)
while self._state != CLOSE:
try:
- self.reset_connection()
- self.consume_messages()
+ reset()
+ drain_events = self.connection.drain_events
+
+ while self._state != CLOSE and self.connection:
+ #if qos.prev != qos.value:
+ # qos.update()
+ try:
+ drain_events(timeout=1)
+ except socket.timeout:
+ pass
+ except socket.error:
+ if self._state != CLOSE:
+ raise
except self.connection_errors + self.channel_errors:
self.logger.error("Consumer: Connection to broker lost."
+ " Trying to re-establish the connection...",
exc_info=True)
- def consume_messages(self):
- """Consume messages forever (or until an exception is raised)."""
- self._debug("Starting message consumer...")
- self.task_consumer.consume()
- self._debug("Ready to accept tasks!")
-
- while self._state != CLOSE and self.connection:
- if self.qos.prev != self.qos.value:
- self.qos.update()
- try:
- self.connection.drain_events(timeout=1)
- except socket.timeout:
- pass
- except socket.error:
- if self._state != CLOSE:
- raise
-
- def on_task(self, task):
- """Handle received task.
-
- If the task has an `eta` we enter it into the ETA schedule,
- otherwise we move it the ready queue for immediate processing.
-
- """
-
- if task.revoked():
- return
-
- if self._does_info:
- self.logger.info("Got task from broker: %s", task.shortinfo())
-
- if self.event_dispatcher.enabled:
- self.event_dispatcher.send("task-received", uuid=task.id,
- name=task.name, args=safe_repr(task.args),
- kwargs=safe_repr(task.kwargs),
- retries=task.request_dict.get("retries", 0),
- eta=task.eta and task.eta.isoformat(),
- expires=task.expires and task.expires.isoformat())
-
- if task.eta:
- try:
- eta = timer2.to_timestamp(task.eta)
- except OverflowError, exc:
- self.logger.error(
- "Couldn't convert eta %s to timestamp: %r. Task: %r",
- task.eta, exc, task.info(safe=True),
- exc_info=True)
- task.acknowledge()
- else:
- self.qos.increment()
- self.eta_schedule.apply_at(eta,
- self.apply_eta_task, (task, ))
- else:
- state.task_reserved(task)
- self.ready_queue.put(task)
-
- def on_control(self, body, message):
- """Process remote control command message."""
- try:
- self.pidbox_node.handle_message(body, message)
- except KeyError, exc:
- self.logger.error("No such control command: %s", exc)
- except Exception, exc:
- self.logger.error(
- "Error occurred while handling control command: %r",
- exc, exc_info=True)
- self.reset_pidbox_node()
-
- def apply_eta_task(self, task):
- """Method called by the timer to apply a task with an
- ETA/countdown."""
- state.task_reserved(task)
- self.ready_queue.put(task)
- self.qos.decrement_eventually()
-
- def _message_report(self, body, message):
- return MESSAGE_REPORT_FMT % (safe_repr(body),
- safe_repr(message.content_type),
- safe_repr(message.content_encoding),
- safe_repr(message.delivery_info))
-
- def receive_message(self, body, message):
- """Handles incoming messages.
-
- :param body: The message body.
- :param message: The kombu message object.
-
- """
- try:
- name = body["task"]
- except (KeyError, TypeError):
- warnings.warn(RuntimeWarning(
- "Received and deleted unknown message. Wrong destination?!? \
- the full contents of the message body was: %s" % (
- self._message_report(body, message), )))
- message.reject_log_error(self.logger, self.connection_errors)
- return
-
- try:
- self.strategies[name](message, body, message.ack_log_error)
- except KeyError, exc:
- self.logger.error(UNKNOWN_TASK_ERROR, exc, safe_repr(body),
- exc_info=True)
- message.reject_log_error(self.logger, self.connection_errors)
- except InvalidTaskError, exc:
- self.logger.error(INVALID_TASK_ERROR, str(exc), safe_repr(body),
- exc_info=True)
- message.reject_log_error(self.logger, self.connection_errors)
-
- def maybe_conn_error(self, fun):
+ def maybe_conn_error(self, fun, *args, **kwargs):
"""Applies function but ignores any connection or channel
errors raised."""
try:
- fun()
+ fun(*args, **kwargs)
except (AttributeError, ) + \
self.connection_errors + \
self.channel_errors:
@@ -499,123 +183,25 @@ def maybe_conn_error(self, fun):
def close_connection(self):
"""Closes the current broker connection and all open channels."""
-
# We must set self.connection to None here, so
# that the green pidbox thread exits.
connection, self.connection = self.connection, None
- if self.task_consumer:
- self._debug("Closing consumer channel...")
- self.task_consumer = \
- self.maybe_conn_error(self.task_consumer.close)
-
- self.stop_pidbox_node()
-
if connection:
self._debug("Closing broker connection...")
self.maybe_conn_error(connection.close)
- def stop_consumers(self, close_connection=True):
- """Stop consuming tasks and broadcast commands, also stops
- the heartbeat thread and event dispatcher.
-
- :keyword close_connection: Set to False to skip closing the broker
- connection.
-
- """
- if not self._state == RUN:
- return
-
- for component in reversed(self.components):
- component.stop()
-
- self._debug("Cancelling task consumer...")
- if self.task_consumer:
- self.maybe_conn_error(self.task_consumer.cancel)
-
- if self.event_dispatcher:
- self._debug("Shutting down event dispatcher...")
- self.event_dispatcher = \
- self.maybe_conn_error(self.event_dispatcher.close)
-
- self._debug("Cancelling broadcast consumer...")
- if self.broadcast_consumer:
- self.maybe_conn_error(self.broadcast_consumer.cancel)
-
- if close_connection:
- self.close_connection()
-
- def on_decode_error(self, message, exc):
- """Callback called if an error occurs while decoding
- a message received.
-
- Simply logs the error and acknowledges the message so it
- doesn't enter a loop.
-
- :param message: The message with errors.
- :param exc: The original exception instance.
-
- """
- self.logger.critical(
- "Can't decode message body: %r (type:%r encoding:%r raw:%r')",
- exc, message.content_type, message.content_encoding,
- safe_repr(message.body))
- message.ack()
-
- def reset_pidbox_node(self):
- """Sets up the process mailbox."""
- self.stop_pidbox_node()
- # close previously opened channel if any.
- if self.pidbox_node.channel:
- try:
- self.pidbox_node.channel.close()
- except self.connection_errors + self.channel_errors:
- pass
-
- if self.pool is not None and self.pool.is_green:
- return self.pool.spawn_n(self._green_pidbox_node)
- self.pidbox_node.channel = self.connection.channel()
- self.broadcast_consumer = self.pidbox_node.listen(
- callback=self.on_control)
- self.broadcast_consumer.consume()
-
- def stop_pidbox_node(self):
- if self._pidbox_node_stopped:
- self._pidbox_node_shutdown.set()
- self._debug("Waiting for broadcast thread to shutdown...")
- self._pidbox_node_stopped.wait()
- self._pidbox_node_stopped = self._pidbox_node_shutdown = None
- elif self.broadcast_consumer:
- self._debug("Closing broadcast channel...")
- self.broadcast_consumer = \
- self.maybe_conn_error(self.broadcast_consumer.channel.close)
-
- def _green_pidbox_node(self):
- """Sets up the process mailbox when running in a greenlet
- environment."""
- # THIS CODE IS TERRIBLE
- # Luckily work has already started rewriting the Consumer for 3.0.
- self._pidbox_node_shutdown = threading.Event()
- self._pidbox_node_stopped = threading.Event()
- try:
- with self._open_connection() as conn:
- self.pidbox_node.channel = conn.default_channel
- self.broadcast_consumer = self.pidbox_node.listen(
- callback=self.on_control)
- with self.broadcast_consumer:
- while not self._pidbox_node_shutdown.isSet():
- try:
- conn.drain_events(timeout=1.0)
- except socket.timeout:
- pass
- finally:
- self._pidbox_node_stopped.set()
+ def stop_consumers(self):
+ if self._state != STOP:
+ print("STOP CONSUMERS: %r" % (self._state == RUN, ))
+ for component in reversed(self.components):
+ print("STOPPING COMPONENT: %r" % (component, ))
+ self.maybe_conn_error(component.stop, self)
def reset_connection(self):
"""Re-establish the broker connection and set up consumers,
heartbeat and the event dispatcher."""
self._debug("Re-establishing connection to the broker...")
- self.stop_consumers()
# Clear internal queues to get rid of old messages.
# They can't be acked anyway, as a delivery tag is specific
@@ -626,18 +212,6 @@ def reset_connection(self):
# Re-establish the broker connection and setup the task consumer.
self.connection = self._open_connection()
self._debug("Connection established.")
- self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
- on_decode_error=self.on_decode_error)
- # QoS: Reset prefetch window.
- self.qos = QoS(self.task_consumer,
- self.initial_prefetch_count, self.logger)
- self.qos.update()
-
- # receive_message handles incoming messages.
- self.task_consumer.register_callback(self.receive_message)
-
- # Setup the process mailbox.
- self.reset_pidbox_node()
self.components = []
self.namespace = Namespace(app=self.app,
@@ -645,12 +219,8 @@ def reset_connection(self):
print("STARTING COMP: %r" % (self.components, ))
for component in self.components:
print("STARTING: %r" % (component, ))
- component.start()
+ component.start(self)
- # reload all task's execution strategies.
- self.update_strategies()
-
- # We're back!
self._state = RUN
def _open_connection(self):
@@ -689,7 +259,8 @@ def stop(self):
# anymore.
self._state = CLOSE
self._debug("Stopping consumers...")
- self.stop_consumers(close_connection=False)
+ self.stop_consumers()
+ self._state = STOP
@property
def info(self):
View
19 celery/worker/heartbeat.py
@@ -12,9 +12,28 @@
"""
from __future__ import absolute_import
+from .. import abstract
+
from .state import SOFTWARE_INFO
class Component(abstract.StartStopComponent):
    """Boot step owning the heartbeat :class:`Heart` thread.

    Requires the events component, as heartbeats are sent via the
    event dispatcher.
    """
    name = "consumer.heartbeat"
    requires = ("events", )
    #: Current :class:`Heart` instance (set by :meth:`start`).
    heart = None

    def create(self, c):
        return self

    def start(self, c):
        self.heart = Heart(c.priority_timer, c.event_dispatcher)
        self.heart.start()

    def stop(self, c):
        if self.heart is not None:
            self.heart.stop()
+
+
class Heart(object):
"""Timer sending heartbeats at regular intervals.
View
108 celery/worker/pidbox.py
@@ -0,0 +1,108 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import socket
+import threading
+
+from .. import abstract
+from ..datastructures import AttributeDict
+
+from .control import Panel
+
+
class Component(abstract.StartStopComponent):
    """Boot step starting/stopping the remote-control :class:`Pidbox`."""
    name = "consumer.pidbox"
    requires = ("heartbeat", )

    def __init__(self, c, pool=None, **kwargs):
        self.pool = pool

    def create(self, c):
        return self

    def start(self, c):
        # ``listener`` is kept for pre-2.2 compatibility.
        state = AttributeDict(app=self.app,
                              logger=self.logger,
                              hostname=c.hostname,
                              listener=c,
                              consumer=c)
        self.pidbox = Pidbox(connection=c.connection,
                             app=self.app, logger=self.logger,
                             hostname=c.hostname, state=state)
        self.pidbox.start()

    def stop(self, c):
        self.pidbox.stop()
+
+
+class Pidbox(object):
+ consumer = None
+
+ def __init__(self, connection, app=None, hostname=None, state=None,
+ logger=None, handlers=None):
+ self.app = app
+ self.connection = connection
+ self.logger = logger
+ self.errors = connection.connection_errors + connection.channel_errors
+ self.node = self.app.control.mailbox.Node(hostname, state=state,
+ handlers=Panel.data)
+
+ def reset(self):
+ self.stop()
+ self.start()
+
+ def start(self):
+ # close previously opened channel if any.
+ if self.node.channel:
+ try:
+ self.node.channel.close()
+ except self.errors:
+ pass
+
+ self.node.channel = self.connection.channel()
+ self.consumer = self.node.listen(callback=self.on_control)
+ self.consumer.consume()
+
+ def stop(self):
+ consumer, self.consumer = self.consumer, None
+ if consumer:
+ self.logger.debug("Closing broadcast channel...")
+ consumer.cancel()
+ consumer.channel.close()
+
+ def on_control(self, body, message):
+ """Process remote control command message."""
+ try:
+ self.node.handle_message(body, message)
+ except KeyError, exc:
+ self.logger.error("No such control command: %s", exc)
+ except Exception, exc:
+ self.logger.error(
+ "Error occurred while handling control command: %r",
+ exc, exc_info=True)
+ self.reset()
+
+
class GreenPidbox(Pidbox):
    """Pidbox variant for greenlet pools.

    Runs the broadcast consumer loop inside the greenlet that calls
    :meth:`start`, using events to coordinate shutdown with
    :meth:`stop`.
    """
    _stopped = _shutdown = None

    def stop(self):
        self._shutdown.set()
        # Fix: ``self._debug`` does not exist on this class; use the
        # logger directly (previously raised AttributeError at shutdown).
        self.logger.debug("Waiting for broadcast thread to shutdown...")
        self._stopped.wait()
        self._stopped = self._shutdown = None

    def start(self):
        self._shutdown = threading.Event()
        self._stopped = threading.Event()
        try:
            # Use a clone so this greenlet owns its own connection.
            with self.connection.clone() as conn:
                self.node.channel = conn.default_channel
                self.consumer = self.node.listen(callback=self.on_control)
                with self.consumer:
                    while not self._shutdown.isSet():
                        try:
                            conn.drain_events(timeout=1.0)
                        except socket.timeout:
                            pass
        finally:
            # Always signal, so stop() never blocks forever.
            self._stopped.set()
View
72 celery/worker/qos.py
@@ -0,0 +1,72 @@
import threading

#: Prefetch count can't exceed short.
PREFETCH_COUNT_MAX = 0xFFFF


class QoS(object):
    """Quality of Service for Channel.

    For thread-safe increment/decrement of a channels prefetch count value.

    :param consumer: A :class:`kombu.messaging.Consumer` instance.
    :param initial_value: Initial prefetch count value.
    :param logger: Logger used to log debug messages.

    """
    #: Last prefetch count actually applied (``None`` until first set).
    prev = None

    def __init__(self, consumer, initial_value, logger):
        self.consumer = consumer
        self.logger = logger
        self._mutex = threading.RLock()
        self.value = initial_value

    def increment(self, n=1):
        """Increment the current prefetch count value by n."""
        with self._mutex:
            if self.value:
                self.value = self.set(self.value + max(n, 0))
            return self.value

    def _sub(self, n=1):
        # NOTE(review): this keeps the value strictly above 1, not
        # above 0 — presumably because a prefetch count of 0 disables
        # QoS entirely; confirm before changing.
        assert self.value - n > 1
        self.value -= n

    def decrement(self, n=1):
        """Decrement the current prefetch count value by n."""
        with self._mutex:
            if self.value:
                self._sub(n)
                self.set(self.value)
            return self.value

    def decrement_eventually(self, n=1):
        """Decrement the value, but do not update the qos.

        The MainThread will be responsible for calling :meth:`update`
        when necessary.

        """
        with self._mutex:
            if self.value:
                self._sub(n)

    def set(self, pcount):
        """Set channel prefetch_count setting."""
        if pcount == self.prev:
            return pcount
        applied = pcount
        if pcount > PREFETCH_COUNT_MAX:
            self.logger.warning("QoS: Disabled: prefetch_count exceeds %r",
                                PREFETCH_COUNT_MAX)
            # 0 disables prefetch limiting on the channel.
            applied = 0
        self.logger.debug("basic.qos: prefetch_count->%s", applied)
        self.consumer.qos(prefetch_count=applied)
        self.prev = pcount
        return pcount

    def update(self):
        """Update prefetch count with current value."""
        with self._mutex:
            return self.set(self.value)
View
2  celery/worker/strategy.py
@@ -6,7 +6,7 @@
def default(task, app, consumer):
logger = consumer.logger
hostname = consumer.hostname
- eventer = consumer.event_dispatcher
+ eventer = consumer.eventer
Req = Request
handle = consumer.on_task
connection_errors = consumer.connection_errors
View
213 celery/worker/tasks.py
@@ -0,0 +1,213 @@
+from __future__ import absolute_import
+
+import logging
+import warnings
+
+from kombu.utils.encoding import safe_repr
+
+from .. import abstract
+from ..exceptions import InvalidTaskError
+from ..utils import timer2
+
+from . import state
+from .qos import QoS
+
+#: Error message for when an unregistered task is received.
+UNKNOWN_TASK_ERROR = """\
+Received unregistered task of type %s.
+The message has been ignored and discarded.
+
+Did you remember to import the module containing this task?
+Or maybe you are using relative imports?
+Please see http://bit.ly/gLye1c for more information.
+
+The full contents of the message body was:
+%s
+"""
+
+#: Error message for when an invalid task message is received.
+INVALID_TASK_ERROR = """\
+Received invalid task message: %s
+The message has been ignored and discarded.
+
+Please ensure your message conforms to the task
+message protocol as described here: http://bit.ly/hYj41y
+
+The full contents of the message body was:
+%s
+"""
+
+MESSAGE_REPORT_FMT = """\
+body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
+"""
+
+
class ConsumerComponent(abstract.StartStopComponent):
    """Boot step that creates and controls the :class:`TaskConsumer`.

    :keyword initial_prefetch_count: Initial QoS prefetch count for the
        task channel.

    """
    name = "consumer.tasks"
    requires = ("heartbeat", )

    def __init__(self, c, initial_prefetch_count=2, **kwargs):
        self.initial_prefetch_count = initial_prefetch_count

    def create(self, c):
        return self

    def start(self, c):
        c.task_consumer = TaskConsumer(c.connection, hostname=c.hostname,
                                eventer=c.event_dispatcher,
                                prefetch_count=self.initial_prefetch_count,
                                logger=self.logger, app=self.app,
                                ready_queue=c.ready_queue,
                                eta_schedule=c.eta_schedule)
        c.task_consumer.start()

    def stop(self, c):
        # Robustness: stop() can run even if start() never did
        # (previously raised AttributeError in that case).
        consumer = getattr(c, "task_consumer", None)
        if consumer is not None:
            consumer.stop()
+
+
+class TaskConsumer(object):
+
+ def __init__(self, connection, hostname=None, eventer=None,
+ prefetch_count=None, logger=None, app=None,
+ ready_queue=None, eta_schedule=None):
+ self.app = app
+ self.connection = connection
+ self.hostname = hostname
+ self.eventer = eventer
+ self.prefetch_count = prefetch_count
+ self.logger = logger
+ self.ready_queue = ready_queue
+ self.eta_schedule = eta_schedule
+ self.strategies = {}
+ self.connection_errors = self.connection.connection_errors
+ self._does_info = self.logger.isEnabledFor(logging.INFO)
+
+ self._consumer = None
+ self.qos = None
+
+ def start(self, *args):
+ print("STARTING CONSUMER")
+ self.update_strategies()
+ self._consumer = self.app.amqp.get_task_consumer(self.connection,
+ on_decode_error=self.on_decode_error)
+ # QoS: Reset prefetch window.
+ self.qos = QoS(self._consumer, self.prefetch_count, self.logger)
+ self.qos.update()
+
+ # receive_message handles incoming messages.
+ self._consumer.register_callback(self.receive_message)
+ self._consumer.consume()
+
+ def stop(self, *args):
+ if self._consumer:
+ self.logger.debug("Cancelling task consumer...")
+ try:
+ self._consumer.cancel()
+
+ print("STOPPING CONSUMER")
+ self.logger.debug("Closing consumer channel...")
+ self._consumer.close()
+ finally:
+ self._consumer = None
+
+ def update_strategies(self):
+ self.strategies.update(dict((task.name,
+ task.start_strategy(self.app, self))
+ for task in self.app.tasks.itervalues()))
+
+ def on_decode_error(self, message, exc):
+ """Callback called if an error occurs while decoding
+ a message received.
+
+ Simply logs the error and acknowledges the message so it
+ doesn't enter a loop.
+
+ :param message: The message with errors.
+ :param exc: The original exception instance.
+
+ """
+ self.logger.critical(
+ "Can't decode message body: %r (type:%r encoding:%r raw:%r')",
+ exc, message.content_type, message.content_encoding,
+ safe_repr(message.body))
+ message.ack()
+
+ def receive_message(self, body, message):
+ """Handles incoming messages.
+
+ :param body: The message body.
+ :param message: The kombu message object.
+
+ """
+ try:
+ name = body["task"]
+ except (KeyError, TypeError):
+ warnings.warn(RuntimeWarning(
+ "Received and deleted unknown message. Wrong destination?!? \
+ the full contents of the message body was: %s" % (
+ self._message_report(body, message), )))
+ message.reject_log_error(self.logger, self.connection_errors)
+ return
+
+ try:
+ self.strategies[name](message, body, message.ack_log_error)
+ except KeyError, exc:
+ self.logger.error(UNKNOWN_TASK_ERROR, exc, safe_repr(body),
+ exc_info=True)
+ message.reject_log_error(self.logger, self.connection_errors)
+ except InvalidTaskError, exc:
+ self.logger.error(INVALID_TASK_ERROR, str(exc), safe_repr(body),
+ exc_info=True)
+ message.reject_log_error(self.logger, self.connection_errors)
+
+ def _message_report(self, body, message):
+ return MESSAGE_REPORT_FMT % (safe_repr(body),
+ safe_repr(message.content_type),
+ safe_repr(message.content_encoding),
+ safe_repr(message.delivery_info))
+
+ def on_task(self, task):
+ """Handle received task.
+
+ If the task has an `eta` we enter it into the ETA schedule,
+ otherwise we move it the ready queue for immediate processing.
+
+ """
+
+ if task.revoked():
+ return
+
+ if self._does_info:
+ self.logger.info("Got task from broker: %s", task.shortinfo())
+
+ if self.eventer.enabled:
+ self.eventer.send("task-received", uuid=task.id,
+ name=task.name, args=safe_repr(task.args),
+ kwargs=safe_repr(task.kwargs),
+ retries=task.request_dict.get("retries", 0),
+ eta=task.eta and task.eta.isoformat(),
+ expires=task.expires and task.expires.isoformat())
+
+ if task.eta:
+ try:
+ eta = timer2.to_timestamp(task.eta)
+ except OverflowError, exc:
+ self.logger.error(
+ "Couldn't convert eta %s to timestamp: %r. Task: %r",
+ task.eta, exc, task.info(safe=True),
+ exc_info=True)
+ task.acknowledge()
+ else:
+ self.qos.increment()
+ self.eta_schedule.apply_at(eta,
+ self.apply_eta_task, (task, ))
+ else:
+ state.task_reserved(task)
+ self.ready_queue.put(task)
+
+ def apply_eta_task(self, task):
+ """Method called by the timer to apply a task with an
+ ETA/countdown."""
+ state.task_reserved(task)
+ self.ready_queue.put(task)
+ self.qos.decrement_eventually()
Please sign in to comment.
Something went wrong with that request. Please try again.