-
Notifications
You must be signed in to change notification settings - Fork 100
/
zmqstreamserver.py
202 lines (168 loc) · 6.91 KB
/
zmqstreamserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import sys
import os
import traceback
import subprocess
from optparse import OptionParser
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
from tornado import httpserver, web, websocket
debug = True
NAME_SIZE = 256 # this must agree with NAME_SIZE in Model.js
def DEBUG(msg):
if debug:
print '<<<' + str(os.getpid()) + '>>> ZMQStreamServer --', msg
sys.stdout.flush()
def make_unicode(content):
if type(content) == str:
# Ignore errors even if the string is not proper UTF-8 or has
# broken marker bytes.
# Python built-in function unicode() can do this.
content = unicode(content, "utf-8", errors="ignore")
else:
# Assume the content object has proper __unicode__() method
content = unicode(content)
return content
class ZMQStreamHandler(websocket.WebSocketHandler):
''' A handler that forwards output from a ZMQStream to a WebSocket.
'''
def initialize(self, addr):
self.addr = addr
self.websocket_closed = False
def open(self):
stream = None
try:
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(self.addr)
socket.setsockopt(zmq.SUBSCRIBE, '')
stream = ZMQStream(socket)
except Exception, err:
exc_type, exc_value, exc_traceback = sys.exc_info()
print 'ZMQStreamHandler ERROR getting ZMQ stream:', err
traceback.print_exception(exc_type, exc_value, exc_traceback)
if stream and not stream.closed():
stream.close()
else:
stream.on_recv(self._write_message)
def _write_message(self, message):
if self.websocket_closed:
print 'ZMQStreamHandler message received after websocket closed'
return
if len(message) == 1: # assume message[0] is a json string
binary = False
try:
message = message[0]
except Exception as err:
exc_type, exc_value, exc_traceback = sys.exc_info
print 'ZMQStreamHandler ERROR parsing message:', str(message), err
traceback.print_exception(exc_type, exc_value, exc_traceback)
return
elif len(message) == 2: # it's a msg of the form [topic, binary_value]
binary = True
try:
if len(message[0]) > NAME_SIZE:
raise RuntimeError("topic field of message is longer than %d characters" % NAME_SIZE)
if not isinstance(message[1], bytes):
raise TypeError("message value must be of type 'bytes', not type '%s'" %
str(type(message[1])))
#padded = bytes(message[0])+(NAME_SIZE-len(message[0]))*b'\0' # 0 padded object name in bytes
message = message[0].ljust(NAME_SIZE, '\0') + message[1] # FIXME: message is copied here
except Exception as err:
exc_type, exc_value, exc_traceback = sys.exc_info()
print 'ZMQStreamHandler ERROR parsing binary message:', str(message), err
traceback.print_exception(exc_type, exc_value, exc_traceback)
return
try:
self.write_message(message, binary=binary)
except Exception as err:
print 'ZMQStreamHandler ERROR writing message to websocket:', err
def on_message(self, message):
pass
def on_close(self):
self.websocket_closed = True
class ZMQStreamApp(web.Application):
''' A web application that serves a ZMQStream over a WebSocket.
'''
def __init__(self, zmqstream_addr, websocket_url):
handlers = [
(websocket_url, ZMQStreamHandler, dict(addr=zmqstream_addr))
]
settings = {
'login_url': '/login',
'debug': True,
}
super(ZMQStreamApp, self).__init__(handlers, **settings)
class ZMQStreamServer(object):
''' Runs an http server that serves a ZMQStream over a WebSocket.
'''
def __init__(self, options):
self.options = options
self.web_app = ZMQStreamApp(options.addr, options.url)
self.http_server = httpserver.HTTPServer(self.web_app)
def serve(self):
''' Start server listening on port & start the ioloop.
'''
DEBUG('serve %s' % self.options.port)
try:
if (self.options.external):
self.http_server.listen(self.options.port)
else:
self.http_server.listen(self.options.port, address='localhost')
except Exception as exc:
print '<<<%s>>> ZMQStreamServer -- listen on %s failed: %s' \
% (os.getpid(), self.options.port, exc)
sys.exit(1)
try:
ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
DEBUG('interrupt received, shutting down.')
@staticmethod
def get_options_parser():
''' Create a parser for command-line arguments.
'''
parser = OptionParser()
parser.add_option("-z", "--zmqstream",
dest="addr", default=0,
help="the address of the zmqstream")
parser.add_option("-p", "--port",
dest="port", type="int", default=0,
help="the port to run websocket server on")
parser.add_option("-u", "--url",
dest="url",
help="the url to expose for the websocket")
parser.add_option("-x", "--external",
dest="external", action="store_true",
help="allow access to the server from external clients")
return parser
@staticmethod
def spawn_process(zmq_url, ws_port, ws_url='/', external=False):
''' Run zmqstreamserver in its own process, mapping a zmq
stream to a websocket.
args:
zmq_url the url of the ZMQStream
ws_port the port to serve the WebSocket on
ws_url the url to map to the WebSocket
'''
file_path = os.path.abspath(__file__)
cmd = ['python', file_path,
'-z', str(zmq_url),
'-p', str(ws_port),
'-u', str(ws_url)]
if external:
cmd.append('-x')
return subprocess.Popen(cmd)
def main():
''' Process command line arguments, create server, and start it up.
'''
# make sure to install zmq ioloop before creating any tornado objects
ioloop.install()
# create the server and kick it off
parser = ZMQStreamServer.get_options_parser()
(options, args) = parser.parse_args()
server = ZMQStreamServer(options)
server.serve()
if __name__ == '__main__':
# dont run main() if this is a forked windows process
if sys.modules['__main__'].__file__ == __file__:
main()