Skip to content

Commit

Permalink
Merge branch '0.10.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry committed Aug 17, 2021
2 parents b7fde0f + 6e086ed commit cc3b507
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
8 changes: 5 additions & 3 deletions biothings/hub/dataindex/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def __init__(self, build_doc, indexer_env, index_name):
# -----------dest-----------

self.es_client_args = indexer_env.get("args", {})
+ self.es_blkidx_args = indexer_env.get("bulk", {})
self.es_index_name = index_name or _build_doc.target_name
self.es_index_settings = IndexSettings(deepcopy(DEFAULT_INDEX_SETTINGS))
self.es_index_mappings = IndexMappings(deepcopy(DEFAULT_INDEX_MAPPINGS))
Expand All @@ -266,7 +267,7 @@ def __init__(self, build_doc, indexer_env, index_name):

self.env_name = indexer_env.get("name")
self.conf_name = _build_doc.build_config.get("name")
- self.target_name = _build_doc.target_name # name of the build
+ self.target_name = _build_doc.target_name # name of the build
self.logger, self.logfile = get_logger('index_%s' % self.es_index_name)

self.pinfo = ProcessInfo(self, indexer_env.get("concurrency", 3))
Expand Down Expand Up @@ -456,9 +457,10 @@ def batch_finished(future):
job = yield from job_manager.defer_to_process(
pinfo, dispatch,
self.mongo_client_args,
- (self.mongo_database_name,
- self.mongo_collection_name),
+ self.mongo_database_name,
+ self.mongo_collection_name,
  self.es_client_args,
+ self.es_blkidx_args,
self.es_index_name,
ids, mode, batch_num
)
Expand Down
27 changes: 15 additions & 12 deletions biothings/hub/dataindex/indexer_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class ESIndex():
# previously using biothings.utils.es.ESIndexer
# recently reimplemented here for better clarity.

- def __init__(self, client, index_name):
+ def __init__(self, client, index_name, **bulk_index_args):
  self.client = client
- self.index_name = index_name # the index must already exist
+ self.index_name = index_name # MUST exist
+ self.bulk_index_args = bulk_index_args

@property
@functools.lru_cache()
Expand Down Expand Up @@ -83,7 +84,7 @@ def mexists(self, ids):
id_set = {doc['_id'] for doc in res['hits']['hits']}
return [_IDExists(_id, _id in id_set) for _id in ids]

- def mindex(self, docs, *args, **kwargs):
+ def mindex(self, docs):
""" Index and return the number of docs indexed. """

def _action(doc):
Expand All @@ -95,28 +96,30 @@ def _action(doc):
_doc.update(doc) # with _id
return _doc

- return helpers.bulk(self.client, map(_action, docs), *args, **kwargs)[0]
+ return helpers.bulk(
+ self.client, map(_action, docs),
+ **self.bulk_index_args)[0]

# Data Collection Client

- def _get_es_client(index_name, **es_client_args):
- return ESIndex(Elasticsearch(**es_client_args), index_name)
+ def _get_es_client(es_client_args, es_blk_args, es_idx_name):
+ return ESIndex(Elasticsearch(**es_client_args), es_idx_name, **es_blk_args)

- def _get_mg_client(*dbcol_name, **mongo_client_args):
- return MongoClient(**mongo_client_args)[dbcol_name[0]][dbcol_name[1]]
+ def _get_mg_client(mg_client_args, mg_dbs_name, mg_col_name):
+ return MongoClient(**mg_client_args)[mg_dbs_name][mg_col_name]

# --------------
# Entry Point
# --------------

def dispatch(
- mg_client_args, mg_col_name,
- es_client_args, es_idx_name,
+ mg_client_args, mg_dbs_name, mg_col_name,
+ es_client_args, es_blk_args, es_idx_name,
ids, mode, name
):
return IndexingTask(
- partial(_get_es_client, es_idx_name, **es_client_args),
- partial(_get_mg_client, *mg_col_name, **mg_client_args),
+ partial(_get_es_client, es_client_args, es_blk_args, es_idx_name),
+ partial(_get_mg_client, mg_client_args, mg_dbs_name, mg_col_name),
ids, mode, f"index_{es_idx_name}", name
).dispatch()

Expand Down

0 comments on commit cc3b507

Please sign in to comment.