Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

BufferedChannel can be closed eagerly via callback

Fix a bug in streaming:
	- when server is done streaming, it send STREAM_DONE and close the
	  channel (heartbeat is closed as well).
	- because client handle STREAM_DONE only when user's code pull data
	  from the client side iterator, and if the user's code doest
	  consume the iterator, then the heartbeat will be still active on
	  the client, even if the stream is technically closed.

The solution:
	- A callback is called before enqueuing events in BufferChannel. If
	  this callback return True, the BufferChannel is closed.
	- This way, the client can close properly the BufferChannel and
	  eventually consume the remaining queued events.
  • Loading branch information...
commit 01b89e5b93002ef63771aea9682be92d36ee3ab4 1 parent 72086db
@bombela bombela authored
View
99 tests/test_client_heartbeat.py
@@ -93,3 +93,102 @@ def slow(self):
assert client.slow() == 2
print 'GOT ANSWER'
+
+
+def test_client_hb_doesnt_linger_on_streaming():
+ endpoint = random_ipc_endpoint()
+
+ class MySrv(zerorpc.Server):
+
+ @zerorpc.stream
+ def iter(self):
+ return xrange(42)
+
+ srv = MySrv(heartbeat=1, context=zerorpc.Context())
+ srv.bind(endpoint)
+ gevent.spawn(srv.run)
+
+ client1 = zerorpc.Client(endpoint, heartbeat=1, context=zerorpc.Context())
+
+ def test_client():
+ assert list(client1.iter()) == list(xrange(42))
+ print 'sleep 3s'
+ gevent.sleep(3)
+
+ gevent.spawn(test_client).join()
+
+
+def est_client_drop_few():
+ endpoint = random_ipc_endpoint()
+
+ class MySrv(zerorpc.Server):
+
+ def lolita(self):
+ return 42
+
+ srv = MySrv(heartbeat=1, context=zerorpc.Context())
+ srv.bind(endpoint)
+ gevent.spawn(srv.run)
+
+ client1 = zerorpc.Client(endpoint, heartbeat=1, context=zerorpc.Context())
+ client2 = zerorpc.Client(endpoint, heartbeat=1, context=zerorpc.Context())
+ client3 = zerorpc.Client(endpoint, heartbeat=1, context=zerorpc.Context())
+
+ assert client1.lolita() == 42
+ assert client2.lolita() == 42
+
+ gevent.sleep(3)
+ assert client3.lolita() == 42
+
+
+def test_client_drop_empty_stream():
+ endpoint = random_ipc_endpoint()
+
+ class MySrv(zerorpc.Server):
+
+ @zerorpc.stream
+ def iter(self):
+ return []
+
+ srv = MySrv(heartbeat=1, context=zerorpc.Context())
+ srv.bind(endpoint)
+ gevent.spawn(srv.run)
+
+ client1 = zerorpc.Client(endpoint, heartbeat=1, context=zerorpc.Context())
+
+ def test_client():
+ print 'grab iter'
+ i = client1.iter()
+
+ print 'sleep 3s'
+ gevent.sleep(3)
+
+ gevent.spawn(test_client).join()
+
+
+def test_client_drop_stream():
+ endpoint = random_ipc_endpoint()
+
+ class MySrv(zerorpc.Server):
+
+ @zerorpc.stream
+ def iter(self):
+ return xrange(500)
+
+ srv = MySrv(heartbeat=1, context=zerorpc.Context())
+ srv.bind(endpoint)
+ gevent.spawn(srv.run)
+
+ client1 = zerorpc.Client(endpoint, heartbeat=1, context=zerorpc.Context())
+
+ def test_client():
+ print 'grab iter'
+ i = client1.iter()
+
+ print 'consume some'
+ assert list(next(i) for x in xrange(142)) == list(xrange(142))
+
+ print 'sleep 3s'
+ gevent.sleep(3)
+
+ gevent.spawn(test_client).join()
View
13 zerorpc/channel.py
@@ -171,12 +171,21 @@ def __init__(self, channel, inqueue_size=100):
self._input_queue = gevent.queue.Queue()
self._lost_remote = False
self._verbose = False
+ self._on_close_if = None
self._recv_task = gevent.spawn(self._recver)
@property
def recv_is_available(self):
return self._channel.recv_is_available
+ @property
+ def on_close_if(self):
+ return self._on_close_if
+
+ @on_close_if.setter
+ def on_close_if(self, cb):
+ self._on_close_if = cb
+
def __del__(self):
self.close()
@@ -205,6 +214,10 @@ def _recver(self):
'BufferedChannel, queue overflow on event:', event)
else:
self._input_queue.put(event)
+ if self._on_close_if is not None and self._on_close_if(event):
+ self._recv_task = None
+ self.close()
+ return
def create_event(self, name, args, xheader={}):
return self._channel.create_event(name, args, xheader)
View
6 zerorpc/patterns.py
@@ -56,6 +56,11 @@ def accept_answer(self, event):
def process_answer(self, context, bufchan, event, method,
raise_remote_error):
+
+ def is_stream_done(event):
+ return event.name == 'STREAM_DONE'
+ bufchan.on_close_if = is_stream_done
+
def iterator(event):
while event.name == 'STREAM':
yield event.args
@@ -63,6 +68,7 @@ def iterator(event):
if event.name == 'ERR':
raise_remote_error(event)
bufchan.close()
+
return iterator(event)
Please sign in to comment.
Something went wrong with that request. Please try again.