# Pydantic AI + DuckDB Chat Demo

This notebook demonstrates how to use the `ChatApp` class to interact with a chat agent and store messages in DuckDB.

In [1]:
%pip -q install nest_asyncio
import nest_asyncio
nest_asyncio.apply()

/mnt/c/Users/ms101/Desktop/dev/llm_eval/.venv/bin/python: No module named pip
Note: you may need to restart the kernel to use updated packages.


In [2]:
from datetime import datetime, timezone
import json
import duckdb
from typing import Iterable, Tuple, Optional
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.providers.google import GoogleProvider
import os
from dotenv import load_dotenv, find_dotenv

load_dotenv(
    "./src/.env"
)

DB_PATH = "duckdb_rag.duckdb"


# --- agent (sync usage via .run_sync) ---
agent = Agent(
    model=GoogleModel(
        model_name="gemini-2.0-flash",
        provider=GoogleProvider(api_key=os.environ["GOOGLE_API_KEY"])
    )
)

# --- DuckDB store (returns dicts; robust to old string rows) ---
class ChatDatabase:
    def __init__(self, db_path: str):
        self.con = duckdb.connect(db_path)
        self._init_schema()

    def _init_schema(self):
        # schemas
        self.con.execute("CREATE SCHEMA IF NOT EXISTS chat;")
        self.con.execute("CREATE SCHEMA IF NOT EXISTS corpus;")

        # chat table
        self.con.execute("CREATE SEQUENCE IF NOT EXISTS chat.messages_seq START 1;")
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS chat.messages (
                id BIGINT DEFAULT nextval('chat.messages_seq'),
                message_list JSON
            );
        """)
        try:
            self.con.execute("ALTER TABLE chat.messages ADD CONSTRAINT messages_pk PRIMARY KEY (id);")
        except Exception:
            pass

        # docs table
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS corpus.docs (
                doc_id TEXT PRIMARY KEY,
                url TEXT,
                title TEXT,
                content TEXT,
                source_type TEXT,
                fetched_at TIMESTAMP
            );
        """)

    # util: resolve table name if user loaded docs without schema
    def _docs_table_name(self) -> str:
        t = self.con.execute("""
            SELECT CASE 
                     WHEN EXISTS (SELECT 1 FROM information_schema.tables 
                                  WHERE table_schema='corpus' AND table_name='docs') THEN 'corpus.docs'
                     WHEN EXISTS (SELECT 1 FROM information_schema.tables 
                                  WHERE table_name='docs') THEN 'docs'
                     ELSE NULL
                   END
        """).fetchone()[0]
        if not t:
            raise RuntimeError("No docs table found. Run your loader first.")
        return t

    def add_message(self, msg: dict):
        self.con.execute(
            "INSERT INTO chat.messages (message_list) VALUES (json(?));",
            (json.dumps(msg),)
        )

    def get_messages(self, limit: Optional[int] = None) -> list[dict]:
        sql = "SELECT message_list FROM chat.messages ORDER BY id"
        if limit:
            sql += f" LIMIT {int(limit)}"
        rows = self.con.execute(sql).fetchall()
        out = []
        for (raw,) in rows:
            if isinstance(raw, dict):
                out.append(raw)
            else:
                try:
                    out.append(json.loads(raw))
                except Exception:
                    out.append({"role": "user", "content": str(raw)})
        return out

    def read_docs(self) -> list[tuple[str, str]]:
        tbl = self._docs_table_name()
        return self.con.execute(f"SELECT doc_id, content FROM {tbl}").fetchall()

    def read_docs_with_meta(self) -> list[tuple[str, str, str]]:
        tbl = self._docs_table_name()
        return self.con.execute(f"SELECT doc_id, title, url FROM {tbl}").fetchall()


class TfidfRetriever:
    def __init__(self, docs: Iterable[Tuple[str, str]]):
        pairs = list(docs)
        self.ids = [d for d, _ in pairs]
        texts = [t for _, t in pairs]
        self.v = TfidfVectorizer(stop_words="english")
        self.M = self.v.fit_transform(texts) if texts else None

    def topk(self, query: str, k: int = 5):
        if not self.ids or self.M is None:
            return []
        qv = self.v.transform([query])
        sims = cosine_similarity(qv, self.M).ravel()
        idx = sims.argsort()[::-1][:k]
        return [(self.ids[i], float(sims[i])) for i in idx]

class RAGApp:
    def __init__(
        self,
        db_path: str,
        retriever: TfidfRetriever | None = None,
        corpus: dict[str, str] | None = None
    ):
        """
        Use either:
          - RAGApp(db_path, retriever, corpus)  # old style
          - RAGApp(db_path); rag.build_retriever_from_db()  # new style
        """
        self.db = ChatDatabase(db_path)
        self.retriever = retriever
        self.corpus = corpus or {}
        self.id2meta: dict[str, dict] = {}

    def build_retriever_from_db(self) -> int:
        docs = self.db.read_docs()  # [(doc_id, content)]
        self.retriever = TfidfRetriever(docs)
        # build corpus + meta for citations
        self.corpus = {doc_id: content for doc_id, content in docs}
        self.id2meta = {d: {"title": t, "url": u} for d, t, u in self.db.read_docs_with_meta()}
        return len(docs)

    def ask(self, prompt: str, k: int = 5) -> str:
        if self.retriever is None:
            raise RuntimeError(
                "Retriever not initialized. Pass retriever+corpus to __init__ "
                "or call build_retriever_from_db() first."
            )

        # log user
        self.db.add_message({"role": "user",
                             "timestamp": datetime.now(timezone.utc).isoformat(),
                             "content": prompt})

        # retrieve
        hits = self.retriever.topk(prompt, k=k)
        context_blocks, cites = [], []
        for doc_id, score in hits:
            meta = self.id2meta.get(doc_id, {})
            title = meta.get("title", doc_id)
            url = meta.get("url", "")
            text = self.corpus.get(doc_id)
            if text is None:
                # safety: fetch from DB if not in memory
                text = self.db.con.execute(
                    f"SELECT content FROM {self.db._docs_table_name()} WHERE doc_id = ?",
                    [doc_id]
                ).fetchone()[0]
                self.corpus[doc_id] = text
            context_blocks.append(f"# {title}\n\n{text}")
            cites.append(f"- {title} ({url}) [score={score:.3f}]")
        context = "\n\n---\n\n".join(context_blocks)

        # short transcript
        hist = self.db.get_messages()
        transcript = "\n".join(
            f"{str(m.get('role','user')).upper()}: {m.get('content','')}" for m in hist[-10:]
        )

        # model call (sync)
        prompt_full = (
            "You are a helpful assistant. Use the CONTEXT to answer. "
            "If the answer is not in the context, say you don't know.\n\n"
            f"CHAT HISTORY (last 10):\n{transcript}\n\n"
            f"CONTEXT:\n{context}\n\n"
            f"QUESTION: {prompt}\n"
            "ANSWER:"
        )
        result = agent.run_sync(prompt_full)
        text = getattr(result, "output", str(result))

        if cites:
            text = f"{text}\n\nSources:\n" + "\n".join(cites)

        # log model
        self.db.add_message({"role": "model",
                             "timestamp": datetime.now(timezone.utc).isoformat(),
                             "content": text})
        return text

In [18]:
## duckdb documentation retrieval
import os, re, time, json, hashlib
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse, urlunparse
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as html_to_md
import duckdb

DUCKDB_SITEMAP = "https://duckdb.org/sitemap.html"
USER_AGENT = "DuckDB-RAG-Tutorial/1.0"

def make_session():
    s = requests.Session()
    s.headers.update({
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    })
    return s

def normalize_url(u: str) -> str:
    # strip fragments/query to avoid duplicates
    p = urlparse(u)
    p = p._replace(fragment="", query="")
    return urlunparse(p)

def to_md_url(u: str) -> str:
    # Add ".md" at the end of the path (do not duplicate)
    p = urlparse(u)
    if p.path.endswith(".md"):
        return u
    return urlunparse(p._replace(path=p.path.rstrip("/") + ".md"))

def page_title_from_html(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    if soup.title and soup.title.string:
        return soup.title.string.strip()
    h1 = soup.find("h1")
    return h1.get_text(strip=True) if h1 else ""

def fetch_markdown_or_convert(session: requests.Session, url: str) -> tuple[str, str, str]:
    """
    Returns (content_md, title, source_type) where source_type is 'md' or 'html->md'.
    Will try URL+'.md' first; on 404, falls back to HTML+convert.
    """
    md_url = to_md_url(url)
    r = session.get(md_url, timeout=30)
    if r.ok and r.text.strip():
        title = r.text.splitlines()[0].strip("# ").strip() if r.text else ""
        return r.text, title, "md"

    # Fallback: HTML -> Markdown
    r = session.get(url, timeout=30)
    r.raise_for_status()
    title = page_title_from_html(r.text) or url
    md = html_to_md(r.text)
    return md, title, "html->md"

def hash_id(s: str) -> str:
    # stable id for a URL (to avoid duplicates on reruns)
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:16]

def ensure_docs_table(con: duckdb.DuckDBPyConnection):
    con.execute("""
    CREATE SCHEMA IF NOT EXISTS corpus;
    CREATE TABLE IF NOT EXISTS corpus.docs (
        doc_id TEXT PRIMARY KEY,
        url TEXT,
        title TEXT,
        content TEXT,
        source_type TEXT,
        fetched_at TIMESTAMP
    );
    """)

def upsert_doc(con: duckdb.DuckDBPyConnection, doc):
    con.execute("""
        INSERT OR REPLACE INTO corpus.docs (doc_id, url, title, content, source_type, fetched_at)
        VALUES (?, ?, ?, ?, ?, ?);
    """, (doc["doc_id"], doc["url"], doc["title"], doc["content"],
          doc["source_type"], doc["fetched_at"]))

def load_duckdb_docs(db_path: str, sitemap_url: str = DUCKDB_SITEMAP, limit: int | None = None) -> int:
    """
    Crawl the DuckDB sitemap and load docs into DuckDB.
    Returns number of docs stored/updated.
    """
    session = make_session()
    con = duckdb.connect(db_path)
    ensure_docs_table(con)

    # Fetch sitemap
    resp = session.get(sitemap_url, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    # collect links that point to duckdb.org (dedupe)
    links = []
    seen = set()
    base = sitemap_url
    for a in soup.find_all("a", href=True):
        href = urljoin(base, a["href"])
        href = normalize_url(href)
        if "duckdb.org" not in href:
            continue
        if href.endswith("/sitemap.html"):
            continue
        if href in seen:
            continue
        seen.add(href)
        links.append(href)

    # Optionally limit for testing
    if limit:
        links = links[:limit]

    stored = 0
    for i, url in enumerate(links, 1):
        try:
            content_md, title, source_type = fetch_markdown_or_convert(session, url)
            doc = {
                "doc_id": hash_id(url),
                "url": url,
                "title": title or url,
                "content": content_md,
                "source_type": source_type,
                "fetched_at": datetime.now(timezone.utc),
            }
            upsert_doc(con, doc)
            stored += 1
        except requests.HTTPError as e:
            # Skip non-200 pages
            print(f"[{i}/{len(links)}] HTTP error {e.response.status_code} for {url} — skipping.")
        except Exception as e:
            # Log and continue
            print(f"[{i}/{len(links)}] Error for {url}: {e}")
    con.close()
    return stored


In [4]:
db_path = "duckdb_rag.duckdb"
# loader
count = load_duckdb_docs(db_path)  # make sure it writes to corpus.docs

BinderException: Binder Error: Failure while replaying WAL file "/mnt/c/Users/ms101/Desktop/dev/llm_eval/pydantic_ai_duckdb_chat/duckdb_rag.duckdb.wal": Catalog "duckdb_rag" does not exist!

In [5]:
import duckdb
import pandas as pd


con = duckdb.connect(DB_PATH)

# Find the docs table (with or without schema)
tbl = con.execute("""
    SELECT CASE 
             WHEN EXISTS (SELECT 1 FROM information_schema.tables 
                          WHERE table_schema='corpus' AND table_name='docs') 
               THEN 'corpus.docs'
             WHEN EXISTS (SELECT 1 FROM information_schema.tables 
                          WHERE table_name='docs') 
               THEN 'docs'
             ELSE NULL
           END AS full_name
""").fetchone()[0]

assert tbl, "No docs table found. Did you run load_duckdb_docs(...)?"

# How many docs?
count = con.execute(f"SELECT COUNT(*) FROM {tbl}").fetchone()[0]
print(f"Docs in table {tbl}: {count}")

# Show a few entries (id, title, url, content length)
df_head = con.execute(f"""
    SELECT 
      doc_id, 
      title, 
      url, 
      length(content) AS content_chars,
      substr(content, 1, 200) AS content_preview
    FROM {tbl}
    ORDER BY fetched_at DESC
    LIMIT 5
""").df()
con.close()
df_head


Docs in table docs: 370


Unnamed: 0,doc_id,title,url,content_chars,content_preview
0,08beaa37eb28a798,"<!doctype html><html lang=""en""><head><meta cha...",https://shell.duckdb.org,3947,"<!doctype html><html lang=""en""><head><meta cha..."
1,061e718e7cc0c9e8,Redirecting…,https://duckdb.org/docs/sitemap,117,Redirecting…\n\n\n\n\n\nRedirecting…\n========...
2,56c82ff7271ca5e5,---,https://duckdb.org/docs/stable/internals/pivot,4135,---\nlayout: docu\nredirect_from:\n- /docs/int...
3,031bae6da6fae1bc,---,https://duckdb.org/docs/stable/internals/vector,5936,---\nlayout: docu\nredirect_from:\n- /internal...
4,58ea0424cb576b26,---,https://duckdb.org/docs/stable/internals/storage,10059,---\nlayout: docu\nredirect_from:\n- /internal...


In [6]:
# RAG
rag = RAGApp(db_path)              # make sure it reads chat.* and corpus.*
rag.build_retriever_from_db()
print(rag.ask("How do I create a table with a sequence in DuckDB?"))

I don't know.



Okay, cool but why not?
Lets add some observebility before we evaluate the solution.

In [7]:
import mlflow
import mlflow_notebook_viewer as mv

mlflow.set_tracking_uri("file:./mlruns")          # local UI/db
mlflow.set_experiment("duckdb-rag")
mv.set_experiment("duckdb-rag")    # our viewer needs to know too
mlflow.pydantic_ai.autolog()   

In [8]:

from mlflow.entities import SpanType  # optional enum

@mlflow.trace(name="rag.ask", span_type=SpanType.CHAIN, attributes={"component": "RAGApp"})
def ask_traced(rag, prompt: str, k: int = 5):
    # child span: retrieval details
    with mlflow.start_span(name="retrieve", span_type="retriever") as s:
        hits = rag.retriever.topk(prompt, k=k)
        # Add inputs/outputs for the span (what we retrieved)
        s.set_inputs({"prompt": prompt, "k": k})
        s.set_outputs({
            "doc_ids": [h[0] for h in hits],
            "scores":  [float(h[1]) for h in hits],
        })

    # call your normal path (which autologs the LLM call)
    answer = rag.ask(prompt, k=k)
    return {"answer": answer, "hits": hits}


In [9]:
out = ask_traced(rag, "How do I create a table with a sequence in DuckDB?", k=5)
print(out["answer"])


I don't know.



Run `mlflow ui --backend-store-uri ./mlruns` if you are running this locally.

In [12]:
# Now list traces (not runs)
traces = mv.search_traces_df(max_results=20)
traces[["request", "response", "request_time", "execution_duration"]].head()

Unnamed: 0,request,response,request_time,execution_duration
0,{'rag': '<__main__.RAGApp object at 0x72b76b85...,"{'answer': 'I don't know. ', 'hits': []}",1757407945613,1275
1,{'rag': '<__main__.RAGApp object at 0x6ffd8764...,"{'answer': 'I don't know.', 'hits': []}",1757407772319,1085
2,{'rag': '<__main__.RAGApp object at 0x788a3c75...,"{'answer': 'I don't know. ', 'hits': []}",1757406461559,668
3,{'rag': '<__main__.RAGApp object at 0x788a3c75...,"{'answer': 'I don't know. ', 'hits': []}",1757406289365,1565


In [19]:
n = rag.build_retriever_from_db()
print("Docs in retriever:", n)
print(rag.ask("How do I create a sequence in DuckDB?"))


Docs in retriever: 370
The `CREATE SEQUENCE` statement creates a new sequence number generator.

Here's how to generate an ascending sequence starting from 1:

```sql
CREATE SEQUENCE serial;
```

To generate a sequence from a given start number:

```sql
CREATE SEQUENCE serial START 101;
```

To generate odd numbers using `INCREMENT BY`:

```sql
CREATE SEQUENCE serial START WITH 1 INCREMENT BY 2;
```

To generate a descending sequence starting from 99:

```sql
CREATE SEQUENCE serial START WITH 99 INCREMENT BY -1 MAXVALUE 99;
```

`CYCLE` allows cycling through the same sequence repeatedly:

```sql
CREATE SEQUENCE serial START WITH 1 MAXVALUE 10 CYCLE;
```

You can also use `CREATE OR REPLACE SEQUENCE serial;` to overwrite an existing sequence, or `CREATE SEQUENCE IF NOT EXISTS serial;` to only create a sequence if one does not exist.

Sources:
- --- (https://duckdb.org/docs/stable/sql/statements/create_sequence) [score=0.705]
- --- (https://duckdb.org/docs/stable/sql/statements/create_t

In [16]:
import duckdb

DB_PATH = "duckdb_rag.duckdb"
con = duckdb.connect(DB_PATH)

def _count(tbl):
    try:
        return con.execute(f"SELECT COUNT(*) FROM {tbl}").fetchone()[0]
    except Exception:
        return None

# What exists?
has_public = con.execute("""
  SELECT EXISTS (
    SELECT 1 FROM information_schema.tables 
    WHERE table_name='docs' AND table_schema NOT IN ('corpus')
  )
""").fetchone()[0]

has_corpus = con.execute("""
  SELECT EXISTS (
    SELECT 1 FROM information_schema.tables 
    WHERE table_schema='corpus' AND table_name='docs'
  )
""").fetchone()[0]

print("public.docs exists:", bool(has_public), "count:", _count("docs"))
print("corpus.docs exists:", bool(has_corpus), "count:", _count("corpus.docs"))

# If public.docs has data and corpus.docs is empty, migrate rows
public_cnt = _count("docs") or 0
corpus_cnt = _count("corpus.docs") or 0

if public_cnt > 0 and (corpus_cnt is None or corpus_cnt == 0):
    con.execute("CREATE SCHEMA IF NOT EXISTS corpus;")
    # Ensure target table
    con.execute("""
      CREATE TABLE IF NOT EXISTS corpus.docs (
          doc_id TEXT PRIMARY KEY,
          url TEXT,
          title TEXT,
          content TEXT,
          source_type TEXT,
          fetched_at TIMESTAMP
      );
    """)
    # Move (or copy) rows
    con.execute("""
      INSERT OR REPLACE INTO corpus.docs (doc_id, url, title, content, source_type, fetched_at)
      SELECT doc_id, url, title, content, source_type, fetched_at
      FROM docs;
    """)
    print(f"Migrated {public_cnt} docs from public.docs -> corpus.docs")
else:
    print("No migration needed.")

print("Post-migration corpus.docs count:", _count("corpus.docs"))


public.docs exists: True count: 370
corpus.docs exists: True count: 0
Migrated 370 docs from public.docs -> corpus.docs
Post-migration corpus.docs count: 370
