Skip to content

Commit

Permalink
API simplification refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
icgood committed Aug 13, 2023
1 parent 2190364 commit 301fe06
Show file tree
Hide file tree
Showing 15 changed files with 463 additions and 312 deletions.
31 changes: 21 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,28 @@ the event loop:

```python
from contextlib import AsyncExitStack
from swimprotocol.members import Members
from swimprotocol.members import Member, Members
from swimprotocol.udp import UdpTransport
from swimprotocol.worker import Worker

transport = UdpTransport(config)
members = Members(config)
worker = Worker(config, members)
transport = UdpTransport(config, worker)

async def run() -> None:
async with AsyncExitStack() as stack:
worker = await stack.enter_async_context(transport.enter(worker))
await worker.run() # or schedule as a task
await stack.enter_async_context(transport)
await stack.enter_async_context(worker)
await stack.enter_async_context(
members.listener.on_notify(on_member_change))
await ... # run your application

async def on_member_change(member: Member) -> None:
... # handle a change in member status or metadata
```

These snippets demonstrate the UDP transport layer directly. For a more generic
approach that uses [argparse][11] and [load_transport][12], check out the
approach that uses [argparse][11] and [load_transport][103], check out the
[demo][2] or the [sync tool][15].

If your application is deployed as a [Docker Service][13], the [UdpConfig][100]
Expand Down Expand Up @@ -160,16 +166,20 @@ Any UDP packets received that are malformed or have an invalid signature are
loss.

The signatures rely on a [shared secret][8] between all cluster members, given
as the `secret=b'...'` argument to the [Config][100] constructor. If
as the `secret=b'...'` argument to the [UdpConfig][100] constructor. If
`secret=None` is used, it defaults to [`uuid.getnode()`][9] but this is **not
secure** for production setups unless all sockets are bound to a local loopback
interface.

The cluster member metadata is **not** encrypted during transmission, so only
private networks should be used if metadata includes any secret data, or that
secret data should be encrypted separately by the application. Also be aware
that low [MTU][10] sizes on public networks may affect the ability to
synchronize larger amounts of metadata.
secret data should be encrypted separately by the application.

If member [metadata][12] is larger than can be transmitted in a single UDP
packet (hard-coded at 1500 bytes due to [MTU][10] sizes on public networks), a
TCP connection is used instead. There is no additional protocol for TCP; the
connection is opened, the oversized packet is transmitted, and then the
connection is closed without waiting for a response.

## Development

Expand Down Expand Up @@ -213,11 +223,12 @@ hinting to the extent possible and common in the rest of the codebase.
[9]: https://docs.python.org/3/library/uuid.html#uuid.getnode
[10]: https://en.wikipedia.org/wiki/Maximum_transmission_unit
[11]: https://docs.python.org/3/library/argparse.html
[12]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.transport.load_transport
[12]: https://icgood.github.io/swim-protocol/intro.html#term-metadata
[13]: https://docs.docker.com/engine/swarm/how-swarm-mode-works/services/
[14]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#docker-services
[15]: https://github.com/icgood/swim-protocol/blob/main/swimprotocol/sync.py

[100]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#swimprotocol.udp.UdpConfig
[101]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.members.Member
[102]: https://icgood.github.io/swim-protocol/swimprotocol.udp.html#swimprotocol.udp.UdpTransport
[103]: https://icgood.github.io/swim-protocol/swimprotocol.html#swimprotocol.transport.load_transport
5 changes: 5 additions & 0 deletions doc/source/swimprotocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@

.. automodule:: swimprotocol.status

``swimprotocol.tasks``
-------------------------

.. automodule:: swimprotocol.tasks

``swimprotocol.transport``
--------------------------

Expand Down
19 changes: 17 additions & 2 deletions doc/source/swimprotocol.udp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

.. automodule:: swimprotocol.udp

``swimprotocol.udp.config``
---------------------------

.. automodule:: swimprotocol.udp.config

Docker Services
---------------

If your application is deployed as a `Docker Service`_, the
:class:`~swimprotocol.udp.UdpConfig` ``discovery=True`` keyword argument can be
used to discover configuration based on the service name. For example::
:class:`~swimprotocol.udp.config.UdpConfig` ``discovery=True`` keyword argument
can be used to discover configuration based on the service name. For example::

config = UdpConfig(local_name='tasks.my-service:9999', discovery=True, ...)

Expand Down Expand Up @@ -43,3 +48,13 @@ successful::
-------------------------

.. automodule:: swimprotocol.udp.pack

``swimprotocol.udp.protocol``
-----------------------------

.. automodule:: swimprotocol.udp.protocol

``swimprotocol.udp.send``
-------------------------

.. automodule:: swimprotocol.udp.send
2 changes: 1 addition & 1 deletion swimprotocol/__about__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#: The package version string.
__version__ = '0.5.0'
__version__ = '0.6.0'
19 changes: 10 additions & 9 deletions swimprotocol/demo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import logging
import signal
from argparse import Namespace, ArgumentParser
from asyncio import CancelledError
from contextlib import suppress, AsyncExitStack
from asyncio import Event
from contextlib import AsyncExitStack

from .changes import change_metadata
from .screen import run_screen
Expand Down Expand Up @@ -54,17 +54,18 @@ async def run(transport_type: type[Transport[BaseConfig]],
args: Namespace) -> int:
loop = asyncio.get_running_loop()
config = transport_type.config_type.from_args(args)
transport = transport_type(config)
members = Members(config)
worker = Worker(config, members)
transport = transport_type(config, worker)
done = Event()
loop.add_signal_handler(signal.SIGINT, done.set)
loop.add_signal_handler(signal.SIGTERM, done.set)
async with AsyncExitStack() as stack:
stack.enter_context(suppress(CancelledError))
await stack.enter_async_context(transport.enter(worker))
# stack.enter_context(suppress(CancelledError))
await stack.enter_async_context(transport)
await stack.enter_async_context(worker)
await stack.enter_async_context(run_screen(members))
await stack.enter_async_context(change_metadata(
members, args.token_interval))
task = asyncio.create_task(worker.run())
loop.add_signal_handler(signal.SIGINT, task.cancel)
loop.add_signal_handler(signal.SIGTERM, task.cancel)
await task
await done.wait()
return 0
21 changes: 12 additions & 9 deletions swimprotocol/demo/screen.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import asyncio
import curses
import string
from contextlib import AsyncExitStack
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager, AsyncExitStack
from curses import wrapper
from functools import partial
from threading import Event, Condition
Expand Down Expand Up @@ -123,11 +124,13 @@ async def run_thread(self) -> None:
await asyncio.to_thread(partial(wrapper, self.main))


def run_screen(members: Members) -> AsyncExitStack:
exit_stack = AsyncExitStack()
screen = Screen(members)
main_task = asyncio.create_task(screen.run_thread())
exit_stack.push_async_callback(partial(asyncio.wait_for, main_task, None))
exit_stack.enter_context(members.listener.on_notify(screen.update))
exit_stack.callback(screen.cancel)
return exit_stack
@asynccontextmanager
async def run_screen(members: Members) -> AsyncIterator[None]:
async with AsyncExitStack() as stack:
screen = Screen(members)
main_task = asyncio.create_task(screen.run_thread())
stack.push_async_callback(partial(asyncio.wait_for, main_task, None))
await stack.enter_async_context(
members.listener.on_notify(screen.update))
stack.callback(screen.cancel)
yield
43 changes: 26 additions & 17 deletions swimprotocol/listener.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@

from __future__ import annotations

import asyncio
from abc import abstractmethod
from asyncio import Event
from collections.abc import Sequence
from contextlib import ExitStack
from typing import TypeVar, Generic, Protocol, Any, NoReturn
from weakref import WeakKeyDictionary

from .tasks import Subtasks
from .tasks import DaemonTask, TaskOwner

__all__ = ['ListenerCallback', 'Listener']
__all__ = ['ListenerCallback', 'CallbackPoll', 'Listener']

ListenT = TypeVar('ListenT')
ListenT_contra = TypeVar('ListenT_contra', contravariant=True)
Expand All @@ -31,7 +29,27 @@ async def __call__(self, item: ListenT_contra, /) -> Any:
...


class Listener(Generic[ListenT], Subtasks):
class CallbackPoll(Generic[ListenT], DaemonTask, TaskOwner):
"""Listens for items and running the callback.
"""

def __init__(self, listener: Listener[ListenT],
callback: ListenerCallback[ListenT]) -> None:
super().__init__()
self._listener = listener
self._callback = callback

async def run(self) -> NoReturn:
listener = self._listener
callback = self._callback
while True:
items = await listener.poll()
for item in items:
self.run_subtask(callback(item))


class Listener(Generic[ListenT]):
"""Implements basic listener and callback functionality. Producers can
call :meth:`.notify` with an item, and consumers can wait for those items
with :meth:`.poll` or register a callback with :meth:`.on_notify`.
Expand All @@ -44,14 +62,8 @@ def __init__(self) -> None:
self._waiting: WeakKeyDictionary[Event, list[ListenT]] = \
WeakKeyDictionary()

async def _run_callback_poll(self, callback: ListenerCallback[ListenT]) \
-> NoReturn:
while True:
items = await self.poll()
for item in items:
self.run_subtask(callback(item))

def on_notify(self, callback: ListenerCallback[ListenT]) -> ExitStack:
def on_notify(self, callback: ListenerCallback[ListenT]) \
-> CallbackPoll[ListenT]:
"""Provides a context manager that causes *callback* to be called when
a producer calls :meth:`.notify`.
Expand All @@ -60,10 +72,7 @@ def on_notify(self, callback: ListenerCallback[ListenT]) -> ExitStack:
argument from :meth:`.notify`.
"""
exit_stack = ExitStack()
task = asyncio.create_task(self._run_callback_poll(callback))
exit_stack.callback(task.cancel)
return exit_stack
return CallbackPoll(self, callback)

async def poll(self) -> Sequence[ListenT]:
"""Wait until :meth:`.notify` is called and return all *item* objects.
Expand Down
25 changes: 13 additions & 12 deletions swimprotocol/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import signal
import sys
from argparse import Namespace, ArgumentParser
from asyncio import CancelledError
from contextlib import suppress, AsyncExitStack
from asyncio import Event
from contextlib import AsyncExitStack
from functools import partial
from pathlib import Path
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -62,22 +62,23 @@ async def run(transport_type: type[Transport[BaseConfig]],
args: Namespace, base_path: Path) -> int:
loop = asyncio.get_running_loop()
config = transport_type.config_type.from_args(args)
transport = transport_type(config)
members = Members(config)
worker = Worker(config, members)
transport = transport_type(config, worker)
read_local = partial(_read_local, base_path, members)
write_member = partial(_write_member, base_path)
read_local()
if sys.platform != 'win32':
loop.add_signal_handler(signal.SIGHUP, read_local)
done = Event()
loop.add_signal_handler(signal.SIGINT, done.set)
loop.add_signal_handler(signal.SIGTERM, done.set)
async with AsyncExitStack() as stack:
stack.enter_context(suppress(CancelledError))
await stack.enter_async_context(transport.enter(worker))
stack.enter_context(members.listener.on_notify(write_member))
task = asyncio.create_task(worker.run())
if sys.platform != 'win32':
loop.add_signal_handler(signal.SIGHUP, read_local)
loop.add_signal_handler(signal.SIGINT, task.cancel)
loop.add_signal_handler(signal.SIGTERM, task.cancel)
await task
await stack.enter_async_context(transport)
await stack.enter_async_context(worker)
await stack.enter_async_context(
members.listener.on_notify(write_member))
await done.wait()
_cleanup(base_path, members)
return 0

Expand Down
53 changes: 48 additions & 5 deletions swimprotocol/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,33 @@
from __future__ import annotations

import asyncio
from abc import abstractmethod, ABCMeta
from asyncio import Task
from collections.abc import Coroutine, MutableSet
from typing import Any
from contextlib import AbstractAsyncContextManager
from typing import Any, NoReturn, Optional, TypeVar

__all__ = ['Subtasks']
__all__ = ['TaskT', 'DaemonTask', 'TaskOwner']

#: The type of task result.
TaskT = TypeVar('TaskT')

class Subtasks:
"""Base class for any class that needs to run sub-tasks."""

class TaskOwner:
"""Base class for any class that needs to run sub-tasks.
Because :mod:`asyncio` can be garbage-collected while running, the purpose
of this base class is to keep a strong reference to all running tasks. The
task removes its own reference when it is complete, effectively allowing it
to "daemonize".
"""

def __init__(self) -> None:
super().__init__()
self._running: MutableSet[Task[Any]] = set()

def run_subtask(self, coro: Coroutine[Any, Any, Any]) -> None:
def run_subtask(self, coro: Coroutine[Any, Any, TaskT]) -> Task[TaskT]:
"""Run the *coro* sub-task.
Args:
Expand All @@ -27,3 +39,34 @@ def run_subtask(self, coro: Coroutine[Any, Any, Any]) -> None:
task = asyncio.create_task(coro)
running.add(task)
task.add_done_callback(running.discard)
return task


class DaemonTask(AbstractAsyncContextManager[Task[NoReturn]],
metaclass=ABCMeta):
"""Base class for a task that is run for the duration of an ``async with``
context.
"""

def __init__(self) -> None:
super().__init__()
self._task: Optional[Task[NoReturn]] = None

async def __aenter__(self) -> Task[TaskT]:
self._task = task = asyncio.create_task(self.run())
return task

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) \
-> Any:
task = self._task
if task is not None:
task.cancel()

@abstractmethod
async def run(self) -> NoReturn:
"""The method to run while the context is entered. The task is
cancelled when the context exits.
"""
...
Loading

0 comments on commit 301fe06

Please sign in to comment.