# Workshop 4: RAG with Hybrid Search

In this workshop, we'll build a **Retrieval-Augmented Generation (RAG)** system that:

1. **Embeds** documents using local MiniLM-L6-v2 (no API calls)
2. **Stores** embeddings in SQLite with FTS5 for keyword search
3. **Retrieves** context using hybrid BM25 + semantic search
4. **Generates** posts grounded in your knowledge base


---
## Part 1: Setup and Database Initialization

We'll use SQLite with FTS5 (Full-Text Search 5) for efficient keyword search, plus the `sqlite-vec` extension for vector search, so everything lives in one local database file.
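FTS5 is an optional compile-time feature of SQLite. Most Python builds include it, but a quick check before building the schema can save a confusing error later. A minimal sketch using only the standard-library `sqlite3` module; `has_fts5` is a hypothetical helper name, not part of any library:

```python
import sqlite3


def has_fts5() -> bool:
    """Return True if this Python's SQLite build can create FTS5 tables."""
    conn = sqlite3.connect(":memory:")
    try:
        # Creating a throwaway FTS5 table raises OperationalError
        # (e.g. "no such module: fts5") when FTS5 is not compiled in.
        conn.execute("CREATE VIRTUAL TABLE probe USING fts5(content)")
        return True
    except sqlite3.OperationalError:
        return False
    finally:
        conn.close()


print("SQLite version:", sqlite3.sqlite_version)
print("FTS5 available:", has_fts5())
```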

In [1]:
%%capture
# Install required packages (run once)
!pip install -q sqlite-vec fastembed numpy openai mastodon.py

In [2]:
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path

# We rely on Colab Secrets or manual input, so we don't need dotenv here.
# from dotenv import load_dotenv
# load_dotenv(Path("../.env"))

print("Standard libraries loaded!")

Standard libraries loaded!


In [3]:
import sqlite_vec

# Initialize SQLite database with FTS5 and sqlite-vec support
DATABASE_PATH = Path("tutorial_rag.db")

def init_database(db_path: Path) -> sqlite3.Connection:
    """Create database with embeddings table, FTS5 for BM25, and vec0 for vectors."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row

    # Load sqlite-vec extension
    conn.enable_load_extension(True)
    sqlite_vec.load(conn)
    conn.enable_load_extension(False)

    cursor = conn.cursor()

    # Metadata table (stores content and metadata, linked to vectors by rowid)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS embeddings_meta (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_type TEXT NOT NULL,
            source_id TEXT,
            content TEXT NOT NULL,
            metadata TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # Vector table using sqlite-vec (384 dimensions for MiniLM-L6-v2)
    cursor.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS vec_embeddings USING vec0(
            embedding float[384] distance_metric=cosine
        )
    """)

    # FTS5 virtual table for BM25 keyword search
    cursor.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS embeddings_fts USING fts5(
            content,
            source_type,
            source_id,
            content='embeddings_meta',
            content_rowid='id'
        )
    """)

    # Triggers to keep FTS5 in sync with embeddings_meta table
    cursor.execute("""
        CREATE TRIGGER IF NOT EXISTS embeddings_ai AFTER INSERT ON embeddings_meta BEGIN
            INSERT INTO embeddings_fts(rowid, content, source_type, source_id)
            VALUES (new.id, new.content, new.source_type, new.source_id);
        END
    """)

    cursor.execute("""
        CREATE TRIGGER IF NOT EXISTS embeddings_ad AFTER DELETE ON embeddings_meta BEGIN
            INSERT INTO embeddings_fts(embeddings_fts, rowid, content, source_type, source_id)
            VALUES ('delete', old.id, old.content, old.source_type, old.source_id);
        END
    """)

    conn.commit()
    return conn

# Initialize the database
db = init_database(DATABASE_PATH)
print(f"Database initialized at: {DATABASE_PATH}")
print("sqlite-vec extension loaded successfully!")

Database initialized at: tutorial_rag.db
sqlite-vec extension loaded successfully!


### Understanding the Database Schema

We use **two storage mechanisms** for different search types:

1. **sqlite-vec (`vec0`)** - Native vector storage for cosine-distance search
   - Stores 384-dimensional embeddings compactly as float32 binary
   - `MATCH` queries return the k nearest neighbors; current sqlite-vec releases compute them with an exact brute-force scan in native code rather than an approximate (ANN) index
   - Returns cosine **distance** (0 = identical, 2 = opposite)

2. **FTS5** - Full-text search for BM25 keyword matching
   - Indexes words for fast exact-term lookup
   - BM25 ranks documents by term frequency, inverse document frequency, and document length
   - Returns negative scores (more negative = better match)
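To make the cosine-distance range concrete, here is a small pure-Python sketch (no sqlite-vec needed) that computes cosine distance as `1 - cosine similarity`, which gives exactly the 0 to 2 range described above. The helper name `cosine_distance` is ours, for illustration only.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance = 1 - cosine similarity, so the result lies in [0, 2]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


v = [1.0, 2.0, 3.0]
print(cosine_distance(v, v))                    # same direction: approximately 0
print(cosine_distance(v, [2.0, 4.0, 6.0]))      # scaled copy: still approximately 0
print(cosine_distance(v, [-1.0, -2.0, -3.0]))   # opposite direction: approximately 2
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # orthogonal: 1
```

Note that magnitude is ignored: a scaled copy of a vector has distance 0, which is why only the direction of an embedding matters for this search.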

---
## Part 2: Document Chunking

Large documents should be split into smaller chunks for better retrieval. First we load the source page from Notion and convert it to Markdown; then we chunk by `##` headers so each chunk stays semantically coherent. The best chunking strategy depends on your knowledge base.

In [54]:
from typing import Dict, Any, List, Optional, Set
import requests
import unicodedata
from google.colab import userdata  # Colab Secrets (provides NOTION_API below)

# --- Config ---
NOTION_TOKEN = userdata.get("NOTION_API")
ROOT_PAGE_ID = "2ee0a735274f80b6a961ee81eed44b06"
NOTION_VERSION = "2025-09-03"

if not NOTION_TOKEN:
    raise RuntimeError("NOTION_API not found in userdata secrets")

HEADERS = {
    "Authorization": f"Bearer {NOTION_TOKEN}",
    "Notion-Version": NOTION_VERSION,
    "Content-Type": "application/json",
}

# --- HTTP helper ---
def notion_get(url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # The timeout keeps the cell from hanging indefinitely on a stalled request
    r = requests.get(url, headers=HEADERS, params=params, timeout=30)
    r.raise_for_status()
    return r.json()

# --- Notion API helper ---
def list_block_children(block_id: str) -> List[Dict[str, Any]]:
    """Fetch all child blocks of a block/page (paginated)."""
    results: List[Dict[str, Any]] = []
    url = f"https://api.notion.com/v1/blocks/{block_id}/children"
    cursor = None
    while True:
        params = {"start_cursor": cursor} if cursor else None
        data = notion_get(url, params=params)
        results.extend(data.get("results", []))
        if not data.get("has_more"):
            break
        cursor = data.get("next_cursor")
    return results

# --- Markdown-ish text extraction ---
def rich_text_to_plain(rt: List[Dict[str, Any]]) -> str:
    return "".join(x.get("plain_text", "") for x in (rt or []))

def normalize(text: str) -> str:
    return unicodedata.normalize("NFKD", text)

def block_to_markdown_line(block: Dict[str, Any], indent: int) -> str:
    """
    Convert a Notion block to a single Markdown line (no trailing newline).
    Uses markdown tokens (#, ##, -, 1., >, ```).
    """
    t = block.get("type")
    prefix_indent = "  " * indent  # indentation for nested blocks

    # Headings -> markdown headings (keeps raw # tokens in output)
    if t == "heading_1":
        txt = rich_text_to_plain(block["heading_1"].get("rich_text", []))
        return f"{prefix_indent}# {txt}".rstrip()
    if t == "heading_2":
        txt = rich_text_to_plain(block["heading_2"].get("rich_text", []))
        return f"{prefix_indent}## {txt}".rstrip()
    if t == "heading_3":
        txt = rich_text_to_plain(block["heading_3"].get("rich_text", []))
        return f"{prefix_indent}### {txt}".rstrip()

    # Paragraph-like
    if t == "paragraph":
        txt = rich_text_to_plain(block["paragraph"].get("rich_text", []))
        return f"{prefix_indent}{txt}".rstrip()

    # Lists
    if t == "bulleted_list_item":
        txt = rich_text_to_plain(block["bulleted_list_item"].get("rich_text", []))
        return f"{prefix_indent}- {txt}".rstrip()

    if t == "numbered_list_item":
        txt = rich_text_to_plain(block["numbered_list_item"].get("rich_text", []))
        # We don’t know the true numbering from the API alone; use "1." consistently.
        return f"{prefix_indent}1. {txt}".rstrip()

    # To-do
    if t == "to_do":
        obj = block["to_do"]
        checked = obj.get("checked", False)
        txt = rich_text_to_plain(obj.get("rich_text", []))
        box = "x" if checked else " "
        return f"{prefix_indent}- [{box}] {txt}".rstrip()

    # Quote / Callout
    if t == "quote":
        txt = rich_text_to_plain(block["quote"].get("rich_text", []))
        return f"{prefix_indent}> {txt}".rstrip()

    if t == "callout":
        obj = block["callout"]
        txt = rich_text_to_plain(obj.get("rich_text", []))
        icon = obj.get("icon")
        icon_txt = ""
        if isinstance(icon, dict) and icon.get("type") == "emoji":
            icon_txt = icon.get("emoji", "") + " "
        # Render as blockquote style so it stays “documentation-like”
        return f"{prefix_indent}> {icon_txt}{txt}".rstrip()

    # Code
    if t == "code":
        obj = block["code"]
        lang = obj.get("language", "")
        code = rich_text_to_plain(obj.get("rich_text", []))
        return f"{prefix_indent}```{lang}\n{code}\n{prefix_indent}```".rstrip()

    # Divider
    if t == "divider":
        return f"{prefix_indent}---"

    # Images
    if t == "image":
        caption = rich_text_to_plain(block["image"].get("caption", []))
        return f"{prefix_indent}{caption}".rstrip() if caption.strip() else f"{prefix_indent}[image]"

    # Subpage marker (we will recurse into it as blocks)
    if t == "child_page":
        title = block.get("child_page", {}).get("title", "Untitled page")
        return f"{prefix_indent}# {title}".rstrip()

    # Fallback (keeps your pipeline robust)
    return f"{prefix_indent}[{t}]"

def walk_page_as_markdown(root_block_id: str, indent: int = 0, visited: Optional[Set[str]] = None) -> str:
    """
    Recursively read an entire Notion page (and any child pages beneath it),
    returning a Markdown-like document that preserves headings/lists/code blocks.
    No database logic included.
    """
    if visited is None:
        visited = set()

    if root_block_id in visited:
        return ""
    visited.add(root_block_id)

    lines: List[str] = []
    children = list_block_children(root_block_id)

    for b in children:
        line = block_to_markdown_line(b, indent).rstrip()
        if line.strip():
            lines.append(line)

        if b.get("has_children"):
            # recurse into nested blocks / toggles / child pages
            nested = walk_page_as_markdown(b["id"], indent=indent + 1, visited=visited)
            if nested.strip():
                lines.append(nested)

    return normalize("\n".join(lines)).strip()


# ---- Run ----
md_text = walk_page_as_markdown(ROOT_PAGE_ID)
# print(md_text[:2000])  # preview
# Full markdown doc is in `md_text`


In [59]:
count = sum(
    1 for line in md_text.splitlines()
    if line.lstrip().startswith("## ")
)
count


24

In [60]:
import re
from typing import List, Dict, Any

def chunk_document(content: str, filename: str) -> List[Dict[str, Any]]:
    """
    Chunk a markdown document by H2 (##) headers, allowing indentation, and
    ignoring headers inside fenced code blocks.

    Each chunk includes:
    - The document title (# header) for context
    - The section content (starting at its ## header)
    - Metadata about the source
    """

    # --- 1) Mask fenced code blocks so "#"/"##" lines inside them are not mistaken for headers ---
    # Fences may be indented (nested Notion blocks). Every non-newline character inside a fence
    # becomes a space, so line structure and string indices stay aligned with `content`.
    fence_pattern = re.compile(r"^[ \t]*```.*?^[ \t]*```[ \t]*$", re.DOTALL | re.MULTILINE)
    masked = fence_pattern.sub(lambda m: re.sub(r"[^\n]", " ", m.group(0)), content)

    # --- 2) Extract document title (# ...) from the masked text, allowing indentation ---
    # [ \t]* (not \s*) keeps each header match on a single line
    title_match = re.search(r"^[ \t]*#[ \t]+(.+?)[ \t]*$", masked, re.MULTILINE)
    doc_title = title_match.group(1) if title_match else filename

    # --- 3) Find all H2 headers (##), allowing indentation ---
    h2_pattern = re.compile(r"^[ \t]*##[ \t]+.+$", re.MULTILINE)
    matches = list(h2_pattern.finditer(masked))

    chunks: List[Dict[str, Any]] = []

    # If there are no H2 headers, return whole doc as single chunk
    if not matches:
        return [{"content": content, "metadata": {"source_file": filename, "section_title": doc_title}}]

    # Optional: include intro chunk for content before first ## (if meaningful)
    first_start = matches[0].start()
    intro = content[:first_start].strip()
    if intro:
        chunk_content = f"[From: {filename}]\n# {doc_title}\n\n{intro}"
        chunks.append({
            "content": chunk_content,
            "metadata": {
                "source_file": filename,
                "section_title": "Introduction",
            }
        })

    # --- 4) Slice sections by header positions ---
    for i, m in enumerate(matches):
        start = m.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(content)
        section = content[start:end].strip()
        if not section:
            continue

        # Extract the section title from the first H2 line in the section (allow indentation)
        section_title_match = re.search(r'^\s*##\s+(.+?)\s*$', section, re.MULTILINE)
        section_title = section_title_match.group(1) if section_title_match else "Untitled Section"

        chunk_content = f"[From: {filename}]\n# {doc_title}\n\n{section}"

        chunks.append({
            "content": chunk_content,
            "metadata": {
                "source_file": filename,
                "section_title": section_title,
            }
        })

    return chunks


# Usage
chunks = chunk_document(md_text, "articulation.md")
print(f"Found {len(chunks)} chunks")
for i, chunk in enumerate(chunks[:5]):
    print(f"--- Chunk {i+1}: {chunk['metadata']['section_title']} ---")
    print(chunk["content"][:200])
    print()


Found 25 chunks
--- Chunk 1: Introduction ---
[From: articulation.md]
# Platform Overview

A company that gives oral exams based on material uploaded.

--- Chunk 2: Description: ---
[From: articulation.md]
# Platform Overview

## Description:
A platform that automatically generates and administers oral examinations based on user-uploaded materials. Educators, institutions, or com

--- Chunk 3: What It Is ---
[From: articulation.md]
# Platform Overview

## What It Is
  A platform for conducting structured oral evaluations that assess how clearly individuals articulate, reason through, and communicate their

--- Chunk 4: The Problem ---
[From: articulation.md]
# Platform Overview

## The Problem
  Traditional evaluation methods often fail to measure how well someone truly understands and can explain an idea.
  - Written exams priorit

--- Chunk 5: The Solution ---
[From: articulation.md]
# Platform Overview

## The Solution
  This platform makes oral evaluation scalable, structured, and 

---
## Part 3: Local Embeddings with MiniLM-L6-v2

We use `fastembed` to run the MiniLM-L6-v2 model locally via ONNX. This means:
- **No API calls** - Everything runs on your machine
- **Fast inference** - ONNX runtime optimizations
- **384 dimensions** - Compact but effective embeddings

In [61]:
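import os

# Suppress the Hugging Face token warning. Set this before importing fastembed:
# huggingface_hub reads this variable when it is first imported.
os.environ["HF_HUB_DISABLE_IMPLICIT_TOKEN"] = "1"

from fastembed import TextEmbedding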

# Initialize the embedding model (downloads on first use)
print("Loading MiniLM-L6-v2 embedding model (ONNX)...")
embedding_model = TextEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
print("Model loaded successfully!")

Loading MiniLM-L6-v2 embedding model (ONNX)...
Model loaded successfully!


In [62]:
def generate_embedding(text: str) -> list[float]:
    """Generate a 384-dimensional embedding for the given text."""
    embeddings = list(embedding_model.embed([text]))
    return embeddings[0].tolist()

def generate_embeddings_batch(texts: list[str]) -> list[list[float]]:
    """Generate embeddings for multiple texts in a batch (more efficient)."""
    if not texts:
        return []
    embeddings = list(embedding_model.embed(texts))
    return [emb.tolist() for emb in embeddings]

# Test it out
test_text = "Artificial intelligence is transforming how businesses operate."
test_embedding = generate_embedding(test_text)

print(f"Input text: '{test_text}'")
print(f"Embedding dimensions: {len(test_embedding)}")
print(f"First 10 values: {test_embedding[:10]}")

Input text: 'Artificial intelligence is transforming how businesses operate.'
Embedding dimensions: 384
First 10 values: [0.035499076629084546, 0.00839253664254505, 0.05062492341224551, -0.007122231672038325, -0.01755316620916984, 0.006484544607462985, -0.001429692512319824, 0.0006524717970323388, 0.014390853328519674, -0.02782975465926638]


In [63]:
import struct

def serialize_embedding(embedding: list[float]) -> bytes:
    """Serialize embedding to binary format for sqlite-vec."""
    return struct.pack(f'{len(embedding)}f', *embedding)

def save_embedding(conn, source_type: str, content: str, embedding: list[float],
                   source_id: str = None, metadata: dict = None) -> int:
    """
    Save an embedding to the database.

    Inserts into:
    1. embeddings_meta - content and metadata (FTS5 updated via trigger)
    2. vec_embeddings - vector for similarity search (matched by rowid)
    """
    cursor = conn.cursor()

    # Insert metadata (FTS5 index updated automatically via trigger)
    cursor.execute(
        """
        INSERT INTO embeddings_meta (source_type, source_id, content, metadata, created_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        (
            source_type,
            source_id,
            content,
            json.dumps(metadata) if metadata else None,
            datetime.now().isoformat(),
        ),
    )
    rowid = cursor.lastrowid

    # Insert vector with matching rowid
    cursor.execute(
        """
        INSERT INTO vec_embeddings (rowid, embedding)
        VALUES (?, ?)
        """,
        (rowid, serialize_embedding(embedding)),
    )

    conn.commit()
    return rowid

# Embed and save the Notion chunks (re-running this cell inserts duplicate rows)
for chunk in chunks:
    embedding = generate_embedding(chunk["content"])
    save_embedding(
        db,
        source_type="business_doc",
        content=chunk["content"],
        embedding=embedding,
        source_id="company.md",
        metadata=chunk["metadata"],
    )

print(f"Saved {len(chunks)} embeddings to database (using sqlite-vec)")

Saved 25 embeddings to database (using sqlite-vec)



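### Embed Business Documents (Optional)

As an alternative to the Notion loader, you can embed Markdown files from a local `business-docs` folder. The cell below is commented out; uncomment it and add your `.md` files to `business-docs` to use it. The output under the cell comes from an earlier run in which it was enabled.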

In [9]:
# BUSINESS_DOCS_DIR = Path("business-docs")

# # Create the directory so it's visible in the file browser
# BUSINESS_DOCS_DIR.mkdir(exist_ok=True)

# def embed_business_docs(conn, docs_dir: Path):
#     """Embed all markdown files in the business docs directory."""
#     doc_files = list(docs_dir.glob("*.md"))
#     print(f"Found {len(doc_files)} document(s) to embed")

#     total_chunks = 0
#     for doc_path in doc_files:
#         print(f"\nProcessing: {doc_path.name}")
#         content = doc_path.read_text()
#         chunks = chunk_document(content, doc_path.name)

#         # Batch generate embeddings
#         texts = [c["content"] for c in chunks]
#         embeddings = generate_embeddings_batch(texts)

#         # Save each chunk (to both embeddings_meta and vec_embeddings)
#         for chunk, embedding in zip(chunks, embeddings):
#             save_embedding(
#                 conn,
#                 source_type="business_doc",
#                 content=chunk["content"],
#                 embedding=embedding,
#                 source_id=doc_path.name,
#                 metadata=chunk["metadata"],
#             )

#         print(f"  Saved {len(chunks)} chunk(s)")
#         total_chunks += len(chunks)

#     return total_chunks

# if list(BUSINESS_DOCS_DIR.glob("*.md")):
#     total = embed_business_docs(db, BUSINESS_DOCS_DIR)
#     print(f"\nTotal embeddings created: {total}")
# else:
#     print(f"No markdown files found in {BUSINESS_DOCS_DIR.absolute()}")
#     print("Please upload some .md files to the 'business-docs' folder!")

Found 1 document(s) to embed

Processing: Articulate 2ee0a735274f80b6a961ee81eed44b06.md
  Saved 2 chunk(s)

Total embeddings created: 2


---
## Part 4: Hybrid Search (BM25 + Semantic)

Hybrid search combines:
1. **BM25 keyword search** via SQLite FTS5
2. **Semantic search** via sqlite-vec native cosine distance

### Why Hybrid?

| Query Type | BM25 | Semantic | Best Choice |
|------------|------|----------|-------------|
| "Emanon AI" | ✅ Exact match | ❌ May miss | BM25 |
| "help with machine learning" | ❌ No exact terms | ✅ Semantic match | Semantic |
| "Emanon consulting services" | ✅ "Emanon" match | ✅ "consulting" related | **Hybrid!** |

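### sqlite-vec Benefits

| Aspect | JSON blobs (old) | sqlite-vec (new) |
|--------|------------------|------------------|
| Search | Load all rows → NumPy | Native SQL `MATCH` KNN query |
| Speed | O(n) scan in Python | O(n) brute-force scan in native code, no Python round-trip |
| Storage | Several times larger (floats stored as JSON text) | Compact float32 binary (4 bytes per dimension) |
| Scalability | Small corpora | Larger corpora, though query time still grows linearly with size |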
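The storage row can be checked directly. This sketch uses 384 pseudo-random floats as a stand-in for a real embedding (an assumption; the values are arbitrary) and compares the size of the JSON text with the float32 binary produced by `struct.pack`, the same packing format `serialize_embedding` uses in Part 3.

```python
import json
import random
import struct

# Stand-in for a 384-dimensional embedding (arbitrary values, not real model output)
rng = random.Random(0)
vector = [rng.uniform(-0.1, 0.1) for _ in range(384)]

json_bytes = json.dumps(vector).encode("utf-8")
binary_bytes = struct.pack(f"{len(vector)}f", *vector)  # float32: 4 bytes per dimension

print(f"JSON:   {len(json_bytes):,} bytes")
print(f"Binary: {len(binary_bytes):,} bytes")
print(f"Ratio:  {len(json_bytes) / len(binary_bytes):.1f}x")
```

The binary form is always 384 × 4 = 1,536 bytes; the JSON size depends on how many digits each float needs.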

In [64]:
import re

def bm25_search(conn, query: str, limit: int = 100) -> dict[int, float]:
    """
    Search using BM25 ranking via FTS5.

    Returns dict mapping embedding_id to raw BM25 score, best match first.
    Note: FTS5 BM25 scores are NEGATIVE (more negative = better match).
    """
    cursor = conn.cursor()

    # Sanitize the query: keep word tokens only and wrap each in double quotes.
    # Raw punctuation such as "?" is FTS5 syntax and raises a syntax error,
    # which would silently turn into "no keyword matches" below. Quoted terms
    # are ANDed (FTS5's default); join them with " OR " for broader recall.
    tokens = re.findall(r"\w+", query)
    if not tokens:
        return {}
    safe_query = " ".join(f'"{t}"' for t in tokens)

    try:
        cursor.execute("""
            SELECT rowid, bm25(embeddings_fts) AS score
            FROM embeddings_fts
            WHERE embeddings_fts MATCH ?
            ORDER BY score
            LIMIT ?
        """, (safe_query, limit))

        return {row[0]: row[1] for row in cursor.fetchall()}
    except sqlite3.OperationalError:
        # Malformed query or missing table
        return {}

# Test BM25 search
test_query = "articulate"
bm25_results = bm25_search(db, test_query)
for emb_id, score in list(bm25_results.items())[:3]:
    print(f"  ID {emb_id}: score = {score:.4f}")

  ID 4: score = -3.4766
  ID 8: score = -2.1337
  ID 5: score = -1.9010


In [65]:
bm25_results

{4: -3.4766046747912225,
 8: -2.133674850241169,
 5: -1.9010206491387,
 16: -1.6006770781871287}
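Raw user text can contain characters that FTS5 interprets as query syntax rather than search terms. This self-contained sketch uses an in-memory FTS5 table (assuming your SQLite build includes FTS5, which the schema above already requires) to show a raw question mark failing and a quoted-token version succeeding. `to_fts5_query` is a hypothetical helper for illustration.

```python
import re
import sqlite3


def to_fts5_query(text: str, mode: str = "AND") -> str:
    """Quote each word token so FTS5 treats it as a literal term.

    mode="AND" requires every term (FTS5's implicit default);
    mode="OR" matches documents containing any term.
    """
    tokens = re.findall(r"\w+", text)
    joiner = " OR " if mode == "OR" else " "
    return joiner.join(f'"{t}"' for t in tokens)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE VIRTUAL TABLE docs USING fts5(content)")
conn.executemany(
    "INSERT INTO docs(content) VALUES (?)",
    [("Oral exams help students explain ideas",), ("Written exams reward memorization",)],
)

raw = "How can this help me?"
try:
    conn.execute("SELECT rowid FROM docs WHERE docs MATCH ?", (raw,)).fetchall()
except sqlite3.OperationalError as exc:
    print("Raw query failed:", exc)  # '?' is not a valid FTS5 bareword character

query = to_fts5_query(raw, mode="OR")
rows = conn.execute(
    "SELECT rowid, bm25(docs) FROM docs WHERE docs MATCH ? ORDER BY bm25(docs)",
    (query,),
).fetchall()
print(query)
print(rows)  # only the first document contains one of the terms ("help")
```

AND keeps precision high for short keyword queries; OR is more forgiving for natural-language questions, at the cost of matching on common words like "this".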

In [67]:
def semantic_search(conn, query_embedding: list[float], limit: int = 100) -> dict[int, float]:
    """
    Search using sqlite-vec's native cosine distance.

    Returns dict mapping rowid to cosine distance, nearest first.
    Note: cosine distance = 1 - cosine similarity, so it lies in [0, 2] (0 = identical, 2 = opposite).
    """
    cursor = conn.cursor()

    # sqlite-vec requires 'k = ?' in the WHERE clause when using a parameterized limit
    cursor.execute("""
        SELECT rowid, distance
        FROM vec_embeddings
        WHERE embedding MATCH ?
          AND k = ?
        ORDER BY distance
    """, (serialize_embedding(query_embedding), limit))

    return {row[0]: row[1] for row in cursor.fetchall()}

# Test semantic search
test_query = "How can this produce help me?"

test_emb = generate_embedding(test_query)
semantic_results = semantic_search(db, test_emb, limit=5)

print(f"Semantic search for '{test_query}' found {len(semantic_results)} results:")
for rowid, distance in list(semantic_results.items()):
    print(f"  ID {rowid}: distance = {distance:.4f} (similarity = {1 - distance/2:.4f})")

Semantic search for 'How can this produce help me?' found 5 results:
  ID 22: distance = 0.7262 (similarity = 0.6369)
  ID 19: distance = 0.7315 (similarity = 0.6342)
  ID 16: distance = 0.7346 (similarity = 0.6327)
  ID 25: distance = 0.7579 (similarity = 0.6210)
  ID 20: distance = 0.7583 (similarity = 0.6208)


In [68]:
def normalize_bm25_scores(bm25_scores: dict[int, float]) -> dict[int, float]:
    """
    Normalize BM25 scores to [0, 1] range.

    FTS5 BM25 scores are negative (more negative = better).
    We invert so that best match gets 1.0, worst gets 0.0.
    """
    if not bm25_scores:
        return {}

    scores = list(bm25_scores.values())
    min_score = min(scores)  # Most negative = best
    max_score = max(scores)  # Least negative = worst

    if min_score == max_score:
        return {id: 1.0 for id in bm25_scores}

    score_range = max_score - min_score
    return {
        id: (max_score - score) / score_range
        for id, score in bm25_scores.items()
    }

def normalize_distances(distances: dict[int, float]) -> dict[int, float]:
    """
    Normalize cosine distances to similarity scores in [0, 1].

    Cosine distance is in [0, 2] where 0 = identical (distance = 1 - cosine similarity).
    We first map it to a [0, 1] similarity with 1 - (distance / 2),
    then min-max normalize so the best candidate gets 1.0 and the worst gets 0.0.
    """
    if not distances:
        return {}

    # Convert distances to similarities
    similarities = {id: 1 - (dist / 2) for id, dist in distances.items()}

    # Normalize to [0, 1] range
    min_sim = min(similarities.values())
    max_sim = max(similarities.values())

    if min_sim == max_sim:
        return {id: 1.0 for id in similarities}

    sim_range = max_sim - min_sim
    return {
        id: (sim - min_sim) / sim_range
        for id, sim in similarities.items()
    }
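Why convert with `1 - d/2` rather than the plain cosine similarity `1 - d`? After min-max normalization it makes no difference: any decreasing linear map of the distances normalizes to `(d_max - d) / (d_max - d_min)`. A quick check with the five distances printed by the semantic search above (the `minmax` helper is written here just for this check):

```python
def minmax(scores: dict[int, float]) -> dict[int, float]:
    """Min-max normalize scores to [0, 1] (higher is better)."""
    lo, hi = min(scores.values()), max(scores.values())
    if lo == hi:
        return {k: 1.0 for k in scores}
    return {k: (v - lo) / (hi - lo) for k, v in scores.items()}


# Cosine distances from the semantic search output above (rowid -> distance)
distances = {22: 0.7262, 19: 0.7315, 16: 0.7346, 25: 0.7579, 20: 0.7583}

rescaled = minmax({k: 1 - d / 2 for k, d in distances.items()})  # the notebook's conversion
plain = minmax({k: 1 - d for k, d in distances.items()})          # plain cosine similarity

for k in distances:
    print(k, round(rescaled[k], 4), round(plain[k], 4))
```

Both columns agree up to floating-point error, so the choice of conversion only matters if you skip normalization.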

In [69]:
def get_metadata_by_ids(conn, ids: list[int]) -> dict[int, dict]:
    """Retrieve metadata for given IDs from embeddings_meta table."""
    if not ids:
        return {}

    cursor = conn.cursor()
    placeholders = ",".join("?" * len(ids))
    cursor.execute(f"""
        SELECT id, source_type, source_id, content, metadata
        FROM embeddings_meta
        WHERE id IN ({placeholders})
    """, ids)

    results = {}
    for row in cursor.fetchall():
        results[row[0]] = {
            "source_type": row[1],
            "source_id": row[2],
            "content": row[3],
            "metadata": json.loads(row[4]) if row[4] else {},
        }
    return results

def hybrid_search(
    conn,
    query: str,
    query_embedding: list[float],
    keyword_weight: float = 0.5,
    semantic_weight: float = 0.5,
    top_k: int = 10,
) -> list[dict]:
    """
    Perform hybrid search combining BM25 and sqlite-vec cosine similarity.

    Formula: final_score = keyword_weight * bm25 + semantic_weight * cosine_sim

    Args:
        conn: Database connection
        query: Search query text
        query_embedding: Pre-computed embedding of the query
        keyword_weight: Weight for BM25 (0-1)
        semantic_weight: Weight for cosine similarity (0-1)
        top_k: Number of results to return

    Returns:
        List of results sorted by combined score (highest first)
    """
    # Step 1: Get BM25 scores from FTS5
    bm25_raw = bm25_search(conn, query)
    bm25_normalized = normalize_bm25_scores(bm25_raw)

    # Step 2: Get semantic distances from sqlite-vec
    semantic_raw = semantic_search(conn, query_embedding, limit=100)
    semantic_normalized = normalize_distances(semantic_raw)

    # Step 3: Get all unique IDs from both searches
    all_ids = set(bm25_normalized.keys()) | set(semantic_normalized.keys())

    if not all_ids:
        return []

    # Step 4: Get metadata for all candidates
    metadata = get_metadata_by_ids(conn, list(all_ids))

    # Step 5: Compute combined scores
    scored_results = []

    for id in all_ids:
        # BM25 score (0 if no keyword match)
        bm25_score = bm25_normalized.get(id, 0.0)

        # Semantic score (0 if not in top semantic results)
        semantic_score = semantic_normalized.get(id, 0.0)

        # Combined score
        final_score = (keyword_weight * bm25_score) + (semantic_weight * semantic_score)

        meta = metadata.get(id, {})
        scored_results.append({
            "id": id,
            "content": meta.get("content", ""),
            "source_type": meta.get("source_type", ""),
            "source_id": meta.get("source_id", ""),
            "metadata": meta.get("metadata", {}),
            "bm25_score": bm25_score,
            "semantic_score": semantic_score,
            "final_score": final_score,
        })

    # Sort by final score (descending)
    scored_results.sort(key=lambda x: x["final_score"], reverse=True)

    return scored_results[:top_k]

# Test hybrid search
query_emb = generate_embedding(test_query)

results = hybrid_search(db, test_query, query_emb, top_k=5)

print(f"Hybrid search for '{test_query}':\n")
for i, r in enumerate(results, 1):
    print(f"{i}. Score: {r['final_score']:.3f} (BM25: {r['bm25_score']:.3f}, Semantic: {r['semantic_score']:.3f})")
    print(f"   Source: {r['source_id']}")
    print(f"   Preview: {r['content'][:100]}...\n")

Hybrid search for 'How can this produce help me?':

1. Score: 0.500 (BM25: 0.000, Semantic: 1.000)
   Source: company.md
   Preview: [From: articulation.md]
# Platform Overview

## General Users
  “It made me a clearer thinker.”
  Ex...

2. Score: 0.490 (BM25: 0.000, Semantic: 0.980)
   Source: company.md
   Preview: [From: articulation.md]
# Platform Overview

## Doctors and Medical Professionals
  “It helped me pr...

3. Score: 0.485 (BM25: 0.000, Semantic: 0.969)
   Source: company.md
   Preview: [From: articulation.md]
# Platform Overview

## Students
  “I finally feel evaluated on what I actua...

4. Score: 0.441 (BM25: 0.000, Semantic: 0.883)
   Source: company.md
   Preview: [From: articulation.md]
# Platform Overview

## Post-Exam Feedback (Generated by the Platform)
  ###...

5. Score: 0.441 (BM25: 0.000, Semantic: 0.881)
   Source: company.md
   Preview: [From: articulation.md]
# Platform Overview

## Professionals and Interview Candidates
  “It feels l...
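Min-max weighting is one way to fuse the two rankings. A commonly used alternative is Reciprocal Rank Fusion (RRF), which ignores raw score scales and combines ranks instead: each document scores `sum(1 / (k + rank))` over the lists it appears in. This is a swapped-in technique, not what `hybrid_search` does. The sketch assumes both inputs are "lower is better" dicts, and `k = 60` is a conventional default rather than a tuned value.

```python
def reciprocal_rank_fusion(
    bm25_scores: dict[int, float],
    distances: dict[int, float],
    k: int = 60,
    top_k: int = 10,
) -> list[tuple[int, float]]:
    """Fuse two result lists by rank.

    Both inputs map doc_id -> score where LOWER is better
    (FTS5 bm25() is negative; cosine distance is >= 0).
    """
    fused: dict[int, float] = {}
    for scores in (bm25_scores, distances):
        # Rank 1 = best (lowest) score within this list
        ranked = sorted(scores, key=scores.get)
        for rank, doc_id in enumerate(ranked, start=1):
            fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)[:top_k]


# Toy inputs: doc 2 is ranked second in both lists, so it wins overall
bm25 = {1: -3.5, 2: -2.0, 3: -1.0}
dist = {4: 0.50, 2: 0.60, 5: 0.70}
for doc_id, score in reciprocal_rank_fusion(bm25, dist):
    print(doc_id, round(score, 5))
```

Because `bm25_search` returns raw negative BM25 scores and `semantic_search` returns distances, both could be passed to this function unchanged.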



---
## Part 5: Post Generation with RAG Context

Now we combine everything: retrieve relevant context and generate a post.

In [70]:
def format_context_for_prompt(results: list[dict], max_chars: int = 4000) -> str:
    """Format search results into context for the LLM prompt."""
    if not results:
        return "No relevant context found."

    context_parts = []
    chars_used = 0

    for i, result in enumerate(results, 1):
        header = f"[{i}. {result['source_type']}] (score: {result['final_score']:.2f})"
        content = result["content"]

        available = max_chars - chars_used - len(header) - 10
        if available <= 100:
            break

        if len(content) > available:
            content = content[:available - 3] + "..."

        entry = f"{header}\n{content}\n"
        context_parts.append(entry)
        chars_used += len(entry)

    return "\n".join(context_parts)

def retrieve_context(conn, query: str, top_k: int = 10) -> tuple[str, list[dict]]:
    """High-level function to retrieve and format context for RAG."""
    query_embedding = generate_embedding(query)
    results = hybrid_search(conn, query, query_embedding, top_k=top_k)
    formatted = format_context_for_prompt(results)
    return formatted, results

# Test context retrieval
context, results = retrieve_context(db, "Oral Evaluation")
print("Retrieved context:\n")
print(context[:1500] + "..." if len(context) > 1500 else context)

Retrieved context:

[1. business_doc] (score: 0.81)
[From: articulation.md]
# Platform Overview

## The Solution
  This platform makes oral evaluation scalable, structured, and consistent.
  Participants engage in guided oral sessions where they must:
  - Explain concepts in their own words
  - Respond to follow-up questions
  - Clarify ambiguous answers
  - Defend or refine their reasoning
  The system adapts in real time and evaluates responses using predefined criteria and rubrics.
  ---

[2. business_doc] (score: 0.78)
[From: articulation.md]
# Platform Overview

## The Problem
  Traditional evaluation methods often fail to measure how well someone truly understands and can explain an idea.
  - Written exams prioritize memorization and speed over reasoning
  - Plagiarism and AI-generated text undermine assessment integrity
  - Oral evaluations are high-signal but difficult to scale
  - Communication skills—critical in academic and professional settings—are underassessed
  ---

[3. 

In [71]:
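from google.colab import userdata
import os

# Copy the key from Colab Secrets; if it is missing, leave it unset so the next cell fails loudly
try:
    os.environ["OPENROUTER_API_KEY"] = userdata.get("OPENROUTER_API_KEY")
except Exception:  # secret not found, notebook access not granted, or value is None
    pass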
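The prompt in the next cell asks for posts of at most 100 words, but Mastodon limits statuses by characters (500 by default; instances can configure a different limit), and 100 words can exceed 500 characters. Model output is not guaranteed to follow instructions, so a post-generation check is cheap insurance. A minimal sketch; `MAX_CHARS` and `split_by_length` are hypothetical names, and Mastodon counts URLs and mentions differently from this plain `len()` check.

```python
MAX_CHARS = 500  # Mastodon's default status limit; your instance may differ


def split_by_length(posts: list[str], max_chars: int = MAX_CHARS) -> tuple[list[str], list[str]]:
    """Separate posts that fit within max_chars from those that need shortening."""
    ok, too_long = [], []
    for text in posts:
        (ok if len(text) <= max_chars else too_long).append(text)
    return ok, too_long


drafts = [
    "Articulate turns uploaded material into structured oral exams. #EdTech",
    "word " * 120,  # 600 characters: over the limit
]
ok, too_long = split_by_length(drafts)
print(f"{len(ok)} post(s) fit, {len(too_long)} need shortening")
```

After generation, the same check can be run on the `status` fields of the parsed `StatusList`.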

In [79]:
import os
from datetime import date, timedelta
from typing import List

from pydantic import BaseModel, Field
from openai import OpenAI


# -----------------------------
# Structured output schemas
# -----------------------------
class Status(BaseModel):
    status: str = Field(
        min_length=3,
        description="The post text to publish on Mastodon (<= 100 words)."
    )
    date_posted: date = Field(
        description="Target date for posting (YYYY-MM-DD)."
    )
    content: str = Field(
        description="Short internal rationale or angle for the post (not posted)."
    )

class StatusList(BaseModel):
    posts: List[Status] = Field(
        min_length=1,
        description="List of generated posts."
    )


# -----------------------------
# OpenRouter via OpenAI SDK
# -----------------------------
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
    raise RuntimeError("Missing OPENROUTER_API_KEY")

client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=OPENROUTER_API_KEY,
)


def generate_posts_with_rag(context: str, topic: str) -> StatusList:
    """
    Generate exactly 10 Mastodon posts using ONLY the provided RAG context.
    Uses `text_format` to enforce structured JSON output.
    """
    today = date.today()
    target_dates = [(today + timedelta(days=i)).isoformat() for i in range(10)]

    system_prompt = (
        "You are a social media manager for a platform that evaluates how clearly people "
        "articulate and communicate their ideas through oral evaluation and practice.\n\n"
        "Rules:\n"
        "- Use ONLY the provided context\n"
        "- Each post must be <= 100 words\n"
        "- Include 1–2 relevant hashtags\n"
        "- Avoid hype or guarantees\n"
        "- Generate exactly 10 distinct posts\n"
        "- Output MUST conform to the provided schema\n"
    )

    user_prompt = (
        f"Context (use only this):\n{context}\n\n"
        f"Topic to emphasize: {topic}\n\n"
        f"Use these posting dates IN ORDER:\n{target_dates}\n\n"
        "Return 10 posts mentioning the company's name (Articulate). For each post:\n"
        "- status: Mastodon post text\n"
        "- date_posted: one of the provided dates\n"
        "- content: internal rationale or angle (not posted)\n"

    )

    response = client.responses.parse(
        model="nvidia/nemotron-3-nano-30b-a3b:free",
        input=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        text_format=StatusList,
    )

    # responses.parse returns a ParsedResponse; the validated StatusList is in
    # `output_parsed` (None if the model produced nothing that fits the schema).
    if response.output_parsed is None:
        raise ValueError("Model output did not match the StatusList schema")
    return response.output_parsed
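Because the schema is plain Pydantic, you can see how a response would be parsed without calling the API. The sketch below mirrors `Status`/`StatusList` under the hypothetical names `DemoStatus`/`DemoStatusList` (so it does not shadow the classes used by `generate_posts_with_rag`) and validates a hand-written JSON string; the post text and date are illustrative, not model output. It assumes Pydantic v2 (`model_validate_json`).

```python
from datetime import date
from typing import List

from pydantic import BaseModel, Field, ValidationError


# Same shape as the notebook's Status/StatusList, renamed to avoid shadowing them.
class DemoStatus(BaseModel):
    status: str = Field(min_length=3)
    date_posted: date
    content: str


class DemoStatusList(BaseModel):
    posts: List[DemoStatus] = Field(min_length=1)


# Hand-written stand-in for a model response (illustrative values only).
raw = """
{"posts": [
  {"status": "Articulate makes oral evaluation scalable and consistent. #EdTech",
   "date_posted": "2025-01-06",
   "content": "Scalability angle"}
]}
"""

parsed = DemoStatusList.model_validate_json(raw)
print(type(parsed.posts[0].date_posted).__name__)  # ISO string became a datetime.date

# An empty list violates min_length=1, so validation fails.
rejected = False
try:
    DemoStatusList.model_validate_json('{"posts": []}')
except ValidationError as err:
    rejected = True
    print("rejected:", err.error_count(), "error(s)")
```

Note that the word limit and hashtag rules live only in the prompt; the schema checks types and list length, nothing more.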

In [80]:
# -----------------------------
# Example usage
# -----------------------------
topic = "Good scalable oral exams and articulation-focused assessment"
context, _ = retrieve_context(db, topic)  # your existing RAG function
statuses = generate_posts_with_rag(context, topic)
# posts = statuses.posts  # <-- variable holding the 10 posts
# print(posts[0])

In [81]:
# type(statuses)
posts = (
    getattr(statuses, "output_parsed", None)
    or getattr(statuses, "parsed", None)
    or statuses
)


In [82]:
for post in posts.posts:
    print('-------------post---------------')
    print(post.status[:250])

-------------post---------------
Articulate lets educators run structured oral exams at scale, revealing true understanding beyond memorization.
-------------post---------------
With Articulate, teachers gain clearer insight into student comprehension through live oral questioning and adaptive follow‑ups.
-------------post---------------
Students using Articulate report confidence gains, practicing articulation in low‑stakes, structured oral assessments.
-------------post---------------
Articulate adapts to each answer, probing depth and clarifying reasoning, ensuring consistent evaluation across classes.
-------------post---------------
By uploading any material, Articulate automatically designs oral exams that test reasoning, not just rote recall.
-------------post---------------
The platform’s rubric‑driven evaluation gives reliable feedback, helping learners pinpoint where their understanding breaks down.
-------------post---------------
Articulate’s oral exams simulate real‑world 
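The system prompt asks for at most 100 words, one or two hashtags, and the dates in order, but nothing in the schema enforces those rules, so it can be worth checking them before publishing. A minimal sketch; `check_post_rules` is a hypothetical helper, and it works on plain `(text, date)` pairs so you can feed it `[(p.status, p.date_posted) for p in posts.posts]` with `start=date.today()` from the generation run.

```python
import re
from datetime import date, timedelta
from typing import List, Sequence, Tuple

HASHTAG_RE = re.compile(r"#\w+")


def check_post_rules(
    posts: Sequence[Tuple[str, date]],
    start: date,
    max_words: int = 100,
    hashtag_range: Tuple[int, int] = (1, 2),
) -> List[str]:
    """Return human-readable problems; an empty list means every rule passed."""
    problems: List[str] = []
    lo, hi = hashtag_range
    for i, (text, posted) in enumerate(posts):
        words = len(text.split())
        if words > max_words:
            problems.append(f"post {i}: {words} words (max {max_words})")
        tags = len(HASHTAG_RE.findall(text))
        if not lo <= tags <= hi:
            problems.append(f"post {i}: {tags} hashtags (expected {lo}-{hi})")
        # Dates were requested in order, one per day starting at `start`.
        expected = start + timedelta(days=i)
        if posted != expected:
            problems.append(f"post {i}: dated {posted}, expected {expected}")
    return problems


start = date(2025, 1, 6)
sample = [
    ("Articulate makes oral evaluation scalable. #EdTech", start),
    ("No hashtag here", start + timedelta(days=1)),
]
print(check_post_rules(sample, start))  # → ['post 1: 0 hashtags (expected 1-2)']
```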

---
## Part 6: Full Workflow Demo

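Putting it all together: detect changes → retrieve context → generate posts. The cell below is commented out: it sketches a Notion polling loop and a webhook endpoint, and it relies on names that are not defined in the cell itself (`NOTION_HEADERS`, `walk_page_as_markdown`, `verify_notion_signature`, `app`, `ROOT_PAGE_ID`).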

In [None]:
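# import hashlib
# import unicodedata
# from datetime import timezone
# from typing import Any, Dict, List, Optional, Set
#
# import requests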

# POLL_INTERVAL_SECONDS = 120  # every 2 minutes
# STATE_FILE = "notion_watch_state.json"
# OUTPUT_FILE = "generated_posts.json"
# MASTODON_TOKEN = userdata.get("MASTODON_TOKEN") or "YOUR_MASTODON_TOKEN_HERE"
# MASTODON_INSTANCE = "https://mastodon.social"

# def notion_get(url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
#     r = requests.get(url, headers=NOTION_HEADERS, params=params, timeout=30)
#     r.raise_for_status()
#     return r.json()

# def get_page_meta(page_id: str) -> Dict[str, Any]:
#     return notion_get(f"https://api.notion.com/v1/pages/{page_id}")

# def list_block_children(block_id: str) -> List[Dict[str, Any]]:
#     results: List[Dict[str, Any]] = []
#     url = f"https://api.notion.com/v1/blocks/{block_id}/children"
#     cursor = None
#     while True:
#         params = {"start_cursor": cursor} if cursor else None
#         data = notion_get(url, params=params)
#         results.extend(data.get("results", []))
#         if not data.get("has_more"):
#             break
#         cursor = data.get("next_cursor")
#     return results

# # =========================
# # STATE + OUTPUT
# # =========================
# # def load_state() -> Dict[str, Any]:
# #     try:
# #         with open(STATE_FILE, "r", encoding="utf-8") as f:
# #             return json.load(f)
# #     except FileNotFoundError:
# #         return {"pages": {}}

# # def save_state(state: Dict[str, Any]) -> None:
# #     with open(STATE_FILE, "w", encoding="utf-8") as f:
# #         json.dump(state, f, ensure_ascii=False, indent=2)

# # def append_output(page_id: str, title: str, posts: StatusList) -> None:
# #     record = {
# #         "page_id": page_id,
# #         "title": title,
# #         "generated_at": datetime.utcnow().isoformat() + "Z",
# #         "posts": [
# #             {
# #                 "status": p.status,
# #                 "date_posted": p.date_posted.isoformat(),
# #                 "content": p.content,
# #             }
# #             for p in posts.posts
# #         ],
# #     }
# #     try:
# #         existing = []
# #         if os.path.exists(OUTPUT_FILE):
# #             with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
# #                 existing = json.load(f)
# #         existing.append(record)
# #         with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
# #             json.dump(existing, f, ensure_ascii=False, indent=2)
# #     except Exception as e:
# #         print(f"[Output] Failed writing output: {e}")

# # def discover_child_pages(root_page_id: str) -> Set[str]:
# #     """
# #     Traverse the root page's block tree and collect IDs of all child pages.
# #     Includes nested child pages.
# #     """
# #     found: Set[str] = set()
# #     stack = [root_page_id]
# #     visited: Set[str] = set()

# #     while stack:
# #         block_id = stack.pop()
# #         if block_id in visited:
# #             continue
# #         visited.add(block_id)

# #         for b in list_block_children(block_id):
# #             t = b.get("type")

# #             # If this block is a child page, record it and also traverse into it
# #             if t == "child_page":
# #                 child_id = b["id"]
# #                 found.add(child_id)
# #                 stack.append(child_id)

# #             # Traverse any other block that has children (toggles, callouts, etc.)
# #             if b.get("has_children"):
# #                 stack.append(b["id"])

# #     return found

# # def run_notion_poll_once_watch_root_tree(root_id: str):
# #     """
# #     Poll changes for:
# #       - the root page
# #       - every child page discovered under the root (recursively)
# #     """
# #     state = load_state()
# #     pages_state = state["pages"]

# #     # 1) discover all pages under root
# #     page_ids = {root_id} | discover_child_pages(root_id)

# #     for page_id in sorted(page_ids):
# #         meta = get_page_meta(page_id)
# #         last_edited = meta.get("last_edited_time")
# #         title = page_title(meta)

# #         prev_last_edited = pages_state.get(page_id, {}).get("last_edited_time")
# #         if prev_last_edited == last_edited:
# #             continue

# #         print(f"[Notion] Change detected on '{title}' ({page_id}). Generating posts...")

# #         md_text = walk_page_as_markdown(page_id)
# #         posts = generate_posts_with_rag(md_text, title)
# #         append_output(page_id, title, posts)

# #         pages_state[page_id] = {"last_edited_time": last_edited}

# #     save_state(state)


# # -----------------------
# # NOTION HELPERS
# # -----------------------

# def retrieve_page(page_id: str) -> Dict[str, Any]:
#     return notion_get(f"https://api.notion.com/v1/pages/{page_id}")


# def rich_text_to_plain(rt: List[Dict[str, Any]]) -> str:
#     return "".join(x.get("plain_text", "") for x in (rt or []))

# def normalize(text: str) -> str:
#     return unicodedata.normalize("NFKD", text)

# def page_title(page: Dict[str, Any]) -> str:
#     props = page.get("properties", {})
#     for prop in props.values():
#         if prop.get("type") == "title":
#             return rich_text_to_plain(prop.get("title", [])) or "Untitled"
#     return "Untitled"

# def is_descendant_of_root(page_id: str, root_id: str, max_hops: int = 50) -> bool:
#     """
#     Walk the parent chain: page -> parent.page_id -> ... until root or workspace.
#     This lets you enforce "only pages under root trigger generation".
#     """
#     current = page_id
#     for _ in range(max_hops):
#         if current == root_id:
#             return True
#         page = retrieve_page(current)
#         parent = page.get("parent", {})
#         ptype = parent.get("type")
#         if ptype == "page_id":
#             current = parent.get("page_id")
#             continue
#         # workspace/database/unknown => stop
#         return False
#     return False


# # -----------------------
# # WEBHOOK ENDPOINT
# # -----------------------
# @app.post("/notion/webhook")
# async def notion_webhook(req: Request):
#     raw = await req.body()
#     sig = req.headers.get("X-Notion-Signature")

#     # Validate signature (recommended)
#     verify_notion_signature(raw, sig)

#     payload = json.loads(raw.decode("utf-8"))

#     # Subscription verification request contains verification_token
#     if "verification_token" in payload:
#         # You do NOT “respond with” the token. You paste it into Notion UI to verify.
#         # Return 200 OK to confirm endpoint is reachable.
#         return {"ok": True}

#     # Event payloads are "signals": you then fetch latest data via API
#     event_type = payload.get("type")
#     entity = payload.get("entity", {})
#     entity_type = entity.get("type")
#     entity_id = entity.get("id")

#     # We care about page content updates
#     if event_type == "page.content_updated" and entity_type == "page" and entity_id:
#         if not is_descendant_of_root(entity_id, ROOT_PAGE_ID):
#             return {"ok": True, "ignored": "not under root"}

#         page = retrieve_page(entity_id)
#         topic = page_title(page)

#         md_text = walk_page_as_markdown(entity_id)
#         posts = generate_posts_with_rag(md_text, topic)

#         # Store results somewhere (example: local file). Replace with Notion write-back or Mastodon queue.
#         out = {
#             "page_id": entity_id,
#             "topic": topic,
#             "generated_at": datetime.now(timezone.utc).isoformat(),
#             "posts": [p.model_dump() for p in posts.posts],
#         }
#         with open("notion_generated_posts.json", "a", encoding="utf-8") as f:
#             f.write(json.dumps(out, ensure_ascii=False) + "\n")

#     return {"ok": True}
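The commented-out cell above polls Notion's `last_edited_time`. An alternative, or complement, is to hash each page's extracted text and regenerate only when the hash changes, which avoids regenerating when the timestamp moves but the text is unchanged. A minimal sketch, assuming page text has already been fetched into a `{page_id: text}` dict; `detect_changes` and the JSON state layout are hypothetical, and `on_change` is where you would call `retrieve_context` and `generate_posts_with_rag`.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Callable, Dict


def content_hash(text: str) -> str:
    """Stable fingerprint of a page's extracted text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def detect_changes(
    pages: Dict[str, str],
    state_path: Path,
    on_change: Callable[[str, str], None],
) -> int:
    """Call on_change(page_id, text) for new or modified pages; return how many fired.

    State is a JSON file mapping page_id -> last seen hash.
    """
    state: Dict[str, str] = {}
    if state_path.exists():
        state = json.loads(state_path.read_text(encoding="utf-8"))

    fired = 0
    for page_id, text in pages.items():
        digest = content_hash(text)
        if state.get(page_id) == digest:
            continue  # unchanged since the last run
        on_change(page_id, text)
        state[page_id] = digest
        fired += 1

    state_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
    return fired


# Offline demo: first run fires, an identical second run doesn't, an edit fires again.
seen = []
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "state.json"
    first = detect_changes({"p1": "v1"}, path, lambda pid, _: seen.append(pid))
    second = detect_changes({"p1": "v1"}, path, lambda pid, _: seen.append(pid))
    third = detect_changes({"p1": "v2"}, path, lambda pid, _: seen.append(pid))
print(first, second, third)  # → 1 0 1
```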

---
## Project Applications


### 1: Replace the local knowledge base with the Notion API to fetch docs
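A first step is turning the block JSON returned by Notion's block-children endpoint (wrapped by `list_block_children` in the Part 6 cell) into markdown that the existing chunker can ingest; the commented-out code calls a `walk_page_as_markdown` helper for this but doesn't define it. A minimal sketch: `blocks_to_markdown` is hypothetical, it handles only a few common block types, ignores nesting, and the sample blocks are hand-written in Notion's shape (a `type` key plus a same-named object holding `rich_text`).

```python
from typing import Any, Dict, List

# Markdown prefix for each supported Notion block type (subset only).
PREFIXES = {
    "paragraph": "",
    "heading_1": "# ",
    "heading_2": "## ",
    "heading_3": "### ",
    "bulleted_list_item": "- ",
    "numbered_list_item": "1. ",
    "quote": "> ",
}


def rich_text_to_plain(rich_text: List[Dict[str, Any]]) -> str:
    """Concatenate the plain_text of each rich-text fragment."""
    return "".join(part.get("plain_text", "") for part in rich_text or [])


def blocks_to_markdown(blocks: List[Dict[str, Any]]) -> str:
    """Render top-level blocks as markdown; unsupported types are skipped."""
    lines: List[str] = []
    for block in blocks:
        block_type = block.get("type")
        if block_type not in PREFIXES:
            continue  # e.g. images, tables, child pages
        text = rich_text_to_plain(block.get(block_type, {}).get("rich_text", []))
        lines.append(PREFIXES[block_type] + text)
    return "\n".join(lines)


sample_blocks = [
    {"type": "heading_2", "heading_2": {"rich_text": [{"plain_text": "The Problem"}]}},
    {"type": "paragraph", "paragraph": {"rich_text": [
        {"plain_text": "Oral evaluations are high-signal "},
        {"plain_text": "but difficult to scale."},
    ]}},
    {"type": "bulleted_list_item",
     "bulleted_list_item": {"rich_text": [{"plain_text": "Written exams favor memorization"}]}},
    {"type": "image", "image": {}},
]
print(blocks_to_markdown(sample_blocks))
```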


### 2: Modify chunking on your docs
Modify `chunk_document()` to chunk by:
- Fixed character count (e.g., 500 chars)
- Paragraph boundaries
- Sentence count
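The three strategies can be sketched as standalone functions. These are hypothetical stand-ins, not the notebook's `chunk_document()` (its signature is outside this section). The sentence splitter is a naive regex that breaks after `.`, `!` or `?` followed by whitespace, so abbreviations such as "e.g." will split incorrectly.

```python
import re
from typing import List


def chunk_by_chars(text: str, size: int = 500) -> List[str]:
    """Consecutive windows of at most `size` characters (may cut mid-word)."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def chunk_by_paragraphs(text: str) -> List[str]:
    """Split on blank lines and drop empty pieces."""
    return [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]


SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


def chunk_by_sentences(text: str, per_chunk: int = 3) -> List[str]:
    """Group every `per_chunk` sentences into one chunk."""
    sentences = [s for s in SENTENCE_END.split(text.strip()) if s]
    return [
        " ".join(sentences[i:i + per_chunk])
        for i in range(0, len(sentences), per_chunk)
    ]


doc = (
    "Written exams prioritize memorization. Oral evaluations are high-signal. "
    "They are difficult to scale.\n\nThe system adapts in real time."
)
print(chunk_by_paragraphs(doc))
print(chunk_by_sentences(doc, per_chunk=2))
print(chunk_by_chars(doc, size=40))
```

Fixed-size chunks are predictable in length but can split an idea in half; paragraph and sentence chunks follow the author's structure at the cost of uneven sizes, which matters if a chunk can exceed the embedding model's input limit.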

### 3: Add RAG retrieval to the post-generation function's context


### 4: Auto-create posts using a Notion API listener
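The webhook handler in the Part 6 cell calls `verify_notion_signature(raw, sig)` without defining it. A minimal sketch, assuming Notion signs the raw request body with HMAC-SHA256 keyed by the subscription's `verification_token` and sends it as `sha256=<hex digest>` in the `X-Notion-Signature` header; confirm that format against Notion's webhook documentation before relying on it. This hypothetical version takes the token as a third argument (so the call site would need it added) and returns a bool, leaving the HTTP response (for example a 401) to the caller.

```python
import hashlib
import hmac
from typing import Optional


def verify_notion_signature(
    raw_body: bytes, signature: Optional[str], verification_token: str
) -> bool:
    """Return True if `signature` equals "sha256=" + HMAC-SHA256(raw_body, token)."""
    if not signature:
        return False
    expected = "sha256=" + hmac.new(
        verification_token.encode("utf-8"), raw_body, hashlib.sha256
    ).hexdigest()
    # compare_digest runs in constant time, so timing doesn't reveal partial matches.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))


# Round-trip check with a made-up token and body.
token = "example_verification_token"
body = b'{"type": "page.content_updated"}'
good = "sha256=" + hmac.new(token.encode(), body, hashlib.sha256).hexdigest()
print(verify_notion_signature(body, good, token))         # True
print(verify_notion_signature(body + b" ", good, token))  # False: body was altered
print(verify_notion_signature(body, None, token))         # False: header missing
```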

### 5: Auto-reply to comments using a Mastodon comments listener
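One way to structure the listener is a function you call on a timer: fetch notifications, reply once to each mention, and remember the newest notification id. A minimal sketch under stated assumptions: `reply_to_mentions` and `FakeClient` are hypothetical; `client` is expected to behave like a Mastodon.py `Mastodon` instance, of which only `notifications(since_id=...)` and `status_post(..., in_reply_to_id=..., visibility=...)` are used; notifications are assumed to arrive newest first (Mastodon's default ordering); and the HTML stripping is deliberately crude. `make_reply` is where a RAG-grounded LLM call would go.

```python
import html
import re
from typing import Any, Callable, Dict, List, Optional

TAG_RE = re.compile(r"<[^>]+>")


def html_to_text(content: str) -> str:
    """Crude conversion of Mastodon's HTML status content to plain text."""
    return html.unescape(TAG_RE.sub("", content)).strip()


def reply_to_mentions(
    client: Any, make_reply: Callable[[str], str], since_id: Optional[Any] = None
) -> Optional[Any]:
    """Reply to each new mention; return the newest notification id to pass next time."""
    notifs = list(client.notifications(since_id=since_id))
    newest = notifs[0]["id"] if notifs else since_id  # newest-first ordering assumed
    for notif in notifs:
        if notif["type"] != "mention":
            continue  # ignore favourites, boosts, follows, ...
        status = notif["status"]
        reply = make_reply(html_to_text(status["content"]))
        client.status_post(
            f"@{notif['account']['acct']} {reply}",
            in_reply_to_id=status["id"],
            visibility=status.get("visibility"),  # keep DMs private
        )
    return newest


class FakeClient:
    """Offline stand-in so the sketch runs without a Mastodon account."""

    def __init__(self, notifs: List[Dict[str, Any]]):
        self.notifs = notifs
        self.posted: List[tuple] = []

    def notifications(self, since_id=None):
        return self.notifs

    def status_post(self, status, in_reply_to_id=None, visibility=None):
        self.posted.append((status, in_reply_to_id, visibility))


fake = FakeClient([
    {"id": "12", "type": "mention", "account": {"acct": "alice"},
     "status": {"id": "900", "visibility": "public",
                "content": "<p>How are answers graded?</p>"}},
    {"id": "11", "type": "favourite", "account": {"acct": "bob"}, "status": None},
])
last_id = reply_to_mentions(fake, lambda text: "Responses are scored against predefined rubrics.")
print(last_id, fake.posted)
```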

### 6: OPTIONAL: Add posts to SQLite storage for retrieval
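One possible layout, assuming a separate `generated_posts` table (a hypothetical name) next to `embeddings_meta`. Storing posts lets you skip duplicates, track what has been published, or feed past posts back into retrieval. The sketch uses an in-memory database and plain tuples so it runs without the API; in the notebook you would pass `db` and build rows as `[(p.status, p.date_posted, p.content) for p in posts.posts]`.

```python
import sqlite3
from datetime import date
from typing import Iterable, List, Tuple

SCHEMA = """
CREATE TABLE IF NOT EXISTS generated_posts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    topic TEXT NOT NULL,
    status TEXT NOT NULL,
    date_posted TEXT NOT NULL,   -- ISO date string
    rationale TEXT,
    published INTEGER NOT NULL DEFAULT 0,
    UNIQUE (status, date_posted) -- avoid storing the same post twice
)
"""


def save_posts(conn: sqlite3.Connection, topic: str,
               rows: Iterable[Tuple[str, date, str]]) -> int:
    """Insert (status, date_posted, rationale) rows; return how many were new."""
    conn.execute(SCHEMA)
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO generated_posts (topic, status, date_posted, rationale) "
        "VALUES (?, ?, ?, ?)",
        [(topic, s, d.isoformat(), r) for s, d, r in rows],
    )
    conn.commit()
    return conn.total_changes - before


def due_posts(conn: sqlite3.Connection, on: date) -> List[str]:
    """Unpublished post texts scheduled for a given day, oldest first."""
    cur = conn.execute(
        "SELECT status FROM generated_posts "
        "WHERE date_posted = ? AND published = 0 ORDER BY id",
        (on.isoformat(),),
    )
    return [row[0] for row in cur]


conn = sqlite3.connect(":memory:")
day = date(2025, 1, 6)
rows = [("Articulate makes oral evaluation scalable. #EdTech", day, "scalability angle")]
first_insert = save_posts(conn, "oral exams", rows)
second_insert = save_posts(conn, "oral exams", rows)  # duplicate is ignored
print(first_insert, second_insert, due_posts(conn, day))
```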