-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
pidbox.py
122 lines (98 loc) · 3.54 KB
/
pidbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Worker Pidbox (remote control)."""
import socket
import threading
from kombu.common import ignore_errors
from kombu.utils.encoding import safe_str
from celery.utils.collections import AttributeDict
from celery.utils.functional import pass1
from celery.utils.log import get_logger
from . import control
# Names exported by ``from <this module> import *``.
__all__ = ('Pidbox', 'gPidbox')

# Module-level logger, plus short aliases for the log levels used below.
logger = get_logger(__name__)
debug = logger.debug
error = logger.error
info = logger.info
class Pidbox:
    """Worker mailbox.

    Owns a mailbox node built from ``c.app.control.mailbox`` and passes
    every received message to that node.
    """

    # Result of ``node.listen()`` while started; reset by :meth:`stop`.
    consumer = None

    def __init__(self, c):
        """Create the mailbox node for consumer *c*.

        The node is named after ``c.hostname``, uses ``control.Panel.data``
        as its handlers, and receives a state object holding the app, the
        hostname, the consumer itself and a ``tset`` callable.
        """
        self.c = c
        self.hostname = c.hostname

        # ``tset`` is ``pass1`` when the controller uses the event loop,
        # and the builtin ``set`` otherwise.
        tset = pass1 if c.controller.use_eventloop else set
        node_state = AttributeDict(
            app=c.app,
            hostname=c.hostname,
            consumer=c,
            tset=tset,
        )
        self.node = c.app.control.mailbox.Node(
            safe_str(c.hostname),
            handlers=control.Panel.data,
            state=node_state,
        )
        self._forward_clock = c.app.clock.forward

    def on_message(self, body, message):
        """Advance the clock and let the node handle one message.

        A ``KeyError`` is logged as an unknown command.  Any other
        exception is logged with its traceback and followed by
        :meth:`reset`.
        """
        # Only tick our own clock forward: clients usually do not carry
        # a valid clock value we could adjust against.
        self._forward_clock()
        try:
            self.node.handle_message(body, message)
        except KeyError as missing:
            error('No such control command: %s', missing)
        except Exception as failure:
            error('Control command error: %r', failure, exc_info=True)
            self.reset()

    def start(self, c):
        """Open a channel on ``c.connection`` and start listening on it."""
        node = self.node
        node.channel = c.connection.channel()
        self.consumer = node.listen(callback=self.on_message)
        self.consumer.on_decode_error = c.on_decode_error

    def on_stop(self):
        """Hook invoked by :meth:`stop` and :meth:`shutdown`; no-op here."""

    def stop(self, c):
        """Run the stop hook, close the node channel, drop the consumer."""
        self.on_stop()
        # ``_close_channel`` returns None here, which clears ``consumer``.
        closed = self._close_channel(c)
        self.consumer = closed

    def reset(self):
        """Stop and then start again, using the stored consumer ``self.c``."""
        owner = self.c
        self.stop(owner)
        self.start(owner)

    def _close_channel(self, c):
        """Close the node's channel via ``ignore_errors`` when one is set."""
        if not self.node or not self.node.channel:
            return None
        ignore_errors(c, self.node.channel.close)
        return None

    def shutdown(self, c):
        """Cancel the consumer, if any, and then stop.

        The final :meth:`stop` is called with the stored ``self.c``,
        not with the *c* argument.
        """
        self.on_stop()
        active = self.consumer
        if active:
            debug('Canceling broadcast consumer...')
            ignore_errors(c, active.cancel)
        self.stop(self.c)
class gPidbox(Pidbox):
    """Worker pidbox (greenlet).

    Instead of listening on the consumer's own connection, :meth:`start`
    spawns :meth:`loop` on the pool; the loop opens a separate read
    connection and drains events from it.
    """

    # Events created by :meth:`loop`: the first asks the loop to exit,
    # the second is set by the loop once it has exited.
    _node_shutdown = None
    _node_stopped = None
    # Number of resets requested so far; compared against by :meth:`loop`.
    _resets = 0

    def start(self, c):
        """Schedule :meth:`loop` through ``c.pool.spawn_n``."""
        c.pool.spawn_n(self.loop, c)

    def on_stop(self):
        """Ask the running loop to exit and wait until it has done so.

        Does nothing when no "stopped" event is currently registered.
        """
        if not self._node_stopped:
            return
        self._node_shutdown.set()
        debug('Waiting for broadcast thread to shutdown...')
        self._node_stopped.wait()
        # Forget both events so that a repeated call becomes a no-op.
        self._node_stopped = None
        self._node_shutdown = None

    def reset(self):
        """Request a reset; :meth:`loop` performs it on its next pass."""
        self._resets += 1

    def _do_reset(self, c, connection):
        """Replace the node channel with a new one and start consuming."""
        self._close_channel(c)
        self.node.channel = connection.channel()
        listener = self.node.listen(callback=self.on_message)
        self.consumer = listener
        listener.consume()

    def loop(self, c):
        """Drain events from a dedicated read connection.

        Runs until the shutdown event is set or ``c.connection`` becomes
        falsy.  Each pass handles at most one pending reset request and
        then waits up to one second for events; a ``socket.timeout``
        while waiting is ignored.  The "stopped" event is always set on
        the way out.
        """
        handled_resets = self._resets
        stop_requested = threading.Event()
        self._node_shutdown = stop_requested
        has_stopped = threading.Event()
        self._node_stopped = has_stopped
        try:
            with c.connection_for_read() as conn:
                info('pidbox: Connected to %s.', conn.as_uri())
                self._do_reset(c, conn)
                while True:
                    if stop_requested.is_set() or not c.connection:
                        break
                    if handled_resets < self._resets:
                        handled_resets += 1
                        self._do_reset(c, conn)
                    try:
                        conn.drain_events(timeout=1.0)
                    except socket.timeout:
                        pass
        finally:
            has_stopped.set()