Skip to content

Commit

Permalink
Add file synchronization tool
Browse files Browse the repository at this point in the history
  • Loading branch information
icgood committed Aug 13, 2023
1 parent 06d9b76 commit 36b971c
Show file tree
Hide file tree
Showing 19 changed files with 485 additions and 264 deletions.
46 changes: 42 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,55 @@ synchronize a distributed group of processes.
$ pip install swim-protocol
```

### File Sync Tool

A basic tool for reporting cluster membership and synchronizing metadata as
files is available:

```console
$ swim-protocol-sync --name 127.0.0.1:2001 --peer 127.0.0.1:2002 ~/node1
$ swim-protocol-sync --name 127.0.0.1:2002 --peer 127.0.0.1:2001 ~/node2
```

While running, the state of the cluster and the metadata of each member are
visible on the filesystem.

```console
$ tree -a ~/node1
node1
├── .available
│   └── 127.0.0.1:2002 -> ../127.0.0.1:2002
├── .local -> 127.0.0.1:2001
├── .offline
├── .online
│   └── 127.0.0.1:2002 -> ../127.0.0.1:2002
├── .suspect
├── .unavailable
├── 127.0.0.1:2001
│   └── file-one.txt
└── 127.0.0.1:2002
└── file-two.txt
```

To change the metdata of the local cluster member, edit the files and issue a
SIGHUP to the process:

```console
$ vim ~/node1/.local/file-one.txt
$ pkill -HUP -f swim-protocol-sync
```

#### Running the Demo

There is a [demo][2] application included as a reference implementation. Try it
out by running the following, each from a new terminal window, and use _Ctrl-C_
to exit:

```console
$ swim-protocol-demo -c --name 127.0.0.1:2001 --peer 127.0.0.1:2003 --metadata name one
$ swim-protocol-demo -c --name 127.0.0.1:2002 --peer 127.0.0.1:2001 --metadata name two
$ swim-protocol-demo -c --name 127.0.0.1:2003 --peer 127.0.0.1:2001 --metadata name three
$ swim-protocol-demo -c --name 127.0.0.1:2004 --peer 127.0.0.1:2003 --metadata name four
$ swim-protocol-demo --name 127.0.0.1:2001 --peer 127.0.0.1:2003
$ swim-protocol-demo --name 127.0.0.1:2002 --peer 127.0.0.1:2001
$ swim-protocol-demo --name 127.0.0.1:2003 --peer 127.0.0.1:2001
$ swim-protocol-demo --name 127.0.0.1:2004 --peer 127.0.0.1:2003
```

Typing in any window will disseminate what has been typed across the cluster
Expand Down
5 changes: 0 additions & 5 deletions doc/source/swimprotocol.udp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,3 @@ successful::
-------------------------

.. automodule:: swimprotocol.udp.pack

``swimprotocol.udp.protocol``
-----------------------------

.. automodule:: swimprotocol.udp.protocol
33 changes: 19 additions & 14 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,25 @@ classifiers = [
'API Documentation' = 'https://icgood.github.io/swim-protocol/'

[project.scripts]
swim-protocol-sync = 'swimprotocol.sync:main'
swim-protocol-demo = 'swimprotocol.demo:main'

[project.optional-dependencies]
dev = [
'mypy',
'pytest',
'pytest-asyncio',
'pytest-cov',
'ruff',
'pycodestyle',
'autopep8',
]
doc = [
'sphinx',
'sphinx-autodoc-typehints',
'cloud_sptheme',
]

[tool.hatch.version]
path = 'swimprotocol/__about__.py'

Expand Down Expand Up @@ -92,15 +109,7 @@ exclude_lines = [
]

[tool.hatch.envs.default]
dependencies = [
'mypy',
'pytest',
'pytest-asyncio',
'pytest-cov',
'ruff',
'pycodestyle',
'autopep8',
]
features = ['dev']

[tool.hatch.envs.default.scripts]
run-pytest = 'py.test --cov-report=term-missing --cov=swimprotocol'
Expand All @@ -113,11 +122,7 @@ check = ['run-pytest', 'run-autopep8', 'run-mypy', 'run-ruff']
python = ['3.10', '3.11']

[tool.hatch.envs.doc]
dependencies = [
'sphinx',
'sphinx-autodoc-typehints',
'cloud_sptheme',
]
features = ['doc']

[tool.hatch.envs.doc.scripts]
build = 'make -C doc html'
Expand Down
2 changes: 1 addition & 1 deletion swimprotocol/__about__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#: The package version string.
__version__ = '0.4.0'
__version__ = '0.5.0'
6 changes: 4 additions & 2 deletions swimprotocol/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Final, Optional
from typing import Callable, Final, Optional, TypeAlias

__all__ = ['Address', 'AddressParser']

_AddressType: TypeAlias = Callable[[str, int], 'Address']


@dataclass(frozen=True, order=True)
class Address:
Expand Down Expand Up @@ -46,7 +48,7 @@ class AddressParser:
"""

def __init__(self, address_type: type[Address] = Address, *,
def __init__(self, address_type: _AddressType = Address, *,
default_host: Optional[str] = None,
default_port: Optional[int] = None) -> None:
super().__init__()
Expand Down
15 changes: 5 additions & 10 deletions swimprotocol/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class BaseConfig:
Args:
secret: The shared secret for cluster packet signatures.
local_name: The unique name of the local cluster member.
local_metadata: The local cluster member metadata.
peers: At least one name of another known node in the cluster.
local_metadata: The initial local cluster member metadata.
ping_interval: Time between :term:`ping` attempts to random cluster
members.
ping_timeout: Time to wait for an :term:`ack` after sending a
Expand All @@ -71,10 +71,12 @@ class BaseConfig:
"""

_empty: dict[str, bytes] = {}

def __init__(self, *, secret: Union[None, str, bytes],
local_name: str,
local_metadata: Mapping[str, bytes],
peers: Sequence[str],
local_metadata: Mapping[str, bytes] = _empty,
ping_interval: float = 1.0,
ping_timeout: float = 0.3,
ping_req_count: int = 1,
Expand All @@ -84,8 +86,8 @@ def __init__(self, *, secret: Union[None, str, bytes],
super().__init__()
self._signatures = Signatures(secret)
self.local_name: Final = local_name
self.local_metadata: Final = local_metadata
self.peers: Final = peers
self.local_metadata: Final = local_metadata
self.ping_interval: Final = ping_interval
self.ping_timeout: Final = ping_timeout
self.ping_req_count: Final = ping_req_count
Expand Down Expand Up @@ -127,10 +129,6 @@ def add_arguments(cls, parser: ArgumentParser, *,
"""
group = parser.add_argument_group('swim options')
group.add_argument(f'{prefix}metadata', dest='swim_metadata',
nargs=2, metavar=('KEY', 'VAL'),
default=[], action='append',
help='Metadata for this node.')
group.add_argument(f'{prefix}secret', dest='swim_secret',
metavar='STRING',
help='The secret string used to verify messages.')
Expand Down Expand Up @@ -194,12 +192,9 @@ def parse_args(cls, args: Namespace, *, env_prefix: str = 'SWIM') \
"""
secret = args.swim_secret or cls._get_env(env_prefix, 'SECRET')
local_name = args.swim_name or cls._get_env(env_prefix, 'NAME')
local_metadata = {key: val.encode('utf-8')
for key, val in args.swim_metadata}
peers = args.swim_peers or cls._get_env_list(env_prefix, 'PEERS')
return {'secret': secret,
'local_name': local_name,
'local_metadata': local_metadata,
'peers': peers}

@final
Expand Down
12 changes: 4 additions & 8 deletions swimprotocol/demo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
from contextlib import suppress, AsyncExitStack

from .changes import change_metadata
from .log import run_logging
from .screen import run_screen
from ..config import BaseConfig, ConfigError
from ..members import Members
from ..transport import load_transport, Transport
from ..worker import Worker

__all__ = ['main']

Expand All @@ -36,8 +36,6 @@ def main() -> int:
transport_type.config_type.add_arguments(parser)

group = parser.add_argument_group('swim demo options')
group.add_argument('-c', '--curses', action='store_true',
help='Enable the curses display.')
group.add_argument('-i', '--token-interval', metavar='SECONDS',
type=float, help='Randomize metadata on an interval.')

Expand All @@ -58,13 +56,11 @@ async def run(transport_type: type[Transport[BaseConfig]],
config = transport_type.config_type.from_args(args)
transport = transport_type(config)
members = Members(config)
worker = Worker(config, members)
async with AsyncExitStack() as stack:
stack.enter_context(suppress(CancelledError))
worker = await stack.enter_async_context(transport.enter(members))
if args.curses:
await stack.enter_async_context(run_screen(members))
else:
stack.enter_context(run_logging(members))
await stack.enter_async_context(transport.enter(worker))
await stack.enter_async_context(run_screen(members))
await stack.enter_async_context(change_metadata(
members, args.token_interval))
task = asyncio.create_task(worker.run())
Expand Down
22 changes: 0 additions & 22 deletions swimprotocol/demo/log.py

This file was deleted.

20 changes: 7 additions & 13 deletions swimprotocol/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

import asyncio
from abc import abstractmethod
from asyncio import Event, Task
from collections.abc import MutableSet, Sequence
from asyncio import Event
from collections.abc import Sequence
from contextlib import ExitStack
from typing import TypeVar, Generic, Protocol, Any, NoReturn
from weakref import WeakKeyDictionary

from .tasks import Subtasks

__all__ = ['ListenerCallback', 'Listener']

ListenT = TypeVar('ListenT')
Expand All @@ -29,33 +31,25 @@ async def __call__(self, item: ListenT_contra, /) -> Any:
...


class Listener(Generic[ListenT]):
class Listener(Generic[ListenT], Subtasks):
"""Implements basic listener and callback functionality. Producers can
call :meth:`.notify` with an item, and consumers can wait for those items
with :meth:`.poll` or register a callback with :meth:`.on_notify`.
Args:
cls: The item type that will be given to :meth:`.notify`, returned by
:meth:`.poll`, and passed to :meth:`.on_notify` callbacks.
"""

def __init__(self, cls: type[ListenT]) -> None:
def __init__(self) -> None:
super().__init__()
self.event = Event()
self._running: MutableSet[Task[Any]] = set()
self._waiting: WeakKeyDictionary[Event, list[ListenT]] = \
WeakKeyDictionary()

async def _run_callback_poll(self, callback: ListenerCallback[ListenT]) \
-> NoReturn:
running = self._running
while True:
items = await self.poll()
for item in items:
task = asyncio.create_task(callback(item))
running.add(task)
task.add_done_callback(running.discard)
self.run_subtask(callback(item))

def on_notify(self, callback: ListenerCallback[ListenT]) -> ExitStack:
"""Provides a context manager that causes *callback* to be called when
Expand Down
Loading

0 comments on commit 36b971c

Please sign in to comment.