From bc4839a463881b34d155f6f5ccfa10b4c1cfcc54 Mon Sep 17 00:00:00 2001 From: Adam Mitchell Date: Mon, 27 Apr 2026 15:45:48 +0000 Subject: [PATCH 1/3] Add CAN report listener --- README.md | 56 ++- examples/async_can_sniffer.py | 452 +++++++++++++++++++++++++ python_thingset/__init__.py | 2 + python_thingset/_protocol.py | 25 ++ python_thingset/report.py | 25 +- python_thingset/transport/async_can.py | 303 +++++++++++++++++ python_thingset/transport/transport.py | 28 +- tests/async_can/test_async_can.py | 400 ++++++++++++++++++++++ tests/protocol/test_parse_report.py | 43 +++ tests/transport/test_receive_loop.py | 109 ++++++ 10 files changed, 1426 insertions(+), 17 deletions(-) create mode 100644 examples/async_can_sniffer.py create mode 100644 python_thingset/transport/async_can.py create mode 100644 tests/async_can/test_async_can.py create mode 100644 tests/transport/test_receive_loop.py diff --git a/README.md b/README.md index d2ed396..895fc7e 100644 --- a/README.md +++ b/README.md @@ -17,12 +17,13 @@ pip install python-thingset Each transport has its own constructor — there is no factory. -| Wire | Sync class | Async class | -|---------------|------------------|----------------------------| -| TCP/IP | `ThingSetTCP` | `AsyncThingSetTCP` | -| CAN + ISO-TP | `ThingSetCAN` | _(not planned)_ | -| Serial | `ThingSetSerial` | _(not planned)_ | -| UDP (listen) | _(none)_ | `AsyncThingSetUDPReceiver` | +| Wire | Sync class | Async class | +|---------------|------------------|-----------------------------------| +| TCP/IP | `ThingSetTCP` | `AsyncThingSetTCP` | +| CAN + ISO-TP | `ThingSetCAN` | _(not planned)_ | +| CAN (listen) | _(none)_ | `AsyncThingSetCANReportReceiver` | +| Serial | `ThingSetSerial` | _(not planned)_ | +| UDP (listen) | _(none)_ | `AsyncThingSetUDPReceiver` | All classes are context managers; `with` / `async with` handles connection setup and tear-down. @@ -140,6 +141,38 @@ reports from multiple publishers don't corrupt each other. The receive queue is bounded; on overflow the newest report is dropped rather than back-pressuring the event loop. +### CAN report receiver + +Receives publish frames from ThingSet devices on a CAN bus. Both shapes are +surfaced as `ThingSetReport`: + +- **Single-frame report** (`type=0x2`): the 16-bit data ID is embedded in the + CAN-ID; the payload is a bare CBOR-encoded value. Synthesised into a + `ThingSetReport` with `subset_id=None` and a one-entry `values` map. +- **Multi-frame report** (`type=0x1`): chunks reassemble per-sender via + `msg#` and `seq#` in the CAN-ID. Carries `subset_id`, plus optional + `eui` for `0x1E` enhanced reports. + +```python +import asyncio +from python_thingset import AsyncThingSetCANReportReceiver + +async def main(): + async with AsyncThingSetCANReportReceiver(bus="vcan0", fd=True) as receiver: + async for (source_addr, bus_name), report in receiver: + print(source_addr, report.subset_id, report.values) + +asyncio.run(main()) +``` + +Reassembly buffers are keyed per source node address. On a sequence +mismatch within an in-flight message the receiver skips the frame without +advancing state — this matches the firmware-side reassembly behaviour and +lets the receiver latch onto one stream even when a publisher interleaves +two concurrent multi-frame reports with a shared `msg#`. `ThingSetReport`'s +`subset_id` is therefore typed `int | None` (was `int` in 0.2.x) since +single-frame reports don't carry one. + ## Gateway forwarding A TCP client can address a CAN-side module behind an IP↔CAN gateway (e.g. an @@ -210,6 +243,17 @@ python examples/async_udp_sniffer.py --decorate \ --record-fields examples/record_fields.example.json ``` +A matching CAN sniffer prints publish frames as they arrive on a CAN +interface. `--decorate` fetches each source node's schema over ISO-TP in +the background and annotates printed IDs with their schema path: + +```sh +python examples/async_can_sniffer.py -i vcan0 --source 10 +python examples/async_can_sniffer.py -i vcan0 --source 10 -v --decorate +python examples/async_can_sniffer.py -i vcan0 --decorate \ + --record-fields examples/record_fields.example.json +``` + ## Development ```sh diff --git a/examples/async_can_sniffer.py b/examples/async_can_sniffer.py new file mode 100644 index 0000000..48c6e23 --- /dev/null +++ b/examples/async_can_sniffer.py @@ -0,0 +1,452 @@ +"""Listen for ThingSet reports on a CAN bus and print each one. + +Single-frame and multi-frame reports both surface as +:class:`ThingSetReport`. Single-frame ones synthesise from the CAN-ID +data field (``subset_id is None``, single-entry ``values`` map); +multi-frame reports carry ``subset_id`` and optionally ``eui`` in the +usual envelope. + +Usage: + python examples/async_can_sniffer.py [-i can0|vcan0|...] [--source HEX] + [-v|--verbose] [--no-fd] + [--decorate] [--record-fields PATH] + +Stop with Ctrl+C. +""" + +import argparse +import asyncio +import json +import logging +import time + +from python_thingset import ( + AsyncThingSetCANReportReceiver, + SchemaNode, + SchemaTree, + ThingSetCAN, + ThingSetStatus, +) + + +def _fmt(v) -> str: + """Dict keys (ThingSet IDs) as uppercase hex; values as plain repr.""" + if isinstance(v, dict): + parts = [] + for k, val in v.items(): + if isinstance(k, int) and not isinstance(k, bool): + key = f"0x{k:X}" + else: + key = repr(k) + parts.append(f"{key}: {_fmt(val)}") + return "{" + ", ".join(parts) + "}" + if isinstance(v, list): + return "[" + ", ".join(_fmt(i) for i in v) + "]" + return repr(v) + + +def _decorate_id(obj_id: int, tree: SchemaTree | None) -> str: + label = f"{obj_id:#06X}" + if tree is not None: + node = tree.by_id.get(obj_id) + if node is not None: + label += f" {node.path}" + return label + + +def _print_decorated( + indent: str, + label: str, + value, + tree: SchemaTree | None, + schema_cache: "_SchemaCache | None", + source: int | None, +) -> None: + """Recursively print (label, value) with schema decoration. Mirrors + the UDP sniffer: dicts expand one inner key per line, lists of dicts + expand per-entry. Unknown inner IDs get scheduled for metadata + resolution so the *next* report displays them with names.""" + if isinstance(value, dict): + if schema_cache is not None and source is not None: + schema_cache.maybe_resolve_unknowns(source, value.keys()) + print(f"{indent}{label}:") + for k, v in value.items(): + inner_label = ( + _decorate_id(k, tree) if isinstance(k, int) else repr(k) + ) + _print_decorated( + indent + " ", inner_label, v, tree, schema_cache, source + ) + return + + if ( + isinstance(value, list) + and value + and all(isinstance(i, dict) for i in value) + ): + print(f"{indent}{label}: (list of {len(value)})") + for idx, entry in enumerate(value): + _print_decorated( + indent + " ", f"[{idx}]", entry, tree, schema_cache, source + ) + return + + print(f"{indent}{label} -> {_fmt(value)}") + + +_METADATA_OVERLAY = 0x19 +_METADATA_KEY_NAME = 26 +_METADATA_KEY_TYPE = 27 +_METADATA_KEY_ACCESS = 28 + + +class _SchemaCache: + """Lazily fetches the schema for each newly-seen source node. + + Unlike the UDP variant there's no gateway-forwarding model on CAN — + each source node has exactly one schema, keyed on the 8-bit source + address. All fetches share a single underlying ``ThingSetCAN`` + client (constructed lazily on first need) and run in a worker + thread to keep the asyncio loop responsive. An ``asyncio.Lock`` + serialises overlapping fetches because ``ThingSetCAN`` re-binds + its ISO-TP socket per request and is not concurrent-safe. + """ + + SCHEMA_FETCH_TIMEOUT_S = 30.0 + METADATA_FETCH_TIMEOUT_S = 5.0 + + def __init__( + self, + bus: str, + static_fields: dict[int, str] | None = None, + ) -> None: + self._bus = bus + self._trees: dict[int, SchemaTree | None] = {} + self._fetching: set[int] = set() + self._resolved_ids: dict[int, set[int]] = {} + self._tasks: set[asyncio.Task] = set() + self._static_fields = static_fields or {} + self._client: ThingSetCAN | None = None + self._client_lock = asyncio.Lock() + + def get(self, source: int) -> SchemaTree | None: + return self._trees.get(source) + + def has_entry(self, source: int) -> bool: + return source in self._trees or source in self._fetching + + def maybe_start_fetch(self, source: int) -> None: + if self.has_entry(source): + return + self._fetching.add(source) + self._spawn(self._fetch(source)) + + def maybe_resolve_unknowns(self, source: int, ids) -> None: + tree = self._trees.get(source) + if tree is None: + return + seen = self._resolved_ids.setdefault(source, set()) + to_resolve = [ + i for i in ids + if isinstance(i, int) and i not in tree.by_id and i not in seen + ] + if not to_resolve: + return + seen.update(to_resolve) + self._spawn(self._resolve(source, to_resolve)) + + async def close(self) -> None: + # Cancel outstanding background work first so to_thread calls + # don't wake up after we've torn the client down. + for t in list(self._tasks): + t.cancel() + for t in list(self._tasks): + try: + await t + except (asyncio.CancelledError, Exception): + pass + if self._client is not None: + await asyncio.to_thread(self._client.disconnect) + self._client = None + + def _spawn(self, coro) -> None: + task = asyncio.create_task(coro) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + + async def _ensure_client(self) -> ThingSetCAN: + if self._client is not None: + return self._client + # ThingSetCAN's constructor blocks for up to ~500 ms during + # address-claim negotiation; offload to a thread. + self._client = await asyncio.to_thread(ThingSetCAN, self._bus) + return self._client + + async def _fetch(self, source: int) -> None: + label = f"0x{source:02X}" + try: + async with self._client_lock: + client = await self._ensure_client() + tree = await asyncio.wait_for( + asyncio.to_thread(client.discover_schema, 0, source), + timeout=self.SCHEMA_FETCH_TIMEOUT_S, + ) + self._merge_static_fields(tree) + self._trees[source] = tree + static_note = ( + f" (+{len(self._static_fields)} static field names)" + if self._static_fields + else "" + ) + print(f" [schema] {label}: {len(tree)} nodes cached{static_note}") + except Exception as e: + self._trees[source] = None + print( + f" [schema] {label}: fetch failed " + f"({e.__class__.__name__}: {e})" + ) + finally: + self._fetching.discard(source) + + def _merge_static_fields(self, tree: SchemaTree) -> None: + for obj_id, name in self._static_fields.items(): + if obj_id in tree.by_id: + continue + node = SchemaNode( + id=obj_id, name=name, type="", access=0, path=name, children=[] + ) + tree.by_id[obj_id] = node + tree.by_path.setdefault(name, node) + + async def _resolve(self, source: int, ids: list[int]) -> None: + label = f"0x{source:02X}" + id_list = ", ".join(f"{i:#x}" for i in ids) + try: + async with self._client_lock: + client = await self._ensure_client() + resp = await asyncio.wait_for( + asyncio.to_thread( + client.fetch, _METADATA_OVERLAY, ids, source + ), + timeout=self.METADATA_FETCH_TIMEOUT_S, + ) + except Exception as e: + print( + f" [schema] {label}: metadata fetch failed " + f"({e.__class__.__name__}) for [{id_list}]" + ) + return + tree = self._trees.get(source) + if tree is None: + return + if resp.status_code != ThingSetStatus.CONTENT: + status = ( + f"{resp.status_code:#x} ({resp.status_string})" + if resp.status_code is not None + else "TIMEOUT" + ) + print( + f" [schema] {label}: metadata rejected {status} " + f"for [{id_list}] — device doesn't expose these at top level" + ) + return + resolved = [] + missing = [] + for idx, obj_id in enumerate(ids): + md = ( + resp.values[idx].value + if idx < len(resp.values) + else None + ) + if not isinstance(md, dict): + missing.append(obj_id) + continue + name = md.get(_METADATA_KEY_NAME, "") + if not name: + missing.append(obj_id) + continue + node = SchemaNode( + id=obj_id, + name=name, + type=md.get(_METADATA_KEY_TYPE, ""), + access=md.get(_METADATA_KEY_ACCESS, 0), + path=name, + children=[], + ) + tree.by_id[obj_id] = node + if name not in tree.by_path: + tree.by_path[name] = node + resolved.append(obj_id) + if resolved: + names = ", ".join( + f"{i:#x}->{tree.by_id[i].name}" for i in resolved + ) + print(f" [schema] {label}: resolved {len(resolved)} ({names})") + if missing: + miss_list = ", ".join(f"{i:#x}" for i in missing) + print( + f" [schema] {label}: no metadata for [{miss_list}] " + f"(likely record-internal fields)" + ) + + +async def main( + bus: str, + fd: bool, + source_filter: int | None, + verbose: bool, + decorate: bool, + static_fields: dict[int, str] | None, +) -> None: + src_str = ( + f", source=0x{source_filter:02X}" if source_filter is not None else "" + ) + fd_str = "FD" if fd else "classic" + decorate_str = ", decorate" if decorate else "" + print( + f"Listening for ThingSet reports on {bus} ({fd_str}{src_str}" + f"{decorate_str}) — Ctrl+C to stop\n" + ) + count = 0 + started = time.perf_counter() + schema_cache = ( + _SchemaCache(bus=bus, static_fields=static_fields) if decorate else None + ) + try: + async with AsyncThingSetCANReportReceiver(bus=bus, fd=fd) as receiver: + async for (source, _bus_name), report in receiver: + if source_filter is not None and source != source_filter: + continue + count += 1 + elapsed = time.perf_counter() - started + + if schema_cache is not None: + schema_cache.maybe_start_fetch(source) + schema_cache.maybe_resolve_unknowns( + source, list(report.values.keys()) + ) + tree = schema_cache.get(source) if schema_cache else None + + shape = "single" if report.subset_id is None else "multi" + subset = ( + f"subset={report.subset_id:#x}" + if report.subset_id is not None + else "subset=none" + ) + eui = ( + f" header_eui={report.eui:#018x}" + if report.eui is not None + else "" + ) + print( + f"#{count:<4} t={elapsed:6.2f}s src=0x{source:02X} " + f"{shape} {subset}{eui} values={len(report.values)}" + ) + + items = list(report.values.items()) + shown = items if verbose else items[:5] + for k, v in shown: + label = ( + _decorate_id(k, tree) if isinstance(k, int) else repr(k) + ) + nested = isinstance(v, dict) or ( + isinstance(v, list) + and v + and all(isinstance(i, dict) for i in v) + ) + if schema_cache is not None and nested: + _print_decorated( + " ", label, v, tree, schema_cache, source + ) + else: + vs = _fmt(v) + if not verbose and len(vs) > 60: + vs = vs[:57] + "..." + print(f" {label} -> {vs}") + if not verbose and len(items) > 5: + print(f" ... and {len(items) - 5} more") + finally: + if schema_cache is not None: + await schema_cache.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + parser.add_argument( + "-i", + "--bus", + default="can0", + help="CAN interface to bind (default: can0; use vcan0 for testing)", + ) + parser.add_argument( + "--no-fd", + action="store_true", + help="Disable CAN FD mode (use classic CAN with 8-byte frames)", + ) + parser.add_argument( + "--source", + type=lambda s: int(s, 16), + default=None, + metavar="HEX", + help="Only show reports whose source node address matches " + "(8-bit hex, e.g. 10 or 0x10)", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Print every value in each report without truncation", + ) + parser.add_argument( + "--decorate", + action="store_true", + help=( + "On first sighting of each source node, fetch the device " + "schema over CAN (ISO-TP) in the background and annotate " + "printed IDs with their schema path (e.g. `Metadata/rBoard`)" + ), + ) + parser.add_argument( + "--record-fields", + type=str, + default=None, + metavar="PATH", + help=( + "Path to a JSON file supplying names for record[] internal " + "field IDs that the device doesn't expose via the metadata " + "overlay. Format: {\"0x6451\": \"mCellVoltage\", ...}. " + "Only used when --decorate is active." + ), + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable DEBUG-level logging of every received frame (very noisy)", + ) + args = parser.parse_args() + if args.debug: + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s.%(msecs)03d %(name)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", + ) + + static_fields: dict[int, str] | None = None + if args.record_fields: + with open(args.record_fields) as f: + raw = json.load(f) + static_fields = {int(k, 16): v for k, v in raw.items()} + + try: + asyncio.run( + main( + bus=args.bus, + fd=not args.no_fd, + source_filter=args.source, + verbose=args.verbose, + decorate=args.decorate, + static_fields=static_fields, + ) + ) + except KeyboardInterrupt: + print("\nstopped.") diff --git a/python_thingset/__init__.py b/python_thingset/__init__.py index 35b91f2..2312b7c 100644 --- a/python_thingset/__init__.py +++ b/python_thingset/__init__.py @@ -4,10 +4,12 @@ from .response import ThingSetRequest, ThingSetResponse, ThingSetStatus, ThingSetValue from .schema import SchemaNode, SchemaTree from .transport import ThingSetCAN, ThingSetSerial, ThingSetTCP, ThingSetTransport +from .transport.async_can import AsyncThingSetCANReportReceiver from .transport.async_tcp import AsyncThingSetTCP from .transport.async_udp import AsyncThingSetUDPReceiver __all__ = [ + "AsyncThingSetCANReportReceiver", "AsyncThingSetClient", "AsyncThingSetTCP", "AsyncThingSetUDPReceiver", diff --git a/python_thingset/_protocol.py b/python_thingset/_protocol.py index faa39ad..fc71bc8 100644 --- a/python_thingset/_protocol.py +++ b/python_thingset/_protocol.py @@ -124,6 +124,31 @@ def parse_report(self, payload: bytes) -> Union[ThingSetReport, None]: return None return ThingSetReport(subset_id=subset_id, values=values, eui=eui) + def build_single_frame_report( + self, data_id: int, payload: bytes + ) -> Union[ThingSetReport, None]: + """Synthesize a report from a CAN single-frame publish. + + On CAN, single-frame reports carry a single data ID in the + CAN arbitration ID and the bare CBOR-encoded value as payload + — no envelope byte, no subset id. We surface this through the + same ``ThingSetReport`` shape with ``subset_id=None`` and a + one-entry ``values`` map, so consumers can treat all reports + uniformly. + + Returns ``None`` if the payload doesn't decode as a valid + single CBOR document. + """ + if self.wire_format is not WireFormat.BINARY: + raise ValueError("build_single_frame_report is binary only") + if not payload: + return None + try: + value = cbor2.loads(payload) + except (cbor2.CBORDecodeError, cbor2.CBORDecodeEOF): + return None + return ThingSetReport(subset_id=None, values={data_id: value}, eui=None) + def try_consume(self, buffer: bytes) -> Tuple[Union[ParsedResponse, None], int]: """Extract one complete binary response from the start of ``buffer``. diff --git a/python_thingset/report.py b/python_thingset/report.py index 2481ee6..e4a7c4d 100644 --- a/python_thingset/report.py +++ b/python_thingset/report.py @@ -5,10 +5,10 @@ # """Decoded ThingSet publish/subscribe reports. -Reports are broadcast by devices on UDP port 9002 (for IP transports) -without any subscribe handshake — the device picks its own schedule. -An :class:`AsyncThingSetUDPReceiver` reassembles UDP fragments and -yields ``(addr, ThingSetReport)`` pairs. +Reports are broadcast by devices on UDP port 9002 (IP) or onto the +CAN bus without any subscribe handshake — the device picks its own +schedule. The async receivers reassemble fragments and yield +``(addr, ThingSetReport)`` pairs. """ from dataclasses import dataclass @@ -19,18 +19,25 @@ class ThingSetReport: """A single publish/subscribe message decoded off the wire. - The transport-level source address (``(ip, port)``) lives on the - receiver's iterator tuple, not here — a report is transport- - agnostic content. + The transport-level source address lives on the receiver's + iterator tuple, not here — a report is transport-agnostic content. + + ``subset_id`` is the CBOR uint that follows the type byte in the + standard report envelope. It is ``None`` for synthetic reports + built from CAN single-frame publishes (where the data ID is in + the CAN-ID itself and there is no subset concept). """ - subset_id: int + subset_id: Union[int, None] values: Dict[int, Any] eui: Union[int, None] = None def __str__(self) -> str: eui_part = f"EUI={self.eui:#018x}" if self.eui is not None else "no EUI" + subset = ( + f"{self.subset_id:#x}" if self.subset_id is not None else "none" + ) return ( - f"ThingSetReport(subset={self.subset_id:#x}, {eui_part}, " + f"ThingSetReport(subset={subset}, {eui_part}, " f"{len(self.values)} values)" ) diff --git a/python_thingset/transport/async_can.py b/python_thingset/transport/async_can.py new file mode 100644 index 0000000..7da4bdf --- /dev/null +++ b/python_thingset/transport/async_can.py @@ -0,0 +1,303 @@ +# +# Copyright (c) 2024-2025 Brill Power. +# +# SPDX-License-Identifier: Apache-2.0 +# +"""Async CAN report receiver for ThingSet publish/subscribe. + +ThingSet devices broadcast reports onto the CAN bus in two distinct +shapes (see ``ThingSet.Net/CanID.cs``): + + * **Single-frame report** (``type=0x2``): the 16-bit data ID is + embedded in the CAN-ID itself; the payload is the bare + CBOR-encoded value. We synthesise these into a ``ThingSetReport`` + with ``subset_id=None`` and a one-entry ``values`` map so consumers + can treat all reports uniformly. + + * **Multi-frame report** (``type=0x1``): chunks reassemble per-sender + using ``msg#`` (2 bits) + ``msgtype`` (2 bits) + ``seq#`` (4 bits) + in the CAN-ID. The reassembled buffer matches the UDP envelope + (``[0x1F][cbor subset][cbor map]`` or + ``[0x1E][cbor eui][cbor subset][cbor map]``) and runs through the + existing ``ThingSetProtocol.parse_report``. + +Public API mirrors :class:`AsyncThingSetUDPReceiver` — async iterator +yielding ``((source_addr, bus_name), ThingSetReport)``. +""" + +import asyncio +import logging +from typing import Dict, Tuple, Union + +import can + +from .._protocol import ThingSetProtocol, WireFormat +from ..report import ThingSetReport + + +logger = logging.getLogger(__name__) + + +# ----- CAN-ID bitfield layout (29-bit extended ID) ----- + +_PRIORITY_POS = 26 +_PRIORITY_MASK = 0x7 << _PRIORITY_POS # not currently used; kept for clarity + +_TYPE_POS = 24 +_TYPE_MASK = 0x3 << _TYPE_POS + +_TYPE_REQ_RESP = 0x0 << _TYPE_POS +_TYPE_MULTI_FRAME_REPORT = 0x1 << _TYPE_POS +_TYPE_SINGLE_FRAME_REPORT = 0x2 << _TYPE_POS +_TYPE_NETWORK = 0x3 << _TYPE_POS + +# Source addr is bits 0-7 +_SOURCE_MASK = 0xFF + +# Single-frame report layout: data ID at bits 8-23 +_DATA_ID_POS = 8 +_DATA_ID_MASK = 0xFFFF << _DATA_ID_POS + +# Multi-frame report layout +_SEQ_POS = 8 +_SEQ_MASK = 0xF << _SEQ_POS # 4 bits + +_MULTIFRAME_TYPE_POS = 12 +_MULTIFRAME_TYPE_MASK = 0x3 << _MULTIFRAME_TYPE_POS # 2 bits + +_MSG_NUM_POS = 14 +_MSG_NUM_MASK = 0x3 << _MSG_NUM_POS # 2 bits + +# MultiFrameMessageType values (already shifted into position) +_MFT_FIRST = 0x0 << _MULTIFRAME_TYPE_POS +_MFT_CONSECUTIVE = 0x1 << _MULTIFRAME_TYPE_POS +_MFT_LAST = 0x2 << _MULTIFRAME_TYPE_POS +_MFT_SINGLE = 0x3 << _MULTIFRAME_TYPE_POS + + +def _is_first(mft: int) -> bool: + """A frame that starts a new message buffer.""" + return mft == _MFT_FIRST or mft == _MFT_SINGLE + + +def _is_last(mft: int) -> bool: + """A frame that completes a message buffer.""" + return mft == _MFT_LAST or mft == _MFT_SINGLE + + +class _ReassemblyBuffer: + __slots__ = ("data", "expected_seq", "message_number", "started") + + def __init__(self) -> None: + self.data = bytearray() + self.expected_seq = 0 + self.message_number: int = -1 + self.started = False + + def reset(self) -> None: + self.data.clear() + self.expected_seq = 0 + self.started = False + + +class AsyncThingSetCANReportReceiver: + """Listen for ThingSet publish frames on a CAN bus. + + Use as an async context manager. Iterate with + ``async for ((source_addr, bus_name), report) in receiver``. + """ + + DEFAULT_QUEUE_SIZE = 1024 + + def __init__( + self, + bus: str = "can0", + interface: str = "socketcan", + fd: bool = True, + queue_size: int = DEFAULT_QUEUE_SIZE, + ) -> None: + self._bus_name = bus + self._interface = interface + self._fd = fd + self._queue_size = queue_size + self._protocol = ThingSetProtocol(WireFormat.BINARY) + self._queue: "asyncio.Queue[Tuple[Tuple[int, str], ThingSetReport]]" = ( + asyncio.Queue(maxsize=queue_size) + ) + self._buffers: Dict[int, _ReassemblyBuffer] = {} + self._can_bus: Union[can.BusABC, None] = None + self._reader: Union[can.AsyncBufferedReader, None] = None + self._notifier: Union[can.Notifier, None] = None + self._task: Union[asyncio.Task, None] = None + + async def start(self) -> None: + if self._can_bus is not None: + return + loop = asyncio.get_running_loop() + # Hardware-level filter: only receive frames whose type field + # is single-frame (0x2) or multi-frame (0x1) report. Two + # filters because the bitmask doesn't allow OR'ing of two + # disjoint values — but the kernel accepts a list and ORs + # the matches. + filters = [ + { + "can_id": _TYPE_SINGLE_FRAME_REPORT, + "can_mask": _TYPE_MASK, + "extended": True, + }, + { + "can_id": _TYPE_MULTI_FRAME_REPORT, + "can_mask": _TYPE_MASK, + "extended": True, + }, + ] + self._can_bus = can.Bus( + channel=self._bus_name, + interface=self._interface, + fd=self._fd, + can_filters=filters, + ) + self._reader = can.AsyncBufferedReader() + self._notifier = can.Notifier(self._can_bus, [self._reader], loop=loop) + self._task = asyncio.create_task( + self._consume_frames(), + name=f"thingset-can-rx-{self._bus_name}", + ) + + async def close(self) -> None: + if self._task is not None: + self._task.cancel() + try: + await self._task + except (asyncio.CancelledError, Exception): + pass + self._task = None + if self._notifier is not None: + try: + self._notifier.stop() + except Exception: + pass + self._notifier = None + if self._can_bus is not None: + try: + self._can_bus.shutdown() + except Exception: + pass + self._can_bus = None + self._reader = None + + async def _consume_frames(self) -> None: + assert self._reader is not None + try: + while True: + msg = await self._reader.get_message() + self._handle_message(msg) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("CAN receiver loop terminated unexpectedly") + + def _handle_message(self, msg: can.Message) -> None: + if not msg.is_extended_id: + return # ThingSet uses 29-bit IDs exclusively + + can_id = msg.arbitration_id + msg_type = can_id & _TYPE_MASK + source = can_id & _SOURCE_MASK + + if msg_type == _TYPE_SINGLE_FRAME_REPORT: + data_id = (can_id & _DATA_ID_MASK) >> _DATA_ID_POS + payload = bytes(msg.data[: msg.dlc]) + report = self._protocol.build_single_frame_report(data_id, payload) + if report is not None: + self._enqueue(source, report) + return + + if msg_type == _TYPE_MULTI_FRAME_REPORT: + self._handle_multi_frame(source, can_id, bytes(msg.data[: msg.dlc])) + return + + def _handle_multi_frame( + self, source: int, can_id: int, data: bytes + ) -> None: + mft = can_id & _MULTIFRAME_TYPE_MASK + seq = (can_id & _SEQ_MASK) >> _SEQ_POS + msg_num = (can_id & _MSG_NUM_MASK) >> _MSG_NUM_POS + + if logger.isEnabledFor(logging.DEBUG): + mft_name = { + _MFT_FIRST: "FIRST", + _MFT_CONSECUTIVE: "CONSEC", + _MFT_LAST: "LAST", + _MFT_SINGLE: "SINGLE", + }.get(mft, f"?({mft >> _MULTIFRAME_TYPE_POS})") + logger.debug( + "rx mf src=0x%02X msg#=%d mft=%s seq=%d len=%d id=0x%08X", + source, msg_num, mft_name, seq, len(data), can_id, + ) + + buf = self._buffers.setdefault(source, _ReassemblyBuffer()) + + if _is_first(mft): + buf.reset() + buf.started = True + buf.message_number = msg_num + elif not buf.started or buf.message_number != msg_num: + buf.reset() + return + + # Sequence mismatch: skip the frame but keep buffer state + # intact. Two scenarios where this matters: + # 1. We joined mid-stream — every frame mismatches until the + # next FIRST resets us cleanly. + # 2. The publisher interleaves two concurrent multi-frame + # reports onto the wire with a shared msg# (firmware quirk + # observed on bpux ACMU). One stream's frames match the + # expected sequence; the other's miss. Skipping the misses + # without advancing state lets the matching stream + # reassemble correctly. This mirrors the C# HMCU's + # ReportBuffer, which returns error on mismatch without + # touching its sequence counter. + if seq != (buf.expected_seq & 0xF): + logger.debug( + "ThingSet CAN multi-frame seq skip from 0x%02X: " + "expected %d, got %d (msg#=%d can_id=0x%08X)", + source, + buf.expected_seq & 0xF, + seq, + msg_num, + can_id, + ) + return + + buf.data.extend(data) + buf.expected_seq += 1 + + if _is_last(mft): + payload = bytes(buf.data) + buf.reset() + report = self._protocol.parse_report(payload) + if report is not None: + self._enqueue(source, report) + + def _enqueue(self, source: int, report: ThingSetReport) -> None: + try: + self._queue.put_nowait(((source, self._bus_name), report)) + except asyncio.QueueFull: + logger.warning( + "ThingSet CAN report queue full; dropping report from 0x%02X", + source, + ) + + def __aiter__(self) -> "AsyncThingSetCANReportReceiver": + return self + + async def __anext__(self) -> Tuple[Tuple[int, str], ThingSetReport]: + return await self._queue.get() + + async def __aenter__(self) -> "AsyncThingSetCANReportReceiver": + await self.start() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() diff --git a/python_thingset/transport/transport.py b/python_thingset/transport/transport.py index 21201b0..e4b37fc 100644 --- a/python_thingset/transport/transport.py +++ b/python_thingset/transport/transport.py @@ -7,6 +7,11 @@ from abc import ABC, abstractmethod from typing import Any +from ..log import get_logger + + +_logger = get_logger() + class ThingSetTransport(ABC): """Abstract base class for ThingSet transport drivers. @@ -32,10 +37,29 @@ def stop_receiving(self) -> None: self._thread.join() def _receive_loop(self) -> None: + # A single bad frame from the bus must not kill the receive + # thread — callers waiting on a response in get_response() + # would hang forever and the next reconnect would race against + # a still-running thread. Log and continue; if the underlying + # socket is permanently broken, receive() will keep raising + # and the caller's timeout will surface the failure. while self._running: - message = self.receive() + try: + message = self.receive() + except Exception as e: + _logger.warning( + "%s receive raised %s: %s — continuing", + type(self).__name__, e.__class__.__name__, e, + ) + continue if message: - self._handle_message(message) + try: + self._handle_message(message) + except Exception as e: + _logger.warning( + "%s handler raised %s: %s — continuing", + type(self).__name__, e.__class__.__name__, e, + ) @abstractmethod def _handle_message(self, message: Any) -> None: diff --git a/tests/async_can/test_async_can.py b/tests/async_can/test_async_can.py new file mode 100644 index 0000000..2b3e71b --- /dev/null +++ b/tests/async_can/test_async_can.py @@ -0,0 +1,400 @@ +"""Async CAN receiver tests via python-can's virtual bus. + +Two ``can.Bus`` instances on the same ``virtual`` channel talk to +each other in-process — no hardware needed. The sender constructs +arbitration IDs by hand to mirror the wire format documented in +``ThingSet.Net/CanID.cs``. +""" + +import asyncio +from typing import List + +import cbor2 +import can +import pytest + +from python_thingset import AsyncThingSetCANReportReceiver + + +# --- CAN-ID layout helpers (kept independent of the implementation +# constants so tests document the wire format). +_PRIORITY_REPORT_LOW = 0x7 << 26 +_TYPE_MULTI_FRAME = 0x1 << 24 +_TYPE_SINGLE_FRAME = 0x2 << 24 + +_MFT_FIRST = 0x0 << 12 +_MFT_CONSECUTIVE = 0x1 << 12 +_MFT_LAST = 0x2 << 12 +_MFT_SINGLE = 0x3 << 12 + + +def _single_frame_id(source: int, data_id: int, priority: int = _PRIORITY_REPORT_LOW) -> int: + return priority | _TYPE_SINGLE_FRAME | (data_id << 8) | source + + +def _multi_frame_id(source: int, msg_num: int, mft: int, seq: int) -> int: + return ( + _PRIORITY_REPORT_LOW + | _TYPE_MULTI_FRAME + | ((msg_num & 0x3) << 14) + | mft + | ((seq & 0xF) << 8) + | (source & 0xFF) + ) + + +def _send_can(sender: can.BusABC, can_id: int, data: bytes) -> None: + sender.send( + can.Message( + arbitration_id=can_id, + is_extended_id=True, + data=data, + is_fd=True, + ) + ) + + +@pytest.fixture +def virtual_channel(request): + """Per-test virtual channel name so tests don't cross-talk.""" + return f"virt-{request.node.name}" + + +async def _open_pair(channel: str): + """Open a receiver bound to the channel and return a sender bus + on the same channel. Caller closes the sender.""" + receiver = AsyncThingSetCANReportReceiver( + bus=channel, interface="virtual", fd=True + ) + await receiver.start() + sender = can.Bus(channel=channel, interface="virtual", fd=True) + return receiver, sender + + +async def _next(receiver, timeout: float = 1.0): + return await asyncio.wait_for(receiver.__anext__(), timeout=timeout) + + +async def test_single_frame_int_value(virtual_channel): + receiver, sender = await _open_pair(virtual_channel) + try: + _send_can( + sender, + _single_frame_id(source=0x10, data_id=0x602), + cbor2.dumps(42, canonical=True), + ) + (src, bus), report = await _next(receiver) + assert src == 0x10 + assert bus == virtual_channel + assert report.subset_id is None + assert report.eui is None + assert report.values == {0x602: 42} + finally: + sender.shutdown() + await receiver.close() + + +async def test_single_frame_string_value(virtual_channel): + receiver, sender = await _open_pair(virtual_channel) + try: + _send_can( + sender, + _single_frame_id(source=0x10, data_id=0xF03), + cbor2.dumps("native_sim", canonical=True), + ) + _, report = await _next(receiver) + assert report.values == {0xF03: "native_sim"} + finally: + sender.shutdown() + await receiver.close() + + +async def test_single_frame_high_priority_also_received(virtual_channel): + """Priority-bits don't matter for filtering — any report-typed + frame should be received regardless of priority class.""" + receiver, sender = await _open_pair(virtual_channel) + try: + # Priority 5 = ReportHigh + can_id = (0x5 << 26) | _TYPE_SINGLE_FRAME | (0x100 << 8) | 0x21 + _send_can(sender, can_id, cbor2.dumps(7, canonical=True)) + (src, _), report = await _next(receiver) + assert src == 0x21 + assert report.values == {0x100: 7} + finally: + sender.shutdown() + await receiver.close() + + +async def test_multi_frame_single_type(virtual_channel): + """MFT=Single (0x3): a multi-frame-typed report that fits in + one frame. C# treats this as the same as First+Last.""" + receiver, sender = await _open_pair(virtual_channel) + try: + body = ( + bytes([0x1F]) + + cbor2.dumps(0x400, canonical=True) + + cbor2.dumps({0x6E: 1, 0x6F: 2}, canonical=True) + ) + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=0, mft=_MFT_SINGLE, seq=0), + body, + ) + (src, _), report = await _next(receiver) + assert src == 0x10 + assert report.subset_id == 0x400 + assert report.values == {0x6E: 1, 0x6F: 2} + finally: + sender.shutdown() + await receiver.close() + + +async def test_multi_frame_three_chunks_reassemble(virtual_channel): + receiver, sender = await _open_pair(virtual_channel) + try: + # Construct a body too big for one frame — many keys + values = {k: f"v{k}" * 3 for k in range(20)} + body = ( + bytes([0x1F]) + + cbor2.dumps(0x400, canonical=True) + + cbor2.dumps(values, canonical=True) + ) + chunk = len(body) // 3 + frames = [ + (body[:chunk], _MFT_FIRST, 0), + (body[chunk : 2 * chunk], _MFT_CONSECUTIVE, 1), + (body[2 * chunk :], _MFT_LAST, 2), + ] + for data, mft, seq in frames: + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=1, mft=mft, seq=seq), + data, + ) + (src, _), report = await _next(receiver) + assert src == 0x10 + assert report.subset_id == 0x400 + assert report.values == values + finally: + sender.shutdown() + await receiver.close() + + +async def test_multi_frame_enhanced_with_eui(virtual_channel): + receiver, sender = await _open_pair(virtual_channel) + try: + body = ( + bytes([0x1E]) + + cbor2.dumps(0xBADB1B0000000001, canonical=True) + + cbor2.dumps(0x400, canonical=True) + + cbor2.dumps({0x1: 1}, canonical=True) + ) + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=0, mft=_MFT_SINGLE, seq=0), + body, + ) + _, report = await _next(receiver) + assert report.eui == 0xBADB1B0000000001 + assert report.subset_id == 0x400 + assert report.values == {0x1: 1} + finally: + sender.shutdown() + await receiver.close() + + +async def test_multi_frame_msg_num_mismatch_drops_buffer(virtual_channel): + receiver, sender = await _open_pair(virtual_channel) + try: + body = bytes([0x1F]) + cbor2.dumps(0x400, canonical=True) + cbor2.dumps({0x1: 1}, canonical=True) + half = len(body) // 2 + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=1, mft=_MFT_FIRST, seq=0), + body[:half], + ) + # Wrong msg_num on the next frame — should drop the buffer + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=2, mft=_MFT_LAST, seq=1), + body[half:], + ) + # Then send a clean Single-type to confirm receiver still works + clean = ( + bytes([0x1F]) + + cbor2.dumps(0x500, canonical=True) + + cbor2.dumps({0x99: 99}, canonical=True) + ) + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=3, mft=_MFT_SINGLE, seq=0), + clean, + ) + _, report = await _next(receiver) + assert report.subset_id == 0x500 # the bad multi-frame was dropped + finally: + sender.shutdown() + await receiver.close() + + +async def test_multi_frame_two_streams_same_msg_num_interleaved(virtual_channel): + """Firmware quirk seen in the wild: two concurrent multi-frame + publishes from the same source share msg#, with frames interleaved + on the wire. The second FIRST resets state, latching onto the + second stream; the first stream's leftover frames must be skipped + without poisoning reassembly.""" + receiver, sender = await _open_pair(virtual_channel) + try: + body_a = ( + bytes([0x1F]) + + cbor2.dumps(0xA00, canonical=True) + + cbor2.dumps({0xA1: "A" * 60}, canonical=True) + ) + body_b = ( + bytes([0x1F]) + + cbor2.dumps(0xB00, canonical=True) + + cbor2.dumps({0xB1: "B" * 60}, canonical=True) + ) + # Split each so we get FIRST + 1 CONSEC + LAST (3 frames each). + def split3(b: bytes): + n = len(b) // 3 + return [b[:n], b[n : 2 * n], b[2 * n :]] + + a0, a1, a2 = split3(body_a) + b0, b1, b2 = split3(body_b) + + # Wire order: A starts → A's CONSEC → B's FIRST (resets state) → + # A's LAST (mismatch, skipped) → B's CONSEC → B's LAST. + _send_can(sender, _multi_frame_id(0x10, 0, _MFT_FIRST, 0), a0) + _send_can(sender, _multi_frame_id(0x10, 0, _MFT_CONSECUTIVE, 1), a1) + _send_can(sender, _multi_frame_id(0x10, 0, _MFT_FIRST, 0), b0) + _send_can(sender, _multi_frame_id(0x10, 0, _MFT_LAST, 2), a2) # mismatch — skip + _send_can(sender, _multi_frame_id(0x10, 0, _MFT_CONSECUTIVE, 1), b1) + _send_can(sender, _multi_frame_id(0x10, 0, _MFT_LAST, 2), b2) + _, report = await _next(receiver, timeout=2.0) + assert report.subset_id == 0xB00 + assert report.values == {0xB1: "B" * 60} + finally: + sender.shutdown() + await receiver.close() + + +async def test_multi_frame_consecutive_without_first_dropped(virtual_channel): + receiver, sender = await _open_pair(virtual_channel) + try: + # Send a CONSECUTIVE frame with no preceding FIRST — must be discarded + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=0, mft=_MFT_CONSECUTIVE, seq=0), + b"garbage", + ) + # Then send a clean message + body = ( + bytes([0x1F]) + + cbor2.dumps(0x400, canonical=True) + + cbor2.dumps({0x1: 1}, canonical=True) + ) + _send_can( + sender, + _multi_frame_id(source=0x10, msg_num=1, mft=_MFT_SINGLE, seq=0), + body, + ) + _, report = await _next(receiver) + assert report.subset_id == 0x400 + finally: + sender.shutdown() + await receiver.close() + + +async def test_per_sender_buffers_independent(virtual_channel): + """Two senders interleaving multi-frame reports must not corrupt + each other's reassembly buffers.""" + receiver, sender = await _open_pair(virtual_channel) + try: + body_a = bytes([0x1F]) + cbor2.dumps(0xA, canonical=True) + cbor2.dumps({0x1: "A"}, canonical=True) + body_b = bytes([0x1F]) + cbor2.dumps(0xB, canonical=True) + cbor2.dumps({0x2: "B"}, canonical=True) + ha = len(body_a) // 2 + hb = len(body_b) // 2 + _send_can(sender, _multi_frame_id(0xA1, 0, _MFT_FIRST, 0), body_a[:ha]) + _send_can(sender, _multi_frame_id(0xB2, 0, _MFT_FIRST, 0), body_b[:hb]) + _send_can(sender, _multi_frame_id(0xA1, 0, _MFT_LAST, 1), body_a[ha:]) + _send_can(sender, _multi_frame_id(0xB2, 0, _MFT_LAST, 1), body_b[hb:]) + + got: List = [] + for _ in range(2): + got.append(await _next(receiver, timeout=2.0)) + + sources = {addr[0] for addr, _ in got} + subsets = {r.subset_id for _, r in got} + assert sources == {0xA1, 0xB2} + assert subsets == {0xA, 0xB} + finally: + sender.shutdown() + await receiver.close() + + +async def test_async_context_manager_cleans_up(virtual_channel): + async with AsyncThingSetCANReportReceiver( + bus=virtual_channel, interface="virtual", fd=True + ) as receiver: + sender = can.Bus(channel=virtual_channel, interface="virtual", fd=True) + try: + _send_can( + sender, + _single_frame_id(source=0x10, data_id=0xF03), + cbor2.dumps("ok", canonical=True), + ) + _, report = await _next(receiver) + assert report.values == {0xF03: "ok"} + finally: + sender.shutdown() + # After exit, internal state cleared + assert receiver._can_bus is None + assert receiver._task is None + + +async def test_close_idempotent(virtual_channel): + receiver = AsyncThingSetCANReportReceiver( + bus=virtual_channel, interface="virtual", fd=True + ) + await receiver.start() + await receiver.close() + await receiver.close() # second close must not raise + + +async def test_short_payload_single_frame_decodes_zero(virtual_channel): + """Single-frame with just the int 0 (one CBOR byte 0x00).""" + receiver, sender = await _open_pair(virtual_channel) + try: + _send_can( + sender, + _single_frame_id(source=0x10, data_id=0xE05), + bytes([0x00]), + ) + _, report = await _next(receiver) + assert report.values == {0xE05: 0} + finally: + sender.shutdown() + await receiver.close() + + +async def test_malformed_single_frame_payload_silently_dropped(virtual_channel): + receiver, sender = await _open_pair(virtual_channel) + try: + # 0x1c is a reserved CBOR initial byte — malformed + _send_can( + sender, + _single_frame_id(source=0x10, data_id=0xE05), + bytes([0x1C, 0x00, 0x00, 0x00]), + ) + # Then a valid one — receiver should still work + _send_can( + sender, + _single_frame_id(source=0x10, data_id=0xE05), + cbor2.dumps(7, canonical=True), + ) + _, report = await _next(receiver, timeout=2.0) + assert report.values == {0xE05: 7} + finally: + sender.shutdown() + await receiver.close() diff --git a/tests/protocol/test_parse_report.py b/tests/protocol/test_parse_report.py index 67295c9..2d448bb 100644 --- a/tests/protocol/test_parse_report.py +++ b/tests/protocol/test_parse_report.py @@ -124,3 +124,46 @@ def test_values_empty_dict_is_valid(): assert report is not None assert report.values == {} assert report.subset_id == 0x400 + + +# --- build_single_frame_report (CAN single-frame publishes) ----------- + +def test_single_frame_round_trip_int(): + payload = cbor2.dumps(42, canonical=True) + report = _protocol.build_single_frame_report(0x1001, payload) + assert report is not None + assert report.subset_id is None + assert report.eui is None + assert report.values == {0x1001: 42} + + +def test_single_frame_round_trip_float(): + payload = cbor2.dumps(3.14, canonical=True) + report = _protocol.build_single_frame_report(0x602, payload) + assert report.values == {0x602: 3.14} + + +def test_single_frame_round_trip_string(): + payload = cbor2.dumps("native_sim", canonical=True) + report = _protocol.build_single_frame_report(0xF03, payload) + assert report.values == {0xF03: "native_sim"} + + +def test_single_frame_round_trip_array(): + payload = cbor2.dumps([1, 2, 3, 4], canonical=True) + report = _protocol.build_single_frame_report(0xF05, payload) + assert report.values == {0xF05: [1, 2, 3, 4]} + + +def test_single_frame_empty_payload_returns_none(): + assert _protocol.build_single_frame_report(0x100, b"") is None + + +def test_single_frame_malformed_cbor_returns_none(): + assert _protocol.build_single_frame_report(0x100, bytes([0x1C])) is None + + +def test_single_frame_text_wire_format_raises(): + text = ThingSetProtocol(WireFormat.TEXT) + with pytest.raises(ValueError, match="binary only"): + text.build_single_frame_report(0x100, b"\x01") diff --git a/tests/transport/test_receive_loop.py b/tests/transport/test_receive_loop.py new file mode 100644 index 0000000..41ad50f --- /dev/null +++ b/tests/transport/test_receive_loop.py @@ -0,0 +1,109 @@ +"""Resilience contract for ThingSetTransport._receive_loop. + +A single bad frame from the wire (kernel ISO-TP rejecting a malformed +PDU, isotp.recv raising EILSEQ, a CBOR decode blowup in a handler, +etc.) must not kill the receive thread — the surrounding RPC layer +relies on get_response()'s timeout to surface failures, and a dead +thread leaves the next reconnect racing against a still-running one. +""" + +import threading +import time +from typing import Any, List + +import pytest + +from python_thingset.transport.transport import ThingSetTransport + + +class _FakeTransport(ThingSetTransport): + """Transport whose receive() can be programmed to raise then return.""" + + def __init__(self, behaviours: List): + super().__init__() + self._behaviours = list(behaviours) + self._handled: List[Any] = [] + self._handle_raises = False + + def receive(self) -> Any: + if not self._behaviours: + time.sleep(0.01) + return None + item = self._behaviours.pop(0) + if isinstance(item, Exception): + raise item + return item + + def _handle_message(self, message: Any) -> None: + if self._handle_raises: + raise RuntimeError("handler boom") + self._handled.append(message) + + def connect(self) -> None: # pragma: no cover - unused + pass + + def disconnect(self) -> None: # pragma: no cover - unused + pass + + def send(self, data: Any) -> None: # pragma: no cover - unused + pass + + +def _wait_until(predicate, timeout: float = 1.0) -> bool: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if predicate(): + return True + time.sleep(0.005) + return False + + +def test_receive_exception_does_not_kill_thread(): + """receive() raising must be logged-and-continued, not propagated.""" + t = _FakeTransport([ + OSError(84, "Invalid or incomplete multibyte or wide character"), + b"after-error", + ]) + t.start_receiving() + try: + assert _wait_until(lambda: b"after-error" in t._handled), ( + "thread died on exception instead of swallowing it" + ) + assert t._thread is not None and t._thread.is_alive() + finally: + t.stop_receiving() + + +def test_handle_message_exception_does_not_kill_thread(): + """A handler that raises also must not stop the loop.""" + t = _FakeTransport([b"first", b"second"]) + t._handle_raises = True + + seen = threading.Event() + original_handle = t._handle_message + + call_count = {"n": 0} + + def counting_handle(msg): + call_count["n"] += 1 + if call_count["n"] == 2: + seen.set() + original_handle(msg) + + t._handle_message = counting_handle # type: ignore[assignment] + t.start_receiving() + try: + assert seen.wait(timeout=1.0), "second message never reached handler" + assert t._thread is not None and t._thread.is_alive() + finally: + t.stop_receiving() + + +def test_stop_receiving_is_idempotent(): + """stop_receiving() must be safe to call after the thread already + exited or was never started.""" + t = _FakeTransport([]) + t.stop_receiving() # never started + t.start_receiving() + t.stop_receiving() + t.stop_receiving() # already stopped From 53b3375b2020f4c5c0194bf1f5fdd9d6ce4e8bb9 Mon Sep 17 00:00:00 2001 From: Adam Mitchell Date: Mon, 27 Apr 2026 15:47:05 +0000 Subject: [PATCH 2/3] r u f f --- tests/transport/test_receive_loop.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/transport/test_receive_loop.py b/tests/transport/test_receive_loop.py index 41ad50f..c10ffa6 100644 --- a/tests/transport/test_receive_loop.py +++ b/tests/transport/test_receive_loop.py @@ -11,7 +11,6 @@ import time from typing import Any, List -import pytest from python_thingset.transport.transport import ThingSetTransport From edfca63e53c3d5c305c5ee1abf268bbbea5b3c55 Mon Sep 17 00:00:00 2001 From: Adam Mitchell Date: Mon, 27 Apr 2026 15:49:19 +0000 Subject: [PATCH 3/3] Bump to 0.3.1 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c64431d..957011f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "python_thingset" -version = "0.3.0" +version = "0.3.1" description = "A Python library for ThingSet functionality" authors = [ { name = "Adam Mitchell", email = "adam.mitchell@brillpower.com" }