Skip to content

Commit

Permalink
alpha 0.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Jul 14, 2019
1 parent 0c441ee commit fdb48d0
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 42 deletions.
76 changes: 39 additions & 37 deletions atasker/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,32 @@ def __init__(self,
reserve_high=5,
poll_delay=0.1):

self.poll_delay = poll_delay
self.timeout_warning = 5
self.timeout_warning_func = None
self.timeout_critical = 10
self.timeout_critical_func = None

self._active_threads = set()
self._active = False
self._main_loop_active = False
self._started = False
self.lock = threading.Lock()
self.poll_delay = poll_delay
self.max_threads = {}
self.schedulers = {}
self.queue = {TASK_LOW: [], TASK_NORMAL: [], TASK_HIGH: []}
self._lock = threading.Lock()
self._max_threads = {}
self._schedulers = {}
self._queue = {TASK_LOW: [], TASK_NORMAL: [], TASK_HIGH: []}

self.set_config(
pool_size=pool_size,
reserve_normal=reserve_normal,
reserve_high=reserve_high)
self.timeout_warning = 5
self.timeout_warning_func = None
self.timeout_critical = 10
self.timeout_critical_func = None

def _higher_queues_busy(self, task_priority):
if task_priority == TASK_NORMAL:
return len(self.queue[TASK_HIGH]) > 0
return len(self._queue[TASK_HIGH]) > 0
elif task_priority == TASK_LOW:
return len(self.queue[TASK_NORMAL]) > 0 or \
len(self.queue[TASK_HIGH]) > 0
return len(self._queue[TASK_NORMAL]) > 0 or \
len(self._queue[TASK_HIGH]) > 0
else:
return False

Expand All @@ -76,50 +78,50 @@ def register_scheduler(self, scheduler):
return True

def register_sync_scheduler(self, scheduler):
with self.lock:
self.schedulers[scheduler] = None
with self._lock:
self._schedulers[scheduler] = None
return True

def unregister_sync_scheduler(self, scheduler):
with self.lock:
with self._lock:
try:
del self.schedulers[scheduler]
del self._schedulers[scheduler]
return True
except:
return False

def unregister_scheduler(self, scheduler):
with self.lock:
if scheduler not in self.schedulers:
with self._lock:
if scheduler not in self._schedulers:
return False
else:
self.schedulers[scheduler][1].cancel()
del self.schedulers[scheduler]
self._schedulers[scheduler][1].cancel()
del self._schedulers[scheduler]
return True

async def start_task(self, thread, thread_priority, time_put, delay=None):
if not self._active: return
self.lock.acquire()
self._lock.acquire()
try:
if thread_priority != TASK_CRITICAL and self.pool_size:
self.queue[thread_priority].append(thread)
self._queue[thread_priority].append(thread)
while self._active and \
(len(self._active_threads) >= \
self.max_threads[thread_priority] \
or self.queue[thread_priority][0] != thread or \
self._max_threads[thread_priority] \
or self._queue[thread_priority][0] != thread or \
self._higher_queues_busy(thread_priority)):
self.lock.release()
self._lock.release()
await asyncio.sleep(self.poll_delay)
self.lock.acquire()
self.queue[thread_priority].pop(0)
self._lock.acquire()
self._queue[thread_priority].pop(0)
if not self._active:
return
self._active_threads.add(thread)
logger.debug('new task {} pool size: {} / {}'.format(
thread, len(self._active_threads), self.pool_size))
finally:
try:
self.lock.release()
self._lock.release()
except:
pass
if delay:
Expand All @@ -139,7 +141,7 @@ async def start_task(self, thread, thread_priority, time_put, delay=None):
thread.time_started = time_started

def mark_task_completed(self, task=None):
with self.lock:
with self._lock:
if task is None:
task = threading.current_thread()
if task in self._active_threads:
Expand All @@ -152,9 +154,9 @@ def set_config(self, **kwargs):
for p in ['pool_size', 'reserve_normal', 'reserve_high']:
if p in kwargs:
setattr(self, p, int(kwargs[p]))
self.max_threads[TASK_LOW] = self.pool_size
self.max_threads[TASK_NORMAL] = self.pool_size + self.reserve_normal
self.max_threads[TASK_HIGH] = self.pool_size + \
self._max_threads[TASK_LOW] = self.pool_size
self._max_threads[TASK_NORMAL] = self.pool_size + self.reserve_normal
self._max_threads[TASK_HIGH] = self.pool_size + \
self.reserve_normal + self.reserve_high

def start(self):
Expand All @@ -180,8 +182,8 @@ async def _main_loop(self):
if r == RQ_SCHEDULER:
logger.debug('Supervisor: new scheduler {}'.format(res))
scheduler_task = self.event_loop.create_task(res.loop())
with self.lock:
self.schedulers[res] = (res, scheduler_task)
with self._lock:
self._schedulers[res] = (res, scheduler_task)
elif r == RQ_TASK:
logger.debug('Supervisor: new task {}'.format(res))
target, priority, delay = res
Expand All @@ -203,13 +205,13 @@ def _start_event_loop(self):
logger.warning('supervisor loop had active tasks')

def _cancel_all_tasks(self):
with self.lock:
with self._lock:
for task in asyncio.Task.all_tasks(loop=self.event_loop):
task.cancel()

def _stop_schedulers(self, wait=True):
with self.lock:
schedulers = self.schedulers.copy()
with self._lock:
schedulers = self._schedulers.copy()
for s in schedulers:
s.stop(wait=wait)

Expand Down
6 changes: 3 additions & 3 deletions atasker/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def after_stop(self):

# -----------------------

def __init__(self, name=None, func=None, **kwargs):
if func: self.run = func
def __init__(self, worker_name=None, executor_func=None, **kwargs):
if executor_func: self.run = executor_func
self._executor_thread = None
self._active = False
self._started = False
Expand All @@ -51,7 +51,7 @@ def __init__(self, name=None, func=None, **kwargs):
self.on_error_kwargs = kwargs.get('on_error_kwargs', {})
self.supervisor = kwargs.get('supervisor', task_supervisor)
self.poll_delay = kwargs.get('poll_delay', self.supervisor.poll_delay)
self.set_name(name)
self.set_name(worker_name)
self.thread_args = ()
self.thread_kw = {}
self.start_stop_lock = threading.Lock()
Expand Down
1 change: 1 addition & 0 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

supervisor
tasks
workers
39 changes: 39 additions & 0 deletions doc/supervisor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Task supervisor
Task supervisor is a component which manages task thread pool and run task
:doc:`schedulers (workers)<workers>`.

.. contents::

Usage
=====

Expand Down Expand Up @@ -64,6 +66,20 @@ busy task pool is, and the pool is being extended for them with no limits.

pool size can be changed while task supervisor is running.

Poll delay
==========

Poll delay is a delay (in seconds), which is used by task queue manager, in
:doc:`workers<workers>` and some other methods like *start/stop*.

Lower poll delay = higher CPU usage, higher poll delay = faster reaction time.

Default poll delay is 0.1 second. Can be changed with:

.. code:: python
task_supervisor.poll_delay = 0.01 # set poll delay to 10ms
Blocking
========

Expand All @@ -76,6 +92,27 @@ thread, you may use method
which will just sleep until task supervisor is active.

Timeouts
========

Task supervisor can log timeouts (when task isn't launched within a specified
number of seconds) and run timeout handler functions:

.. code:: python
def warning(t):
# t = task thread object
print('Task thread {} is not launched yet'.format(t))
def critical(t):
print('All is worse than expected')
task_supervisor.timeout_warning = 5
task_supervisor.timeout_warning_func = warn
task_supervisor.timeout_critical = 10
task_supervisor.timeout_critical_func = critical
Stopping task supervisor
========================

Expand Down Expand Up @@ -156,3 +193,5 @@ Own task scheduler (worker) can be registered in task supervisor with:
Where *scheduler* = scheduler object, which should implement at least *stop*
(regular) and *loop* (async) methods.


0 comments on commit fdb48d0

Please sign in to comment.