Skip to content

Commit

Permalink
fix: allow to run async create_todump_list in CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
newgene committed May 18, 2023
1 parent 0157098 commit 0fd4fea
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
17 changes: 15 additions & 2 deletions biothings/hub/dataload/dumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2022,11 +2022,22 @@ async def create_todump_list(self, force=False, job_manager=None, **kwargs):
self.prepare_client()
self.prepare_remote_container()
# unprepare unpicklable objects so we can use multiprocessing
self.unprepare()
state = self.unprepare()
# set up job to generate remote file
pinfo = self.get_pinfo()
pinfo["step"] = "check"
job = await job_manager.defer_to_process(pinfo, partial(self.generate_remote_file))
if job_manager:
job = job_manager.defer_to_process(pinfo, partial(self.generate_remote_file))
else:
# otherwise, just run it with asyncio loop directly
async def run(fut):
res = self.generate_remote_file()
fut.set_result(res)

loop = asyncio.get_event_loop()
job = loop.create_future()
loop.create_task(run(job))

remote_error = False

def remote_done(f):
Expand All @@ -2039,6 +2050,8 @@ def remote_done(f):
if remote_error:
raise remote_error
# Need to reinit _state b/c of unprepare
self.prepare(state) # reverse of unpreare after async job is done
# TODO: test if the following two lines can be removed after we call self.prepare(state) above
if self.need_prepare():
self.prepare_client()
self.setup_log()
Expand Down
8 changes: 7 additions & 1 deletion biothings/management/dataplugin_localhub.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# flake8: noqa: B008
import asyncio
import inspect
import os
import pathlib
from shutil import copytree
Expand Down Expand Up @@ -121,7 +123,11 @@ def dump_and_upload(
uploader_classes = uploader_manager[plugin_name]
dumper = dumper_class()
dumper.prepare()
dumper.create_todump_list(force=True)
if inspect.iscoroutinefunction(dumper.create_todump_list):
loop = asyncio.get_event_loop()
loop.run_until_complete(dumper.create_todump_list(force=True))
else:
dumper.create_todump_list(force=True)
for item in dumper.to_dump:
dumper.download(item["remote"], item["local"])
dumper.steps = ["post"]
Expand Down

0 comments on commit 0fd4fea

Please sign in to comment.