Skip to content

Commit

Permalink
Merge pull request #11 from icgood/transient
Browse files Browse the repository at this point in the history
Transient config errors for UDP discovery issues
  • Loading branch information
icgood committed May 10, 2021
2 parents fa4b831 + 1dfec9b commit 10a50f6
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 17 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ These snippets demonstrate the UDP transport layer directly. For a more generic
approach that uses [argparse][11] and [load_transport][12], check out the
[demo][2].

If your application is deployed as a [Docker Service][13], the [UdpConfig][100]
`discovery=True` keyword argument can be used to discover configuration based
on the service name. See the [documentation][14] for more comprehensive usage.

### Checking Members

The [Members][101] object provides a few ways to check on the cluster and its
Expand Down Expand Up @@ -156,6 +160,8 @@ hinting to the extent possible and common in the rest of the codebase.
[10]: https://en.wikipedia.org/wiki/Maximum_transmission_unit
[11]: https://docs.python.org/3/library/argparse.html
[12]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.transport.load_transport
[13]: https://docs.docker.com/engine/swarm/how-swarm-mode-works/services/
[14]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#docker-services

[100]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#swimprotocol.udp.UdpConfig
[101]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.members.Member
Expand Down
18 changes: 12 additions & 6 deletions docs/intro.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,20 @@ Glossary
in the cluster based on their most recently known :term:`status` and
:term:`metadata`.

peer member
One of the :term:`member` instances that is remote in the cluster. A copy
of each peer member's :term:`status` and :term:`metadata` is maintained
by the :term:`local member` and updated based on failure detection and
dissemination.

status
One of three states that a remote :term:`member` can hold, as perceived
the :term:`local member`: :term:`online`, :term:`suspect`, or
One of three states that a :term:`peer member` can hold, as perceived the
:term:`local member`: :term:`online`, :term:`suspect`, or
:term:`offline`.

online : status
A :term:`status` meaning recent failure detection attempts have
successfully received an :term:`ack` from the remote :term:`member`.
successfully received an :term:`ack` from the :term:`peer member`.

suspect : status
A :term:`status` meaning that a recently-online :term:`member` has not
Expand Down Expand Up @@ -100,13 +106,13 @@ Glossary
`datagrams <https://en.wikipedia.org/wiki/Datagram>`_.

ping : packet
A :term:`packet` that requests that a remote :term:`member` reply to the
A :term:`packet` that requests that a :term:`peer member` reply to the
source :term:`member` with an :term:`ack`. This is the most basic attempt
to detect when members have gone offline.

ping-req : packet
A :term:`packet` that requests that a remote :term:`member` send its own
:term:`ping` to a second remote :term:`member`. If the recipient receives
A :term:`packet` that requests that a :term:`peer member` send its own
:term:`ping` to a second :term:`peer member`. If the recipient receives
an :term:`ack`, it is forwarded back to the source :term:`member`.

ack : packet
Expand Down
35 changes: 35 additions & 0 deletions docs/swimprotocol.udp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,41 @@

.. automodule:: swimprotocol.udp

Docker Services
---------------

If your application is deployed as a `Docker Service`_, the
:class:`~swimprotocol.udp.UdpConfig` ``discovery=True`` keyword argument can be
used to discover configuration based on the service name. For example::

config = UdpConfig(local_name='tasks.my-service:9999', discovery=True, ...)

Docker provides a `tasks`_ DNS lookup that resolves to the IP addresses of all
replicas of the service. In this example, ``tasks.my-service`` is resolved to
these IP addresses. The IP address local to the container is chosen as the
:term:`local member` and the rest are :term:`peer members <peer member>`.

In practice, this DNS lookup is often not immediately successful when the
replicas start up. A service may also be scaled down to a single replica, which
has no need of a cluster. These scenarios will raise a
:exc:`~swimprotocol.config.TransientConfigError` with a *wait_hint* value. This
exception can be caught to continuously retry the cluster configuration until
successful::

async def start() -> None:
while True:
try:
config = UdpConfig(local_name='tasks.my-service:9999',
discovery=True, ...)
except TransientConfigError as exc:
await asyncio.sleep(exc.wait_hint)
else:
break
# ...

.. _Docker Service: https://docs.docker.com/engine/swarm/how-swarm-mode-works/services/
.. _tasks: https://docs.docker.com/network/overlay/#container-discovery

``swimprotocol.udp.pack``
-------------------------

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
license = f.read()

setup(name='swim-protocol',
version='0.3.5',
version='0.3.6',
author='Ian Good',
author_email='ian@icgood.net',
description='SWIM protocol implementation for exchanging cluster '
Expand Down
22 changes: 21 additions & 1 deletion swimprotocol/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .sign import Signatures

__all__ = ['ConfigT_co', 'ConfigError', 'BaseConfig']
__all__ = ['ConfigT_co', 'ConfigError', 'TransientConfigError', 'BaseConfig']

#: Covariant type variable for :class:`BaseConfig` sub-classes.
ConfigT_co = TypeVar('ConfigT_co', bound='BaseConfig', covariant=True)
Expand All @@ -24,6 +24,24 @@ class ConfigError(Exception):
pass


class TransientConfigError(ConfigError):
"""Raised when a possibly-temporary failure has prevented configuration of
the cluster. This exception is often chained with the cause, e.g.
:exc:`OSError`. Importantly, this exception indicates that configuration of
the cluster may succeed eventually if retried.
Args:
msg: The exception message.
wait_hint: A suggested :func:`~asyncio.sleep` time before trying again.
"""

def __init__(self, msg: Optional[str] = None, *,
wait_hint: float = 60.0) -> None:
super().__init__(msg)
self.wait_hint: Final = wait_hint


class BaseConfig(metaclass=ABCMeta):
"""Configure the cluster behavior and characteristics.
:class:`~swimprotocol.transport.Transport` implementations should
Expand All @@ -49,6 +67,8 @@ class BaseConfig(metaclass=ABCMeta):
Raises:
ConfigError: The given configuration was invalid.
TransientConfigError: The configuration failed due to a failure that
may not be permanent.
"""

Expand Down
30 changes: 21 additions & 9 deletions swimprotocol/udp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import socket
from argparse import ArgumentParser, Namespace
from collections.abc import Sequence, AsyncIterator
from contextlib import asynccontextmanager, closing
from contextlib import asynccontextmanager, closing, suppress
from typing import Final, Any, Optional

from .protocol import SwimProtocol
from .pack import UdpPack
from ..address import Address, AddressParser
from ..config import BaseConfig, ConfigError
from ..config import BaseConfig, ConfigError, TransientConfigError
from ..members import Members
from ..transport import Transport
from ..worker import Worker
Expand Down Expand Up @@ -98,25 +98,37 @@ def _discover(cls, address_parser: AddressParser,
local_addr = address_parser.parse(local_name)
resolved = cls._resolve_name(local_addr.host)
local_ip = cls._find_local_ip(local_addr.host, local_addr.port)
resolved.discard(local_ip)
if local_ip not in resolved:
raise TransientConfigError(
f'Invalid local IP: {local_ip!r} not in {resolved!r}')
resolved.remove(local_ip)
if not resolved:
raise TransientConfigError(
f'Could not find peers: {local_addr.host}')
resolved_peers = {str(Address(peer_ip, local_addr.port))
for peer_ip in resolved}
kwargs['local_name'] = str(Address(local_ip, local_addr.port))
kwargs['peers'] = list(resolved_peers | set(given_peers))

@classmethod
def _resolve_name(cls, hostname: str) -> set[str]:
_, _, ipaddrlist = socket.gethostbyname_ex(hostname)
assert ipaddrlist
try:
_, _, ipaddrlist = socket.gethostbyname_ex(hostname)
except OSError as exc:
raise TransientConfigError(f'Could not resolve name: {hostname}',
wait_hint=10.0) from exc
if not ipaddrlist:
raise TransientConfigError(f'Name resolved empty: {hostname}')
return set(ipaddrlist)

@classmethod
def _find_local_ip(cls, hostname: str, port: int) -> str:
def _find_local_ip(cls, hostname: str, port: int) -> Optional[str]:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect((hostname, port))
with closing(sock):
with closing(sock), suppress(OSError):
sock.connect((hostname, port))
sockname: tuple[str, int] = sock.getsockname()
return sockname[0]
return sockname[0]
return None


class UdpTransport(Transport[UdpConfig]):
Expand Down

0 comments on commit 10a50f6

Please sign in to comment.