Skip to content

Commit

Permalink
background_task returns task object
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Sep 4, 2019
1 parent ecdb99b commit 22c665c
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 31 deletions.
3 changes: 2 additions & 1 deletion atasker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

from atasker.supervisor import TaskSupervisor
from atasker.supervisor import TASK_LOW
Expand All @@ -26,6 +26,7 @@
from atasker.threads import LocalProxy
from atasker.threads import Locker
from atasker.threads import background_task
from atasker.threads import wait_completed

from atasker.co import co_mp_apply

Expand Down
2 changes: 1 addition & 1 deletion atasker/co.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

from atasker import task_supervisor

Expand Down
2 changes: 1 addition & 1 deletion atasker/f.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

import traceback
import threading
Expand Down
32 changes: 27 additions & 5 deletions atasker/supervisor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

import threading
import multiprocessing
Expand All @@ -28,6 +28,7 @@
TASK_STATUS_QUEUED = 0
TASK_STATUS_DELAYED = 2
TASK_STATUS_STARTED = 100
TASK_STATUS_COMPLETED = 200
TASK_STATUS_CANCELED = -1

logger = logging.getLogger('atasker')
Expand Down Expand Up @@ -62,6 +63,8 @@ def __init__(self, tt, task_id, priority, task, delay=None, worker=None):
self.status = TASK_STATUS_QUEUED
self.delay = delay
self.worker = worker
self.started = threading.Event()
self.completed = threading.Event()

def __cmp__(self, other):
return cmp(self.priority, other.priority) if \
Expand All @@ -75,6 +78,20 @@ def __gt__(self, other):
return (self.priority > other.priority) if \
other is not None else True

def is_started(self):
return self.started.is_set()

def is_completed(self):
return self.completed.is_set()

def mark_started(self):
self.status = TASK_STATUS_STARTED
self.started.set()

def mark_completed(self):
self.status = TASK_STATUS_COMPLETED
self.completed.set()


class ALoop:

Expand Down Expand Up @@ -272,7 +289,7 @@ def put_task(self,
else:
q = self._Qmp[priority]
asyncio.run_coroutine_threadsafe(q.put(ti), loop=self.event_loop)
return task_id
return ti

async def _task_processor(self, queue, priority, tt):
logger.debug('task processor {}/{} started'.format(tt, priority))
Expand Down Expand Up @@ -472,11 +489,11 @@ async def _start_task(self, task):
with self._lock:
task.time_started = time.time()
if not task.delay:
task.status = TASK_STATUS_STARTED
task.mark_started()
if task.delay:
task.status = TASK_STATUS_DELAYED
await asyncio.sleep(task.delay)
task.status = TASK_STATUS_STARTED
task.mark_started()
if task.tt == TT_THREAD:
task.task.start()
elif task.tt == TT_MP:
Expand All @@ -496,6 +513,9 @@ async def _start_task(self, task):

def mark_task_completed(self, task=None, task_id=None, tt=None):
with self._lock:
if isinstance(task, Task):
task_id = task.id
tt = task.tt
if tt == TT_THREAD or (not task and not task_id) or isinstance(
task, threading.Thread):
if task_id and task_id in self._active_threads:
Expand All @@ -511,16 +531,18 @@ def mark_task_completed(self, task=None, task_id=None, tt=None):
format(task._atask_id, task,
len(self._active_threads),
self.thread_pool_size))
self._tasks[task._atask_id].mark_completed()
del self._tasks[task._atask_id]
else:
if task is None: task = task_id
if task is None or isinstance(task, Task): task = task_id
if task in self._active_mps:
self._active_mps.remove(task)
if debug:
logger.debug(
'removed task {}: {} mp pool size: {} / {}'.format(
task, task, len(self._active_mps),
self.mp_pool_size))
self._tasks[task].mark_completed()
del self._tasks[task]
return True

Expand Down
25 changes: 19 additions & 6 deletions atasker/threads.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

import threading
import time
Expand Down Expand Up @@ -176,18 +176,18 @@ def start_task(*args, **kw):
kwargs=kw)
if kwargs.get('daemon'): t.setDaemon(True)
return supervisor.put_task(t, kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'))
kwargs.get('delay'))
return t
elif tt == TT_MP:
task_id = str(uuid.uuid4())
task = (f, args, kw,
gen_mp_callback(task_id, kwargs.get('callback'),
supervisor))
return supervisor.put_task(task,
kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'),
tt=TT_MP,
task_id=task_id)
kwargs.get('priority', TASK_NORMAL),
kwargs.get('delay'),
tt=TT_MP,
task_id=task_id)

return start_task

Expand All @@ -197,3 +197,16 @@ def _background_task_thread_runner(f, supervisor, *args, **kwargs):
f(*args, **kwargs)
finally:
supervisor.mark_task_completed()


def wait_completed(tasks, timeout=None):
t_to = (time.time() + timeout) if timeout else None
for t in tasks:
if timeout:
t_wait = t_to - time.time()
if t_wait <= 0: return False
else:
t_wait = None
if not t.completed.wait(timeout=t_wait):
return False
return True
10 changes: 5 additions & 5 deletions atasker/workers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

import threading
import logging
Expand Down Expand Up @@ -168,7 +168,7 @@ def _abort(self):
self.stop(wait=False)

def _cb_mp(self, result):
self.supervisor.mark_task_completed(task_id=self._current_executor,
self.supervisor.mark_task_completed(task=self._current_executor,
tt=TT_MP)
if self.process_result(result) is False:
self._abort()
Expand Down Expand Up @@ -331,14 +331,14 @@ async def launch_executor(self, *args, **kwargs):
self._current_executor = None
return result is not False and self._active
elif self._run_in_mp:
task_id = self.supervisor.put_task(
task = self.supervisor.put_task(
(self.run, args + self._task_args, self._task_kwargs,
self._cb_mp),
self.priority,
tt=TT_MP,
worker=self)
self._current_executor = task_id
return task_id is not None and self._active
self._current_executor = task
return task is not None and self._active
else:
t = threading.Thread(target=self._run,
name=self.name + '_run',
Expand Down
4 changes: 2 additions & 2 deletions doc/supervisor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ launch:
.. code:: python
t = threading.Thread(target=myfunc)
task_id = task_supervisor.put_task(t, priority=TASK_NORMAL, delay=None)
task = task_supervisor.put_task(t, priority=TASK_NORMAL, delay=None)
If *delay* is specified, the thread is started after the corresponding delay
(seconds).
Expand All @@ -276,7 +276,7 @@ After the function thread is finished, it should notify task supervisor:

.. code:: python
task_supervisor.mark_task_completed(task_id=task_id)
task_supervisor.mark_task_completed(task=task) # or task_id = task.id
If no *task_id* specified, current thread ID is being used:

Expand Down
25 changes: 20 additions & 5 deletions doc/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Defining task with annotation
def mytask():
print('I am working in the background!')
task_id = mytask()
task = mytask()
It's not required to notify task supervisor about task completion,
*background_task* will do this automatically as soon as task function is
Expand All @@ -34,7 +34,7 @@ To start task function without annotation, you must manually decorate it:
def mytask():
print('I am working in the background!')
task_id = background_task(mytask, name='mytask', priority=TASK_LOW)()
task = background_task(mytask, name='mytask', priority=TASK_LOW)()
.. automodule:: atasker
.. autofunction:: background_task
Expand All @@ -52,7 +52,7 @@ To put task into :ref:`multiprocessing pool<create_mp_pool>`, append parameter
from atasker import TASK_HIGH, TT_MP
task_id = background_task(
task = background_task(
tests.mp.test, priority=TASK_HIGH, tt=TT_MP)(1, 2, 3, x=2)
Optional parameter *callback* can be used to specify function which handles
Expand Down Expand Up @@ -81,13 +81,14 @@ You may put task from your coroutine, without using callback, example:
Task info
=========

Task id can later be used to obtain task info:
If you saved only task.id but not the whole object, you may later obtain Task
object again:

.. code:: python
from atasker import task_supervisor
task_info = task_supervisor.get_task_info(task_id)
task_info = task_supervisor.get_task_info(task.id)
Task info object fields:

Expand All @@ -101,3 +102,17 @@ Task info object fields:

If task info is *None*, consider the task is completed and supervisor destroyed
information about it.

Wait completed
==============

You may wait until pack of tasks is completed with the following method:

.. code:: python
from atasker import wait_completed
wait_completed([task1, task2, task3 .... ], timeout=None)
The method return *True* if all tasks are finished, or *False* if timeout was
specified but some tasks are not finished.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.3.27"
__version__ = "0.4.0"

import setuptools

Expand Down
2 changes: 1 addition & 1 deletion tests/mp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

def test(*args, **kwargs):
print('test mp method {} {}'.format(args, kwargs))
Expand Down
2 changes: 1 addition & 1 deletion tests/mpworker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

from atasker import BackgroundIntervalWorker

Expand Down
28 changes: 26 additions & 2 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.3.27"
__version__ = "0.4.0"

from pathlib import Path

Expand All @@ -23,6 +23,9 @@
background_task_thread_critical=None,
background_task_mp=None,
background_worker=0,
wait1=None,
wait2=None,
wait3=None,
background_interval_worker=0,
background_interval_worker_async_ex=0,
background_queue_worker=0,
Expand All @@ -40,7 +43,7 @@ def wait():


from atasker import task_supervisor, background_task, background_worker
from atasker import TT_MP, TASK_CRITICAL
from atasker import TT_MP, TASK_CRITICAL, wait_completed

from atasker import FunctionCollection, TaskCollection, g

Expand Down Expand Up @@ -260,6 +263,27 @@ async def t2(x):
self.assertEqual(result.test_aloop_background_task, 1)
self.assertEqual(a.run(t2(2)), 4)

def test_wait_completed(self):

@background_task
def t1():
time.sleep(0.1)
result.wait1 = 1

@background_task
def t2():
time.sleep(0.2)
result.wait2 = 2

@background_task
def t3():
time.sleep(0.3)
result.wait3 = 3

tasks = [t1(), t2(), t3()]
wait_completed(tasks)
self.assertEqual(result.wait1 + result.wait2 + result.wait3, 6)


task_supervisor.set_thread_pool(pool_size=20, reserve_normal=5, reserve_high=5)
task_supervisor.set_mp_pool(pool_size=20, reserve_normal=5, reserve_high=5)
Expand Down

0 comments on commit 22c665c

Please sign in to comment.