Merge pull request #302 from biothings/merge_source
Implementation of new merge_source
jal347 committed Oct 24, 2023
2 parents 665c6ee + 5342a9a commit 069adf6
Showing 2 changed files with 65 additions and 15 deletions.
63 changes: 51 additions & 12 deletions biothings/hub/databuild/builder.py
@@ -224,7 +224,9 @@ def no_uploader_running(job_manager):
                     self.logger.info("%s uploader running cannot build for now", job["source"])
                 else:
                     num_offenders += 1
-                    self.logger.warning("uploader with pinfo: %s running, no source info. cannot build for now", job)
+                    self.logger.warning(
+                        "uploader with pinfo: %s running, no source info. cannot build for now", job
+                    )
             else:
                 pass  # job is not an uploader
         return num_offenders == 0
@@ -406,7 +408,9 @@ def get_root_document_sources(self):
         none_root_srcs = [src.replace("!", "") for src in root_srcs if src.startswith("!")]
         if none_root_srcs:
             if len(none_root_srcs) != len(root_srcs):
-                raise BuilderException("If using '!' operator, all datasources must use it (cannot mix), got: %s", repr(root_srcs))
+                raise BuilderException(
+                    "If using '!' operator, all datasources must use it (cannot mix), got: %s", repr(root_srcs)
+                )
         # ok, grab sources for this build,
         srcs = self.build_config.get("sources", [])
         root_srcs = list(set(srcs).difference(set(none_root_srcs)))
@@ -684,7 +688,7 @@ def get_mapper_for_source(self, src_name, init=True):

     def merge_order(self, other_sources):
         """Optionally we can override this method to customize the order in which sources should be merged.
-        Default as sorted by name.
+        Default as sorted by name.
         """
         return sorted(other_sources)

@@ -824,7 +828,9 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
         defined_root_sources = self.get_root_document_sources()
         upsert = not defined_root_sources or src_name in defined_root_sources
         if not upsert:
-            self.logger.debug("Documents from source '%s' will be stored only if a previous document exists with same _id", src_name)
+            self.logger.debug(
+                "Documents from source '%s' will be stored only if a previous document exists with same _id", src_name
+            )
         jobs = []
         total = self.source_backend[src_name].count()
         btotal = math.ceil(total / batch_size)
@@ -836,10 +842,17 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=

         # FIXME id_provider initialized below will be overwritten by `if _query and ids is None:` code block
         if ids:
-            self.logger.info("Merging '%s' specific list of _ids, create merger job with batch_size=%d", src_name, batch_size)
+            self.logger.info(
+                "Merging '%s' specific list of _ids, create merger job with batch_size=%d", src_name, batch_size
+            )
             id_provider = [ids]
         else:
-            self.logger.info("Fetch _ids from '%s' with batch_size=%d, and create merger job with batch_size=%d", src_name, id_batch_size, batch_size)
+            self.logger.info(
+                "Fetch _ids from '%s' with batch_size=%d, and create merger job with batch_size=%d",
+                src_name,
+                id_batch_size,
+                batch_size,
+            )
             id_provider = id_feeder(self.source_backend[src_name], batch_size=id_batch_size, logger=self.logger)

         if _query and ids is not None:
@@ -850,7 +863,14 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
             # use doc_feeder but post-process doc to keep only the _id
             id_provider = map(
                 lambda docs: [d["_id"] for d in docs],
-                doc_feeder(self.source_backend[src_name], query=_query, step=batch_size, inbatch=True, fields={"_id": 1}, logger=self.logger)
+                doc_feeder(
+                    self.source_backend[src_name],
+                    query=_query,
+                    step=batch_size,
+                    inbatch=True,
+                    fields={"_id": 1},
+                    logger=self.logger,
+                ),
             )
         else:
             # when passing a list of _ids, IDs will be sent to the query, so we need to reduce the batch size
@@ -864,6 +884,13 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
         meta = src_master.find_one({"_id": src_name}) or {}
         merger = meta.get("merger", "upsert")
         self.logger.info("Documents from source '%s' will be merged using %s", src_name, merger)
+        merger_kwargs = meta.get("merger_kwargs")
+        if merger_kwargs:
+            self.logger.info(
+                "Documents from source '%s' will be using these extra parameters during the merge %s",
+                src_name,
+                merger_kwargs,
+            )

         doc_cleaner = self.document_cleaner(src_name)
         for big_doc_ids in id_provider:
@@ -877,7 +904,12 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
                 pinfo["description"] = "#%d/%d (%.1f%%)" % (bnum, btotal, (cnt / total * 100))
                 self.logger.info(
                     "Creating merger job #%d/%d, to process '%s' %d/%d (%.1f%%)",
-                    bnum, btotal, src_name, cnt, total, (cnt / total * 100.0)
+                    bnum,
+                    btotal,
+                    src_name,
+                    cnt,
+                    total,
+                    (cnt / total * 100.0),
                 )
                 job = await job_manager.defer_to_process(
                     pinfo,
@@ -891,7 +923,8 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
                         upsert,
                         merger,
                         bnum,
-                    )
+                        merger_kwargs,
+                    ),
                 )

         def batch_merged(f, batch_num):
@@ -991,7 +1024,7 @@ def fix_batch_duplicates(docs, fail_if_struct_is_different=False):
     return list(dids.values())


-def merger_worker(col_name, dest_name, ids, mapper, cleaner, upsert, merger, batch_num):
+def merger_worker(col_name, dest_name, ids, mapper, cleaner, upsert, merger, batch_num, merger_kwargs=None):
     try:
         src = mongo.get_src_db()
         tgt = mongo.get_target_db()
@@ -1014,9 +1047,15 @@ def merger_worker(col_name, dest_name, ids, mapper, cleaner, upsert, merger, bat
             if merger == "merge_struct":
                 stored_docs = dest.mget_from_ids([d["_id"] for d in docs])
                 ddocs = dict([(d["_id"], d) for d in docs])
-                for d in stored_docs:
-                    ddocs[d["_id"]] = merge_struct(d, ddocs[d["_id"]])
+                if merger_kwargs:
+                    for d in stored_docs:
+                        # Merge the old document in mongodb into the new document
+                        ddocs[d["_id"]] = merge_struct(ddocs[d["_id"]], d, **merger_kwargs)
+                else:
+                    for d in stored_docs:
+                        ddocs[d["_id"]] = merge_struct(d, ddocs[d["_id"]])
                 docs = list(ddocs.values())
+
             cnt = dest.update(docs, upsert=upsert)
         return cnt
     except Exception as e:
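
A note on the merger_kwargs plumbing above: the value is read from the datasource's document in the src_master collection, forwarded by merge_source() into merger_worker(), and, when set, flips the merge direction so the stored MongoDB document is merged into the freshly built one. Below is a minimal sketch of the flow, assuming merge_struct() leaves equal scalar values untouched; the source name "mysource" and all document values are made up for illustration:

from biothings.utils.dataload import merge_struct

# Hypothetical src_master entry: "merger" selects the strategy,
# "merger_kwargs" carries the extra parameters for merge_struct().
meta = {
    "_id": "mysource",
    "merger": "merge_struct",
    "merger_kwargs": {"exclude": ["taxid"]},
}
merger = meta.get("merger", "upsert")      # -> "merge_struct"
merger_kwargs = meta.get("merger_kwargs")  # -> {"exclude": ["taxid"]}

# What merger_worker() now does for one _id of a batch (documents made up):
new_doc = {"_id": "1017", "taxid": 9606, "aliases": ["p33"]}       # freshly mapped
stored_doc = {"_id": "1017", "taxid": 10090, "aliases": ["CDK2"]}  # already in target

if merger_kwargs:
    # new path: the old document is merged INTO the new one, honoring the kwargs
    merged = merge_struct(new_doc, stored_doc, **merger_kwargs)
else:
    # legacy path: the new document is merged into the stored one
    merged = merge_struct(stored_doc, new_doc)

# expected here: {"_id": "1017", "taxid": 9606, "aliases": ["p33", "CDK2"]}
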
17 changes: 14 additions & 3 deletions biothings/utils/dataload.py
@@ -249,6 +249,7 @@ def rec_handler(infile, block_end="\n", skip=0, include_block_end=False, as_list
 # List Utility functions
 # ===============================================================================

+
 # if dict value is a list of length 1, unlist
 def unlist(d):
     for key, val in d.items():
@@ -935,7 +936,12 @@ def dict_to_list(gene_d):
     return doc_li


-def merge_struct(v1, v2, aslistofdict=None):
+def merge_struct(v1, v2, aslistofdict=None, include=None, exclude=None):
+    """merge two structures, v1 and v2, into one.
+    :param aslistofdict: a string indicating the key name that should be treated as a list of dict
+    :param include: when given a list of strings, only merge these keys (optional)
+    :param exclude: when given a list of strings, exclude these keys from merging (optional)
+    """
     if isinstance(v1, list):
         if isinstance(v2, list):
             v1 = v1 + [x for x in v2 if x not in v1]
@@ -949,8 +955,13 @@

     elif isinstance(v1, dict):
         assert isinstance(v2, dict), "v2 %s not a dict (v1: %s)" % (v2, v1)
-        for k in list(v1.keys()):
-            if k in v2:
+        to_merge = list(v1.keys())
+        if include:
+            to_merge = include
+        for k in to_merge:
+            if exclude and k in exclude:
+                continue
+            elif k in v2:
                 if aslistofdict == k:
                     v1elem = v1[k]
                     v2elem = v2[k]
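
For reference, a small sketch of the new include/exclude behavior at the top level of a dict merge, with made-up values and assuming equal scalar values are left untouched. Note that the rewritten loop iterates the keys given by include and looks them up in v1, so included keys should exist in the first document; merge_struct() also mutates and returns its first argument, hence the deepcopy below:

import copy
from biothings.utils.dataload import merge_struct

new = {"symbol": "CDK2", "taxid": 9606, "aliases": ["p33"]}
old = {"symbol": "CDK2", "taxid": 10090, "aliases": ["CDKN2"]}

# include: only the listed keys are merged; everything else keeps the value
# from the first argument.
merged = merge_struct(copy.deepcopy(new), old, include=["aliases"])
# expected: {"symbol": "CDK2", "taxid": 9606, "aliases": ["p33", "CDKN2"]}

# exclude: the listed keys are skipped even when both documents have them.
merged = merge_struct(copy.deepcopy(new), old, exclude=["taxid"])
# expected: {"symbol": "CDK2", "taxid": 9606, "aliases": ["p33", "CDKN2"]}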
