Skip to content

Commit

Permalink
aloops
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Aug 22, 2019
1 parent 8d074c3 commit 4285ce7
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 98 deletions.
26 changes: 15 additions & 11 deletions _test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
# task_supervisor.default_executor_loop = loop
task_supervisor.start()

al = task_supervisor.create_aloop('myworkers', default=True, daemon=True, start=False)
task_supervisor.start_aloop('myworkers')

f = TaskCollection()
# from multiprocessing import Pool

Expand All @@ -49,9 +52,9 @@


@background_worker(interval=0.5)
def myworker(*args, **kwargs):
async def myworker(*args, **kwargs):
global c
# print('worker is running')
print('worker is running {}'.format(threading.current_thread()))
# print(args)
# print(kwargs)
c += 1
Expand Down Expand Up @@ -101,10 +104,11 @@ def myeventworker(**kwargs):
# time.sleep(1)


#@background_task
def test(*args, **kwargs):
# @background_task
async def test(*args, **kwargs):
print('job ttt: test', args, kwargs)
time.sleep(1)
await asyncio.sleep(1)
return '12345'


@f(priority=atasker.TASK_CRITICAL)
Expand Down Expand Up @@ -144,8 +148,8 @@ def someworker(**kwargs):
# time.sleep(1)
# task_supervisor.stop(wait=2)
# exit()
myworker.start(123, x=2)
myqueuedworker.start()
myworker.start(123, x=2, _loop=al)
# myqueuedworker.start()
# myeventworker.start()
# someworker.start()
# w2=atasker.W2() #interval=0.1)
Expand All @@ -155,11 +159,11 @@ def someworker(**kwargs):
# w2.put('xxx')
# w2.put('xxx')
# w2.put('xxx')
myqueuedworker.put('task1')
# myqueuedworker.put('task1')
# myevent.set()
# time.sleep(2)
myqueuedworker.put('task2')

# myqueuedworker.put('task2')
print(al.background_task(test()))
# myqueuedworker.put('task3')
# myqueuedworker.put('task4')
# for i in range(100):
Expand Down Expand Up @@ -220,7 +224,7 @@ def cb(result):
# someworker.stop(wait=True)
# print('worker stopped')
for x in range(0,30):
print(task_supervisor.get_stats())
print(task_supervisor.get_info().aloops)
time.sleep(0.1)
# task_supervisor.block()
# loop.run_forever()
Expand Down
2 changes: 1 addition & 1 deletion atasker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.23"
__version__ = "0.3.0"

from atasker.supervisor import TaskSupervisor
from atasker.supervisor import TASK_LOW
Expand Down
2 changes: 1 addition & 1 deletion atasker/co.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.23"
__version__ = "0.3.0"

from atasker import task_supervisor

Expand Down
2 changes: 1 addition & 1 deletion atasker/f.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.23"
__version__ = "0.3.0"

import traceback
import threading
Expand Down
161 changes: 155 additions & 6 deletions atasker/supervisor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.23"
__version__ = "0.3.0"

import threading
import multiprocessing
Expand Down Expand Up @@ -47,6 +47,99 @@ def __init__(self, tt, task_id, priority):
self.time_started = None


class ALoop:

def __init__(self, name=None):
self.name = name if name else str(uuid.uuid4())
self._active = False
self.daemon = False
self.poll_delay = default_poll_delay
self.thread = None
self._started = threading.Event()

def background_task(self, coro):
if not self.is_active():
raise RuntimeError('aloop {} is not active'.format(self.name))
asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
return True

def run(self, coro):
if not self.is_active():
raise RuntimeError('aloop {} is not active'.format(self.name))
future = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
return future.result()

def start(self):
if not self._active:
self._started.clear()
t = threading.Thread(name='supervisor_aloop_{}'.format(self.name),
target=self._start_loop)
t.setDaemon(self.daemon)
t.start()
self._started.wait()

def get_loop(self):
return None if not self._active else self.loop

def _start_loop(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
try:
self.loop.run_until_complete(self._loop())
except CancelledError:
logger.warning('aloop {} had active tasks'.format(self.name))

async def _loop(self):
self._stop_event = asyncio.Event()
self.thread = threading.current_thread()
self._active = True
logger.info('aloop {} started'.format(self.name))
self._started.set()
await self._stop_event.wait()
logger.info('aloop {} finished'.format(self.name))

def _cancel_all_tasks(self):
for task in asyncio.Task.all_tasks(loop=self.loop):
task.cancel()

async def _set_stop_event(self):
self._stop_event.set()

def stop(self, wait=True, cancel_tasks=False):
if self._active:
if cancel_tasks:
self._cancel_all_tasks()
logger.debug('aloop {} remaining tasks canceled'.format(
self.name))
if isinstance(wait, bool):
to_wait = None
else:
to_wait = time.time() + wait
self._active = False
asyncio.run_coroutine_threadsafe(self._set_stop_event(),
loop=self.loop)
while True:
if to_wait and time.time() > to_wait:
logger.warning(
'aloop {} wait timeout, canceling all tasks'.format(
self.name))
self._cancel_all_tasks()
break
else:
can_break = True
for t in asyncio.Task.all_tasks(self.loop):
if not t.cancelled() and not t.done():
can_break = False
break
if can_break: break
time.sleep(self.poll_delay)
if wait and self.thread:
self.thread.join()

def is_active(self):
return self._active


class TaskSupervisor:

timeout_message = 'Task {} started in {:.3f} seconds. ' + \
Expand Down Expand Up @@ -75,9 +168,10 @@ def __init__(self):
self._thread_queue = {TASK_LOW: [], TASK_NORMAL: [], TASK_HIGH: []}
self._mp_queue = {TASK_LOW: [], TASK_NORMAL: [], TASK_HIGH: []}
self._task_info = {}
self.default_async_executor_loop = None
self.default_aloop = None
self.mp_pool = None
self.daemon = False
self.aloops = {}

self.set_thread_pool(pool_size=thread_pool_default_size,
reserve_normal=default_reserve_normal,
Expand Down Expand Up @@ -186,6 +280,52 @@ def _get_active_count(self, tt):
elif tt == TT_MP:
return len(self._active_mps)

def create_aloop(self, name, daemon=False, start=True, default=False):
with self._lock:
if name in self.aloops:
logger.error('loop {} already exists'.format(name))
return False
l = ALoop(name)
l.daemon = daemon
l.poll_delay = self.poll_delay
with self._lock:
self.aloops[name] = l
if start:
l.start()
if default:
self.set_default_aloop(l)
return l

def set_default_aloop(self, aloop):
self.default_aloop = aloop

def get_aloop(self, name):
with self._lock:
return self.aloops.get(name)

def start_aloop(self, name):
with self._lock:
if name not in self.aloops:
logger.error('loop {} not found'.format(name))
return False
else:
self.aloops[name].start()
return True

def stop_aloop(self, name, wait=True, cancel_tasks=False, _lock=True):
if _lock:
self._lock.acquire()
try:
if name not in self.aloops:
logger.error('loop {} not found'.format(name))
return False
else:
self.aloops[name].stop(wait=wait, cancel_tasks=cancel_tasks)
return True
finally:
if _lock:
self._lock.release()

def get_info(self, tt=None):

class SupervisorInfo:
Expand All @@ -208,6 +348,7 @@ class SupervisorInfo:
result.mp_tasks = list(self._active_mps)
result.mp_tasks_count = len(result.mp_tasks)
result.mp_queue = self._mp_queue.copy()
result.aloops = self.aloops.copy()
result.task_info = {}
for n, v in self._task_info.items():
if tt is None or v.tt == tt:
Expand Down Expand Up @@ -313,12 +454,13 @@ def mark_task_completed(self, task=None, task_id=None, tt=None):
del self._task_info[task]
return True

def start(self):
def start(self, daemon=None):
self._active = True
self._main_loop_active = True
t = threading.Thread(name='supervisor_event_loop',
target=self._start_event_loop,
daemon=self.daemon)
t = threading.Thread(
name='supervisor_event_loop',
target=self._start_event_loop,
daemon=daemon if daemon is not None else self.daemon)
t.start()
self._started.wait()

Expand Down Expand Up @@ -395,6 +537,13 @@ def stop(self, wait=True, stop_schedulers=True, cancel_tasks=False):
if stop_schedulers:
self._stop_schedulers(True if wait else False)
logger.debug('schedulers stopped')
with self._lock:
for i, l in self.aloops.items():
self.stop_aloop(i,
wait=wait,
cancel_tasks=cancel_tasks,
_lock=False)
logger.debug('async loops stopped')
if cancel_tasks:
self._cancel_all_tasks()
logger.debug('remaining tasks canceled')
Expand Down
2 changes: 1 addition & 1 deletion atasker/threads.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.23"
__version__ = "0.3.0"

import threading
import time
Expand Down
11 changes: 7 additions & 4 deletions atasker/workers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.23"
__version__ = "0.3.0"

import threading
import logging
Expand All @@ -14,7 +14,7 @@
from atasker import task_supervisor

from atasker import TASK_NORMAL
from atasker.supervisor import TT_COROUTINE, TT_THREAD, TT_MP
from atasker.supervisor import TT_COROUTINE, TT_THREAD, TT_MP, ALoop

logger = logging.getLogger('atasker/workers')

Expand Down Expand Up @@ -237,15 +237,18 @@ class BackgroundAsyncWorker(BackgroundWorker):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor_loop = kwargs.get(
'loop', self.supervisor.default_async_executor_loop)
self.executor_loop = kwargs.get('loop')

def _register(self):
self.supervisor.register_scheduler(self)
self._started.wait()

def _start(self, *args, **kwargs):
self.executor_loop = kwargs.get('_loop', self.executor_loop)
if not self.executor_loop and self.supervisor.default_aloop:
self.executor_loop = self.supervisor.default_aloop
if isinstance(self.executor_loop, ALoop):
self.executor_loop = self.executor_loop.get_loop()
self._register()

def _stop(self, *args, **kwargs):
Expand Down

0 comments on commit 4285ce7

Please sign in to comment.