Environment setup

In [None]:
import numpy as np
import pandas as pd
import os

# Print the full path of every file found under the Kaggle input directory.
# The unused directory list is bound to `_subdirs` rather than `_`.
for input_dir, _subdirs, input_files in os.walk("/kaggle/input"):
    for input_file in input_files:
        input_path = os.path.join(input_dir, input_file)
        print(input_path)

print("Environment ready.")

Secrets Import

In [None]:
import os
from kaggle_secrets import UserSecretsClient

# Read the Gemini API key from Kaggle secrets and expose it both as a Python
# variable and as an environment variable. Failures are reported, not raised,
# so the notebook keeps running without a key.
try:
    GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"] = UserSecretsClient().get_secret(
        "GOOGLE_API_KEY"
    )
    print("‚úÖ Setup and authentication complete.")
except Exception as secret_error:
    print(
        "üîë Authentication Error: Please make sure you have added 'GOOGLE_API_KEY' "
        f"to your Kaggle secrets. Details: {secret_error}"
    )

ADK Import

In [None]:
import json
import requests
import subprocess
import time
import uuid
import warnings
warnings.filterwarnings("ignore")

from google.adk.agents import LlmAgent
from google.adk.agents.remote_a2a_agent import (
    RemoteA2aAgent,
    AGENT_CARD_WELL_KNOWN_PATH,
)

from google.adk.a2a.utils.agent_to_a2a import to_a2a
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.memory import InMemoryMemoryService
from google.genai import types
from google.adk.tools import load_memory, preload_memory
from google.adk.agents import Agent, SequentialAgent, ParallelAgent, LoopAgent
from google.adk.runners import InMemoryRunner
from google.adk.tools import AgentTool, FunctionTool, google_search
from typing import List


print("‚úÖ ADK components imported successfully.")

Retry Function

In [None]:
# HTTP retry policy for model calls.
# NOTE(review): the agents defined in this notebook pass the model as a plain
# string and never reference `retry_config`, so this policy is currently unused.
retry_config = types.HttpRetryOptions(
    http_status_codes=[429, 500, 503, 504],  # only these statuses are retried
    initial_delay=1,
    exp_base=7,  # presumably the backoff multiplier between attempts — confirm
    attempts=5,
)

print("‚úÖ retry functions set successfully.")

Helper Functions

In [None]:
from IPython.core.display import display, HTML
from jupyter_server.serverapp import list_running_servers
from typing import List
from google.genai import types

def get_adk_proxy_url():
    """Build the Kaggle-proxied ADK web UI URL and render a button that opens it.

    The kernel id and token are taken from the third and fourth '/'-separated
    segments of the first running Jupyter server's ``base_url``.

    Returns:
        str: the URL prefix (path only, without the proxy host), suitable for
        passing to ``adk web --url_prefix``.

    Raises:
        Exception: if no Jupyter server is running, or if the base URL does not
            contain enough segments to extract the kernel id and token.
    """
    proxy_host = "https://kkb-production.jupyter-proxy.kaggle.net"
    adk_port = "8000"

    running_servers = list(list_running_servers())
    if len(running_servers) == 0:
        raise Exception("No running Jupyter servers found.")

    base_url = running_servers[0]['base_url']
    segments = base_url.split('/')

    try:
        kernel, token = segments[2], segments[3]
    except IndexError:
        raise Exception(f"Could not parse kernel/token from base URL: {base_url}")

    url_prefix = "/k/" + kernel + "/" + token + "/proxy/proxy/" + adk_port
    url = proxy_host + url_prefix

    # The markup below is emitted verbatim; only `{url}` is interpolated.
    styled_html = f"""
    <div style="padding: 15px; border: 2px solid #f0ad4e; border-radius: 8px; background-color: #fef9f0; margin: 20px 0;">
        <div style="font-family: sans-serif; margin-bottom: 12px; color: #333; font-size: 1.1em;">
            <strong>‚ö†Ô∏è IMPORTANT: Action Required</strong>
        </div>
        <div style="font-family: sans-serif; margin-bottom: 15px; color: #333; line-height: 1.5;">
            The ADK web UI is <strong>not running yet</strong>. You must start it in the next cell.
            <ol style="margin-top: 10px; padding-left: 20px;">
                <li><strong>Run the next cell</strong> containing <code>!adk web ...</code>.</li>
                <li>Wait for that cell to show "Running".</li>
                <li>Return here and click the button to open the UI.</li>
            </ol>
            <em style="font-size: 0.9em; color: #555;">(Clicking before ADK is running gives a 500 error.)</em>
        </div>
        <a href='{url}' target='_blank' style="
            display: inline-block; background-color: #1a73e8; color: white; padding: 10px 20px;
            text-decoration: none; border-radius: 25px; font-family: sans-serif; font-weight: 500;
            box-shadow: 0 2px 5px rgba(0,0,0,0.2); transition: all 0.2s ease;">
            Open ADK Web UI ‚Üó
        </a>
    </div>
    """

    display(HTML(styled_html))
    return url_prefix

async def run_session_with_auto_memory(
    runner_instance,
    user_queries,
    session_id="default",
    session_service=None,
    APP_NAME=None,
    USER_ID=None
):
    """Send one or more user queries through ``runner_instance`` in one session.

    The session is created on first use; if creation raises (presumably because
    the session already exists), the existing session is fetched instead. Each
    query is echoed, and the text of every final-response event is printed.

    Despite its name, this helper does not write to the memory service itself.

    Args:
        runner_instance: Runner exposing ``run_async(user_id, session_id, new_message)``.
        user_queries: A single query string or a list of query strings.
        session_id: Identifier of the session to create or reuse.
        session_service: Session service used to create/fetch the session. When
            omitted, the runner's own ``session_service`` attribute is used so
            that the helper and the runner look at the same store.
        APP_NAME: Application name the session belongs to.
        USER_ID: User the session belongs to.

    Returns:
        list[str]: the final-response texts that were printed, in order.

    Raises:
        ValueError: if no session service is given and the runner has none.
        Exception: if the session can be neither created nor retrieved.
    """
    if isinstance(user_queries, str):
        user_queries = [user_queries]

    if session_service is None:
        session_service = getattr(runner_instance, "session_service", None)
    if session_service is None:
        raise ValueError(
            "session_service is required (none was passed and the runner has none)."
        )

    try:
        session = await session_service.create_session(
            app_name=APP_NAME, user_id=USER_ID, session_id=session_id
        )
    except Exception:
        # Best effort: creation failed, so try to reuse an existing session.
        session = await session_service.get_session(
            app_name=APP_NAME, user_id=USER_ID, session_id=session_id
        )

    if session is None:
        raise Exception("‚ùå Session could not be created or retrieved.")

    final_texts = []

    for query in user_queries:
        print(f"\nUser > {query}")

        query_content = types.Content(role="user", parts=[types.Part(text=query)])

        async for event in runner_instance.run_async(
            user_id=USER_ID,
            session_id=session.id,
            new_message=query_content
        ):
            if event.is_final_response() and event.content and event.content.parts:
                # Only the first part of a final response is shown.
                text = event.content.parts[0].text
                if text and text != "None":
                    print("Model >", text)
                    final_texts.append(text)

    return final_texts

async def run_with_confirmation_loop(runner_instance, session_id, APP_NAME, USER_ID, user_query=None):
    """
    Run the root agent, then let the user refine the plan until they decline.

    If ``user_query`` is given it is sent first. The user is then asked whether
    they want to refine the travel plan: on "yes" they are prompted for the
    change and it is sent as a new turn in the same session; on "no" the loop
    ends without running the agent again.

    Relies on the notebook-level ``session_service`` and ``reply_tool`` objects
    being defined before this coroutine is awaited.

    Args:
        runner_instance: Runner used for every turn.
        session_id: Session shared by all turns.
        APP_NAME: Application name of the session.
        USER_ID: User the session belongs to.
        user_query: Optional initial request to run before prompting.
    """
    import inspect

    async def _run_turn(text):
        # Send one user message through the shared session.
        await run_session_with_auto_memory(
            runner_instance=runner_instance,
            user_queries=text,
            session_id=session_id,
            session_service=session_service,
            APP_NAME=APP_NAME,
            USER_ID=USER_ID
        )

    if user_query:
        await _run_turn(user_query)

    while True:
        reply = input("\nDo you want to refine the travel plan? (yes/no): ").strip().lower()
        if reply not in ["yes", "no"]:
            print("Please reply 'yes' or 'no'.")
            continue

        # The wrapped function may be sync or async; awaiting a plain return
        # value (such as a dict) raises TypeError, so await only awaitables.
        outcome = reply_tool.func(reply)
        if inspect.isawaitable(outcome):
            await outcome

        if reply == "no":
            print("‚úÖ Loop ended by user.")
            break

        # An empty message gives the agents nothing to act on, so ask for the
        # actual change before running the pipeline again.
        refinement = input("\nWhat would you like to change? ").strip()
        if not refinement:
            print("No change entered.")
            continue

        await _run_turn(refinement)



def load_all_memories(APP_NAME, USER_ID, memory_service):
    """Collect the memories of one app/user pair into a ``{key: value}`` dict.

    Calls ``memory_service.get_memories(APP_NAME, USER_ID)`` and maps each
    returned item's ``key`` attribute to its ``value`` attribute.

    NOTE(review): confirm that the memory service used in this notebook really
    provides ``get_memories``; nothing else in the notebook calls this helper.
    """
    collected = {}
    for memory in memory_service.get_memories(APP_NAME, USER_ID):
        collected[memory.key] = memory.value
    return collected


async def auto_save_to_memory(callback_context):
    """Store the current session in the memory service of the running invocation.

    Reads the private ``_invocation_context`` of ``callback_context`` and passes
    its ``session`` to ``memory_service.add_session_to_memory``.
    """
    invocation = callback_context._invocation_context
    await invocation.memory_service.add_session_to_memory(invocation.session)

print("‚úÖ Combined helpers loaded.")


Memory Service and Session service

In [None]:
# Identifiers shared by every session and runner in this notebook.
APP_NAME = "travel-planner"
USER_ID = "user123"

# In-memory session and memory services (per their class names); the same
# instances are later handed to the Runner.
session_service = InMemorySessionService()
memory_service = InMemoryMemoryService()

print("Memory + Session initialized.")


Planner Agent

In [None]:
# First stage of the pipeline: pulls flight details, hotel requirements and the
# budget out of the user's request and stores the reply under "planner_output".
planner_agent = Agent(
    name="Planner",
    model="gemini-2.5-flash-lite",
    instruction="\n".join(
        [
            "Extract 3 things from the user query:",
            "1. Flight details",
            "2. Hotel requirements",
            "3. Budget",
            "Then pass these downstream.",
        ]
    ),
    output_key="planner_output",
    tools=[google_search],
)

print("Planner agent Ready.")


flight agent

In [None]:
# Searches the web for flights that match the planner's extracted trip details
# and stores the reply under "flight_results".
#
# The planner's result is read from session state through the {planner_output}
# placeholder (written by the Planner's output_key) instead of calling the
# planner again as a tool. This also keeps google_search as the only tool on
# this agent: ADK documents that a built-in tool cannot be combined with other
# tools in the same agent.
flight_agent = Agent(
    name="FlightAgent",
    model="gemini-2.5-flash-lite",
    instruction=(
        "Search for available flights based on the planner output below.\n\n"
        "PLANNER OUTPUT:\n{planner_output}"
    ),
    tools=[google_search],
    output_key="flight_results"
)

print("Flight agent Ready.")


hotel_agent

In [None]:
# Searches the web for hotels and stores the reply under "hotel_results".
#
# The trip details come from session state through the {planner_output}
# placeholder (written by the Planner's output_key) rather than from a planner
# AgentTool. google_search is therefore the only tool on this agent: ADK
# documents that a built-in tool cannot be combined with other tools in the
# same agent.
hotel_agent = Agent(
    name="HotelAgent",
    model="gemini-2.5-flash-lite",
    instruction=(
        "Search for hotels based on landmarks, location and date range.\n\n"
        "PLANNER OUTPUT:\n{planner_output}"
    ),
    tools=[google_search],
    output_key="hotel_results"
)

print("Hotel agent Ready.")


budget_agent

In [None]:
# Judges whether the flight and hotel options fit the user's budget and stores
# the verdict under "budget_results".
#
# The planner, flight and hotel agents each write their reply to session state
# via output_key, so this agent reads those values through placeholders instead
# of invoking the three agents a second time as tools. That avoids duplicate
# searches and keeps the verdict based on the same results the Aggregator shows.
budget_agent = Agent(
    name="BudgetAgent",
    model="gemini-2.5-flash-lite",
    instruction=(
        "Evaluate whether the flight and hotel options fit the user's budget. "
        "Return affordability verdict + recommendations.\n\n"
        "PLANNER OUTPUT (includes the budget):\n{planner_output}\n\n"
        "FLIGHTS:\n{flight_results}\n\n"
        "HOTELS:\n{hotel_results}"
    ),
    output_key="budget_results"
)

print("Budget agent Ready.")


Aggregator Agent

In [None]:
# Merges the flight, hotel and budget results (read from session state through
# the placeholders below) into one summary stored under "summary".
#
# preload_memory only has something to load if sessions are written to the
# memory service, so auto_save_to_memory is registered to run after this agent
# finishes. The instruction also gains the space that was missing between its
# last two sentences.
aggregator_agent = Agent(
    name="Aggregator",
    model="gemini-2.5-flash-lite",
    instruction=(
        "Combine outputs:\n\n"
        "FLIGHTS:\n{flight_results}\n\n"
        "HOTELS:\n{hotel_results}\n\n"
        "BUDGET:\n{budget_results}\n\n"
        "Also, remember all past queries and responses from the user memory. "
        "Produce a clean, useful summary."
    ),
    tools=[preload_memory],
    output_key="summary",
    after_agent_callback=auto_save_to_memory
)

print("Aggregator agent Ready.")


Confirmation Function tool

In [None]:
def user_reply(response: str) -> dict:
    """Record the user's answer to the "refine the travel plan?" question.

    Args:
        response: The user's answer, expected to be "yes" or "no".

    Returns:
        dict: the answer under the "reply" key, e.g. ``{"reply": "yes"}``.
    """
    return {"reply": response}

# Wrap user_reply as a FunctionTool: it is given to the AskConfirmation agent
# and its `.func` is also called directly by run_with_confirmation_loop.
reply_tool = FunctionTool(func=user_reply)

Confirmation+loop Agent

In [None]:
# Asks the user whether the plan should be refined and records the answer via
# reply_tool. The instruction names the tool `user_reply`, which is the name of
# the function wrapped by reply_tool, rather than a `UserReplyTool` that is not
# defined anywhere in this notebook.
ask_confirmation = LlmAgent(
    name="AskConfirmation",
    model="gemini-2.5-flash-lite",
    instruction=(
        "Ask the user: 'Do you want to refine the travel plan? (yes/no)'. "
        "Call the user_reply tool with their answer."
    ),
    tools=[reply_tool]
)

# One confirmation round: currently just the AskConfirmation agent.
confirmation_sequence = SequentialAgent(
    sub_agents=[ask_confirmation],
    name="ConfirmationSequence",
)

# Repeat the confirmation round, capped at 5 iterations to avoid infinite loops.
# NOTE(review): nothing visible here ends the loop earlier than the cap — confirm
# that running all 5 iterations per request is intended.
confirmation_loop = LoopAgent(
    max_iterations=5,
    sub_agents=[confirmation_sequence],
    name="ConfirmationLoop",
)

print("Loop agent OK")

agent runners

In [None]:
# FlightAgent and HotelAgent are grouped into a single ParallelAgent stage.
parallel_team = ParallelAgent(
    sub_agents=[flight_agent, hotel_agent],
    name="ParallelTeam",
)

# Full pipeline, in execution order:
# planner -> flights + hotels -> budget -> aggregator -> confirmation loop.
root_agent = SequentialAgent(
    sub_agents=[
        planner_agent,
        parallel_team,
        budget_agent,
        aggregator_agent,
        confirmation_loop,
    ],
    name="TravelAdvisorSystem",
)

print("Root workflow assembled.")

run agent and call runner

In [None]:
runner = Runner(
    agent=root_agent,
    session_service=session_service,
    memory_service=memory_service,
    app_name=APP_NAME
)

query = """
Available flights from Dhaka to Berlin on November 30th,2025 return flight on Berlin to Dhaka on december 2nd 2025. 
Cheapest hotel near the airport from 30 november to 2nd december 2025. 
Budget 800 USD.
"""

response = await run_session_with_auto_memory(
    runner_instance=runner,
    user_queries=query,
    session_id="travel_session_1",
    session_service=session_service,
    APP_NAME=APP_NAME,
    USER_ID=USER_ID
)



Loop Prompt

In [None]:
user_query = input("Enter your travel request: ")
await run_with_confirmation_loop(
    runner_instance=runner,
    session_id="travel_session_1",
    APP_NAME=APP_NAME,
    USER_ID=USER_ID,
    user_query=user_query
)

Memory Inspection(For Debugging)

In [None]:
def load_session_memories(memory_service, APP_NAME, USER_ID):
    """
    Returns a dict of all key-value pairs stored in memory for this app and user.

    Debug helper that peeks at private attributes of ``memory_service``:

    1. ``_memory[APP_NAME][USER_ID]`` is used when it holds any entries.
    2. Otherwise ``_session_events["<APP_NAME>/<USER_ID>"]`` is used.
       NOTE(review): this fallback assumes the installed InMemoryMemoryService
       keeps its data in ``_session_events`` keyed by "app/user" — confirm
       against the google-adk version in use.

    A missing or ``None`` store is treated as empty. The returned dict is a
    shallow copy, so mutating it does not touch the service's own storage.
    """
    legacy_store = getattr(memory_service, "_memory", None) or {}
    snapshot = dict(legacy_store.get(APP_NAME, {}).get(USER_ID, {}))
    if snapshot:
        return snapshot

    session_events = getattr(memory_service, "_session_events", None) or {}
    return dict(session_events.get(f"{APP_NAME}/{USER_ID}", {}))

# Dump whatever the memory service currently holds for this app/user pair.
memories = load_session_memories(memory_service, APP_NAME, USER_ID)
print("\n==== Session Memory Snapshot ====")
for memory_key, memory_value in memories.items():
    print(f"{memory_key} => {memory_value}")

Let's try the WEB UI!

In [None]:
# Create the `sample-agent` project with the ADK CLI, using the same model as
# the agents above and the API key loaded in the secrets cell.
!adk create sample-agent --model gemini-2.5-flash-lite --api_key $GOOGLE_API_KEY

Get custom URL

In [None]:
# Render the "Open ADK Web UI" button and keep the proxied path prefix for the
# `adk web` command in the next cell.
url_prefix = get_adk_proxy_url()

Run ADK WEB

In [None]:
# Start the ADK web UI under the proxied prefix computed in the previous cell.
!adk web --url_prefix {url_prefix}