Conversation
Coverage Report
96 files skipped due to complete coverage. |
Deploying repid with
|
| Latest commit: |
aa164a1
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://b766a750.repid.pages.dev |
| Branch Preview URL: | https://kafka.repid.pages.dev |
There was a problem hiding this comment.
Pull request overview
Adds a Kafka (Redpanda-backed) server implementation to Repid to support publish/subscribe and message acknowledgements via the existing Repid server APIs (Fix #98).
Changes:
- Introduces
KafkaServer,KafkaSubscriber, andKafkaReceivedMessageimplementations usingaiokafka. - Adds Redpanda-based integration fixtures and Kafka-specific integration tests covering ack/nack/reject/reply and subscriber lifecycle.
- Adds an optional
kafkaextra (aiokafka) and updates mypy/pre-commit configuration to accommodate the new dependency.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
repid/connections/kafka/message_broker.py |
New Kafka server implementation (publish/subscribe, connection lifecycle). |
repid/connections/kafka/subscriber.py |
New consumer/subscriber loop with concurrency limiting and manual commits. |
repid/connections/kafka/message.py |
New received-message wrapper implementing ack/nack/reject/reply semantics. |
repid/connections/kafka/__init__.py |
Exposes KafkaServer from the kafka package. |
repid/connections/__init__.py |
Conditionally exports KafkaServer when aiokafka is installed. |
tests/integration/conftest.py |
Adds Redpanda container + kafka_connection fixture; includes Kafka in integration parametrization. |
tests/integration/test_kafka_specific.py |
Adds Kafka-specific integration tests for message actions and subscriber lifecycle. |
pyproject.toml |
Adds kafka optional dependency and mypy override for aiokafka.*. |
.pre-commit-config.yaml |
Adjusts mypy pre-commit entry invocation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
a768109 to
39402c3
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| await asyncio.wait_for(event.wait(), timeout=10.0) | ||
| await subscriber.close() | ||
|
|
||
| assert received_msg is not None | ||
| assert received_msg.payload == b"reject_payload" | ||
| assert received_msg.content_type == "text/plain" | ||
|
|
||
| await received_msg.reject() |
There was a problem hiding this comment.
In this test the subscriber is closed before calling received_msg.reject(). KafkaReceivedMessage.reject() ultimately calls the subscriber’s consumer.commit(...) via the mark-complete callback, but KafkaSubscriber.close() stops the consumer, so committing after close() is likely to fail/flap. Keep the subscriber open until after the message has been acted on (reject/nack/reply), or adjust the Kafka implementation so message actions can still commit after subscriber shutdown.
| await asyncio.wait_for(event.wait(), timeout=10.0) | |
| await subscriber.close() | |
| assert received_msg is not None | |
| assert received_msg.payload == b"reject_payload" | |
| assert received_msg.content_type == "text/plain" | |
| await received_msg.reject() | |
| try: | |
| await asyncio.wait_for(event.wait(), timeout=10.0) | |
| assert received_msg is not None | |
| assert received_msg.payload == b"reject_payload" | |
| assert received_msg.content_type == "text/plain" | |
| await received_msg.reject() | |
| finally: | |
| await subscriber.close() |
| await subscriber.close() | ||
|
|
||
| assert received_msg is not None | ||
|
|
||
| assert received_msg.content_type == "application/json" | ||
| await received_msg.nack() |
There was a problem hiding this comment.
Same issue as above: the subscriber is closed before received_msg.nack() is called. Since nack commits the offset via the consumer, calling it after subscriber.close() (which stops the consumer) is likely to fail/flap. Move await subscriber.close() to after the nack (and any follow-up assertions that require the action), or change the Kafka implementation to allow committing after close.
| await subscriber.close() | |
| assert received_msg is not None | |
| assert received_msg.content_type == "application/json" | |
| await received_msg.nack() | |
| assert received_msg is not None | |
| assert received_msg.content_type == "application/json" | |
| await received_msg.nack() | |
| await subscriber.close() |
| await asyncio.wait_for(event.wait(), timeout=10.0) | ||
| await subscriber.close() | ||
|
|
||
| assert received_msg is not None | ||
|
|
||
| await received_msg.reply( | ||
| payload=b"reply_response", | ||
| headers={"is_reply": "true"}, | ||
| content_type="application/json", | ||
| channel=reply_channel_name, | ||
| ) | ||
|
|
There was a problem hiding this comment.
Same ordering issue: the subscriber is closed before received_msg.reply(...) is invoked, but reply commits via the consumer. With the consumer stopped in subscriber.close(), the commit can fail. Call reply(...) before closing the subscriber (or make Kafka message actions independent of subscriber shutdown).
| for _ in range(100): | ||
| try: | ||
| kafka_container._container.reload() | ||
| port = kafka_container.ports["29092/tcp"][0] | ||
| break | ||
| except (KeyError, AttributeError): | ||
| sleep(0.1) |
There was a problem hiding this comment.
This is an async fixture but it calls time.sleep(...) while waiting for the container port, which blocks the event loop and can slow/flap the entire async test session. Use await asyncio.sleep(...) in this loop (and consider using a monotonic deadline rather than a fixed iteration count).
| for _ in range(100): | |
| try: | |
| kafka_container._container.reload() | |
| port = kafka_container.ports["29092/tcp"][0] | |
| break | |
| except (KeyError, AttributeError): | |
| sleep(0.1) | |
| loop = asyncio.get_running_loop() | |
| deadline = loop.time() + 10 | |
| while loop.time() < deadline: | |
| try: | |
| kafka_container._container.reload() | |
| port = kafka_container.ports["29092/tcp"][0] | |
| break | |
| except (KeyError, AttributeError): | |
| await asyncio.sleep(0.1) |
| "supports_acknowledgments": True, | ||
| "supports_persistence": True, | ||
| "supports_reply": True, | ||
| "supports_lightweight_pause": False, |
There was a problem hiding this comment.
supports_lightweight_pause is set to False, but KafkaSubscriber.pause()/resume() uses Kafka’s native consumer.pause()/resume() and does not require disconnecting/re-subscribing. Runner uses this capability flag for backpressure (repid/_runner.py:291+), so marking it false will prevent pausing even though it appears supported. Consider setting supports_lightweight_pause to True (and updating the corresponding integration assertion).
| "supports_lightweight_pause": False, | |
| "supports_lightweight_pause": True, |
| self._background_tasks.add(task) | ||
| task.add_done_callback(self._background_tasks.discard) | ||
| except asyncio.CancelledError: | ||
| pass |
There was a problem hiding this comment.
_consume_loop() only suppresses CancelledError. If consumer.getmany() (or iteration/processing) raises any other exception (e.g. connection issues), the task will exit and the consumer will never be stopped, potentially leaking resources and leaving the subscriber silently inactive. Handle unexpected exceptions (log + retry with backoff, or transition to a closed state and stop the consumer in a finally).
| pass | |
| self._closed = True | |
| raise | |
| except Exception as exc: | |
| self._closed = True | |
| logger.exception("subscriber.consume_loop.error", exc_info=exc) | |
| finally: | |
| self._closed = True | |
| with contextlib.suppress(Exception): | |
| await self._consumer.stop() |
Change Summary
Kafka server
Related issue number
Fix #98
Checklist