Skip to content

Commit

Permalink
Merge pull request #1 from icgood/config
Browse files Browse the repository at this point in the history
Improve config for external usage
  • Loading branch information
icgood committed May 7, 2021
2 parents e09fbc5 + aa07288 commit 7833fc0
Show file tree
Hide file tree
Showing 16 changed files with 341 additions and 242 deletions.
42 changes: 23 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,32 @@ synchronize a distributed group of processes.
$ pip install swim-protocol
```

#### Running the Demo

There is a [demo][2] application included as a reference implementation. Try it
out by running the following, each from a new terminal window, and use _Ctrl-C_
to exit:

```console
$ swim-protocol-demo -c -m name one 127.0.0.1:2001 127.0.0.1:2003
$ swim-protocol-demo -c -m name two 127.0.0.1:2002 127.0.0.1:2001
$ swim-protocol-demo -c -m name three 127.0.0.1:2003 127.0.0.1:2001
$ swim-protocol-demo -c -m name four 127.0.0.1:2004 127.0.0.1:2003
$ swim-protocol-demo -c --name 127.0.0.1:2001 --peer 127.0.0.1:2003 --metadata name one
$ swim-protocol-demo -c --name 127.0.0.1:2002 --peer 127.0.0.1:2001 --metadata name two
$ swim-protocol-demo -c --name 127.0.0.1:2003 --peer 127.0.0.1:2001 --metadata name three
$ swim-protocol-demo -c --name 127.0.0.1:2004 --peer 127.0.0.1:2003 --metadata name four
```

Every 10 seconds or so, each member will randomize its `token` metadata field,
which should be disseminated across the cluster with [eventual consistency][6].

### Getting Started

First you should create a new [Config][100] object:
First you should create a new [UdpConfig][100] object:

```python
from argparse import ArgumentParser
from swimprotocol.config import Config

parser = ArgumentParser(...)
args = parser.parse_args()
config = Config(args, secret=b'...',
local_name='127.0.0.1:2001',
local_metadata={b'name': b'one'},
peers=['127.0.0.1:2002'])
from swimprotocol.udp import UdpConfig

config = UdpConfig(local_name='127.0.0.1:2001',
local_metadata={'name': b'one'},
peers=['127.0.0.1:2002'])
```

All other config arguments have default values, which are tuned somewhat
Expand All @@ -61,16 +59,20 @@ the event loop:

```python
from contextlib import AsyncExitStack
from swimprotocol.transport import transport_plugins
from swimprotocol.members import Members
from swimprotocol.udp import UdpTransport

transport = transport_plugins.choose('udp').init(config)
transport = UdpTransport(config)
members = Members(config)
async with AsyncExitStack() as stack:
worker = await stack.enter_async_context(transport.enter(members))
await worker.run() # or schedule as a task
```

These snippets demonstrate the UDP transport layer directly. For a more generic
approach that uses [argparse][11] and [load_transport][12], check out the
[demo][2].

### Checking Members

The [Members][101] object provides a few ways to check on the cluster and its
Expand All @@ -92,12 +94,12 @@ Alternatively, listen for status or metadata changes on all members:
```python
from swimprotocol.member import Member

async def _on_updated(member: Member) -> None:
async def _updated(member: Member) -> None:
print('updated:', member.name, member.status, member.metadata)

async with AsyncExitStack() as stack:
# ...
stack.enter_context(members.listener.on_notify(_on_updated))
stack.enter_context(members.listener.on_notify(_updated))
```

### UDP Transport Security
Expand Down Expand Up @@ -152,7 +154,9 @@ hinting to the extent possible and common in the rest of the codebase.
[8]: https://en.wikipedia.org/wiki/Shared_secret
[9]: https://docs.python.org/3/library/uuid.html#uuid.getnode
[10]: https://en.wikipedia.org/wiki/Maximum_transmission_unit
[11]: https://docs.python.org/3/library/argparse.html
[12]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.transport.load_transport

[100]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.config.Config
[100]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#swimprotocol.udp.UdpConfig
[101]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.members.Member
[102]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#swimprotocol.udp.UdpTransport
8 changes: 7 additions & 1 deletion docs/intro.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Glossary
false positives.

metadata
An immutable mapping of key/value :class:`bytes` associated with each
An immutable mapping of key/value strings associated with each
:term:`member`. New mappings may be assigned, and the latest mapping will
always be disseminated across the cluster.

Expand Down Expand Up @@ -122,6 +122,12 @@ Glossary
is always higher than any other observed value, used to determine whether
:term:`gossip` is new enough to be applied or disseminated.

demo
The included `demo`_ is designed to randomize metadata changes on an
interval to see them disseminated across the cluster, as well as watch as
:term:`member` statuses change as demo instances are stopped and started.

.. _SWIM: https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf
.. _Serf: https://www.serf.io/docs/internals/gossip.html
.. _Lamport timestamp: https://en.wikipedia.org/wiki/Lamport_timestamp
.. _demo: https://github.com/icgood/swim-protocol#running-the-demo
6 changes: 1 addition & 5 deletions docs/swimprotocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
-------------------------

.. automodule:: swimprotocol.listener
:special-members: __call__

``swimprotocol.members``
------------------------
Expand All @@ -30,11 +31,6 @@

.. automodule:: swimprotocol.packet

``swimprotocol.plugin``
-----------------------

.. automodule:: swimprotocol.plugin

``swimprotocol.shuffle``
------------------------

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
license = f.read()

setup(name='swim-protocol',
version='0.2.0',
version='0.3.0',
author='Ian Good',
author_email='ian@icgood.net',
description='SWIM protocol implementation for exchanging cluster '
Expand Down
138 changes: 117 additions & 21 deletions swimprotocol/config.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@

from __future__ import annotations

from argparse import Namespace
import os
from abc import ABCMeta
from argparse import ArgumentParser, Namespace
from collections.abc import Mapping, Sequence
from typing import Final, Union
from typing import final, TypeVar, Final, Any, Union

from .sign import Signatures

__all__ = ['Config']
__all__ = ['ConfigT_co', 'ConfigError', 'BaseConfig']

#: Covariant type variable for :class:`BaseConfig` sub-classes.
ConfigT_co = TypeVar('ConfigT_co', bound='BaseConfig', covariant=True)

class Config:

class ConfigError(Exception):
"""Raised when the configuration is insufficient or invalid for running a
cluster, along with a human-readable message about what was wrong.
"""
pass


class BaseConfig(metaclass=ABCMeta):
"""Configure the cluster behavior and characteristics.
:class:`~swimprotocol.transport.Transport` implementations should
sub-class to add additional configuration.
Args:
args: Command-line arguments namespace.
secret: The shared secret for cluster packet signatures.
local_name: The unique name of the local cluster member.
local_metadata: The local cluster member metadata.
Expand All @@ -32,12 +46,14 @@ class Config:
sync_interval: Time between sync attempts to disseminate cluster
changes.
Raises:
ConfigError: The given configuration was invalid.
"""

def __init__(self, args: Namespace, *,
secret: Union[str, bytes],
def __init__(self, *, secret: Union[None, str, bytes],
local_name: str,
local_metadata: Mapping[bytes, bytes],
local_metadata: Mapping[str, bytes],
peers: Sequence[str],
ping_interval: float = 1.0,
ping_timeout: float = 0.3,
Expand All @@ -46,7 +62,6 @@ def __init__(self, args: Namespace, *,
suspect_timeout: float = 5.0,
sync_interval: float = 0.5) -> None:
super().__init__()
self.args: Final = args
self._signatures = Signatures(secret)
self.local_name: Final = local_name
self.local_metadata: Final = local_metadata
Expand All @@ -57,22 +72,103 @@ def __init__(self, args: Namespace, *,
self.ping_req_timeout: Final = ping_req_timeout
self.suspect_timeout: Final = suspect_timeout
self.sync_interval: Final = sync_interval
self._validate()

def _validate(self) -> None:
if not self.local_name:
raise ConfigError('This cluster instance needs a local name.')
elif not self.peers:
raise ConfigError('At least one cluster peer name is required.')

@property
def signatures(self) -> Signatures:
"""Generates and verifies cluster packet signatures."""
return self._signatures

@classmethod
def from_args(cls, args: Namespace) -> Config:
"""Build a :class:`Config` from command-line arguments and sensible
defaults.
def add_arguments(cls, parser: ArgumentParser, *,
prefix: str = '--') -> None:
"""Implementations (such as the :term:`demo`) may use this method to
add command-line based configuration for the transport.
Note:
Arguments added should use *prefix* and explicitly provide a unique
name, e.g.::
parser.add_argument(f'{prefix}arg', dest='swim_arg', ...)
This prevents collision with other argument names and allows custom
*prefix* values without affecting the :class:`~argparse.Namespace`.
Args:
args: The command-line arguments namespace.
parser: The argument parser.
prefix: The prefix for added arguments, which should start with
``--`` and end with ``-``, e.g. ``'--'`` or ``'--foo-'``.
"""
return cls(args, secret=args.secret,
local_name=args.local,
local_metadata=dict(args.metadata),
peers=args.peers)
group = parser.add_argument_group('swim options')
group.add_argument(f'{prefix}metadata', dest='swim_metadata',
nargs=2, metavar=('KEY', 'VAL'),
default=[], action='append',
help='Metadata for this node.')
group.add_argument(f'{prefix}secret', dest='swim_secret',
metavar='STRING',
help='The secret string used to verify messages.')
group.add_argument(f'{prefix}name', dest='swim_name',
metavar='localname',
help='External name or address for this node.')
group.add_argument(f'{prefix}peer', dest='swim_peers',
metavar='peername', action='append', default=[],
help='At least one name or address of '
'a known peer.')

@property
def signatures(self) -> Signatures:
"""Generates and verifies cluster packet signatures."""
return self._signatures
@classmethod
def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \
-> dict[str, Any]:
"""Parse the given :class:`~argparse.Namespace` into a dictionary of
keyword arguments for the :class:`BaseConfig` constructor. Sub-classes
should override this method to add additional keyword arguments as
needed.
The :func:`os.getenv` function can also be used, and will take priority
over values in *args*:
* ``SWIM_SECRET``: The *secret* keyword argument.
* ``SWIM_NAME``: The *local_name* keyword argument.
* ``SWIM_PEERS``: Comma-separated *peers* keyword argument.
Args:
args: The command-line arguments.
env_prefix: Prefix for the environment variables.
"""
secret = os.getenv(f'{env_prefix}_SECRET', args.swim_secret)
local_name = os.getenv(f'{env_prefix}_NAME', args.swim_name)
local_metadata = {key: val.encode('utf-8')
for key, val in args.swim_metadata}
env_peers = os.getenv(f'{env_prefix}_PEERS')
if env_peers is not None:
peers = env_peers.split(',')
else:
peers = args.swim_peers
return {'secret': secret,
'local_name': local_name,
'local_metadata': local_metadata,
'peers': peers}

@final
@classmethod
def from_args(cls: type[ConfigT_co], args: Namespace,
**overrides: Any) -> ConfigT_co:
"""Build and return a new cluster config object. This first calls
:meth:`.parse_args` and then passes the results as keyword arguments
to the constructor.
Args:
args: The command-line arguments.
overrides: Keyword arguments to override.
"""
kwargs = cls.parse_args(args)
kwargs |= overrides
return cls(**kwargs)
47 changes: 19 additions & 28 deletions swimprotocol/demo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,42 @@
from .changes import change_metadata
from .log import run_logging
from .screen import run_screen
from ..config import Config
from ..config import BaseConfig, ConfigError
from ..members import Members
from ..transport import transport_plugins
from ..transport import load_transport, Transport

__all__ = ['main']


def main() -> int:
parser = ArgumentParser(description=__doc__)
parser.add_argument('-m', '--metadata', nargs=2, metavar=('KEY', 'VAL'),
default=[], action='append', type=_utf8,
help='Metadata for this node.')
parser.add_argument('-t', '--transport', metavar='NAME', default='udp',
help='The transport plugin name.')
parser.add_argument('-s', '--secret', metavar='STRING',
help='The secret string used to verify messages.')
parser.add_argument('-c', '--curses', action='store_true',
help='Enable the curses display.')
parser.add_argument('-i', '--token-interval', type=float, default=10.0,
help='Cluster member token update interval.')
parser.add_argument('local', metavar='localname',
help='External name or address for this node.')
parser.add_argument('peers', metavar='peername', nargs='+',
help='At least one name or address of a known peer.')
parser.set_defaults(config_type=Config)

for transport_name, transport_type in transport_plugins.loaded.items():
transport_type.add_arguments(transport_name, parser)
transport_type = load_transport()
transport_type.config_type.add_arguments(parser)

group = parser.add_argument_group('swim demo options')
group.add_argument('-c', '--curses', action='store_true',
help='Enable the curses display.')
group.add_argument('-i', '--token-interval', metavar='SECONDS',
type=float, default=10.0,
help='Cluster member token update interval.')

args = parser.parse_args()

logging.basicConfig(level=logging.INFO,
format='%(asctime)-15s %(name)s %(message)s')

return asyncio.run(run(args))


def _utf8(val: str) -> bytes:
return val.encode('utf-8')
try:
return asyncio.run(run(transport_type, args))
except ConfigError as exc:
parser.error(str(exc))


async def run(args: Namespace) -> int:
async def run(transport_type: type[Transport[BaseConfig]],
args: Namespace) -> int:
loop = asyncio.get_running_loop()
config: Config = args.config_type.from_args(args)
transport = transport_plugins.choose(args.transport).init(config)
config = transport_type.config_type.from_args(args)
transport = transport_type(config)
members = Members(config)
async with AsyncExitStack() as stack:
stack.enter_context(suppress(CancelledError))
Expand Down

0 comments on commit 7833fc0

Please sign in to comment.