Skip to content

Commit

Permalink
custom supervisor for bg tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Jul 20, 2019
1 parent c161a8e commit fc43f23
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
12 changes: 9 additions & 3 deletions atasker/co.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@
import asyncio


async def co_mp_apply(f, args=(), kwargs={}, priority=TASK_NORMAL, delay=None):
async def co_mp_apply(f,
args=(),
kwargs={},
priority=TASK_NORMAL,
delay=None,
supervisor=None):

class CO:

async def run(self, *args, **kwargs):
self._event = asyncio.Event()
task = (self.task_id, self.func, args, kwargs, self.callback)
return task_supervisor.put_task(
return self.supervisor.put_task(
task, self.priority, self.delay, tt=TT_MP)

async def _set_event(self):
self._event.set()

def callback(self, result):
task_supervisor.mark_task_completed(self.task_id)
self.supervisor.mark_task_completed(self.task_id)
self._result = result
asyncio.run_coroutine_threadsafe(self._set_event(), loop=self._loop)

Expand All @@ -33,6 +38,7 @@ async def get_result(self):
co.task_id = str(uuid.uuid4())
co.priority = priority
co.delay = delay
co.supervisor = supervisor if supervisor else task_supervisor
co.func = f
co._loop = asyncio.get_event_loop()
if not await co.run(args, kwargs):
Expand Down
20 changes: 11 additions & 9 deletions atasker/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ def clear(self, attr):

def background_task(f, *args, **kwargs):

def gen_mp_callback(task_id, callback):
def gen_mp_callback(task_id, callback, supervisor):

def cbfunc(*args, **kwargs):
task_supervisor.mark_task_completed(task_id)
supervisor.mark_task_completed(task_id)
if callable(callback):
callback(*args, **kwargs)

Expand All @@ -44,22 +44,24 @@ def cbfunc(*args, **kwargs):
@wraps(f)
def start_task(*args, **kw):
tt = kwargs.get('tt', TT_THREAD)
supervisor = kwargs.get('supervisor', task_supervisor)
if tt == TT_THREAD:
t = threading.Thread(
group=kwargs.get('group'),
target=_background_task_thread_runner,
name=kwargs.get('name'),
args=(f,) + args,
args=(f, supervisor) + args,
kwargs=kw)
if kwargs.get('daemon'): t.setDaemon(True)
task_supervisor.put_task(t, kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'))
supervisor.put_task(t, kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'))
return t
elif tt == TT_MP:
task_id = str(uuid.uuid4())
task = (task_id, f, args, kw,
gen_mp_callback(task_id, kwargs.get('callback')))
task_supervisor.put_task(
gen_mp_callback(task_id, kwargs.get('callback'),
supervisor))
supervisor.put_task(
task,
kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'),
Expand All @@ -68,8 +70,8 @@ def start_task(*args, **kw):
return start_task


def _background_task_thread_runner(f, *args, **kwargs):
def _background_task_thread_runner(f, supervisor, *args, **kwargs):
try:
f(*args, **kwargs)
finally:
task_supervisor.mark_task_completed()
supervisor.mark_task_completed()

0 comments on commit fc43f23

Please sign in to comment.