# Complete RAG Pipeline: PDF Processing with Docling + Granite Embeddings + Elasticsearch

This notebook demonstrates an end-to-end RAG (Retrieval-Augmented Generation) pipeline:
1. **PDF Preprocessing** with IBM Docling
2. **Embedding Generation** with Granite Embedding models
3. **Vector Storage & Search** with Elasticsearch

## Prerequisites
- Python 3.10+ recommended (Docling does not support Python 3.8)
- Elasticsearch 8.0+ running locally or remotely
- Sufficient RAM for model loading (~2GB minimum)

## Architecture Overview
```
PDF Documents
    ↓
Docling (Convert to Markdown)
    ↓
Smart Chunking (Semantic boundaries)
    ↓
Granite Embedding (Generate vectors)
    ↓
Elasticsearch (Store & Search)
```

## Cell 1: Install Dependencies

**What this does:** Installs all required Python packages for the pipeline.

**Packages installed:**
- `docling`: IBM's document processing library
- `elasticsearch`: Python client for Elasticsearch
- `sentence-transformers`: For loading Granite embedding models
- `langchain` & `langchain-text-splitters`: For intelligent text chunking
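- `torch`: PyTorch backend for embeddings
- `numpy`: Array handling for embedding vectors
- `ocrmac`: macOS-only OCR backend built on Apple's Vision framework (omit on other platforms)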

**Note:** This may take 2-5 minutes on first run.

In [None]:
# Install all required packages
# Quote the version range so the shell does not treat > and < as redirections
!pip install -q docling "elasticsearch>=8.0,<9.0" sentence-transformers langchain langchain-text-splitters torch numpy ocrmac
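
After installation, it can help to confirm that the packages actually landed in the active kernel. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is hypothetical and not part of this notebook.

```python
from importlib import metadata
from typing import Dict, List, Optional


def installed_versions(packages: List[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distribution names taken from the pip command above
required = ["docling", "elasticsearch", "sentence-transformers",
            "langchain-text-splitters", "torch", "numpy"]
for name, version in installed_versions(required).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```

Any package reported as `NOT INSTALLED` usually means pip ran against a different environment than the one the kernel uses.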

## Cell 2: Import Libraries

**What this does:** Imports all necessary Python libraries and modules.

**Key imports:**
- Document processing: `DocumentConverter`, `PdfPipelineOptions`
- Embeddings: `SentenceTransformer`
- Search: `Elasticsearch`
- Chunking: `RecursiveCharacterTextSplitter`

In [1]:
import os
import json
from pathlib import Path
from typing import List, Dict, Any
import numpy as np

# Docling imports for PDF processing
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions, 
    TableFormerMode, 
    TableStructureOptions,
    TesseractOcrOptions,
    OcrMacOptions
)

# LangChain for text chunking
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

# Sentence Transformers for Granite embeddings
from sentence_transformers import SentenceTransformer

# Elasticsearch for vector storage
from elasticsearch import Elasticsearch, helpers

print("✓ All libraries imported successfully")


✓ All libraries imported successfully


## Cell 3: Configuration Variables

**What this does:** Sets up all configurable parameters for the pipeline.

**Configuration Options:**

### Elasticsearch Settings
- `ES_HOST`: Elasticsearch server hostname, without protocol or port (e.g. "localhost")
- `ES_PORT`: Elasticsearch port (e.g. 9200 for a default local install)
- `ES_USER`: Username, if authentication is enabled (e.g. "elastic")
- `ES_PASSWORD`: Password, if authentication is enabled (leave empty for no auth)
- `ES_INDEX_NAME`: Name of the index to create (default: "docling_granite_rag")

### Granite Embedding Model Options
Choose one based on your needs:
- `granite-embedding-30m-english`: **Fastest**, English only, 30M parameters
- `granite-embedding-125m-english`: **Balanced**, English only, 125M parameters (RECOMMENDED)
- `granite-embedding-278m-multilingual`: **Best quality**, multilingual, 278M parameters
- `granite-embedding-107m-multilingual`: Multilingual, 107M parameters

### Document Processing Settings
- `ENABLE_OCR`: Enable OCR for scanned PDFs (True/False)
- `CHUNK_SIZE`: Maximum characters per chunk (512 in the cell below)
  - Smaller (500-800): More precise retrieval, more chunks
  - Larger (1200-1500): More context per chunk, fewer chunks
- `CHUNK_OVERLAP`: Character overlap between consecutive chunks (100 in the cell below)
  - Prevents context loss at chunk boundaries
- `BATCH_SIZE`: Number of chunks embedded per batch (default: 50)
  - Reduce if memory issues occur
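
To get a feel for how chunk size and overlap trade off, here is a simplified sketch that cuts text into fixed-size character windows. It is an illustration only: the notebook's `RecursiveCharacterTextSplitter` prefers to break on Markdown separators, so its chunk counts will differ. The function name `sliding_window_chunks` is hypothetical.

```python
from typing import List


def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Fixed-size windows; each window starts (chunk_size - chunk_overlap) after the last."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the current window already reaches the end of the text
    return chunks


sample = "x" * 2000  # stand-in for a 2000-character document
for size, overlap in [(512, 100), (1000, 200)]:
    n_chunks = len(sliding_window_chunks(sample, size, overlap))
    print(f"chunk_size={size}, chunk_overlap={overlap}: {n_chunks} chunks")
```

Smaller chunks with the same relative overlap produce more vectors to embed and store, which is the cost of the more precise retrieval noted above.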

### Output Settings
- `OUTPUT_DIR`: Directory for intermediate files (default: "./output")
- `SAVE_INTERMEDIATE`: Save Markdown files (True/False)

Update the ES connection details in the cell below. Here is an example of the format to follow. 

🚨 Note: `ES_HOST` is the hostname only. Drop the protocol prefix (`https://`) and the port, which goes in `ES_PORT`.

![es connection details example](resources/es_connection_credentials.png)
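
If the connection details arrive as a single URL, the hostname and port can be separated with the standard library. This is a small sketch under that assumption; `split_es_url` and the example URL are hypothetical and not part of the notebook.

```python
from typing import Optional, Tuple
from urllib.parse import urlsplit


def split_es_url(url: str) -> Tuple[str, Optional[int]]:
    """Return (hostname, port) from a URL such as 'https://host:9200'."""
    # urlsplit only recognises the host when a '//' prefix is present
    parts = urlsplit(url if "://" in url else f"//{url}")
    if parts.hostname is None:
        raise ValueError(f"No hostname found in {url!r}")
    return parts.hostname, parts.port


host, port = split_es_url("https://my-es.example.com:31972")
print(f"ES_HOST = {host!r}")
print(f"ES_PORT = {port!r}")
```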

In [None]:
# ===== ELASTICSEARCH CONFIGURATION =====
ES_HOST = ""  # Change to your Elasticsearch host
ES_PORT = "31972"         # Change to your Elasticsearch port
ES_USER = ""    # Change if using different username
ES_PASSWORD = ""    # Set password here if authentication is enabled, e.g., "your_password"
ES_INDEX_NAME = "docling_granite_rag"  # Name of the Elasticsearch index

# ===== GRANITE EMBEDDING MODEL CONFIGURATION =====
# Choose your model based on requirements:
# - For speed: granite-embedding-30m-english
# - For balance: granite-embedding-125m-english (RECOMMENDED)
# - For multilingual: granite-embedding-278m-multilingual
GRANITE_MODEL = "ibm-granite/granite-embedding-125m-english"

# ===== DOCUMENT PROCESSING CONFIGURATION =====
ENABLE_OCR = True      # Enable OCR for scanned PDFs
# Manual override: True uses OcrMacOptions() (Apple's native Vision framework, macOS only);
# set to False on Linux/Windows to fall back to TesseractOcrOptions().
MACOS = True
CHUNK_SIZE = 512      # Characters per chunk (adjust based on your needs)
CHUNK_OVERLAP = 100    # Overlap between chunks to maintain context
BATCH_SIZE = 50        # Number of chunks embedded per batch

# ===== OUTPUT CONFIGURATION =====
OUTPUT_DIR = "./output"              # Directory for intermediate files
SAVE_INTERMEDIATE = True             # Save Markdown files for inspection

# Create output directory if it doesn't exist
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

print("Configuration:")
print(f"  Elasticsearch: {ES_HOST}:{ES_PORT}")
print(f"  Index Name: {ES_INDEX_NAME}")
print(f"  Model: {GRANITE_MODEL}")
print(f"  Chunk Size: {CHUNK_SIZE} chars")
print(f"  Chunk Overlap: {CHUNK_OVERLAP} chars")
print(f"  OCR Enabled: {ENABLE_OCR}")
print(f"  Output Directory: {OUTPUT_DIR}")
print("✓ Configuration complete")

Configuration:
  Elasticsearch: 6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972
  Index Name: docling_granite_rag
  Model: ibm-granite/granite-embedding-125m-english
  Chunk Size: 1000 chars
  Chunk Overlap: 200 chars
  OCR Enabled: True
  Output Directory: ./output
✓ Configuration complete


## Cell 4: Initialize Docling PDF Processor

**What this does:** Creates a Docling document converter with configured options.

**Key features:**
- Converts PDFs to Markdown while preserving structure
- Handles tables, images, and complex layouts
- Optional OCR for scanned documents
- Supports both local files and URLs

In [4]:
# Configure PDF pipeline options
pipeline_options = PdfPipelineOptions()

# Switch OCR on or off explicitly (Docling enables OCR by default)
pipeline_options.do_ocr = ENABLE_OCR
if ENABLE_OCR:
    if MACOS:
        pipeline_options.ocr_options = OcrMacOptions()
        print("OCR enabled for scanned documents (using macOS Vision)")
    else:
        pipeline_options.ocr_options = TesseractOcrOptions()
        print("OCR enabled for scanned documents (using Tesseract)")

# Enable table structure recognition with Docling's TableFormer model
pipeline_options.table_structure_options = TableStructureOptions(
    mode=TableFormerMode.ACCURATE,  # Use accurate mode for better quality
    do_cell_matching=True
)
print("TableFormer (accurate mode) enabled for table structure")

# Initialize Docling converter.
# The standard PDF pipeline downloads its layout and TableFormer weights
# automatically on first use.
docling_converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
    }
)

print("✓ Docling PDF processor initialized")

OCR enabled for scanned documents (using macOS Vision)
TableFormer (accurate mode) enabled for table structure
✓ Docling PDF processor initialized


## Cell 5: Initialize Granite Embedding Model

**What this does:** Loads the Granite embedding model into memory.

**Performance notes:**
- First run: Downloads model (~50-500MB depending on model size)
- Subsequent runs: Loads from cache (fast)
- Memory usage:
  - 30M model: ~500MB RAM
  - 125M model: ~1GB RAM
  - 278M model: ~2GB RAM

**Model capabilities:**
- Generates dense vector embeddings
- Optimized for semantic search and RAG
- Normalized embeddings (cosine similarity ready)
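
"Cosine similarity ready" means that once vectors are scaled to unit length, a plain dot product already equals their cosine similarity. A minimal pure-Python sketch of that identity follows; the two short vectors are made-up stand-ins for real embeddings.

```python
import math
from typing import List


def dot(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(vec: List[float]) -> List[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(dot(vec, vec))
    if norm == 0:
        raise ValueError("Cannot normalize a zero vector")
    return [x / norm for x in vec]


def cosine_similarity(a: List[float], b: List[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a = [3.0, 4.0]  # toy 2-dimensional "embeddings"
b = [4.0, 3.0]

print("cosine similarity:        ", cosine_similarity(a, b))
print("dot of normalized vectors:", dot(l2_normalize(a), l2_normalize(b)))
```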

In [5]:
print(f"Loading Granite Embedding model: {GRANITE_MODEL}")
print("This may take a moment on first run...")

# Load the Granite embedding model
embedding_model = SentenceTransformer(GRANITE_MODEL)

# Get embedding dimension for Elasticsearch configuration
embedding_dim = embedding_model.get_sentence_embedding_dimension()

print(f"✓ Model loaded successfully")
print(f"  Embedding dimension: {embedding_dim}")
print(f"  Model ready for encoding")

2025-10-16 09:46:31,468 - INFO - Use pytorch device_name: mps
2025-10-16 09:46:31,468 - INFO - Load pretrained SentenceTransformer: ibm-granite/granite-embedding-125m-english


Loading Granite Embedding model: ibm-granite/granite-embedding-125m-english
This may take a moment on first run...
✓ Model loaded successfully
  Embedding dimension: 768
  Model ready for encoding


## Cell 6: Initialize Elasticsearch Connection

**What this does:** Establishes connection to Elasticsearch and verifies it's working.

**Connection modes:**
- **With authentication**: Uses ES_USER and ES_PASSWORD
- **Without authentication**: Direct connection (local dev setups)

**Troubleshooting:**
- If connection fails, verify Elasticsearch is running: `curl http://localhost:9200`
- Check firewall settings if using remote Elasticsearch
- Verify credentials if authentication is enabled

In [6]:
# Use HTTPS with credentials when a password is set, plain HTTP otherwise
es_scheme = "https" if ES_PASSWORD else "http"
es_url = f"{es_scheme}://{ES_HOST}:{ES_PORT}"
print(f"Connecting to Elasticsearch at {es_url}...")

# Initialize Elasticsearch client with or without authentication
if ES_PASSWORD:
    es_client = Elasticsearch(
        es_url,
        basic_auth=(ES_USER, ES_PASSWORD),
        verify_certs=True,             # Validate the server certificate
        ca_certs="es_certs/cert.pem",  # CA bundle used for validation
        request_timeout=300
    )
    print(f"Connecting with authentication (user: {ES_USER})")
else:
    es_client = Elasticsearch(es_url)
    print("Connecting without authentication")

# Test connection
if es_client.ping():
    info = es_client.info()
    print(f"✓ Connected to Elasticsearch {info['version']['number']}")
    print(f"  Cluster: {info['cluster_name']}")
else:
    print("✗ Failed to connect to Elasticsearch")
    print("  Please verify Elasticsearch is running and configuration is correct")

Connecting to Elasticsearch at https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972...
Connecting with authentication (user: ibm_cloud_033b3d2c_28d7_4154_9702_e0f3b7a325fb)


2025-10-16 09:47:18,352 - INFO - HEAD https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/ [status:200 duration:0.264s]
2025-10-16 09:47:18,381 - INFO - GET https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/ [status:200 duration:0.028s]


✓ Connected to Elasticsearch 8.15.0
  Cluster: 6f9bed01-e25b-4a72-8d95-1b5a485e48ad


## Cell 7: Create Elasticsearch Index

**What this does:** Creates an Elasticsearch index with proper mappings for vector search.

**Index structure:**
- `text`: Full-text field for keyword search (type: text)
- `embedding`: Dense vector field for semantic search (type: dense_vector)
- `metadata`: Flexible object for document metadata (type: object)
- `doc_id`: Unique identifier (type: keyword)
- `chunk_id`: Chunk position in document (type: integer)
- `source`: Original document name (type: keyword)

**Vector settings:**
- Dimension: Matches Granite model output
- Similarity: Cosine (best for normalized embeddings)
- Indexing: Enabled for fast kNN search
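
Because the vector dimension depends on the chosen embedding model, it can be convenient to generate the mapping from that value rather than hard-coding it. A sketch under that assumption; `build_index_mappings` is a hypothetical helper that only builds a dictionary and does not contact Elasticsearch.

```python
import json
from typing import Any, Dict


def build_index_mappings(dims: int, similarity: str = "cosine") -> Dict[str, Any]:
    """Build the 'mappings' section for the fields listed above."""
    if dims <= 0:
        raise ValueError("dims must be a positive integer")
    return {
        "properties": {
            "text": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector",
                "dims": dims,          # must match the embedding model output
                "index": True,
                "similarity": similarity,
            },
            "metadata": {"type": "object", "enabled": True},
            "doc_id": {"type": "keyword"},
            "chunk_id": {"type": "integer"},
            "source": {"type": "keyword"},
        }
    }


# 768 is the dimension reported for the 125M model earlier in this notebook
mappings = build_index_mappings(768)
print(json.dumps(mappings["properties"]["embedding"], indent=2))
```

Switching to a model with a different output size then only requires passing a different `dims` value and recreating the index.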

**Warning:** Setting `DELETE_IF_EXISTS = True` permanently deletes any existing index with the same name, including all documents in it!

In [7]:
# Configuration: when True, an existing index with the same name is deleted and recreated
DELETE_IF_EXISTS = True  # Set to False to keep an existing index and its documents

# Delete existing index if configured
if DELETE_IF_EXISTS and es_client.indices.exists(index=ES_INDEX_NAME):
    es_client.indices.delete(index=ES_INDEX_NAME)
    print(f"⚠ Deleted existing index: {ES_INDEX_NAME}")

# Define index mapping
index_mapping = {
    "mappings": {
        "properties": {
            # Full text for keyword search
            "text": {
                "type": "text",
                "analyzer": "standard"
            },
            # Vector embedding for semantic search
            "embedding": {
                "type": "dense_vector",
                "dims": embedding_dim,
                "index": True,
                "similarity": "cosine"
            },
            # Document metadata
            "metadata": {
                "type": "object",
                "enabled": True
            },
            # Unique document identifier
            "doc_id": {
                "type": "keyword"
            },
            # Chunk position in document
            "chunk_id": {
                "type": "integer"
            },
            # Source document name
            "source": {
                "type": "keyword"
            }
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}

# Create index if it doesn't exist
if not es_client.indices.exists(index=ES_INDEX_NAME):
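    # Pass mappings and settings as keyword arguments (`body=` is deprecated in the 8.x client)
    es_client.indices.create(
        index=ES_INDEX_NAME,
        mappings=index_mapping["mappings"],
        settings=index_mapping["settings"],
    )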
    print(f"✓ Created index: {ES_INDEX_NAME}")
    print(f"  Vector dimensions: {embedding_dim}")
    print(f"  Similarity metric: cosine")
else:
    print(f"✓ Index already exists: {ES_INDEX_NAME}")

2025-10-16 09:48:09,805 - INFO - HEAD https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/docling_granite_rag [status:200 duration:0.032s]
2025-10-16 09:48:09,906 - INFO - DELETE https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/docling_granite_rag [status:200 duration:0.100s]
2025-10-16 09:48:09,933 - INFO - HEAD https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/docling_granite_rag [status:404 duration:0.027s]


⚠ Deleted existing index: docling_granite_rag


2025-10-16 09:48:10,179 - INFO - PUT https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/docling_granite_rag [status:200 duration:0.245s]


✓ Created index: docling_granite_rag
  Vector dimensions: 768
  Similarity metric: cosine


## Cell 8: Define Helper Functions

**What this does:** Creates reusable functions for the pipeline.

**Functions defined:**

### 1. `convert_pdf_to_markdown(pdf_path, save_output=True)`
- Converts PDF to Markdown format
- Preserves document structure
- Optionally saves to file

### 2. `chunk_markdown(markdown_text, chunk_size, chunk_overlap)`
- Splits Markdown into semantic chunks
- Respects document structure (headers, paragraphs)
- Maintains context with overlap

### 3. `generate_embeddings(texts)`
- Converts text to vector embeddings
- Batch processing for efficiency
- Normalized vectors for cosine similarity

### 4. `index_chunks_to_elasticsearch(chunks, source_name)`
- Stores chunks with embeddings in Elasticsearch
- Bulk indexing for performance
- Adds metadata for tracking
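
The bulk-indexing step boils down to pairing each chunk with its vector and a deterministic ID. The sketch below shows that pairing as a generator of action dictionaries in the shape the Elasticsearch bulk helpers consume; `iter_bulk_actions` and the sample data are hypothetical, and nothing here talks to a cluster.

```python
from typing import Any, Dict, Iterator, List, Sequence


def iter_bulk_actions(
    index_name: str,
    source_name: str,
    texts: Sequence[str],
    vectors: Sequence[List[float]],
) -> Iterator[Dict[str, Any]]:
    """Yield one bulk action per chunk, with an ID derived from source and position."""
    if len(texts) != len(vectors):
        raise ValueError("texts and vectors must have the same length")
    for i, (text, vector) in enumerate(zip(texts, vectors)):
        doc_id = f"{source_name}_{i}"
        yield {
            "_index": index_name,
            "_id": doc_id,
            "_source": {
                "text": text,
                "embedding": vector,
                "doc_id": doc_id,
                "chunk_id": i,
                "source": source_name,
            },
        }


# Toy data: two chunks with made-up 3-dimensional vectors
actions = list(iter_bulk_actions(
    "docling_granite_rag", "report",
    ["first chunk", "second chunk"],
    [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
))
print([action["_id"] for action in actions])
```

Deterministic IDs mean that re-indexing the same source overwrites its earlier chunks instead of duplicating them.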

In [8]:
def convert_pdf_to_markdown(pdf_path: str, save_output: bool = SAVE_INTERMEDIATE) -> str:
    """
    Convert PDF to Markdown using Docling with Granite model.
    
    Args:
        pdf_path: Path to PDF file or URL
        save_output: Whether to save Markdown to file
    
    Returns:
        Markdown string
    """
    print(f"Converting PDF with Granite Docling model: {pdf_path}")
    
    # Convert document
    result = docling_converter.convert(pdf_path)
    markdown_text = result.document.export_to_markdown()
    
    # Save if requested
    if save_output:
        pdf_name = Path(pdf_path).stem if not pdf_path.startswith('http') else 'downloaded_pdf'
        output_path = Path(OUTPUT_DIR) / f"{pdf_name}.md"
        output_path.write_text(markdown_text, encoding='utf-8')
        print(f"  Saved Markdown to: {output_path}")
    
    print(f"  Converted {len(markdown_text)} characters")
    return markdown_text


def chunk_markdown(
    markdown_text: str,
    chunk_size: int = CHUNK_SIZE,
    chunk_overlap: int = CHUNK_OVERLAP
) -> List[Document]:
    """
    Split Markdown into chunks with semantic boundaries.
    
    Args:
        markdown_text: Markdown text to chunk
        chunk_size: Maximum chunk size in characters
        chunk_overlap: Overlap between chunks
    
    Returns:
        List of LangChain Document objects
    """
    # Define separators that respect Markdown structure
    separators = [
        "\n## ",      # H2 headers
        "\n### ",     # H3 headers
        "\n#### ",    # H4 headers
        "\n\n",       # Paragraphs
        "\n",         # Lines
        ". ",         # Sentences
        " ",          # Words
        ""            # Characters
    ]
    
    # Create text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=separators,
        length_function=len,
    )
    
    # Create chunks
    chunks = text_splitter.create_documents([markdown_text])
    print(f"  Created {len(chunks)} chunks")
    
    return chunks


def generate_embeddings(texts: List[str]) -> np.ndarray:
    """
    Generate embeddings using Granite embedding model.
    
    Args:
        texts: List of text strings to embed
    
    Returns:
        Numpy array of embeddings
    """
    embeddings = embedding_model.encode(
        texts,
        normalize_embeddings=True,
        show_progress_bar=True,
        batch_size=BATCH_SIZE
    )
    return embeddings


def index_chunks_to_elasticsearch(
    chunks: List[Document],
    source_name: str
) -> int:
    """
    Index chunks with embeddings into Elasticsearch.
    
    Args:
        chunks: List of LangChain Document objects
        source_name: Name of source document
    
    Returns:
        Number of documents indexed
    """
    print(f"Generating embeddings for {len(chunks)} chunks...")
    
    # Extract text from chunks
    texts = [chunk.page_content for chunk in chunks]
    
    # Generate embeddings
    embeddings = generate_embeddings(texts)
    
    print(f"Indexing to Elasticsearch...")
    
    # Prepare bulk indexing data
    bulk_data = []
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        doc_id = f"{source_name}_{i}"
        
        bulk_data.append({
            "_index": ES_INDEX_NAME,
            "_id": doc_id,
            "_source": {
                "text": chunk.page_content,
                "embedding": embedding.tolist(),
                "metadata": chunk.metadata,
                "doc_id": doc_id,
                "chunk_id": i,
                "source": source_name
            }
        })
    
    # Bulk index; collect per-document errors instead of raising on the first failure
    success, errors = helpers.bulk(
        es_client, bulk_data, refresh=True, raise_on_error=False
    )
    
    print(f"  Indexed: {success} documents")
    if errors:
        print(f"  Failed: {len(errors)} documents")
    
    return success


print("✓ Helper functions defined")

✓ Helper functions defined


## Cell 9: Process PDFs - Complete Pipeline

**What this does:** Runs the complete pipeline on your PDF documents.

**Pipeline steps:**
1. Convert PDF to Markdown (Docling standard PDF pipeline)
2. Chunk Markdown into semantic segments
3. Generate embeddings (Granite)
4. Index to Elasticsearch

**How to use:**
- Add your PDF paths to the `pdf_documents` list
- Supports both local files and URLs
- Processes each document sequentially

**Example PDF sources:**
- Local file: `"/path/to/your/document.pdf"`
- URL: `"https://arxiv.org/pdf/2408.09869"`
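
When several PDFs come from URLs, giving each a distinct, readable name keeps their saved Markdown files and index entries apart. One possible approach, sketched with the standard library only; `source_name_for` is a hypothetical helper and not what the cell below does.

```python
from pathlib import Path, PurePosixPath
from urllib.parse import urlsplit


def source_name_for(pdf_path: str) -> str:
    """Derive a short identifier from a local path or an http(s) URL."""
    if pdf_path.startswith(("http://", "https://")):
        parts = urlsplit(pdf_path)
        name = PurePosixPath(parts.path).name or "document"
        # Strip only a literal .pdf suffix so IDs like '2408.09869' stay intact
        if name.lower().endswith(".pdf"):
            name = name[:-4]
        return f"{parts.hostname or 'unknown-host'}_{name}"
    return Path(pdf_path).stem


print(source_name_for("https://arxiv.org/pdf/2408.09869"))
print(source_name_for("/path/to/your/document.pdf"))
```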

In [9]:
# ===== CONFIGURE YOUR PDF DOCUMENTS HERE =====
pdf_documents = [
    # Example: Docling paper from arXiv
     "https://arxiv.org/pdf/2408.09869",
    # "https://www.ibm.com/downloads/documents/us-en/1227c12d3a38b173",
    # Add your own PDFs here:
     "/Users/danielbenner/Documents/4 Archives/work/Seizing the AI and automation opportunity/SUMMARY_Seizing the AI and automation opportunity.pdf",
     "/Users/danielbenner/Documents/4 Archives/work/INBOX/20240111_PresalesEngineering.pdf",
    # "https://example.com/document.pdf"
]

print(f"Processing {len(pdf_documents)} PDF documents...\n")
print("=" * 60)

total_chunks_indexed = 0

# Process each PDF
for i, pdf_path in enumerate(pdf_documents, 1):
    print(f"\n[{i}/{len(pdf_documents)}] Processing: {pdf_path}")
    print("-" * 60)
    
    try:
        # Step 1: Convert PDF to Markdown
        markdown = convert_pdf_to_markdown(pdf_path)
        
        # Step 2: Chunk the Markdown
        chunks = chunk_markdown(markdown)
        
        # Step 3 & 4: Generate embeddings and index to Elasticsearch
        source_name = Path(pdf_path).stem if not pdf_path.startswith('http') else f"doc_{i}"
        num_indexed = index_chunks_to_elasticsearch(chunks, source_name)
        
        total_chunks_indexed += num_indexed
        print(f"✓ Completed processing {pdf_path}")
        
    except Exception as e:
        print(f"✗ Error processing {pdf_path}: {str(e)}")
        continue

print("\n" + "=" * 60)
print(f"\n✓ Pipeline Complete!")
print(f"  Total documents processed: {len(pdf_documents)}")
print(f"  Total chunks indexed: {total_chunks_indexed}")

# Verify index status
count = es_client.count(index=ES_INDEX_NAME)['count']
print(f"  Documents in Elasticsearch: {count}")

Processing 3 PDF documents...


[1/3] Processing: https://arxiv.org/pdf/2408.09869
------------------------------------------------------------
Converting PDF with Granite Docling model: https://arxiv.org/pdf/2408.09869


2025-10-16 09:50:18,622 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2025-10-16 09:50:18,665 - INFO - Going to convert document batch...
2025-10-16 09:50:18,665 - INFO - Initializing pipeline for StandardPdfPipeline with options hash 82998634576463395d3c31474c3c1569
2025-10-16 09:50:18,673 - INFO - Loading plugin 'docling_defaults'
2025-10-16 09:50:18,674 - INFO - Registered picture descriptions: ['vlm', 'api']
2025-10-16 09:50:18,680 - INFO - Loading plugin 'docling_defaults'
2025-10-16 09:50:18,683 - INFO - Registered ocr engines: ['easyocr', 'ocrmac', 'rapidocr', 'tesserocr', 'tesseract']
2025-10-16 09:50:19,257 - INFO - Accelerator device: 'mps'
2025-10-16 09:50:30,749 - INFO - Accelerator device: 'mps'
2025-10-16 09:50:31,259 - INFO - Processing document 2408.09869v5.pdf
2025-10-16 09:50:39,643 - INFO - Finished converting document 2408.09869v5.pdf in 21.65 sec.


  Saved Markdown to: output/downloaded_pdf.md
  Converted 33326 characters
  Created 52 chunks
Generating embeddings for 52 chunks...


Batches: 100%|██████████| 2/2 [00:01<00:00,  1.16it/s]


Indexing to Elasticsearch...


2025-10-16 09:50:42,773 - INFO - PUT https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/_bulk?refresh=true [status:200 duration:1.359s]
2025-10-16 09:50:42,780 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2025-10-16 09:50:42,785 - INFO - Going to convert document batch...
2025-10-16 09:50:42,786 - INFO - Processing document SUMMARY_Seizing the AI and automation opportunity.pdf


  Indexed: 52 documents
✓ Completed processing https://arxiv.org/pdf/2408.09869

[2/3] Processing: /Users/danielbenner/Documents/4 Archives/work/Seizing the AI and automation opportunity/SUMMARY_Seizing the AI and automation opportunity.pdf
------------------------------------------------------------
Converting PDF with Granite Docling model: /Users/danielbenner/Documents/4 Archives/work/Seizing the AI and automation opportunity/SUMMARY_Seizing the AI and automation opportunity.pdf


2025-10-16 09:50:43,489 - INFO - Finished converting document SUMMARY_Seizing the AI and automation opportunity.pdf in 0.71 sec.


  Saved Markdown to: output/SUMMARY_Seizing the AI and automation opportunity.md
  Converted 2158 characters
  Created 3 chunks
Generating embeddings for 3 chunks...


Batches: 100%|██████████| 1/1 [00:00<00:00,  8.84it/s]


Indexing to Elasticsearch...


2025-10-16 09:50:43,739 - INFO - PUT https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/_bulk?refresh=true [status:200 duration:0.131s]
2025-10-16 09:50:43,744 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2025-10-16 09:50:43,748 - INFO - Going to convert document batch...
2025-10-16 09:50:43,749 - INFO - Processing document 20240111_PresalesEngineering.pdf


  Indexed: 3 documents
✓ Completed processing /Users/danielbenner/Documents/4 Archives/work/Seizing the AI and automation opportunity/SUMMARY_Seizing the AI and automation opportunity.pdf

[3/3] Processing: /Users/danielbenner/Documents/4 Archives/work/INBOX/20240111_PresalesEngineering.pdf
------------------------------------------------------------
Converting PDF with Granite Docling model: /Users/danielbenner/Documents/4 Archives/work/INBOX/20240111_PresalesEngineering.pdf


2025-10-16 09:50:54,365 - INFO - Finished converting document 20240111_PresalesEngineering.pdf in 10.63 sec.


  Saved Markdown to: output/20240111_PresalesEngineering.md
  Converted 35065 characters
  Created 53 chunks
Generating embeddings for 53 chunks...


Batches: 100%|██████████| 2/2 [00:01<00:00,  1.78it/s]


Indexing to Elasticsearch...


2025-10-16 09:50:56,818 - INFO - PUT https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/_bulk?refresh=true [status:200 duration:1.292s]
2025-10-16 09:50:56,860 - INFO - POST https://6f9bed01-e25b-4a72-8d95-1b5a485e48ad.bn2a2vgd01r3l0hfmvc0.databases.appdomain.cloud:31972/docling_granite_rag/_count [status:200 duration:0.038s]


  Indexed: 53 documents
✓ Completed processing /Users/danielbenner/Documents/4 Archives/work/INBOX/20240111_PresalesEngineering.pdf


✓ Pipeline Complete!
  Total documents processed: 3
  Total chunks indexed: 108
  Documents in Elasticsearch: 108


## Cell 10: Define Search Functions

**What this does:** Creates functions for semantic and hybrid search.

**Search capabilities:**

### 1. `semantic_search(query, top_k, min_score)`
- Pure vector similarity search
- Uses cosine similarity
- Best for finding conceptually similar content
- Ignores exact keyword matches

### 2. `hybrid_search(query, top_k, semantic_weight)`
- Combines semantic + keyword search
- `semantic_weight` controls the balance:
  - 0.0 = Pure keyword search
  - 0.5 = Equal weight
  - 1.0 = Pure semantic search
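- A good starting point when both exact terms and meaning matter
- Keyword (BM25) scores are unbounded, whereas the vector term lies between 0 and 2, so treat `semantic_weight` as a relative knob rather than a strict proportion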

**Parameters:**
- `query`: Search query text
- `top_k`: Number of results to return (default: 5)
- `min_score`: Minimum cosine similarity for a result to be kept (-1.0 to 1.0, default: 0.0)
- `semantic_weight`: Weight for semantic search (0.0-1.0)
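
The combined score used by the hybrid query can be written out as plain arithmetic: the shifted cosine similarity scaled by `semantic_weight`, plus the keyword score scaled by `1 - semantic_weight`. The sketch below mirrors that formula; the candidate scores are invented for illustration and are not measured results.

```python
def hybrid_score(cosine: float, bm25: float, semantic_weight: float) -> float:
    """semantic_weight * (cosine + 1) + (1 - semantic_weight) * bm25."""
    if not 0.0 <= semantic_weight <= 1.0:
        raise ValueError("semantic_weight must be between 0.0 and 1.0")
    keyword_weight = 1.0 - semantic_weight
    return semantic_weight * (cosine + 1.0) + keyword_weight * bm25


# Hypothetical candidates: (cosine similarity, BM25 keyword score)
candidates = {
    "chunk_a": (0.82, 0.0),  # semantically close, no keyword overlap
    "chunk_b": (0.40, 6.5),  # weaker semantic match, strong keyword overlap
}

for weight in (0.0, 0.7, 1.0):
    ranked = sorted(
        candidates,
        key=lambda name: hybrid_score(*candidates[name], weight),
        reverse=True,
    )
    print(f"semantic_weight={weight}: {ranked}")
```

Because BM25 scores are not capped, a strong keyword match can outrank a closer semantic match even at fairly high `semantic_weight` values.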

In [None]:
def semantic_search(
    query: str,
    top_k: int = 5,
    min_score: float = 0.0
) -> List[Dict[str, Any]]:
    """
    Perform semantic search using Granite embeddings.
    
    Args:
        query: Search query text
        top_k: Number of results to return
        min_score: Minimum similarity score threshold
    
    Returns:
        List of search results with scores
    """
    # Generate query embedding
    query_embedding = embedding_model.encode(
        [query],
        normalize_embeddings=True
    )[0]
    
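    # Exact (brute-force) vector search: script_score computes cosine similarity
    # for every document; +1.0 keeps scores non-negative, as Elasticsearch requires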
    search_query = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_embedding.tolist()}
                }
            }
        },
        "_source": ["text", "source", "chunk_id", "metadata"]
    }
    
    response = es_client.search(index=ES_INDEX_NAME, body=search_query)
    
    # Process results
    results = []
    for hit in response['hits']['hits']:
        score = hit['_score'] - 1.0  # Adjust back to cosine similarity
        if score >= min_score:
            results.append({
                'text': hit['_source']['text'],
                'source': hit['_source']['source'],
                'chunk_id': hit['_source']['chunk_id'],
                'metadata': hit['_source'].get('metadata', {}),
                'score': score
            })
    
    return results


def hybrid_search(
    query: str,
    top_k: int = 5,
    semantic_weight: float = 0.7
) -> List[Dict[str, Any]]:
    """
    Perform hybrid search combining semantic and keyword search.
    
    Args:
        query: Search query text
        top_k: Number of results to return
        semantic_weight: Weight for semantic search (0-1)
    
    Returns:
        List of search results with combined scores
    """
    keyword_weight = 1.0 - semantic_weight
    
    # Generate query embedding
    query_embedding = embedding_model.encode(
        [query],
        normalize_embeddings=True
    )[0]
    
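    # Hybrid search query. The match_all filter keeps every document in the
    # candidate set, so chunks without keyword hits are still scored by the
    # vector term and semantic_weight=1.0 behaves as pure semantic search.
    search_query = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {
                    "bool": {
                        "filter": [{"match_all": {}}],
                        "should": [
                            {
                                "match": {
                                    "text": {
                                        "query": query,
                                        "boost": keyword_weight
                                    }
                                }
                            }
                        ]
                    }
                },
                "script": {
                    # The weight is passed as a parameter so the script compiles once
                    "source": "(params.semantic_weight * (cosineSimilarity(params.query_vector, 'embedding') + 1.0)) + _score",
                    "params": {
                        "query_vector": query_embedding.tolist(),
                        "semantic_weight": semantic_weight
                    }
                }
            }
        },
        "_source": ["text", "source", "chunk_id", "metadata"]
    }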
    
    response = es_client.search(index=ES_INDEX_NAME, body=search_query)
    
    # Process results
    results = []
    for hit in response['hits']['hits']:
        results.append({
            'text': hit['_source']['text'],
            'source': hit['_source']['source'],
            'chunk_id': hit['_source']['chunk_id'],
            'metadata': hit['_source'].get('metadata', {}),
            'score': hit['_score']
        })
    
    return results


print("✓ Search functions defined")

## Cell 11: Example - Semantic Search

**What this does:** Demonstrates pure semantic search capabilities.

**Use cases:**
- Finding conceptually similar content
- Question answering
- Discovering related documents
- Cross-lingual search (with multilingual models)

**Try different queries:**
- Conceptual: "How does machine learning process documents?"
- Technical: "What are the benefits of vector embeddings?"
- Exploratory: "Methods for converting PDFs to structured data"

**Reading results:**
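- Score: cosine similarity, from -1.0 to 1.0 (higher = more similar)
- Typical good matches: 0.6+
- Excellent matches: 0.8+
- Treat these thresholds as rough guides; useful cut-offs vary by model and corpus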

In [None]:
# ===== CONFIGURE YOUR SEARCH QUERY HERE =====
search_query = "How does Docling process PDF documents?"
num_results = 3

print(f"Semantic Search Query: '{search_query}'")
print("=" * 60)

# Perform search
results = semantic_search(search_query, top_k=num_results)

# Display results
if results:
    print(f"\nFound {len(results)} results:\n")
    
    for i, result in enumerate(results, 1):
        print(f"Result {i}:")
        print(f"  Score: {result['score']:.4f}")
        print(f"  Source: {result['source']} (chunk {result['chunk_id']})")
        print(f"  Text preview: {result['text'][:200]}...")
        print()
else:
    print("No results found")

## Cell 12: Example - Hybrid Search

**What this does:** Demonstrates hybrid search combining semantic + keyword matching.

**When to use:**
- Production search applications
- When exact terms matter
- Balancing precision and recall
- User-facing search interfaces

**Tuning semantic_weight:**
- **0.9-1.0**: Mostly semantic (good for natural language queries)
- **0.6-0.8**: Balanced (recommended default)
- **0.3-0.5**: Keyword-heavy (good for technical term searches)
- **0.0-0.2**: Mostly keyword (traditional search)

**Pro tip:** Start with 0.7 and adjust based on your results!
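
The effect of `semantic_weight` can be seen in a small standalone sketch. It assumes a linear blend in which the keyword score receives the complementary weight `1 - semantic_weight`, and it assumes both scores have already been scaled to 0.0-1.0 (raw BM25 scores are unbounded, so a real pipeline would need to normalize them first). The candidate names and scores are invented.

```python
def blend(semantic, keyword, semantic_weight):
    """Linear blend of two scores that are both already scaled to 0.0-1.0."""
    if not 0.0 <= semantic_weight <= 1.0:
        raise ValueError("semantic_weight must be between 0.0 and 1.0")
    return semantic_weight * semantic + (1.0 - semantic_weight) * keyword

# Hypothetical candidates: (semantic score, keyword score)
candidates = {
    "paraphrase_chunk": (0.90, 0.20),  # similar meaning, few shared terms
    "exact_term_chunk": (0.50, 0.95),  # shares the query's exact wording
}

def rank(semantic_weight):
    """Return candidate names ordered from best to worst blended score."""
    scored = {
        name: blend(sem, kw, semantic_weight)
        for name, (sem, kw) in candidates.items()
    }
    return sorted(scored, key=scored.get, reverse=True)

for weight in (0.9, 0.7, 0.3):
    print(f"semantic_weight={weight}: {rank(weight)}")
```

With these made-up numbers, the paraphrased chunk ranks first at 0.9 and 0.7, while the exact-term chunk takes over at 0.3.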

In [None]:
# ===== CONFIGURE YOUR SEARCH QUERY HERE =====
search_query = "table extraction from PDFs"
num_results = 3
semantic_weight = 0.7  # Adjust between 0.0 (keyword) and 1.0 (semantic)

print(f"Hybrid Search Query: '{search_query}'")
print(f"Semantic Weight: {semantic_weight:.1f} | Keyword Weight: {1 - semantic_weight:.1f}")
print("=" * 60)

# Perform search
results = hybrid_search(search_query, top_k=num_results, semantic_weight=semantic_weight)

# Display results
if results:
    print(f"\nFound {len(results)} results:\n")
    
    for i, result in enumerate(results, 1):
        print(f"Result {i}:")
        print(f"  Combined Score: {result['score']:.4f}")
        print(f"  Source: {result['source']} (chunk {result['chunk_id']})")
        print(f"  Text preview: {result['text'][:200]}...")
        print()
else:
    print("No results found")

## Cell 13: Advanced - Batch Query Multiple Questions

**What this does:** Runs several queries in a single loop and prints the top result for each.

**Use cases:**
- Evaluating search quality
- Testing different query formulations
- Building Q&A systems
- Automated content discovery

**Add your queries:**
- Modify the `queries` list
- Each query is processed independently
- Results are displayed separately
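
For the search-quality use case, one simple option is a hit rate at k: the fraction of queries whose expected source appears among the top k results. The sketch below is only an illustration. `fake_search` is a hypothetical stand-in for `semantic_search` so that the block runs on its own, and the query-to-source labels are invented.

```python
from typing import Callable, Dict, List

def hit_rate_at_k(
    labelled: Dict[str, str],
    search_fn: Callable[[str, int], List[dict]],
    k: int = 3,
) -> float:
    """Fraction of queries whose expected source is in the top-k results."""
    if not labelled:
        return 0.0
    hits = 0
    for query, expected_source in labelled.items():
        results = search_fn(query, k)
        if any(r["source"] == expected_source for r in results[:k]):
            hits += 1
    return hits / len(labelled)

def fake_search(query: str, top_k: int) -> List[dict]:
    """Hypothetical stand-in that returns canned results with a 'source' key."""
    canned = {
        "How to extract tables from documents?": [{"source": "tables.pdf", "score": 0.9}],
        "What is document conversion?": [{"source": "other.pdf", "score": 0.4}],
    }
    return canned.get(query, [])[:top_k]

# Invented labels: query -> source file expected to contain the answer
labelled_queries = {
    "How to extract tables from documents?": "tables.pdf",
    "What is document conversion?": "conversion.pdf",
}

rate = hit_rate_at_k(labelled_queries, fake_search, k=3)
print(f"Hit rate@3: {rate:.2f}")
```

To evaluate the real index, pass a wrapper such as `lambda q, k: semantic_search(q, top_k=k)` in place of `fake_search`.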

In [None]:
# ===== CONFIGURE YOUR QUERIES HERE =====
queries = [
    "What is document conversion?",
    "How to extract tables from documents?",
    "Benefits of using embeddings for search",
]

print(f"Processing {len(queries)} queries...\n")
print("=" * 60)

for i, query in enumerate(queries, 1):
    print(f"\nQuery {i}: '{query}'")
    print("-" * 60)
    
    results = semantic_search(query, top_k=2)
    
    if results:
        print(f"Top result (score: {results[0]['score']:.4f}):")
        print(f"  {results[0]['text'][:150]}...")
    else:
        print("  No results found")

print("\n" + "=" * 60)
print("✓ Batch query complete")

## Cell 14: Statistics and Index Information

**What this does:** Displays statistics about your indexed data.

**Information shown:**
- Total documents in index
- Index size on disk
- Number of unique sources
- Sample document structure
- Index health status

In [None]:
print("Index Statistics")
print("=" * 60)

# Get document count
doc_count = es_client.count(index=ES_INDEX_NAME)['count']
print(f"Total documents: {doc_count}")

# Get index stats
stats = es_client.indices.stats(index=ES_INDEX_NAME)
size_bytes = stats['indices'][ES_INDEX_NAME]['total']['store']['size_in_bytes']
size_mb = size_bytes / (1024 * 1024)
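print(f"Index size: {size_mb:.2f} MB")

# Get index health status (green / yellow / red)
health = es_client.cluster.health(index=ES_INDEX_NAME)
print(f"Index health: {health['status']}")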

# Get unique sources
agg_query = {
    "size": 0,
    "aggs": {
        "unique_sources": {
            "terms": {
                "field": "source",
                "size": 100
            }
        }
    }
}
agg_results = es_client.search(index=ES_INDEX_NAME, body=agg_query)
sources = agg_results['aggregations']['unique_sources']['buckets']

print(f"\nUnique sources: {len(sources)}")
for source in sources:
    print(f"  - {source['key']}: {source['doc_count']} chunks")

# Sample document (guard against an empty index)
sample_hits = es_client.search(index=ES_INDEX_NAME, size=1)['hits']['hits']
if sample_hits:
    sample = sample_hits[0]['_source']
    print("\nSample document structure:")
    print(f"  Text length: {len(sample['text'])} characters")
    print(f"  Embedding dimension: {len(sample['embedding'])}")
    print(f"  Source: {sample['source']}")
    print(f"  Chunk ID: {sample['chunk_id']}")
else:
    print("\nIndex is empty; no sample document to show")

print("\n✓ Statistics retrieved")

## Cell 15: Clean Up (Optional)

**What this does:** Provides commands to clean up resources.

**⚠️ WARNING:**
- Uncommenting the delete command will **permanently delete** your index
- All indexed data will be lost
- Only run this if you want to start fresh

**When to use:**
- Finished with this demo
- Want to reindex with different settings
- Testing and experimentation

In [None]:
print("Clean Up Options")
print("=" * 60)

# ⚠️ UNCOMMENT THE FOLLOWING LINES TO DELETE THE INDEX ⚠️
# print(f"Deleting index: {ES_INDEX_NAME}...")
# es_client.indices.delete(index=ES_INDEX_NAME)
# print("✓ Index deleted")

print("To delete the index, uncomment the lines in this cell.")
print(f"Current index '{ES_INDEX_NAME}' is preserved.")

## Summary and Next Steps

### What You've Built
✓ End-to-end RAG pipeline with:
  - PDF preprocessing using Docling
  - Semantic chunking with context preservation
  - High-quality embeddings with Granite models
  - Fast vector search with Elasticsearch

### Key Features
- ✓ Preserves document structure (tables, images, layout)
- ✓ Intelligent chunking respects semantic boundaries
- ✓ Enterprise-ready embeddings released under the permissive Apache 2.0 license
- ✓ Hybrid search combining semantic + keyword matching

### Next Steps

1. **Optimize for Your Use Case:**
   - Adjust `CHUNK_SIZE` and `CHUNK_OVERLAP` for better results
   - Experiment with different Granite models
   - Tune `semantic_weight` in hybrid search

2. **Scale Up:**
   - Process larger document collections
   - Implement batch processing for efficiency
   - Add document deduplication

3. **Enhance Search:**
   - Add metadata filtering
   - Implement re-ranking
   - Add query expansion

4. **Build Applications:**
   - Connect to LLM for Q&A (Granite 4, GPT, etc.)
   - Build web API with FastAPI
   - Create interactive UI with Streamlit

5. **Production Considerations:**
   - Add error handling and logging
   - Implement monitoring and metrics
   - Set up index backups
   - Configure Elasticsearch security
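
As one illustration of the deduplication step suggested above, the sketch below drops chunks whose normalized text has already been seen, using a SHA-256 content hash. It assumes chunks are dictionaries with a `text` key, as in the search results. The `content_hash` and `dedupe_chunks` helpers and the normalization rule (lowercasing and collapsing whitespace) are illustrative choices, not part of the notebook.

```python
import hashlib

def content_hash(text):
    """SHA-256 digest of the text after lowercasing and collapsing whitespace."""
    normalized = " ".join(text.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

def dedupe_chunks(chunks):
    """Keep the first chunk for each distinct normalized text, preserving order."""
    seen = set()
    unique = []
    for chunk in chunks:
        digest = content_hash(chunk["text"])
        if digest not in seen:
            seen.add(digest)
            unique.append(chunk)
    return unique

# Invented example chunks; the second differs only in case and spacing
chunks = [
    {"text": "Docling converts PDFs to Markdown.", "source": "a.pdf"},
    {"text": "docling  converts PDFs to markdown.", "source": "b.pdf"},
    {"text": "Elasticsearch stores the vectors.", "source": "a.pdf"},
]

unique = dedupe_chunks(chunks)
print(f"Kept {len(unique)} of {len(chunks)} chunks")
```

This only catches exact duplicates after normalization; near-duplicates with different wording would need a similarity-based check. The same digest could also be used as the Elasticsearch document `_id`, so that re-indexing a chunk overwrites it instead of adding a copy.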

### Resources
- Docling: https://github.com/DS4SD/docling
- Granite Models: https://www.ibm.com/granite
- Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

### Questions?
Review the detailed comments in each cell for guidance on configuration and usage.

Happy building! 🚀