## Import Required Libraries

In [3]:
import os
import io
import pdfplumber
from google.cloud import storage
import pytesseract
from PIL import Image
import fitz  # PyMuPDF for more efficient image extraction
from dotenv import load_dotenv
import cv2
import numpy as np
import time
import logging


## Load Environment Variables and Initialize Clients

In [4]:
# Load environment variables from the .env file
load_dotenv()

# Only ERROR-level records (and above) are written to errors.log
logging.basicConfig(filename='errors.log', level=logging.ERROR)

# Read the GCP credentials path, Tesseract path and bucket layout from the environment
gcp_key_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
tesseract_cmd_path = os.getenv("TESSERACT_CMD_PATH")
bucket_name = os.getenv("GCP_BUCKET_NAME")
source_folder = os.getenv("SOURCE_FOLDER")
target_folder = os.getenv("TARGET_FOLDER")

# Re-export the credentials path only when one was provided: assigning None to
# os.environ raises TypeError, which would abort the cell before the client exists.
if gcp_key_file:
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = gcp_key_file

# Initialize the GCP storage client shared by the functions below
storage_client = storage.Client()

# Point pytesseract at a custom binary only when a path is configured; otherwise
# keep pytesseract's own default instead of overwriting it with None.
if tesseract_cmd_path:
    pytesseract.pytesseract.tesseract_cmd = tesseract_cmd_path


## Metrics Tracking Variables

In [5]:
# Run-wide counters, mutated through `global` by the extraction functions:
#   total_files_processed - PDFs successfully downloaded for processing
#   total_pages_processed - pages seen by pdfplumber across all PDFs
#   total_errors          - errors counted by the extraction functions
total_files_processed = total_pages_processed = total_errors = 0

# Reference timestamp used later to compute the total processing time
start_time = time.time()


## Define Preprocessing and Extraction Functions

In [6]:
def preprocess_image(image):
    """Convert an image to an inverted black/white array to improve OCR results.

    Args:
        image: Any object ``np.array`` can convert to a 2-D (already
            grayscale) or 3-D (height, width, channels) array. The caller in
            this notebook passes PIL images, whose channel order is RGB(A).

    Returns:
        The single-channel array produced by ``cv2.threshold`` with
        ``THRESH_BINARY_INV`` (pixels brighter than 150 become 0, all others
        255), or ``None`` when the array layout is not supported; the
        unsupported shape is logged.
    """
    img = np.array(image)
    if img.ndim == 2:
        # Already single-channel
        gray = img
    elif img.ndim == 3 and img.shape[2] == 3:
        # PIL decodes to RGB order, so use the RGB (not BGR) conversion
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    elif img.ndim == 3 and img.shape[2] == 4:
        # Four channels are treated as RGBA.
        # NOTE(review): a CMYK image also has four channels and would be
        # misread here - confirm whether such inputs occur.
        gray = cv2.cvtColor(img, cv2.COLOR_RGBA2GRAY)
    else:
        logging.error(f"Unsupported image format: {img.shape}")
        return None
    _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY_INV)
    return thresh


In [7]:
def format_table(table):
    """Format extracted table rows into aligned, pipe-separated text.

    Args:
        table: A sequence of rows, each row a sequence of cells. Falsy cells
            (``None``, ``""``) are rendered as blanks. Rows may differ in
            length; shorter rows are padded with blank cells so no data from
            longer rows is dropped.

    Returns:
        One line per row, each terminated by a newline, with every column
        left-justified to the width of its longest cell and columns joined by
        ``" | "``. An empty table yields an empty string.
    """
    if not table:
        return ""

    column_count = max(len(row) for row in table)

    # Normalise every row to text cells of equal count so the column widths
    # and the row output both see all cells, even for ragged input.
    text_rows = [
        [str(cell) if cell else "" for cell in row] + [""] * (column_count - len(row))
        for row in table
    ]
    widths = [
        max(len(text_row[index]) for text_row in text_rows)
        for index in range(column_count)
    ]

    return "".join(
        " | ".join(text.ljust(width) for text, width in zip(text_row, widths)) + "\n"
        for text_row in text_rows
    )


In [8]:
def extract_text_and_tables(pdf_data):
    """Extract page text and tables from a PDF using pdfplumber.

    For each page, tables are extracted first. If the page has tables, each
    one is formatted with ``format_table`` and recorded; otherwise the page's
    plain text is appended under a "Page N Text:" header.

    NOTE(review): text on pages that contain tables is not extracted at all -
    confirm this is intended.

    Args:
        pdf_data: The raw bytes of the PDF file.

    Returns:
        A tuple ``(extracted_text, structured_data)`` where ``extracted_text``
        is the concatenated text of the table-free pages and
        ``structured_data`` is a list of dicts with keys ``"page_number"``
        and ``"tables"`` (one dict per table). On any exception the error is
        logged, ``total_errors`` is incremented and ``("", [])`` is returned.

    Side effects:
        Adds the document's page count to the global ``total_pages_processed``.
    """
    global total_pages_processed, total_errors
    extracted_text = ""
    structured_data = []
    try:
        with pdfplumber.open(io.BytesIO(pdf_data)) as pdf:
            total_pages_processed += len(pdf.pages)  # Increment page count
            for page_num, page in enumerate(pdf.pages, 1):
                tables = page.extract_tables()
                if tables:
                    for table in tables:
                        structured_data.append({
                            "page_number": page_num,
                            "tables": format_table(table)
                        })
                else:
                    page_text = page.extract_text() or ""
                    # Terminate each page with a newline so the next page's
                    # header does not run onto this page's last line.
                    extracted_text += f"Page {page_num} Text:\n{page_text}\n"
        return extracted_text, structured_data
    except Exception as e:
        total_errors += 1  # Increment error count
        logging.error(f"Error extracting text and tables: {e}")
        return "", []


In [9]:
def extract_images_and_apply_ocr(pdf_data):
    """Extract embedded images from a PDF with PyMuPDF (fitz) and OCR them.

    Every image referenced by every page is decoded, passed through
    ``preprocess_image`` and read with Tesseract (``--psm 6``). A failure on
    one image is logged and does not stop the remaining images.

    Args:
        pdf_data: The raw bytes of the PDF file.

    Returns:
        The OCR text of all images, each preceded by a
        "Page N, Image M OCR Text:" header. Returns ``""`` if the document
        itself cannot be opened or iterated; in that case the error is logged
        and the global ``total_errors`` is incremented.
    """
    global total_errors
    image_text = ""
    try:
        # The context manager closes the document even if a page fails,
        # so the PDF handle is not leaked across files.
        with fitz.open(stream=pdf_data, filetype="pdf") as doc:
            for page_num in range(len(doc)):
                page = doc.load_page(page_num)
                image_list = page.get_images(full=True)
                for img_index, image_info in enumerate(image_list):
                    try:
                        xref = image_info[0]  # first entry is the image's xref number
                        base_image = doc.extract_image(xref)
                        image_bytes = base_image["image"]
                        pil_image = Image.open(io.BytesIO(image_bytes))
                        preprocessed_img = preprocess_image(pil_image)
                        if preprocessed_img is not None:
                            extracted_image_text = pytesseract.image_to_string(preprocessed_img, config="--psm 6")
                            image_text += f"\n\nPage {page_num + 1}, Image {img_index + 1} OCR Text:\n{extracted_image_text}"
                        else:
                            logging.error(f"Skipping image on page {page_num + 1}, image {img_index + 1} due to processing error")
                    except Exception as e:
                        # Per-image failures are logged only; they do not abort the document
                        logging.error(f"Error extracting image on page {page_num + 1}, image {img_index + 1}: {e}")
        return image_text
    except Exception as e:
        total_errors += 1  # Increment error count
        logging.error(f"Error extracting images and applying OCR: {e}")
        return ""


## PDF Data Extraction and Saving Functions

In [10]:
def extract_pdf_data_from_gcp(bucket_name, folder_name, file_name):
    """Download one PDF from a GCP bucket and return everything extracted from it.

    The result is assembled in this order: page text, the OCR text of the
    embedded images under an "Extracted Text from Images:" header, then each
    formatted table under a "Page N Tables:" header.

    Args:
        bucket_name: Name of the bucket holding the PDF.
        folder_name: Folder (object-name prefix) inside the bucket.
        file_name: Name of the PDF inside that folder.

    Returns:
        The combined extracted content, or ``""`` if the download or the
        assembly raises; the error is then logged and the global
        ``total_errors`` is incremented.

    Side effects:
        Increments the global ``total_files_processed`` once the download
        has succeeded.
    """
    global total_files_processed, total_errors
    pdf_blob = storage_client.bucket(bucket_name).blob(f"{folder_name}/{file_name}")

    try:
        pdf_bytes = pdf_blob.download_as_bytes()
        total_files_processed += 1  # Count the file only after a successful download

        page_text, page_tables = extract_text_and_tables(pdf_bytes)
        ocr_text = extract_images_and_apply_ocr(pdf_bytes)

        # Collect the pieces in output order and join them once at the end
        sections = [page_text, "\n\nExtracted Text from Images:\n", ocr_text]
        for entry in page_tables:
            sections.append(f"\n\nPage {entry['page_number']} Tables:\n")
            sections.append(entry['tables'])
        return "".join(sections)
    except Exception as e:
        total_errors += 1  # Increment error count
        logging.error(f"Error extracting PDF data from GCP: {e}")
        return ""


In [11]:
def save_extracted_data_to_gcp(bucket_name, folder_name, file_name, extracted_content):
    """Upload the extracted content as a .txt object to the given GCP bucket.

    Args:
        bucket_name: Name of the destination bucket.
        folder_name: Folder (object-name prefix) inside the bucket.
        file_name: Name of the source PDF; the object is stored under the same
            name with its ``.pdf`` extension replaced by ``.txt``.
        extracted_content: Text to upload.

    Side effects:
        Writes ``{folder_name}/{name}.txt`` to the bucket with content type
        ``text/plain`` and prints a confirmation line.
    """
    # Reuse the module-level client (like the other functions in this
    # notebook) instead of building a new client for every upload.
    bucket = storage_client.bucket(bucket_name)

    # Swap only the trailing extension; str.replace would also rewrite any
    # ".pdf" occurring earlier in the name.
    stem, extension = os.path.splitext(file_name)
    if extension.lower() == '.pdf':
        txt_file_name = stem + '.txt'
    else:
        txt_file_name = file_name + '.txt'

    blob = bucket.blob(f"{folder_name}/{txt_file_name}")
    blob.upload_from_string(extracted_content, content_type='text/plain')
    print(f"Extracted data saved to: {folder_name}/{txt_file_name} in bucket {bucket_name}")


## Processing PDFs and Tracking Metrics

In [12]:
def process_pdfs_in_gcp(bucket_name, source_folder, target_folder):
    """Process every PDF under the source folder and store the results in the target folder.

    Args:
        bucket_name: Name of the GCP bucket to read from and write to.
        source_folder: Folder (object-name prefix) containing the PDFs.
        target_folder: Folder that receives one ``.txt`` object per PDF.

    Side effects:
        Prints one progress line per PDF, uploads the extracted text for each
        PDF that yielded content, and logs an error for each PDF that did not.
    """
    prefix = f"{source_folder}/"
    bucket = storage_client.bucket(bucket_name)
    for blob in bucket.list_blobs(prefix=prefix):
        if not blob.name.endswith('.pdf'):
            continue

        # Keep the path relative to the source folder: listing by prefix also
        # returns objects in sub-folders, and a bare basename would make the
        # download look for them directly under source_folder.
        file_name = blob.name[len(prefix):]
        print(f"Processing file: {file_name}")
        extracted_content = extract_pdf_data_from_gcp(bucket_name, source_folder, file_name)

        # An empty string is what the extraction returns on failure; do not
        # upload an empty .txt that would look like a successful result.
        if not extracted_content:
            logging.error(f"No content extracted from {file_name}; skipping upload")
            print(f"Skipping save for {file_name}: extraction failed (see errors.log)")
            continue

        save_extracted_data_to_gcp(bucket_name, target_folder, file_name, extracted_content)


## Calculating Performance Metrics

In [13]:
def calculate_performance_metrics():
    """Print a summary of the run based on the global metric counters.

    Reads ``start_time``, ``total_files_processed``, ``total_pages_processed``
    and ``total_errors`` and prints the totals plus three derived values:
    latency (seconds per file), throughput (pages per second) and error rate
    (errors per file, formatted as a percentage). Each derived value falls
    back to 0 when its denominator is not positive.
    """
    elapsed = time.time() - start_time  # Seconds since start_time was recorded

    if total_files_processed > 0:
        latency = elapsed / total_files_processed  # Average time per file
        error_rate = total_errors / total_files_processed  # Counted errors per file
    else:
        latency = 0
        error_rate = 0

    if elapsed > 0:
        throughput = total_pages_processed / elapsed  # Pages processed per second
    else:
        throughput = 0

    summary_lines = (
        f"Total files processed: {total_files_processed}",
        f"Total pages processed: {total_pages_processed}",
        f"Total errors: {total_errors}",
        f"Total time: {elapsed:.2f} seconds",
        f"Latency: {latency:.2f} seconds per file",
        f"Throughput: {throughput:.2f} pages per second",
        f"Error rate: {error_rate:.2%}",
    )
    for line in summary_lines:
        print(line)

## Run Processing and Calculate Metrics

In [14]:
# NOTE(review): these literals replace the values the configuration cell read
# from .env (GCP_BUCKET_NAME, SOURCE_FOLDER, TARGET_FOLDER) - confirm intended.
bucket_name = 'gaia_files_pdf'
source_folder = 'gaia_pdfs'
target_folder = 'opensource_extracted'

# Reset the counters and the clock right before processing so the reported
# metrics describe this run only: they exclude idle time since the metrics
# cell was executed and do not accumulate when this cell is re-run.
total_files_processed = 0
total_pages_processed = 0
total_errors = 0
start_time = time.time()

# Run the processing pipeline for all PDFs in the source folder
process_pdfs_in_gcp(bucket_name, source_folder, target_folder)

Processing file: 021a5339-744f-42b7-bd9b-9368b3efda7a.pdf
Extracted data saved to: opensource_extracted/021a5339-744f-42b7-bd9b-9368b3efda7a.txt in bucket gaia_files_pdf
Processing file: 32f386b9-73ee-4455-b412-ddad508aa979.pdf
Extracted data saved to: opensource_extracted/32f386b9-73ee-4455-b412-ddad508aa979.txt in bucket gaia_files_pdf
Processing file: 366e2f2b-8632-4ef2-81eb-bc3877489217.pdf
Extracted data saved to: opensource_extracted/366e2f2b-8632-4ef2-81eb-bc3877489217.txt in bucket gaia_files_pdf
Processing file: 4044eab7-1282-42bd-a559-3bf3a4d5858e.pdf
Extracted data saved to: opensource_extracted/4044eab7-1282-42bd-a559-3bf3a4d5858e.txt in bucket gaia_files_pdf
Processing file: 634fca59-03b2-4cdf-9ce4-0205df22f256.pdf
Extracted data saved to: opensource_extracted/634fca59-03b2-4cdf-9ce4-0205df22f256.txt in bucket gaia_files_pdf
Processing file: 67e8878b-5cef-4375-804e-e6291fdbe78a.pdf
Extracted data saved to: opensource_extracted/67e8878b-5cef-4375-804e-e6291fdbe78a.txt in bu

In [15]:
# Calculate and display performance metrics (totals, latency, throughput and
# error rate) from the global counters updated during processing
calculate_performance_metrics()

Total files processed: 13
Total pages processed: 210
Total errors: 0
Total time: 56.85 seconds
Latency: 4.37 seconds per file
Throughput: 3.69 pages per second
Error rate: 0.00%


Explanation of Metrics:
Latency: Average time (in seconds) to process each file. Calculated as total_time / total_files_processed.
Throughput: The number of pages processed per second. Calculated as total_pages_processed / total_time.
Error Rate: The number of counted errors per processed file, displayed as a percentage. Calculated as total_errors / total_files_processed. Because a single file can contribute more than one error, this value can exceed 100%.
Total Time: The overall time taken to process all files.