Skip to content

Commit

Permalink
Merge pull request #159 from andyguenin/periodic_exception
Browse files Browse the repository at this point in the history
Propogate exceptions that are called in periodic
  • Loading branch information
timkpaine committed Feb 19, 2021
2 parents 0890ffb + 339c619 commit 167b37f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
7 changes: 5 additions & 2 deletions aat/engine/dispatch/periodic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from asyncio import Future
from datetime import datetime
from typing import Awaitable, Callable, List, Optional

Expand Down Expand Up @@ -47,10 +48,12 @@ def expires(self, timestamp: datetime) -> bool:
return False
return should_expire(self._last, timestamp, self.second, self.minute, self.hour)

async def execute(self, timestamp: datetime) -> None:
async def execute(self, timestamp: datetime) -> Optional[Future]:
if self.expires(timestamp):
asyncio.ensure_future(self._function(timestamp=timestamp))
self._last = timestamp
return asyncio.ensure_future(self._function(timestamp=timestamp))
else:
return None


class PeriodicManagerMixin(object):
Expand Down
6 changes: 5 additions & 1 deletion aat/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,18 @@ async def run(self) -> None:
await self.processEvent(event, strat)

# process any periodics
await asyncio.gather(
periodic_result = await asyncio.gather(
*(
asyncio.create_task(p.execute(self._latest))
for p in self.manager.periodics()
if p.expires(self._latest)
)
)

exceptions = [r for r in periodic_result if r.exception()]
if any(exceptions):
raise exceptions[0].exception()

# Before engine shutdown, send an exit event
await self.processEvent(Event(type=EventType.EXIT, target=None))

Expand Down

0 comments on commit 167b37f

Please sign in to comment.