Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.


feat(service): remove async dump for better stability
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Oct 10, 2019
1 parent 17f9287 commit c8cedd0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 30 deletions.
11 changes: 7 additions & 4 deletions gnes/flow/
Expand Up @@ -62,13 +62,15 @@ class Flow:
You can use `.add()` then `.build()` to customize your own workflow.
For example:
.. highlight:: python
.. code-block:: python
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Preprocessor, yaml_path='BasePreprocessor')
.add(gfs.Encoder, yaml_path='BaseEncoder')
.add(gfs.Router, yaml_path='BaseRouter'))
with'thread') as flow:
Expand All @@ -77,6 +79,7 @@ class Flow:
Note the different default copy behaviors in `.add()` and `.build()`:
`.add()` always copy the flow by default, whereas `.build()` modify the flow in place.
You can change this behavior by giving an argument `copy_flow=False`.
_supported_orch = {'swarm', 'k8s'}
_service2parser = {
Expand Down Expand Up @@ -211,14 +214,14 @@ def add(self, service: 'Service',
**kwargs) -> 'Flow':
Add a service to the current flow object and return the new modified flow object
:param copy_flow: when set to true, then always copy the current flow
and do the modification on top of it then return, otherwise, do in-line modification
:param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
:param name: the name indentifier of the service, useful in 'service_in' and 'service_out'
:param service_in: the name of the service(s) that this service receives data from.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param service_out: the name of the service(s) that this service sends data to.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param copy_flow: when set to true, then always copy the current flow and do the modification on top of it then return, otherwise, do in-line modification
:param kwargs: other keyword-value arguments that the service CLI supports
:return: a (new) flow object with modification
Expand Down
41 changes: 15 additions & 26 deletions gnes/service/
Expand Up @@ -320,6 +320,7 @@ def __init__(self, args):
self.is_event_loop = self._get_event()
self.is_model_changed = self._get_event()
self.is_handler_done = self._get_event()
self.last_dump_time = time.perf_counter()
self._model = None
self.use_event_loop = True
self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl)
Expand All @@ -335,29 +336,17 @@ def run(self):
except Exception as ex:
self.logger.error(ex, exc_info=True)

def _start_auto_dump(self):
if self.args.dump_interval > 0 and not self.args.read_only:
self._auto_dump_thread = threading.Thread(target=self._auto_dump)

def _auto_dump(self):
while self.is_event_loop.is_set():
if self.is_model_changed.is_set():
'auto-dumping the new change of the model every %ds...' % self.args.dump_interval)

def dump(self):
if not self.args.read_only:
if self._model:'dumping changes to the model...')
self._model.dump()'dumping finished!')
else:'no dumping as "read_only" set to true.')
if (not self.args.read_only
and self._model
and self.is_model_changed.is_set()
and (time.perf_counter() - self.last_dump_time) > self.args.dump_interval):
self.is_model_changed.clear()'dumping changes to the model, %3.0fs since last the dump'
% (time.perf_counter() - self.last_dump_time))
self.last_dump_time = time.perf_counter()'dumping finished! next dump will start in at least %3.0fs' % self.args.dump_interval)

def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', *args, **kwargs):
Expand Down Expand Up @@ -414,11 +403,10 @@ def _run(self, ctx):
self.logger.critical('ready and listening')
while self.is_event_loop.is_set():
pull_sock = None
socks = dict(poller.poll())
socks = dict(poller.poll(1))
if socks.get(in_sock) == zmq.POLLIN:
pull_sock = in_sock
elif socks.get(ctrl_sock) == zmq.POLLIN:
Expand Down Expand Up @@ -450,10 +438,11 @@ def _run(self, ctx):
'received a new message but since "use_event_loop=False" I will not handle it. '
'I will just block the thread until "is_handler_done" is set!')
# wait until some one else call is_handler_done.set()
# clear the handler status
if self.args.dump_interval == 0:
except EventLoopEnd:'break from the event loop')
except ComponentNotLoad:
Expand Down

0 comments on commit c8cedd0

Please sign in to comment.