In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

API Configuration 

In [None]:
import os
from kaggle_secrets import UserSecretsClient

try:
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("‚úÖ Setup and authentication complete.")
except Exception as e:
    print(
        f"üîë Authentication Error: Please make sure you have added 'GOOGLE_API_KEY' to your Kaggle secrets. Details: {e}"
    )

Import ADK components

In [None]:
import uuid
from google.genai import types

from google.adk.agents import LlmAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters

from google.adk.apps.app import App, ResumabilityConfig
from google.adk.tools.function_tool import FunctionTool

print("‚úÖ ADK components imported successfully.")

Configure Retry Options

In [None]:
retry_config = types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=7,  # Delay multiplier
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)

Using MCP with your Agent 

In [None]:
# MCP integration with Everything Server
mcp_image_server = McpToolset(
    connection_params=StdioConnectionParams(
        server_params=StdioServerParameters(
            command="npx",  # Run MCP server via npx
            args=[
                "-y",  # Argument for npx to auto-confirm install
                "@modelcontextprotocol/server-everything",
            ],
            tool_filter=["getTinyImage"],
        ),
        timeout=30,
    )
)

print("‚úÖ MCP Tool created")

Behind the scenes:

1. Server Launch: ADK runs npx -y @modelcontextprotocol/server-everything
2. Handshake: Establishes stdio communication channel
3. Tool Discovery: Server tells ADK: "I provide getTinyImage" functionality
4. Integration: Tools appear in agent's tool list automatically
5. Execution: When agent calls getTinyImage(), ADK forwards to MCP server
6. Response: Server result is returned to agent seamlessly

Why This Matters: You get instant access to tools without writing integration code!

Step 3: Add MCP tool to agent

Let's add the mcp_server to the agent's tool array and update the agent's instructions to handle requests to generate tiny images

In [None]:
#Create image agent with MCP integration
image_agent = LlmAgent(
    model=Gemini(
        model="gemini-2.5-flash-lite",
        retry_options=retry_config
    ),
    name="image_agent",
    instruction="Use the MCP Tool to generate images for user queries",
    tools=[mcp_image_server],
)

In [None]:
from google.adk.runners import InMemoryRunner

runner = InMemoryRunner(agent=image_agent)

In [None]:
response = await runner.run_debug("Provide a sample tiny image", verbose=True)

Display the image:

The server returns base64-encoded image data. Let's decode and display it:

In [None]:
from IPython.display import dispaly, Image as IPImage
import base64

for event in response:
    if event.context and event.content.parts:
        for part in event.content.parts:
            if hasattr(part, "function_response") and part.function_response:
                for item in part.function_response.response.get("content", []):
                    if item.get("type") == "image":
                        display(IPImage(data=base64.b64decode(item["data"])))

2.3: Extending to Other MCP Servers
The same pattern works for any MCP server - only the connection_params change. Here are some examples:

üëâ Kaggle MCP Server - For dataset and notebook operations
Kaggle provides an MCP server that lets your agents interact with Kaggle datasets, notebooks, and competitions.

In [None]:
McpToolset(
    connection_params=StdioConnectionParams(
        server_params=StdioServerParameters(
            command='npx',
            args=[
                '-y',
                'mcp-remote',
                'https://www.kaggle.com/mcp'
            ],
        ),
        timeout=30,
    )
)

In [None]:
# What it provides:

# üìä Search and download Kaggle datasets
# üìì Access notebook metadata
# üèÜ Query competition information etc.,


In [None]:
# GitHub MCP Server - For PR/Issue analysis
Mcptoolset(
    connection_params=StreamableHTTPServerParams(
        url="https://api.githubcopilot.com/mcp/",
        headers={
            "Authorization": f"Bearer {GITHUB_TOKEN}",
            "X-MCP-Toolsets": "all",
            "X-MCP-Readonly": "true"
        },
    ),
)

Long-Running Operations (Human-in-the-Loop)

In [None]:
LARGE_ORDER_THRESHOLD = 5

def place_shipping_order(
    num_containers: int, 
    destination: str, 
    tool_context: ToolContext
) -> dict:
    """
    Places a shipping order. Requires approval if ordering more than 5 containers (LARGE_ORDER_THRESHOLD).

    Args:
        num_containers: Number of containers to ship
        destination: Shipping destination 

    Returns: 
        Dictionary with order status 
    """

    # --------------------------------------------------------
    # SCENARIO 1: Small orders (‚â§5 containers) auto-approve
    if num_containers <= LARGE_ORDER_THRESHOLD:
        return {
            "status": "approval",
            "order_id": f"ORD-{num_containers}-AUTO",
            "num_containers": num_containers,
            "destination": destination,
            "message": f"Order auto-approved: {num_containers} containers to {destination}",
        }

    # --------------------------------------------------------
    # SCENARIO 2: This is the first time this tool is called. Large orders need human approval - PAUSE here.
    if not tool_context.tool_confirmation:
        tool_context.request_confirmation(
            hint=f"‚ö†Ô∏è Large order: {num_containers} containers to {destination}. Do you want to approve?",
            payload={"num_containers": num_containers, "destination": destination},
        )
        return { # This is sent to the Agent 
            "status": "pending",
            "message": f"Order for {num_containers} containers requires approval",
        }

    # --------------------------------------------------------
    # SCENARIO 3: The tool is called AGAIN and is now resuming. Handle approval response - RESUME here.
    if tool_context.tool_confirmation.confirmed:
        return {
            "status": "approved",
            "order_id": f"ORD-{num_containers}-HUMAN",
            "num_containers": num_containers,
            "destination": destination,
            "message": f"Order approved: {num_containers} containers to {destination}",
        }
    else:
        return {
            "status": "rejected",
            "message": f"Order rejected: {num_containers} containers to {destination}",
        }

print("‚úÖ Long-running functions created!")

Create the Agent, App and Runner

In [None]:
# Create shipping agent with pausable tool
shipping_agent = LlmAgent(
    name="shpping_agent",
    model=Gemini(
        model="gemini-2.5-flash-lite", 
        retry_options=retry_config
    ),
    instruction="""You are a shipping coordinator assistant.
  
  When users request to ship containers:
   1. Use the place_shipping_order tool with the number of containers and destination
   2. If the order status is 'pending', inform the user that approval is required
   3. After receiving the final result, provide a clear summary including:
      - Order status (approved/rejected)
      - Order ID (if available)
      - Number of containers and destination
   4. Keep responses concise but informative
  """,
    tools=[FunctionTool(func=place_shipping_order)],
)

print("‚úÖ Shipping Agent created!")

Wrap in Resumable App

1. A normal LlmAgent is stateless and forgets previous steps
2. It can‚Äôt remember context when a tool pauses for approval.
3. Wrapping it in a resumable App adds state persistence.
4. The App saves messages, tool name, parameters, and pause point.
5. On resume, the App restores everything so the agent continues smoothly.


In [None]:
# Wrap the agent in a resumable app - THIS IS THE KEY FOR LONG-RUNNING OPERATIONS!
shipping_app = App(
    name="shipping_coordinator",
    root_agent=shipping_agent,
    resumability_config=ResumabilityConfig(is_resumable=True),
)

print("‚úÖ Resumable app created!")

Create Session and Runner with the App

Pass app=shipping_app instead of agent=... so the runner knows about resumability.

In [None]:
session_service = InMemorySessionService()

# Create runner with the resumable app
shipping_runner = Runner(
    app=shipping_app,  # Pass the app instead of the agent
    session_service=session_service,
)

print("‚úÖ Runner created!")

 Building the Workflow:
 
 4.1 ‚Äî Handling Pause/Resume
 1. The agent does not auto-resume; your workflow must manage it.
 2. Detect when a pause happens by checking for an adk_request_confirmation event.
 3. Get the human decision (UI or simulated).
 4. Resume the agent by sending the decision with the same invocation_id.

4.2 ‚Äî Key Technical Concepts (Summary)

1. events: All agent actions (tool calls, responses, results) are emitted as events.
2. adk_request_confirmation: Special event that signals a pause and contains the invocation_id.
3. invocation_id: Unique ID for each async run; must be reused to correctly resume execution.

4.3 ‚Äî Helper Functions (Summary)

1. ```check_for_approval()``` scans through events.
2. It detects the adk_request_confirmation pause event.
3. It returns both approval_id and invocation_id needed to resume.
4. Returns None if no pause was triggered.

In [None]:
def check_for_approval(events):
    """
    Check if events contain an approval request.
    Returns:
        dict with approval details or None
    """
    for event in events:
        if event.content and event.content.parts:
            for part in event.content.parts:
                if (
                    part.function_call
                    and part.function_call.name == "adk_request_confirmation"
                ):
                    return {
                        "approval_id": part.function_call.id,
                        "invocation_id": event.invocation_id,
                    }
    return None

```print_agent_response()``` - Displays agent text

Simple helper to extract and print text from events

In [None]:
def print_agent_response(events):
    """Print agents's text responses from events."""
    for event in events:
        if event.context and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    print(f"Agent > {part.text}")

```create_approval_response()``` - Formats the human decision

Takes the approval info and boolean decision (True/False) from the human
Creates a FunctionResponse that ADK understands
Wraps it in a Content object to send back to the agent

In [None]:
def create_approval_response(approval_info, approved):
    """Create approval response message."""
    confirmation_response = types.FunctionResponse(
        id=approval_info["approval_id"],
        name="adk_request_confirmation",
        response={"confirmed": approved},
    )
    return types.Content(
        role="user",
        parts=[types.Part(function_response=confirmation_response)]
    )

print("‚úÖ Helper functions defined")

4.4: The Workflow Function - Let's tie it all together!¬∂
The run_shipping_workflow() function orchestrates the entire approval flow.

Look for the code explanation in the cell below.

In [None]:
async def run_shipping_workflow(query: str, auto_approve: bool = True):
    """Runs a shipping workflow with approval handling.

    Args:
        query: User's shipping request
        auto_approve: Whether to auto-approve large orders (simulates human decision)
    """

    print(f"\n{'='*60}")
    print(f"User > {query}\n")

    # Generate unique session ID
    session_id = f"order_{uuid.uuid4().hex[:8]}"

    # Create session
    await session_service.create_session(
        app_name="shipping_coordinator", user_id="test_user", session_id=session_id
    )

    query_content = types.Content(role="user", parts=[types.Part(text=query)])
    events = []

    #----------------------------------------------------------
    # STEP 1: Send initial request to the Agent. If num_containers > 5, the Agent returns the special `adk_request_confirmation` event
    async for event in shipping_runner.run_async(
        user_id="test_user",
        session_id=session_id,
        new_message=query_content
    ):
        events.append(event)

    #----------------------------------------------------------
    # STEP 2: Loop through all the events generated and check if `adk_request_confirmation` is present.
    approval_info = check_for_approval(events)

    #----------------------------------------------------------
    # STEP 3: If the event is present, it's a large order - HANDLE APPROVAL WORKFLOW
    if approval_info:
        print(f"‚è∏Ô∏è  Pausing for approval...")
        print(f"ü§î Human Decision: {'APPROVE ‚úÖ' if auto_approve else 'REJECT ‚ùå'}\n")

        # PATH A: Resume the agent by calling run_async() again with the approval decision
        async for event in shipping_runner.run_async(
            user_id="test_user",
            session_id=session_id,
            new_meassage=create_approval_response(
                approval_info,
                auro_approval
            ), # Send human decision here
            innovation_id=approval_info[
                "invocation_id"
            ], # Critical: same invocation_id tells ADK to RESUME
        ):
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        print(f"Agent > {part.text}")
    #----------------------------------------------------------
    else:
        # PATH B: If the `adk_request_confirmation` is not present - no approval needed - order completed immediately.
        print_agent_response(events)

    print(f"{'='*60}\n")

print("‚úÖ Workflow function ready")

Testing the workflow:

In [None]:
# Demo 1: It's a small order. Agent receives auto-approved status from tool
await run_shipping_workflow("Ship 3 containers to Singapore")

# Demo 2: Workflow simulates human decision: APPROVE ‚úÖ
await run_shipping_workflow("Ship 10 containers to Rotterdam", auto_approve=True)

# Demo 3: Workflow simulates human decision: REJECT ‚ùå
await run_shipping_workflow("Ship 8 containers to Los Angeles", auto_approve=False)