Skip to content

Commit

Permalink
task_done in q
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Jul 20, 2019
1 parent 5978a33 commit 9ee5afe
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 26 deletions.
33 changes: 18 additions & 15 deletions atasker/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,21 +260,24 @@ async def _main_loop(self):
logger.info('supervisor event loop started')
while self._main_loop_active:
data = await self._Q.get()
if data is None: break
r, res, t_put = data
if r == RQ_SCHEDULER:
logger.debug('Supervisor: new scheduler {}'.format(res))
scheduler_task = self.event_loop.create_task(res.loop())
if hasattr(res, 'extra_loops'):
for l in res.extra_loops:
self.event_loop.create_task(getattr(res, l)())
with self._lock:
self._schedulers[res] = (res, scheduler_task)
elif r == RQ_TASK:
logger.debug('Supervisor: new task {}'.format(res))
tt, target, priority, delay = res
self.event_loop.create_task(
self._start_task(tt, target, priority, t_put, delay))
try:
if data is None: break
r, res, t_put = data
if r == RQ_SCHEDULER:
logger.debug('Supervisor: new scheduler {}'.format(res))
scheduler_task = self.event_loop.create_task(res.loop())
if hasattr(res, 'extra_loops'):
for l in res.extra_loops:
self.event_loop.create_task(getattr(res, l)())
with self._lock:
self._schedulers[res] = (res, scheduler_task)
elif r == RQ_TASK:
logger.debug('Supervisor: new task {}'.format(res))
tt, target, priority, delay = res
self.event_loop.create_task(
self._start_task(tt, target, priority, t_put, delay))
finally:
self._Q.task_done()
logger.info('supervisor event loop finished')

def _start_event_loop(self):
Expand Down
25 changes: 14 additions & 11 deletions atasker/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ async def launch_executor(self, *args, **kwargs):
task_id = uuid.uuid4()
self._current_executor = task_id
return self.supervisor.put_task(
(task_id, self.run, args + self._task_args,
self._task_kwargs, self._cb_mp),
(task_id, self.run, args + self._task_args, self._task_kwargs,
self._cb_mp),
self.priority,
tt=TT_MP) and self._active
else:
Expand Down Expand Up @@ -332,16 +332,19 @@ async def loop(self, *args, **kwargs):
self.mark_started()
while self._active:
task = await self._Q.get()
if self._current_executor:
await self._executor_stop_event.wait()
self._executor_stop_event.clear()
if self._active and task is not None:
if not await self.launch_executor(task):
try:
if self._current_executor:
await self._executor_stop_event.wait()
self._executor_stop_event.clear()
if self._active and task is not None:
if not await self.launch_executor(task):
break
else:
break
else:
break
if not self._suppress_sleep:
await asyncio.sleep(self.supervisor.poll_delay)
if not self._suppress_sleep:
await asyncio.sleep(self.supervisor.poll_delay)
finally:
self._Q.task_done()
self.mark_stopped()

def get_queue_obj(self):
Expand Down

0 comments on commit 9ee5afe

Please sign in to comment.