# 🏆 RAG Pipeline Construction & Evaluation Mini-Challenge

## Overview

This notebook guides you through building and evaluating Retrieval-Augmented Generation (RAG) pipelines. You will:

1. **Process a PDF document** using `docling` to extract text and handle images
2. **Experiment with various RAG components**: chunking strategies, embedding models, retrieval techniques
3. **Evaluate pipeline performance** using the `ragas` framework

## Setup Instructions

First, let's install all required dependencies and set up our environment.

In [None]:
# Install required packages (uncomment and run if needed)
# !pip install docling ollama langchain langchain-community langchain-openai openai ragas datasets pandas pymupdf chromadb rank_bm25 tiktoken python-dotenv scikit-learn pillow rapidocr_onnxruntime tqdm

# Import necessary libraries
import os
import time
import pandas as pd
from pathlib import Path
from dotenv import load_dotenv

# Create necessary directories
Path("output").mkdir(exist_ok=True)
Path("results").mkdir(exist_ok=True)

print("✅ Setup complete! Make sure Ollama is running if you plan to use local models.")
print("💡 Don't forget to create your .env file with API keys if using OpenAI/Cohere.")

## Part 1: Document Processing

The first step is to process our PDF document using Docling to extract text and handle images.
This creates a clean markdown file that we can use for our RAG pipeline.
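
Because the export writes a page-break marker between pages, the processed text can later be split back into pages, for example to attach page numbers to chunks. The following is a minimal sketch, assuming the marker text `--- Page Break ---` used by this notebook's `PAGE_BREAK_PLACEHOLDER`; the helper name `split_into_pages` is hypothetical and not part of Docling.

```python
# Core of the notebook's PAGE_BREAK_PLACEHOLDER, without the surrounding newlines.
PAGE_BREAK_MARKER = "--- Page Break ---"


def split_into_pages(processed_text: str, marker: str = PAGE_BREAK_MARKER) -> list[str]:
    """Split processed Markdown into one string per page.

    Empty pages are kept (as empty strings) so that list positions
    still correspond to page numbers.
    """
    return [page.strip() for page in processed_text.split(marker)]


sample = "# Title\n\nIntro text.\n\n--- Page Break ---\n\nSecond page text."
pages = split_into_pages(sample)
for number, page in enumerate(pages, start=1):
    print(f"Page {number}: {page[:30]!r}")
```

Splitting on the bare marker rather than the full placeholder makes the sketch tolerant of the newline clean-up applied before the file is saved.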

In [None]:
"""
Processes a PDF using Docling for text extraction and image handling,
preparing it for RAG pipeline ingestion.
"""

import base64
import re
import textwrap
import time
from io import BytesIO
from pathlib import Path
import numpy as np
from dotenv import load_dotenv
from PIL import Image
import ollama  # Make sure ollama server is running

# --- Docling Imports ---
try:
    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import (
        PdfPipelineOptions,
        RapidOcrOptions,
    )
    from docling.document_converter import DocumentConverter, PdfFormatOption
    print("Docling imports successful.")
except ImportError:
    print("Error: Failed to import Docling components.")
    print("Please ensure 'docling' and its dependencies are installed.")

# Configuration
PDF_PATH = Path("API_FR.pdf")  # Input PDF for the challenge
OUTPUT_DIR = Path("output")
OUTPUT_PROCESSED_TEXT_PATH = OUTPUT_DIR / "processed_text.md"
OUTPUT_DIR.mkdir(exist_ok=True)

# Placeholders
IMAGE_PLACEHOLDER = "<!__ image __>"
PAGE_BREAK_PLACEHOLDER = "\n\n--- Page Break ---\n\n"

In [None]:
def replace_occurrences(text: str, target: str, replacements: list[str]) -> str:
    """Replaces sequential occurrences of a target string with replacements."""
    current_replacement_index = 0
    while target in text and current_replacement_index < len(replacements):
        text = text.replace(target, replacements[current_replacement_index], 1)
        current_replacement_index += 1
    return text

def process_pdf(pdf_path: Path, output_dir: Path, output_filename: str):
    """Converts PDF to Markdown text using Docling."""
    print("--- Starting Document Processing ---")
    start_time = time.time()

    # Configure Docling
    pipeline_options = PdfPipelineOptions(
        generate_page_images=True,
        images_scale=0.5,
        do_ocr=True,
        do_picture_description=False,  # Simplified for this challenge
        ocr_options=RapidOcrOptions(),
    )

    converter = DocumentConverter(
        format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
    )

    # Convert PDF
    if not pdf_path.is_file():
        print(f"Error: PDF file not found at {pdf_path}")
        return

    print(f"Processing PDF: {pdf_path}...")
    result = converter.convert(pdf_path)
    doc = result.document

    # Export to Markdown
    markdown_text = doc.export_to_markdown(
        page_break_placeholder=PAGE_BREAK_PLACEHOLDER,
        image_placeholder=IMAGE_PLACEHOLDER,
    )

    # Basic cleaning
    processed_text = re.sub(r'\n{3,}', '\n\n', markdown_text).strip()

    # Save processed text
    output_path = output_dir / output_filename
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(processed_text)

    print(f"Processed text saved to: {output_path}")
    print(f"--- Processing completed in {time.time() - start_time:.2f} seconds ---")

# Run the document processing
if PDF_PATH.exists():
    process_pdf(PDF_PATH, OUTPUT_DIR, OUTPUT_PROCESSED_TEXT_PATH.name)
else:
    print(f"⚠️  PDF file '{PDF_PATH}' not found. Please add your PDF file to continue.")

## Part 2: RAG Pipeline Evaluation

Now we'll build and evaluate several RAG configurations that vary in:
- **Chunking strategies** (RecursiveCharacterTextSplitter with different parameters)
- **Retrieval methods** (Basic vector search, Parent Document Retriever, Ensemble Retriever)
- **Evaluation metrics** (faithfulness, answer relevancy, context precision, etc.)
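
To build intuition for how chunk size and overlap interact, here is a simplified sketch using a fixed-width sliding window over characters. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter prefers to cut at separators such as paragraph breaks); the function name `sliding_window_chunks` is hypothetical and only illustrates the two parameters.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Cut text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


text = "abcdefghijklmnopqrstuvwxyz"
chunks = sliding_window_chunks(text, chunk_size=10, chunk_overlap=4)
print(chunks)  # ['abcdefghij', 'ghijklmnop', 'mnopqrstuv', 'stuvwxyz']
```

Each chunk repeats the last four characters of the previous one. A larger overlap reduces the chance that a fact is cut in half at a boundary, at the cost of more chunks to embed and store.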

In [None]:
"""
RAG Pipeline Evaluation Script for Mini-Challenge.
"""
import os
import time
import pandas as pd
from pathlib import Path
from dotenv import load_dotenv

# --- LangChain Imports ---
from langchain_community.vectorstores import Chroma
from langchain_community.chat_models import ChatOllama
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.docstore.in_memory import InMemoryStore
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import ParentDocumentRetriever, EnsembleRetriever

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema.document import Document

# --- Ragas Imports ---
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    answer_correctness,
)
from datasets import Dataset
from tqdm import tqdm

# Configuration
load_dotenv()
PROCESSED_TEXT_PATH = Path("output/processed_text.md")
EVAL_DATASET_PATH = Path("groundtruth_eval_dataset.csv")
RESULTS_DIR = Path("results")
RESULTS_DIR.mkdir(exist_ok=True)

# LLM Configuration
USE_OLLAMA = True
if USE_OLLAMA:
    LLM_MODEL = "gemma:2b"
    EMBEDDING_MODEL = "nomic-embed-text"
    print(f"Using Ollama: LLM='{LLM_MODEL}', Embeddings='{EMBEDDING_MODEL}'")
else:
    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
    LLM_MODEL = "gpt-3.5-turbo"
    EMBEDDING_MODEL = "text-embedding-ada-002"
    print(f"Using OpenAI: LLM='{LLM_MODEL}', Embeddings='{EMBEDDING_MODEL}'")

In [None]:
# Helper Functions for RAG Pipeline

def load_processed_text(file_path: Path) -> str:
    """Loads the processed markdown text."""
    if not file_path.is_file():
        raise FileNotFoundError(f"Processed text file not found at {file_path}")
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()

def load_evaluation_dataset(file_path: Path) -> Dataset:
    """Loads the evaluation dataset from CSV."""
    if not file_path.is_file():
        raise FileNotFoundError(f"Evaluation dataset not found at {file_path}")
    df = pd.read_csv(file_path)
    df["ground_truths"] = df["ground_truth"].apply(lambda x: [str(x)] if pd.notna(x) else [])
    df_ragas = df[["question", "ground_truths"]]
    return Dataset.from_pandas(df_ragas)

def get_llm():
    """Initializes the LLM based on configuration."""
    if USE_OLLAMA:
        return ChatOllama(model=LLM_MODEL, temperature=0.0)
    else:
        return ChatOpenAI(model_name=LLM_MODEL, temperature=0.0)

def get_embeddings():
    """Initializes the embedding model based on configuration."""
    if USE_OLLAMA:
        return OllamaEmbeddings(model=EMBEDDING_MODEL)
    else:
        return OpenAIEmbeddings(model=EMBEDDING_MODEL)

def format_docs(docs: list[Document]) -> str:
    """Formats retrieved documents into a single string for the prompt."""
    return "\n\n".join(doc.page_content for doc in docs)

In [None]:
# Chunking Strategies

def chunk_text_recursive(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[Document]:
    """Chunks text using RecursiveCharacterTextSplitter."""
    print(f"Chunking text: size={chunk_size}, overlap={chunk_overlap}")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    docs = [Document(page_content=chunk) for chunk in text_splitter.split_text(text)]
    print(f"Created {len(docs)} chunks.")
    return docs

# Test chunking on a sample
sample_text = "This is a sample text for testing chunking strategies. " * 100
sample_chunks = chunk_text_recursive(sample_text, chunk_size=200, chunk_overlap=50)
print(f"Sample created {len(sample_chunks)} chunks from {len(sample_text)} characters")

In [None]:
# Retriever Setup Functions

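def setup_basic_retriever(docs: list[Document], embeddings, k: int = 3):
    """Sets up a basic Chroma vector store retriever."""
    print(f"Setting up basic Chroma retriever (k={k})...")
    # Use a unique collection per call: in-memory Chroma instances can share the
    # default collection, which would mix chunks from different configurations.
    vectorstore = Chroma.from_documents(
        documents=docs,
        embedding=embeddings,
        collection_name=f"basic_{time.time_ns()}",
    )
    return vectorstore.as_retriever(search_kwargs={"k": k})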

def setup_parent_document_retriever(text: str, embeddings,
                                    parent_chunk_size: int = 2000,
                                    child_chunk_size: int = 400,
                                    k_parent: int = 3):
    """Sets up a Parent Document Retriever."""
    print("Setting up Parent Document Retriever...")
    parent_splitter = RecursiveCharacterTextSplitter(chunk_size=parent_chunk_size)
    child_splitter = RecursiveCharacterTextSplitter(chunk_size=child_chunk_size)
    vectorstore = Chroma(collection_name="parent_doc_retriever", embedding_function=embeddings)
    store = InMemoryStore()

    retriever = ParentDocumentRetriever(
        vectorstore=vectorstore,
        docstore=store,
        child_splitter=child_splitter,
        parent_splitter=parent_splitter,
        search_kwargs={"k": k_parent}
    )

    initial_docs = [Document(page_content=doc) for doc in parent_splitter.split_text(text)]
    retriever.add_documents(initial_docs, ids=None)
    return retriever

def setup_ensemble_retriever(docs: list[Document], embeddings, k: int = 3, bm25_weight: float = 0.5):
    """Sets up an Ensemble Retriever (BM25 + Dense)."""
    print(f"Setting up Ensemble Retriever (k={k}, bm25_weight={bm25_weight})...")

    # BM25 retriever
    bm25_retriever = BM25Retriever.from_documents(docs)
    bm25_retriever.k = k

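    # Dense retriever (unique collection name so chunks from earlier runs are not mixed in)
    vectorstore = Chroma.from_documents(
        documents=docs,
        embedding=embeddings,
        collection_name=f"ensemble_{time.time_ns()}",
    )
    dense_retriever = vectorstore.as_retriever(search_kwargs={"k": k})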

    # Ensemble retriever
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, dense_retriever],
        weights=[bm25_weight, 1.0 - bm25_weight]
    )
    return ensemble_retriever

In [None]:
# QA Chain Factory

# RAG Prompt Template
template = """Answer the question based only on the following context. If you cannot answer the question with the context, please respond with 'I don't know'.

### CONTEXT
{context}

### QUESTION
Question: {question}

### ANSWER
"""
RAG_PROMPT = ChatPromptTemplate.from_template(template)

def create_qa_chain(retriever, llm, prompt):
    """Creates a LangChain QA Runnable."""
    print("Creating QA chain...")

    rag_chain_from_docs = (
        RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
        | prompt
        | llm
        | StrOutputParser()
    )

    rag_chain_with_source = RunnableParallel(
        {"context": retriever, "question": RunnablePassthrough()}
    ).assign(answer=rag_chain_from_docs)

    def final_chain(question: str):
        return rag_chain_with_source.invoke(question)

    return final_chain

In [None]:
# Evaluation Functions

EVALUATION_METRICS = [
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    answer_correctness,
]

def generate_ragas_dataset(rag_chain, eval_dataset: Dataset) -> Dataset:
    """Runs the RAG chain on the evaluation dataset to prepare for Ragas."""
    print("Generating answers and contexts for Ragas evaluation...")
    results = []
    for row in tqdm(eval_dataset):
        question = row["question"]
        response = rag_chain(question)
        results.append({
            "question": question,
            "answer": response["answer"],
            "contexts": [doc.page_content for doc in response["context"]],
            "ground_truths": row["ground_truths"]
        })

    results_df = pd.DataFrame(results)
    return Dataset.from_pandas(results_df)

def evaluate_with_ragas(ragas_dataset: Dataset, pipeline_name: str) -> dict:
    """Evaluates the generated dataset using Ragas and returns the mean score per metric."""
    print(f"Evaluating '{pipeline_name}' with Ragas...")
    start_time = time.time()

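    # Note: depending on the installed Ragas version, evaluate() may default to OpenAI
    # models as the judge (requiring OPENAI_API_KEY) unless llm= and embeddings= are passed.
    result = evaluate(ragas_dataset, metrics=EVALUATION_METRICS)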

    end_time = time.time()
    print(f"Ragas evaluation finished in {end_time - start_time:.2f} seconds.")

    result_df = result.to_pandas()
    output_path = RESULTS_DIR / f"{pipeline_name}_ragas_results.csv"
    result_df.to_csv(output_path, index=False)

    mean_scores = result_df[[m.name for m in EVALUATION_METRICS]].mean().to_dict()
    print(f"Mean scores for {pipeline_name}: {mean_scores}")
    return mean_scores

In [None]:
# Main Experiment Runner

def run_rag_experiments():
    """Run all RAG pipeline experiments."""
    print("--- Starting RAG Pipeline Evaluation ---")

    # Load data
    try:
        processed_text = load_processed_text(PROCESSED_TEXT_PATH)
        eval_dataset = load_evaluation_dataset(EVAL_DATASET_PATH)
    except FileNotFoundError as e:
        print(f"❌ {e}")
        print("Please make sure you have:")
        print("1. Run the document processing step")
        print("2. Created the evaluation dataset CSV")
        return

    # Initialize components
    llm = get_llm()
    embeddings = get_embeddings()

    # Define experiment configurations
    configurations = [
        {
            "name": "baseline_recursive_1000_200",
            "chunking_func": chunk_text_recursive,
            "chunking_params": {"chunk_size": 1000, "chunk_overlap": 200},
            "retriever_func": setup_basic_retriever,
            "retriever_params": {"k": 3},
        },
        {
            "name": "chunking_recursive_500_100",
            "chunking_func": chunk_text_recursive,
            "chunking_params": {"chunk_size": 500, "chunk_overlap": 100},
            "retriever_func": setup_basic_retriever,
            "retriever_params": {"k": 3},
        },
        {
            "name": "retriever_parent_doc_2000_400",
            "chunking_func": None,
            "chunking_params": {},
            "retriever_func": setup_parent_document_retriever,
            "retriever_params": {"parent_chunk_size": 2000, "child_chunk_size": 400, "k_parent": 3},
        },
        {
            "name": "retriever_ensemble_bm25_0.5",
            "chunking_func": chunk_text_recursive,
            "chunking_params": {"chunk_size": 500, "chunk_overlap": 100},
            "retriever_func": setup_ensemble_retriever,
            "retriever_params": {"k": 3, "bm25_weight": 0.5},
        },
    ]

    # Run experiments
    all_results = []
    for config in configurations:
        print(f"\n--- Running Configuration: {config['name']} ---")

        try:
            # Setup retriever
            if config["chunking_func"]:
                docs = config["chunking_func"](processed_text, **config["chunking_params"])
                retriever_input = docs
            else:
                retriever_input = processed_text

            retriever = config["retriever_func"](retriever_input, embeddings, **config["retriever_params"])

            # Create QA chain
            qa_chain = create_qa_chain(retriever, llm, RAG_PROMPT)

            # Generate and evaluate
            ragas_dataset = generate_ragas_dataset(qa_chain, eval_dataset)
            mean_scores = evaluate_with_ragas(ragas_dataset, config['name'])
            mean_scores['name'] = config['name']
            all_results.append(mean_scores)

        except Exception as e:
            print(f"❌ Error in configuration {config['name']}: {e}")
            continue

    # Summarize results
    if all_results:
        print("\n--- Overall Results Summary ---")
        summary_df = pd.DataFrame(all_results).set_index('name')
        print(summary_df)

        summary_output_path = RESULTS_DIR / "all_pipelines_summary_results.csv"
        summary_df.to_csv(summary_output_path)
        print(f"\nSaved summary results to: {summary_output_path}")

    return all_results

# Run experiments (uncomment to execute)
# results = run_rag_experiments()

## Part 3: Results Analysis

After running all experiments, analyze your results:

1. **Compare metrics** across different configurations
2. **Identify best-performing** setups for different use cases
3. **Understand trade-offs** between complexity and performance
4. **Document insights** for future RAG implementations
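
One possible way to start the comparison is to look at the winner per metric and at an overall ranking. The sketch below uses plain Python so it runs without any results files. The configuration names and scores are made-up placeholders for illustration only, and the helper names `best_per_metric` and `rank_by_mean` are hypothetical.

```python
# PLACEHOLDER scores for illustration only; real values come from your own runs.
summary = {
    "config_a": {"faithfulness": 0.80, "context_recall": 0.60},
    "config_b": {"faithfulness": 0.70, "context_recall": 0.90},
}


def best_per_metric(summary: dict[str, dict[str, float]]) -> dict[str, str]:
    """Return, for each metric, the name of the configuration with the highest score."""
    metrics = sorted({metric for scores in summary.values() for metric in scores})
    return {
        metric: max(summary, key=lambda name: summary[name].get(metric, float("-inf")))
        for metric in metrics
    }


def rank_by_mean(summary: dict[str, dict[str, float]]) -> list[tuple[str, float]]:
    """Rank configurations by the unweighted mean of their metric scores."""
    means = {name: sum(scores.values()) / len(scores) for name, scores in summary.items()}
    return sorted(means.items(), key=lambda item: item[1], reverse=True)


winners = best_per_metric(summary)
ranking = rank_by_mean(summary)
print(winners)
print([name for name, _ in ranking])
```

An unweighted mean treats all metrics as equally important, which is rarely true in practice; weight the metrics according to your use case. With the notebook's `summary_df`, `summary_df.idxmax()` gives the same per-metric winners.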

### Quiz Questions

1. What is the purpose of `docling` in the first part of this challenge?
2. Explain the difference between `chunk_size` and `chunk_overlap` in `RecursiveCharacterTextSplitter`.
3. What is the fundamental difference between dense retrieval (vector search) and sparse retrieval (BM25)?
4. Describe the idea behind the `ParentDocumentRetriever`. When might it be useful?
5. What does the `ragas` metric `faithfulness` measure? Why is it important?
6. What does the `ragas` metric `context_recall` measure?
7. What is the role of a reranker in a RAG pipeline?

In [None]:
# Create Sample Evaluation Dataset (if needed)

def create_sample_evaluation_dataset():
    """Create a sample evaluation dataset for testing."""
    sample_data = {
        "question": [
            "How do you activate the public REST APIs?",
            "What authentication method is mentioned for the APIs?",
            "Can you filter data using the reporting API?",
            "What is the purpose of the status endpoint?",
        ],
        "ground_truth": [
            "To activate the public REST APIs, you need to enable the 'Enable Public Rest API' option in the application's general settings.",
            "The document mentions using API keys for authentication with the public REST APIs.",
            "Yes, the reporting API allows filtering data based on dimensions, scenarios, time periods, and other criteria using query parameters.",
            "The status endpoint allows you to retrieve the current status of processes like Running, Completed, or Failed.",
        ]
    }

    df = pd.DataFrame(sample_data)
    df.to_csv("groundtruth_eval_dataset.csv", index=False)
    print("✅ Sample evaluation dataset created as 'groundtruth_eval_dataset.csv'")
    print("📝 Please replace this with real Q&A pairs from your document!")
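    return df

# Create the sample dataset (uncomment to execute; run before the experiments in Part 2)
# create_sample_evaluation_dataset()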

## Part 4: Bonus - Deep Dive into Retrieval Mechanisms

1.  How do vector search algorithms like Nearest Neighbor (NN) and Approximate Nearest Neighbor (ANN) efficiently find similar documents in a high-dimensional vector space? What are the core principles behind algorithms like HNSW, LSH, and IVF?

    #### Resources:

      * **HNSW (Hierarchical Navigable Small World):**

          * **Blog Post:** "Hierarchical Navigable Small Worlds (HNSW)"
              * **Link:** [https://www.pinecone.io/learn/series/faiss/hnsw/](https://www.pinecone.io/learn/series/faiss/hnsw/)

      * **LSH (Locality Sensitive Hashing):**

          * **Lecture Notes:** Stanford CS246 Lecture Notes on LSH (Chapter 3)
              * **Link:** [http://www.mmds.org/mmds/v2.1/ch03-lsh.pdf](http://www.mmds.org/mmds/v2.1/ch03-lsh.pdf)
      
      * **IVF (Inverted File Index):**

          * **Faiss Documentation:** "The Inverted File Index (IVF)"
              * **Link:** [https://github.com/facebookresearch/faiss/wiki/Faiss-index-types](https://github.com/facebookresearch/faiss/wiki/Faiss-index-types)

      * **General Vector Search/ANN:**

          * **Google Cloud Blog:** "Understanding Nearest Neighbor Search"
              * **Link:** [https://cloud.google.com/blog/products/databases/spanner-now-supports-approximate-nearest-neighbor-search](https://cloud.google.com/blog/products/databases/spanner-now-supports-approximate-nearest-neighbor-search)
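
As a starting point for the question above, the sketch below contrasts exact (brute-force) nearest-neighbor search with one simple ANN idea: LSH with random hyperplanes. It is a toy illustration in plain Python with hypothetical function names, not how Chroma or Faiss implement search. HNSW (a layered proximity graph) and IVF (searching only the nearest clusters) rely on different structures and are not shown.

```python
import math
import random


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def exact_nearest(query: list[float], vectors: list[list[float]]) -> int:
    """Brute force: compare the query against every stored vector."""
    return max(range(len(vectors)), key=lambda i: cosine_similarity(query, vectors[i]))


def make_hyperplanes(num_planes: int, dim: int, seed: int = 0) -> list[list[float]]:
    """Draw random hyperplanes through the origin (each given by its normal vector)."""
    rng = random.Random(seed)
    return [[rng.gauss(0.0, 1.0) for _ in range(dim)] for _ in range(num_planes)]


def lsh_signature(vector: list[float], hyperplanes: list[list[float]]) -> tuple[int, ...]:
    """One bit per hyperplane: which side of the plane the vector lies on."""
    return tuple(
        1 if sum(v * h for v, h in zip(vector, plane)) >= 0 else 0
        for plane in hyperplanes
    )


def build_lsh_index(vectors, hyperplanes) -> dict[tuple[int, ...], list[int]]:
    """Group vector indices into buckets keyed by their signature."""
    buckets: dict[tuple[int, ...], list[int]] = {}
    for idx, vector in enumerate(vectors):
        buckets.setdefault(lsh_signature(vector, hyperplanes), []).append(idx)
    return buckets


def lsh_nearest(query, vectors, hyperplanes, buckets):
    """Approximate search: only score vectors in the query's bucket (may return None)."""
    candidates = buckets.get(lsh_signature(query, hyperplanes), [])
    if not candidates:
        return None
    return max(candidates, key=lambda i: cosine_similarity(query, vectors[i]))


vectors = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [-1.0, 0.0]]
planes = make_hyperplanes(num_planes=4, dim=2)
buckets = build_lsh_index(vectors, planes)

print(exact_nearest([1.0, 0.05], vectors))
print(lsh_nearest([1.0, 0.05], vectors, planes, buckets))
```

Vectors pointing in similar directions tend to fall on the same side of most hyperplanes and therefore share a bucket, so the approximate search scores far fewer candidates. The trade-off is that the true nearest neighbor can land in a different bucket and be missed.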

2.  How Does Hybrid Search Work?

    #### Resources:

      * **Pinecone Blog:** "Hybrid Search: Combining Keyword and Vector Search"
          * **Link:** [https://www.pinecone.io/learn/hybrid-search/](https://www.pinecone.io/learn/hybrid-search/)



3.  How Do We Measure How Good a Retrieval System Is?

    #### Resources:
      * **Survey:** "Evaluation of Retrieval-Augmented Generation: A Survey"
          * **Link:** [https://arxiv.org/pdf/2405.07437](https://arxiv.org/pdf/2405.07437)
      * **Hugging Face:** "RAG Evaluation"
          * **Link:** [https://huggingface.co/learn/cookbook/en/rag_evaluation](https://huggingface.co/learn/cookbook/en/rag_evaluation)
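
For the retrieval step alone, classic rank-based metrics are a useful complement to the LLM-judged `ragas` metrics used in Part 2. The sketch below shows precision@k, recall@k, and reciprocal rank, assuming you know which document ids are relevant for a question. The function names and document ids are hypothetical.

```python
def precision_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the top-k retrieved documents that are relevant."""
    if k <= 0:
        raise ValueError("k must be positive")
    return sum(1 for doc_id in retrieved[:k] if doc_id in relevant) / k


def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of all relevant documents that appear in the top k."""
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)


def reciprocal_rank(retrieved: list[str], relevant: set[str]) -> float:
    """1 / rank of the first relevant document, or 0.0 if none was retrieved."""
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0


retrieved = ["d3", "d1", "d7", "d2"]  # hypothetical ranked retriever output
relevant = {"d1", "d2", "d5"}         # hypothetical ground-truth relevant ids

print(precision_at_k(retrieved, relevant, k=3))
print(recall_at_k(retrieved, relevant, k=4))
print(reciprocal_rank(retrieved, relevant))  # 0.5
```

Averaging the reciprocal rank over all evaluation questions gives the Mean Reciprocal Rank (MRR). Unlike the `ragas` metrics, these need labeled relevant documents rather than a judge model.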

