Skip to content

Commit

Permalink
refactor: use new run_sync_or_async_job helper
Browse files Browse the repository at this point in the history
  • Loading branch information
newgene committed May 23, 2023
1 parent 06cf071 commit 3f2b99d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
10 changes: 2 additions & 8 deletions biothings/management/dataplugin_localhub.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# flake8: noqa: B008
import asyncio
import inspect
import os
import pathlib
from shutil import copytree
Expand Down Expand Up @@ -123,11 +121,7 @@ def dump_and_upload(
uploader_classes = uploader_manager[plugin_name]
dumper = dumper_class()
dumper.prepare()
if inspect.iscoroutinefunction(dumper.create_todump_list):
loop = asyncio.get_event_loop()
loop.run_until_complete(dumper.create_todump_list(force=True))
else:
dumper.create_todump_list(force=True)
utils.run_sync_or_async_job(dumper.create_todump_list, force=True)
for item in dumper.to_dump:
dumper.download(item["remote"], item["local"])
dumper.steps = ["post"]
Expand Down Expand Up @@ -319,7 +313,7 @@ def listing(
dumper_class = dumper_manager[plugin_name][0]
dumper = dumper_class()
dumper.prepare()
dumper.create_todump_list(force=True)
utils.run_sync_or_async_job(dumper.create_todump_list, force=True)
if dump:
utils.show_dumped_files(dumper.new_data_folder, plugin_name)
return
Expand Down
15 changes: 15 additions & 0 deletions biothings/management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ def get_logger(name=None):
return logger


def run_sync_or_async_job(func, *args, **kwargs):
"""When func is defined as either normal or async function/method, we will call this function properly and return the results.
For an async function/method, we will use CLIJobManager to run it.
"""
if asyncio.iscoroutinefunction(func):
from biothings.utils.manager import CLIJobManager

job_manager = CLIJobManager()
kwargs["job_manager"] = job_manager
return job_manager.loop.run_until_complete(func(*args, **kwargs))
else:
# otherwise just run it as normal
return func(*args, **kwargs)


def create_data_plugin_template(name, multi_uploaders=False, parallelizer=False, logger=None):
"""Create a new data plugin from the template"""
logger = logger or get_logger()
Expand Down

0 comments on commit 3f2b99d

Please sign in to comment.