Skip to content

Commit

Permalink
async workers run body in same loop
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Dec 6, 2019
1 parent a6b7a90 commit a882a09
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
16 changes: 8 additions & 8 deletions atasker/supervisor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.5"
__author__ = 'Altertech Group, https://www.altertech.com/'
__copyright__ = 'Copyright (C) 2018-2019 Altertech Group'
__license__ = 'Apache License 2.0'
__version__ = "0.4.6"

import threading
import multiprocessing
Expand Down Expand Up @@ -571,12 +571,12 @@ def block(self):

async def _launch_scheduler_loop(self, scheduler):
try:
t = self.event_loop.create_task(scheduler.loop())
t = scheduler.worker_loop.create_task(scheduler.loop())
with self._lock:
self._schedulers[scheduler] = (scheduler, t)
if hasattr(scheduler, 'extra_loops'):
for l in scheduler.extra_loops:
self.event_loop.create_task(getattr(scheduler, l)())
scheduler.worker_loop.create_task(getattr(scheduler, l)())
await t
except CancelledError:
pass
Expand Down Expand Up @@ -606,8 +606,8 @@ async def _main_loop(self):
if r == RQ_SCHEDULER:
if debug:
logger.debug('new scheduler {}'.format(res))
self.event_loop.create_task(
self._launch_scheduler_loop(res))
asyncio.run_coroutine_threadsafe(
self._launch_scheduler_loop(res), loop=res.worker_loop)
finally:
self._Q.task_done()
for i, t in self._processors_stopped.items():
Expand Down
46 changes: 23 additions & 23 deletions atasker/workers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__author__ = "Altertech Group, https://www.altertech.com/"
__copyright__ = "Copyright (C) 2018-2019 Altertech Group"
__license__ = "Apache License 2.0"
__version__ = "0.4.5"
__author__ = 'Altertech Group, https://www.altertech.com/'
__copyright__ = 'Copyright (C) 2018-2019 Altertech Group'
__license__ = 'Apache License 2.0'
__version__ = "0.4.6"

import threading
import logging
Expand Down Expand Up @@ -244,11 +244,16 @@ def __init__(self, *args, **kwargs):
self.aloop = None

def _register(self):
if not self.executor_loop and asyncio.iscoroutinefunction(self.run):
logger.warning(
('{}: no executor loop defined, ' +
'will start executor in supervisor event loop').format(
self.name))
if asyncio.iscoroutinefunction(self.run):
if not self.executor_loop:
logger.warning(
('{}: no executor loop defined, ' +
'will start executor in supervisor event loop').format(
self.name))
self.executor_loop = self.supervisor.event_loop
self.worker_loop = self.executor_loop
else:
self.worker_loop = self.supervisor.event_loop
self.supervisor.register_scheduler(self)
self._started.wait()

Expand Down Expand Up @@ -310,7 +315,7 @@ async def _run_coroutine(self, *args, **kwargs):

def _send_executor_stop_event(self):
asyncio.run_coroutine_threadsafe(self._set_stop_event(),
loop=self.supervisor.event_loop)
loop=self.worker_loop)

async def _set_stop_event(self):
self._executor_stop_event.set()
Expand All @@ -319,17 +324,12 @@ async def launch_executor(self, *args, **kwargs):
self.last_executed = time.perf_counter()
if asyncio.iscoroutinefunction(self.run):
self._current_executor = self.run
if self.executor_loop:
asyncio.run_coroutine_threadsafe(self._run_coroutine(*args),
loop=self.executor_loop)
return True
else:
try:
result = await self.run(*(args + self._task_args),
**self._task_kwargs)
except Exception as e:
self.error(e)
result = None
try:
result = await self.run(*(args + self._task_args),
**self._task_kwargs)
except Exception as e:
self.error(e)
result = None
self._current_executor = None
return result is not False and self._active
elif self._run_in_mp:
Expand Down Expand Up @@ -364,7 +364,7 @@ def __init__(self, *args, **kwargs):

def put(self, t):
asyncio.run_coroutine_threadsafe(self._Q.put(t),
loop=self.supervisor.event_loop)
loop=self.worker_loop)

def send_stop_events(self):
try:
Expand Down Expand Up @@ -405,7 +405,7 @@ def trigger(self, force=False):
if self._current_executor and not force:
return
asyncio.run_coroutine_threadsafe(self._set_event(),
loop=self.supervisor.event_loop)
loop=self.worker_loop)

async def _set_event(self):
self._E.set()
Expand Down

0 comments on commit a882a09

Please sign in to comment.