In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Getting Started with Live API on Agent Engine

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb">
      <img width="32px" src="https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fagents%2Fagent_engine%2Ftutorial_get_started_with_live_api_on_agent_engine.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb">
      <img src="https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg" alt="Vertex AI logo"><br> Open in Vertex AI Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb">
      <img width="32px" src="https://www.svgrepo.com/download/217753/github.svg" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

<div style="clear: both;"></div>

<b>Share to:</b>

<a href="https://www.linkedin.com/sharing/share-offsite/?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/8/81/LinkedIn_icon.svg" alt="LinkedIn logo">
</a>

<a href="https://bsky.app/intent/compose?text=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/7/7a/Bluesky_Logo.svg" alt="Bluesky logo">
</a>

<a href="https://twitter.com/intent/tweet?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/5a/X_icon_2.svg" alt="X logo">
</a>

<a href="https://reddit.com/submit?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb" target="_blank">
  <img width="20px" src="https://redditinc.com/hubfs/Reddit%20Inc/Brand/Reddit_Logo.png" alt="Reddit logo">
</a>

<a href="https://www.facebook.com/sharer/sharer.php?u=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/agents/agent_engine/tutorial_get_started_with_live_api_on_agent_engine.ipynb" target="_blank">
  <img width="20px" src="https://upload.wikimedia.org/wikipedia/commons/5/51/Facebook_f_logo_%282019%29.svg" alt="Facebook logo">
</a>

| Author(s) |
| --- |
| Hanfei Sun, Huang Xia, [Ivan Nardini](https://github.com/inardini) |

## Overview

This tutorial demonstrates how to build, deploy, and interact with **bidirectional streaming agents** using **Vertex AI Agent Engine** and the **Live API**.

Bidirectional (bidi) streaming represents a paradigm shift in AI interactions. Instead of the traditional request-response pattern, it enables **real-time, two-way communication** where both user and AI can exchange data simultaneously.

Agent Engine is Google's fully-managed, serverless platform for running AI agents at scale. Combined with the Live API's real-time streaming capabilities, you can create agents that engage in natural, fluid conversations with support for text, audio, and video.

In this tutorial, you will:

* **Build** three different types of streaming agents with increasing complexity
* **Understand** bidirectional streaming patterns and when to use each one
* **Deploy** agents to Agent Engine with a single command
* **Interact** with agents
* **Implement** real-time audio conversations

## Get started

### Install Google Gen AI SDK and other required packages

First, install the necessary packages for building streaming agents with Live API support.


In [None]:
# Install the Vertex AI SDK straight from its GitHub repository (unpinned, so
# each run picks up the current default branch), then the other packages this
# tutorial imports: numpy, websockets, and google-adk.
%pip install --upgrade --quiet --force-reinstall git+https://github.com/googleapis/python-aiplatform.git
%pip install --upgrade --quiet numpy websockets google-adk

### Authenticate your notebook environment (Colab only)

If you're running this notebook on Google Colab, run the cell below to authenticate your environment.

In [None]:
import sys

# Only authenticate when the `google.colab` module is already loaded in this
# kernel; in any other environment this cell does nothing.
if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
# Use the environment variable if the user doesn't provide Project ID.
import os
import vertexai

PROJECT_ID = "[your-project-id]"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "global")

BUCKET_NAME = "[your-bucket-name]"  # @param {type: "string", placeholder: "[your-bucket-name]", isTemplate: true}
if not BUCKET_NAME or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID

BUCKET_URI = f"gs://{BUCKET_NAME}"

MODEL_ID = "gemini-2.0-flash-live-preview-04-09"

# Set enviroment variables for ADK
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "1"

# Create the staging bucket for Agent Engine
!gsutil mb -p $PROJECT_ID -l $LOCATION $BUCKET_URI

# Initialize Vertex AI
vertexai.init(project=PROJECT_ID, location=LOCATION)
client = vertexai.Client(project=PROJECT_ID, location=LOCATION)

print(f"Project ID: {PROJECT_ID}")
print(f"Location: {LOCATION}")
print(f"Staging Bucket: {BUCKET_URI}")
print(f"Model ID: {MODEL_ID}")

### Import libraries

Here we import all the necessary Python classes and functions we'll use throughout the tutorial. The imports are organized by category for clarity.


In [None]:
# Standard library
import os
import asyncio
import json
import base64
import time
import logging
from typing import Any, Dict, AsyncIterator, Iterator

# Google Cloud and Vertex AI
import vertexai
from vertexai.preview.reasoning_engines import AdkApp
from vertexai import types as vertexai_types
import google.auth
import google.auth.transport.requests

# ADK (Agent Development Kit)
# LlmAgent lives in google.adk.agents; the package root only re-exports
# `Agent` and `Runner`.
from google.adk.agents import LlmAgent
from google.adk.tools.tool_context import ToolContext
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types

# Audio and WebSocket handling
import numpy as np
import websockets

# Notebook display utilities
from IPython.display import Audio, display

# Surface INFO-level log messages in the notebook output
logging.getLogger().setLevel(logging.INFO)

## Building your first Echo Agent

Before using the Live API, let's start with a simple echo agent to understand the fundamental patterns of Agent Engine. This agent demonstrates the three types of interaction it supports: standard query, streaming query, and bidirectional streaming.

### Understanding Agent Structure

Every Agent Engine agent follows a specific pattern with three key methods:

1. **Init method**: This must be lightweight and pickle-able (serializable), so we only store simple config values.
2. **Setup method**: This is where heavy initialization (like creating database clients or loading models) goes. Agent Engine calls this once when the serverless container spins up.
3. **Query methods**: Handle different interaction patterns. In this case, query, stream_query, bidi_stream_query.
    *  query(...): The classic, synchronous request-response pattern. It takes one input and returns one complete output.
    *  stream_query(...): The server-side streaming pattern. It takes one input but returns a Python generator (yield), allowing it to stream multiple chunks back to the client. This is perfect for streaming LLM text responses.
    *  bidi_stream_query(...): The new full bidirectional streaming pattern. This is an async method that receives an asyncio.Queue. It runs in a while True loop, waiting for messages to arrive on the queue (await queue.get()) and using yield to send responses back at any time. This enables true two-way conversation and is the pattern the Live API is built on.

Finally, the register_operations(...) method is the agent's "routing table": it tells Agent Engine which class method to call for each endpoint.

### Create the Echo Agent

Let's implement a simple echo agent that demonstrates these interaction patterns supported by Agent Engine.


In [None]:
class EchoAgent:
    """
    A simple echo agent demonstrating Agent Engine patterns.

    This agent showcases three interaction patterns:
    1. Standard query (request-response)
    2. Streaming query (server-side streaming)
    3. Bidirectional streaming (two-way communication)
    """

    # Pause between streamed chunks, in seconds, to simulate processing time.
    STREAM_CHUNK_DELAY_SECONDS = 0.1

    def __init__(self, project: str, location: str):
        """
        Initialize configuration (keep lightweight and pickle-able).

        Important: Only store simple data types here.
        Heavy initialization should go in set_up().

        Args:
            project: Google Cloud project ID, stored as-is.
            location: Google Cloud location, stored as-is.
        """
        self.project = project
        self.location = location

    def set_up(self):
        """
        Perform heavy initialization (called once when deployed).

        This is where you'd initialize:
        - Database connections
        - Model loading
        - External service clients

        Here it only creates the logger used by bidi_stream_query().
        """
        import logging
        self.logger = logging.getLogger(__name__)
        self.logger.info("Echo Agent ready!")

    def query(self, input: str) -> Dict[str, Any]:
        """
        Standard query - returns complete response.

        This is the simplest pattern: receive input, return output.
        Use this for quick, synchronous operations.

        Args:
            input: Text to echo back.

        Returns:
            Dict with the original "input", the echoed "output", and the
            name of the "method" that produced it.
        """
        return {
            "input": input,
            "output": f"Echo: {input}",
            "method": "query"
        }

    def stream_query(self, input: str) -> Iterator[Dict[str, Any]]:
        """
        Streaming query - yields response chunks.

        Use this pattern when you want to:
        - Stream partial results as they become available
        - Handle large responses that shouldn't be buffered
        - Provide progress updates during long operations

        Args:
            input: Text to split on whitespace and stream word by word.

        Yields:
            One dict per word with the word (plus a trailing space) under
            "chunk" and an "i/total" string under "progress".
        """
        import time

        words = input.split()

        # Agent Engine expects each yield to be a complete, serializable response
        for i, word in enumerate(words):
            # Each chunk should be a complete response object
            chunk_response = {
                "chunk": f"{word} ",
                "progress": f"{i+1}/{len(words)}"
            }

            yield chunk_response

            # Simulate processing time. This is a synchronous generator, so it
            # must block with time.sleep(); `await` is only legal in `async def`.
            time.sleep(self.STREAM_CHUNK_DELAY_SECONDS)

    async def bidi_stream_query(
        self,
        queue: asyncio.Queue
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Bidirectional streaming for continuous conversation.

        This enables:
        - Real-time, two-way communication
        - User interruptions mid-response
        - Long-running interactive sessions

        Requires set_up() to have been called (it creates self.logger).

        Args:
            queue: Queue of incoming message dicts; the text is read from
                the "input" key (missing key is treated as empty text).

        Yields:
            {"output": "Echo: <text>"} for every message, or
            {"output": "Goodbye!"} once "exit"/"quit" is received, after
            which the stream ends.
        """
        self.logger.info("Bidi session started")

        while True:
            # Wait for message from the queue
            message = await queue.get()
            user_input = message.get("input", "")

            # Check for exit command
            if user_input.lower() in ("exit", "quit"):
                yield {"output": "Goodbye!"}
                break

            # Echo back the input
            # In a real agent, this is where you'd process the input
            yield {"output": f"Echo: {user_input}"}

    def register_operations(self):
        """
        Register available operations with Agent Engine.

        This tells Agent Engine which methods are available
        and how to route requests to them.

        Returns:
            Dict mapping an endpoint mode to the list of method names
            served by that mode.
        """
        return {
            "": ["query"],                        # Default endpoint
            "stream": ["stream_query"],           # Streaming endpoint
            "bidi_stream": ["bidi_stream_query"]  # Bidirectional endpoint
        }

### Deploy the Echo Agent

Now let's deploy our echo agent to Agent Engine.

First, we create a local instance of our EchoAgent class.

Then, we call client.agent_engines.create(). This single command is the core of Agent Engine. It takes our local Python object (agent=echo_agent), serializes it, determines its dependencies (which we specify in requirements), stages all these files in our Cloud Storage, provisions the necessary serverless infrastructure, and deploys our agent as a scalable, secure endpoint.

Once the deployment finishes, remote_echo_agent is a client object that we can use to interact with the newly deployed agent.

In [None]:
# Build the local agent object that will be shipped to Agent Engine
echo_agent = EchoAgent(project=PROJECT_ID, location=LOCATION)

# Create the remote deployment from the local object
remote_echo_agent = client.agent_engines.create(
    agent=echo_agent,
    config=dict(
        # Name and description shown for the deployed agent
        display_name="Echo Agent Tutorial",
        description="Simple echo agent demonstrating streaming patterns",
        # Python packages the agent needs at runtime
        requirements=[
            "google-cloud-aiplatform[agent_engines] @ git+https://github.com/googleapis/python-aiplatform.git"
        ],
        # Cloud Storage location used for staging
        staging_bucket=BUCKET_URI,
        # Experimental server mode, as used throughout this tutorial
        agent_server_mode=vertexai_types.AgentServerMode.EXPERIMENTAL,
    ),
)

print("✅ Agent deployed successfully!")
print(f"Resource name: {remote_echo_agent.api_resource.name}")

### Test the agent

Let's test all three interaction patterns to verify our agent is working correctly.


#### Test Standard Query

Here, we test the simplest pattern. We call the .query() method on our remote agent handle. The SDK sends the request, waits for the complete JSON response, and then prints it.


In [None]:
# Test 1: Standard query (request-response)
print("=" * 50, "Testing standard query()...", "=" * 50, sep="\n")

# One blocking call; a failure is reported in the output instead of raised
try:
    response = remote_echo_agent.query(input="Hello from standard query")
    print(f"✅ Query response: {response}")
except Exception as err:
    print(f"❌ Query failed: {err}")

#### Test Streaming Query

This tests the server-side streaming pattern. We call .stream_query() on the remote agent. Because the underlying method in our class is a generator, the client object also behaves like a generator. This lets us use a simple for loop to iterate over the response chunks as they're streamed back from the server, printing each one as it arrives.


In [None]:
# Test 2: Streaming query (server-side streaming)
print("\n" + "=" * 50, "Testing stream_query()...", "=" * 50, sep="\n")

# Print every chunk as it arrives; a failure is reported instead of raised
try:
    print("Streaming response:")
    for chunk in remote_echo_agent.stream_query(
        input="Hello from streaming query"
    ):
        print(f"  Received chunk: {chunk}")
    print("✅ Streaming completed")
except Exception as err:
    print(f"❌ Stream query failed: {err}")

#### Test Bidirectional Streaming

This tests our most advanced pattern. Because it's asynchronous, we must define an async test function.

*   We use async with client.aio.live.agent_engines.connect(...) to open a persistent, two-way connection to the agent endpoint we routed to bidi_stream_query.
*   This gives us a session object with two primary methods: session.send() (to send data to the agent) and session.receive() (to get data from the agent).
*   We simulate a conversation: we .send() a message, await its echoed .receive() response, and repeat.
*   Finally, we send "exit," which our agent's logic is designed to catch, causing it to yield a final "Goodbye!" and break its loop.


In [None]:
# Test 3: Bidirectional streaming (two-way communication)
print("\n" + "=" * 50)
print("Testing bidi_stream_query()...")
print("=" * 50)

async def test_bidi_streaming():
    """Test bidirectional streaming with multiple exchanges."""

    try:
        # Connect to the agent's bidirectional streaming endpoint
        async with client.aio.live.agent_engines.connect(
            agent_engine=remote_echo_agent.api_resource.name,
            config={"class_method": "bidi_stream_query"}
        ) as session:

            # Exchange 1: Send greeting
            await session.send({"input": "Hello from bidi"})
            response = await session.receive()
            print("Exchange 1 - Sent: 'Hello from bidi'")
            print(f"Exchange 1 - Received: {response['bidiStreamOutput']['output']}")

            # Exchange 2: Send another message
            await session.send({"input": "How are you?"})
            response = await session.receive()
            print("Exchange 2 - Sent: 'How are you?'")
            print(f"Exchange 2 - Received: {response['bidiStreamOutput']['output']}")

            # Exchange 3: Exit
            await session.send({"input": "exit"})
            response = await session.receive()
            print("Exchange 3 - Sent: 'exit'")
            print(f"Exchange 3 - Received: {response['bidiStreamOutput']['output']}")

            await session.close()
            print("✅ Bidirectional streaming completed")

    except Exception as e:
        print(f"❌ Bidi stream failed: {e}")

# Run the async test
await test_bidi_streaming()

## Live API Audio Conversations

Now that we understand the basic patterns, let's explore the Live API's real-time audio capabilities. This enables natural voice conversations with your agents.

### Understanding the Live API

The Live API is Google's real-time interface to Gemini models, enabling:

- **WebSocket connection**: Direct, persistent connection to Gemini models
- **Multimodal streaming**: Support for text, audio, and video in real-time
- **Natural interruptions**: Users can interrupt the agent mid-response
- **Low latency**: Optimized for conversational experiences (< 1 second response time)
- **Session continuity**: Maintains context throughout the conversation


### Create an agent using Live API

This agent connects directly to the Gemini Live API via WebSockets to enable a real-time audio conversation.

Here, the set_up(...) method builds the WebSocket URL (wss://...) for the Live API and sets the generation config to explicitly request AUDIO responses.

Then we have the bidi_stream_query(...) which is the main entry point that Agent Engine calls:

*   It gets an auth token using _authenticate().
*   It opens a WebSocket connection to the Live API using the websockets library.
*   It initializes the connection using _setup_session().
*   It enters its main loop: it waits for text from our notebook client (await input_queue.get()), sends that text to Gemini with _send_text(), and then streams the audio response back by iterating over _receive_audio(), yielding each audio chunk back to the notebook client.


In [None]:
class LiveAudioAgent:
    """
    Agent with real-time audio capabilities using Live API.

    This implementation demonstrates:
    - WebSocket connection to Gemini Live API
    - Audio streaming configuration
    - Session management
    """

    def __init__(self, project: str, location: str, model_id: str):
        """Initialize with minimal configuration.

        Args:
            project: Google Cloud project ID, stored as-is.
            location: Google Cloud location, stored as-is.
            model_id: Model identifier used to build the model path.
        """
        self.project = project
        self.location = location
        self.model_id = model_id

    def set_up(self):
        """Configure Live API connection.

        Sets self.service_url (WebSocket endpoint), self.model (full model
        resource path) and self.config (generation config requesting audio).
        """
        # Build WebSocket URL for Live API. The "global" location is served
        # from the un-prefixed host; every other location uses a
        # "<location>-" prefix.
        if self.location == "global":
            host = "aiplatform.googleapis.com"
        else:
            host = f"{self.location}-aiplatform.googleapis.com"
        self.service_url = (
            f"wss://{host}/ws/google.cloud.aiplatform.v1."
            "LlmBidiService/BidiGenerateContent"
        )

        # Model path for Vertex AI
        self.model = (
            f"projects/{self.project}/locations/{self.location}/"
            f"publishers/google/models/{self.model_id}"
        )

        # Configure for audio responses
        self.config = {
            "response_modalities": ["AUDIO"]
        }

    async def _authenticate(self):
        """Get authentication token for WebSocket connection.

        Returns:
            The access token of the refreshed default credentials.
        """
        credentials, _ = google.auth.default()
        auth_req = google.auth.transport.requests.Request()
        credentials.refresh(auth_req)
        return credentials.token

    async def _setup_session(self, websocket):
        """Initialize Live API session.

        Sends the setup message (model + generation config) and waits for a
        single reply, which is logged.

        Args:
            websocket: Open connection exposing async send() and recv().
        """
        setup_message = {
            "setup": {
                "model": self.model,
                "generation_config": self.config,
            }
        }
        await websocket.send(json.dumps(setup_message))

        # Receive setup confirmation. json.loads() decodes the raw bytes
        # itself, so a non-ASCII payload no longer raises.
        response = await websocket.recv(decode=False)
        setup_response = json.loads(response)
        logging.info(f"Session established: {setup_response}")

    async def _send_text(self, websocket, text: str) -> bool:
        """
        Send text message to Live API.

        Args:
            websocket: Open connection exposing an async send().
            text: User text; "exit"/"quit" (any case) is not sent.

        Returns:
            False if exit requested, True otherwise
        """
        if text.lower() in ("exit", "quit"):
            return False

        message = {
            "client_content": {
                "turns": [{
                    "role": "user",
                    "parts": [{"text": text}]
                }],
                "turn_complete": True,
            }
        }
        await websocket.send(json.dumps(message))
        return True

    async def _receive_audio(self, websocket) -> AsyncIterator[Dict]:
        """
        Receive and process audio response from Live API.

        Reading stops at the first message without "serverContent", at a
        message flagged "turnComplete", or when the connection is exhausted.

        Args:
            websocket: Async-iterable connection yielding JSON messages.

        Yields:
            {"output": [int, ...]} for every inline audio part (16-bit
            samples as a list), then a final {"output": "end of turn"}.
        """
        async for raw_response in websocket:
            # json.loads() accepts both text (str) and binary (bytes) frames
            response = json.loads(raw_response)
            server_content = response.get("serverContent")

            if not server_content:
                break

            # Extract audio from model turn
            model_turn = server_content.get("modelTurn")
            if model_turn:
                parts = model_turn.get("parts", [])
                for part in parts:
                    if "inlineData" in part:
                        # Decode PCM audio data
                        pcm_data = base64.b64decode(
                            part["inlineData"]["data"]
                        )
                        audio_array = np.frombuffer(
                            pcm_data,
                            dtype=np.int16
                        )
                        yield {"output": audio_array.tolist()}

            # Check for turn completion
            if server_content.get("turnComplete"):
                break

        # Sentinel the client uses to know the turn is over
        yield {"output": "end of turn"}

    async def bidi_stream_query(
        self,
        input_queue: asyncio.Queue
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Handle bidirectional audio streaming.

        Requires set_up() to have been called (it sets self.service_url,
        self.model and self.config).

        Args:
            input_queue: Queue for receiving user input; each item is a
                dict whose "input" key holds the user's text

        Yields:
            Audio response data
        """
        # Import the submodule explicitly: a bare `import websockets` does not
        # guarantee that `websockets.asyncio.client` is loaded as an attribute.
        from websockets.asyncio.client import connect as ws_connect

        # Get authentication token
        access_token = await self._authenticate()

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",
        }

        # Establish WebSocket connection
        async with ws_connect(
            self.service_url,
            additional_headers=headers
        ) as websocket:

            # Initialize session
            await self._setup_session(websocket)

            # Main conversation loop
            while True:
                # Get user input
                request = await input_queue.get()
                text_input = request["input"]

                # Send to Live API
                if not await self._send_text(websocket, text_input):
                    break

                # Receive and yield audio response
                async for audio_data in self._receive_audio(websocket):
                    yield audio_data

### Deploy the Live Audio Agent

Let's deploy the agent. The process is identical to the first agent, but notice the requirements section in the config. We must list numpy, google-auth, and websockets because our LiveAudioAgent class imports and uses them. Agent Engine will see this list and pip install these packages into the agent's serverless container.

In [None]:
# Build the local agent; MODEL_ID must name a model that supports the Live API
live_agent = LiveAudioAgent(project=PROJECT_ID, location=LOCATION, model_id=MODEL_ID)

# Create the remote deployment from the local object
remote_live_agent = client.agent_engines.create(
    agent=live_agent,
    config=dict(
        display_name="Live Audio Agent Tutorial",
        description="Simple agent with real-time audio capabilities",
        # Packages the LiveAudioAgent class imports and uses
        requirements=["numpy", "google-auth", "websockets"],
        staging_bucket=BUCKET_URI,
        # Experimental server mode, as used throughout this tutorial
        agent_server_mode=vertexai_types.AgentServerMode.EXPERIMENTAL,
    ),
)

print("✅ Agent deployed successfully!")
print(f"Resource name: {remote_live_agent.api_resource.name}")

### Test the live audio agent

To interact with the audio agent, you define a simple test client.

The client connects to our deployed agent's bidi_stream_query endpoint using the familiar client.aio.live.agent_engines.connect pattern.

It enters a while True loop that takes text from the console using input("You: "). It sends this text to our deployed agent using session.send().

It then enters an inner loop to collect the response. It calls await session.receive() repeatedly, appending each received audio array (which our agent is yielding) to the audio_chunks list. This inner loop breaks when it receives the special "end of turn" string that our agent yields when Gemini is done speaking.

Finally, it uses np.concatenate to stitch all the small audio chunks into one complete audio stream and uses display(Audio(...)) to play it directly in the notebook, creating a seamless text-in, audio-out chat experience.


In [None]:
async def interactive_audio_chat(client, agent_name: str):
    """Simple audio chat interface.

    Reads text from the console, sends it to the deployed agent, collects
    the audio chunks of the reply until the "end of turn" marker arrives,
    and plays them in the notebook.

    Args:
        client: Client exposing aio.live.agent_engines.connect(...).
        agent_name: Resource name of the deployed agent to connect to.
    """
    # Keep in sync with the commands the agent itself treats as "stop":
    # after either one the agent sends nothing more, so the client must not
    # wait for a reply.
    exit_commands = ("exit", "quit")

    async with client.aio.live.agent_engines.connect(
        agent_engine=agent_name,
        config={"class_method": "bidi_stream_query"}
    ) as session:

        print("🎤 Audio Chat Ready! (type 'exit' to quit)\n")

        while True:
            await asyncio.sleep(0.1)
            user_input = input("You: ")

            # Send text (exit commands are forwarded too, so the agent stops)
            await session.send({"input": user_input})

            if user_input.lower() in exit_commands:
                break

            # Collect audio response
            audio_chunks = []

            while True:
                response = await session.receive()
                output = response["bidiStreamOutput"]
                if output.get("output") == "end of turn":
                    break
                audio = output["output"]
                audio_chunks.append(np.array(audio))

            # Play audio
            print("Agent:")
            if audio_chunks:
                full_audio = np.concatenate(audio_chunks)
                display(Audio(full_audio, rate=24000, autoplay=True))
                print()

        await session.close()

# Run audio chat against the deployed Live Audio Agent.
# Top-level `await` works in notebook cells; in a plain script, wrap the call
# in asyncio.run(...) instead.
await interactive_audio_chat(client, remote_live_agent.api_resource.name)

## Creating a Weather Assistant with ADK

In this final part, we'll build a weather assistant using the Agent Development Kit (ADK). This demonstrates how to combine Live API capabilities with tools, session management, and memory services.

### Understanding ADK Integration

The Agent Development Kit (ADK) provides high-level abstractions for building sophisticated agents:

- **Tool integration**: Easy function calling with automatic orchestration
- **Session management**: Persistent user context across conversations
- **Memory services**: Automatic conversation history management
- **Built-in patterns**: Production best practices by default
- **Error handling**: Robust error recovery and retry logic


### Define simple weather tools

This is the first tool for our ADK agent. It takes location (which the LLM will provide) and a special tool_context argument and returns a dictionary of (mock) weather data. The ADK will automatically serialize this and send it back to the LLM.

Notice the tool uses the ToolContext, which is an ADK object that gives the tool access to a persistent state. Here, we use tool_context.state (a dictionary) to store a history of all weather queries, demonstrating how tools can manage state.

In [None]:
async def get_weather(
    location: str,
    tool_context: ToolContext
) -> Dict[str, str]:
    """
    Get current weather for a location.

    This is a simple mock implementation.
    In production, integrate with a real weather API.

    Each call also records the query (location, timestamp, temperature)
    under the "weather_queries" key of the tool context state.

    Args:
        location: City or location name
        tool_context: ADK tool context for state management

    Returns:
        Weather information dictionary
    """

    import random
    from datetime import datetime

    # Mock weather data (replace with real API)
    temperature_c = random.randint(10, 35)
    temperature_f = int(temperature_c * 9/5 + 32)
    conditions = random.choice([
        "Sunny", "Cloudy", "Rainy", "Partly Cloudy"
    ])

    # Store in tool context for history.
    # Build a new list and assign it back to the key: an in-place append on
    # the stored list does not go through the state's item assignment, so the
    # change may not be recorded as a state update.
    query_history = list(tool_context.state.get("weather_queries") or [])
    query_history.append({
        "location": location,
        "timestamp": datetime.now().isoformat(),
        "temperature": f"{temperature_c}°C"
    })
    tool_context.state["weather_queries"] = query_history

    return {
        "location": location,
        "temperature": f"{temperature_c}°C / {temperature_f}°F",
        "conditions": conditions,
        "humidity": f"{random.randint(30, 80)}%",
        "wind": f"{random.randint(5, 25)} km/h"
    }

This is our second tool. It's a simpler function that just takes a location and number of days and returns a JSON string of a (mock) forecast. It doesn't need the tool_context since it's not saving any state.

In [None]:
async def get_forecast(
    location: str,
    days: int = 3
) -> str:
    """
    Get weather forecast for multiple days.

    This is a mock implementation: values are random and ``location`` is
    accepted for the tool schema but does not influence the result.

    Args:
        location: City or location name
        days: Number of days to forecast. Clamped to the range 1-5, so
            values above 5 yield five days and values below 1 yield one.

    Returns:
        JSON string with forecast data: a list with one object per day,
        each holding ``date``, ``high``, ``low`` and ``conditions``.
    """
    import json
    import random
    from datetime import datetime, timedelta

    # Clamp to 1-5 days so a zero/negative request still returns a forecast.
    days = max(1, min(days, 5))

    # Read the clock once so every entry is offset from the same instant.
    today = datetime.now()
    forecast = []

    for offset in range(days):
        date = today + timedelta(days=offset)
        high = random.randint(15, 30)
        # Cap the low at the high: the two ranges overlap (15-20), so
        # drawing them independently could report a low above the high.
        low = random.randint(5, min(20, high))
        forecast.append({
            "date": date.strftime("%A, %B %d"),
            "high": f"{high}°C",
            "low": f"{low}°C",
            "conditions": random.choice([
                "Sunny", "Cloudy", "Rainy", "Partly Cloudy"
            ])
        })

    return json.dumps(forecast, indent=2)

### Define the ADK agent

Instead of manually writing a complex LiveAudioAgent class, we use the high-level LlmAgent class from the ADK.

We specify the model (which must support the Live API) and a detailed instruction (system prompt). We also pass our two Python functions directly in the `tools=[get_weather, get_forecast]` list.

The ADK takes care of everything else: using this class together with other primitives, it will inspect the functions to create the tool definition (function calling schema), orchestrate the LLM calls, parse the LLM's request to use a tool, execute our Python function, and send the result back to the LLM to generate a final answer.


In [None]:
# Build the ADK weather agent: a Live-API model, a system prompt, and the
# two Python tool functions defined above.
weather_agent = LlmAgent(
    name='weather_assistant',
    description='A helpful weather assistant with current conditions and forecasts',
    model='gemini-2.0-flash-live-preview-04-09',
    # Plain Python callables are passed directly as tools.
    tools=[get_weather, get_forecast],
    # The prompt literal (including its indentation) is kept verbatim.
    instruction="""
        You are a friendly and knowledgeable weather assistant.

        Your responsibilities:
        1. Provide current weather when asked
        2. Give forecasts when requested
        3. Be conversational and helpful
        4. Always mention the location in responses
        5. Provide temperature in both Celsius and Fahrenheit
        6. If asked for a forecast, provide all requested days

        Guidelines:
        - Use the get_weather tool for current conditions
        - Use the get_forecast tool for multi-day forecasts
        - Be proactive in offering additional information
        - Keep responses concise but informative
        """,
    generate_content_config=types.GenerateContentConfig(
        max_output_tokens=512,
        temperature=0.7,
        # The dangerous-content filter is explicitly switched off here.
        safety_settings=[
            types.SafetySetting(
                category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
                threshold=types.HarmBlockThreshold.OFF,
            ),
        ],
    ),
)

### Deploy the agent on Agent Engine

To be deployed on Agent Engine, an ADK Agent must be wrapped in an AdkApp. This wrapper provides all the necessary methods (like bidi_stream_query) that Agent Engine expects.

In this scenario, we provide builders for InMemorySessionService and InMemoryMemoryService. This tells the ADK to automatically manage conversation history and session context for us (in this case, just in memory). For production, you could swap these with persistent services like Firestore.

Finally, we call client.agent_engines.create() as before, but this time passing agent=app. We also add google-adk and google-genai to the requirements list.

In [None]:
# Set up session and memory services
def session_service_builder():
    """Return a new in-memory ADK session service.

    Handed to ``AdkApp`` as its ``session_service_builder``. The import is
    kept inside the function, as in the original cell (presumably so the
    builder is self-contained when the app is packaged for deployment;
    confirm before moving it to module level).
    """
    from google.adk.sessions import in_memory_session_service

    return in_memory_session_service.InMemorySessionService()

def memory_service_builder():
    """Return a new in-memory ADK memory service.

    Handed to ``AdkApp`` as its ``memory_service_builder``. The import is
    kept inside the function, as in the original cell (presumably so the
    builder is self-contained when the app is packaged for deployment;
    confirm before moving it to module level).
    """
    from google.adk.memory import in_memory_memory_service

    return in_memory_memory_service.InMemoryMemoryService()
    
# Wrap the agent in an AdkApp, plugging in the session and memory builders.
app = AdkApp(
    agent=weather_agent,
    session_service_builder=session_service_builder,
    memory_service_builder=memory_service_builder,
)

# Packages to install for the deployed agent.
deployment_requirements = [
    "google-adk",
    "google-genai",
    "google-cloud-aiplatform[agent_engines] @ git+https://github.com/googleapis/python-aiplatform.git",
]

# Deployment settings passed to Agent Engine.
deployment_config = {
    "display_name": "Weather Assistant Tutorial",
    "description": "Simple agent with real-time audio capabilities",
    "requirements": deployment_requirements,
    "staging_bucket": BUCKET_URI,
    # Set the experimental server mode
    "agent_server_mode": vertexai_types.AgentServerMode.EXPERIMENTAL,
}

# Create the remote agent from the wrapped app.
remote_live_adk_agent = client.agent_engines.create(
    agent=app,
    config=deployment_config,
)

print("✅ Agent deployed successfully!")
print(f"Resource name: {remote_live_adk_agent.api_resource.name}")

### Test the ADK agent

This is our final test client, built to interact with the deployed ADK agent.

The client defines a helper, prepare_live_request, to wrap our text input into the specific LiveRequest object that the deployed AdkApp expects. We connect using the same async with client.aio.live.agent_engines.connect pattern.

The main difference is in the response-handling loop. The ADK streams back a structured Event object. This event can contain text, tool-use information, or audio. Our inner loop parses this Event. If it contains inline_data, it's an audio chunk, which we collect just like in our previous agent. If it contains part.text, it's a text update (which we print). This enables the agent to provide both audio and text responses, managed automatically by the ADK.

When all events for the turn are received, we concatenate and play the full audio.

In [None]:
async def weather_chat_with_audio(client, agent_resource_name: str):
    """Interactive chat with the deployed weather agent.

    Opens a bidirectional stream to the agent, then loops: read a line from
    the user, send it as a ``LiveRequest``, consume the streamed events for
    that turn, print any text and play any collected audio. Typing ``exit``
    ends the session; blank input is ignored.

    Args:
        client: Vertex AI client exposing ``aio.live.agent_engines.connect``.
        agent_resource_name: Resource name of the deployed Agent Engine agent.
    """
    # Local import so the coroutine can yield to the event loop instead of
    # blocking it with time.sleep.
    import asyncio

    def prepare_live_request(input_text: str):
        """Prepare request for ADK agent."""
        part = types.Part.from_text(text=input_text)
        content = types.Content(parts=[part])
        return LiveRequest(content=content)

    async with client.aio.live.agent_engines.connect(
        agent_engine=agent_resource_name,
        config={"class_method": "bidi_stream_query"}
    ) as connection:

        print("🌤️ Weather Assistant Ready!")
        print("Ask about weather in any city, or type 'exit' to quit.\n")
        print("Example questions:")
        print("- What's the weather in San Francisco?")
        print("- Give me a 5-day forecast for Tokyo")
        print("- Is it raining in London?\n")

        user_id = "weather_user"
        first_req = True

        while True:
            # Get user input
            await asyncio.sleep(0.1)  # Small delay for display
            input_text = input("You: ").strip()

            if input_text.lower() == "exit":
                break

            if not input_text:
                # Nothing to ask; do not send an empty request and then
                # wait for a reply.
                continue

            # Prepare for audio response
            audio_data = []

            # Send request. model_dump() is the Pydantic v2 spelling of the
            # deprecated .dict() and matches the model_validate call below.
            request_payload = prepare_live_request(input_text).model_dump()
            if first_req:
                # Only the first message of the stream carries the user id.
                await connection.send({
                    "user_id": user_id,
                    "live_request": request_payload
                })
                first_req = False
            else:
                await connection.send(request_payload)

            # Receive response
            print("Weather Assistant: ", end="")

            while True:
                received = await connection.receive()
                event = received["bidiStreamOutput"]

                # Handle the response format: drop the tool-confirmation
                # field before validation (presumably the local Event model
                # does not accept it; confirm against the installed ADK).
                actions = event.get("actions")
                if isinstance(actions, dict):
                    actions.pop("requested_tool_confirmations", None)

                event = Event.model_validate(event)

                # Extract content (only the first part of each event is used)
                part = (
                    event.content and event.content.parts and event.content.parts[0]
                )

                if not part:
                    # No more content: an event without parts ends the turn
                    break

                if part.inline_data and part.inline_data.data:
                    # Audio response: collect the chunk as 16-bit samples
                    chunk_data = part.inline_data.data
                    data = np.frombuffer(chunk_data, dtype=np.int16)
                    audio_data.append(data)
                elif part.text:
                    # Text response
                    print(part.text)
                    break
                else:
                    # Other response types
                    print(part)

            # Play audio if available
            if audio_data:
                concatenated_audio = np.concatenate(audio_data)
                display(Audio(concatenated_audio, rate=24000, autoplay=True))
                print("[Audio response played]")

            print()  # New line for next interaction

        print("\n👋 Thanks for using Weather Assistant!")

# Run audio chat against the deployed ADK weather agent (top-level await,
# as supported in notebook cells). Type 'exit' at the prompt to end the session.
await weather_chat_with_audio(client, remote_live_adk_agent.api_resource.name)

## Cleaning up

To avoid incurring charges for the resources created in this tutorial, clean up the deployed agents.

In [None]:
# Set to True to delete the agents
delete_agents = True  # @param {type: "boolean"}

if delete_agents:
    # Pair each display label with a lazy lookup of the agent variable.
    # The lookup runs inside the try block, so an agent that was never
    # created in this session is reported as a failure and does not abort
    # the rest of the cleanup.
    agents_to_delete = [
        ("Echo Agent", lambda: remote_echo_agent),
        ("Live Audio Agent", lambda: remote_live_agent),
        ("Weather Assistant", lambda: remote_live_adk_agent),
    ]
    failed_deletions = []

    for agent_label, get_agent in agents_to_delete:
        try:
            print(f"Deleting {agent_label}...")
            get_agent().delete(force=True)
            print(f"✅ {agent_label} deleted")
        except Exception as e:
            # Best effort: remember the failure and continue with the rest.
            failed_deletions.append(agent_label)
            print(f"❌ Failed to delete {agent_label}: {e}")

    # Only claim success when every deletion actually succeeded.
    if failed_deletions:
        print(f"\n⚠️ Cleanup finished with errors. Not deleted: {', '.join(failed_deletions)}")
    else:
        print("\n✨ All resources cleaned up successfully!")
else:
    print("⚠️ Agents not deleted. Remember to clean up resources when done.")