In [1]:
%load_ext autoreload
%autoreload 2
import nest_asyncio

nest_asyncio.apply()

In [2]:
from dotenv import load_dotenv

load_dotenv()

True

In [3]:
from pocketflow_a2a.a2a_common.client import A2AClient
from pocketflow_a2a.a2a_common.types import (
    AgentCard,
    A2AClientJSONError,
    A2AClientHTTPError,
)


client = A2AClient(url="http://localhost:10003")


agent_card = await client.get_agent_card()

In [4]:
print(agent_card.name)
for skill in agent_card.skills:
    print(skill.name)
    print(skill.description)
    print(skill.inputModes)
    print(skill.outputModes)
    print(skill.examples)
    print(skill.tags)
    print(skill.id)
    print(skill.name)

PocketFlow Research Agent (A2A Wrapped)
Web Research and Answering
Answers questions using web search results when necessary.
['text', 'text/plain']
['text', 'text/plain']
['Who won the Nobel Prize in Physics 2024?', 'What is quantum computing?', 'Summarize the latest news about AI.']
['research', 'qa', 'web search']
web_research_qa
Web Research and Answering


# Get available agents

In [5]:
from pocketflow import AsyncFlow, AsyncNode
from typing import TypedDict
import asyncio
from a2a_client.prompt_templates import available_agents_template
from a2a_client.types import Shared, SelectedAgent


class GetAvailableAgentsNode(AsyncNode):
    async def prep_async(self, shared):
        a2a_clients = shared["a2a_clients"]
        return a2a_clients

    async def exec_async(self, a2a_clients: list[A2AClient]):
        return await asyncio.gather(
            *[a2a_client.get_agent_card() for a2a_client in a2a_clients]
        )

    async def exec_fallback_async(self, prep_res, exc):
        raise exc

    async def post_async(
        self,
        shared,
        prep_res,
        exec_res: list[AgentCard | A2AClientJSONError | A2AClientHTTPError],
    ) -> str:
        agents_list = [res for res in exec_res if isinstance(res, AgentCard)]
        if not agents_list:
            shared["agents_list"] = []
            shared["available_agents_prompt"] = None
        else:
            shared["agents_list"] = agents_list
            shared["available_agents_prompt"] = available_agents_template.render(
                agents=[agent.model_dump() for agent in agents_list]
            )
        return "agent_selector"


get_available_agents_node = GetAvailableAgentsNode()

flow = AsyncFlow(start=get_available_agents_node)
shared = {"a2a_clients": [A2AClient(url="http://localhost:10003")]}
await flow.run_async(shared)
print(shared["available_agents_prompt"])

These are agents who can you ask for consult:

Available agents:

Agent number 1:
Name: PocketFlow Research Agent (A2A Wrapped)
Description: A simple research agent based on PocketFlow, made accessible via A2A.
Skills: 
    - name: Web Research and Answering
        - description: Answers questions using web search results when necessary.
        - example: ['Who won the Nobel Prize in Physics 2024?', 'What is quantum computing?', 'Summarize the latest news about AI.']

--------------------------------


You may select one of them to help answer by return the array of selected agent number or if you find no one could help you, you may return empty array.
Your answer will be passed directly to JSON so You must return your selected agent(s) with the following JSON fiedls
[
    {
        "selected_agent: <int, the number of agent that you choose to answer this question>,
        "question_to_answer": <str, the question to ask the agent>
    },
    {/*...other agent you can select as much 

In [None]:
from a2a_client.utils import call_llm
from a2a_client.prompt_templates import agent_selector_template
from typing import cast

import json
import re


def parse_json_string(s: str) -> dict:
    """
    Parse a string containing JSON (optionally wrapped in markdown code block) into a Python dictionary.
    """
    # Remove leading/trailing whitespace
    s = s.strip()
    # Remove markdown code block if present
    # Handles ```json ... ``` or ``` ... ```
    code_block_pattern = r"^```(?:json)?\s*([\s\S]*?)\s*```$"
    match = re.match(code_block_pattern, s, re.IGNORECASE)
    if match:
        s = match.group(1).strip()
    # Now parse as JSON
    return json.loads(s)


class AgentSelectorNode(AsyncNode):
    async def prep_async(self, shared):
        return (shared["question"], shared["available_agents_prompt"])

    async def exec_async(self, inputs):
        question, available_agents_prompt = inputs
        prompt = agent_selector_template.render(
            question=question, available_agents_prompt=available_agents_prompt
        )
        return call_llm(prompt)

    async def post_async(self, shared, prep_res, exec_res):
        shared["selected_agents"] = cast(
            list[SelectedAgent], parse_json_string(exec_res)
        )
        if shared["selected_agents"]:
            return "execute_agent"
        else:
            return "answer_with_no_context"


get_available_agents_node = GetAvailableAgentsNode()
agent_selector_node = AgentSelectorNode()

get_available_agents_node - "agent_selector" >> agent_selector_node

flow = AsyncFlow(start=get_available_agents_node)

shared = {
    "question": "Who won the Nobel Prize in Physics 2024?",
    "a2a_clients": [A2AClient(url="http://localhost:10003")],
}
await flow.run_async(shared)

print(shared["selected_agents"])


In [None]:
from uuid import uuid4
from pocketflow_a2a.a2a_common.types import SendTaskResponse
from a2a_client.prompt_templates import action_template, agent_context_template
from a2a_client.types import Payload, AgentContext



def construct_payload(task_id: str, session_id: str, question: str):
    payload = {
        "id": task_id,
        "sessionId": session_id,
        "message": {
            "role": "user",
            "parts": [
                {
                    "type": "text",  # Explicitly match TextPart structure
                    "text": question,
                }
            ],
        },
        "acceptedOutputModes": ["text", "text/plain"],  # What the client wants back
    }
    return payload


class ExecuteAgentNode(AsyncNode):
    async def prep_async(self, shared: Shared) -> list[tuple[Payload, AgentCard]]:
        selected_agents = shared["selected_agents"]
        agents_list = shared["agents_list"]
        session_id = uuid4().hex
        payloads_and_agents = [
            (
                construct_payload(
                    uuid4().hex, session_id, selected_agent["question_to_answer"]
                ),
                agents_list[selected_agent["selected_agent"] - 1],
            )
            for selected_agent in selected_agents
        ]
        shared["payloads_and_agents"] = payloads_and_agents
        return payloads_and_agents

    async def exec_async(
        self, payloads_and_agents: list[tuple[Payload, AgentCard]]
    ) -> list[tuple[tuple[Payload, AgentCard], SendTaskResponse]]:
        responses = await asyncio.gather(
            *[
                A2AClient(agent_card=agent_card).send_task(payload)
                for payload, agent_card in payloads_and_agents
            ]
        )
        return list(zip(payloads_and_agents, responses))

    async def exec_fallback_async(self, prep_res, exc):
        raise exc

    async def post_async(
        self,
        shared,
        prep_res: list[tuple[Payload, AgentCard]],
        exec_res: list[tuple[tuple[Payload, AgentCard], SendTaskResponse]],
    ):
        agent_contexts = []
        for (payload, agent_card), response in exec_res:
            if response.error:
                continue
            else:
                agent_contexts.append(AgentContext(
                    agent_name=agent_card.name,
                    agent_skills=[skill.model_dump() for skill in agent_card.skills],
                    question=payload["message"]["parts"][0]["text"],
                    answer=response.result.artifacts[-1].parts[0].text
                ))
                shared["artifacts"] = response.result.artifacts
        shared["agent_contexts"] = agent_contexts
        if agent_contexts:
            shared["agent_contexts_prompt"] = agent_context_template.render(
                agent_contexts=agent_contexts
            )
            return "action"
        else:
            shared["agent_contexts_prompt"] = None
            return "answer_with_no_context"
        


execute_agent_node = ExecuteAgentNode()

await execute_agent_node.run_async(shared)

'action'

In [None]:
from a2a_client.prompt_templates import answer_question_with_no_context_template


class AnswerWithNoContextNode(AsyncNode):
    async def prep_async(self, shared: Shared) -> str:
        prompt = answer_question_with_no_context_template.render(
            question=shared["question"],
        )
        return prompt

    async def exec_async(self, prompt: str) -> str:
        return call_llm(prompt)

    async def post_async(self, shared: Shared, prep_res: str, exec_res: str) -> str:
        shared["answer"] = exec_res
        return "done"


answer_with_no_context_node = AnswerWithNoContextNode()

answer_with_no_context_node_res = await answer_with_no_context_node.run_async(shared)
print(shared["answer"])


The Nobel Prize in Physics for 2024 has not yet been awarded.  The announcement is typically made in October.



In [17]:
from a2a_client.prompt_templates import action_template

import re
import json


class ActionNode(AsyncNode):
    @staticmethod
    def extract_metadata_and_answer(text):
        # Extract JSON metadata block
        metadata_match = re.search(
            r'# metadata\s*```json\s*({.*?})\s*```', text, re.DOTALL)
        metadata = None
        if metadata_match:
            metadata_str = metadata_match.group(1)
            metadata = json.loads(metadata_str)
        
        # Extract answer block
        answer_match = re.search(
            r'# answer\s*(.*?)={5,}', text, re.DOTALL)
        answer = None
        if answer_match:
            answer = answer_match.group(1).strip()
        
        return metadata, answer

    async def prep_async(self, shared: Shared) -> str:
        action_prompt = action_template.render(
            question=shared["question"],
            context=shared["agent_contexts_prompt"]
        )
        return action_prompt

    async def exec_async(self, action_prompt: str) -> str:
        return call_llm(action_prompt)
    
    async def post_async(self, shared: Shared, prep_res: str, exec_res: str) -> str:
        metadata, answer = self.extract_metadata_and_answer(exec_res)
        shared["answer"] = answer
        shared["answer_metadata"] = metadata
        is_information_enough_to_answer = metadata["is_information_enough_to_answer"]
        if is_information_enough_to_answer:
            return "done"
        else:
            return "select_agent"

action_node = ActionNode()

action_node_res = await action_node.run_async(shared)
print(shared["answer"])


John Hopfield and Geoffrey Hinton won the 2024 Nobel Prize in Physics.


In [21]:
class DoneNode(AsyncNode):
    async def prep_async(self, shared: Shared) -> str:
        return "done"

    async def exec_async(self, shared: Shared) -> str:
        return "done"


# Create flow

In [25]:
get_available_agents_node = GetAvailableAgentsNode()
agent_selector_node = AgentSelectorNode()
execute_agent_node = ExecuteAgentNode()
answer_with_no_context_node = AnswerWithNoContextNode()
action_node = ActionNode()
done_node = DoneNode()


get_available_agents_node - "agent_selector" >> agent_selector_node
get_available_agents_node - "answer_with_no_context" >> answer_with_no_context_node

# Select agent
agent_selector_node - "execute_agent" >> execute_agent_node
agent_selector_node - "answer_with_no_context" >> answer_with_no_context_node

# Execute agent
execute_agent_node - "action" >> action_node
execute_agent_node - "answer_with_no_context" >> answer_with_no_context_node

# Action
action_node - "done" >> done_node
action_node - "select_agent" >> agent_selector_node

flow = AsyncFlow(start=get_available_agents_node)

shared = {
    "question": "Who won the Nobel Prize in Physics 2024?",
    "a2a_clients": [A2AClient(url="http://localhost:10003")],
}
await flow.run_async(shared)

print(shared["answer"])


John J. Hopfield and Geoffrey Hinton won the 2024 Nobel Prize in Physics.


In [34]:
responses = await asyncio.gather(
    *[
        A2AClient(agent_card=agent_card).send_task(payload)
        for payload, agent_card in payloads_and_agents
    ]
)


In [37]:
response = responses[0]

In [26]:
response = await shared["a2a_clients"][0].send_task(payloads_and_agents[0][0])

In [27]:
response

SendTaskResponse(jsonrpc='2.0', id='547353e0b7a84ed3baf955e70ec85f74', result=Task(id='8f9aab968fbe434bb062599a3cbfb761', sessionId='bb5a7d3623354e4ea7029c369b404940', status=TaskStatus(state=<TaskState.COMPLETED: 'completed'>, message=None, timestamp=datetime.datetime(2025, 5, 3, 22, 59, 8, 64534)), artifacts=[Artifact(name=None, description=None, parts=[TextPart(type='text', text='Based solely on the provided research, which names John J. Hopfield and Geoffrey Hinton,  we cannot definitively say who won the Nobel Prize in Physics 2024.  The research only lists two names, and there is no information connecting them to the 2024 Nobel Prize in Physics.  Further information is needed to answer the question.\n', metadata=None)], metadata=None, index=0, append=None, lastChunk=None)], history=[], metadata=None), error=None)

In [18]:
payloads_and_agents[0]

({'id': '6d915e4ace574c96bbcb9e697feed1b4',
  'sessionId': 'b09b544ce90d4259a4cfc2ad9a76319f',
  'message': {'role': 'user',
   'parts': [{'type': 'text',
     'text': 'How is the Thai stock market performing this week?'}]},
  'acceptedOutputModes': ['text', 'text/plain']},
 AgentCard(name='PocketFlow Research Agent (A2A Wrapped)', description='A simple research agent based on PocketFlow, made accessible via A2A.', url='http://localhost:10003/', provider=None, version='0.1.0-a2a', documentationUrl=None, capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False), authentication=None, defaultInputModes=['text', 'text/plain'], defaultOutputModes=['text', 'text/plain'], skills=[AgentSkill(id='web_research_qa', name='Web Research and Answering', description='Answers questions using web search results when necessary.', tags=['research', 'qa', 'web search'], examples=['Who won the Nobel Prize in Physics 2024?', 'What is quantum computing?', 'Summari

In [35]:
shared["agents_list"][shared["selected_agent"][0]["selected_agent"] - 1]

AgentCard(name='PocketFlow Research Agent (A2A Wrapped)', description='A simple research agent based on PocketFlow, made accessible via A2A.', url='http://localhost:10003/', provider=None, version='0.1.0-a2a', documentationUrl=None, capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False), authentication=None, defaultInputModes=['text', 'text/plain'], defaultOutputModes=['text', 'text/plain'], skills=[AgentSkill(id='web_research_qa', name='Web Research and Answering', description='Answers questions using web search results when necessary.', tags=['research', 'qa', 'web search'], examples=['Who won the Nobel Prize in Physics 2024?', 'What is quantum computing?', 'Summarize the latest news about AI.'], inputModes=['text', 'text/plain'], outputModes=['text', 'text/plain'])])

In [None]:
import json

'```json\n[]\n```\n'

In [None]:
print(
    agent_selector_template.render(
        question="How is Thai stock market this week?",
        available_agents_prompt=shared["available_agents_prompt"],
    )
)

You are a user's question screener. Your duty is to select the agents that proper to answer user's question.

The question: How is Thai stock market this week?

These are agents who can you ask for consult:

Available agents:

Agent number 1:
Name: PocketFlow Research Agent (A2A Wrapped)
Description: A simple research agent based on PocketFlow, made accessible via A2A.
--------------------------------


You may select one of them to help answer by return the array of selected agent number or if you find no one could help you, you may return empty array.
Your answer will be passed directly to JSON so You must return your selected agent(s) with the following JSON fiedls
[
    {
        "selected_agent: <int, the number of agent that you choose to answer this question>,
        "question_to_answer": <str, the question to ask the agent>
    },
    {/*...other agent you can select as much as you want*/}
]

Remember: Your answer will be passed directly to JSON parsing function, therefore ret

In [11]:
shared["available_agents_prompt"]

'These are agents who can you ask for consult:\n\nAvailable agents:\n\nAgent number 1:\nName: PocketFlow Research Agent (A2A Wrapped)\nDescription: A simple research agent based on PocketFlow, made accessible via A2A.\n--------------------------------\n\n\nYou may select one of them to help answer by return the array of selected agent number or if you find no one could help you, you may return empty array.\nYour answer will be passed directly to JSON so You must return your selected agent(s) with the following JSON fiedls\n[\n    {\n        "selected_agent: <int, the number of agent that you choose to answer this question>,\n        "question_to_answer": <str, the question to ask the agent>\n    },\n    {/*...other agent you can select as much as you want*/}\n]\n\nRemember: Your answer will be passed directly to JSON parsing function, therefore return only the array of json and not include PREFIX, SUFFIX or Prologue'

In [35]:
from jinja2 import Template

# Main template string
main_template = """
Grocery List:
{% for item in items %}
- {% include 'line_template' %}
{% endfor %}
"""

# Sub-template string
line_template = "{{ item.name }} ({{ item.qty }})"

# Create a Jinja2 environment and manually load sub-templates
from jinja2 import DictLoader, Environment

# Setup with in-memory templates
env = Environment(
    loader=DictLoader({"main_template": main_template, "line_template": line_template})
)

# Load and render
template = env.get_template("main_template")

# Sample data
items = [
    {"name": "Milk", "qty": "2 liters"},
    {"name": "Eggs", "qty": "12 pcs"},
    {"name": "Bread", "qty": "1 loaf"},
]

# Render
output = template.render(items=items)
print(output)



Grocery List:

- Milk (2 liters)

- Eggs (12 pcs)

- Bread (1 loaf)

