Skip to content

Commit

Permalink
feat: added new CLIJobManager
Browse files Browse the repository at this point in the history
  • Loading branch information
newgene committed May 23, 2023
1 parent fc34b7f commit 961b2db
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions biothings/utils/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import biothings.hub # noqa
from biothings import config
from biothings.utils.common import get_random_string, sizeof_fmt, timesofar
from biothings.utils.common import get_loop, get_random_string, sizeof_fmt, timesofar
from biothings.utils.hub_db import get_src_conn

logger = config.logger
Expand Down Expand Up @@ -143,6 +143,13 @@ def find_process(pid):
return p


def norm(value, maxlen):
"""just a helper to clean/prepare job's values printing"""
if len(value) > maxlen:
value = "...%s" % value[-maxlen + 3 :]
return value


class UnknownResource(Exception):
pass

Expand Down Expand Up @@ -1288,8 +1295,28 @@ def job_info(self):
}


# just a helper to clean/prepare job's values printing
def norm(value, maxlen):
if len(value) > maxlen:
value = "...%s" % value[-maxlen + 3:]
return value
class CLIJobManager:
"""This is the minimal JobManager used in CLI mode to run async jobs, with the compatible methods as JobManager.
It won't use a dedicated ProcessPool or ThreadPool, and will just run async job directly in the asyncio loop
(which runs jobs in threads by default).
"""

def __init__(self, loop=None):
self.loop = loop or get_loop()

async def defer_to_process(self, pinfo=None, func=None, *args, **kwargs):
"""keep the same signature as JobManager.defer_to_process. The passed pinfo is ignored.
defer_to_process will still run func in the thread using defer_to_thread method.
"""
return self.defer_to_thread(pinfo, func, *args, **kwargs)

async def defer_to_thread(self, pinfo=None, func=None, *args):
"""keep the same signature as JobManager.defer_to_thread. The passed pinfo is ignored"""

async def run(fut, func):
res = func()
fut.set_result(res)

fut = self.loop.create_future()
self.loop.create_task(run(fut, func))
return fut

0 comments on commit 961b2db

Please sign in to comment.