In [7]:
import os
import time
import logging
import pandas as pd
import numpy as np
from pathlib import Path
import pytesseract
from pdf2image import convert_from_path
import psutil
import re
import json
from typing import List, Dict, Any

tesseract

In [8]:
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

In [10]:
class TesseractBenchmark:
    def __init__(self, pdf_dir: str):
        """
        Initialize Tesseract OCR benchmarking tool.
        
        Args:
            pdf_dir: Directory containing PDF files to process
        """
        self.pdf_dir = Path(pdf_dir)
        self.results = []
        self.setup_logging()

    def setup_logging(self):
        """Configure logging for the benchmark"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('tesseract_benchmark.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('TesseractBenchmark')

    def convert_pdf_to_images(self, pdf_path: Path) -> List[Any]:
        """
        Convert PDF pages to images.
        
        Args:
            pdf_path: Path to PDF file
            
        Returns:
            List of PIL Image objects
        """
        try:
            self.logger.info(f"Converting PDF to images: {pdf_path}")
            images = convert_from_path(str(pdf_path), dpi=300)
            self.logger.info(f"Successfully converted {len(images)} pages from {pdf_path}")
            return images
        except Exception as e:
            self.logger.error(f"Error converting PDF {pdf_path}: {str(e)}", exc_info=True)
            return []

    def process_image(self, image) -> Dict[str, Any]:
        """
        Process image with Tesseract OCR and measure performance.
        
        Args:
            image: PIL Image object
            
        Returns:
            Dictionary containing OCR results and performance metrics
        """
        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        
        try:
            # Process with Tesseract
            text = pytesseract.image_to_string(
                image,
                config='--psm 6 --oem 3'  # Page segmentation mode 6: Assume uniform text block
            )
            
            # Get confidence scores
            data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
            confidence_scores = [float(conf) for conf in data['conf'] if conf != '-1']
            avg_confidence = np.mean(confidence_scores) if confidence_scores else 0
            
        except Exception as e:
            self.logger.error(f"Error in OCR processing: {str(e)}")
            return None
            
        end_time = time.time()
        end_memory = psutil.Process().memory_info().rss / 1024 / 1024
        
        return {
            'text': text,
            'processing_time': end_time - start_time,
            'memory_usage': end_memory - start_memory,
            'confidence': avg_confidence
        }

    def analyze_scientific_content(self, text: str) -> Dict[str, int]:
        """
        Analyze scientific content in extracted text.
        
        Args:
            text: Extracted text from OCR
            
        Returns:
            Dictionary containing counts of scientific elements
        """
        # Scientific notation pattern (e.g., 1.23e-4)
        scientific_pattern = r'\d+\.?\d*[eE][+-]?\d+'
        
        # Mathematical equation pattern (text between $ or $$)
        equation_pattern = r'\$.*?\$|\$\$.*?\$\$'
        
        # Table header pattern
        table_pattern = r'Table \d+|TABLE \d+'
        
        # Figure caption pattern
        figure_pattern = r'Figure \d+|Fig\. \d+|FIG\. \d+'
        
        return {
            'scientific_numbers': len(re.findall(scientific_pattern, text)),
            'equations': len(re.findall(equation_pattern, text)),
            'tables': len(re.findall(table_pattern, text)),
            'figures': len(re.findall(figure_pattern, text))
        }

    def analyze_layout(self, text: str) -> Dict[str, float]:
        """
        Analyze text layout characteristics.
        
        Args:
            text: Extracted text from OCR
            
        Returns:
            Dictionary containing layout metrics
        """
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        
        if not lines:
            return {
                'avg_line_length': 0,
                'line_length_std': 0,
                'layout_consistency': 0
            }
            
        line_lengths = [len(line) for line in lines]
        
        return {
            'avg_line_length': np.mean(line_lengths),
            'line_length_std': np.std(line_lengths),
            'layout_consistency': np.std(line_lengths) / np.mean(line_lengths) if np.mean(line_lengths) > 0 else 0
        }

    def run_benchmark(self, sample_size: int = None) -> pd.DataFrame:
        """
        Run the benchmark on PDF files.
        
        Args:
            sample_size: Optional number of PDFs to process (for testing)
            
        Returns:
            DataFrame containing benchmark results
        """
        pdf_files = list(self.pdf_dir.glob('*.pdf'))
        if sample_size:
            pdf_files = pdf_files[:sample_size]

        for pdf_file in pdf_files:
            self.logger.info(f"Processing {pdf_file.name}")
            
            # Convert PDF to images
            images = self.convert_pdf_to_images(pdf_file)
            
            if not images:
                continue
                
            # Process first page only for benchmark
            ocr_result = self.process_image(images[0])
            
            if ocr_result is None:
                continue
                
            # Analyze the extracted text
            scientific_metrics = self.analyze_scientific_content(ocr_result['text'])
            layout_metrics = self.analyze_layout(ocr_result['text'])
            
            # Compile results
            self.results.append({
                'file_name': pdf_file.name,
                'processing_time': ocr_result['processing_time'],
                'memory_usage_mb': ocr_result['memory_usage'],
                'confidence_score': ocr_result['confidence'],
                **scientific_metrics,
                **layout_metrics
            })

        # Convert results to DataFrame
        results_df = pd.DataFrame(self.results)
        
        # Save results
        results_df.to_csv('tesseract_benchmark_results.csv', index=False)
        return results_df

    def generate_report(self, results_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Generate a summary report of the benchmark results.
        
        Args:
            results_df: DataFrame containing benchmark results
            
        Returns:
            Dictionary containing summary statistics
        """
        def safe_mean(series):
            """Calculate mean safely, return 0 if series is empty or doesn't exist"""
            try:
                return float(series.mean()) if not series.empty else 0
            except:
                return 0
                
        def safe_std(series):
            """Calculate standard deviation safely, return 0 if series is empty or doesn't exist"""
            try:
                return float(series.std()) if not series.empty else 0
            except:
                return 0

        report = {
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'total_files_processed': len(results_df),
            'performance_metrics': {
                'avg_processing_time': safe_mean(results_df.get('processing_time', pd.Series())),
                'std_processing_time': safe_std(results_df.get('processing_time', pd.Series())),
                'avg_memory_usage': safe_mean(results_df.get('memory_usage_mb', pd.Series())),
                'avg_confidence': safe_mean(results_df.get('confidence_score', pd.Series()))
            },
            'content_metrics': {
                'avg_scientific_numbers': safe_mean(results_df.get('scientific_numbers', pd.Series())),
                'avg_equations': safe_mean(results_df.get('equations', pd.Series())),
                'avg_tables': safe_mean(results_df.get('tables', pd.Series())),
                'avg_figures': safe_mean(results_df.get('figures', pd.Series()))
            },
            'layout_metrics': {
                'avg_layout_consistency': safe_mean(results_df.get('layout_consistency', pd.Series()))
            }
        }

        # Save report
        with open('tesseract_benchmark_report.json', 'w') as f:
            json.dump(report, f, indent=4)
            
        return report

def main():
    # Set the correct path to PDFs
    pdf_dir = r'C:\Users\thinkpad\Documents\GitHub\Textra_AI_Research\data\raw\pdfs'
    
    # Verify path exists
    if not os.path.exists(pdf_dir):
        print(f"Error: Directory {pdf_dir} does not exist!")
        return
        
    # Check if directory contains PDFs
    pdf_files = list(Path(pdf_dir).glob('*.pdf'))
    if not pdf_files:
        print(f"Error: No PDF files found in {pdf_dir}")
        return
        
    print(f"Found {len(pdf_files)} PDF files in directory")
    
    # Initialize benchmark
    benchmark = TesseractBenchmark(pdf_dir=pdf_dir)
    
    try:
        # Run benchmark on sample
        print("Starting benchmark process...")
        results_df = benchmark.run_benchmark(sample_size=5)  # Start with 2 files for testing
        
        if len(results_df) == 0:
            print("No results were generated. Check the log file for errors.")
            return
            
        # Generate report
        report = benchmark.generate_report(results_df)
        
        print("\nBenchmark complete. Results saved to 'tesseract_benchmark_results.csv'")
        print("Detailed report saved to 'tesseract_benchmark_report.json'")
        
        # Print summary statistics
        print("\nSummary Statistics:")
        print(f"Files processed: {report['total_files_processed']}")
        print(f"Average processing time: {report['performance_metrics']['avg_processing_time']:.2f} seconds")
        print(f"Average memory usage: {report['performance_metrics']['avg_memory_usage']:.2f} MB")
        print(f"Average confidence score: {report['performance_metrics']['avg_confidence']:.2f}%")
        
        # Print first few results for verification
        if not results_df.empty:
            print("\nFirst results:")
            print(results_df.head().to_string())
            
    except Exception as e:
        print(f"An error occurred during benchmark: {str(e)}")
        logging.error(f"Benchmark error: {str(e)}", exc_info=True)

if __name__ == "__main__":
    main()

2024-11-07 16:11:16,072 - INFO - Processing 2411.04106v1.pdf
2024-11-07 16:11:16,073 - INFO - Converting PDF to images: C:\Users\thinkpad\Documents\GitHub\Textra_AI_Research\data\raw\pdfs\2411.04106v1.pdf


Found 5 PDF files in directory
Starting benchmark process...


2024-11-07 16:11:19,586 - INFO - Successfully converted 10 pages from C:\Users\thinkpad\Documents\GitHub\Textra_AI_Research\data\raw\pdfs\2411.04106v1.pdf
2024-11-07 16:11:26,910 - INFO - Processing 2411.04108v1.pdf
2024-11-07 16:11:26,911 - INFO - Converting PDF to images: C:\Users\thinkpad\Documents\GitHub\Textra_AI_Research\data\raw\pdfs\2411.04108v1.pdf
2024-11-07 16:11:36,322 - INFO - Successfully converted 29 pages from C:\Users\thinkpad\Documents\GitHub\Textra_AI_Research\data\raw\pdfs\2411.04108v1.pdf
2024-11-07 16:11:41,101 - INFO - Processing 2411.04109v1.pdf
2024-11-07 16:11:41,102 - INFO - Converting PDF to images: C:\Users\thinkpad\Documents\GitHub\Textra_AI_Research\data\raw\pdfs\2411.04109v1.pdf
2024-11-07 16:11:45,536 - INFO - Successfully converted 16 pages from C:\Users\thinkpad\Documents\GitHub\Textra_AI_Research\data\raw\pdfs\2411.04109v1.pdf
2024-11-07 16:11:51,409 - INFO - Processing 2411.04112v1.pdf
2024-11-07 16:11:51,410 - INFO - Converting PDF to images: C:\Us


Benchmark complete. Results saved to 'tesseract_benchmark_results.csv'
Detailed report saved to 'tesseract_benchmark_report.json'

Summary Statistics:
Files processed: 5
Average processing time: 6.29 seconds
Average memory usage: 9.46 MB
Average confidence score: 80.97%

First results:
          file_name  processing_time  memory_usage_mb  confidence_score  scientific_numbers  equations  tables  figures  avg_line_length  line_length_std  layout_consistency
0  2411.04106v1.pdf         7.322845         8.667969         78.831950                   0          0       0        0        79.186667        26.673679            0.336846
1  2411.04108v1.pdf         4.761900        13.238281         80.411255                   0          0       0        0        64.809524        31.935380            0.492758
2  2411.04109v1.pdf         5.809524         8.382812         83.913374                   0          0       0        0        78.607843        30.133740            0.383343
3  2411.04112v1.

easyocr

In [1]:
import os
import time
import logging
import pandas as pd
import numpy as np
from pathlib import Path
import easyocr
from pdf2image import convert_from_path
import psutil
import re
import json
from typing import List, Dict, Any

: 

In [1]:
import sys
print(f"Python version: {sys.version}")

try:
    import pandas as pd
    import numpy as np
    print("Pandas version:", pd.__version__)
    print("Numpy version:", np.__version__)
except ImportError as e:
    print("Error importing pandas/numpy:", e)

try:
    import torch
    print("PyTorch version:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())
except ImportError as e:
    print("Error importing PyTorch:", e)

try:
    import pdf2image
    print("pdf2image imported successfully")
except ImportError as e:
    print("Error importing pdf2image:", e)

try:
    import psutil
    print("psutil version:", psutil.__version__)
except ImportError as e:
    print("Error importing psutil:", e)

try:
    import easyocr
    print("EasyOCR imported successfully")
except ImportError as e:
    print("Error importing EasyOCR:", e)

Python version: 3.9.20 | packaged by conda-forge | (main, Sep 30 2024, 17:43:23) [MSC v.1929 64 bit (AMD64)]
Pandas version: 2.2.3
Numpy version: 2.0.2
PyTorch version: 2.5.1+cpu
CUDA available: False
pdf2image imported successfully
psutil version: 6.1.0


: 

In [1]:
import sys
print(f"Python version: {sys.version}")

try:
    import easyocr
    print("EasyOCR imported successfully")
    reader = easyocr.Reader(['en'])
    print("EasyOCR reader initialized successfully")
except Exception as e:
    print(f"Error with EasyOCR: {str(e)}")

Python version: 3.9.20 | packaged by conda-forge | (main, Sep 30 2024, 17:43:23) [MSC v.1929 64 bit (AMD64)]


: 

In [None]:
class EasyOCRBenchmark:
    def __init__(self, pdf_dir: str):
        """
        Initialize EasyOCR benchmarking tool.
        
        Args:
            pdf_dir: Directory containing PDF files to process
        """
        self.pdf_dir = Path(pdf_dir)
        self.results = []
        self.setup_logging()
        self.setup_ocr()

    def setup_logging(self):
        """Configure logging for the benchmark"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('easyocr_benchmark.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('EasyOCRBenchmark')

    def setup_ocr(self):
        """Initialize EasyOCR with desired settings"""
        try:
            self.logger.info("Initializing EasyOCR...")
            self.reader = easyocr.Reader(['en'], gpu=False)  # Set gpu=True if GPU is available
            self.logger.info("EasyOCR initialized successfully")
        except Exception as e:
            self.logger.error(f"Error initializing EasyOCR: {str(e)}")
            raise

    def convert_pdf_to_images(self, pdf_path: Path) -> List[Any]:
        """
        Convert PDF pages to images.
        
        Args:
            pdf_path: Path to PDF file
            
        Returns:
            List of PIL Image objects
        """
        try:
            self.logger.info(f"Converting PDF to images: {pdf_path}")
            images = convert_from_path(str(pdf_path), dpi=300)
            self.logger.info(f"Successfully converted {len(images)} pages from {pdf_path}")
            return images
        except Exception as e:
            self.logger.error(f"Error converting PDF {pdf_path}: {str(e)}", exc_info=True)
            return []

    def process_image(self, image) -> Dict[str, Any]:
        """
        Process image with EasyOCR and measure performance.
        
        Args:
            image: PIL Image object
            
        Returns:
            Dictionary containing OCR results and performance metrics
        """
        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        
        try:
            # Convert PIL Image to numpy array if needed
            image_np = np.array(image)
            
            # Process with EasyOCR
            results = self.reader.readtext(image_np)
            
            # Extract text and confidence scores
            text = ' '.join([result[1] for result in results])
            confidence_scores = [result[2] for result in results]
            avg_confidence = np.mean(confidence_scores) if confidence_scores else 0
            
        except Exception as e:
            self.logger.error(f"Error in OCR processing: {str(e)}")
            return None
            
        end_time = time.time()
        end_memory = psutil.Process().memory_info().rss / 1024 / 1024
        
        return {
            'text': text,
            'processing_time': end_time - start_time,
            'memory_usage': end_memory - start_memory,
            'confidence': avg_confidence,
            'bounding_boxes': len(results)
        }

    def analyze_scientific_content(self, text: str) -> Dict[str, int]:
        """
        Analyze scientific content in extracted text.
        
        Args:
            text: Extracted text from OCR
            
        Returns:
            Dictionary containing counts of scientific elements
        """
        # Scientific notation pattern (e.g., 1.23e-4)
        scientific_pattern = r'\d+\.?\d*[eE][+-]?\d+'
        
        # Mathematical equation pattern (text between $ or $$)
        equation_pattern = r'\$.*?\$|\$\$.*?\$\$'
        
        # Table header pattern
        table_pattern = r'Table \d+|TABLE \d+'
        
        # Figure caption pattern
        figure_pattern = r'Figure \d+|Fig\. \d+|FIG\. \d+'
        
        # Reference pattern
        reference_pattern = r'\[\d+\]|\[\d+,\s*\d+\]'
        
        return {
            'scientific_numbers': len(re.findall(scientific_pattern, text)),
            'equations': len(re.findall(equation_pattern, text)),
            'tables': len(re.findall(table_pattern, text)),
            'figures': len(re.findall(figure_pattern, text)),
            'references': len(re.findall(reference_pattern, text))
        }

    def analyze_layout(self, text: str) -> Dict[str, float]:
        """
        Analyze text layout characteristics.
        
        Args:
            text: Extracted text from OCR
            
        Returns:
            Dictionary containing layout metrics
        """
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        
        if not lines:
            return {
                'avg_line_length': 0,
                'line_length_std': 0,
                'layout_consistency': 0
            }
            
        line_lengths = [len(line) for line in lines]
        
        return {
            'avg_line_length': np.mean(line_lengths),
            'line_length_std': np.std(line_lengths),
            'layout_consistency': np.std(line_lengths) / np.mean(line_lengths) if np.mean(line_lengths) > 0 else 0
        }

    def run_benchmark(self, sample_size: int = None) -> pd.DataFrame:
        """
        Run the benchmark on PDF files.
        
        Args:
            sample_size: Optional number of PDFs to process (for testing)
            
        Returns:
            DataFrame containing benchmark results
        """
        pdf_files = list(self.pdf_dir.glob('*.pdf'))
        if sample_size:
            pdf_files = pdf_files[:sample_size]

        for pdf_file in pdf_files:
            self.logger.info(f"Processing {pdf_file.name}")
            
            # Convert PDF to images
            images = self.convert_pdf_to_images(pdf_file)
            
            if not images:
                continue
                
            # Process first page only for benchmark
            ocr_result = self.process_image(images[0])
            
            if ocr_result is None:
                continue
                
            # Analyze the extracted text
            scientific_metrics = self.analyze_scientific_content(ocr_result['text'])
            layout_metrics = self.analyze_layout(ocr_result['text'])
            
            # Compile results
            self.results.append({
                'file_name': pdf_file.name,
                'processing_time': ocr_result['processing_time'],
                'memory_usage_mb': ocr_result['memory_usage'],
                'confidence_score': ocr_result['confidence'],
                'bounding_boxes': ocr_result['bounding_boxes'],
                **scientific_metrics,
                **layout_metrics
            })

        # Convert results to DataFrame
        results_df = pd.DataFrame(self.results)
        
        # Save results
        results_df.to_csv('easyocr_benchmark_results.csv', index=False)
        return results_df

    def generate_report(self, results_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Generate a summary report of the benchmark results.
        
        Args:
            results_df: DataFrame containing benchmark results
            
        Returns:
            Dictionary containing summary statistics
        """
        def safe_mean(series):
            """Calculate mean safely, return 0 if series is empty or doesn't exist"""
            try:
                return float(series.mean()) if not series.empty else 0
            except:
                return 0

        def safe_std(series):
            """Calculate standard deviation safely, return 0 if series is empty or doesn't exist"""
            try:
                return float(series.std()) if not series.empty else 0
            except:
                return 0

        report = {
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'total_files_processed': len(results_df),
            'performance_metrics': {
                'avg_processing_time': safe_mean(results_df.get('processing_time', pd.Series())),
                'std_processing_time': safe_std(results_df.get('processing_time', pd.Series())),
                'avg_memory_usage': safe_mean(results_df.get('memory_usage_mb', pd.Series())),
                'avg_confidence': safe_mean(results_df.get('confidence_score', pd.Series())),
                'avg_bounding_boxes': safe_mean(results_df.get('bounding_boxes', pd.Series()))
            },
            'content_metrics': {
                'avg_scientific_numbers': safe_mean(results_df.get('scientific_numbers', pd.Series())),
                'avg_equations': safe_mean(results_df.get('equations', pd.Series())),
                'avg_tables': safe_mean(results_df.get('tables', pd.Series())),
                'avg_figures': safe_mean(results_df.get('figures', pd.Series())),
                'avg_references': safe_mean(results_df.get('references', pd.Series()))
            },
            'layout_metrics': {
                'avg_layout_consistency': safe_mean(results_df.get('layout_consistency', pd.Series()))
            }
        }

        # Save report
        with open('easyocr_benchmark_report.json', 'w') as f:
            json.dump(report, f, indent=4)
            
        return report

def check_gpu_availability():
    """
    Check if GPU is available for EasyOCR
    """
    try:
        import torch
        return torch.cuda.is_available()
    except ImportError:
        return False

def main():
    # Check GPU availability
    gpu_available = check_gpu_availability()
    print(f"GPU Available: {gpu_available}")
    
    # Set the correct path to PDFs
    pdf_dir = r'data\raw\pdfs'
    
    # Verify path exists
    if not os.path.exists(pdf_dir):
        print(f"Error: Directory {pdf_dir} does not exist!")
        return
        
    # Check if directory contains PDFs
    pdf_files = list(Path(pdf_dir).glob('*.pdf'))
    if not pdf_files:
        print(f"Error: No PDF files found in {pdf_dir}")
        return
        
    print(f"Found {len(pdf_files)} PDF files in directory")
    
    try:
        # Initialize benchmark
        benchmark = EasyOCRBenchmark(pdf_dir=pdf_dir)
        
        # Run benchmark on sample
        print("Starting benchmark process...")
        results_df = benchmark.run_benchmark(sample_size=5)  # Start with 2 files for testing
        
        if len(results_df) == 0:
            print("No results were generated. Check the log file for errors.")
            return
            
        # Generate report
        report = benchmark.generate_report(results_df)
        
        print("\nBenchmark complete. Results saved to 'easyocr_benchmark_results.csv'")
        print("Detailed report saved to 'easyocr_benchmark_report.json'")
        
        # Print summary statistics
        print("\nSummary Statistics:")
        print(f"Files processed: {report['total_files_processed']}")
        print(f"Average processing time: {report['performance_metrics']['avg_processing_time']:.2f} seconds")
        print(f"Average memory usage: {report['performance_metrics']['avg_memory_usage']:.2f} MB")
        print(f"Average confidence score: {report['performance_metrics']['avg_confidence']:.2f}%")
        print(f"Average bounding boxes detected: {report['performance_metrics']['avg_bounding_boxes']:.0f}")
        
        # Print first few results for verification
        if not results_df.empty:
            print("\nFirst results:")
            print(results_df.head().to_string())
            
    except Exception as e:
        print(f"An error occurred during benchmark: {str(e)}")
        logging.error(f"Benchmark error: {str(e)}", exc_info=True)

if __name__ == "__main__":
    main()

: 

## paddleocr

In [2]:
import sys
print(f"Python version: {sys.version}")

try:
    from paddleocr import PaddleOCR
    print("PaddleOCR imported successfully")
    # Initialize PaddleOCR with minimal settings
    ocr = PaddleOCR(use_angle_cls=True, lang='en', use_gpu=False)
    print("PaddleOCR initialized successfully")
except Exception as e:
    print(f"Error with PaddleOCR: {str(e)}")

Python version: 3.9.20 | packaged by conda-forge | (main, Sep 30 2024, 17:43:23) [MSC v.1929 64 bit (AMD64)]
Error with PaddleOCR: No module named 'paddle'


In [None]:
import os
import time
import logging
import pandas as pd
from pathlib import Path
from paddleocr import PaddleOCR
from pdf2image import convert_from_path
import psutil
import json
from typing import List, Dict, Any, Optional
import numpy as np

class PaddleOCRBenchmark:
    def __init__(self, pdf_dir: str):
        """
        Initialize PaddleOCR benchmarking tool.
        
        Args:
            pdf_dir: Directory containing PDF files to process
        """
        self.pdf_dir = Path(pdf_dir)
        self.results = []
        self.setup_logging()
        self.setup_ocr()
        
    def setup_logging(self):
        """Configure logging for the benchmark"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('paddleocr_benchmark.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('PaddleOCRBenchmark')

    def setup_ocr(self):
        """Initialize PaddleOCR with desired settings"""
        try:
            self.logger.info("Initializing PaddleOCR...")
            self.ocr = PaddleOCR(
                use_angle_cls=True,  # Detect text orientation
                lang='en',          # English language
                use_gpu=False,      # CPU only
                show_log=False,     # Disable verbose logging
                enable_mkldnn=True  # Enable Intel MKL optimization
            )
            self.logger.info("PaddleOCR initialized successfully")
        except Exception as e:
            self.logger.error(f"Error initializing PaddleOCR: {str(e)}")
            raise

    def convert_pdf_to_images(self, pdf_path: Path) -> List[Any]:
        """
        Convert PDF pages to images.
        
        Args:
            pdf_path: Path to PDF file
        Returns:
            List of PIL Image objects
        """
        try:
            self.logger.info(f"Converting PDF to images: {pdf_path}")
            images = convert_from_path(
                str(pdf_path), 
                dpi=300,
                first_page=1,
                last_page=1  # Only process first page for benchmark
            )
            self.logger.info(f"Successfully converted {len(images)} pages from {pdf_path}")
            return images
        except Exception as e:
            self.logger.error(f"Error converting PDF {pdf_path}: {str(e)}", exc_info=True)
            return []

    def process_image(self, image) -> Optional[Dict[str, Any]]:
        """
        Process a single image with PaddleOCR and measure performance.
        
        Args:
            image: PIL Image object
        Returns:
            Dictionary containing OCR results and performance metrics
        """
        start_time = time.time()
        start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        
        try:
            # Save image temporarily (PaddleOCR works better with file input)
            temp_path = "temp_page.jpg"
            image.save(temp_path)
            
            # Process with PaddleOCR
            result = self.ocr.ocr(temp_path)
            
            # Extract text and confidence scores
            texts = []
            confidence_scores = []
            
            if result[0]:  # Check if any text was detected
                for line in result[0]:
                    if len(line) >= 2:  # Check if line contains both bbox/text and confidence
                        texts.append(line[1][0])  # Text content
                        confidence_scores.append(float(line[1][1]))  # Confidence score
            
            # Clean up temporary file
            os.remove(temp_path)
            
            # Calculate metrics
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss / 1024 / 1024
            
            return {
                'text': ' '.join(texts),
                'processing_time': end_time - start_time,
                'memory_usage': end_memory - start_memory,
                'confidence': np.mean(confidence_scores) if confidence_scores else 0,
                'text_blocks': len(texts)
            }
            
        except Exception as e:
            self.logger.error(f"Error in OCR processing: {str(e)}")
            if os.path.exists("temp_page.jpg"):
                os.remove("temp_page.jpg")
            return None

    def analyze_content(self, text: str) -> Dict[str, int]:
        """
        Analyze scientific content in extracted text.
        
        Args:
            text: Extracted text from OCR
        Returns:
            Dictionary containing content analysis metrics
        """
        import re
        
        # Scientific notation pattern
        scientific_pattern = r'\d+\.?\d*[eE][+-]?\d+'
        
        # Table pattern
        table_pattern = r'Table \d+|TABLE \d+'
        
        # Figure pattern
        figure_pattern = r'Figure \d+|Fig\. \d+|FIG\. \d+'
        
        # Reference pattern
        reference_pattern = r'\[\d+\]|\[\d+,\s*\d+\]'
        
        return {
            'scientific_numbers': len(re.findall(scientific_pattern, text)),
            'tables': len(re.findall(table_pattern, text)),
            'figures': len(re.findall(figure_pattern, text)),
            'references': len(re.findall(reference_pattern, text))
        }

    def run_benchmark(self, sample_size: Optional[int] = None) -> pd.DataFrame:
        """
        Run the benchmark on PDF files.
        
        Args:
            sample_size: Optional number of PDFs to process
        Returns:
            DataFrame containing benchmark results
        """
        pdf_files = list(self.pdf_dir.glob('*.pdf'))
        if sample_size:
            pdf_files = pdf_files[:sample_size]

        self.logger.info(f"Starting benchmark with {len(pdf_files)} files")

        for pdf_file in pdf_files:
            self.logger.info(f"Processing {pdf_file.name}")
            
            # Convert PDF to images
            images = self.convert_pdf_to_images(pdf_file)
            
            if not images:
                continue
                
            # Process first page
            ocr_result = self.process_image(images[0])
            
            if ocr_result is None:
                continue
                
            # Analyze the extracted text
            content_metrics = self.analyze_content(ocr_result['text'])
            
            # Compile results
            self.results.append({
                'file_name': pdf_file.name,
                'processing_time': ocr_result['processing_time'],
                'memory_usage_mb': ocr_result['memory_usage'],
                'confidence_score': ocr_result['confidence'],
                'text_blocks': ocr_result['text_blocks'],
                **content_metrics
            })

        # Convert results to DataFrame
        results_df = pd.DataFrame(self.results)
        
        # Save results
        results_df.to_csv('paddleocr_benchmark_results.csv', index=False)
        return results_df

    def generate_report(self, results_df: pd.DataFrame) -> Dict[str, Any]:
        """
        Generate a summary report of the benchmark results.
        
        Args:
            results_df: DataFrame containing benchmark results
        Returns:
            Dictionary containing summary statistics
        """
        def safe_mean(series):
            """Calculate mean safely, return 0 if series is empty or doesn't exist"""
            try:
                return float(series.mean()) if not series.empty else 0
            except:
                return 0

        report = {
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'total_files_processed': len(results_df),
            'performance_metrics': {
                'avg_processing_time': safe_mean(results_df.get('processing_time', pd.Series())),
                'avg_memory_usage': safe_mean(results_df.get('memory_usage_mb', pd.Series())),
                'avg_confidence': safe_mean(results_df.get('confidence_score', pd.Series())),
                'avg_text_blocks': safe_mean(results_df.get('text_blocks', pd.Series()))
            },
            'content_metrics': {
                'avg_scientific_numbers': safe_mean(results_df.get('scientific_numbers', pd.Series())),
                'avg_tables': safe_mean(results_df.get('tables', pd.Series())),
                'avg_figures': safe_mean(results_df.get('figures', pd.Series())),
                'avg_references': safe_mean(results_df.get('references', pd.Series()))
            }
        }

        # Save report
        with open('paddleocr_benchmark_report.json', 'w') as f:
            json.dump(report, f, indent=4)
            
        return report

def main():
    # Set the correct path to PDFs
    pdf_dir = r'data\raw\pdfs'
    
    # Verify path exists
    if not os.path.exists(pdf_dir):
        print(f"Error: Directory {pdf_dir} does not exist!")
        return
        
    # Check if directory contains PDFs
    pdf_files = list(Path(pdf_dir).glob('*.pdf'))
    if not pdf_files:
        print(f"Error: No PDF files found in {pdf_dir}")
        return
        
    print(f"Found {len(pdf_files)} PDF files in directory")
    
    try:
        # Initialize benchmark
        benchmark = PaddleOCRBenchmark(pdf_dir=pdf_dir)
        
        # Run benchmark on sample
        print("Starting benchmark process...")
        results_df = benchmark.run_benchmark(sample_size=1)  # Start with 1 file for testing
        
        # Generate report
        report = benchmark.generate_report(results_df)
        
        print("\nBenchmark complete. Results saved to 'paddleocr_benchmark_results.csv'")
        print("Detailed report saved to 'paddleocr_benchmark_report.json'")
        
        # Print summary statistics
        print("\nSummary Statistics:")
        print(f"Files processed: {report['total_files_processed']}")
        print(f"Average processing time: {report['performance_metrics']['avg_processing_time']:.2f} seconds")
        print(f"Average memory usage: {report['performance_metrics']['avg_memory_usage']:.2f} MB")
        print(f"Average confidence score: {report['performance_metrics']['avg_confidence']:.2f}%")
        print(f"Average text blocks detected: {report['performance_metrics']['avg_text_blocks']:.0f}")
        
    except Exception as e:
        print(f"An error occurred during benchmark: {str(e)}")
        logging.error(f"Benchmark error: {str(e)}", exc_info=True)

if __name__ == "__main__":
    main()