# A2A Multi-Agent Quick-Start Notebook

Welcome to the A2A Multi-Agent Quick-Start notebook! This guide will show you how to set up and interact with a multi-agent system using the Agent-to-Agent (A2A) protocol within the Llama Stack environment.

## Overview

This notebook guides you through the following key steps to set up and interact with a multi-agent system:

1.  **Setting up the Environment**: Preparing your Python environment by installing necessary libraries and configuring asynchronous operations.

2.  **Agent Management**: Understanding how to connect to and manage the different agents within the system.

3.  **Defining Agent Task Flow**: Exploring the multi-phase process (planning, execution, and composition) by which agents collaboratively solve a query.

4.  **Launching Agent Servers**: Starting the Agent-to-Agent (A2A) orchestrator and skill agent servers.

5.  **Interacting with Agents**: Sending questions to the agent team and observing their orchestrated responses.

## Prerequisites

Before you begin, ensure that you have:

* Python 3.13 or later.

* Completed the initial setup as outlined in the [Setup Guide](../../rag_agentic/notebooks/Level0_getting_started_with_Llama_Stack.ipynb) notebook.

## Environment Variables

This notebook is designed to be flexible, allowing you to connect to either a local or a remote Llama Stack instance, and to specify the inference models used by the agents. You can configure these aspects using the following environment variables:

* `REMOTE_BASE_URL`: Set this variable if you intend to connect to a **remote Llama Stack instance**. If it is not set, the notebook defaults to a local instance at `http://localhost:8321`.

* `INFERENCE_MODEL_ID`: Set this variable to specify the default Large Language Model (LLM) that agents should use for inference. We recommend `llama3.1:8b-instruct-fp16`, which is also the default when the variable is not set.

**Note on Agent-Specific Models:**
If you require different inference models for individual agents, edit the `__main__.py` file in each agent's folder (e.g., `demos/a2a_llama_stack/agents/a2a_custom_tools/__main__.py`).

## 1. Setting Up the Notebook Environment

We'll start by setting up the necessary environment and installing the required packages to enable A2A communication.

We'll install the official [A2A sample implementation by Google](https://github.com/google/A2A/tree/main/samples/python), the Llama Stack client, and a few supporting packages for running asynchronous code and loading environment variables in Jupyter. Run the following commands:

In [1]:
! pip install "git+https://github.com/google/A2A.git#subdirectory=samples/python"
! pip install llama_stack_client asyncclick nest_asyncio ipywidgets python-dotenv

Collecting git+https://github.com/google/A2A.git#subdirectory=samples/python
  Cloning https://github.com/google/A2A.git to /private/var/folders/p4/635191ns4599kwjkqt12kwd80000gn/T/pip-req-build-11e356th
  Running command git clone --filter=blob:none --quiet https://github.com/google/A2A.git /private/var/folders/p4/635191ns4599kwjkqt12kwd80000gn/T/pip-req-build-11e356th
  Resolved https://github.com/google/A2A.git to commit 081fa20bdfede24922c49e8e56fcdfbee0db0c28
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done

Next, we'll add the necessary paths to `sys.path` to ensure we can import the required libraries later in the notebook.

In [2]:
import sys
# the path of the A2A library
sys.path.append('./A2A/samples/python')
# the path to our own utils
sys.path.append('../..')

We will now proceed with importing all the necessary Python libraries and modules for the notebook.

In [3]:
# A2A sample library (installed above): server and agent-card types
from common.server import A2AServer
from common.types import AgentCard, AgentSkill, AgentCapabilities
# our own A2A helpers for Llama Stack (importable thanks to the '../..' path added above)
from a2a_llama_stack.A2ATool import A2ATool
from a2a_llama_stack.task_manager import AgentTaskManager

# standard library and utilities for configuring, launching, and talking to the agent servers
import os
import time
import json
import logging
from dotenv import load_dotenv
import urllib.parse
from uuid import uuid4
from typing import Any, Dict, List, Tuple
import subprocess
import socket
import asyncio
import nest_asyncio
import threading
import concurrent.futures as cf

# A2A sample library: client, card resolver, and push-notification helpers
from common.client import A2AClient, A2ACardResolver
from common.utils.push_notification_auth import PushNotificationReceiverAuth
from hosts.cli.push_notification_listener import PushNotificationListener

Next, we will initialize our environment as described in detail in our ["Getting Started" notebook](../../rag_agentic/notebooks/Level0_getting_started_with_Llama_Stack.ipynb). Please refer to it for additional explanations.

In [4]:
# for accessing the environment variables
import os
from dotenv import load_dotenv
load_dotenv()


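# Llama Stack server URL: REMOTE_BASE_URL if set, otherwise a local instance
base_url = os.getenv("REMOTE_BASE_URL", "http://localhost:8321")

# note: no request is sent here; the message only confirms that the server URL has been configured
print("Connected to Llama Stack server")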

# the model you wish to use that is configured with the Llama Stack server
model_id = os.getenv("INFERENCE_MODEL_ID", "llama3.1:8b-instruct-fp16")

temperature = float(os.getenv("TEMPERATURE", 0.0))
if temperature > 0.0:
    top_p = float(os.getenv("TOP_P", 0.95))
    strategy = {"type": "top_p", "temperature": temperature, "top_p": top_p}
else:
    strategy = {"type": "greedy"}

max_tokens = int(os.getenv("MAX_TOKENS", 4096))

# sampling_params will later be used to pass the parameters to Llama Stack Agents/Inference APIs
sampling_params = {
    "strategy": strategy,
    "max_tokens": max_tokens,
}

stream_env = os.getenv("STREAM", "False")
# the Boolean 'stream' parameter will later be passed to Llama Stack Agents/Inference APIs;
# any value other than the exact string 'False' (including 'false') is treated as True
stream = (stream_env != "False")

print(f"Inference Parameters:\n\tModel: {model_id}\n\tSampling Parameters: {sampling_params}\n\tstream: {stream}")

Connected to Llama Stack server
Inference Parameters:
	Model: llama3.1:8b-instruct-fp16
	Sampling Parameters: {'strategy': {'type': 'greedy'}, 'max_tokens': 4096}
	stream: False
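The cell above picks a decoding strategy from environment variables. As a minimal sketch, the same rules can be written as a pure function, which makes the defaults and one quirk of the `STREAM` check easy to see. `build_sampling_params` is a hypothetical helper for illustration, not part of Llama Stack:

```python
from typing import Mapping


def build_sampling_params(env: Mapping[str, str]) -> tuple[dict, bool]:
    """Hypothetical helper mirroring the notebook's env-var handling; returns (sampling_params, stream)."""
    temperature = float(env.get("TEMPERATURE", 0.0))
    if temperature > 0.0:
        # any positive temperature switches from greedy decoding to top-p (nucleus) sampling
        top_p = float(env.get("TOP_P", 0.95))
        strategy = {"type": "top_p", "temperature": temperature, "top_p": top_p}
    else:
        strategy = {"type": "greedy"}
    params = {"strategy": strategy, "max_tokens": int(env.get("MAX_TOKENS", 4096))}
    # only the exact string "False" disables streaming
    stream_flag = env.get("STREAM", "False") != "False"
    return params, stream_flag


print(build_sampling_params({}))
print(build_sampling_params({"TEMPERATURE": "0.7"}))
print(build_sampling_params({"STREAM": "false"})[1])
```

Passing `os.environ` reproduces the notebook's values. The last line shows the quirk: `STREAM=false` (lowercase) turns streaming *on*, because the check compares against the exact string `"False"`.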


We'll configure basic logging to provide informative output from the agents as they run, which can be very helpful for debugging and understanding the flow.

In [5]:
# Configuring basic logging for the application
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)

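Jupyter already runs an asyncio event loop in the background, so code that tries to start or re-enter an event loop (for example with `asyncio.run()` or `loop.run_until_complete()`) would fail with a `RuntimeError`. Applying `nest_asyncio` patches the loop so that such nested calls are allowed.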

In [6]:
nest_asyncio.apply()
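To see the problem `nest_asyncio` works around, the stdlib-only sketch below runs a coroutine that tries to drive its own, already-running loop with `run_until_complete`. Run it in a plain Python process: there, asyncio refuses with a `RuntimeError`. Inside this notebook, where `nest_asyncio.apply()` has already patched the loop, the same code should take the other branch and the nested call succeeds.

```python
import asyncio


async def inner() -> int:
    return 42


async def outer() -> str:
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # try to re-enter the loop that is currently executing `outer`
        result = loop.run_until_complete(coro)
        return f"nested call returned {result}"
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a "never awaited" warning
        return f"RuntimeError: {exc}"


message = asyncio.run(outer())
print(message)
```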

## 2. Understanding the `AgentManager`

The `AgentManager` class is key to connecting with and managing the different agents in our system.

It handles:

* Connecting to your orchestrator agent.

* Connecting to individual skill agents (the ones that perform specific tasks).

* Managing unique session IDs for each connection.

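The same cell also defines `_send_payload`, a standalone helper that sends a task to an agent and returns the text of its reply, either in a single request or by printing a streamed response as it arrives.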

In [7]:
AgentInfo = Tuple[str, Any, A2AClient, str]  # (url, agent card, A2A client, session ID)

class AgentManager:
    def __init__(self, urls: List[str]):
        # first URL is your orchestrator…
        self.orchestrator: AgentInfo = self._make_agent_info(urls[0])
        # …the rest are skill agents, each keyed by skill.id
        self.skills: Dict[str, AgentInfo] = {
            skill.id: info
            for url in urls[1:]
            for info in (self._make_agent_info(url),)
            for skill in info[1].skills
        }

    @staticmethod
    def _make_agent_info(url: str) -> AgentInfo:
        card   = A2ACardResolver(url).get_agent_card()
        client = A2AClient(agent_card=card)
        session = uuid4().hex
        return url, card, client, session


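async def _send_payload(client, card, session, payload, streaming: bool) -> str:
    """Send one task to an agent and return the text of its reply.

    `card` and `session` are not used here; they are accepted for a uniform call signature.
    """
    if not streaming:
        # one-shot request: the reply text is in the first part of the final status message
        res = await client.send_task(payload)
        return res.result.status.message.parts[0].text.strip()

    # streaming request: echo each update as it arrives and keep the most recent one
    text = ""
    async for ev in client.send_task_streaming(payload):
        part = ev.result.status.message.parts[0].text or ""
        print(part, end="", flush=True)
        text = part
    print()
    return text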
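The nested comprehension in `AgentManager.__init__` flattens "agents that offer skills" into a single lookup table keyed by skill ID. The sketch below replays it with hypothetical stand-in cards (`SimpleNamespace` objects carrying only `name` and `skills`), so it runs without any agent server; `fake_card` and `make_info` are illustrative names only.

```python
from types import SimpleNamespace
from uuid import uuid4


def fake_card(name, *skill_ids):
    """Hypothetical stand-in for an AgentCard: only .name and .skills[i].id are modelled."""
    return SimpleNamespace(name=name, skills=[SimpleNamespace(id=s) for s in skill_ids])


# hypothetical URL -> card mapping, mirroring the skill agents launched later in this notebook
CARDS = {
    "http://localhost:10011": fake_card("Custom Agent", "random_number_tool", "date_tool"),
    "http://localhost:10012": fake_card("Writing Agent", "writing_agent"),
}


def make_info(url):
    # same tuple shape as AgentInfo (url, card, client, session); no real client is created
    return url, CARDS[url], None, uuid4().hex


# the same nested comprehension as AgentManager.__init__
skill_index = {
    skill.id: info
    for url in CARDS
    for info in (make_info(url),)
    for skill in info[1].skills
}

for sid, (agent_url, agent_card, _, session_id) in skill_index.items():
    print(f"{sid:20} -> {agent_url} ({agent_card.name}), session {session_id[:8]}")
```

Because the agent info is built once per URL, skills hosted by the same agent share one card, client, and session. If two agents advertised the same skill ID, the entry from the later URL would silently overwrite the earlier one.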

## 3. Preparing and Sending Tasks to Agents

This section defines functions that format the user's question into a structured message (a JSON payload) that the agents can understand and process. They then use the `_send_payload` helper defined in Section 2 to send this task to the appropriate agent.

In [8]:
def _build_skill_meta(mgr):
    """Gather metadata for every skill in all executor cards."""
    return [
        {
            "skill_id": s.id, "name": s.name,
            "description": getattr(s, "description", None),
            "inputSchema":  getattr(s, "inputSchema", None),
            "outputSchema": getattr(s, "outputSchema", None),
        }
        for _, card, _, _ in mgr.skills.values()
        for s in card.skills
    ]

async def _send_task_to_agent(mgr, client, card, session, question, push=False, host=None, port=None) -> str:
    """Build a card-driven payload (with optional push) and dispatch it."""
    # Skill metadata + input parts
    skills = _build_skill_meta(mgr)
    content = {"skills": skills, "question": question}
    modes   = getattr(card, "acceptedInputModes", ["text"])
    parts   = ([{"type": "json", "json": content}]
               if "json" in modes
               else [{"type": "text", "text": json.dumps(content)}])

    # Optional push URL & auth
    can_push = push and getattr(card.capabilities, "pushNotifications", False)
    push_url = (urllib.parse.urljoin(f"http://{host}:{port}", "/notify")
                if can_push and host and port else None)
    schemes = getattr(card.authentication, "supportedSchemes", ["bearer"])

    # Assemble payload
    payload = {
        "id": uuid4().hex,
        "sessionId": session,
        "acceptedOutputModes": card.defaultOutputModes,
        "message": {"role": "user", "parts": parts},
        **({"pushNotification": {"url": push_url,
                                 "authentication": {"schemes": schemes}}}
           if push_url else {})
    }

    # Dispatch, letting the card decide streaming vs one-shot
    stream = getattr(card.capabilities, "streaming", False)
    return await _send_payload(client, card, session, payload, stream)
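Unless an agent's card lists `json` among its input modes, `_send_task_to_agent` packs the skill list and the question into a single text part. A minimal, card-free sketch of that text-mode payload follows; `build_text_payload` and the skill metadata values are hypothetical, and the sketch leaves out the optional push-notification field.

```python
import json
from typing import List, Optional
from uuid import uuid4


def build_text_payload(question: str, skills: List[dict], session: str,
                       output_modes: Optional[List[str]] = None) -> dict:
    """Hypothetical, card-free mirror of the text-mode payload assembled by _send_task_to_agent."""
    content = {"skills": skills, "question": question}
    return {
        "id": uuid4().hex,            # a new task ID for every request
        "sessionId": session,         # the same session ID for every request to one agent
        "acceptedOutputModes": output_modes or ["text"],
        # text mode: the skill list and the question travel together as one JSON-encoded text part
        "message": {"role": "user",
                    "parts": [{"type": "text", "text": json.dumps(content)}]},
    }


# hypothetical skill metadata in the shape produced by _build_skill_meta
demo_skills = [{"skill_id": "date_tool", "name": "date_tool", "description": None,
                "inputSchema": None, "outputSchema": None}]
demo_payload = build_text_payload("Get todays date?", demo_skills, session=uuid4().hex)
print(json.dumps(demo_payload, indent=2))

# on the receiving side, the structured content can be recovered from the text part
received = json.loads(demo_payload["message"]["parts"][0]["text"])
print(received["question"])
```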


## 4. Defining the Agent Task Flow: Planning, Execution, and Composition
This section contains the core logic for how the agents work together to answer a question. It's broken down into three distinct phases:

* **Planning:** The orchestrator agent figures out the steps needed to answer the question and which skill agents to use.

* **Execution:** The notebook code calls the necessary skill agents based on the plan and collects their results.

* **Composition:** A final agent combines the results from the skill agents into a human-friendly answer.

In [9]:
async def _planning_phase(agent_manager, question, push, host, port):
    """Ask orchestrator for a plan, parse/fix JSON if necessary."""
    _, card, client, sess = agent_manager.orchestrator

    raw = await _send_task_to_agent(agent_manager, client, card, sess, question, push=push, host=host, port=port)
    print(f"Raw plan ➡️ {raw}")

    try:
        return json.loads(raw[: raw.rfind("]") + 1])
    except ValueError:
        print("\033[31mPlan parse failed, fixing invalid JSON...\033[0m")
        fixer = "Fix this json to be valid: " + raw
        fixed = await _send_task_to_agent(agent_manager, client, card, sess, fixer, push=push, host=host, port=port)
        return json.loads(fixed)


async def _execution_phase(agent_manager, plan, push, host, port):
    """Run each step in the plan via its skill and collect outputs."""
    results = []
    for i, step in enumerate(plan, 1):
        sid, inp = step["skill_id"], json.dumps(step.get("input", {}))
        print(f"➡️ Step {i}: {sid}({inp})")

        info = agent_manager.skills.get(sid)
        if not info:
            print(f"\033[31mNo executor for '{sid}', skipping.\033[0m")
            results.append({"skill_id": sid, "output": None})
            continue

        _, skill_card, skill_client, skill_sess = info
        out = await _send_task_to_agent(agent_manager, skill_client, skill_card, skill_sess, f"{sid}({inp})", push=push, host=host, port=port)
        print(f"   ✅ → {out}")
        results.append({"skill_id": sid, "output": out})

    return results

def _compose_prompt(parts, question):
    """Create the final composition prompt for the orchestrator."""
    return (
        f"Using the following information: {json.dumps(parts)}, "
        f"write a clear and human-friendly response to the question: '{question}'. "
        "Keep it concise and easy to understand and respond like a human with character. "
        "Only use the information provided. If you cannot answer the question, say 'I don't know'. "
        "Never show any code or JSON, just the answer.\n\n"
    )
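The planning phase relies on a simple heuristic: keep everything up to the last `]` and hand it to `json.loads`. The sketch below isolates that rule (`parse_plan` is a hypothetical name) to show which kinds of model output it tolerates and which fall through to the "Fix this json" retry:

```python
import json


def parse_plan(raw: str) -> list:
    """Same truncation rule as _planning_phase: drop everything after the last ']' and parse."""
    return json.loads(raw[: raw.rfind("]") + 1])


# trailing chatter after the JSON array is tolerated
chatty = '[{"skill_id": "date_tool"}]\nLet me know if you need anything else!'
print(parse_plan(chatty))

# leading chatter, or no array at all, is not: this is where the retry would kick in
for raw in ['Here is the plan: [{"skill_id": "date_tool"}]', "I cannot help with that."]:
    try:
        parse_plan(raw)
    except ValueError as exc:  # json.JSONDecodeError is a subclass of ValueError
        print(f"parse failed ({type(exc).__name__}) -> would ask the orchestrator to fix it")
```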


## 5. Orchestrating the Agent Interaction
This is the main function that ties together the planning, execution, and composition phases to answer a user's question using the agent team.

In [10]:
async def ask_question(
    agent_manager: AgentManager,
    question: str,
    push: bool = False,
    push_receiver: str = "http://localhost:5000",
) -> str:
    # Unpack orchestrator info
    orch_url, orch_card, orch_client, orch_session = agent_manager.orchestrator

    # Optionally start push listener
    host = port = None
    if push:
        parsed = urllib.parse.urlparse(push_receiver)
        host, port = parsed.hostname, parsed.port
        auth = PushNotificationReceiverAuth()
        await auth.load_jwks(f"{orch_url}/.well-known/jwks.json")
        PushNotificationListener(
            host=host,
            port=port,
            notification_receiver_auth=auth
        ).start()

    # --- Planning Phase ---
    print("\n\033[1;33m=========== 🧠 Planning Phase ===========\033[0m")
    plan = await _planning_phase(agent_manager, question, push=push, host=host, port=port)
    print(f"\n\033[1;32mFinal plan ➡️ {plan}\033[0m")

    # --- Execution Phase ---
    print("\n\033[1;33m=========== ⚡️ Execution Phase ===========\033[0m")
    parts = await _execution_phase(agent_manager, plan, push=push, host=host, port=port)

    # --- Composing Answer ---
    print("\n\033[1;33m=========== 🛠️ Composing Answer ===========\033[0m")
    comp_prompt = _compose_prompt(parts, question)
    WRITING_AGENT_ID = "writing_agent"

    # look up the writing agent directly; a missing entry raises a clear KeyError
    _, skill_card, skill_client, skill_sess = agent_manager.skills[WRITING_AGENT_ID]
    final = await _send_task_to_agent(agent_manager, skill_client, skill_card, skill_sess, comp_prompt, push=push, host=host, port=port)

    print("\n\033[1;36m🎉 FINAL ANSWER\033[0m")
    print(final)
    print("\033[1;36m====================================\033[0m")
    return final
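To see the three phases without running any servers, here is a self-contained sketch in which the orchestrator, the skill agents, and the writing agent are replaced by hypothetical stubs (`fake_planner`, `FAKE_SKILLS`, `fake_writer`). The control flow follows `ask_question`, with a shortened composition prompt; `weather_tool` is a made-up skill that no stub implements, included to exercise the "no executor, skipping" branch.

```python
import asyncio
import json


# hypothetical stand-ins for the three agent roles; each returns plain text, like _send_task_to_agent
async def fake_planner(question: str) -> str:
    # a real orchestrator derives the plan from the question; this stub always returns the same plan
    return '[{"skill_id": "date_tool"}, {"skill_id": "random_number_tool"}, {"skill_id": "weather_tool"}]'


FAKE_SKILLS = {
    "date_tool": lambda inp: "2025-05-20",
    "random_number_tool": lambda inp: "42",
}


async def fake_writer(prompt: str) -> str:
    return f"(writer would answer using {len(prompt)} characters of context)"


async def ask(question: str):
    # planning: keep everything up to the last "]" and parse it as the plan
    raw = await fake_planner(question)
    plan = json.loads(raw[: raw.rfind("]") + 1])

    # execution: dispatch each step by skill_id; unknown skills are recorded with output None
    parts = []
    for step in plan:
        sid, inp = step["skill_id"], json.dumps(step.get("input", {}))
        tool = FAKE_SKILLS.get(sid)
        parts.append({"skill_id": sid, "output": tool(inp) if tool else None})

    # composition: a single prompt containing every step's output goes to the writer
    prompt = f"Using the following information: {json.dumps(parts)}, answer: '{question}'"
    return parts, await fake_writer(prompt)


demo_parts, demo_answer = asyncio.run(ask("Get todays date then generate a random number"))
print(demo_parts)
print(demo_answer)
```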


## 6. Launching the A2A Agent Servers
Before we can interact with the agents, we need to start their servers. This bootstrap script handles bringing the complete A2A stack online.

It performs three key actions in sequence:

1. Defines connection details (ports and modules).

2. Starts each agent in parallel and waits for them to be ready.

3. Connects to the running agents and summarizes their status.

Below, we set up the network addresses (`URLS`) for our orchestrator and skill agents, and specify the Python modules that implement their functionality. These definitions are crucial for starting and connecting to the agents in the next steps.

In [11]:
ORCHESTRATOR_URL = "http://localhost:10010"
EXECUTOR_URLS    = ["http://localhost:10011", "http://localhost:10012"]
URLS             = [ORCHESTRATOR_URL, *EXECUTOR_URLS]
MODULES          = [
    "agents.a2a_planner",
    "agents.a2a_custom_tools",
    "agents.a2a_composer",
]

The next cell launches the agent processes and waits until each server is ready and listening on its assigned port.

In [12]:
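# move up to the parent directory, which contains the `agents` package, so `python -m agents.<name>` works
os.chdir('..')

def _launch(mod, url, timeout=60):
    """Start one agent server and block until its port accepts connections (or `timeout` seconds pass)."""
    port = int(url.split(":")[-1])
    subprocess.Popen([sys.executable, "-m", mod, "--port", str(port)],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    deadline = time.time() + timeout
    # poll with a fresh socket each time and close it, so waiting does not leak file descriptors
    while True:
        with socket.socket() as s:
            if s.connect_ex(("127.0.0.1", port)) == 0:
                break
        if time.time() > deadline:
            raise TimeoutError(f"{mod} did not open port {port} within {timeout}s")
        time.sleep(.1)
    return f"✅ {mod} ready on port {port}"

# start all agents in parallel; map() returns the results in MODULES order
with cf.ThreadPoolExecutor() as pool:
    print(*pool.map(_launch, MODULES, URLS), sep="\n")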

✅ agents.a2a_planner ready on port 10010
✅ agents.a2a_custom_tools ready on port 10011
✅ agents.a2a_composer ready on port 10012
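`_launch` treats a server as ready as soon as a TCP connection to its port succeeds. The sketch below exercises the same polling idea against a throwaway local socket that only starts listening after a short delay, standing in for a slow-starting agent. `wait_for_port` is a hypothetical helper; it closes each probe socket and gives up after a timeout.

```python
import socket
import threading
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.1) -> float:
    """Poll until a TCP connection to (host, port) succeeds; return the seconds waited."""
    start = time.monotonic()
    while True:
        with socket.socket() as probe:  # fresh socket per attempt, closed automatically
            if probe.connect_ex((host, port)) == 0:
                return time.monotonic() - start
        if time.monotonic() - start > timeout:
            raise TimeoutError(f"nothing listening on {host}:{port} after {timeout}s")
        time.sleep(interval)


# a throwaway socket bound to an OS-assigned free port, but not listening yet
listener_sock = socket.socket()
listener_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener_sock.bind(("127.0.0.1", 0))
demo_port = listener_sock.getsockname()[1]


def start_listening_later(delay: float = 0.5) -> None:
    time.sleep(delay)  # simulate an agent process that needs time to boot
    listener_sock.listen()


threading.Thread(target=start_listening_later, daemon=True).start()
waited = wait_for_port("127.0.0.1", demo_port)
print(f"port {demo_port} accepted a connection after about {waited:.1f}s")
listener_sock.close()
```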


Now that the agents are running, we'll use our `AgentManager` to connect to them, confirm they are online, and see what skills they offer.

In [13]:
_agent_manager = AgentManager(URLS)
orch_url, orch_card, *_ = _agent_manager.orchestrator

print("\n\033[1;36m===================== 🛰️ Connected Agents =====================\033[0m")
print(f"Orchestrator: {orch_url} ({orch_card.name})")
print("Executors:")
for sid, (u, card, *_) in _agent_manager.skills.items():
    print(f"  • {sid} -> {u} ({card.name})")
print("\033[1;36m===============================================================\033[0m")

2025-05-20 16:02:19,356 INFO httpx: HTTP Request: GET http://localhost:10010/.well-known/agent.json "HTTP/1.1 200 OK"
2025-05-20 16:02:19,366 INFO httpx: HTTP Request: GET http://localhost:10011/.well-known/agent.json "HTTP/1.1 200 OK"
2025-05-20 16:02:19,376 INFO httpx: HTTP Request: GET http://localhost:10012/.well-known/agent.json "HTTP/1.1 200 OK"



Orchestrator: http://localhost:10010 (Orchestration Agent)
Executors:
  • random_number_tool -> http://localhost:10011 (Custom Agent)
  • date_tool -> http://localhost:10011 (Custom Agent)
  • writing_agent -> http://localhost:10012 (Writing Agent)
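The log lines above show that connecting to an agent starts with an HTTP `GET` of `/.well-known/agent.json`, which returns the agent's card. The sketch below reproduces that discovery step with the standard library only: it serves a hypothetical card (values made up, field names loosely following the A2A sample `AgentCard`) from a throwaway local server and reads the skill IDs back. This is roughly the request `A2ACardResolver(url).get_agent_card()` makes before turning the JSON into an `AgentCard`; `CardHandler` and `fetch_card` are illustrative names.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

# hypothetical card: values are made up, field names loosely follow the A2A sample AgentCard
FAKE_CARD = {
    "name": "Custom Agent",
    "url": "http://localhost:10011",
    "version": "0.0.0",
    "capabilities": {"streaming": False, "pushNotifications": False},
    "defaultInputModes": ["text"],
    "defaultOutputModes": ["text"],
    "skills": [
        {"id": "random_number_tool", "name": "random_number_tool"},
        {"id": "date_tool", "name": "date_tool"},
    ],
}


class CardHandler(BaseHTTPRequestHandler):
    """Serve FAKE_CARD at the well-known agent-card path and 404 everywhere else."""

    def do_GET(self):
        if self.path != "/.well-known/agent.json":
            self.send_error(404)
            return
        body = json.dumps(FAKE_CARD).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


def fetch_card(base_url: str) -> dict:
    """GET <base_url>/.well-known/agent.json and decode it (a stdlib sketch of card discovery)."""
    # bypass any proxy configured in the environment for this local request
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(f"{base_url}/.well-known/agent.json", timeout=5) as resp:
        return json.load(resp)


card_server = HTTPServer(("127.0.0.1", 0), CardHandler)  # port 0: let the OS pick a free port
threading.Thread(target=card_server.serve_forever, daemon=True).start()
try:
    fetched = fetch_card(f"http://127.0.0.1:{card_server.server_port}")
    print(fetched["name"], "->", [s["id"] for s in fetched["skills"]])
finally:
    card_server.shutdown()
    card_server.server_close()
```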


## 7. Asking the Agent Team a Question!

Finally, it's time to put our agent team to work! We'll use the `ask_question` function we defined earlier to send our queries and see the multi-agent system in action.

In [14]:
questions = [ 
    "Get todays date then generate five random numbers",
    "Get todays date?",
    "generate a random number",
    ]

for question in questions:
    await ask_question(_agent_manager, question)




2025-05-20 16:02:31,947 INFO httpx: HTTP Request: POST http://0.0.0.0:10010/ "HTTP/1.1 200 OK"


Raw plan ➡️ [
  {"skill_id": "date_tool"},
  {"skill_id": "random_number_tool"},
  {"skill_id": "random_number_tool"},
  {"skill_id": "random_number_tool"},
  {"skill_id": "random_number_tool"},
  {"skill_id": "random_number_tool"}
]

Final plan ➡️ [{'skill_id': 'date_tool'}, {'skill_id': 'random_number_tool'}, {'skill_id': 'random_number_tool'}, {'skill_id': 'random_number_tool'}, {'skill_id': 'random_number_tool'}, {'skill_id': 'random_number_tool'}]

➡️ Step 1: date_tool({})


2025-05-20 16:02:37,550 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "date_tool", "parameters": {}}Tool:date_tool Args:{}Tool:date_tool Response:"2025-05-20"The date today is 2023-05-20.
➡️ Step 2: random_number_tool({})


2025-05-20 16:02:41,982 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "random_number_tool", "parameters": {}}Tool:random_number_tool Args:{}Tool:random_number_tool Response:1The random number generated is 1.
➡️ Step 3: random_number_tool({})


2025-05-20 16:02:46,690 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "random_number_tool", "parameters": {}}Tool:random_number_tool Args:{}Tool:random_number_tool Response:86The random number generated is 86.
➡️ Step 4: random_number_tool({})


2025-05-20 16:02:51,154 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "random_number_tool", "parameters": {}}Tool:random_number_tool Args:{}Tool:random_number_tool Response:20The random number generated is 20.
➡️ Step 5: random_number_tool({})


2025-05-20 16:02:55,637 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "random_number_tool", "parameters": {}}Tool:random_number_tool Args:{}Tool:random_number_tool Response:5The random number generated is 5.
➡️ Step 6: random_number_tool({})


2025-05-20 16:03:00,145 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "random_number_tool", "parameters": {}}Tool:random_number_tool Args:{}Tool:random_number_tool Response:44The random number generated is 44.



2025-05-20 16:03:05,302 INFO httpx: HTTP Request: POST http://0.0.0.0:10012/ "HTTP/1.1 200 OK"



🎉 FINAL ANSWER
Here's your response:

"Today's date is May 20th, 2023. Here are five random numbers: 1, 86, 20, 5, and 44."



2025-05-20 16:03:11,257 INFO httpx: HTTP Request: POST http://0.0.0.0:10010/ "HTTP/1.1 200 OK"


Raw plan ➡️ [
  {"skill_id": "date_tool"}
]

Final plan ➡️ [{'skill_id': 'date_tool'}]

➡️ Step 1: date_tool({})


2025-05-20 16:03:28,192 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "date_tool", "parameters": {}}Tool:date_tool Args:{}Tool:date_tool Response:"2025-05-20"The current date is May 20, 2025.



2025-05-20 16:03:30,939 INFO httpx: HTTP Request: POST http://0.0.0.0:10012/ "HTTP/1.1 200 OK"



🎉 FINAL ANSWER
"Today's date is May 20, 2025."



2025-05-20 16:03:37,648 INFO httpx: HTTP Request: POST http://0.0.0.0:10010/ "HTTP/1.1 200 OK"


Raw plan ➡️ [
  {"skill_id": "random_number_tool"}
]

Final plan ➡️ [{'skill_id': 'random_number_tool'}]

➡️ Step 1: random_number_tool({})


2025-05-20 16:03:54,412 INFO httpx: HTTP Request: POST http://0.0.0.0:10011/ "HTTP/1.1 200 OK"


   ✅ → {"type": "function", "name": "random_number_tool", "parameters": {}}Tool:random_number_tool Args:{}Tool:random_number_tool Response:21The random number generated is 21.



2025-05-20 16:03:57,080 INFO httpx: HTTP Request: POST http://0.0.0.0:10012/ "HTTP/1.1 200 OK"



🎉 FINAL ANSWER
"Here's a random number: 21."


## Key Takeaways
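Congratulations! You've successfully set up and interacted with a multi-agent system using the A2A protocol. You saw how the orchestrator agent planned the task, how the skill agents executed each step, and how the writing agent composed the final answer. The first run is also a reminder that the final answer is only as reliable as each model in the chain: the date tool returned `2025-05-20`, but the skill agent restated the year as 2023, and the writing agent passed that mistake through.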

Future demos will cover more advanced aspects of agent-to-agent communication.