# RAG For Mantine Documentation

In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# project modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2 

In [2]:
from llama_index.embeddings.google_genai import GoogleGenAIEmbedding
from pathlib import Path
from dotenv import load_dotenv
import os

load_dotenv()

True

In [3]:
# Build the session factory for the pg_vector database. `local_session` is a
# factory, not a session: later cells call `local_session()` and use the
# result as an async context manager.
from rag_service.db import DatabaseManager
from rag_service.models import Document, Chunk

local_session = DatabaseManager.get_session_factory()

## Ingestion

In [4]:
from rag_service.pipeline.document_loader import load_corpus

# Load the corpus from the documents directory.
# ROOT is resolved against the current working directory; SOURCE is the label
# reused later when ingesting and when scoping retrieval queries.
ROOT = Path("../documents").resolve()
SOURCE = "mantine_docs"

documents = load_corpus(source=SOURCE, root=ROOT)

In [5]:
import time
import asyncio
from llama_index.core.bridge.pydantic import ConfigDict


class RateLimitedGeminiEmbedding(GoogleGenAIEmbedding):
    """GoogleGenAIEmbedding variant that sleeps before each async batch request.

    Only ``aget_text_embedding_batch`` is overridden here; every other
    embedding method is inherited unchanged from the parent class.
    """

    # NOTE(review): extra="allow" is presumably what permits __init__ to attach
    # `sleep_s`, which is not declared as a model field — confirm.
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")

    def __init__(self, *args, sleep_s=1.0, **kwargs):
        """Initialise the parent model, then store the throttle delay.

        Args:
            *args: Positional arguments forwarded to ``GoogleGenAIEmbedding``.
            sleep_s: Seconds to wait before every batch request.
            **kwargs: Keyword arguments forwarded to ``GoogleGenAIEmbedding``.
        """
        super().__init__(*args, **kwargs)
        self.sleep_s = sleep_s

    async def aget_text_embedding_batch(self, texts, show_progress=True, **kwargs):
        """Embed ``texts`` in consecutive slices of ``embed_batch_size``.

        ``show_progress`` and ``**kwargs`` are accepted but not used by this
        override.

        Returns:
            A flat list holding the embeddings of every slice, concatenated in
            slice order.
        """
        vectors = []
        for start in range(0, len(texts), self.embed_batch_size):
            stop = start + self.embed_batch_size
            current = texts[start:stop]
            # The pause precedes every request, including the first one.
            await asyncio.sleep(self.sleep_s)
            embedded = await self._aget_text_embeddings(current)
            vectors.extend(embedded)
        return vectors



In [6]:
# Initialize Google Gemini Embedding model
from google.genai.types import EmbedContentConfig

# Credentials and embedding size come from the environment (.env is loaded at
# the top of the notebook).
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
EMBEDDING_DIM = os.getenv("EMBEDDING_DIM")

# Fail early with an actionable message: without this guard a missing variable
# surfaces as an opaque `int(None)` TypeError inside the constructor call below.
if not EMBEDDING_DIM:
    raise RuntimeError(
        "EMBEDDING_DIM is not set; define it in the environment or .env file "
        "as the integer output dimensionality of the embedding model."
    )

embedding_model = RateLimitedGeminiEmbedding(
    model_name="gemini-embedding-001",
    api_key=GEMINI_API_KEY,
    embedding_config=EmbedContentConfig(output_dimensionality=int(EMBEDDING_DIM)),
    embed_batch_size=99,  # number of texts sent per embedding request
    timeout=60,
    sleep_s=0.1,  # seconds to pause before each batch request
)

### Instantiate Chunkers

In [7]:
# Size and overlap handed to the custom Mantine markdown chunker.
# NOTE(review): the unit (characters vs. tokens) is defined by
# MantineMarkdownChunker, which is not visible here — confirm.
CHUNK_SIZE = 2000
CHUNK_OVERLAP = 300

In [8]:
from rag_service.pipeline.mantine_markdown_parser import MantineMarkdownChunker

# Custom chunker for the Mantine docs, configured from the shared
# CHUNK_SIZE / CHUNK_OVERLAP constants.
mantine_parser = MantineMarkdownChunker(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
)



In [8]:
from llama_index.core.node_parser import TokenTextSplitter

# Generic splitter used by the `ingest_pipeline_fixed` run.
# NOTE(review): sizes are hard-coded here rather than taken from CHUNK_SIZE /
# CHUNK_OVERLAP; given the class name they are presumably token counts, so the
# two chunkers may not be measured in the same unit — confirm.
splitter = TokenTextSplitter(
    chunk_size=500,
    chunk_overlap=80,
    separator="\n\n",
    backup_separators=["\n", " "]
)


In [9]:
# Take the first loaded document (the one ingested below) and display its id.
mantine_documentation = documents[0]
mantine_documentation.doc_id

'mantine_docs::mantine-llms-full.txt'

In [10]:
# Import the ingestion pipeline class (the pipelines themselves are instantiated in the following cells)
from rag_service.pipeline.ingestion import IngestPipeline

In [12]:
# Pipeline variant that chunks with the custom Mantine markdown parser.
# It shares the embedding model and session factory with the fixed-size variant.
ingest_pipeline_custom = IngestPipeline(
    chunker_transform=mantine_parser,
    embedding_model=embedding_model,
    session_factory=local_session
)


In [11]:
# Pipeline variant that chunks with the generic TokenTextSplitter (`splitter`).
# It shares the embedding model and session factory with the custom variant.
ingest_pipeline_fixed = IngestPipeline(
    chunker_transform=splitter,
    embedding_model=embedding_model,
    session_factory=local_session
)

In [14]:
# Run ingestion with the custom-chunker pipeline.
# NOTE(review): this uses the same `source` and `title` as the fixed-splitter
# ingest, so the two stored documents differ only by document_id — confirm
# that downstream queries filtering on source are meant to see both.
res_custom = await ingest_pipeline_custom.ingest_documents(
    documents=[mantine_documentation],
    source=SOURCE,
    title="Mantine Documentation"
)

Ingested 2902 chunks for source mantine_docs


In [12]:
# Run ingestion with the fixed-size splitter pipeline.
# NOTE(review): this uses the same `source` and `title` as the custom-chunker
# ingest, so the two stored documents differ only by document_id — confirm
# that downstream queries filtering on source are meant to see both.
res_fixed = await ingest_pipeline_fixed.ingest_documents(
    documents=[mantine_documentation],
    source=SOURCE,
    title="Mantine Documentation"
)

  from .autonotebook import tqdm as notebook_tqdm
Parsing nodes: 100%|██████████| 1/1 [00:02<00:00,  2.06s/it]


Ingested 1505 chunks for source mantine_docs


In [15]:
res_custom

{'document_id': UUID('673d595d-6638-442a-b898-cd42c051c547'),
 'doc_row': Document(source='mantine_docs', title='Mantine Documentation', doc_metadata={'n_nodes': 2902}, embedding_model='gemini-embedding-001', id=UUID('673d595d-6638-442a-b898-cd42c051c547'), created_at=datetime.datetime(2026, 2, 1, 5, 35, 3, 799329, tzinfo=datetime.timezone.utc)),
 'n_chunks': 2902}

In [13]:
res_fixed

{'document_id': UUID('6286e313-c8bb-454a-99cb-3ab5aa2a1f46'),
 'doc_row': Document(source='mantine_docs', title='Mantine Documentation', doc_metadata={'n_nodes': 1505}, embedding_model='gemini-embedding-001', id=UUID('6286e313-c8bb-454a-99cb-3ab5aa2a1f46'), created_at=datetime.datetime(2026, 2, 1, 7, 37, 16, 830517, tzinfo=datetime.timezone.utc)),
 'n_chunks': 1505}

## Retrieval & Evaluation

In [14]:
from pydantic import BaseModel, Field

class QueryItem(BaseModel):
    """One evaluation query, validated from a record of the queries JSONL file."""

    id: str  # key used to index query embeddings and to tag retrieval hits
    category: str
    difficulty: int  # NOTE(review): scale is not defined in this notebook
    text: str  # the query string that gets embedded
    tags: list[str] = Field(default_factory=list)  # optional; defaults to an empty list
    
class RetrievalHit(BaseModel):
    """One retrieved chunk for one query in one retrieval run.

    The ``model_dump()`` output of this model supplies the columns written by
    ``export_hits_to_csv``.
    """

    query_id: str  # QueryItem.id of the query
    query_text: str  # QueryItem.text of the query
    run_name: str  # run label, e.g. "hnsw_ef50_k15"
    param_value: int  # the hnsw.ef_search value used for the run
    rank: int  # 1-based position within the query's result list
    dist: float  # cosine distance between query and chunk embeddings
    chunk_id: str  # stringified Chunk.id
    chunk_text: str  # Chunk.content
    
class JudgementLabel(BaseModel):
    """Relevance judgement for a (query, chunk) pair."""

    query_id: str
    chunk_id: str
    relevance: int  # NOTE(review): grading scale is not defined in this notebook

    

In [15]:
import json

# Load the evaluation queries: one JSON object per non-blank line.
query_fpath = Path("../evaluation/queries_b.jsonl")

# Read explicitly as UTF-8 instead of the platform default, matching the
# encoding used when results are exported. Iterating the file handle splits
# on real newlines only; str.splitlines() would also break on characters such
# as U+2028/U+2029, which may appear unescaped inside JSON strings.
with query_fpath.open(encoding="utf-8") as query_file:
    queries = [json.loads(line) for line in query_file if line.strip()]

query_items = [QueryItem.model_validate(q) for q in queries]


In [16]:
from sqlalchemy import select, text


async def run_retrieval(
    *,
    queries: list[QueryItem],
    embedding_model,
    ef_search_values: list[int],
    k: int = 15,
    document_id=None,
) -> list[RetrievalHit]:
    """Run top-k cosine-distance retrieval for every query at each ef_search value.

    Each query is embedded once via ``embedding_model.get_query_embedding`` and
    the vector is reused for all ``ef_search_values``. Every (ef, query) pair is
    executed in its own transaction so that ``SET LOCAL hnsw.ef_search`` applies
    to that query only.

    Args:
        queries: Queries to evaluate; ``id`` keys the embedding cache and
            ``text`` is what gets embedded.
        embedding_model: Object exposing ``get_query_embedding(text)``.
        ef_search_values: Values of ``hnsw.ef_search`` to run; one run each.
        k: Maximum number of chunks returned per query.
        document_id: Optional ``Chunk.document_id`` restricting the search to a
            single ingested document. When ``None`` (default) the search covers
            every document whose ``Document.source`` equals the notebook-level
            ``SOURCE``, which includes all ingests stored under that source.

    Returns:
        One ``RetrievalHit`` per returned row, ordered by run, then query, then
        ascending distance (``rank`` starts at 1).
    """
    query_embeds = {
        q.id: embedding_model.get_query_embedding(q.text)
        for q in queries
    }

    # Search scope: a single document when requested, otherwise the whole source.
    if document_id is None:
        scope = Document.source == SOURCE
    else:
        scope = Chunk.document_id == document_id

    hits: list[RetrievalHit] = []

    async with local_session() as session:
        for ef in ef_search_values:
            # The value is interpolated into the SQL text below, so coerce it
            # to int to guarantee that only a number can reach the statement.
            ef = int(ef)
            run_name = f"hnsw_ef{ef}_k{k}"

            for q in queries:
                q_emb = query_embeds[q.id]
                dist = Chunk.embedding.cosine_distance(q_emb).label("dist")

                stmt = (
                    select(Chunk.id.label("chunk_id"), Chunk.content.label("chunk_text"), dist)
                    .join(Document, Document.id == Chunk.document_id)
                    .where(scope)
                    .order_by(dist)
                    .limit(k)
                )

                async with session.begin():  # needed for SET LOCAL
                    await session.execute(
                        text(f"SET LOCAL hnsw.ef_search = {ef}"),
                    )
                    rows = (await session.execute(stmt)).mappings().all()

                for rank, r in enumerate(rows, start=1):
                    hits.append(
                        RetrievalHit(
                            query_id=q.id,
                            query_text=q.text,
                            run_name=run_name,
                            param_value=ef,
                            rank=rank,
                            dist=float(r["dist"]),
                            chunk_id=str(r["chunk_id"]),
                            chunk_text=r["chunk_text"],
                        )
                    )

    return hits


In [17]:
# Only one hnsw.ef_search value is evaluated in this run.
ef_search_values = [50]

# NOTE(review): run_retrieval filters on Document.source == SOURCE, and both
# ingest runs above stored their document under that same SOURCE, so these
# results may mix chunks from the custom and fixed chunkings — confirm that
# this is intended before labeling.
retrieved_hits = await run_retrieval(
    queries=query_items,
    embedding_model=embedding_model,
    ef_search_values=ef_search_values,
    k=15
)




In [18]:
retrieved_hits[0]

RetrievalHit(query_id='eval2_q009', query_text='How do I set up MantineProvider in Next.js App Router (app/layout.tsx)?', run_name='hnsw_ef50_k15', param_value=50, rank=1, dist=0.23000172151354092, chunk_id='3726676e-e6fe-46bb-9d84-b14cbe5bdba1', chunk_text='const theme = createTheme({\n  /** Put your mantine theme override here */\n});\n\nexport default function App({ Component, pageProps }: AppProps) {\n  return (\n    <MantineProvider theme={theme}>\n      <Component {...pageProps} />\n    </MantineProvider>\n  );\n}\n```\n\nCreate `pages/_document.tsx` file with [ColorSchemeScript](https://mantine.dev/theming/color-schemes) component.\nNote that it is required even if you use only one color scheme in your application.\n\n```tsx\nimport { Head, Html, Main, NextScript } from \'next/document\';\nimport { ColorSchemeScript, mantineHtmlProps } from \'@mantine/core\';\n\nexport default function Document() {\n  return (\n    <Html lang="en" {...mantineHtmlProps}>\n      <Head>\n        <C

In [19]:
def dedupe_hits_best(hits: list[RetrievalHit]) -> list[RetrievalHit]:
    """Keep a single hit per (query_id, chunk_id) pair: the one with the lowest dist.

    When two hits for the same pair have equal distance, the one seen first is
    kept. The result follows the order in which each pair first appeared.
    """
    closest = {}
    for hit in hits:
        pair = (hit.query_id, str(hit.chunk_id))
        incumbent = closest.get(pair)
        # Replace only on a strictly smaller distance.
        if incumbent is None or hit.dist < incumbent.dist:
            closest[pair] = hit
    return [*closest.values()]

In [21]:
import csv

outpath = "../evaluation/retrieval_mantine_fixed_chunk_label2.csv"

def export_hits_to_csv(hits: list[RetrievalHit], out_path: str) -> str:
    """Write deduplicated hits to a CSV with a blank ``relevance`` column.

    Hits are first reduced with ``dedupe_hits_best``. Each remaining hit is
    dumped to one row and given an empty ``relevance`` value to be filled in
    later. Parent directories of ``out_path`` are created when missing, and an
    existing file is overwritten.

    Args:
        hits: Retrieval hits to export; may be empty.
        out_path: Destination path of the CSV file.

    Returns:
        ``out_path`` as a string, after conversion through ``Path``.
    """
    path = Path(out_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    rows = [{**h.model_dump(), "relevance": ""} for h in dedupe_hits_best(hits)]

    # With no rows there is nothing to derive the columns from, so fall back to
    # the model's declared fields. This yields a header-only CSV instead of a
    # zero-byte file that CSV readers cannot parse.
    if rows:
        fieldnames = list(rows[0])
    else:
        fieldnames = [*RetrievalHit.model_fields, "relevance"]

    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    return str(path)



In [22]:
# Write the deduplicated hits to the CSV at `outpath` and display the path.
path = export_hits_to_csv(retrieved_hits, outpath)
path

'../evaluation/retrieval_mantine_fixed_chunk_label2.csv'