# Batch Experiments: Adaptive Integrated Grad-CAM

This notebook runs comprehensive experiments comparing:
- **Adaptive Method** (ours) - dynamically allocates 10-100 steps
- **Fixed-25** - always uses 25 integration steps
- **Fixed-50** - always uses 50 integration steps  
- **Fixed-100** - always uses 100 integration steps

## Metrics Collected:
1. Deletion AUC (lower is better)
2. Insertion AUC (higher is better)
3. Average Drop (lower is better)
4. Computation time (seconds)
5. Steps allocated (for adaptive)
6. Image complexity metrics

## Expected Runtime:
~5 minutes for 20 images on CPU

In [None]:
# Import libraries
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
import pandas as pd
from pathlib import Path
import time
import json
from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings('ignore')

from adaptive_integrated_gradcam import AdaptiveIntegratedGradCAM, BaselineIntegratedGradCAM
from evaluation_metrics import AttributionEvaluator
import matplotlib.pyplot as plt

print("✓ All libraries imported successfully")

In [None]:
# Experiment configuration
# Images are read from DATA_DIR (at most NUM_IMAGES of them are used) and
# result files are written under OUTPUT_DIR.
DATA_DIR, OUTPUT_DIR = 'medical_images', 'results'
NUM_IMAGES = 20

# Settings forwarded to the adaptive method: bounds on the number of
# integration steps plus its two thresholds.
MIN_STEPS, MAX_STEPS = 10, 100
VARIANCE_THRESHOLD = 0.1
CONVERGENCE_THRESHOLD = 0.05

# Echo the active configuration so it is recorded in the notebook output.
print(
    "Configuration:\n"
    f"  Data directory: {DATA_DIR}\n"
    f"  Number of images: {NUM_IMAGES}\n"
    f"  Output directory: {OUTPUT_DIR}\n"
    f"  Adaptive: min={MIN_STEPS}, max={MAX_STEPS}"
)

In [None]:
# Setup device: prefer CUDA when PyTorch reports it as available, else CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

banner = "=" * 80
print(banner)
print("BATCH EXPERIMENTS: Adaptive Integrated Grad-CAM")
print(banner)
print(f"\nDevice: {device}")

if device == 'cpu':
    print("⚠ Warning: Running on CPU. This will be slow.")
    print("  Consider using GPU for faster experiments.")
else:
    print(f"✓ Using GPU: {torch.cuda.get_device_name(0)}")

# Create results directory.
# parents=True allows OUTPUT_DIR to be a nested path (e.g. 'runs/exp1') whose
# parent folders do not exist yet; exist_ok=True makes re-running the cell safe.
results_dir = Path(OUTPUT_DIR)
results_dir.mkdir(parents=True, exist_ok=True)
print(f"\nResults directory: {results_dir.absolute()}")

In [None]:
# Load the network whose predictions will be explained
print("\nLoading model...")

# ResNet-50 with the IMAGENET1K_V1 weights, switched to eval mode and moved
# to the selected device (both calls return the module itself).
model = models.resnet50(weights='IMAGENET1K_V1').eval().to(device)

# Last block of layer4; this is the layer handed to the attribution methods.
target_layer = model.layer4[-1]

parameter_count = sum(weights.numel() for weights in model.parameters())

print("✓ Model loaded: ResNet-50")
print(f"  Parameters: {parameter_count:,}")
print("  Target layer: layer4[-1]")

In [None]:
# Build the attribution methods under comparison
print("\nInitializing attribution methods...")

# Each entry maps a method name to its attribution object plus the metadata
# the experiment loop reads ('use_adaptive' or 'num_steps').
methods_dict = {
    'adaptive': {
        'method': AdaptiveIntegratedGradCAM(
            model,
            target_layer,
            min_steps=MIN_STEPS,
            max_steps=MAX_STEPS,
            variance_threshold=VARIANCE_THRESHOLD,
            convergence_threshold=CONVERGENCE_THRESHOLD,
        ),
        'use_adaptive': True,
    },
}

# The fixed-step baselines differ only in their step count.
for fixed_steps in (25, 50, 100):
    methods_dict[f'fixed_{fixed_steps}'] = {
        'method': BaselineIntegratedGradCAM(model, target_layer, num_steps=fixed_steps),
        'num_steps': fixed_steps,
    }

print("✓ Methods initialized:")
for method_label in methods_dict:
    print(f"  - {method_label}")

# Evaluator used for the deletion / insertion / average-drop metrics
evaluator = AttributionEvaluator(model, device)
print("✓ Evaluator initialized")

In [None]:
# Helper functions
def load_and_preprocess_image(image_path, device='cpu'):
    """Read an image file and convert it into a normalized model input.

    The image is opened with PIL, converted to RGB, passed through
    ``Resize(256)`` -> ``CenterCrop(224)`` -> ``ToTensor`` -> ``Normalize``,
    given a leading batch dimension and moved to ``device``.

    Args:
        image_path: Path of the image file to open.
        device: Device the resulting tensor is moved to.

    Returns:
        A ``(tensor, pil_image, ok)`` tuple. On success ``ok`` is True,
        ``tensor`` is the preprocessed batch and ``pil_image`` the RGB image.
        On any failure the error is printed and ``(None, None, False)`` is
        returned instead of raising.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ])

    try:
        rgb_image = Image.open(image_path).convert('RGB')
        batch = preprocess(rgb_image).unsqueeze(0).to(device)
    except Exception as e:
        # Best-effort loader: report the problem and let the caller skip the file.
        print(f"Error loading {image_path}: {e}")
        return None, None, False

    return batch, rgb_image, True

def shannon_entropy(pixel_values):
    """Return the Shannon entropy, in bits, of 8-bit intensity values.

    Values are counted in 256 unit-wide bins covering [0, 256) and the counts
    are normalized to probabilities that sum to 1. (Using
    ``np.histogram(..., range=(0, 255), density=True)`` instead returns
    densities scaled by 256/255, which makes the entropy of a constant image
    negative and lets the result exceed 8 bits.)

    Args:
        pixel_values: Array-like of intensities in 0..255, any shape.

    Returns:
        Entropy as a Python float in [0, 8]; 0.0 for empty input.
    """
    counts, _ = np.histogram(pixel_values, bins=256, range=(0, 256))
    total = counts.sum()
    if total == 0:
        return 0.0
    probs = counts[counts > 0] / total
    # log2(1/p) is non-negative, so a constant image yields 0.0 rather than -0.0.
    return float(np.sum(probs * np.log2(1.0 / probs)))


def calculate_image_complexity(image_pil):
    """Calculate image complexity metrics.

    Args:
        image_pil: PIL image; it is converted with ``np.array`` and passed to
            ``cv2.cvtColor(..., cv2.COLOR_RGB2GRAY)``, so an RGB image is
            expected.

    Returns:
        dict with three float entries:
            'entropy': Shannon entropy (bits) of all pixel values across
                channels.
            'edge_density': fraction of pixels marked as edges by
                ``cv2.Canny(gray, 50, 150)``.
            'variance': variance of all pixel values across channels.
    """
    # Imported lazily so OpenCV is only required when this helper is called.
    import cv2

    img_array = np.array(image_pil)

    # Calculate entropy
    entropy = shannon_entropy(img_array)

    # Calculate edge density
    gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    edge_density = float(np.count_nonzero(edges) / edges.size)

    # Calculate variance
    variance = float(np.var(img_array))

    return {
        'entropy': entropy,
        'edge_density': edge_density,
        'variance': variance
    }

print("✓ Helper functions defined")

In [None]:
# Get image paths
# Collect up to NUM_IMAGES image files from DATA_DIR in sorted order. The
# suffix is compared case-insensitively, so '.JPG', '.Jpg' and '.jpg' are all
# accepted, and the directory is scanned once instead of once per pattern.
data_dir = Path(DATA_DIR)
image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']

allowed_suffixes = set(image_extensions)
if data_dir.is_dir():
    image_paths = sorted(
        entry for entry in data_dir.iterdir()
        if entry.is_file() and entry.suffix.lower() in allowed_suffixes
    )[:NUM_IMAGES]
else:
    # A missing directory is reported below as "no images found".
    image_paths = []

if not image_paths:
    print(f"\n✗ No images found in {data_dir}")
    print("Please run: 2_create_sample_dataset.ipynb")
else:
    print(f"\n✓ Found {len(image_paths)} images")
    print("\nSample images:")
    for i, path in enumerate(image_paths[:5], start=1):
        print(f"  {i}. {path.name}")
    if len(image_paths) > 5:
        print(f"  ... and {len(image_paths) - 5} more")

In [None]:
# Run experiments
# For every image: predict the class, measure image complexity, then run each
# attribution method, timing CAM generation and scoring the CAM with the
# deletion / insertion / average-drop metrics. One result row per image.
print("\n" + "="*80)
print("RUNNING EXPERIMENTS")
print("="*80)
print(f"\nProcessing {len(image_paths)} images with {len(methods_dict)} methods...")
print("This may take a few minutes...\n")

# CUDA kernels are launched asynchronously, so the GPU must be synchronized
# before reading the clock; otherwise the measured time may exclude work that
# is still queued on the device.
sync_cuda = str(device).startswith('cuda')

# Per-method result columns that are filled with NaN when a method fails.
metric_suffixes = ('time', 'deletion_auc', 'insertion_auc', 'avg_drop', 'steps')

all_results = []

for image_path in tqdm(image_paths, desc="Processing images"):
    # Load image (the loader prints its own error message on failure)
    image_tensor, image_pil, success = load_and_preprocess_image(image_path, device)
    if not success:
        continue

    # Get prediction: the attributions explain the model's top-1 class
    with torch.no_grad():
        output = model(image_tensor)
        pred_class = output.argmax(dim=1).item()
        confidence = torch.softmax(output, dim=1)[0, pred_class].item()

    # Calculate complexity
    complexity_metrics = calculate_image_complexity(image_pil)

    # Initialize result
    result = {
        'image_path': str(image_path),
        'image_name': Path(image_path).name,
        'pred_class': pred_class,
        'confidence': confidence,
        **complexity_metrics
    }

    # Run each method
    for method_name, method_config in methods_dict.items():
        method_obj = method_config['method']
        use_adaptive = method_config.get('use_adaptive', False)

        try:
            # Generate CAM and measure time with a monotonic, high-resolution clock
            if sync_cuda:
                torch.cuda.synchronize()
            start_time = time.perf_counter()

            if use_adaptive:
                # The adaptive method also reports how many steps it used
                cam, steps_used = method_obj.generate_cam(
                    image_tensor, pred_class, use_adaptive=True
                )
            else:
                cam = method_obj.generate_cam(image_tensor, pred_class)
                steps_used = method_config.get('num_steps', 0)

            if sync_cuda:
                torch.cuda.synchronize()
            generation_time = time.perf_counter() - start_time

            # Compute metrics
            _, deletion_auc = evaluator.deletion_metric(
                image_tensor, cam, pred_class, steps=10
            )

            _, insertion_auc = evaluator.insertion_metric(
                image_tensor, cam, pred_class, steps=10
            )

            avg_drop = evaluator.average_drop(
                image_tensor, cam, pred_class, percentile=0.1
            )

            # Store results only once every metric succeeded, so a row never
            # mixes real and missing values for the same method
            result[f'{method_name}_time'] = generation_time
            result[f'{method_name}_deletion_auc'] = deletion_auc
            result[f'{method_name}_insertion_auc'] = insertion_auc
            result[f'{method_name}_avg_drop'] = avg_drop
            result[f'{method_name}_steps'] = steps_used

            # For adaptive, store additional info from its latest history entry
            if use_adaptive:
                allocation_history = getattr(method_obj, 'step_allocation_history', None)
                if allocation_history:
                    history = allocation_history[-1]
                    result[f'{method_name}_gradient_variance'] = history.get('gradient_variance', 0)
                    result[f'{method_name}_attribution_change'] = history.get('attribution_change', 0)

        except Exception as e:
            # Best-effort batch run: report the failure, record NaNs and continue
            print(f"  Error with {method_name} on {image_path.name}: {e}")
            for suffix in metric_suffixes:
                result[f'{method_name}_{suffix}'] = np.nan

    all_results.append(result)

print("\n✓ Experiments complete!")

In [None]:
# Persist the per-image results
separator = "=" * 80
print("\n" + separator)
print("SAVING RESULTS")
print(separator)

# One row per processed image, one column per recorded value
df = pd.DataFrame(all_results)

# Write the table to CSV inside the results directory (no index column)
csv_path = results_dir / 'experiment_results.csv'
df.to_csv(csv_path, index=False)
print(f"✓ Saved results to: {csv_path}")

row_count, column_count = df.shape
print(f"\nDataset shape: {df.shape}")
print(f"  - {row_count} images processed")
print(f"  - {column_count} features per image")

In [None]:
# Preview the first rows for a handful of key columns
print("\nFirst few rows of results:")
display_cols = [
    'image_name',
    'confidence',
    'adaptive_steps',
    'adaptive_time',
    'adaptive_deletion_auc',
    'fixed_100_time',
]
# Left as the final expression so the notebook renders the table
df.loc[:, display_cols].head(10)

In [None]:
# Aggregate mean / standard deviation of every metric, per method
summary = {}

for method in ['adaptive', 'fixed_25', 'fixed_50', 'fixed_100']:
    method_stats = {}

    # These columns are required; a missing one raises KeyError
    for metric in ('deletion_auc', 'insertion_auc', 'avg_drop', 'time'):
        column = df[f'{method}_{metric}']
        method_stats[f'mean_{metric}'] = column.mean()
        method_stats[f'std_{metric}'] = column.std()

    # The steps column is optional; None is stored when it is absent
    steps_column = f'{method}_steps'
    method_stats['mean_steps'] = df[steps_column].mean() if steps_column in df.columns else None
    method_stats['std_steps'] = df[steps_column].std() if steps_column in df.columns else None

    summary[method] = method_stats

# Write the summary as indented JSON next to the CSV
summary_path = results_dir / 'summary_statistics.json'
with open(summary_path, 'w') as f:
    json.dump(summary, f, indent=2)
print(f"✓ Saved summary to: {summary_path}")

In [None]:
# Render the summary as a "mean±std" table, one row per method
print("\n" + "="*80)
print("SUMMARY STATISTICS")
print("="*80)

method_order = ['adaptive', 'fixed_25', 'fixed_50', 'fixed_100']


def _mean_pm_std(method_key, metric, digits):
    """Format one method's mean and std of `metric` as 'mean±std'."""
    mean_value = summary[method_key][f'mean_{metric}']
    std_value = summary[method_key][f'std_{metric}']
    return f"{mean_value:.{digits}f}±{std_value:.{digits}f}"


summary_df = pd.DataFrame({
    'Method': ['Adaptive', 'Fixed-25', 'Fixed-50', 'Fixed-100'],
    'Del-AUC ↓': [_mean_pm_std(key, 'deletion_auc', 4) for key in method_order],
    'Ins-AUC ↑': [_mean_pm_std(key, 'insertion_auc', 4) for key in method_order],
    'Time (s)': [_mean_pm_std(key, 'time', 3) for key in method_order],
    # Only the adaptive method has a varying step count; the rest are constants
    'Steps': [_mean_pm_std('adaptive', 'steps', 1), "25", "50", "100"],
})

print("\n" + summary_df.to_string(index=False))
# Left as the final expression so the notebook renders the table
summary_df

In [None]:
# Histogram of the step counts chosen by the adaptive method
print("\nStep Allocation Distribution:")

adaptive_steps = df['adaptive_steps']
mean_steps = adaptive_steps.mean()

fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.hist(adaptive_steps, bins=10, edgecolor='black', color='steelblue', alpha=0.7)
# Dashed vertical line marks the mean allocation
ax.axvline(mean_steps, color='red', linestyle='--',
           linewidth=2, label=f'Mean: {mean_steps:.1f}')
ax.set(
    xlabel='Number of Steps Allocated',
    ylabel='Frequency',
    title='Adaptive Step Allocation Distribution',
)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Text summary of the same distribution
print("\nStep statistics:")
for stat_label, stat_value, number_format in (
    ('Mean', adaptive_steps.mean(), '.1f'),
    ('Median', adaptive_steps.median(), '.1f'),
    ('Min', adaptive_steps.min(), '.0f'),
    ('Max', adaptive_steps.max(), '.0f'),
    ('Std', adaptive_steps.std(), '.1f'),
):
    print(f"  {stat_label}: {stat_value:{number_format}}")

In [None]:
# Closing report: what was processed, where it was saved, what to do next
rule = "=" * 80
image_count = len(all_results)
method_count = len(methods_dict)

report_lines = [
    "\n" + rule,
    "EXPERIMENTS COMPLETE!",
    rule,
    f"\nProcessed: {image_count} images",
    f"Methods: {method_count}",
    f"Total measurements: {image_count * method_count * 6}",
    "\nFiles saved:",
    f"  - {csv_path}",
    f"  - {summary_path}",
    "\nNext steps:",
    "  1. Open notebook: 4_analyze_results.ipynb",
    "  2. Generate publication-quality figures",
    "  3. Review statistical analysis",
    rule,
]
print("\n".join(report_lines))