<a href="https://colab.research.google.com/github/datacommonsorg/agent-toolkit/blob/main/notebooks/datacommons_mcp_tools_with_custom_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Copyright 2025 Google LLC. SPDX-License-Identifier: Apache-2.0

---


# ✨ Try Data Commons MCP Tools with a Custom Agent

## Introduction

This notebook shows you how to build and run an agent with Google's [Agent Development Kit (ADK)](https://google.github.io/adk-docs/) that communicates with a subprocess serving the [Data Commons MCP tools](https://pypi.org/project/datacommons-mcp/) over HTTP.

For an agent that runs in ADK's built-in web UI (`adk web`) and connects to the MCP server over stdio, see the [Data Commons MCP sample agents](https://github.com/datacommonsorg/agent-toolkit/tree/main/packages/datacommons-mcp/examples/sample_agents).





---



## Create your ADK agent

#### **Step 0**: Make a copy of this notebook:
* Go to **File** > **Save a copy in Drive**.


#### **Step 1**: Add your Data Commons and Gemini API keys to the notebook's secrets:

1. Open the **Secrets** panel by clicking the key icon in the left menu.
1. Click **+ Add new secret** and set the `Name` and `Value` fields for each required API key listed below:
   - Data Commons API Key:
     - **Name**: `DC_API_KEY`
     - **Value**: Your Data Commons API key obtained from https://apikeys.datacommons.org/
   - Gemini API Key:
     - **Name**: `GOOGLE_API_KEY` or `GEMINI_API_KEY`
     - **Value**: A Google Gemini-enabled API key obtained from https://aistudio.google.com/apikey
1. Enable **Notebook access** for both.
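
Step 2 reads these secrets in a fixed order: `GOOGLE_API_KEY` first, then `GEMINI_API_KEY` as a fallback. The sketch below expresses that precedence as a plain function so it can run outside Colab; a dictionary stands in for the Colab secrets store, and `resolve_secret` is a hypothetical helper, not part of `google.colab.userdata`.

```python
from collections.abc import Mapping


def resolve_secret(secrets: Mapping[str, str], *names: str) -> str:
    """Returns the value of the first name in `names` found in `secrets`.

    `secrets` stands in for the Colab secrets store; this helper is hypothetical.
    """
    for name in names:
        if name in secrets:
            return secrets[name]
    raise ValueError(
        f"Could not find any of {', '.join(names)} in secrets. "
        "Please follow the instructions in 'Step 1' to add one."
    )


# GOOGLE_API_KEY wins when both are set; GEMINI_API_KEY is only a fallback.
print(resolve_secret({"GEMINI_API_KEY": "gemini-key"}, "GOOGLE_API_KEY", "GEMINI_API_KEY"))
```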

In [None]:
# @title #### **Step 2.** Import required libraries and initialize secrets. {"display-mode":"form"}
# @markdown 👉 Run this cell

!pip uninstall -q -y google-genai google-adk
!pip install -q google-genai==1.36.0 google-adk==1.14.1

import logging
import os
import warnings
from google.colab import userdata


logger = logging.getLogger()
logger.setLevel(logging.ERROR)

warnings.filterwarnings(
    "ignore", category=UserWarning, module="google.adk.tools.mcp_tool"
)

DC_API_KEY = None
GOOGLE_API_KEY = None

# Get and validate DC_API_KEY
try:
    DC_API_KEY = userdata.get("DC_API_KEY")
except userdata.SecretNotFoundError:
    raise ValueError(
        "`DC_API_KEY` not found in Colab secrets. "
        "Please follow the instructions in 'Step 1' to add the secret."
    ) from None
os.environ["DC_API_KEY"] = DC_API_KEY

# Get and validate the Google/Gemini API key
try:
    GOOGLE_API_KEY = userdata.get("GOOGLE_API_KEY")
except userdata.SecretNotFoundError:
    try:
        GOOGLE_API_KEY = userdata.get("GEMINI_API_KEY")
    except userdata.SecretNotFoundError:
        raise ValueError(
            "Could not find `GOOGLE_API_KEY` or `GEMINI_API_KEY` in Colab secrets. "
            "Please follow the instructions in 'Step 1' to set one of these secrets."
        ) from None

os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

print("✅ API keys successfully loaded and set.")
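
Setting the keys in `os.environ` makes them visible to libraries running in this kernel and, through `env=kernel_env` in Step 3, to the MCP server subprocess, which is presumably how the server receives `DC_API_KEY`. A child process sees exactly the environment it is given. A small standard-library demonstration of that inheritance, using a hypothetical `DEMO_API_KEY` variable and a dummy value rather than a real key:

```python
import os
import subprocess
import sys

# Hypothetical variable name and dummy value, used only for this demo.
env = os.environ.copy()
env["DEMO_API_KEY"] = "not-a-real-key"

# The child Python process reads the variable from the environment passed via env=...
result = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ.get('DEMO_API_KEY'))"],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())  # not-a-real-key
```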

In [None]:
# @title #### **Step 3.** Start the Data Commons MCP server in a subprocess. {"display-mode":"form"}
# @markdown 👉 Run this cell

import asyncio
import subprocess
import sys
import time

import aiohttp

MCP_PORT = 3000


async def wait_for_server_ready(
    url: str, process: subprocess.Popen, timeout_sec: int = 15
):
    """
    Polls the server's HTTP endpoint until it responds, while checking that the
    subprocess is still running. This is more reliable than waiting a fixed
    amount of time with asyncio.sleep().
    """
    logger.info(f"Waiting for server to launch at {url}...")
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        while True:
            # 1. First, check if the subprocess crashed or exited
            return_code = process.poll()
            if return_code is not None:
                # The process terminated. This is a hard failure.
                # Logs (if any) will have already printed to the notebook output.
                logger.error(
                    f"Server process failed to start (exit code {return_code}). See notebook output for server logs."
                )
                raise ChildProcessError(
                    f"MCP server process terminated unexpectedly (exit code {return_code}). See notebook output for server logs."
                )

            # 2. If it's running, try to connect via HTTP
            try:
                # We just need *any* response. Even a 404 (Not Found) or 405 (Method Not Allowed)
                # proves the server is up and responding to HTTP requests.
                # aiohttp.ClientConnectorError is the "Connection Refused" we expect to see while it's starting.
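                # A short per-request timeout keeps a hung connection from outlasting timeout_sec.
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=2)):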
                    print("✅ MCP Server is up and responding.")
                    return  # Success!

            except aiohttp.ClientConnectorError:
                # This is expected. Server is not ready yet. Keep waiting.
                pass

            except Exception as e:
                # Any other exception might be a problem, log it.
                logger.warning(f"Health check received unexpected error: {e}")

            # 3. Check if we've run out of time
            if time.time() - start_time > timeout_sec:
                logger.error(
                    f"Timeout: Server did not respond within {timeout_sec} seconds."
                )
                # Kill the hung process
                process.kill()
                # Logs (if any) will have already printed to the notebook output.
                logger.error("Server logs (if any) should be visible above.")
                raise TimeoutError(
                    f"MCP Server failed to start at {url} within {timeout_sec}s."
                )

            # 4. Wait a moment before the next poll
            await asyncio.sleep(0.5)  # Poll every 500ms


#
# --- Install uv into the kernel's Python environment ---
#
logger.info("Installing 'uv' package...")
install_process = subprocess.run(
    [
        sys.executable,
        "-m",
        "pip",
        "install",
        "-q",
        "uv",
    ],  # Run pip with the kernel's own interpreter
    check=False,
    capture_output=True,
    text=True,
)

if install_process.returncode != 0:
    logger.error("Subprocess failed to install 'uv': %s", install_process.stderr)
    raise RuntimeError("Could not install 'uv' into the kernel's Python environment")
else:
    logger.info("'uv' package successfully installed.")

#
# --- Kill any existing process on the MCP port ---
#
logger.info(f"Ensuring port {MCP_PORT} is free...")
# Kill any process on the port; we don't need to see the output.
subprocess.run(
    ["fuser", "-k", f"{MCP_PORT}/tcp"],
    check=False,
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)

#
# ---Start MCP Server ---
#
logger.info("Starting MCP server...")
kernel_env = os.environ.copy()
mcp_server_process = subprocess.Popen(
    [
        sys.executable,  # Call Python
        "-m",
        "uv",  # Run the 'uv' module (NOT 'uvx' or the install string)
        "tool",
        "run",  # The rest of the uv command
        "datacommons-mcp",
        "serve",
        "http",
        "--port",
        str(MCP_PORT),
    ],
    env=kernel_env,
    stdout=None,
    stderr=None,
)
# Give the server a moment to start up, then check that the server is running
await asyncio.sleep(2)
await wait_for_server_ready(
    url=f"http://localhost:{MCP_PORT}", process=mcp_server_process
)
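
The readiness check in Step 3 follows a common pattern: retry until the server answers or a deadline passes, instead of sleeping for a fixed time. Below is a minimal synchronous sketch of the same idea using only the standard library. It simplifies under stated assumptions: a successful TCP connect stands in for the HTTP GET (it shows that something is listening, not that it speaks HTTP), the `process.poll()` crash check is omitted, and `wait_for_port` is a hypothetical helper. It uses `time.monotonic()` so that wall-clock adjustments cannot shorten or extend the deadline.

```python
import socket
import time


def wait_for_port(
    host: str, port: int, timeout_sec: float = 15.0, interval_sec: float = 0.5
) -> bool:
    """Returns True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_sec
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval_sec):
                return True
        except OSError:
            # Connection refused or timed out: the server is not ready yet.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_sec)


# Demo: a listening socket on an OS-assigned port counts as "ready".
with socket.socket() as listener:
    listener.bind(("127.0.0.1", 0))
    listener.listen()
    print(wait_for_port("127.0.0.1", listener.getsockname()[1], timeout_sec=2))  # True
```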

#### **Step 4**: Set up your agent.

In [None]:
# @title ##### **a.** Select a model {"display-mode":"form"}
# @markdown 👉 Select your desired agent model, then **run this cell**.

AGENT_MODEL = "gemini-2.5-pro"  # @param ["gemini-2.5-flash","gemini-2.5-pro","gemini-2.5-flash-lite"] {"allow-input":true}

print(f"✅ Agent model selected: {AGENT_MODEL}")

In [None]:
# @title ##### **b.** Define the agent's core instructions (system prompt). {"display-mode":"form"}
# @markdown 👉 Select or write your own prompt to set the agent's permanent rules and personality, then **run this cell**.
# @markdown
# @markdown > Here are some tips for writing great agent instructions: [Google ADK - Guiding the Agent: Instructions](https://google.github.io/adk-docs/agents/llm-agents/#guiding-the-agent-instructions-instruction).
AGENT_INSTRUCTIONS = "Use the Data Commons MCP tools to respond to user queries. Cite the data source when possible." # @param ["Use the Data Commons MCP tools to respond to user queries. Cite the data source when possible.","You are an agent that compares statistics between multiple locations. Use the Data Commons tools to fetch the data, present the comparison in a markdown table, and always cite the data source.","You are an agent that retrieves time-series data. When a user asks for a statistic over a range of years for a specific place, use the Data Commons tools to find the data, present it as a year-by-year list, and cite the source."] {"allow-input":true}

print("✅ Agent instructions initialized.")

In [None]:
# @title ##### **c.** Create the agent and initialize a session {"display-mode":"form"}
# @markdown 👉 Run this cell
# @markdown
# @markdown **Note**: You must re-run this cell every time you update the AGENT_INSTRUCTIONS or change the model!

from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool.mcp_toolset import (
    McpToolset,
    StreamableHTTPConnectionParams,
)
from google.genai import types

from dataclasses import dataclass, field

from IPython.display import Markdown, clear_output, display

#
# ---Define the Agent ---
# Key Concept: Create the agent w/ system prompt, model, and MCP tools.
#

datcom_agent = LlmAgent(
    name="datacommons_agent",
    model=AGENT_MODEL,
    instruction=AGENT_INSTRUCTIONS,
    tools=[
        McpToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=f"http://localhost:{MCP_PORT}/mcp"
            )
        )
    ],
)
logging.info(f"Agent '{datcom_agent.name}' created using model '{AGENT_MODEL}'.")


#
# -----------------------------------------------------------------------------
# The rest of this cell is only required for running the agent in Colab. This
# is automatically handled when running with adk web.
# ----------------------------------------------------------------------------
#

#
# ---Session Management ---
# Key Concept: SessionService stores conversation history & state.
# InMemorySessionService is a simple, non-persistent store, suitable for this tutorial.
#
session_service = InMemorySessionService()

# Define constants for identifying the interaction context
APP_NAME = "datacommons_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Create the specific session where the conversation will happen
session = await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID,
)
logging.info(
    f"Session created: App='{APP_NAME}', User='{USER_ID}', Session='{SESSION_ID}'"
)

#
# ---Runner ---
# Key Concept: Runner orchestrates the agent execution loop.
#
runner = Runner(agent=datcom_agent, app_name=APP_NAME, session_service=session_service)
logging.info(f"Runner created for agent '{runner.agent.name}'.")

#
# ---Async Agent Interaction Function ---
#


@dataclass
class AgentTurn:
    """Holds all information for one turn of conversation."""

    user_query: str
    agent_response: str
    tool_calls: list[str] = field(default_factory=list)

    def _get_text(self) -> list[str]:
        # Build the output as a list of strings
        output = [f"\n>> 👤 User: {self.user_query}\n"]

        if self.tool_calls:
            output.append("🛠️ Tool Calls:")
            for call in self.tool_calls:
                output.append(f" - {call}")
            output.append("")  # Add a newline

        return output

    def __str__(self) -> str:
        """This method is called when you use print() on the object."""

        # Build the output as a list of strings
        output = self._get_text()
        output.append(f"<< 🤖 Agent: {self.agent_response}\n")

        # Join all the parts into a single string
        return "\n".join(output)

    def print_pretty(self) -> None:
        raw_text = self._get_text()
        print("\n".join(raw_text))
        print("<< 🤖 Agent:\n")
        display(Markdown(self.agent_response))
        print("")


def print_logs() -> None:
    for log in logs:
        log.print_pretty()


async def call_agent_async(query: str, runner, user_id, session_id) -> None:
    """Sends a query to the agent, displays the session history and the final
    response, and records the turn in the global `logs` list."""

    if logs:
        print("Session History:")
        print_logs()
        print(
            "---------------------------------------------------------\n\nCurrent Query:"
        )

    print(f"\n>> 👤 User Query: {query}")
    print("\n...waiting for agent's response\n")

    # Prepare the user's message in ADK format
    content = types.Content(role="user", parts=[types.Part(text=query)])

    final_response_text = "Agent did not produce a final response."  # Default

    # Iterate through events to find the final answer.
    tool_calls = []
    async for event in runner.run_async(
        user_id=user_id, session_id=session_id, new_message=content
    ):
        # Every event is logged at INFO level, which the ERROR-level logger from
        # Step 2 hides. Call logger.setLevel(logging.INFO) to see *all* events.
        logger.info(
            f"  [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}"
        )
        if event.author == "datacommons_agent":
            for tool_call in event.get_function_calls():
                if not tool_calls:
                    print("🛠️ Tool Calls:")

                args = ", ".join(
                    sorted([f"{k}={repr(v)}" for k, v in tool_call.args.items()])
                )
                tool_call_str = f"{tool_call.name}({args})"
                print(f" - {tool_call_str}")
                tool_calls.append(tool_call_str)

        # Key Concept: is_final_response() marks the concluding message for the turn.
        if event.is_final_response():
            if event.content and event.content.parts:
                # Join the text of all parts; non-text parts have text=None.
                final_response_text = (
                    "".join(part.text for part in event.content.parts if part.text)
                    or final_response_text
                )
            elif (
                event.actions and event.actions.escalate
            ):  # Handle potential errors/escalations
                final_response_text = (
                    f"Agent escalated: {event.error_message or 'No specific message.'}"
                )
            # Add more checks here if needed (e.g., specific error codes)
            break  # Stop processing events once the final response is found

    log = AgentTurn(
        user_query=query, agent_response=final_response_text, tool_calls=tool_calls
    )
    clear_output(wait=True)
    if logs:
        print("Session History:")
        print_logs()
        print(
            "---------------------------------------------------------\n\nCurrent Query:\n"
        )
    log.print_pretty()
    logs.append(log)


logs = []

print("✅ Agent created with a fresh session history.")



---
## 🚀 Take your agent for a spin!

You're all set! It's time to chat with the agent you just built.

#### **👉 How to use this cell:**

* **Type your question:** Click into the input box below.

* **Run the cell:** Press the ▶ "play" button on the left.
  * **Pro-tip:** You can also use a keyboard shortcut to run the cell, and your cursor will stay in the box, ready for your next question:
    * **Windows/Linux:** `Ctrl + Enter`
    * **Mac:** `Cmd + Enter`

* **Keep chatting:** After you get a response, you can ask a follow-up question. Just type your new question in the same box and run the cell again. The agent will remember what you talked about.

#### **Managing your chat history:**

* **To keep your conversation going:** Just continue to use **this** cell.

* **To restart the agent (and erase its memory):** Re-run the agent setup cell (Step 4c).
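
The agent "remembers" earlier questions because every turn is stored in the session identified by `APP_NAME`, `USER_ID`, and `SESSION_ID`, and re-running Step 4c discards that history by creating a new `InMemorySessionService`. The toy model below illustrates only that idea; `ToySessionStore` is hypothetical and far simpler than ADK's session service, which stores full events and session state.

```python
from collections import defaultdict


class ToySessionStore:
    """Hypothetical in-memory store: conversation turns keyed by (app, user, session)."""

    def __init__(self) -> None:
        self._history: dict[tuple[str, str, str], list[str]] = defaultdict(list)

    def append(self, app: str, user: str, session: str, message: str) -> None:
        self._history[(app, user, session)].append(message)

    def history(self, app: str, user: str, session: str) -> list[str]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._history[(app, user, session)])


key = ("datacommons_app", "user_1", "session_001")
store = ToySessionStore()
store.append(*key, "user: Tell me about the economy in Brazil.")
store.append(*key, "user: How has it changed since 2010?")
print(len(store.history(*key)))  # 2

# Re-running Step 4c corresponds to starting over with a brand-new store.
store = ToySessionStore()
print(store.history(*key))  # []
```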

In [None]:
# @title {"display-mode":"form"}
Input = ""  # @param {"type":"string", "placeholder": " "}


first_run = not logs

if not Input.strip():
    print("⚠️ Provide input for the agent, then run the cell again.")
else:
    await call_agent_async(Input, runner=runner, user_id=USER_ID, session_id=SESSION_ID)

if first_run:
    print("👉 Provide a follow-up or ask a new question using the same 'Input' box above.")



---


#### **Here are some ideas to get you started:**

* Which countries have the lowest Gini index?

* Tell me about the economy in Brazil.

* Which states in the US have the highest smoke pollution?

* How does life expectancy vary across countries in Africa?

---
❗ AI applications using the MCP server can make mistakes, so please double-check responses.