Skip to content

Commit

Permalink
Made EventSource.dispatch_event() always run all event listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Apr 13, 2016
1 parent 9307903 commit 7bc4cb6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 12 deletions.
56 changes: 46 additions & 10 deletions asphalt/core/event.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Dict, Callable, Any, Sequence, Union, Iterable
from asyncio import ensure_future, wait
from inspect import isawaitable
from typing import Dict, Callable, Any, Sequence, Union, Iterable, Tuple

from typeguard import check_argument_types

Expand Down Expand Up @@ -36,7 +38,7 @@ def __init__(self, source: 'EventSource', topics: Sequence[str], callback: Calla
self.args = args
self.kwargs = kwargs

def remove(self):
def remove(self) -> None:
"""Remove this listener from its event source."""
self.source.remove_listener(self)

Expand All @@ -45,6 +47,21 @@ def __repr__(self):
'kwargs={0.kwargs!r})'.format(self, qualified_name(self.callback)))


class EventDispatchError(Exception):
"""
Raised when one or more event listener callback raises an exception.
:ivar Event event: the event
:ivar Sequence[Tuple[EventListener, Exception]]: a sequence containing tuples of
(listener, exception) for each exception that was raised by a listener callback
"""

def __init__(self, event: Event, exceptions: Sequence[Tuple[EventListener, Exception]]):
super().__init__('error dispatching event')
self.event = event
self.exceptions = exceptions


class EventSource:
"""A mixin class that provides support for dispatching and listening to events."""

Expand All @@ -54,7 +71,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._topics = {}

def _register_topics(self, topics: Dict[str, Any]):
def _register_topics(self, topics: Dict[str, Any]) -> None:
"""
Register a number of supported topics and their respective event classes.
Expand Down Expand Up @@ -108,20 +125,22 @@ def remove_listener(self, handle: EventListener):
except (KeyError, ValueError):
raise LookupError('listener not found') from None

async def dispatch(self, event: Union[str, Event], *args, **kwargs):
async def dispatch(self, event: Union[str, Event], *args, **kwargs) -> None:
"""
Dispatch an event, optionally constructing one first.
This method has two forms: dispatch(``event``) and dispatch(``topic``, ``*args``,
``**kwargs``). The former dispatches an existing event object while the latter
instantiates one, using this object as the source. Any extra positional and keyword
arguments are passed directly to the event class constructor.
arguments are passed directly to the constructor of the event class.
Any exceptions raised by the listener callbacks are passed
through to the caller.
All listeners are always called. If any event listener raises an exception, an
:class:`EventDispatchError` is then raised, containing the callbacks and the exceptions
they raised.
:param event: an :class:`~asphalt.core.event.Event` instance or an event topic
:raises LookupError: if the topic has not been registered in this event source
:raises EventDispatchError: if any of the listener callbacks raises an exception
"""
assert check_argument_types()
Expand All @@ -138,7 +157,24 @@ async def dispatch(self, event: Union[str, Event], *args, **kwargs):
event_class = registration['event_class']
event = event_class(self, topic, *args, **kwargs)

futures, exceptions = [], []
for listener in list(registration['listeners']):
retval = listener.callback(event, *listener.args, **listener.kwargs)
if retval is not None:
await retval
try:
retval = listener.callback(event, *listener.args, **listener.kwargs)
except Exception as e:
exceptions.append((listener, e))
else:
if isawaitable(retval):
future = ensure_future(retval)
futures.append((listener, future))

# For any callbacks that returned awaitables, wait for their completion and collect any
# exceptions they raise
for listener, future in futures:
try:
await future
except Exception as e:
exceptions.append((listener, e))

if exceptions:
raise EventDispatchError(event, exceptions)
4 changes: 3 additions & 1 deletion docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ Version history
``asphalt.core.util`` module
- *BACKWARD INCOMPATIBLE* Removed regular context manager support from the ``Context`` class
(asynchronous context manager support still remains)
- *BACKWARD INCOMPATIBLE* Modified event dispatch logic in ``EventSource`` to always run all
event listeners even if some listeners raise exceptions. A uniform exception is then raised
that contains all the exceptions and the listeners who raised them.
- Added the ability to listen to multiple topics in an EventSource with a single listener


**1.3.0**

- Allow the context manager of the ``Context`` class to be used from a non-eventloop thread when
Expand Down
33 changes: 32 additions & 1 deletion tests/test_event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from asyncio.queues import Queue
from typing import Union, Iterable

import pytest
from asyncio_extras.asyncyield import yield_async
from asyncio_extras.generator import async_generator

from asphalt.core.event import EventSource, Event, EventListener
from asphalt.core.event import EventSource, Event, EventListener, EventDispatchError


class DummyEvent(Event):
Expand Down Expand Up @@ -97,6 +102,32 @@ async def dispatch_event_multiple_topic(self, source: EventSource):
assert events[1].args == ('c', 'd')
assert events[1].kwargs == {'g': 7, 'h': 8}

@pytest.mark.asyncio
async def test_dispatch_listener_exceptions(self, source):
"""
Test that multiple exceptions raised by listeners are combined into one EventDispatchError.
"""
def plain_error(event):
raise plain_exception

async def async_error(event):
raise async_exception

plain_exception = ValueError('foo')
async_exception = KeyError('bar')
plain_listener = source.add_listener('event_a', plain_error)
async_listener = source.add_listener('event_a', async_error)
event = DummyEvent(source, 'event_a')
with pytest.raises(EventDispatchError) as exc:
await source.dispatch(event)

assert exc.value.event is event
assert exc.value.exceptions == [
(plain_listener, plain_exception),
(async_listener, async_exception)
]

@pytest.mark.asyncio
@pytest.mark.parametrize('from_handle', [True, False],
ids=['eventhandle', 'direct'])
Expand Down

0 comments on commit 7bc4cb6

Please sign in to comment.