-
Notifications
You must be signed in to change notification settings - Fork 3
/
event_bus.py
93 lines (65 loc) · 2.53 KB
/
event_bus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from typing import Callable, List, Type, TypeVar, cast
from attr import define
from injector import Injector, Provider, UnknownProvider, UnsatisfiedRequirement
from itca.foundation.event import Event
# Type variable used to parametrize Listener / AsyncListener (and the
# provider annotations below) by the event type being handled.
TEvent = TypeVar("TEvent")
class EventBus:
    """Base event bus whose ``publish`` is a no-op.

    Subclasses override ``publish`` to actually deliver the event.
    """

    def publish(self, event: Event) -> None:
        """Publish ``event``; this base implementation does nothing."""
        return None
class Listener(List[TEvent]):
    """Generic marker that associates listeners with an event type via DI.

    Example: ``Listener[BidderHasBeenOverbid]``.
    """

    def __call__(self, event: TEvent) -> None:
        """Handle ``event``; this base implementation is not implemented."""
        raise NotImplementedError()
class AsyncListener(List[TEvent]):
    """Asynchronous counterpart of ``Listener[Event]``."""

    def __call__(self, event: TEvent) -> None:
        """Handle ``event``; this base implementation is not implemented."""
        raise NotImplementedError()
class EventListenerProvider(Provider):
    """Provider used when binding event listeners.

    Dispatching events through DI needs several listeners bound under one
    key (``Listener[Event]``); this provider contributes a single-element
    list holding one freshly created listener.
    """

    def __init__(self, cls: Type[TEvent]) -> None:
        # Class to build through the injector whenever ``get`` is called.
        self._cls = cls

    def get(self, injector: Injector) -> list[TEvent]:
        """Create the configured class with ``injector``, wrapped in a list."""
        listener = injector.create_object(self._cls)
        return [listener]
class AsyncEventListenerProvider(Provider):
    """Asynchronous counterpart of ``EventListenerProvider``.

    For async handling no instance has to be constructed here; providing
    the class itself is sufficient.
    """

    def __init__(self, cls: Type[TEvent]) -> None:
        # Class handed back as-is (never instantiated) by ``get``.
        self._cls = cls

    def get(self, _injector: Injector) -> list[Type[TEvent]]:
        """Return the configured class in a one-element list; injector unused."""
        handler_cls = self._cls
        return [handler_cls]
# Signature of the callback InjectorEventBus uses for async handlers: it is
# given the AsyncListener class (not an instance) and the event, and returns
# nothing.
RunAsyncHandler = Callable[[Type[AsyncListener[TEvent]], TEvent], None]
@define
class InjectorEventBus(EventBus):
    """Event bus that resolves listeners for an event from an ``Injector``.

    Synchronous listeners are called directly; async handler classes are
    passed, together with the event, to the configured callback.
    """

    # Container queried for ``Listener[...]`` / ``AsyncListener[...]`` keys.
    _container: Injector
    # Callback invoked with (async handler class, event).
    _run_async_handler: RunAsyncHandler

    def publish(self, event: Event) -> None:
        """Dispatch ``event`` to sync listeners first, then async handlers.

        Listeners are looked up by the exact class of ``event``. A key the
        container cannot satisfy simply yields no listeners.
        """
        event_type = type(event)

        listeners = self._bound(Listener, event_type)
        for handle in cast(List[Listener], listeners):
            handle(event)

        handler_classes = self._bound(AsyncListener, event_type)
        for handler_cls in cast(List[Type[AsyncListener]], handler_classes):
            self._run_async_handler(handler_cls, event)

    def _bound(self, marker: type, event_type: type) -> list:
        """Return what the container binds to ``marker[event_type]``, or []."""
        try:
            found = self._container.get(marker[event_type])  # type: ignore
        except (UnsatisfiedRequirement, UnknownProvider):
            # No binding registered for this event type: nothing to dispatch.
            return []
        assert isinstance(found, list)
        return found