From a1f3a2aa439da8f3612ee12b7b5ff17d8a009739 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 13:50:20 +0300 Subject: [PATCH 1/9] Add comprehensive project documentation for stompman and faststream-stomp packages --- AGENTS.md | 157 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..dd3022e --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,157 @@ +# stompman Project Context + +## Project Overview + +stompman is a modern, asynchronous Python client for the STOMP (Simple Text Oriented Messaging Protocol) messaging protocol. It provides a typed, modern, and comprehensible API for working with STOMP-compatible message brokers like ActiveMQ Artemis and ActiveMQ Classic. + +The project consists of two main packages: +1. `stompman` - The core STOMP client library +2. `faststream-stomp` - A STOMP broker implementation for the FastStream framework + +## Key Features + +- Fully asynchronous implementation using Python's asyncio +- Modern, typed API with comprehensive type hints +- Automatic connection management with reconnection capabilities +- Support for transactions, subscriptions, and message acknowledgment +- Integration with FastStream framework +- Support for both ActiveMQ Artemis and ActiveMQ Classic +- Implements STOMP 1.2 protocol specification +- Built-in heartbeat support + +## Project Structure + +``` +stompman/ +├── packages/ +│ ├── stompman/ # Core STOMP client library +│ │ ├── stompman/ # Main source code +│ │ └── test_stompman/ # Unit and integration tests +│ └── faststream-stomp/ # FastStream integration +│ ├── faststream_stomp/ # Main source code +│ └── test_faststream_stomp/ # Tests +├── examples/ # Usage examples +├── docker-compose.yml # Development environment with ActiveMQ instances +└── Justfile # Task runner with common commands +``` + +## Core Components (stompman package) + +### Main Classes + +- `Client` - Main entry point for interacting with STOMP servers +- `ConnectionParameters` - Configuration for connecting to STOMP servers +- `Heartbeat` - Configuration for heartbeat intervals +- `Transaction` - Context manager for transactional message sending +- `AutoAckSubscription` - Subscription with automatic message acknowledgment +- `ManualAckSubscription` - Subscription with manual message acknowledgment + +### Key Methods + +- `Client.send()` - Send messages to destinations +- `Client.subscribe()` - Subscribe to destinations with auto-acknowledgment +- `Client.subscribe_with_manual_ack()` - Subscribe with manual acknowledgment +- `Client.begin()` - Start a transaction context +- `Client.is_alive()` - Check if the client connection is healthy + +## FastStream Integration (faststream-stomp package) + +Provides a `StompBroker` class that integrates with the FastStream framework, allowing developers to build event-driven applications with STOMP messaging. + +## Development Environment + +The project uses Docker Compose to provide a development environment with ActiveMQ instances: + +- ActiveMQ Artemis on port 9000 +- ActiveMQ Classic on port 9001 + +## Building and Running + +### Prerequisites + +- Python 3.11+ +- uv (package manager) +- Docker and Docker Compose (for development environment) + +### Common Commands + +Using Just (task runner): + +```bash +# Install dependencies +just install + +# Run linting +just lint + +# Check types +just check-types + +# Run fast tests (excluding integration tests) +just test-fast + +# Run all tests (requires Docker) +just test + +# Run integration environment +just run-artemis + +# Run example consumer +just run-consumer + +# Run example producer +just run-producer +``` + +Using uv directly: + +```bash +# Install dependencies +uv sync --all-extras --all-packages --frozen + +# Run linting +uv run ruff check . +uv run ruff format . + +# Check types +uv run mypy . + +# Run tests +uv run pytest +``` + +### Development Workflow + +1. Start the development environment: `docker compose up -d` +2. Install dependencies: `just install` +3. Run tests: `just test` +4. Make changes to the code +5. Run linting and type checking: `just lint && just check-types` + +## Testing + +The project uses pytest for testing with the following configuration: + +- Unit tests are located in `test_stompman/` directories +- Integration tests require running ActiveMQ instances +- Tests use both ActiveMQ Artemis and ActiveMQ Classic when possible +- Code coverage reporting is enabled by default + +## Coding Standards + +The project follows these coding standards: + +- Strict type checking with mypy +- Code formatting with ruff +- Comprehensive unit and integration tests +- Modern Python features (3.11+) +- Clean, readable, and well-documented code + +## Examples + +The `examples/` directory contains sample code showing how to: + +- Create a basic consumer +- Create a basic producer +- Use the FastStream integration +- Implement broadcast messaging patterns From cd9f8e06ca8979bbd840109321c5b0a6159ef7e0 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 14:35:21 +0300 Subject: [PATCH 2/9] Refactor documentation structure and improve clarity --- AGENTS.md | 169 +++++++++++++++++++++++++++++------------------------- 1 file changed, 92 insertions(+), 77 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index dd3022e..395859d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -6,7 +6,7 @@ stompman is a modern, asynchronous Python client for the STOMP (Simple Text Orie The project consists of two main packages: 1. `stompman` - The core STOMP client library -2. `faststream-stomp` - A STOMP broker implementation for the FastStream framework +2. `faststream-stomp` - A FastStream broker implementation for STOMP ## Key Features @@ -14,54 +14,60 @@ The project consists of two main packages: - Modern, typed API with comprehensive type hints - Automatic connection management with reconnection capabilities - Support for transactions, subscriptions, and message acknowledgment -- Integration with FastStream framework -- Support for both ActiveMQ Artemis and ActiveMQ Classic -- Implements STOMP 1.2 protocol specification -- Built-in heartbeat support +- Built-in heartbeat support for connection health monitoring +- Integration with FastStream for declarative message handling +- Compatible with STOMP 1.2 protocol specification +- Tested with ActiveMQ Artemis and ActiveMQ Classic ## Project Structure ``` stompman/ ├── packages/ -│ ├── stompman/ # Core STOMP client library -│ │ ├── stompman/ # Main source code -│ │ └── test_stompman/ # Unit and integration tests -│ └── faststream-stomp/ # FastStream integration +│ ├── stompman/ # Core STOMP client library +│ │ ├── stompman/ # Main source code +│ │ └── test_stompman/ # Unit and integration tests +│ └── faststream-stomp/ # FastStream broker implementation │ ├── faststream_stomp/ # Main source code -│ └── test_faststream_stomp/ # Tests -├── examples/ # Usage examples -├── docker-compose.yml # Development environment with ActiveMQ instances -└── Justfile # Task runner with common commands +│ └── test_faststream_stomp/ # Unit and integration tests +├── examples/ # Usage examples +├── docker-compose.yml # Development environment with ActiveMQ containers +└── Justfile # Project commands and workflows ``` ## Core Components (stompman package) ### Main Classes -- `Client` - Main entry point for interacting with STOMP servers +- `Client` - The main entry point for interacting with STOMP servers - `ConnectionParameters` - Configuration for connecting to STOMP servers -- `Heartbeat` - Configuration for heartbeat intervals -- `Transaction` - Context manager for transactional message sending -- `AutoAckSubscription` - Subscription with automatic message acknowledgment -- `ManualAckSubscription` - Subscription with manual message acknowledgment +- `Heartbeat` - Configuration for connection heartbeats ### Key Methods - `Client.send()` - Send messages to destinations -- `Client.subscribe()` - Subscribe to destinations with auto-acknowledgment -- `Client.subscribe_with_manual_ack()` - Subscribe with manual acknowledgment -- `Client.begin()` - Start a transaction context -- `Client.is_alive()` - Check if the client connection is healthy +- `Client.subscribe()` - Subscribe to destinations with automatic ACK/NACK handling +- `Client.subscribe_with_manual_ack()` - Subscribe with manual ACK/NACK control +- `Client.begin()` - Start a transaction context manager +- `Client.is_alive()` - Check connection health + +### Error Handling + +- `FailedAllConnectAttemptsError` - Raised when all connection attempts fail +- `FailedAllWriteAttemptsError` - Raised when writes fail after all retries +- Various other specific error types for different failure scenarios ## FastStream Integration (faststream-stomp package) -Provides a `StompBroker` class that integrates with the FastStream framework, allowing developers to build event-driven applications with STOMP messaging. +Provides a FastStream broker implementation that allows using FastStream's declarative approach with STOMP: -## Development Environment +- `StompBroker` - Main broker class +- Decorators for subscribers and publishers +- Testing utilities with `TestStompBroker` -The project uses Docker Compose to provide a development environment with ActiveMQ instances: +## Development Environment +The project uses Docker Compose to provide a development environment with: - ActiveMQ Artemis on port 9000 - ActiveMQ Classic on port 9001 @@ -69,89 +75,98 @@ The project uses Docker Compose to provide a development environment with Active ### Prerequisites -- Python 3.11+ +- Python 3.11 or newer - uv (package manager) - Docker and Docker Compose (for development environment) -### Common Commands - -Using Just (task runner): +### Setup ```bash # Install dependencies just install -# Run linting -just lint +# Or manually: +uv lock --upgrade +uv sync --all-extras --all-packages --frozen +``` -# Check types -just check-types +### Running Tests -# Run fast tests (excluding integration tests) +```bash +# Run fast tests (unit tests only) just test-fast -# Run all tests (requires Docker) +# Run all tests (including integration tests with Docker) just test -# Run integration environment -just run-artemis - -# Run example consumer -just run-consumer - -# Run example producer -just run-producer +# Run tests with specific arguments +just test-fast -k "test_specific_feature" ``` -Using uv directly: +### Code Quality ```bash -# Install dependencies -uv sync --all-extras --all-packages --frozen +# Run linters +just lint + +# Check types +just check-types -# Run linting -uv run ruff check . +# Format code uv run ruff format . +``` -# Check types -uv run mypy . +### Running Examples + +```bash +# Start ActiveMQ Artemis +just run-artemis + +# Run consumer example +just run-consumer -# Run tests -uv run pytest +# Run producer example +just run-producer ``` -### Development Workflow +## Development Conventions + +### Code Style -1. Start the development environment: `docker compose up -d` -2. Install dependencies: `just install` -3. Run tests: `just test` -4. Make changes to the code -5. Run linting and type checking: `just lint && just check-types` +- Strict adherence to type hints with mypy in strict mode +- Code formatting with ruff (line length 120) +- Comprehensive unit and integration tests +- Modern Python features (3.11+) encouraged -## Testing +### Testing -The project uses pytest for testing with the following configuration: +- Unit tests in `test_stompman/` directory +- Integration tests that require Docker containers +- Property-based testing with hypothesis +- Test coverage reporting enabled -- Unit tests are located in `test_stompman/` directories -- Integration tests require running ActiveMQ instances -- Tests use both ActiveMQ Artemis and ActiveMQ Classic when possible -- Code coverage reporting is enabled by default +### CI/CD -## Coding Standards +- Automated testing on multiple platforms +- Type checking and linting in CI pipeline +- Automated publishing to PyPI -The project follows these coding standards: +## Common Development Tasks -- Strict type checking with mypy -- Code formatting with ruff -- Comprehensive unit and integration tests -- Modern Python features (3.11+) -- Clean, readable, and well-documented code +1. **Adding a new feature**: + - Implement in the appropriate module under `stompman/` + - Add unit tests in `test_stompman/` + - Update documentation in docstrings and README if needed -## Examples +2. **Fixing a bug**: + - Write a failing test that reproduces the issue + - Fix the implementation + - Verify the test now passes -The `examples/` directory contains sample code showing how to: +3. **Updating dependencies**: + - Modify `pyproject.toml` files + - Run `uv lock --upgrade` to update lock files -- Create a basic consumer -- Create a basic producer -- Use the FastStream integration -- Implement broadcast messaging patterns +4. **Running integration tests**: + - Ensure Docker is running + - Run `just test` to start containers and run tests From 717e73536811db9ae55e04e808355d15200a9004 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 14:48:15 +0300 Subject: [PATCH 3/9] Add reconnection tracking to prevent stale ACK/NACK operations --- .../stompman/stompman/connection_manager.py | 2 + packages/stompman/stompman/subscription.py | 21 ++++ .../test_stompman/test_subscription.py | 106 ++++++++++++++++++ 3 files changed, 129 insertions(+) diff --git a/packages/stompman/stompman/connection_manager.py b/packages/stompman/stompman/connection_manager.py index 6f8ec25..1766c84 100644 --- a/packages/stompman/stompman/connection_manager.py +++ b/packages/stompman/stompman/connection_manager.py @@ -55,6 +55,7 @@ class ConnectionManager: _reconnect_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock) _task_group: asyncio.TaskGroup = field(init=False, default_factory=asyncio.TaskGroup) _send_heartbeat_task: asyncio.Task[None] = field(init=False, repr=False) + _reconnection_count: int = field(default=0, init=False) async def __aenter__(self) -> Self: await self._task_group.__aenter__() @@ -171,6 +172,7 @@ def _clear_active_connection_state(self, error_reason: ConnectionLostError) -> N self._active_connection_state.lifespan.connection_parameters, ) self._active_connection_state = None + self._reconnection_count += 1 async def write_heartbeat_reconnecting(self) -> None: for _ in range(self.write_retry_attempts): diff --git a/packages/stompman/stompman/subscription.py b/packages/stompman/stompman/subscription.py index 6d483d7..76eabb2 100644 --- a/packages/stompman/stompman/subscription.py +++ b/packages/stompman/stompman/subscription.py @@ -59,6 +59,7 @@ class BaseSubscription: ack: AckMode _connection_manager: ConnectionManager _active_subscriptions: ActiveSubscriptions + _subscription_reconnection_count: int = field(default=0, init=False) async def _subscribe(self) -> None: await self._connection_manager.write_frame_reconnecting( @@ -66,6 +67,8 @@ async def _subscribe(self) -> None: subscription_id=self.id, destination=self.destination, ack=self.ack, headers=self.headers ) ) + # Store the current reconnection count when subscribing + self._subscription_reconnection_count = self._connection_manager._reconnection_count self._active_subscriptions.add(self) # type: ignore[arg-type] async def unsubscribe(self) -> None: @@ -91,6 +94,15 @@ async def _nack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return + # Check if the connection is still the same as when the message was received + if self._subscription_reconnection_count != self._connection_manager._reconnection_count: + LOGGER.debug( + "skipping nack for message frame: connection changed since message was received. " + "message_id: %s, subscription_id: %s", + frame.headers["message-id"], + self.id, + ) + return await self._connection_manager.maybe_write_frame(NackFrame(headers={"id": ack_id, "subscription": self.id})) async def _ack(self, frame: MessageFrame) -> None: @@ -112,6 +124,15 @@ async def _ack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return + # Check if the connection is still the same as when the message was received + if self._subscription_reconnection_count != self._connection_manager._reconnection_count: + LOGGER.debug( + "skipping ack for message frame: connection changed since message was received. " + "message_id: %s, subscription_id: %s", + frame.headers["message-id"], + self.id, + ) + return await self._connection_manager.maybe_write_frame(AckFrame(headers={"id": ack_id, "subscription": self.id})) diff --git a/packages/stompman/test_stompman/test_subscription.py b/packages/stompman/test_stompman/test_subscription.py index ec4a133..4f1774c 100644 --- a/packages/stompman/test_stompman/test_subscription.py +++ b/packages/stompman/test_stompman/test_subscription.py @@ -1,4 +1,5 @@ import asyncio +import logging from functools import partial from typing import get_args from unittest import mock @@ -348,6 +349,111 @@ async def handle_message(message: stompman.subscription.AckableMessageFrame) -> async def test_client_listen_raises_on_aexit(monkeypatch: pytest.MonkeyPatch, faker: faker.Faker) -> None: monkeypatch.setattr("asyncio.sleep", partial(asyncio.sleep, 0)) + +async def test_subscription_skips_ack_nack_after_reconnection( + monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture +) -> None: + """Test that subscriptions skip ACK/NACK frames after a reconnection.""" + subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr() + monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) + + # Create a message frame + message_frame = build_dataclass( + MessageFrame, + headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id}, + ) + + # Create a mock subscription to test our logic directly + from stompman.connection_manager import ConnectionManager + from stompman.subscription import ActiveSubscriptions, ManualAckSubscription + + # Create mock connection manager with a reconnection count + connection_manager = mock.Mock(spec=ConnectionManager) + connection_manager._reconnection_count = 1 # Simulate a reconnection happened + connection_manager.maybe_write_frame = mock.AsyncMock(return_value=True) + + # Create active subscriptions + active_subscriptions = ActiveSubscriptions() + + # Create subscription + subscription = ManualAckSubscription( + destination=destination, + handler=mock.AsyncMock(), + ack="client-individual", + headers=None, + _connection_manager=connection_manager, + _active_subscriptions=active_subscriptions, + ) + + # Set the subscription reconnection count to 0 (before reconnection) + subscription._subscription_reconnection_count = 0 + + # Add subscription to active subscriptions + active_subscriptions.add(subscription) + + with caplog.at_level(logging.DEBUG, logger="stompman"): + # Try to send ACK - this should be skipped + await subscription._ack(message_frame) + # Try to send NACK - this should be skipped + await subscription._nack(message_frame) + + # Check that we logged the skipping messages + assert any("connection changed since message was received" in msg for msg in caplog.messages) + # Check that maybe_write_frame was never called + connection_manager.maybe_write_frame.assert_not_called() + + +async def test_subscription_sends_ack_nack_normally(monkeypatch: pytest.MonkeyPatch, faker: faker.Faker) -> None: + """Test that subscriptions send ACK/NACK frames normally when no reconnection happened.""" + subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr() + monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) + + # Create a message frame + message_frame = build_dataclass( + MessageFrame, + headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id}, + ) + + # Create a mock subscription to test our logic directly + from stompman.connection_manager import ConnectionManager + from stompman.subscription import ActiveSubscriptions, ManualAckSubscription + + # Create mock connection manager with a reconnection count + connection_manager = mock.Mock(spec=ConnectionManager) + connection_manager._reconnection_count = 0 # No reconnection + connection_manager.maybe_write_frame = mock.AsyncMock(return_value=True) + + # Create active subscriptions + active_subscriptions = ActiveSubscriptions() + + # Create subscription + subscription = ManualAckSubscription( + destination=destination, + handler=mock.AsyncMock(), + ack="client-individual", + headers=None, + _connection_manager=connection_manager, + _active_subscriptions=active_subscriptions, + ) + + # Set the subscription reconnection count to 0 (same as current) + subscription._subscription_reconnection_count = 0 + + # Add subscription to active subscriptions + active_subscriptions.add(subscription) + + # Try to send ACK - this should be sent normally + await subscription._ack(message_frame) + # Try to send NACK - this should be sent normally + await subscription._nack(message_frame) + + # Check that maybe_write_frame was called for both ACK and NACK + assert connection_manager.maybe_write_frame.call_count == 2 + # Check that the calls were for AckFrame and NackFrame + calls = connection_manager.maybe_write_frame.call_args_list + assert isinstance(calls[0][0][0], AckFrame) + assert isinstance(calls[1][0][0], NackFrame) + connection_class, _ = create_spying_connection(*get_read_frames_with_lifespan([])) connection_class.connect = mock.AsyncMock(side_effect=[connection_class(), None, None, None]) # type: ignore[method-assign] From ce3b7a9b0fe71e45513821ee1a4cab4616e855f0 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 15:24:13 +0300 Subject: [PATCH 4/9] Remove redundant subscription reconnection count initialization and improve logging context for ack/nack operations. --- packages/stompman/stompman/subscription.py | 13 +-- .../test_stompman/test_subscription.py | 100 +++++------------- 2 files changed, 31 insertions(+), 82 deletions(-) diff --git a/packages/stompman/stompman/subscription.py b/packages/stompman/stompman/subscription.py index 76eabb2..27a2533 100644 --- a/packages/stompman/stompman/subscription.py +++ b/packages/stompman/stompman/subscription.py @@ -59,7 +59,7 @@ class BaseSubscription: ack: AckMode _connection_manager: ConnectionManager _active_subscriptions: ActiveSubscriptions - _subscription_reconnection_count: int = field(default=0, init=False) + _subscription_reconnection_count: int = field(init=False) async def _subscribe(self) -> None: await self._connection_manager.write_frame_reconnecting( @@ -67,7 +67,6 @@ async def _subscribe(self) -> None: subscription_id=self.id, destination=self.destination, ack=self.ack, headers=self.headers ) ) - # Store the current reconnection count when subscribing self._subscription_reconnection_count = self._connection_manager._reconnection_count self._active_subscriptions.add(self) # type: ignore[arg-type] @@ -94,13 +93,14 @@ async def _nack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return - # Check if the connection is still the same as when the message was received if self._subscription_reconnection_count != self._connection_manager._reconnection_count: LOGGER.debug( "skipping nack for message frame: connection changed since message was received. " - "message_id: %s, subscription_id: %s", + "message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s", frame.headers["message-id"], self.id, + self._subscription_reconnection_count, + self._connection_manager._reconnection_count, ) return await self._connection_manager.maybe_write_frame(NackFrame(headers={"id": ack_id, "subscription": self.id})) @@ -124,13 +124,14 @@ async def _ack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return - # Check if the connection is still the same as when the message was received if self._subscription_reconnection_count != self._connection_manager._reconnection_count: LOGGER.debug( "skipping ack for message frame: connection changed since message was received. " - "message_id: %s, subscription_id: %s", + "message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s", frame.headers["message-id"], self.id, + self._subscription_reconnection_count, + self._connection_manager._reconnection_count, ) return await self._connection_manager.maybe_write_frame(AckFrame(headers={"id": ack_id, "subscription": self.id})) diff --git a/packages/stompman/test_stompman/test_subscription.py b/packages/stompman/test_stompman/test_subscription.py index 4f1774c..70d8f67 100644 --- a/packages/stompman/test_stompman/test_subscription.py +++ b/packages/stompman/test_stompman/test_subscription.py @@ -349,6 +349,30 @@ async def handle_message(message: stompman.subscription.AckableMessageFrame) -> async def test_client_listen_raises_on_aexit(monkeypatch: pytest.MonkeyPatch, faker: faker.Faker) -> None: monkeypatch.setattr("asyncio.sleep", partial(asyncio.sleep, 0)) + connection_class, _ = create_spying_connection(*get_read_frames_with_lifespan([])) + connection_class.connect = mock.AsyncMock(side_effect=[connection_class(), None, None, None]) # type: ignore[method-assign] + + async def close_connection_soon(client: stompman.Client) -> None: + await asyncio.sleep(0) + client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError)) + + with pytest.raises(ExceptionGroup) as exc_info: # noqa: PT012 + async with asyncio.TaskGroup() as task_group, EnrichedClient(connection_class=connection_class) as client: + await client.subscribe(faker.pystr(), noop_message_handler, on_suppressed_exception=noop_error_handler) + task_group.create_task(close_connection_soon(client)) + + assert len(exc_info.value.exceptions) == 1 + inner_group = exc_info.value.exceptions[0] + + assert isinstance(inner_group, ExceptionGroup) + assert len(inner_group.exceptions) == 1 + + inner_inner_group = inner_group.exceptions[0] + assert isinstance(inner_inner_group, ExceptionGroup) + assert len(inner_inner_group.exceptions) == 1 + + assert isinstance(inner_inner_group.exceptions[0], FailedAllConnectAttemptsError) + async def test_subscription_skips_ack_nack_after_reconnection( monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture @@ -403,82 +427,6 @@ async def test_subscription_skips_ack_nack_after_reconnection( connection_manager.maybe_write_frame.assert_not_called() -async def test_subscription_sends_ack_nack_normally(monkeypatch: pytest.MonkeyPatch, faker: faker.Faker) -> None: - """Test that subscriptions send ACK/NACK frames normally when no reconnection happened.""" - subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr() - monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) - - # Create a message frame - message_frame = build_dataclass( - MessageFrame, - headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id}, - ) - - # Create a mock subscription to test our logic directly - from stompman.connection_manager import ConnectionManager - from stompman.subscription import ActiveSubscriptions, ManualAckSubscription - - # Create mock connection manager with a reconnection count - connection_manager = mock.Mock(spec=ConnectionManager) - connection_manager._reconnection_count = 0 # No reconnection - connection_manager.maybe_write_frame = mock.AsyncMock(return_value=True) - - # Create active subscriptions - active_subscriptions = ActiveSubscriptions() - - # Create subscription - subscription = ManualAckSubscription( - destination=destination, - handler=mock.AsyncMock(), - ack="client-individual", - headers=None, - _connection_manager=connection_manager, - _active_subscriptions=active_subscriptions, - ) - - # Set the subscription reconnection count to 0 (same as current) - subscription._subscription_reconnection_count = 0 - - # Add subscription to active subscriptions - active_subscriptions.add(subscription) - - # Try to send ACK - this should be sent normally - await subscription._ack(message_frame) - # Try to send NACK - this should be sent normally - await subscription._nack(message_frame) - - # Check that maybe_write_frame was called for both ACK and NACK - assert connection_manager.maybe_write_frame.call_count == 2 - # Check that the calls were for AckFrame and NackFrame - calls = connection_manager.maybe_write_frame.call_args_list - assert isinstance(calls[0][0][0], AckFrame) - assert isinstance(calls[1][0][0], NackFrame) - - connection_class, _ = create_spying_connection(*get_read_frames_with_lifespan([])) - connection_class.connect = mock.AsyncMock(side_effect=[connection_class(), None, None, None]) # type: ignore[method-assign] - - async def close_connection_soon(client: stompman.Client) -> None: - await asyncio.sleep(0) - client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError)) - - with pytest.raises(ExceptionGroup) as exc_info: # noqa: PT012 - async with asyncio.TaskGroup() as task_group, EnrichedClient(connection_class=connection_class) as client: - await client.subscribe(faker.pystr(), noop_message_handler, on_suppressed_exception=noop_error_handler) - task_group.create_task(close_connection_soon(client)) - - assert len(exc_info.value.exceptions) == 1 - inner_group = exc_info.value.exceptions[0] - - assert isinstance(inner_group, ExceptionGroup) - assert len(inner_group.exceptions) == 1 - - inner_inner_group = inner_group.exceptions[0] - assert isinstance(inner_inner_group, ExceptionGroup) - assert len(inner_inner_group.exceptions) == 1 - - assert isinstance(inner_inner_group.exceptions[0], FailedAllConnectAttemptsError) - - def test_make_subscription_id() -> None: stompman.subscription._make_subscription_id() From c2e71ba2a7da48399b90547c08c0b293423bb8f8 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 15:24:25 +0300 Subject: [PATCH 5/9] Rename subscription reconnection count variable for clarity --- packages/stompman/stompman/subscription.py | 12 ++++++------ packages/stompman/test_stompman/test_subscription.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/stompman/stompman/subscription.py b/packages/stompman/stompman/subscription.py index 27a2533..14c9268 100644 --- a/packages/stompman/stompman/subscription.py +++ b/packages/stompman/stompman/subscription.py @@ -59,7 +59,7 @@ class BaseSubscription: ack: AckMode _connection_manager: ConnectionManager _active_subscriptions: ActiveSubscriptions - _subscription_reconnection_count: int = field(init=False) + _bound_reconnection_count: int = field(init=False) async def _subscribe(self) -> None: await self._connection_manager.write_frame_reconnecting( @@ -67,7 +67,7 @@ async def _subscribe(self) -> None: subscription_id=self.id, destination=self.destination, ack=self.ack, headers=self.headers ) ) - self._subscription_reconnection_count = self._connection_manager._reconnection_count + self._bound_reconnection_count = self._connection_manager._reconnection_count self._active_subscriptions.add(self) # type: ignore[arg-type] async def unsubscribe(self) -> None: @@ -93,13 +93,13 @@ async def _nack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return - if self._subscription_reconnection_count != self._connection_manager._reconnection_count: + if self._bound_reconnection_count != self._connection_manager._reconnection_count: LOGGER.debug( "skipping nack for message frame: connection changed since message was received. " "message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s", frame.headers["message-id"], self.id, - self._subscription_reconnection_count, + self._bound_reconnection_count, self._connection_manager._reconnection_count, ) return @@ -124,13 +124,13 @@ async def _ack(self, frame: MessageFrame) -> None: frame.headers.keys(), ) return - if self._subscription_reconnection_count != self._connection_manager._reconnection_count: + if self._bound_reconnection_count != self._connection_manager._reconnection_count: LOGGER.debug( "skipping ack for message frame: connection changed since message was received. " "message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s", frame.headers["message-id"], self.id, - self._subscription_reconnection_count, + self._bound_reconnection_count, self._connection_manager._reconnection_count, ) return diff --git a/packages/stompman/test_stompman/test_subscription.py b/packages/stompman/test_stompman/test_subscription.py index 70d8f67..e9006a0 100644 --- a/packages/stompman/test_stompman/test_subscription.py +++ b/packages/stompman/test_stompman/test_subscription.py @@ -410,7 +410,7 @@ async def test_subscription_skips_ack_nack_after_reconnection( ) # Set the subscription reconnection count to 0 (before reconnection) - subscription._subscription_reconnection_count = 0 + subscription._bound_reconnection_count = 0 # Add subscription to active subscriptions active_subscriptions.add(subscription) From 263b87e4f3ff6105e3d1ffb798460f52c5baa788 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 15:26:18 +0300 Subject: [PATCH 6/9] Refactor subscription tests and imports --- .../test_stompman/test_subscription.py | 33 ++++--------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/packages/stompman/test_stompman/test_subscription.py b/packages/stompman/test_stompman/test_subscription.py index e9006a0..2d480f4 100644 --- a/packages/stompman/test_stompman/test_subscription.py +++ b/packages/stompman/test_stompman/test_subscription.py @@ -11,9 +11,11 @@ AckFrame, AckMode, ConnectedFrame, + ConnectionLostError, ErrorFrame, FailedAllConnectAttemptsError, HeartbeatFrame, + ManualAckSubscription, MessageFrame, NackFrame, ReceiptFrame, @@ -21,7 +23,8 @@ SubscribeFrame, UnsubscribeFrame, ) -from stompman.errors import ConnectionLostError +from stompman.connection_manager import ConnectionManager +from stompman.subscription import ActiveSubscriptions from test_stompman.conftest import ( CONNECT_FRAME, @@ -377,29 +380,16 @@ async def close_connection_soon(client: stompman.Client) -> None: async def test_subscription_skips_ack_nack_after_reconnection( monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture ) -> None: - """Test that subscriptions skip ACK/NACK frames after a reconnection.""" subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr() monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) - - # Create a message frame message_frame = build_dataclass( MessageFrame, headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id}, ) - - # Create a mock subscription to test our logic directly - from stompman.connection_manager import ConnectionManager - from stompman.subscription import ActiveSubscriptions, ManualAckSubscription - - # Create mock connection manager with a reconnection count - connection_manager = mock.Mock(spec=ConnectionManager) - connection_manager._reconnection_count = 1 # Simulate a reconnection happened - connection_manager.maybe_write_frame = mock.AsyncMock(return_value=True) - - # Create active subscriptions + connection_manager = mock.Mock( + spec=ConnectionManager, _reconnection_count=1, maybe_write_frame=mock.AsyncMock(return_value=True) + ) active_subscriptions = ActiveSubscriptions() - - # Create subscription subscription = ManualAckSubscription( destination=destination, handler=mock.AsyncMock(), @@ -408,22 +398,13 @@ async def test_subscription_skips_ack_nack_after_reconnection( _connection_manager=connection_manager, _active_subscriptions=active_subscriptions, ) - - # Set the subscription reconnection count to 0 (before reconnection) - subscription._bound_reconnection_count = 0 - - # Add subscription to active subscriptions active_subscriptions.add(subscription) with caplog.at_level(logging.DEBUG, logger="stompman"): - # Try to send ACK - this should be skipped await subscription._ack(message_frame) - # Try to send NACK - this should be skipped await subscription._nack(message_frame) - # Check that we logged the skipping messages assert any("connection changed since message was received" in msg for msg in caplog.messages) - # Check that maybe_write_frame was never called connection_manager.maybe_write_frame.assert_not_called() From 20801b4aa4aa4f54489e44e77d486d01dad9442e Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 15:27:38 +0300 Subject: [PATCH 7/9] Add log_level parameter to error log call in integration test --- .../faststream-stomp/test_faststream_stomp/test_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/faststream-stomp/test_faststream_stomp/test_integration.py b/packages/faststream-stomp/test_faststream_stomp/test_integration.py index 8fc3ac6..2bda3a1 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_integration.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_integration.py @@ -226,7 +226,7 @@ def handle_destination(message_frame: Annotated[stompman.MessageFrame, Context(" extra = {"destination": destination, "message_id": message_id} assert log_mock.mock_calls[-3:] == [ mock.call("Received", extra=extra), - mock.call(message="MyError: ", extra=extra, exc_info=MyError()), + mock.call(message="MyError: ", extra=extra, exc_info=MyError(), log_level=logging.ERROR), mock.call(message="Processed", extra=extra), ] From b2c6c79f5fa1efa257a4c88e00b9cd283db16550 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 15:34:11 +0300 Subject: [PATCH 8/9] Refactor subscription tests and fix reconnection logic --- .../test_stompman/test_subscription.py | 67 +++++++++++++------ 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/packages/stompman/test_stompman/test_subscription.py b/packages/stompman/test_stompman/test_subscription.py index 2d480f4..f1f1bec 100644 --- a/packages/stompman/test_stompman/test_subscription.py +++ b/packages/stompman/test_stompman/test_subscription.py @@ -15,7 +15,6 @@ ErrorFrame, FailedAllConnectAttemptsError, HeartbeatFrame, - ManualAckSubscription, MessageFrame, NackFrame, ReceiptFrame, @@ -23,8 +22,6 @@ SubscribeFrame, UnsubscribeFrame, ) -from stompman.connection_manager import ConnectionManager -from stompman.subscription import ActiveSubscriptions from test_stompman.conftest import ( CONNECT_FRAME, @@ -380,32 +377,60 @@ async def close_connection_soon(client: stompman.Client) -> None: async def test_subscription_skips_ack_nack_after_reconnection( monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture ) -> None: + """Test that subscriptions skip ACK/NACK frames after a reconnection using a real client.""" subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr() monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) + + # Create a message frame as if it was received before reconnection message_frame = build_dataclass( MessageFrame, headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id}, ) - connection_manager = mock.Mock( - spec=ConnectionManager, _reconnection_count=1, maybe_write_frame=mock.AsyncMock(return_value=True) - ) - active_subscriptions = ActiveSubscriptions() - subscription = ManualAckSubscription( - destination=destination, - handler=mock.AsyncMock(), - ack="client-individual", - headers=None, - _connection_manager=connection_manager, - _active_subscriptions=active_subscriptions, - ) - active_subscriptions.add(subscription) - with caplog.at_level(logging.DEBUG, logger="stompman"): - await subscription._ack(message_frame) - await subscription._nack(message_frame) + # Use a spying connection to capture frames + connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame])) + + # Track if ack/nack frames were sent + stored_message = None + + async def track_ack_nack_frames(message: stompman.subscription.AckableMessageFrame) -> None: + # Store reference to the message for later ACK/NACK after reconnection + nonlocal stored_message + stored_message = message + # Don't send ack/nack here, we want to test sending them after reconnection + # Add a small await to make this a proper async function + await asyncio.sleep(0) + + async with EnrichedClient(connection_class=connection_class) as client: + # Subscribe to create a subscription with _subscription_reconnection_count = 0 + subscription = await client.subscribe_with_manual_ack(destination, track_ack_nack_frames) + await asyncio.sleep(0) # Allow message processing + + # Simulate connection loss/reconnection by clearing active connection state + # This will increment _reconnection_count to 1 + client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError)) + + # Wait for reconnection + await asyncio.sleep(0) + + # Try to send ACK/NACK after reconnection - these should be skipped + with caplog.at_level(logging.DEBUG, logger="stompman"): + assert stored_message + await stored_message.ack() + await stored_message.nack() - assert any("connection changed since message was received" in msg for msg in caplog.messages) - connection_manager.maybe_write_frame.assert_not_called() + await subscription.unsubscribe() + + # Verify that no ACK or NACK frames were sent + ack_frames = [frame for frame in collected_frames if isinstance(frame, AckFrame)] + nack_frames = [frame for frame in collected_frames if isinstance(frame, NackFrame)] + assert len(ack_frames) == 0, "No ACK frames should be sent after reconnection" + assert len(nack_frames) == 0, "No NACK frames should be sent after reconnection" + + # Verify that we logged the skipping messages + assert any("connection changed since message was received" in msg.lower() for msg in caplog.messages), ( + "Should log message about connection change" + ) def test_make_subscription_id() -> None: From 4380c7bd46ae8c7002e631036f6de6d008ea86d9 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Tue, 21 Oct 2025 15:35:48 +0300 Subject: [PATCH 9/9] Simplify test_subscription_skips_ack_nack_after_reconnection logic and assertions --- .../test_stompman/test_subscription.py | 32 +++---------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/packages/stompman/test_stompman/test_subscription.py b/packages/stompman/test_stompman/test_subscription.py index f1f1bec..488d114 100644 --- a/packages/stompman/test_stompman/test_subscription.py +++ b/packages/stompman/test_stompman/test_subscription.py @@ -377,43 +377,26 @@ async def close_connection_soon(client: stompman.Client) -> None: async def test_subscription_skips_ack_nack_after_reconnection( monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture ) -> None: - """Test that subscriptions skip ACK/NACK frames after a reconnection using a real client.""" subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr() monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id)) - - # Create a message frame as if it was received before reconnection message_frame = build_dataclass( MessageFrame, headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id}, ) - - # Use a spying connection to capture frames connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame])) - - # Track if ack/nack frames were sent stored_message = None async def track_ack_nack_frames(message: stompman.subscription.AckableMessageFrame) -> None: - # Store reference to the message for later ACK/NACK after reconnection nonlocal stored_message stored_message = message - # Don't send ack/nack here, we want to test sending them after reconnection - # Add a small await to make this a proper async function await asyncio.sleep(0) async with EnrichedClient(connection_class=connection_class) as client: - # Subscribe to create a subscription with _subscription_reconnection_count = 0 subscription = await client.subscribe_with_manual_ack(destination, track_ack_nack_frames) - await asyncio.sleep(0) # Allow message processing - - # Simulate connection loss/reconnection by clearing active connection state - # This will increment _reconnection_count to 1 + await asyncio.sleep(0) client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError)) - - # Wait for reconnection await asyncio.sleep(0) - # Try to send ACK/NACK after reconnection - these should be skipped with caplog.at_level(logging.DEBUG, logger="stompman"): assert stored_message await stored_message.ack() @@ -421,15 +404,10 @@ async def track_ack_nack_frames(message: stompman.subscription.AckableMessageFra await subscription.unsubscribe() - # Verify that no ACK or NACK frames were sent - ack_frames = [frame for frame in collected_frames if isinstance(frame, AckFrame)] - nack_frames = [frame for frame in collected_frames if isinstance(frame, NackFrame)] - assert len(ack_frames) == 0, "No ACK frames should be sent after reconnection" - assert len(nack_frames) == 0, "No NACK frames should be sent after reconnection" - - # Verify that we logged the skipping messages - assert any("connection changed since message was received" in msg.lower() for msg in caplog.messages), ( - "Should log message about connection change" + assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, AckFrame)] + assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, NackFrame)] + assert any( + "connection changed since message was received" in one_message.lower() for one_message in caplog.messages )