Merge pull request #286 from biothings/mongodb_no_cursor_timeout_fix
Fix to Issue#282
newgene committed May 4, 2023
2 parents 5cf7e45 + b49540d commit d648aed
Showing 2 changed files with 178 additions and 173 deletions.
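
Most of the hunks shown below replace eager "%"-based string interpolation inside logging calls with lazy logging arguments, and pass the builder's logger into the id_feeder/doc_feeder helpers. A minimal sketch of the logging pattern involved (the logger and source names are illustrative, not taken from the commit):

import logging

logger = logging.getLogger("builder")  # hypothetical logger name, for illustration
src_name = "clinvar"                   # hypothetical source name

# Eager formatting: the message string is built even when INFO is disabled.
logger.info("Merging source '%s'" % src_name)

# Lazy formatting: arguments are passed separately and only interpolated
# if the record is actually emitted.
logger.info("Merging source '%s'", src_name)
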
89 changes: 39 additions & 50 deletions biothings/hub/databuild/builder.py
@@ -221,12 +221,10 @@ def no_uploader_running(job_manager):
if "source" in job:
if job["source"] in offending_sources:
num_offenders += 1
self.logger.info("%s uploader running cannot build for now" % job["source"])
self.logger.info("%s uploader running cannot build for now", job["source"])
else:
num_offenders += 1
self.logger.warning(
"uploader with pinfo: %s running, no source info. " "cannot build for now" % job
)
self.logger.warning("uploader with pinfo: %s running, no source info. cannot build for now", job)
else:
pass # job is not an uploader
return num_offenders == 0
@@ -388,15 +386,15 @@ def clean_old_collections(self):
cols = sorted(cols, reverse=True)
to_drop = cols[self.keep_archive :]
for colname in to_drop:
self.logger.info("Cleaning old archive collection '%s'" % colname)
self.logger.info("Cleaning old archive collection '%s'", colname)
db[colname].drop()

def init_mapper(self, mapper_name):
if self.mappers[mapper_name].need_load():
if mapper_name is None:
self.logger.info("Initializing default mapper")
else:
self.logger.info("Initializing mapper name '%s'" % mapper_name)
self.logger.info("Initializing mapper name '%s'", mapper_name)
self.mappers[mapper_name].load()

def generate_document_query(self, src_name):
@@ -408,13 +406,11 @@ def get_root_document_sources(self):
none_root_srcs = [src.replace("!", "") for src in root_srcs if src.startswith("!")]
if none_root_srcs:
if len(none_root_srcs) != len(root_srcs):
raise BuilderException(
"If using '!' operator, all datasources must use it (cannot mix), got: %s" % (repr(root_srcs))
)
raise BuilderException("If using '!' operator, all datasources must use it (cannot mix), got: %s", repr(root_srcs))
# ok, grab sources for this build,
srcs = self.build_config.get("sources", [])
root_srcs = list(set(srcs).difference(set(none_root_srcs)))
# self.logger.info("'except root' sources %s resolves to root source = %s" % (repr(none_root_srcs),root_srcs))
# self.logger.info("'except root' sources %s resolves to root source = %s", repr(none_root_srcs), root_srcs)

# resolve possible regex based source name (split-collections sources)
root_srcs = self.resolve_sources(root_srcs)
@@ -484,7 +480,7 @@ def store_metadata(self, res, sources, job_manager):
# also search for _meta in build_config
bmeta = self.build_config.get("_meta")
if bmeta:
self.logger.info("Found _meta in build_config, merging: %s" % pformat(bmeta))
self.logger.info("Found _meta in build_config, merging: %s", pformat(bmeta))
self.custom_metadata.update(self.build_config.get("_meta", {}))

def update_src_meta_stats(self):
@@ -591,7 +587,7 @@ def merge(

self.custom_metadata = {}
self.clean_old_collections()
self.logger.info("Merging into target collection '%s'" % self.target_backend.target_name)
self.logger.info("Merging into target collection '%s'", self.target_backend.target_name)
strargs = "[sources=%s,target_name=%s]" % (sources, target_name)

try:
@@ -643,7 +639,7 @@ def stored(f):
"_meta": _meta,
},
)
self.logger.info("success %s" % strargs, extra={"notify": True})
self.logger.info("success %s", strargs, extra={"notify": True})
# set next step
build_conf = AutoBuildConfig(build["build_config"])
if build_conf.should_diff_new_build():
@@ -653,7 +649,7 @@ def stored(f):
except Exception as e:
strargs = "[sources=%s]" % sources
self.register_status("failed", job={"err": repr(e)})
self.logger.exception("failed %s: %s" % (strargs, e), extra={"notify": True})
self.logger.exception("failed %s: %s", strargs, e, extra={"notify": True})
raise

postjob.add_done_callback(stored)
@@ -665,7 +661,7 @@ def stored(f):
except (KeyboardInterrupt, Exception) as e:
self.logger.exception(e)
self.register_status("failed", job={"err": repr(e)})
self.logger.exception("failed %s: %s" % (strargs, e), extra={"notify": True})
self.logger.exception("failed %s: %s", strargs, e, extra={"notify": True})
raise

def get_mapper_for_source(self, src_name, init=True):
@@ -681,7 +677,7 @@ def get_mapper_for_source(self, src_name, init=True):
try:
init and self.init_mapper(mapper_name)
mapper = self.mappers[mapper_name]
self.logger.info("Found mapper '%s' for source '%s'" % (mapper, src_name))
self.logger.info("Found mapper '%s' for source '%s'", mapper, src_name)
return mapper
except KeyError:
raise BuilderException("Found mapper named '%s' but no mapper associated" % mapper_name)
@@ -710,15 +706,15 @@ async def merge_sources(self, source_names, steps=("merge", "post"), batch_size=
other_sources = list(set(source_names).difference(set(root_sources)))
# got root doc sources but not part of the merge ? that's weird...
if defined_root_sources and not root_sources:
self.logger.warning("Root document sources found (%s) but not part of the merge..." % defined_root_sources)
self.logger.warning("Root document sources found (%s) but not part of the merge...", defined_root_sources)

source_names = sorted(source_names)
root_sources = sorted(root_sources)
other_sources = sorted(other_sources)

self.logger.info("Sources to be merged: %s" % source_names)
self.logger.info("Root sources: %s" % root_sources)
self.logger.info("Other sources: %s" % other_sources)
self.logger.info("Sources to be merged: %s", source_names)
self.logger.info("Root sources: %s", root_sources)
self.logger.info("Other sources: %s", other_sources)

got_error = False

@@ -735,7 +731,7 @@ def merged(f, name, stats):
res = f.result()
stats.update(res)
except Exception as e:
self.logger.exception("Failed merging source '%s': %s" % (name, e))
self.logger.exception("Failed merging source '%s': %s", name, e)
nonlocal got_error
got_error = e

@@ -753,15 +749,15 @@ def merged(f, name, stats):
self.register_status(
"building", transient=True, init=True, job={"step": "merge-root", "sources": root_sources}
)
self.logger.info("Merging root document sources: %s" % root_sources)
self.logger.info("Merging root document sources: %s", root_sources)
await merge(root_sources)
self.register_status("success", job={"step": "merge-root", "sources": root_sources})

if other_sources:
self.register_status(
"building", transient=True, init=True, job={"step": "merge-others", "sources": other_sources}
)
self.logger.info("Merging other resources: %s" % other_sources)
self.logger.info("Merging other resources: %s", other_sources)
await merge(other_sources)
self.register_status("success", job={"step": "merge-others", "sources": other_sources})

@@ -784,10 +780,10 @@ def merged(f, name, stats):

def postmerged(f):
try:
self.logger.info("Post-merge completed [%s]" % f.result())
self.logger.info("Post-merge completed [%s]", f.result())
self.register_status("success", job={"step": "post-merge"})
except Exception as e:
self.logger.exception("Failed post-merging source: %s" % e)
self.logger.exception("Failed post-merging source: %s", e)
nonlocal got_error
got_error = e

@@ -822,9 +818,7 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
defined_root_sources = self.get_root_document_sources()
upsert = not defined_root_sources or src_name in defined_root_sources
if not upsert:
self.logger.debug(
"Documents from source '%s' will be stored only if a previous document exists with same _id" % src_name
)
self.logger.debug("Documents from source '%s' will be stored only if a previous document exists with same _id", src_name)
jobs = []
total = self.source_backend[src_name].count()
btotal = math.ceil(total / batch_size)
@@ -833,17 +827,14 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
got_error = False
# grab ids only, so we can get more, let's say 10 times more
id_batch_size = batch_size * 10

# FIXME id_provider initialized below will be overwritten by `if _query and ids is None:` code block
if ids:
self.logger.info(
"Merging '%s' specific list of _ids, create merger job with batch_size=%d" % (src_name, batch_size)
)
self.logger.info("Merging '%s' specific list of _ids, create merger job with batch_size=%d", src_name, batch_size)
id_provider = [ids]
else:
self.logger.info(
"Fetch _ids from '%s' with batch_size=%d, and create merger job with batch_size=%d"
% (src_name, id_batch_size, batch_size)
)
id_provider = id_feeder(self.source_backend[src_name], batch_size=id_batch_size)
self.logger.info("Fetch _ids from '%s' with batch_size=%d, and create merger job with batch_size=%d", src_name, id_batch_size, batch_size)
id_provider = id_feeder(self.source_backend[src_name], batch_size=id_batch_size, logger=self.logger)

if _query and ids is not None:
self.logger.info("Query/filter involved, but also specific list of _ids. Ignoring query and use _ids")
@@ -853,9 +844,7 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
# use doc_feeder but post-process doc to keep only the _id
id_provider = map(
lambda docs: [d["_id"] for d in docs],
doc_feeder(
self.source_backend[src_name], query=_query, step=batch_size, inbatch=True, fields={"_id": 1}
),
doc_feeder(self.source_backend[src_name], query=_query, step=batch_size, inbatch=True, fields={"_id": 1}, logger=self.logger)
)
else:
# when passing a list of _ids, IDs will be sent to the query, so we need to reduce the batch size
@@ -868,7 +857,7 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
src_master = self.source_backend.master
meta = src_master.find_one({"_id": src_name}) or {}
merger = meta.get("merger", "upsert")
self.logger.info("Documents from source '%s' will be merged using %s" % (src_name, merger))
self.logger.info("Documents from source '%s' will be merged using %s", src_name, merger)

doc_cleaner = self.document_cleaner(src_name)
for big_doc_ids in id_provider:
@@ -881,8 +870,8 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
pinfo["step"] = src_name
pinfo["description"] = "#%d/%d (%.1f%%)" % (bnum, btotal, (cnt / total * 100))
self.logger.info(
"Creating merger job #%d/%d, to process '%s' %d/%d (%.1f%%)"
% (bnum, btotal, src_name, cnt, total, (cnt / total * 100.0))
"Creating merger job #%d/%d, to process '%s' %d/%d (%.1f%%)",
bnum, btotal, src_name, cnt, total, (cnt / total * 100.0)
)
job = await job_manager.defer_to_process(
pinfo,
@@ -896,7 +885,7 @@ async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=
upsert,
merger,
bnum,
),
)
)

def batch_merged(f, batch_num):
Expand All @@ -912,7 +901,7 @@ def batch_merged(f, batch_num):
# raise error as soon as we know
if got_error:
raise got_error
self.logger.info("%d jobs created for merging step" % len(jobs))
self.logger.info("%d jobs created for merging step", len(jobs))
tasks = asyncio.gather(*jobs)

def done(f):
@@ -1035,13 +1024,13 @@ def merger_worker(col_name, dest_name, ids, mapper, cleaner, upsert, merger, bat
)
exc_fn = os.path.join(btconfig.LOG_FOLDER, "%s.exc.pick" % logger_name)
pickle.dump(e, open(exc_fn, "wb"))
logger.info("Exception was dumped in pickle file '%s'" % exc_fn)
logger.info("Exception was dumped in pickle file '%s'", exc_fn)
ids_fn = os.path.join(btconfig.LOG_FOLDER, "%s.ids.pick" % logger_name)
pickle.dump(ids, open(ids_fn, "wb"))
logger.info("IDs dumped in pickle file '%s'" % ids_fn)
logger.info("IDs dumped in pickle file '%s'", ids_fn)
dat_fn = os.path.join(btconfig.LOG_FOLDER, "%s.docs.pick" % logger_name)
pickle.dump(docs, open(dat_fn, "wb"))
logger.info("Data (batch of docs) dumped in pickle file '%s'" % dat_fn)
logger.info("Data (batch of docs) dumped in pickle file '%s'", dat_fn)
raise


@@ -1199,7 +1188,7 @@ def delete_merge(self, merge_name):
if meta:
db.remove({"_id": merge_name})
else:
self.logger.warning("No metadata found for merged collection '%s'" % merge_name)
self.logger.warning("No metadata found for merged collection '%s'", merge_name)
self.delete_merged_data(merge_name)

def archive_merge(self, merge_name):
Expand All @@ -1210,7 +1199,7 @@ def archive_merge(self, merge_name):
meta["archived"] = datetime.now()
db.replace_one({"_id": merge_name}, meta)
else:
self.logger.warning("No metadata found for merged collection '%s'" % merge_name)
self.logger.warning("No metadata found for merged collection '%s'", merge_name)
self.delete_merged_data(merge_name)

def get_query_for_list_merge(self, only_archived, status=None):
@@ -1413,7 +1402,7 @@ def whatsnewcomparedto(build_name, old=None):
},
}
except Exception as e:
self.logger.warning("Can't check what's new for source '%s': %s" % (src_name, e))
self.logger.warning("Can't check what's new for source '%s': %s", src_name, e)
return {build_name: new}

if old is None and build_name is None: