Skip to content

Commit

Permalink
task.result, async tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Sep 6, 2019
1 parent 5c98d04 commit 4b83a09
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 21 deletions.
2 changes: 1 addition & 1 deletion atasker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

from atasker.supervisor import TaskSupervisor
from atasker.supervisor import TASK_LOW
Expand Down
2 changes: 1 addition & 1 deletion atasker/co.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

from atasker import task_supervisor

Expand Down
2 changes: 1 addition & 1 deletion atasker/f.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

import traceback
import threading
Expand Down
15 changes: 12 additions & 3 deletions atasker/supervisor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

import threading
import multiprocessing
Expand Down Expand Up @@ -65,6 +65,7 @@ def __init__(self, tt, task_id, priority, task, delay=None, worker=None):
self.worker = worker
self.started = threading.Event()
self.completed = threading.Event()
self.result = None

def __cmp__(self, other):
return cmp(self.priority, other.priority) if \
Expand Down Expand Up @@ -103,11 +104,19 @@ def __init__(self, name=None):
self.thread = None
self._started = threading.Event()

async def _coro_task(self, task):
task.time_queued = time.time()
task.time_started = task.time_queued
task.mark_started()
task.result = await task.task
task.mark_completed()

def background_task(self, coro):
if not self.is_active():
raise RuntimeError('aloop {} is not active'.format(self.name))
asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
return True
task = Task(TT_COROUTINE, str(uuid.uuid4()), TASK_NORMAL, coro)
asyncio.run_coroutine_threadsafe(self._coro_task(task), loop=self.loop)
return task

def run(self, coro):
if not self.is_active():
Expand Down
17 changes: 11 additions & 6 deletions atasker/threads.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

import threading
import time
Expand Down Expand Up @@ -149,6 +149,8 @@ def gen_mp_callback(task_id, callback, supervisor):
def cbfunc(*args, **kwargs):
if callable(callback):
callback(*args, **kwargs)
if args:
supervisor.get_task(task_id).result = args[0]
supervisor.mark_task_completed(task_id=task_id)

return cbfunc
Expand All @@ -169,14 +171,17 @@ def start_task(*args, **kw):
return asyncio.run_coroutine_threadsafe(f(*args, **kw),
loop=loop)
elif tt == TT_THREAD:
task_id = str(uuid.uuid4())
t = threading.Thread(group=kwargs.get('group'),
target=_background_task_thread_runner,
name=kwargs.get('name'),
args=(f, supervisor) + args,
args=(f, supervisor, task_id) + args,
kwargs=kw)
if kwargs.get('daemon'): t.setDaemon(True)
return supervisor.put_task(t, kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'))
return supervisor.put_task(t,
kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'),
task_id=task_id)
return t
elif tt == TT_MP:
task_id = str(uuid.uuid4())
Expand All @@ -192,9 +197,9 @@ def start_task(*args, **kw):
return start_task


def _background_task_thread_runner(f, supervisor, *args, **kwargs):
def _background_task_thread_runner(f, supervisor, task_id, *args, **kwargs):
try:
f(*args, **kwargs)
supervisor.get_task(task_id).result = f(*args, **kwargs)
finally:
supervisor.mark_task_completed()

Expand Down
2 changes: 1 addition & 1 deletion atasker/workers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

import threading
import logging
Expand Down
4 changes: 2 additions & 2 deletions doc/supervisor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ aloops have 2 methods to execute own coroutines:

.. code:: python
# put coroutine to loop and forget
aloop.background_task(coro(args))
# put coroutine to loop
task = aloop.background_task(coro(args))
# blocking wait for result from coroutine
result = aloop.run(coro(args))
Expand Down
1 change: 1 addition & 0 deletions doc/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Task info object fields:
* **priority** task priority
* **time_queued** time when task was queued
* **time_started** time when task was started
* **result** task result
* **status** task status
**0** queued
**2** delayed
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.4.0"
__version__ = "0.4.1"

import setuptools

Expand Down
5 changes: 4 additions & 1 deletion tests/mp.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

def test(*args, **kwargs):
print('test mp method {} {}'.format(args, kwargs))
return 999

def test_mp(a, x, **kwargs):
return a + x

def test2(*args, **kwargs):
return 999
2 changes: 1 addition & 1 deletion tests/mpworker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

from atasker import BackgroundIntervalWorker

Expand Down
35 changes: 32 additions & 3 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.0"
__version__ = "0.4.1"

from pathlib import Path

Expand Down Expand Up @@ -249,6 +249,35 @@ async def t(**kwargs):
t.stop()
self.assertEqual(result.test_aloop, 'supervisor_aloop_test1')

def test_result_async(self):

def t1():
return 555

aloop = task_supervisor.create_aloop('test3')
t = background_task(t1, loop='test3')()
wait_completed([t])
self.assertEqual(t.result, 555)

def test_result_thread(self):

def t1():
return 777

t = background_task(t1)()
wait_completed([t])
self.assertEqual(t.result, 777)

def test_result_mp(self):

from mp import test2

t = background_task(test2, tt=TT_MP)()
wait_completed([t])
self.assertEqual(t.result, 999)



def test_aloop_run(self):

async def t1():
Expand All @@ -258,8 +287,8 @@ async def t2(x):
return x * 2

a = task_supervisor.create_aloop('test2')
background_task(t1, loop='test2')()
wait()
t = background_task(t1, loop='test2')()
wait_completed([t])
self.assertEqual(result.test_aloop_background_task, 1)
self.assertEqual(a.run(t2(2)), 4)

Expand Down

0 comments on commit 4b83a09

Please sign in to comment.