feat: EventQueue - unify implementation between python versions#877
feat: EventQueue - unify implementation between python versions#877
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the event queue's shutdown mechanism, ensuring a graceful and robust closing process across various Python versions. By introducing a compatibility layer for Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
🧪 Code Coverage (vs
|
| Base | PR | Delta | |
|---|---|---|---|
| src/a2a/server/events/event_consumer.py | 90.91% | 91.80% | 🟢 +0.89% |
| src/a2a/server/events/event_queue.py | 90.24% | 95.37% | 🟢 +5.13% |
| Total | 90.69% | 90.77% | 🟢 +0.08% |
Generated by coverage-comment.yml
There was a problem hiding this comment.
Code Review
This pull request significantly improves the EventQueue and EventConsumer by introducing graceful shutdown mechanisms and ensuring cross-Python version compatibility for asyncio.QueueShutDown behavior. The refactoring of the close method in EventQueue is well-executed, abstracting away version-specific logic. The addition of culsans for older Python versions and the updated test suite demonstrate a thorough approach to handling these complexities. The new tests adequately cover the graceful closing scenarios and race conditions, which is crucial for robust asynchronous event handling. Additionally, the docstring for the close method has been expanded for better clarity and maintainability.
334ee08 to
ec31c7b
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a significant improvement to the event queue's graceful shutdown mechanism by backporting asyncio.Queue.shutdown functionality using the culsans library. This change unifies the queue behavior across different Python versions, simplifies the EventQueue implementation by removing version-specific logic, and resolves potential deadlocks in the EventConsumer. The addition of comprehensive, parametrized tests for various shutdown scenarios is a notable enhancement to the test suite's robustness. The overall changes are well-implemented and improve both the correctness and maintainability of the concurrency model.
ec31c7b to
1ac0a1c
Compare
4e5e6df to
bb3ee7a
Compare
bb3ee7a to
d7ecdbd
Compare
🤖 I have created a release *beep* *boop* --- ## [1.0.0-alpha.1](v1.0.0-alpha.0...v1.0.0-alpha.1) (2026-04-10) ### ⚠ BREAKING CHANGES * **client:** make ClientConfig.push_notification_config singular ([#955](#955)) * **client:** reorganize ClientFactory API ([#947](#947)) * **server:** add build_user function to DefaultContextBuilder to allow A2A user creation customization ([#925](#925)) * **client:** remove `ClientTaskManager` and `Consumers` from client ([#916](#916)) * **server:** migrate from Application wrappers to Starlette route-based endpoints for rest ([#892](#892)) * **server:** migrate from Application wrappers to Starlette route-based endpoints for jsonrpc ([#873](#873)) ### Features * A2A Version Header validation on server side. ([#865](#865)) ([b261ceb](b261ceb)) * Add GetExtendedAgentCard Support to RequestHandlers ([#919](#919)) ([2159140](2159140)) * Add support for more Task Message and Artifact fields in the Vertex Task Store ([#936](#936)) ([605fa49](605fa49)) * Create EventQueue interface and make tap() async. ([#914](#914)) ([9ccf99c](9ccf99c)), closes [#869](#869) * EventQueue - unify implementation between python versions ([#877](#877)) ([7437b88](7437b88)), closes [#869](#869) * EventQueue is now a simple interface with single enqueue_event method. ([#944](#944)) ([f0e1d74](f0e1d74)) * Implementation of DefaultRequestHandlerV2 ([#933](#933)) ([462eb3c](462eb3c)), closes [#869](#869) * InMemoryTaskStore creates a copy of Task by default to make it consistent with database task stores ([#887](#887)) ([8c65e84](8c65e84)), closes [#869](#869) * merge metadata of new and old artifact when append=True ([#945](#945)) ([cc094aa](cc094aa)) * **server:** add async context manager support to EventQueue ([#743](#743)) ([f68b22f](f68b22f)) * **server:** validate presence according to `google.api.field_behavior` annotations ([#870](#870)) ([4586c3e](4586c3e)) * Simplify ActiveTask.subscribe() ([#958](#958)) ([62e5e59](62e5e59)) * Support AgentExectuor enqueue of a Task object. ([#960](#960)) ([12ce017](12ce017)) * Support Message-only simplified execution without creating Task ([#956](#956)) ([354fdfb](354fdfb)) * Unhandled exception in AgentExecutor marks task as failed ([#943](#943)) ([4fc6b54](4fc6b54)) ### Bug Fixes * Add `packaging` to base dependencies ([#897](#897)) ([7a9aec7](7a9aec7)) * **client:** do not mutate SendMessageRequest in BaseClient.send_message ([#949](#949)) ([94537c3](94537c3)) * fix `athrow()` RuntimeError on streaming responses ([#912](#912)) ([ca7edc3](ca7edc3)) * fix docstrings related to `CallContextBuilder` args in constructors and make ServerCallContext mandatory in `compat` folder ([#907](#907)) ([9cade9b](9cade9b)) * fix error handling for gRPC and SSE streaming ([#879](#879)) ([2b323d0](2b323d0)) * fix JSONRPC error handling ([#957](#957)) ([6c807d5](6c807d5)) * fix REST error handling ([#893](#893)) ([405be3f](405be3f)) * handle SSE errors occurred after stream started ([#894](#894)) ([3a68d8f](3a68d8f)) * remove the use of deprecated types from VertexTaskStore ([#889](#889)) ([6d49122](6d49122)) * Remove unconditional SQLAlchemy dependency from SDK core ([#898](#898)) ([ab762f0](ab762f0)), closes [#883](#883) * remove unused import and request for FastAPI in pyproject ([#934](#934)) ([fe5de77](fe5de77)) * replace stale entry in a2a.types.__all__ with actual import name ([#902](#902)) ([05cd5e9](05cd5e9)) * wrong method name for ExtendedAgentCard endpoint in JsonRpc compat version ([#931](#931)) ([5d22186](5d22186)) ### Documentation * add Database Migration Documentation ([#864](#864)) ([fd12dff](fd12dff)) ### Miscellaneous Chores * release 1.0.0-alpha.1 ([a61f6d4](a61f6d4)) ### Code Refactoring * **client:** make ClientConfig.push_notification_config singular ([#955](#955)) ([be4c5ff](be4c5ff)) * **client:** remove `ClientTaskManager` and `Consumers` from client ([#916](#916)) ([97058bb](97058bb)), closes [#734](#734) * **client:** reorganize ClientFactory API ([#947](#947)) ([01b3b2c](01b3b2c)) * **server:** add build_user function to DefaultContextBuilder to allow A2A user creation customization ([#925](#925)) ([2648c5e](2648c5e)) * **server:** migrate from Application wrappers to Starlette route-based endpoints for jsonrpc ([#873](#873)) ([734d062](734d062)) * **server:** migrate from Application wrappers to Starlette route-based endpoints for rest ([#892](#892)) ([4be2064](4be2064)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Introduced a compatibility layer using the culsans library to backport asyncio.Queue.shutdown functionality to Python versions older than 3.13. Previous implementation was broken (deadlocks and inconsistent behaviour with 3.13 implementation). Culsans library allowed for unified code between versions.
EventConsumer now starts a background task to gracefully wait for queue to finish.
This is one of the steps towards better concurrency model in a2a python sdk.
Fixes #869