In [None]:
import os
from pathlib import Path
from typing import Any

import httpx
from pydantic import Field, BaseModel
from tqdm.autonotebook import tqdm

from grasp_agents import (
    BaseTool,
    LLMAgent,
    RunContext,
    Packet,
    ImageData,
    Messages,
)
from grasp_agents.typing.events import (
    CompletionChunkEvent,
    CompletionEvent,
    ProcPacketOutputEvent,
)
from grasp_agents.openai import OpenAILLM, OpenAILLMSettings
from grasp_agents.litellm import LiteLLM, LiteLLMSettings
from grasp_agents.grasp_logging import setup_logging
from grasp_agents.utils import get_timestamp
from grasp_agents.workflow.sequential_workflow import SequentialWorkflow
from grasp_agents.cloud_llm import APIProvider
from grasp_agents.rate_limiting import RateLimiterC
from grasp_agents.printer import print_event_stream


Set up logging to write to the console and/or file

In [None]:
# Directory against which the config and example-data paths are resolved
# (the notebook's current working directory).
PACKAGE_DIR = Path.cwd()
# Log files for this demo go under <cwd>/data/multiagent/logs.
LOGGING_DIR = Path.cwd().joinpath("data", "multiagent", "logs")

In [None]:
# YAML logging configuration, resolved relative to PACKAGE_DIR.
LOGGING_CFG_PATH = PACKAGE_DIR / "configs/logging/default.yaml"
# Configure logging through the project helper; the log file name embeds the
# value returned by get_timestamp().
setup_logging(
    LOGGING_DIR / f"grasp_agents_demo_{get_timestamp()}.log", LOGGING_CFG_PATH
)

Paths to images used in the demo

In [None]:
# Remote image, later passed to ImageData.from_url.
IMG_1_URL = "https://www.simplilearn.com/ice9/free_resources_article_thumb/Expressions_In_C_2.PNG"
# Local image, later passed to ImageData.from_path (resolved relative to PACKAGE_DIR).
IMG_2_PATH = PACKAGE_DIR / "src/grasp_agents/examples/data/expr.jpeg"

Utils

In [None]:
def print_single_output(out: Any) -> None:
    """Print the first payload of ``out`` wrapped in ``<final answer>`` tags.

    Args:
        out: Any object exposing an indexable ``payloads`` attribute; only
            element 0 is printed.
    """
    first_payload = out.payloads[0]
    print(f"\n<final answer>\n{first_payload}\n</final answer>")

## Simple generation with validated outputs

Output type validation

In [None]:
# list[int] is the output type used to validate the output
# NOTE(review): the three type parameters appear to be input type, output type
# and run-state type, judging by how LLMAgent is parameterised elsewhere in
# this notebook — confirm against the library docs.
chatbot = LLMAgent[None, list[int], None](
    name="chatbot",
    # logprobs=True is forwarded through the LLM settings (see the completion-data note below)
    llm=LiteLLM(model_name="gpt-4.1", llm_settings=LiteLLMSettings(logprobs=True)),
)

# This initialises printer and usage tracker
ctx = RunContext[None](log_messages=True)

In [None]:
# Code block delimiters are stripped from the output
# Run the agent once on a plain-text prompt, then print the first payload.
out = await chatbot.run(
    "Output a list of 3 integers from 0 to 10 as a python array, no talking",
    ctx=ctx,
)
print_single_output(out)

In [None]:
# Display the usage records collected by the context's usage tracker (keyed by agent name).
ctx.usage_tracker.usages

{'chatbot': Usage(input_tokens=30, output_tokens=9, reasoning_tokens=0, cached_tokens=0, cost=0.000132)}

Completion data (e.g. log probs) per agent can be accessed via RunContext:

In [None]:
# ctx.completions

Streaming with reasoning

In [None]:
# Rebind `chatbot` to a Claude-backed agent for the streaming example.
chatbot = LLMAgent[None, list[int], None](
    name="chatbot",
    llm=LiteLLM(
        model_name="claude-sonnet-4-20250514",
        llm_settings=LiteLLMSettings(
            # NOTE(review): reasoning_effort is None although this section is
            # titled "Streaming with reasoning" — confirm whether an effort
            # level should be set here.
            reasoning_effort=None, stream_options={"include_usage": True}
        ),
    ),
)
# Message logging is off here; the stream printer below handles the output.
ctx = RunContext[None](log_messages=False)

In [None]:
# Consume the agent's event stream through print_event_stream, which yields
# the events back so they can be inspected here.
async for event in print_event_stream(
    chatbot.run_stream(
        "Output a list of 30 integers from 0 to 10 as a python array. "
        "No code or talking.",
        ctx=ctx,
    )
):
    # Keep the data of the packet-output event as the run result.
    if isinstance(event, ProcPacketOutputEvent):
        out = event.data

In [None]:
# Display the usage records collected during the streamed run.
ctx.usage_tracker.usages

{'chatbot': Usage(input_tokens=33, output_tokens=93, reasoning_tokens=0, cached_tokens=0, cost=0.0014939999999999999)}

Output type validation with structured outputs

In [None]:
# Some providers (e.g. `openai` and `gemini`) support structured outputs.
# With the OpenAI API, this will require a Pydantic model to validate the output.

from enum import StrEnum


# Two-valued string enum used as a constrained field in the response schema.
class Selector(StrEnum):
    A = "A"
    B = "B"


# Structured output model; the Field descriptions are part of the schema.
class Response(BaseModel):
    result: list[int] = Field(..., description="3 random integers")
    value: Selector = Field(..., description="Choose a value randomly")


chatbot = LLMAgent[None, Response, None](
    name="chatbot",
    response_schema=Response,
    llm=LiteLLM(
        model_name="gpt-4.1",
        llm_settings=LiteLLMSettings(),
        # Ask the provider itself to apply the response schema (structured outputs).
        apply_response_schema_via_provider=True,
    ),
)

# By default, response_schema is set to the output type of the agent (Response)
# In some cases, you may want to set it to a different type, e.g. when using
# custom output parsing.

ctx = RunContext[None](log_messages=True)

In [None]:
# Run with a minimal prompt and print the first payload (the agent's output type is Response).
out = await chatbot.run("start", ctx=ctx)
print_single_output(out)

# Chat with images

In [None]:
# Plain-text (str) chat agent on gpt-4o; the cells below send it text and images.
chatbot = LLMAgent[None, str, None](name="chatbot", llm=LiteLLM(model_name="gpt-4o"))
ctx = RunContext[None](log_messages=True)

In [None]:
# First turn of the conversation.
out = await chatbot.run("Where are you headed, stranger?", ctx=ctx)
print_single_output(out)

In [None]:
# Follow-up on the same agent and context; the question refers back to the
# previous reply (presumably relying on the agent keeping its message history).
out = await chatbot.run("What did you just say, exactly?", ctx=ctx)
print_single_output(out)

In [None]:
# The run input is a list mixing text with an image loaded from a local file.
out = await chatbot.run(
    ["What's in this image?", ImageData.from_path(IMG_2_PATH)], ctx=ctx
)
print_single_output(out)

In [None]:
# Text-only follow-up after the image turn.
out = await chatbot.run("Go on", ctx=ctx)
print_single_output(out)

In [None]:
# Another text + image input, this time with the image referenced by URL.
out = await chatbot.run(["Try another one", ImageData.from_url(IMG_1_URL)], ctx=ctx)
print_single_output(out)

In [None]:
# Ask the agent to recall the first user message of this conversation.
out = await chatbot.run("What was my first question, exactly?", ctx=ctx)
print_single_output(out)

In [None]:
# Display the tracker's total usage (a single Usage record).
ctx.usage_tracker.total_usage

Usage(input_tokens=3108, output_tokens=445, reasoning_tokens=0, cached_tokens=0, cost=0.012220000000000002)

# Parallel runs with retries and rate limiting

In [None]:
# Make the LLM generate text instead of integers occasionally
# to emphasise the need for retries

# The {added_num} placeholder is filled by the system prompt builder
# registered on the agent, using the run state.
sys_prompt = """
You are a bad math student who always adds number {added_num} to the correct result of the operation. 
Output a single integer or its name, e.g. 'three' or '3'.
"""

# {num} has the same name as the InputArgs.num field (presumably filled from it by the agent).
in_prompt = "What is the square of {num}?"


# Run-level state shared by all runs that use the same context.
class RunArgs(BaseModel):
    added_num: int


# Per-run input arguments.
class InputArgs(BaseModel):
    num: int


# Specifying int as the output type means that the agent will
# validate the output against this type.

student = LLMAgent[InputArgs, int, RunArgs](
    name="student",
    llm=LiteLLM(
        model_name="gpt-4.1",
        # High temperature to make non-integer answers more likely.
        llm_settings=LiteLLMSettings(temperature=1.3),
        # This rate limit will be applied to parallel runs of the agent
        rate_limiter=RateLimiterC(rpm=100),
    ),
    sys_prompt=sys_prompt,
    in_prompt=in_prompt,
    max_retries=4,
)


@student.add_system_prompt_builder
def system_prompt_builder(ctx: RunContext[RunArgs]) -> str:
    """Fill the ``{added_num}`` placeholder of ``sys_prompt`` from the run state."""
    offset = ctx.state.added_num
    return sys_prompt.format(added_num=offset)


[LiteLLM] Set rate limit to 100 RPM


In [None]:
# One InputArgs per integer 0..9; the whole list is passed to the agent as a batch below.
in_args = [InputArgs(num=i) for i in range(10)]

Without streaming

In [None]:
ctx = RunContext[RunArgs](state=RunArgs(added_num=5), log_messages=True)

out = await student.run(in_args=in_args, ctx=ctx)

print()
print(*[p for p in out.payloads], sep="\n")

With streaming

In [None]:
# Same batch as above, consumed as an event stream; message logging is off
# because print_event_stream prints the events.
ctx = RunContext[RunArgs](state=RunArgs(added_num=5), log_messages=False)

async for event in print_event_stream(student.run_stream(in_args=in_args, ctx=ctx)):
    # Keep the data of the packet-output event as the run result.
    if isinstance(event, ProcPacketOutputEvent):
        out = event.data

# ReAct agent loop with streaming

In [None]:
# System prompt for the ReAct teacher agent: it must question the student via
# the provided tool and deliver the problem through the final answer tool.
sys_prompt_react = """
Your task is to suggest an exciting stats problem to the student. 
You should first ask the student about their education, interests, and preferences, then suggest a problem tailored specifically to them. 

# Instructions
* Use the provided tool to ask questions.
* Ask questions one by one.
* Provide your thinking before asking a question and after receiving a reply.
* Do not include your exact question as part of your thinking.
* The problem must have all the necessary data.
* Use the final answer tool to provide the problem.
"""

In [None]:
# Tool input must be a Pydantic model to infer the JSON schema used by the LLM APIs
class TeacherQuestion(BaseModel):
    question: str


# The tool's output is a plain string, so a type alias is enough.
StudentReply = str


# Free-text description assigned to the tool's `description` field below.
ask_student_tool_description = """
"Ask the student a question and get their reply."

Args:
    question: str
        The question to ask the student.
Returns:
    reply: str
        The student's reply to the question.
"""


# Tool that relays the teacher's question to the person running the notebook.
class AskStudentTool(BaseTool[TeacherQuestion, StudentReply, Any]):
    name: str = "ask_student"
    description: str = ask_student_tool_description

    async def run(self, inp: TeacherQuestion, **kwargs: Any) -> StudentReply:
        """Show the question as an input prompt and return what the user types."""
        # input() is synchronous, so the event loop waits here until the user answers.
        reply: StudentReply = input(inp.question)
        return reply

In [None]:
# Output type of the teacher agent: the final problem statement.
class Problem(BaseModel):
    problem: str


teacher = LLMAgent[None, Problem, None](
    name="teacher",
    llm=LiteLLM(
        model_name="gpt-4.1",
        # model_name="claude-sonnet-4-20250514",
        # llm_settings=LiteLLMSettings(reasoning_effort="low"),
    ),
    tools=[AskStudentTool()],
    react_mode=True,
    # The system prompt tells the model to deliver the problem via the final
    # answer tool, which goes together with this flag.
    final_answer_as_tool_call=True,
    sys_prompt=sys_prompt_react,
)

In [None]:
ctx = RunContext[None](log_messages=False)

# Every streamed event is kept for later inspection.
events = []
# Declared without a value: `problem` is only bound once a packet-output event arrives.
problem: Problem
async for event in print_event_stream(teacher.run_stream("start", ctx=ctx)):
    if isinstance(event, ProcPacketOutputEvent):
        # The first payload of the output packet is taken as the final Problem.
        problem = event.data.payloads[0]
    events.append(event)

In [None]:
# Display the problem captured from the stream above.
problem

# Sequential workflow 

In [None]:
# Input arguments are passed to the agent dynamically (e.g. by other agents)
from grasp_agents.typing.content import Content


# Global state is used to store data that is shared between runs of the agent.
class State(BaseModel):
    b: int
    c: int


# Input of the first workflow step.
class AddInputArgs(BaseModel):
    a: int = Field(..., description="First number to add.")


# Output of the first step; it is also the input type of multiply_agent.
class AddResponse(BaseModel):
    a_plus_b: int


# {a} comes from the input arguments and {b} from the shared state
# (see the input content builder registered on the agent).
add_in_prompt = "Add {a} and {b}. Your only output is the resulting number."


add_agent = LLMAgent[AddInputArgs, AddResponse, State](
    name="add_agent",
    llm=LiteLLM(model_name="gpt-4.1"),
    in_prompt=add_in_prompt,
    # Reset message history to system prompt (if provided) before each run
    reset_memory_on_run=True,
)


@add_agent.add_input_content_builder
def build_add_input_content(in_args: AddInputArgs, ctx: RunContext[State]) -> Content:
    """Render the add prompt with ``a`` from the input and ``b`` from the shared state."""
    prompt_values = {"a": in_args.a, "b": ctx.state.b}
    return Content.from_formatted_prompt(add_agent.in_prompt, **prompt_values)


@add_agent.add_output_parser
def parse_add_output(conversation: Messages, **kwargs: Any) -> AddResponse:
    """Interpret the content of the last message as the integer sum.

    Raises:
        ValueError: If the content of the last message is not an integer literal.
    """
    last_message = conversation[-1]
    total = int(str(last_message.content))
    return AddResponse(a_plus_b=total)

In [None]:
# Output of the second workflow step: c * (a + b).
class MultiplyResponse(BaseModel):
    c_a_plus_b: int


# {a_plus_b} comes from the previous step's output and {c} from the shared
# state (see the input content builder registered on the agent).
multiply_in_prompt = (
    "Multiply {a_plus_b} by {c}. Your only output is the resulting number."
)

# Takes add_agent's output type (AddResponse) as its input type.
multiply_agent = LLMAgent[AddResponse, MultiplyResponse, State](
    name="multiply_agent",
    llm=LiteLLM(model_name="gpt-4.1"),
    in_prompt=multiply_in_prompt,
    reset_memory_on_run=True,
)


# A custom input content builder is needed because {c} lives in the global state
@multiply_agent.add_input_content_builder
def build_multiply_input_content(
    in_args: AddResponse, ctx: RunContext[State]
) -> Content:
    """Render the multiply prompt with the previous sum and ``c`` from the shared state."""
    prompt_values = {"a_plus_b": in_args.a_plus_b, "c": ctx.state.c}
    return Content.from_formatted_prompt(multiply_agent.in_prompt, **prompt_values)


@multiply_agent.add_output_parser
def parse_multiply_output(conversation: Messages, **kwargs: Any) -> MultiplyResponse:
    """Interpret the content of the last message as the integer product.

    Raises:
        ValueError: If the content of the last message is not an integer literal.
    """
    last_message = conversation[-1]
    product = int(str(last_message.content))
    return MultiplyResponse(c_a_plus_b=product)

In [None]:
# Chain the two agents in order; add_agent's output type (AddResponse) is
# multiply_agent's input type.
seq_agent = SequentialWorkflow[AddInputArgs, MultiplyResponse, State](
    name="seq_agent", subprocs=[add_agent, multiply_agent]
)

In [None]:
# Shared state: b is the number added, c is the multiplier.
state = State(b=3, c=6)
ctx = RunContext[State](state=state, log_messages=True)

In [None]:
# Can pass batched arguments
# With a=2, b=3 and c=6 the correct result is (2 + 3) * 6 = 30.
out = await seq_agent.run(in_args=AddInputArgs(a=2), ctx=ctx)
# out = await seq_agent.run(in_args=[AddInputArgs(a=2), AddInputArgs(a=3)], ctx=ctx)

print_single_output(out)

In [None]:
# ctx = RunContext[State](state=state, log_messages=False)
# events = []
# async for event in print_event_stream(
#     seq_agent.run_stream(in_args=AddInputArgs(a=2), ctx=ctx)
# ):
#     events.append(event)

# Agents as tools

When agents are used as tools, their `in_args` become the tool inputs.

This is how one can implement a manager + helpers architecture.

In [None]:
# The description is built from the run state defined above (b is added, c is
# the multiplier) so the advertised constants match what the workflow actually
# computes; the previous hard-coded text said "by 5" while state.c is 6.
seq_tool = seq_agent.as_tool(
    tool_name="seq_agent_tool",
    tool_description=(
        f"A sequential agent that adds {state.b} to a given integer, "
        f"then multiplies the result by {state.c}."
    ),
)

The JSON schema of `in_args` is preserved:

In [None]:
# Display the JSON schema of the tool's input type.
seq_tool.in_type.model_json_schema()

In [None]:
# Call the tool directly; with b=3 and c=6 in the context state the correct
# result is (15 + 3) * 6 = 108.
await seq_tool(a=15, ctx=ctx)

# Teacher / students

A more advanced example of multi-agent debate, where agents communicate using the actor model.

Communication schemas

In [None]:
from grasp_agents.runner import Runner

In [None]:
from collections.abc import Sequence
from typing import Literal


# Names the teacher may put into `selected_recipients`.
# NOTE(review): "teacher" is part of this Literal, but the teacher agent is
# configured with recipients ["*END*", "student1", "student2"] only — confirm
# whether "teacher" should be selectable here.
TeacherRecipient = Literal["*END*", "teacher", "student1", "student2"]


# Teacher can choose which students to send the message to
class TeacherExplanation(BaseModel):
    explanation: str
    # Defaults to an empty list when the model selects nobody.
    selected_recipients: Sequence[TeacherRecipient] = Field(
        default_factory=list,
        description="Recipients selected by the teacher.",
    )


# Students can only ask questions to the teacher
class StudentQuestion(BaseModel):
    question: str = Field(
        ...,
        description="The question to ask the teacher.",
    )

#### Teacher

In [None]:
# System prompt for the teacher. The description of student1 is kept in line
# with the students' own prompts, which both describe a 4-year old child.
teacher_sys_prompt = """
You are a teacher explaining quantum gravity to a 4-year old child (named 'student1') and a 30-year old graphic designer (named student2). 
Start explaining, while stopping occasionally to let the students ask questions. 
You should also give students simple puzzles to test their understanding. 
Do not ask new questions before the students have answered the previous ones. 
To indicate to whom you are addressing your message, you must specify the recipients as a list of selected student names. 
When students have no more questions, finish the conversation with a SINGLE message with a SINGLE recipient called *END*. 
Do not produce multiple "thanks" or "goodbye" messages, just a single one.
"""

# Receives StudentQuestion messages and replies with TeacherExplanation messages.
teacher = LLMAgent[StudentQuestion, TeacherExplanation, None](
    name="teacher",
    llm=LiteLLM(model_name="gpt-4o", apply_response_schema_via_provider=True),
    sys_prompt=teacher_sys_prompt,
    # need to specify allowed recipients to choose from
    recipients=["*END*", "student1", "student2"],
)


@teacher.add_recipient_selector
def select_recipients(
    output: TeacherExplanation, **kwargs: Any
) -> Sequence[TeacherRecipient] | None:
    """Route the teacher's message to the recipients named in its own output."""
    chosen = output.selected_recipients
    return chosen

#### Students

In [None]:
# One system prompt per student: index 0 is used for student1 (the child),
# index 1 for student2 (the graphic designer).
student_sys_prompts = [
    """
You are a 4-year old child trying to make sense of physics. 
Your name is <student1>.
Talk to the teacher to understand the topic.
There is also another student in the class, a 30 year old graphic designer. 
You talk to the teacher only.
""",
    """
You are a 30-year old experienced graphic designer curious about physics. 
Your name is <student2>.
Ask questions to the teacher until you understand the topic. 
Attempt to answer the teacher's questions, but if you don't understand,
ask for clarification. 
There is also another student in the class, a 4-year old child.
You talk to the teacher only.
""",
]


def make_student_agent(name: str, sys_prompt: str):
    """Create a student agent that talks only to the teacher.

    Args:
        name: Agent name; it also prefixes every question as ``<name>: ``.
        sys_prompt: System prompt describing the student's persona.

    Returns:
        The configured agent with its output parser registered.
    """
    agent = LLMAgent[TeacherExplanation, StudentQuestion, None](
        name=name,
        llm=LiteLLM(model_name="gpt-4o", apply_response_schema_via_provider=True),
        sys_prompt=sys_prompt,
        recipients=["teacher"],
    )
    # Tag prepended to each question so the speaker can be identified.
    speaker_tag = f"<{name}>: "

    @agent.add_output_parser
    def parse_student_output(conversation: Messages, **kwargs: Any) -> StudentQuestion:
        raw_text = str(conversation[-1].content)
        return StudentQuestion(question=speaker_tag + raw_text)

    return agent


# Agent names must match the recipient names the teacher is allowed to select.
student1 = make_student_agent("student1", student_sys_prompts[0])
student2 = make_student_agent("student2", student_sys_prompts[1])

In [None]:
# Logged messages are coloured per agent so the three speakers are easy to tell apart.
ctx = RunContext[None](log_messages=True, color_messages_by="agent")
# The teacher is the entry point and receives the initial chat input.
runner = Runner(entry_proc=teacher, procs=[teacher, student1, student2], ctx=ctx)
final_result = await runner.run(chat_input="start")

Streaming

In [None]:
# runner = Runner(
#     entry_proc=teacher, procs=[teacher, student1, student2], ctx=RunContext[None]()
# )
# events = []
# async for event in print_event_stream(
#     runner.run_stream(chat_input="start"), color_by="agent"
# ):
#     events.append(event)

# Custom API providers and HTTP clients

In [None]:
custom_provider = APIProvider(
    name="openrouter",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(10),
    limits=httpx.Limits(max_connections=10),
)

chatbot = LLMAgent[None, list[int], None](
    name="chatbot",
    llm=OpenAILLM(
        model_name="deepseek/deepseek-r1-0528",
        api_provider=custom_provider,
        async_http_client=http_client,
    ),
)


ctx = RunContext[None](log_messages=True)
out = await chatbot.run(
    "Output a list of 3 integers from 0 to 10 as a python array, no talking",
    ctx=ctx,
)
print_single_output(out)