feat: Implement MainEventBus architecture for event queue processing #611
ehsavoie merged 9 commits into a2aproject:main
Conversation
Summary of Changes (Gemini Code Assist)
This pull request refactors the core event processing architecture to introduce a robust, centralized event bus. The primary goal is to ensure data consistency and prevent race conditions by enforcing a strict 'persistence-before-distribution' model for all events. This change significantly impacts how events are handled internally, from agent output to client consumption, and includes improvements to thread management and API streaming behavior.
Code Review
The pull request introduces a new MainEventBus architecture for centralized event processing, ensuring event persistence before client visibility and eliminating race conditions. This is a significant architectural improvement. The changes refactor event handling across ReplicatedQueueManager, EventQueue, EventConsumer, and DefaultRequestHandler to integrate with the new MainEventBus and MainEventBusProcessor. SSE handling in A2AServerRoutes has also been refactored for better separation of concerns and robustness, including critical buffering controls. Thread pool configuration for agent execution has been improved with bounded queues and core thread timeouts. Many tests have been updated to reflect the asynchronous nature of the new event processing and use more robust synchronization mechanisms like callbacks and latches. Overall, the changes are well-structured and address important architectural and concurrency challenges.
server-common/src/main/java/io/a2a/server/events/EventQueue.java
fa1d994 to 1777156
/gemini review
Code Review
This pull request introduces a significant architectural refactoring by implementing a MainEventBus for centralized event processing. This new architecture ensures that events are persisted to the TaskStore before they are distributed to client-facing ChildQueues, effectively eliminating race conditions and guaranteeing data consistency. The changes involve introducing new components like MainEventBus, MainEventBusProcessor, and SseFormatter, along with substantial modifications to EventQueue, QueueManager, and DefaultRequestHandler to integrate with this new event-driven model. The AsyncExecutorProducer has been improved with a bounded queue and core thread timeout to prevent thread pool exhaustion, and a dedicated EventConsumerExecutor has been added for I/O-bound polling, enhancing overall system stability and performance. Test cases have been updated to properly synchronize with the asynchronous nature of the new event processing, ensuring robust validation of the new flow. Overall, this is a well-executed and critical improvement to the system's event handling and reliability.
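The bounded agent-executor configuration the review mentions can be sketched as follows. This is an illustrative guess at the pattern, not the actual AsyncExecutorProducer code; the pool sizes and queue capacity here are placeholder values.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded agent-execution pool. A bounded queue
// provides back-pressure instead of unbounded memory growth, and core thread
// timeout lets an idle server release all of its worker threads.
public class BoundedExecutorSketch {
    public static ThreadPoolExecutor newAgentExecutor() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4,                       // core threads (illustrative)
                16,                      // max threads under load (illustrative)
                60L, TimeUnit.SECONDS,   // idle timeout, applied to core threads too
                new LinkedBlockingQueue<>(256),           // bounded queue: back-pressure, no OOM
                new ThreadPoolExecutor.CallerRunsPolicy() // degrade gracefully when saturated
        );
        // Let core threads die when idle so a quiet server holds no threads.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor ex = newAgentExecutor();
        Future<Integer> f = ex.submit(() -> 21 * 2);
        System.out.println(f.get());
        ex.shutdown();
    }
}
```

The `CallerRunsPolicy` choice is one common way to handle saturation; the actual rejection policy used by the PR is not shown in this conversation.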
03f2499 to 0f51071
/gemini review
…replication race conditions

Introduce a centralized event bus with a single background processor to guarantee that events persist to the TaskStore before distribution to clients. This eliminates race conditions when multiple concurrent requests update the same task.

**Key Components:**
- MainEventBus: Shared BlockingQueue for all MainQueue events
- MainEventBusProcessor: Single background thread for ordered processing
- Processing sequence: TaskStore.save() → PushNotifications → distributeToChildren()
- TaskStateProvider interface: Task state queries for queue lifecycle management

**Event Flow:**
```
AgentExecutor → MainQueue → MainEventBus → MainEventBusProcessor
  → TaskStore (persist first) → Push Notifications → ChildQueues → Clients
```

**Benefits:**
- Events persist before clients receive them (no stale data)
- Serial processing prevents concurrent TaskStore updates
- Platform-agnostic ChildQueue synchronization (works across gRPC/JSONRPC/REST)
- Clean separation: MainQueue (no local queue) vs ChildQueue (local queue for clients)

Implement two-level protection to keep MainQueues open for fire-and-forget tasks and late resubscriptions while cleaning up finalized tasks:

- **Level 1** - Cleanup Callback: Check TaskStateProvider.isTaskFinalized() before removing a queue from the QueueManager map
- **Level 2** - Auto-Close Prevention: MainQueue.childClosing() checks finality before closing when the last ChildQueue disconnects

**Result:** Non-final tasks keep queues open for resubscription; finalized tasks clean up immediately.

ReplicatedQueueManager.onTaskFinalized() sent full Task objects to remote instances via Kafka, while local instances sent TaskStatusUpdateEvent. Client auto-close logic only checked TaskStatusUpdateEvent.isFinal(), causing connection leaks on remote instances.
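The persist-before-distribute loop described above can be reduced to a minimal sketch. The event and store types here are stand-ins, not the real a2a-java APIs; the real MainEventBusProcessor also fires push notifications between the two steps.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Minimal sketch of a single-threaded persist-then-distribute processor.
// TaskStoreLike and the String event type are illustrative stand-ins.
public class MainEventBusProcessorSketch {
    interface TaskStoreLike { void save(String event); }

    private final BlockingQueue<String> bus = new LinkedBlockingQueue<>();
    private final TaskStoreLike taskStore;
    private final List<Consumer<String>> childQueues;

    MainEventBusProcessorSketch(TaskStoreLike store, List<Consumer<String>> children) {
        this.taskStore = store;
        this.childQueues = children;
    }

    void publish(String event) { bus.add(event); }

    // Because a single thread drains the bus, persistence strictly precedes
    // distribution: no child queue can observe an event the store has not saved,
    // and no two events for the same task are processed concurrently.
    void processOne() throws InterruptedException {
        String event = bus.take();
        taskStore.save(event);                      // 1. persist first
        // 2. push notifications would fire here
        childQueues.forEach(c -> c.accept(event));  // 3. distribute to children
    }
}
```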
- **ReplicatedQueueManager.onTaskFinalized():** Convert Task to TaskStatusUpdateEvent before sending to Kafka, ensuring consistent event types across all instances
- **EventConsumer:** Add a 50ms delay before tube.complete() to allow the SSE buffer to flush in replicated scenarios where events arrive via Kafka with timing variations
- **SSEEventListener (JSONRPC):** Check both TaskStatusUpdateEvent.isFinal() and Task.status().state().isFinal() for auto-close
- **RestSSEEventListener (REST):** Add complete auto-close logic (was missing entirely)

**Benefits:**
- Handles late subscriptions to completed tasks gracefully
- Prevents connection leaks in all scenarios
- Consistent behavior across JSONRPC and REST transports
- Defensive programming for edge cases

**MultiInstanceReplicationTest:**
- Add TaskEvent handling and container log dumping on failure
- Verify both APP1 and APP2 receive all events, including final states
- Test late-arriving replicated events and poison pill ordering

**Integration Tests:**
- EventConsumerTest: Grace period mechanism for replicated scenarios
- ReplicatedQueueManagerTest: Event type conversion validation
- All existing tests updated for the MainEventBus architecture

**EventQueue.Builder:** Now requires a MainEventBus parameter (validates non-null)
**QueueManager Implementations:** Must handle TaskStateProvider for lifecycle checks

Existing code continues to work: InMemoryQueueManager and ReplicatedQueueManager automatically inject MainEventBus via CDI. Custom QueueManager implementations should inject MainEventBus and pass it to EventQueue.Builder.
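The event-type normalization described for ReplicatedQueueManager.onTaskFinalized() might look like this sketch. The record and enum shapes are stand-ins for the real a2a-java classes; only the conversion idea is taken from the PR.

```java
// Illustrative sketch: normalize a full Task snapshot into a status-update
// event before replication, so every instance's auto-close check sees the
// same event type regardless of whether the event was local or remote.
public class EventConversionSketch {
    enum State {
        WORKING, COMPLETED;
        boolean isFinal() { return this == COMPLETED; }
    }
    record Task(String id, State state) {}
    record TaskStatusUpdateEvent(String taskId, State state, boolean isFinal) {}

    // Mirrors the fix described above: convert before sending to Kafka
    // instead of shipping the raw Task object to remote instances.
    static TaskStatusUpdateEvent toStatusUpdate(Task task) {
        return new TaskStatusUpdateEvent(task.id(), task.state(), task.state().isFinal());
    }
}
```

With a single event type on the wire, the client-side auto-close check no longer depends on which instance produced the event.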
**Core Architecture:**
- server-common/.../events/MainEventBus.java (new)
- server-common/.../events/MainEventBusProcessor.java (new)
- server-common/.../events/EventQueue.java (requires MainEventBus)
- server-common/.../events/InMemoryQueueManager.java (queue lifecycle)

**Replication:**
- extras/queue-manager-replicated/core/.../ReplicatedQueueManager.java (event conversion)
- extras/queue-manager-replicated/core/.../ReplicatedEventQueueItem.java (Task support)

**Client Transports:**
- client/transport/jsonrpc/.../SSEEventListener.java (enhanced auto-close)
- client/transport/rest/.../RestSSEEventListener.java (add auto-close)

**Event Processing:**
- server-common/.../events/EventConsumer.java (grace period + buffer flush)
- server-common/.../requesthandlers/DefaultRequestHandler.java (MainEventBus integration)

**Task Management:**
- server-common/.../tasks/TaskStore.java (TaskStateProvider interface)
- extras/task-store-database-jpa/.../JpaDatabaseTaskStore.java (implement TaskStateProvider)

✅ All unit tests pass (150+ tests)
✅ MultiInstanceReplicationTest passes (both instances receive all events)
✅ TCK tests pass (no connection leaks)
✅ Integration tests pass (EventConsumer, QueueManager, TaskStore)
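The TaskStateProvider-driven queue lifecycle could be sketched as below. The interface shape and the map-based queue registry are assumptions for illustration, not the actual TaskStore or QueueManager contracts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the cleanup guard described above: the queue registry only evicts
// a MainQueue once the task is final, so fire-and-forget tasks and late
// resubscriptions still find a live queue.
public class QueueLifecycleSketch {
    interface TaskStateProvider { boolean isTaskFinalized(String taskId); }

    private final Map<String, Object> queues = new ConcurrentHashMap<>();
    private final TaskStateProvider taskState;

    QueueLifecycleSketch(TaskStateProvider provider) { this.taskState = provider; }

    void register(String taskId, Object queue) { queues.put(taskId, queue); }

    // Cleanup callback: keep non-final tasks resumable, evict final ones.
    boolean tryCleanup(String taskId) {
        if (!taskState.isTaskFinalized(taskId)) {
            return false; // keep queue open for fire-and-forget / late resubscribe
        }
        return queues.remove(taskId) != null;
    }

    boolean hasQueue(String taskId) { return queues.containsKey(taskId); }
}
```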
d4a03d8 to 46f7691
Code Review
This pull request introduces a significant and well-executed architectural refactoring to centralize event processing using a MainEventBus. The changes correctly enforce a 'persist-then-distribute' pattern, which enhances data consistency and eliminates potential race conditions. The introduction of a dedicated executor for event consumers and improvements to the agent executor's thread pool configuration are excellent changes that will improve the system's robustness and scalability under high load. The handling of client disconnects and support for fire-and-forget tasks are also notable improvements. My review includes one comment regarding a minor inconsistency in code comments to improve clarity for future maintenance. Overall, this is a high-quality contribution that greatly improves the core eventing architecture.
server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
...ed/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
// Client-side auto-close on final events to prevent connection leaks
// Handles both TaskStatusUpdateEvent and Task objects with final states
// This covers late subscriptions to completed tasks and ensures no connection leaks
boolean shouldClose = false;
This seems to be a duplicate of JSONRPC's own update.
@ehsavoie Not sure what you mean here? Won't the client be using either this or JSONRPC (or gRPC)?
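A hedged sketch of the dual final-state check being discussed in this thread, using stand-in event types rather than the real listener classes:

```java
// Illustrative dual check: close on a final TaskStatusUpdateEvent (normal
// streaming completion) OR on a Task snapshot whose state is final (late
// subscription to an already-completed task). Types are stand-ins.
public class AutoCloseSketch {
    record TaskStatusUpdateEvent(boolean isFinal) {}
    record Task(boolean stateIsFinal) {}

    static boolean shouldClose(Object event) {
        boolean shouldClose = false;
        if (event instanceof TaskStatusUpdateEvent e && e.isFinal()) {
            shouldClose = true;               // normal streaming completion
        } else if (event instanceof Task t && t.stateIsFinal()) {
            shouldClose = true;               // late subscription to a final task
        }
        return shouldClose;
    }
}
```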
Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
Updating test using a CyclicBarrier for better synchronization
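The CyclicBarrier pattern mentioned above could look like this minimal sketch; the processor thread is a stand-in for the asynchronous event handling under test, so assertions run only after processing has actually happened rather than after a sleep.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Both the test thread and the async worker rendezvous on a two-party
// barrier: the worker signals completion, the test waits for it.
public class BarrierSyncSketch {
    public static String runOnce() throws InterruptedException, BrokenBarrierException {
        CyclicBarrier barrier = new CyclicBarrier(2);
        StringBuilder observed = new StringBuilder();
        Thread processor = new Thread(() -> {
            observed.append("processed");    // stand-in for async event handling
            try {
                barrier.await();             // signal: processing done
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        });
        processor.start();
        barrier.await();                     // test side waits for the processor
        processor.join();
        return observed.toString();
    }
}
```

Unlike Thread.sleep-based waits, the barrier both removes flakiness under slow CI and fails fast (via a timeout overload of await) if the worker never completes.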
Introduces centralized event processing with single background thread to
guarantee event persistence before client visibility and eliminate race
conditions in concurrent task updates.
Key Changes:
- Single background processor performs ordered event handling (TaskStore.save() -> PushNotificationSender.send() -> distributeToChildren())
- Queue cleanup checks task finality to prevent premature cleanup for fire-and-forget tasks
- Blocking save ensures event persistence completes before returning to client
- Grace period for graceful drain when agent completes
- Bounded executor queue prevents exhaustion during high concurrency
Null TaskId Support:
Additional Fixes:
tags around
Architecture Impact: