# Electron-Photon Classifier



This notebook implements a deep learning model for classifying electrons and photons using a ResNet-15 architecture.

## Import Libraries and Set Seeds

In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import os
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from tqdm import tqdm
import h5py
import random
import seaborn as sns
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Set random seeds for reproducibility
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


## Dataset Implementation

## Model Architecture

### Residual Block Implementation

In [3]:
class ParticleDataset(Dataset):
    """Custom dataset class for particle data with memory-efficient loading"""
    def __init__(self, electron_path, photon_path, transform=None, chunk_size=1000):
        self.transform = transform
        self.electron_path = electron_path
        self.photon_path = photon_path
        self.chunk_size = chunk_size
        
        # Get dataset sizes without loading full data
        with h5py.File(electron_path, 'r') as f:
            self.electron_len = len(f['X'])
            
        with h5py.File(photon_path, 'r') as f:
            self.photon_len = len(f['X'])
            
        self.total_len = self.electron_len + self.photon_len
        
        # Create shuffled indices
        self.indices = np.random.permutation(self.total_len)
        
        print(f"Dataset initialized with {self.total_len} samples")
        print(f"Electron samples: {self.electron_len}")
        print(f"Photon samples: {self.photon_len}")

    def __len__(self):
        return self.total_len

    def __getitem__(self, idx):
        # Map shuffled index to original index
        orig_idx = self.indices[idx]
        
        # Determine which file to load from
        if orig_idx < self.electron_len:
            file_path = self.electron_path
            local_idx = orig_idx
            label = 0  # electron
        else:
            file_path = self.photon_path
            local_idx = orig_idx - self.electron_len
            label = 1  # photon
        
        # Load single sample from appropriate file
        with h5py.File(file_path, 'r') as f:
            data = f['X'][local_idx]
        
        # Reshape if needed
        if len(data.shape) == 2:  # If shape is (height, width)
            data = data.reshape(2, -1)  # Reshape to (channels, features)
        
        # Convert to tensor
        data = torch.FloatTensor(data)
        
        # Apply transformations if any
        if self.transform:
            data = self.transform(data)
        
        return data, label

In [4]:
class ResidualBlock(nn.Module):
    """Implementation of a single residual block"""
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        # Shortcut connection
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
    def forward(self, x):
        residual = x
        
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        
        out = self.conv2(out)
        out = self.bn2(out)
        
        out += self.shortcut(residual)
        out = self.relu(out)
        
        return out


### ResNet-15 Implementation

In [5]:
class ResNet15(nn.Module):
    """Flexible ResNet-15 architecture that can be scaled in width and depth"""
    def __init__(self, num_classes=2, width_factor=1.0, depth_factor=1.0, dropout_rate=0.2):
        """
        Args:
            num_classes (int): Number of output classes
            width_factor (float): Factor to scale channel widths (1.0 = standard)
            depth_factor (float): Factor to scale number of blocks (1.0 = standard)
            dropout_rate (float): Dropout probability
        """
        super(ResNet15, self).__init__()
        
        # Scale the base channel sizes
        base_channels = int(32 * width_factor)
        self.in_channels = base_channels
        
        # Initial convolution with scaled channels
        self.conv1 = nn.Conv2d(2, base_channels, kernel_size=3, 
                              stride=2, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(base_channels)
        self.relu = nn.ReLU(inplace=True)
        
        # Calculate number of blocks per layer
        blocks_per_layer = int(2 * depth_factor)
        
        # Residual blocks with scaled channels
        self.layer1 = self._make_layer(base_channels, blocks_per_layer, stride=1)
        self.layer2 = self._make_layer(base_channels * 2, blocks_per_layer, stride=2)
        self.layer3 = self._make_layer(base_channels * 4, blocks_per_layer, stride=2)
        
        # Final layers
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(base_channels * 4, num_classes)
        self.dropout = nn.Dropout(dropout_rate)
        
        # Initialize weights
        self._initialize_weights()

    def _make_layer(self, out_channels, num_blocks, stride=1):
        layers = []
        # First block handles stride and channel changes
        layers.append(ResidualBlock(self.in_channels, out_channels, stride))
        
        # Remaining blocks maintain dimensions
        self.in_channels = out_channels
        for _ in range(1, num_blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        
        return nn.Sequential(*layers)

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.fc(x)
        
        return x

In [6]:
# Create model variants
class ResNet15Wide(ResNet15):
    """Wider variant of ResNet15"""
    def __init__(self, num_classes=2):
        super().__init__(
            num_classes=num_classes,
            width_factor=1.5,    # 50% wider channels
            depth_factor=1.0,    # Same depth
            dropout_rate=0.3
        )

class ResNet15Deep(ResNet15):
    """Deeper variant of ResNet15"""
    def __init__(self, num_classes=2):
        super().__init__(
            num_classes=num_classes,
            width_factor=1.0,    # Same width
            depth_factor=1.5,    # 50% more blocks
            dropout_rate=0.3
        )

def get_model_variant(variant='standard', num_classes=2):
    """Get specific model variant"""
    variants = {
        'standard': lambda: ResNet15(num_classes=num_classes, width_factor=1.0, depth_factor=1.0),
        'wide': lambda: ResNet15Wide(num_classes=num_classes),
        'deep': lambda: ResNet15Deep(num_classes=num_classes)
    }
    return variants[variant]()

## Training and Evaluation Functions

In [7]:
def train_model(model, dataset, batch_size=128, num_epochs=30, device='cuda'):  # Increased batch size, reduced epochs
    """Memory-efficient and faster training function"""
    
    # Split dataset with reduced validation set
    train_size = int(0.9 * len(dataset))  # Increased train size
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
    
    # Optimize data loading
    train_loader = DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True,
        num_workers=4,  # Increased workers
        pin_memory=True if device=='cuda' else False,
        persistent_workers=True,  # Keep workers alive between epochs
        prefetch_factor=2  # Prefetch next batches
    )
    
    val_loader = DataLoader(
        val_dataset, 
        batch_size=batch_size*2,  # Larger batch size for validation
        shuffle=False,
        num_workers=4,
        pin_memory=True if device=='cuda' else False,
        persistent_workers=True
    )
    
    # Use mixed precision training
    scaler = torch.cuda.amp.GradScaler()
    
    # Optimize training components
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(  # Changed to AdamW
        model.parameters(),
        lr=0.002,  # Increased learning rate
        weight_decay=0.01,
        eps=1e-7
    )
    
    # More aggressive learning rate scheduling
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=0.002,
        epochs=num_epochs,
        steps_per_epoch=len(train_loader),
        pct_start=0.3,
        div_factor=10
    )
    
    # Training loop with mixed precision
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            inputs, labels = inputs.to(device), labels.to(device)
            
            # Mixed precision training
            with torch.cuda.amp.autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            
            # Optimize with gradient scaling
            optimizer.zero_grad(set_to_none=True)  # More efficient than zero_grad()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            
            scheduler.step()  # Update LR each step
            
            # Update metrics
            running_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
            
            # Clear cache periodically
            if torch.cuda.is_available() and (total % (10 * batch_size) == 0):
                torch.cuda.empty_cache()
        
        # Quick validation
        if epoch % 2 == 0:  # Validate every 2 epochs
            model.eval()
            val_loss = 0.0
            val_correct = 0
            val_total = 0
            
            with torch.no_grad(), torch.cuda.amp.autocast():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    
                    val_loss += loss.item() * inputs.size(0)
                    _, predicted = outputs.max(1)
                    val_total += labels.size(0)
                    val_correct += predicted.eq(labels).sum().item()
            
            print(f"\nEpoch {epoch+1}: Train Acc: {100.*correct/total:.2f}%, Val Acc: {100.*val_correct/val_total:.2f}%")
    
    return model

In [8]:
def evaluate_model(model, test_loader, device='cuda'):
    """Model evaluation function"""
    model.eval()
    
    true_labels = []
    predictions = []
    probabilities = []
    
    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc="Evaluating"):
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            probs = torch.nn.functional.softmax(outputs, dim=1)
            
            true_labels.extend(labels.cpu().numpy())
            predictions.extend(preds.cpu().numpy())
            probabilities.extend(probs[:, 1].cpu().numpy())  # Probability of class 1 (photon)
    
    # Calculate metrics
    accuracy = accuracy_score(true_labels, predictions)
    precision = precision_score(true_labels, predictions)
    recall = recall_score(true_labels, predictions)
    f1 = f1_score(true_labels, predictions)
    auc = roc_auc_score(true_labels, probabilities)
    conf_matrix = confusion_matrix(true_labels, predictions)
    
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"ROC AUC: {auc:.4f}")
    print("Confusion Matrix:")
    print(conf_matrix)
    
    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', 
                xticklabels=['Electron', 'Photon'], 
                yticklabels=['Electron', 'Photon'])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.show()
    
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'auc': auc,
        'confusion_matrix': conf_matrix,
        'true_labels': true_labels,
        'predictions': predictions,
        'probabilities': probabilities
    }


## Visualization Functions

In [9]:
def plot_training_history(history):
    """Function to plot training metrics"""
    plt.figure(figsize=(12, 5))
    
    # Plot loss
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    
    # Plot accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history['train_acc'], label='Train Accuracy')
    plt.plot(history['val_acc'], label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()
    
    plt.tight_layout()
    plt.show()


In [10]:
def visualize_samples(dataset, num_samples=5):
    """Function to visualize dataset samples"""
    fig, axes = plt.subplots(num_samples, 2, figsize=(12, 3*num_samples))
    
    # Get random indices
    indices = np.random.choice(len(dataset), num_samples, replace=False)
    
    for i, idx in enumerate(indices):
        data, label = dataset[idx]
        
        # Energy channel
        im0 = axes[i, 0].imshow(data[0], cmap='viridis')
        axes[i, 0].set_title(f"Sample {idx} - {'Photon' if label == 1 else 'Electron'} - Energy")
        axes[i, 0].axis('off')
        fig.colorbar(im0, ax=axes[i, 0], fraction=0.046, pad=0.04)
        
        # Time channel
        im1 = axes[i, 1].imshow(data[1], cmap='plasma')
        axes[i, 1].set_title(f"Sample {idx} - {'Photon' if label == 1 else 'Electron'} - Time")
        axes[i, 1].axis('off')
        fig.colorbar(im1, ax=axes[i, 1], fraction=0.046, pad=0.04)
        plt.tight_layout()
    plt.show()


In [11]:
def visualize_feature_maps(model, dataset, layer_name, device='cuda'):
    """Function to visualize model feature maps"""
    """
    Visualize feature maps from a specific layer of the model for a sample image
    """
    # Set model to evaluation mode
    model.eval()
    
    # Select a random sample
    idx = np.random.randint(0, len(dataset))
    input_img, label = dataset[idx]
    input_img = input_img.unsqueeze(0).to(device)  # Add batch dimension
    
    # Register hook to get intermediate activations
    activations = {}
    
    def hook_fn(module, input, output):
        activations[layer_name] = output.detach().cpu()
    
    # Register hook
    for name, module in model.named_modules():
        if name == layer_name:
            hook = module.register_forward_hook(hook_fn)
    
    # Forward pass
    with torch.no_grad():
        output = model(input_img)
    
    # Remove hook
    hook.remove()
    
    # Get the feature maps
    feature_maps = activations[layer_name][0]  # First sample in batch
    num_features = min(16, feature_maps.size(0))  # Display at most 16 feature maps
    
    # Plot the feature maps
    fig, axes = plt.subplots(int(np.ceil(num_features/4)), 4, figsize=(12, 3*int(np.ceil(num_features/4))))
    fig.suptitle(f"Feature maps from {layer_name} for {'Photon' if label == 1 else 'Electron'}", fontsize=16)
    
    for i in range(num_features):
        ax = axes[i//4, i%4] if num_features > 4 else axes[i]
        im = ax.imshow(feature_maps[i], cmap='viridis')
        ax.set_title(f"Filter {i}")
        ax.axis('off')
    
    # Hide any empty subplots
    for i in range(num_features, int(np.ceil(num_features/4))*4):
        if i < int(np.ceil(num_features/4))*4:
            axes[i//4, i%4].axis('off')
    
    plt.tight_layout()
    plt.subplots_adjust(top=0.9)
    plt.show()


## Main Training Pipeline

In [12]:
def main():
    """Main execution function"""
    # Check for CUDA availability
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Set paths to data
    electron_path = os.path.join('datasets', 'SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5') 
    photon_path = os.path.join('datasets', 'SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5') 
    
    # Create dataset
    full_dataset = ParticleDataset(electron_path, photon_path)
    
    # Visualize some samples
    visualize_samples(full_dataset)
    
    # Split dataset into train (80%) and test (20%) sets
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])
    
    # Further split train set into train and validation
    train_size = int(0.9 * len(train_dataset))
    val_size = len(train_dataset) - train_size
    train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])
    
    print(f"Train set size: {len(train_dataset)}")
    print(f"Validation set size: {len(val_dataset)}")
    print(f"Test set size: {len(test_dataset)}")
    
    # Create data loaders
    batch_size = 64
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    
    # Create model
    model = ResNet15(num_classes=2).to(device)
    print(model)
    
    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
    
    # Train model
    trained_model, history = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=50,
        device=device
    )
    
    # Plot training history
    plot_training_history(history)
    
    # Evaluate model on test set
    test_results = evaluate_model(trained_model, test_loader, device)
    
    # Visualize feature maps
    visualize_feature_maps(trained_model, test_dataset, 'layer3.1.conv2', device)
    
    # Save model weights
    torch.save(trained_model.state_dict(), 'resnet15_electron_photon_classifier.pth')
    print("Model saved to resnet15_electron_photon_classifier.pth")
    
    # Save test results
    results_df = pd.DataFrame({
        'true_labels': test_results['true_labels'],
        'predictions': test_results['predictions'],
        'probabilities': test_results['probabilities']
    })
    results_df.to_csv('test_results.csv', index=False)
    print("Test results saved to test_results.csv")
    
    return trained_model, test_results


## Experimental Functions

In [13]:
def augmentation_experiment():
    """Data augmentation experiment"""
    # Check for CUDA availability
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Set paths to data
    electron_path = 'datasets/SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5'
    photon_path = 'datasets/SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5'

    # Basic dataset without augmentation
    basic_dataset = ParticleDataset(electron_path, photon_path)
    
    # Define augmentation transforms
    class RandomNoise(object):
        def __init__(self, noise_level=0.05):
            self.noise_level = noise_level
        
        def __call__(self, sample):
            noise = np.random.normal(0, self.noise_level, sample.shape)
            noisy_sample = sample + noise
            return noisy_sample
    
    class RandomFlip(object):
        def __call__(self, sample):
            if np.random.random() > 0.5:
                return np.flip(sample, axis=1)  # Flip horizontally
            return sample
    
    class RandomRotate90(object):
        def __call__(self, sample):
            k = np.random.randint(0, 4)  # 0, 1, 2, or 3 times 90 degrees
            return np.rot90(sample, k=k, axes=(1, 2))
    
    # Create augmented dataset
    class AugmentedParticleDataset(ParticleDataset):
        def __init__(self, electron_path, photon_path):
            super().__init__(electron_path, photon_path)
            self.transform = transforms.Compose([
                RandomNoise(),
                RandomFlip(),
                RandomRotate90()
            ])
        
        def __getitem__(self, idx):
            if torch.is_tensor(idx):
                idx = idx.tolist()
            
            sample = self.data[idx]
            label = self.labels[idx]
            
            # Apply data augmentation
            if self.transform:
                sample = self.transform(sample)
            
            # Convert to torch tensors
            sample = torch.from_numpy(sample).float()
            label = torch.tensor(label).long()
            
            return sample, label
    
    # Create augmented dataset
    augmented_dataset = AugmentedParticleDataset(electron_path, photon_path)
    
    # Split datasets
    train_size = int(0.8 * len(basic_dataset))
    test_size = len(basic_dataset) - train_size
    basic_train, basic_test = random_split(basic_dataset, [train_size, test_size])
    augmented_train, augmented_test = random_split(augmented_dataset, [train_size, test_size])
    
    # Create dataloaders
    batch_size = 64
    basic_train_loader = DataLoader(basic_train, batch_size=batch_size, shuffle=True, num_workers=4)
    basic_val_loader = DataLoader(basic_test, batch_size=batch_size, shuffle=False, num_workers=4)
    augmented_train_loader = DataLoader(augmented_train, batch_size=batch_size, shuffle=True, num_workers=4)
    augmented_val_loader = DataLoader(augmented_test, batch_size=batch_size, shuffle=False, num_workers=4)
    
    # Train model without augmentation
    basic_model = ResNet15(num_classes=2).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(basic_model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
    
    basic_model, basic_history = train_model(
        model=basic_model,
        train_loader=basic_train_loader,
        val_loader=basic_val_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=30,
        device=device
    )
    
    # Train model with augmentation
    augmented_model = ResNet15(num_classes=2).to(device)
    optimizer = optim.Adam(augmented_model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
    
    augmented_model, augmented_history = train_model(
        model=augmented_model,
        train_loader=augmented_train_loader,
        val_loader=augmented_val_loader,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=30,
        device=device
    )
    
    # Compare results
    plt.figure(figsize=(12, 5))
    
    # Plot validation loss
    plt.subplot(1, 2, 1)
    plt.plot(basic_history['val_loss'], label='Without Augmentation')
    plt.plot(augmented_history['val_loss'], label='With Augmentation')
    plt.xlabel('Epoch')
    plt.ylabel('Validation Loss')
    plt.title('Validation Loss Comparison')
    plt.legend()
    
    # Plot validation accuracy
    plt.subplot(1, 2, 2)
    plt.plot(basic_history['val_acc'], label='Without Augmentation')
    plt.plot(augmented_history['val_acc'], label='With Augmentation')
    plt.xlabel('Epoch')
    plt.ylabel('Validation Accuracy')
    plt.title('Validation Accuracy Comparison')
    plt.legend()
    
    plt.tight_layout()
    plt.show()
    
    print("Final validation accuracy without augmentation:", basic_history['val_acc'][-1])
    print("Final validation accuracy with augmentation:", augmented_history['val_acc'][-1])


In [14]:
def hyperparameter_tuning():
    """Hyperparameter tuning experiment"""
    # Check for CUDA availability
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Set paths to data
    electron_path = 'datasets/SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5'
    photon_path = 'datasets/SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5'
    
    # Create dataset
    full_dataset = ParticleDataset(electron_path, photon_path)
    
    # Split dataset into train (80%) and test (20%) sets
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])
    
    # Further split train set into train and validation
    train_size = int(0.9 * len(train_dataset))
    val_size = len(train_dataset) - train_size
    train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])
    
    # Create data loaders
    batch_size = 64
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    
    # Define hyperparameter combinations
    learning_rates = [0.01, 0.001, 0.0001]
    weight_decays = [1e-4, 1e-5, 1e-6]
    dropout_rates = [0.2, 0.3, 0.4]
    
    # Store results
    results = []
    
    # Grid search
    for lr in learning_rates:
        for wd in weight_decays:
            for dr in dropout_rates:
                print(f"Training with lr={lr}, weight_decay={wd}, dropout={dr}")
                
                # Create model with specified dropout rate
                class ResNet15WithDropout(ResNet15):
                    def __init__(self, num_classes=2, dropout_rate=0.3):
                        super().__init__(num_classes)
                        self.dropout = nn.Dropout(dropout_rate)
                
                model = ResNet15WithDropout(num_classes=2, dropout_rate=dr).to(device)
                
                # Define loss function and optimizer
                criterion = nn.CrossEntropyLoss()
                optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=wd)
                scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
                
                # Train model for fewer epochs for quicker tuning
                trained_model, history = train_model(
                    model=model,
                    train_loader=train_loader,
                    val_loader=val_loader,
                    criterion=criterion,
                    optimizer=optimizer,
                    scheduler=scheduler,
                    num_epochs=20,  # Reduced epochs for quicker tuning
                    device=device
                )
                
                # Evaluate on validation set
                trained_model.eval()
                val_correct = 0
                val_total = 0
                
                with torch.no_grad():
                    for inputs, labels in val_loader:
                        inputs, labels = inputs.to(device), labels.to(device)
                        outputs = trained_model(inputs)
                        _, predicted = torch.max(outputs.data, 1)
                        val_total += labels.size(0)
                        val_correct += (predicted == labels).sum().item()
                
                val_accuracy = val_correct / val_total
                
                # Store results
                results.append({
                    'learning_rate': lr,
                    'weight_decay': wd,
                    'dropout_rate': dr,
                    'val_accuracy': val_accuracy,
                    'final_val_loss': history['val_loss'][-1]
                })
    
    # Convert results to DataFrame and find best hyperparameters
    results_df = pd.DataFrame(results)
    best_config = results_df.loc[results_df['val_accuracy'].idxmax()]
    
    print("\nHyperparameter Tuning Results:")
    print(results_df.sort_values('val_accuracy', ascending=False))
    print("\nBest Configuration:")
    print(best_config)
    
    return best_config


In [15]:
def create_ensemble_model():
    """Create and train ensemble of model variants"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Create models
    models = {
        'standard': get_model_variant('standard').to(device),
        'wide': get_model_variant('wide').to(device),
        'deep': get_model_variant('deep').to(device)
    }
    
    # Create dataset
    electron_path = os.path.join('datasets', 'SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5')
    photon_path = os.path.join('datasets', 'SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5')
    dataset = ParticleDataset(electron_path, photon_path)
    
    # Train each model
    trained_models = {}
    for name, model in models.items():
        print(f"\nTraining {name} model...")
        trained_model = train_model(
            model=model,
            dataset=dataset,
            batch_size=128,
            num_epochs=30,
            device=device
        )
        trained_models[name] = trained_model
    
    return trained_models

In [16]:
def ensemble_predict(models, input_batch):
    """Make predictions using ensemble of models"""
    predictions = []
    for model in models.values():
        model.eval()
        with torch.no_grad():
            outputs = model(input_batch)
            predictions.append(torch.softmax(outputs, dim=1))
    return torch.stack(predictions).mean(dim=0)

## Execute Training Pipeline

In [None]:
def check_data_paths():
    """Check if data files exist and are accessible, and verify their structure"""
    # Use os.path.join for cross-platform compatibility
    data_dir = os.path.join('datasets')
    electron_path = os.path.join(data_dir, 'SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5')
    photon_path = os.path.join(data_dir, 'SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5')
    
    # Create datasets directory if it doesn't exist
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
        print(f"Created directory: {data_dir}")
    
    # Check if files exist
    files_exist = True
    if not os.path.isfile(electron_path):
        print(f"Missing electron file: {electron_path}")
        files_exist = False
    if not os.path.isfile(photon_path):
        print(f"Missing photon file: {photon_path}")
        files_exist = False
    
    if not files_exist:
        print("\nPlease download the dataset files and place them in the 'datasets' directory.")
        return False
    
    # Check file structure
    try:
        # Check electron file
        with h5py.File(electron_path, 'r') as f:
            print("\nElectron file structure:")
            keys = list(f.keys())
            print("Available datasets:", keys)
            
            # Print detailed information about each dataset
            for key in keys:
                print(f"{key} shape:", f[key].shape)
                print(f"{key} dtype:", f[key].dtype)
    
        # Check photon file
        with h5py.File(photon_path, 'r') as f:
            print("\nPhoton file structure:")
            keys = list(f.keys())
            print("Available datasets:", keys)
            
            # Print detailed information about each dataset
            for key in keys:
                print(f"{key} shape:", f[key].shape)
                print(f"{key} dtype:", f[key].dtype)
        
        return True
        
    except OSError as e:
        print(f"\nError reading HDF5 files: {str(e)}")
        print("Please ensure the files are valid HDF5 format.")
        return False
    except Exception as e:
        print(f"\nUnexpected error: {str(e)}")
        print("Please check the file permissions and format.")
        return False
        

if __name__ == "__main__":
    # Check if data files exist and have correct structure
    if check_data_paths():
        # Main execution
        print("\n=== Main Training Pipeline ===")
        # Create dataset and prepare for training
        electron_path = os.path.join('datasets', 'SingleElectronPt50_IMGCROPS_n249k_RHv1.hdf5')
        photon_path = os.path.join('datasets', 'SinglePhotonPt50_IMGCROPS_n249k_RHv1.hdf5')
        full_dataset = ParticleDataset(electron_path, photon_path)

        # Train the model using the full dataset
        model = ResNet15(num_classes=2)
        trained_model, history = train_model(
            model=model,
            dataset=full_dataset,
            batch_size=64,
            num_epochs=50,
            device='cuda' if torch.cuda.is_available() else 'cpu'
        )
        
        print("\n=== Data Augmentation Experiment ===")
        augmentation_experiment()
        
        print("\n=== Hyperparameter Tuning ===")
        best_hyperparams = hyperparameter_tuning()
        
        print("\n=== Model Ensemble ===")
        ensemble_results = create_ensemble_model()
    else:
        print("\nExecution stopped due to missing or invalid data files.")
        print("Please check the file structure and ensure it contains 'energy' and 'time' datasets.")
    


Electron file structure:
Available datasets: ['X', 'y']
X shape: (249000, 32, 32, 2)
X dtype: float32
y shape: (249000,)
y dtype: float32

Photon file structure:
Available datasets: ['X', 'y']
X shape: (249000, 32, 32, 2)
X dtype: float32
y shape: (249000,)
y dtype: float32

=== Main Training Pipeline ===
Dataset initialized with 498000 samples
Electron samples: 249000
Photon samples: 249000


  scaler = torch.cuda.amp.GradScaler()
Epoch 1/50:   0%|          | 0/7004 [00:00<?, ?it/s]