Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge branch 'master' into fix-preprocessor-name
Browse files Browse the repository at this point in the history
  • Loading branch information
felix committed Oct 10, 2019
2 parents 6d11840 + 1878564 commit 044138d
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
6 changes: 6 additions & 0 deletions gnes/base/__init__.py
Expand Up @@ -336,6 +336,12 @@ def _get_instance_from_yaml(cls, constructor, node, stop_on_import_error=False):
data = ruamel.yaml.constructor.SafeConstructor.construct_mapping(
constructor, node, deep=True)

_gnes_config = data.get('gnes_config', {})
for k, v in _gnes_config.items():
_gnes_config[k] = _expand_env_var(v)
if _gnes_config:
data['gnes_config'] = _gnes_config

dump_path = cls._get_dump_path_from_config(data.get('gnes_config', {}))
load_from_dump = False
if dump_path:
Expand Down
6 changes: 3 additions & 3 deletions gnes/flow/__init__.py
Expand Up @@ -430,7 +430,7 @@ def build(self, backend: Optional[str] = 'thread', copy_flow: bool = False, *arg
# for thread and process backend which runs locally, host_in and host_out should not be set
p_args.host_in = BaseService.default_host
p_args.host_out = BaseService.default_host
op_flow._service_contexts.append(Flow._service2builder[v['service']](p_args))
op_flow._service_contexts.append((Flow._service2builder[v['service']], p_args))
op_flow._build_level = Flow.BuildLevel.RUNTIME
else:
raise NotImplementedError('backend=%s is not supported yet' % backend)
Expand All @@ -447,9 +447,9 @@ def __enter__(self):
'build the flow now via build() with default parameters' % self._build_level)
self.build(copy_flow=False)
self._service_stack = ExitStack()
for k, v in self._service_contexts:
self._service_stack.enter_context(k(v))

for k in self._service_contexts:
self._service_stack.enter_context(k)
self.logger.critical('flow is built and ready, current build level is %s' % self._build_level)
return self

Expand Down
3 changes: 1 addition & 2 deletions gnes/service/indexer.py
Expand Up @@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading

import numpy as np

Expand All @@ -27,7 +26,7 @@ def post_init(self):
from ..indexer.base import BaseIndexer
# print('id: %s, before: %r' % (threading.get_ident(), self._model))
self._model = self.load_model(BaseIndexer)
self._tmp_a = threading.get_ident()
# self._tmp_a = threading.get_ident()
# print('id: %s, after: %r, self._tmp_a: %r' % (threading.get_ident(), self._model, self._tmp_a))

@handler.register(gnes_pb2.Request.IndexRequest)
Expand Down
20 changes: 10 additions & 10 deletions tests/test_gnes_flow.py
Expand Up @@ -120,14 +120,14 @@ def _test_index_flow(self):

flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path='yaml/flow-transformer.yml')
.add(gfs.Indexer, name='vec_idx', yaml_path='yaml/flow-vecindex.yml')
.add(gfs.Indexer, name='doc_idx', yaml_path='yaml/flow-dictindex.yml',
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'))
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'),
service_in='prep')
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, service_in=['vec_idx', 'doc_idx']))

with flow.build(backend='thread') as f:
with flow.build(backend='process') as f:
f.index(txt_file=self.test_file, batch_size=20)

for k in [self.indexer1_bin, self.indexer2_bin]:
Expand All @@ -136,13 +136,13 @@ def _test_index_flow(self):
def _test_query_flow(self):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path='yaml/flow-transformer.yml')
.add(gfs.Indexer, name='vec_idx', yaml_path='yaml/flow-vecindex.yml')
.add(gfs.Router, name='scorer', yaml_path='yaml/flow-score.yml')
.add(gfs.Indexer, name='doc_idx', yaml_path='yaml/flow-dictindex.yml'))
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'))
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))

with flow.build(backend='thread') as f:
f.query(txt_file=self.test_file)
with flow.build(backend='process') as f, open(self.test_file, encoding='utf8') as fp:
f.query(bytes_gen=[v.encode() for v in fp][:10])

@unittest.SkipTest
def test_index_query_flow(self):
Expand Down

0 comments on commit 044138d

Please sign in to comment.