Skip to content

Commit

Permalink
Fix #1: deliver wait queue on reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
rudyryk committed Oct 8, 2015
1 parent 27b5e3a commit 1c20662
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 28 deletions.
116 changes: 90 additions & 26 deletions bachata/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class RedisQueue(base.BaseQueue):
:param conn_params: Redis connection params
"""
CLOSE_COMMAND = '!'

def __init__(self, loop=None, conn_params=None):
self.loop = loop
self.conn_params = conn_params
Expand All @@ -58,9 +60,9 @@ def add_socket(self, channel, websocket, proto=None):
"""
websocket.is_closed = False
self.loop.create_task(self.listen_queue(channel, websocket))
ready_message = proto.make_message(type=proto.TRANS_READY)
websocket.write_message(proto.dump_message(ready_message))
self.loop.create_task(self.listen_queue(channel, websocket))

def del_socket(self, channel, websocket, proto=None):
"""Unregister WebSocket from receiving messages from channel.
Expand All @@ -71,7 +73,8 @@ def del_socket(self, channel, websocket, proto=None):
"""
websocket.is_closed = True
self.loop.create_task(self.put_message([channel], '', proto=proto))
self.loop.create_task(self.put_message(
[channel], self.CLOSE_COMMAND, proto=proto))

@asyncio.coroutine
def connect(self):
Expand Down Expand Up @@ -145,20 +148,23 @@ def put_message(self, channels, message, proto=None, from_channel=None):
:param from_channel: Message from channel
"""
raw_message = proto.dump_message(message)
if isinstance(message, dict):
message_dump = proto.dump_message(message)
else:
message_dump = None

for channel in channels:
# Store every message which has ID within separate list,
# also store from channel with message as 2-nd list item.
if isinstance(message, dict) and ('id' in message):
if message_dump and ('id' in message):
message_key = '%s:%s' % (channel, message['id'])
queue_data = message_key
values = (raw_message, from_channel or '')
values = (message_dump, from_channel or '')
yield from self.conn.rpush(message_key, *values)
# If message has no ID or is not dict itself,
# then just pass it as is.
else:
queue_data = raw_message
queue_data = message_dump or message

# Put message ID or raw message on queue
yield from self.conn.lpush(channel, queue_data)
Expand Down Expand Up @@ -198,30 +204,88 @@ def pop_delivered(self, channel, message_id, proto=None):

@asyncio.coroutine
def listen_queue(self, channel, websocket):
"""Start queue listener for channel and WebSocket connection."""
"""Start queue listener for channel and WebSocket connection.
Send wait queue first to deliver messages that was not delivered
on previous session. Then start listening for new messages. Every
message with ID is send through WebSocket and put on wait queue.
After delivery confirmation message is removed from wait queue,
see :meth:`.pop_delivered` method.
"""
redis_conn = yield from aioredis.create_redis(
loop=self.loop, **self.conn_params)

# Send wait queue first
wait_queue = '%s:wait' % channel
yield from self._send_wait_queue(
wait_queue, redis_conn, channel, websocket)

# Wait for new messages and send
while True:
wait_queue = '%s:wait' % channel
val = yield from redis_conn.brpoplpush(channel, wait_queue,
timeout=10)
if websocket.is_closed:
raw = yield from redis_conn.brpoplpush(
channel, wait_queue)

if not raw:
continue

val = raw.decode('utf-8')

if val == self.CLOSE_COMMAND:
yield from redis_conn.lrem(wait_queue, 0, self.CLOSE_COMMAND)
redis_conn.close()
return
elif val:
val_str = val.decode('utf-8')

# Messages with confirmation are stored separatelly
# and only their id is passed to queue. Messages
# without delivery confirmation are just sent as is.
# So, in the first case message id is popped from list
# and in the second case raw message is popped.

if val_str.startswith(channel): # get by id and send
raw_message = yield from redis_conn.lindex(val_str, 0)
if raw_message:
websocket.write_message(raw_message)
else: # just send
else:
pop_wait = yield from self._write_message(
redis_conn, val, channel, websocket)
if pop_wait:
yield from redis_conn.lpop(wait_queue)
websocket.write_message(val_str)

@asyncio.coroutine
def _send_wait_queue(self, wait_queue, redis_conn, channel, websocket):
"""Send messages from waiting queue.
:param wait_queue: Wait queue key
:param redis_conn: Redis connection
:param channel: Message channel
:param websocket: WebSocket connection
"""
wait_messages = yield from redis_conn.lrange(wait_queue, 0, -1)
for raw in reversed(wait_messages):
val = raw.decode('utf-8')
if val.startswith(channel):
yield from self._write_message(
redis_conn, val, channel, websocket)
else:
# Actually we should not be here, if everything works fine!
# But due to [old] bugs there could be trash messages on wait
# queue, so we just clean them up.
# TODO: place WARNING here
yield from self.conn.lrem(wait_queue, 1, val)

@asyncio.coroutine
def _write_message(self, redis_conn, msg_or_id, channel, websocket):
"""Write message to WebSocket output by ID or raw value.
Messages with confirmation are stored separatelly
and only their id is passed to queue. Messages
without delivery confirmation are just sent as is.
:param redis_conn: Redis connection
:param msg_or_id: Message ID or dump to str
:param channel: Message channel
:param websocket: WebSocket connection
:return: `True` if message should be removed from wait
queue, because doesn't need confirmation.
"""
# get by id and send
if msg_or_id.startswith(channel):
message_dump = yield from redis_conn.lindex(msg_or_id, 0)
if message_dump:
websocket.write_message(message_dump)
# just send
else:
websocket.write_message(msg_or_id)
return True
14 changes: 12 additions & 2 deletions example/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,18 @@

ws.onmessage = function(evt) {
console.log("Message received: ", evt.data);
if (opts.onmessage) {
opts.onmessage(JSON.parse(evt.data));

var message = JSON.parse(evt.data);

if (message && opts.onmessage) {
opts.onmessage(message);
}

if (message && message.id) {
ws.send(JSON.stringify({
type: 200, // GOT IT
data: message.id
}));
}
};

Expand Down

0 comments on commit 1c20662

Please sign in to comment.