-
Notifications
You must be signed in to change notification settings - Fork 39
/
worker.py
201 lines (169 loc) · 6.01 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# -*- coding: utf-8 -*-
"""Module containing worker functionality for the MDP implementation.
For the MDP specification see: http://rfc.zeromq.org/spec:7
"""
__license__ = """
This file is part of MDP.
MDP is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MDP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with MDP. If not, see <http://www.gnu.org/licenses/>.
"""
__author__ = 'Guido Goldstein'
__email__ = 'gst-py@a-nugget.de'
import sys
import time
from pprint import pprint
import zmq
from zmq.eventloop.zmqstream import ZMQStream
from zmq.eventloop.ioloop import IOLoop, DelayedCallback, PeriodicCallback
from domogik.mq.common import split_address
from domogik.mq.message import MQMessage
from domogik.common.configloader import Loader
class MQRep(object):
    """Worker side of the MDP implementation.

    Thin encapsulation of a zmq.DEALER socket wrapped in a ZMQStream on the
    shared IOLoop instance.

    On connect the worker sends a READY message for its service, then sends
    a heartbeat every HB_INTERVAL milliseconds.  Every message received from
    the broker resets the liveness counter; once the counter drops below
    zero the connection is torn down and recreated five seconds later.

    Subclasses override on_mdp_request() to handle incoming requests and
    call reply() to answer them.
    """

    _proto_version = b'MDPW01'

    # Message-type frames exactly as this class puts them on / reads them
    # from the wire.
    _CMD_READY = b'\x01'
    _CMD_REQUEST = b'\x02'
    _CMD_REPLY = b'\x03'
    _CMD_HEARTBEAT = b'\x04'
    _CMD_DISCONNECT = b'\x05'

    # TODO: integrate that into API
    HB_INTERVAL = 1000  # in milliseconds
    HB_LIVENESS = 3     # HBs to miss before connection counts as dead

    def __init__(self, context, service):
        """Initialize the worker and connect it to the broker.

        context is the zmq context to create the socket from.
        service is a byte-string with the service name.

        The endpoint is built from the 'ip' and 'req_rep_port' entries of
        the 'mq' configuration returned by the Loader.
        """
        cfg = Loader('mq')
        my_conf = cfg.load()
        config = dict(my_conf[1])
        self.context = context
        self.endpoint = "tcp://{0}:{1}".format(config['ip'], config['req_rep_port'])
        self.service = service
        self.stream = None
        self._tmo = None
        self.need_handshake = True
        self.ticker = None
        self._delayed_cb = None
        # Define every attribute the other methods read up front, so they
        # exist even before the first request / READY has been processed.
        self.envelope = None
        self.curr_liveness = self.HB_LIVENESS
        self._create_stream()

    def _create_stream(self):
        """Helper to create the socket and the stream.

        Also sends the READY message and starts the heartbeat ticker.
        Used both for the initial connect and as the reconnect callback.
        """
        # When invoked as the reconnect callback, that callback has fired
        # by now; drop the stale reference.
        self._delayed_cb = None
        socket = self.context.socket(zmq.DEALER)
        ioloop = IOLoop.instance()
        self.stream = ZMQStream(socket, ioloop)
        self.stream.on_recv(self._on_message)
        self.stream.socket.setsockopt(zmq.LINGER, 0)
        self.stream.connect(self.endpoint)
        self.ticker = PeriodicCallback(self._tick, self.HB_INTERVAL)
        self._send_ready()
        self.ticker.start()

    def _send_ready(self):
        """Helper method to prepare and send the workers READY message.

        Resets the liveness counter to HB_LIVENESS.
        """
        ready_msg = [b'', self._proto_version, self._CMD_READY, self.service]
        self.stream.send_multipart(ready_msg)
        self.curr_liveness = self.HB_LIVENESS

    def _tick(self):
        """Method called every HB_INTERVAL milliseconds.

        Decrements the liveness counter and sends a heartbeat.  When the
        counter has dropped below zero the connection is shut down and a
        reconnect is scheduled.
        """
        self.curr_liveness -= 1
        self.send_hb()
        if self.curr_liveness >= 0:
            return
        # ouch, connection seems to be dead
        self.shutdown()
        # try to recreate it (delay is in milliseconds)
        self._delayed_cb = DelayedCallback(self._create_stream, 5000)
        self._delayed_cb.start()

    def send_hb(self):
        """Construct and send HB message to broker.
        """
        msg = [b'', self._proto_version, self._CMD_HEARTBEAT]
        self.stream.send_multipart(msg)

    def shutdown(self):
        """Method to deactivate the worker connection completely.

        Cancels a pending reconnect, stops the heartbeat ticker and deletes
        the stream and the underlying socket.  Safe to call repeatedly.
        """
        # A reconnect scheduled by _tick() must not bring the connection
        # back to life after an explicit shutdown.
        if self._delayed_cb:
            self._delayed_cb.stop()
            self._delayed_cb = None
        if self.ticker:
            self.ticker.stop()
            self.ticker = None
        if not self.stream:
            return
        self.stream.socket.close()
        self.stream.close()
        self.stream = None
        # timed_out / connected are not read anywhere in this class; they
        # are kept because code outside it may rely on them.
        self.timed_out = False
        self.need_handshake = True
        self.connected = False

    def reply(self, msg):
        """Send the given message as reply to the last received request.

        msg can either be a byte-string or a list of byte-strings.

        The envelope stored by the last request is consumed, so there can
        be only one reply per request; without a stored envelope an
        AttributeError is raised.
        """
        ## if self.need_handshake:
        ##     raise ConnectionNotReadyError()
        # prepare full message
        to_send = self.envelope
        self.envelope = None
        if isinstance(msg, list):
            to_send.extend(msg)
        else:
            to_send.append(msg)
        self.stream.send_multipart(to_send)

    def _on_message(self, msg):
        """Helper method called on message receive.

        msg is a list w/ the message parts: an empty frame, the protocol
        version, the message type and, for requests, the client envelope
        plus the user message.

        Messages with fewer than three parts are ignored, like messages
        with an unknown type.
        """
        if len(msg) < 3:
            # malformed message: nothing to dispatch, ignored
            return
        # 1st part is empty
        msg.pop(0)
        # 2nd part is protocol version
        # TODO: version check
        msg.pop(0)
        # 3rd part is message type
        msg_type = msg.pop(0)
        # any message resets the liveness counter
        self.need_handshake = False
        self.curr_liveness = self.HB_LIVENESS
        if msg_type == self._CMD_DISCONNECT:
            # reconnect will be triggered by hb timer
            self.curr_liveness = 0
        elif msg_type == self._CMD_REQUEST:
            # remaining parts are the client envelope and the user message
            envelope, msg = split_address(msg)
            envelope.append(b'')
            # remember the full REPLY header for reply()
            self.envelope = [b'', self._proto_version, self._CMD_REPLY] + envelope
            mes = MQMessage()
            mes.set(msg)
            self.on_mdp_request(mes)
        # any other message type (e.g. a heartbeat) only refreshes liveness

    def on_mdp_request(self, msg):
        """Public method called when a request arrived.

        msg is the MQMessage built from the request's user frames.

        Must be overloaded!
        """
        pass
#
### Local Variables:
### buffer-file-coding-system: utf-8
### mode: python
### End: