In [1]:
import os
import fitz  # PyMuPDF
import io
import time
import json
from pathlib import Path
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, Image as IPImage
from typing import List, Dict, Any, Optional, Tuple

In [7]:
# For optional AI analysis
try:
    from dotenv import load_dotenv
    from langchain_openai import ChatOpenAI
    from langchain.schema import HumanMessage, SystemMessage
    import base64
    
    # Set API key directly
    os.environ["OPENAI_API_KEY"] = "your-key-here"  # Replace with your actual key
    AI_AVAILABLE = True
    print("AI analysis capabilities are available.")
except ImportError:
    AI_AVAILABLE = False
    print("AI analysis unavailable. Images will be classified using heuristics only.")


AI analysis capabilities are available.


In [None]:
class PDFImageExtractor:
    """Class for extracting and analyzing images from PDF documents"""
    
    def __init__(self, pdf_path: str, output_dir: str = "extracted_content"):
        """Initialize the PDF analyzer"""
        self.pdf_path = pdf_path
        self.output_dir = output_dir
        self.images_dir = os.path.join(output_dir, "images")
        self.charts_dir = os.path.join(output_dir, "charts")
        self.graphs_dir = os.path.join(output_dir, "graphs")
        
        # Create output directories
        for dir_path in [self.output_dir, self.images_dir, self.charts_dir, self.graphs_dir]:
            os.makedirs(dir_path, exist_ok=True)
        
        # Open the PDF document
        self.doc = fitz.open(pdf_path)
        
        # Initialize AI model if available
        self.has_ai = False
        if AI_AVAILABLE:
            try:
                self.vision_model = ChatOpenAI(model="gpt-4-vision-preview", temperature=0)
                self.has_ai = True
                print("AI analysis capabilities are available.")
            except Exception as e:
                print(f"AI analysis unavailable: {e}")
        else:
            print("AI analysis unavailable. Images will be classified using heuristics only.")
    
    def extract_images_from_pdf(self) -> List[Dict[str, Any]]:
        """Extract all images from the PDF using various methods"""
        all_images = []
        
        # Method 1: Extract images from xrefs
        print("\nExtracting embedded images from PDF...")
        for page_idx in range(len(self.doc)):
            page = self.doc[page_idx]
            
            # Get images from page
            image_list = page.get_images(full=True)
            if image_list:
                print(f"  Page {page_idx+1}: {len(image_list)} embedded images found")
            
            for img_idx, img_info in enumerate(image_list):
                xref = img_info[0]
                
                try:
                    base_image = self.doc.extract_image(xref)
                    if base_image:
                        image_bytes = base_image["image"]
                        image_ext = base_image["ext"]
                        width = base_image["width"]
                        height = base_image["height"]
                        
                        # Handle image mask if present
                        if base_image.get("smask", 0) > 0:
                            try:
                                # Apply mask to get transparent image
                                mask_image = self.doc.extract_image(base_image["smask"])
                                if mask_image:
                                    # Create pixmap of the base image
                                    pix1 = fitz.Pixmap(base_image["image"])
                                    # Create pixmap of the mask
                                    mask = fitz.Pixmap(mask_image["image"])
                                    # Apply mask to base image
                                    pix = fitz.Pixmap(pix1, mask)
                                    image_bytes = pix.tobytes()
                            except Exception as e:
                                print(f"    Error applying mask: {e}")
                        
                        # Save image
                        img_filename = f"page{page_idx+1}_img{img_idx+1}.{image_ext}"
                        img_path = os.path.join(self.images_dir, img_filename)
                        
                        with open(img_path, "wb") as img_file:
                            img_file.write(image_bytes)
                        
                        # Add to our list
                        all_images.append({
                            "page": page_idx + 1,
                            "filename": img_filename,
                            "path": img_path,
                            "xref": xref,
                            "width": width,
                            "height": height,
                            "ext": image_ext,
                            "extraction_method": "xref"
                        })
                
                except Exception as e:
                    print(f"    Error extracting image: {e}")
        
        # Method 2: Extract images by rendering page pixmaps
        # This helps catch images that might be rendered through other means
        print("\nRendering full page images for charts and diagrams...")
        for page_idx in range(len(self.doc)):
            page = self.doc[page_idx]
            print(f"  Rendering page {page_idx+1}...")
            
            # Render page at high resolution
            pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
            page_img_filename = f"page{page_idx+1}_full.jpg"
            page_img_path = os.path.join(self.images_dir, page_img_filename)
            pix.save(page_img_path)
            
            all_images.append({
                "page": page_idx + 1,
                "filename": page_img_filename,
                "path": page_img_path,
                "width": pix.width,
                "height": pix.height,
                "ext": "png",
                "extraction_method": "page_render"
            })
        
        print(f"\nExtracted {len(all_images)} images from the PDF")
        return all_images
    
    def classify_image_with_heuristics(self, image_path: str) -> str:
        """Classify an image using basic image properties"""
        try:
            # Open the image
            img = Image.open(image_path)
            width, height = img.size
            aspect_ratio = width / height
            
            # Get color diversity (limited to 10000 colors for efficiency)
            try:
                colors = len(img.convert('RGB').getcolors(maxcolors=10000))
                if colors is None:  # More than 10000 colors
                    colors = 10000
            except:
                colors = 10000  # Default if we can't determine
            
            # Make educated guesses based on properties
            if "full.png" in image_path:
                return "page_image"  # Full page images are handled separately
            elif 100 < colors < 1000 and 1.2 < aspect_ratio < 2.5:
                return "chart_or_graph"  # Charts/graphs often have limited colors and wider aspect ratio
            elif colors > 5000:
                return "photo"  # Photos usually have many colors
            elif colors < 100:
                return "diagram"  # Diagrams often have limited colors
            else:
                return "other"
                
        except Exception as e:
            print(f"Error in image classification: {e}")
            return "error"
    
    def analyze_image_with_ai(self, image_path: str) -> Dict[str, Any]:
        """Analyze an image using AI vision capabilities"""
        if not self.has_ai:
            classification = self.classify_image_with_heuristics(image_path)
            return {
                "classification": classification,
                "analysis": f"Classified as {classification} using heuristics"
            }
        
        try:
            # Convert image to base64
            with open(image_path, "rb") as img_file:
                image_data = base64.b64encode(img_file.read()).decode("utf-8")
            
            # Create message content with the image
            message_content = [
                {
                    "type": "text",
                    "text": "Analyze this image extracted from a PDF document. What type of content does it contain? "  
                           "Is it a photograph, chart, graph, diagram, table, or illustration? "  
                           "What information does it convey? If it contains a chart or graph, what are the key data points?"
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{image_data}"
                    }
                }
            ]
            
            # Call the vision model
            response = self.vision_model.invoke([
                SystemMessage(content="You are an expert at analyzing images from academic and business documents."),
                HumanMessage(content=message_content)
            ])
            
            # Extract classification from analysis
            analysis_text = response.content.lower()
            
            # Keyword-based classification
            if "chart" in analysis_text and "bar" in analysis_text:
                classification = "bar_chart"
            elif "chart" in analysis_text and "pie" in analysis_text:
                classification = "pie_chart"
            elif "chart" in analysis_text and "line" in analysis_text:
                classification = "line_chart"
            elif "chart" in analysis_text:
                classification = "chart"
            elif "graph" in analysis_text:
                classification = "graph"
            elif "diagram" in analysis_text or "flow" in analysis_text:
                classification = "diagram"
            elif "table" in analysis_text:
                classification = "table"
            elif "photo" in analysis_text or "photograph" in analysis_text:
                classification = "photo"
            elif "illustration" in analysis_text or "drawing" in analysis_text:
                classification = "illustration"
            elif "full.png" in image_path:
                classification = "page_image"  # Full page images
            else:
                classification = "other"
            
            return {
                "classification": classification,
                "analysis": response.content
            }
            
        except Exception as e:
            print(f"Error in AI analysis: {e}")
            classification = self.classify_image_with_heuristics(image_path)
            return {
                "classification": classification,
                "analysis": f"AI analysis failed, classified as {classification} using heuristics"
            }
    
    def analyze_images(self, images: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Analyze all extracted images"""
        analyzed_images = []
        
        print("\nAnalyzing extracted images...")
        for i, img in enumerate(images):
            # Skip full page images initially for efficiency
            if "page_render" in img.get("extraction_method", "") and i > 0:
                continue
                
            print(f"  Analyzing image {i+1}/{len(images)}: {img['filename']}...")
            
            try:
                if self.has_ai:
                    analysis = self.analyze_image_with_ai(img["path"])
                else:
                    classification = self.classify_image_with_heuristics(img["path"])
                    analysis = {
                        "classification": classification,
                        "analysis": f"Classified as {classification} using heuristics"
                    }
                    
                analyzed_images.append({**img, **analysis})
                
                # To avoid rate limits with the AI service
                if i < len(images) - 1 and self.has_ai:
                    time.sleep(0.5)
                    
            except Exception as e:
                print(f"    Error analyzing image {img['filename']}: {e}")
                analyzed_images.append({
                    **img, 
                    "analysis": f"Analysis failed: {str(e)}",
                    "classification": "error"
                })
        
        # Now process full page images (only if we need them for detecting charts/graphs)
        full_page_images = [img for img in images if "page_render" in img.get("extraction_method", "")]
        
        for img in full_page_images:
            img["classification"] = "page_image"
            img["analysis"] = "Full page image for reference"
            analyzed_images.append(img)
            
        return analyzed_images
    
    def organize_by_content_type(self, analyzed_images: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Organize analyzed images by content type"""
        organized = {
            "charts": [],
            "graphs": [],
            "diagrams": [],
            "tables": [],
            "photos": [],
            "illustrations": [],
            "page_images": [],
            "other": []
        }
        
        for img in analyzed_images:
            classification = img.get("classification", "")
            
            if "chart" in classification:
                organized["charts"].append(img)
                # Copy to charts directory
                self._copy_to_category(img["path"], self.charts_dir)
            elif classification == "graph":
                organized["graphs"].append(img)
                # Copy to graphs directory
                self._copy_to_category(img["path"], self.graphs_dir)
            elif classification == "diagram":
                organized["diagrams"].append(img)
            elif classification == "table":
                organized["tables"].append(img)
            elif classification == "photo":
                organized["photos"].append(img)
            elif classification == "illustration":
                organized["illustrations"].append(img)
            elif classification == "page_image":
                organized["page_images"].append(img)
            else:
                organized["other"].append(img)
        
        return organized
    
    def _copy_to_category(self, image_path: str, category_dir: str) -> None:
        """Copy image to category directory"""
        filename = os.path.basename(image_path)
        dest_path = os.path.join(category_dir, filename)
        
        # Read the image and save to destination
        with open(image_path, "rb") as src_file:
            with open(dest_path, "wb") as dest_file:
                dest_file.write(src_file.read())
    
    def extract_text_from_page(self, page_idx: int) -> str:
        """Extract text from a specific page"""
        page = self.doc[page_idx]
        return page.get_text()
    
    def extract_all_text(self) -> Dict[int, str]:
        """Extract text from all pages"""
        text_by_page = {}
        
        for page_idx in range(len(self.doc)):
            text_by_page[page_idx + 1] = self.extract_text_from_page(page_idx)
        
        return text_by_page
    
    def generate_summary_report(self, organized_images: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
        """Generate a summary report of the PDF analysis"""
        counts = {content_type: len(images) for content_type, images in organized_images.items()}
        
        # Filter out empty categories and page images (which are just for reference)
        filtered_counts = {k: v for k, v in counts.items() if v > 0 and k != "page_images"}
        
        report = {
            "document_path": self.pdf_path,
            "total_pages": len(self.doc),
            "image_counts": filtered_counts,
            "image_details": []
        }
        
        # Add details for the most important images (excluding page images)
        for category, images in organized_images.items():
            if category == "page_images" or not images:
                continue
                
            for img in images:
                report["image_details"].append({
                    "category": category,
                    "page": img["page"],
                    "filename": img["filename"],
                    "path": img["path"],
                    "width": img.get("width"),
                    "height": img.get("height"),
                    "analysis": img.get("analysis", "No analysis available")
                })
        
        return report
    
    def process(self) -> Dict[str, Any]:
        """Process the PDF completely"""
        try:
            # Step 1: Extract all images
            images = self.extract_images_from_pdf()
            
            # Step 2: Analyze images
            analyzed_images = self.analyze_images(images)
            
            # Step 3: Organize by content type
            organized_images = self.organize_by_content_type(analyzed_images)
            
            # Step 4: Extract text
            text_by_page = self.extract_all_text()
            
            # Step 5: Generate report
            report = self.generate_summary_report(organized_images)
            
            return {
                "images": images,
                "analyzed_images": analyzed_images,
                "organized_images": organized_images,
                "text_by_page": text_by_page,
                "report": report
            }
            
        finally:
            self.doc.close()
    
    def display_image(self, image_path):
        """Display an image in the notebook"""
        display(IPImage(filename=image_path))

# Example usage
if __name__ == "__main__":
    # Specify the path to your PDF file
    pdf_path = "22-036458-01_GIS_early_process_evaluation_Accessible_CLIENT_USE.pdf"  # Replace with the path to your PDF file
    
    # Create the extractor
    extractor = PDFImageExtractor(pdf_path)
    
    # Process the PDF
    results = extractor.process()
    
    # Display basic stats
    report = results['report']
    print(f"Document: {report['document_path']}")
    print(f"Total pages: {report['total_pages']}")
    print("\nImage counts by type:")
    for category, count in report['image_counts'].items():
        print(f"  - {category}: {count}")

AI analysis capabilities are available.

Extracting embedded images from PDF...
  Page 1: 1 embedded images found
    Error applying mask: code=4: color pixmap must not have an alpha channel
  Page 22: 4 embedded images found
  Page 25: 2 embedded images found
  Page 30: 3 embedded images found
    Error applying mask: code=4: color pixmap must not have an alpha channel
    Error applying mask: code=4: color pixmap must not have an alpha channel
    Error applying mask: code=4: color pixmap must not have an alpha channel
  Page 35: 7 embedded images found
    Error applying mask: code=4: color pixmap must not have an alpha channel
    Error applying mask: code=4: color pixmap must not have an alpha channel
  Page 36: 1 embedded images found
    Error applying mask: code=4: color pixmap must not have an alpha channel

Rendering full page images for charts and diagrams...
  Rendering page 1...
  Rendering page 2...
  Rendering page 3...
  Rendering page 4...
  Rendering page 5...
  Rende