Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
275 lines (234 sloc) 9.04 KB
from geventwebsocket.handler import WebSocketHandler
from gevent import pywsgi
import gevent
from gevent_zeromq import zmq
import logging
log = logging.getLogger(__name__)
import simplejson
from gevent import spawn
# demo app
class ZmqGatewayFactory(object):
    """Cache of running zeromq gateways.

    Gateways are keyed by ``(socket_type, zmq_conn_string)``.  ``get``
    returns the gateway already stored for a key, or builds a new one,
    spawns its ``run`` loop and stores it.  Every gateway is created with
    the single zeromq context owned by this factory.
    """

    def __init__(self):
        self.gateways = {}
        self.ctx = zmq.Context()

    def get(self, socket_type, zmq_conn_string):
        """Return the gateway for ``socket_type`` and ``zmq_conn_string``.

        ``zmq.REQ`` produces a ``ReqGateway``; any other socket type
        produces a ``SubGateway``.  A newly created gateway has its
        ``run`` method spawned before it is returned.
        """
        key = (socket_type, zmq_conn_string)
        try:
            return self.gateways[key]
        except KeyError:
            pass

        if socket_type == zmq.REQ:
            log.debug("spawning req socket %s", zmq_conn_string)
            gateway = ReqGateway(zmq_conn_string, ctx=self.ctx)
        else:
            log.debug("spawning sub socket %s", zmq_conn_string)
            gateway = SubGateway(zmq_conn_string, ctx=self.ctx)

        self.gateways[key] = gateway
        spawn(gateway.run)
        return gateway

    def shutdown(self):
        """
        Close all sockets associated with this context, and then
        terminate the context.
        """
        self.ctx.destroy()
class WebProxyHandler(object):
    """Base class for gateways that keep a registry of proxy objects.

    Proxies are held in ``self.proxies``, a dict keyed by identity.
    Proxies register themselves with a handler and deregister from it.
    """

    def __init__(self):
        self.proxies = {}

    def register(self, identity, proxy):
        """Store ``proxy`` under ``identity``, replacing any earlier entry."""
        self.proxies[identity] = proxy

    def deregister(self, identity):
        """Drop the proxy stored under ``identity``.

        Unknown identities are ignored, so calling this twice is harmless.
        """
        self.proxies.pop(identity, None)

    def close(self):
        """Ask every registered proxy to deregister itself."""
        # Iterate over a snapshot: proxy.deregister() calls back into
        # self.deregister(), which removes entries from self.proxies.
        # Looping over a live dict view while that happens raises
        # RuntimeError ("dictionary changed size during iteration").
        for proxy in list(self.proxies.values()):
            proxy.deregister()
class ZmqGateway(WebProxyHandler):
    """Proxy handler for the zeromq side of the bridge.

    Holds the connection string and zeromq context; subclasses create the
    actual socket.
    """

    def __init__(self, zmq_conn_string, ctx=None):
        super(ZmqGateway, self).__init__()
        self.ctx = ctx
        self.zmq_conn_string = zmq_conn_string

    def send_proxy(self, identity, msg):
        """Forward ``msg`` to the web side through the proxy for ``identity``.

        Any failure (including an unknown identity) is logged and the
        identity is deregistered from this gateway.
        """
        try:
            proxy = self.proxies[identity]
            proxy.send_web(msg)
        except Exception as err:
            # TODO: narrow this once it is known which exceptions
            # send_web can raise.
            log.exception(err)
            self.deregister(identity)
class SubGateway(ZmqGateway):
    """Gateway that reads from a zeromq SUB socket and fans messages out.

    The socket is created from ``ctx``, given an empty ``SUBSCRIBE``
    option and connected to ``zmq_conn_string``.  Each received message
    is forwarded to every registered proxy whose ``msgfilter`` is
    contained in the message.
    """

    def __init__(self, zmq_conn_string, ctx=None):
        super(SubGateway, self).__init__(zmq_conn_string, ctx=ctx)
        self.s = ctx.socket(zmq.SUB)
        self.s.setsockopt(zmq.SUBSCRIBE, '')
        self.s.connect(zmq_conn_string)

    def run(self):
        """Receive messages forever and deliver them to matching proxies.

        An exception from ``recv`` is not caught here and ends the loop.
        """
        while True:
            msg = self.s.recv(copy=True)
            log.debug('subgateway, received %s', msg)
            # Work on a snapshot: delivering a message can deregister
            # proxies (directly on failure, or from another greenlet while
            # the websocket send yields), which mutates self.proxies.
            for identity, proxy in list(self.proxies.items()):
                # Skip proxies removed or replaced since the snapshot.
                if self.proxies.get(identity) is not proxy:
                    continue
                # Errors are isolated per proxy so that one bad subscriber
                # cannot stop delivery to the remaining ones.
                try:
                    if proxy.msgfilter in msg:
                        self.send_proxy(identity, msg)
                except Exception as e:
                    log.exception(e)
class ReqGateway(ZmqGateway):
    """Gateway that multiplexes request/reply traffic over one XREQ socket.

    Outgoing messages are framed as ``[identity, '', msg]``; for incoming
    multipart messages the first frame is taken as the identity and the
    last frame as the payload.
    """

    def __init__(self, zmq_conn_string, ctx=None):
        super(ReqGateway, self).__init__(zmq_conn_string, ctx=ctx)
        self.s = ctx.socket(zmq.XREQ)
        self.s.connect(zmq_conn_string)

    def send(self, identity, msg):
        """Send ``msg`` on behalf of ``identity``."""
        # Frame layout mirrors what a REQ socket would produce: the
        # identity first, then an empty delimiter frame, then the body.
        frames = [str(identity), '', str(msg)]
        self.s.send_multipart(frames)
        log.debug('reqgateway, sent %s', msg)

    def handle_request(self, msg):
        """Route a received multipart message to the proxy it names."""
        # First frame is the identity, last frame is the payload;
        # anything in between is dropped.
        identity, payload = msg[0], msg[-1]
        self.send_proxy(identity, payload)

    def run(self):
        """Receive multipart messages forever and dispatch each one.

        Errors while handling a message are logged and the loop carries
        on; an exception from ``recv_multipart`` itself is not caught.
        """
        while True:
            frames = self.s.recv_multipart(copy=True)
            try:
                log.debug('reqgateway, received %s', frames)
                self.handle_request(frames)
            except Exception as err:
                log.exception(err)
class BridgeWebProxyHandler(WebProxyHandler):
    """Proxy handler for the web socket side of the bridge.

    TODO: should rename this to BridgeWebSocketGateway.

    You have one of these per web socket connection.  It listens on the
    web socket, and when a connection request is received, grabs the
    appropriate zeromq gateway from the factory.  It also registers the
    proxy with this object and the zeromq gateway.
    """

    def __init__(self, ws, gateway_factory):
        super(BridgeWebProxyHandler, self).__init__()
        self.ws = ws
        self.gateway_factory = gateway_factory

    def zmq_allowed(self, options):
        """Authorization hook for connect requests; always allows here.

        ``options`` is the still-encoded ``content`` of the connect
        message.
        """
        return True

    def connect(self, identity, content):
        """Create a proxy for ``identity`` and wire it to both gateways.

        ``content`` is a JSON document with ``zmq_conn_string`` and
        ``socket_type`` keys and an optional ``msgfilter`` key.

        Raises ValueError for invalid JSON and KeyError for a missing
        required key.
        """
        content = simplejson.loads(content)
        zmq_conn_string = content['zmq_conn_string']
        socket_type = content['socket_type']
        if socket_type == zmq.REQ:
            proxy = ReqSocketProxy(identity)
        else:
            proxy = SubSocketProxy(identity, content.get('msgfilter', ''))
        gateway = self.gateway_factory.get(socket_type, zmq_conn_string)
        proxy.register(self, gateway)

    def handle_request(self, msg):
        """Decode one web socket message and act on it.

        A message with ``msg_type == 'connect'`` sets up a proxy and is
        answered with ``{'status': 'success'}`` or ``{'status': 'error'}``.
        Any other message has its content forwarded to zeromq through the
        proxy for its identity.
        """
        msg = simplejson.loads(msg)
        msg_type = msg.get('msg_type')
        identity = msg.get('identity')
        content = msg.get('content')
        if msg_type != 'connect':
            self.send_proxy(identity, content)
            return

        status = 'error'
        if self.zmq_allowed(content):
            try:
                self.connect(identity, content)
                status = 'success'
            except (ValueError, KeyError, TypeError) as e:
                # A malformed connect payload comes from the remote
                # client; report it as an error instead of letting it
                # tear down the whole connection.
                log.exception(e)
        self.send(identity, simplejson.dumps({'status': status}))

    def send_proxy(self, identity, content):
        """Forward ``content`` to zeromq through the proxy for ``identity``.

        Any failure (including an unknown identity) is logged and the
        identity is deregistered from this handler.
        """
        try:
            self.proxies[identity].send_zmq(content)
        except Exception as e:
            # TODO: narrow this once it is known which exceptions
            # send_zmq can raise.
            log.exception(e)
            self.deregister(identity)

    def send(self, identity, msg):
        """Send ``msg`` to the browser wrapped with its ``identity``."""
        log.debug('ws sent %s', msg)
        self.ws.send(simplejson.dumps({'identity' : identity,
                                       'content' : msg}))

    def run(self):
        """Serve the web socket until it yields ``None`` or an error occurs.

        ``close()`` is always called on the way out so that this
        connection's proxies are deregistered from the gateways they were
        registered with, even when ``receive`` or ``handle_request``
        raises.
        """
        try:
            while True:
                msg = self.ws.receive()
                log.debug('ws received %s', msg)
                if msg is None:
                    break
                self.handle_request(msg)
        finally:
            self.close()
"""These proxy objects below are dumb objects. All they do is manage
relationships with their respective websocket and zeromq gateways.
The gateways use these objects to get to the appropriate opposing gateway.

Class hierarchy:

    SocketProxy
        ReqSocketProxy
        SubSocketProxy

You have one instance of a proxy for every fake zeromq socket you have on
the js side.
"""
class SocketProxy(object):
    """Link between one web socket gateway and one zeromq gateway.

    The proxy registers itself under its identity with both gateways and
    relays messages from either side to the other.
    """

    def __init__(self, identity):
        self.identity = identity

    def register(self, wsgateway, zmqgateway):
        """Remember both gateways and register this proxy with each."""
        self.wsgateway, self.zmqgateway = wsgateway, zmqgateway
        for gateway in (wsgateway, zmqgateway):
            gateway.register(self.identity, self)

    def deregister(self):
        """Remove this proxy from both gateways it was registered with."""
        for gateway in (self.wsgateway, self.zmqgateway):
            gateway.deregister(self.identity)

    def send_web(self, msg):
        """Pass ``msg`` to the web socket gateway under this identity."""
        web_side = self.wsgateway
        web_side.send(self.identity, msg)

    def send_zmq(self, msg):
        """Pass ``msg`` to the zeromq gateway under this identity."""
        zmq_side = self.zmqgateway
        zmq_side.send(self.identity, msg)
class ReqSocketProxy(SocketProxy):
    """Proxy tagged with the ``zmq.REQ`` socket type."""

    socket_type = zmq.REQ
class SubSocketProxy(SocketProxy):
    """Proxy tagged with the ``zmq.SUB`` socket type.

    Carries the ``msgfilter`` value given at construction alongside the
    identity.
    """

    socket_type = zmq.SUB

    def __init__(self, identity, msgfilter):
        super(SubSocketProxy, self).__init__(identity)
        self.msgfilter = msgfilter
"""
Gevent wsgi handler - the main server. One instance of this per process.
"""
class WsgiHandler(object):
    """WSGI entry point that hands web socket requests to a bridge.

    ``bridge_class`` is instantiated once per web socket request; all
    bridges share the gateway factory created in ``__init__``.
    """

    bridge_class = BridgeWebProxyHandler

    def __init__(self):
        self.zmq_gateway_factory = ZmqGatewayFactory()

    def websocket_allowed(self, environ):
        """Authorization hook for incoming web sockets; always allows here."""
        return True

    def wsgi_handle(self, environ, start_response):
        """Run a bridge for web socket requests, answer 404 otherwise.

        The web socket branch blocks in the bridge's ``run`` loop and then
        returns ``None``; the 404 branch returns an empty body.
        """
        is_websocket = ('wsgi.websocket' in environ
                        and self.websocket_allowed(environ))
        if not is_websocket:
            start_response("404 Not Found", [])
            return []

        bridge = self.bridge_class(environ['wsgi.websocket'],
                                   self.zmq_gateway_factory)
        bridge.run()

    def __del__(self):
        """
        Upon destruction shut down any open sockets, don't rely
        on the garbage collector which can leave sockets
        dangling open.
        """
        self.zmq_gateway_factory.shutdown()
if __name__ == "__main__":
    # Demo entry point: serve the bridge on every interface, port 8000,
    # with debug logging enabled.
    logging.basicConfig(level=logging.DEBUG)
    app = WsgiHandler()
    # For TLS, additionally pass keyfile='/etc/nginx/server.key' and
    # certfile='/etc/nginx/server.crt' to WSGIServer.
    server = pywsgi.WSGIServer(
        ('0.0.0.0', 8000),
        app.wsgi_handle,
        handler_class=WebSocketHandler,
    )
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C: announce the shutdown and stop the server.
        print('Shutting down gracefully.')
        server.kill()
Something went wrong with that request. Please try again.