Skip to content

Commit

Permalink
start/stop fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Aug 10, 2019
1 parent b530d4a commit 391df51
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 20 deletions.
2 changes: 1 addition & 1 deletion atasker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

from atasker.supervisor import TaskSupervisor
from atasker.supervisor import TASK_LOW
Expand Down
2 changes: 1 addition & 1 deletion atasker/co.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

from atasker import task_supervisor

Expand Down
2 changes: 1 addition & 1 deletion atasker/f.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

import traceback
import threading
Expand Down
15 changes: 7 additions & 8 deletions atasker/supervisor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

import threading
import multiprocessing
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(self):
self._active_mps = set()
self._active = False
self._main_loop_active = False
self._started = False
self._started = threading.Event()
self._lock = threading.Lock()
self._max_threads = {}
self._max_mps = {}
Expand Down Expand Up @@ -107,7 +107,7 @@ def _higher_queues_busy(self, tt, task_priority):
return False

def put_task(self, task, priority=TASK_NORMAL, delay=None, tt=TT_THREAD):
if not self._started:
if not self._started.is_set():
return False
asyncio.run_coroutine_threadsafe(
self._Q.put((RQ_TASK, (tt, task, priority, delay), time.time())),
Expand All @@ -122,7 +122,7 @@ def create_mp_pool(self, *args, **kwargs):
processes=multiprocessing.cpu_count())

def register_scheduler(self, scheduler):
if not self._started:
if not self._started.is_set():
return False
asyncio.run_coroutine_threadsafe(
self._Q.put((RQ_SCHEDULER, scheduler, time.time())),
Expand Down Expand Up @@ -248,15 +248,15 @@ def start(self):
t = threading.Thread(
name='supervisor_event_loop', target=self._start_event_loop)
t.start()
while not self._started:
time.sleep(self.poll_delay)
self._started.wait()

def block(self):
while self._active:
time.sleep(0.1)

async def _main_loop(self):
self._Q = asyncio.queues.Queue()
self._started.set()
logger.info('supervisor event loop started')
while self._main_loop_active:
data = await self._Q.get()
Expand Down Expand Up @@ -292,7 +292,6 @@ def _start_event_loop(self):
self.thread_pool_size, self.thread_reserve_normal,
self.thread_reserve_high, mp))
try:
self._started = True
self.event_loop.run_until_complete(self._main_loop())
except CancelledError:
logger.warning('supervisor loop had active tasks')
Expand Down Expand Up @@ -348,5 +347,5 @@ def stop(self, wait=True, stop_schedulers=True, cancel_tasks=False):
break
if can_break: break
time.sleep(self.poll_delay)
self._started = False
self._started.clear()
logger.info('supervisor stopped')
2 changes: 1 addition & 1 deletion atasker/threads.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

import threading
import time
Expand Down
8 changes: 4 additions & 4 deletions atasker/workers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

import threading
import logging
Expand Down Expand Up @@ -379,8 +379,8 @@ def get_queue_obj(self):

class BackgroundEventWorker(BackgroundAsyncWorker):

def trigger(self):
if self._current_executor:
def trigger(self, force=False):
if self._current_executor and not force:
return
asyncio.run_coroutine_threadsafe(
self._set_event(), loop=self.supervisor.event_loop)
Expand All @@ -405,7 +405,7 @@ async def loop(self, *args, **kwargs):

def send_stop_events(self, *args, **kwargs):
try:
self.trigger()
self.trigger(force=True)
except:
pass

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.2.11"
__version__ = "0.2.12"

import setuptools

Expand Down
2 changes: 1 addition & 1 deletion tests/mp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

def test(*args, **kwargs):
print('test mp method {} {}'.format(args, kwargs))
Expand Down
2 changes: 1 addition & 1 deletion tests/mpworker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

from atasker import BackgroundIntervalWorker

Expand Down
2 changes: 1 addition & 1 deletion tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.2.11"
__version__ = "0.2.12"

from pathlib import Path

Expand Down

0 comments on commit 391df51

Please sign in to comment.