# Dataset Preparation for RAG (Retrieval-Augmented Generation)

This notebook provides a complete, end-to-end workflow for preparing a dataset of documents for a **RAG (Retrieval-Augmented Generation)** system. We'll cover:

1. **Document Conversion** with [Docling](https://docling-project.github.io/docling/) - converting PDFs to structured formats
2. **RAG-Optimized Chunking** - breaking documents into semantically meaningful chunks
3. **Embedding Generation** - using the Snowflake Arctic embedding model
4. **Vector Storage** - storing embeddings in the Milvus vector database for efficient retrieval

## Why RAG-Specific Preparation Matters

RAG systems require carefully prepared data to ensure:
- **Contextual Completeness**: Each chunk contains sufficient context to be understood independently
- **Semantic Coherence**: Chunks align with natural topic boundaries
- **Optimal Size**: Chunks balance context (larger chunks carry more surrounding detail) against retrieval precision (smaller chunks match a query more exactly)
- **Metadata Preservation**: Structural information (headings, tables, lists) is retained for better retrieval

## 📦 Installation

Install the required packages. This may take a minute.

In [None]:
%pip install -qq docling sentence-transformers numpy jupyterlab "pymilvus[milvus_lite]==2.6.2"

## ⚠️ Important Notes

**Exception Handling**: This notebook demonstrates the core workflow with minimal error handling for clarity. When using your own data or deploying to production:

- Add try-except blocks around file I/O operations
- Handle network errors for URL-based document loading
- Validate document formats and sizes before processing
- Implement timeouts for long-running operations
- Add proper logging for debugging and monitoring
- Handle cases where documents fail to convert or chunk
- Validate embedding generation and database insertion success

Example of adding exception handling:
```python
for file_path in input_files:
    try:
        result = converter.convert(file_path)
        document = result.document
    except Exception as e:
        print(f"❌ Failed to convert {file_path}: {e}")
        continue  # Skip to next document
```
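
Input validation can also run before any conversion starts. The sketch below is only an illustration of the "validate document formats and sizes" point: `check_source`, `ALLOWED_SUFFIXES`, and `MAX_LOCAL_BYTES` are hypothetical names and limits, not part of this notebook or of Docling, and URLs without a file extension (such as share links) would be flagged and need separate handling.

```python
from pathlib import Path
from urllib.parse import urlparse

# Hypothetical limits for illustration; tune them for your own corpus.
ALLOWED_SUFFIXES = {".pdf"}
MAX_LOCAL_BYTES = 50 * 1024 * 1024  # 50 MiB


def check_source(source: str) -> list[str]:
    """Return the problems found for one input; an empty list means it looks usable."""
    problems = []
    is_url = source.startswith(("http://", "https://"))

    # For URLs, only the path component carries the file extension.
    path_part = urlparse(source).path if is_url else source
    suffix = Path(path_part).suffix.lower()
    if suffix not in ALLOWED_SUFFIXES:
        problems.append(f"unexpected extension {suffix!r}")

    # Existence and size can only be checked cheaply for local files.
    if not is_url:
        local = Path(source)
        if not local.is_file():
            problems.append("file not found")
        elif local.stat().st_size > MAX_LOCAL_BYTES:
            problems.append("file exceeds size limit")
    return problems
```

Inputs that return a non-empty list could be logged and skipped before the conversion loop runs.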

## 🔧 Configuration

### Set Input Documents

Define the documents to process. You can mix URLs and local file paths.


In [None]:
from pathlib import Path
from urllib.parse import urlparse
import re

def get_safe_filename(file_path: str) -> str:
    """
    Extract a safe filename from a path or URL.
    
    Examples:
        >>> get_safe_filename("https://example.com/docs/report.pdf")
        'report'
        >>> get_safe_filename("https://drive.google.com/file/d/123/view")
        'doc_<10 hex chars>'  # stable hash fallback; the digest depends on the URL
        >>> get_safe_filename("/path/to/my document.pdf")
        'my_document'
    """
    if file_path.startswith(('http://', 'https://')):
        parsed = urlparse(file_path)
        filename = Path(parsed.path.rstrip('/')).stem
        # Treat generic basenames as non-descriptive and use stable hash fallback
        UNHELPFUL = {"view", "download", "file", "index", "blob", "raw"}
        if (not filename) or (filename.lower() in UNHELPFUL) or (len(filename) < 3):
            from hashlib import md5
            digest = md5(file_path.encode("utf-8")).hexdigest()[:10]
            filename = f"doc_{digest}"
    else:
        filename = Path(file_path).stem
    
    # Sanitize any remaining special characters
    return re.sub(r'[^A-Za-z0-9._-]', '_', filename)

print("✓ Helper function defined")
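
Two different inputs can still produce the same safe name (for example, two URLs that both end in `report.pdf`), in which case the later output files would overwrite the earlier ones. A minimal sketch of one way to guard against that, assuming a hypothetical `disambiguate` helper that is not part of this notebook:

```python
def disambiguate(names: list[str]) -> list[str]:
    """Append _2, _3, ... to repeated names so that every returned name is unique."""
    used: set[str] = set()
    unique: list[str] = []
    for name in names:
        candidate, n = name, 1
        # Keep bumping the suffix until the candidate is not taken yet.
        while candidate in used:
            n += 1
            candidate = f"{name}_{n}"
        used.add(candidate)
        unique.append(candidate)
    return unique
```

Applied to the list of names returned by `get_safe_filename`, this keeps the first occurrence unchanged and only renames later duplicates.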

In [None]:
# Documents to process - replace with your own
# Using sample PDFs with distinct names to avoid file naming collisions
input_files = [
    "https://github.com/docling-project/docling/raw/v2.43.0/tests/data/pdf/2203.01017v2.pdf",
    "https://github.com/docling-project/docling/raw/v2.43.0/tests/data/pdf/2206.01062.pdf",
    "https://github.com/docling-project/docling/raw/v2.43.0/tests/data/pdf/2305.03393v1.pdf",
]

# Output directory for intermediate files
output_dir = Path("dataset-preparation-for-RAG/output")
output_dir.mkdir(parents=True, exist_ok=True)

# Milvus database configuration
milvus_db_name = "milvus_rag_demo.db"
milvus_db_path = output_dir / milvus_db_name

print(f"✓ Will process {len(input_files)} documents")
print(f"✓ Output directory: {output_dir.resolve()}")
print(f"✓ Milvus database: {milvus_db_path.resolve()}")

## 📄 Step 1: Document Conversion with Docling

### RAG-Optimized Conversion Configuration

For RAG systems, we configure Docling with specific settings:

- **Table Structure Detection**: Essential for preserving tabular data relationships
- **OCR**: Enables text extraction from scanned documents and images
  - **Note**: OCR is enabled by default in this notebook for maximum text extraction
  - Set `pipeline_options.do_ocr = False` below to disable if not needed
  - **Requires Tesseract binary**: Please refer to the Docling [installation](https://docling-project.github.io/docling/installation/) docs if you're not running this notebook from a RHEL-based Workbench image that has it installed already
- **Image Generation**: Useful for multimodal RAG or image-based context
- **Enrichments** (optional): Code, formulas, picture descriptions add semantic richness

These settings ensure we extract maximum structural and semantic information from documents.


In [None]:
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions, TesseractOcrOptions
from docling.backend.docling_parse_v4_backend import DoclingParseV4DocumentBackend

# Configure RAG-optimized pipeline
pipeline_options = PdfPipelineOptions()

# Essential settings for RAG
pipeline_options.do_table_structure = True  # Preserve table structure
pipeline_options.table_structure_options.do_cell_matching = True  # Accurate cell matching
pipeline_options.generate_page_images = False  # Set to True if you need images for multimodal RAG
pipeline_options.generate_picture_images = True  # Extract pictures from documents

# OCR configuration - enabled by default for maximum text extraction
# Note: OCR requires the Tesseract binary (should be pre-installed on RHEL-based Workbench images)
# Set to False if OCR is not needed: pipeline_options.do_ocr = False
pipeline_options.do_ocr = True
# Uncomment below to force OCR on all pages (useful for scanned documents)
# pipeline_options.ocr_options = TesseractOcrOptions(
#     force_full_page_ocr=True
# )

# Performance settings
pipeline_options.accelerator_options = AcceleratorOptions(
    num_threads=4,
    device=AcceleratorDevice.AUTO
)

# Optional enrichments - enable these for richer semantic information
# Note: These add processing time but improve RAG quality for technical documents
pipeline_options.do_code_enrichment = False  # Enable for code-heavy documents
pipeline_options.do_formula_enrichment = False  # Enable for math/scientific papers
pipeline_options.do_picture_description = False  # Enable for multimodal RAG
pipeline_options.do_picture_classification = False  # Enable to classify image types

print("✓ Pipeline configured for RAG")

### Run Document Conversion

Convert all documents to Docling's structured format.


In [None]:
import json
from docling_core.types.doc import ImageRefMode

# Create converter
converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_options=pipeline_options,
            backend=DoclingParseV4DocumentBackend,
        )
    }
)

# Store converted documents
converted_documents = []

print("Starting document conversion...\n")

for file_path in input_files:
    print(f"Converting: {file_path}")
    
    result = converter.convert(file_path)
    document = result.document
    
    # Compute safe filename once for reuse
    file_name = get_safe_filename(file_path)
    
    # Store the document object for later processing
    converted_documents.append({
        'source': file_path,
        'file_name': file_name,
        'document': document,
        'result': result
    })
    
    # Save JSON (structured format)
    json_path = output_dir / f"{file_name}.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(document.export_to_dict(), f, indent=2)
    print(f"  ✓ Saved JSON: {json_path}")
    
    # Save Markdown (human-readable)
    md_path = output_dir / f"{file_name}.md"
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED))
    print(f"  ✓ Saved Markdown: {md_path}")
    
    # Print confidence metrics
    print(f"  ✓ Confidence: {result.confidence.mean_grade.name} (score: {result.confidence.mean_score:.3f})\n")

print(f"✓ Conversion complete! Processed {len(converted_documents)} documents")


## 🧩 Step 2: RAG-Optimized Chunking

### Chunking Strategy for RAG

Effective chunking is critical for RAG systems. We'll implement **hybrid chunking** that:

1. **Respects Document Structure**: Uses headings, sections, and natural boundaries
2. **Maintains Context**: Includes metadata like titles, section names
3. **Optimizes Chunk Size**: Keeps each chunk within the embedding model's input limit (Snowflake Arctic Embed L accepts up to 512 tokens; its 1024-dimensional output is a separate property)
4. **Preserves Relationships**: Keeps tables, lists, and code blocks intact
5. **Smart Merging**: Combines smaller chunks while respecting max token limits

### Docling's Hybrid Chunker

Docling provides a `HybridChunker` that intelligently splits documents while preserving structure and merging smaller chunks for optimal retrieval.
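
To make the merging idea concrete, here is a deliberately simplified illustration. It is not Docling's algorithm: it ignores document structure, uses a whitespace word count as a stand-in for tokens, and `merge_small_segments` is a hypothetical name. It only shows the general pattern of greedily combining adjacent pieces while a size budget allows it.

```python
def merge_small_segments(segments: list[str], max_tokens: int) -> list[str]:
    """Greedily merge adjacent segments while the merged word count fits the budget."""
    merged: list[str] = []
    current = ""
    for segment in segments:
        candidate = f"{current} {segment}".strip()
        if current and len(candidate.split()) > max_tokens:
            # Adding this segment would exceed the budget: close the current chunk.
            merged.append(current)
            current = segment
        else:
            current = candidate
    if current:
        merged.append(current)
    return merged
```

A single segment that is already over the budget passes through unsplit in this toy version; a real chunker also has to split oversized pieces.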

In [None]:
from docling.chunking import HybridChunker
from docling_core.types.doc import DocItemLabel, DoclingDocument
from typing import List, Dict, Any

def chunk_document_for_rag(
    doc: DoclingDocument,
    source: str,
    file_name: str,
    max_tokens: int = 512,
    merge_peers: bool = True
) -> List[Dict[str, Any]]:
    """
    Chunk a Docling document using hybrid chunking optimized for RAG.
    
    Args:
        doc: DoclingDocument to chunk
        source: Source file path/URL (for metadata/traceability)
        file_name: Safe filename for generating chunk IDs
        max_tokens: Maximum tokens per chunk (512 matches the input limit of
            snowflake-arctic-embed-l; longer text is truncated at embedding time)
        merge_peers: Whether to merge smaller chunks at the same level
    
    Returns:
        List of chunks with text and metadata
    """
    # Initialize hybrid chunker with RAG-optimized parameters
    # HybridChunker combines hierarchical structure with smart merging
    # Note: tokens are counted with the chunker's own tokenizer, which may
    # differ slightly from the embedding model's tokenizer
    chunker = HybridChunker(
        max_tokens=max_tokens,      # Keep chunks within the embedding model's input limit
        merge_peers=merge_peers      # Merge small adjacent chunks for better context
    )
    
    # Perform chunking
    chunk_iter = chunker.chunk(doc)
    
    # Process chunks and add metadata
    chunks = []
    for idx, chunk in enumerate(chunk_iter):
        chunk_data = {
            'chunk_id': f"{file_name}_chunk_{idx}",
            'text': chunk.text,
            'source': source,
            'chunk_index': idx,
            'metadata': {
                'doc_items': [str(item.label) for item in chunk.meta.doc_items] if hasattr(chunk.meta, 'doc_items') else [],
                'headings': chunk.meta.headings if hasattr(chunk.meta, 'headings') else [],
                'token_count': len(chunk.text.split())  # Whitespace word count, a rough proxy for tokens
            }
        }
        chunks.append(chunk_data)
    
    return chunks

print("✓ Chunking function defined")

### Apply Chunking to All Documents

In [None]:
# Chunk all documents
all_chunks = []

print("Starting chunking process...\n")

for doc_data in converted_documents:
    source = doc_data['source']
    file_name = doc_data['file_name']
    document = doc_data['document']
    
    print(f"Chunking: {source}")
    
    chunks = chunk_document_for_rag(
        doc=document,
        source=source,
        file_name=file_name,
        max_tokens=512,  # Stay within the embedding model's 512-token input limit
        merge_peers=True   # Enable smart merging of smaller chunks
    )
    
    all_chunks.extend(chunks)
    print(f"  ✓ Created {len(chunks)} chunks\n")

print(f"✓ Total chunks created: {len(all_chunks)}")

# Display sample chunk
if all_chunks:
    print("\n=== Sample Chunk ===")
    sample = all_chunks[0]
    print(f"Chunk ID: {sample['chunk_id']}")
    print(f"Source: {sample['source']}")
    print(f"Text preview: {sample['text'][:200]}...")
    print(f"Metadata: {sample['metadata']}")


### Save Chunks to Disk

Save chunks as JSON for inspection and debugging.

In [None]:
chunks_file = output_dir / "rag_chunks.json"
with open(chunks_file, "w", encoding="utf-8") as f:
    json.dump(all_chunks, f, indent=2, ensure_ascii=False)

print(f"✓ Chunks saved to: {chunks_file.resolve()}")
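
A quick size summary can make the saved chunks easier to inspect. The sketch below assumes the chunk dictionaries built earlier in this notebook, where `metadata['token_count']` holds a whitespace word count; `summarize_sizes` itself is a hypothetical helper.

```python
from statistics import mean, median


def summarize_sizes(chunks: list[dict]) -> dict:
    """Summarize the distribution of per-chunk word counts."""
    counts = [chunk["metadata"]["token_count"] for chunk in chunks]
    if not counts:
        return {"chunks": 0}
    return {
        "chunks": len(counts),
        "min": min(counts),
        "median": median(counts),
        "mean": round(mean(counts), 1),
        "max": max(counts),
    }
```

Many very small chunks, or a maximum far above the configured limit, may be a hint that the chunking parameters deserve another look.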


## 🔢 Step 3: Generate Embeddings with Snowflake Arctic

### About Snowflake Arctic Embeddings

[Snowflake Arctic Embed](https://huggingface.co/Snowflake/snowflake-arctic-embed-l) is a family of text embedding models optimized for retrieval tasks. We'll use the **arctic-embed-l** variant which offers:

- High-quality dense embeddings (1024 dimensions)
- Excellent performance on retrieval benchmarks


In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np

# Load Snowflake Arctic embedding model
print("Loading Snowflake Arctic embedding model...")
print("(This may take a few minutes on first run)\n")

embedding_model = SentenceTransformer(
    'Snowflake/snowflake-arctic-embed-l',
    trust_remote_code=True
)

print("✓ Model loaded successfully")
print(f"  Embedding dimension: {embedding_model.get_sentence_embedding_dimension()}")


### Generate Embeddings for All Chunks


In [None]:
def generate_embeddings(chunks: List[Dict[str, Any]], model: SentenceTransformer) -> List[Dict[str, Any]]:
    """
    Generate embeddings for all chunks.
    
    Args:
        chunks: List of chunk dictionaries
        model: SentenceTransformer model
    
    Returns:
        Chunks with added 'embedding' field
    """
    print(f"Generating embeddings for {len(chunks)} chunks...")
    
    # Extract text from chunks
    texts = [chunk['text'] for chunk in chunks]
    
    # Generate embeddings in batches for efficiency
    embeddings = model.encode(
        texts,
        batch_size=32,
        show_progress_bar=True,
        convert_to_numpy=True
    )
    
    # Add embeddings to chunks
    for chunk, embedding in zip(chunks, embeddings):
        chunk['embedding'] = embedding.tolist()  # Convert to list for JSON serialization
    
    return chunks

# Generate embeddings
all_chunks = generate_embeddings(all_chunks, embedding_model)

print(f"\n✓ Embeddings generated for all {len(all_chunks)} chunks")
print(f"Embedding dimension: {len(all_chunks[0]['embedding'])}")
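
Before storing the vectors, it can be worth confirming that every chunk received a well-formed embedding. A minimal sketch, assuming the chunk dictionaries from this notebook (`chunk_id` and `embedding` keys); `find_bad_embeddings` is a hypothetical helper, not part of any library used here.

```python
import math


def find_bad_embeddings(chunks: list[dict], expected_dim: int) -> list[str]:
    """Return the IDs of chunks whose embedding is missing, mis-sized, or non-finite."""
    bad = []
    for chunk in chunks:
        vector = chunk.get("embedding")
        if (
            not isinstance(vector, list)
            or len(vector) != expected_dim
            or not all(math.isfinite(x) for x in vector)
        ):
            bad.append(chunk["chunk_id"])
    return bad
```

An empty result means all embeddings have the expected length and contain only finite numbers.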


## 💾 Step 4: Store in Milvus Vector Database

### About Milvus

[Milvus](https://milvus.io/) is an open-source vector database built for AI applications. It provides:
- Fast similarity search
- Scalable vector storage
- Multiple indexing algorithms
- Hybrid search capabilities

### Using Milvus Lite

We're using **[Milvus Lite](https://milvus.io/docs/milvus_lite.md)** - a lightweight, file-based version of Milvus that:
- Requires no Docker or server setup
- Stores data locally in a file
- Is well suited to development, testing, and small-scale deployments
- Uses the same API as full Milvus

For production deployments with larger datasets, consider using the full Milvus server with Docker or Kubernetes.


In [None]:
from pymilvus import (
    connections,
    Collection,
    CollectionSchema,
    FieldSchema,
    DataType,
    utility
)

# Configuration
COLLECTION_NAME = "rag_documents"
EMBEDDING_DIM = len(all_chunks[0]['embedding'])

# Connect to Milvus Lite (file-based, no Docker needed!)
print("Connecting to Milvus Lite (local file-based database)...")
print("This lightweight option is perfect for development and testing.\n")

connections.connect(
    alias="default",
    uri=str(milvus_db_path)  # Use configured database path
)

print("✓ Connected to Milvus Lite successfully!")
print(f"✓ Database file: {milvus_db_path.resolve()}")
print(f"✓ Collection name: '{COLLECTION_NAME}'")
print(f"✓ Embedding dimension: {EMBEDDING_DIM}\n")


### Create Collection Schema

Define the schema for our RAG collection with fields for:
- Chunk ID (primary key)
- Text content
- Source document
- Chunk index
- Embedding vector


In [None]:
# Define collection schema
fields = [
    FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=256),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=1024),
    FieldSchema(name="chunk_index", dtype=DataType.INT64),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
]

schema = CollectionSchema(
    fields=fields,
    description="RAG document chunks with embeddings"
)

print(f"Collection schema defined with {len(fields)} fields")


### Create or Recreate Collection


In [None]:
# Drop existing collection if it exists
if utility.has_collection(COLLECTION_NAME):
    print(f"Collection '{COLLECTION_NAME}' exists. Dropping...")
    utility.drop_collection(COLLECTION_NAME)
    print("  ✓ Collection dropped")

# Create new collection
print(f"Creating collection '{COLLECTION_NAME}'...")
collection = Collection(
    name=COLLECTION_NAME,
    schema=schema,
    using='default'
)

print(f"✓ Collection '{COLLECTION_NAME}' created successfully")


### Insert Data into Milvus


In [None]:
# Prepare data for insertion
print(f"Preparing {len(all_chunks)} chunks for insertion...")

insert_data = [
    [chunk['chunk_id'] for chunk in all_chunks],  # id
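    # text: VARCHAR max_length is counted in bytes, so truncate the UTF-8 encoding rather than characters
    [chunk['text'].encode('utf-8')[:65535].decode('utf-8', errors='ignore') for chunk in all_chunks],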
    [chunk['source'] for chunk in all_chunks],  # source
    [chunk['chunk_index'] for chunk in all_chunks],  # chunk_index
    [chunk['embedding'] for chunk in all_chunks],  # embedding
]

# Insert data
print("Inserting data into Milvus...")
collection.insert(insert_data)

print(f"✓ Inserted {len(all_chunks)} chunks into Milvus")

# Flush to ensure data is persisted
collection.flush()
print("✓ Data flushed to disk")
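
For larger corpora, inserting everything in one call may be impractical, and splitting the data into batches keeps each request small. The sketch below shows one way to slice column-oriented data like `insert_data`; `iter_column_batches` is a hypothetical helper, and a suitable batch size is an assumption to tune for your deployment.

```python
from typing import Iterator


def iter_column_batches(columns: list[list], batch_size: int) -> Iterator[list[list]]:
    """Yield column-oriented slices, each holding at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    total_rows = len(columns[0]) if columns else 0
    for start in range(0, total_rows, batch_size):
        # Slice every column with the same window so rows stay aligned.
        yield [column[start:start + batch_size] for column in columns]
```

Each yielded batch has the same column layout as `insert_data`, so it could be passed to `collection.insert(batch)` inside a loop.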


### Create Index and Load Collection

Create an index on the embedding field to enable search functionality. For small datasets like this demo, a simple **FLAT** index works well. For production with larger datasets (1000+ vectors), consider **IVF_FLAT** or **HNSW** for better performance.
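
Conceptually, a FLAT index performs an exhaustive comparison: the query is scored against every stored vector and the best matches are returned. The pure-Python sketch below illustrates that idea with cosine similarity. It is not how Milvus is implemented, and `cosine` and `flat_search` are hypothetical names used only here.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; independent of vector length, so no pre-normalization is needed."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def flat_search(query: list[float], vectors: list[list[float]], top_k: int = 3) -> list[tuple[int, float]]:
    """Score the query against every vector and return (index, score) for the top_k best."""
    scored = [(cosine(query, vector), idx) for idx, vector in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [(idx, score) for score, idx in scored[:top_k]]
```

Because every vector is visited, the cost grows linearly with the collection size, which is why approximate indexes become attractive as the dataset grows.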


In [None]:
# Define index parameters
# For small datasets (<1000 vectors), FLAT index is simple and fast
# For larger datasets, consider IVF_FLAT or HNSW
index_params = {
    "metric_type": "COSINE",  # Cosine similarity (works without pre-normalization)
    "index_type": "FLAT",  # Simple brute-force search, perfect for small datasets
}

# For larger datasets (1000+ vectors), use IVF_FLAT instead:
# index_params = {
#     "metric_type": "COSINE",
#     "index_type": "IVF_FLAT",
#     "params": {"nlist": 128}  # Number of clusters
# }

print("Creating index on embedding field...")
collection.create_index(
    field_name="embedding",
    index_params=index_params
)

print("✓ Index created successfully")

# Load collection into memory for search
collection.load()
print("✓ Collection loaded into memory")


## 🔍 Step 5: Test RAG Retrieval

Let's test our RAG system by performing a similarity search.

In [None]:
def search_similar_chunks(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Search for similar chunks using semantic similarity.
    
    Args:
        query: Search query text
        top_k: Number of results to return
    
    Returns:
        List of similar chunks with scores
    """
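    # Generate embedding for query. Arctic Embed expects a retrieval prefix on
    # queries (not on documents); the model's "query" prompt adds it.
    query_embedding = embedding_model.encode([query], prompt_name="query")[0].tolist()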
    
    # Search parameters (FLAT index doesn't require nprobe parameter)
    search_params = {
        "metric_type": "COSINE",
        "params": {}  # Empty params for FLAT index (would need nprobe for IVF_FLAT)
    }
    
    # Perform search
    results = collection.search(
        data=[query_embedding],
        anns_field="embedding",
        param=search_params,
        limit=top_k,
        output_fields=["id", "text", "source", "chunk_index"]
    )
    
    # Format results
    formatted_results = []
    for hits in results:
        for hit in hits:
            formatted_results.append({
                'chunk_id': hit.entity.get('id'),
                'text': hit.entity.get('text'),
                'source': hit.entity.get('source'),
                'chunk_index': hit.entity.get('chunk_index'),
                'similarity_score': hit.score
            })
    
    return formatted_results

print("✓ Search function defined")

### Perform Multiple Test Searches

Test retrieval with several general queries. The queries are generic, so each one may match any of the indexed papers.

In [None]:
# Define general-purpose test queries
test_queries = [
    "What are the main contributions and results presented in the paper?",
    "What methodology or approach is used in this research?",
    "What are the key findings and conclusions?"
]

# Run each test query and display results
for query_num, query in enumerate(test_queries, 1):
    print(f"\n{'='*80}")
    print(f"🔍 TEST QUERY {query_num}")
    print(f"{'='*80}")
    print(f"Query: '{query}'\n")
    
    # Perform search
    search_results = search_similar_chunks(query, top_k=1)
    
    # Display results
    for idx, result in enumerate(search_results, 1):
        print(f"\n📄 Result {idx}")
        print(f"   Similarity Score: {result['similarity_score']:.4f}")
        print(f"   Source: {result['source'].split('/')[-1][:60]}...")
        print(f"   Chunk ID: {result['chunk_id']}")
        print(f"\n   Text Preview:")
        print(f"   {result['text'][:1000].replace(chr(10), ' ')}...")
        print(f"   {'-'*80}")

print(f"\n{'='*80}")
print("✅ Retrieval test complete")
print(f"✓ Ran {len(test_queries)} queries against the indexed documents")
print(f"{'='*80}")
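
Eyeballing results is a reasonable first check. A slightly more systematic option is to write queries whose correct source document is known and measure how often that source is retrieved. A minimal sketch, assuming you supply the expected source for each query yourself; `hit_rate` is a hypothetical helper.

```python
def hit_rate(retrieved_sources: list[list[str]], expected_sources: list[str]) -> float:
    """Fraction of queries whose expected source appears among the retrieved sources."""
    if len(retrieved_sources) != len(expected_sources):
        raise ValueError("need exactly one expected source per query")
    if not expected_sources:
        return 0.0
    hits = sum(
        expected in retrieved
        for retrieved, expected in zip(retrieved_sources, expected_sources)
    )
    return hits / len(expected_sources)
```

Here `retrieved_sources` would hold, for each query, the `source` values from `search_similar_chunks`.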

## 📚 Resources

### Documentation
- [Docling Documentation](https://docling-project.github.io/docling/)
- [Milvus Documentation](https://milvus.io/docs)
- [Snowflake Arctic Embed](https://huggingface.co/Snowflake/snowflake-arctic-embed-l)
- [RAG Best Practices](https://docs.llamaindex.ai/en/stable/optimizing/production_rag/)

### Related Examples & Tutorials
- **RAG with Llama Stack**: For a complete RAG application implementation using Llama Stack, check out the [Docling RAG Notebook](https://github.com/opendatahub-io/rag/blob/main/demos/kfp/docling/pdf-conversion/docling_rag.ipynb) in the Open Data Hub RAG repository
- **Data Processing Examples**: For additional example notebooks related to Data Processing, check the [Open Data Hub Data Processing](https://github.com/opendatahub-io/odh-data-processing/) repository on GitHub

## 💬 Feedback

We'd love to hear your feedback! Please [open an issue](https://github.com/opendatahub-io/odh-data-processing/issues) if you have suggestions or run into any problems.

## 🧹 Cleanup (Optional)

Run this cell to clean up resources if needed.


In [None]:
# # Uncomment to drop the collection
# collection.drop()
# print("✓ Collection dropped")

# # Disconnect from Milvus
# connections.disconnect("default")
# print("✓ Disconnected from Milvus")
