Skip to content

Commit

Permalink
websockets support
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleicon Moraes committed Mar 10, 2010
1 parent 286d81d commit 57939f5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
6 changes: 6 additions & 0 deletions README.rst
Expand Up @@ -49,6 +49,12 @@ This is the basic usage pattern for map/reduce (see examples).

See below on how to purge and disconnect all consumers from a queue, using DELETE.

WebSocket consumer
==================

Now that cyclone has websockets support, check README.websocket to test it.

If you are using a browser or library which already supports websockets, you may take advantage of this interface


REST services
Expand Down
51 changes: 48 additions & 3 deletions restmq/web.py
Expand Up @@ -20,10 +20,14 @@ def __init__(self, handler, json_callback=None):
def write(self, text):
if not isinstance(text, types.StringType):
text = cyclone.escape.json_encode(text)
if self.json_callback:
self.handler.write("%s(%s);\r\n" % (self.json_callback, text))

if isinstance(self.handler, cyclone.web.WebSocketHandler):
self.handler.sendMessage(text)
else:
self.handler.write(text+"\r\n")
if self.json_callback:
self.handler.write("%s(%s);\r\n" % (self.json_callback, text))
else:
self.handler.write(text+"\r\n")

def flush(self):
self.handler.flush()
Expand Down Expand Up @@ -323,6 +327,46 @@ def post(self, queue):

self.finish("%s\n" % cyclone.escape.json_encode({'stat':qstat}))

class WebSocketQueueHandler(cyclone.web.WebSocketHandler):
"""
Guess what, I had a fever, and the only prescription is websocket
"""
def _disconnected(self, why, handler, queue_name):
try:
self.settings.comet.presence[queue_name].remove(handler)
if not len(self.settings.comet.presence[queue_name]):
self.settings.comet.presence.pop(queue_name)
except:
pass

#def connectionMade(self, *args, **kwargs):
def connectionMade(self, queue):
print "connection made: %s" % queue
self.queue = queue
handler = CustomHandler(self, None)
queue_name = queue.encode("utf-8")
self.settings.comet.presence[queue_name].append(handler)
self.notifyFinish().addCallback(self._disconnected, handler, queue_name)

def connectionLost(self, why):
print "connection lost:", why

#@defer.inlineCallbacks
def messageReceived(self, message):
"""
same idea as COMET consumer, but using websockets. how cool is that ?
"""
print "msg recv: %s" % message
self.sendMessage(message)

#try:
# result = yield self.settings.oper.queue_add(self.queue, message)
#except Exception, e:
# raise cyclone.web.HTTPError(400, str(e))

#c = yield self.settings.comet.queue.put(self.queue)
#CustomHandler(self, None).finish(result)
#pass # we wont be receiving messages for now ;*

class Application(cyclone.web.Application):
def __init__(self):
Expand All @@ -335,6 +379,7 @@ def __init__(self):
(r"/stats/(.*)", StatusHandler),
(r"/queue", QueueHandler),
(r"/control/(.*)", QueueControlHandler),
(r"/ws/(.*)", WebSocketQueueHandler),
]

db = txredisapi.lazyRedisConnectionPool(pool_size=10)
Expand Down

0 comments on commit 57939f5

Please sign in to comment.