# Creating Local RAG Pipeline from Scratch

## 1. Data Preparation and Embedding Creation

### 1.1 Importing PDF Document for our Book

In [6]:
import os
import re
import fitz
import torch
import random
import requests
import textwrap
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from typing import List, Dict# for type hints
from spacy.lang.en import English
from transformers.utils import is_flash_attn_2_available
from sentence_transformers import util, SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM

In [5]:
# Location of the source PDF on the local machine
pdf_path = "/Users/adityamishra/Documents/Machine Learning Tutorial/4. RAG/clrs.pdf"

# Download the PDF only when it is not already present on disk
if not os.path.exists(pdf_path):
    print(f"Given path {pdf_path} does not exist. Downloading the pdf file!!!")
    url = "https://www.cs.mcgill.ca/~akroit/math/compsci/Cormen%20Introduction%20to%20Algorithms.pdf"

    filename = pdf_path
    try:
        # (connect, read) timeouts: requests applies no timeout by default, so
        # an unresponsive server would otherwise hang this cell indefinitely.
        response = requests.get(url, timeout=(10, 120))
    except requests.RequestException as exc:
        # Network-level failures (DNS, refused connection, timeout) are reported
        # the same way as a bad HTTP status instead of raising a raw traceback.
        print(f"Failed to download file. Error: {exc}")
    else:
        if response.status_code == 200:  # check if the download was successful
            with open(filename, 'wb') as file:
                file.write(response.content)  # save the file
            print(f"File downloaded and saved as {filename}")
        else:
            print(f"Failed to download file. Status code: {response.status_code}")
else:
    print(f"File already exists at {pdf_path}. Proceeding to read the file.")

File already exists at /Users/adityamishra/Documents/Machine Learning Tutorial/4. RAG/clrs.pdf. Proceeding to read the file.


Now that the PDF file is available locally, the next step is to preprocess its text as we read it. The location of the book is stored in `pdf_path`, and we can open and read its pages with `PyMuPDF`, which is imported with the command `import fitz`.

In [9]:
def text_formatter(text: str) -> str:
    """Clean raw page text while keeping code-like lines on their own lines.

    The steps run in this order:

    1. Re-join words that were hyphenated across a line break.
    2. Tag lines that look like code or pseudocode (4+ leading spaces, a
       leading ``//``, a leading ``if``/``for``/``while``/``return``/``else``
       keyword, or an ``UPPER-CASE(`` call) so they are not merged.
    3. Replace the remaining single line breaks with a space, except before
       a line that starts with an uppercase letter.
    4. Collapse runs of 3+ newlines, runs of spaces that follow a
       non-whitespace character, and whitespace before punctuation.
    5. Blank out lines holding only a 1-4 digit number and lines starting
       with ``Chapter <n>`` / ``Section <n>`` (case-insensitive).

    Args:
        text: Raw text of one page.

    Returns:
        The cleaned text with leading and trailing whitespace stripped.
    """
    # Fixing hyphenated words split across lines
    text = re.sub(r'(\w+)-\s*\n\s*(\w+)', r'\1\2', text)

    # Identify and preserve code blocks
    processed_lines = []
    in_code_block = False

    for line in text.split('\n'):
        stripped = line.strip()

        # Blank lines are kept as-is and do not end a running code block
        if not stripped:
            processed_lines.append('')
            continue

        # Checking if this looks like code or pseudocode
        is_code_line = (
            len(line) - len(line.lstrip()) >= 4 or  # 4+ space indent
            stripped.startswith('//') or  # comment
            re.match(r'^(if|for|while|return|else)\b', stripped, re.IGNORECASE) or
            re.match(r'^[A-Z][A-Z\-]+\(', stripped)  # FUNCTION-NAME(
        )

        if is_code_line:
            in_code_block = True
            # The marker shields this line from the paragraph-joining regex
            processed_lines.append('__CODELINE__' + line)
        else:
            # Checking if we just exited a code block
            if in_code_block:
                processed_lines.append('__CODEEND__')
                in_code_block = False
            processed_lines.append(line)

    text = '\n'.join(processed_lines)

    # Paragraph joining for non-code text
    text = re.sub(
        r'(?<!__CODELINE__)(?<!__CODEEND__)(?<!\n)\n(?!__CODELINE__)(?!__CODEEND__)(?!\n)(?![A-Z])', ' ', text)

    # Cleaning up markers
    text = text.replace('__CODELINE__', '')
    text = text.replace('__CODEEND__', '\n')

    # Removing excessive blank lines
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Clean up multiple spaces, but only after a non-whitespace character:
    # spaces at the start of a line are indentation and must be preserved.
    text = re.sub(r'(?<=\S) {2,}', ' ', text)

    # Spacing around punctuation
    text = re.sub(r'\s+([.,;:!?])', r'\1', text)

    # Deleting standalone page numbers
    text = re.sub(r'^\s*\d{1,4}\s*$', '', text, flags=re.MULTILINE)

    # Remove common header/footer patterns
    text = re.sub(r'^(Chapter|Section)\s+\d+.*$', '',
                  text, flags=re.MULTILINE | re.IGNORECASE)

    return text.strip()


def is_algorithm_block(text: str) -> bool:
    """Heuristically decide whether a text block contains algorithm pseudocode.

    Seven independent signals are checked; the block qualifies when at least
    two of them fire and the block has more than 10 whitespace-separated words.

    Args:
        text: The text block to inspect.

    Returns:
        True when the block looks like an algorithm, False otherwise.
    """
    # (pattern, flags) pairs, one per signal
    signals = (
        (r'\b(if|then|else)\b', re.IGNORECASE),   # conditional keywords
        (r'\b(for|while|do)\b', re.IGNORECASE),   # loop keywords
        (r'\breturn\b', re.IGNORECASE),           # return statement
        (r'^\s*//.*', re.MULTILINE),              # comments
        (r'[A-Z][A-Z\-]+\([^)]*\)', 0),           # FUNCTION(...)
        (r'A\[\s*\d+', 0),                        # Array notation A[1
        (r'←|:=|=', 0),                           # Assignment operators
    )

    fired = sum(
        1 for pattern, flags in signals
        if re.search(pattern, text, flags)
    )
    if fired < 2:
        return False

    # Enough signals; additionally require a block of meaningful length
    return len(text.split()) > 10


def is_section_header(text: str) -> bool:
    """Heuristically decide whether a text block is a section header.

    After stripping surrounding whitespace, the text is rejected when it is
    empty, longer than 100 characters, has fewer than 2 or more than 15
    words, contains more than 3 periods (table-of-contents style entries),
    equals a front-matter title, or ends with a period.

    Args:
        text: The candidate block.

    Returns:
        True when none of the rejection rules apply, False otherwise.
    """
    candidate = text.strip()
    word_count = len(candidate.split())

    rejected = (
        not candidate
        or len(candidate) > 100
        or not 2 <= word_count <= 15
        # Table of contents entries have lots of dots
        or candidate.count('.') > 3
        # Bare front-matter titles
        or candidate.lower() in ('contents', 'preface', 'index', 'references')
        # Text ending with a period reads as a sentence, not a header
        or candidate.endswith('.')
    )
    return not rejected


def is_toc_or_front_matter(text: str) -> bool:
    """Detect text from a table of contents or front matter (to be skipped).

    Args:
        text: Text to inspect.

    Returns:
        True when the text has fewer than 5 words, mentions "contents" or
        "preface" within its first 50 characters, contains more than two
        "..." runs, has more periods than half its word count, or consists
        solely of roman-numeral letters; False otherwise.
    """
    words = text.split()

    # Too short to be real content
    if len(words) < 5:
        return True

    opening = text.lower()[:50]
    if 'contents' in opening or 'preface' in opening:
        return True

    # TOC dots
    if text.count('...') > 2:
        return True

    # Too many periods
    if text.count('.') > len(words) * 0.5:
        return True

    # Roman numerals only
    return re.search(r'^[ivxlcdm]+$', text.strip(), re.IGNORECASE) is not None


def _build_chunk(buffer: str, metadata: Dict) -> Dict:
    """Turn an accumulated text buffer and its metadata into a chunk record."""
    text = buffer.strip()
    return {
        'text': text,
        'page_number': metadata['page_number'],
        # All counts describe the stored (stripped) text
        'chunk_char_count': len(text),
        'chunk_word_count': len(text.split()),
        # Token count is approximated as characters / 4
        'chunk_token_count': len(text) / 4,
        'has_algorithm': metadata['has_algorithm'],
        'section_header': metadata['section_header'],
    }


def _seed_with_overlap(previous: str, block: str, overlap: int) -> str:
    """Start a new buffer from the tail of the previous one plus ``block``."""
    # Guard overlap <= 0 explicitly: previous[-0:] would copy the whole buffer
    tail = previous[-overlap:] if overlap > 0 else ""
    if tail:
        return tail + "\n\n" + block + "\n\n"
    return block + "\n\n"


def smart_chunker(pages_and_texts: List[Dict],
                  chunk_size: int = 1000,
                  overlap: int = 200,
                  skip_front_matter: bool = True) -> List[Dict]:
    """Split page texts into overlapping chunks along paragraph boundaries.

    Each page is split on blank lines into blocks. Blocks are appended to a
    running buffer; the buffer is emitted as a chunk when a section header is
    encountered, when adding the next block would exceed the size limit, or
    when the page ends. Chunks never span pages.

    Args:
        pages_and_texts: Dicts with a ``'text'`` and a ``'page_number'`` key.
        chunk_size: Target chunk size in characters. Blocks detected as
            algorithms are allowed up to 1.5x this size before a split so
            they stay with their context.
        overlap: Number of trailing characters of a flushed chunk repeated at
            the start of the next one. Values <= 0 disable the overlap.
        skip_front_matter: When True, pages flagged by
            ``is_toc_or_front_matter`` are skipped entirely.

    Returns:
        A list of chunk dicts with the keys ``text``, ``page_number``,
        ``chunk_char_count``, ``chunk_word_count``, ``chunk_token_count``,
        ``has_algorithm``, ``section_header`` and a sequential ``chunk_id``.
    """
    chunks = []

    for page_data in pages_and_texts:
        text = page_data['text']
        page_num = page_data['page_number']

        # Skip if this looks like front matter
        if skip_front_matter and is_toc_or_front_matter(text):
            continue

        # Skip very short pages (likely artifacts)
        if len(text.split()) < 10:
            continue

        buffer = ""
        metadata = {
            'page_number': page_num,
            'has_algorithm': False,
            'section_header': None,
        }

        # Split by blank lines to get paragraphs/blocks
        for block in re.split(r'\n\n+', text):
            block = block.strip()
            if len(block) < 10:  # also covers empty blocks
                continue

            if is_section_header(block):
                # A header closes the running chunk (if substantial) and
                # opens a new one that begins with the header itself.
                if len(buffer.split()) > 10:
                    chunks.append(_build_chunk(buffer, metadata))
                buffer = block + "\n\n"
                metadata = {
                    'page_number': page_num,
                    'has_algorithm': False,
                    'section_header': block,
                }
                continue

            block_has_algo = is_algorithm_block(block)
            # Algorithm blocks get extra room so they are kept together
            limit = chunk_size * 1.5 if block_has_algo else chunk_size

            if len(buffer) + len(block) > limit and len(buffer.split()) > 20:
                # Flush first: the emitted chunk does not contain this block,
                # so its has_algorithm flag must not be influenced by it.
                chunks.append(_build_chunk(buffer, metadata))
                buffer = _seed_with_overlap(buffer, block, overlap)
                metadata = {
                    'page_number': page_num,
                    'has_algorithm': block_has_algo,
                    'section_header': metadata['section_header'],
                }
            else:
                buffer += block + "\n\n"
                if block_has_algo:
                    metadata['has_algorithm'] = True

        # Save the last chunk from this page if it's substantial
        if len(buffer.split()) > 10:
            chunks.append(_build_chunk(buffer, metadata))

    # Add chunk IDs
    for i, chunk in enumerate(chunks):
        chunk['chunk_id'] = i

    return chunks