/
loops.py
123 lines (101 loc) · 4.29 KB
/
loops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""The consumers highly-optimized inner loop."""
from __future__ import absolute_import, unicode_literals
import errno
import socket
from celery import bootsteps
from celery.exceptions import WorkerShutdown, WorkerTerminate, WorkerLostError
from celery.utils.log import get_logger
from . import state
# Names exported by a star-import of this module.
__all__ = ['asynloop', 'synloop']

# pylint: disable=redefined-outer-name
# We cache globals and attribute lookups, so disable this warning.

# Logger for this module, named after ``__name__``.
logger = get_logger(__name__)
def _quick_drain(connection, timeout=0.1):
    """Call ``connection.drain_events`` once, tolerating benign failures.

    An exception raised by the drain is propagated only when it carries
    an ``errno`` attribute that is not ``None`` and differs from
    ``errno.EAGAIN``.  Every other exception is suppressed.

    Arguments:
        connection: Object exposing ``drain_events(timeout=...)``.
        timeout (float): Passed through to ``drain_events``.
    """
    try:
        connection.drain_events(timeout=timeout)
    except Exception as err:  # pylint: disable=broad-except
        code = getattr(err, 'errno', None)
        # Errors without an errno, or with EAGAIN, are ignored.
        if code is None or code == errno.EAGAIN:
            return
        raise
def _enable_amqheartbeats(timer, connection, rate=2.0):
    """Schedule repeated heartbeat checks for ``connection`` on ``timer``.

    Does nothing when ``connection`` is falsy, when the interval returned
    by ``connection.get_heartbeat_interval()`` is falsy, or when
    ``connection.supports_heartbeats`` is falsy.  Otherwise
    ``connection.heartbeat_check`` is registered via
    ``timer.call_repeatedly`` every ``interval / rate`` with ``(rate,)``
    as its arguments.
    """
    if not connection:
        return
    check = connection.heartbeat_check
    interval = connection.get_heartbeat_interval()  # negotiated
    if not interval or not connection.supports_heartbeats:
        return
    timer.call_repeatedly(interval / rate, check, (rate,))
def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop.

    Installs the task handler on ``consumer``, registers ``obj`` and its
    controller with ``hub``, starts consuming, and then steps the
    generator returned by ``hub.create_loop()`` for as long as
    ``blueprint.state`` equals ``bootsteps.RUN`` and ``obj.connection``
    is truthy.  ``hub.reset()`` is always attempted on the way out.

    Arguments:
        obj: Consumer-side object providing ``create_task_handler()``,
            ``controller``, ``register_with_event_loop()``,
            ``on_ready()``, ``restart_count``, ``pool`` and
            ``connection``.
        connection: Broker connection; supplies ``connection_errors``
            and ``transport.driver_type``.
        consumer: Message consumer whose ``on_message`` is set here.
        blueprint: Object whose ``state`` controls loop termination.
        hub: Event loop hub (timer, ``call_soon``, ``create_loop``,
            ``reset``).
        qos: QoS tracker; ``update()`` is called when ``prev`` and
            ``value`` differ.
        heartbeat: Unused in this function.
        clock: Unused in this function.
        hbrate (float): Rate forwarded to ``_enable_amqheartbeats``.

    Raises:
        WorkerLostError: On first start (``obj.restart_count`` falsy)
            when ``obj.pool.did_start_ok()`` is falsy.
        WorkerShutdown: When ``state.should_stop`` is set.
        WorkerTerminate: When ``state.should_terminate`` is set.
    """
    RUN = bootsteps.RUN
    update_qos = qos.update
    errors = connection.connection_errors

    on_task_received = obj.create_task_handler()

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)

    consumer.on_message = on_task_received
    obj.controller.register_with_event_loop(hub)
    obj.register_with_event_loop(hub)
    consumer.consume()
    obj.on_ready()

    # did_start_ok will verify that pool processes were able to start,
    # but this will only work the first time we start, as
    # maxtasksperchild will mess up metrics.
    if not obj.restart_count and not obj.pool.did_start_ok():
        raise WorkerLostError('Could not start worker processes')

    # consumer.consume() may have prefetched up to our
    # limit - drain an event so we're in a clean state
    # prior to starting our event loop.
    if connection.transport.driver_type == 'amqp':
        hub.call_soon(_quick_drain, connection)

    # FIXME: Use loop.run_forever
    # Tried and works, but no time to test properly before release.
    hub.propagate_errors = errors
    loop = hub.create_loop()

    try:
        while blueprint.state == RUN and obj.connection:
            # shutdown if signal handlers told us to.
            should_stop, should_terminate = (
                state.should_stop, state.should_terminate,
            )
            # False == EX_OK, so must use is not False
            if should_stop is not None and should_stop is not False:
                raise WorkerShutdown(should_stop)
            # Each flag must be tested against itself: checking
            # should_stop here would ignore a terminate request while
            # should_stop is False, and would treat
            # should_terminate=False as a request to terminate.
            elif (should_terminate is not None and
                    should_terminate is not False):
                raise WorkerTerminate(should_terminate)

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)
            except StopIteration:
                # The loop generator finished; start a fresh one.
                loop = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)
def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Fallback blocking event loop for transports that don't support AIO.

    Installs the task handler on ``consumer``, starts consuming, and then
    repeatedly runs ``obj.perform_pending_operations()`` followed by
    ``connection.drain_events(timeout=2.0)`` while ``blueprint.state``
    equals ``bootsteps.RUN`` and ``obj.connection`` is truthy.

    ``socket.timeout`` from an iteration is ignored.  Any other
    ``socket.error`` is re-raised only if the blueprint is still in the
    ``RUN`` state.  ``hub``, ``heartbeat``, ``clock`` and ``kwargs`` are
    not used by this function.
    """
    running = bootsteps.RUN
    handle_task = obj.create_task_handler()
    run_pending = obj.perform_pending_operations

    pool_is_green = getattr(obj.pool, 'is_green', False)
    if pool_is_green:
        _enable_amqheartbeats(obj.timer, connection, rate=hbrate)

    consumer.on_message = handle_task
    consumer.consume()
    obj.on_ready()

    while blueprint.state == running and obj.connection:
        state.maybe_shutdown()
        if qos.prev != qos.value:
            qos.update()
        try:
            run_pending()
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            # Nothing was drained within the timeout; go around again.
            continue
        except socket.error:
            # Swallow the error unless we are still meant to be running.
            if blueprint.state == running:
                raise