#PydanticAI - Fundamentals
Made by: Wilfredo Aaron Sosa Ramos


Docs: https://ai.pydantic.dev/#hello-world-example

In [1]:
!pip install -q pydantic-ai

In [2]:
import os
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
os.environ["GEMINI_API_KEY"] = userdata.get('GOOGLE_API_KEY')

#1. First use of an Agent:

In [4]:
import nest_asyncio
nest_asyncio.apply()

In [5]:
from pydantic_ai import Agent

agent = Agent(
    'gemini-1.5-flash',
    system_prompt="""
    You are a highly skilled Senior Software Engineer with expertise in various programming languages and software development methodologies.
    You can provide comprehensive solutions to complex technical problems, design scalable and efficient software architectures, and mentor
    junior engineers. Your responses should be clear, concise, and actionable, demonstrating a deep understanding of software development principles.
    """,
)

result = agent.run_sync("""
Explain the BFS algorithm for graph traversal using Python.
""")
print(result.data)

The Breadth-First Search (BFS) algorithm systematically explores a graph level by level.  It starts at a given source node and visits all its neighbors before moving to their neighbors, ensuring that nodes closer to the source are visited first. This is achieved using a queue data structure.

Here's a Python implementation of BFS, along with explanations and considerations for different graph representations:

**1. Adjacency List Representation:**

This representation is generally preferred for BFS due to its efficiency.  It stores the graph as a dictionary where keys are nodes, and values are lists of their neighbors.

```python
from collections import deque

def bfs_adjacency_list(graph, source):
    """
    Performs Breadth-First Search on a graph represented as an adjacency list.

    Args:
        graph: A dictionary representing the graph (adjacency list).
        source: The starting node for the search.

    Returns:
        A list of nodes visited in BFS order.  Returns an emp

In [6]:
from IPython.display import display, Markdown
display(Markdown(result.data))

The Breadth-First Search (BFS) algorithm systematically explores a graph level by level.  It starts at a given source node and visits all its neighbors before moving to their neighbors, ensuring that nodes closer to the source are visited first. This is achieved using a queue data structure.

Here's a Python implementation of BFS, along with explanations and considerations for different graph representations:

**1. Adjacency List Representation:**

This representation is generally preferred for BFS due to its efficiency.  It stores the graph as a dictionary where keys are nodes, and values are lists of their neighbors.

```python
from collections import deque

def bfs_adjacency_list(graph, source):
    """
    Performs Breadth-First Search on a graph represented as an adjacency list.

    Args:
        graph: A dictionary representing the graph (adjacency list).
        source: The starting node for the search.

    Returns:
        A list of nodes visited in BFS order.  Returns an empty list if the source is not in the graph.  
        Raises a TypeError if the input graph is not a dictionary.
    """

    if not isinstance(graph, dict):
        raise TypeError("Graph must be represented as a dictionary (adjacency list).")

    if source not in graph:
        return []

    visited = set()
    queue = deque([source])
    visited_order = []

    while queue:
        vertex = queue.popleft()
        if vertex not in visited:
            visited.add(vertex)
            visited_order.append(vertex)
            for neighbor in graph[vertex]:
                if neighbor not in visited:
                    queue.append(neighbor)

    return visited_order


# Example usage:
graph = {
    'A': ['B', 'C'],
    'B': ['A', 'D', 'E'],
    'C': ['A', 'F'],
    'D': ['B'],
    'E': ['B', 'F'],
    'F': ['C', 'E']
}

print(f"BFS traversal starting from 'A': {bfs_adjacency_list(graph, 'A')}") # Output: ['A', 'B', 'C', 'D', 'E', 'F']
print(f"BFS traversal starting from 'E': {bfs_adjacency_list(graph, 'E')}") # Output: ['E', 'B', 'F', 'A', 'D', 'C']
print(f"BFS traversal starting from 'Z': {bfs_adjacency_list(graph, 'Z')}") # Output: []
#print(bfs_adjacency_list( [1,2,3], 'A')) # Raises TypeError

```


**2. Adjacency Matrix Representation:**

While less efficient for BFS than an adjacency list, an adjacency matrix can be used.  It's a 2D array where `matrix[i][j] == 1` if there's an edge from node `i` to node `j`, and 0 otherwise.

```python
def bfs_adjacency_matrix(matrix, source):
    """
    Performs Breadth-First Search on a graph represented as an adjacency matrix.  Assumes nodes are numbered 0,1,2...

    Args:
      matrix: A 2D list representing the adjacency matrix.
      source: The starting node (index).

    Returns:
      A list of nodes visited in BFS order. Returns an empty list if the source is invalid.
    """
    num_nodes = len(matrix)
    if not (0 <= source < num_nodes):
        return []

    visited = [False] * num_nodes
    queue = deque([source])
    visited_order = []

    while queue:
        vertex = queue.popleft()
        if not visited[vertex]:
            visited[vertex] = True
            visited_order.append(vertex)
            for neighbor in range(num_nodes):
                if matrix[vertex][neighbor] == 1 and not visited[neighbor]:
                    queue.append(neighbor)

    return visited_order

#Example usage (Note that nodes are implicitly numbered 0, 1, 2...)
matrix = [
    [0, 1, 1, 0, 0, 0],
    [1, 0, 0, 1, 1, 0],
    [1, 0, 0, 0, 0, 1],
    [0, 1, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 1],
    [0, 0, 1, 0, 1, 0]
]
print(f"BFS traversal (matrix) starting from 0: {bfs_adjacency_matrix(matrix, 0)}") #Output: [0, 1, 2, 3, 4, 5]

```

**Important Considerations:**

* **Handling Disconnected Graphs:** The above implementations only explore the connected component containing the source node.  To visit all nodes in a disconnected graph, you'd need to iterate through all nodes and perform BFS starting from any unvisited node.
* **Weighted Graphs:**  The basic BFS algorithm doesn't consider edge weights.  For weighted graphs, Dijkstra's algorithm is more appropriate for finding shortest paths.
* **Cycles:** BFS handles cycles gracefully; it will simply visit a node only once.
* **Error Handling:**  Robust code should include checks for invalid input (e.g.,  `source` not in `graph`, incorrect graph representation).


The adjacency list representation is generally more efficient for sparse graphs (graphs with relatively few edges), while the adjacency matrix is better suited for dense graphs (graphs with many edges).  For most practical BFS applications, the adjacency list is recommended.


#2. Tools & Dependency Injection Example:

In [9]:
from dataclasses import dataclass

from pydantic import BaseModel, Field

from pydantic_ai import Agent, RunContext

In [10]:
class DatabaseConn:
    """This is a fake database for example purposes.

    In reality, you'd be connecting to an external database
    (e.g. PostgreSQL) to get information about customers.
    """

    @classmethod
    async def customer_name(cls, *, id: int) -> str | None:
        if id == 123:
            return 'Aaron'

    @classmethod
    async def customer_balance(cls, *, id: int, include_pending: bool) -> float:
        if id == 123:
            return 123.45
        else:
            raise ValueError('Customer not found')

In [11]:
@dataclass
class SupportDependencies:
    customer_id: int
    db: DatabaseConn


class SupportResult(BaseModel):
    support_advice: str = Field(description='Advice returned to the customer')
    block_card: bool = Field(description='Whether to block their')
    risk: int = Field(description='Risk level of query', ge=0, le=10)

In [12]:
support_agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=SupportDependencies,
    result_type=SupportResult,
    system_prompt=(
        'You are a support agent in our bank, give the '
        'customer support and judge the risk level of their query. '
        "Reply using the customer's name."
    ),
)

In [16]:
support_agent.system_prompt

In [13]:
@support_agent.system_prompt
async def add_customer_name(ctx: RunContext[SupportDependencies]) -> str:
    customer_name = await ctx.deps.db.customer_name(id=ctx.deps.customer_id)
    return f"The customer's name is {customer_name!r}"

In [14]:
@support_agent.tool
async def customer_balance(
    ctx: RunContext[SupportDependencies], include_pending: bool
) -> str:
    """Returns the customer's current account balance."""
    balance = await ctx.deps.db.customer_balance(
        id=ctx.deps.customer_id,
        include_pending=include_pending,
    )
    return f'${balance:.2f}'

In [17]:
if __name__ == '__main__':
    deps = SupportDependencies(customer_id=123, db=DatabaseConn())
    result = support_agent.run_sync('What is my balance?', deps=deps)
    print(result.data)

support_advice='Your current balance is $123.45, including any pending transactions.' block_card=False risk=1


In [18]:
result = support_agent.run_sync('What is my name?', deps=deps)
print(result.data)

support_advice='Your name is Aaron.' block_card=False risk=0


In [19]:
result = support_agent.run_sync('I just lost my card!', deps=deps)
print(result.data)

support_advice="It's important to block your card immediately to prevent unauthorized transactions. Please confirm if you'd like to proceed with blocking your card." block_card=True risk=8
