In [1]:
!pip install -qU pydantic-ai[logfire] openai

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/192.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m184.3/192.5 kB[0m [31m14.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m192.5/192.5 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.6/51.6 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m730.3/730.3 kB[0m [31m17.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m288.8/288.8 kB[0m [31m11.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.7/43.7 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━

In [None]:
!logfire auth


Welcome to Logfire! 🔥
Before you can send data to Logfire, we need to authenticate you.

Logfire is available in multiple data regions. Please select one:
1. US (GCP region: us-east4)
2. EU (GCP region: europe-west4)
Selected region [1/2]: 1
Press Enter to open logfire-us.pydantic.dev in your browser...
Please open https://logfire-us.pydantic.dev/auth/device/7VjbSkC5E4U80l2vHha3RjVHQPEnY1HKhcPMifFp9uc in your browser to authenticate if it hasn't already.
Waiting for you to authenticate with Logfire...
Successfully authenticated!

Your Logfire credentials are stored in /root/.logfire/default.toml


In [None]:
!logfire projects use starter-project

Project configured successfully. You will be able to view it at: https://logfire-us.pydantic.dev/qianchen94/starter-project


In [2]:
import os
from google.colab import userdata

openai_key = userdata.get("OPENAI_API_KEY")

os.environ["OPENAI_API_KEY"] = openai_key

In [None]:
import logfire

logfire.configure()
logfire.instrument_pydantic_ai()

## Quick Start

In [3]:
import asyncio
from dataclasses import dataclass
from datetime import date

from pydantic_ai import Agent
from pydantic_ai.tools import RunContext


@dataclass
class WeatherService:
    async def get_forecast(self, location: str, forecast_date: date) -> str:
        # In real code: call weather API, DB queries, etc.
        return f'The forecast in {location} on {forecast_date} is 24°C and sunny.'

    async def get_historic_weather(self, location: str, forecast_date: date) -> str:
        # In real code: call a historical weather API or DB
        return (
            f'The weather in {location} on {forecast_date} was 18°C and partly cloudy.'
        )


weather_agent = Agent[WeatherService, str](
    'openai:gpt-4o-mini',
    deps_type=WeatherService,
    output_type=str,  # We'll produce a final answer as plain text
    system_prompt='Providing a weather forecast at the locations the user provides.',
)


@weather_agent.tool
async def weather_forecast(
    ctx: RunContext[WeatherService],
    location: str,
    forecast_date: date,
) -> str:
    if forecast_date >= date.today():
        return await ctx.deps.get_forecast(location, forecast_date)
    else:
        return await ctx.deps.get_historic_weather(location, forecast_date)


output = await weather_agent.run(
    'What will the weather be like in Paris on Tuesday?',
    deps=WeatherService(),
)

print(output)

AgentRunResult(output='The weather in Paris on Tuesday, October 31, 2023, will be 18°C and partly cloudy.')


## Quick Start with Streaming

Below is a quick start example with streaming turned on. It's not quite "quick" because streaming requires some additional handling.

In [None]:
import asyncio
from dataclasses import dataclass
from datetime import date

from pydantic_ai import Agent
from pydantic_ai.messages import (
    FinalResultEvent,
    FunctionToolCallEvent,
    FunctionToolResultEvent,
    PartDeltaEvent,
    PartStartEvent,
    TextPartDelta,
    ToolCallPartDelta,
)
from pydantic_ai.tools import RunContext


@dataclass
class WeatherService:
    async def get_forecast(self, location: str, forecast_date: date) -> str:
        # In real code: call weather API, DB queries, etc.
        return f'The forecast in {location} on {forecast_date} is 24°C and sunny.'

    async def get_historic_weather(self, location: str, forecast_date: date) -> str:
        # In real code: call a historical weather API or DB
        return (
            f'The weather in {location} on {forecast_date} was 18°C and partly cloudy.'
        )


weather_agent = Agent[WeatherService, str](
    'openai:gpt-4o-mini',
    deps_type=WeatherService,
    output_type=str,  # We'll produce a final answer as plain text
    system_prompt='Providing a weather forecast at the locations the user provides.',
)


@weather_agent.tool
async def weather_forecast(
    ctx: RunContext[WeatherService],
    location: str,
    forecast_date: date,
) -> str:
    if forecast_date >= date.today():
        return await ctx.deps.get_forecast(location, forecast_date)
    else:
        return await ctx.deps.get_historic_weather(location, forecast_date)


output_messages: list[str] = []


async def main():
    user_prompt = 'What will the weather be like in Paris on Tuesday?'

    # Begin a node-by-node, streaming iteration
    async with weather_agent.iter(user_prompt, deps=WeatherService()) as run:
        async for node in run:
            if Agent.is_user_prompt_node(node):
                # A user prompt node => The user has provided input
                print(f'=== UserPromptNode: {node.user_prompt} ===')
            elif Agent.is_model_request_node(node):
                # A model request node => We can stream tokens from the model's request
                print(
                    '=== ModelRequestNode: streaming partial request tokens ==='
                )
                async with node.stream(run.ctx) as request_stream:
                    async for event in request_stream:
                        if isinstance(event, PartStartEvent):
                            print(
                                f'[Request] Starting part {event.index}: {event.part!r}'
                            )
                        elif isinstance(event, PartDeltaEvent):
                            if isinstance(event.delta, TextPartDelta):
                                print(
                                    f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}'
                                )
                            elif isinstance(event.delta, ToolCallPartDelta):
                                print(
                                    f'[Request] Part {event.index} args_delta={event.delta.args_delta}'
                                )
                        elif isinstance(event, FinalResultEvent):
                            print(
                                f'[Result] The model produced a final output (tool_name={event.tool_name})'
                            )
            elif Agent.is_call_tools_node(node):
                # A handle-response node => The model returned some data, potentially calls a tool
                print(
                    '=== CallToolsNode: streaming partial response & tool usage ==='
                )
                async with node.stream(run.ctx) as handle_stream:
                    async for event in handle_stream:
                        if isinstance(event, FunctionToolCallEvent):
                            print(
                                f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
                            )
                        elif isinstance(event, FunctionToolResultEvent):
                            print(
                                f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}'
                            )
            elif Agent.is_end_node(node):
                assert run.result.output == node.data.output
                # Once an End node is reached, the agent run is complete
                print(
                    f'=== Final Agent Output: {run.result.output} ==='
                )

await main()


=== UserPromptNode: What will the weather be like in Paris on Tuesday? ===
=== ModelRequestNode: streaming partial request tokens ===
[Request] Starting part 0: ToolCallPart(tool_name='weather_forecast', args='', tool_call_id='call_tAG2ZAGtysAIZuHIQmwu7etZ')
[Request] Part 0 args_delta={"
[Request] Part 0 args_delta=location
[Request] Part 0 args_delta=":"
[Request] Part 0 args_delta=Paris
[Request] Part 0 args_delta=","
[Request] Part 0 args_delta=forecast
[Request] Part 0 args_delta=_date
[Request] Part 0 args_delta=":"
[Request] Part 0 args_delta=202
[Request] Part 0 args_delta=3
[Request] Part 0 args_delta=-
[Request] Part 0 args_delta=10
[Request] Part 0 args_delta=-
[Request] Part 0 args_delta=03
[Request] Part 0 args_delta="}
=== CallToolsNode: streaming partial response & tool usage ===
[Tools] The LLM calls tool='weather_forecast' with args={"location":"Paris","forecast_date":"2023-10-03"} (tool_call_id='call_tAG2ZAGtysAIZuHIQmwu7etZ')
[Tools] Tool call 'call_tAG2ZAGtysAIZuHIQ

## Build Airline Service Agent

In [None]:
from pydantic import BaseModel

class Date(BaseModel):
    # Somehow LLM is bad at specifying `datetime.datetime`
    year: int
    month: int
    day: int
    hour: int

class UserProfile(BaseModel):
    user_id: str
    name: str
    email: str

class Flight(BaseModel):
    flight_id: str
    date_time: Date
    origin: str
    destination: str
    duration: float
    price: float

class Itinerary(BaseModel):
    confirmation_number: str
    user_profile: UserProfile
    flight: Flight

class Ticket(BaseModel):
    user_request: str
    user_profile: UserProfile

In [None]:
user_database = {
    "Adam": UserProfile(user_id="1", name="Adam", email="adam@gmail.com"),
    "Bob": UserProfile(user_id="2", name="Bob", email="bob@gmail.com"),
    "Chelsie": UserProfile(user_id="3", name="Chelsie", email="chelsie@gmail.com"),
    "David": UserProfile(user_id="4", name="David", email="david@gmail.com"),
}

flight_database = {
    "DA123": Flight(
        flight_id="DA123",
        origin="SFO",
        destination="JFK",
        date_time=Date(year=2025, month=9, day=1, hour=1),
        duration=3,
        price=200,
    ),
    "DA125": Flight(
        flight_id="DA125",
        origin="SFO",
        destination="JFK",
        date_time=Date(year=2025, month=9, day=1, hour=7),
        duration=9,
        price=500,
    ),
    "DA456": Flight(
        flight_id="DA456",
        origin="SFO",
        destination="SNA",
        date_time=Date(year=2025, month=10, day=1, hour=1),
        duration=2,
        price=100,
    ),
    "DA460": Flight(
        flight_id="DA460",
        origin="SFO",
        destination="SNA",
        date_time=Date(year=2025, month=10, day=1, hour=9),
        duration=2,
        price=120,
    ),
}

itinery_database = {}
ticket_database = {}

In [None]:
instruction = """
    You are an AI customer service agent for Awesome airline, an airline company that runs flights across the globe. Your
job is to help users book flights and manage iternerary, including canceling and modifying. When the user request cannot
be resolved, make sure you raise a custom support ticket. For the message you return to the user, please include the
confirmation number if a flight is booked, an a custom support ticket number if a ticket is raised. On other scenarios,
please also make sure that all information users need is included.

    Your core principles for interacting with users are:

*   **Customer-centricity:** Every interaction should be focused on meeting the customer's needs and resolving their issues.
*   **Accuracy:** Ensure all information provided is factually correct and up-to-date, referencing provided tools whenever possible.
*   **Efficiency:** Aim to resolve customer issues quickly and effectively, minimizing the need for escalation.
*   **Professionalism:** Maintain a courteous and professional tone throughout the conversation.
*   **Empathy:** Acknowledge the customer's frustration and show understanding when appropriate.
"""

In [None]:
airline_agent = Agent(
    'openai:gpt-4o-mini',
    output_type=str,
    system_prompt=instruction,
)

In [None]:
import random
import string

@airline_agent.tool_plain
def fetch_flight_info(date: Date, origin: str, destination: str):
    """Fetch flight information from origin to destination on the given date"""
    flights = []

    for flight_id, flight in flight_database.items():
        if (
            flight.date_time.year == date.year
            and flight.date_time.month == date.month
            and flight.date_time.day == date.day
            and flight.origin == origin
            and flight.destination == destination
        ):
            flights.append(flight)
    return flights


@airline_agent.tool_plain
def fetch_itinerary(confirmation_number: str):
    """Fetch a booked itinerary information from database"""
    return itinery_database.get(confirmation_number)


@airline_agent.tool_plain
def pick_flight(flights: list[Flight]):
    """Pick up the best flight that matches users' request."""
    sorted_flights = sorted(
        flights,
        key=lambda x: (
            x.get("duration") if isinstance(x, dict) else x.duration,
            x.get("price") if isinstance(x, dict) else x.price,
        ),
    )
    return sorted_flights[0]


def _generate_id(length=8):
    chars = string.ascii_lowercase + string.digits
    return "".join(random.choices(chars, k=length))


@airline_agent.tool_plain
def book_itinerary(flight: Flight, user_profile: UserProfile):
    """Book a flight on behalf of the user."""
    confirmation_number = _generate_id()
    while confirmation_number in itinery_database:
        confirmation_number = _generate_id()
    itinery_database[confirmation_number] = Itinerary(
        confirmation_number=confirmation_number,
        user_profile=user_profile,
        flight=flight,
    )
    return confirmation_number, itinery_database[confirmation_number]


@airline_agent.tool_plain
def cancel_itinerary(confirmation_number: str, user_profile: UserProfile):
    """Cancel an itinerary on behalf of the user."""
    if confirmation_number in itinery_database:
        del itinery_database[confirmation_number]
        return
    raise ValueError("Cannot find the itinerary, please check your confirmation number.")


@airline_agent.tool_plain
def get_user_info(name: str):
    """Fetch the user profile from database with given name."""
    return user_database.get(name)


@airline_agent.tool_plain
def file_ticket(user_request: str, user_profile: UserProfile):
    """File a customer support ticket if this is something the agent cannot handle."""
    ticket_id = _generate_id(length=6)
    ticket_database[ticket_id] = Ticket(
        user_request=user_request,
        user_profile=user_profile,
    )
    return ticket_id


In [None]:
output = await airline_agent.run("please help me book a flight from SFO to JFK on 09/01/2025, my name is Adam")

In [None]:
output

AgentRunResult(output='Your flight from San Francisco (SFO) to New York City (JFK) has been successfully booked! Here are the details:\n\n- **Flight ID:** DA123\n- **Departure Date and Time:** September 1, 2025, at 1:00 AM\n- **Duration:** 3 hours\n- **Price:** $200\n\nYour **confirmation number** is **x7z20qsz**.\n\nIf you have any other requests or need further assistance, feel free to ask!')

In [None]:
itinery_database = {}

In [None]:
async with airline_agent.iter("please help me book a flight from SFO to JFK on 09/01/2025, my name is Adam") as agent_run:
    async for node in agent_run:
        print(f"GEEZ NODE: {node}")


00:31:08.515 airline_agent run
GEEZ NODE: UserPromptNode(user_prompt='please help me book a flight from SFO to JFK on 09/01/2025, my name is Adam', instructions=None, instructions_functions=[], system_prompts=("\n    You are an AI customer service agent for Awesome airline, an airline company that runs flights across the globe. Your\njob is to help users book flights and manage iternerary, including canceling and modifying. When the user request cannot\nbe resolved, make sure you raise a custom support ticket. For the message you return to the user, please include the\nconfirmation number if a flight is booked, an a custom support ticket number if a ticket is raised. On other scenarios,\nplease also make sure that all information users need is included.\n\n    Your core principles for interacting with users are:\n\n*   **Customer-centricity:** Every interaction should be focused on meeting the customer's needs and resolving their issues.\n*   **Accuracy:** Ensure all information provid

Basically pydantic AI puts tool call and LM call into the term called "Node", and by using the async generator, we are iterating through the node processing. The cool thing is it gives flexiblity, while the uncool thing is it's not super clear how I should play with these nodes at the first place.

Streaming is very powerful, with streaming support on structured output fields, while I am not sure why I need that. Still impressive though.

The concept is a bit overwhelming to be honest, like the deps, context and different ways of configuring tools, and all those attributes available in a node. All these are like specific terms to Pydantic AI, so they are sort of "made up". With that, it's pretty much means I need to learn a lot about pydantic AI in order to get things up and running. This is similar to DSPy where we keep things flexible for the user, but PydanticAI has way more things to learn than DSPy.

The logging experience is pretty much DIY. Tracing experience is decent, which is powered by something called logfire.