In [None]:
install pdfplumber
install tesseract and pytesseract
install PyMuPDF

### Text from PDF

In [1]:
import fitz  # PyMuPDF

def extract_text_blocks(pdf_path):
    """Extract the non-empty text blocks of a PDF, page by page.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.

    Returns:
        A list of dicts, one per text block, with the keys ``modality``
        (always ``"text"``), ``page`` (1-based page number), ``bbox``
        (``(x0, y0, x1, y1)``) and ``content`` (block text with surrounding
        whitespace removed).
    """
    items = []
    doc = fitz.open(pdf_path)
    try:
        for page_num in range(len(doc)):
            page = doc[page_num]
            # Each block is (x0, y0, x1, y1, text, block_no, block_type, ...).
            for block in page.get_text("blocks"):
                x0, y0, x1, y1, text, *rest = block
                # Per PyMuPDF's block tuple, block_type 1 marks an image block
                # whose "text" is only image meta information; it must not be
                # stored as document text.
                if len(rest) >= 2 and rest[1] == 1:
                    continue
                content = text.strip()
                if content:
                    items.append({
                        "modality": "text",
                        "page": page_num + 1,
                        "bbox": (x0, y0, x1, y1),
                        "content": content
                    })
    finally:
        # Release the file handle even when extraction fails part-way.
        doc.close()
    return items
# Run the text-block extraction on the sample PDF; the result is inspected in the next cell.
items = extract_text_blocks("./docs/doc.pdf")

In [2]:
items

[{'modality': 'text',
  'page': 1,
  'bbox': (72.02400207519531,
   112.42996215820312,
   429.59295654296875,
   349.3399658203125),
  'content': 'TCS AI Fridays – Participant Handbook'},
 {'modality': 'text',
  'page': 1,
  'bbox': (72.02400207519531,
   357.55999755859375,
   508.35302734375,
   386.2099914550781),
  'content': 'VERSION 1.0'},
 {'modality': 'text',
  'page': 2,
  'bbox': (442.8999938964844,
   780.0660400390625,
   530.8119506835938,
   804.5999755859375),
  'content': 'TCS Confidential | 2'},
 {'modality': 'text',
  'page': 2,
  'bbox': (72.02400207519531,
   76.54995727539062,
   312.33294677734375,
   113.29998779296875),
  'content': 'Document Release Notice'},
 {'modality': 'text',
  'page': 2,
  'bbox': (72.02400207519531,
   128.17999267578125,
   526.1199951171875,
   156.97998046875),
  'content': 'This TCS AI Fridays Hackathon – Participant Handbook, Version 1.0 is released for use in TATA \nConsultancy Services (TCS) with effect from 01/08/2025.'},
 {'mod

### Tables from PDF

In [5]:
import pdfplumber
import pandas as pd

def extract_tables(pdf_path):
    """Collect every table pdfplumber finds in a PDF.

    The first row of each raw table is used as the column header (a
    heuristic); the remaining rows become the data.

    Args:
        pdf_path: Path of the PDF file, passed to ``pdfplumber.open``.

    Returns:
        A list of dicts with the keys ``modality`` (always ``"table"``),
        ``page`` (1-based), ``content_csv``, ``content_md`` and ``shape``
        (the DataFrame shape).
    """
    collected = []
    with pdfplumber.open(pdf_path) as pdf:
        page_no = 0
        for page in pdf.pages:
            page_no += 1
            for raw_table in page.extract_tables():
                # Heuristic: row 0 holds the headers, the rest is data.
                frame = pd.DataFrame(raw_table[1:], columns=raw_table[0])
                record = {
                    "modality": "table",
                    "page": page_no,
                    "content_csv": frame.to_csv(index=False),
                    "content_md": frame.to_markdown(index=False),
                    "shape": frame.shape,
                }
                collected.append(record)
    return collected
# Run the table extraction on the sample PDF; the result is inspected in the next cell.
tables = extract_tables("./docs/doc.pdf")

In [6]:
tables

[{'modality': 'table',
  'page': 3,
  'content_csv': ',Revision/,,,Document,,"Revision\nDescription","Section\nNo.","Rational for\nChange",,Change type,\n,Version,,,Release/Revision,,,,,,(add/modify,\n,No.,,,Date,,,,,,/delete),\n1.0,,,01-08-2025,,,"First release of\nthe document",NA,NA,NA,,\n,,,,,,,,,,,\n,,,,,,,,,,,\n',
  'content_md': '|     | Revision/   |    |            | Document         |    | Revision         | Section   | Rational for   |    | Change type   |    |\n|     |             |    |            |                  |    | Description      | No.       | Change         |    |               |    |\n|:----|:------------|:---|:-----------|:-----------------|:---|:-----------------|:----------|:---------------|:---|:--------------|:---|\n|     | Version     |    |            | Release/Revision |    |                  |           |                |    | (add/modify   |    |\n|     | No.         |    |            | Date             |    |                  |           |           

### Images from PDF

In [7]:
import fitz
import io
from PIL import Image
import hashlib

def extract_images(pdf_path):
    """Extract the images referenced by each page of a PDF.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.

    Returns:
        A list of dicts, one per image reference, with the keys ``modality``
        (always ``"image"``), ``page`` (1-based), ``image_bytes``,
        ``image_format`` (the ``ext`` reported by PyMuPDF, ``"png"`` when
        absent), ``image_hash`` (MD5 hex digest of the bytes), ``width`` and
        ``height``.
    """
    items = []
    doc = fitz.open(pdf_path)
    try:
        for page_index in range(len(doc)):
            page = doc[page_index]
            for img in page.get_images(full=True):
                xref = img[0]  # first entry of the image tuple is its xref
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                # Decode once to read the pixel dimensions.
                pil_img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
                items.append({
                    "modality": "image",
                    "page": page_index + 1,
                    "image_bytes": image_bytes,
                    "image_format": base_image.get("ext", "png"),
                    # MD5 is used as a content fingerprint, not for security.
                    "image_hash": hashlib.md5(image_bytes).hexdigest(),
                    "width": pil_img.width,
                    "height": pil_img.height
                })
    finally:
        # Release the file handle even when an image fails to decode.
        doc.close()
    return items
# Run the image extraction on the sample PDF; later cells display and OCR these images.
images = extract_images("./docs/doc.pdf")

In [36]:
# Decode every extracted image from its raw bytes and display it via PIL's show().
for image_item in images:
    Image.open(io.BytesIO(image_item['image_bytes'])).show()

### Image to Text

In [39]:
import pytesseract  # imported here so the cell does not depend on a later cell having run

# OCR at most the first ten extracted images; slicing (instead of range(10))
# avoids an IndexError when the PDF holds fewer than ten images.
for image_item in images[:10]:
    img = Image.open(io.BytesIO(image_item['image_bytes']))
    text = pytesseract.image_to_string(img)
    print(text)

TATA
tCs CONSULTANCY
SERVICES TATA






Participant Journey Map

Hackathon Feedback

+ Suomit your *Youwill get mail *Check youraccess *Collaborate with + Present your _—_—« Provide Feedback
Nomination confirmation tothelocation your Team solution to Jury + Share your
before Due Date * Orientation ‘Verify yourlogin *Seek Mentor for and Audience Experience
Alla credentials Guidance sTimebox of 5
Provisioning Familiarize with Al * Build Scalable, Minutes
Tools Maintainable

Solution






In [44]:

import fitz
import pytesseract
from PIL import Image
import io

def ocr_page_images(images):
    """Run Tesseract OCR over a sequence of extracted images.

    Args:
        images: Sequence of dicts holding the raw image under
            ``image_bytes`` and, when available, the 1-based source page
            under ``page`` (as produced by ``extract_images``).

    Returns:
        A list with one dict per input image, in input order, with the keys
        ``modality`` (always ``"ocr_text"``), ``page`` and ``content`` (the
        text returned by ``pytesseract.image_to_string``).
    """
    ocr_items = []
    for position, image_item in enumerate(images, start=1):
        img = Image.open(io.BytesIO(image_item['image_bytes']))
        text = pytesseract.image_to_string(img)
        ocr_items.append({
            "modality": "ocr_text",
            # Report the page the image came from; the position in the list
            # is only a fallback, because several images can share a page.
            "page": image_item.get("page", position),
            "content": text
        })
    return ocr_items

# OCR every extracted image; the result is inspected in the next cell.
image_text = ocr_page_images(images)

In [45]:
image_text

[{'modality': 'ocr_text',
  'page': 1,
  'content': 'TATA\ntCs CONSULTANCY\nSERVICES TATA\n'},
 {'modality': 'ocr_text', 'page': 2, 'content': ''},
 {'modality': 'ocr_text', 'page': 3, 'content': ''},
 {'modality': 'ocr_text', 'page': 4, 'content': ''},
 {'modality': 'ocr_text', 'page': 5, 'content': ''},
 {'modality': 'ocr_text', 'page': 6, 'content': ''},
 {'modality': 'ocr_text',
  'page': 7,
  'content': 'Participant Journey Map\n\nHackathon Feedback\n\n+ Suomit your *Youwill get mail *Check youraccess *Collaborate with + Present your _—_—« Provide Feedback\nNomination confirmation tothelocation your Team solution to Jury + Share your\nbefore Due Date * Orientation ‘Verify yourlogin *Seek Mentor for and Audience Experience\nAlla credentials Guidance sTimebox of 5\nProvisioning Familiarize with Al * Build Scalable, Minutes\nTools Maintainable\n\nSolution\n'},
 {'modality': 'ocr_text', 'page': 8, 'content': ''},
 {'modality': 'ocr_text', 'page': 9, 'content': ''},
 {'modality': 'ocr_

In [None]:
{
  "id": "string",
  "content": "string | markdown | csv (for tables)",
  "modality": "text|table|image|ocr_text",
  "embedding": [float, ...],
  "metadata": {
    "source_pdf": "file.pdf",
    "page": 12,
    "bbox": [x0, y0, x1, y1],
    "table_format": "markdown|csv",
    "image_hash": "md5hex",
    "width": 1200,
    "height": 800,
    "chunk_id": "page12_chunk3",
    "created_at": "2025-11-28T10:37:05+05:30"
  }
}

In [None]:
from chromadb import Client
from chromadb.config import Settings

# Create a Chroma client with anonymized telemetry switched off, then obtain
# the "pdf_rag" collection through get_or_create_collection.
client = Client(Settings(anonymized_telemetry=False))
collection = client.get_or_create_collection(name="pdf_rag")

def _chroma_safe_metadata(item):
    """Return a copy of the item's metadata restricted to scalar values."""
    import json  # local import: the notebook's import cells do not provide json

    meta = dict(item.get("metadata") or {})
    # Keep the modality with the stored record so retrieval code can read it.
    modality = item.get("modality")
    if modality is not None:
        meta.setdefault("modality", modality)

    safe = {}
    for key, value in meta.items():
        if value is None:
            continue  # an absent value carries no information
        if isinstance(value, (str, int, float, bool)):
            safe[key] = value
        else:
            # Tuples/lists/dicts (e.g. bbox, shape) are stored as JSON text.
            safe[key] = json.dumps(value, default=str)
    return safe or None


def upsert_items(items, embeddings):
    """Upsert documents and their embeddings into the ``pdf_rag`` collection.

    Args:
        items: List of dicts with the keys ``id``, ``content`` and optionally
            ``metadata`` and ``modality``.
        embeddings: One embedding per item, in the same order.

    Returns:
        None. An empty ``items`` list is a no-op.

    Metadata values that are not str/int/float/bool are serialised to JSON
    strings, ``None`` values are dropped, and the item's ``modality`` is copied
    into the metadata when it is not already present.
    """
    if not items:
        return
    collection.upsert(
        ids=[it.get("id") for it in items],
        metadatas=[_chroma_safe_metadata(it) for it in items],
        documents=[it.get("content") for it in items],
        embeddings=embeddings
    )


In [None]:

def retrieve(query, k=8):
    """Return the ``k`` nearest collection entries for a text query.

    The query is embedded with ``embed_texts`` and the resulting vector is
    handed to ``collection.query``; its result (documents + metadata) is
    returned unchanged.
    """
    query_vector = embed_texts([query])[0]
    return collection.query(query_embeddings=[query_vector], n_results=k)

def build_context(results):
    lines = []
    for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
        lines.append(f"### Source: {meta['source_pdf']} p.{meta['page']} ({meta['modality']})\n")
        if meta.get("table_format") == "markdown":
            lines.append(doc[:4000])  # cap length
        else:
            lines.append(doc[:1200])
        lines.append("\n")
    return "\n".join(lines)


In [None]:

def process_pdf_to_vectors(pdf_path):
    """Extract, chunk, embed and upsert the contents of one PDF.

    Text blocks are chunked with ``chunk_text``, tables are stored as
    markdown, non-empty OCR text of embedded images is stored as
    ``ocr_text`` documents, and images are embedded with ``embed_images``.

    Args:
        pdf_path: Path of the PDF file; also used as the id prefix and as the
            ``source_pdf`` metadata value.

    Returns:
        None. Everything is written through ``upsert_items``.
    """
    import json  # local import: only needed to serialise metadata values

    text_items = extract_text_blocks(pdf_path)
    table_items = extract_tables(pdf_path)
    image_items = extract_images(pdf_path)
    # OCR operates on the extracted images, not on the PDF path.
    ocr_items = ocr_page_images(image_items)  # optional

    # Chunk text. The block index is part of the id because several blocks of
    # one page would otherwise all produce "...-p<page>-t0" and collide.
    chunked = []
    for block_idx, it in enumerate(text_items):
        for idx, ch in enumerate(chunk_text(it["content"])):
            chunked.append({
                "id": f"{pdf_path}-p{it['page']}-b{block_idx}-t{idx}",
                "content": ch,
                "modality": "text",
                "metadata": {
                    "source_pdf": pdf_path,
                    "page": it["page"],
                    "modality": "text",
                    # Metadata values are kept scalar; the bbox is JSON text.
                    "bbox": json.dumps(list(it["bbox"])),
                    "chunk_id": f"p{it['page']}_text_{block_idx}_{idx}"
                }
            })

    # Prepare tables
    table_docs = []
    for idx, t in enumerate(table_items):
        table_docs.append({
            "id": f"{pdf_path}-table-{idx}",
            "content": t["content_md"],           # prefer markdown for LLMs
            "modality": "table",
            "metadata": {
                "source_pdf": pdf_path,
                "page": t["page"],
                "modality": "table",
                "table_format": "markdown",
                "shape": json.dumps(list(t["shape"]))
            }
        })

    # OCR text of images, indexed as text for hybrid retrieval. OCR results
    # come back one per image in input order, so they are paired by position.
    ocr_docs = []
    for idx, (img, ocr) in enumerate(zip(image_items, ocr_items)):
        ocr_text = (ocr.get("content") or "").strip()
        if not ocr_text:
            continue
        ocr_docs.append({
            "id": f"{pdf_path}-ocr-{idx}",
            "content": ocr_text,
            "modality": "ocr_text",
            "metadata": {
                "source_pdf": pdf_path,
                "page": img["page"],
                "modality": "ocr_text",
                "image_hash": img["image_hash"]
            }
        })

    # Prepare images
    image_docs = []
    for idx, img in enumerate(image_items):
        image_docs.append({
            "id": f"{pdf_path}-image-{idx}",
            "content": f"[IMAGE] {img['image_hash']}",  # store a placeholder; embed separately
            "modality": "image",
            "metadata": {
                "source_pdf": pdf_path,
                "page": img["page"],
                "modality": "image",
                "image_hash": img["image_hash"],
                "width": img["width"],
                "height": img["height"]
            },
            "binary": img["image_bytes"]  # store externally or as a blob
        })

    # Embed and upsert text-like documents; skip the call for an empty batch.
    text_docs = chunked + table_docs + ocr_docs
    if text_docs:
        text_vectors = embed_texts([d["content"] for d in text_docs])
        upsert_items(text_docs, text_vectors)

    # Embed and upsert images (image vectors); the raw bytes are not stored.
    # NOTE(review): text and image vectors go to the same collection, so both
    # embedders presumably need the same dimensionality - confirm.
    if image_docs:
        image_vectors = embed_images([d["binary"] for d in image_docs])
        upsert_items(
            [{k: v for k, v in d.items() if k != "binary"} for d in image_docs],
            image_vectors
        )

    # Optionally: also embed image captions/OCR and upsert as text modality for hybrid retrieval


### y-threshold + Repeated Content

In [None]:

import fitz
from collections import Counter
import re

def detect_repeated_strings(blocks_per_page, min_pages_ratio=0.6):
    """Find short strings that recur on many pages (likely headers/footers).

    Args:
        blocks_per_page: List of pages; each page is a list of dicts with a
            ``"text"`` entry.
        min_pages_ratio: Minimum fraction of pages a string must appear on.

    Returns:
        A set of normalised strings (trimmed, whitespace collapsed,
        lower-cased, at most 80 characters) that appear on at least two pages
        and on at least ``min_pages_ratio`` of all pages. A string is counted
        once per page.
    """
    total_pages = len(blocks_per_page)
    if total_pages == 0:
        return set()

    def norm(s):
        return re.sub(r"\s+", " ", s.strip()).lower()

    counter = Counter()
    for page_blocks in blocks_per_page:
        seen = set()
        for b in page_blocks:
            t = norm(b["text"])
            if t and len(t) <= 80:  # short strings typical of headers/footers
                seen.add(t)
        counter.update(seen)

    # Comparing the real fraction (instead of a floored page count) keeps the
    # ratio exact, and the two-page floor stops 1-3 page documents from
    # flagging every short string as "repeated".
    return {
        s for s, c in counter.items()
        if c >= 2 and c / total_pages >= min_pages_ratio
    }

def extract_body_text_without_header_footer(pdf_path, top_ratio=0.1, bottom_ratio=0.1):
    """Extract per-page body text, dropping headers, footers and page numbers.

    A block is dropped when it lies entirely inside the top/bottom band of
    the page, when its normalised text is reported by
    ``detect_repeated_strings``, or when it touches a band and consists only
    of a page number (``"12"``, ``"page 3"``, ``"page 3 of 9"``).

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.
        top_ratio: Fraction of the page height treated as header band.
        bottom_ratio: Fraction of the page height treated as footer band.

    Returns:
        A list of ``{"page": <1-based page number>, "text": <body text>}``
        dicts, one per page, with the kept blocks joined by newlines.
    """
    pages_blocks = []
    page_heights = []

    # First pass: gather blocks and page heights, then release the file.
    doc = fitz.open(pdf_path)
    try:
        for p in range(len(doc)):
            page = doc[p]
            page_heights.append(page.rect.height)
            page_items = []
            # Each block is (x0, y0, x1, y1, text, block_no, block_type, ...).
            for x0, y0, x1, y1, text, *rest in page.get_text("blocks"):
                # block_type 1 marks an image block; its text is only image
                # meta information and is not body text.
                if len(rest) >= 2 and rest[1] == 1:
                    continue
                page_items.append({"bbox": (x0, y0, x1, y1), "text": text})
            pages_blocks.append(page_items)
    finally:
        doc.close()

    # Detect repeated short strings likely to be headers/footers
    repeated = detect_repeated_strings(pages_blocks, min_pages_ratio=0.6)

    # Second pass: exclude by y-threshold + repeated strings
    body_pages = []
    for p, (page_items, page_height) in enumerate(zip(pages_blocks, page_heights), start=1):
        top_cut = page_height * top_ratio
        bottom_cut = page_height * (1 - bottom_ratio)

        body_text = []
        for item in page_items:
            x0, y0, x1, y1 = item["bbox"]
            text = item["text"].strip()
            if not text:
                continue

            # Skip if located in header/footer bands
            if y1 <= top_cut or y0 >= bottom_cut:
                continue

            # Skip if matches repeated strings
            norm_text = re.sub(r"\s+", " ", text.lower())
            if norm_text in repeated:
                continue

            # Skip typical page number-only blocks at edges
            if (y0 <= top_cut or y1 >= bottom_cut) and re.fullmatch(r"(page\s*\d+(\s*of\s*\d+)?)|\d+", text.lower()):
                continue

            body_text.append(text)

        body_pages.append({"page": p, "text": "\n".join(body_text)})

    return body_pages


### Removing Headers/Footers from Tables

In [None]:

import pdfplumber
import pandas as pd

def extract_tables_clean(pdf_path, top_ratio=0.1, bottom_ratio=0.12):
    """Extract tables while dropping rows that sit in the header/footer bands.

    Args:
        pdf_path: Path of the PDF file, passed to ``pdfplumber.open``.
        top_ratio: Fraction of the page height treated as header band.
        bottom_ratio: Fraction of the page height treated as footer band.

    Returns:
        A list of ``{"page": <1-based page number>, "df": DataFrame}`` dicts.
        The first kept row of each table becomes the column header (a
        heuristic); tables with no kept rows are omitted.
    """
    results = []
    with pdfplumber.open(pdf_path) as pdf:
        for i, page in enumerate(pdf.pages, start=1):
            H = page.height
            top_cut, bottom_cut = H * top_ratio, H * (1 - bottom_ratio)
            for table in page.find_tables():
                rows = []
                # pdfplumber exposes row geometry (table.rows, whose cells are
                # (x0, top, x1, bottom) tuples or None) separately from the
                # cell text (table.extract()); both lists follow row order.
                for row, texts in zip(table.rows, table.extract()):
                    boxes = [cell for cell in row.cells if cell is not None]
                    if not boxes:
                        continue
                    y0 = min(box[1] for box in boxes)
                    y1 = max(box[3] for box in boxes)
                    # Skip if in header/footer band
                    if y1 <= top_cut or y0 >= bottom_cut:
                        continue
                    rows.append([(txt or "").strip() for txt in texts])
                if not rows:
                    continue
                # Heuristic: first row headers
                df = pd.DataFrame(rows[1:], columns=rows[0])
                results.append({"page": i, "df": df})
    return results


### Deduplication: Text, Tables, Images

In [None]:

import hashlib, re

def normalize_text(s):
    """Canonicalise text for duplicate detection.

    Lower-cases the text, removes every character that is neither a word
    character nor whitespace, then collapses whitespace runs to single spaces
    and trims the ends.

    Punctuation is removed *before* whitespace is collapsed so that the gaps
    it leaves behind (``"a - b"``, ``"done !"``) do not survive as double or
    trailing spaces and make equivalent texts hash differently.
    """
    s = re.sub(r"[^\w\s]", "", s.lower())  # optional: strip punctuation
    return re.sub(r"\s+", " ", s).strip()

def text_hash(s):
    """Return the SHA-256 hex digest of the normalised, UTF-8 encoded text."""
    canonical = normalize_text(s)
    digest = hashlib.sha256()
    digest.update(canonical.encode("utf-8"))
    return digest.hexdigest()

def dedup_text_chunks(chunks):
    """Keep the first chunk for every distinct content hash.

    Each kept chunk dict is annotated in place with ``content_hash``; later
    chunks whose ``content`` hashes to an already seen value are left out of
    the result (and left unmodified). Input order is preserved.
    """
    first_by_hash = {}
    for chunk in chunks:
        digest = text_hash(chunk["content"])
        if digest not in first_by_hash:
            chunk["content_hash"] = digest
            first_by_hash[digest] = chunk
    return list(first_by_hash.values())


In [None]:

import pandas as pd
import hashlib

def table_hash(df: pd.DataFrame):
    """Return an MD5 fingerprint of a table's whitespace-normalised CSV form.

    Every cell is converted to ``str`` and stripped, the frame is written as
    CSV without the index, and the UTF-8 bytes of that CSV are hashed. The
    input frame is not modified.
    """
    def clean(cell):
        return str(cell).strip()

    # DataFrame.applymap is deprecated in favour of DataFrame.map in recent
    # pandas releases; use the new name when the installed pandas has it.
    if hasattr(pd.DataFrame, "map"):
        normalised = pd.DataFrame.map(df, clean)
    else:
        normalised = df.applymap(clean)
    csv = normalised.to_csv(index=False)
    # MD5 is used as a content fingerprint, not for security.
    return hashlib.md5(csv.encode("utf-8")).hexdigest()

def dedup_tables(table_docs):
    """Drop tables whose DataFrame hashes to an already seen value.

    Every entry must carry its DataFrame under ``"df"``. The first entry for
    each hash is kept, in input order, and annotated in place with
    ``table_hash``; later duplicates are skipped and left unmodified.
    """
    known_hashes = set()
    survivors = []
    for entry in table_docs:
        fingerprint = table_hash(entry["df"])  # assume you kept the dataframe
        if fingerprint not in known_hashes:
            known_hashes.add(fingerprint)
            entry["table_hash"] = fingerprint
            survivors.append(entry)
    return survivors


In [None]:

from PIL import Image
import imagehash
import io

def image_phash(image_bytes):
    """Return the perceptual hash of an encoded image as a string.

    The bytes are decoded with PIL, converted to RGB and passed to
    ``imagehash.phash``; the resulting hash is returned via ``str()``.
    """
    buffer = io.BytesIO(image_bytes)
    rgb_image = Image.open(buffer).convert("RGB")
    perceptual = imagehash.phash(rgb_image)
    return str(perceptual)

def dedup_images(image_items, max_hamming_distance=4):
    """Drop images that are perceptually near-identical to an earlier one.

    Args:
        image_items: Iterable of dicts holding the raw image under
            ``image_bytes``.
        max_hamming_distance: Two images count as duplicates when the Hamming
            distance between their perceptual hashes is at most this value.

    Returns:
        The kept items in input order. Each kept dict is annotated in place
        with ``phash`` (the hash string); duplicates are left unmodified.
    """
    kept = []
    kept_hashes = []  # parsed hashes of the kept images, parsed once each
    for it in image_items:
        ph = image_phash(it["image_bytes"])
        # Parse the hex string once instead of on every pairwise comparison.
        candidate = imagehash.hex_to_hash(ph)
        # Subtracting two hashes yields their Hamming distance.
        if any(candidate - existing <= max_hamming_distance for existing in kept_hashes):
            continue
        kept_hashes.append(candidate)
        it["phash"] = ph
        kept.append(it)
    return kept
