# Pydantic AI Integration via OpenTelemetry

Opik offers an [OpenTelemetry backend](https://www.comet.com/docs/opik/tracing/opentelemetry/overview) that ingests trace data from a variety of OpenTelemetry instrumentation libraries. In this guide, we demonstrate how to use the Pydantic Logfire instrumentation library to instrument your Pydantic AI agents.

> **About Pydantic AI:** [Pydantic AI](https://pydantic-ai.readthedocs.io/en/latest/) is a Python agent framework designed to simplify the development of production-grade generative AI applications. It brings the same type-safety, ergonomic API design, and developer experience found in FastAPI to the world of GenAI app development.

## Step 1: Install Dependencies

Before you begin, install the necessary Python packages. The command below installs `pydantic-ai` with its `logfire` extra, which provides the OpenTelemetry instrumentation used to send traces to Opik:

In [None]:
%pip install "pydantic-ai[logfire]"

## Step 2: Configure Environment Variables

To forward trace data to Opik, set the OpenTelemetry exporter environment variables: the endpoint (`OTEL_EXPORTER_OTLP_ENDPOINT`) and the headers (`OTEL_EXPORTER_OTLP_HEADERS`), which carry your Opik API key and, optionally, the project name and workspace. You also need an OpenAI API key, because the examples in this guide use an OpenAI model.

In [None]:
import logging

logging.basicConfig(level=logging.DEBUG)

In [None]:
import os
import getpass

OPIK_API_KEY = None
OPIK_PROJECT_NAME = "pydantic-ai-integration"
OPIK_WORKSPACE = "lothiraldan"  # replace with your own Opik workspace name

if OPIK_API_KEY is None and "OPIK_API_KEY" not in os.environ:
    OPIK_API_KEY = getpass.getpass("Enter your OPIK API key: ")
elif OPIK_API_KEY is None:
    OPIK_API_KEY = os.environ["OPIK_API_KEY"]

os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = (
    "https://www.comet.com/opik/api/v1/private/otel"  # Opik Cloud
)
# os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://<YOUR-OPIK-INSTANCE>/api/v1/private/otel" # Opik self-hosted

headers = [f"Authorization={OPIK_API_KEY}"]

if OPIK_PROJECT_NAME is not None:
    headers.append(f"projectName={OPIK_PROJECT_NAME}")

if OPIK_WORKSPACE is not None:
    headers.append(f"Comet-Workspace={OPIK_WORKSPACE}")

os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = ",".join(headers)


if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
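
The header assembly above can also be wrapped in a small helper. The following is a minimal sketch, not part of Opik or OpenTelemetry: `build_otlp_headers` is a hypothetical name, and percent-encoding the values assumes that the exporter percent-decodes them when it reads `OTEL_EXPORTER_OTLP_HEADERS`. Encoding keeps a value that contains a space or a comma from breaking the comma-separated list.

```python
from typing import Optional
from urllib.parse import quote


def build_otlp_headers(
    api_key: str,
    project_name: Optional[str] = None,
    workspace: Optional[str] = None,
) -> str:
    """Build a comma-separated key=value string for OTEL_EXPORTER_OTLP_HEADERS.

    Hypothetical helper: header names mirror the ones used in the cell above.
    """
    pairs = [("Authorization", api_key)]
    if project_name is not None:
        pairs.append(("projectName", project_name))
    if workspace is not None:
        pairs.append(("Comet-Workspace", workspace))
    # Percent-encode every value so that spaces and commas cannot split a pair.
    return ",".join(f"{key}={quote(value, safe='')}" for key, value in pairs)


print(build_otlp_headers("abc123", "pydantic-ai-integration", "my-workspace"))
```

The returned string could then be assigned to `os.environ["OTEL_EXPORTER_OTLP_HEADERS"]` in place of the manual `",".join(headers)` call.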

In [None]:
# Debug only: this prints the API key in clear text, so avoid sharing the output.
print(",".join(headers))
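
Printing the raw header string exposes the API key in the notebook output. As a rough sketch, the value could be masked before printing. `mask_headers` is a hypothetical helper, and it assumes the plain `key=value,key=value` layout built in the cell above.

```python
def mask_headers(header_string: str, secret_keys=("Authorization",)) -> str:
    """Return the header string with the values of secret keys partly hidden."""
    masked = []
    for pair in header_string.split(","):
        key, _, value = pair.partition("=")
        if key in secret_keys and value:
            # Keep a short prefix of long values for recognition; hide short ones fully.
            value = (value[:4] + "...") if len(value) > 8 else "***"
        masked.append(f"{key}={value}")
    return ",".join(masked)


print(mask_headers("Authorization=abcdefghijkl,projectName=demo"))
```

The project name and workspace are left readable, which is usually what you want to check when debugging the configuration.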

## Step 3: Initialize Instrumentation

Next, configure Logfire's instrumentation and define a sample Pydantic AI agent that uses dependency injection and tool registration. Setting `send_to_logfire=False` keeps traces from being sent to the Logfire platform, and the added `ConsoleSpanExporter` prints each span locally so you can check what is being emitted. In this example, we create a "roulette" agent that calls a tool function (`roulette_wheel`) to check whether a given square is a winner. The agent is typed: `deps_type` declares the type of the injected dependency and `result_type` the type of the final output.

In [None]:
# import nest_asyncio
# nest_asyncio.apply()

In [None]:
import logfire

logfire_ = logfire.configure(
    service_name="my_logfire_service",
    # Sending to Logfire is on by default regardless of the OTEL env vars.
    send_to_logfire=False,
)

from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

tracer_provider = logfire_.config.get_tracer_provider()

processor = BatchSpanProcessor(ConsoleSpanExporter())
tracer_provider.add_span_processor(processor)

In [None]:
from pydantic_ai import Agent, RunContext

roulette_agent = Agent(
    "openai:gpt-4o",
    deps_type=int,
    result_type=bool,
    system_prompt=(
        "Use the `roulette_wheel` function to see if the "
        "customer has won based on the number they provide."
    ),
)


@roulette_agent.tool
async def roulette_wheel(ctx: RunContext[int], square: int) -> str:
    """check if the square is a winner"""
    return "winner" if square == ctx.deps else "loser"
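
The tool's comparison can be checked without calling a model. This is a minimal sketch under stated assumptions: `FakeContext` is a hypothetical stand-in that exposes only the `deps` attribute the tool reads, and the function is an undecorated copy of the tool body rather than the registered tool itself.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for RunContext[int]; only `deps` is provided."""

    deps: int


async def roulette_wheel(ctx: FakeContext, square: int) -> str:
    """Same comparison as the tool above, without the agent decorator."""
    return "winner" if square == ctx.deps else "loser"


# The winning square is injected through the context, not passed by the caller.
print(asyncio.run(roulette_wheel(FakeContext(deps=18), 18)))
print(asyncio.run(roulette_wheel(FakeContext(deps=18), 5)))
```

Keeping the winning number in `deps` means the model only supplies `square`, so it never sees the value it is being checked against.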

## Step 4: Run the Agent

Finally, run your Pydantic AI agent to generate trace data that will be sent to Opik. In the example below, the agent is executed with a dependency value (the winning square) and a natural-language input. The agent's final result (`result.data`) is then printed.

In [None]:
# Run the agent


async def main():
    success_number = 18
    result = await roulette_agent.run(
        "Put my money on square eighteen", deps=success_number
    )
    print(result.data)


await main()
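
A top-level `await main()` works in a notebook because an event loop is already running. In a plain Python script there is no running loop, so the coroutine has to be started explicitly. The sketch below illustrates that pattern with `run_agent_stub`, a hypothetical stand-in for the agent call, so that it runs without an API key.

```python
import asyncio


async def run_agent_stub() -> bool:
    """Hypothetical stand-in for `roulette_agent.run(...)`; returns a fixed result."""
    await asyncio.sleep(0)  # yield to the event loop once, as a real call would
    return True


def run_from_script() -> bool:
    # asyncio.run creates a fresh event loop, runs the coroutine, and closes the loop.
    return asyncio.run(run_agent_stub())


print(run_from_script())
```

`asyncio.run` raises a `RuntimeError` when called from a thread that already has a running loop, which is why the notebook cell uses `await` instead.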

In [None]:
tracer_provider.force_flush()

The next example is a larger weather agent. It chains two tools (`get_lat_lng` and `get_weather`), wraps the external API calls in Logfire spans, and passes `instrument=True` to the agent.

In [None]:
from __future__ import annotations as _annotations

import os
from dataclasses import dataclass
from typing import Any

import logfire
from httpx import AsyncClient

from pydantic_ai import Agent, ModelRetry, RunContext


@dataclass
class Deps:
    client: AsyncClient
    weather_api_key: str | None
    geo_api_key: str | None


weather_agent = Agent(
    "openai:gpt-4o",
    # 'Be concise, reply with one sentence.' is enough for some models (like openai) to use
    # the below tools appropriately, but others like anthropic and gemini require a bit more direction.
    system_prompt=(
        "Be concise, reply with one sentence. "
        "Use the `get_lat_lng` tool to get the latitude and longitude of the locations, "
        "then use the `get_weather` tool to get the weather."
    ),
    deps_type=Deps,
    retries=2,
    instrument=True,
)


@weather_agent.tool
async def get_lat_lng(
    ctx: RunContext[Deps], location_description: str
) -> dict[str, float]:
    """Get the latitude and longitude of a location.

    Args:
        ctx: The context.
        location_description: A description of a location.
    """
    if ctx.deps.geo_api_key is None:
        # if no API key is provided, return a dummy response (London)
        return {"lat": 51.1, "lng": -0.1}

    params = {
        "q": location_description,
        "api_key": ctx.deps.geo_api_key,
    }
    with logfire.span("calling geocode API", params=params) as span:
        r = await ctx.deps.client.get("https://geocode.maps.co/search", params=params)
        r.raise_for_status()
        data = r.json()
        span.set_attribute("response", data)

    if data:
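        # Convert to float to match the declared return type, in case the API returns strings
        return {"lat": float(data[0]["lat"]), "lng": float(data[0]["lon"])}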
    else:
        raise ModelRetry("Could not find the location")


@weather_agent.tool
async def get_weather(ctx: RunContext[Deps], lat: float, lng: float) -> dict[str, Any]:
    """Get the weather at a location.

    Args:
        ctx: The context.
        lat: Latitude of the location.
        lng: Longitude of the location.
    """
    if ctx.deps.weather_api_key is None:
        # if no API key is provided, return a dummy response
        return {"temperature": "21 °C", "description": "Sunny"}

    params = {
        "apikey": ctx.deps.weather_api_key,
        "location": f"{lat},{lng}",
        "units": "metric",
    }
    with logfire.span("calling weather API", params=params) as span:
        r = await ctx.deps.client.get(
            "https://api.tomorrow.io/v4/weather/realtime", params=params
        )
        r.raise_for_status()
        data = r.json()
        span.set_attribute("response", data)

    values = data["data"]["values"]
    # https://docs.tomorrow.io/reference/data-layers-weather-codes
    code_lookup = {
        1000: "Clear, Sunny",
        1100: "Mostly Clear",
        1101: "Partly Cloudy",
        1102: "Mostly Cloudy",
        1001: "Cloudy",
        2000: "Fog",
        2100: "Light Fog",
        4000: "Drizzle",
        4001: "Rain",
        4200: "Light Rain",
        4201: "Heavy Rain",
        5000: "Snow",
        5001: "Flurries",
        5100: "Light Snow",
        5101: "Heavy Snow",
        6000: "Freezing Drizzle",
        6001: "Freezing Rain",
        6200: "Light Freezing Rain",
        6201: "Heavy Freezing Rain",
        7000: "Ice Pellets",
        7101: "Heavy Ice Pellets",
        7102: "Light Ice Pellets",
        8000: "Thunderstorm",
    }
    return {
        "temperature": f'{values["temperatureApparent"]:0.0f}°C',
        "description": code_lookup.get(values["weatherCode"], "Unknown"),
    }


async def main():
    async with AsyncClient() as client:
        # create a free API key at https://www.tomorrow.io/weather-api/
        weather_api_key = os.getenv("WEATHER_API_KEY")
        # create a free API key at https://geocode.maps.co/
        geo_api_key = os.getenv("GEO_API_KEY")
        deps = Deps(
            client=client, weather_api_key=weather_api_key, geo_api_key=geo_api_key
        )
        result = await weather_agent.run(
            "What is the weather like in Strasbourg?", deps=deps
        )
        print("Response:", result.data)

In [None]:
await main()

In [None]:
tracer_provider.force_flush()

## Step 5: Explore Traces in Opik

With the instrumentation in place, the trace data generated by the agents is sent to Opik. Open the project configured in Step 2 (`pydantic-ai-integration`) in your Opik dashboard to view detailed traces, including operation timings, debugging information, and performance metrics, and to follow the flow of each Pydantic AI request.

In [None]:
# result = roulette_agent.run_sync("I bet five is the winner", deps=success_number)
# print(result.data)