<a href="https://colab.research.google.com/github/NewCodeLearner/LLM-Applications/blob/main/Create_AI_Agent_CRUD_Application_with_PydanticAI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Chapter 1: PydanticAI

Before you get started, check which Python version you are on. PydanticAI requires Python 3.9+.

python --version


In [1]:
!python --version

Python 3.11.11
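The same check can be made from inside Python, which is handy at the top of a script. This is a minimal sketch; the helper name `meets_minimum_python` is illustrative and is not part of PydanticAI:

```python
import sys

MIN_PYTHON = (3, 9)  # minimum version stated above


def meets_minimum_python(version_info=sys.version_info, minimum=MIN_PYTHON) -> bool:
    """Return True if the (major, minor) version is at least `minimum`."""
    return tuple(version_info[:2]) >= minimum


if meets_minimum_python():
    print("Python version is new enough for PydanticAI")
else:
    print(f"Python {MIN_PYTHON[0]}.{MIN_PYTHON[1]}+ is required, found {sys.version.split()[0]}")
```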


Install PydanticAI

In [2]:
!pip install pydantic-ai

Collecting pydantic-ai
  Downloading pydantic_ai-0.0.19-py3-none-any.whl.metadata (11 kB)
Collecting pydantic-ai-slim==0.0.19 (from pydantic-ai-slim[anthropic,graph,groq,mistral,openai,vertexai]==0.0.19->pydantic-ai)
  Downloading pydantic_ai_slim-0.0.19-py3-none-any.whl.metadata (2.8 kB)
Collecting griffe>=1.3.2 (from pydantic-ai-slim==0.0.19->pydantic-ai-slim[anthropic,graph,groq,mistral,openai,vertexai]==0.0.19->pydantic-ai)
  Downloading griffe-1.5.5-py3-none-any.whl.metadata (5.0 kB)
Collecting logfire-api>=1.2.0 (from pydantic-ai-slim==0.0.19->pydantic-ai-slim[anthropic,graph,groq,mistral,openai,vertexai]==0.0.19->pydantic-ai)
  Downloading logfire_api-3.2.0-py3-none-any.whl.metadata (971 bytes)
Collecting anthropic>=0.40.0 (from pydantic-ai-slim[anthropic,graph,groq,mistral,openai,vertexai]==0.0.19->pydantic-ai)
  Downloading anthropic-0.44.0-py3-none-any.whl.metadata (23 kB)
Collecting pydantic-graph==0.0.19 (from pydantic-ai-slim[anthropic,graph,groq,mistral,openai,vertexai]==

Pydantic AI Basics

The Agent class is the main class in PydanticAI. You can read more about it here: https://ai.pydantic.dev/api/agent/

Available models at the time of writing:

KnownModelName = Literal[
    "openai:gpt-4o",
    "openai:gpt-4o-mini",
    "openai:gpt-4-turbo",
    "openai:gpt-4",
    "openai:o1-preview",
    "openai:o1-mini",
    "openai:o1",
    "openai:gpt-3.5-turbo",
    "groq:llama-3.3-70b-versatile",
    "groq:llama-3.1-70b-versatile",
    "groq:llama3-groq-70b-8192-tool-use-preview",
    "groq:llama3-groq-8b-8192-tool-use-preview",
    "groq:llama-3.1-70b-specdec",
    "groq:llama-3.1-8b-instant",
    "groq:llama-3.2-1b-preview",
    "groq:llama-3.2-3b-preview",
    "groq:llama-3.2-11b-vision-preview",
    "groq:llama-3.2-90b-vision-preview",
    "groq:llama3-70b-8192",
    "groq:llama3-8b-8192",
    "groq:mixtral-8x7b-32768",
    "groq:gemma2-9b-it",
    "groq:gemma-7b-it",
    "gemini-1.5-flash",
    "gemini-1.5-pro",
    "gemini-2.0-flash-exp",
    "vertexai:gemini-1.5-flash",
    "vertexai:gemini-1.5-pro",
    "mistral:mistral-small-latest",
    "mistral:mistral-large-latest",
    "mistral:codestral-latest",
    "mistral:mistral-moderation-latest",
    "ollama:codellama",
    "ollama:gemma",
    "ollama:gemma2",
    "ollama:llama3",
    "ollama:llama3.1",
    "ollama:llama3.2",
    "ollama:llama3.2-vision",
    "ollama:llama3.3",
    "ollama:mistral",
    "ollama:mistral-nemo",
    "ollama:mixtral",
    "ollama:phi3",
    "ollama:qwq",
    "ollama:qwen",
    "ollama:qwen2",
    "ollama:qwen2.5",
    "ollama:starcoder2",
    "claude-3-5-haiku-latest",
    "claude-3-5-sonnet-latest",
    "claude-3-opus-latest",
    "test",
]
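Most names in the list follow a `provider:model` pattern, while a few (the Gemini and Claude entries and `test`) have no prefix. The sketch below splits a name on the first colon to make that pattern explicit. The helper `split_model_name` is hypothetical and is not how PydanticAI itself resolves model names:

```python
from typing import Optional, Tuple


def split_model_name(name: str) -> Tuple[Optional[str], str]:
    """Split 'provider:model' into (provider, model).

    The provider is None when the name has no 'provider:' prefix.
    """
    provider, sep, model = name.partition(":")
    if not sep:
        return None, name
    return provider, model


for name in ("groq:llama-3.3-70b-versatile", "gemini-1.5-flash", "test"):
    print(split_model_name(name))
```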

We will use Groq models, so start by loading the API key from a Google Colab secret, or set it as an environment variable:

In [3]:
!pip install groq



Import API Key from Google Colab

In [4]:
import os
from google.colab import userdata

GROQ_API_KEY=userdata.get('GROQ_API_KEY')
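If you also want the notebook code to run outside Colab, one option is to fall back to an environment variable. The following is a sketch under that assumption; `load_api_key` is a hypothetical helper, and the broad `except` is there because `google.colab` cannot be imported outside Colab and a missing secret also raises an error:

```python
import os


def load_api_key(name: str = "GROQ_API_KEY") -> str:
    """Look up `name` in Colab secrets first, then in environment variables."""
    key = None
    try:
        # Only importable when running inside Google Colab.
        from google.colab import userdata
        key = userdata.get(name)
    except Exception:
        # Not in Colab, or the secret is missing or not shared with this notebook.
        key = None
    key = key or os.environ.get(name)
    if not key:
        raise RuntimeError(f"{name} is not set in Colab secrets or the environment")
    return key
```

With this in place, `GROQ_API_KEY = load_api_key()` would replace the direct `userdata.get` call.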

In [5]:
from pydantic_ai import Agent
from pydantic_ai.models.groq  import GroqModel

model = GroqModel('llama-3.1-70b-versatile',api_key=GROQ_API_KEY)
agent = Agent(model)


In [6]:
import asyncio
async def main():
  result = await agent.run("What is Bitcoin")
  print(result.data)

await main()

Bitcoin is a decentralized digital currency that allows for peer-to-peer transactions without the need for a central authority or intermediary. It was created in 2009 by an individual or group of individuals using the pseudonym Satoshi Nakamoto.

Here's how it works:

**Key Characteristics:**

1. **Decentralized:** Bitcoin is not controlled by any government, institution, or organization. It operates on a decentralized network of computers around the world.
2. **Digital:** Bitcoin is a digital currency, meaning it exists only in electronic form.
3. **Cryptocurrency:** Bitcoin uses advanced cryptography to secure and verify transactions.
4. **Limited supply:** There will only ever be 21 million Bitcoins in existence.
5. **Open-source:** Bitcoin's underlying code is open-source, allowing anyone to review, modify, and distribute it.

**How Bitcoin Works:**

1. **Mining:** New Bitcoins are created through a process called "mining," which involves solving complex mathematical problems to va

# PydanticAI with Tools

PydanticAI is powerful because it works with structured data. You can use Pydantic to define structured outputs that you can use with tools. We will build an example "Note Manager": an assistant that lets you create, list and retrieve notes. It is a good example to demonstrate what PydanticAI can do. For storage we will use a SQLite database. The code for the database is shown below.

You will need to install the following:

In [7]:
!pip install aiosqlite

Collecting aiosqlite
  Downloading aiosqlite-0.20.0-py3-none-any.whl.metadata (4.3 kB)
Downloading aiosqlite-0.20.0-py3-none-any.whl (15 kB)
Installing collected packages: aiosqlite
Successfully installed aiosqlite-0.20.0


We import aiosqlite so that database access is asynchronous.

The async with aiosqlite.connect('example.db') as conn statement opens an asynchronous connection and closes it again when the block exits.

The database wrapper built on this connection is later passed to the agent through the dependencies dataclass.

In [8]:
import sqlite3

# Connect to the new SQLite database
conn = sqlite3.connect('new_database.db')
cursor = conn.cursor()
# Example query to test the connection
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print(cursor.fetchall())
# Close the connection
conn.close()

[]


In a normal app.py script or any standalone Python script: You can use asyncio.run() to start and manage your asynchronous code. This is because there's no existing event loop running, and asyncio.run() can create a new event loop, run your async function, and then close the loop properly.

In Jupyter notebooks or other interactive environments: You should use nest_asyncio to allow nested event loops. This is because Jupyter notebooks already have an event loop running, and trying to use asyncio.run() within an existing event loop will cause a RuntimeError. Applying nest_asyncio allows you to work around this limitation and run asynchronous code within a Jupyter notebook seamlessly.
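The difference can be reproduced with the standard library alone. The sketch below is meant to be run as a standalone script (not in a notebook cell): the first `asyncio.run()` call succeeds because no loop is running, while the call made from inside a coroutine fails because a loop is already active, which is the situation a notebook puts you in. The function names are illustrative:

```python
import asyncio


async def fetch_answer() -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return 42


async def nested_call() -> str:
    """Call asyncio.run() while an event loop is already running."""
    coro = fetch_answer()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


# Standalone script: no loop is running yet, so asyncio.run() works.
result = asyncio.run(fetch_answer())
print(result)

# Inside a running loop, asyncio.run() raises RuntimeError.
error_message = asyncio.run(nested_call())
print(error_message)
```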

In [9]:
!pip install nest_asyncio



In [23]:
import sqlite3
import aiosqlite
import nest_asyncio
import asyncio

nest_asyncio.apply()

async def create_notes_table():
    """
    Creates a table named 'notes' with a unique 'title' column (declared as
    VARCHAR(200); SQLite does not enforce the length) and a 'text' column.
    """

    # SERIAL is PostgreSQL syntax; SQLite uses INTEGER PRIMARY KEY AUTOINCREMENT.
    create_table_query = """
    CREATE TABLE IF NOT EXISTS notes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title VARCHAR(200) UNIQUE NOT NULL,
        text TEXT NOT NULL
    );
    """
    try:
        # aiosqlite closes the connection when the `async with` block exits,
        # so no explicit `finally` cleanup is needed.
        async with aiosqlite.connect('new_database.db') as connection:
            # Execute the table creation query
            await connection.execute(create_table_query)
            await connection.commit()

        print("Table 'notes' created successfully (if it didn't already exist).")

    except Exception as e:
        print(f"An error occurred: {e}")

async def check_table_exists(table_name: str) -> bool:
    """
    Checks if a table exists in the SQLite database.

    Args:
        table_name (str): The name of the table to check.

    Returns:
        bool: True if the table exists, False otherwise.
    """

    # SQLite has no information_schema; table metadata lives in sqlite_master,
    # and query parameters use `?` placeholders.
    query = """
    SELECT EXISTS (
        SELECT 1
        FROM sqlite_master
        WHERE type = 'table'
        AND name = ?
    );
    """
    try:
        # The connection and cursor are closed when the `async with` blocks exit.
        async with aiosqlite.connect('new_database.db') as connection:
            async with connection.execute(query, (table_name,)) as cursor:
                row = await cursor.fetchone()
                return bool(row[0])

    except Exception as e:
        print(f"An error occurred: {e}")
        return False

"""
Asynchronous database connection class for interacting with the 'notes' table in the SQLite database.

This class provides methods to add a note, retrieve a note by title,
and list all note titles.
"""


from typing import Optional, List


class DatabaseConn:
    def __init__(self):
        """
        Initializes the DatabaseConn with the path to the SQLite database file.
        """
        self.dsn = 'new_database.db'

    def _connect(self):
        """
        Creates an aiosqlite connection object for use with `async with`.

        Returns:
            aiosqlite.Connection: A connection that opens on entering the
            `async with` block and closes when the block exits.
        """
        return aiosqlite.connect(self.dsn)

    async def add_note(self, title: str, text: str) -> bool:
        """
        Adds a new note to the 'notes' table.

        Args:
            title (str): The title of the note.
            text (str): The content of the note.

        Returns:
            bool: True if the note was added successfully, False otherwise.
        """
        # SQLite uses `?` placeholders rather than PostgreSQL's $1, $2.
        query = """
        INSERT INTO notes (title, text)
        VALUES (?, ?)
        ON CONFLICT (title) DO NOTHING;
        """
        async with self._connect() as conn:
            cursor = await conn.execute(query, (title, text))
            await conn.commit()
            return cursor.rowcount == 1  # True if one row was inserted

    async def get_note_by_title(self, title: str) -> Optional[dict]:
        """
        Retrieves a note's content by its title.

        Args:
            title (str): The title of the note to retrieve.

        Returns:
            Optional[dict]: A dictionary containing the note's title and text if found, None otherwise.
        """
        query = "SELECT title, text FROM notes WHERE title = ?;"
        async with self._connect() as conn:
            async with conn.execute(query, (title,)) as cursor:
                row = await cursor.fetchone()
            if row:
                return {"title": row[0], "text": row[1]}
            return None

    async def list_all_titles(self) -> List[str]:
        """
        Lists all note titles in the 'notes' table.

        Returns:
            List[str]: A list of all note titles.
        """
        query = "SELECT title FROM notes ORDER BY title;"
        async with self._connect() as conn:
            async with conn.execute(query) as cursor:
                rows = await cursor.fetchall()
            return [row[0] for row in rows]


In [24]:
# Run the async function to create the table
asyncio.run(create_notes_table())

Table 'notes' created successfully (if it didn't already exist).
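To see the three note operations in SQLite's own dialect without touching the real database file, here is a synchronous sketch using the standard-library sqlite3 module and an in-memory database. It is an illustration only, not the notebook's async class: the function names mirror the methods above, SQLite's `?` placeholders are used for parameters, and `rowcount` is used to detect whether an insert actually happened:

```python
import sqlite3
from typing import List, Optional

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS notes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title VARCHAR(200) UNIQUE NOT NULL,
        text TEXT NOT NULL
    )
    """
)


def add_note(title: str, text: str) -> bool:
    """Insert a note; return False if the title already exists."""
    cur = conn.execute(
        "INSERT INTO notes (title, text) VALUES (?, ?) ON CONFLICT (title) DO NOTHING",
        (title, text),
    )
    conn.commit()
    return cur.rowcount == 1


def get_note_by_title(title: str) -> Optional[dict]:
    """Return the note as a dict, or None if no note has this title."""
    row = conn.execute(
        "SELECT title, text FROM notes WHERE title = ?", (title,)
    ).fetchone()
    return {"title": row[0], "text": row[1]} if row else None


def list_all_titles() -> List[str]:
    """Return all note titles in alphabetical order."""
    rows = conn.execute("SELECT title FROM notes ORDER BY title").fetchall()
    return [row[0] for row in rows]


print(add_note("Meeting", "10 AM tomorrow"))
print(add_note("Meeting", "duplicate title"))
print(list_all_titles())
```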


Check that the DB connection is working before continuing to the next step:

In [25]:
from dataclasses import dataclass
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext, Tool
from typing import Optional, List
#from DatabaseConn import DatabaseConn

from pydantic_ai.models.groq  import GroqModel


# Dataclass for structured output from Intent Extraction Agent
@dataclass
class NoteIntent:
   action: str
   title: Optional[str] = None
   text: Optional[str] = None

# Dataclass for dependencies
@dataclass
class NoteDependencies:
   db: DatabaseConn

# Define the response structure
class NoteResponse(BaseModel):
   message: str
   note: Optional[dict] = None
   titles: Optional[List[str]] = None


# Intent Extraction Agent
intent_model = GroqModel('llama-3.1-70b-versatile',api_key=GROQ_API_KEY)
intent_agent = Agent(
   intent_model,
   result_type=NoteIntent,
   system_prompt=(
       "You are an intent extraction assistant. Your job is to analyze user inputs, "
       "determine the user's intent (e.g., create, retrieve, list), and extract relevant "
       "information such as title and text. Output the structured intent in the following format:\n\n"
       "{'action': '...', 'title': '...', 'text': '...'}\n\n"
       "Always use MD format for the text field.",
       "Add supporting information to help the user understand the note and produce well formatted notes.",
       "For example, if the user says 'Please note the following: Meeting tomorrow at 10 AM', "
       "the output should be:\n{'action': 'create', 'title': 'Meeting', 'text': '## Meeting Note \n\n Please note a meeting on the 4th of January at 10 AM.'}."
   )
)

# Action Handling Agent
action_model = GroqModel('llama-3.1-70b-versatile',api_key=GROQ_API_KEY)
action_agent = Agent(
   action_model,
   deps_type=NoteDependencies,
   result_type=NoteResponse,
   system_prompt=(
       "You are a note management assistant. Based on the user's intent (action, title, text), "
       "perform the appropriate action: create a note, retrieve a note, or list all notes."
   )
)

# Define tools for Action Handling Agent

In [26]:
# Define tools for Action Handling Agent
@action_agent.tool
async def create_note_tool(ctx: RunContext[NoteDependencies], title: str, text: str) -> NoteResponse:
   db = ctx.deps.db
   success = await db.add_note(title, text)
   return NoteResponse(message="CREATED:SUCCESS") if success else NoteResponse(message="CREATED:FAILED")

@action_agent.tool
async def retrieve_note_tool(ctx: RunContext[NoteDependencies], title: str) -> NoteResponse:
   db = ctx.deps.db
   note = await db.get_note_by_title(title)
   return NoteResponse(message="GET:SUCCESS", note=note) if note else NoteResponse(message="GET:FAILED")

@action_agent.tool
async def list_notes_tool(ctx: RunContext[NoteDependencies]) -> NoteResponse:
   db = ctx.deps.db
   titles = await db.list_all_titles()
   return NoteResponse(message="LIST:SUCCESS", titles=titles)

# Main function to handle user input with both agents

In [27]:
async def handle_user_query(user_input: str, deps: NoteDependencies) -> NoteResponse:
   intent = await intent_agent.run(user_input)
   print(intent.data)

   if intent.data.action == "create":
       query = f"Create a note with the title '{intent.data.title}' and the text '{intent.data.text}'."
       response = await action_agent.run(query, deps=deps)
       return response.data
   elif intent.data.action == "retrieve":
       query = f"Retrieve the note with the title '{intent.data.title}'."
       response = await action_agent.run(query, deps=deps)
       return response.data
   elif intent.data.action == "list":
       query = "List all notes."
       response = await action_agent.run(query, deps=deps)
       return response.data
   else:
       return NoteResponse(message="Invalid action. Please try again.")
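The three branches above differ only in the query string sent to the action agent, so the mapping from intent to query can be isolated and tested without calling a model. A possible sketch follows; `build_action_query` is a hypothetical helper, and `NoteIntent` is redeclared here only to keep the snippet self-contained:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NoteIntent:
    action: str
    title: Optional[str] = None
    text: Optional[str] = None


def build_action_query(intent: NoteIntent) -> Optional[str]:
    """Return the prompt for the action agent, or None for an unsupported action."""
    templates = {
        "create": "Create a note with the title '{title}' and the text '{text}'.",
        "retrieve": "Retrieve the note with the title '{title}'.",
        "list": "List all notes.",
    }
    template = templates.get(intent.action)
    if template is None:
        return None
    # Unused keyword arguments are ignored by str.format.
    return template.format(title=intent.title, text=intent.text)


print(build_action_query(NoteIntent(action="retrieve", title="Meeting")))
```

A `None` result corresponds to the "Invalid action" branch of `handle_user_query`.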

In [28]:
async def ask(query: str):
   db_conn = DatabaseConn()
   note_deps = NoteDependencies(db=db_conn)
   response = await handle_user_query(query, note_deps)
   return response

In [29]:
query = "List all notes"
response = await ask(query)

print(dir(response))

# Access the response fields directly
print(response.message)
print(response.note)
print(response.titles)

NoteIntent(action='list', title='Notes', text='## List of Notes\nNo notes available.')


UnexpectedModelBehavior: Exceeded maximum retries (1) for result validation

# Streamlit App Interface

The final step is to build a user-friendly interface for interacting with our agent:

Install streamlit:

In [30]:
! pip install streamlit

Collecting streamlit
  Downloading streamlit-1.41.1-py2.py3-none-any.whl.metadata (8.5 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.41.1-py2.py3-none-any.whl (9.1 MB)
Downloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl (79 kB)

The code below writes an app.py file to Google Colab's temporary working directory. We can then use that file to launch our Streamlit app.

In [31]:
%%writefile app.py
# pip install streamlit
# streamlit run app.py

import asyncio
import streamlit as st
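# NOTE: app.py runs in its own process, so `ask` must be importable here.
# For example, save the agent code above as main.py and uncomment:
# from main import ask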

# Set up Streamlit page
st.set_page_config(page_title="Note Management Agent", layout="centered")

st.title("Note Management System")
st.write("Ask the agent to create, retrieve, or list notes.")

# User input
user_input = st.text_area("Enter your query:", placeholder="e.g., Create a note titled 'Meeting' about tomorrow's meeting.")

if st.button("Submit"):
    if not user_input.strip():
        st.error("Please enter a valid query.")
    else:
        # Run the `ask` function asynchronously
        with st.spinner("Processing..."):
            try:
                response = asyncio.run(ask(user_input))

                # Handle the response based on the presence of None in `note` and `titles`
                if response.note is None and response.titles is None:
                    st.success(response.message)
                elif response.note is not None and response.titles is None:
                    note = response.note
                    st.success(response.message)
                    st.subheader(f"Note: {note.get('title', 'Untitled')}")
                    st.write(note.get('text', "No content available."))
                elif response.note is None and response.titles is not None:
                    titles = response.titles
                    st.success(response.message)
                    st.subheader("List of Notes:")
                    if titles:
                        for title in titles:
                            st.write(f"- {title}")
                    else:
                        st.info("No notes available.")
                else:
                    st.error("Unexpected response format. Please check your query.")
            except Exception as e:
                st.error(f"An error occurred: {e}")


Writing app.py
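The branching in app.py depends only on which of `note` and `titles` is set, so that decision can be pulled into a plain function and checked without starting Streamlit. A small sketch, where the function name `response_kind` and its return labels are illustrative:

```python
from typing import List, Optional


def response_kind(note: Optional[dict], titles: Optional[List[str]]) -> str:
    """Classify a response the same way the Streamlit branches do."""
    if note is None and titles is None:
        return "message"      # only a status message to show
    if note is not None and titles is None:
        return "note"         # a single retrieved note
    if note is None and titles is not None:
        return "titles"       # a list of titles, possibly empty
    return "unexpected"       # both fields set at once


print(response_kind(None, ["Meeting"]))
```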


## Install localtunnel to serve the Streamlit app

In [32]:
!npm install localtunnel

added 22 packages in 6s

3 packages are looking for funding
  run `npm fund` for details

## Run the Streamlit app in the background

In [33]:
!streamlit run app.py &>/content/logs.txt &

In [34]:
import urllib.request
print("Password/Endpoint IP for localtunnel is:", urllib.request.urlopen('https://ipv4.icanhazip.com').read().decode('utf8').strip("\n"))

Password/Endpoint IP for localtunnel is: 34.23.136.26


This prints the public IP address of the Colab machine, which localtunnel asks for as a password. Copy that IP and paste it into the "Friendly Reminder" page that localtunnel shows when you open the tunnel URL.
You should then be directed to your Streamlit web app page.

This is a great way to test your web app before actual deployment. I hope that helps.

## Expose the Streamlit app on port 8501

In [None]:
!npx localtunnel --port 8501

your url is: https://silent-ghosts-train.loca.lt
