# Requirements

In [2]:
#!python --version
#!pip install --upgrade pip
#!pip uninstall keras tensorflow
#!pip install -r ../requirements.txt

# Imports

In [15]:
import torch
import os
from tqdm import tqdm
import json

import numpy as np
import keras
from tensorflow import keras
from keras.datasets import cifar10
from __future__ import print_function
from keras.models import Sequential
from keras.models import save_model, load_model
from keras.layers import Dense, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D

import keras.backend as K
K.clear_session()

# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score
# from model import Mamba, ModelArgs  # Import your custom Mamba implementation
# Assuming the model classes are defined in `model.py`
from model import ImageMamba, ModelArgs

In [5]:
from data_loader import load_cifar10, get_class_names

# Load data consistently
# load_cifar10 is called with a fixed batch size and seed and returns six values:
# the two loaders iterated by the training/evaluation cells below, plus four
# X_*/Y_* objects that are not used again in the visible part of this notebook.
train_loader, test_loader, X_train, X_test, Y_train, Y_test = load_cifar10(batch_size=64, seed=42)
# class_names is indexed by integer label when predictions are printed later.
class_names = get_class_names()

# Code to save a model

In [6]:
def save_pytorch_model(model, filepath, epoch=None):
    """
    Write a checkpoint for a PyTorch model to ``filepath``.

    The checkpoint is a dictionary with three entries: the model's
    ``state_dict()``, the model's ``args`` attribute, and the epoch number.

    Args:
        model: PyTorch model (nn.Module) exposing an ``args`` attribute
        filepath: Destination path of the checkpoint file
        epoch: Optional epoch number stored alongside the weights
    """
    # Make sure the target directory exists ('.' when filepath has no directory part)
    target_dir = os.path.dirname(filepath) or '.'
    os.makedirs(target_dir, exist_ok=True)

    # Assemble everything that goes into the checkpoint file
    checkpoint = dict(
        model_state_dict=model.state_dict(),
        model_args=model.args,
        epoch=epoch,
    )

    torch.save(checkpoint, filepath)
    print(f"Model saved successfully to {filepath}")

def load_pytorch_model(filepath, num_classes=10, map_location=None):
    """
    Load a PyTorch model from a checkpoint written by ``save_pytorch_model``.

    Args:
        filepath: Path to the saved model
        num_classes: Number of output classes used to rebuild ``ImageMamba``.
            Defaults to 10, the value every model in this notebook is created
            with (CIFAR-10). The previous hard-coded value of 1000 does not
            match those models, so ``load_state_dict`` would presumably fail
            on the classifier weights (TODO confirm against ``model.py``).
        map_location: Optional device remapping forwarded to ``torch.load``
            (e.g. ``'cpu'`` to open a GPU-trained checkpoint on a CPU-only
            machine). ``None`` keeps ``torch.load``'s default behaviour.

    Returns:
        model: Loaded PyTorch model
        epoch: Epoch number when the model was saved (if available)
    """
    # NOTE(review): the checkpoint stores the model's ``args`` object, not only
    # tensors. Recent PyTorch releases default ``torch.load`` to
    # ``weights_only=True``, which may refuse such objects - confirm against
    # the installed PyTorch version.
    save_dict = torch.load(filepath, map_location=map_location)

    # Rebuild the architecture from the saved arguments
    model = ImageMamba(args=save_dict['model_args'], num_classes=num_classes)

    # Restore the trained weights
    model.load_state_dict(save_dict['model_state_dict'])

    return model, save_dict.get('epoch')

# Example usage:
# Saving the model
# save_pytorch_model(complex_model, '25epoch_complex_model.pt', epoch=25)

# Loading the model
# loaded_model, epoch = load_pytorch_model('25epoch_complex_model.pt')

# Defining the model

In [None]:
# Hyperparameters of the first (small) model
d_model = 64
n_layer = 4
num_classes = 10  # CIFAR-10 has 10 classes

# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Bundle the architecture settings (vocab_size is passed as 0 and is unused here)
model_args = ModelArgs(d_model=d_model, n_layer=n_layer, vocab_size=0)

# Build the classifier and move it to the selected device
model = ImageMamba(model_args, num_classes=num_classes)
model.to(device)

## CUDA

In [None]:
# Report the device of the model's first parameter tensor and whether PyTorch can see a CUDA device
print(f"Model device: {next(model.parameters()).device}")
print(f"Is CUDA available? {torch.cuda.is_available()}")

In [None]:
!nvcc --version

In [10]:
#!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124

In [11]:
# Loss function and optimizer used by the training loops that follow
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-4,
    weight_decay=1e-5,
)

# Training MAMBA on CIFAR10

In [None]:
# Train the small model for 10 epochs
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        # Move the batch to the training device
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # The model returns (logits, probabilities); only the logits feed the loss
        logits, probabilities = model(inputs)

        # Flatten the labels to [batch_size] before computing the loss
        labels = labels.view(-1)
        loss = criterion(logits, labels)

        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')

## Evaluating the model + testing inference

In [None]:
# Switch to evaluation mode
model.eval()
y_pred = []
y_true = []
y_prob = []  # Per-sample class probabilities

# Evaluate on the test split. The batches come from `test_loader`: no
# `test_dataset` is ever defined in this notebook, and the model is fed
# batches everywhere else.
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = model(inputs)  # The model returns both logits and probabilities
        _, predicted = torch.max(logits, 1)
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())
        y_prob.extend(probabilities.cpu().numpy())

# Calculate accuracy
accuracy = accuracy_score(y_true, y_pred)
print(f'Accuracy on the test set: {accuracy:.4f}')

# Convert probabilities to percentages
y_prob_percentages = np.array(y_prob) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages):")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_percentages[i][class_index]:.2f}%")

# Repeat the same evaluation on the training split
model.eval()
y_pred_train = []
y_true_train = []
y_prob_train = []  # Per-sample class probabilities

# Batches come from `train_loader` (no `train_dataset` is defined in this notebook)
with torch.no_grad():
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = model(inputs)
        _, predicted = torch.max(logits, 1)
        y_pred_train.extend(predicted.cpu().numpy())
        y_true_train.extend(labels.cpu().numpy())
        y_prob_train.extend(probabilities.cpu().numpy())

# Calculate accuracy on training data
accuracy_train = accuracy_score(y_true_train, y_pred_train)
print(f'Accuracy on the training set: {accuracy_train:.4f}')

# Convert probabilities to percentages
y_prob_train_percentages = np.array(y_prob_train) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages) for training data:")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true_train[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_train_percentages[i][class_index]:.2f}%")

In [None]:
# Mean probability assigned to the true label, test split
correct_class_probs_test = [probs[label] for probs, label in zip(y_prob, y_true)]
average_prob_correct_class_test = np.mean(correct_class_probs_test)
print(f'Average probability for the correct class on the test data: {average_prob_correct_class_test:.4f}')

# Mean probability assigned to the true label, training split
correct_class_probs_train = [probs[label] for probs, label in zip(y_prob_train, y_true_train)]
average_prob_correct_class_train = np.mean(correct_class_probs_train)
print(f'Average probability for the correct class on the training data: {average_prob_correct_class_train:.4f}')

# Try 2, more epochs
As we can see, there is not much difference between the probabilities assigned to the true class on the training-set instances and on the test-set instances.

This might indicate that the dataset was too hard to fit in 10 epochs. Let's add 15 more epochs, for a total of 25 epochs.

In [None]:
# Continue training the small model for 15 additional epochs
num_epochs = 15
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        # Move the batch to the training device
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # The model returns (logits, probabilities); only the logits feed the loss
        logits, probabilities = model(inputs)

        # Flatten the labels to [batch_size] before computing the loss
        labels = labels.view(-1)
        loss = criterion(logits, labels)

        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')

In [None]:
# Switch to evaluation mode
model.eval()
y_pred = []
y_true = []
y_prob = []  # Per-sample class probabilities

# Evaluate on the test split. The batches come from `test_loader`: no
# `test_dataset` is ever defined in this notebook, and the model is fed
# batches everywhere else.
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = model(inputs)  # The model returns both logits and probabilities
        _, predicted = torch.max(logits, 1)
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())
        y_prob.extend(probabilities.cpu().numpy())

# Calculate accuracy
accuracy = accuracy_score(y_true, y_pred)
print(f'Accuracy on the test set: {accuracy:.4f}')

# Convert probabilities to percentages
y_prob_percentages = np.array(y_prob) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages):")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_percentages[i][class_index]:.2f}%")

# Repeat the same evaluation on the training split
model.eval()
y_pred_train = []
y_true_train = []
y_prob_train = []  # Per-sample class probabilities

# Batches come from `train_loader` (no `train_dataset` is defined in this notebook)
with torch.no_grad():
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = model(inputs)
        _, predicted = torch.max(logits, 1)
        y_pred_train.extend(predicted.cpu().numpy())
        y_true_train.extend(labels.cpu().numpy())
        y_prob_train.extend(probabilities.cpu().numpy())

# Calculate accuracy on training data
accuracy_train = accuracy_score(y_true_train, y_pred_train)
print(f'Accuracy on the training set: {accuracy_train:.4f}')

# Convert probabilities to percentages
y_prob_train_percentages = np.array(y_prob_train) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages) for training data:")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true_train[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_train_percentages[i][class_index]:.2f}%")

In [None]:
# Mean probability assigned to the true label, test split
correct_class_probs_test = [probs[label] for probs, label in zip(y_prob, y_true)]
average_prob_correct_class_test = np.mean(correct_class_probs_test)
print(f'Average probability for the correct class on the test data: {average_prob_correct_class_test:.4f}')

# Mean probability assigned to the true label, training split
correct_class_probs_train = [probs[label] for probs, label in zip(y_prob_train, y_true_train)]
average_prob_correct_class_train = np.mean(correct_class_probs_train)
print(f'Average probability for the correct class on the training data: {average_prob_correct_class_train:.4f}')

As we can see, there is still no significant difference. Let's build a stronger model and train it for more epochs to reach a loss close to 0, and then try inference again:

# Try 3, complex model
To get a much lower loss, we will both increase the number of epochs and make the model more complex.

In [None]:
# Hyperparameters of the larger model
d_model = 128
n_layer = 8
num_classes = 10  # CIFAR-10 has 10 classes

# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Bundle the architecture settings (vocab_size is passed as 0 and is unused here)
model_args = ModelArgs(d_model=d_model, n_layer=n_layer, vocab_size=0)

# Build the larger classifier and move it to the selected device
complex_model = ImageMamba(model_args, num_classes=num_classes)
complex_model.to(device)

In [None]:
# Loss function and a fresh optimizer bound to the larger model's parameters
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    complex_model.parameters(),
    lr=1e-4,
    weight_decay=1e-5,
)

# Train the larger model for 25 epochs
num_epochs = 25 
for epoch in range(num_epochs):
    complex_model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        # Move the batch to the training device
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # The model returns (logits, probabilities); only the logits feed the loss
        logits, probabilities = complex_model(inputs)

        # Flatten the labels to [batch_size] before computing the loss
        labels = labels.view(-1)
        loss = criterion(logits, labels)

        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')

In [None]:
# Save the model after 25 epochs
save_pytorch_model(complex_model, '../trained_models/25epoch_complex_model.pt', epoch=25)

# Later, to load the model (same path as used for saving above):
# loaded_model, epoch = load_pytorch_model('../trained_models/25epoch_complex_model.pt')

In [None]:
# Switch to evaluation mode
complex_model.eval()
y_pred = []
y_true = []
y_prob = []  # Per-sample class probabilities

# Evaluate on the test split. The batches come from `test_loader`: no
# `test_dataset` is ever defined in this notebook, and the model is fed
# batches everywhere else.
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = complex_model(inputs)  # The model returns both logits and probabilities
        _, predicted = torch.max(logits, 1)
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())
        y_prob.extend(probabilities.cpu().numpy())

# Calculate accuracy
accuracy = accuracy_score(y_true, y_pred)
print(f'Accuracy on the test set: {accuracy:.4f}')

# Convert probabilities to percentages
y_prob_percentages = np.array(y_prob) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages):")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_percentages[i][class_index]:.2f}%")

# Repeat the same evaluation on the training split
complex_model.eval()
y_pred_train = []
y_true_train = []
y_prob_train = []  # Per-sample class probabilities

# Batches come from `train_loader` (no `train_dataset` is defined in this notebook)
with torch.no_grad():
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = complex_model(inputs)
        _, predicted = torch.max(logits, 1)
        y_pred_train.extend(predicted.cpu().numpy())
        y_true_train.extend(labels.cpu().numpy())
        y_prob_train.extend(probabilities.cpu().numpy())

# Calculate accuracy on training data
accuracy_train = accuracy_score(y_true_train, y_pred_train)
print(f'Accuracy on the training set: {accuracy_train:.4f}')

# Convert probabilities to percentages
y_prob_train_percentages = np.array(y_prob_train) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages) for training data:")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true_train[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_train_percentages[i][class_index]:.2f}%")

In [None]:
# Mean probability assigned to the true label, test split
correct_class_probs_test = [probs[label] for probs, label in zip(y_prob, y_true)]
average_prob_correct_class_test = np.mean(correct_class_probs_test)
print(f'Average probability for the correct class on the test data: {average_prob_correct_class_test:.4f}')

# Mean probability assigned to the true label, training split
correct_class_probs_train = [probs[label] for probs, label in zip(y_prob_train, y_true_train)]
average_prob_correct_class_train = np.mean(correct_class_probs_train)
print(f'Average probability for the correct class on the training data: {average_prob_correct_class_train:.4f}')

In [None]:
# Gap between the average true-class probability on training and test data,
# expressed as a percentage of the training value
print("Difference in percentage is: ", (average_prob_correct_class_train - average_prob_correct_class_test)/average_prob_correct_class_train*100)

I think the model started to overfit, which is why the loss stopped decreasing. Now we will try a different optimiser and a learning-rate scheduler.

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR
import torch.optim as optim

# Length of this run. It is defined BEFORE the scheduler so that T_max matches
# the loop below; previously the scheduler read the stale value left over from
# the preceding training cell (25), so the cosine schedule did not span the
# 50 epochs actually trained.
num_epochs = 50  # More epochs to ensure convergence

# Define optimizer with AdamW
optimizer = optim.AdamW(complex_model.parameters(), lr=1e-3, weight_decay=1e-4)

# Cosine annealing over the full run (one scheduler step per epoch)
scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs)

# Training loop with scheduler
for epoch in range(num_epochs):
    complex_model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        logits, probabilities = complex_model(inputs)

        # Flatten labels to [batch_size]
        labels = labels.view(-1)

        # Compute loss on the logits
        loss = criterion(logits, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Step the scheduler once per epoch
    scheduler.step()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')


In [None]:
# Save the model after the 50-epoch AdamW / cosine-annealing run.
# The recorded epoch now matches that run and the file name; it was previously
# copy-pasted as 25 from the earlier save cell.
save_pytorch_model(complex_model, '../trained_models/different_50epoch_complex_model.pt', epoch=50)

# Later, to load the model:
# loaded_model, epoch = load_pytorch_model('../trained_models/different_50epoch_complex_model.pt')

In [None]:
# Switch to evaluation mode
complex_model.eval()
y_pred = []
y_true = []
y_prob = []  # Per-sample class probabilities

# Evaluate on the test split. The batches come from `test_loader`: no
# `test_dataset` is ever defined in this notebook, and the model is fed
# batches everywhere else.
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = complex_model(inputs)  # The model returns both logits and probabilities
        _, predicted = torch.max(logits, 1)
        y_pred.extend(predicted.cpu().numpy())
        y_true.extend(labels.cpu().numpy())
        y_prob.extend(probabilities.cpu().numpy())

# Calculate accuracy
accuracy = accuracy_score(y_true, y_pred)
print(f'Accuracy on the test set: {accuracy:.4f}')

# Convert probabilities to percentages
y_prob_percentages = np.array(y_prob) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages):")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_percentages[i][class_index]:.2f}%")

# Repeat the same evaluation on the training split
complex_model.eval()
y_pred_train = []
y_true_train = []
y_prob_train = []  # Per-sample class probabilities

# Batches come from `train_loader` (no `train_dataset` is defined in this notebook)
with torch.no_grad():
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)  # Flatten to [batch_size], as in the training loop
        logits, probabilities = complex_model(inputs)
        _, predicted = torch.max(logits, 1)
        y_pred_train.extend(predicted.cpu().numpy())
        y_true_train.extend(labels.cpu().numpy())
        y_prob_train.extend(probabilities.cpu().numpy())

# Calculate accuracy on training data
accuracy_train = accuracy_score(y_true_train, y_pred_train)
print(f'Accuracy on the training set: {accuracy_train:.4f}')

# Convert probabilities to percentages
y_prob_train_percentages = np.array(y_prob_train) * 100

# Display the first few instances: correct class plus the probability of every class
print("First few predicted class names and their probabilities (in percentages) for training data:")
for i in range(5):
    print(f"Instance {i+1}:")
    print(f"  Correct Class: {class_names[y_true_train[i]]}")
    for class_index, class_name in enumerate(class_names):
        print(f"  Class: {class_name}, Probability: {y_prob_train_percentages[i][class_index]:.2f}%")

In [None]:
# Mean probability assigned to the true label, test split
correct_class_probs_test = [probs[label] for probs, label in zip(y_prob, y_true)]
average_prob_correct_class_test = np.mean(correct_class_probs_test)
print(f'Average probability for the correct class on the test data: {average_prob_correct_class_test:.4f}')

# Mean probability assigned to the true label, training split
correct_class_probs_train = [probs[label] for probs, label in zip(y_prob_train, y_true_train)]
average_prob_correct_class_train = np.mean(correct_class_probs_train)
print(f'Average probability for the correct class on the training data: {average_prob_correct_class_train:.4f}')

In [None]:
# Gap between the average true-class probability on training and test data,
# expressed as a percentage of the training value
print("Difference in percentage is: ", (average_prob_correct_class_train - average_prob_correct_class_test)/average_prob_correct_class_train*100)

# Ai

In [12]:
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import numpy as np

class TrainingMetricsTracker:
    """Collects per-epoch train/test metrics and offers plots and statistical comparisons.

    Every call to ``update`` appends one entry to each history list, so index
    ``i`` of every list refers to the ``i``-th recorded epoch (0-based).
    """

    def __init__(self):
        # Scalar histories: one value per recorded epoch
        self.train_losses = []
        self.test_losses = []
        self.train_accuracies = []
        self.test_accuracies = []
        self.train_confidences = []
        self.test_confidences = []
        self.confidence_gaps = []  # train confidence minus test confidence
        # Per-sample confidence values: one sequence per recorded epoch
        self.epoch_train_confidences = []
        self.epoch_test_confidences = []

    def update(self, train_loss, test_loss, train_acc, test_acc,
               train_conf, test_conf, train_epoch_confs, test_epoch_confs):
        """Record the metrics of one finished epoch.

        Args:
            train_loss, test_loss: Loss values for the epoch
            train_acc, test_acc: Accuracy values for the epoch
            train_conf, test_conf: Average confidences; their difference
                (train - test) is stored as the confidence gap
            train_epoch_confs, test_epoch_confs: Per-sample confidence values
                used by the distribution plot and the statistical tests
        """
        self.train_losses.append(train_loss)
        self.test_losses.append(test_loss)
        self.train_accuracies.append(train_acc)
        self.test_accuracies.append(test_acc)
        self.train_confidences.append(train_conf)
        self.test_confidences.append(test_conf)
        self.confidence_gaps.append(train_conf - test_conf)
        self.epoch_train_confidences.append(train_epoch_confs)
        self.epoch_test_confidences.append(test_epoch_confs)

    def _resolve_epoch(self, epoch):
        """Translate a (possibly negative) epoch index into a valid 0-based index.

        Raises:
            IndexError: If the index does not refer to a recorded epoch.
        """
        recorded = len(self.epoch_train_confidences)
        idx = epoch + recorded if epoch < 0 else epoch
        if not 0 <= idx < recorded:
            raise IndexError(
                f"epoch index {epoch} out of range for {recorded} recorded epoch(s)")
        return idx

    @staticmethod
    def _finish_axis(ax, title, ylabel):
        """Apply the title, axis labels, legend and grid shared by all panels."""
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(ylabel)
        ax.legend()
        ax.grid(True)

    def plot_metrics(self, save_path=None):
        """Draw a 2x2 overview (loss, accuracy, confidence, confidence gap).

        Args:
            save_path: Optional file path; when given, the figure is also
                written there before being shown.
        """
        epochs = range(1, len(self.train_losses) + 1)

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))

        # Loss
        ax1.plot(epochs, self.train_losses, 'b-', label='Training Loss')
        ax1.plot(epochs, self.test_losses, 'r-', label='Test Loss')
        self._finish_axis(ax1, 'Loss over Epochs', 'Loss')

        # Accuracy
        ax2.plot(epochs, self.train_accuracies, 'b-', label='Training Accuracy')
        ax2.plot(epochs, self.test_accuracies, 'r-', label='Test Accuracy')
        self._finish_axis(ax2, 'Accuracy over Epochs', 'Accuracy (%)')

        # Average confidence
        ax3.plot(epochs, self.train_confidences, 'b-', label='Training Confidence')
        ax3.plot(epochs, self.test_confidences, 'r-', label='Test Confidence')
        self._finish_axis(ax3, 'Average Confidence over Epochs', 'Confidence')

        # Confidence gap
        ax4.plot(epochs, self.confidence_gaps, 'g-', label='Confidence Gap')
        self._finish_axis(ax4, 'Confidence Gap over Epochs', 'Gap (Train - Test)')

        fig.tight_layout()
        if save_path:
            fig.savefig(save_path)
        plt.show()
        # This method is called repeatedly during long training runs; close the
        # figure so figures do not pile up in memory.
        plt.close(fig)

    def plot_confidence_distributions(self, epoch):
        """Plot confidence distributions for a specific epoch

        Args:
            epoch: 0-based index of a recorded epoch; negative values count
                from the end.

        Raises:
            IndexError: If ``epoch`` does not refer to a recorded epoch.
        """
        idx = self._resolve_epoch(epoch)
        fig, ax = plt.subplots(figsize=(10, 6))

        # Kernel density estimates of the per-sample confidences
        sns.kdeplot(data=self.epoch_train_confidences[idx], label='Training', color='blue', ax=ax)
        sns.kdeplot(data=self.epoch_test_confidences[idx], label='Test', color='red', ax=ax)

        ax.set_title(f'Confidence Distributions (Epoch {idx+1})')
        ax.set_xlabel('Confidence')
        ax.set_ylabel('Density')
        ax.legend()
        ax.grid(True)
        plt.show()
        plt.close(fig)

    def statistical_tests(self, epoch=-1):
        """Perform statistical tests on confidence distributions

        Args:
            epoch: 0-based index of a recorded epoch; negative values count
                from the end (the default ``-1`` selects the latest epoch).

        Returns:
            dict with keys ``'ks_test'`` and ``'mw_test'`` (each a dict with
            ``'statistic'`` and ``'p_value'``) and ``'cohens_d'``.

        Raises:
            IndexError: If ``epoch`` does not refer to a recorded epoch.
        """
        idx = self._resolve_epoch(epoch)

        train_conf = self.epoch_train_confidences[idx]
        test_conf = self.epoch_test_confidences[idx]

        # Kolmogorov-Smirnov test
        ks_stat, ks_pval = stats.ks_2samp(train_conf, test_conf)

        # Mann-Whitney U test
        mw_stat, mw_pval = stats.mannwhitneyu(train_conf, test_conf, alternative='two-sided')

        # Effect size (Cohen's d) using the root of the averaged variances
        mean_gap = np.mean(train_conf) - np.mean(test_conf)
        pooled_std = np.sqrt((np.var(train_conf) + np.var(test_conf)) / 2)
        if pooled_std > 0:
            cohens_d = mean_gap / pooled_std
        elif mean_gap == 0:
            # Both samples are constant and equal: report no effect instead of 0/0 = nan
            cohens_d = 0.0
        else:
            # Both samples are constant but differ: infinite effect size, without
            # triggering a divide-by-zero warning
            cohens_d = float(np.copysign(np.inf, mean_gap))

        return {
            'ks_test': {'statistic': ks_stat, 'p_value': ks_pval},
            'mw_test': {'statistic': mw_stat, 'p_value': mw_pval},
            'cohens_d': cohens_d
        }

In [13]:
# In both the training loop and evaluation function, we need to detach tensors before converting to numpy

def evaluate_model(model, data_loader, device, criterion=None):
    """Evaluate ``model`` on every batch of ``data_loader`` without gradient tracking.

    Args:
        model: Module whose forward pass returns ``(logits, probabilities)``
        data_loader: Sized iterable of ``(inputs, labels)`` batches
        device: Device the batches are moved to before the forward pass
        criterion: Optional loss callable ``criterion(logits, labels)``.
            Defaults to ``torch.nn.CrossEntropyLoss()``, the loss used
            throughout this notebook, so the function no longer depends on a
            global ``criterion`` defined in another cell.

    Returns:
        accuracy: Percentage of correctly classified samples
        avg_loss: Mean of the per-batch losses
        avg_confidence: Mean of the per-sample maximum probability
        all_confidences: List with the maximum probability of every sample
    """
    if criterion is None:
        criterion = torch.nn.CrossEntropyLoss()

    model.eval()
    correct = 0
    total = 0
    total_loss = 0
    confidence_sum = 0
    all_confidences = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            labels = labels.view(-1)  # Flatten to [batch_size]

            logits, probabilities = model(inputs)
            loss = criterion(logits, labels)

            # Predicted class and the probability assigned to the top class
            _, predicted = torch.max(logits, 1)
            confidence, _ = torch.max(probabilities, 1)

            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            total_loss += loss.item()
            confidence_sum += confidence.sum().item()
            # Detach and move to CPU before converting to numpy
            all_confidences.extend(confidence.detach().cpu().numpy())

    accuracy = (correct / total) * 100
    avg_loss = total_loss / len(data_loader)
    avg_confidence = confidence_sum / total

    return accuracy, avg_loss, avg_confidence, all_confidences

# Train Ai

In [None]:
# State this cell needs, created here so the cell runs on a fresh kernel.
# `metrics_tracker` and `checkpoint_dir` were previously only defined in a later
# cell. The checkpoint directory matches the path the next cell loads
# 'model_epoch_100.pt' from (save_pytorch_model creates the directory itself).
metrics_tracker = TrainingMetricsTracker()
checkpoint_dir = 'model_checkpoints'

# Bind the loss and optimizer to `model` explicitly. When the notebook is run
# top to bottom, the global `optimizer` otherwise still holds the parameters of
# `complex_model`, and `model` would never be updated.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    running_correct = 0
    total_samples = 0
    train_confidence_sum = 0
    train_epoch_confidences = []
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)
        logits, probabilities = model(inputs)
        loss = criterion(logits, labels)
        _, predicted = torch.max(logits, 1)
        confidence, _ = torch.max(probabilities, 1)
        running_correct += (predicted == labels).sum().item()
        total_samples += labels.size(0)
        train_confidence_sum += confidence.sum().item()
        train_epoch_confidences.extend(confidence.detach().cpu().numpy())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Calculate training metrics
    train_loss = running_loss / len(train_loader)
    train_accuracy = (running_correct / total_samples) * 100
    train_avg_confidence = train_confidence_sum / total_samples
    
    # Evaluate on test set
    test_accuracy, test_loss, test_avg_confidence, test_epoch_confidences = evaluate_model(model, test_loader, device)
    
    # Update metrics tracker
    metrics_tracker.update(
        train_loss, test_loss,
        train_accuracy, test_accuracy,
        train_avg_confidence, test_avg_confidence,
        train_epoch_confidences, test_epoch_confidences
    )
    
    # Print metrics
    print(f'Epoch [{epoch + 1}/{num_epochs}]')
    print(f'  Training:')
    print(f'    Loss: {train_loss:.4f}')
    print(f'    Accuracy: {train_accuracy:.2f}%')
    print(f'    Average Confidence: {train_avg_confidence:.4f}')
    print(f'  Testing:')
    print(f'    Loss: {test_loss:.4f}')
    print(f'    Accuracy: {test_accuracy:.2f}%')
    print(f'    Average Confidence: {test_avg_confidence:.4f}')
    
    # Save model every 20 epochs
    if (epoch + 1) % 20 == 0:
        checkpoint_path = os.path.join(checkpoint_dir, f'model_epoch_{epoch+1}.pt')
        save_pytorch_model(model, checkpoint_path, epoch=epoch+1)
        print(f'Saved model checkpoint at epoch {epoch+1}')
    
    # Perform statistical tests every 5 epochs.
    # The tracker was created fresh above, so its index equals `epoch`.
    if (epoch + 1) % 5 == 0:
        print("\nStatistical Tests:")
        stats_results = metrics_tracker.statistical_tests(epoch)
        print(f"  Kolmogorov-Smirnov test:")
        print(f"    Statistic: {stats_results['ks_test']['statistic']:.4f}")
        print(f"    p-value: {stats_results['ks_test']['p_value']:.4f}")
        print(f"  Mann-Whitney U test:")
        print(f"    Statistic: {stats_results['mw_test']['statistic']:.4f}")
        print(f"    p-value: {stats_results['mw_test']['p_value']:.4f}")
        print(f"  Cohen's d: {stats_results['cohens_d']:.4f}")
        
        # Plot confidence distributions
        metrics_tracker.plot_confidence_distributions(epoch)
    
    print('-' * 50)

# After training, plot all metrics
metrics_tracker.plot_metrics(save_path='training_metrics.png')

In [None]:
# First load the saved model from epoch 100.
# map_location=device lets the checkpoint be opened on whichever device this
# session uses (e.g. a GPU-trained checkpoint on a CPU-only machine).
checkpoint_path = os.path.join('model_checkpoints', 'model_epoch_100.pt')
print(f"Loading model from {checkpoint_path}")
saved_model = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(saved_model['model_state_dict'])

# Initialize metrics tracker
metrics_tracker = TrainingMetricsTracker()

# Create directories for model checkpoints and plots
checkpoint_dir = 'model_checkpoints_extended'
plots_dir = 'training_plots'
os.makedirs(checkpoint_dir, exist_ok=True)
os.makedirs(plots_dir, exist_ok=True)

# Training loop - start from epoch 101 to 400
# NOTE(review): this cell reuses the global `criterion` and `optimizer`; the
# optimizer must be the one bound to `model` - confirm before running.
start_epoch = 101
num_epochs = 400
last_plot_epoch = start_epoch - 1

for epoch in range(start_epoch, num_epochs + 1):
    model.train()
    running_loss = 0.0
    running_correct = 0
    total_samples = 0
    train_confidence_sum = 0
    train_epoch_confidences = []
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)
        logits, probabilities = model(inputs)
        loss = criterion(logits, labels)
        _, predicted = torch.max(logits, 1)
        confidence, _ = torch.max(probabilities, 1)
        running_correct += (predicted == labels).sum().item()
        total_samples += labels.size(0)
        train_confidence_sum += confidence.sum().item()
        train_epoch_confidences.extend(confidence.detach().cpu().numpy())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Calculate training metrics
    train_loss = running_loss / len(train_loader)
    train_accuracy = (running_correct / total_samples) * 100
    train_avg_confidence = train_confidence_sum / total_samples
    
    # Evaluate on test set
    test_accuracy, test_loss, test_avg_confidence, test_epoch_confidences = evaluate_model(model, test_loader, device)
    
    # Update metrics tracker
    metrics_tracker.update(
        train_loss, test_loss,
        train_accuracy, test_accuracy,
        train_avg_confidence, test_avg_confidence,
        train_epoch_confidences, test_epoch_confidences
    )
    
    # Print metrics
    print(f'Epoch [{epoch}/{num_epochs}]')
    print(f'  Training: Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.2f}%, Confidence: {train_avg_confidence:.4f}')
    print(f'  Testing:  Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.2f}%, Confidence: {test_avg_confidence:.4f}')
    
    # Save model every 20 epochs
    if epoch % 20 == 0:
        checkpoint_path = os.path.join(checkpoint_dir, f'model_epoch_{epoch}.pt')
        save_pytorch_model(model, checkpoint_path, epoch=epoch)
        print(f'Saved checkpoint: epoch {epoch}')
    
    # Perform statistical tests every 5 epochs on the epoch just recorded.
    # This runs BEFORE the tracker may be reset below, so epochs 200 and 300 are
    # no longer skipped. The tracker always holds at least one entry here.
    if epoch % 5 == 0:
        window_idx = len(metrics_tracker.train_losses) - 1  # position inside the current plotting window
        stats_results = metrics_tracker.statistical_tests(window_idx)
        print(f"Stats: KS={stats_results['ks_test']['statistic']:.4f}(p={stats_results['ks_test']['p_value']:.4f}), " 
              f"MW={stats_results['mw_test']['statistic']:.4f}(p={stats_results['mw_test']['p_value']:.4f}), "
              f"Cohen's d={stats_results['cohens_d']:.4f}")
        
        metrics_tracker.plot_confidence_distributions(window_idx)
    
    # Create summary plots every 100 epochs
    if epoch % 100 == 0:
        plot_filename = os.path.join(plots_dir, f'training_metrics_epoch_{epoch}.png')
        metrics_tracker.plot_metrics(save_path=plot_filename)
        print(f'Saved plots for epochs {last_plot_epoch+1}-{epoch}')
        
        # Start a new plotting window, except after the very last epoch so the
        # final summary plot below still has data
        if epoch < num_epochs:
            last_plot_epoch = epoch
            metrics_tracker = TrainingMetricsTracker()
    
    print('-' * 50)

# Final summary plot
final_plot_filename = os.path.join(plots_dir, 'training_metrics_final.png')
metrics_tracker.plot_metrics(save_path=final_plot_filename)

# Test inference from loaded models

In [None]:
def _score_loader(net, loader, device):
    """Run `net` over every batch of `loader` without gradients and summarise the results.

    Relies on the notebook-level `criterion` for the loss.

    Returns:
        (accuracy_percent, mean_batch_loss, mean_confidence) where confidence is the
        largest probability the model assigns to each sample.
    """
    correct = 0
    total = 0
    loss_sum = 0
    confidences = []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            labels = labels.view(-1)  # the loss expects a flat vector of class indices
            logits, probabilities = net(inputs)
            loss = criterion(logits, labels)
            _, predicted = torch.max(logits, 1)
            confidence, _ = torch.max(probabilities, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            loss_sum += loss.item()
            # .cpu() keeps this consistent with the training loops in this notebook
            confidences.extend(confidence.detach().cpu().numpy())

    return 100 * correct / total, loss_sum / len(loader), np.mean(confidences)


def evaluate_saved_model(model_path, test_loader, train_loader):
    """Load a checkpoint into the global model and score it on the test and train loaders.

    Side effects: the global `model` has its weights overwritten by the checkpoint,
    is moved to the CPU and is left in eval mode. Cells that run afterwards and send
    batches to another device must move the model back first.

    Args:
        model_path: Path to a checkpoint containing a 'model_state_dict' entry.
        test_loader: Iterable of (inputs, labels) batches for the test split.
        train_loader: Iterable of (inputs, labels) batches for the training split.

    Returns:
        dict with 'test_accuracy', 'test_loss', 'test_confidence', 'train_accuracy',
        'train_loss' and 'train_confidence'. Accuracies are percentages, losses are
        averaged over batches and confidences over samples.
    """
    global model  # Use the globally defined model
    saved_model = torch.load(model_path, map_location=torch.device('cpu')) # Only if on machine without GPU
    model.load_state_dict(saved_model['model_state_dict'])
    model = model.to('cpu')
    device = torch.device('cpu')
    model.eval()

    # Test split first, then train split (same order as before the refactor).
    test_accuracy, test_loss, test_confidence = _score_loader(model, test_loader, device)
    train_accuracy, train_loss, train_confidence = _score_loader(model, train_loader, device)

    metrics = {}
    metrics['test_accuracy'] = test_accuracy
    metrics['test_loss'] = test_loss
    metrics['test_confidence'] = test_confidence
    metrics['train_accuracy'] = train_accuracy
    metrics['train_loss'] = train_loss
    metrics['train_confidence'] = train_confidence

    return metrics

# Collect epochs and metrics
epochs = []
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []
train_confidences = []
test_confidences = []

# Gather every checkpoint from both directories before evaluating anything.
# Plain sorted() on file names is lexicographic ('model_epoch_100.pt' comes before
# 'model_epoch_20.pt'), which makes the line plots below jump back and forth.
checkpoint_files = []
for directory in ['model_checkpoints', 'model_checkpoints_extended']:
    for filename in os.listdir(directory):
        if filename.startswith('model_epoch_'):
            epoch = int(filename.split('_')[-1].split('.')[0])
            checkpoint_files.append((epoch, os.path.join(directory, filename)))

# Sort on the numeric epoch only; the sort is stable, so equal epochs keep directory order.
checkpoint_files.sort(key=lambda item: item[0])

# Load and evaluate checkpoints in increasing epoch order
for epoch, model_path in checkpoint_files:
    print(f"Evaluating epoch {epoch}...")
    metrics = evaluate_saved_model(model_path, test_loader, train_loader)

    epochs.append(epoch)
    train_losses.append(metrics['train_loss'])
    test_losses.append(metrics['test_loss'])
    train_accuracies.append(metrics['train_accuracy'])
    test_accuracies.append(metrics['test_accuracy'])
    train_confidences.append(metrics['train_confidence'])
    test_confidences.append(metrics['test_confidence'])

# Create plots: a 2x2 grid summarising the evaluated checkpoints
plt.figure(figsize=(15, 10))

# The first three panels share one layout: a blue training curve and a red test curve.
# Columns: subplot slot, train series, test series, train label, test label, y label, title
paired_panels = [
    (1, train_losses, test_losses,
     'Training Loss', 'Test Loss', 'Loss', 'Loss over Epochs'),
    (2, train_accuracies, test_accuracies,
     'Training Accuracy', 'Test Accuracy', 'Accuracy (%)', 'Accuracy over Epochs'),
    (3, train_confidences, test_confidences,
     'Training Confidence', 'Test Confidence', 'Confidence', 'Average Confidence over Epochs'),
]
for slot, train_series, test_series, train_label, test_label, y_label, panel_title in paired_panels:
    plt.subplot(2, 2, slot)
    plt.plot(epochs, train_series, 'b-', label=train_label)
    plt.plot(epochs, test_series, 'r-', label=test_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()
    plt.grid(True)

# Fourth panel: how much more confident the model is on training data than on test data
plt.subplot(2, 2, 4)
train_confidence_array = np.array(train_confidences)
test_confidence_array = np.array(test_confidences)
confidence_gap = train_confidence_array - test_confidence_array
plt.plot(epochs, confidence_gap, 'g-', label='Confidence Gap')
plt.xlabel('Epoch')
plt.ylabel('Gap (Train - Test)')
plt.title('Confidence Gap over Epochs')
plt.legend()
plt.grid(True)

# Write the figure to disk and release it instead of displaying it inline
plt.tight_layout()
plt.savefig('complete_training_history.png')
plt.close()

In [None]:
def collect_predictions(net, batches, device):
    """Run `net` over `batches` without gradients and gather per-sample outputs.

    Args:
        net: Model returning a (logits, probabilities) pair for a batch of inputs.
        batches: Iterable yielding (inputs, labels) tensor batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        (predictions, targets, probabilities): three lists with one entry per sample.
    """
    predictions, targets, probs = [], [], []
    with torch.no_grad():
        for inputs, labels in batches:
            inputs, labels = inputs.to(device), labels.to(device)
            # Flatten labels like the training/evaluation loops in this notebook do, so
            # every target is a scalar class index that can index class_names directly.
            labels = labels.reshape(-1)
            logits, probabilities = net(inputs)  # Now get both logits and probabilities
            _, predicted = torch.max(logits, 1)
            predictions.extend(predicted.cpu().numpy())
            targets.extend(labels.cpu().numpy())
            probs.extend(probabilities.cpu().numpy())  # Store probabilities
    return predictions, targets, probs


def print_sample_probabilities(true_labels, prob_percentages, names, count=5):
    """Print the true class and the full class distribution for the first `count` samples.

    Prints fewer samples when fewer than `count` are available instead of raising.
    """
    for i in range(min(count, len(true_labels))):
        print(f"Instance {i+1}:")
        print(f"  Correct Class: {names[true_labels[i]]}")
        for class_index, class_name in enumerate(names):
            print(f"  Class: {class_name}, Probability: {prob_percentages[i][class_index]:.2f}%")


# evaluate_saved_model() leaves the global model on the CPU while the batches below are
# sent to `device`; moving the model is a no-op when it already lives there.
model.to(device)

# Switch to evaluation mode
model.eval()

# Test the model on the test dataset
y_pred, y_true, y_prob = collect_predictions(model, test_dataset, device)

# Calculate accuracy
accuracy = accuracy_score(y_true, y_pred)
print(f'Accuracy on the test set: {accuracy:.4f}')

# Convert probabilities to percentages
y_prob_percentages = np.array(y_prob) * 100

# Display the first few predicted class names along with their probabilities and the correct class
print("First few predicted class names and their probabilities (in percentages):")
print_sample_probabilities(y_true, y_prob_percentages, class_names)

# Test the model on training data (the model is still in evaluation mode)
y_pred_train, y_true_train, y_prob_train = collect_predictions(model, train_dataset, device)

# Calculate accuracy on training data
accuracy_train = accuracy_score(y_true_train, y_pred_train)
print(f'Accuracy on the training set: {accuracy_train:.4f}')

# Convert probabilities to percentages
y_prob_train_percentages = np.array(y_prob_train) * 100

# Display the first few predicted class names along with their probabilities and the correct class
print("First few predicted class names and their probabilities (in percentages) for training data:")
print_sample_probabilities(y_true_train, y_prob_train_percentages, class_names)

In [None]:
# Probability the model assigned to the true class of each test sample, then its mean
correct_class_probs_test = [y_prob[idx][true_label] for idx, true_label in enumerate(y_true)]
average_prob_correct_class_test = np.mean(correct_class_probs_test)
print(f'Average probability for the correct class on the test data: {average_prob_correct_class_test:.4f}')

# Same statistic for the training samples
correct_class_probs_train = [
    y_prob_train[idx][true_label] for idx, true_label in enumerate(y_true_train)
]
average_prob_correct_class_train = np.mean(correct_class_probs_train)
print(f'Average probability for the correct class on the training data: {average_prob_correct_class_train:.4f}')

In [None]:
def collect_predictions(net, batches, device):
    """Run `net` over `batches` without gradients and gather per-sample outputs.

    Args:
        net: Model returning a (logits, probabilities) pair for a batch of inputs.
        batches: Iterable yielding (inputs, labels) tensor batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        (predictions, targets, probabilities): three lists with one entry per sample.
    """
    predictions, targets, probs = [], [], []
    with torch.no_grad():
        for inputs, labels in batches:
            inputs, labels = inputs.to(device), labels.to(device)
            # Flatten labels like the training/evaluation loops in this notebook do, so
            # every target is a scalar class index that can index class_names directly.
            labels = labels.reshape(-1)
            logits, probabilities = net(inputs)  # Now get both logits and probabilities
            _, predicted = torch.max(logits, 1)
            predictions.extend(predicted.cpu().numpy())
            targets.extend(labels.cpu().numpy())
            probs.extend(probabilities.cpu().numpy())  # Store probabilities
    return predictions, targets, probs


def print_sample_probabilities(true_labels, prob_percentages, names, count=5):
    """Print the true class and the full class distribution for the first `count` samples.

    Prints fewer samples when fewer than `count` are available instead of raising.
    """
    for i in range(min(count, len(true_labels))):
        print(f"Instance {i+1}:")
        print(f"  Correct Class: {names[true_labels[i]]}")
        for class_index, class_name in enumerate(names):
            print(f"  Class: {class_name}, Probability: {prob_percentages[i][class_index]:.2f}%")


# evaluate_saved_model() leaves the global model on the CPU while the batches below are
# sent to `device`; moving the model is a no-op when it already lives there.
model.to(device)

# Switch to evaluation mode
model.eval()

# Test the model on the test dataset
y_pred, y_true, y_prob = collect_predictions(model, test_dataset, device)

# Calculate accuracy
accuracy = accuracy_score(y_true, y_pred)
print(f'Accuracy on the test set: {accuracy:.4f}')

# Convert probabilities to percentages
y_prob_percentages = np.array(y_prob) * 100

# Display the first few predicted class names along with their probabilities and the correct class
print("First few predicted class names and their probabilities (in percentages):")
print_sample_probabilities(y_true, y_prob_percentages, class_names)

# Test the model on training data (the model is still in evaluation mode)
y_pred_train, y_true_train, y_prob_train = collect_predictions(model, train_dataset, device)

# Calculate accuracy on training data
accuracy_train = accuracy_score(y_true_train, y_pred_train)
print(f'Accuracy on the training set: {accuracy_train:.4f}')

# Convert probabilities to percentages
y_prob_train_percentages = np.array(y_prob_train) * 100

# Display the first few predicted class names along with their probabilities and the correct class
print("First few predicted class names and their probabilities (in percentages) for training data:")
print_sample_probabilities(y_true_train, y_prob_train_percentages, class_names)

In [None]:
# Probability the model assigned to the true class of each test sample, then its mean
correct_class_probs_test = [y_prob[idx][true_label] for idx, true_label in enumerate(y_true)]
average_prob_correct_class_test = np.mean(correct_class_probs_test)
print(f'Average probability for the correct class on the test data: {average_prob_correct_class_test:.4f}')

# Same statistic for the training samples
correct_class_probs_train = [
    y_prob_train[idx][true_label] for idx, true_label in enumerate(y_true_train)
]
average_prob_correct_class_train = np.mean(correct_class_probs_train)
print(f'Average probability for the correct class on the training data: {average_prob_correct_class_train:.4f}')

In [None]:
# Train/test gap in mean correct-class probability, relative to the training value, in percent
relative_gap_percent = (
    (average_prob_correct_class_train - average_prob_correct_class_test)
    / average_prob_correct_class_train
    * 100
)
print("Difference in percentage is: ", relative_gap_percent)

# Continue training from epoch 900 to 1500

In [None]:
# Resume from the checkpoint written at epoch 900
checkpoint_path = os.path.join('model_checkpoints_extended', 'model_epoch_900.pt')
print(f"Loading model from {checkpoint_path}")
# map_location makes the load independent of the device the checkpoint was saved from.
saved_model = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(saved_model['model_state_dict'])
# evaluate_saved_model() moves the global model to the CPU, while every batch below is
# sent to `device`; put the model back there (a no-op when it is already on `device`).
model = model.to(device)
# NOTE(review): only the weights are restored. save_pytorch_model() does not store the
# optimizer state, so `optimizer` carries on with whatever state it currently holds.

# Initialize metrics tracker
metrics_tracker = TrainingMetricsTracker()

# Create directories for model checkpoints and plots
checkpoint_dir = 'model_checkpoints_extended'
plots_dir = 'training_plots'
os.makedirs(checkpoint_dir, exist_ok=True)
os.makedirs(plots_dir, exist_ok=True)

# Training loop: epochs 901 through 1500 (inclusive)
start_epoch = 901
num_epochs = 1500
last_plot_epoch = start_epoch - 1  # last epoch NOT held by the current metrics_tracker

for epoch in range(start_epoch, num_epochs + 1):
    model.train()
    running_loss = 0.0
    running_correct = 0
    total_samples = 0
    train_confidence_sum = 0
    train_epoch_confidences = []
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        labels = labels.view(-1)
        logits, probabilities = model(inputs)
        loss = criterion(logits, labels)
        _, predicted = torch.max(logits, 1)
        confidence, _ = torch.max(probabilities, 1)
        running_correct += (predicted == labels).sum().item()
        total_samples += labels.size(0)
        train_confidence_sum += confidence.sum().item()
        train_epoch_confidences.extend(confidence.detach().cpu().numpy())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Calculate training metrics
    train_loss = running_loss / len(train_loader)
    train_accuracy = (running_correct / total_samples) * 100
    train_avg_confidence = train_confidence_sum / total_samples
    
    # Evaluate on test set
    test_accuracy, test_loss, test_avg_confidence, test_epoch_confidences = evaluate_model(model, test_loader, device)
    
    # Update metrics tracker
    metrics_tracker.update(
        train_loss, test_loss,
        train_accuracy, test_accuracy,
        train_avg_confidence, test_avg_confidence,
        train_epoch_confidences, test_epoch_confidences
    )
    
    # Print metrics
    print(f'Epoch [{epoch}/{num_epochs}]')
    print(f'  Training: Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.2f}%, Confidence: {train_avg_confidence:.4f}')
    print(f'  Testing:  Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.2f}%, Confidence: {test_avg_confidence:.4f}')
    
    # Perform statistical tests every 50 epochs.
    # This has to happen BEFORE the 100-epoch block below, which replaces metrics_tracker
    # with an empty one; in the old order the tests were silently skipped at every
    # multiple of 100 except the final epoch.
    if epoch % 50 == 0:
        # The tracker holds epochs last_plot_epoch+1 .. epoch, so this indexes the newest entry.
        tracker_index = epoch - last_plot_epoch - 1
        try:
            stats_results = metrics_tracker.statistical_tests(tracker_index)
            print(f"Stats: KS={stats_results['ks_test']['statistic']:.4f}(p={stats_results['ks_test']['p_value']:.4f}), " 
                  f"MW={stats_results['mw_test']['statistic']:.4f}(p={stats_results['mw_test']['p_value']:.4f}), "
                  f"Cohen's d={stats_results['cohens_d']:.4f}")
            
            metrics_tracker.plot_confidence_distributions(tracker_index)
        except IndexError:
            print("Skipping statistical tests - not enough data yet")
    
    # Save a checkpoint and a summary plot every 100 epochs
    if epoch % 100 == 0:
        checkpoint_path = os.path.join(checkpoint_dir, f'model_epoch_{epoch}.pt')
        save_pytorch_model(model, checkpoint_path, epoch=epoch)
        print(f'Saved checkpoint: epoch {epoch}')
        plot_filename = os.path.join(plots_dir, f'training_metrics_epoch_{epoch}.png')
        metrics_tracker.plot_metrics(save_path=plot_filename)
        print(f'Saved plots for epochs {last_plot_epoch+1}-{epoch}')
        
        # Start a fresh tracker for the next 100-epoch window; keep the last one so the
        # final summary plot after the loop still has data.
        if epoch < num_epochs:
            last_plot_epoch = epoch
            metrics_tracker = TrainingMetricsTracker()
    
    print('-' * 50)

# Final summary plot
final_plot_filename = os.path.join(plots_dir, f'training_metrics_final_{num_epochs}.png')
metrics_tracker.plot_metrics(save_path=final_plot_filename)

# Test 1500 epochs

In [None]:
def _score_loader(net, loader, device):
    """Run `net` over every batch of `loader` without gradients and summarise the results.

    Relies on the notebook-level `criterion` for the loss.

    Returns:
        (accuracy_percent, mean_batch_loss, mean_confidence) where confidence is the
        largest probability the model assigns to each sample.
    """
    correct = 0
    total = 0
    loss_sum = 0
    confidences = []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            labels = labels.view(-1)  # the loss expects a flat vector of class indices
            logits, probabilities = net(inputs)
            loss = criterion(logits, labels)
            _, predicted = torch.max(logits, 1)
            confidence, _ = torch.max(probabilities, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            loss_sum += loss.item()
            # .cpu() keeps this consistent with the training loops in this notebook
            confidences.extend(confidence.detach().cpu().numpy())

    return 100 * correct / total, loss_sum / len(loader), np.mean(confidences)


def evaluate_saved_model(model_path, test_loader, train_loader):
    """Load a checkpoint into the global model and score it on the test and train loaders.

    Side effects: the global `model` has its weights overwritten by the checkpoint,
    is moved to the CPU and is left in eval mode. Cells that run afterwards and send
    batches to another device must move the model back first.

    Args:
        model_path: Path to a checkpoint containing a 'model_state_dict' entry.
        test_loader: Iterable of (inputs, labels) batches for the test split.
        train_loader: Iterable of (inputs, labels) batches for the training split.

    Returns:
        dict with 'test_accuracy', 'test_loss', 'test_confidence', 'train_accuracy',
        'train_loss' and 'train_confidence'. Accuracies are percentages, losses are
        averaged over batches and confidences over samples.
    """
    global model  # Use the globally defined model
    saved_model = torch.load(model_path, map_location=torch.device('cpu')) # Only if on machine without GPU
    model.load_state_dict(saved_model['model_state_dict'])
    model = model.to('cpu')
    device = torch.device('cpu')
    model.eval()

    # Test split first, then train split (same order as before the refactor).
    test_accuracy, test_loss, test_confidence = _score_loader(model, test_loader, device)
    train_accuracy, train_loss, train_confidence = _score_loader(model, train_loader, device)

    metrics = {}
    metrics['test_accuracy'] = test_accuracy
    metrics['test_loss'] = test_loss
    metrics['test_confidence'] = test_confidence
    metrics['train_accuracy'] = train_accuracy
    metrics['train_loss'] = train_loss
    metrics['train_confidence'] = train_confidence

    return metrics

# Dependencies of this cell
import os
from natsort import natsorted

# One series per metric; entry k belongs to the k-th evaluated checkpoint
epochs, train_losses, test_losses = [], [], []
train_accuracies, test_accuracies = [], []
train_confidences, test_confidences = [], []

# Walk both checkpoint directories in turn. natsorted is used for the listing so that
# file names are presumably ordered by their embedded epoch number -- see natsort docs.
for directory in ('model_checkpoints', 'model_checkpoints_extended'):
    for filename in natsorted(os.listdir(directory)):
        if not filename.startswith('model_epoch_'):
            continue  # not a checkpoint written under the model_epoch_<N> naming scheme

        # 'model_epoch_<N>.<ext>' -> N
        epoch_token = filename.rsplit('_', 1)[-1]
        epoch = int(epoch_token.split('.', 1)[0])
        print(f"Evaluating epoch {epoch}...")

        model_path = os.path.join(directory, filename)
        metrics = evaluate_saved_model(model_path, test_loader, train_loader)

        epochs.append(epoch)
        for series, key in (
            (train_losses, 'train_loss'),
            (test_losses, 'test_loss'),
            (train_accuracies, 'train_accuracy'),
            (test_accuracies, 'test_accuracy'),
            (train_confidences, 'train_confidence'),
            (test_confidences, 'test_confidence'),
        ):
            series.append(metrics[key])

# Create plots: a 2x2 grid summarising the evaluated checkpoints
plt.figure(figsize=(15, 10))

# The first three panels share one layout: a blue training curve and a red test curve.
# Columns: subplot slot, train series, test series, train label, test label, y label, title
paired_panels = [
    (1, train_losses, test_losses,
     'Training Loss', 'Test Loss', 'Loss', 'Loss over Epochs'),
    (2, train_accuracies, test_accuracies,
     'Training Accuracy', 'Test Accuracy', 'Accuracy (%)', 'Accuracy over Epochs'),
    (3, train_confidences, test_confidences,
     'Training Confidence', 'Test Confidence', 'Confidence', 'Average Confidence over Epochs'),
]
for slot, train_series, test_series, train_label, test_label, y_label, panel_title in paired_panels:
    plt.subplot(2, 2, slot)
    plt.plot(epochs, train_series, 'b-', label=train_label)
    plt.plot(epochs, test_series, 'r-', label=test_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()
    plt.grid(True)

# Fourth panel: how much more confident the model is on training data than on test data
plt.subplot(2, 2, 4)
train_confidence_array = np.array(train_confidences)
test_confidence_array = np.array(test_confidences)
confidence_gap = train_confidence_array - test_confidence_array
plt.plot(epochs, confidence_gap, 'g-', label='Confidence Gap')
plt.xlabel('Epoch')
plt.ylabel('Gap (Train - Test)')
plt.title('Confidence Gap over Epochs')
plt.legend()
plt.grid(True)

# Write the figure to disk and release it instead of displaying it inline
plt.tight_layout()
plt.savefig('complete_training_history.png')
plt.close()

# New with JSON

In [28]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Make one pass over `loader`; train when `optimizer` is given, otherwise evaluate.

    Returns:
        (mean_batch_loss, accuracy_percent, mean_confidence, per_sample_confidences)
        where confidence is the largest probability the model assigns to a sample.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is equivalent to eval()

    loss_sum = 0.0
    correct = 0
    total = 0
    confidence_sum = 0
    confidences = []

    # Gradients are only tracked while training.
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            labels = labels.view(-1)  # the loss expects a flat vector of class indices
            if training:
                optimizer.zero_grad()
            logits, probabilities = model(inputs)
            loss = criterion(logits, labels)
            if training:
                loss.backward()
                optimizer.step()

            _, predicted = torch.max(logits, 1)
            confidence, _ = torch.max(probabilities, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            confidence_sum += confidence.sum().item()
            confidences.extend(confidence.detach().cpu().numpy())
            loss_sum += loss.item()

    return loss_sum / len(loader), (correct / total) * 100, confidence_sum / total, confidences


def train_evaluate_mamba(model, train_loader, test_loader, num_epochs=400, device='cuda',
                         checkpoint_dir='model_checkpoints_extended'):
    """Train and evaluate MAMBA with checkpointing - exact same format as CNN.

    Each epoch trains on `train_loader`, evaluates on `test_loader`, appends the results
    to the metrics dictionary and rewrites `<checkpoint_dir>/training_metrics.json`
    (values rounded to 4 decimals, plus 'current_epoch'). Every 50 epochs a checkpoint
    `mamba_model_epoch_<N>.pt` holding the model state, optimizer state and metrics is
    saved.

    Args:
        model: Module returning a (logits, probabilities) pair for a batch of inputs.
        train_loader: Sized iterable of (inputs, labels) training batches.
        test_loader: Sized iterable of (inputs, labels) test batches.
        num_epochs: Number of epochs to run.
        device: Device the batches are moved to; the model must already be on it.
        checkpoint_dir: Directory for the JSON metrics file and checkpoints (created
            if missing).

    Returns:
        dict of unrounded per-epoch lists: 'train_losses', 'test_losses',
        'train_accuracies', 'test_accuracies', 'train_confidences', 'test_confidences',
        'epoch_train_confidences' and 'epoch_test_confidences' (the last two hold one
        list of per-sample confidences per epoch).
    """
    os.makedirs(checkpoint_dir, exist_ok=True)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

    # Use the same dictionary format as CNN
    metric_names = (
        'train_losses', 'test_losses',
        'train_accuracies', 'test_accuracies',
        'train_confidences', 'test_confidences',
        'epoch_train_confidences', 'epoch_test_confidences',
    )
    metrics = {name: [] for name in metric_names}

    # Rounded mirror of `metrics` for the JSON file. It is extended by one epoch at a
    # time, so earlier epochs are not re-rounded on every write.
    json_metrics = {name: [] for name in metric_names}
    json_metrics['current_epoch'] = 0
    metrics_path = os.path.join(checkpoint_dir, 'training_metrics.json')
    metrics_tmp_path = metrics_path + '.tmp'

    for epoch in range(num_epochs):
        # Training phase
        train_loss, train_accuracy, train_avg_confidence, train_epoch_confidences = _run_epoch(
            model, train_loader, criterion, device, optimizer)

        # Testing phase
        test_loss, test_accuracy, test_avg_confidence, test_epoch_confidences = _run_epoch(
            model, test_loader, criterion, device)

        # Store metrics exactly like CNN
        epoch_values = {
            'train_losses': train_loss,
            'test_losses': test_loss,
            'train_accuracies': train_accuracy,
            'test_accuracies': test_accuracy,
            'train_confidences': train_avg_confidence,
            'test_confidences': test_avg_confidence,
            'epoch_train_confidences': train_epoch_confidences,
            'epoch_test_confidences': test_epoch_confidences,
        }
        for name, value in epoch_values.items():
            metrics[name].append(value)
            if isinstance(value, (np.ndarray, list)):
                json_metrics[name].append([round(float(x), 4) for x in value])
            else:
                json_metrics[name].append(round(float(value), 4))
        json_metrics['current_epoch'] = epoch + 1

        # Save metrics to JSON exactly like CNN. Write to a temporary file and swap it
        # in, so an interrupted run never leaves a truncated training_metrics.json.
        with open(metrics_tmp_path, 'w') as f:
            json.dump(json_metrics, f, indent=4)
        os.replace(metrics_tmp_path, metrics_path)

        # Print progress every 10 epochs (exactly like CNN).
        # Note: the epoch shown here is 0-based, unlike 'current_epoch' in the JSON file.
        if epoch % 10 == 0:
            print(f'Epoch [{epoch}/{num_epochs}]')
            print(f'  Training: Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.2f}%, Confidence: {train_avg_confidence:.4f}')
            print(f'  Testing:  Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.2f}%, Confidence: {test_avg_confidence:.4f}')

        # Save checkpoint every 50 epochs (exactly like CNN).
        # The file name uses the 1-based epoch count; the stored 'epoch' stays 0-based.
        if (epoch + 1) % 50 == 0:
            checkpoint_path = os.path.join(checkpoint_dir, f'mamba_model_epoch_{epoch+1}.pt')
            checkpoint = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'metrics': metrics
            }
            torch.save(checkpoint, checkpoint_path)
            print(f'Checkpoint saved: {checkpoint_path}')

    return metrics

In [None]:
# Model hyperparameters
num_classes = 10  # CIFAR-10 has 10 classes
d_model, n_layer = 64, 4

# Pick the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the model from its argument bundle and place it on the chosen device
model_args = ModelArgs(d_model=d_model, n_layer=n_layer, vocab_size=0)
model = ImageMamba(model_args, num_classes=num_classes).to(device)

# Run training; per-epoch metrics are returned by train_evaluate_mamba
metrics = train_evaluate_mamba(model, train_loader, test_loader, num_epochs=400, device=device)