# Production RAG — Ingestion Pipeline

This notebook embeds chunked documents and stores them in Qdrant.  
Designed to run on **Google Colab** (free GPU) so ingestion finishes in minutes, not hours.

### What this notebook does
1. Installs dependencies (torch GPU, sentence-transformers, qdrant-client)
2. Uploads the JSON chunk exports from the Lucene service
3. Loads `BAAI/bge-small-en` embedding model on GPU
4. Creates a Qdrant collection (HNSW disabled, brute-force only)
5. Loads each JSON file → embeds in batches → upserts to Qdrant
6. Verifies the final point count

### Requirements
- The 11 JSON files from `lucene-service/chunk-exports/`
- A running Qdrant instance (local, Docker, or Qdrant Cloud)
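
The export schema is not spelled out in this notebook, but the fields read by `load_json_records` and `build_payload` suggest a record shape like the one below. This is a sketch inferred from those helpers: the sample values are hypothetical, and `is_valid_record` is an illustrative name that mirrors the filter applied during ingestion.

```python
from typing import Any, Dict

# Hypothetical record: the field names are the ones the notebook reads,
# the values are made up for illustration.
sample_record: Dict[str, Any] = {
    "id": "example-doc_p1_c0_0000abcd",
    "document_id": "example-doc",
    "content": "Retrieval augmented generation combines search with an LLM.",
    "metadata": {
        "source": "example.pdf",
        "title": "Example Title",
        "page_number": 1,
        "chunk_index": 0,
    },
}


def is_valid_record(rec: Dict[str, Any]) -> bool:
    """Mirror the ingestion filter: an ID plus non-blank string content."""
    content = rec.get("content")
    return (
        rec.get("id") is not None
        and isinstance(content, str)
        and bool(content.strip())
    )


print(is_valid_record(sample_record))                 # True
print(is_valid_record({"id": "x", "content": "  "}))  # False
```

Records that fail this check are counted as skipped rather than embedded.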

---
## 1. Install Dependencies

In [None]:
!pip install -q sentence-transformers==3.3.1 qdrant-client==1.12.1 torch tqdm

---
## 2. Configuration

All settings live in one place. Update `QDRANT_URL` / `QDRANT_API_KEY` to point to your Qdrant instance.

In [None]:
import uuid
import gc
import json
import os
import time
from pathlib import Path
from typing import List, Dict, Any, Generator

import torch
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.http.models import (
    Distance,
    HnswConfigDiff,
    OptimizersConfigDiff,
    PointStruct,
    VectorParams,
)
from tqdm.auto import tqdm


# ── Qdrant Cloud ────────────────────────────────────────────
QDRANT_URL = "https://b210317b-feb7-4514-89c0-44668fffeba0.eu-central-1-0.aws.cloud.qdrant.io:6333"
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "")  # read from the environment; never hard-code a real key
COLLECTION_NAME = "rag_chunks"

# ── Embedding ───────────────────────────────────────────────
EMBEDDING_MODEL = "BAAI/bge-small-en"
EMBEDDING_DIM = 384
EMBED_BATCH_SIZE = 256             # GPU can handle larger batches

# ── Ingestion ───────────────────────────────────────────────
UPSERT_BATCH_SIZE = 1000           # Points per Qdrant upsert call
JSON_DIR = "./chunk-exports"       # Upload your JSON files here

# ── Device ──────────────────────────────────────────────────
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device : {DEVICE}")
print(f"PyTorch: {torch.__version__}")
if DEVICE == "cuda":
    print(f"GPU    : {torch.cuda.get_device_name(0)}")

---
## 3. Upload JSON Chunk Exports

Upload the 11 JSON files exported by the Lucene service.  
Two options:
- **Option A**: Google Colab file upload (small files)
- **Option B**: Mount Google Drive (recommended for the full ~99 MB export set)

In [None]:
# ── Option A: Upload directly (works for small files) ──────
# Uncomment the lines below to upload via Colab UI:

# from google.colab import files
# os.makedirs(JSON_DIR, exist_ok=True)
# uploaded = files.upload()
# for name, data in uploaded.items():
#     with open(os.path.join(JSON_DIR, name), 'wb') as f:
#         f.write(data)
# print(f"Uploaded {len(uploaded)} file(s)")


# ── Option B: Mount Google Drive (recommended) ─────────────
# Upload the JSON files to your Drive first, then:

# from google.colab import drive
# drive.mount('/content/drive')
# JSON_DIR = "/content/drive/MyDrive/chunk-exports"  # adjust path


# ── Verify files ───────────────────────────────────────────
json_files = sorted(Path(JSON_DIR).glob("*.json"))
total_size = sum(f.stat().st_size for f in json_files) / (1024 * 1024)
print(f"Found {len(json_files)} JSON file(s) — {total_size:.1f} MB total")
for f in json_files:
    print(f"  {f.name} ({f.stat().st_size / (1024*1024):.1f} MB)")

---
## 4. Load Embedding Model

Loads `BAAI/bge-small-en` (33M params, 384 dimensions).  
On Colab GPU this takes ~5 seconds. On CPU it takes ~30 seconds.

In [None]:
def load_embedding_model(model_name: str, device: str) -> SentenceTransformer:
    """Load and return the sentence-transformer model.

    The model is put in eval mode and moved to the specified device.
    """
    print(f"Loading {model_name} on {device}...")
    model = SentenceTransformer(model_name, device=device)
    model.eval()
    print(f"Model loaded. Max sequence length: {model.max_seq_length}")
    return model


model = load_embedding_model(EMBEDDING_MODEL, DEVICE)

---
## 5. Connect to Qdrant & Create Collection

Creates the collection with **HNSW fully disabled** (`m=0`).  
This is intentional — the deployed server has only 1 GB RAM.  
Search uses `exact=True` (brute-force) over ~1000 Lucene-filtered candidates.
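
As a rough sanity check on the 1 GB constraint, the raw size of the stored vectors can be estimated as points × dimensions × 4 bytes (float32). The sketch below covers only that raw vector data. It ignores payloads, Qdrant's own overhead, and the HNSW graph that `m=0` avoids building. The point counts are hypothetical, since the notebook does not state how many chunks the exports contain.

```python
def raw_vector_mib(num_points: int, dim: int = 384, bytes_per_value: int = 4) -> float:
    """Raw float32 vector storage in MiB (no payloads, no index, no overhead)."""
    return num_points * dim * bytes_per_value / (1024 * 1024)


# Hypothetical collection sizes, for illustration only.
for n in (100_000, 500_000, 1_000_000):
    print(f"{n:>9,} points -> {raw_vector_mib(n):8.1f} MiB")
```

Any graph index would add to these figures, which is the motivation given above for skipping it on a small server.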

In [None]:
def connect_qdrant(url: str, api_key: str) -> QdrantClient:
    """Connect to Qdrant Cloud.

    Uses the full URL + API key for authentication.
    """
    client = QdrantClient(url=url, api_key=api_key, timeout=120)
    collections = [c.name for c in client.get_collections().collections]
    print(f"Connected to Qdrant Cloud. Existing collections: {collections}")
    return client


def create_collection(
    client: QdrantClient,
    name: str,
    dim: int,
) -> None:
    """Create the vector collection with HNSW disabled.

    Design choices:
      - HnswConfigDiff(m=0)       -> no HNSW graph, saves RAM
      - indexing_threshold=0       -> no automatic index building
      - on_disk_payload=True       -> payloads stored on disk, not RAM
      - Distance.COSINE            -> cosine similarity scoring
    """
    collections = [c.name for c in client.get_collections().collections]
    if name in collections:
        info = client.get_collection(name)
        print(f"Collection '{name}' already exists with {info.points_count} points.")
        return

    client.create_collection(
        collection_name=name,
        vectors_config=VectorParams(
            size=dim,
            distance=Distance.COSINE,
        ),
        hnsw_config=HnswConfigDiff(m=0),
        optimizers_config=OptimizersConfigDiff(
            indexing_threshold=0,
        ),
        on_disk_payload=True,
    )
    print(f"Created collection '{name}' (HNSW disabled, on-disk payload).")


qdrant = connect_qdrant(QDRANT_URL, QDRANT_API_KEY)
create_collection(qdrant, COLLECTION_NAME, EMBEDDING_DIM)

---
## 6. Helper Functions

Core utilities used by the ingestion loop:  
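- **`to_qdrant_id`**: deterministic UUID conversion (Qdrant needs valid UUIDs)  
- **`load_json_records`**: loads and validates records from a JSON file (the main loop in section 7 applies the same filter inline)  
- **`sanitize_text`**: strips NUL bytes and control characters that the tokenizer rejects  
- **`build_payload`**: extracts minimal payload fields for storage  
- **`embed_batch`**: embeds a list of texts using the model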
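
The ID mapping can be checked in isolation. The sketch below repeats the `uuid5` call from `to_qdrant_id` on a hypothetical chunk ID and confirms that the result is a valid UUID that stays the same across calls. The example ID is made up and only imitates the Lucene pattern.

```python
import uuid


def to_qdrant_id(chunk_id: str) -> str:
    """Same mapping as the helper in this notebook: name-based UUID (version 5)."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, chunk_id))


chunk_id = "example-doc_p1_c0_0000abcd"  # hypothetical, not a real export ID
first = to_qdrant_id(chunk_id)
second = to_qdrant_id(chunk_id)

print(first == second)                         # True: same input, same point ID
print(uuid.UUID(first).version)                # 5
print(first != to_qdrant_id(chunk_id + "x"))   # True: different input, different ID
```

Because the mapping is deterministic, re-running ingestion upserts onto the same point IDs instead of creating duplicates.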

In [None]:
def to_qdrant_id(chunk_id: str) -> str:
    """Convert an arbitrary chunk ID string to a valid Qdrant UUID.

    Qdrant accepts only unsigned integers or valid UUIDs as point IDs.
    Lucene exports IDs like '5c4a9c97-..._p1_c0_088c5634' which are not
    valid UUIDs. uuid5 produces a deterministic UUID from any string,
    so the same chunk_id always maps to the same Qdrant point ID.

    IMPORTANT: This same function is used in the deployed rag-service
    (qdrant_store.py) so the IDs match at query time.
    """
    return str(uuid.uuid5(uuid.NAMESPACE_URL, chunk_id))


def load_json_records(path: Path) -> List[Dict[str, Any]]:
    """Load a JSON file and return validated records.

    Filters out records with a missing ID, non-string content, or blank content.
    On Colab we have plenty of RAM, so json.load() is fine.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    records = []
    skipped = 0
    for rec in data:
        rid = rec.get("id")
        content = rec.get("content")
        if rid is None or not isinstance(content, str) or not content.strip():
            skipped += 1
            continue
        records.append(rec)

    print(f"  Loaded {len(records)} valid records, skipped {skipped}")
    return records


def sanitize_text(text: str) -> str:
    """Clean text for the tokenizer.

    Removes NUL bytes and other control characters that can
    cause the Rust tokenizer to reject the input.
    """
    # Remove NUL bytes
    text = text.replace("\x00", "")
    # Replace other control chars (keep newline, tab, carriage return)
    cleaned = []
    for ch in text:
        if ord(ch) < 32 and ch not in "\n\r\t":
            cleaned.append(" ")
        else:
            cleaned.append(ch)
    return "".join(cleaned).strip()


def build_payload(record: Dict[str, Any]) -> Dict[str, Any]:
    """Extract minimal payload to store alongside the vector.

    Only fields needed for response construction are kept.
    The original chunk_id is stored so the API can return it.
    """
    metadata = record.get("metadata") or {}  # tolerate a missing or null metadata field
    return {
        "chunk_id": record.get("id", ""),
        "content": record.get("content", ""),
        "source": metadata.get("source", ""),
        "title": metadata.get("title", ""),
        "page_number": metadata.get("page_number"),
        "chunk_index": metadata.get("chunk_index"),
        "document_id": record.get("document_id", ""),
    }


def embed_batch(
    model: SentenceTransformer,
    texts: List[str],
    batch_size: int = 256,
) -> List[List[float]]:
    """Embed a list of texts and return as list of float vectors.

    Uses torch.no_grad() to save memory. Normalizes embeddings
    for cosine similarity. Cleans up GPU memory after encoding.
    """
    with torch.no_grad():
        embeddings = model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=False,
            normalize_embeddings=True,
            convert_to_numpy=True,
        )
    result = embeddings.tolist()
    del embeddings
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return result


print("Helper functions loaded.")

---
## 7. Run Ingestion Pipeline

This is the main loop:  
1. Loads each JSON file with `json.load()`  
2. Sanitizes text content for the tokenizer  
3. Embeds in batches of 256 on GPU  
4. Upserts to Qdrant Cloud in batches of 1000  
5. Reports progress with a live progress bar  

On a **Colab T4 GPU**, expect ~99 MB of chunks to finish in **2-5 minutes**.
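
The two batch sizes nest: each upsert batch of up to 1000 records is embedded in sub-batches of up to 256 before a single upsert call. The sketch below shows only that slicing arithmetic on a hypothetical record count. `batch_plan` is an illustrative helper, not part of the pipeline.

```python
from typing import List, Tuple


def batch_plan(
    num_records: int,
    upsert_batch_size: int = 1000,
    embed_batch_size: int = 256,
) -> List[Tuple[int, List[int]]]:
    """Return (upsert batch size, [embed sub-batch sizes]) for each upsert call."""
    plan = []
    for start in range(0, num_records, upsert_batch_size):
        upsert_n = min(upsert_batch_size, num_records - start)
        sub_sizes = [
            min(embed_batch_size, upsert_n - i)
            for i in range(0, upsert_n, embed_batch_size)
        ]
        plan.append((upsert_n, sub_sizes))
    return plan


# A hypothetical file with 2,300 valid records.
for upsert_n, sub_sizes in batch_plan(2300):
    print(upsert_n, sub_sizes)
```

With the defaults, a full upsert batch takes four encode calls, the last one smaller than the rest.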

In [None]:
def _process_and_upsert(
    records: List[Dict[str, Any]],
    model: SentenceTransformer,
    client: QdrantClient,
    collection: str,
    embed_batch_size: int,
) -> int:
    """Embed a batch of records and upsert to Qdrant."""
    # Sanitize texts for the tokenizer
    texts = [sanitize_text(str(r["content"])) for r in records]
    raw_ids = [r["id"] for r in records]
    ids = [to_qdrant_id(rid) for rid in raw_ids]
    payloads = [build_payload(r) for r in records]

    # Embed in sub-batches
    all_vectors: List[List[float]] = []
    for i in range(0, len(texts), embed_batch_size):
        sub = texts[i : i + embed_batch_size]
        vecs = embed_batch(model, sub, batch_size=embed_batch_size)
        all_vectors.extend(vecs)
        del vecs

    # Build points and upsert
    points = [
        PointStruct(id=ids[j], vector=all_vectors[j], payload=payloads[j])
        for j in range(len(ids))
    ]
    client.upsert(collection_name=collection, points=points, wait=True)

    del all_vectors, points
    gc.collect()
    return len(ids)


# ── Main ingestion loop ────────────────────────────────────
total_ingested = 0
total_skipped = 0
start_time = time.time()
pbar = tqdm(desc="Ingesting", unit=" chunks")

for file_idx, json_file in enumerate(json_files, 1):
    file_size = json_file.stat().st_size / (1024 * 1024)
    print(f"\nFile {file_idx}/{len(json_files)}: {json_file.name} ({file_size:.1f} MB)")

    # Load entire file (no streaming — Colab has plenty of RAM)
    with open(json_file, "r", encoding="utf-8") as f:
        raw_data = json.load(f)

    # Filter valid records
    records = []
    skipped = 0
    for rec in raw_data:
        rid = rec.get("id")
        content = rec.get("content")
        if rid is None or not isinstance(content, str) or not content.strip():
            skipped += 1
            continue
        records.append(rec)

    total_skipped += skipped
    print(f"  Loaded {len(records)} valid records, skipped {skipped}")
    del raw_data

    # Process in upsert-sized batches
    for i in range(0, len(records), UPSERT_BATCH_SIZE):
        batch = records[i : i + UPSERT_BATCH_SIZE]
        print(f"  Batch {i//UPSERT_BATCH_SIZE + 1}: embedding + upserting {len(batch)} chunks...")
        count = _process_and_upsert(batch, model, qdrant, COLLECTION_NAME, EMBED_BATCH_SIZE)
        total_ingested += count
        pbar.update(count)

    del records
    gc.collect()

pbar.close()
elapsed = time.time() - start_time

print(f"\n{'=' * 50}")
print("Ingestion complete!")
print(f"  Ingested : {total_ingested} chunks")
print(f"  Skipped  : {total_skipped} chunks")
print(f"  Files    : {len(json_files)}")
print(f"  Time     : {elapsed:.1f}s ({elapsed/60:.1f} min)")
print(f"  Speed    : {total_ingested/elapsed:.1f} chunks/sec")
print(f"{'=' * 50}")

---
## 8. Verify Ingestion

Check that all points were stored correctly in Qdrant.
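
When comparing `points_count` with the number of ingested chunks, note that an upsert overwrites any existing point with the same ID, so duplicate chunk IDs in the exports would leave the collection smaller than the ingested total. A minimal sketch of the expected count, assuming the same `uuid5` mapping as `to_qdrant_id` and using made-up IDs:

```python
import uuid
from typing import Iterable


def expected_point_count(chunk_ids: Iterable[str]) -> int:
    """Number of distinct Qdrant point IDs the given chunk IDs map to."""
    return len({str(uuid.uuid5(uuid.NAMESPACE_URL, cid)) for cid in chunk_ids})


# Hypothetical IDs with one duplicate: three upserted records, two points.
ids = ["doc_p1_c0", "doc_p1_c1", "doc_p1_c0"]
print(expected_point_count(ids))  # 2
```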

In [None]:
def verify_ingestion(client: QdrantClient, collection: str) -> None:
    """Print collection stats and a sample point to verify ingestion."""
    info = client.get_collection(collection)
    print(f"Collection       : {collection}")
    print(f"Total points     : {info.points_count}")
    print(f"Vector dimension : {info.config.params.vectors.size}")
    print(f"Distance metric  : {info.config.params.vectors.distance}")
    print(f"HNSW m           : {info.config.hnsw_config.m}")
    print(f"On-disk payload  : {info.config.params.on_disk_payload}")

    # Fetch one sample point to verify structure
    sample = client.scroll(
        collection_name=collection,
        limit=1,
        with_payload=True,
        with_vectors=False,
    )
    if sample[0]:
        point = sample[0][0]
        print(f"\nSample point ID  : {point.id}")
        print("Sample payload   :")
        for key, val in point.payload.items():
            display = str(val)[:80] + "..." if len(str(val)) > 80 else str(val)
            print(f"  {key}: {display}")


verify_ingestion(qdrant, COLLECTION_NAME)

---
## 9. Quick Search Test

Run a quick similarity search to make sure vectors are working.  
This mimics what the deployed RAG service does at query time.
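
Conceptually, an exact search scores the query against every stored vector and keeps the best `top_k`. Because the embeddings are normalized, cosine similarity reduces to a dot product. The sketch below illustrates that idea in plain Python on tiny hand-made vectors. It is not how Qdrant implements the search, and `exact_top_k` is a hypothetical helper.

```python
import math
from typing import Dict, List, Tuple


def normalize(vec: List[float]) -> List[float]:
    """Scale a non-zero vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def exact_top_k(
    query: List[float],
    vectors: Dict[str, List[float]],
    top_k: int = 2,
) -> List[Tuple[str, float]]:
    """Score every vector against the query (dot product) and keep the best top_k."""
    q = normalize(query)
    scored = [
        (point_id, sum(a * b for a, b in zip(q, normalize(vec))))
        for point_id, vec in vectors.items()
    ]
    return sorted(scored, key=lambda item: item[1], reverse=True)[:top_k]


# Toy 2-D vectors standing in for 384-D embeddings.
toy = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
for point_id, score in exact_top_k([1.0, 0.1], toy):
    print(point_id, round(score, 4))
```

The cost grows linearly with the number of vectors scored, which is why the deployed service restricts the search to the Lucene-filtered candidates.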

In [None]:
def test_search(
    model: SentenceTransformer,
    client: QdrantClient,
    collection: str,
    query: str,
    top_k: int = 5,
) -> None:
    """Run a test similarity search and print results.

    Uses the BGE query prefix for proper query embedding.
    Searches with exact=True (brute-force) like the deployed service.
    """
    from qdrant_client.http.models import SearchParams

    query_prefix = "Represent this sentence for searching relevant passages: "
    prefixed = query_prefix + query

    with torch.no_grad():
        vector = model.encode(
            [prefixed],
            normalize_embeddings=True,
            convert_to_numpy=True,
        )[0].tolist()

    results = client.search(
        collection_name=collection,
        query_vector=vector,
        search_params=SearchParams(exact=True),
        limit=top_k,
        with_payload=True,
        with_vectors=False,
    )

    print(f"Query: '{query}'")
    print(f"Top {top_k} results:\n")
    for i, hit in enumerate(results, 1):
        p = hit.payload
        content_preview = p.get('content', '')[:150].replace('\n', ' ')
        print(f"  [{i}] Score: {hit.score:.4f}")
        print(f"      Source: {p.get('source', 'N/A')} | Page: {p.get('page_number', 'N/A')}")
        print(f"      {content_preview}...")
        print()


test_search(model, qdrant, COLLECTION_NAME, "What is retrieval augmented generation?")

---
## Done!

Your vectors are now stored in Qdrant. The deployed `rag-service` will:
1. Receive a query + candidate IDs from the Lucene service
2. Embed the query using the same `BAAI/bge-small-en` model
3. Search Qdrant with `HasIdCondition` (filtered brute-force)
4. Return the top 10 chunks to Claude for answer generation

Make sure your deployed Qdrant instance has this data before starting the server.