In [4]:

import camelot as cam
import pdfplumber
import json
import argparse
from typing import List, Dict, Union
import os

In [3]:
import fitz
import pdfplumber

In [5]:

def classify_pdf_pages(pdf_name: str) -> dict[str, list[int]]:
    """Classify each page of a PDF as digital or scanned.

    A page counts as "digital" when PyMuPDF extracts any non-whitespace text
    from it, and as "scanned" otherwise.

    Args:
        pdf_name: Path of the PDF file to inspect.

    Returns:
        A dict with the keys "digital" and "scanned", each mapping to the
        list of 0-based page indices in that category, in document order.

    Raises:
        RuntimeError: If the document cannot be opened or read; the original
            exception is attached as the cause.
    """
    page_groups: dict[str, list[int]] = {"digital": [], "scanned": []}
    try:
        # The context manager closes the document even when a page fails to
        # load, so the file handle is never leaked.
        with fitz.open(pdf_name) as doc:
            for page_num, page in enumerate(doc):
                kind = "digital" if page.get_text().strip() else "scanned"
                page_groups[kind].append(page_num)
    except Exception as e:
        raise RuntimeError(f"Failed to classify pages: {e}") from e
    return page_groups

def create_page_ranges(page_numbers: list[int]) -> list[str]:
    """Collapse 0-based page indices into 1-based, human-readable ranges.

    Consecutive indices (each exactly one more than the previous) are merged
    into a single "first-last" entry; an isolated index becomes a single
    number. The input is processed in the order given.

    Args:
        page_numbers: 0-based page indices.

    Returns:
        A list of strings such as ["1-3", "5"]; empty when the input is empty.
    """
    if not page_numbers:
        return []

    # Each run is a mutable [first, last] pair of 0-based indices.
    runs = [[page_numbers[0], page_numbers[0]]]
    for page in page_numbers[1:]:
        current = runs[-1]
        if page == current[1] + 1:
            current[1] = page
        else:
            runs.append([page, page])

    # Shift to 1-based numbering only when formatting.
    return [
        str(first + 1) if first == last else f"{first+1}-{last+1}"
        for first, last in runs
    ]

def locate_table_title(pdf_name: str, keywords: List[str]) -> List[int]:
    """Find pages whose upper 35% contains one of the title keywords.

    Every text block returned by ``page.get_text("blocks")`` whose top edge
    (``y0``) lies above 35% of the page height is searched, case-insensitively,
    for each keyword as a substring. A page is reported at most once.

    Args:
        pdf_name: Path of the PDF file to search.
        keywords: Title phrases to look for.

    Returns:
        0-based indices of the matching pages, in document order.

    Raises:
        RuntimeError: If the document cannot be opened or read; the original
            exception is attached as the cause.
    """
    try:
        # Lowercase the keywords once instead of once per block.
        lowered_keywords = [kw.lower() for kw in keywords]
        matching_pages = []

        # The context manager guarantees the document is closed on every path.
        with fitz.open(pdf_name) as doc:
            for page_num, page in enumerate(doc):
                cutoff = page.rect.height * 0.35
                for block in page.get_text("blocks"):
                    # Block tuples are unpacked as
                    # (x0, y0, x1, y1, text, <unused>, <unused>).
                    y0, text = block[1], block[4]
                    if y0 >= cutoff:
                        continue
                    text_lower = text.lower()
                    if any(kw in text_lower for kw in lowered_keywords):
                        matching_pages.append(page_num)
                        break  # one hit is enough for this page
        return matching_pages
    except Exception as e:
        raise RuntimeError(f"Title location failed: {e}") from e

def extract_table_pages(pdf_name: str, title_page: int) -> List[int]:
    """Identify the run of pages that a table starting on ``title_page`` spans.

    The first lattice table Camelot finds on the title page supplies a
    reference column count. Following pages are added for as long as their
    first lattice table has that same number of columns.

    Args:
        pdf_name: Path of the PDF file.
        title_page: 0-based index of the page holding the table title.

    Returns:
        0-based page indices starting with ``title_page``. Only
        ``[title_page]`` is returned when Camelot finds no table there.

    Raises:
        RuntimeError: If the document cannot be opened or the title page
            cannot be parsed; the original exception is attached as the cause.
    """
    try:
        # Only the page count is needed from PyMuPDF; close the document
        # right away instead of leaking the handle.
        with fitz.open(pdf_name) as doc:
            page_count = len(doc)

        table_pages = [title_page]

        # Get reference column count from title page (Camelot pages are 1-based)
        title_tables = cam.read_pdf(pdf_name, pages=str(title_page+1), flavor="lattice")
        if not title_tables:
            return table_pages

        ref_columns = len(title_tables[0].df.columns)

        for current_page in range(title_page + 1, page_count):
            try:
                tables = cam.read_pdf(pdf_name, pages=str(current_page+1), flavor="lattice")
                is_continuation = bool(tables) and len(tables[0].df.columns) == ref_columns
            except Exception:
                # Best effort: a page Camelot cannot parse ends the run
                # rather than failing the whole lookup.
                break
            if not is_continuation:
                break
            table_pages.append(current_page)
        return table_pages
    except Exception as e:
        raise RuntimeError(f"Table page extraction failed: {e}") from e

def get_continued_tables(tables, threshold: int = 15, page_height: float = 842):
    """Group tables that continue from one page onto the next.

    A table is treated as a continuation of the one before it when all of
    the following hold:

    * it is on the page immediately after the previous table's page,
    * both tables have the same number of columns (``len(table.cols)``),
    * the previous table's ``_bbox[1]`` lies within the bottom ``threshold``
      percent of the page, and
    * this table's ``_bbox[3]`` lies within the top ``threshold`` percent.

    The comparisons assume y grows upward from the bottom of the page, so
    ``_bbox[1]`` is read as the bottom edge and ``_bbox[3]`` as the top edge.

    Args:
        tables: Iterable of table objects exposing ``page``, ``cols`` and
            ``_bbox`` (e.g. a Camelot ``TableList``), in document order.
        threshold: Size of the top/bottom margin band, as a percentage of
            the page height.
        page_height: Page height in points. Defaults to 842 (A4), the value
            that was previously hard-coded; pass the real height for other
            page sizes.

    Returns:
        A list of groups; each group is a list of tables forming one logical
        table. Empty input yields an empty list.

    Raises:
        RuntimeError: If a table lacks the expected attributes or any other
            error occurs; the original exception is attached as the cause.
    """
    try:
        # Both limits are loop-invariant, so compute them once.
        bottom_limit = (threshold/100)*page_height
        top_limit = (1 - threshold/100)*page_height

        continued_tables = []
        current_group = []

        for table in tables:
            if current_group:
                # The last member of the open group is always the table
                # seen just before this one.
                prev_table = current_group[-1]
                is_continuation = (
                    table.page == prev_table.page + 1
                    and len(table.cols) == len(prev_table.cols)
                    and prev_table._bbox[1] < bottom_limit
                    and table._bbox[3] > top_limit
                )
                if is_continuation:
                    current_group.append(table)
                    continue
                continued_tables.append(current_group)
            current_group = [table]

        if current_group:
            continued_tables.append(current_group)
        return continued_tables
    except Exception as e:
        raise RuntimeError(f"Table continuation detection failed: {e}") from e

def extract_with_camelot(pdf_name: str, pages: List[int]) -> List[Dict]:
    """Extract lattice tables with Camelot and merge cross-page continuations.

    The tables found on the requested pages are grouped with
    ``get_continued_tables``. For every group, the first row of the first
    table supplies the headers, and the first row of each table in the group
    is skipped when collecting data rows.

    Args:
        pdf_name: Path of the PDF file.
        pages: 0-based page indices to scan (converted to Camelot's 1-based
            page string).

    Returns:
        One dict per table group with the keys "source", "headers", "rows"
        and "page_range". An empty list is returned when nothing is found or
        when any error occurs (the error is printed, not raised).
    """
    try:
        page_spec = ",".join(str(p+1) for p in pages)
        tables = cam.read_pdf(
            pdf_name,
            pages=page_spec,
            flavor="lattice",
            suppress_stdout=True,
            layout_kwargs={'detect_vertical': False},
        )

        # Call cleanup() only when this Camelot build exposes it.
        if hasattr(cam, 'cleanup'):
            cam.cleanup()

        if not tables:
            return []

        results = []
        for group in get_continued_tables(tables):
            first_table, last_table = group[0], group[-1]
            headers = first_table.df.iloc[0].tolist()
            # Row 0 of every table in the group is treated as a header row.
            rows = [
                dict(zip(headers, row))
                for table in group
                for row in table.df.iloc[1:].values
            ]
            results.append({
                "source": "camelot",
                "headers": headers,
                "rows": rows,
                "page_range": f"{first_table.page}-{last_table.page}",
            })
        return results
    except Exception as e:
        # Best effort: report and return nothing so the caller can fall back.
        print(f"Camelot extraction failed: {e}")
        return []

def extract_with_pdfplumber(pdf_name: str, pages: List[int]) -> List[Dict]:
    """Extract one table per page with pdfplumber (fallback extractor).

    For each requested page, the result of ``page.extract_table()`` is used
    when it is non-empty: its first row becomes the headers and the
    remaining rows become the data.

    Args:
        pdf_name: Path of the PDF file.
        pages: 0-based page indices to scan.

    Returns:
        One dict per page that yielded a table, with the keys "source",
        "headers", "rows" and "page_range" (the 1-based page number as a
        string). An empty list is returned when any error occurs (the error
        is printed, not raised).
    """
    try:
        results = []
        with pdfplumber.open(pdf_name) as pdf:
            for page_index in pages:
                table = pdf.pages[page_index].extract_table()
                if not table:
                    continue
                headers, *body = table
                results.append({
                    "source": "pdfplumber",
                    "headers": headers,
                    "rows": [dict(zip(headers, row)) for row in body],
                    "page_range": str(page_index + 1),
                })
        return results
    except Exception as e:
        # Best effort: report the problem and hand back nothing.
        print(f"PDFPlumber extraction failed: {e}")
        return []

def process_pdf(pdf_name: str, keywords: List[str], output_json: str):
    """Run the full extraction pipeline and write the outcome as JSON.

    Steps: classify the pages, locate title pages by keyword, determine the
    pages each table spans, then extract with Camelot, falling back to
    pdfplumber when Camelot returns nothing.

    Args:
        pdf_name: Path of the PDF file to process.
        keywords: Title phrases that mark the tables of interest.
        output_json: Path of the JSON report to write.

    The report has the keys "metadata", "tables" and "warnings"; an "error"
    key is added when processing fails. The report is written on every path,
    including the early exits and the failure path.

    Raises:
        Exception: Any error raised during processing is re-raised after the
            report (including the "error" entry) has been written.
    """
    result = {
        "metadata": {},
        "tables": [],
        "warnings": []
    }

    def save_report():
        # Single place that persists the current state of `result`.
        with open(output_json, "w") as f:
            json.dump(result, f, indent=2)

    try:
        # Classify pages
        classification = classify_pdf_pages(pdf_name)
        metadata = result["metadata"]
        metadata["digital_pages"] = create_page_ranges(classification["digital"])
        metadata["scanned_pages"] = create_page_ranges(classification["scanned"])

        # Nothing to extract when no page carries a text layer
        if not classification["digital"]:
            result["warnings"].append("PDF appears to be fully scanned")
            save_report()
            return

        # Locate table titles
        title_pages = locate_table_title(pdf_name, keywords)
        if not title_pages:
            result["warnings"].append("No matching table titles found")
            save_report()
            return

        # Process each found table
        for title_page in title_pages:
            table_pages = extract_table_pages(pdf_name, title_page)
            if not table_pages:
                result["warnings"].append(f"Found title but no tables on page {title_page+1}")
                continue

            # Camelot first; pdfplumber only when Camelot yields nothing
            tables = (
                extract_with_camelot(pdf_name, table_pages)
                or extract_with_pdfplumber(pdf_name, table_pages)
            )
            if not tables:
                result["warnings"].append(f"Failed to extract tables from pages {create_page_ranges(table_pages)}")
                continue

            result["tables"].extend(tables)

        # Save results
        save_report()

    except Exception as e:
        # Persist whatever was gathered plus the error, then propagate.
        result["error"] = str(e)
        save_report()
        raise


In [None]:
# Title phrases that identify the tables to extract; process_pdf passes
# them to locate_table_title, which matches them case-insensitively as
# substrings.
keywords = [
    "Approved Makes and Manufacturer",
    "Approved Manufacturers",
    "List of Approved",
    "List of Approved Makes",
    "List of Manufacturers",
]

# Enter the input PDF name and the output JSON file name here.
process_pdf(
    pdf_name="pdf5.pdf",
    keywords=keywords,
    output_json="hello.json",
)