# MCP + A2A Walkthrough

This notebook shows the basic MCP and A2A flows for the registry app:

- Discover agent cards via MCP (list + read).
- Call the A2A gateway with JSON-RPC.
- Call the registry HTTP API.
- Placeholder examples for streaming and push notifications.

It assumes `config.yaml` is populated and the registry app is deployed.

In [3]:
from __future__ import annotations

from pathlib import Path
import json

import httpx
import yaml
from databricks.sdk import WorkspaceClient
from fastmcp import Client as MCPClient
from fastmcp.client.transports import StreamableHttpTransport
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import MessageSendParams, SendMessageRequest

In [9]:
# Host (and port) of the registry app. The status check treats any value
# containing "localhost" as a local server reached over plain HTTP without an
# Authorization header; every other value is reached over HTTPS.
APP_HOST = "localhost:8000"

Once the app is running, we can get the status of each service.

In [13]:
import requests

# A local dev server is reached over plain HTTP and needs no credentials; any
# other host is treated as a deployed app behind HTTPS that requires auth.
if "localhost" in APP_HOST:
    status_url = f"http://{APP_HOST}/api/registry/status"
    status_headers = {}
else:
    status_url = f"https://{APP_HOST}/api/registry/status"
    # Ask the Databricks SDK for the auth headers instead of relying on a
    # `token` variable that is never defined in this notebook.
    status_headers = WorkspaceClient().config.authenticate()

# Bound the wait so an unreachable app fails fast instead of hanging the cell.
resp = requests.get(status_url, headers=status_headers, timeout=30)
resp.raise_for_status()
print(resp.json())

{'database': {'ok': True, 'error': None}, 'a2a': {'ok': True, 'url': '/api/a2a'}, 'mcp': {'ok': True, 'url': '/api/mcp'}}


Let's test the MCP client. With the server running, we connect to it, list the available tools, and call one of them.

In [None]:
import asyncio
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

async def main():
    # the app expects a trailing slash
    transport = StreamableHttpTransport("http://localhost:8000/api/mcp/")
    async with Client(transport) as client:
        tools = await client.list_tools()
        print([t.name for t in tools])
        result = await client.call_tool(
            "list_available_agents",
            {"tags": ["demo"], "limit": 5}
        )
        print(result)

await main()

['list_available_agents', 'invoke_agent']
CallToolResult(content=[TextContent(type='text', text='{"agents":[]}', annotations=None, meta=None)], structured_content={'agents': []}, meta=None, data={'agents': []}, is_error=False)


In [None]:
# Look for the config in the working directory first, then one level up.
config_path = Path("config.yaml")
if not config_path.exists():
    config_path = Path("..") / "config.yaml"

# `or {}` keeps `config` a dict even when the file parses to nothing.
config = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}

# Fall back to the placeholder when the key is missing or empty, and drop any
# trailing slash so the joined URLs below never contain "//".
REGISTRY_BASE_URL = (
    config.get("registry_base_url") or "https://<your-app>.databricksapps.com"
).rstrip("/")
# The app expects a trailing slash on the MCP endpoint.
MCP_BASE_URL = f"{REGISTRY_BASE_URL}/api/mcp/"
A2A_BASE_URL = f"{REGISTRY_BASE_URL}/api"

# Auth headers from the Databricks SDK; reused by every request cell below.
w = WorkspaceClient()
headers = w.config.authenticate()

print("MCP_BASE_URL:", MCP_BASE_URL)
print("A2A_BASE_URL:", A2A_BASE_URL)

## MCP: list and read agent cards

In [None]:
async def list_mcp_resources():
    """Open a short-lived MCP session and return the result of `list_resources()`.

    Connects to `MCP_BASE_URL` using the notebook-level auth `headers`.
    """
    mcp_transport = StreamableHttpTransport(url=MCP_BASE_URL, headers=headers)
    async with MCPClient(mcp_transport) as session:
        return await session.list_resources()

async def read_mcp_resource(uri: str):
    """Open a short-lived MCP session and return the result of `read_resource(uri)`.

    Args:
        uri: Identifier of the resource to read, passed through unchanged.
    """
    mcp_transport = StreamableHttpTransport(url=MCP_BASE_URL, headers=headers)
    async with MCPClient(mcp_transport) as session:
        return await session.read_resource(uri)

# Example usage:
# resources = await list_mcp_resources()
# print(resources)
# if resources:
#     content = await read_mcp_resource(resources[0].uri)
#     print(content[0].text)

## A2A: resolve agent card and send message

In [None]:
async def resolve_agent_card():
    """Fetch the agent card published under `A2A_BASE_URL`.

    Uses a temporary HTTP client with a 60-second timeout and forwards the
    notebook-level auth `headers` with the request.
    """
    async with httpx.AsyncClient(timeout=60.0) as http:
        card_resolver = A2ACardResolver(httpx_client=http, base_url=A2A_BASE_URL)
        return await card_resolver.get_agent_card(http_kwargs={"headers": headers})

async def send_a2a_message(text: str):
    """Send one user text message through the A2A gateway and return the response.

    Args:
        text: Message body; sent as a single text part with role "user".

    Returns:
        The value returned by `A2AClient.send_message` for this request.
    """
    from uuid import uuid4  # local import: only this helper needs it

    async with httpx.AsyncClient(timeout=60.0) as httpx_client:
        card = await resolve_agent_card()
        client = A2AClient(httpx_client=httpx_client, agent_card=card)
        payload = {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
                # The A2A message schema requires a unique message id; without
                # it the params model fails validation before anything is sent.
                "messageId": uuid4().hex,
            }
        }
        # JSON-RPC requests carry an id so the response can be correlated.
        request = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(**payload),
        )
        response = await client.send_message(request, http_kwargs={"headers": headers})
        return response

# Example usage:
# response = await send_a2a_message('{"action":"list_agents"}')
# print(response.model_dump(mode="json", exclude_none=True))

## Registry HTTP API (basic CRUD reads)

In [None]:
def list_agents_http():
    """GET the registry's agent listing and return the decoded JSON body.

    Raises the HTTP client's status error on a non-success response.
    """
    agents_url = f"{REGISTRY_BASE_URL}/api/registry/agents"
    response = httpx.get(agents_url, headers=headers, timeout=30.0)
    response.raise_for_status()
    return response.json()

def get_agent_http(agent_id: str):
    """GET a single agent record from the registry and return the decoded JSON body.

    Args:
        agent_id: Identifier placed in the request path as-is.

    Raises the HTTP client's status error on a non-success response.
    """
    agent_url = f"{REGISTRY_BASE_URL}/api/registry/agents/{agent_id}"
    response = httpx.get(agent_url, headers=headers, timeout=30.0)
    response.raise_for_status()
    return response.json()

def get_agent_card_http(agent_id: str, version: str | None = None):
    """GET an agent's card from the registry and return the decoded JSON body.

    Args:
        agent_id: Identifier placed in the request path as-is.
        version: Optional card version; sent as the `version` query parameter
            only when it is truthy.

    Raises the HTTP client's status error on a non-success response.
    """
    query = None
    if version:
        query = {"version": version}

    card_url = f"{REGISTRY_BASE_URL}/api/registry/agents/{agent_id}/card"
    response = httpx.get(card_url, headers=headers, params=query, timeout=30.0)
    response.raise_for_status()
    return response.json()

# Example usage:
# agents = list_agents_http()
# print(agents)
# first_id = agents["agents"][0]["agent_id"]
# print(get_agent_http(first_id))
# print(get_agent_card_http(first_id))

## Streaming and push notifications (placeholders)

These APIs require a streaming-capable A2A server and a push-notification configuration. The registry gateway currently runs in non-streaming mode.

In [None]:
async def send_a2a_message_streaming(text: str):
    """Stub for the streaming send; always raises NotImplementedError.

    Args:
        text: Unused; kept so the signature mirrors a real send call.
    """
    reason = "Streaming requires a streaming-capable A2A server."
    raise NotImplementedError(reason)

async def configure_push_notifications():
    """Stub for push-notification setup; always raises NotImplementedError."""
    reason = "Push notifications not configured in this app."
    raise NotImplementedError(reason)