From 57939f5786a67ee19531b90c34c5a52f79a0f0a6 Mon Sep 17 00:00:00 2001 From: Gleicon Moraes Date: Wed, 10 Mar 2010 09:48:44 -0300 Subject: [PATCH] websockets support --- README.rst | 6 ++++++ restmq/web.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index e58a3f6..15b018b 100644 --- a/README.rst +++ b/README.rst @@ -49,6 +49,12 @@ This is the basic usage pattern for map/reduce (see examples). See below on how to purge and disconnect all consumers from a queue, using DELETE. +WebSocket consumer +================== + +Now that cyclone has websockets support, check README.websocket to test it. + +If you are using a browser or library which already supports websockets, you may take advantage of this interface REST services diff --git a/restmq/web.py b/restmq/web.py index 582f799..b44dd32 100644 --- a/restmq/web.py +++ b/restmq/web.py @@ -20,10 +20,14 @@ def __init__(self, handler, json_callback=None): def write(self, text): if not isinstance(text, types.StringType): text = cyclone.escape.json_encode(text) - if self.json_callback: - self.handler.write("%s(%s);\r\n" % (self.json_callback, text)) + + if isinstance(self.handler, cyclone.web.WebSocketHandler): + self.handler.sendMessage(text) else: - self.handler.write(text+"\r\n") + if self.json_callback: + self.handler.write("%s(%s);\r\n" % (self.json_callback, text)) + else: + self.handler.write(text+"\r\n") def flush(self): self.handler.flush() @@ -323,6 +327,46 @@ def post(self, queue): self.finish("%s\n" % cyclone.escape.json_encode({'stat':qstat})) +class WebSocketQueueHandler(cyclone.web.WebSocketHandler): + """ + Guess what, I had a fever, and the only prescription is websocket + """ + def _disconnected(self, why, handler, queue_name): + try: + self.settings.comet.presence[queue_name].remove(handler) + if not len(self.settings.comet.presence[queue_name]): + self.settings.comet.presence.pop(queue_name) + except: + pass + + #def connectionMade(self, *args, **kwargs): + def connectionMade(self, queue): + print "connection made: %s" % queue + self.queue = queue + handler = CustomHandler(self, None) + queue_name = queue.encode("utf-8") + self.settings.comet.presence[queue_name].append(handler) + self.notifyFinish().addCallback(self._disconnected, handler, queue_name) + + def connectionLost(self, why): + print "connection lost:", why + + #@defer.inlineCallbacks + def messageReceived(self, message): + """ + same idea as COMET consumer, but using websockets. how cool is that ? + """ + print "msg recv: %s" % message + self.sendMessage(message) + + #try: + # result = yield self.settings.oper.queue_add(self.queue, message) + #except Exception, e: + # raise cyclone.web.HTTPError(400, str(e)) + + #c = yield self.settings.comet.queue.put(self.queue) + #CustomHandler(self, None).finish(result) + #pass # we wont be receiving messages for now ;* class Application(cyclone.web.Application): def __init__(self): @@ -335,6 +379,7 @@ def __init__(self): (r"/stats/(.*)", StatusHandler), (r"/queue", QueueHandler), (r"/control/(.*)", QueueControlHandler), + (r"/ws/(.*)", WebSocketQueueHandler), ] db = txredisapi.lazyRedisConnectionPool(pool_size=10)