# Single Model, Single Prompt Test

## Overview
This notebook demonstrates extracting work order numbers from invoice images using a single model (Pixtral-12B) and a single prompt. It replicates the successful RunPod experiment within our new structured framework.

## Objectives
- Load the Pixtral-12B model using our model management framework
- Apply a specific prompt that was effective in previous tests
- Process invoice images to extract work order numbers
- Evaluate extraction accuracy against ground truth data
- Establish a baseline for further prompt and model experimentation

## Approach
We'll use the basic prompt that worked well in our initial tests:
```
Extract the work order number from this invoice image.
```

This will be formatted appropriately for the Pixtral model using our prompt management system.

## Expected Results
- Extraction accuracy metrics (exact match rate, character error rate)
- Processing time analysis
- Comparison with previous RunPod experiment results
- Foundation for systematic prompt and model comparisons

This notebook represents Step 1 in our experimental workflow, focusing on reproducing known successful results before expanding to prompt comparison (Step 2) and model comparison (Step 3).

In [None]:
# Import required libraries
import os
import sys
import platform
import torch
import transformers
from pathlib import Path
import yaml
import logging
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

# Configure basic logging: INFO level, timestamped records tagged with logger name
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('single_model_test')

# Display basic system information, one banner line per fact about the runtime
for banner_line in (
    "🔍 Environment Setup and Verification",
    f"🐍 Python version: {platform.python_version()}",
    f"📊 PyTorch version: {torch.__version__}",
    f"🤖 Transformers version: {transformers.__version__}",
    f"📅 Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
):
    print(banner_line)

# Set up project paths
# Locates the project root, puts it on sys.path, and records it in PROJECT_ROOT

# The search starts from the directory the notebook is running in
notebook_path = Path().resolve()

# Walk upward (starting with the notebook directory itself); the first
# directory containing any root marker is taken as the project root
project_root = next(
    (
        parent
        for parent in (notebook_path, *notebook_path.parents)
        if any((parent / marker).exists() for marker in ('.git', 'setup.py', 'requirements.txt'))
    ),
    None,
)

if project_root is not None:
    print(f"✅ Found project root: {project_root}")
else:
    # No marker found anywhere above us: assume the notebook sits one level
    # below the project root
    project_root = notebook_path.parent
    print("⚠️ Could not definitively locate project root, using parent directory")

# Add project root to Python path if not already there
project_root_str = str(project_root)
if project_root_str not in sys.path:
    sys.path.insert(0, project_root_str)
    print(f"📌 Added {project_root} to Python path")

# Set environment variable for project root
os.environ['PROJECT_ROOT'] = project_root_str
print(f"🔄 Set PROJECT_ROOT environment variable to {project_root}")

# Verify critical directories exist, create if needed
# (paths are relative to project_root)
critical_dirs = [
    "configs/environments",
    "configs/prompts",       # For prompt configurations
    "src/config",
    "data/images",
    "models/cache",
    "results/raw",
    "results/visualizations", # For result visualizations
    "logs"
]

for dir_path in critical_dirs:
    full_path = project_root / dir_path
    if full_path.exists():
        print(f"✅ Directory exists: {dir_path}")
        continue
    # Missing: create it together with any missing parents
    print(f"📁 Creating directory: {dir_path}")
    full_path.mkdir(parents=True, exist_ok=True)

# Check for ground truth data file (only reported here, not loaded)
ground_truth_path = project_root.joinpath("data", "ground_truth.csv")
if not ground_truth_path.exists():
    print(f"⚠️ Ground truth data file not found at: {ground_truth_path}")
    print("   This will be needed for comparing extraction results.")
else:
    print("✅ Ground truth data file found")

# Check for GPU availability and report the first device's name and memory
if not torch.cuda.is_available():
    print("⚠️ No GPU detected - running in CPU mode")
else:
    device_name = torch.cuda.get_device_name(0)
    memory = torch.cuda.get_device_properties(0).total_memory / 1e9  # bytes -> GB
    print(f"🖥️ GPU: {device_name} ({memory:.2f} GB)")

In [None]:
# Import and initialize environment configuration
# Loads the environment, path, experiment, and prompt configuration for this run

try:
    # Project-local configuration helpers
    from src.config.environment import get_environment_config
    from src.config.paths import get_path_config

    # Environment configuration comes first; its name is reported immediately
    env_config = get_environment_config()
    print(f"📌 Detected environment: {env_config.environment}")

    # Timestamped experiment name, used to request this run's path configuration
    run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    experiment_name = f"single_model_test_{run_stamp}"
    paths = get_path_config(experiment_name=experiment_name)

    # Show what was loaded
    print("\n🔍 Environment Configuration Summary:")
    env_config.print_summary()

    print("\n📂 Path Configuration:")
    print(paths)

    # Path verification only warns; it never stops the cell
    paths_verified = paths.verify(verbose=True)
    print("✅ All critical paths verified" if paths_verified else "⚠️ Some paths could not be verified")

    # Experiment configuration for this single-model test
    from src.config.experiment import get_experiment_config

    experiment_overrides = {
        "experiment.name": experiment_name,
        "experiment.description": "Testing work order extraction with Pixtral-12B using a single prompt",
        "experiment.model_name": "pixtral-12b",
        "experiment.field_to_extract": "work_order",
        "experiment.prompt_category": "specific",  # Using a specific prompt
        "experiment.specific_prompt": "basic_work_order"  # The prompt that worked in RunPod
    }
    experiment_config = get_experiment_config(
        experiment_type="single_model",
        overrides=experiment_overrides,
    )

    # Prompt system: a missing module is reported without discarding the
    # configuration objects created above
    try:
        from src.prompts import initialize_prompt_system

        prompt_init_result = initialize_prompt_system()
        if prompt_init_result["status"] != "success":
            print(f"⚠️ Prompt system initialization issue: {prompt_init_result.get('error', 'Unknown error')}")
        else:
            print(f"✅ Prompt system initialized with {prompt_init_result['prompt_count']} prompts")

        # Print experiment configuration summary
        print("\n🧪 Experiment Configuration:")
        experiment_config.print_summary()

    except ImportError as e:
        print(f"❌ Error importing prompt system: {e}")
        print("📝 Make sure the src/prompts directory is properly set up")

except ImportError as e:
    print(f"❌ Error importing configuration modules: {e}")
    print("📝 This suggests there might be an issue with your project structure or Python path")
    print("💡 Make sure the src/config directory exists and contains the necessary files")

except Exception as e:
    print(f"❌ Error initializing configuration: {e}")
    print("💡 Check your YAML configuration files for syntax errors")

In [None]:
# Final verification and setup confirmation
# Checks for ground truth data and images, then writes a setup summary to JSON

print("🔍 Performing final environment checks...")

# Ground truth: report when missing, otherwise try to load it and describe it
gt_path = paths.ground_truth_path
if not os.path.exists(gt_path):
    print(f"⚠️ Ground truth file not found at: {gt_path}")
    print("   You'll need to add this before running experiments.")
else:
    try:
        ground_truth_df = pd.read_csv(gt_path)
        print(f"✅ Ground truth data loaded: {len(ground_truth_df)} records")
        # Show at most five column names, flagging when more exist
        shown_columns = ', '.join(ground_truth_df.columns[:5])
        overflow_marker = '...' if len(ground_truth_df.columns) > 5 else ''
        print(f"   Columns: {shown_columns}{overflow_marker}")
    except Exception as e:
        print(f"⚠️ Ground truth file exists but couldn't be loaded: {e}")

# Images: materialize the listing so it can be counted
image_paths = list(paths.get_image_paths())
if not image_paths:
    print("⚠️ No invoice images found in data directory")
    print("   You'll need to add images before running experiments.")
else:
    print(f"✅ Found {len(image_paths)} invoice images")

# Snapshot of the environment; the record count is 0 when the ground truth
# DataFrame was never created
setup_info = dict(
    timestamp=datetime.now().isoformat(),
    environment=env_config.environment,
    python_version=platform.python_version(),
    pytorch_version=torch.__version__,
    transformers_version=transformers.__version__,
    gpu_available=torch.cuda.is_available(),
    images_found=len(image_paths),
    ground_truth_records=len(ground_truth_df) if 'ground_truth_df' in locals() else 0,
)

# Save setup information for future reference
setup_info_path = paths.get_results_path("setup_info.json")
import json
with open(setup_info_path, "w") as f:
    f.write(json.dumps(setup_info, indent=2))

print(f"\n✨ Setup information saved to: {setup_info_path}")
print("\n📋 Environment setup complete!")
print("   You're now ready to run experiments in the execution notebook.")

### Prompt Selection
The following cell selects the initial, basic prompt and formats it for use in the experiment.

In [None]:
# Prompt Selection and Verification
# Looks up the configured prompt, formats it for the model, and stores the
# details in `prompt_info` (None whenever selection fails)

try:
    # Import prompt utilities
    from src.prompts import get_prompt, format_prompt, analyze_prompt

    # Prompt name comes from the experiment configuration, with a fallback
    prompt_name = experiment_config.get("experiment.specific_prompt", "basic_work_order")
    selected_prompt = get_prompt(prompt_name)

    if not selected_prompt:
        # Lookup returned nothing usable
        print(f"❌ Error: Prompt '{prompt_name}' not found in registry")
        print("   Please check your prompt configuration and registry initialization")
        prompt_info = None

    else:
        print(f"✅ Selected prompt: \"{selected_prompt.name}\"")
        print(f"   Category: {selected_prompt.category}")
        print(f"   Field: {selected_prompt.field_to_extract}")
        print(f"   Prompt text: \"{selected_prompt.text}\"")

        # Format the prompt for the configured model
        model_name = experiment_config.get("experiment.model_name", "pixtral-12b")
        formatted_prompt = format_prompt(selected_prompt, model_name)

        print(f"\n📝 Formatted for {model_name}:")
        print(f"   \"{formatted_prompt}\"")

        # Analyze the prompt and report the headline numbers
        prompt_analysis = analyze_prompt(selected_prompt)
        print("\n🔍 Prompt Analysis:")
        print(f"   Word count: {prompt_analysis['word_count']}")
        print(f"   Character count: {prompt_analysis['character_count']}")
        print(f"   Complexity score: {prompt_analysis['complexity_score']:.2f}")

        # Keep everything later cells need in one place
        prompt_info = dict(
            name=selected_prompt.name,
            text=selected_prompt.text,
            formatted_text=formatted_prompt,
            category=selected_prompt.category,
            field_to_extract=selected_prompt.field_to_extract,
            analysis=prompt_analysis,
        )

except ImportError as e:
    print(f"❌ Error importing prompt modules: {e}")
    print("   Make sure your prompt management system is properly installed")
    prompt_info = None

except Exception as e:
    print(f"❌ Error during prompt selection: {e}")
    prompt_info = None

Final verification check: confirm that the data, images, and selected prompt are in place, then save the setup details.

In [None]:
# Final verification and setup confirmation
# Confirms that ground truth data, invoice images, and the selected prompt are
# available, then writes a JSON summary of the environment and experiment to
# the results location.

print("🔍 Performing final environment checks...")

# Check for critical data files
gt_path = paths.ground_truth_path
if os.path.exists(gt_path):
    # Load and show basic info about ground truth data
    try:
        ground_truth_df = pd.read_csv(gt_path)
        print(f"✅ Ground truth data loaded: {len(ground_truth_df)} records")
        # List at most five column names, flagging when more exist
        print(f"   Columns: {', '.join(ground_truth_df.columns[:5])}{'...' if len(ground_truth_df.columns) > 5 else ''}")
    except Exception as e:
        # Best effort: an unreadable file is reported, not fatal
        print(f"⚠️ Ground truth file exists but couldn't be loaded: {e}")
else:
    print(f"⚠️ Ground truth file not found at: {gt_path}")
    print("   You'll need to add this before running experiments.")

# Check for images
image_paths = list(paths.get_image_paths())
if image_paths:
    print(f"✅ Found {len(image_paths)} invoice images")
    # The list is known to be non-empty here, so the first entry is safe to show
    print(f"   Sample image: {image_paths[0].name}")
else:
    print("⚠️ No invoice images found in data directory")
    print("   You'll need to add images before running experiments.")

# Verify prompt selection was successful (`prompt_info` may be missing entirely
# if the prompt selection cell was never run)
prompt_ready = 'prompt_info' in locals() and prompt_info is not None
if prompt_ready:
    print(f"✅ Prompt selected and formatted successfully: {prompt_info['name']}")
else:
    print("⚠️ Prompt selection failed. Check previous cell for errors.")

# Save setup confirmation with complete experiment details.
# Fix: the original closed the dict early with a stray `}` after the PyTorch
# version entry, which made the whole cell a syntax error.
setup_info = {
    "timestamp": datetime.now().isoformat(),
    "environment": env_config.environment,
    "python_version": platform.python_version(),
    "pytorch_version": torch.__version__,
    "transformers_version": transformers.__version__,
    "gpu_available": torch.cuda.is_available(),
    "images_found": len(image_paths),
    # 0 when the ground truth DataFrame was never created
    "ground_truth_records": len(ground_truth_df) if 'ground_truth_df' in locals() else 0,
    # Add experiment details
    "experiment_name": experiment_name,
    "experiment_type": experiment_config.experiment_type,
    "model_name": experiment_config.get("experiment.model_name", "unknown"),
}

# Add prompt information if available
if prompt_ready:
    setup_info["prompt"] = {
        "name": prompt_info["name"],
        "text": prompt_info["text"],
        "category": prompt_info["category"],
        "field_to_extract": prompt_info["field_to_extract"],
        "word_count": prompt_info["analysis"]["word_count"],
    }

# Save setup information for future reference
setup_info_path = paths.get_results_path("setup_info.json")
import json
with open(setup_info_path, "w") as f:
    json.dump(setup_info, f, indent=2)

print(f"\n✨ Setup information saved to: {setup_info_path}")
print("\n📋 Environment setup complete!")
print("   You're now ready to proceed with the model loading and extraction test.")

## Run the Test on the Model
### Model Loading

In [None]:
# Model Loading Cell
# Loads the configured model and its processor, reporting GPU memory before and
# after; on failure both `model` and `processor` are set to None

try:
    # Import model loading utilities
    from src.models.loader import (
        load_model_and_processor,
        get_gpu_memory_info,
        verify_gpu_compatibility,
    )

    # Get model name from experiment config
    model_name = experiment_config.get("experiment.model_name", "pixtral-12b")

    # Compatibility is advisory only: loading is attempted either way
    compatibility = verify_gpu_compatibility(model_name)
    if compatibility["compatible"]:
        print(f"✅ GPU compatible with {model_name} requirements")
    else:
        print(f"❌ Warning: {compatibility['reason']}")
        print(f"   Model requirements: {compatibility['model_requirements']}")
        print(f"   Current GPU: {compatibility['current_gpu']}")
        print("   Proceeding anyway, but may encounter memory issues")

    # Display pre-loading memory state
    if torch.cuda.is_available():
        pre_memory = get_gpu_memory_info()
        pre_allocated = pre_memory['allocated_memory_gb']
        print(f"🧠 Pre-loading GPU memory: {pre_allocated:.2f} GB / {pre_memory['total_memory_gb']:.2f} GB")

    # Load the model with appropriate configuration
    print(f"⏳ Loading {model_name}... (this may take a minute)")
    model, processor = load_model_and_processor(
        model_name=model_name,
        # Optional: uncomment the following lines to configure model loading
        # quantization="4bit",  # Quantization strategy (None, "4bit", "8bit")
        # cache_dir=paths.model_cache_dir  # Specify cache directory
    )

    # Display post-loading memory state; the difference from the pre-loading
    # reading is reported as the model's memory usage
    if torch.cuda.is_available():
        post_memory = get_gpu_memory_info()
        post_allocated = post_memory['allocated_memory_gb']
        print(f"🧠 Post-loading GPU memory: {post_allocated:.2f} GB / {post_memory['total_memory_gb']:.2f} GB")
        print(f"   Model memory usage: {post_allocated - pre_memory['allocated_memory_gb']:.2f} GB")

    # Store model info for later reference
    model_info = dict(
        name=model_name,
        device=model.device,
        dtype=str(next(model.parameters()).dtype),
        loaded_at=datetime.now().isoformat(),
    )

    print(f"✅ Successfully loaded {model_name}")
    print(f"   Model dtype: {model_info['dtype']}")
    print(f"   Model device: {model_info['device']}")

except Exception as e:
    print(f"❌ Error loading model: {e}")
    print("   Check your model configuration and GPU setup")
    # Set model/processor to None if loading failed
    model, processor = None, None

Prepare Ground Truth for Testing

In [None]:
# Ground Truth Mapping Cell
# Builds {image ID -> ground truth value}, lists images without ground truth,
# and prepares batch items; on failure the mapping and batch list are emptied

try:
    # Import needed utilities
    from src.execution.batch import prepare_batch_items

    print("🔄 Preparing ground truth mapping...")

    # Field to extract, as configured for the experiment
    field_type = experiment_config.get("experiment.field_to_extract", "work_order")

    # CSV column per field type; unknown field types fall back to their own name
    field_mapping = {
        "work_order": "Work Order Number/Numero de Orden",
        "cost": "Total",
        # Add more field types as needed
    }
    field_column = field_mapping.get(field_type, field_type)
    image_id_column = "Invoice"  # Column containing image IDs

    print(f"📊 Using field column: '{field_column}' for {field_type} extraction")

    # Containers filled in below
    ground_truth_mapping = {}
    unmatched_images = []

    # Load the ground truth only if no earlier cell left a DataFrame behind
    if 'ground_truth_df' not in locals() or ground_truth_df is None:
        ground_truth_df = pd.read_csv(paths.ground_truth_path)
        print(f"📂 Loaded ground truth data: {len(ground_truth_df)} records")

    # One mapping entry per CSV row
    for _, row in ground_truth_df.iterrows():
        # Image ID as a string with any file extension dropped
        image_id = Path(str(row[image_id_column])).stem

        if field_column not in row:
            print(f"⚠️ Field column '{field_column}' not found in row for image {image_id}")
            continue

        # Stored as a whitespace-stripped string
        ground_truth_mapping[image_id] = str(row[field_column]).strip()

    # Get list of image paths
    image_paths = list(paths.get_image_paths())

    # Images whose stem has no entry in the mapping
    unmatched_images.extend(
        img_path.stem for img_path in image_paths if img_path.stem not in ground_truth_mapping
    )

    # Prepare structured batch items for processing
    batch_items = prepare_batch_items(image_paths, ground_truth_mapping)

    # Display statistics
    print(f"✅ Created ground truth mapping for {len(ground_truth_mapping)} images")
    print(f"📸 Total images available: {len(image_paths)}")
    print(f"🔄 Prepared {len(batch_items)} items for processing")

    if not unmatched_images:
        print("✅ All images have matching ground truth data")
    else:
        print(f"⚠️ Found {len(unmatched_images)} images without ground truth data")
        if len(unmatched_images) < 10:
            print(f"   Unmatched images: {', '.join(unmatched_images)}")
        else:
            print(f"   First 10 unmatched images: {', '.join(unmatched_images[:10])}...")

    # Display a few examples from the mapping
    print("\n📋 Sample of ground truth mapping:")
    for img_id, gt_value in list(ground_truth_mapping.items())[:5]:
        print(f"   {img_id}: '{gt_value}'")

    # Store mapping info for later reference
    mapping_info = dict(
        total_images=len(image_paths),
        mapped_images=len(ground_truth_mapping),
        unmapped_images=len(unmatched_images),
        field_type=field_type,
        field_column=field_column,
    )

except Exception as e:
    print(f"❌ Error creating ground truth mapping: {e}")
    print("   Check your ground truth CSV file and field column names")
    # Create empty mapping in case of error
    ground_truth_mapping = {}
    batch_items = []
    mapping_info = {"error": str(e)}
Initialize the Pipeline

In [None]:
# Pipeline Initialization Cell
# Creates the extraction pipeline, applies this experiment's settings, attaches
# any already-loaded model / ground truth mapping, and saves the configuration;
# on failure `pipeline` and `pipeline_prompt` are set to None

try:
    # Import the extraction pipeline
    from src.execution.pipeline import ExtractionPipeline

    print("🚀 Initializing extraction pipeline...")

    # Gather experiment parameters
    experiment_params = {
        "experiment_name": experiment_name,
        "model_name": experiment_config.get("experiment.model_name", "pixtral-12b"),
        "field_to_extract": experiment_config.get("experiment.field_to_extract", "work_order"),
        "prompt_name": prompt_info["name"] if prompt_info else None,
        "prompt_category": experiment_config.get("experiment.prompt_category", "specific"),
        "description": experiment_config.get("experiment.description", "Single model test"),
    }

    # Pipeline configuration: the experiment parameters above (same keys, same
    # order) followed by processing, checkpointing, and output settings
    pipeline_config = {
        **experiment_params,

        # Batch processing settings
        "batch_processing": {
            "auto_batch_size": True,  # Estimate optimal batch size
            "max_batch_size": 8,      # Maximum batch size to consider
            "default_batch_size": 1,  # Default if auto-estimation fails
            "optimize_between_batches": True  # Clean up memory between batches
        },

        # Checkpointing settings
        "enable_checkpointing": True,
        "checkpoint_frequency": 5,  # Save checkpoint after every 5 batches
        "resume_from_checkpoint": True,  # Resume from existing checkpoint if available

        # Output settings
        "show_progress": True,
        "metrics": ["exact_match", "character_error_rate"],

        # Additional settings specific to this experiment
        "timestamp": datetime.now().isoformat(),
        "environment": env_config.environment
    }

    # Initialize the pipeline
    pipeline = ExtractionPipeline(experiment_name=experiment_params["experiment_name"])

    # Copy settings into the pipeline's config one key at a time
    for key, value in pipeline_config.items():
        pipeline.config[key] = value

    # Reuse the model and processor only when both exist and are not None
    have_model = 'model' in locals() and model is not None
    have_processor = 'processor' in locals() and processor is not None
    if have_model and have_processor:
        pipeline.model = model
        pipeline.processor = processor
        print("✅ Using pre-loaded model and processor")

    # Reuse the ground truth mapping only when it exists and is non-empty
    if 'ground_truth_mapping' in locals() and ground_truth_mapping:
        pipeline.ground_truth_mapping = ground_truth_mapping
        print(f"✅ Using pre-created ground truth mapping with {len(ground_truth_mapping)} entries")

    # Resolve the prompt object the pipeline will run with
    if not prompt_info:
        pipeline_prompt = None
        print("⚠️ No specific prompt selected, pipeline will use default")
    else:
        from src.prompts.registry import get_prompt
        pipeline_prompt = get_prompt(prompt_info["name"])
        print(f"✅ Using prompt: {pipeline_prompt.name}")
        print(f"   Prompt text: \"{pipeline_prompt.text}\"")

    # Save the pipeline configuration
    config_path = paths.get_results_path("pipeline_config.json")
    with open(config_path, "w") as f:
        json.dump(pipeline.config, f, indent=2)

    print(f"💾 Saved pipeline configuration to {config_path}")
    print("✅ Pipeline initialization complete!")

except Exception as e:
    print(f"❌ Error initializing pipeline: {e}")
    print("   Check your pipeline implementation and configuration")
    pipeline = None
    pipeline_prompt = None

Test on just a single image

In [None]:
# Single Image Test Cell
# Runs the extraction on one image as a smoke test before the full run; the
# outcome (or the error) is kept in `single_test_result`

try:
    if pipeline is None:
        raise ValueError("Pipeline not initialized. Run the previous cell first.")

    print("🧪 Testing extraction on a single image...")

    # Candidates are limited to the first five images
    sample_images = list(paths.get_image_paths())[:5]
    if not sample_images:
        raise ValueError("No images found. Add images to your data directory.")

    # Prefer the first candidate that has ground truth; otherwise fall back to
    # the first candidate
    sample_image = next(
        (candidate for candidate in sample_images if candidate.stem in pipeline.ground_truth_mapping),
        None,
    )
    if sample_image is not None:
        print(f"✅ Selected test image: {sample_image.name}")
    else:
        sample_image = sample_images[0]
        print(f"⚠️ No image with ground truth found in first 5 images, using: {sample_image.name}")

    # Ground truth for the sample image ("Unknown" when it has none)
    sample_id = sample_image.stem
    ground_truth = pipeline.ground_truth_mapping.get(sample_id, "Unknown")
    print(f"📋 Ground truth value: '{ground_truth}'")

    # Create a test item
    test_item = dict(
        image_id=sample_id,
        image_path=str(sample_image),
        ground_truth=ground_truth,
    )

    # Use the pipeline's model and processor when both are set; otherwise ask
    # the pipeline to set them up
    if pipeline.model is not None and pipeline.processor is not None:
        model, processor = pipeline.model, pipeline.processor
    else:
        print("⏳ Loading model for test...")
        model, processor = pipeline.setup_model(model_name=pipeline.config.get("model_name", "pixtral-12b"))

    # Use the prompt chosen at pipeline initialization, or ask the pipeline for one
    if pipeline_prompt is not None:
        prompt = pipeline_prompt
    else:
        print("⏳ Getting prompt from registry...")
        prompt = pipeline.get_experiment_prompt()

    print(f"🔍 Running extraction with prompt: {getattr(prompt, 'name', 'custom')}")

    # Import the single-image processor
    from src.execution.inference import process_image_with_metrics

    # Process the single image
    print("⏳ Processing image... (This may take a few seconds)")
    result = process_image_with_metrics(
        image_path=test_item["image_path"],
        ground_truth=test_item["ground_truth"],
        prompt=prompt,
        model_name=pipeline.config.get("model_name", "pixtral-12b"),
        field_type=pipeline.config.get("field_to_extract", "work_order"),
        model=model,
        processor=processor,
        metrics=["exact_match", "character_error_rate"]
    )

    # Display detailed results
    print("\n📊 Extraction Results:")
    print(f"   Raw extraction: '{result['raw_extraction']}'")
    print(f"   Processed text: '{result['processed_extraction']}'")
    print(f"   Ground truth:   '{result['ground_truth']}'")
    print(f"   Exact match:     {result['exact_match']}")
    print(f"   Character error: {result['character_error_rate']:.4f}")
    print(f"   Processing time: {result['processing_time']:.2f} seconds")

    # Save the test result
    test_result_path = paths.get_results_path("single_image_test_result.json")
    with open(test_result_path, "w") as f:
        json.dump(result, f, indent=2)

    print(f"💾 Test result saved to {test_result_path}")

    # Store the test result for later reference
    single_test_result = result

    # Provide assessment of test
    if not result["exact_match"]:
        print("⚠️ Test extraction does not exactly match ground truth.")
        print("   Review the results to determine if this is acceptable.")
        # Below 0.3 counts as a relatively low character error rate
        cer_verdict = (
            "   Character error rate is relatively low, may still produce useful results."
            if result["character_error_rate"] < 0.3
            else "   Character error rate is high, consider reviewing prompt or model."
        )
        print(cer_verdict)
    else:
        print("✅ Test successful! Extraction matches ground truth.")
        print("   Ready to proceed with full extraction.")

except Exception as e:
    print(f"❌ Error during single image test: {e}")
    print("   Fix the error before proceeding to full extraction.")
    single_test_result = {"error": str(e)}

Full Test Run

In [None]:
# Full Extraction Run Cell
# Runs the complete extraction pipeline over the image set, times the run, and
# prints a quick accuracy summary. The results are kept in `extraction_results`;
# on any failure the error is printed and `extraction_results` is an empty list.

# Fix: the timing calls below need `time`, which the notebook's import cell
# does not provide, so the cell previously always failed with a NameError.
import time

try:
    if pipeline is None:
        raise ValueError("Pipeline not initialized. Run the pipeline initialization cell first.")

    # Check if we have at least a successful test before proceeding; the user
    # can explicitly choose to continue without one
    if 'single_test_result' not in locals() or 'error' in single_test_result:
        print("⚠️ Warning: No successful single image test found.")
        proceed = input("Do you want to proceed with full extraction anyway? (y/n): ").strip().lower()
        if proceed != 'y':
            raise ValueError("Full extraction aborted by user. Run the single image test first.")
        print("Proceeding with full extraction...")

    print("\n🚀 Starting full extraction pipeline...")

    # Determine how many images to process
    image_count = len(list(paths.get_image_paths()))
    limit = None  # Process all images by default

    # Optional: Uncomment to limit processing to a subset of images
    # limit = 10  # Set to None to process all images

    if limit:
        print(f"ℹ️ Processing limited to first {limit} images (out of {image_count} total)")
    else:
        print(f"ℹ️ Processing all {image_count} images")

    # Set up checkpoint path for resumability
    checkpoint_path = paths.get_results_path("extraction_checkpoint.json")

    # Record start time for benchmarking
    start_time = time.time()

    # Display configuration summary
    print("\n📋 Extraction Configuration:")
    print(f"   Model: {pipeline.config.get('model_name', 'pixtral-12b')}")
    print(f"   Field: {pipeline.config.get('field_to_extract', 'work_order')}")
    print(f"   Prompt: {getattr(pipeline_prompt, 'name', 'default')}")
    print(f"   Checkpointing: {'Enabled' if pipeline.config.get('enable_checkpointing', True) else 'Disabled'}")
    print(f"   Auto batch size: {'Enabled' if pipeline.config.get('batch_processing', {}).get('auto_batch_size', True) else 'Disabled'}")

    # Run the extraction pipeline
    print("\n⏳ Running extraction (this may take some time)...")
    results = pipeline.run_extraction(
        field_type=pipeline.config.get("field_to_extract", "work_order"),
        prompt=pipeline_prompt,
        batch_size=None,  # Auto-determine batch size
        checkpoint_path=checkpoint_path,
        metrics=["exact_match", "character_error_rate"],
        limit=limit  # Limit the number of images if specified
    )

    # Calculate total processing time
    total_time = time.time() - start_time
    images_processed = len(results)

    print("\n✅ Extraction complete!")
    print(f"   Processed {images_processed} images in {total_time:.2f} seconds")
    # Fix: guard the per-image average so an empty result set is reported
    # instead of raising ZeroDivisionError
    if images_processed:
        print(f"   Average time per image: {total_time/images_processed:.2f} seconds")
    else:
        print("   Average time per image: n/a (no images processed)")

    # Get quick summary statistics; a result without an "exact_match" key
    # counts as a miss
    exact_matches = sum(1 for r in results if r.get("exact_match", False))
    accuracy = (exact_matches / images_processed * 100) if images_processed > 0 else 0

    print("\n📊 Quick Results Summary:")
    print(f"   Exact match accuracy: {accuracy:.2f}% ({exact_matches}/{images_processed})")

    # Store results for later analysis
    extraction_results = results

except Exception as e:
    print(f"\n❌ Error during extraction: {e}")
    print("   Check the error and previous cells before retrying")
    extraction_results = []

Report any errors encountered during processing

In [None]:
# Post-extraction check: list the records that carry an "error" key.
# Nothing is printed when extraction_results is empty.
if extraction_results:
    # A record counts as failed when it contains an "error" entry
    errors = [record for record in extraction_results if "error" in record]
    if not errors:
        print("✅ All images processed successfully without errors")
    else:
        print(f"\n⚠️ Warning: {len(errors)} images failed during extraction")
        print("   Top 3 errors:")
        # Only the first three failed records are shown
        for rank, failed in enumerate(errors[:3], start=1):
            print(f"      {rank}. Image: {failed.get('image_id', 'unknown')}")
            print(f"         Error: {failed.get('error', 'Unknown error')}")

## Analysis
Basic Analysis

In [None]:
# Results Analysis Cell
# Computes accuracy, character-error-rate and timing statistics from the
# extraction results, prints a report, and writes the raw results, the summary
# and the worst failures to JSON files. The outcome is stored in
# `analysis_results` (or {"error": ...} if anything goes wrong).

try:
    # Local import keeps the cell runnable even if no earlier cell imported json
    import json

    if 'extraction_results' not in locals() or not extraction_results:
        raise ValueError("No extraction results found. Run the extraction cell first.")
    
    print("📊 Analyzing extraction results...")
    
    # If the pipeline has results, use those - otherwise use the extraction_results variable
    results = pipeline.results if hasattr(pipeline, 'results') and pipeline.results else extraction_results
    
    # Get the field type and model name for reference
    field_type = pipeline.config.get("field_to_extract", "work_order")
    model_name = pipeline.config.get("model_name", "pixtral-12b")
    
    # Calculate basic metrics
    total_images = len(results)
    exact_matches = sum(1 for r in results if r.get("exact_match", False))
    exact_match_accuracy = (exact_matches / total_images * 100) if total_images > 0 else 0
    
    # Calculate character error rate statistics.
    # Records without a CER value (e.g. errored ones) count as worst case 1.0.
    all_cer = [r.get("character_error_rate", 1.0) for r in results]
    avg_cer = sum(all_cer) / total_images if total_images > 0 else 1.0
    min_cer = min(all_cer) if all_cer else 1.0
    max_cer = max(all_cer) if all_cer else 1.0
    
    # Calculate timing statistics (records without a timing count as 0.0 seconds)
    all_times = [r.get("processing_time", 0.0) for r in results]
    avg_time = sum(all_times) / total_images if total_images > 0 else 0.0
    min_time = min(all_times) if all_times else 0.0
    max_time = max(all_times) if all_times else 0.0
    total_time = sum(all_times)
    
    # Identify errors: any record carrying an "error" key
    errors = [r for r in results if "error" in r]
    error_count = len(errors)
    
    # Find examples of failures (non-exact matches), worst CER first
    failures = [r for r in results if not r.get("exact_match", False) and "error" not in r]
    failures.sort(key=lambda x: x.get("character_error_rate", 0.0), reverse=True)
    
    # Find examples of successes. The default must be False, matching the
    # counts above; a True default would list records that lack "exact_match"
    # as both a failure and a success.
    successes = [r for r in results if r.get("exact_match", False) and "error" not in r]
    
    # Create comprehensive summary
    summary = {
        "experiment_name": pipeline.experiment_name,
        "timestamp": datetime.now().isoformat(),
        "model_name": model_name,
        "field_type": field_type,
        "accuracy": {
            "total_images": total_images,
            "exact_match_count": exact_matches,
            "exact_match_accuracy": round(exact_match_accuracy, 2),
            "error_count": error_count
        },
        "character_error_rate": {
            "average": round(avg_cer, 4),
            "minimum": round(min_cer, 4),
            "maximum": round(max_cer, 4)
        },
        "timing": {
            "average_seconds": round(avg_time, 2),
            "minimum_seconds": round(min_time, 2),
            "maximum_seconds": round(max_time, 2),
            "total_seconds": round(total_time, 2)
        }
    }
    
    # Print summary report
    print("\n📈 Extraction Results Summary:")
    print(f"   Model: {model_name}")
    print(f"   Field: {field_type}")
    print(f"   Total images processed: {total_images}")
    print(f"   Exact match accuracy: {exact_match_accuracy:.2f}% ({exact_matches}/{total_images})")
    print(f"   Average character error rate: {avg_cer:.4f}")
    print(f"   Processing errors: {error_count}")
    print(f"   Average processing time: {avg_time:.2f} seconds per image")
    print(f"   Total processing time: {total_time:.2f} seconds")
    
    # Display the worst non-exact matches
    print("\n🔍 Analysis of Non-Exact Matches:")
    if failures:
        print(f"   Found {len(failures)} non-exact matches")
        print("   Top 3 worst extractions:")
        for i, failure in enumerate(failures[:3]):
            print(f"      {i+1}. Image: {failure.get('image_id', 'unknown')}")
            print(f"         Ground truth:   '{failure.get('ground_truth', '')}'")
            print(f"         Extracted text: '{failure.get('processed_extraction', '')}'")
            print(f"         Character error rate: {failure.get('character_error_rate', 1.0):.4f}")
    else:
        print("   No non-exact matches found! Perfect accuracy. 🎉")
    
    # Save detailed results to raw directory
    os.makedirs(paths.raw_dir, exist_ok=True)
    raw_results_path = paths.get_raw_path(f"{model_name}_{field_type}_results.json")
    with open(raw_results_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\n💾 Raw results saved to {raw_results_path}")
    
    # Save summary to processed directory
    os.makedirs(paths.processed_dir, exist_ok=True)
    summary_path = paths.get_processed_path(f"{model_name}_{field_type}_summary.json")
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    print(f"💾 Summary saved to {summary_path}")
    
    # Save failures analysis to processed directory
    if failures:
        failures_path = paths.get_processed_path(f"{model_name}_{field_type}_failures.json")
        with open(failures_path, "w", encoding="utf-8") as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "model_name": model_name,
                "field_type": field_type,
                "total_failures": len(failures),
                "failures": failures[:10]  # Save top 10 worst failures
            }, f, indent=2)
        print(f"💾 Failure analysis saved to {failures_path}")
    
    # Store analysis results for later use
    analysis_results = {
        "summary": summary,
        "failures": failures,
        "successes": successes,
        "errors": errors
    }
    
except Exception as e:
    # Top-level notebook boundary: report the problem and leave a marker so
    # that later cells can detect that the analysis did not complete.
    print(f"❌ Error analyzing results: {e}")
    print("   Check that extraction results are available and valid")
    analysis_results = {"error": str(e)}

Visualization

In [None]:
# Visualization Cell
# Renders charts (accuracy, character error rate, processing time, GPU memory)
# and HTML reports (top failures, dashboard) from the analysis results, saves
# them via the `paths` helper, and records the written files in
# `visualization_paths` (or {"error": ...} if anything goes wrong).

try:
    if 'analysis_results' not in locals() or 'summary' not in analysis_results:
        raise ValueError("No analysis results found. Run the analysis cell first.")
    
    import matplotlib.pyplot as plt
    import pandas as pd
    import numpy as np
    from matplotlib.colors import LinearSegmentedColormap
    
    print("📊 Creating visualizations of extraction results...")
    
    # Optional outputs written during THIS run, keyed by their
    # visualization_paths key. Tracking them here (instead of probing
    # locals()) avoids reporting stale paths left in the notebook namespace
    # by an earlier run of this cell.
    optional_paths = {}
    
    # Get key information from analysis
    summary = analysis_results["summary"]
    total_images = summary["accuracy"]["total_images"]
    exact_matches = summary["accuracy"]["exact_match_count"]
    model_name = summary["model_name"]
    field_type = summary["field_type"]
    
    # Create visualizations directory if it doesn't exist
    os.makedirs(paths.visualizations_dir, exist_ok=True)
    
    # 1. Create accuracy bar chart
    plt.figure(figsize=(10, 6))
    categories = ['Exact Match', 'Non-Match']
    counts = [exact_matches, total_images - exact_matches]
    colors = ['#2ecc71', '#e74c3c']
    
    plt.bar(categories, counts, color=colors)
    plt.title(f'Extraction Accuracy for {field_type} using {model_name}', fontsize=14)
    plt.ylabel('Number of Images', fontsize=12)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    
    # Add count and percentage labels on bars
    for i, count in enumerate(counts):
        # Guard against an empty result set to avoid ZeroDivisionError
        percentage = (count / total_images) * 100 if total_images else 0.0
        plt.text(i, count/2, f"{count}\n({percentage:.1f}%)", 
                 ha='center', va='center', color='white', fontweight='bold')
    
    plt.tight_layout()
    accuracy_chart_path = paths.get_visualization_path("accuracy_chart.png")
    plt.savefig(accuracy_chart_path)
    plt.close()
    
    # 2. Create character error rate histogram
    if 'extraction_results' in locals() and extraction_results:
        plt.figure(figsize=(10, 6))
        all_cer = [r.get("character_error_rate", 1.0) for r in extraction_results 
                  if "error" not in r]  # Exclude error cases
        
        plt.hist(all_cer, bins=20, color='#3498db', alpha=0.7, edgecolor='black')
        plt.title(f'Character Error Rate Distribution for {field_type}', fontsize=14)
        plt.xlabel('Character Error Rate', fontsize=12)
        plt.ylabel('Number of Images', fontsize=12)
        plt.grid(axis='y', linestyle='--', alpha=0.5)
        # np.mean of an empty list is NaN (with a RuntimeWarning); only draw
        # the mean marker when there is data to average.
        if all_cer:
            mean_cer = np.mean(all_cer)
            plt.axvline(x=mean_cer, color='#e74c3c', linestyle='--', 
                        label=f'Mean: {mean_cer:.4f}')
            plt.legend()
        
        plt.tight_layout()
        cer_chart_path = paths.get_visualization_path("character_error_rate_histogram.png")
        plt.savefig(cer_chart_path)
        plt.close()
        optional_paths["character_error_rate_histogram"] = cer_chart_path
    
    # 3. Create processing time chart
    plt.figure(figsize=(10, 6))
    all_times = [r.get("processing_time", 0.0) for r in extraction_results 
                if "error" not in r]  # Exclude error cases
    
    plt.hist(all_times, bins=20, color='#9b59b6', alpha=0.7, edgecolor='black')
    plt.title(f'Processing Time Distribution for {field_type}', fontsize=14)
    plt.xlabel('Processing Time (seconds)', fontsize=12)
    plt.ylabel('Number of Images', fontsize=12)
    plt.grid(axis='y', linestyle='--', alpha=0.5)
    # Same empty-data guard as for the CER histogram
    if all_times:
        mean_time = np.mean(all_times)
        plt.axvline(x=mean_time, color='#e74c3c', linestyle='--', 
                    label=f'Mean: {mean_time:.2f}s')
        plt.legend()
    
    plt.tight_layout()
    time_chart_path = paths.get_visualization_path("processing_time_histogram.png")
    plt.savefig(time_chart_path)
    plt.close()
    
    # 4. Create comparison table of failures
    if analysis_results.get("failures"):
        top_failures = analysis_results["failures"][:10]  # Top 10 failures
        
        failure_data = []
        for f in top_failures:
            failure_data.append({
                "Image ID": f.get("image_id", "unknown"),
                "Ground Truth": f.get("ground_truth", ""),
                "Extraction": f.get("processed_extraction", ""),
                "CER": f.get("character_error_rate", 1.0)
            })
        
        failure_df = pd.DataFrame(failure_data)
        
        # Create a visually appealing HTML table
        html_table = failure_df.to_html(index=False, classes="table table-striped")
        styled_html = f"""
        <html>
        <head>
            <style>
                .table {{
                    width: 100%;
                    border-collapse: collapse;
                    font-family: Arial, sans-serif;
                }}
                .table-striped tr:nth-child(even) {{
                    background-color: #f2f2f2;
                }}
                th {{
                    background-color: #4CAF50;
                    color: white;
                    padding: 12px;
                    text-align: left;
                }}
                td {{
                    padding: 8px;
                    text-align: left;
                    border-bottom: 1px solid #ddd;
                }}
                .title {{
                    text-align: center;
                    font-size: 24px;
                    margin: 20px 0;
                    font-family: Arial, sans-serif;
                }}
            </style>
        </head>
        <body>
            <div class="title">Top Extraction Failures</div>
            {html_table}
        </body>
        </html>
        """
        
        failure_table_path = paths.get_visualization_path("top_failures.html")
        # Explicit UTF-8: extracted text may contain characters that the
        # platform default encoding cannot represent.
        with open(failure_table_path, "w", encoding="utf-8") as html_file:
            html_file.write(styled_html)
        optional_paths["top_failures_table"] = failure_table_path
    
    # 5. Create model info visualization if metadata is available
    if any("metadata" in r for r in extraction_results):
        # Try to gather memory usage data across records
        memory_data = []
        for r in extraction_results:
            if "metadata" in r and "gpu_info" in r["metadata"]:
                gpu_info = r["metadata"]["gpu_info"]
                if "allocated_memory_gb" in gpu_info and "total_memory_gb" in gpu_info:
                    memory_data.append({
                        "image_id": r.get("image_id", "unknown"),
                        "allocated_gb": gpu_info["allocated_memory_gb"],
                        "total_gb": gpu_info["total_memory_gb"],
                        "utilization": (gpu_info["allocated_memory_gb"] / gpu_info["total_memory_gb"]) * 100
                    })
        
        if memory_data:
            # Create memory usage chart
            plt.figure(figsize=(12, 6))
            df = pd.DataFrame(memory_data)
            plt.plot(range(len(df)), df["allocated_gb"], 'b-', label="Allocated Memory (GB)")
            # The total-memory line uses the value from the first record
            plt.axhline(y=df["total_gb"].iloc[0], color='r', linestyle='--', 
                       label=f"Total GPU Memory: {df['total_gb'].iloc[0]:.1f} GB")
            plt.xlabel("Image Index")
            plt.ylabel("GPU Memory (GB)")
            plt.title(f"GPU Memory Usage During {model_name} Extraction")
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            
            memory_chart_path = paths.get_visualization_path("gpu_memory_usage.png")
            plt.savefig(memory_chart_path)
            plt.close()
            optional_paths["gpu_memory_usage"] = memory_chart_path
    
    # 6. Create a summary dashboard HTML file
    # Images are referenced by bare file name, i.e. relative to the dashboard file.
    dashboard_html = f"""
    <html>
    <head>
        <title>Extraction Results Dashboard - {model_name}</title>
        <style>
            body {{
                font-family: Arial, sans-serif;
                margin: 0;
                padding: 20px;
                background-color: #f5f5f5;
            }}
            .container {{
                max-width: 1200px;
                margin: 0 auto;
                background-color: white;
                padding: 20px;
                border-radius: 5px;
                box-shadow: 0 0 10px rgba(0,0,0,0.1);
            }}
            h1, h2 {{
                color: #333;
            }}
            .metrics {{
                display: flex;
                flex-wrap: wrap;
                margin: 20px 0;
            }}
            .metric-card {{
                background-color: white;
                border-radius: 5px;
                box-shadow: 0 0 5px rgba(0,0,0,0.1);
                padding: 15px;
                margin: 10px;
                min-width: 200px;
                flex: 1;
            }}
            .metric-title {{
                font-size: 14px;
                color: #666;
                margin-bottom: 5px;
            }}
            .metric-value {{
                font-size: 24px;
                font-weight: bold;
                color: #333;
            }}
            .chart-container {{
                margin: 20px 0;
                text-align: center;
            }}
            .chart {{
                max-width: 100%;
                height: auto;
                border: 1px solid #ddd;
                border-radius: 5px;
            }}
        </style>
    </head>
    <body>
        <div class="container">
            <h1>Extraction Results Dashboard</h1>
            <p>
                <strong>Model:</strong> {model_name}<br>
                <strong>Field:</strong> {field_type}<br>
                <strong>Date:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}<br>
                <strong>Total Images:</strong> {total_images}
            </p>
            
            <div class="metrics">
                <div class="metric-card">
                    <div class="metric-title">Accuracy</div>
                    <div class="metric-value">{summary["accuracy"]["exact_match_accuracy"]}%</div>
                </div>
                <div class="metric-card">
                    <div class="metric-title">Character Error Rate</div>
                    <div class="metric-value">{summary["character_error_rate"]["average"]}</div>
                </div>
                <div class="metric-card">
                    <div class="metric-title">Avg Processing Time</div>
                    <div class="metric-value">{summary["timing"]["average_seconds"]}s</div>
                </div>
                <div class="metric-card">
                    <div class="metric-title">Total Processing Time</div>
                    <div class="metric-value">{summary["timing"]["total_seconds"]}s</div>
                </div>
            </div>
            
            <h2>Visualizations</h2>
            
            <div class="chart-container">
                <h3>Accuracy</h3>
                <img class="chart" src="accuracy_chart.png" alt="Accuracy Chart">
            </div>
            
            <div class="chart-container">
                <h3>Character Error Rate Distribution</h3>
                <img class="chart" src="character_error_rate_histogram.png" alt="CER Distribution">
            </div>
            
            <div class="chart-container">
                <h3>Processing Time Distribution</h3>
                <img class="chart" src="processing_time_histogram.png" alt="Processing Time Distribution">
            </div>
        </div>
    </body>
    </html>
    """
    
    dashboard_path = paths.get_visualization_path("dashboard.html")
    with open(dashboard_path, "w", encoding="utf-8") as html_file:
        html_file.write(dashboard_html)
    
    # Display a summary of created visualizations (only files written in this run)
    print("\n✅ Created the following visualizations:")
    print(f"   1. Accuracy Bar Chart: {accuracy_chart_path}")
    if "character_error_rate_histogram" in optional_paths:
        print(f"   2. Character Error Rate Histogram: {cer_chart_path}")
    print(f"   3. Processing Time Histogram: {time_chart_path}")
    if "top_failures_table" in optional_paths:
        print(f"   4. Top Failures Table: {failure_table_path}")
    if "gpu_memory_usage" in optional_paths:
        print(f"   5. GPU Memory Usage Chart: {memory_chart_path}")
    print(f"   6. Results Dashboard: {dashboard_path}")
    
    # Store visualization paths for reference
    visualization_paths = {
        "accuracy_chart": str(accuracy_chart_path),
        "processing_time_histogram": str(time_chart_path),
        "dashboard": str(dashboard_path)
    }
    for key in ("character_error_rate_histogram", "top_failures_table", "gpu_memory_usage"):
        if key in optional_paths:
            visualization_paths[key] = str(optional_paths[key])
    
except Exception as e:
    # Top-level notebook boundary: report the problem and leave a marker so
    # that later cells can detect that visualization did not complete.
    print(f"❌ Error creating visualizations: {e}")
    print("   Check that analysis results are available and valid")
    visualization_paths = {"error": str(e)}