Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce filter_predicate to tame event bus traffic. #34

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ chains

# tox/pytest cache
.cache
.pytest_cache

# Test output logs
logs
Expand Down
8 changes: 8 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ ConnectionConfig
:undoc-members:
:show-inheritance:

ListenerConfig
--------------

.. autoclass:: lahja.endpoint.ListenerConfig
:members:
:undoc-members:
:show-inheritance:

Exceptions
----------

Expand Down
9 changes: 5 additions & 4 deletions examples/inter_process_ping_pong.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
BaseEvent,
Endpoint,
ConnectionConfig,
ListenerConfig,
)


Expand All @@ -27,8 +28,8 @@ def run_proc1():
loop = asyncio.get_event_loop()
endpoint = Endpoint()
endpoint.start_serving_nowait(ConnectionConfig.from_name('e1'))
endpoint.connect_to_endpoints_blocking(
ConnectionConfig.from_name('e2')
endpoint.add_listener_endpoints_blocking(
ListenerConfig.from_name('e2')
)
endpoint.subscribe(SecondThingHappened, lambda event:
print("Received via SUBSCRIBE API in proc1: ", event.payload)
Expand All @@ -53,8 +54,8 @@ def run_proc2():
loop = asyncio.get_event_loop()
endpoint = Endpoint()
endpoint.start_serving_nowait(ConnectionConfig.from_name('e2'))
endpoint.connect_to_endpoints_blocking(
ConnectionConfig.from_name('e1')
endpoint.add_listener_endpoints_blocking(
ListenerConfig.from_name('e1')
)
endpoint.subscribe(FirstThingHappened, lambda event:
print("Received via SUBSCRIBE API in proc2:", event.payload)
Expand Down
9 changes: 5 additions & 4 deletions examples/request_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BaseRequestResponseEvent,
BroadcastConfig,
ConnectionConfig,
ListenerConfig,
)


Expand All @@ -30,8 +31,8 @@ def run_proc1():
loop = asyncio.get_event_loop()
endpoint = Endpoint()
endpoint.start_serving_nowait(ConnectionConfig.from_name('e1'))
endpoint.connect_to_endpoints_blocking(
ConnectionConfig.from_name('e2'),
endpoint.add_listener_endpoints_blocking(
ListenerConfig.from_name('e2'),
)
print("subscribing")
# Listen for `GetSomethingRequest`'s
Expand All @@ -47,8 +48,8 @@ def run_proc2():
endpoint = Endpoint()
loop = asyncio.get_event_loop()
endpoint.start_serving_nowait(ConnectionConfig.from_name('e2'))
endpoint.connect_to_endpoints_blocking(
ConnectionConfig.from_name('e1'),
endpoint.add_listener_endpoints_blocking(
ListenerConfig.from_name('e1'),
)
loop.run_until_complete(proc2_worker(endpoint))

Expand Down
2 changes: 2 additions & 0 deletions lahja/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from .endpoint import ( # noqa: F401
Endpoint,
filter_none,
ConnectionConfig,
ListenerConfig,
)
from .exceptions import ( # noqa: F401
ConnectionAttemptRejected,
Expand Down
119 changes: 96 additions & 23 deletions lahja/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,61 @@ def from_name(cls, name: str, base_path: Optional[pathlib.Path]=None) -> 'Connec
raise TypeError("Provided `base_path` must be a directory")


def filter_none(event: BaseEvent) -> bool:
    """Default ``filter_predicate``: accept every event unconditionally (no filtering)."""
    return True


class ListenerConfig(NamedTuple):
    """
    Configuration needed to register an :class:`~lahja.endpoint.Endpoint` as a listener of
    another :class:`~lahja.endpoint.Endpoint` so that it can receive its events.

    A listener is identified by the ``name`` and ``path`` (IPC) of the listening
    :class:`~lahja.endpoint.Endpoint`. The convenience constructors
    :meth:`~lahja.endpoint.ListenerConfig.from_name` and
    :meth:`~lahja.endpoint.ListenerConfig.from_connection_config` derive the ``path``
    from the ``name`` when no custom ``path`` handling is required.

    A listener may also supply a ``filter_predicate`` to shield itself from events it is
    not interested in: the predicate is invoked with the event in question and returns
    ``True`` if the listener wants to receive it or ``False`` if it doesn't.

    Example:

    .. code:: python

        producer.add_listener_endpoints(
            ListenerConfig.from_name(
                'receiver',
                filter_predicate=lambda ev: not isinstance(ev, BadEvent),
            )
        )

    This is an effective measure to shield an :class:`~lahja.endpoint.Endpoint` from
    events that aren't needed by any callsite.
    """
    # Unique name of the listening endpoint.
    name: str
    # Filesystem path of the listener's IPC socket.
    path: pathlib.Path
    # Per-event gate: return True to deliver a given event to this listener.
    filter_predicate: Callable[[BaseEvent], bool]

    @classmethod
    def from_name(
        cls,
        name: str,
        base_path: Optional[pathlib.Path]=None,
        filter_predicate: Callable[[BaseEvent], bool] = filter_none) -> 'ListenerConfig':
        """Build a config whose ``path`` is derived from ``name`` via ``ConnectionConfig``."""
        return cls.from_connection_config(
            ConnectionConfig.from_name(name, base_path),
            filter_predicate,
        )

    @classmethod
    def from_connection_config(
        cls,
        connection_config: ConnectionConfig,
        filter_predicate: Callable[[BaseEvent], bool] = filter_none) -> 'ListenerConfig':
        """Build a config reusing the ``name`` and ``path`` of an existing ``ConnectionConfig``."""
        return cls(
            name=connection_config.name,
            path=connection_config.path,
            filter_predicate=filter_predicate,
        )


class EndpointConnector:
"""
Expose the receiving queue of an :class:`~lahja.endpoint.Endpoint` so that any other
Expand Down Expand Up @@ -91,6 +146,11 @@ def put_nowait(self, item_and_config: Tuple[BaseEvent, Optional[BroadcastConfig]
self._callmethod('put_nowait', (item_and_config,))


class ListeningEndpoint(NamedTuple):
    """
    Pairs a registered listener's :class:`ListenerConfig` with the live
    :class:`ProxyEndpointConnector` used to push events into that listener's
    receiving queue.
    """
    # Static listener settings (name, path, filter_predicate).
    config: ListenerConfig
    # Proxy handle used to deliver events to the listener (exposes put_nowait).
    connector: ProxyEndpointConnector


class Endpoint:
"""
The :class:`~lahja.endpoint.Endpoint` enables communication between different processes
Expand All @@ -110,7 +170,7 @@ class Endpoint:
_loop: asyncio.AbstractEventLoop

def __init__(self) -> None:
self._connected_endpoints: Dict[str, BaseProxy] = {}
self._listening_endpoints: Dict[str, ListeningEndpoint] = {}
self._futures: Dict[Optional[str], asyncio.Future] = {}
self._handler: Dict[Type[BaseEvent], List[Callable[[BaseEvent], Any]]] = {}
self._queues: Dict[Type[BaseEvent], List[asyncio.Queue]] = {}
Expand Down Expand Up @@ -190,15 +250,15 @@ async def wait_until_serving(self) -> None:
loop=self.event_loop
)

def _throw_if_already_connected(self, *endpoints: ConnectionConfig) -> None:
def _throw_if_already_connected(self, *endpoints: ListenerConfig) -> None:
seen: Set[str] = set()

for config in endpoints:
if config.name in seen:
raise ConnectionAttemptRejected(
                    f"Trying to connect to {config.name} twice. Names must be unique."
)
elif config.name in self._connected_endpoints.keys():
elif config.name in self._listening_endpoints.keys():
raise ConnectionAttemptRejected(
f"Already connected to {config.name} at {config.path}. Names must be unique."
)
Expand Down Expand Up @@ -231,42 +291,44 @@ class ConnectorManager(BaseManager):
server = manager.get_server() # type: ignore
threading.Thread(target=server.serve_forever, daemon=True).start()

def connect_to_endpoints_blocking(self, *endpoints: ConnectionConfig, timeout: int=30) -> None:
def add_listener_endpoints_blocking(self, *endpoints: ListenerConfig, timeout: int=30) -> None:
"""
Connect to the given endpoints and block until the connection to every endpoint is
established. Raises a ``TimeoutError`` if connections do not become available within
``timeout`` seconds (default 30 seconds).
Add the ``endpoints`` as new listeners so that they can receive events from this endpoint.
Block until the connection to every endpoint is established. Raises a ``TimeoutError`` if
connections do not become available within ``timeout`` seconds (default 30 seconds).
"""
self._throw_if_already_connected(*endpoints)
for endpoint in endpoints:
wait_for_path_blocking(endpoint.path, timeout)
self._connect_if_not_already_connected(endpoint)

async def connect_to_endpoints(self, *endpoints: ConnectionConfig) -> None:
async def add_listener_endpoints(self, *endpoints: ListenerConfig) -> None:
"""
Connect to the given endpoints and await until all connections are established.
Add the ``endpoints`` as new listeners so that they can receive events from this endpoint.
Await until all connections are established.
"""
self._throw_if_already_connected(*endpoints)
await asyncio.gather(
*(self._await_connect_to_endpoint(endpoint) for endpoint in endpoints),
loop=self.event_loop
)

def connect_to_endpoints_nowait(self, *endpoints: ConnectionConfig) -> None:
def add_listener_endpoints_nowait(self, *endpoints: ListenerConfig) -> None:
"""
Connect to the given endpoints as soon as they become available but do not block.
Add the ``endpoints`` as new listeners so that they can receive events from this endpoint.
        This API returns immediately and establishes connections as soon as they become available.
"""
self._throw_if_already_connected(*endpoints)
for endpoint in endpoints:
asyncio.ensure_future(self._await_connect_to_endpoint(endpoint))

async def _await_connect_to_endpoint(self, endpoint: ConnectionConfig) -> None:
async def _await_connect_to_endpoint(self, endpoint: ListenerConfig) -> None:
await wait_for_path(endpoint.path)
self._connect_if_not_already_connected(endpoint)

def _connect_if_not_already_connected(self, endpoint: ConnectionConfig) -> None:
def _connect_if_not_already_connected(self, endpoint: ListenerConfig) -> None:

if endpoint.name in self._connected_endpoints.keys():
if endpoint.name in self._listening_endpoints.keys():
self.logger.warning(
"Tried to connect to %s but we are already connected to that Endpoint",
endpoint.name
Expand All @@ -283,10 +345,13 @@ class ConnectorManager(BaseManager):

manager = ConnectorManager(address=str(endpoint.path)) # type: ignore
manager.connect()
self._connected_endpoints[endpoint.name] = manager.get_connector() # type: ignore
self._listening_endpoints[endpoint.name] = ListeningEndpoint(
config=endpoint,
connector=manager.get_connector(), # type: ignore
)

def is_connected_to(self, endpoint_name: str) -> bool:
return endpoint_name in self._connected_endpoints
return endpoint_name in self._listening_endpoints

def _process_item(self, item: BaseEvent, config: BroadcastConfig) -> None:
if item is TRANSPARENT_EVENT:
Expand Down Expand Up @@ -327,30 +392,38 @@ def stop(self) -> None:
self._receiving_queue.put_nowait((TRANSPARENT_EVENT, None))
self._internal_queue.put_nowait((TRANSPARENT_EVENT, None))

def broadcast(self, item: BaseEvent, config: Optional[BroadcastConfig] = None) -> None:
def broadcast(self, item: BaseEvent, config: BroadcastConfig = BroadcastConfig()) -> None:
"""
Broadcast an instance of :class:`~lahja.misc.BaseEvent` on the event bus. Takes
an optional second parameter of :class:`~lahja.misc.BroadcastConfig` to decide
where this event should be broadcasted to. By default, events are broadcasted across
all connected endpoints with their consuming call sites.
"""
item._origin = self.name
if config is not None and config.internal:
if config.internal:
# Internal events simply bypass going through the central event bus
# and are directly put into the local receiving queue instead.
self._internal_queue.put_nowait((item, config))
else:
# Broadcast to every connected Endpoint that is allowed to receive the event
for name, connector in self._connected_endpoints.items():
allowed = (config is None) or config.allowed_to_receive(name)
if allowed:
connector.put_nowait((item, config))
for name, listening_endpoint in self._listening_endpoints.items():

# Event is not directed at specific endpoint, check the `filter_predicate`
# in case the listener does not want to receive this kind of broadcast
if config.is_not_exclusive() and listening_endpoint.config.filter_predicate(item):
listening_endpoint.connector.put_nowait((item, config))
# It is directed at a specific endpoint, no further filters apply in this case
elif config.filter_endpoint == name:
listening_endpoint.connector.put_nowait((item, config))
# This event should not get broadcasted to this listener, skip
else:
continue

TResponse = TypeVar('TResponse', bound=BaseEvent)

async def request(self,
item: BaseRequestResponseEvent[TResponse],
config: Optional[BroadcastConfig] = None) -> TResponse:
config: BroadcastConfig = BroadcastConfig()) -> TResponse:
"""
Broadcast an instance of :class:`~lahja.misc.BaseRequestResponseEvent` on the event bus and
immediately wait on an expected answer of type :class:`~lahja.misc.BaseEvent`. Optionally
Expand Down
21 changes: 15 additions & 6 deletions lahja/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,37 @@ def unsubscribe(self) -> None:
class BroadcastConfig:

def __init__(self,
filter_endpoint: Optional[str] = None,
filter_event_id: Optional[str] = None,
filter_endpoint: str = '',
filter_event_id: str = '',
internal: bool = False) -> None:

self.filter_endpoint = filter_endpoint
self.filter_event_id = filter_event_id
self.internal = internal

if self.internal and self.filter_endpoint is not None:
if self.internal and len(self.filter_endpoint) > 0:
            raise ValueError("`internal` cannot be used with `filter_endpoint`")

def allowed_to_receive(self, endpoint: str) -> bool:
return self.filter_endpoint is None or self.filter_endpoint == endpoint
def is_not_exclusive(self) -> bool:
return len(self.filter_endpoint) == 0


class BaseEvent:

_origin = ''
_id: Optional[str] = None
_id: str = ''
_config: Optional[BroadcastConfig] = None

def broadcast_config(self, internal: bool = False) -> BroadcastConfig:
"""
Retrieve a :class:`~lahja.misc.BroadcastConfig` based on the origin of the event. A
:class:`~lahja.misc.BroadcastConfig` generated through this API ensures that an event
        will only be sent to the :class:`~lahja.endpoint.Endpoint` where the origin event came
        from. Furthermore, if the event was sent using the :meth:`~lahja.endpoint.Endpoint.request`
API retrieving a :class:`~lahja.misc.BroadcastConfig` through this API will guarantee to
only propagate the event as a direct response to the callsite that initiated the origin
event.
"""
if internal:
return BroadcastConfig(
internal=True,
Expand Down
11 changes: 6 additions & 5 deletions lahja/tools/benchmark/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
BroadcastConfig,
ConnectionConfig,
Endpoint,
ListenerConfig,
)
from lahja.tools.benchmark.constants import (
DRIVER_ENDPOINT,
Expand All @@ -36,7 +37,7 @@

class DriverProcessConfig(NamedTuple):
num_events: int
connected_endpoints: Tuple[ConnectionConfig, ...]
connected_endpoints: Tuple[ListenerConfig, ...]
throttle: float
payload_bytes: int

Expand All @@ -59,7 +60,7 @@ def launch(config: DriverProcessConfig) -> None:
loop = asyncio.get_event_loop()
event_bus = Endpoint()
event_bus.start_serving_nowait(ConnectionConfig.from_name(DRIVER_ENDPOINT))
event_bus.connect_to_endpoints_blocking(*config.connected_endpoints)
event_bus.add_listener_endpoints_blocking(*config.connected_endpoints)
# UNCOMMENT FOR DEBUGGING
# logger = multiprocessing.log_to_stderr()
# logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -114,8 +115,8 @@ def launch(name: str, num_events: int) -> None:
loop = asyncio.get_event_loop()
event_bus = Endpoint()
event_bus.start_serving_nowait(ConnectionConfig.from_name(name))
event_bus.connect_to_endpoints_blocking(
ConnectionConfig.from_name(REPORTER_ENDPOINT)
event_bus.add_listener_endpoints_blocking(
ListenerConfig.from_name(REPORTER_ENDPOINT)
)
# UNCOMMENT FOR DEBUGGING
# logger = multiprocessing.log_to_stderr()
Expand Down Expand Up @@ -168,6 +169,6 @@ def launch(config: ReportingProcessConfig) -> None:
loop = asyncio.get_event_loop()
event_bus = Endpoint()
event_bus.start_serving_nowait(ConnectionConfig.from_name(REPORTER_ENDPOINT))
event_bus.connect_to_endpoints_blocking(ConnectionConfig.from_name(ROOT_ENDPOINT))
event_bus.add_listener_endpoints_blocking(ListenerConfig.from_name(ROOT_ENDPOINT))

loop.run_until_complete(ReportingProcess.worker(event_bus, logger, config))
Loading