@onematchfox
Contributor

Description

The agent_task_callback callback should never raise an exception. From the asyncio documentation on .exception():

The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn't done yet, raises InvalidStateError.
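
The three cases in the quoted documentation can be reproduced with plain asyncio. The following is a minimal sketch, not code from this pull request; the helper names (`probe_exception_states`, `sleeper`, `failer`) are made up for illustration.

```python
import asyncio


async def probe_exception_states() -> dict[str, str]:
    """Record what Task.exception() does for a pending, cancelled and failed task."""
    results: dict[str, str] = {}

    async def sleeper() -> None:
        await asyncio.sleep(3600)

    async def failer() -> None:
        raise ValueError("boom")

    # Pending task: exception() raises InvalidStateError.
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)  # give the task a chance to start
    try:
        task.exception()
    except asyncio.InvalidStateError:
        results["pending"] = "InvalidStateError"

    # Cancelled task: exception() raises CancelledError instead of returning it.
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    try:
        task.exception()
    except asyncio.CancelledError:
        results["cancelled"] = "CancelledError"

    # Failed task: exception() returns the exception object.
    failed = asyncio.create_task(failer())
    await asyncio.gather(failed, return_exceptions=True)
    results["failed"] = type(failed.exception()).__name__

    return results


states = asyncio.run(probe_exception_states())
print(states)
```

Only the failed task returns a value; the other two states raise, which is why a done callback that calls `.exception()` unconditionally can itself fail.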

Currently, if the task has been cancelled, calling .exception() in the callback raises CancelledError. For example, the following error was observed when using the SDK with google-adk:

ERROR:asyncio:Exception in callback EventConsumer.agent_task_callback() at /app/python/.venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py:153
handle: <Handle EventConsumer.agent_task_callback() at /app/python/.venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py:153>
Traceback (most recent call last):
  File "/app/python/.venv/lib/python3.13/site-packages/anyio/streams/memory.py", line 111, in receive
    return self.receive_nowait()
           ~~~~~~~~~~~~~~~~~~~^^
  File "/app/python/.venv/lib/python3.13/site-packages/anyio/streams/memory.py", line 106, in receive_nowait
    raise WouldBlock
anyio.WouldBlock

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/python/.local/share/uv/python/cpython-3.13.5-linux-x86_64-gnu/lib/python3.13/asyncio/events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/python/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 202, in sync_wrapper
    result = func(*args, **kwargs)
  File "/app/python/.venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 163, in agent_task_callback
    if agent_task.exception() is not None:
       ~~~~~~~~~~~~~~~~~~~~^^
  File "/app/python/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/python/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 172, in _run_event_stream
    await self.agent_executor.execute(request, queue)
  File "/app/python/packages/kagent-adk/src/kagent_adk/_agent_executor.py", line 124, in execute
    await self._handle_request(context, event_queue, runner)
  File "/app/python/packages/kagent-adk/src/kagent_adk/_agent_executor.py", line 188, in _handle_request
    async for adk_event in runner.run_async(**run_args):
    ...<4 lines>...
            await event_queue.enqueue_event(a2a_event)
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/runners.py", line 233, in run_async
    async for event in self._exec_with_plugin(
    ...<2 lines>...
      yield event
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/runners.py", line 273, in _exec_with_plugin
    async for event in execute_fn(invocation_context):
    ...<6 lines>...
      yield (modified_event if modified_event else event)
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/runners.py", line 230, in execute
    async for event in ctx.agent.run_async(ctx):
      yield event
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/agents/base_agent.py", line 209, in run_async
    async for event in self._run_async_impl(ctx):
      yield event
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/agents/llm_agent.py", line 283, in _run_async_impl
    async for event in self._llm_flow.run_async(ctx):
      self.__maybe_save_output_to_state(event)
      yield event
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 290, in run_async
    async for event in self._run_one_step_async(invocation_context):
      last_event = event
      yield event
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 322, in _run_one_step_async
    async for event in self._postprocess_async(
    ...<5 lines>...
      yield event
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 407, in _postprocess_async
    async for event in self._postprocess_handle_function_calls_async(
    ...<2 lines>...
      yield event
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/base_llm_flow.py", line 482, in _postprocess_handle_function_calls_async
    if function_response_event := await functions.handle_function_calls_async(
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        invocation_context, function_call_event, llm_request.tools_dict
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ):
    ^
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/functions.py", line 179, in handle_function_calls_async
    function_response = await __call_tool_async(
                        ^^^^^^^^^^^^^^^^^^^^^^^^
        tool, args=function_args, tool_context=tool_context
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/flows/llm_flows/functions.py", line 474, in __call_tool_async
    return await tool.run_async(args=args, tool_context=tool_context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/tools/base_authenticated_tool.py", line 93, in run_async
    return await self._run_async_impl(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/tools/mcp_tool/mcp_session_manager.py", line 128, in wrapper
    return await func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/python/.venv/lib/python3.13/site-packages/google/adk/tools/mcp_tool/mcp_tool.py", line 133, in _run_async_impl
    response = await session.call_tool(self.name, arguments=args)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/python/.venv/lib/python3.13/site-packages/mcp/client/session.py", line 293, in call_tool
    result = await self.send_request(
             ^^^^^^^^^^^^^^^^^^^^^^^^
    ...<12 lines>...
    )
    ^
  File "/app/python/.venv/lib/python3.13/site-packages/mcp/shared/session.py", line 272, in send_request
    response_or_error = await response_stream_reader.receive()
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/python/.venv/lib/python3.13/site-packages/anyio/streams/memory.py", line 119, in receive
    await receive_event.wait()
  File "/app/python/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 1774, in wait
    await self._event.wait()
  File "/app/python/.local/share/uv/python/cpython-3.13.5-linux-x86_64-gnu/lib/python3.13/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError: Cancelled by cancel scope 72d19ee594f0

@onematchfox onematchfox requested a review from a team as a code owner August 5, 2025 13:19
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Summary of Changes

Hello @onematchfox, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a specific issue where an asyncio.exceptions.CancelledError was being raised in the event consumer's task callback when attempting to retrieve an exception from a cancelled task. The change refines the condition for checking task completion and exceptions, ensuring that only truly completed and non-cancelled tasks are processed for their exceptions, thereby improving the robustness of the asynchronous task handling.

Highlights

  • Graceful Exception Handling: Modified the agent_task_callback in event_consumer.py to prevent asyncio.exceptions.CancelledError when checking for exceptions on asyncio.Task objects that have been cancelled. The logic now explicitly checks if a task is done and not cancelled before attempting to retrieve its exception.
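
A guard of the kind described in this highlight might look like the sketch below. It is an illustration under stated assumptions, not the merged code: `safe_task_exception`, `on_done` and `demo` are hypothetical names, and the real callback in event_consumer.py may be structured differently.

```python
import asyncio
from typing import Optional


def safe_task_exception(task: asyncio.Task) -> Optional[BaseException]:
    """Return the task's exception, or None if it is cancelled or not done.

    Hypothetical helper: it never raises, unlike a bare task.exception().
    """
    if task.cancelled() or not task.done():
        return None
    return task.exception()


async def demo() -> dict:
    seen: dict = {}

    def on_done(task: asyncio.Task) -> None:
        # Done callback that stays silent for cancelled tasks.
        seen[task.get_name()] = safe_task_exception(task)

    async def sleeper() -> None:
        await asyncio.sleep(3600)

    async def failer() -> None:
        raise RuntimeError("agent failed")

    cancelled = asyncio.create_task(sleeper(), name="cancelled")
    cancelled.add_done_callback(on_done)
    failed = asyncio.create_task(failer(), name="failed")
    failed.add_done_callback(on_done)

    await asyncio.sleep(0)  # let both tasks start
    cancelled.cancel()
    await asyncio.gather(cancelled, failed, return_exceptions=True)
    return seen


seen = asyncio.run(demo())
print(seen)
```

The callback records `None` for the cancelled task and the `RuntimeError` instance for the failed one, so only genuine failures reach whatever error handling follows.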

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request fixes a potential CancelledError in the event consumer's task callback by checking if the task is cancelled or not done before accessing its exception. The review suggests calling cancelled and done as methods.
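
The remark about calling cancelled and done as methods matters because `Task.cancelled` and `Task.done` are methods: referenced without parentheses they evaluate to bound method objects, which are always truthy. The sketch below illustrates the pitfall in general terms; it is not taken from any revision of this pull request, and `compare_attribute_and_call` is a made-up name.

```python
import asyncio


async def compare_attribute_and_call() -> tuple:
    async def sleeper() -> None:
        await asyncio.sleep(3600)

    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)  # let the task start

    # Without parentheses this is a bound method object, which is truthy
    # regardless of the task's state.
    attribute_truthy = bool(task.cancelled)
    # With parentheses it reports the real state: not cancelled yet.
    before_cancel = task.cancelled()

    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    after_cancel = task.cancelled()

    return attribute_truthy, before_cancel, after_cancel


result = asyncio.run(compare_attribute_and_call())
print(result)
```

A condition such as `if task.cancelled:` would therefore always take the branch, silently skipping the exception check even for tasks that failed.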

@onematchfox onematchfox force-pushed the fix-event-consumer-task-callback branch from 6c6e034 to b7415aa Compare August 5, 2025 13:25
@onematchfox onematchfox marked this pull request as draft August 5, 2025 13:29
@onematchfox onematchfox force-pushed the fix-event-consumer-task-callback branch from b7415aa to 783c466 Compare August 5, 2025 13:39
@onematchfox onematchfox marked this pull request as ready for review August 5, 2025 13:42
@holtskinner holtskinner enabled auto-merge (squash) August 7, 2025 14:44
@holtskinner holtskinner merged commit 2508a9b into a2aproject:main Aug 7, 2025
5 checks passed
holtskinner added a commit that referenced this pull request Aug 13, 2025
🤖 I have created a release *beep* *boop*
---


## [0.3.1](v0.3.0...v0.3.1) (2025-08-13)


### Features

* Add agent card as a route in rest adapter
([ba93053](ba93053))


### Bug Fixes

* gracefully handle task exceptions in event consumer
([#383](#383))
([2508a9b](2508a9b))
* openapi working in sub-app
([#324](#324))
([dec4b48](dec4b48))
* Pass `message_length` param in `get_task()`
([#384](#384))
([b6796b9](b6796b9))
* relax protobuf dependency version requirement
([#381](#381))
([0f55f55](0f55f55))
* Use HasField for simple message retrieval for grpc transport
([#380](#380))
([3032aa6](3032aa6))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

---------

Co-authored-by: Holt Skinner <13262395+holtskinner@users.noreply.github.com>
EItanya pushed a commit to kagent-dev/kagent that referenced this pull request Aug 27, 2025
Not exactly sure what the process is here - so feel free to let me know
if I've missed anything. Mostly interested in the `a2a-sdk` update, as
[`v0.3.1`](https://github.com/a2aproject/a2a-python/releases/tag/v0.3.1)
contains [a fix](a2aproject/a2a-python#383) I
put through to resolve context cancellation issues that have been
plaguing me when testing multi-agent workflows. I did scan the release
notes of both projects for breaking changes, and I don't think any of
them affected this project's usage of the libraries. Ran the default set
of agents locally, and everything seems to work fine.

Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com>
yanivmn pushed a commit to yanivmn/kagent that referenced this pull request Aug 28, 2025
holtskinner pushed a commit that referenced this pull request Sep 3, 2025
# Issue

* Client disconnect triggered synchronous cleanup.
* That awaited the producer task, effectively tying producer lifetime to
the client connection.
* Reconnecting with `tasks/resubscribe` would not receive further events
because the producer had already been forced to finish.

This behaviour no longer raises an `asyncio.exceptions.CancelledError`
as claimed in #296, thanks to the fix in #383, but `tasks/resubscribe`
still didn't behave as expected.

# How it's reproduced

In any streaming agent, sending a (longer-running) `message/stream`
request, disconnecting, and then reconnecting to the task using
`tasks/resubscribe` yields no further events, even though the task
should still be running.


# Fix

## Code

The fix is a one-liner. Now:
* Client disconnect schedules cleanup in the background and returns
immediately.
* Producer continues; resubscribe taps the existing queue and receives
subsequent events.
* Cleanup still runs once the producer completes.
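
The pattern described above can be sketched with plain asyncio. This is an
illustration only, not the actual change: `producer`, `cleanup`,
`background_tasks` and the log strings are hypothetical.

```python
import asyncio


async def demo() -> list:
    log: list = []
    background_tasks: set = set()  # strong references keep tasks from being garbage collected

    async def producer() -> None:
        # Stands in for the agent run that keeps emitting events.
        for i in range(3):
            await asyncio.sleep(0.01)
            log.append(f"event-{i}")

    async def cleanup(producer_task: asyncio.Task) -> None:
        # Cleanup waits for the producer, but runs off the request path.
        await producer_task
        log.append("cleanup")

    producer_task = asyncio.create_task(producer())

    # Client disconnects: schedule cleanup in the background instead of
    # awaiting it, so the disconnect handler returns immediately.
    cleanup_task = asyncio.create_task(cleanup(producer_task))
    background_tasks.add(cleanup_task)
    cleanup_task.add_done_callback(background_tasks.discard)
    log.append("disconnect-handled")

    # Only this demo waits here, to observe the final ordering.
    await cleanup_task
    return log


log = asyncio.run(demo())
print(log)
```

The disconnect is handled before any further event is produced, the
producer runs to completion, and cleanup happens last.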

## Tests

**Existing tests:**
* Changed existing tests that asserted on `AgentExecutor.execute` by
adding an `asyncio.Event` latch to wait until the background producer
hits `execute`.
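
A latch of this kind might look like the sketch below. It assumes a
hand-written fake rather than the project's real test fixtures:
`RecordingExecutor`, `start_in_background` and `run_check` are made-up
names.

```python
import asyncio


class RecordingExecutor:
    """Hypothetical stand-in for an executor whose execute() is asserted on."""

    def __init__(self) -> None:
        self.execute_called = asyncio.Event()  # the latch
        self.calls: list = []

    async def execute(self, request: str) -> None:
        self.calls.append(request)
        self.execute_called.set()  # release anyone waiting on the latch


def start_in_background(executor: RecordingExecutor, request: str) -> asyncio.Task:
    async def producer() -> None:
        await asyncio.sleep(0.01)  # execute() is reached some time later
        await executor.execute(request)

    return asyncio.create_task(producer())


async def run_check() -> tuple:
    executor = RecordingExecutor()
    task = start_in_background(executor, "req-1")

    # Asserting here would be too early: the background producer has not run.
    calls_before = list(executor.calls)

    # Wait on the latch, with a timeout so a broken producer fails fast.
    await asyncio.wait_for(executor.execute_called.wait(), timeout=1.0)
    calls_after = list(executor.calls)

    await task
    return calls_before, calls_after


calls_before, calls_after = asyncio.run(run_check())
print(calls_before, calls_after)
```

Waiting on the event replaces a fixed sleep, so the assertion runs as soon
as the producer reaches `execute` and not before.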

**New tests:**
* `test_stream_disconnect_then_resubscribe_receives_future_events` --
start streaming, disconnect, resubscribe, and confirm future events are
received.
*
`test_on_message_send_stream_client_disconnect_triggers_background_cleanup_and_producer_continues`
-- to validate that disconnecting is non-blocking, producer continues,
and cleanup completes afterward.


Fixes #296