<a href="https://colab.research.google.com/github/Bhashkar1987/Production-Rag-from-Scratch/blob/main/Production_RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

# Install the extra dependencies only when the COLAB_GPU environment variable
# is present, which this notebook treats as the signal for "running on Colab".
if "COLAB_GPU" in os.environ:
  print("[INFO] Runing in Google colab, Installing Requirement")
  #!pip install -U torch
  # PyMuPDF is the distribution that provides the `fitz` module imported below.
  !pip install PyMuPDF
  !pip install tqdm
  #!pip install sentence-transformer
  !pip install accelerate
  !pip install bitsandbytes
  # --no-build-isolation makes pip build flash-attn against the packages already installed.
  !pip install flash-attn --no-build-isolation

In [None]:
!pip uninstall -y torch torchvision torchaudio transformers sentence-transformer
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
!pip install -U transformers sentence-transformer

In [None]:
#Download file
import os
import requests

# Get the PDF document: reuse the local copy when present, otherwise download it.
pdf_path = "human-nutrition-text.pdf"

if not os.path.exists(pdf_path):
  print("File doesn't exist, Downloading...")
  # The URL of the document to download
  url = "https://pressbooks.oer.hawaii.edu/humannutrition2/open/download?type=pdf"
  # The local filename to save the downloaded file under
  filename = pdf_path
  # Send a GET request to the URL. The timeout bounds how long the request may
  # wait on a silent server; without it the cell could hang indefinitely.
  response = requests.get(url, timeout=60)
  if response.status_code == 200:
    # Open the file in binary write mode and save the response body to it
    with open(filename, "wb") as file:
      file.write(response.content)
    print(f"The file has been downloaded and saved as {filename}")
  else:
    print(f"failed to download the file. Status code : {response.status_code} ")

else:
  print(f"File {pdf_path} exists")

In [None]:
import fitz
from tqdm.auto import tqdm

def text_formatter(text: str) -> str:
    """Turn every newline into a single space, then trim both ends of the result."""
    flattened = " ".join(text.split("\n"))
    return flattened.strip()

def open_and_read_pdf(pdf_path: str, page_offset: int = 41) -> list[dict]:
    """
    Opens a PDF file, reads its text content page by page, and collects statistics.

    Args:
        pdf_path: Path of the PDF file to open.
        page_offset: Value subtracted from the zero-based page index to produce the
            reported "page_number". Defaults to 41, the offset this function
            previously hard-coded.

    Returns:
        One dict per page with the keys "page_number", "page_char_count",
        "page_word_count", "page_sentence_count_raw", "page_token_count" and "text".
    """
    pages_and_texts = []

    # The context manager closes the document even when extraction fails part-way,
    # so the file handle is not left open.
    with fitz.open(pdf_path) as docs:
        for page_index, page in tqdm(enumerate(docs), total=len(docs)):
            text = text_formatter(page.get_text())
            pages_and_texts.append({
                "page_number": page_index - page_offset,
                "page_char_count": len(text),
                "page_word_count": len(text.split(" ")),
                # Raw count: number of pieces obtained by splitting on ". "
                "page_sentence_count_raw": len(text.split(". ")),
                # Rough estimate: one token per four characters
                "page_token_count": len(text) // 4,
                "text": text
            })

    return pages_and_texts

# Parse the PDF once; the resulting per-page records are reused by the cells below.
pages_and_texts = open_and_read_pdf(pdf_path=pdf_path)
# Notebook display: preview the first two page records.
pages_and_texts[:2]


In [None]:
import random
# Notebook display: three page records drawn at random (no seed is set here).
random.sample(pages_and_texts, k=3)

In [None]:
import pandas as pd

# Build a DataFrame from the list of per-page dicts and preview its first rows.
df = pd.DataFrame(pages_and_texts)
df.head()


In [None]:
# Summary statistics of the page table, rounded to two decimals.
df.describe().round(2)

In [None]:
def chunk_text(text: str, chunk_size: int = 500) -> list:
  """
  Splits text into chunks of approximately `chunk_size` characters.

  Words (whitespace-separated tokens) are never broken. Each word is budgeted
  as its length plus one separator, and a chunk is closed as soon as the next
  word would push that budget past `chunk_size`. A single word longer than
  `chunk_size` becomes a chunk of its own.

  Args:
    text: The text to split.
    chunk_size: Character budget per chunk.

  Returns:
    The chunks in order, words joined by single spaces. Empty or
    whitespace-only text yields an empty list, and no chunk is ever empty.
  """
  chunks = []
  current_words = []
  # Budget used so far: every word counts as len(word) + 1 (its separator).
  current_len = 0

  for word in text.split():
    if current_len + len(word) + 1 <= chunk_size:
      current_words.append(word)
      current_len += len(word) + 1
    else:
      # Close the current chunk and start a new one. The guard avoids emitting
      # an empty chunk when the very first word is already over budget.
      if current_words:
        chunks.append(" ".join(current_words))
      current_words = [word]
      current_len = len(word) + 1

  # Add the last chunk if not empty
  if current_words:
    chunks.append(" ".join(current_words))

  return chunks


def chunk_pdf_pages(pages_and_texts: list, chunk_size: int = 500) -> list[dict]:
  """
  Chunk the text of every page record with `chunk_text`.

  Each input record must provide "page_number" and "text". The result is a
  flat list with one dict per chunk, holding the source page number, the
  chunk's index within that page, its character and word counts, a rough
  token estimate (characters / 4) and the chunk text itself.
  """
  records = []
  for page in pages_and_texts:
    number, body = page["page_number"], page["text"]
    pieces = chunk_text(body, chunk_size=chunk_size)
    records.extend(
        {
            "page_number": number,
            "chunk_index": position,
            "chunk_char_count": len(piece),
            "chunk_word_count": len(piece.split()),
            "chunk_token_count": len(piece) / 4,
            "chunk_text": piece,
        }
        for position, piece in enumerate(pieces)
    )
  return records

#Example usage: chunk every page with a 500-character budget and preview the result
chunked_pages = chunk_pdf_pages(pages_and_texts, chunk_size=500)
print(f"Total cunks: {len(chunked_pages)}")
# NOTE(review): indexing [0] assumes at least one chunk was produced.
print(f"First chunk (page {chunked_pages[0]['page_number']}): {chunked_pages[0]['chunk_text'][:200]}...")




**Types of Chunking:**
- Fixed-Size Chunking
- Semantic Chunking
- Recursive Chunking
- Structural Chunking
- LLM Chunking

In [None]:

import random, textwrap


#----------------------Sampling & preety Printing -----------------------------

def _scattered_indices(n: int, k: int, jitter_frac = 0.08) -> list[int]:
  """Evenly spaced anchors + random jitter -> distinct indices scattered across [0, n-1].

  Args:
    n: Size of the index range to sample from.
    k: Number of indices requested.
    jitter_frac: Fraction of n used as the jitter radius around each anchor
      (the radius is at least 1).

  Returns:
    A list of distinct indices in [0, n-1]. Its length is min(k, n); it is
    empty when n <= 0 or k <= 0.
  """
  # There are only n distinct indices. Without this clamp the top-up loop
  # below never terminates for k > n, and n == 0 raises inside `random`.
  k = min(k, n)
  if k <= 0:
    return []
  if k == 1:
    return [random.randrange(n)]

  anchors = [int(round(i * (n - 1) / (k - 1))) for i in range(k)]
  out, seen = [], set()
  radius = max(1, int(n * jitter_frac))
  for a in anchors:
    lo, hi = max(0, a - radius), min(n - 1, a + radius)
    j = random.randint(lo, hi)
    if j not in seen:
      out.append(j)
      seen.add(j)

  # Jittered picks can collide; top up with uniform draws until k are distinct.
  while len(out) < k:
    r = random.randrange(n)
    if r not in seen:
      out.append(r)
      seen.add(r)
  return out


def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
    """Render one chunk record as a text block: a stats header above the wrapped text.

    `c` must provide the keys "page_number", "chunk_index", "chunk_char_count",
    "chunk_word_count", "chunk_token_count" and "chunk_text". The text is
    wrapped at `wrap_at` columns without splitting long words.
    """
    stats_left = f" Chunk p{c['page_number']}  . idx {c['chunk_index']}   |"
    stats_right = f" chars {c['chunk_char_count']}  .  words {c['chunk_word_count']}  . ~ tokens {c['chunk_token_count']} "
    header = stats_left + stats_right

    wrapped_lines = textwrap.wrap(
        c["chunk_text"],
        width=wrap_at,
        break_long_words=False,
        replace_whitespace=False,
    )

    widest = max((len(row) for row in wrapped_lines), default=0)
    box_width = max(len(header), widest + 2)

    def padded(content: str, width: int) -> str:
        # One blank on each side of the left-justified content.
        return " " + content.ljust(width) + " "

    rule = " " + "=" * box_width + " "
    # Body rows are two columns narrower than the header row; an empty text
    # still yields a single blank row.
    body_rows = [padded(row, box_width - 2) for row in wrapped_lines] or [padded("", box_width - 2)]

    return "\n".join([
        rule,
        padded(header, box_width),
        " " + "_" * box_width + " ",
        *body_rows,
        rule,
    ])


def show_random_chunks(pages_and_texts: list, chunk_size: int = 500, k: int = 5, seed: int | None = 42):
  """Chunk the pages with `chunk_pdf_pages` and print `k` scattered sample chunks.

  A non-None `seed` re-seeds the global `random` module first, so the same
  input produces the same selection. Nothing is returned; all output is printed.
  """
  if seed is not None:
    random.seed(seed)

  all_chunks = chunk_pdf_pages(pages_and_texts, chunk_size=chunk_size)
  if len(all_chunks) == 0:
    print("No chunk to diplay.")
    return

  picks = _scattered_indices(len(all_chunks), k)
  print(f"Showing {len(picks)} scattered random chunks out of {len(all_chunks)} total:\n")
  for ordinal, position in enumerate(picks, start=1):
    print(f"#{ordinal}")
    boxed = _draw_boxed_chunk(all_chunks[position])
    print(boxed)
    print()



#----------------Run---------------------------
# Guard against running this cell before the PDF has been parsed.
assert 'pages_and_texts' in globals(), "Run: pages_and_texts = open_and_read_pdf(pdf_path) frist."
# seed=42 re-seeds `random` inside the call, making the selection repeatable.
show_random_chunks(pages_and_texts, chunk_size=500, k=5, seed=42)







In [None]:

import random, textwrap


#----------------------Sampling & preety Printing -----------------------------

def _scattered_indices(n: int, k: int, jitter_frac = 0.08) -> list[int]:
  """Evenly spaced anchors + random jitter -> distinct indices scattered across [0, n-1].

  Args:
    n: Size of the index range to sample from.
    k: Number of indices requested.
    jitter_frac: Fraction of n used as the jitter radius around each anchor
      (the radius is at least 1).

  Returns:
    A list of distinct indices in [0, n-1]. Its length is min(k, n); it is
    empty when n <= 0 or k <= 0.
  """
  # There are only n distinct indices. Without this clamp the top-up loop
  # below never terminates for k > n, and n == 0 raises inside `random`.
  k = min(k, n)
  if k <= 0:
    return []
  if k == 1:
    return [random.randrange(n)]

  anchors = [int(round(i * (n - 1) / (k - 1))) for i in range(k)]
  out, seen = [], set()
  radius = max(1, int(n * jitter_frac))
  for a in anchors:
    lo, hi = max(0, a - radius), min(n - 1, a + radius)
    j = random.randint(lo, hi)
    if j not in seen:
      out.append(j)
      seen.add(j)

  # Jittered picks can collide; top up with uniform draws until k are distinct.
  while len(out) < k:
    r = random.randrange(n)
    if r not in seen:
      out.append(r)
      seen.add(r)
  return out


def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
    """Render one chunk record as a box-drawing frame: stats header, rule, wrapped text.

    `c` must provide the keys "page_number", "chunk_index", "chunk_char_count",
    "chunk_word_count", "chunk_token_count" and "chunk_text". The text is
    wrapped at `wrap_at` columns without splitting long words.
    """
    stats_left = f" Chunk p{c['page_number']}  . idx {c['chunk_index']}   |"
    stats_right = f" chars {c['chunk_char_count']}  .  words {c['chunk_word_count']}  . ~ tokens {c['chunk_token_count']} "
    header = stats_left + stats_right

    wrapped_lines = textwrap.wrap(
        c["chunk_text"],
        width=wrap_at,
        break_long_words=False,
        replace_whitespace=False,
    )

    # The frame is as wide as the longer of the header and the widest body row.
    widest = max((len(row) for row in wrapped_lines), default=0)
    box_width = max(len(header), widest)
    horizontal = "─" * (box_width + 2)

    def framed(content: str) -> str:
        # Left-justify inside the vertical borders with one blank of padding.
        return "│ " + content.ljust(box_width) + " │"

    # An empty text still yields a single blank body row.
    body_rows = [framed(row) for row in wrapped_lines] or [framed("")]

    return "\n".join([
        "┌" + horizontal + "┐",
        framed(header),
        "├" + horizontal + "┤",
        *body_rows,
        "└" + horizontal + "┘",
    ])


def show_random_chunks(pages_and_texts: list, chunk_size: int = 500, k: int = 5, seed: int | None = 42):
  """Chunk the pages with `chunk_pdf_pages` and print `k` scattered sample chunks.

  A non-None `seed` re-seeds the global `random` module first, so the same
  input produces the same selection. Nothing is returned; all output is printed.
  """
  if seed is not None:
    random.seed(seed)

  all_chunks = chunk_pdf_pages(pages_and_texts, chunk_size=chunk_size)
  if len(all_chunks) == 0:
    print("No chunk to diplay.")
    return

  picks = _scattered_indices(len(all_chunks), k)
  print(f"Showing {len(picks)} scattered random chunks out of {len(all_chunks)} total:\n")
  for ordinal, position in enumerate(picks, start=1):
    print(f"#{ordinal}")
    boxed = _draw_boxed_chunk(all_chunks[position])
    print(boxed)
    print()



#----------------Run---------------------------
# Guard against running this cell before the PDF has been parsed.
assert 'pages_and_texts' in globals(), "Run: pages_and_texts = open_and_read_pdf(pdf_path) frist."
# seed=42 re-seeds `random` inside the call, making the selection repeatable.
show_random_chunks(pages_and_texts, chunk_size=500, k=5, seed=42)


Semantic Chunking



In [None]:
# Dependencies for the semantic-chunking cells: sentence-transformers is pinned to
# 3.0.1 and transformers is bounded to >4.1,<5; -q reduces pip's output.
!pip -q install --upgrade "sentence-transformers==3.0.1" "transformers<5, >4.1" scikit-learn nltk

In [None]:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import nltk
nltk.download('punkt', quiet=True)

# Load once globally: semantic_chunk_text reads this module-level model on every
# call instead of constructing a new one.

semantic_model = SentenceTransformer("all-MiniLM-L6-v2")

def semantic_chunk_text(text: str, similarity_threshhold: float = 0.8, max_tokens: int = 500) -> list:
  """Group consecutive sentences into chunks while they stay similar and short.

  The text is split into sentences with nltk and embedded with the module-level
  `semantic_model`. A sentence joins the current chunk when its cosine
  similarity to the chunk's running embedding is at least
  `similarity_threshhold` and the chunk so far is estimated (characters // 4)
  at fewer than `max_tokens` tokens; otherwise it starts a new chunk. The
  running embedding is updated as the mean of itself and the new sentence's
  embedding.

  Returns the chunks in order as strings (sentences joined by a space), or an
  empty list when no sentence is found.
  """
  sentences = nltk.sent_tokenize(text)
  if len(sentences) == 0:
    return []

  embeddings = semantic_model.encode(sentences)

  finished = []
  group = [sentences[0]]
  group_vec = embeddings[0]

  for sentence, vec in zip(sentences[1:], embeddings[1:]):
    sim = cosine_similarity([group_vec], [vec])[0][0]
    # The size check looks at the chunk before the candidate sentence is added.
    approx_tokens = len(" ".join(group)) // 4

    if not (sim >= similarity_threshhold and approx_tokens < max_tokens):
      # Topic shift or size limit reached: close the chunk and restart from this sentence.
      finished.append(" ".join(group))
      group = [sentence]
      group_vec = vec
      continue

    group.append(sentence)
    group_vec = np.mean([group_vec, vec], axis=0)

  if group:
    finished.append(" ".join(group))

  return finished

from tqdm.auto import tqdm
def semantic_chunk_pdf_pages(pages_and_texts: list, similarity_threshhold: float = 0.8, max_tokens: int = 500) -> list[dict]:
    """Run `semantic_chunk_text` over every page record and flatten the results.

    Each input record must provide "page_number" and "text". The result holds
    one dict per chunk: the source page number, the chunk's index within that
    page, its character and word counts, a rough token estimate
    (characters / 4) and the chunk text. Progress is shown with tqdm.
    """
    records = []

    for page in tqdm(pages_and_texts, desc="Sematic chunking page"):
        number, body = page["page_number"], page["text"]
        pieces = semantic_chunk_text(
            body,
            similarity_threshhold=similarity_threshhold,
            max_tokens=max_tokens,
        )
        records.extend(
            {
                "page_number": number,
                "chunk_index": position,
                "chunk_char_count": len(piece),
                "chunk_word_count": len(piece.split()),
                "chunk_token_count": len(piece) / 4,
                "chunk_text": piece,
            }
            for position, piece in enumerate(pieces)
        )
    return records







In [None]:

import nltk
nltk.download('punkt_tab')

# Chunk every page semantically; the 0.75 threshold overrides the function's 0.8 default.
semantic_chunked_pages = semantic_chunk_pdf_pages(pages_and_texts, similarity_threshhold=0.75, max_tokens=500)

print(f"Total semantic chunks : {len(semantic_chunked_pages)}")
# NOTE(review): indexing [0] assumes at least one chunk was produced.
print(f"First semantic chunk : (page {semantic_chunked_pages[0]['page_number']}):")
print(semantic_chunked_pages[0]['chunk_text'][:200] + "......")




In [None]:

import random, textwrap

def _scattered_indices(n: int, k: int, jitter_frac = 0.08) -> list[int]:
  """Evenly spaced anchors + random jitter -> distinct indices scattered across [0, n-1].

  Args:
    n: Size of the index range to sample from.
    k: Number of indices requested.
    jitter_frac: Fraction of n used as the jitter radius around each anchor
      (the radius is at least 1).

  Returns:
    A list of distinct indices in [0, n-1]. Its length is min(k, n); it is
    empty when n <= 0 or k <= 0.
  """
  # There are only n distinct indices. Without this clamp the top-up loop
  # below never terminates for k > n, and n == 0 raises inside `random`.
  k = min(k, n)
  if k <= 0:
    return []
  if k == 1:
    return [random.randrange(n)]

  anchors = [int(round(i * (n - 1) / (k - 1))) for i in range(k)]
  out, seen = [], set()
  radius = max(1, int(n * jitter_frac))
  for a in anchors:
    lo, hi = max(0, a - radius), min(n - 1, a + radius)
    j = random.randint(lo, hi)
    if j not in seen:
      out.append(j)
      seen.add(j)

  # Jittered picks can collide; top up with uniform draws until k are distinct.
  while len(out) < k:
    r = random.randrange(n)
    if r not in seen:
      out.append(r)
      seen.add(r)
  return out


def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
    """Render one chunk record as a box-drawing frame: stats header, rule, wrapped text.

    `c` must provide the keys "page_number", "chunk_index", "chunk_char_count",
    "chunk_word_count", "chunk_token_count" and "chunk_text". The text is
    wrapped at `wrap_at` columns without splitting long words.
    """
    stats_left = f" Chunk p{c['page_number']}  . idx {c['chunk_index']}   |"
    stats_right = f" chars {c['chunk_char_count']}  .  words {c['chunk_word_count']}  . ~ tokens {c['chunk_token_count']} "
    header = stats_left + stats_right

    wrapped_lines = textwrap.wrap(
        c["chunk_text"],
        width=wrap_at,
        break_long_words=False,
        replace_whitespace=False,
    )

    # The frame is as wide as the longer of the header and the widest body row.
    widest = max((len(row) for row in wrapped_lines), default=0)
    box_width = max(len(header), widest)
    horizontal = "─" * (box_width + 2)

    def framed(content: str) -> str:
        # Left-justify inside the vertical borders with one blank of padding.
        return "│ " + content.ljust(box_width) + " │"

    # An empty text still yields a single blank body row.
    body_rows = [framed(row) for row in wrapped_lines] or [framed("")]

    return "\n".join([
        "┌" + horizontal + "┐",
        framed(header),
        "├" + horizontal + "┤",
        *body_rows,
        "└" + horizontal + "┘",
    ])


def show_random_semantics_chunks(pages_and_texts: list, chunk_size: int = 500, k: int = 5, seed: int | None = 42):
  if seed is not None:
    random.seed(seed)
  n = len(semantic_chunked_pages)
  if n == 0:
    print("No Semantic chunk to display .");
    return
  idxs = _scattered_indices(n, k)
  print(f"Showing {len(idxs)} scattered Semantic chunk out of {n} total:\n")
  for i, idx in enumerate(idxs, 1):
    print(f"#{i}")
    print(_draw_boxed_chunk(semantic_chunked_pages[idx]))
    print()



#----------------Run---------------------------
# Require a defined, non-empty semantic chunk list before sampling from it.
assert 'semantic_chunked_pages' in globals() and len(semantic_chunked_pages) > 0, \
"Run your semantic chunkeing code frist to define `semantic_chunked_pages` . "
# seed=42 re-seeds `random` inside the call, making the selection repeatable.
show_random_semantics_chunks(semantic_chunked_pages, k=5, seed=42)

**Chunking Strategy 3: Recursive Chunking (Structure-Aware for RAG)**

**What Is Recursive Chunking?**
Recursive Chunking is a structure-preserving strategy designed to break down long documents into semantically meaningful chunks for RAG pipelines. Unlike naive splitting methods that ignore context, this approach respects the natural hierarchy of the text—starting from sections, then paragraphs, and finally sentences—ensuring that each chunk retains coherence and relevance for retrieval.

**⚙️ How It Works (Step-by-Step)**
**Initial Size Check** If the input block is already smaller than the max_chunk_size (e.g., in tokens or characters), it's retained as-is—no further splitting needed.

**Split by Section Boundaries (\n\n)** If the block is too large, the algorithm first attempts to split it using double newlines, which typically indicate section breaks or topic shifts.

**Fallback to Paragraphs (\n)** If sections are still too long, it recursively splits further using single newlines to isolate paragraphs.

**Final Fallback to Sentences** When paragraph-level chunks exceed the size limit, the algorithm falls back to sentence-level splitting using NLP tools like nltk.sent_tokenize.

**Why It’s Ideal for RAG**
**Preserves Semantic Flow:** Chunks align with the document’s logical structure, improving retrieval relevance.

**Minimizes Fragmentation:** Avoids breaking sentences or concepts mid-way, which can degrade embedding quality.

**Flexible Granularity**: Adapts chunk size dynamically based on content structure, not just token count.

Step 1: Split the whole text

  

In [None]:
import nltk
from tqdm.auto import tqdm
nltk.download('punkt')

def recursive_chunk_text(text:str, max_chunk_size: int = 1000, min_chunk_size: int = 100) -> list:
  """Recursively splits a block of text into chunks that fit within size constraints.

  Tries splitting by section (blank line), then by single newline, then by
  sentence, descending one level only when a piece is still too long.

  Args:
    text: The text to split.
    max_chunk_size: Maximum chunk length in characters. A single sentence
      longer than this is kept whole rather than cut.
    min_chunk_size: Currently unused; kept for interface compatibility.

  Returns:
    The chunks in document order. Text that already fits is returned as a
    one-element list.
  """

  def split_chunk(chunk: str) -> list:
    # Base case: the piece already fits.
    if len(chunk) <= max_chunk_size:
      return [chunk]

    # Structural separators, coarsest first: section breaks, then line breaks.
    for separator in ("\n\n", "\n"):
      sections = chunk.split(separator)
      if len(sections) > 1:
        result = []
        for section in sections:
          # Skip pieces that are empty or whitespace-only.
          if section.strip():
            result.extend(split_chunk(section.strip()))
        return result

    # Fallback: pack whole sentences greedily up to max_chunk_size.
    sentences = nltk.sent_tokenize(chunk)
    chunks, current_chunk, current_size = [], [], 0

    for sentence in sentences:
      sentence_len = len(sentence)
      # Sentences are joined with one space, so count it once the chunk has content.
      added = sentence_len + (1 if current_chunk else 0)
      if current_chunk and current_size + added > max_chunk_size:
        chunks.append(" ".join(current_chunk))
        current_chunk = [sentence]
        current_size = sentence_len
      else:
        current_chunk.append(sentence)
        current_size += added

    if current_chunk:
      chunks.append(" ".join(current_chunk))

    return chunks

  return split_chunk(text)






In [None]:
import nltk
from tqdm.auto import tqdm
nltk.download('punkt', quiet=True)

def recursive_chunk_text(text: str, max_chunk_size: int = 1000, min_chunk_size: int = 100) -> list:
    """Recursively splits a block of text into chunks that fit within size constraints.

    Tries splitting by section (blank line), then by single newline, then by
    sentence, descending one level only when a piece is still too long.

    Args:
        text: The text to split.
        max_chunk_size: Maximum chunk length in characters. A single sentence
            longer than this is kept whole rather than cut.
        min_chunk_size: Currently unused; kept for interface compatibility.

    Returns:
        The chunks in document order. Text that already fits is returned as a
        one-element list.
    """

    def split_chunk(chunk: str) -> list:
        # Base case: the piece already fits.
        if len(chunk) <= max_chunk_size:
            return [chunk]

        # Structural separators, coarsest first: section breaks, then line breaks.
        for separator in ("\n\n", "\n"):
            sections = chunk.split(separator)
            if len(sections) > 1:
                result = []
                for section in sections:
                    # Skip pieces that are empty or whitespace-only.
                    if section.strip():
                        result.extend(split_chunk(section.strip()))
                return result

        # Fallback to sentence-level splitting: pack whole sentences greedily.
        sentences = nltk.sent_tokenize(chunk)
        chunks, current_chunk, current_size = [], [], 0

        for sentence in sentences:
            sentence_len = len(sentence)
            # Sentences are joined with one space; counting it keeps the joined
            # chunk within max_chunk_size instead of overshooting by the separators.
            added = sentence_len + (1 if current_chunk else 0)
            if current_chunk and current_size + added > max_chunk_size:
                chunks.append(" ".join(current_chunk))
                current_chunk = [sentence]
                current_size = sentence_len
            else:
                current_chunk.append(sentence)
                current_size += added

        if current_chunk:
            chunks.append(" ".join(current_chunk))

        return chunks

    return split_chunk(text)
