Implement SSE streaming for message/stream and tasks/resubscribe#51
Conversation
- Flatten TaskPushNotificationConfig (absorb PushNotificationConfig fields) - Rename PushNotificationAuthenticationInfo to AuthenticationInfo (schemes -> scheme) - Add ListTasksParams, ListTasksResult, ListTasksRequest, ListTasksResponse - Add GetTaskPushNotificationConfigParams, ListTaskPushNotificationConfigResult - Update MessageSendConfiguration to use task_push_notification_config - Update A2ARequest/A2AResponse unions with new types
…rams, add SendMessageResult wrapper - Remove `final` field from `TaskStatusUpdateEvent` (not in v1 spec) - Add `tenant` field to `TaskIdParams` and `MessageSendParams` - Add `SendMessageResult` wrapper type matching v1 spec shape - Update task_manager to wrap result in `SendMessageResult`
- Route all JSON-RPC methods in _agent_run_endpoint - Return UnsupportedOperationError for message/stream and tasks/resubscribe - Return PushNotificationNotSupportedError for push notification methods - Return UnsupportedOperationError for tasks/list (not yet implemented) - Add DeleteTaskPushNotificationConfigResponse type - Remove TaskStatusUpdateEvent.final (not in v1 spec) - Add tenant field to TaskIdParams and MessageSendParams - Add SendMessageResult wrapper matching v1 spec
- Add EventBus for pub/sub task events using anyio memory streams
- Add stream_message() and resubscribe_task() to TaskManager
- Route message/stream and tasks/resubscribe to StreamingResponse
- Worker emits failed status events and closes the bus on exception
- SSE events are formatted as `data: {json}\n\n` lines
| async def resubscribe_task(self, request: ResubscribeTaskRequest) -> AsyncIterator[bytes]: | ||
| """Resubscribe to an existing task's event stream.""" | ||
| request_id = request['id'] | ||
| task_id = request['params']['id'] | ||
|
|
||
| task = await self.storage.load_task(task_id) | ||
| if task is None: | ||
| error_response = StreamMessageResponse( | ||
| jsonrpc='2.0', | ||
| id=request_id, | ||
| error=TaskNotFoundError(code=-32001, message='Task not found'), | ||
| ) | ||
| yield self._format_sse_event(error_response) | ||
| return | ||
|
|
||
| # Send current task state | ||
| initial_response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=StreamResponse(task=task)) | ||
| yield self._format_sse_event(initial_response) | ||
|
|
||
| # If task is already in a terminal state, no need to subscribe | ||
| terminal_states = {'completed', 'canceled', 'failed', 'rejected'} | ||
| if task['status']['state'] in terminal_states: | ||
| return | ||
|
|
||
| async with self.event_bus.subscribe(task_id) as receive_stream: | ||
| async for event in receive_stream: | ||
| response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=event) | ||
| yield self._format_sse_event(response) |
There was a problem hiding this comment.
🚩 Race condition in resubscribe_task between terminal state check and subscribe is safe for InMemory but fragile for distributed implementations
In fasta2a/task_manager.py:217-225, the task state is checked for terminal status at line 219, and then the event bus subscription happens at line 222. For InMemoryStorage, the task variable returned by load_task is the same mutable dict stored internally (fasta2a/storage.py:78), so the check at line 219 sees real-time state updates from the worker. Additionally, between line 219 (the if check) and line 222 (async with subscribe), there are no await points, so no context switch can occur. However, if Storage were a remote/distributed implementation (e.g., Redis, database) where load_task returns a copy, the check at line 219 would use stale data and a race between the check and the subscribe could cause the SSE stream to hang forever. Consider subscribing BEFORE checking terminal state to close this window for future implementations.
Was this helpful? React with 👍 or 👎 to provide feedback.
| @asynccontextmanager | ||
| async def subscribe(self, task_id: str) -> AsyncIterator[anyio.abc.ObjectReceiveStream[StreamResponse]]: | ||
| """Subscribe to events for a task. Yields a receive stream.""" | ||
| send_stream, receive_stream = anyio.create_memory_object_stream[StreamResponse]() |
There was a problem hiding this comment.
🚩 unbounded memory object streams could accumulate events if consumer is slow
In fasta2a/event_bus.py:28, anyio.create_memory_object_stream[StreamResponse]() is called without a max_buffer_size argument. The default buffer size for anyio memory streams is 0 (unbuffered, meaning sends block until a receiver is ready). This means if the worker emits events faster than the SSE consumer reads them, the worker's emit() call will block (apply backpressure). This is actually reasonable behavior for SSE — it prevents unbounded memory growth — but it means a slow client could block the worker from processing other tasks if the worker handles task operations sequentially (which it does, at fasta2a/worker.py:44).
Was this helpful? React with 👍 or 👎 to provide feedback.
…leanup race - Accept event_bus parameter on FastA2A, pass to TaskManager - Set streaming=True in agent card capabilities - Handle subscribe() cleanup when close() already removed subscribers
| except Exception: | ||
| await self.storage.update_task(task_operation['params']['id'], state='failed') | ||
| task_id = task_operation['params']['id'] | ||
| task = await self.storage.update_task(task_id, state='failed') | ||
| from .schema import StreamResponse, TaskStatus, TaskStatusUpdateEvent | ||
|
|
||
| await self.event_bus.emit( | ||
| task_id, | ||
| StreamResponse( | ||
| status_update=TaskStatusUpdateEvent( | ||
| task_id=task_id, | ||
| context_id=task['context_id'], | ||
| status=TaskStatus(state='failed'), | ||
| ) | ||
| ), | ||
| ) | ||
| await self.event_bus.close(task_id) |
There was a problem hiding this comment.
🔴 SSE stream hangs forever on successful task completion because event_bus.close() is never called
In Worker._handle_task_operation, event_bus.close(task_id) is only called inside the except Exception block (fasta2a/worker.py:71). When run_task completes successfully (the normal/happy path), event_bus.close() is never called. This means the receive_stream in TaskManager.stream_message (fasta2a/task_manager.py:194) and TaskManager.resubscribe_task (fasta2a/task_manager.py:223) will never be closed, causing async for event in receive_stream to block indefinitely. Every successful streaming task will result in a hung SSE connection that never terminates.
How the bug manifests end-to-end
- Client sends
message/streamrequest stream_messagesubscribes to event bus and dispatches task to broker- Worker picks up task,
run_taskcompletes successfully - No code calls
event_bus.close(task_id)on success async for event in receive_streaminstream_messageblocks forever- SSE connection hangs indefinitely
The README example worker (InMemoryWorker) also does not call event_bus.close(), confirming this is a framework-level omission rather than something subclasses are expected to handle.
Was this helpful? React with 👍 or 👎 to provide feedback.
| # If task is already in a terminal state, no need to subscribe | ||
| terminal_states = {'completed', 'canceled', 'failed', 'rejected'} | ||
| if task['status']['state'] in terminal_states: | ||
| return | ||
|
|
||
| async with self.event_bus.subscribe(task_id) as receive_stream: | ||
| async for event in receive_stream: | ||
| response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=event) | ||
| yield self._format_sse_event(response) |
There was a problem hiding this comment.
🔴 TOCTOU race in resubscribe_task causes SSE connection to hang forever
In resubscribe_task, the task state is loaded at line 203, the initial response is yielded at line 215 (context switch), and then the terminal state is checked at line 219 using the stale task dict before subscribing at line 222. Between the yield at line 215 and the subscribe at line 222, the worker (running in a separate task) can complete processing, call emit() (no subscribers yet, so events are silently dropped at event_bus.py:64), and call close() (no subscribers to close at event_bus.py:69). With any non-shared-dict Storage implementation (i.e., anything other than InMemoryStorage), the terminal state check at line 219 uses a stale snapshot and won't detect the completion, so the code proceeds to subscribe at line 222. The new subscriber will then block forever in async for event in receive_stream at line 223, since the worker has already finished and will never emit or close again.
Why InMemoryStorage accidentally masks this bug
With InMemoryStorage, load_task returns a reference to the same mutable dict object stored internally. When the worker updates the task state via update_task (which replaces task['status']), the task variable in resubscribe_task reflects the updated state because it points to the same dict. So the terminal check at line 219 would catch the completion. However, this relies on a coincidental implementation detail of InMemoryStorage and breaks with any database-backed or copy-returning Storage implementation.
Was this helpful? React with 👍 or 👎 to provide feedback.
| default_input_modes=self.default_input_modes, | ||
| default_output_modes=self.default_output_modes, | ||
| capabilities=AgentCapabilities(streaming=False, push_notifications=False), | ||
| capabilities=AgentCapabilities(streaming=True, push_notifications=False), |
There was a problem hiding this comment.
🚩 AgentCard streaming capability is now unconditionally True
At fasta2a/applications.py:108, streaming is hardcoded to True. This means the agent card always advertises streaming support regardless of whether the EventBus is actually wired up correctly or whether the broker implementation supports it. Previously this was False. Consider whether this should be conditional on whether an EventBus was explicitly provided (i.e., streaming=event_bus is not None when the parameter was passed, vs the default).
Was this helpful? React with 👍 or 👎 to provide feedback.
No more separate EventBus parameter - the broker owns it, and both TaskManager and Worker access it via self.broker.event_bus.
| @abstractmethod | ||
| async def run_task(self, params: TaskSendParams) -> None: ... |
There was a problem hiding this comment.
🚩 Worker implementations must emit their own streaming events for intermediate updates
The Worker base class (fasta2a/worker.py:44-71) only emits events via the event bus in the error path. For intermediate updates (e.g., 'working' status, artifact updates during streaming), each concrete Worker.run_task implementation must call self.broker.event_bus.emit() directly. This contract is not documented anywhere in the base class or README. The README example worker at README.md:54-76 doesn't use event_bus at all, which means it won't produce any streaming events even for the message/stream endpoint. This is a documentation/design gap that could confuse implementors.
Was this helpful? React with 👍 or 👎 to provide feedback.
Matches the Broker/InMemoryBroker pattern so users can swap in a Redis/NATS-backed EventBus for multi-process deployments.
| async def emit(self, task_id: str, event: StreamResponse) -> None: | ||
| """Emit an event to all subscribers for a task.""" | ||
| for send_stream in self._subscribers.get(task_id, []): | ||
| await send_stream.send(event) |
There was a problem hiding this comment.
🔴 Iterating over live subscriber list in emit while close or subscribe cleanup can mutate it concurrently
In InMemoryEventBus.emit() at event_bus.py:64, the code iterates directly over the live list object self._subscribers.get(task_id, []). Each iteration calls await send_stream.send(event) which is a suspension point. During that suspension, a different task (e.g., an SSE handler whose client disconnected) could run the subscribe context manager's finally block, which calls subscribers.remove(send_stream) on the same list at event_bus.py:54. Mutating a list during iteration can cause elements to be skipped. Similarly, close() at event_bus.py:69 pops the list and closes all send streams, so a concurrent emit could call send on an already-closed stream, raising ClosedResourceError. This would crash the worker's task processing.
| async def emit(self, task_id: str, event: StreamResponse) -> None: | |
| """Emit an event to all subscribers for a task.""" | |
| for send_stream in self._subscribers.get(task_id, []): | |
| await send_stream.send(event) | |
| async def emit(self, task_id: str, event: StreamResponse) -> None: | |
| """Emit an event to all subscribers for a task.""" | |
| for send_stream in list(self._subscribers.get(task_id, [])): | |
| await send_stream.send(event) | |
Was this helpful? React with 👍 or 👎 to provide feedback.
| async with self.http_client.stream( | ||
| 'POST', '/', content=content, headers={'Content-Type': 'application/json'} | ||
| ) as response: | ||
| async for line in response.aiter_lines(): | ||
| if line.startswith('data: '): | ||
| data = line[6:] | ||
| yield stream_message_response_ta.validate_json(data) |
There was a problem hiding this comment.
🟡 Client stream_message silently ignores HTTP error status codes
The stream_message method in client.py does not check the HTTP response status code before iterating over SSE lines. Unlike send_message (which calls self._raise_for_status(response) at fasta2a/client.py:67), stream_message proceeds directly to parse lines from the response body. If the server returns an HTTP error (e.g., 400, 500), the client will silently iterate over the error response body, find no data: prefixed lines, and return without yielding anything — giving the caller no indication that an error occurred.
| async with self.http_client.stream( | |
| 'POST', '/', content=content, headers={'Content-Type': 'application/json'} | |
| ) as response: | |
| async for line in response.aiter_lines(): | |
| if line.startswith('data: '): | |
| data = line[6:] | |
| yield stream_message_response_ta.validate_json(data) | |
| async with self.http_client.stream( | |
| 'POST', '/', content=content, headers={'Content-Type': 'application/json'} | |
| ) as response: | |
| self._raise_for_status(response) | |
| async for line in response.aiter_lines(): | |
| if line.startswith('data: '): | |
| data = line[6:] | |
| yield stream_message_response_ta.validate_json(data) |
Was this helpful? React with 👍 or 👎 to provide feedback.
| async def emit(self, task_id: str, event: StreamResponse) -> None: | ||
| """Emit an event to all subscribers for a task.""" | ||
| for send_stream in self._subscribers.get(task_id, []): | ||
| await send_stream.send(event) |
There was a problem hiding this comment.
🚩 InMemoryEventBus.emit iterates list while awaiting — mutation risk
In emit(), the code iterates directly over self._subscribers.get(task_id, []) and calls await send_stream.send(event) for each stream. During each await, the event loop could schedule other tasks that modify the subscriber list (e.g., a subscriber unsubscribing). In the current InMemoryBroker setup, the worker processes tasks sequentially in _loop (fasta2a/worker.py:40-42), making this unlikely. However, a custom broker or concurrent task handling could trigger a RuntimeError: list modified during iteration. A snapshot (list(...)) before iteration would be safer.
Was this helpful? React with 👍 or 👎 to provide feedback.
|
Any chance to get a release that includes this? |
The bridge and its tests were ported against fasta2a's pre-v1 schema (discriminated-union `Part` with a `kind` field, `Message` with `kind`). PRs datalayer#46-datalayer#51 on main collapsed `Part` into a single flat TypedDict, dropped the `kind` discriminators, and wrapped the `message/send` result in a `{task: Task}` envelope. - `_bridge.py`: emit/parse flat `Part` (`text`/`raw`/`url`/`data` detected by key presence instead of `kind`); drop `kind='message'`. - `test_pydantic_ai.py`: construct flat `Part`/`Message`; unwrap the task from `message/send` responses via `task_from_response`; regenerate snapshots. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
EventBusclass (event_bus.py) for pub/sub task events using anyio memory streamsstream_message()andresubscribe_task()toTaskManager, yielding SSE-formatteddata: {json}\n\neventsmessage/streamandtasks/resubscribetoStreamingResponsewithtext/event-streamcontent typeEventBusfromfasta2a.__init__Workers can emit streaming events by calling
self.event_bus.emit(task_id, StreamResponse(...))and signal completion withself.event_bus.close(task_id).Test plan
🤖 Generated with Claude Code