Skip to content
Merged
172 changes: 172 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# stompman Project Context

## Project Overview

stompman is a modern, asynchronous Python client for the STOMP (Simple Text Oriented Messaging Protocol) messaging protocol. It provides a typed, modern, and comprehensible API for working with STOMP-compatible message brokers like ActiveMQ Artemis and ActiveMQ Classic.

The project consists of two main packages:
1. `stompman` - The core STOMP client library
2. `faststream-stomp` - A FastStream broker implementation for STOMP

## Key Features

- Fully asynchronous implementation using Python's asyncio
- Modern, typed API with comprehensive type hints
- Automatic connection management with reconnection capabilities
- Support for transactions, subscriptions, and message acknowledgment
- Built-in heartbeat support for connection health monitoring
- Integration with FastStream for declarative message handling
- Compatible with STOMP 1.2 protocol specification
- Tested with ActiveMQ Artemis and ActiveMQ Classic

## Project Structure

```
stompman/
├── packages/
│ ├── stompman/ # Core STOMP client library
│ │ ├── stompman/ # Main source code
│ │ └── test_stompman/ # Unit and integration tests
│ └── faststream-stomp/ # FastStream broker implementation
│ ├── faststream_stomp/ # Main source code
│ └── test_faststream_stomp/ # Unit and integration tests
├── examples/ # Usage examples
├── docker-compose.yml # Development environment with ActiveMQ containers
└── Justfile # Project commands and workflows
```

## Core Components (stompman package)

### Main Classes

- `Client` - The main entry point for interacting with STOMP servers
- `ConnectionParameters` - Configuration for connecting to STOMP servers
- `Heartbeat` - Configuration for connection heartbeats

### Key Methods

- `Client.send()` - Send messages to destinations
- `Client.subscribe()` - Subscribe to destinations with automatic ACK/NACK handling
- `Client.subscribe_with_manual_ack()` - Subscribe with manual ACK/NACK control
- `Client.begin()` - Start a transaction context manager
- `Client.is_alive()` - Check connection health

### Error Handling

- `FailedAllConnectAttemptsError` - Raised when all connection attempts fail
- `FailedAllWriteAttemptsError` - Raised when writes fail after all retries
- Various other specific error types for different failure scenarios

## FastStream Integration (faststream-stomp package)

Provides a FastStream broker implementation that allows using FastStream's declarative approach with STOMP:

- `StompBroker` - Main broker class
- Decorators for subscribers and publishers
- Testing utilities with `TestStompBroker`

## Development Environment

The project uses Docker Compose to provide a development environment with:
- ActiveMQ Artemis on port 9000
- ActiveMQ Classic on port 9001

## Building and Running

### Prerequisites

- Python 3.11 or newer
- uv (package manager)
- Docker and Docker Compose (for development environment)

### Setup

```bash
# Install dependencies
just install

# Or manually:
uv lock --upgrade
uv sync --all-extras --all-packages --frozen
```

### Running Tests

```bash
# Run fast tests (unit tests only)
just test-fast

# Run all tests (including integration tests with Docker)
just test

# Run tests with specific arguments
just test-fast -k "test_specific_feature"
```

### Code Quality

```bash
# Run linters
just lint

# Check types
just check-types

# Format code
uv run ruff format .
```

### Running Examples

```bash
# Start ActiveMQ Artemis
just run-artemis

# Run consumer example
just run-consumer

# Run producer example
just run-producer
```

## Development Conventions

### Code Style

- Strict adherence to type hints with mypy in strict mode
- Code formatting with ruff (line length 120)
- Comprehensive unit and integration tests
- Modern Python features (3.11+) encouraged

### Testing

- Unit tests in `test_stompman/` directory
- Integration tests that require Docker containers
- Property-based testing with hypothesis
- Test coverage reporting enabled

### CI/CD

- Automated testing on multiple platforms
- Type checking and linting in CI pipeline
- Automated publishing to PyPI

## Common Development Tasks

1. **Adding a new feature**:
- Implement in the appropriate module under `stompman/`
- Add unit tests in `test_stompman/`
- Update documentation in docstrings and README if needed

2. **Fixing a bug**:
- Write a failing test that reproduces the issue
- Fix the implementation
- Verify the test now passes

3. **Updating dependencies**:
- Modify `pyproject.toml` files
- Run `uv lock --upgrade` to update lock files

4. **Running integration tests**:
- Ensure Docker is running
- Run `just test` to start containers and run tests
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def handle_destination(message_frame: Annotated[stompman.MessageFrame, Context("
extra = {"destination": destination, "message_id": message_id}
assert log_mock.mock_calls[-3:] == [
mock.call("Received", extra=extra),
mock.call(message="MyError: ", extra=extra, exc_info=MyError()),
mock.call(message="MyError: ", extra=extra, exc_info=MyError(), log_level=logging.ERROR),
mock.call(message="Processed", extra=extra),
]

Expand Down
2 changes: 2 additions & 0 deletions packages/stompman/stompman/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ConnectionManager:
_reconnect_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
_task_group: asyncio.TaskGroup = field(init=False, default_factory=asyncio.TaskGroup)
_send_heartbeat_task: asyncio.Task[None] = field(init=False, repr=False)
_reconnection_count: int = field(default=0, init=False)

async def __aenter__(self) -> Self:
await self._task_group.__aenter__()
Expand Down Expand Up @@ -171,6 +172,7 @@ def _clear_active_connection_state(self, error_reason: ConnectionLostError) -> N
self._active_connection_state.lifespan.connection_parameters,
)
self._active_connection_state = None
self._reconnection_count += 1

async def write_heartbeat_reconnecting(self) -> None:
for _ in range(self.write_retry_attempts):
Expand Down
22 changes: 22 additions & 0 deletions packages/stompman/stompman/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ class BaseSubscription:
ack: AckMode
_connection_manager: ConnectionManager
_active_subscriptions: ActiveSubscriptions
_bound_reconnection_count: int = field(init=False)

async def _subscribe(self) -> None:
await self._connection_manager.write_frame_reconnecting(
SubscribeFrame.build(
subscription_id=self.id, destination=self.destination, ack=self.ack, headers=self.headers
)
)
self._bound_reconnection_count = self._connection_manager._reconnection_count
self._active_subscriptions.add(self) # type: ignore[arg-type]

async def unsubscribe(self) -> None:
Expand All @@ -91,6 +93,16 @@ async def _nack(self, frame: MessageFrame) -> None:
frame.headers.keys(),
)
return
if self._bound_reconnection_count != self._connection_manager._reconnection_count:
LOGGER.debug(
"skipping nack for message frame: connection changed since message was received. "
"message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s",
frame.headers["message-id"],
self.id,
self._bound_reconnection_count,
self._connection_manager._reconnection_count,
)
return
await self._connection_manager.maybe_write_frame(NackFrame(headers={"id": ack_id, "subscription": self.id}))

async def _ack(self, frame: MessageFrame) -> None:
Expand All @@ -112,6 +124,16 @@ async def _ack(self, frame: MessageFrame) -> None:
frame.headers.keys(),
)
return
if self._bound_reconnection_count != self._connection_manager._reconnection_count:
LOGGER.debug(
"skipping ack for message frame: connection changed since message was received. "
"message_id: %s, subscription_id: %s, bound_reconnection_count: %s, current_reconnection_count: %s",
frame.headers["message-id"],
self.id,
self._bound_reconnection_count,
self._connection_manager._reconnection_count,
)
return
await self._connection_manager.maybe_write_frame(AckFrame(headers={"id": ack_id, "subscription": self.id}))


Expand Down
40 changes: 39 additions & 1 deletion packages/stompman/test_stompman/test_subscription.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
from functools import partial
from typing import get_args
from unittest import mock
Expand All @@ -10,6 +11,7 @@
AckFrame,
AckMode,
ConnectedFrame,
ConnectionLostError,
ErrorFrame,
FailedAllConnectAttemptsError,
HeartbeatFrame,
Expand All @@ -20,7 +22,6 @@
SubscribeFrame,
UnsubscribeFrame,
)
from stompman.errors import ConnectionLostError

from test_stompman.conftest import (
CONNECT_FRAME,
Expand Down Expand Up @@ -373,6 +374,43 @@ async def close_connection_soon(client: stompman.Client) -> None:
assert isinstance(inner_inner_group.exceptions[0], FailedAllConnectAttemptsError)


async def test_subscription_skips_ack_nack_after_reconnection(
monkeypatch: pytest.MonkeyPatch, faker: faker.Faker, caplog: pytest.LogCaptureFixture
) -> None:
subscription_id, destination, message_id, ack_id = faker.pystr(), faker.pystr(), faker.pystr(), faker.pystr()
monkeypatch.setattr(stompman.subscription, "_make_subscription_id", mock.Mock(return_value=subscription_id))
message_frame = build_dataclass(
MessageFrame,
headers={"destination": destination, "message-id": message_id, "subscription": subscription_id, "ack": ack_id},
)
connection_class, collected_frames = create_spying_connection(*get_read_frames_with_lifespan([message_frame]))
stored_message = None

async def track_ack_nack_frames(message: stompman.subscription.AckableMessageFrame) -> None:
nonlocal stored_message
stored_message = message
await asyncio.sleep(0)

async with EnrichedClient(connection_class=connection_class) as client:
subscription = await client.subscribe_with_manual_ack(destination, track_ack_nack_frames)
await asyncio.sleep(0)
client._connection_manager._clear_active_connection_state(build_dataclass(ConnectionLostError))
await asyncio.sleep(0)

with caplog.at_level(logging.DEBUG, logger="stompman"):
assert stored_message
await stored_message.ack()
await stored_message.nack()

await subscription.unsubscribe()

assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, AckFrame)]
assert not [one_frame for one_frame in collected_frames if isinstance(one_frame, NackFrame)]
assert any(
"connection changed since message was received" in one_message.lower() for one_message in caplog.messages
)


def test_make_subscription_id() -> None:
stompman.subscription._make_subscription_id()

Expand Down