Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
fix(stream-client): request queue size is limited by 1000
Browse files Browse the repository at this point in the history
  • Loading branch information
felix committed Sep 19, 2019
1 parent 4f38944 commit 05db02f
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions gnes/client/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,36 @@ class StreamingClient(GrpcClient):
def __init__(self, args):
super().__init__(args)

self._request_queue = queue.Queue()
self._request_queue = queue.Queue(maxsize=1000)
self._is_streaming = threading.Event()

self._dispatch_thread = threading.Thread(target=self._start)
self._dispatch_thread.setDaemon(1)
self._dispatch_thread.start()
self._dispatch_thread.setDaemon(True)

def send_request(self, request):
self._request_queue.put(request)
self._request_queue.put(request, block=True)

# create a new streaming call
if not self._is_streaming.is_set():
self._dispatch_thread.start()

def _start(self):
self._is_streaming.set()
response_stream = self.stream_call(self._request_generator())
self.stream_call(self._request_generator())
self._is_streaming.clear()

def _request_generator(self):
while self._is_streaming.is_set():
try:
request = self._request_queue.get(block=True, timeout=1.0)
request = self._request_queue.get(block=True, timeout=5.0)
if request is None:
break
yield request
except queue.Empty:
pass
continue
except Exception as e:
print('exception: %s' % str(e))
break

@handler.register(NotImplementedError)
def _handler_default(self, resp: 'gnes_pb2.Response'):
Expand Down

0 comments on commit 05db02f

Please sign in to comment.