Skip to content

Commit

Permalink
Merge branch '0.10.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry committed Aug 17, 2021
2 parents d847fcf + e431d00 commit a08fa95
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
2 changes: 2 additions & 0 deletions biothings/hub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,8 @@ def configure_extra_commands(self):
tracked=False)
self.extra_commands["validate_mapping"] = CommandDefinition(
command=self.managers["index_manager"].validate_mapping)
self.extra_commands["update_metadata"] = CommandDefinition(
command=self.managers["index_manager"].update_metadata)
if self.managers.get("snapshot_manager"):
self.extra_commands["ssm"] = CommandDefinition(
command=self.managers["snapshot_manager"], tracked=False)
Expand Down
51 changes: 32 additions & 19 deletions biothings/hub/dataindex/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ def __init__(self, build_doc, indexer_env, index_name):

# -----------dest-----------

# https://elasticsearch-py.readthedocs.io/en/v7.12.0/api.html#elasticsearch.Elasticsearch
# https://elasticsearch-py.readthedocs.io/en/v7.12.0/helpers.html#elasticsearch.helpers.bulk
self.es_client_args = indexer_env.get("args", {})
self.es_blkidx_args = indexer_env.get("bulk", {})
self.es_index_name = index_name or _build_doc.target_name
Expand Down Expand Up @@ -323,15 +325,15 @@ def index(self, job_manager, **kwargs):

assert job_manager
assert all(isinstance(_id, str) for _id in ids) if ids else True
assert 50 <= batch_size <= 10000, '"batch_size" out-of-range'
assert 500 <= batch_size <= 10000, '"batch_size" out-of-range'
assert isinstance(steps, (list, tuple)), 'bad argument "steps"'
assert isinstance(mode, str), 'bad argument "mode"'

# the batch size here controls only the task partitioning
# it does not affect how the elasticsearch python client
# makes batch requests. a number larger than 10000 may exceed
# es result window size and doc_feeder maximum fetch size.
# a number smaller than 50 is too small that the documents
# a number smaller than chunk_size is too small that the docs
# can be sent to elasticsearch within one request, making it
# inefficient, amplifying the scheduling overhead.

Expand Down Expand Up @@ -572,6 +574,10 @@ def __init__(self, *args, **kwargs):
"retry_on_timeout": True,
"max_retries": 10,
},
"bulk": {
"chunk_size": 50
"raise_on_exception": False
},
"concurrency": 3
},
"index": [
Expand Down Expand Up @@ -610,7 +616,7 @@ def configure(self, conf):
for name, env in conf["env"].items():
self.register[name] = env.get("indexer", {})
self.register[name].setdefault("args", {})
self.register[name]["args"].setdefault("hosts", env.get("host"))
self.register[name]["args"]["hosts"] = env.get("host")
self.register[name]["name"] = name
self.logger.info(self.register)

Expand Down Expand Up @@ -700,7 +706,6 @@ def index(self,

return job

# TODO PENDING VERIFICATION
def update_metadata(self,
indexer_env,
index_name,
Expand All @@ -710,21 +715,29 @@ def update_metadata(self,
Update _meta for index_name, based on build_name (_meta directly
taken from the src_build document) or _meta
"""
idxkwargs = self[indexer_env]
# 1st pass we get the doc_type (don't want to ask that on the signature...)
indexer = create_backend((idxkwargs["es_host"], index_name, None)).target_esidxer
m = indexer._es.indices.get_mapping(index_name)
assert len(m[index_name]["mappings"]) == 1, "Found more than one doc_type: " + \
"%s" % m[index_name]["mappings"].keys()
doc_type = list(m[index_name]["mappings"].keys())[0]
# 2nd pass to re-create correct indexer
indexer = create_backend((idxkwargs["es_host"], index_name, doc_type)).target_esidxer
if build_name:
build = get_src_build().find_one({"_id": build_name})
assert build, "No such build named '%s'" % build_name
_meta = build.get("_meta")
assert _meta is not None, "No _meta found"
return indexer.update_mapping_meta({"_meta": _meta})
async def _update_meta(_meta):
    """Write ``_meta`` into the mapping of ``index_name``.

    If ``_meta`` is falsy and ``build_name`` is given, ``_meta`` is taken
    from the corresponding src_build document instead. Returns the raw
    put_mapping response from Elasticsearch.
    """
    env = self.register[indexer_env]
    client = AsyncElasticsearch(**env["args"])
    try:
        doc_type = None
        # Pre-7 Elasticsearch mappings are keyed by doc_type and
        # put_mapping requires it; 7+ indices are typeless.
        info = await client.info()
        if int(info['version']['number'].split('.')[0]) < 7:
            # BUG FIX: on AsyncElasticsearch, get_mapping returns a
            # coroutine — it must be awaited before subscripting.
            mappings = await client.indices.get_mapping(index_name)
            mappings = mappings[index_name]["mappings"]
            doc_type = next(iter(mappings.keys()))

        if not _meta and build_name:
            build = get_src_build().find_one({"_id": build_name})
            _meta = (build or {}).get("_meta")

        return await client.indices.put_mapping(
            body=dict(_meta=_meta),
            index=index_name,
            doc_type=doc_type
        )
    finally:
        # BUG FIX: close the async client so its underlying aiohttp
        # sessions/connections are not leaked on every call.
        await client.close()

job = asyncio.ensure_future(_update_meta(_meta))
job.add_done_callback(self.logger.debug)
return job

def index_info(self, remote=False):
""" Show index manager config with enhanced index information. """
Expand Down
3 changes: 1 addition & 2 deletions biothings/hub/dataindex/snapshooter.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ def __init__(self, env, index, snapshot):
)
self.indexer = ESIndexer(
index=index,
doc_type=env.build_doc['index'][index]['doc_type'],
es_host=env.build_doc['index'][index]['host'],
check_index=index is not None
)
Expand Down Expand Up @@ -367,7 +366,7 @@ class SnapshotFSEnv(SnapshotEnv):
def __init__(self, job_manager, env_config, build_doc):
    """Snapshot environment backed by a shared-filesystem ('fs') repository.

    Not yet implemented — always raises NotImplementedError after
    validating the repository type.
    """
    super().__init__(job_manager, env_config, build_doc)
    assert env_config['repository']['type'] == 'fs'
    # BUG FIX: the block previously contained two consecutive raise
    # statements (merge/diff residue); the second was unreachable.
    raise NotImplementedError()

class SnapshotS3Env(SnapshotEnv):
"""
Expand Down

0 comments on commit a08fa95

Please sign in to comment.