
[Live] Multiple responses after agent transfer and repeat response on session resumption #3395

@GrahamGGreig

Description


Issues

Noticed two issues when running Vertex AI Live agents with both audio input and text input (audio output only).

1.) Using an architecture with a parent agent and a sub agent where both have a callback context tool. Whenever the user transfers to an agent and then calls a tool, the agent will respond N + 1 times, where N is the number of transfers that have happened in the current conversation (e.g., root_agent -> sub_agent -> root_agent -> tool_call: will respond 3 times). Depending on latency, the audio either plays in full or the later responses cut off the audio of the earlier ones. The transcription always outputs all of the responses in full.

2.) When reconnecting using SessionResumption, if an agent transfer has taken place, only the conversation history up to the agent transfer is sent. Since this ends on a user turn, the agent then responds, answering the query that caused the agent transfer. This only happens with SessionResumption and does not happen when manually resetting the session. It still happens after transferring back to the root agent.

I'm not sure whether these are Gemini model issues or ADK issues.

Code

Basic code for replication is below; it would need to be hooked up to a front end (a minimal driver sketch is included after the Routes section). It runs through routes added to ADK's get_fast_api_app(), all built off the SSE example that was available on the ADK docs website (it looks to have been taken down now).
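
For context, the snippets also reference a few module-level names that are not shown (app, active_sessions, logger, VOICE_NAME, and the imports). A rough sketch of that setup; import paths and get_fast_api_app arguments are illustrative and may differ between ADK versions, and the BiDiRunner used in Setup is not covered here:

import base64
import json
import logging
from typing import Any, AsyncGenerator, Dict, Optional, Tuple

from fastapi import HTTPException, Request
from fastapi.responses import StreamingResponse
from google.adk.agents import LlmAgent, LiveRequestQueue
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.cli.fast_api import get_fast_api_app
from google.adk.sessions import DatabaseSessionService
from google.adk.tools import FunctionTool, ToolContext
from google.genai import types

logger = logging.getLogger(__name__)

VOICE_NAME = "Aoede"  # any prebuilt voice name
active_sessions: Dict[str, LiveRequestQueue] = {}

# The custom routes below are registered on the app returned by ADK
# (constructor arguments elided; they depend on the ADK version).
app = get_fast_api_app(...)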

Agents

def get_root_agent():
    return LlmAgent(
        name="root_agent",
        model= "gemini-live-2.5-flash", # also: "gemini-live-2.5-flash-preview-native-audio-09-2025"
        description="The root agent",
        instruction="deafault instruction",
        tools=[get_math_tool()]
    )
    
def get_sub_agent():
    return LlmAgent(
        name="sub_agent",
        model= "gemini-live-2.5-flash", # also: "gemini-live-2.5-flash-preview-native-audio-09-2025"
        description="Used whenever the user says they want to talk to the sub agent.",
        instruction="default instruction",
        tools=[get_math_tool()]
    )
    
def get_agent():
    root_agent = get_root_agent()
    sub_agent = get_sub_agent()

    root_agent.sub_agents = [sub_agent]
    sub_agent.parent_agent = root_agent

    return root_agent

Tool

def get_math_tool():
    return FunctionTool(func=solve_math)


def solve_math(
    tool_context: Optional[ToolContext] = None,
) -> int:
    return 2 + 2


def get_description():
    return "This tool is uesed to get the answer to the math problem"


# Set the docstring for the function (required by ADK)
solve_math.__doc__ = get_description()
NAME = solve_math.__name__

Setup

async def start_agent_session(
    app_name: str, user_id: str, session_id: str, is_audio: bool = True,
) -> Tuple[AsyncGenerator, LiveRequestQueue]:
    # is_audio is accepted to match the SSE route below; this repro always uses audio output.

    agent = get_agent()
    session_service = DatabaseSessionService("sqlite:///./sessions.db")

    session = await session_service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )

    runner = BiDiRunner(
        app_name=app_name,
        agent=agent,
        session_service=session_service,
    )

    run_config = RunConfig(
        streaming_mode=StreamingMode.BIDI,
        response_modalities=[types.Modality.AUDIO],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=VOICE_NAME)
            )
        ),
        output_audio_transcription=types.AudioTranscriptionConfig(),
        input_audio_transcription=types.AudioTranscriptionConfig(),
        session_resumption=types.SessionResumptionConfig(),
        save_live_audio=False,
    )

    live_request_queue = LiveRequestQueue()

    live_events = runner.run_live(
        session=session,
        live_request_queue=live_request_queue,
        run_config=run_config,
    )

    return live_events, live_request_queue
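
One note on the setup: start_agent_session assumes the session already exists in the database. When reproducing from scratch, a small helper along these lines (hypothetical; not part of the code above) can create it on first connect:

async def ensure_session(
    session_service: DatabaseSessionService,
    app_name: str,
    user_id: str,
    session_id: str,
):
    # Fetch the session if it exists, otherwise create it so run_live has
    # history to append to and resume from.
    session = await session_service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id
    )
    if session is None:
        session = await session_service.create_session(
            app_name=app_name, user_id=user_id, session_id=session_id
        )
    return session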

Communication

async def agent_to_client_sse(
    live_events: AsyncGenerator, current_session_id: str
) -> AsyncGenerator[str, None]:
    # Collected transcription text (defined here so the appends below resolve)
    output_texts: list[str] = []
    input_texts: list[str] = []
    try:

      async for event in live_events:

          # Check for turn completion or interruption
          if event.turn_complete or event.interrupted:
              if event.interrupted:
                  message = {
                      "type": "interrupted",
                      "data": "Response interrupted by user input",
                  }
                  yield f"data: {json.dumps(message)}\n\n"

              if event.turn_complete:
                  message = {
                      "type": "turn_complete",
                      "session_id": current_session_id,
                  }
                  yield f"data: {json.dumps(message)}\n\n"
              continue

          if (
              hasattr(event, "session_resumption_update")
              and event.session_resumption_update
          ):
              update = event.session_resumption_update

              if update.resumable and update.new_handle:
                  current_session_id = update.new_handle
                  message = {"type": "session_id", "data": current_session_id}
                  yield f"data: {json.dumps(message)}\n\n"

          # Handle content
          if event.content and event.content.parts:
              for part in event.content.parts:
                  if hasattr(part, "inline_data") and part.inline_data:
                      if part.inline_data.mime_type.startswith("audio/pcm"):
                          audio_data = part.inline_data.data
                          if audio_data:
                              message = {
                                  "type": "audio",
                                  "data": base64.b64encode(audio_data).decode(
                                      "ascii"
                                  ),
                              }
                              yield f"data: {json.dumps(message)}\n\n"
                              continue

          if event.output_transcription:
              output_texts.append(event.output_transcription.text)
              message = {
                  "type": "text",
                  "role": "model",
                  "data": event.output_transcription.text,
              }
              yield f"data: {json.dumps(message)}\n\n"

          if event.input_transcription:
              message = {
                  "type": "text",
                  "role": "user",
                  "data": event.input_transcription.text,
              }
              yield f"data: {json.dumps(message)}\n\n"
              input_texts.append(event.input_transcription.text)

    except Exception as e:
        import traceback
        traceback.print_exc()
        error_message = {"type": "error", "data": f"Stream error: {str(e)}"}
        yield f"data: {json.dumps(error_message)}\n\n"
        
        
async def process_client_message(
    message_data: Dict[str, Any], live_request_queue: LiveRequestQueue, session_id: str
) -> bool:  
    try:
        msg_type = message_data.get("type", "")
        if msg_type == "audio":
            data_b64 = message_data.get("data", "")
            if not data_b64:
                logger.warning("Empty audio payload; dropping")
                return False
            try:
                audio_bytes = base64.b64decode(data_b64, validate=True)
            except Exception as de:
                logger.warning(f"Invalid base64 audio payload: {de}; dropping")
                return False
            if len(audio_bytes) == 0 or (len(audio_bytes) % 2) != 0:
                return False
            live_request_queue.send_realtime(
                types.Blob(
                    data=audio_bytes,
                    mime_type=f"audio/pcm;rate=16000",
                )
            )
            return True

        elif msg_type == "text" or message_data.get("mime_type") == "text/plain":
            text_content = message_data.get("data", "").strip()
            if text_content:
                content = types.Content(
                    role="user",
                    parts=[types.Part.from_text(text=text_content)],
                )
                live_request_queue.send_content(content=content)
                return True
            return False

        else:
            return False

    except Exception as e:
        import traceback

        traceback.print_exc()
        return False
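
For reference, these are the message shapes process_client_message (and the /send route below) expects from the client; values are illustrative, and the audio payload is base64-encoded 16 kHz, 16-bit mono PCM:

# Illustrative client payloads posted to the /send route.
text_message = {"type": "text", "data": "I want to talk to the sub agent"}
audio_message = {
    "type": "audio",
    # pcm_chunk is a placeholder for a raw 16 kHz, 16-bit mono PCM byte chunk
    "data": base64.b64encode(pcm_chunk).decode("ascii"),
}
end_message = {"type": "end"}  # handled directly by the /send route; closes the queue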

Routes

    @app.get(
        path="/apps/{app_name}/users/{user_id}/sessions/{session_id}/events",
    )
    async def sse_endpoint(
        app_name: str, user_id: str, session_id: str, is_audio: str = "false"
    ):
        try:
            live_events, live_request_queue = await start_agent_session(
                app_name, user_id, session_id, is_audio=(is_audio.lower() == "true")
            )

            active_sessions[user_id + session_id] = live_request_queue

            logger.info(f"Client #{user_id} connected via SSE, audio mode: {is_audio}")

            def cleanup():
                active_id = user_id + session_id
                try:
                    live_request_queue.close()
                    if active_id in active_sessions:
                        del active_sessions[active_id]
                    logger.info(f"Client #{active_id} disconnected from SSE")
                except Exception as e:
                    logger.error(f"Error cleaning up session for {active_id}: {e}")

            async def event_generator():
                session_info = {
                    "type": "session_id",
                    "data": session_id,
                }
                session_id_message = json.dumps(session_info)
                yield f"data: {session_id_message}\n\n"

                try:
                    async for data in agent_to_client_sse(live_events, session_id):
                        yield data
                except Exception as e:
                    logger.error(
                        f"Error in SSE stream for userId: {user_id}, session_id: {session_id} : {e}"
                    )
                finally:
                    cleanup()

            return StreamingResponse(
                event_generator(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "X-Accel-Buffering": "no",
                    "Access-Control-Allow-Origin": "*",
                    "Access-Control-Allow-Headers": "Cache-Control",
                },
            )

        except Exception as e:
            logger.error(f"Failed to create SSE session for {user_id}: {e}")
            raise HTTPException(status_code=500, detail="Failed to create session")


    @app.post(
        path="/apps/{app_name}/users/{user_id}/sessions/{session_id}/send",
    )
    async def send_message_endpoint(
        app_name: str, user_id: str, session_id: str, request: Request
    ):

        try:
            live_request_queue = active_sessions.get(user_id + session_id)
            if not live_request_queue:
                raise HTTPException(status_code=404, detail="Session not found")

            message = await request.json()

            if message.get("type") == "end":
                live_request_queue.close()
                return {"status": "session ended"}

            success = await process_client_message(
                message, live_request_queue, session_id
            )

            if not success:
                raise HTTPException(status_code=400, detail="Failed to process message")

            return {"status": "sent"}

        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error processing message for {user_id}: {e}")
            raise HTTPException(status_code=500, detail="Internal server error")
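
To drive this without a full front end, a minimal text-only client along these lines is enough to trigger Issue 1 by asking for a transfer and then a tool call (hypothetical sketch: httpx, localhost:8000, and the my_app/u1/s1 identifiers are placeholders):

import asyncio

import httpx

BASE = "http://localhost:8000/apps/my_app/users/u1/sessions/s1"


async def read_events(client: httpx.AsyncClient) -> None:
    # Print every SSE frame; with Issue 1 the model transcription shows up N + 1 times.
    async with client.stream("GET", f"{BASE}/events") as response:
        async for line in response.aiter_lines():
            if line.startswith("data: "):
                print(line)


async def drive() -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        reader = asyncio.create_task(read_events(client))
        await asyncio.sleep(1)  # let the live session register in active_sessions
        await client.post(
            f"{BASE}/send",
            json={"type": "text", "data": "I want to talk to the sub agent"},
        )
        await asyncio.sleep(8)  # wait for the transfer to complete
        await client.post(
            f"{BASE}/send",
            json={"type": "text", "data": "What is 2 + 2? Use the math tool."},
        )
        await asyncio.sleep(15)  # observe the duplicated responses
        await client.post(f"{BASE}/send", json={"type": "end"})
        reader.cancel()


asyncio.run(drive())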

To Reproduce

Issue 1:

  1. Transfer between two bidi agents using the Gemini models above through the Vertex AI API (e.g., with the driver sketch above)
  2. Get the agent to call a function
  3. See multiple texts returned.

Issue 2:

  1. Transfer between two bidi agents using the Gemini models above through the Vertex AI API
  2. Send other messages to the sub agent if you like.
  3. Wait for session resumption
  4. Hear the agent respond without a prompt.
  5. Print the history passed to llm_connection.send_history() (line 135 of base_llm_flow.py) during session resumption to see that the last entry is a "user" turn containing the agent transfer (a hedged print sketch follows this list).
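
A hedged sketch of the debug print for step 5, dropped in just before the send_history call in base_llm_flow.py (variable names are assumed from that location and may differ by ADK version):

# Temporary debug print before llm_connection.send_history(...).
# llm_request.contents is assumed to be the history replayed on resumption.
for content in llm_request.contents:
    summary = [
        p.text or p.function_call or p.function_response
        for p in (content.parts or [])
    ]
    print("history:", content.role, summary)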

Desktop (please complete the following information):

  • OS: Windows (WSL Dev Container)
  • Python version (python -V): 3.13.7
  • ADK version (pip show google-adk): 1.17.0

Model Information:

  • Which model is being used: "gemini-live-2.5-flash" or "gemini-live-2.5-flash-preview-native-audio-09-2025"

Labels

live [Component] This issue is related to live, voice and video chat