# Integrate Agenta with PydanticAI Logfire

This notebook demonstrates how to connect **Agenta** with **PydanticAI** and **Logfire** for comprehensive observability and debugging of your AI agent applications.

> **What is Agenta?** [Agenta](https://agenta.ai) is an open-source LLMOps platform designed to streamline the deployment, management, and scaling of large language models. It offers comprehensive observability, testing, and deployment capabilities for AI applications.

> **What is PydanticAI?** [PydanticAI](https://ai.pydantic.dev/) is a Python agent framework designed to make it less painful to build production-grade applications with generative AI. It provides type safety, structured outputs, and dependency injection for building robust AI agents.

> **What is Logfire?** [Logfire](https://logfire.pydantic.dev/) is an observability platform from Pydantic that provides structured logging and tracing capabilities, specifically designed to work seamlessly with PydanticAI applications.

## Implementation Guide

Follow this tutorial to set up PydanticAI with Logfire and Agenta's observability platform for real-time application insights.

### Step 1: Install Required Dependencies

Install the necessary Python packages for this integration:

In [None]:
!pip install pydantic-ai[examples] logfire agenta

**Package Descriptions:**
- `pydantic-ai[examples]`: The PydanticAI framework with example dependencies
- `logfire`: Pydantic's observability platform for structured logging and tracing
- `agenta`: Core SDK for Agenta's prompt engineering and observability platform

### Step 2: Setup and Configuration

Configure your environment and initialize both Agenta and Logfire:

In [None]:
import os
from dataclasses import dataclass
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
import logfire
import agenta as ag

# Load configuration from environment
os.environ["AGENTA_API_KEY"] = "your_agenta_api_key"
os.environ[
    "AGENTA_HOST"
] = "https://cloud.agenta.ai"  # Optional, defaults to the Agenta cloud API

# Initialize Agenta SDK
ag.init()

# Configure Logfire for local development
logfire.configure(
    service_name="my_logfire_service", send_to_logfire=False, scrubbing=False
)

**What does `ag.init()` do?**
This function initializes the Agenta SDK and sets up the necessary configuration for observability. It establishes connection to the Agenta platform, configures tracing and logging settings, and prepares the instrumentation context for your application.

**Logfire Configuration:**
The `logfire.configure()` function sets up Logfire's observability features. Setting `send_to_logfire=False` keeps traces local for development, while `scrubbing=False` preserves all data for debugging.

### Step 3: Enable PydanticAI Instrumentation

PydanticAI comes with built-in observability support that integrates with OpenTelemetry-compatible systems:

In [None]:
# Enable automatic instrumentation for async database operations
logfire.instrument_asyncpg()

### Step 4: Build Your Instrumented PydanticAI Application

Here's a complete example showcasing a banking support agent with Agenta instrumentation:

#### Create Mock Database

In [None]:
# Mock database for demonstration
class DatabaseConn:
    """Fake database for example purposes."""

    @classmethod
    async def customer_name(cls, *, id: int) -> str | None:
        if id == 123:
            return "John"
        return None

    @classmethod
    async def customer_balance(cls, *, id: int, include_pending: bool) -> float:
        if id == 123 and include_pending:
            return 123.45
        else:
            raise ValueError("Customer not found")

#### Define Dependencies and Output Models

In [None]:
# Dependencies for the support agent
@dataclass
class SupportDependencies:
    customer_id: int
    including_pending: bool
    db: DatabaseConn


# Structured output model
class SupportOutput(BaseModel):
    support_advice: str = Field(description="Advice returned to the customer")
    block_card: bool = Field(description="Whether to block their card or not")
    risk: int = Field(description="Risk level of query", ge=0, le=10)

#### Create the Support Agent

In [None]:
# Create the support agent with instrumentation
support_agent = Agent(
    "openai:gpt-4o",
    deps_type=SupportDependencies,
    output_type=SupportOutput,
    system_prompt=(
        "You are a support agent in our bank, give the "
        "customer support and judge the risk level of their query. "
        "Reply using the customer's name."
    ),
    instrument=True,  # Enable built-in PydanticAI instrumentation
)

#### Add Dynamic System Prompt and Tools

In [None]:
# Dynamic system prompt with customer context
@support_agent.system_prompt
async def add_customer_name(ctx: RunContext[SupportDependencies]) -> str:
    customer_name = await ctx.deps.db.customer_name(id=ctx.deps.customer_id)
    return f"The customer's name is {customer_name!r}"


# Agent tool for balance inquiries
@support_agent.tool
async def customer_balance(ctx: RunContext[SupportDependencies]) -> str:
    """Returns the customer's current account balance."""
    balance = await ctx.deps.db.customer_balance(
        id=ctx.deps.customer_id,
        include_pending=ctx.deps.including_pending,
    )
    return f"${balance:.2f}"

#### Create Agenta-Instrumented Functions

In [None]:
# Agenta-instrumented functions
@ag.instrument()
def bank_balance(customer_id: int, query: str, include_pending: bool = True):
    """Returns the customer's current account balance."""
    deps = SupportDependencies(
        customer_id=customer_id,
        including_pending=include_pending,
        db=DatabaseConn(),
    )
    result = support_agent.run_sync(query, deps=deps)
    return result


@ag.instrument()
def block_card(customer_id: int, query: str, include_pending: bool = True):
    """Blocks the customer's card if they report it lost."""
    deps = SupportDependencies(
        customer_id=customer_id,
        including_pending=include_pending,
        db=DatabaseConn(),
    )
    result = support_agent.run_sync(query, deps=deps)
    return result

#### Test the Banking Support Agent

In [None]:
# Example usage
# Agent 1: get user's account balance
result = bank_balance(123, "What is my balance?", True)
print("Balance Query Result:")
print(f"Advice: {result.output.support_advice}")
print(f"Risk Level: {result.output.risk}")
print(f"Block Card: {result.output.block_card}")

In [None]:
# Agent 2: block user's card if they report it lost
result = block_card(123, "I just lost my card!", True)
print("Card Block Result:")
print(f"Advice: {result.output.support_advice}")
print(f"Risk Level: {result.output.risk}")
print(f"Block Card: {result.output.block_card}")

### Step 5: Understanding the @ag.instrument() Decorator

The `@ag.instrument()` decorator automatically captures all input and output data from your function, enabling comprehensive observability without manual instrumentation.

**Span Type Configuration:**
Use the `spankind` parameter to categorize operations in Agenta WebUI. Available options:

- `agent` - Autonomous agent behaviors
- `chain` - Sequential processing workflows
- `workflow` - Complete application processes (default)
- `tool` - Utility and helper functions
- `embedding` - Vector embedding operations
- `query` - Search and retrieval tasks
- `completion` - Text generation operations
- `chat` - Conversational interfaces
- `rerank` - Result ordering operations

**Standard Behavior:**
By default, when `spankind` is not specified, the operation becomes a root-level span, categorized as a `workflow` in Agenta.

In [None]:
# Example with custom span classification:
@ag.instrument(spankind="agent")
def customer_service_agent(query: str):
    # Agent-specific logic implementation
    pass

### Step 6: View Traces in Agenta

After running your application, access detailed execution traces through Agenta's dashboard. The observability data includes:

- Complete agent workflow execution timeline
- PydanticAI agent initialization and configuration
- Tool function calls and dependency injection
- LLM interactions and structured output validation
- Database queries and external service calls
- Performance metrics and timing analysis

<img 
    style="display: block; margin: 20px; text-align: center"
    src="./images/agenta-pydanticai-logfire-trace.png"
    width="90%"
    alt="Agenta dashboard showing PydanticAI application trace with detailed execution steps">

The observability interface provides insights for:
- Debug complex agent interactions and tool usage
- Monitor dependency injection and context management
- Analyze LLM performance and structured output validation
- Track database queries and external API calls

## Advanced Usage

### Custom Span Configuration

Customize instrumentation for different application components:

In [None]:
@ag.instrument(spankind="workflow")
def customer_support_pipeline(customer_id: int, query: str):
    return bank_balance(customer_id, query)


@ag.instrument(spankind="tool")
def external_api_integration(data: dict):
    # External API call logic
    pass


@ag.instrument(spankind="agent")
def specialized_banking_agent(context: dict):
    # Specialized agent implementation
    return context

### Real-world Examples

#### Multi-Agent Customer Service System

In [None]:
class CustomerServiceDeps:
    def __init__(self, customer_id: int, db: DatabaseConn):
        self.customer_id = customer_id
        self.db = db


class ServiceOutput(BaseModel):
    response: str = Field(description="Customer service response")
    escalate: bool = Field(description="Whether to escalate to human agent")
    satisfaction_score: int = Field(description="Predicted satisfaction", ge=1, le=10)


# General inquiry agent
general_agent = Agent(
    "openai:gpt-4o",
    deps_type=CustomerServiceDeps,
    output_type=ServiceOutput,
    system_prompt="You are a helpful customer service agent for general inquiries.",
    instrument=True,
)

# Technical support agent
technical_agent = Agent(
    "openai:gpt-4o",
    deps_type=CustomerServiceDeps,
    output_type=ServiceOutput,
    system_prompt="You are a technical support specialist for banking applications.",
    instrument=True,
)


@ag.instrument(spankind="workflow")
def route_customer_query(customer_id: int, query: str, query_type: str = "general"):
    """Route customer queries to appropriate specialized agents."""
    deps = CustomerServiceDeps(customer_id=customer_id, db=DatabaseConn())

    if query_type == "technical":
        result = technical_agent.run_sync(query, deps=deps)
    else:
        result = general_agent.run_sync(query, deps=deps)

    return result

#### Test Multi-Agent System

In [None]:
# Test general inquiry
general_result = route_customer_query(123, "What are your bank hours?", "general")
print("General Inquiry Result:")
print(f"Response: {general_result.output.response}")
print(f"Escalate: {general_result.output.escalate}")
print(f"Satisfaction Score: {general_result.output.satisfaction_score}")

In [None]:
# Test technical inquiry
technical_result = route_customer_query(123, "My mobile app is crashing", "technical")
print("Technical Inquiry Result:")
print(f"Response: {technical_result.output.response}")
print(f"Escalate: {technical_result.output.escalate}")
print(f"Satisfaction Score: {technical_result.output.satisfaction_score}")

#### Fraud Detection Agent

In [None]:
class FraudDeps:
    def __init__(self, transaction_id: str, customer_id: int):
        self.transaction_id = transaction_id
        self.customer_id = customer_id


class FraudOutput(BaseModel):
    is_fraudulent: bool = Field(description="Whether transaction is fraudulent")
    confidence: float = Field(description="Confidence score", ge=0.0, le=1.0)
    reason: str = Field(description="Explanation for the decision")
    action: str = Field(description="Recommended action")


fraud_agent = Agent(
    "openai:gpt-4o",
    deps_type=FraudDeps,
    output_type=FraudOutput,
    system_prompt=(
        "You are a fraud detection specialist. Analyze transactions "
        "and determine if they are potentially fraudulent."
    ),
    instrument=True,
)


@fraud_agent.tool
async def get_transaction_history(ctx: RunContext[FraudDeps]) -> str:
    """Get recent transaction history for the customer."""
    # Mock transaction history
    return f"Recent transactions for customer {ctx.deps.customer_id}: $50.00, $25.00, $100.00"


@fraud_agent.tool
async def get_customer_profile(ctx: RunContext[FraudDeps]) -> str:
    """Get customer profile and spending patterns."""
    # Mock customer profile
    return f"Customer {ctx.deps.customer_id}: Average monthly spending $500, Location: New York"


@ag.instrument(spankind="agent")
def analyze_transaction(
    transaction_id: str, customer_id: int, amount: float, location: str
):
    """Analyze a transaction for potential fraud."""
    deps = FraudDeps(transaction_id=transaction_id, customer_id=customer_id)
    query = f"Analyze transaction {transaction_id}: ${amount} in {location}"

    result = fraud_agent.run_sync(query, deps=deps)
    return result

#### Test Fraud Detection

In [None]:
# Test fraud detection
fraud_result = analyze_transaction("txn_12345", 123, 5000.0, "Nigeria")
print("Fraud Detection Result:")
print(f"Is Fraudulent: {fraud_result.output.is_fraudulent}")
print(f"Confidence: {fraud_result.output.confidence}")
print(f"Reason: {fraud_result.output.reason}")
print(f"Action: {fraud_result.output.action}")

#### Investment Advisory Agent

In [None]:
class InvestmentDeps:
    def __init__(self, customer_id: int, portfolio_value: float):
        self.customer_id = customer_id
        self.portfolio_value = portfolio_value


class InvestmentOutput(BaseModel):
    recommendation: str = Field(description="Investment recommendation")
    risk_level: str = Field(description="Risk level assessment")
    expected_return: float = Field(description="Expected annual return percentage")
    timeframe: str = Field(description="Recommended investment timeframe")


investment_agent = Agent(
    "openai:gpt-4o",
    deps_type=InvestmentDeps,
    output_type=InvestmentOutput,
    system_prompt=(
        "You are an investment advisor. Provide personalized investment "
        "recommendations based on customer profile and market conditions."
    ),
    instrument=True,
)


@investment_agent.tool
async def get_market_data(ctx: RunContext[InvestmentDeps]) -> str:
    """Get current market conditions and trends."""
    return "Current market: S&P 500 up 2%, bonds stable, commodities mixed"


@investment_agent.tool
async def get_customer_risk_profile(ctx: RunContext[InvestmentDeps]) -> str:
    """Get customer's risk tolerance and investment goals."""
    return f"Customer {ctx.deps.customer_id}: Moderate risk tolerance, retirement savings goal"


@ag.instrument(spankind="workflow")
def get_investment_advice(customer_id: int, portfolio_value: float, query: str):
    """Provide investment advice based on customer profile and query."""
    deps = InvestmentDeps(customer_id=customer_id, portfolio_value=portfolio_value)
    result = investment_agent.run_sync(query, deps=deps)
    return result

#### Test Investment Advisory

In [None]:
# Test investment advice
investment_result = get_investment_advice(
    123, 50000.0, "I want to diversify my portfolio for retirement"
)
print("Investment Advisory Result:")
print(f"Recommendation: {investment_result.output.recommendation}")
print(f"Risk Level: {investment_result.output.risk_level}")
print(f"Expected Return: {investment_result.output.expected_return}%")
print(f"Timeframe: {investment_result.output.timeframe}")

## Next Steps

For more detailed information about Agenta's observability features and advanced configuration options, visit the [Agenta Observability SDK Documentation](/observability/observability-sdk).