# Multi-PDF Parsing with Llama 4 Maverick and ThreadPool

**Author: Erica Yuen**

**Date: 07/03/2025**

### Summary

This notebook processes every PDF in a Unity Catalog volume directory. It first renders each PDF page as a base64-encoded image, then transcribes the pages with Llama 4 Maverick, a multimodal model that can caption figures and transcribe text and tables to markdown. The notebook runs the model calls through a `ThreadPoolExecutor` and automatically adjusts the number of workers in response to rate limits, which can be up to 2x faster than using a Spark UDF. It also reports progress as your files are processed.

This uses Databricks Model Serving, which is also HIPAA compliant.

### Notebook Instructions
1. **Update the paths to your UC Schema, source PDFs, and workspace URL in [Section 2]** and run the subsequent cells


2. **Optional:**
    - Choose whether to save your results in one table with `PROCESSING_MODE = "combined"` (recommended) or in separate tables with `PROCESSING_MODE = "separate"`
      - `PROCESSING_MODE = "combined"` appends the results to a single table. This lets you browse and query the results in Sections 5 and 6.
      - `PROCESSING_MODE = "separate"` writes each PDF's results to its own table, which suits large PDFs (1k+ pages) or tables that need different permissions

    - Choose whether `save_to_unity_catalog` uses `mode="append"` (recommended) or `mode="overwrite"`


3. **Execute the batch processing in [Section 4]. Choose pay-per-token with `model=databricks-llama-4-maverick` or your own Provisioned Throughput Endpoint and update the number of initial, min, and max workers.**

    - To create the Provisioned Throughput endpoint, go to **Catalog** > system.ai > Models > llama-4-maverick. Then click **Serve this Model**. 

    - _**Note:** Scale-to-zero is not yet available for the Llama 4 model, so provision only what you need and delete the endpoint when you are not using it_

    - Set the initial, minimum, and maximum workers based on your model endpoint. The code automatically adapts the number of workers based on rate limits. For pay-per-token, 5-10 workers is a good start. For a Provisioned Throughput endpoint with 200 model units, use 30-40 workers.

4. **Optional:**
    - Query and Inspect the tables in Section 5 if `PROCESSING_MODE = combined`
    - Browse the PDF page images and their respective transcribed outputs in Section 6

### Speed Estimate 
Speed depends on your cluster configurations and the Llama 4 endpoint. 
Based on some example PDFs, here are the estimated times for processing files:
- Converting PDF pages into base64 images: ~150 pages/min
- Parsing pages with Llama 4 Maverick
  - ~30 pages/min on pay-per-token
  - ~120 pages/min on a Provisioned Throughput (PT) endpoint with 200 model units (scale-to-zero is not available for now)
  - At these rates, a 440-page PDF takes about 3 min of preprocessing plus about 15 min of parsing (about 18 min total) on pay-per-token, or about 3 min plus about 4 min (about 7 min total) on a PT endpoint with 200 model units
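
As a rough cross-check, the per-minute rates listed above can be turned into a back-of-the-envelope time estimate. This is a minimal sketch: `estimate_minutes` is a hypothetical helper (not part of the notebook), and its default rates are simply the approximate figures quoted above, so real times will vary with your cluster and endpoint.

```python
def estimate_minutes(pages, convert_ppm=150, parse_ppm=30):
    """Return (conversion, parsing, total) minutes for a given page count.

    convert_ppm and parse_ppm are pages-per-minute rates; the defaults are
    the approximate figures quoted in this notebook, not measured guarantees.
    """
    convert_min = pages / convert_ppm
    parse_min = pages / parse_ppm
    return convert_min, parse_min, convert_min + parse_min


# Example: a 440-page PDF on pay-per-token vs. a 200-model-unit PT endpoint
pay_per_token = estimate_minutes(440, parse_ppm=30)
provisioned = estimate_minutes(440, parse_ppm=120)

print(f"Pay-per-token total: {pay_per_token[2]:.1f} min")
print(f"Provisioned Throughput total: {provisioned[2]:.1f} min")
```

Conversion and parsing run one after the other for each file, which is why the two durations are added rather than overlapped.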

### Price Estimate
The price per page depends on the number of tokens on each page. With pay-per-token, expect roughly $3-7 per 1k pages. With a Provisioned Throughput endpoint, the cost is about $6 per hour for one band, which works out to about the same cost per page if you keep the endpoint saturated.
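
To make the price comparison concrete, the sketch below turns the quoted figures into a cost range and a break-even throughput. Both helpers are hypothetical (not part of the notebook), and the dollar amounts are the rough estimates from this section rather than published prices, so check current pricing for your workspace before relying on them.

```python
def pay_per_token_cost_range(pages, low_per_1k=3.0, high_per_1k=7.0):
    """Return the (low, high) estimated cost in dollars for a page count."""
    thousands = pages / 1000
    return thousands * low_per_1k, thousands * high_per_1k


def breakeven_pages_per_hour(hourly_rate, cost_per_1k_pages):
    """Pages per hour an hourly-billed endpoint must process to match
    a given pay-per-token cost per 1k pages."""
    return hourly_rate / cost_per_1k_pages * 1000


low, high = pay_per_token_cost_range(440)
print(f"440 pages on pay-per-token: ${low:.2f} to ${high:.2f}")

# Assumed inputs: $6/hour for the endpoint, $3 per 1k pages on pay-per-token
print(f"Break-even: {breakeven_pages_per_hour(6.0, 3.0):.0f} pages/hour")
```

An hourly-billed endpoint only pays off while it stays busy, so the break-even figure is most useful for deciding whether a batch is large enough to justify provisioning one.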



## Example input

![multimodal example.png](./assets/pdf_figure_example.png "pdf_figure_example.png")


## Example output



## FIGURE 2–5. CYTOKINE BALANCE

`<figure>` A diagram showing factors influencing the balance between Th1 and Th2 cytokine responses. The left side lists factors favoring Th1 phenotype, including presence of older siblings, early exposure to day care, tuberculosis, measles, or hepatitis A infection, and rural environment. The right side shows factors favoring Th2 phenotype, such as widespread use of antibiotics, Western lifestyle, urban environment, diet, and sensitization to house-dust mites and cockroaches. The diagram illustrates how these factors shift the balance toward either protective immunity (Th1) or allergic diseases including asthma (Th2). `</figure>`

| Factors favoring the Th1 phenotype | Factors favoring the Th2 phenotype |
| --- | --- |
| Presence of older siblings | Widespread use of antibiotics |
| Early exposure to day care | Western lifestyle |
| Tuberculosis, measles, or hepatitis A infection | Urban environment |
| Rural environment | Diet |
|  | Sensitization to house-dust mites and cockroaches |

Numerous factors, including alterations in the number or type of infections early in life, the widespread use of antibiotics, adoption of the Western lifestyle, and repeated exposure to allergens, may affect the balance between Th1-type and Th2-type cytokine responses and increase the likelihood that the immune response will be dominated by Th2 cells and thus will ultimately lead to the expression of allergic diseases such as asthma.

Reprinted by permission from Busse WW, Lemanske RF. Advances in Immunology N Engl J Med 2001; 344: 350-62. Copyright © 2001 Massachusetts Medical Society. All rights reserved.

dramatic increases in asthma prevalence in westernized countries. This hypothesis is based on the assumption that the immune system of the newly born is skewed toward Th2 cytokine generation. Following birth, environmental stimuli such as infections will activate Th1 responses and bring the Th1/Th2 relationship to an appropriate balance. Evidence indicates that the incidence of asthma is reduced in association with certain infections (M. tuberculosis, measles, or hepatitis A), exposure to other children (e.g., presence of older siblings and early enrollment in childcare), and less frequent use of antibiotics (Eder et al. 2006; Gern et al. 1999; Gern and Busse 2002; Horwood et al. 1985; Sears et al. 2003). Furthermore, the absence of these lifestyle events is associated with the persistence of a Th2 cytokine pattern. Under these conditions, the genetic background of the child who has a cytokine imbalance toward Th2 will set the stage to promote the production of IgE antibodies to key environmental antigens, such as house-dust mite, cockroach, Alternaria, and possibly cat. Therefore, a gene-by-environment interaction occurs in which the susceptible host is exposed to environmental factors that are capable of generating IgE, and sensitization occurs. Precisely why the airways of some individuals are susceptible to these allergic events has not been established.

There also appears to be a reciprocal interaction between the two subpopulations in which Th1 cytokines can inhibit Th2 generation and vice versa. Allergic inflammation may be the result of an excessive expression of Th2 cytokines. Alternatively, recent studies have suggested the possibility that the loss of normal immune balance arises from a cytokine dysregulation in which Th1 activity in asthma is diminished. The focus on actions of cytokines and chemokines to regulate and activate the inflammatory profile in asthma has provided

# 1) Installations, Imports, and preliminary setup

In [0]:
%pip install --quiet databricks-sdk httpx PyMuPDF openai
dbutils.library.restartPython()

In [0]:
import base64
import fitz 
import pandas as pd
import os
import glob
from pathlib import Path
import time
import random
import threading
from collections import deque
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql.functions import col, concat, lit, regexp_replace, split
from openai import OpenAI
from tqdm import tqdm


# 2) Set the UC paths and configuration

### To do: update the paths below

In [0]:
# UPDATE THESE PATHS FOR YOUR SETUP
PDF_DIRECTORY = "/Volumes/erica/parsing/pdfs/"  # Directory containing PDFs
OUTPUT_CATALOG = "erica.parsing"  # Catalog and schema for output tables

# You can choose the processing mode:
# "combined" - All PDFs go into one table with doc_id to distinguish (recommended)
# "separate" - Each PDF gets its own table
PROCESSING_MODE = "combined"  # or "separate"

# Table naming
if PROCESSING_MODE == "combined":
    INTERMEDIATE_TABLE = f"{OUTPUT_CATALOG}.all_pdfs_parsed_intermediate"
    FINAL_TABLE = f"{OUTPUT_CATALOG}.all_pdfs_parsed"
else:
    # For separate mode, tables will be named dynamically per PDF
    pass

DATABRICKS_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
DATABRICKS_BASE_URL = 'https://e2-demo-field-eng.cloud.databricks.com/serving-endpoints/'

print(f"📂 PDF Directory: {PDF_DIRECTORY}")
print(f"📊 Output Catalog: {OUTPUT_CATALOG}")
print(f"🔧 Processing Mode: {PROCESSING_MODE}")
if PROCESSING_MODE == "combined":
    print(f"💾 Final Table: {FINAL_TABLE}")

## Count number of pages

In [0]:
def count_pdf_pages_fitz(directory=".", show_details=True):
    """
    Count pages in all PDF files in a directory using PyMuPDF (fitz)
    
    Args:
        directory (str): Directory path to scan for PDFs
        show_details (bool): Whether to show individual file counts
    
    Returns:
        tuple: (total_pages, file_count, errors)
    """
    pdf_files = list(Path(directory).glob("*.pdf"))
    
    if not pdf_files:
        print(f"No PDF files found in '{directory}'")
        return 0, 0, 0
    
    total_pages = 0
    file_count = 0
    errors = 0
    
    print(f"Scanning {len(pdf_files)} PDF files in '{directory}'...\n")
    
    for pdf_file in pdf_files:
        try:
            # Open PDF with fitz
            doc = fitz.open(pdf_file)
            pages = doc.page_count
            doc.close()
            
            if show_details:
                print(f"{pdf_file.name:<50} {pages:>6} pages")
            
            total_pages += pages
            file_count += 1
            
        except Exception as e:
            print(f"ERROR - {pdf_file.name}: {e}")
            errors += 1
    print("\n")
    return total_pages, file_count, errors

In [0]:

total_pages, num_documents, errors = (count_pdf_pages_fitz(PDF_DIRECTORY, show_details=True))

print(f"Total {num_documents} PDFs found with a total of {total_pages} pages. {errors} errors.")

# 3) Function Definitions

## PDF Processing Functions

In [0]:
def get_pdf_files(directory_path):
    """
    Get all PDF files from a Unity Catalog volume directory.
    
    Args:
        directory_path: Path to directory containing PDFs
        
    Returns:
        List of PDF file paths
    """
    try:
        # List all files in the directory
        files = dbutils.fs.ls(directory_path)
        
        # Filter for PDF files and clean the paths
        pdf_files = []
        for file in files:
            if file.path.lower().endswith('.pdf'):
                # Remove 'dbfs:' prefix if present to work with PyMuPDF
                clean_path = file.path.replace('dbfs:', '') if file.path.startswith('dbfs:') else file.path
                pdf_files.append(clean_path)
        
        print(f"Found {len(pdf_files)} PDF files in {directory_path}")
        for pdf in pdf_files:
            file_name = os.path.basename(pdf)
            print(f"  - {file_name}")
            print(f"    Path: {pdf}")
            
        return pdf_files
        
    except Exception as e:
        print(f"Error accessing directory {directory_path}: {str(e)}")
        return []

def get_clean_doc_name(pdf_path):
    """Extract a clean document name from the PDF path for table naming."""
    file_name = os.path.basename(pdf_path)
    # Remove only the trailing file extension (any case) before cleaning for table naming
    clean_name = os.path.splitext(file_name)[0]
    # Replace special characters with underscores
    clean_name = ''.join(c if c.isalnum() else '_' for c in clean_name)
    # Remove consecutive underscores and strip
    clean_name = '_'.join(filter(None, clean_name.split('_')))
    return clean_name.lower()
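
To see what the table-name cleaning produces, here is a standalone sketch that restates the same steps (strip the extension, replace non-alphanumeric characters with underscores, collapse repeats, lowercase). `clean_name_demo` and the example paths are hypothetical and exist only for illustration.

```python
import os


def clean_name_demo(pdf_path):
    """Standalone restatement of the cleaning steps, for illustration only."""
    stem = os.path.splitext(os.path.basename(pdf_path))[0]
    # Replace anything that is not a letter or digit with an underscore
    replaced = "".join(c if c.isalnum() else "_" for c in stem)
    # Collapse runs of underscores and trim leading/trailing ones
    collapsed = "_".join(filter(None, replaced.split("_")))
    return collapsed.lower()


print(clean_name_demo("/Volumes/demo/My Report (v2).pdf"))

# Different file names can collapse to the same cleaned name
print(clean_name_demo("a-b.pdf") == clean_name_demo("a_b.pdf"))
```

Because distinct file names can map to the same cleaned name, it may be worth checking for collisions before using `"separate"` mode, where the cleaned name becomes part of the table name.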

In [0]:
def convert_pdf_to_base64(pdf_path, dpi=300):
    """
    PDF conversion with better metadata and error handling.
    
    Args:
        pdf_path: Path to PDF file
        dpi: Resolution
    
    Returns:
        pandas DataFrame with metadata, success boolean, error message
    """
    
    zoom = dpi / 72
    zoom_matrix = fitz.Matrix(zoom, zoom)
    
    try:
        doc = fitz.open(pdf_path)
        num_pages = len(doc)
        
        # Extract document metadata
        metadata = doc.metadata
        file_name = os.path.basename(pdf_path)
        clean_doc_name = get_clean_doc_name(pdf_path)
        
        print(f"Converting {file_name} to base64: {num_pages} pages at {dpi} DPI...")
        
        df_data = []
        start_time = time.time()
        
        for page_num in range(num_pages):
            if page_num % 25 == 0:  # Progress update every 25 pages
                print(f"  Converting page {page_num + 1}/{num_pages} to base64")
            
            page = doc.load_page(page_num)
            
            # Get page dimensions and text for metadata
            page_rect = page.rect
            page_text_length = len(page.get_text())
            
            pix = page.get_pixmap(matrix=zoom_matrix, alpha=False)
            img_bytes = pix.tobytes("png")  
            img_base64 = base64.b64encode(img_bytes).decode('utf-8')
            
            df_data.append({
                'doc_id': pdf_path,
                'doc_name': clean_doc_name,
                'file_name': file_name,
                'page_num': page_num + 1,
                'total_pages': num_pages,
                'page_width': page_rect.width,
                'page_height': page_rect.height,
                'page_text_length': page_text_length,
                'base64_img': img_base64,
                'processed_timestamp': datetime.now(),
                'dpi': dpi,
                'doc_title': metadata.get('title', ''),
                'doc_author': metadata.get('author', ''),
                'doc_subject': metadata.get('subject', ''),
                'doc_creator': metadata.get('creator', '')
            })
        
        doc.close()
        processing_time = time.time() - start_time
        
        print(f"  Conversion complete: {len(df_data)} pages in {processing_time:.1f}s")
        
        return pd.DataFrame(df_data), True, None
        
    except Exception as e:
        error_msg = f"Error processing {pdf_path}: {str(e)}"
        print(f"❌ {error_msg}")
        return None, False, error_msg
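
The `zoom = dpi / 72` step works because PDF page dimensions are expressed in points, and one point is 1/72 inch. The sketch below shows the resulting arithmetic without needing PyMuPDF: `rendered_size` and `base64_length` are hypothetical helpers, and the pixel dimensions are approximate because the renderer may round slightly differently.

```python
import base64
import math


def rendered_size(width_pt, height_pt, dpi=300):
    """Approximate pixel dimensions of a page rendered at the given DPI."""
    zoom = dpi / 72  # PDF points are 1/72 inch
    return round(width_pt * zoom), round(height_pt * zoom)


def base64_length(num_bytes):
    """Length of the base64 text produced from num_bytes of binary data."""
    return 4 * math.ceil(num_bytes / 3)


# A US Letter page is 612 x 792 points
print(rendered_size(612, 792, dpi=300))
print(rendered_size(612, 792, dpi=150))

# Base64 adds roughly one third on top of the raw image size
print(base64_length(3_000_000))
```

Halving the DPI cuts the pixel count to about a quarter, which is one lever to consider if image payloads turn out to be too large or too slow for the endpoint.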

### Optional: change `mode="append"` or `mode="overwrite"` in the function below

In [0]:
def save_to_unity_catalog(df, table_path, mode="append"):
    """
    Save function with better error handling and options.
    """
    try:
        spark_df = spark.createDataFrame(df)
        
        if mode == "overwrite":
            spark_df.write \
                .format("delta") \
                .mode("overwrite") \
                .option("overwriteSchema", "true") \
                .saveAsTable(table_path)
        else:
            spark_df.write \
                .format("delta") \
                .mode("append") \
                .saveAsTable(table_path)
        
        print(f"✅ Saved {len(df)} records to: {table_path}")
        return True
        
    except Exception as e:
        print(f"❌ Error saving to {table_path}: {str(e)}")
        return False

## Threadpool with Adaptive Concurrency Functions

Handle the LLM processing with adaptive concurrency based on rate limits.
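
The general idea can be sketched in a few lines: lower the worker count after repeated rate limits, and raise it again after a run of successes. The class below is a simplified, count-based illustration only. `SimpleWorkerController` and its thresholds are assumptions for this sketch, and it omits the time windows and locking a real multi-threaded tracker needs.

```python
class SimpleWorkerController:
    """Count-based illustration of rate-limit-aware worker scaling."""

    def __init__(self, initial=5, minimum=1, maximum=10,
                 lower_after=3, raise_every=20):
        self.workers = initial
        self.minimum = minimum
        self.maximum = maximum
        self.lower_after = lower_after    # rate limits before scaling down
        self.raise_every = raise_every    # successes in a row before scaling up
        self.limit_count = 0
        self.success_streak = 0

    def on_rate_limit(self):
        self.limit_count += 1
        self.success_streak = 0
        if self.limit_count >= self.lower_after:
            self.workers = max(self.minimum, self.workers - 1)

    def on_success(self):
        self.success_streak += 1
        if self.success_streak >= self.raise_every:
            self.workers = min(self.maximum, self.workers + 1)
            self.success_streak = 0
            self.limit_count = 0


controller = SimpleWorkerController()
for _ in range(3):
    controller.on_rate_limit()
after_limits = controller.workers

for _ in range(20):
    controller.on_success()
after_recovery = controller.workers

print(after_limits, after_recovery)
```

Scaling down quickly and back up slowly keeps the client from oscillating around the endpoint's rate limit.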

In [0]:
RETRYABLE_ERROR_SUBSTRINGS = ["retry", "got empty embedding result", "request_limit_exceeded", "rate limit", "insufficient_quota", "expecting value", "rate", "overloaded", "429", "bad gateway", "502"]

class RateLimitTracker:
    """Track API rate limits and adjust concurrency dynamically."""
    
    def __init__(self, initial_workers=5, min_workers=1, max_workers=10):
        self.current_workers = initial_workers
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.rate_limit_events = deque(maxlen=20)  # Track recent rate limits
        self.success_count = 0
        self.lock = threading.Lock()
        
    def record_rate_limit(self):
        """Record a rate limit event and potentially reduce workers."""
        with self.lock:
            self.rate_limit_events.append(datetime.now())
            
            # If we've had multiple rate limits recently, reduce workers
            recent_limits = sum(1 for event in self.rate_limit_events 
                              if datetime.now() - event < timedelta(minutes=2))
            
            if recent_limits >= 3 and self.current_workers > self.min_workers:
                old_workers = self.current_workers
                self.current_workers = max(self.min_workers, self.current_workers - 1)
                print(f"🔽 Rate limits detected! Reducing workers: {old_workers} → {self.current_workers}")
                
    def record_success(self):
        """Record successful processing and potentially increase workers."""
        with self.lock:
            self.success_count += 1
            
            # If no recent rate limits and we've had some successes, gradually increase workers
            recent_limits = sum(1 for event in self.rate_limit_events 
                              if datetime.now() - event < timedelta(minutes=5))
            
            # Increase workers every 20 successes if no recent rate limits
            if (recent_limits == 0 and 
                self.current_workers < self.max_workers and 
                self.success_count % 20 == 0):
                old_workers = self.current_workers
                self.current_workers = min(self.max_workers, self.current_workers + 1)
                print(f"🔼 Performance good! Increasing workers: {old_workers} → {self.current_workers}")

In [0]:
def process_single_image(prompt, image_data, image_index, databricks_token, databricks_url, model, rate_tracker):
    """Process a single image with adaptive rate limiting."""
    
    client = OpenAI(api_key=databricks_token, base_url=databricks_url)
    
    # Skip empty images
    if pd.isna(image_data) or image_data == "":
        return (image_index, "ERROR: Empty image")
    
    
    # Retry logic with exponential backoff
    for attempt in range(3):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            # Pages are rendered as PNG, so declare the matching MIME type
                            "image_url": {"url": f"data:image/png;base64,{image_data}"}
                        }
                    ]
                }]
            )
            
            result = response.choices[0].message.content.strip()
            rate_tracker.record_success()
            
            # Print success message if this was a retry attempt
            if attempt > 0:
                print(f"✅ SUCCESS: Image {image_index} processed successfully after {attempt + 1} attempts")
            
            return (image_index, result)
            
        except Exception as e:
            error_str = str(e).lower()
            is_retryable = any(substring in error_str for substring in RETRYABLE_ERROR_SUBSTRINGS)
            
            if is_retryable:
                rate_tracker.record_rate_limit()
                
                if attempt < 2:  # Only retry if we have attempts left
                    # Exponential backoff with jitter
                    wait_time = (2 ** attempt) + random.uniform(1, 3)
                    print(f"⚠️  RATE LIMIT: Image {image_index}, attempt {attempt + 1}/3. Retrying in {wait_time:.1f}s...")
                    time.sleep(wait_time)
                    continue
                else:
                    print(f"❌ FAILED: Image {image_index} failed after 3 attempts due to rate limiting")
                    return (image_index, f"ERROR: Rate limited after 3 attempts - {str(e)}")
            else:
                print(f"❌ ERROR: Image {image_index} failed with non-retryable error: {str(e)}")
                return (image_index, f"ERROR: {str(e)}")
    
    return (image_index, "ERROR: Max retries exceeded")
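
The retry delay combines an exponential term with random jitter so that workers that hit a rate limit together do not all retry at the same instant. The sketch below reproduces that formula in isolation; `backoff_seconds` is a hypothetical helper written for this illustration.

```python
import random


def backoff_seconds(attempt):
    """Delay before the next retry: exponential term plus 1-3 s of jitter."""
    return (2 ** attempt) + random.uniform(1, 3)


# With 3 attempts, only attempts 0 and 1 are followed by a wait
for attempt in range(2):
    low = 2 ** attempt + 1
    high = 2 ** attempt + 3
    sample = backoff_seconds(attempt)
    print(f"attempt {attempt + 1}: waits between {low}s and {high}s (sampled {sample:.1f}s)")
```

With these settings a page that fails on all three attempts spends at most about 9 seconds sleeping, in addition to the time taken by the requests themselves.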

In [0]:
def process_images_adaptive(prompt, images, databricks_token, databricks_url, 
                           model="databricks-llama-4-maverick", 
                           initial_workers=5, min_workers=1, max_workers=10):
    """
    Adaptive processing that adjusts concurrency based on rate limits.
    
    Args:
        prompt: Instruction text sent to the model with every image
        images: pandas Series of base64 encoded image strings
        databricks_token: Token for Databricks API  
        databricks_url: Base URL for Databricks API
        model: Model name to use
        initial_workers: Starting number of concurrent workers
        min_workers: Minimum workers (fallback during heavy rate limiting)
        max_workers: Maximum workers (cap for scaling up)
        
    Returns:
        pandas Series: Results with same index as input
    """
    
    # Convert to pandas Series if needed
    if not isinstance(images, pd.Series):
        images = pd.Series(images)
    
    results = pd.Series(index=images.index, dtype='object')
    rate_tracker = RateLimitTracker(
        initial_workers=initial_workers, 
        min_workers=min_workers, 
        max_workers=max_workers
    )
    
    print(f"🚀 Starting transcription of {len(images)} images...")
    print(f"📊 Model: {model}")
    print(f"⚙️  Workers: {initial_workers} (range: {min_workers}-{max_workers})")
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        with tqdm(total=len(images), desc="Processing images", unit="img") as pbar:
            
            remaining_items = list(images.items())
            
            while remaining_items:
                # Submit batch based on current worker count
                batch_size = min(rate_tracker.current_workers, len(remaining_items))
                current_batch = remaining_items[:batch_size]
                remaining_items = remaining_items[batch_size:]
                
                # Submit current batch
                futures = {
                    executor.submit(process_single_image, prompt, img_data, idx, 
                                  databricks_token, databricks_url, model, rate_tracker): idx
                    for idx, img_data in current_batch
                }
                
                # Process batch results
                for future in as_completed(futures):
                    try:
                        image_index, result = future.result()
                        results[image_index] = result
                        
                        # Update progress bar with status and current worker count
                        if result.startswith("ERROR:"):
                            pbar.set_postfix({
                                "Last": f"❌ {image_index}", 
                                "Workers": rate_tracker.current_workers,
                                "Rate Limits": len(rate_tracker.rate_limit_events)
                            })
                        else:
                            pbar.set_postfix({
                                "Last": f"✅ {image_index}", 
                                "Workers": rate_tracker.current_workers,
                                "Rate Limits": len(rate_tracker.rate_limit_events)
                            })
                        
                    except Exception as e:
                        idx = futures[future]
                        results[idx] = f"ERROR: Exception - {str(e)}"
                        pbar.set_postfix({
                            "Last": f"❌ {idx} (Exception)", 
                            "Workers": rate_tracker.current_workers
                        })
                        print(f"❌ EXCEPTION: Image {idx} failed with exception: {str(e)}")
                    
                    pbar.update(1)
                
                # Small delay between batches if we have more to process
                if remaining_items:
                    time.sleep(0.2)  # Small delay to prevent overwhelming
    
    # Final summary statistics
    error_count = sum(1 for result in results if str(result).startswith("ERROR:"))
    success_count = len(results) - error_count
    
    print(f"\n📈 Llama 4 Transcription Summary:")
    print(f"   ✅ Successful: {success_count}/{len(results)}")
    print(f"   ❌ Failed: {error_count}/{len(results)}")
    print(f"   📊 Success rate: {(success_count/len(results)*100):.1f}%")
    print(f"   🔧 Final worker count: {rate_tracker.current_workers}")
    print(f"   ⚠️  Recent rate limit events (tracker keeps the last 20): {len(rate_tracker.rate_limit_events)}")
    
    return results

## Main Batch Processing Function

In [0]:

# Define the prompt
PROMPT = """
    Instructions: Transcribe only the visible text from this PDF page. 

    Rules:
    - Use markdown formatting only for text that appears formatted in the original
    - Do not add document titles, page headers, or section headings unless explicitly visible
    - Do not add introductory text like 'This page contains...' or 'The document shows...' or '# Transcription of PDF Page'
    - Preserve exact wording and medical terminology
    - For images/diagrams: describe content within <figure></figure> tags
    - For tables: use markdown table format if present
    - Start transcription immediately without preamble

    For visual elements, follow these rules:

    **TABLES**: If the content is clearly a structured table, provide BOTH:
    1. A detailed caption in <figure></figure> tags describing the table structure and content
    2. The actual table recreated in markdown format with proper alignment

    **FLOWCHARTS/DECISION TREES**: Provide detailed caption in <figure></figure> tags including:
    - Starting point and decision criteria
    - All pathways and decision branches
    - Specific thresholds, values, and conditions
    - Final outcomes and recommendations
    - Flow direction and logical connections

    **CHARTS/DIAGRAMS**: Provide detailed caption in <figure></figure> tags including:
    - Chart type and title
    - All categories, sections, and color coding
    - Specific values, ranges, and criteria
    - Evidence levels and recommendations
    - Visual organization and groupings

    **FORMS/CHECKLISTS**: Transcribe structure using markdown formatting, preserving:
    - Section headers and numbering
    - Checkbox options and rating scales
    - Please bold the Key in Key-Value Pairs in the form, e.g. **Name**: John Doe.

    Preserve exact medical terminology, drug names, dosages, and clinical criteria for diagnostic accuracy.

    This transcription will be used for medical diagnosis, so accuracy is critical.
    """

In [0]:
def process_multiple_pdfs(pdf_directory, output_catalog, prompt=PROMPT, processing_mode="combined", 
                         dpi=300, model="databricks-llama-4-maverick", initial_workers=5, 
                         min_workers=1, max_workers=10):
    """
    Process all PDFs in a directory.
    
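    Args:
        pdf_directory: Directory containing PDF files
        output_catalog: Catalog.schema for output tables
        prompt: Instruction text sent to the model with every page image
        processing_mode: "combined" or "separate"
        dpi: Image resolution
        model: LLM model to use
        initial_workers: Starting number of concurrent workers
        min_workers: Minimum number of concurrent workers
        max_workers: Maximum number of concurrent workers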
    """
    
    # Discover PDF files
    pdf_files = get_pdf_files(pdf_directory)
    
    if not pdf_files:
        print("No PDF files found. Exiting.")
        return [], []  # Same (processing_log, all_results) shape as the normal return
    
    print(f"\n🚀 Starting batch processing of {len(pdf_files)} PDFs")
    print(f"📊 Processing mode: {processing_mode}")
    print(f"🎯 Output catalog: {output_catalog}")
    
    # Initialize tracking variables
    total_files = len(pdf_files)
    successful_files = 0
    failed_files = 0
    total_pages_processed = 0
    all_results = []
    processing_log = []
    
    # Process each PDF
    for file_idx, pdf_path in enumerate(pdf_files, 1):
        file_name = os.path.basename(pdf_path)
        clean_doc_name = get_clean_doc_name(pdf_path)
        
        print(f"\n{'='*60}")
        print(f"📄 Processing file {file_idx}/{total_files}: {file_name}")
        print(f"{'='*60}")
        
        file_start_time = time.time()
        
        try:
            # Convert PDF to base64 images
            df, success, error = convert_pdf_to_base64(pdf_path, dpi=dpi)
            
            if not success:
                failed_files += 1
                processing_log.append({
                    'file_name': file_name,
                    'status': 'FAILED_CONVERSION',
                    'error': error,
                    'pages_processed': 0,
                    'processing_time': time.time() - file_start_time
                })
                continue
            
            # Save intermediate results
            if processing_mode == "combined":
                intermediate_table = f"{output_catalog}.all_pdfs_parsed_intermediate"
                save_mode = "append" if file_idx > 1 else "overwrite"
            else:
                intermediate_table = f"{output_catalog}.{clean_doc_name}_parsed_intermediate"
                save_mode = "overwrite"
                
            save_to_unity_catalog(df, intermediate_table, mode=save_mode)
            
            # Process images with LLM
            print(f"🤖 Starting LLM processing for {len(df)} pages...")
            
            # Process with adaptive rate limiting
            results_series = process_images_adaptive(
                prompt=prompt,
                images=df['base64_img'],
                databricks_token=DATABRICKS_TOKEN,
                databricks_url=DATABRICKS_BASE_URL,
                model=model,
                initial_workers=initial_workers,
                min_workers=min_workers,
                max_workers=max_workers
            )
            
            # Add transcription results to dataframe
            df['transcription'] = results_series
            
            # Count successful transcriptions
            error_count = sum(1 for result in results_series if str(result).startswith("ERROR:"))
            success_count = len(results_series) - error_count
            
            # Save final results
            if processing_mode == "combined":
                final_table = f"{output_catalog}.all_pdfs_parsed"
                save_mode = "append" if file_idx > 1 else "overwrite"
            else:
                final_table = f"{output_catalog}.{clean_doc_name}_parsed"
                save_mode = "overwrite"
                
            save_success = save_to_unity_catalog(df, final_table, mode=save_mode)
            
            if save_success:
                successful_files += 1
                total_pages_processed += len(df)
                all_results.append(df)
                
                file_processing_time = time.time() - file_start_time
                
                processing_log.append({
                    'file_name': file_name,
                    'status': 'SUCCESS',
                    'pages_processed': len(df),
                    'successful_transcriptions': success_count,
                    'failed_transcriptions': error_count,
                    'processing_time': file_processing_time,
                    'final_table': final_table
                })
                
                print(f"✅ File completed successfully:")
                print(f"   📊 Pages: {len(df)}")
                print(f"   ✅ Successful transcriptions: {success_count}")
                print(f"   ❌ Failed transcriptions: {error_count}")
                print(f"   ⏱️  Processing time: {file_processing_time:.1f}s")
                print(f"   💾 Saved to: {final_table}")
            else:
                failed_files += 1
                processing_log.append({
                    'file_name': file_name,
                    'status': 'FAILED_SAVE',
                    'pages_processed': len(df),
                    'processing_time': time.time() - file_start_time
                })
                
        except Exception as e:
            failed_files += 1
            file_processing_time = time.time() - file_start_time
            error_msg = str(e)
            
            processing_log.append({
                'file_name': file_name,
                'status': 'FAILED_EXCEPTION',
                'error': error_msg,
                'pages_processed': 0,
                'processing_time': file_processing_time
            })
            
            print(f"❌ Failed to process {file_name}: {error_msg}")
    
    # Final summary
    print(f"\n{'='*80}")
    print(f"🎊 BATCH PROCESSING COMPLETE")
    print(f"{'='*80}")
    print(f"📊 Files processed: {successful_files}/{total_files}")
    print(f"📄 Total pages processed: {total_pages_processed}")
    print(f"✅ Successful files: {successful_files}")
    print(f"❌ Failed files: {failed_files}")
    
    if processing_mode == "combined" and successful_files > 0:
        print(f"💾 All results combined in: {output_catalog}.all_pdfs_parsed")
    
    # Show processing log
    print(f"\n📋 PROCESSING LOG:")
    for log_entry in processing_log:
        status_emoji = "✅" if log_entry['status'] == 'SUCCESS' else "❌"
        print(f"   {status_emoji} {log_entry['file_name']}: {log_entry['status']} "
              f"({log_entry['pages_processed']} pages, {log_entry['processing_time']:.1f}s)")
        
        if 'error' in log_entry:
            print(f"      Error: {log_entry['error']}")
    
    return processing_log, all_results

# 4) 🚀 Execute the Batch Processing

This cell starts processing all PDFs in your directory.

It first converts the PDFs into base64 images (roughly 150-200 pages per minute).
It then calls Llama 4 on these images.

For pay-per-token (`model="databricks-llama-4-maverick"`), set `initial_workers=5` and `max_workers=10`. This processes about 30 pages per minute, depending on the number of tokens per page.

For provisioned throughput with 200 model units, set `initial_workers=30` and `max_workers=40`. This processes about 120 pages per minute.
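
To get a feel for how long a batch might take, the rates quoted above can be turned into a back-of-the-envelope estimate. This is only a sketch: `estimate_minutes` is a hypothetical helper, not part of the notebook, and it assumes conversion and transcription run one after the other at the approximate rates stated in this section.

```python
import math

# Approximate rates quoted in this notebook (pages per minute). Real numbers
# vary with tokens per page and endpoint load.
CONVERSION_PAGES_PER_MIN = 150       # low end of the ~150-200 range
PAY_PER_TOKEN_PAGES_PER_MIN = 30     # pay-per-token endpoint
PROVISIONED_PAGES_PER_MIN = 120      # provisioned throughput, 200 model units


def estimate_minutes(total_pages, transcription_rate,
                     conversion_rate=CONVERSION_PAGES_PER_MIN):
    """Rough wall-clock estimate: conversion time plus transcription time."""
    minutes = total_pages / conversion_rate + total_pages / transcription_rate
    return math.ceil(minutes)


# Example: a 600-page batch on each endpoint type
print(estimate_minutes(600, PAY_PER_TOKEN_PAGES_PER_MIN))   # 4 + 20 minutes -> 24
print(estimate_minutes(600, PROVISIONED_PAGES_PER_MIN))     # 4 + 5 minutes  -> 9
```

Treat the result as an order-of-magnitude guide; rate limiting and retries can stretch the transcription stage.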

In [0]:
# Run the batch processing
processing_log, all_results = process_multiple_pdfs(
    pdf_directory=PDF_DIRECTORY,
    output_catalog=OUTPUT_CATALOG,
    prompt=PROMPT,
    processing_mode=PROCESSING_MODE,
    dpi=300,
    model="llama-4-maverick-pt",  # default is databricks-llama-4-maverick; change to your own provisioned throughput endpoint for more speed
    initial_workers=30,  # update if you have a provisioned throughput endpoint
    min_workers=1,  # default 1
    max_workers=50  # update if you have a provisioned throughput endpoint
)
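
The returned `processing_log` is a plain list of dictionaries, so it can be summarized without touching Unity Catalog. The sketch below assumes the entry keys written by `process_multiple_pdfs` above (`file_name`, `status`, `pages_processed`, `failed_transcriptions`). `summarize_log` and the sample entries are hypothetical and only illustrate the shape of the data.

```python
from collections import Counter


def summarize_log(processing_log):
    """Aggregate per-file log entries into overall counts."""
    return {
        "status_counts": dict(Counter(e["status"] for e in processing_log)),
        "pages_saved": sum(e["pages_processed"] for e in processing_log
                           if e["status"] == "SUCCESS"),
        # Only SUCCESS entries carry this key, so default to 0 elsewhere.
        "failed_pages": sum(e.get("failed_transcriptions", 0)
                            for e in processing_log),
        "files_to_retry": [e["file_name"] for e in processing_log
                           if e["status"] != "SUCCESS"],
    }


# Made-up entries for illustration only
sample_log = [
    {"file_name": "a.pdf", "status": "SUCCESS", "pages_processed": 12,
     "successful_transcriptions": 11, "failed_transcriptions": 1,
     "processing_time": 30.5, "final_table": "catalog.schema.all_pdfs_parsed"},
    {"file_name": "b.pdf", "status": "FAILED_EXCEPTION", "error": "example error",
     "pages_processed": 0, "processing_time": 2.1},
]

summary = summarize_log(sample_log)
print(summary)
```

In the notebook you would call `summarize_log(processing_log)` after the batch finishes; `files_to_retry` gives a quick list of PDFs worth re-running.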

# 5) Optional: Query and Inspect Results (PROCESSING_MODE == combined only)

Use these cells to explore your processed data.

In [0]:
# If using combined mode, show summary statistics
if PROCESSING_MODE == "combined":
    summary_df = spark.sql(f"""
        SELECT 
            file_name,
            doc_name,
            COUNT(*) as total_pages,
            SUM(CASE WHEN transcription NOT LIKE 'ERROR:%' THEN 1 ELSE 0 END) as successful_pages,
            SUM(CASE WHEN transcription LIKE 'ERROR:%' THEN 1 ELSE 0 END) as failed_pages,
            AVG(page_text_length) as avg_page_text_length,
            MIN(processed_timestamp) as first_processed,
            MAX(processed_timestamp) as last_processed
        FROM {FINAL_TABLE}
        GROUP BY file_name, doc_name
        ORDER BY file_name
    """)
    
    print("📊 PROCESSING SUMMARY BY FILE:")
    display(summary_df)

In [0]:
# Show sample transcriptions
if PROCESSING_MODE == "combined":
    sample_df = spark.sql(f"""
        SELECT file_name, page_num, LEFT(transcription, 500) as transcription_preview
        FROM {FINAL_TABLE}
        WHERE transcription NOT LIKE 'ERROR:%'
        AND LENGTH(transcription) > 100
        LIMIT 5
    """)
    
    print("📝 SAMPLE TRANSCRIPTIONS:")
    display(sample_df)

In [0]:
# Show error analysis
if PROCESSING_MODE == "combined":
    error_df = spark.sql(f"""
        SELECT 
            file_name,
            CASE 
                WHEN transcription LIKE 'ERROR: Rate limited%' THEN 'Rate Limited'
                WHEN transcription LIKE 'ERROR: Empty image%' THEN 'Empty Image'
                WHEN transcription LIKE 'ERROR:%' THEN 'Other Error'
                ELSE 'Success'
            END as result_type,
            COUNT(*) as count
        FROM {FINAL_TABLE}
        GROUP BY file_name, 
            CASE 
                WHEN transcription LIKE 'ERROR: Rate limited%' THEN 'Rate Limited'
                WHEN transcription LIKE 'ERROR: Empty image%' THEN 'Empty Image'
                WHEN transcription LIKE 'ERROR:%' THEN 'Other Error'
                ELSE 'Success'
            END
        ORDER BY file_name, result_type
    """)
    
    print("⚠️ ERROR ANALYSIS:")
    display(error_df)
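
The SQL above only runs in `combined` mode. For `separate` mode, the same bucketing could be applied in Python to the DataFrames returned in `all_results`. This is a minimal sketch that mirrors the `CASE` expression; `classify_transcription` is a hypothetical helper, and it assumes error strings keep the `ERROR:` prefixes used in the query.

```python
def classify_transcription(text):
    """Mirror the SQL CASE expression: bucket a transcription by error prefix."""
    text = str(text)  # tolerate non-string values such as None
    if text.startswith("ERROR: Rate limited"):
        return "Rate Limited"
    if text.startswith("ERROR: Empty image"):
        return "Empty Image"
    if text.startswith("ERROR:"):
        return "Other Error"
    return "Success"


# Made-up transcription values for illustration only
examples = [
    "ERROR: Rate limited after retries",
    "ERROR: Empty image",
    "ERROR: timeout",
    "# Page title\nSome transcribed text",
]
for value in examples:
    print(classify_transcription(value))
```

With a pandas DataFrame `df` from `all_results`, something like `df["transcription"].map(classify_transcription).value_counts()` would give per-file counts comparable to the SQL output.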

# 6) Optional: View individual pages and their transcribed outputs

Use this cell to browse individual pages if needed for debugging.

In [0]:
# Browse individual images - Update these values to explore
BROWSE_FILE_NAME = "GOLD-2025-Report-v1.0-15Nov2024_WMV.pdf"  # Change this to your file
BROWSE_PAGE_NUM = 40  # Change this to the page you want to see

# Page browsing reads from the combined table, so it only runs in 'combined' mode
if PROCESSING_MODE == "combined":
    page_data = spark.sql(f"""
        SELECT base64_img, transcription 
        FROM {FINAL_TABLE}
        WHERE file_name = '{BROWSE_FILE_NAME}' 
        AND page_num = {BROWSE_PAGE_NUM}
        LIMIT 1
    """).collect()
    
    if page_data:
        from IPython.display import Image as IPImage
        import base64
        
        def show_image(base64_str):
            # Decode the stored base64 string back into image bytes
            return IPImage(data=base64.b64decode(base64_str))
        
        print(f"📄 Viewing: {BROWSE_FILE_NAME}, Page {BROWSE_PAGE_NUM}")
        display(show_image(page_data[0]['base64_img']))
        
        print(page_data[0]['transcription'])
    else:
        print(f"❌ No data found for {BROWSE_FILE_NAME}, page {BROWSE_PAGE_NUM}")
else:
    print("Image browsing only available in 'combined' mode")