diff --git a/README.md b/README.md index 082d9867..14a467b3 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ _pub/sub without steroids_ [![Website](https://img.shields.io/badge/website-opencyphal.org-black?color=1700b3)](https://opencyphal.org/) [![Forum](https://img.shields.io/discourse/https/forum.opencyphal.org/users.svg?logo=discourse&color=1700b3)](https://forum.opencyphal.org) +[![PyPI](https://img.shields.io/pypi/v/pycyphal2.svg)](https://pypi.org/project/pycyphal2/) [![Docs](https://img.shields.io/badge/Docs-rtfm-black?color=ff00aa&logo=readthedocs)](https://opencyphal.github.io/pycyphal) @@ -16,12 +17,10 @@ _pub/sub without steroids_ Python implementation of the [Cyphal](https://opencyphal.org) stack that runs on GNU/Linux, Windows, and macOS. -Install as follows. -Optional features inside the brackets can be removed if not needed; see `pyproject.toml` for the full list: - -``` -pip install pycyphal2[udp,pythoncan] -``` +PyCyphal v2 is published on PyPI as `pycyphal2` to enable coexistence with v1 `pycyphal` in the same Python environment. +The two packages have radically different APIs but are wire-compatible on Cyphal/CAN. +The maintenance of the original `pycyphal` package will eventually cease; +existing applications leveraging `pycyphal` should upgrade to the new API of `pycyphal2`. 📚 **Read the docs** at . diff --git a/docs/build.py b/docs/build.py index aef70d75..78453fcb 100644 --- a/docs/build.py +++ b/docs/build.py @@ -1,31 +1,97 @@ #!/usr/bin/env python """Build API docs using pdoc. Invoked via ``nox -s docs``.""" +import ast +import shutil from pathlib import Path import pkgutil import importlib import sys +import pdoc import pycyphal2 -# Discover and import all public submodules so pdoc can see them, -# then inject them into their parent's __all__ so pdoc lists them in the sidebar. -# Public modules are expected to be importable in the docs environment; failures are treated as hard errors. -for mi in pkgutil.walk_packages(pycyphal2.__path__, pycyphal2.__name__ + "."): - leaf = mi.name.rsplit(".", 1)[-1] - if leaf.startswith("_"): - continue +OUTPUT_DIRECTORY = Path("html_docs") +EXAMPLES_DIRECTORY = Path("examples") + + +def _discover_examples(directory: Path) -> list[Path]: + if not directory.is_dir(): + raise RuntimeError(f"Examples directory {directory!s} not found while building docs") + examples = sorted(path for path in directory.rglob("*.py") if path.is_file()) + if not examples: + raise RuntimeError(f"No example scripts found under {directory!s} while building docs") + return examples + + +def _load_summary(path: Path) -> str: try: - importlib.import_module(mi.name) - except Exception as ex: - raise RuntimeError(f"Failed to import public module {mi.name!r} while building docs") from ex - parent = sys.modules[mi.name.rsplit(".", 1)[0]] - if hasattr(parent, "__all__") and leaf not in parent.__all__: - parent.__all__.append(leaf) + module = ast.parse(path.read_text(encoding="utf8"), filename=str(path)) + except SyntaxError as ex: + raise RuntimeError(f"Failed to parse example {path!s} while building docs") from ex + doc = ast.get_docstring(module, clean=True) + if not doc: + return "" + for line in doc.splitlines(): + text = line.strip() + if text and not text.startswith("Usage:"): + return text + return "" + + +def _make_examples_section(examples: list[Path]) -> str: + if not examples: + return "" + lines = ["## Examples", "", "Runnable examples:"] + for path in examples: + relative = path.relative_to(EXAMPLES_DIRECTORY).as_posix() + summary = _load_summary(path) + suffix = f" - {summary}" if summary else "" + lines.append(f"- [`examples/{relative}`](examples/{relative}){suffix}") + return "\n".join(lines) + "\n" + + +def _inject_examples_section(examples: list[Path]) -> None: + section = _make_examples_section(examples) + doc = pycyphal2.__doc__ or "" + pycyphal2.__doc__ = doc.rstrip() + f"\n\n{section}" + + +def _copy_examples(examples_source: Path, output_directory: Path, examples: list[Path]) -> None: + destination = output_directory / examples_source.name + shutil.rmtree(destination, ignore_errors=True) + if not examples: + return + for source in examples: + target = destination / source.relative_to(examples_source) + target.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(source, target) + + +def main() -> None: + # Discover and import all public submodules so pdoc can see them, + # then inject them into their parent's __all__ so pdoc lists them in the sidebar. + # Public modules are expected to be importable in the docs environment; failures are treated as hard errors. + for mi in pkgutil.walk_packages(pycyphal2.__path__, pycyphal2.__name__ + "."): + leaf = mi.name.rsplit(".", 1)[-1] + if leaf.startswith("_"): + continue + try: + importlib.import_module(mi.name) + except Exception as ex: + raise RuntimeError(f"Failed to import public module {mi.name!r} while building docs") from ex + parent = sys.modules[mi.name.rsplit(".", 1)[0]] + if hasattr(parent, "__all__") and leaf not in parent.__all__: + parent.__all__.append(leaf) + + # Customization is necessary to expose special members like __aiter__, __call__, etc. + # We also use it to tweak the colors. + pdoc.render.configure(template_directory=Path(__file__).resolve().with_name("pdoc")) + examples = _discover_examples(EXAMPLES_DIRECTORY) + _inject_examples_section(examples) + pdoc.pdoc("pycyphal2", output_directory=OUTPUT_DIRECTORY) + _copy_examples(EXAMPLES_DIRECTORY, OUTPUT_DIRECTORY, examples) -import pdoc -# Customization is necessary to expose special members like __aiter__, __call__, etc. -# We also use it to tweak the colors. -pdoc.render.configure(template_directory=Path(__file__).resolve().with_name("pdoc")) -pdoc.pdoc("pycyphal2", output_directory=Path("html_docs")) +if __name__ == "__main__": + main() diff --git a/docs/pdoc/theme.css b/docs/pdoc/theme.css index d7970203..52dc1401 100644 --- a/docs/pdoc/theme.css +++ b/docs/pdoc/theme.css @@ -18,3 +18,7 @@ --def: #FC6D09; --annotation: #7aa2ff; } + +main.pdoc .docstring h3 { + font-size: 1.2rem; +} diff --git a/examples/monitor.py b/examples/monitor.py index 2c8c24e9..638526bb 100755 --- a/examples/monitor.py +++ b/examples/monitor.py @@ -13,16 +13,21 @@ import logging import sys import time -from pathlib import Path +from dataclasses import dataclass from pycyphal2 import Node, Topic, Transport -NAME = f"{Path(__file__).stem}/" SCOUT_INTERVAL = 10.0 DISPLAY_INTERVAL = 2.0 EVICTION_TIMEOUT = 600.0 +@dataclass(frozen=True) +class TopicInfo: + last_seen_monotonic: float + topic: Topic + + def make_node(transport_spec: str) -> Node: if transport_spec == "udp": from pycyphal2.udp import UDPTransport @@ -36,20 +41,25 @@ def make_node(transport_spec: str) -> Node: else: raise ValueError(f"Unknown transport {transport_spec!r}") - return Node.new(transport, NAME) + return Node.new(transport, "monitor/") # The trailing slash indicates that we want a unique ID at the end. + + +def _clear() -> str: + return "\033[2J\033[H" if sys.stdout.isatty() else ("\n" * 3) + + +def _bright(text: str) -> str: + return f"\033[1m{text}\033[0m" if sys.stdout.isatty() else text async def run(transport_spec: str) -> None: - # topic_name -> (topic_hash, last_seen_monotonic, gossip_count) - topics: dict[str, tuple[int, float, int]] = {} + topics: dict[str, TopicInfo] = {} + node = make_node(transport_spec) + subject_id_modulus = node.transport.subject_id_modulus def on_gossip(topic: Topic) -> None: - name = topic.name - prev = topics.get(name) - count = (prev[2] + 1) if prev else 1 - topics[name] = (topic.hash, time.monotonic(), count) + topics[topic.name] = TopicInfo(last_seen_monotonic=time.monotonic(), topic=topic) - node = make_node(transport_spec) _mon = node.monitor(on_gossip) async def scout_loop() -> None: @@ -65,16 +75,20 @@ async def display_loop() -> None: await asyncio.sleep(DISPLAY_INTERVAL) now = time.monotonic() # Evict stale topics. - for name in [n for n, (_, ts, _) in topics.items() if now - ts > EVICTION_TIMEOUT]: + for name in [n for n, info in topics.items() if now - info.last_seen_monotonic > EVICTION_TIMEOUT]: del topics[name] - # Clear screen and home cursor. - sys.stdout.write("\033[2J\033[H") - sys.stdout.write("#\tHASH\t\t\tCOUNT\tAGO\tNAME\n") + # Render the display. + out = [ + _clear(), + _bright(f"{'#':>3} {'HEARD':<5} {'HASH':<16} {'EVICTIONS':>10} {'SUBJECT-ID':>10} NAME\n"), + ] for idx, name in enumerate(sorted(topics), 1): - th, ts, count = topics[name] - age = int(now - ts) - sys.stdout.write(f"{idx}\t{th:016x}\t{count}\t{age // 60:02d}:{age % 60:02d}\t{name}\n") - sys.stdout.flush() + age = int(now - topics[name].last_seen_monotonic) + age_fmt = f"{age // 60:02d}:{age % 60:02d}" + t = topics[name].topic + subject_id = t.subject_id(subject_id_modulus) + out.append(f"{idx:>3} {age_fmt} {t.hash:016x} {t.evictions:>10} {subject_id:>10} {t.name}\n") + print("".join(out), end="", flush=True) await asyncio.gather(scout_loop(), display_loop()) diff --git a/src/pycyphal2/__init__.py b/src/pycyphal2/__init__.py index 907372cb..215dc16c 100644 --- a/src/pycyphal2/__init__.py +++ b/src/pycyphal2/__init__.py @@ -1,9 +1,20 @@ """ -`Cyphal `_ in Python — +[Cyphal](https://opencyphal.org) in Python — decentralized real-time pub/sub with tunable reliability, service discovery, and zero configuration. -Works anywhere, `even baremetal MCUs `_. +Works anywhere, [including baremetal MCUs](https://github.com/OpenCyphal-Garage/cy). Supports various transports such as Ethernet (UDP) and CAN FD with optional redundancy. + +## Installation + +Optional features inside the brackets can be removed if not needed; see `pyproject.toml` for the full list: + +``` +pip install pycyphal2[udp,pythoncan] +``` + +## Usage + Set up a transport, make a node, publish and subscribe: ```python @@ -25,22 +36,46 @@ async def main(): Transport modules (`pycyphal2.udp`, `pycyphal2.can`) are imported separately so that only the needed dependencies are pulled in. -The source repository contains a collection of runnable examples. +### Name resolution -Environment variables control name remapping similar to ROS: +The topic naming system shares many similarities with ROS. +A valid name contains printable ASCII characters except space (ASCII codes [33, 126]). +Normalized names do not have leading or trailing segment separators `/` and do not have consecutive separators. +Every node should have a unique name, which is called its *home*; home substitution is done via `~/`. + +| Input name | Namespace | Home | Remap | Resolved name | Note | +| ----------------- | --------- | ---- | ------------------ | --------------------- | -------------------------------- | +| `foo/bar` | `ns` | `me` | | `ns/foo/bar` | Relative name | +| `/foo//bar/` | `ns` | `me` | | `foo/bar` | Absolute name; namespace ignored | +| `~/foo/bar` | `ns` | `me` | | `me/foo/bar` | Homeful name | +| `sensor/*/temp` | `diag` | `me` | | `diag/sensor/*/temp` | Pattern with `*` | +| `/sensor/>` | `diag` | `me` | | `sensor/>` | Pattern with trailing `>` | +| `foo/bar` | `ns` | `me` | `foo/bar=~/zoo` | `me/zoo` | Remap first, then resolve | + +Only exact `~` or `~/...` is homeful; `~ns` is literal. A matching remap overrides pinning. +Pins are allowed only on verbatim names, not on patterns. + +Environment variables that control name remapping: - `CYPHAL_NAMESPACE` — default namespace prepended to relative topic names. - `CYPHAL_REMAP` — topic name remappings (`from=to` pairs, whitespace-separated). -Publication is best-effort by default. Pass ``reliable=True`` when publishing to retry delivery until +See also :meth:`Node.remap`. + +### Publish + +Publication is best-effort by default. Pass `reliable=True` when publishing to retry delivery until acknowledged by every known subscriber or until the deadline; if the remote side does not acknowledge in time, :class:`DeliveryError` is raised. ```python +pub = node.advertise("sensor/temperature") await pub(Instant.now() + 1.0, b"payload", reliable=True) ``` -Subscriptions normally yield messages as soon as they arrive. Set ``reordering_window`` [seconds] on +### Subscribe + +Subscriptions normally yield messages as soon as they arrive. Set `reordering_window` [seconds] on :meth:`Node.subscribe` to allow delaying out-of-order messages to reconstruct the original publication order. This is useful for sensor feeds and state estimators. @@ -48,6 +83,20 @@ async def main(): sub = node.subscribe("sensor/temperature", reordering_window=0.1) ``` +Pattern matching is supported: use `*` to match one name segment (e.g., `sensor/*/temperature`) +and a trailing `>` to match zero or more trailing segments (e.g., `sensor/>`). +Pattern subscribers automatically join matching topics as they appear, and unsubscribe as they disappear. + +```python +sub = node.subscribe("sensor/*/temperature") +async for arrival in sub: + topic = arrival.breadcrumb.topic + captures = sub.substitutions(topic) + print(topic.name, captures) # [('engine', 1)], where 1 is the pattern segment index +``` + +### RPC & streaming + RPC is layered directly on top of pub/sub. Use :meth:`Publisher.request` to publish a message that expects responses, and use :attr:`Arrival.breadcrumb` on the subscriber side to send a unicast reply back to the requester. One request may yield responses from multiple subscribers. @@ -66,8 +115,33 @@ async def main(): await arrival.breadcrumb(Instant.now() + 1.0, b"chunk-2", reliable=True) ``` +### Topic pinning + +Topics may be pinned to a specific subject-ID using `name#1234` to bypass automatic assignment. +This is useful for applications where a high degree of determinism is required and for Cyphal/CAN v1.0 interoperability. +Pattern names (e.g., `sensor/*/temperature/>`) cannot be pinned. + +To join a Cyphal/CAN v1.0 subject, use topic name of the form `subject_id#subject_id`; e.g., `7509#7509`. + +```python +pub = node.advertise("motor/status#1234") +sub = node.subscribe("1234#1234") +``` + +Old Cyphal/CAN v1.0 nodes do not participate in the topic discovery protocol, +so topics joined only by such nodes are not discoverable by pattern subscribers. + +## Remarks + Cyphal does not define a serialization format. Previous versions used to define the DSDL format but it has been extracted into an independent project, and Cyphal was made serialization-agnostic in v1.1+. + +PyCyphal v2 is published on PyPI as [`pycyphal2`](https://pypi.org/project/pycyphal2/) +to enable coexistence with the original [`pycyphal` v1](https://pypi.org/project/pycyphal/) +in the same Python environment. +The two packages have radically different APIs but are wire-compatible on Cyphal/CAN. +The maintenance of the original `pycyphal` package will eventually cease; +existing applications leveraging `pycyphal` should upgrade to the new API of `pycyphal2`. """ from __future__ import annotations @@ -77,7 +151,7 @@ async def main(): from ._transport import TransportArrival as TransportArrival from ._transport import SubjectWriter as SubjectWriter -__version__ = "2.0.0.dev0" +__version__ = "2.0.0.dev1" # pdoc needs __all__ to display re-exported members. __all__ = [ diff --git a/src/pycyphal2/_api.py b/src/pycyphal2/_api.py index f0731aab..a33d3cb4 100644 --- a/src/pycyphal2/_api.py +++ b/src/pycyphal2/_api.py @@ -131,7 +131,6 @@ def close(self) -> None: class Topic(ABC): """ Topics are managed automatically by the library, created and destroyed as necessary. - This is just a compact view to expose some auxiliary information. """ @property @@ -144,6 +143,16 @@ def hash(self) -> int: def name(self) -> str: raise NotImplementedError + @property + @abstractmethod + def evictions(self) -> int: + raise NotImplementedError + + @abstractmethod + def subject_id(self, modulus: int) -> int: + """The modulus can be obtained from :attr:`Transport.subject_id_modulus`.""" + raise NotImplementedError + @abstractmethod def match(self, pattern: str) -> list[tuple[str, int]] | None: """ @@ -475,6 +484,11 @@ def home(self) -> str: def namespace(self) -> str: raise NotImplementedError + @property + @abstractmethod + def transport(self) -> Transport: + raise NotImplementedError + @abstractmethod def remap(self, spec: str | dict[str, str]) -> None: """ diff --git a/src/pycyphal2/_node.py b/src/pycyphal2/_node.py index 545b2d22..b43c5db8 100644 --- a/src/pycyphal2/_node.py +++ b/src/pycyphal2/_node.py @@ -306,6 +306,7 @@ class _TopicFlyweight(Topic): _topic_hash: int _name: str + _evictions: int @property def hash(self) -> int: @@ -315,6 +316,13 @@ def hash(self) -> int: def name(self) -> str: return self._name + @property + def evictions(self) -> int: + return self._evictions + + def subject_id(self, modulus: int) -> int: + return compute_subject_id(self._topic_hash, self._evictions, modulus) + def match(self, pattern: str) -> list[tuple[str, int]] | None: return match_pattern(pattern, self._name) @@ -364,7 +372,7 @@ def __init__(self, node: NodeImpl, name: str, evictions: int, now: float) -> Non self._node = node self._name = name self._topic_hash = rapidhash(name) - self.evictions = evictions + self._evictions = evictions self.ts_origin = now self.ts_animated = now self._pub_tag_baseline = int.from_bytes(os.urandom(8), "little") @@ -392,14 +400,20 @@ def hash(self) -> int: def name(self) -> str: return self._name + @property + def evictions(self) -> int: + return self._evictions + + def set_evictions(self, evictions: int) -> None: + self._evictions = evictions + + def subject_id(self, modulus: int) -> int: + return compute_subject_id(self._topic_hash, self._evictions, modulus) + def match(self, pattern: str) -> list[tuple[str, int]] | None: return match_pattern(pattern, self._name) # -- Internal -- - @property - def subject_id(self) -> int: - return compute_subject_id(self._topic_hash, self.evictions, self._node.transport.subject_id_modulus) - def lage(self, now: float) -> int: return log_age(self.ts_origin, now) @@ -426,14 +440,14 @@ def tag_seqno(self, tag: int) -> int: def ensure_writer(self) -> SubjectWriter: if self.pub_writer is None: - sid = self.subject_id + sid = self.subject_id(self._node.transport.subject_id_modulus) self.pub_writer = self._node.acquire_subject_writer(self, sid) _logger.info("Writer acquired for '%s' sid=%d", self._name, sid) return self.pub_writer def ensure_listener(self) -> None: if self.sub_listener is None and self.couplings: - sid = self.subject_id + sid = self.subject_id(self._node.transport.subject_id_modulus) self.sub_listener = self._node.acquire_subject_listener(self, sid) _logger.info("Listener acquired for '%s' sid=%d", self._name, sid) @@ -441,12 +455,12 @@ def sync_listener(self) -> None: if self.couplings: self.ensure_listener() elif self.sub_listener is not None: - self._node.release_subject_listener(self, self.subject_id) + self._node.release_subject_listener(self, self.subject_id(self._node.transport.subject_id_modulus)) self.sub_listener = None _logger.info("Listener released for '%s'", self._name) def release_transport_handles(self) -> None: - sid = self.subject_id + sid = self.subject_id(self._node.transport.subject_id_modulus) if self.pub_writer is not None: self._node.release_subject_writer(self, sid) self.pub_writer = None @@ -487,7 +501,7 @@ def left_wins(l_lage: int, l_hash: int, r_lage: int, r_hash: int) -> bool: class NodeImpl(Node): def __init__(self, transport: Transport, *, home: str, namespace: str) -> None: - self.transport = transport + self._transport = transport self._home = home self._namespace = namespace self._remaps: dict[str, str] = {} @@ -555,6 +569,10 @@ def home(self) -> str: def namespace(self) -> str: return self._namespace + @property + def transport(self) -> Transport: + return self._transport + def remap(self, spec: str | dict[str, str]) -> None: if isinstance(spec, str): spec = dict(x.split("=", 1) for x in spec.split() if "=" in x) @@ -573,7 +591,12 @@ def advertise(self, name: str) -> Publisher: topic.pub_count += 1 topic.sync_implicit() topic.ensure_writer() - _logger.info("Advertise '%s' -> '%s' sid=%d", name, resolved, topic.subject_id) + _logger.info( + "Advertise '%s' -> '%s' sid=%d", + name, + resolved, + topic.subject_id(self.transport.subject_id_modulus), + ) return PublisherImpl(self, topic) def subscribe(self, name: str, *, reordering_window: float | None = None) -> Subscriber: @@ -662,29 +685,34 @@ def topic_ensure(self, name: str, pin: int | None) -> TopicImpl: self.couple_topic_root(topic, root) topic.sync_listener() self.notify_implicit_gc() - _logger.info("Topic created '%s' hash=%016x sid=%d", name, topic.hash, topic.subject_id) + _logger.info( + "Topic created '%s' hash=%016x sid=%d", + name, + topic.hash, + topic.subject_id(self.transport.subject_id_modulus), + ) return topic def topic_allocate(self, topic: TopicImpl, new_evictions: int, now: float) -> None: """Iterative subject-ID allocation with collision resolution. Mirrors topic_allocate() in cy.c.""" # Work queue: list of (topic, new_evictions) pairs to process. + modulus = self.transport.subject_id_modulus work: list[tuple[TopicImpl, int]] = [(topic, new_evictions)] while work: t, ev = work.pop(0) # Remove from subject-ID index first. - old_sid = t.subject_id + old_sid = t.subject_id(modulus) if old_sid in self.topics_by_subject_id and self.topics_by_subject_id[old_sid] is t: del self.topics_by_subject_id[old_sid] if ev >= EVICTIONS_PINNED_MIN: # Pinned topic: no collision detection, shared subject-IDs are fine. t.release_transport_handles() - t.evictions = ev + t.set_evictions(ev) t.sync_listener() self.schedule_gossip_urgent(t) continue - modulus = self.transport.subject_id_modulus new_sid = compute_subject_id(t.hash, ev, modulus) collider = self.topics_by_subject_id.get(new_sid) @@ -694,14 +722,14 @@ def topic_allocate(self, topic: TopicImpl, new_evictions: int, now: float) -> No if collider is None: # No collision, install. t.release_transport_handles() - t.evictions = ev + t.set_evictions(ev) self.topics_by_subject_id[new_sid] = t t.sync_listener() self.schedule_gossip_urgent(t) elif left_wins(t.lage(now), t.hash, collider.lage(now), collider.hash): # Our topic wins: take the slot, evict the collider. t.release_transport_handles() - t.evictions = ev + t.set_evictions(ev) del self.topics_by_subject_id[new_sid] self.topics_by_subject_id[new_sid] = t if collider.pub_writer is not None: @@ -1293,7 +1321,7 @@ def on_gossip( self._notify_monitors(topic) else: self.on_gossip_unknown(hdr.topic_hash, hdr.topic_evictions, hdr.topic_log_age, ts) - self._notify_monitors(_TopicFlyweight(hdr.topic_hash, name)) + self._notify_monitors(_TopicFlyweight(hdr.topic_hash, name, hdr.topic_evictions)) def on_gossip_known( self, @@ -1430,7 +1458,7 @@ def destroy_topic(self, name: str) -> None: self.decouple_topic_root(topic, topic.couplings[0].root, sync_lifecycle=False) self.topics_by_name.pop(name, None) self.topics_by_hash.pop(topic.hash, None) - sid = topic.subject_id + sid = topic.subject_id(self.transport.subject_id_modulus) if self.topics_by_subject_id.get(sid) is topic: del self.topics_by_subject_id[sid] topic.associations.clear() diff --git a/src/pycyphal2/can/__init__.py b/src/pycyphal2/can/__init__.py index 77bf67e1..9ff713d2 100644 --- a/src/pycyphal2/can/__init__.py +++ b/src/pycyphal2/can/__init__.py @@ -12,7 +12,7 @@ ``` Python-CAN is useful when the application runs not on GNU/Linux or already uses `python-can` or needs -`one of its *many* hardware backends `_ +[one of its *many* hardware backends](https://python-can.readthedocs.io/en/stable/interfaces.html) -- GS-USB, SLCAN, PCAN, etc: ```python diff --git a/tests/test_gossip.py b/tests/test_gossip.py index a3040ad6..72c51c4c 100644 --- a/tests/test_gossip.py +++ b/tests/test_gossip.py @@ -266,7 +266,7 @@ async def test_topic_destroy(): topic = node.topics_by_name.get("to_destroy") assert topic is not None topic_hash = topic.hash - sid = topic.subject_id + sid = topic.subject_id(tr.subject_id_modulus) pub.close() # Allow destroy. node.destroy_topic("to_destroy") @@ -380,7 +380,7 @@ async def test_topic_collision_during_allocate(): pub_a = node.advertise("/topic_alpha") topic_a = node.topics_by_name["topic_alpha"] - sid_a = topic_a.subject_id + sid_a = topic_a.subject_id(tr.subject_id_modulus) # Find a name that collides with topic_a's subject-ID. from pycyphal2._hash import rapidhash @@ -394,7 +394,7 @@ async def test_topic_collision_during_allocate(): pub_b = node.advertise(f"/{name}") topic_b = node.topics_by_name[name] # One of them should have been reallocated. - assert topic_a.subject_id != topic_b.subject_id + assert topic_a.subject_id(tr.subject_id_modulus) != topic_b.subject_id(tr.subject_id_modulus) pub_b.close() break diff --git a/tests/test_integration.py b/tests/test_integration.py index 80dbe13b..adf91a83 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -47,6 +47,14 @@ async def test_node_creation_and_home(): node.close() +async def test_node_exposes_transport_property(): + net = MockNetwork() + tr = MockTransport(node_id=1, network=net) + node = new_node(tr, home="my_home") + assert node.transport is tr + node.close() + + async def test_node_namespace(): """Namespace should affect name resolution.""" net = MockNetwork() @@ -121,7 +129,7 @@ async def test_pinned_topic(): pub = node.advertise("/my/topic#42") topic = list(node.topics_by_name.values())[0] - assert topic.subject_id == 42 + assert topic.subject_id(tr.subject_id_modulus) == 42 assert topic.evictions == 0xFFFFFFFF - 42 pub.close() @@ -348,7 +356,7 @@ async def test_remap_advertise_pinned(): pub = node.advertise("my/topic") topic = list(node.topics_by_name.values())[0] assert topic.name == "ns/remapped" - assert topic.subject_id == 42 + assert topic.subject_id(tr.subject_id_modulus) == 42 pub.close() node.close() diff --git a/tests/test_monitor.py b/tests/test_monitor.py index b2f5d609..dc9b7849 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -115,6 +115,7 @@ async def test_monitor_implicit_topic_creation_reports_local_topic_instead_of_fl async def test_monitor_unknown_topic_uses_flyweight_with_wire_identity() -> None: node = new_node(MockTransport(node_id=1), home="n1") + modulus = node.transport.subject_id_modulus received: list[pycyphal2.Topic] = [] node.monitor(received.append) @@ -128,6 +129,8 @@ async def test_monitor_unknown_topic_uses_flyweight_with_wire_identity() -> None assert not isinstance(received[0], TopicImpl) assert received[0].hash == topic_hash assert received[0].name == name + assert received[0].evictions == 0 + assert received[0].subject_id(modulus) == pycyphal2.SUBJECT_ID_PINNED_MAX + 1 + (topic_hash % modulus) assert received[0].match("sensor/*") == [("temp", 1)] node.close() @@ -142,6 +145,7 @@ async def test_monitor_unknown_topic_uses_flyweight_with_wire_identity() -> None ) async def test_monitor_unknown_topic_preserves_decoded_wire_name(name_bytes: bytes, expected_name: str) -> None: node = new_node(MockTransport(node_id=1), home="n1") + modulus = node.transport.subject_id_modulus received: list[pycyphal2.Topic] = [] node.monitor(received.append) @@ -154,6 +158,8 @@ async def test_monitor_unknown_topic_preserves_decoded_wire_name(name_bytes: byt assert len(received) == 1 assert received[0].hash == 0xDEADBEEFCAFEBABE assert received[0].name == expected_name + assert received[0].evictions == 3 + assert received[0].subject_id(modulus) == pycyphal2.SUBJECT_ID_PINNED_MAX + 1 + ((0xDEADBEEFCAFEBABE + 9) % modulus) node.close() @@ -179,7 +185,7 @@ async def test_monitor_is_not_invoked_for_inline_gossip_on_message_reception() - ).serialize() + b"data", ) - node.on_subject_arrival(topic.subject_id, arrival) + node.on_subject_arrival(topic.subject_id(node.transport.subject_id_modulus), arrival) assert received == [] diff --git a/tests/test_parity.py b/tests/test_parity.py index 74656cc8..b29eb243 100644 --- a/tests/test_parity.py +++ b/tests/test_parity.py @@ -44,7 +44,7 @@ async def test_crdt_collision_older_topic_wins(): # Create topic_a first (it will be older). pub_a = node.advertise("/topic_a") topic_a = node.topics_by_name["topic_a"] - sid_a = topic_a.subject_id + sid_a = topic_a.subject_id(tr.subject_id_modulus) # Search for a colliding name. modulus = tr.subject_id_modulus @@ -66,7 +66,7 @@ async def test_crdt_collision_older_topic_wins(): topic_b = node.topics_by_name[colliding_name] # topic_a should keep its subject-ID since it is older; topic_b should have been evicted. - assert topic_a.subject_id != topic_b.subject_id + assert topic_a.subject_id(tr.subject_id_modulus) != topic_b.subject_id(tr.subject_id_modulus) assert topic_b.evictions > 0 # loser got bumped assert topic_a.evictions == 0 # winner untouched @@ -211,7 +211,7 @@ async def test_msg_header_merges_lage(): remote_id=99, message=hdr.serialize() + b"payload", ) - node.on_subject_arrival(topic.subject_id, arrival) + node.on_subject_arrival(topic.subject_id(tr.subject_id_modulus), arrival) # After merge, our lage should have increased to at least the remote's claim. merged_lage = topic.lage(time.monotonic()) @@ -375,7 +375,7 @@ async def test_best_effort_full_pipeline(): await pub(pycyphal2.Instant.now() + 1.0, b"test_payload") # Verify the transport writer was invoked. - writer = tr.writers.get(topic.subject_id) + writer = tr.writers.get(topic.subject_id(tr.subject_id_modulus)) assert writer is not None assert writer.send_count >= 1 @@ -443,7 +443,7 @@ async def test_pinned_topic_formula(): for pin_val in [0, 1, 42, 100, SUBJECT_ID_PINNED_MAX]: pub = node.advertise(f"/pin_{pin_val}#{pin_val}") topic = node.topics_by_name[f"pin_{pin_val}"] - assert topic.subject_id == pin_val + assert topic.subject_id(tr.subject_id_modulus) == pin_val assert topic.evictions == 0xFFFFFFFF - pin_val pub.close() @@ -462,8 +462,8 @@ async def test_multiple_pinned_topics_share_subject_id(): topic_b = node.topics_by_name["beta"] # Both should have subject-ID 42. - assert topic_a.subject_id == 42 - assert topic_b.subject_id == 42 + assert topic_a.subject_id(tr.subject_id_modulus) == 42 + assert topic_b.subject_id(tr.subject_id_modulus) == 42 assert topic_a.pub_writer is topic_b.pub_writer assert tr.subject_writer_creations.get(42) == 1 diff --git a/tests/test_reliable.py b/tests/test_reliable.py index c5b1d24e..b0bb0ce5 100644 --- a/tests/test_reliable.py +++ b/tests/test_reliable.py @@ -160,7 +160,7 @@ async def test_reliable_publish_retry_rebuilds_writer_and_header_after_reallocat pub.priority = pycyphal2.Priority.EXCEPTIONAL pub.ack_timeout = 0.1 topic = node.topics_by_name["topic"] - old_sid = topic.subject_id + old_sid = topic.subject_id(tr.subject_id_modulus) old_evictions = topic.evictions old_messages: list[TransportArrival] = [] new_messages: list[TransportArrival] = [] @@ -189,7 +189,7 @@ async def test_reliable_publish_retry_rebuilds_writer_and_header_after_reallocat ) node.on_subject_arrival(node.broadcast_subject_id, gossip_arrival) - new_sid = topic.subject_id + new_sid = topic.subject_id(tr.subject_id_modulus) assert new_sid != old_sid observer.subject_listen(new_sid, new_messages.append) @@ -231,7 +231,7 @@ async def test_gossip_reallocation_to_occupied_subject_preserves_writer(): pub_b = node.advertise(colliding_name) topic_b = node.topics_by_name[colliding_name.removeprefix("/")] - sid_b = topic_b.subject_id + sid_b = topic_b.subject_id(tr.subject_id_modulus) writer_b = expect_mock_writer(topic_b.pub_writer) now = pycyphal2.Instant.now().s @@ -245,11 +245,11 @@ async def test_gossip_reallocation_to_occupied_subject_preserves_writer(): remote_lage = topic_a.lage(now) + 1 node.on_gossip_known(topic_a, remote_evictions, remote_lage, now, GossipScope.SHARDED) - assert topic_a.subject_id == sid_b + assert topic_a.subject_id(tr.subject_id_modulus) == sid_b assert topic_a.pub_writer is writer_b assert tr.subject_writer_creations.get(sid_b) == writer_creations_before == 1 assert topic_b.pub_writer is None - assert topic_b.subject_id != sid_b + assert topic_b.subject_id(tr.subject_id_modulus) != sid_b send_count_before = writer_b.send_count await pub_a(pycyphal2.Instant.now() + 1.0, b"payload") @@ -439,7 +439,7 @@ async def test_gossip_unknown_topic_collision(): topic_a = node.topics_by_name.get("topic_a") assert topic_a is not None - old_sid = topic_a.subject_id + old_sid = topic_a.subject_id(tr.subject_id_modulus) # Craft a gossip from a different topic that happens to claim the same subject-ID. # Use a fake hash that maps to the same subject-ID with evictions=0. @@ -469,7 +469,7 @@ async def test_gossip_unknown_topic_collision(): node.on_subject_arrival(node.broadcast_subject_id, arrival) await asyncio.sleep(0.02) # Our topic should have been reallocated. - assert topic_a.subject_id != old_sid or topic_a.evictions > 0 + assert topic_a.subject_id(tr.subject_id_modulus) != old_sid or topic_a.evictions > 0 pub.close() node.close() @@ -753,7 +753,7 @@ async def test_reliable_msg_sends_ack(): remote_id=99, message=msg_data, ) - node.on_subject_arrival(topic.subject_id, arrival) + node.on_subject_arrival(topic.subject_id(tr.subject_id_modulus), arrival) # Give ACK task time to run. await asyncio.sleep(0.02) @@ -781,7 +781,8 @@ async def test_reliable_msg_wrong_subject_dropped(): topic = list(node.topics_by_name.values())[0] subject_id_max = pycyphal2.SUBJECT_ID_PINNED_MAX + tr.subject_id_modulus - wrong_subject_id = topic.subject_id + 1 if topic.subject_id < subject_id_max else topic.subject_id - 1 + topic_subject_id = topic.subject_id(tr.subject_id_modulus) + wrong_subject_id = topic_subject_id + 1 if topic_subject_id < subject_id_max else topic_subject_id - 1 hdr = MsgRelHeader( topic_log_age=0, topic_evictions=topic.evictions, @@ -828,8 +829,8 @@ async def test_reliable_msg_dedup(): ) # Deliver twice. - node.on_subject_arrival(topic.subject_id, arrival) - node.on_subject_arrival(topic.subject_id, arrival) + node.on_subject_arrival(topic.subject_id(tr.subject_id_modulus), arrival) + node.on_subject_arrival(topic.subject_id(tr.subject_id_modulus), arrival) # Should only get one message. assert sub.queue.qsize() == 1 @@ -891,7 +892,7 @@ async def test_reliable_msg_ordered_late_drop_sends_no_ack_or_nack(): remote_id=99, message=hdr.serialize() + f"m{tag}".encode(), ) - node.on_subject_arrival(topic.subject_id, arrival) + node.on_subject_arrival(topic.subject_id(tr.subject_id_modulus), arrival) await asyncio.sleep(0.02) tr.unicast_log.clear() await sub.queue.get() @@ -908,7 +909,7 @@ async def test_reliable_msg_ordered_late_drop_sends_no_ack_or_nack(): remote_id=99, message=late_hdr.serialize() + b"late", ) - node.on_subject_arrival(topic.subject_id, late_arrival) + node.on_subject_arrival(topic.subject_id(tr.subject_id_modulus), late_arrival) await asyncio.sleep(0.02) assert tr.unicast_log == [] @@ -1011,7 +1012,7 @@ async def test_multicast_msg_ack_ignored(): remote_id=42, message=MsgAckHeader(topic_hash=topic.hash, tag=tag).serialize(), ) - node.on_subject_arrival(topic.subject_id, arrival) + node.on_subject_arrival(topic.subject_id(tr.subject_id_modulus), arrival) assert not tracker.acknowledged assert tracker.remaining == {42} diff --git a/tests/test_topic.py b/tests/test_topic.py index 09b77991..96f040f2 100644 --- a/tests/test_topic.py +++ b/tests/test_topic.py @@ -109,7 +109,7 @@ async def test_advertise_assigns_subject_id(): resolved, _, _ = resolve_name("my/topic", "test_node", "") topic = node.topics_by_name[resolved] - sid = topic.subject_id + sid = topic.subject_id(tr.subject_id_modulus) assert sid == compute_subject_id(topic.hash, topic.evictions, DEFAULT_MODULUS) assert node.topics_by_subject_id.get(sid) is topic @@ -127,7 +127,7 @@ async def test_advertise_pinned_topic(): resolved, pin, _ = resolve_name("my/topic#42", "test_node", "") assert pin == 42 topic = node.topics_by_name[resolved] - assert topic.subject_id == 42 + assert topic.subject_id(tr.subject_id_modulus) == 42 pub.close() node.close() @@ -179,7 +179,7 @@ async def test_topic_collision_evicts_loser(): topic2 = node.topics_by_name[resolved2] # Both topics should exist with non-colliding subject-IDs (the allocator resolved them). - assert topic1.subject_id != topic2.subject_id or topic1 is topic2 + assert topic1.subject_id(tr.subject_id_modulus) != topic2.subject_id(tr.subject_id_modulus) or topic1 is topic2 assert topic1.name in node.topics_by_name assert topic2.name in node.topics_by_name @@ -217,7 +217,7 @@ async def test_collision_allocator_iterates(): # Collect all subject-IDs (non-pinned). sids = set() for name, topic in node.topics_by_name.items(): - sid = topic.subject_id + sid = topic.subject_id(tr.subject_id_modulus) if sid not in sids: sids.add(sid) else: @@ -318,7 +318,7 @@ async def test_gossip_unknown_collision_we_win(): pub = node.advertise("my/topic") resolved, _, _ = resolve_name("my/topic", "test_node", "") topic = node.topics_by_name[resolved] - my_sid = topic.subject_id + my_sid = topic.subject_id(tr.subject_id_modulus) # Make our topic very old so we win. topic.ts_origin = time.monotonic() - 100000