In [5]:
import boto3
import os
import tempfile
import pymupdf
import json
import time
import traceback
from tqdm import tqdm
from typing import List, Dict
from tempfile import NamedTemporaryFile
from dotenv import load_dotenv
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from functools import partial

load_dotenv()

# Module-level S3 client pointed at the MinIO server. The endpoint and the
# credentials are read from the environment; load_dotenv() above populates it
# from a .env file when one is present. The parent process uses this client to
# list objects.
s3_client = boto3.client(
    service_name='s3',
    endpoint_url=os.environ.get('MINIO_API_ENDPOINT'),
    aws_access_key_id=os.environ.get('MINIO_ACCESS_KEY'),
    aws_secret_access_key=os.environ.get('MINIO_SECRET_KEY'),
)

def extract_text_from_pdf(file_path, s3_path):
    """Extract the text of every page of a PDF using pymupdf.

    Args:
        file_path: Local path of the PDF file to read.
        s3_path: Object key the PDF was downloaded from; echoed back in the
            result so records can be traced to their source.

    Returns:
        A dict with keys ``s3_path``, ``text`` and ``status``:
        - ``"success"``: ``text`` is every page's text, each page followed by
          a newline.
        - ``"empty_file"``: pymupdf reported an empty file; ``text`` is ``""``.
        - ``"error"``: any other failure; ``text`` is ``""`` and ``error``
          holds the exception message.
    """
    try:
        # The context manager closes the document (and its file handle) even
        # if text extraction fails part-way through.
        with pymupdf.open(file_path) as doc:
            # errors="ignore" must be on the *encode* step: that is where
            # unencodable code points (e.g. lone surrogates) would raise.
            # Decoding freshly encoded UTF-8 cannot fail.
            page_texts = [
                page.get_text().encode("utf8", errors="ignore").decode("utf8")
                for page in doc
            ]
        pdf_text = "".join(text + "\n" for text in page_texts)
        return {"s3_path": s3_path, "text": pdf_text, "status": "success"}
    except pymupdf.EmptyFileError:
        print(f"Empty PDF file: {s3_path}")
        return {"s3_path": s3_path, "text": "", "status": "empty_file"}
    except Exception as e:
        print(f"Error processing {s3_path}: {e}")
        return {"s3_path": s3_path, "text": "", "status": "error", "error": str(e)}

def process_pdf(key, bucket, temp_dir):
    """Download one PDF from S3 and extract its text (parallel worker task).

    Args:
        key: Object key of the PDF inside ``bucket``.
        bucket: Bucket to download from.
        temp_dir: Directory in which a scratch copy of the PDF is written.

    Returns:
        The result dict from ``extract_text_from_pdf`` when the download
        succeeds. If the download fails, a dict with ``status`` set to
        ``"download_error"``, an empty ``text`` and the ``error`` message.
    """
    # Create a new S3 client for each call to avoid sharing connections
    # across worker processes.
    s3_process_client = boto3.client(
        's3',
        endpoint_url=os.getenv('MINIO_API_ENDPOINT'),
        aws_access_key_id=os.getenv('MINIO_ACCESS_KEY'),
        aws_secret_access_key=os.getenv('MINIO_SECRET_KEY')
    )

    # Use a unique scratch file instead of os.path.basename(key). Keys under
    # different "subdirectories" can share a basename, and concurrent workers
    # writing the same path in the shared temp_dir would clobber each other.
    fd, temp_file_path = tempfile.mkstemp(suffix=".pdf", dir=temp_dir)
    os.close(fd)

    try:
        try:
            s3_process_client.download_file(bucket, key, temp_file_path)
        except Exception as e:
            print(f"Error downloading {key}: {e}")
            return {"s3_path": key, "text": "", "status": "download_error", "error": str(e)}

        # Extraction reports its own failures in the returned dict.
        return extract_text_from_pdf(temp_file_path, key)
    finally:
        # Always remove the scratch copy, whatever happened above.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

def get_cache_filename(bucket, prefix):
    """Return the local cache filename for a (bucket, prefix) pair.

    Forward slashes and backslashes in the prefix become underscores, and any
    trailing underscores are dropped, so the prefix fits in a flat filename.
    The bucket name is used verbatim.
    """
    separators_to_underscore = str.maketrans({"/": "_", "\\": "_"})
    flattened_prefix = prefix.translate(separators_to_underscore).rstrip("_")
    return "s3_keys_cache_{}_{}.json".format(bucket, flattened_prefix)

def save_keys_to_cache(bucket, prefix, pdf_keys):
    """Write ``pdf_keys`` for (bucket, prefix) to the local JSON cache file.

    The file also stores the bucket, the prefix and the current Unix time. A
    later load can use them to validate the cache and report its age.

    Returns:
        True if the file was written. False if writing failed; the error is
        printed.
    """
    target_path = get_cache_filename(bucket, prefix)
    payload = dict(
        bucket=bucket,
        prefix=prefix,
        timestamp=time.time(),
        pdf_keys=pdf_keys,
    )

    try:
        with open(target_path, 'w') as out:
            json.dump(payload, out)
        print(f"Saved {len(pdf_keys)} keys to cache file: {target_path}")
    except Exception as exc:
        print(f"Error saving keys to cache: {exc}")
        return False
    return True

def load_keys_from_cache(bucket, prefix, max_age_hours=None):
    """Load the cached list of PDF keys for (bucket, prefix), if usable.

    The cache is best-effort: any problem reading or parsing it is printed
    and treated as a cache miss.

    Args:
        bucket: Bucket the keys were listed from.
        prefix: Key prefix the keys were listed under.
        max_age_hours: Optional staleness limit. When given, a cache whose
            recorded timestamp is older than this many hours is ignored.
            The default (None) never expires the cache.

    Returns:
        The cached list of keys, or None in any of these cases:
        - there is no cache file;
        - the file was written for a different bucket/prefix;
        - it is older than ``max_age_hours``;
        - it is malformed or cannot be read.
    """
    cache_file = get_cache_filename(bucket, prefix)

    if not os.path.exists(cache_file):
        return None

    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            cache_data = json.load(f)

        # Different (bucket, prefix) pairs can map to the same filename
        # (e.g. prefixes "a/b" and "a_b"), so verify the stored identity.
        if cache_data.get("bucket") != bucket or cache_data.get("prefix") != prefix:
            print("Cache mismatch: bucket or prefix has changed")
            return None

        pdf_keys = cache_data.get("pdf_keys", [])
        if not isinstance(pdf_keys, list):
            print("Error loading keys from cache: 'pdf_keys' is not a list")
            return None

        # A missing timestamp counts as epoch 0, i.e. maximally old.
        timestamp = cache_data.get("timestamp", 0)
        cache_age_hours = (time.time() - timestamp) / 3600

        if max_age_hours is not None and cache_age_hours > max_age_hours:
            print(f"Cache is stale (age: {cache_age_hours:.2f} hours > {max_age_hours} hours); ignoring it")
            return None

        print(f"Loaded {len(pdf_keys)} keys from cache (age: {cache_age_hours:.2f} hours)")
        return pdf_keys
    except Exception as e:
        # Best-effort: a corrupt or unexpected cache file is just a cache miss.
        print(f"Error loading keys from cache: {e}")
        return None

def _list_pdf_keys(bucket, prefix):
    """Return every key under ``prefix`` in ``bucket`` ending in ``.pdf`` (any case)."""
    pdf_keys = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages with no matching objects have no 'Contents' entry.
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.lower().endswith('.pdf'):
                pdf_keys.append(key)
    return pdf_keys


def speed_test_fitz_extraction(bucket, prefix, output_path="fitz_extraction_results.jsonl"):
    """Speed test pymupdf text extraction on the PDFs under a bucket prefix.

    Keys come from the local cache file when one is available. Otherwise the
    bucket is listed and the result is cached. Each PDF is downloaded and
    extracted in a process pool. Results are written to ``output_path`` as
    JSON Lines, in the same order as the key list.

    Args:
        bucket: Bucket containing the PDFs.
        prefix: Key prefix to restrict the listing to.
        output_path: Destination JSONL file. The default matches the
            previously hard-coded name.
    """
    start_time = time.time()

    # Try to load keys from cache first; fall back to listing the bucket.
    pdf_keys = load_keys_from_cache(bucket, prefix)
    if pdf_keys is None:
        print(f"No valid cache found. Paginating S3 bucket {bucket} with prefix {prefix}...")
        pdf_keys = _list_pdf_keys(bucket, prefix)
        save_keys_to_cache(bucket, prefix, pdf_keys)

    print(f"Processing {len(pdf_keys)} PDF files...")

    results = []
    with tempfile.TemporaryDirectory() as temp_dir:
        # Leave one core free for the parent process.
        num_processes = max(1, multiprocessing.cpu_count() - 1)
        process_pdf_with_args = partial(process_pdf, bucket=bucket, temp_dir=temp_dir)

        with ProcessPoolExecutor(max_workers=num_processes) as executor:
            futures = [executor.submit(process_pdf_with_args, key) for key in pdf_keys]

            # Collect results in submission order (not completion order) so
            # that line N of the output corresponds to pdf_keys[N-1]. The
            # progress bar advances as the next-in-order task finishes.
            for future in tqdm(futures, desc="Processing PDFs", total=len(futures)):
                results.append(future.result())

    with open(output_path, "w", encoding="utf-8") as f:
        for result in results:
            f.write(json.dumps(result) + "\n")

    elapsed_time = time.time() - start_time
    print(f"\nProcessed {len(results)} PDFs in {elapsed_time:.2f} seconds")
    # Wall-clock time (listing included) divided by the PDF count: a
    # throughput figure, not per-file latency. Skipped when nothing was found
    # to avoid dividing by zero.
    if results:
        print(f"Average time per PDF: {elapsed_time / len(results):.2f} seconds")
    print(f"Results saved to {output_path}")

# Example usage: benchmark extraction over the speed-test prefix. The bucket
# can be overridden with the MINIO_BUCKET_NAME environment variable.
dest_bucket = os.environ.get('MINIO_BUCKET_NAME', 'neurips-2024')
dest_prefix = 'neurips-500-speedtest/'

# Run the speed test only when executed directly (a notebook cell also runs
# with __name__ == "__main__"), not when this module is imported.
if __name__ == "__main__":
    speed_test_fitz_extraction(bucket=dest_bucket, prefix=dest_prefix)

Processing PDFs:  25%|██▍       | 129/519 [00:05<00:16, 23.16it/s]

MuPDF error: unsupported error: cannot create appearance stream for Screen annotations

MuPDF error: unsupported error: cannot create appearance stream for Screen annotations

MuPDF error: unsupported error: cannot create appearance stream for Screen annotations

MuPDF error: unsupported error: cannot create appearance stream for Screen annotations



Processing PDFs: 100%|██████████| 519/519 [00:31<00:00, 16.29it/s]



Processed 519 PDFs in 32.63 seconds
Average time per PDF: 0.06 seconds
Results saved to fitz_extraction_results.jsonl


In [15]:
def pretty_print_result(line_number):
    """
    Opens the fitz_extraction_results.jsonl file and pretty prints the specified line.

    Prints the record's metadata first:
    - text length and S3 path;
    - status and error, when the extraction did not succeed.
    Then prints the extracted text. All errors are reported by printing, not
    raising.

    Args:
        line_number (int): The line number to print (1-based indexing)
    """
    try:
        # Stream the file rather than readlines(). Each record carries a whole
        # PDF's text, so only the requested line is kept in memory while the
        # total is counted for the range check.
        total_lines = 0
        json_line = None
        with open("fitz_extraction_results.jsonl", "r", encoding="utf-8") as f:
            for total_lines, raw_line in enumerate(f, start=1):
                if total_lines == line_number:
                    json_line = raw_line.strip()

        if line_number < 1 or line_number > total_lines:
            print(f"Error: Line number {line_number} is out of range. File has {total_lines} lines.")
            return

        data = json.loads(json_line)
        if not isinstance(data, dict):
            print(f"Error: Line {line_number} is not a JSON object.")
            return

        print("\n--- Document Metadata ---")
        # Test for the *key* only. data["text"] is the document body, so
        # searching it for a substring says nothing about the record's shape.
        if "text" in data:
            print(f"Text length: {len(data['text'])} characters")
        if "s3_path" in data:
            print(f"s3_path: {data['s3_path']}")
        # Surface failed extractions, which otherwise just show an empty body.
        if data.get("status", "success") != "success":
            print(f"status: {data['status']}")
            if "error" in data:
                print(f"error: {data['error']}")

        print(f"Line {line_number} of {total_lines}:")
        print("\n--- Document Text ---\n")

        # Print the text directly to preserve its line breaks.
        if "text" in data:
            print(data["text"])

    except FileNotFoundError:
        print("Error: fitz_extraction_results.jsonl file not found.")
    except json.JSONDecodeError:
        print(f"Error: Line {line_number} contains invalid JSON.")
    except KeyError as e:
        print(f"Error: Missing expected key in JSON structure: {str(e)}")
    except Exception as e:
        print(f"Error: {str(e)}")

# Example usage: show the first record of the results file.
pretty_print_result(line_number=1)



--- Document Metadata ---
Text length: 67935 characters
s3_path: neurips-500-speedtest/3D Structure Prediction of Atomic Systems with Flow-based Direct Preference Optimization.pdf
Line 1 of 519:

--- Document Text ---

3D Structure Prediction of Atomic Systems with
Flow-Based Direct Preference Optimization
Rui Jiao1,2 Xiangzhe Kong1,2 Wenbing Huang3,4∗Yang Liu1,2∗
1Dept. of Comp. Sci. & Tech., Institute for AI, Tsinghua University
2Institute for AIR, Tsinghua University
3Gaoling School of Artificial Intelligence, Renmin University of China
4 Beijing Key Laboratory of Big Data Management and Analysis Methods, Beijing, China
Abstract
Predicting high-fidelity 3D structures of atomic systems is a fundamental yet
challenging problem in scientific domains. While recent work demonstrates the
advantage of generative models in this realm, the exploration of different probability
paths are still insufficient, and hallucinations during sampling are persistently
occurring. To address these pitfal