Skip to content

Commit

Permalink
Merge pull request #10 from icgood/dns
Browse files Browse the repository at this point in the history
Resolve local name by DNS for peers
  • Loading branch information
icgood committed May 9, 2021
2 parents 10271ab + 9570f03 commit fa4b831
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 11 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
license = f.read()

setup(name='swim-protocol',
version='0.3.4',
version='0.3.5',
author='Ian Good',
author_email='ian@icgood.net',
description='SWIM protocol implementation for exchanging cluster '
Expand Down
1 change: 0 additions & 1 deletion swimprotocol/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \
"""
secret = args.swim_secret or cls._get_env(env_prefix, 'SECRET')
local_name = args.swim_name or cls._get_env(env_prefix, 'NAME')
local_name = os.getenv(f'{env_prefix}_NAME', args.swim_name)
local_metadata = {key: val.encode('utf-8')
for key, val in args.swim_metadata}
peers = args.swim_peers or cls._get_env_list(env_prefix, 'PEERS')
Expand Down
59 changes: 50 additions & 9 deletions swimprotocol/udp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
from __future__ import annotations

import asyncio
import socket
from argparse import ArgumentParser, Namespace
from collections.abc import AsyncIterator
from collections.abc import Sequence, AsyncIterator
from contextlib import asynccontextmanager, closing
from typing import Final, Any, Optional

from .protocol import SwimProtocol
from .pack import UdpPack
from ..address import AddressParser
from ..config import BaseConfig
from ..address import Address, AddressParser
from ..config import BaseConfig, ConfigError
from ..members import Members
from ..transport import Transport
from ..worker import Worker
Expand All @@ -31,6 +32,9 @@ class UdpConfig(BaseConfig):
string does not specify a hostname, e.g. ``':1234'``.
default_port: The port number to connect to if an address string does
not specify a port number, e.g. ``'myhost'``.
discovery: Resolve the local address as a DNS **A**/**AAAA** record
containing peers. The local IP address will also be auto-discovered
by attempting to :meth:`~socket.socket.connect` to the hostname.
kwargs: Additional keyword arguments passed to the
:class:`~swimprotocol.config.BaseConfig` constructor.
Expand All @@ -40,12 +44,17 @@ def __init__(self, *, bind_host: Optional[str] = None,
bind_port: Optional[int] = None,
default_host: Optional[str] = None,
default_port: Optional[int] = None,
discovery: bool = False,
**kwargs: Any) -> None:
address_parser = AddressParser(
default_host=default_host,
default_port=default_port)
if discovery:
self._discover(address_parser, kwargs)
super().__init__(**kwargs)
self.bind_host: Final = bind_host
self.bind_port: Final = bind_port
self.default_host: Final = default_host
self.default_port: Final = default_port
self.address_parser: Final = address_parser

@classmethod
def add_arguments(cls, parser: ArgumentParser, *,
Expand All @@ -64,6 +73,9 @@ def add_arguments(cls, parser: ArgumentParser, *,
group.add_argument(f'{prefix}udp-port', metavar='NUM', type=int,
dest='swim_udp_port',
help='The default port number.')
group.add_argument(f'{prefix}udp-discovery', action='store_true',
dest='swim_udp_discovery',
help='Find cluster with DNS discovery.')

@classmethod
def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \
Expand All @@ -73,7 +85,38 @@ def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \
'bind_host': args.swim_udp_bind,
'bind_port': args.swim_udp_bind_port,
'default_host': args.swim_udp_host,
'default_port': args.swim_udp_port}
'default_port': args.swim_udp_port,
'discovery': args.swim_udp_discovery}

@classmethod
def _discover(cls, address_parser: AddressParser,
kwargs: dict[str, Any]) -> None:
local_name: str = kwargs.pop('local_name', None)
given_peers: Sequence[str] = kwargs.pop('peers', [])
if local_name is None:
raise ConfigError('The cluster instance needs a local name.')
local_addr = address_parser.parse(local_name)
resolved = cls._resolve_name(local_addr.host)
local_ip = cls._find_local_ip(local_addr.host, local_addr.port)
resolved.discard(local_ip)
resolved_peers = {str(Address(peer_ip, local_addr.port))
for peer_ip in resolved}
kwargs['local_name'] = str(Address(local_ip, local_addr.port))
kwargs['peers'] = list(resolved_peers | set(given_peers))

@classmethod
def _resolve_name(cls, hostname: str) -> set[str]:
_, _, ipaddrlist = socket.gethostbyname_ex(hostname)
assert ipaddrlist
return set(ipaddrlist)

@classmethod
def _find_local_ip(cls, hostname: str, port: int) -> str:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect((hostname, port))
with closing(sock):
sockname: tuple[str, int] = sock.getsockname()
return sockname[0]


class UdpTransport(Transport[UdpConfig]):
Expand All @@ -93,9 +136,7 @@ class UdpTransport(Transport[UdpConfig]):

def __init__(self, config: UdpConfig) -> None:
super().__init__(config)
self.address_parser: Final = AddressParser(
default_host=config.default_host,
default_port=config.default_port)
self.address_parser: Final = config.address_parser
self.udp_pack: Final = UdpPack(config.signatures)
self._local_address = self.address_parser.parse(config.local_name)

Expand Down

0 comments on commit fa4b831

Please sign in to comment.