In [10]:
!pip install haystack-ai
!pip install pymupdf  
!pip install pillow  
!pip install pandas  
!pip install pytesseract  


Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Downloading pytesseract-0.3.13-py3-none-any.whl (14 kB)
Installing collected packages: pytesseract
Successfully installed pytesseract-0.3.13


In [2]:
import logging

import json
import fitz  # PyMuPDF
from typing import List, Dict, Any, Optional
from haystack import Document, component
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from PIL import Image
import base64
import io
import pandas as pd

In [5]:
@component
class PDFToJSONProcessor:
    """
    A Haystack component that processes PDFs and extracts text, images, and tables
    into a structured JSON format.
    """
    
    def __init__(self, extract_images: bool = True, extract_tables: bool = True):
        self.extract_images = extract_images
        self.extract_tables = extract_tables
    
    @component.output_types(documents=List[Document])
    def run(self, sources: List[str]) -> Dict[str, Any]:
        """
        Process PDF files and return structured JSON data.
        
        Args:
            sources: List of PDF file paths
            
        Returns:
            Dictionary containing processed documents
        """
        processed_documents = []
        
        for pdf_path in sources:
            try:
                json_data = self._process_pdf(pdf_path)
                doc = Document(content=json.dumps(json_data, indent=2))
                doc.meta["source"] = pdf_path
                doc.meta["content_type"] = "structured_json"
                processed_documents.append(doc)
            except Exception as e:
                print(f"Error processing {pdf_path}: {str(e)}")
                continue
        
        return {"documents": processed_documents}
    
    def _process_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract all content from PDF and structure it as JSON."""
        pdf_document = fitz.open(pdf_path)
        
        result = {
            "document_info": {
                "filename": pdf_path.split("/")[-1],
                "total_pages": len(pdf_document),
                "metadata": pdf_document.metadata
            },
            "pages": []
        }
        
        for page_num in range(len(pdf_document)):
            page = pdf_document[page_num]
            page_data = {
                "page_number": page_num + 1,
                "text_content": self._extract_text(page),
                "images": self._extract_images(page) if self.extract_images else [],
                "tables": self._extract_tables(page) if self.extract_tables else []
            }
            result["pages"].append(page_data)
        
        pdf_document.close()
        return result
    
    def _extract_text(self, page) -> Dict[str, Any]:
        """Extract text content with formatting information."""
        text_dict = page.get_text("dict")
        
        text_content = {
            "raw_text": page.get_text(),
            "structured_text": [],
            "fonts_used": set()
        }
        
        for block in text_dict["blocks"]:
            if "lines" in block:  # Text block
                block_data = {
                    "bbox": block["bbox"],
                    "lines": []
                }
                
                for line in block["lines"]:
                    line_data = {
                        "bbox": line["bbox"],
                        "spans": []
                    }
                    
                    for span in line["spans"]:
                        span_data = {
                            "text": span["text"],
                            "font": span["font"],
                            "size": span["size"],
                            "flags": span["flags"],
                            "bbox": span["bbox"]
                        }
                        line_data["spans"].append(span_data)
                        text_content["fonts_used"].add(span["font"])
                    
                    block_data["lines"].append(line_data)
                text_content["structured_text"].append(block_data)
        
        text_content["fonts_used"] = list(text_content["fonts_used"])
        return text_content
    
    def _extract_images(self, page) -> List[Dict[str, Any]]:
        """Extract images from the page with proper error handling."""
        images = []
        
        try:
            # Get the full image list with all metadata
            image_list = page.get_images(full=True)
            
            for img_index, img_item in enumerate(image_list):
                try:
                    # Extract xref from the full image item
                    xref = img_item[0]  # First element is always xref
                    
                    # Check if this is a valid image reference
                    if xref <= 0:
                        continue
                    
                    # Extract image using document's extract_image method
                    base_image = page.parent.extract_image(xref)
                    image_bytes = base_image["image"]
                    
                    # Get image position using the full image item
                    try:
                        # For images with item[-1] != 0, we need special handling
                        if img_item[-1] != 0:
                            # Try to get bbox using XObject list as fallback
                            xobject_list = page.parent.get_page_xobjects(page.number)
                            img_rect = None
                            for xobj in xobject_list:
                                if xobj[0] == xref:
                                    img_rect = fitz.Rect(xobj[1])
                                    break
                            
                            if img_rect is None:
                                # Use page dimensions as fallback
                                img_rect = page.rect
                        else:
                            # Standard bbox extraction
                            img_rect = page.get_image_bbox(img_item)
                            
                    except Exception:
                        # Fallback: try to find image position using get_image_rects
                        try:
                            rects = page.get_image_rects(img_item)
                            img_rect = rects[0] if rects else page.rect
                        except Exception:
                            img_rect = page.rect  # Use full page as last resort
                    
                    # Convert to base64
                    img_base64 = base64.b64encode(image_bytes).decode()
                    
                    image_info = {
                        "image_index": img_index,
                        "xref": xref,
                        "bbox": list(img_rect),
                        "width": base_image.get("width", 0),
                        "height": base_image.get("height", 0),
                        "colorspace": base_image.get("colorspace", "unknown"),
                        "ext": base_image.get("ext", "png"),
                        "base64_data": img_base64,
                        "smask": img_item[1] if len(img_item) > 1 else 0  # Mask reference
                    }
                    images.append(image_info)
                    
                except Exception as e:
                    print(f"Error extracting image {img_index}: {str(e)}")
                    # Add placeholder for failed extraction
                    images.append({
                        "image_index": img_index,
                        "xref": img_item[0] if img_item else -1,
                        "error": str(e),
                        "bbox": [0, 0, 0, 0],
                        "extraction_failed": True
                    })
                    continue
                    
        except Exception as e:
            print(f"Error getting image list: {str(e)}")
        
        return images
    
    def _extract_tables(self, page) -> List[Dict[str, Any]]:
        """Extract tables from the page."""
        tables = []
        
        try:
            # Find tables using PyMuPDF's table detection
            tabs = page.find_tables()
            
            for tab_index, tab in enumerate(tabs):
                table_data = {
                    "table_index": tab_index,
                    "bbox": list(tab.bbox),
                    "rows": [],
                    "column_count": 0,
                    "row_count": 0
                }
                
                # Extract table content
                table_content = tab.extract()
                if table_content:
                    table_data["rows"] = table_content
                    table_data["row_count"] = len(table_content)
                    table_data["column_count"] = len(table_content[0]) if table_content else 0
                
                tables.append(table_data)
                
        except Exception as e:
            print(f"Error extracting tables: {str(e)}")
        
        return tables


In [6]:
# Pipeline Implementation
@component
class JSONStructureEnhancer:
    """
    Enhances the extracted JSON with additional processing and cleaning.
    """
    
    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, Any]:
        """
        Enhance and clean the extracted JSON data.
        """
        enhanced_documents = []
        
        for doc in documents:
            try:
                json_data = json.loads(doc.content)
                enhanced_data = self._enhance_structure(json_data)
                
                enhanced_doc = Document(content=json.dumps(enhanced_data, indent=2))
                enhanced_doc.meta = doc.meta.copy()
                enhanced_doc.meta["processing_stage"] = "enhanced"
                enhanced_documents.append(enhanced_doc)
                
            except Exception as e:
                print(f"Error enhancing document: {str(e)}")
                enhanced_documents.append(doc)  # Return original if enhancement fails
        
        return {"documents": enhanced_documents}
    
    def _enhance_structure(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Add summary statistics and content analysis."""
        enhanced = data.copy()
        
        # Add document-level statistics
        enhanced["document_statistics"] = {
            "total_text_length": sum(len(page["text_content"]["raw_text"]) 
                                   for page in data["pages"]),
            "total_images": sum(len(page["images"]) for page in data["pages"]),
            "total_tables": sum(len(page["tables"]) for page in data["pages"]),
            "pages_with_images": sum(1 for page in data["pages"] if page["images"]),
            "pages_with_tables": sum(1 for page in data["pages"] if page["tables"])
        }
        
        # Add content type classification for each page
        for page in enhanced["pages"]:
            page["content_analysis"] = {
                "has_text": bool(page["text_content"]["raw_text"].strip()),
                "has_images": bool(page["images"]),
                "has_tables": bool(page["tables"]),
                "text_length": len(page["text_content"]["raw_text"]),
                "dominant_content": self._classify_page_content(page)
            }
        
        return enhanced
    
    def _classify_page_content(self, page: Dict[str, Any]) -> str:
        """Classify the dominant content type of a page."""
        text_length = len(page["text_content"]["raw_text"])
        image_count = len(page["images"])
        table_count = len(page["tables"])
        
        if table_count > 0 and text_length < 500:
            return "table_heavy"
        elif image_count > 0 and text_length < 200:
            return "image_heavy"
        elif text_length > 1000:
            return "text_heavy"
        elif image_count > 0 and table_count > 0:
            return "mixed_content"
        else:
            return "minimal_content"

In [7]:
from haystack import Pipeline

In [8]:
def create_pdf_processing_pipeline():
    """Create a complete PDF processing pipeline."""
    
    # Initialize components
    pdf_processor = PDFToJSONProcessor(
        extract_images=True,
        extract_tables=True
    )
    json_enhancer = JSONStructureEnhancer()
    
    # Create pipeline
    pipeline = Pipeline()
    pipeline.add_component("pdf_processor", pdf_processor)
    pipeline.add_component("json_enhancer", json_enhancer)
    
    # Connect components
    pipeline.connect("pdf_processor.documents", "json_enhancer.documents")
    
    return pipeline

# Usage
def process_pdfs(pdf_files: List[str]) -> List[Dict[str, Any]]:
    """Process PDF files and return structured JSON data."""
    
    pipeline = create_pdf_processing_pipeline()
    
    # Run the pipeline
    result = pipeline.run({
        "pdf_processor": {"sources": pdf_files}
    })
    
    # Extract and parse results
    processed_data = []
    for doc in result["json_enhancer"]["documents"]:
        json_data = json.loads(doc.content)
        processed_data.append(json_data)
    
    return processed_data

# Example usage
if __name__ == "__main__":
    pdf_files = [r"C:\Users\T440\Documents\GitHub\00_AudiRAG\data\raw\Galaxy_Image_Classification_Based_on_Citizen_Science_Data_A_Comparative_Study autoencoder.pdf"]
    results = process_pdfs(pdf_files)
    
    # Save results
    for i, result in enumerate(results):
        with open(f"output_{i}.json", "w") as f:
            json.dump(result, f, indent=2)
        print(f"Processed {result['document_info']['filename']}")

Processed C:\Users\T440\Documents\GitHub\00_AudiRAG\data\raw\Galaxy_Image_Classification_Based_on_Citizen_Science_Data_A_Comparative_Study autoencoder.pdf


In [9]:
#open JSON and reconstruct base64 images
def reconstruct_images_from_json(json_file: str) -> List[Image.Image]:
    """Reconstruct images from a JSON file containing base64 encoded images."""
    with open(json_file, 'r') as f:
        data = json.load(f)
    
    images = []
    for page in data.get("pages", []):
        for img in page.get("images", []):
            if "base64_data" in img:
                img_data = base64.b64decode(img["base64_data"])
                image = Image.open(io.BytesIO(img_data))
                images.append(image)
    
    return images

In [10]:
#save images to disk
def save_images_to_disk(images: List[Image.Image], output_dir: str):
    """Save a list of PIL Images to the specified directory."""
    for i, img in enumerate(images):
        img_path = f"{output_dir}/image_{i}.png"
        img.save(img_path)
        print(f"Saved image to {img_path}")

In [11]:

images = reconstruct_images_from_json("output_0.json")


In [12]:

import os
if not os.path.exists("output_images"):
    os.makedirs("output_images")
# Now save the images again
save_images_to_disk(images, "output_images")

Saved image to output_images/image_0.png


Saved image to output_images/image_1.png
Saved image to output_images/image_2.png
Saved image to output_images/image_3.png
Saved image to output_images/image_4.png
Saved image to output_images/image_5.png
Saved image to output_images/image_6.png
Saved image to output_images/image_7.png
Saved image to output_images/image_8.png
Saved image to output_images/image_9.png
Saved image to output_images/image_10.png
Saved image to output_images/image_11.png
Saved image to output_images/image_12.png
Saved image to output_images/image_13.png
Saved image to output_images/image_14.png
Saved image to output_images/image_15.png
Saved image to output_images/image_16.png
Saved image to output_images/image_17.png
Saved image to output_images/image_18.png
Saved image to output_images/image_19.png
Saved image to output_images/image_20.png
Saved image to output_images/image_21.png
Saved image to output_images/image_22.png
Saved image to output_images/image_23.png
Saved image to output_images/image_24.png
S

In [None]:
# Load model directly
from transformers import AutoProcessor, AutoModelForImageTextToText


processor = AutoProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = AutoModelForImageTextToText.from_pretrained("Salesforce/blip-image-captioning-base")



In [None]:
#USE THIS FOR QUICK GENERATION

#iterate over images and generate captions
captions = []
def generate_caption(image: Image.Image) -> str:
    """Generate a caption for a given image using the BLIP model."""
    inputs = processor(images=image, return_tensors="pt", padding=True, truncation=True)
    outputs = model.generate(**inputs)
    caption = processor.decode(outputs[0], skip_special_tokens=True)
    return caption

for image in images:
    caption = generate_caption(image)
    captions.append(caption)
    
print("Generated Captions:")
for i, caption in enumerate(captions):
    print(f"Image {i}: {caption}")

Generated Captions:
Image 0: the logo for the ieee access program
Image 1: the logo for the ieee access
Image 2: the logo for the ieee access
Image 3: the logo for the ieee access
Image 4: a diagram showing the different stages of the ecdg
Image 5: the logo for the ieee access
Image 6: a diagram of the two different types of the electromagnetic spectrum
Image 7: the diagram shows the different layers of the milky
Image 8: the logo for the ieee access
Image 9: the logo for the ieee access
Image 10: the logo for the ieee access
Image 11: a black and white image of a light
Image 12: a black and white photo of a man in a suit
Image 13: a black and white image of a light in the dark
Image 14: a black square object with a white background
Image 15: a black and white image of a light
Image 16: the andromus galaxy, a large galaxy in the constellation
Image 17: a very large white object in the dark sky
Image 18: a black and white image of a light in the dark
Image 19: the galaxies in the conste

In [None]:
#This is much better with this detailed description generation.!!!!!!!
#Generate a detailed image description
def generate_detailed_description(image: Image.Image) -> str:
    """Generate a detailed description for a given image using the BLIP model."""
    inputs = processor(images=image, return_tensors="pt", padding=True, truncation=True)
    outputs = model.generate(**inputs, max_length=500, num_beams=5, early_stopping=True)
    description = processor.decode(outputs[0], skip_special_tokens=True)
    return description

# Generate detailed descriptions for all images
detailed_descriptions = []
for image in images:
    description = generate_detailed_description(image)
    detailed_descriptions.append(description)
print("Generated Detailed Descriptions:")
for i, description in enumerate(detailed_descriptions):
    print(f"Image {i}: {description}")



Generated Detailed Descriptions:
Image 0: the ieee access logo
Image 1: the ieee access logo
Image 2: the ieee access logo
Image 3: the ieee access logo
Image 4: a diagram showing the different types of the leds
Image 5: the ieee access logo
Image 6: a diagram showing the different layers of an image
Image 7: a diagram showing the different types of galaxies
Image 8: the ieee access logo
Image 9: the ieee access logo
Image 10: the ieee access logo
Image 11: a black and white image of the moon
Image 12: an image of a black and white photo
Image 13: a black and white image of a light in the dark
Image 14: a black and white photo of a large object
Image 15: a black and white image of a light in the dark
Image 16: an image of a galaxy in the dark sky
Image 17: a black hole in the middle of a black hole
Image 18: a black and white image of the moon
Image 19: an image of a galaxy in the sky
Image 20: a black and white image of a light in the dark
Image 21: a black and white image of a light 

In [None]:
# Load model directly
from transformers import AutoProcessor, AutoModelForImageTextToText

processor = AutoProcessor.from_pretrained("Salesforce/blip-image-captioning-large", ProgressBar=True)
model = AutoModelForImageTextToText.from_pretrained("Salesforce/blip-image-captioning-large")

In [32]:
#This is much better with this detailed description generation.!!!!!!!
#Generate a detailed image description
def generate_detailed_description(image: Image.Image) -> str:
    """Generate a detailed description for a given image using the BLIP model."""
    inputs = processor(images=image, return_tensors="pt", padding=True, truncation=True)
    outputs = model.generate(**inputs, max_length=500, num_beams=5, early_stopping=True)
    description = processor.decode(outputs[0], skip_special_tokens=True)
    return description

# Generate detailed descriptions for all images
detailed_descriptions = []
for image in images:
    description = generate_detailed_description(image)
    detailed_descriptions.append(description)
print("Generated Detailed Descriptions:")
for i, description in enumerate(detailed_descriptions):
    print(f"Image {i}: {description}")

Generated Detailed Descriptions:
Image 0: an image of a blue and white logo with the words ieee access
Image 1: this is an image of a blue and white logo with the words life access
Image 2: this is an image of a blue and white logo with the words life access
Image 3: this is an image of a blue and white logo with the words life access
Image 4: an image of a diagram showing the different layers of a sound system
Image 5: this is an image of a blue and white logo with the words life access
Image 6: an image of a diagram of the process of producing a new product
Image 7: an image of a diagram of the structure of a galaxy
Image 8: this is an image of a blue and white logo with the words life access
Image 9: this is an image of a blue and white logo with the words life access
Image 10: this is an image of a blue and white logo with the words life access
Image 11: a black and white photo of a bright light in the dark
Image 12: an image of a blurry image of a black hole in the sky
Image 13: a

In [None]:

processor = AutoProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
model = AutoModelForImageTextToText.from_pretrained("Salesforce/blip-image-captioning-large").to("cuda")  # Move model to CUDA
# Move input tensors to CUDA as well
inputs = processor(image, return_tensors="pt").to("cuda")


#This is much better with this detailed description generation.!!!!!!!
#Generate a detailed image description
def generate_detailed_description(image: Image.Image) -> str:
    """Generate a detailed description for a given image using the BLIP model."""
    inputs = processor(images=image, return_tensors="pt", padding=True, truncation=True)
    outputs = model.generate(**inputs, max_length=500, num_beams=5, early_stopping=True)
    description = processor.decode(outputs[0], skip_special_tokens=True)
    return description

# Generate detailed descriptions for all images
detailed_descriptions = []
for image in images:
    description = generate_detailed_description(image)
    detailed_descriptions.append(description)
print("Generated Detailed Descriptions:")
for i, description in enumerate(detailed_descriptions):
    print(f"Image {i}: {description}")

RuntimeError: Input type (torch.FloatTensor) and weight type (torch.cuda.FloatTensor) should be the same or input should be a MKLDNN tensor and weight is a dense tensor