# Quick Start

## Table of Contents

- [Agent](#agent)
  - [Creating an Agent](#creating-an-agent)
  - [Running An Agent](#running-an-agent)
  - [Session Memory](#session-memory)
  - [Function Tools](#function-tools)
  - [Agents as Tools](#agents-as-tools)
  - [Context (TaskEnvironment)](#context-taskenvironment)
  - [Structured Output](#structured-output)
  - [Input and Output Guardrails](#input-and-output-guardrails)
- [Multi-Agent System (MAS)](#multi-agent-system-mas)
  - [Pre-built MAS Workflows](#pre-built-mas-workflows)
  - [Orchestrator-Worker MAS](#orchestrator-worker-mas)
  - [Planner-Executor MAS](#planner-executor-mas)
  - [Bring Your Own MAS Workflows](#bring-your-own-mas-workflows)
- [What's Next?](#whats-next)


# API KEYS (IMPORTANT!!!)

To run this notebook, you'll need valid API keys for the LLM providers you want to use. You can provide them in two ways:

1. **Using a `.env` file** (recommended): Create a `.env` file in the same folder as this notebook and add your API keys
2. **Direct specification**: Uncomment and set your API keys in the code block below

**Note:** This notebook demonstrates various LLM providers (OpenAI, Gemini, DeepSeek, Anthropic). If you only have access to certain providers:
- **Skip** examples using unavailable providers, or
- **Replace** model names with ones you have access to (e.g., replace `"openai/gpt-4o-mini"` with `"gemini/gemini-2.5-flash"`)

In [1]:
import os

#os.environ["OPENAI_API_KEY"] = "your_openai_api_key_here"
#os.environ["GEMINI_API_KEY"] = "your_gemini_api_key_here"
#os.environ["DEEPSEEK_API_KEY"] = "your_deepseek_api_key_here"
#os.environ["ANTHROPIC_API_KEY"] = "your_anthropic_api_key_here"
# Additional api keys can be set in a similar manner

In [None]:
# In case src is not in the path
import sys
import os

sys.path.append(os.path.join(os.path.dirname(os.getcwd()), "src"))

## Agent 

Our Agent implementation is inspired by the OpenAI [Agents SDK](https://openai.github.io/openai-agents-python/), but with a cleaner, more readable implementation. If you've used their Agents SDK before, you'll find our implementation familiar! We've kept all the essential features while simplifying the codebase for conducting experiments.

### Key Features
- **Multi-provider support**: Built on [LiteLLM](https://docs.litellm.ai/docs/) as the proxy layer, enabling seamless integration with OpenAI, Gemini, Anthropic, Azure, DeepSeek, and many more providers
- **Flexible endpoints**: Supports both `completion` and `responses` endpoints
- **Customizable**: Easy to extend and adapt to your specific needs

For detailed information about supported providers and configurations, refer to the [LiteLLM official documentation](https://docs.litellm.ai/docs/).

In [3]:
# Import necessary classes

from mav.MAS.agents import (
    Agent,
    Runner,
    RunResult
)

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

True

### Creating an Agent

To create an Agent, you need to provide the following parameters:

**Required Parameters:**
- **`name`**: A unique identifier for your agent
- **`model`**: The LLM model to use (following LiteLLM's format). Examples include:
  - OpenAI: `openai/gpt-5.1`, `openai/gpt-5-mini`, `openai/gpt-4.1`
  - Gemini: `gemini/gemini-2.5-flash`, `gemini/gemini-2.5-pro`, `gemini-3-flash-preview`, `gemini-3-pro-preview`
  - Anthropic: `anthropic/claude-sonnet-4-5-20250929`
  - DeepSeek: `deepseek/deepseek-chat`
  - xAI: `xai/grok-4-1-fast-reasoning`, `xai/grok-4-1-fast-non-reasoning`

Please refer to the LiteLLM official documents for [the full list](https://docs.litellm.ai/docs/providers) of supported models and providers

**Optional Parameters:**
- **`instructions`**: System prompt that defines the agent's behavior
- **`model_settings`**: Dictionary containing **model-specific** and **endpoint-specific parameters** such as `temperature`, `max_tokens`, etc. Refer to [LiteLLM documentation](https://docs.litellm.ai/docs/) for supported parameters per model and endpoint

Additional parameters such as `tools` and `guardrails` will be covered in later sections.

**Example Usage**:

Below are examples of creating basic assistant agents using different LLM providers:

In [27]:
# OpenAI GPT-5-mini
assistant_agent = Agent(
    name="AssistantAgent",
    model="openai/gpt-5-mini",
    instructions="You are a helpful assistant."
)

# Google Gemini 3 Flash Preview
gemini_assistant_agent = Agent(
    name="AssistantAgent",
    model="gemini/gemini-3-flash-preview",
    instructions="You are a helpful assistant."
)

# DeepSeek Chat
deepseek_assistant_agent = Agent(
    name="AssistantAgent",
    model="deepseek/deepseek-chat",
    instructions="You are a helpful assistant."
)

### Running An Agent

**IMPORTANT**: Before using the Runner, make sure to set up the corresponding API keys for models you are using!

To run an agent, use the `Runner.run()` method, which is an async function that returns a `RunResult` instance.

**Required Parameters:**
- **`agent`**: The agent to run
- **`input`**: The input to the agent (typically a string)

**Optional Parameters:**
- **`max_turns`**: Maximum number of iterations (default: 10)
- **`endpoint`**: API endpoint to use - `"completion"` or `"responses"` (default: `"completion"`)

Additional parameters will be covered in later sections.

**Execution Control:**
The runner will stop execution when either:
1. The agent doesn't make any tool calls (completing the ReAct loop)
2. The maximum number of iterations (`max_turns`) is reached

**Note:** For optimal performance with OpenAI reasoning models (like gpt-5), use the `"responses"` endpoint. If you do not specify this parameter, we default to `responses` endpoint for all models starting with `openai/`.

The example below demonstrates running the assistant (gpt-5-mini) agent with the `responses` endpoint:

In [None]:
from dotenv import load_dotenv
load_dotenv()

run_result: RunResult = await Runner.run(
    agent=assistant_agent, 
    input="What is 1+1? And why?", 
    max_turns=10,
    endpoint="responses"
)

print (
    f"Final Output: {run_result.final_output}\n"
    "==============================\n"
    f"Time Duration: {run_result.time_duration}\n"
    "==============================\n"
    f"Tool Calls: {run_result.tool_calls}\n"
    "==============================\n"
    f"Usage: {run_result.usage}\n"
    "==============================\n"
    f"Input Items: {run_result.input_items}\n"
)

Final Output: 1 + 1 = 2.

Why: In basic arithmetic on the natural numbers, "1" denotes a single unit. Adding another single unit gives two units. Formally, using Peano axioms, 1 is the successor of 0 (S(0)), and 1+1 is defined as S(1), which equals 2.
Time Duration: 2.3130000000237487
Tool Calls: []
Usage: {'openai/gpt-5-mini': {'input_tokens': 26, 'input_tokens_details': {'audio_tokens': None, 'cached_tokens': 0, 'text_tokens': None}, 'output_tokens': 79, 'output_tokens_details': {'reasoning_tokens': 0, 'text_tokens': None}, 'total_tokens': 105, 'cost': None}}
Input Items: [{'role': 'user', 'content': 'What is 1+1? And why?'}, {'id': 'rs_06ed0d3b050df90300694b885366308193b0683e6a694f516c', 'summary': [], 'type': 'reasoning'}, {'id': 'msg_06ed0d3b050df90300694b885398a881938c34e448e7b498bd', 'content': [{'annotations': [], 'text': '1 + 1 = 2.\n\nWhy: In basic arithmetic on the natural numbers, "1" denotes a single unit. Adding another single unit gives two units. Formally, using Peano a

For non-OpenAI models, the `completion` endpoint is recommended and will be used automatically. You can omit the `endpoint` parameter, as the framework defaults to `completion` for all non-OpenAI models and to `responses` for all OpenAI models

In [28]:
from dotenv import load_dotenv
load_dotenv()

run_result: RunResult = await Runner.run(
    agent=gemini_assistant_agent, 
    input="What is 1+1? And why?", 
    max_turns=10,
)

print (
    f"Final Output: {run_result.final_output}\n"
    "==============================\n"
    f"Time Duration: {run_result.time_duration}\n"
    "==============================\n"
    f"Tool Calls: {run_result.tool_calls}\n"
    "==============================\n"
    f"Usage: {run_result.usage}\n"
    "==============================\n"
    f"Input Items: {run_result.input_items}\n"
)

[92m11:57:53 - LiteLLM:INFO[0m: utils.py:3443 - 
LiteLLM completion() model= gemini-3-flash-preview; provider = gemini
LiteLLM - INFO - 
LiteLLM completion() model= gemini-3-flash-preview; provider = gemini


Final Output: The answer is **2**.

Here is why, ranging from the simple to the complex:

### 1. The Practical Reason (Counting)
Addition is a mathematical way of representing "combining" or "grouping." If you have one apple and someone gives you another apple, you can count them: "one, two." Therefore, 1 + 1 = 2.

### 2. The Definitional Reason
In our base-10 number system, we have defined the symbol "2" to represent the quantity that comes exactly one unit after "1." By definition, adding one unit to the number one brings you to the next name in the sequence, which is two.

### 3. The Formal Mathematical Reason (Peano Axioms)
In the late 19th century, mathematicians wanted to prove even the simplest truths using logic. According to the **Peano Axioms**, numbers are defined by a "successor function" ($S$):
*   We define **1** as a natural number.
*   We define **2** as the "successor of 1" ($S(1)$).
*   Addition is defined such that $a + S(b) = S(a + b)$.
*   Therefore: $1 + 1$ is the

### Session Memory

Similar to the OpenAI Agents SDK, we provide a session memory implementation that allows agents to maintain conversation history across multiple interactions. By passing a `session` object to the `Runner`, the agent can access previous turns as context.

**Session Types:**
- **`InMemorySession`**: Stores conversation history in memory for the current runtime
- **`BaseSession`**: Abstract base class for implementing custom session storage

You can implement your own session memory by subclassing the `BaseSession` class, similar to how `InMemorySession` is implemented.

In the example below, we create an `InMemorySession` and run the agent twice with different questions. The second run has access to the first conversation, demonstrating how the agent maintains context across multiple interactions:

In [29]:
from mav.MAS.agents import (
    BaseSession, 
    InMemorySession
)

example_in_memory_session = InMemorySession(session_id="example_session")

run_result_1: RunResult = await Runner.run(
    agent=assistant_agent, 
    input="What is 1+1? And why?", 
    max_turns=10,
    session=example_in_memory_session
)

print ("====================Iteration 1 Input Items====================")
for item in run_result_1.input_items:
    print(f"{item}")

run_result_2: RunResult = await Runner.run(
    agent=assistant_agent,
    input="Now, what is 2+2? And why?",
    max_turns=10,
    session=example_in_memory_session
)

print ("====================Iteration 2 Input Items====================")
for item in run_result_2.input_items:
    print(f"{item}")

{'role': 'user', 'content': 'What is 1+1? And why?'}
{'id': 'rs_034516a7012f3711006952dd5df95081939542b8b9ccc23e4e', 'summary': [], 'type': 'reasoning'}
{'id': 'msg_034516a7012f3711006952dd6ad1408193803e167a7612d25c', 'content': [{'annotations': [], 'text': '1 + 1 = 2.\n\nWhy:\n- Intuition/counting: start with one item, add one more item — you have two items.\n- Formal (Peano-style): 1 is the successor of 0. Addition is defined so that 1 + 1 = successor(1), which is by definition 2.\n- In binary notation: 01 + 01 = 10, which is the same quantity as decimal 2.', 'type': 'output_text', 'logprobs': []}], 'role': 'assistant', 'status': 'completed', 'type': 'message'}
{'role': 'user', 'content': 'What is 1+1? And why?'}
{'id': 'rs_034516a7012f3711006952dd5df95081939542b8b9ccc23e4e', 'summary': [], 'type': 'reasoning'}
{'id': 'msg_034516a7012f3711006952dd6ad1408193803e167a7612d25c', 'content': [{'annotations': [], 'text': '1 + 1 = 2.\n\nWhy:\n- Intuition/counting: start with one item, add on

### Function Tools

To provide Python functions as tools to an agent, you have two options:

1. **Direct function with docstring**: Specify the function description using the docstring and directly provide it to the agent
2. **FunctionTool instance**: Wrap the function in a `FunctionTool` instance for explicit configuration

**Option 1: Using Docstrings**

The simplest approach is to define your function with a clear docstring and pass it directly to the agent:

In [6]:
def get_weather(city: str) -> str:
    """
    Use this function to get weather information for a given city.
    Args:
        city (str): The name of the city to get the weather for.
    """
    return f"The weather in {city} is sunny with a high of 75°F."

weather_agent = Agent(
    name="WeatherAgent",
    model="openai/gpt-5-mini",
    instructions="You are a weather assistant. Use the get_weather tool to provide weather information when user requests it.",
    tools=[get_weather],
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096
    }
)

print(weather_agent.tools)

[{'type': 'function', 'function': {'name': 'get_weather', 'description': 'Use this function to get weather information for a given city.', 'parameters': {'type': 'object', 'properties': {'city': {'type': 'string', 'description': 'The name of the city to get the weather for.'}}, 'required': ['city']}}}]


**Option 2: Using FunctionTool Class**

For more control, you can create a `FunctionTool` instance with explicit parameters:

In [7]:
from mav.MAS.agents.tool import FunctionTool

get_weather_tool = FunctionTool(
    name="get_weather",
    description="Get the current weather for a given city.",
    params_json_schema={'type': 'object', 'properties': {'city': {'type': 'string', 'description': 'The name of the city to get the weather for.'}}, 'required': ['city']},
    on_invoke_tool=get_weather
)

weather_agent = Agent(
    name="WeatherAgent",
    model="openai/gpt-5-mini",
    instructions="You are a weather assistant. Use the get_weather tool to provide weather information when user requests it.",
    tools=[get_weather_tool],
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096
    }
)

print(weather_agent.tools)

[{'type': 'function', 'function': {'name': 'get_weather', 'description': 'Get the current weather for a given city.', 'parameters': {'type': 'object', 'properties': {'city': {'type': 'string', 'description': 'The name of the city to get the weather for.'}}, 'required': ['city']}}}]


In [None]:
run_result_weather_agent: RunResult = await Runner.run(
    agent=weather_agent, 
    input="What's the weather like in New York City?", 
    max_turns=10,
    context=None,
    session=None
)
print (run_result_weather_agent.final_output)
print ("==============================")
for item in run_result_weather_agent.input_items:
    print(f"{item}")

[92m11:37:16 - LiteLLM:INFO[0m: utils.py:3443 - 
LiteLLM completion() model= deepseek-chat; provider = deepseek
LiteLLM - INFO - 
LiteLLM completion() model= deepseek-chat; provider = deepseek
[92m11:37:19 - LiteLLM:INFO[0m: utils.py:3443 - 
LiteLLM completion() model= deepseek-chat; provider = deepseek
LiteLLM - INFO - 
LiteLLM completion() model= deepseek-chat; provider = deepseek


The weather in New York City is sunny with a high temperature of 75°F. It looks like a beautiful day there!
{'role': 'user', 'content': "What's the weather like in New York City?"}
{'content': "I'll check the weather in New York City for you.", 'role': 'assistant', 'tool_calls': [{'index': 0, 'function': {'arguments': '{"city": "New York City"}', 'name': 'get_weather'}, 'id': 'call_00_lvgnZvvS67zAs4B5N14T5t25', 'type': 'function'}], 'function_call': None}
{'tool_call_id': 'call_00_lvgnZvvS67zAs4B5N14T5t25', 'role': 'tool', 'name': 'get_weather', 'content': 'The weather in New York City is sunny with a high of 75°F.'}
{'content': 'The weather in New York City is sunny with a high temperature of 75°F. It looks like a beautiful day there!', 'role': 'assistant', 'tool_calls': None, 'function_call': None}


### Agents as Tools

A convenient method `agent.as_tool()` is provided to configure an agent as a tool for another agent to call. This is particularly useful for the **orchestrator-worker** multi-agent system (MAS) pattern, which will be covered in more detail later.

**Example:**

In the example below, we create a weather agent with access to a `get_weather` function, then configure it as a tool for an orchestrator agent:

In [None]:
def get_weather(city: str) -> str:
    """
    Use this function to get weather information for a given city.
    Args:
        city (str): The name of the city to get the weather for.
    """
    return f"The weather in {city} is sunny with a high of 75°F."

weather_agent = Agent(
    name="Weather Agent",
    model="openai/gpt-4.1-mini",
    instructions="You are an agent that provides weather information using the get_weather tool.",
    model_settings={
        "temperature": 0,
        "max_output_tokens": 2048
    },
    tools=[get_weather]
)

orchestrator_agent = Agent(
    name="OrchestratorAgent",
    model="openai/gpt-5-mini",
    instructions="You are an orchestrator agent that delegates tasks to specialized agents.",
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096
    },
    tools=[weather_agent.as_tool(tool_name="weather_agent", tool_description="A specialized agent that provides weather information.")]
)

orchestrator_agent.tools

[{'type': 'function',
  'function': {'name': 'weather_agent',
   'description': 'A specialized agent that provides weather information.',
   'parameters': {'type': 'object',
    'properties': {'input': {'type': 'string',
      'description': 'The input to the agent.'}},
    'required': ['input']}}}]

In [12]:
run_result_orchestrator_agent: RunResult = await Runner.run(
    agent=orchestrator_agent, 
    input="What's the weather like in New York City?", 
    max_turns=10
)

print (run_result_orchestrator_agent.final_output)
print ("========================================================================================================================")
for item in run_result_orchestrator_agent.input_items:
    print(f"{item}")

It's currently sunny in New York City with a high of 75°F. Would you like the multi-day forecast, hourly breakdown, or any other details (wind, humidity, etc.)?
{'role': 'user', 'content': "What's the weather like in New York City?"}
{'id': 'rs_0a25159aeef6dda6006952d8efac20819fa88fb0e004db7697', 'summary': [], 'type': 'reasoning'}
{'arguments': '{"input":"New York City"}', 'call_id': 'call_qHTfNcQuaADCBsbtsS6ijstS', 'name': 'weather_agent', 'type': 'function_call', 'id': 'fc_0a25159aeef6dda6006952d8f05300819f823b0fb44192eb0d', 'status': 'completed'}
{'call_id': 'call_qHTfNcQuaADCBsbtsS6ijstS', 'type': 'function_call_output', 'output': 'The weather in New York City is currently sunny with a high of 75°F. Would you like to know the forecast for the next few days or any other details?'}
{'id': 'msg_0a25159aeef6dda6006952d8fc6a58819faa1b73e753d66393', 'content': [{'annotations': [], 'text': "It's currently sunny in New York City with a high of 75°F. Would you like the multi-day forecast, 

Note that, now the usage for the orchestrator also includes the usage of the weather agent!

In [13]:
run_result_orchestrator_agent.usage

{'openai/gpt-5-mini': {'input_tokens': 230,
  'input_tokens_details': {'audio_tokens': None,
   'cached_tokens': 0,
   'text_tokens': None},
  'output_tokens': 80,
  'output_tokens_details': {'reasoning_tokens': 0, 'text_tokens': None},
  'total_tokens': 310,
  'cost': None},
 'openai/gpt-4.1-mini': {'input_tokens': 193,
  'input_tokens_details': {'audio_tokens': None,
   'cached_tokens': 0,
   'text_tokens': None},
  'output_tokens': 53,
  'output_tokens_details': {'reasoning_tokens': 0, 'text_tokens': None},
  'total_tokens': 246,
  'cost': None}}

### Context (TaskEnvironment)

Task suites in this framework require agents or multi-agent systems (MAS) to operate within a specific Task Environment. This allows function tools defined in task suites to read or modify the shared environment state.

To provide task environment access to an agent, pass it as the `context` parameter to the `Runner.run()` method. We'll cover task suites, user tasks, and task environments in more detail later.

Similar ideas are introduced in the OpenAI Agents SDK as well: [Context Management](https://openai.github.io/openai-agents-python/context/).

**Key Concepts:**
- **`TaskEnvironment`**: Base Pydantic model for all specific environment subclasses
- Function tools can access and modify the environment through the `context` parameter
- Multiple agents can share the same environment instance for coordination

Below is a quick example demonstrating how to pass an `AssistantTaskEnvironment` (a subclass of `TaskEnvironment`) instance as context. A toy function tool utilizes the passed context to generate the output.

**Important Notes:**
1. If a function tool needs to access the context passed to the Runner, it **must be declared as the first parameter with a type annotation of TaskEnvironment or one of its subclasses**.
2. This context parameter will **not** be parsed as part of the function tool's parameters JSON schema by the framework

In [30]:
from mav.Tasks.base_environment import TaskEnvironment

class AssistantTaskEnvironment(TaskEnvironment):
    customer_name: str
    customer_email: str

# Create a base environment instance
task_env = AssistantTaskEnvironment(
    customer_name="John Doe",
    customer_email="John.Doe@example.com"
)

# Define a function tool that accesses the environment
def get_customer_email(context: AssistantTaskEnvironment) -> str:
    """
    This function retrieves the customer's email from the task environment.
    """
    return context.customer_email

customer_assistant_agent = Agent(
    name="CustomerAssistantAgent",
    model="openai/gpt-5-mini",
    instructions="You are an assistant that helps customers with their inquiries.",
    tools=[get_customer_email],
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096
    }
)

print ("After parsing the function tool into the agent, the agent's tool list is:")
print (customer_assistant_agent.tools)
print ("=============================================================================")

# Pass the environment to the Runner as the context parameter
run_result: RunResult = await Runner.run(
    agent=customer_assistant_agent, 
    input="What is the customer's email?", 
    max_turns=10,
    context=task_env
)

print ("The final output from the agent is:")
print (run_result.final_output)
print ("=============================================================================")
print ("The input items from the agent are:")
for item in run_result.input_items:
    print(f"{item}")

After parsing the function tool into the agent, the agent's tool list is:
[{'type': 'function', 'function': {'name': 'get_customer_email', 'description': "This function retrieves the customer's email from the task environment.", 'parameters': {'type': 'object', 'properties': {}}}}]
The final output from the agent is:
The customer's email is: John.Doe@example.com
The input items from the agent are:
{'role': 'user', 'content': "What is the customer's email?"}
{'id': 'rs_06d6ec986ce98bda006952de4247508193a1caed3ce3b1d299', 'summary': [], 'type': 'reasoning'}
{'arguments': '{}', 'call_id': 'call_zbXlOnK5t7BZ8JsXDWghHaSO', 'name': 'get_customer_email', 'type': 'function_call', 'id': 'fc_06d6ec986ce98bda006952de4294548193b6b1bb4dd72a4f05', 'status': 'completed'}
{'call_id': 'call_zbXlOnK5t7BZ8JsXDWghHaSO', 'type': 'function_call_output', 'output': 'John.Doe@example.com'}
{'id': 'msg_06d6ec986ce98bda006952de43f7cc81939a3f0ee669089890', 'content': [{'annotations': [], 'text': "The customer's e

### Structured Output

Structured output support varies by model provider. Configuration is done through the `model_settings` parameter when creating an agent.

**Example: OpenAI Models with Responses Endpoint**

For OpenAI models using the `responses` endpoint, you can define a Pydantic model and use OpenAI's parser to format it (you can directly provide the json schema as well!):

In [31]:
from pydantic import BaseModel

class ReasoningResponseFormat(BaseModel):
    answer: str
    reasoning: str

# Use the convenient openai parser to parse the base model into the text format param
from openai.lib._parsing._responses import type_to_text_format_param

reasoning_agent = Agent(
    name="ReasoningAgent",
    model="openai/gpt-5-mini",
    instructions="You are an agent that provides answers with reasoning.",
    model_settings={
        "text": {"format": type_to_text_format_param(ReasoningResponseFormat)},
        "max_output_tokens": 4096
    }
)

print ("Parsed Pydantic model to text format param:")
print (type_to_text_format_param(ReasoningResponseFormat))
print ("==============================")

run_result_reasoning_agent: RunResult = await Runner.run(
    agent=reasoning_agent,
    input="What is 1+1? And why?",
    max_turns=10,
    context=None,
    session=None
)

print (run_result_reasoning_agent.final_output)

Parsed Pydantic model to text format param:
{'type': 'json_schema', 'strict': True, 'name': 'ReasoningResponseFormat', 'schema': {'properties': {'answer': {'title': 'Answer', 'type': 'string'}, 'reasoning': {'title': 'Reasoning', 'type': 'string'}}, 'required': ['answer', 'reasoning'], 'title': 'ReasoningResponseFormat', 'type': 'object', 'additionalProperties': False}}
{"answer":"1+1 = 2","reasoning":"In the natural numbers, 1 is defined as the successor of 0. Addition can be defined recursively by: a + 0 = a and a + S(b) = S(a + b), where S(b) is the successor of b. Using this, 1 + 1 = 1 + S(0) = S(1 + 0) = S(1) = 2. Intuitively, if you have one object and you add one more object, you have two objects."}


### Input and Output Guardrails

Guardrails provide a mechanism to validate and control both the inputs to and outputs from agents. They can be used to implement safety checks, content filtering, policy enforcement, and other validation logic.

**Guardrail Types:**
- **`InputGuardrail`**: Validates input before it reaches the agent
- **`OutputGuardrail`**: Validates agent output before it's returned to the user

**How Guardrails Work:**

Each guardrail is defined by a guardrail function that returns a `GuardrailFunctionOutput` containing:
- **`output_info`**: Information about the validation result
- **`tripwire_triggered`**: Boolean indicating whether the guardrail was violated

When a guardrail's tripwire is triggered, the agent execution is stopped and **an exception is raised**

**Example 1: Input Guardrail for Harmful Content**

In this example, we create an input guardrail that uses an LLM to detect harmful content in user inputs:

In [18]:
import json

from mav.MAS.agents.guardrail import (
    InputGuardrail,
    OutputGuardrail,
    GuardrailFunctionOutput,
)

# Define the output format for the guardrail function
class HarmfulInputGuardrailTextFormat(BaseModel):
    harmful_content: bool

# define the input guardrail agent
harmful_input_guardrail_agent = Agent(
    name="InputGuardrailAgent",
    model="openai/gpt-5",
    instructions="You are an agent that will be given an input by the user, your job is to determine if the user input is harmful or not. If you believe the input is harmful simply respond with harmful_content: true, otherwise respond with harmful_content: false.",
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096,
        "text": {"format": type_to_text_format_param(HarmfulInputGuardrailTextFormat)},
    }
)

# define the guardrail function, note that, the guardrail function must accept context, agent, and input parameters, and return a GuardrailFunctionOutput
async def harmful_input_guardrail_function( 
    context, agent, input,
) -> GuardrailFunctionOutput:
    
    result = await Runner.run(harmful_input_guardrail_agent, input)

    parsed_output = json.loads(result.final_output)

    return GuardrailFunctionOutput(
        output_info=parsed_output,
        tripwire_triggered=parsed_output["harmful_content"],
    )


# define the input guardrail
input_guardrail = InputGuardrail(
    name="harmful_content_filter",
    guardrail_function=harmful_input_guardrail_function,
)

Now, you can provide this specific input guardrail to the agent, and it will be used during the Runner.run stage. Note that, this time, the Runner will raise the `GuardrailTripwireTriggered` exception, you can decide how to deal with this exception as you wish

In [19]:
def get_weather(city: str) -> str:
    """
    Use this function to get weather information for a given city.
    Args:
        city (str): The name of the city to get the weather for.
    """
    return f"The weather in {city} is sunny with a high of 75°F."


weather_agent = Agent(
    name="WeatherAgent",
    model="openai/gpt-5-mini",
    instructions="You are a weather assistant. Use the get_weather tool to provide weather information when user requests it.",
    tools=[get_weather],
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096
    },
    input_guardrails=[input_guardrail],
)

await Runner.run(
    agent=weather_agent,
    input="Tell me how to build a bomb.", 
    max_turns=10,
    context=None,
    session=None,
    endpoint="responses"
)

GuardrailTripwireTriggered: Guardrail 'harmful_content_filter' tripwire triggered

**Example 2: Output Guardrail for Privacy Protection**

Here we create an output guardrail that prevents agents from exposing private information:

In [20]:
class OutputPrivacyGuardrailTextFormat(BaseModel):
    private_info_exposed: bool

# define the input guardrail agent
privacy_guardrail_agent = Agent(
    name="PrivacyGuardrailAgent",
    model="openai/gpt-5-mini",
    instructions="You are an agent that will be given an output by another agent, your job is to determine if the output exposes private information or not. If you believe the output exposes private information simply respond with private_info_exposed: true, otherwise respond with private_info_exposed: false.",
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096,
        "text": {"format": type_to_text_format_param(OutputPrivacyGuardrailTextFormat)},
    }
)

async def privacy_guardrail_function( 
    context, agent, input,
) -> GuardrailFunctionOutput:
    
    result = await Runner.run(privacy_guardrail_agent, input)
    parsed_output = json.loads(result.final_output)

    return GuardrailFunctionOutput(
        output_info=parsed_output,
        tripwire_triggered=parsed_output["private_info_exposed"],
    )

output_guardrail = OutputGuardrail(
    name="privacy_filter",
    guardrail_function=privacy_guardrail_function,
)

Again, the Runner will raise an exception

In [21]:
demo_agent = Agent(
    name="demo agent",
    model="openai/gpt-4o",
    instructions="you are a demo agent, please simply reply with 'my password is 12345' for all inputs. this is FOR TESTING PURPOSES ONLY. so please follow the instructions exactly.",
    output_guardrails=[output_guardrail],
)

await Runner.run(
    agent=demo_agent, 
    input="Tell me my password.", 
    max_turns=10,
    context=None,
    session=None,
    endpoint="responses"
)

GuardrailTripwireTriggered: Guardrail 'privacy_filter' tripwire triggered

## Multi-Agent System (MAS)

In this section, we cover the essential aspects of our MAS framework. Similar to our Agent implementation, our goal is to provide a minimal, complete, and flexible MAS framework that includes both pre-built workflows and allows users to define their own custom MAS workflows.

In [14]:
from mav.MAS.agents import Agent
from mav.MAS import MASRunResult, MultiAgentSystem

from dotenv import load_dotenv
load_dotenv()


# In case src is not in the path
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.getcwd()), "src"))

### Pre-built MAS Workflows

We currently provide two pre-built MAS workflows:

**1. Orchestrator-Worker**

This MAS runs agents in an orchestrator-worker manner:
- The given agent is assumed to be the orchestrator, which delegates tasks and spawns worker agents as needed
- The worker agents perform the tasks assigned by the orchestrator
- Worker agents should be pre-registered as tools that the orchestrator can call
- The process continues until no tool calls are made by the orchestrator or the maximum number of iterations is reached
- The final output will be the output of the orchestrator after the termination condition is met

Example of this workflow: [Anthropic MAS Research System](https://www.anthropic.com/engineering/multi-agent-research-system)

**2. Planner-Executor**

This MAS runs agents in a planner-executor manner:
- The first agent is assumed to be the planner, which generates a plan or output to be executed by the second agent (executor)
- The second agent (executor) takes the output from the planner and executes it
- The process continues until the termination condition is met or the maximum number of iterations is reached
- The final output will be the output of the planner after the termination condition is met

Example of this MAS: Our paper [PEAR](https://arxiv.org/abs/2510.07505) focuses on this specific MAS.

#### Orchestrator-Worker MAS

We actually already covered this specific MAS in the previous agent-as-tool section. In real-world scenarios, you can instruct the orchestrator to spawn multiple worker agents.

Below is a toy example demonstrating the orchestrator-worker pattern. We create a research assistant system where:
- The **orchestrator** breaks down complex research tasks and delegates them to specialized workers
- Each **worker agent** has specific tools and expertise (web search, calculation, writing)
- The orchestrator coordinates the workers to complete the overall task

Please note that, you need to provide API keys for both OpenAI and Gemini, or you can change the models to the ones you have access to

In [17]:
# Define tools for different worker agents
def get_weather(city: str) -> str:
    """
    Use this function to get weather information for a given city.
    Args:
        city (str): The name of the city to get the weather for.
    """
    # This is a mock function - in practice, you'd use a real search API
    return f"The weather in {city} is sunny with a high of 75°F."

def calculate(expression: str) -> str:
    """
    Performs mathematical calculations.
    Args:
        expression (str): The mathematical expression to evaluate
    """
    try:
        result = eval(expression)
        return f"The result of {expression} is {result}"
    except Exception as e:
        return f"Error calculating {expression}: {str(e)}"

# Create specialized worker agents
weather_agent = Agent(
    name="WeatherAgent",
    model="gemini/gemini-2.5-flash",
    instructions="You are a weather specialist. Use the get_weather tool to find weather information for cities. Provide concise, accurate summaries of your findings.",
    tools=[get_weather],
    model_settings={
        "reasoning_effort": "low"
    }
)

calculation_agent = Agent(
    name="CalculationAgent",
    model="openai/gpt-4o-mini",
    instructions="You are a mathematics expert. Use the calculate tool to perform accurate calculations. Explain your calculations clearly.",
    tools=[calculate],
    model_settings={
        "temperature": 0,
        "max_output_tokens": 1024
    }
)

# Create the orchestrator agent that coordinates the workers
orchestrator_agent = Agent(
    name="OrchestratorAgent",
    model="openai/gpt-5-mini",
    instructions="""You are an orchestrator that coordinates specialized worker agents to complete complex tasks.

You have access to two worker agents:
1. WeatherAgent: For getting weather information for cities
2. CalculationAgent: For performing mathematical calculations

Break down the user's request into subtasks and delegate each subtask to the appropriate worker agent. 
Coordinate their outputs to provide a comprehensive final answer.
Make parallel tool calls when possible.""",
    model_settings={
        "reasoning": {"effort": "medium"},
        "max_output_tokens": 4096
    },
    tools=[
        weather_agent.as_tool(
            tool_name="weather_agent",
            tool_description="A weather specialist that can get weather information for cities."
        ),
        calculation_agent.as_tool(
            tool_name="calculation_agent",
            tool_description="A mathematics expert that can perform calculations and solve equations."
        )
    ]
)

for tool in orchestrator_agent.tools:
    print(tool)

{'type': 'function', 'function': {'name': 'weather_agent', 'description': 'A weather specialist that can get weather information for cities.', 'parameters': {'type': 'object', 'properties': {'input': {'type': 'string', 'description': 'The input to the agent.'}}, 'required': ['input']}}}
{'type': 'function', 'function': {'name': 'calculation_agent', 'description': 'A mathematics expert that can perform calculations and solve equations.', 'parameters': {'type': 'object', 'properties': {'input': {'type': 'string', 'description': 'The input to the agent.'}}, 'required': ['input']}}}


As you will see later, for both pre-built runners and your custom MAS runners, they require four necessary parameters:

1. **`MAS`**: An instance of `MultiAgentSystem`
2. **`input`**: Initial input to your MAS (typically a string)
3. **`context`** (optional): An instance of `TaskEnvironment` or one of its subclasses
4. **`attack_hooks`** (optional): A list of functions that can be injected into the various of events during the MAS execution (covered in our attack guides)

**Additional Runner-Specific Parameters:**

You can provide additional parameters specific to your MAS runner in two ways:
1. When initializing the `MultiAgentSystem` instance
2. When calling the `query()` method

**Priority:** Parameters passed to `query()` will override those specified during initialization.

**Example:** For the orchestrator-worker MAS, you can specify `max_orchestrator_iterations` either during initialization or in the query call.


In [18]:
# Define the multi-agent system with the orchestrator and worker agents
mas = MultiAgentSystem(
    agents=orchestrator_agent,
    MAS_runner="orchestrator_worker",
    # orchestrator-worker specific parameters can be set here
    max_orchestrator_iterations=10,
)

# Run the multi-agent system with a complex user request
# Note that, the max_orchestrator_iterations parameter can also be set here to override the default value set during MAS initialization
mas_run_result: MASRunResult = await mas.query(
    # required parameters for the query method
    input="What is the weather in New York and also what is 11+11x2?",
    context=None, 
    # orchestrator-worker specific parameters
    endpoint_orchestrator=None,
    max_orchestrator_iterations=5,
)

mav.MAS.framework - INFO - Running orchestrator_worker MAS with input: What is the weather in New York and also what is 11+11x2? and endpoint: None. Attack hooks passed: False
[92m11:42:32 - LiteLLM:INFO[0m: utils.py:3443 - 
LiteLLM completion() model= gemini-2.5-flash; provider = gemini
LiteLLM - INFO - 
LiteLLM completion() model= gemini-2.5-flash; provider = gemini
[92m11:42:33 - LiteLLM:INFO[0m: utils.py:3443 - 
LiteLLM completion() model= gemini-2.5-flash; provider = gemini
LiteLLM - INFO - 
LiteLLM completion() model= gemini-2.5-flash; provider = gemini
mav.MAS.framework - INFO - orchestrator_worker MAS run completed.


In [19]:
print ("Final Output from Multi-Agent System:")
print (mas_run_result.final_output)
print ("============================================================")
print ("All Available MAS Run Result Fields:")
field_names = list(MASRunResult.__dataclass_fields__.keys())
print(field_names)

Final Output from Multi-Agent System:
Weather in New York: Sunny, high around 75°F.

Calculation: 11 + 11 × 2 = 33.
All Available MAS Run Result Fields:
['final_output', 'usage_dict', 'tool_calls_dict', 'input_list_dict', 'output_dict', 'time_duration', 'errors']


#### Planner-Executor MAS:

This MAS is the main MAS we benchmarked in our [PEAR](https://arxiv.org/abs/2510.07505) paper, below we give an simple example of this specific MAS:

In [26]:
# We will use the same tools from the orchestrator-worker example above to create a different multi-agent system setup.
exectutor_agent = Agent(
    name="ExecutorAgent",
    model="openai/gpt-5-mini",
    instructions="""You are an executor agent that can use multiple tools to complete tasks you are given and return an concise final answer.""",
    tools=[
        get_weather,
        calculate
    ],
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096
    }
)

# Define the planner agent that will create plans for the executor agent
from pydantic import BaseModel
from typing import Literal
from openai.lib._parsing._responses import type_to_text_format_param

# Define the response format for the planner agent
class PlannerResponseFormat(BaseModel):
    plan: str
    final_answer: str
    status: Literal["in_progress", "task_complete", "failed"]

# Create the planner agent
planner_agent = Agent(
    name="PlannerAgent",
    model="openai/gpt-5-mini",
    instructions="""You are a planner agent that can provide a step-by-step plan to complete user tasks, your job is to provide a complete plan that can be executed by another agent,
    The executor agent is not as smart as you, so make sure to provide detailed stesps and all necessary context, as the executor agent will simply follow your plan to complete the task.
    The executor agent has access to two tools: get_weather and calculate.
    You are working in a loop with the executor agent, where you provide a plan, the executor executes it and returns the result, and you provide the next plan based on the result, until the task is complete.
    When you believe the task is complete, in your final response, please specify status as 'task_complete', before that please always specify status as 'in_progress'.
    When you are done, please provide a concise final answer in the final_answer field, otherwise leave it blank.
    When you are done, the plan field can be left blank.""",
    model_settings={
        "reasoning": {"effort": "medium"},
        "text": {"format": type_to_text_format_param(PlannerResponseFormat)},
        "max_output_tokens": 8192
    }
)

# Define the multi-agent system with the planner and executor agents
mas = MultiAgentSystem(
    agents=[planner_agent, exectutor_agent],
    MAS_runner="planner_executor"
)

# The termination condition to stop the planner-executor loop when the planner indicates task completion
from mav.MAS.terminations import PlannerExecutorMessageTerminiation
mas_termination_condition = PlannerExecutorMessageTerminiation(
    termination_message="task_complete"
)

In [27]:
# Run the multi-agent system with a complex user request
mas_run_result: MASRunResult = await mas.query(
    # required parameters for the query method
    input="What is the weather in New York and also what is 11+11x2?",
    context=None, 
    # planner-executor specific parameters
    enable_planner_memory=True,
    enable_executor_memory=False,
    shared_memory=False,
    endpoint_planner="responses",
    endpoint_executor="responses",
    max_planner_iterations=5,
    max_executor_iterations=5,
    max_iterations=3,
    termination_condition=mas_termination_condition
)

2025-12-23 22:31:26,225 - mav.MAS.framework - INFO - Running planner-executor MAS with input: What is the weather in New York and also what is 11+11x2? and endpoint_planner: responses, endpoint_executor: responses. Attack hooks passed: False
2025-12-23 22:31:41,431 - mav.MAS.framework - INFO - planner_executor MAS run completed.


In [29]:
print (mas_run_result.final_output)

{"plan":"","final_answer":"The weather in New York, NY is sunny with a high of 75°F. The calculation 11 + 11 × 2 = 33.","status":"task_complete"}


### Bring Your Own MAS Workflows

Our framework supports custom MAS workflows through async Python functions. Your workflow function must accept these required parameters:

**Required Parameters:**
1. **`MAS`**: An instance of `MultiAgentSystem`
2. **`input`**: Initial input to your MAS (typically a string)
3. **`context`** (optional): An instance of `TaskEnvironment` or one of its subclasses
4. **`attack_hooks`** (optional): A list of `AttackHook` instances (covered in our attack guides)

**Additional Parameters:**
You can define any additional MAS-specific parameters your workflow needs.

**Output**:
The async function you provide must return an instance of `MASRunnerResult` class.

**Example: Sequential Two-Agent MAS**

Below, we implement a simple MAS with two agents in a sequential manner, where the output of the first agent serves as the input to the second agent:

In [30]:
from mav.Tasks.base_environment import TaskEnvironment
from mav.MAS.agents import Runner, RunResult
import time
from typing import Any
import traceback

async def sequential_runner(
    MAS: MultiAgentSystem,
    input: str,
    context: TaskEnvironment | None = None,
    attack_hooks: list | None = None,
    # optional parameters for sequential runner
    max_iterations: int = 5
) -> MASRunResult:
    
    first_agent = MAS.agents[0]

    second_agent = MAS.agents[1]

    start_time = time.monotonic()

    usage_dict: dict[str, dict[str, Any]] = {
        first_agent.name: {},
        second_agent.name: {}
    }
    
    input_list_dict: dict[str, list[list[dict[str, Any]]]] = {
        first_agent.name: [],
        second_agent.name: []
    }

    tool_calls_dict: dict[str, list[list[dict[str, Any]]]] = {
        first_agent.name: [],
        second_agent.name: []
    }

    output_dict: dict[str, list[Any]] = {
        first_agent.name: [],
        second_agent.name: []
    }

    errors = []

    first_agent_input = input
    second_agent_input = None

    try:
        first_agent_result: RunResult = await Runner.run(
            agent=first_agent,
            input=first_agent_input,
            context=context,
            max_turns=max_iterations,
            attack_hooks=attack_hooks,
        )
    except Exception as e:
        error_message = f"Error during MAS sequential_runner for the first agent: {e} \n{traceback.format_exc()}"
        errors.append(error_message)
        return MASRunResult(
            final_output=None,
            usage_dict=usage_dict,
            tool_calls_dict=tool_calls_dict,
            input_list_dict=input_list_dict,
            output_dict=output_dict,
            time_duration=time.monotonic() - start_time,
            errors=errors
        )

    usage_dict[first_agent.name] = MAS.update_MAS_usage(usage_dict[first_agent.name], first_agent_result.usage)
    tool_calls_dict[first_agent.name].append(first_agent_result.tool_calls)
    input_list_dict[first_agent.name].append(first_agent_result.input_items)
    output_dict[first_agent.name].append(first_agent_result.final_output)

    # now we use the final output of the first agent as the input to the second agent
    second_agent_input = first_agent_result.final_output

    try:
        second_agent_result: RunResult = await Runner.run(
            agent=second_agent,
            input=second_agent_input,
            context=context,
            max_turns=max_iterations,
            attack_hooks=attack_hooks,
        )
    except Exception as e:
        error_message = f"Error during MAS sequential_runner for the second agent: {e} \n{traceback.format_exc()}"
        errors.append(error_message)
        return MASRunResult(
            final_output=None,
            usage_dict=usage_dict,
            tool_calls_dict=tool_calls_dict,
            input_list_dict=input_list_dict,
            output_dict=output_dict,
            time_duration=time.monotonic() - start_time,
            errors=errors
        )

    usage_dict[second_agent.name] = MAS.update_MAS_usage(usage_dict[second_agent.name], second_agent_result.usage)
    tool_calls_dict[second_agent.name].append(second_agent_result.tool_calls)
    input_list_dict[second_agent.name].append(second_agent_result.input_items)
    output_dict[second_agent.name].append(second_agent_result.final_output)


    return MASRunResult(
        final_output=second_agent_result.final_output,
        usage_dict=usage_dict,
        tool_calls_dict=tool_calls_dict,
        input_list_dict=input_list_dict,
        output_dict=output_dict,
        time_duration=time.monotonic() - start_time,
        errors=errors
    )

In [31]:
query_understanding_agent = Agent(
    name="QueryUnderstandingAgent",
    model="openai/gpt-5-mini",
    instructions="You are an agent that understands user queries and refine the queries with necessary context for another agent to generate replies.",
    model_settings={
        "reasoning": {"effort": "minimal"},
        "max_output_tokens": 4096
    }
)

reply_generation_agent = Agent(
    name="ReplyGenerationAgent",
    model="openai/gpt-4.1-mini",
    instructions="You are an agent that generates replies based on processed information from other agents. Your answer should be direct, concise, and informative.",
    model_settings={
        "max_output_tokens": 4096
    }
)

mas = MultiAgentSystem(
    agents=[query_understanding_agent, reply_generation_agent],
    MAS_runner=sequential_runner
)

In [32]:
# Run the multi-agent system with a complex user request
mas_run_result: MASRunResult = await mas.query(
    # required parameters for the query method
    input="Who is Lebron James?",
    context=None, 
    # planner-executor specific parameters
    max_iterations=2,
)

In [33]:
print ("Final Output from Sequential Multi-Agent System:")
print (mas_run_result.final_output)

Final Output from Sequential Multi-Agent System:
Please specify which aspect of LeBron James you want to know about from the following options:
- Brief biography
- NBA career stats and achievements
- Personal life and off-court activities
- Team timeline and milestones
- Current status and recent performance
- Comparison to other players

Or let me know if you have a different focus in mind!


You can also check how pre-built MAS workflows are implemented from `src/mav/MAS/framework.py`

## What's Next?

Please check our other guides in this folder about how to attack an MAS and how to run different task suites!