# Document Q&A with pgvector
PDF → chunk → embed → store in Postgres/pgvector → retrieve top-k similar chunks.

## Prereqs
From repo root:
```bash
docker compose up -d
```


In [1]:
import os
from dotenv import load_dotenv

load_dotenv()

PG_USER = os.getenv("POSTGRES_USER", "ai")
PG_PASS = os.getenv("POSTGRES_PASSWORD", "ai")
PG_DB   = os.getenv("POSTGRES_DB", "ai")
PG_HOST = os.getenv("POSTGRES_HOST", "localhost")
PG_PORT = os.getenv("POSTGRES_PORT", "5532")

DB_URL = f"postgresql+psycopg://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}"
DB_URL


'postgresql+psycopg://ai:ai@localhost:5532/ai'

## Verify DB connectivity

In [2]:
import time
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError

def test_db_connection(db_url: str, max_retries: int = 12, wait_time: int = 2) -> None:
    engine = create_engine(db_url, pool_pre_ping=True)
    for attempt in range(1, max_retries + 1):
        try:
            print(f"Connection attempt {attempt}/{max_retries}")
            with engine.connect() as conn:
                version = conn.execute(text("SELECT version();")).scalar_one()
                print("✅ Connected:", version.split(",")[0])
                conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
                print("✅ pgvector extension ready")
            return
        except OperationalError as e:
            print("⏳ Not ready yet:", e.__class__.__name__)
            time.sleep(wait_time)
    raise RuntimeError("❌ Database connection failed. Is docker compose up?")

test_db_connection(DB_URL)


Connection attempt 1/12
✅ Connected: PostgreSQL 15.4 (Debian 15.4-2.pgdg120+1) on x86_64-pc-linux-gnu
✅ pgvector extension ready


## DocumentQA implementation

In [3]:
import io
from dataclasses import dataclass
from typing import Optional, List

import numpy as np
import requests
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer
from sqlalchemy import create_engine, text


@dataclass
class RetrievalHit:
    chunk_index: int
    content: str


class DocumentQA:
    def __init__(
        self,
        db_url: str,
        model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
        table_name: str = "doc_chunks",
    ):
        self.db_url = db_url
        self.table = table_name

        self.model = SentenceTransformer(model_name)
        self.dim = self.model.get_sentence_embedding_dimension()

        self.engine = create_engine(self.db_url, pool_pre_ping=True)
        self._init_db()

        self.current_source_url: Optional[str] = None

    def _init_db(self) -> None:
        with self.engine.begin() as conn:
            conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
            conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS {self.table} (
                    id BIGSERIAL PRIMARY KEY,
                    source_url TEXT NOT NULL,
                    chunk_index INT NOT NULL,
                    content TEXT NOT NULL,
                    embedding vector({self.dim}) NOT NULL
                );
            """))

            conn.execute(text(
                f"CREATE INDEX IF NOT EXISTS {self.table}_embedding_ivfflat "
                f"ON {self.table} USING ivfflat (embedding vector_l2_ops) "
                f"WITH (lists = 100);"
            ))

    def _download_pdf(self, url: str) -> bytes:
        r = requests.get(url, timeout=60)
        r.raise_for_status()
        return r.content

    def _extract_text(self, pdf_bytes: bytes) -> str:
        reader = PdfReader(io.BytesIO(pdf_bytes))
        pages = [(p.extract_text() or "") for p in reader.pages]
        text_all = "\n".join(pages).strip()
        if not text_all:
            raise ValueError("No text extracted. PDF may be scanned/image-only.")
        return text_all

    def _chunk_text(self, text_str: str, chunk_chars: int = 1200, overlap: int = 150) -> List[str]:
        text_str = " ".join(text_str.split())
        chunks: List[str] = []
        start = 0
        while start < len(text_str):
            end = min(len(text_str), start + chunk_chars)
            chunks.append(text_str[start:end])
            if end == len(text_str):
                break
            start = max(0, end - overlap)
        return chunks

    def load_pdf_url(
        self,
        url: str,
        chunk_chars: int = 1200,
        overlap: int = 150,
        delete_existing_for_url: bool = True,
    ) -> int:
        pdf_bytes = self._download_pdf(url)
        text_all = self._extract_text(pdf_bytes)
        chunks = self._chunk_text(text_all, chunk_chars=chunk_chars, overlap=overlap)

        embeddings = self.model.encode(chunks, convert_to_numpy=True, show_progress_bar=True).astype(np.float32)

        with self.engine.begin() as conn:
            if delete_existing_for_url:
                conn.execute(text(f"DELETE FROM {self.table} WHERE source_url = :u"), {"u": url})

            for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
                conn.execute(
                    text(f"""
                        INSERT INTO {self.table} (source_url, chunk_index, content, embedding)
                        VALUES (:url, :idx, :content, :embedding)
                    """),
                    {"url": url, "idx": i, "content": chunk, "embedding": emb.tolist()},
                )

            conn.execute(text(f"ANALYZE {self.table};"))

        self.current_source_url = url
        print(f"✅ Loaded {len(chunks)} chunks into pgvector.")
        return len(chunks)

    def retrieve(self, query: str, top_k: int = 5, source_url: Optional[str] = None) -> List[RetrievalHit]:
        source_url = source_url or self.current_source_url
        if not source_url:
            raise ValueError("No source_url set. Call load_pdf_url(...) first or pass source_url=...")

        q_emb = self.model.encode(query, convert_to_numpy=True).astype(np.float32)

        with self.engine.begin() as conn:
            rows = conn.execute(
                text(f"""
                    SELECT chunk_index, content
                    FROM {self.table}
                    WHERE source_url = :url
                    ORDER BY embedding <-> (:qvec)::vector
                    LIMIT :k
                """),
                {"url": source_url, "qvec": q_emb.tolist(), "k": top_k},
            ).fetchall()

        return [RetrievalHit(chunk_index=r[0], content=r[1]) for r in rows]


## Load a PDF and retrieve chunks

In [4]:
qa = DocumentQA(DB_URL)

pdf_url = "https://www.nestle.com/sites/default/files/2025-02/2024-financial-statements-en.pdf"
qa.load_pdf_url(pdf_url)

hits = qa.retrieve("What are the main insights from Nestle's 2024 financial statements?", top_k=5)
for h in hits:
    print("\n--- chunk", h.chunk_index)
    print(h.content[:600], "...")


Batches:   0%|          | 0/8 [00:00<?, ?it/s]

✅ Loaded 238 chunks into pgvector.

--- chunk 217
es” document published under https://www.nestle.com/investors/publications provides the definition of these non-IFRS Accounting Standards financial performance measures. (b) Calculated on the basis of the dividend for the year concerned, which is paid in the following year, and on high/low stock prices. (c) As proposed by the Board of Directors of Nestlé S.A. (d) 2024 and 2023 based on headcount. Other years based on full-time equivalent. Financial information – 5 year review Consolidated Financial Statements of the Nestlé Group 2024190 158th Financial Statements of Nestlé S.A. 158th Financial ...

--- chunk 191
tlé Group 2024176 City of operations Consolidated Financial Statements of the Nestlé Group 2024 177 To the General Meeting of Lausanne, 12 February 2025 Nestlé S.A., Cham & Vevey Report of the statutory auditor Report on the audit of the consolidated financial statements Opinion We have audited the consolidated financial stateme