From aa072888ae7ea29c8cc2d7e92bf816abd8337c87 Mon Sep 17 00:00:00 2001 From: Ian Good Date: Thu, 6 May 2021 20:32:08 -0400 Subject: [PATCH] Improve config for external usage --- README.md | 42 ++++++----- docs/intro.rst | 8 +- docs/swimprotocol.rst | 6 +- setup.py | 2 +- swimprotocol/config.py | 138 ++++++++++++++++++++++++++++------ swimprotocol/demo/__init__.py | 47 +++++------- swimprotocol/demo/changes.py | 2 +- swimprotocol/demo/screen.py | 3 +- swimprotocol/listener.py | 25 ++++-- swimprotocol/members.py | 35 ++++----- swimprotocol/packet.py | 4 +- swimprotocol/plugin.py | 80 -------------------- swimprotocol/py.typed | 0 swimprotocol/transport.py | 82 +++++++++++--------- swimprotocol/udp/__init__.py | 103 +++++++++++++++++++------ swimprotocol/worker.py | 6 +- 16 files changed, 341 insertions(+), 242 deletions(-) delete mode 100644 swimprotocol/plugin.py create mode 100644 swimprotocol/py.typed diff --git a/README.md b/README.md index d71ab29..5653990 100644 --- a/README.md +++ b/README.md @@ -23,15 +23,17 @@ synchronize a distributed group of processes. $ pip install swim-protocol ``` +#### Running the Demo + There is a [demo][2] application included as a reference implementation. Try it out by running the following, each from a new terminal window, and use _Ctrl-C_ to exit: ```console -$ swim-protocol-demo -c -m name one 127.0.0.1:2001 127.0.0.1:2003 -$ swim-protocol-demo -c -m name two 127.0.0.1:2002 127.0.0.1:2001 -$ swim-protocol-demo -c -m name three 127.0.0.1:2003 127.0.0.1:2001 -$ swim-protocol-demo -c -m name four 127.0.0.1:2004 127.0.0.1:2003 +$ swim-protocol-demo -c --name 127.0.0.1:2001 --peer 127.0.0.1:2003 --metadata name one +$ swim-protocol-demo -c --name 127.0.0.1:2002 --peer 127.0.0.1:2001 --metadata name two +$ swim-protocol-demo -c --name 127.0.0.1:2003 --peer 127.0.0.1:2001 --metadata name three +$ swim-protocol-demo -c --name 127.0.0.1:2004 --peer 127.0.0.1:2003 --metadata name four ``` Every 10 seconds or so, each member will randomize its `token` metadata field, @@ -39,18 +41,14 @@ which should be disseminated across the cluster with [eventual consistency][6]. ### Getting Started -First you should create a new [Config][100] object: +First you should create a new [UdpConfig][100] object: ```python -from argparse import ArgumentParser -from swimprotocol.config import Config - -parser = ArgumentParser(...) -args = parser.parse_args() -config = Config(args, secret=b'...', - local_name='127.0.0.1:2001', - local_metadata={b'name': b'one'}, - peers=['127.0.0.1:2002']) +from swimprotocol.udp import UdpConfig + +config = UdpConfig(local_name='127.0.0.1:2001', + local_metadata={'name': b'one'}, + peers=['127.0.0.1:2002']) ``` All other config arguments have default values, which are tuned somewhat @@ -61,16 +59,20 @@ the event loop: ```python from contextlib import AsyncExitStack -from swimprotocol.transport import transport_plugins from swimprotocol.members import Members +from swimprotocol.udp import UdpTransport -transport = transport_plugins.choose('udp').init(config) +transport = UdpTransport(config) members = Members(config) async with AsyncExitStack() as stack: worker = await stack.enter_async_context(transport.enter(members)) await worker.run() # or schedule as a task ``` +These snippets demonstrate the UDP transport layer directly. For a more generic +approach that uses [argparse][11] and [load_transport][12], check out the +[demo][2]. + ### Checking Members The [Members][101] object provides a few ways to check on the cluster and its @@ -92,12 +94,12 @@ Alternatively, listen for status or metadata changes on all members: ```python from swimprotocol.member import Member -async def _on_updated(member: Member) -> None: +async def _updated(member: Member) -> None: print('updated:', member.name, member.status, member.metadata) async with AsyncExitStack() as stack: # ... - stack.enter_context(members.listener.on_notify(_on_updated)) + stack.enter_context(members.listener.on_notify(_updated)) ``` ### UDP Transport Security @@ -152,7 +154,9 @@ hinting to the extent possible and common in the rest of the codebase. [8]: https://en.wikipedia.org/wiki/Shared_secret [9]: https://docs.python.org/3/library/uuid.html#uuid.getnode [10]: https://en.wikipedia.org/wiki/Maximum_transmission_unit +[11]: https://docs.python.org/3/library/argparse.html +[12]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.transport.load_transport -[100]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.config.Config +[100]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#swimprotocol.udp.UdpConfig [101]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.members.Member [102]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#swimprotocol.udp.UdpTransport diff --git a/docs/intro.rst b/docs/intro.rst index 6ba2718..49e2d44 100644 --- a/docs/intro.rst +++ b/docs/intro.rst @@ -85,7 +85,7 @@ Glossary false positives. metadata - An immutable mapping of key/value :class:`bytes` associated with each + An immutable mapping of key/value strings associated with each :term:`member`. New mappings may be assigned, and the latest mapping will always be disseminated across the cluster. @@ -122,6 +122,12 @@ Glossary is always higher than any other observed value, used to determine whether :term:`gossip` is new enough to be applied or disseminated. + demo + The included `demo`_ is designed to randomize metadata changes on an + interval to see them disseminated across the cluster, as well as watch as + :term:`member` statuses change as demo instances are stopped and started. + .. _SWIM: https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf .. _Serf: https://www.serf.io/docs/internals/gossip.html .. _Lamport timestamp: https://en.wikipedia.org/wiki/Lamport_timestamp +.. _demo: https://github.com/icgood/swim-protocol#running-the-demo diff --git a/docs/swimprotocol.rst b/docs/swimprotocol.rst index 8981430..271cd55 100644 --- a/docs/swimprotocol.rst +++ b/docs/swimprotocol.rst @@ -19,6 +19,7 @@ ------------------------- .. automodule:: swimprotocol.listener + :special-members: __call__ ``swimprotocol.members`` ------------------------ @@ -30,11 +31,6 @@ .. automodule:: swimprotocol.packet -``swimprotocol.plugin`` ------------------------ - -.. automodule:: swimprotocol.plugin - ``swimprotocol.shuffle`` ------------------------ diff --git a/setup.py b/setup.py index 6599a8e..20883ab 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ license = f.read() setup(name='swim-protocol', - version='0.2.0', + version='0.3.0', author='Ian Good', author_email='ian@icgood.net', description='SWIM protocol implementation for exchanging cluster ' diff --git a/swimprotocol/config.py b/swimprotocol/config.py index 47fd8a3..09e7faf 100644 --- a/swimprotocol/config.py +++ b/swimprotocol/config.py @@ -1,20 +1,34 @@ from __future__ import annotations -from argparse import Namespace +import os +from abc import ABCMeta +from argparse import ArgumentParser, Namespace from collections.abc import Mapping, Sequence -from typing import Final, Union +from typing import final, TypeVar, Final, Any, Union from .sign import Signatures -__all__ = ['Config'] +__all__ = ['ConfigT_co', 'ConfigError', 'BaseConfig'] +#: Covariant type variable for :class:`BaseConfig` sub-classes. +ConfigT_co = TypeVar('ConfigT_co', bound='BaseConfig', covariant=True) -class Config: + +class ConfigError(Exception): + """Raised when the configuration is insufficient or invalid for running a + cluster, along with a human-readable message about what was wrong. + + """ + pass + + +class BaseConfig(metaclass=ABCMeta): """Configure the cluster behavior and characteristics. + :class:`~swimprotocol.transport.Transport` implementations should + sub-class to add additional configuration. Args: - args: Command-line arguments namespace. secret: The shared secret for cluster packet signatures. local_name: The unique name of the local cluster member. local_metadata: The local cluster member metadata. @@ -32,12 +46,14 @@ class Config: sync_interval: Time between sync attempts to disseminate cluster changes. + Raises: + ConfigError: The given configuration was invalid. + """ - def __init__(self, args: Namespace, *, - secret: Union[str, bytes], + def __init__(self, *, secret: Union[None, str, bytes], local_name: str, - local_metadata: Mapping[bytes, bytes], + local_metadata: Mapping[str, bytes], peers: Sequence[str], ping_interval: float = 1.0, ping_timeout: float = 0.3, @@ -46,7 +62,6 @@ def __init__(self, args: Namespace, *, suspect_timeout: float = 5.0, sync_interval: float = 0.5) -> None: super().__init__() - self.args: Final = args self._signatures = Signatures(secret) self.local_name: Final = local_name self.local_metadata: Final = local_metadata @@ -57,22 +72,103 @@ def __init__(self, args: Namespace, *, self.ping_req_timeout: Final = ping_req_timeout self.suspect_timeout: Final = suspect_timeout self.sync_interval: Final = sync_interval + self._validate() + + def _validate(self) -> None: + if not self.local_name: + raise ConfigError('This cluster instance needs a local name.') + elif not self.peers: + raise ConfigError('At least one cluster peer name is required.') + + @property + def signatures(self) -> Signatures: + """Generates and verifies cluster packet signatures.""" + return self._signatures @classmethod - def from_args(cls, args: Namespace) -> Config: - """Build a :class:`Config` from command-line arguments and sensible - defaults. + def add_arguments(cls, parser: ArgumentParser, *, + prefix: str = '--') -> None: + """Implementations (such as the :term:`demo`) may use this method to + add command-line based configuration for the transport. + + Note: + Arguments added should use *prefix* and explicitly provide a unique + name, e.g.:: + + parser.add_argument(f'{prefix}arg', dest='swim_arg', ...) + + This prevents collision with other argument names and allows custom + *prefix* values without affecting the :class:`~argparse.Namespace`. Args: - args: The command-line arguments namespace. + parser: The argument parser. + prefix: The prefix for added arguments, which should start with + ``--`` and end with ``-``, e.g. ``'--'`` or ``'--foo-'``. """ - return cls(args, secret=args.secret, - local_name=args.local, - local_metadata=dict(args.metadata), - peers=args.peers) + group = parser.add_argument_group('swim options') + group.add_argument(f'{prefix}metadata', dest='swim_metadata', + nargs=2, metavar=('KEY', 'VAL'), + default=[], action='append', + help='Metadata for this node.') + group.add_argument(f'{prefix}secret', dest='swim_secret', + metavar='STRING', + help='The secret string used to verify messages.') + group.add_argument(f'{prefix}name', dest='swim_name', + metavar='localname', + help='External name or address for this node.') + group.add_argument(f'{prefix}peer', dest='swim_peers', + metavar='peername', action='append', default=[], + help='At least one name or address of ' + 'a known peer.') - @property - def signatures(self) -> Signatures: - """Generates and verifies cluster packet signatures.""" - return self._signatures + @classmethod + def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \ + -> dict[str, Any]: + """Parse the given :class:`~argparse.Namespace` into a dictionary of + keyword arguments for the :class:`BaseConfig` constructor. Sub-classes + should override this method to add additional keyword arguments as + needed. + + The :func:`os.getenv` function can also be used, and will take priority + over values in *args*: + + * ``SWIM_SECRET``: The *secret* keyword argument. + * ``SWIM_NAME``: The *local_name* keyword argument. + * ``SWIM_PEERS``: Comma-separated *peers* keyword argument. + + Args: + args: The command-line arguments. + env_prefix: Prefix for the environment variables. + + """ + secret = os.getenv(f'{env_prefix}_SECRET', args.swim_secret) + local_name = os.getenv(f'{env_prefix}_NAME', args.swim_name) + local_metadata = {key: val.encode('utf-8') + for key, val in args.swim_metadata} + env_peers = os.getenv(f'{env_prefix}_PEERS') + if env_peers is not None: + peers = env_peers.split(',') + else: + peers = args.swim_peers + return {'secret': secret, + 'local_name': local_name, + 'local_metadata': local_metadata, + 'peers': peers} + + @final + @classmethod + def from_args(cls: type[ConfigT_co], args: Namespace, + **overrides: Any) -> ConfigT_co: + """Build and return a new cluster config object. This first calls + :meth:`.parse_args` and then passes the results as keyword arguments + to the constructor. + + Args: + args: The command-line arguments. + overrides: Keyword arguments to override. + + """ + kwargs = cls.parse_args(args) + kwargs |= overrides + return cls(**kwargs) diff --git a/swimprotocol/demo/__init__.py b/swimprotocol/demo/__init__.py index 0f76b46..ff83269 100644 --- a/swimprotocol/demo/__init__.py +++ b/swimprotocol/demo/__init__.py @@ -17,51 +17,42 @@ from .changes import change_metadata from .log import run_logging from .screen import run_screen -from ..config import Config +from ..config import BaseConfig, ConfigError from ..members import Members -from ..transport import transport_plugins +from ..transport import load_transport, Transport __all__ = ['main'] def main() -> int: parser = ArgumentParser(description=__doc__) - parser.add_argument('-m', '--metadata', nargs=2, metavar=('KEY', 'VAL'), - default=[], action='append', type=_utf8, - help='Metadata for this node.') - parser.add_argument('-t', '--transport', metavar='NAME', default='udp', - help='The transport plugin name.') - parser.add_argument('-s', '--secret', metavar='STRING', - help='The secret string used to verify messages.') - parser.add_argument('-c', '--curses', action='store_true', - help='Enable the curses display.') - parser.add_argument('-i', '--token-interval', type=float, default=10.0, - help='Cluster member token update interval.') - parser.add_argument('local', metavar='localname', - help='External name or address for this node.') - parser.add_argument('peers', metavar='peername', nargs='+', - help='At least one name or address of a known peer.') - parser.set_defaults(config_type=Config) - for transport_name, transport_type in transport_plugins.loaded.items(): - transport_type.add_arguments(transport_name, parser) + transport_type = load_transport() + transport_type.config_type.add_arguments(parser) + + group = parser.add_argument_group('swim demo options') + group.add_argument('-c', '--curses', action='store_true', + help='Enable the curses display.') + group.add_argument('-i', '--token-interval', metavar='SECONDS', + type=float, default=10.0, + help='Cluster member token update interval.') args = parser.parse_args() logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(name)s %(message)s') - return asyncio.run(run(args)) - - -def _utf8(val: str) -> bytes: - return val.encode('utf-8') + try: + return asyncio.run(run(transport_type, args)) + except ConfigError as exc: + parser.error(str(exc)) -async def run(args: Namespace) -> int: +async def run(transport_type: type[Transport[BaseConfig]], + args: Namespace) -> int: loop = asyncio.get_running_loop() - config: Config = args.config_type.from_args(args) - transport = transport_plugins.choose(args.transport).init(config) + config = transport_type.config_type.from_args(args) + transport = transport_type(config) members = Members(config) async with AsyncExitStack() as stack: stack.enter_context(suppress(CancelledError)) diff --git a/swimprotocol/demo/changes.py b/swimprotocol/demo/changes.py index 6347498..55bb7bb 100644 --- a/swimprotocol/demo/changes.py +++ b/swimprotocol/demo/changes.py @@ -17,7 +17,7 @@ async def _randomize_local(members: Members, interval: float) -> NoReturn: assert local.metadata while True: new_token = uuid.uuid4().bytes - new_metadata = dict(local.metadata) | {b'token': new_token} + new_metadata = dict(local.metadata) | {'token': new_token} members.update(local, new_metadata=new_metadata) sleep_sec = random.normalvariate(interval, interval / 5) await asyncio.sleep(sleep_sec) diff --git a/swimprotocol/demo/screen.py b/swimprotocol/demo/screen.py index b76aaf3..408a443 100644 --- a/swimprotocol/demo/screen.py +++ b/swimprotocol/demo/screen.py @@ -43,10 +43,9 @@ def _add_metadata(self, stdscr: Any, i: int, member: Member) -> None: stdscr.addstr(' unknown', curses.A_BOLD) return for key in sorted(metadata): - key_str = self._decode(key) val_str = self._decode(metadata[key]) stdscr.addstr(' ') - stdscr.addstr(key_str) + stdscr.addstr(key) stdscr.addstr('=', curses.A_DIM) stdscr.addstr(val_str, curses.color_pair(i+1) | curses.A_BOLD) diff --git a/swimprotocol/listener.py b/swimprotocol/listener.py index 6a1b4b5..dda4119 100644 --- a/swimprotocol/listener.py +++ b/swimprotocol/listener.py @@ -2,16 +2,31 @@ from __future__ import annotations import asyncio +from abc import abstractmethod from asyncio import Event -from collections.abc import Awaitable, Callable, Sequence +from collections.abc import Sequence from contextlib import ExitStack -from typing import TypeVar, Generic, Any, NoReturn +from typing import TypeVar, Generic, Protocol, Any, NoReturn from weakref import WeakKeyDictionary -__all__ = ['Listener'] +__all__ = ['ListenerCallback', 'Listener'] ListenT = TypeVar('ListenT') -ListenerCallback = Callable[[ListenT], Awaitable[Any]] +ListenT_contra = TypeVar('ListenT_contra', contravariant=True) + + +class ListenerCallback(Protocol[ListenT_contra]): + + @abstractmethod + async def __call__(self, item: ListenT_contra, /) -> Any: + """Called asynchronousely with the argument passed to + :meth:`~Listener.notify`. + + Args: + item: The object sent to the consumers. + + """ + ... class Listener(Generic[ListenT]): @@ -36,7 +51,7 @@ async def _run_callback_poll(self, callback: ListenerCallback[ListenT]) \ while True: items = await self.poll() for item in items: - await callback(item) + asyncio.create_task(callback(item)) def on_notify(self, callback: ListenerCallback[ListenT]) -> ExitStack: """Provides a context manager that causes *callback* to be called when diff --git a/swimprotocol/members.py b/swimprotocol/members.py index 58365b7..16514c8 100644 --- a/swimprotocol/members.py +++ b/swimprotocol/members.py @@ -9,7 +9,7 @@ from typing import Final, Optional, Any from weakref import WeakKeyDictionary, WeakValueDictionary -from .config import Config +from .config import BaseConfig from .listener import Listener from .shuffle import Shuffle, WeakShuffle from .status import Status @@ -25,7 +25,7 @@ class Member: #: known value, it is assigned this empty :class:`dict` for #: `identity comparisons #: `_. - METADATA_UNKNOWN: Mapping[bytes, bytes] = {} + METADATA_UNKNOWN: Mapping[str, bytes] = {} def __init__(self, name: str, local: bool) -> None: super().__init__() @@ -37,11 +37,11 @@ def __init__(self, name: str, local: bool) -> None: WeakKeyDictionary() self._status = Status.OFFLINE self._status_time = time.time() - self._metadata: frozenset[tuple[bytes, bytes]] = frozenset() + self._metadata: frozenset[tuple[str, bytes]] = frozenset() self._metadata_dict = self.METADATA_UNKNOWN self._pending_clock: Optional[int] = None self._pending_status: Optional[Status] = None - self._pending_metadata: Optional[frozenset[tuple[bytes, bytes]]] = None + self._pending_metadata: Optional[frozenset[tuple[str, bytes]]] = None def __eq__(self, other: Any) -> bool: if isinstance(other, Member): @@ -57,7 +57,7 @@ def __hash__(self) -> int: return hash(self.name) def __repr__(self) -> str: - return f'Member<{self.name}>' + return f'Member<{self.name} {self.status.name}>' @property def source(self) -> tuple[str, bytes]: @@ -83,7 +83,7 @@ def status_time(self) -> float: return self._status_time @property - def metadata(self) -> Mapping[bytes, bytes]: + def metadata(self) -> Mapping[str, bytes]: """The last known :term:`metadata` of the cluster member.""" return self._metadata_dict @@ -102,7 +102,7 @@ def _set_status(self, status: Status) -> None: if transition != self._status: self._pending_status = transition - def _set_metadata(self, metadata: Mapping[bytes, bytes]) -> None: + def _set_metadata(self, metadata: Mapping[str, bytes]) -> None: assert self._pending_metadata is None pending_metadata = frozenset(metadata.items()) if pending_metadata != self._metadata: @@ -144,7 +144,7 @@ class Members(Set[Member]): """ - def __init__(self, config: Config) -> None: + def __init__(self, config: BaseConfig) -> None: super().__init__() self.listener: Final = Listener(Member) self._next_clock = 1 @@ -168,12 +168,13 @@ def __len__(self) -> int: return len(self._members) def _refresh_statuses(self, member: Member) -> None: - member_status = member.status - for status in Status: - if member_status & status: - self._statuses[status].add(member) - else: - self._statuses[status].discard(member) + if not member.local: + member_status = member.status + for status in Status: + if member_status & status: + self._statuses[status].add(member) + else: + self._statuses[status].discard(member) @property def local(self) -> Member: @@ -245,7 +246,7 @@ def get(self, name: str, validity: Optional[bytes] = None) -> Member: def _update(self, member: Member, source: Optional[Member], clock: int, status: Optional[Status], - metadata: Optional[Mapping[bytes, bytes]]) -> None: + metadata: Optional[Mapping[str, bytes]]) -> None: next_clock = self._next_clock member._set_clock(clock, next_clock) if status is not None: @@ -260,7 +261,7 @@ def _update(self, member: Member, source: Optional[Member], def update(self, member: Member, *, new_status: Optional[Status] = None, - new_metadata: Optional[Mapping[bytes, bytes]] = None) -> None: + new_metadata: Optional[Mapping[str, bytes]] = None) -> None: """Update the cluster member status or metadata. Args: @@ -272,7 +273,7 @@ def update(self, member: Member, *, self._update(member, None, self._next_clock, new_status, new_metadata) def apply(self, member: Member, source: Member, clock: int, *, - status: Status, metadata: Optional[Mapping[bytes, bytes]]) \ + status: Status, metadata: Optional[Mapping[str, bytes]]) \ -> None: """Apply a disseminated update from *source* to *member*. diff --git a/swimprotocol/packet.py b/swimprotocol/packet.py index 6d55d7b..4922eea 100644 --- a/swimprotocol/packet.py +++ b/swimprotocol/packet.py @@ -7,7 +7,7 @@ from .status import Status -__all__ = ['Packet', 'Ping', 'PingReq', 'Ack', 'Gossip'] +__all__ = ['Packet', 'Ping', 'PingReq', 'Ack', 'Gossip', 'GossipAck'] @dataclass(frozen=True) @@ -76,7 +76,7 @@ class Gossip(Packet): name: str clock: int status: Status - metadata: Optional[Mapping[bytes, bytes]] + metadata: Optional[Mapping[str, bytes]] @dataclass(frozen=True) diff --git a/swimprotocol/plugin.py b/swimprotocol/plugin.py deleted file mode 100644 index 31dcd13..0000000 --- a/swimprotocol/plugin.py +++ /dev/null @@ -1,80 +0,0 @@ - -from __future__ import annotations - -from collections.abc import Mapping -from typing import TypeVar, Generic, Final, Optional - -from pkg_resources import iter_entry_points, DistributionNotFound - -__all__ = ['Plugins'] - -TypeT = TypeVar('TypeT', bound='type') - - -class Plugins(Generic[TypeT]): - """Allows a generic base type to be implemented using - `plugins `_ - defined internally in ``setup.py`` or other Python libraries. - - Args: - base: The base type to be implemented. - group: The entry point group name. - - """ - - def __init__(self, base: TypeT, group: str) -> None: - super().__init__() - self.base: Final = base - self.group: Final = group - self._failures: dict[str, DistributionNotFound] = {} - self._loaded: Optional[dict[str, TypeT]] = None - - @property - def loaded(self) -> Mapping[str, TypeT]: - """A mapping of plugin name to the *base* implementation sub-class. - Accessing this property will cause lazy-loading of all plugins, and - plugins that failed to load will not appear. - - """ - loaded = self._loaded - if loaded is None: - self._loaded = loaded = self._load(self.group) - return loaded - - def _load(self, group: str) -> dict[str, TypeT]: - loaded = {} - for entry_point in iter_entry_points(group): - name = entry_point.name - try: - loaded_type = entry_point.load() - except DistributionNotFound as exc: - self._failures[name] = exc - else: - loaded[name] = loaded_type - return loaded - - def choose(self, name: str) -> TypeT: - """Given a plugin name, return the *base* implementation sub-class as - loaded from the entry points. - - Args: - name: The name of the plugin entry point. - - Raises: - DistributionNotFound: The plugin failed to load due to missing - dependencies. - ValueError: The plugin name was not found. - - """ - loaded = self.loaded - if name not in loaded: - if name in self._failures: - raise self._failures[name] - else: - msg = f'{name!r} is not a valid {self.group!r} plugin' - raise ValueError(msg) - return loaded[name] - - def __repr__(self) -> str: - cls_name = self.__class__.__name__ - return f'{cls_name}({self.base!r}, {self.group!r}' diff --git a/swimprotocol/py.typed b/swimprotocol/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/swimprotocol/transport.py b/swimprotocol/transport.py index 80fdedb..d76f5bf 100644 --- a/swimprotocol/transport.py +++ b/swimprotocol/transport.py @@ -1,53 +1,69 @@ from __future__ import annotations -from abc import abstractmethod -from argparse import ArgumentParser +from abc import abstractmethod, ABCMeta from contextlib import AbstractAsyncContextManager -from typing import Protocol +from typing import Generic, TypeVar, Final, ClassVar, Optional -from .config import Config +from pkg_resources import iter_entry_points, DistributionNotFound + +from .config import ConfigT_co, BaseConfig from .members import Members -from .plugin import Plugins from .worker import Worker -__all__ = ['Transport', 'transport_plugins'] +__all__ = ['TransportT', 'load_transport', 'Transport'] + +#: Type variable for :class:`Transport` implementations. +TransportT = TypeVar('TransportT', bound='Transport[BaseConfig]') + +def load_transport(name: str = 'udp', *, group: str = __name__) \ + -> type[Transport[BaseConfig]]: + """Load and return the :class:`Transport` implementation by *name*. -class Transport(Protocol): + Args: + name: The name of the transport entry point. + group: The :mod:`pkg_resources` entry point group. + + Raises: + DistributionNotFound: A dependency of the transport entry point was not + able to be satisfied. + KeyError: The given name did not exist in the entry point group. + + """ + last_exc: Optional[DistributionNotFound] = None + for entry_point in iter_entry_points(group, name): + try: + transport_type: type[Transport[BaseConfig]] = entry_point.load() + except DistributionNotFound as exc: + last_exc = exc + else: + return transport_type + if last_exc is not None: + raise last_exc + else: + raise KeyError(f'{name!r} entry point not found in {group!r}') + + +class Transport(Generic[ConfigT_co], metaclass=ABCMeta): """Interface of the basic functionality needed to act as the :term:`transport` layer for the SWIM protocol. The transport layer is responsible for sending and receiving :term:`ping`, :term:`ping-req`, and :term:`ack` packets for failure detection, and transmitting :term:`gossip` for dissemination. - """ - - @classmethod - @abstractmethod - def add_arguments(cls, name: str, parser: ArgumentParser) -> None: - """Additional configuration needed by the transport may be added to the - current argument *parser*. - - Args: - name: The name of the transport plugin. - parser: The argument parser. + Args: + config: The cluster config object. - """ - ... - - @classmethod - @abstractmethod - def init(cls, config: Config) -> Transport: - """Initializes the :class:`Transport` and returns a new instance - given the *config*. Any arguments added by :meth:`.add_arguments` can - be accessed on ``config.args``. + """ - Args: - config: The cluster configuration object. + #: The :class:`~swimprotocol.config.BaseConfig` sub-class used by this + #: transport. + config_type: ClassVar[type[ConfigT_co]] - """ - ... + def __init__(self, config: ConfigT_co) -> None: + super().__init__() + self.config: Final = config @abstractmethod def enter(self, members: Members) -> AbstractAsyncContextManager[Worker]: @@ -60,7 +76,3 @@ def enter(self, members: Members) -> AbstractAsyncContextManager[Worker]: """ ... - - -#: Manages the loading and access of :class:`Transport` implementations. -transport_plugins = Plugins(Transport, __name__) diff --git a/swimprotocol/udp/__init__.py b/swimprotocol/udp/__init__.py index e1a793f..8507f3e 100644 --- a/swimprotocol/udp/__init__.py +++ b/swimprotocol/udp/__init__.py @@ -2,23 +2,81 @@ from __future__ import annotations import asyncio -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace from collections.abc import AsyncIterator from contextlib import asynccontextmanager, closing -from typing import Final, Optional +from typing import Final, Any, Optional from .protocol import SwimProtocol from .pack import UdpPack from ..address import AddressParser -from ..config import Config +from ..config import BaseConfig from ..members import Members from ..transport import Transport from ..worker import Worker -__all__ = ['UdpTransport'] +__all__ = ['UdpConfig', 'UdpTransport'] -class UdpTransport(Transport): +class UdpConfig(BaseConfig): + """Implements :class:`~swimprotocol.config.BaseConfig`, adding additional + configuration required for :class:`UdpTransport`. + + Args: + bind_host: The hostname or IP address to bind the UDP socket. The + hostname from the *local_name* address is used by default. + bind_port: The port number to bind the UDP socket. The port from the + *local_name* address is used by default. + default_host: The hostname or IP address to connect to if an address + string does not specify a hostname, e.g. ``':1234'``. + default_port: The port number to connect to if an address string does + not specify a port number, e.g. ``'myhost'``. + kwargs: Additional keyword arguments passed to the + :class:`~swimprotocol.config.BaseConfig` constructor. + + """ + + def __init__(self, *, bind_host: Optional[str] = None, + bind_port: Optional[int] = None, + default_host: Optional[str] = None, + default_port: Optional[int] = None, + **kwargs: Any) -> None: + super().__init__(**kwargs) + self.bind_host: Final = bind_host + self.bind_port: Final = bind_port + self.default_host: Final = default_host + self.default_port: Final = default_port + + @classmethod + def add_arguments(cls, parser: ArgumentParser, *, + prefix: str = '--') -> None: + super().add_arguments(parser, prefix=prefix) + group = parser.add_argument_group('swim udp options') + group.add_argument(f'{prefix}udp-bind', metavar='INTERFACE', + dest='swim_udp_bind', + help='The bind IP address.') + group.add_argument(f'{prefix}udp-bind-port', metavar='NUM', + dest='swim_udp_bind_port', + help='The bind port.') + group.add_argument(f'{prefix}udp-host', metavar='NAME', + dest='swim_udp_host', + help='The default remote hostname.') + group.add_argument(f'{prefix}udp-port', metavar='NUM', type=int, + dest='swim_udp_port', + help='The default port number.') + + @classmethod + def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \ + -> dict[str, Any]: + kwargs = super().parse_args(args, env_prefix=env_prefix) + return kwargs | { + 'bind_host': args.swim_udp_bind, + 'bind_port': args.swim_udp_bind_port, + 'default_host': args.swim_udp_host, + 'default_port': args.swim_udp_port} + + +class UdpTransport(Transport[UdpConfig]): """Implements :class:`~swimprotocol.transport.Transport` using UDP, without `broadcast `_. @@ -31,13 +89,13 @@ class UdpTransport(Transport): """ - def __init__(self, config: Config) -> None: - super().__init__() - self.config: Final = config - self.args: Final = config.args + config_type = UdpConfig + + def __init__(self, config: UdpConfig) -> None: + super().__init__(config) self.address_parser: Final = AddressParser( - default_host=config.args.udp_host, - default_port=config.args.udp_port) + default_host=config.default_host, + default_port=config.default_port) self.udp_pack: Final = UdpPack(config.signatures) self._local_address = self.address_parser.parse(config.local_name) @@ -50,8 +108,8 @@ def bind_host(self) -> str: :func:`asyncio.loop.create_datagram_endpoint` """ - bind: Optional[str] = self.args.udp_bind - return bind or self._local_address.host + bind_host: Optional[str] = self.config.bind_host + return bind_host or self._local_address.host @property def bind_port(self) -> int: @@ -61,7 +119,8 @@ def bind_port(self) -> int: :func:`asyncio.loop.create_datagram_endpoint` """ - return self._local_address.port + bind_port: Optional[int] = self.config.bind_port + return bind_port or self._local_address.port @asynccontextmanager async def enter(self, members: Members) -> AsyncIterator[Worker]: @@ -75,15 +134,15 @@ async def enter(self, members: Members) -> AsyncIterator[Worker]: yield worker @classmethod - def add_arguments(cls, name: str, parser: ArgumentParser) -> None: + def add_arguments(cls, name: str, parser: ArgumentParser, *, + prefix: str = '--udp') -> None: group = parser.add_argument_group(f'{name} options') - group.add_argument('--udp-bind', metavar='INTERFACE', + group.add_argument(f'{prefix}-bind', metavar='INTERFACE', + dest='swim_udp_bind', help='The bind IP address.') - group.add_argument('--udp-host', metavar='NAME', + group.add_argument(f'{prefix}-host', metavar='NAME', + dest='swim_udp_host', help='The default remote hostname.') - group.add_argument('--udp-port', metavar='NUM', type=int, + group.add_argument(f'{prefix}-port', metavar='NUM', type=int, + dest='swim_udp_port', help='The default port number.') - - @classmethod - def init(cls, config: Config) -> Transport: - return cls(config) diff --git a/swimprotocol/worker.py b/swimprotocol/worker.py index 748544c..ce6a9fb 100644 --- a/swimprotocol/worker.py +++ b/swimprotocol/worker.py @@ -9,7 +9,7 @@ from typing import final, Protocol, Final, Optional, NoReturn from weakref import WeakSet, WeakKeyDictionary -from .config import Config +from .config import BaseConfig from .members import Member, Members from .packet import Packet, Ping, PingReq, Ack, Gossip, GossipAck from .status import Status @@ -58,7 +58,7 @@ class Worker: """ - def __init__(self, config: Config, members: Members, io: IO) -> None: + def __init__(self, config: BaseConfig, members: Members, io: IO) -> None: super().__init__() self.config: Final = config self.members: Final = members @@ -120,7 +120,7 @@ async def _run_handler(self) -> NoReturn: def _build_gossip(self, local: Member, member: Member) -> Gossip: if member.metadata is Member.METADATA_UNKNOWN: - metadata: Optional[Mapping[bytes, bytes]] = None + metadata: Optional[Mapping[str, bytes]] = None else: metadata = member.metadata return Gossip(source=local.source, name=member.name,