Skip to content
Permalink
Browse files

fix(service): fix ServiceManager replicas router

  • Loading branch information...
hanxiao committed Oct 12, 2019
1 parent 7265f76 commit fce94d94e0ab237e556696944f124a275f0330fc
Showing with 7 additions and 5 deletions.
  1. +3 −3 gnes/cli/parser.py
  2. +2 −0 gnes/service/base.py
  3. +2 −2 tests/test_gnes_flow.py
@@ -185,11 +185,11 @@ def set_service_parser(parser=None):
'dump_interval will be ignored')
parser.add_argument('--parallel_backend', type=str, choices=['thread', 'process'], default='thread',
help='parallel backend of the service')
parser.add_argument('--num_parallel', type=int, default=1,
help='number of parallel services running at the same time, '
parser.add_argument('--num_parallel', '--replicas', type=int, default=1,
help='number of parallel services running at the same time (i.e. replicas), '
'`port_in` and `port_out` will be set to random, '
'and routers will be added automatically when necessary')
parser.add_argument('--parallel_type', type=ParallelType.from_string, choices=list(ParallelType),
parser.add_argument('--parallel_type', '--replica_type', type=ParallelType.from_string, choices=list(ParallelType),
default=ParallelType.PUSH_NONBLOCK,
help='parallel type of the concurrent services')
parser.add_argument('--check_version', action=ActionNoYes, default=True,
@@ -553,11 +553,13 @@ def __init__(self, service_cls, args):
if args.num_parallel > 1:
from .router import RouterService
_head_router = copy.deepcopy(args)
_head_router.yaml_path = resolve_yaml_path('BaseRouter')
_head_router.port_ctrl = self._get_random_port()
port_out = self._get_random_port()
_head_router.port_out = port_out

_tail_router = copy.deepcopy(args)
_tail_router.yaml_path = resolve_yaml_path('BaseRouter')
port_in = self._get_random_port()
_tail_router.port_in = port_in
_tail_router.port_ctrl = self._get_random_port()
@@ -121,7 +121,7 @@ def _test_index_flow(self, backend):

flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'))
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'),
service_in='prep')
@@ -137,7 +137,7 @@ def _test_index_flow(self, backend):
def _test_query_flow(self, backend):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'))
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))

0 comments on commit fce94d9

Please sign in to comment.
You can’t perform that action at this time.