In [None]:
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np

# --- 1. Device Configuration ---
# Run on the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# --- 2. Hyperparameters ---
BATCH_SIZE = 64        # samples per mini-batch
LEARNING_RATE = 1e-3   # a common learning rate for the Adam optimizer
NUM_EPOCHS = 5         # can be increased for better performance

# --- 3. Data Loading and Preprocessing ---
# Preprocessing applied to every MNIST image:
#   1. ToTensor()  - PIL image -> float tensor with pixel values scaled to [0, 1]
#   2. Normalize() - shift/scale with the standard MNIST statistics
#                    (mean=0.1307, std=0.3081), one value for the single channel
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
])
"""
Preserves Spatial Relationships: It maintains the relative differences between adjacent pixels, 
which is crucial for convolutional networks to learn spatial features. If you normalized each 
pixel independently, you might distort these relationships.

Loss of Positional Invariance (partially): A core strength of CNNs is their ability to learn 
filters that are useful regardless of where in the image a feature appears (translational invariance).
If you normalize each pixel position differently, you make that position more unique, 
potentially hindering the network's ability to learn generalizable features.
"""

# Training split: fetched into ./data if missing, reshuffled on every pass.
train_dataset = datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Test split: same preprocessing, iterated in a fixed order.
test_dataset = datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Quick size summary of both splits.
print(f"Training data samples: {len(train_dataset)}")
print(f"Test data samples: {len(test_dataset)}")
print(f"Number of training batches: {len(train_loader)}")
print(f"Number of test batches: {len(test_loader)}")

# --- 4. Model Definition (LeNet-like Architecture) ---
# This model matches the layer progression you described:
# Conv -> Pool -> Conv -> Pool -> Conv -> Flatten -> Dense -> Dense
class LeNetLike(nn.Module):
    """LeNet-style CNN for 1x28x28 inputs producing 10 raw class logits.

    Layer progression (N = batch size):
        conv1  5x5, 1 -> 6     (N, 1, 28, 28) -> (N, 6, 24, 24)
        pool1  avg 2x2         (N, 6, 24, 24) -> (N, 6, 12, 12)
        conv2  5x5, 6 -> 16    (N, 6, 12, 12) -> (N, 16, 8, 8)
        pool2  avg 2x2         (N, 16, 8, 8)  -> (N, 16, 4, 4)
        conv3  4x4, 16 -> 120  (N, 16, 4, 4)  -> (N, 120, 1, 1)
        flatten                (N, 120, 1, 1) -> (N, 120)
        fc1    120 -> 84
        fc2    84 -> 10        (logits; no activation, CrossEntropyLoss expects raw scores)

    A single stateless ``nn.Tanh`` module is shared by every hidden layer.
    """

    def __init__(self):
        super().__init__()

        # Convolution / pooling feature extractor. Each conv uses stride 1 and no
        # padding, so its output side length is (in - kernel) + 1.
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=0)
        self.tanh = nn.Tanh()  # shared activation, applied after conv1/conv2/conv3/fc1
        self.pool1 = nn.AvgPool2d(kernel_size=2, stride=2, padding=0)

        self.conv2 = nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0)
        self.pool2 = nn.AvgPool2d(kernel_size=2, stride=2, padding=0)

        # The 4x4 kernel covers the whole 4x4 map, collapsing it to 1x1 (C5 in LeNet).
        self.conv3 = nn.Conv2d(16, 120, kernel_size=4, stride=1, padding=0)

        # Classifier head operating on the flattened 120-dim feature vector.
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(120, 84)
        self.fc2 = nn.Linear(84, 10)

    def forward(self, x):
        """Map a batch of images ``x`` of shape (N, 1, 28, 28) to logits of shape (N, 10)."""
        features = self.pool1(self.tanh(self.conv1(x)))
        features = self.pool2(self.tanh(self.conv2(features)))
        features = self.tanh(self.conv3(features))

        hidden = self.tanh(self.fc1(self.flatten(features)))
        return self.fc2(hidden)

# Build the network and place its parameters on the selected device (CPU/GPU).
model = LeNetLike()
model = model.to(device)

# --- 5. Loss Function and Optimizer ---
# CrossEntropyLoss = LogSoftmax + NLLLoss in one module, so the model outputs raw logits.
criterion = nn.CrossEntropyLoss()
# Adam is a solid general-purpose optimizer.
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

# --- 6. Training Loop ---
print("\n--- Starting Training ---")
# Per-epoch histories consumed by the plotting section.
train_losses, test_losses, test_accuracies = [], [], []

for epoch in range(NUM_EPOCHS):
    # ---- optimisation pass over the training set ----
    model.train()
    running_loss = 0.0
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()                     # drop gradients from the previous step
        loss = criterion(model(inputs), labels)   # forward pass + loss
        loss.backward()                           # gradients w.r.t. the model parameters
        optimizer.step()                          # one parameter update

        running_loss += loss.item()

        # Progress report: running mean of the batch losses seen so far in this epoch.
        if (batch_idx + 1) % 100 == 0:
            avg_batch_loss = running_loss / (batch_idx + 1)
            print(f"Epoch: {epoch+1}/{NUM_EPOCHS}, Batch: {batch_idx+1}/{len(train_loader)}, Loss: {avg_batch_loss:.4f}")

    # Mean of the per-batch losses for the whole epoch.
    epoch_avg_loss = running_loss / len(train_loader)
    train_losses.append(epoch_avg_loss)
    print(f"Epoch {epoch+1} finished. Average Training Loss: {epoch_avg_loss:.4f}")

    # --- 7. Evaluation after each epoch ---
    model.eval()
    test_running_loss = 0.0
    correct_predictions = total_samples = 0

    # No gradients are needed while scoring the test set.
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            logits = model(inputs)
            test_running_loss += criterion(logits, labels).item()

            # Predicted class = index of the largest logit in each row.
            predicted = logits.max(dim=1).indices
            total_samples += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

    avg_test_loss = test_running_loss / len(test_loader)
    accuracy = (correct_predictions / total_samples) * 100
    test_losses.append(avg_test_loss)
    test_accuracies.append(accuracy)

    print(f"Epoch {epoch+1} Evaluation - Test Loss: {avg_test_loss:.4f}, Accuracy: {accuracy:.2f}%\n")

print("--- Training Complete ---")

# --- 8. Plotting Training and Test Loss/Accuracy ---
# Two side-by-side panels: losses on the left, test accuracy on the right.
epoch_numbers = range(1, NUM_EPOCHS + 1)
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

loss_ax.plot(epoch_numbers, train_losses, label='Training Loss')
loss_ax.plot(epoch_numbers, test_losses, label='Test Loss')
loss_ax.set_title('Loss over Epochs')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

acc_ax.plot(epoch_numbers, test_accuracies, label='Test Accuracy', color='green')
acc_ax.set_title('Accuracy over Epochs')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.legend()
acc_ax.grid(True)

fig.tight_layout()
plt.show()

# --- 9. Test with a few samples (Optional) ---
print("\n--- Testing with a few samples ---")
model.eval()
images, labels = next(iter(test_loader))  # first batch of the test loader

# Show the first five images of that batch with true / predicted labels.
fig, axes = plt.subplots(1, 5, figsize=(10, 3))
for i, ax in enumerate(axes):
    img = images[i].to(device)
    true_label = labels[i].item()

    # The model expects a batch dimension, so add one for the single image.
    with torch.no_grad():
        output = model(img.unsqueeze(0))
    predicted_label = output.max(dim=1).indices.item()

    # The image is shown as-is (still normalized); imshow rescales the grey levels.
    img_display = img.cpu().squeeze().numpy()
    title_color = 'green' if true_label == predicted_label else 'red'

    ax.imshow(img_display, cmap='gray')
    ax.set_title(f"True: {true_label}\nPred: {predicted_label}", color=title_color)
    ax.axis('off')
plt.tight_layout()
plt.show()



In [None]:
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np
import json
import os
import datetime
from collections import defaultdict
import platform
import sklearn.metrics as skmetrics
import ast

# --- Start of Script Execution Timestamp ---
# Take ONE timestamp so the experiment id and the logged start time always agree.
run_started_at = datetime.datetime.now()
current_time_str = run_started_at.strftime("%Y%m%d_%H%M%S")
EXPERIMENT_ID = f"LeNet_MNIST_Run_{current_time_str}"
LOG_DIR = "experiment_logs"
os.makedirs(LOG_DIR, exist_ok=True)  # create the log directory on first use
LOG_FILE_PATH = os.path.join(LOG_DIR, f"{EXPERIMENT_ID}.json")

# defaultdict(dict) lets later code write experiment_log["section"]["key"] without
# creating the section first.
experiment_log = defaultdict(dict)
experiment_log["experiment_id"] = EXPERIMENT_ID
experiment_log["date_time_started"] = run_started_at.isoformat()
# Adjacent string literals are concatenated; a backslash continuation inside one literal
# would embed the next line's leading spaces in the logged description.
experiment_log["description"] = (
    "Baseline LeNet-like model on MNIST with default Adam parameters and added logging. "
    "With no normalisation of data."
)

# --- 1. Device Configuration ---
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Hardware section of the log; GPU details are recorded only when CUDA is usable.
hardware_info = experiment_log["hardware"]
hardware_info["device"] = str(device)
if torch.cuda.is_available():
    hardware_info["gpu_name"] = torch.cuda.get_device_name(0)
    gpu_total_bytes = torch.cuda.get_device_properties(0).total_memory
    hardware_info["gpu_vram_gb"] = gpu_total_bytes / (1024**3)
hardware_info["cpu_cores"] = os.cpu_count()

# Software environment section of the log.
software_info = experiment_log["software_env"]
software_info["python_version"] = platform.python_version()
software_info["pytorch_version"] = torch.__version__
software_info["torchvision_version"] = torchvision.__version__
if torch.cuda.is_available():
    software_info["cuda_version"] = torch.version.cuda
    software_info["cudnn_version"] = torch.backends.cudnn.version()


# --- 2. Hyperparameters ---
BATCH_SIZE = 64
LEARNING_RATE = 1e-3
NUM_EPOCHS = 5

# Record the tunable settings in the experiment log.
experiment_log["hyperparameters"].update({
    "batch_size": BATCH_SIZE,
    "learning_rate": LEARNING_RATE,
    "num_epochs": NUM_EPOCHS,
})

# MNIST statistics; defined here although the transform below leaves Normalize disabled.
mean, std = 0.1307, 0.3081
# --- 3. Data Loading and Preprocessing ---
# Only ToTensor() is applied: this run deliberately trains on un-normalized [0, 1] pixels.
transform = transforms.Compose([transforms.ToTensor()])

dataset_info = experiment_log["dataset"]
dataset_info["name"] = "MNIST"
dataset_info["transformations"] = list(map(str, transform.transforms))

# Training split is reshuffled on every pass; test split keeps a fixed order.
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

dataset_info["train_samples"] = len(train_dataset)
dataset_info["test_samples"] = len(test_dataset)
dataset_info["num_train_batches"] = len(train_loader)
dataset_info["num_test_batches"] = len(test_loader)

# Number of training examples per digit class, stored with plain str/int for JSON.
train_labels = np.array(train_dataset.targets)
unique, counts = np.unique(train_labels, return_counts=True)
dataset_info["class_distribution_train"] = dict(zip(map(str, unique), map(int, counts)))

print(f"Training data samples: {len(train_dataset)}")
print(f"Test data samples: {len(test_dataset)}")
print(f"Number of training batches: {len(train_loader)}")
print(f"Number of test batches: {len(test_loader)}")

# --- 4. Model Definition (LeNet-like Architecture) ---
class LeNetLike(nn.Module):
    """LeNet-style CNN with ReLU activations: (N, 1, 28, 28) images -> (N, 10) logits.

    Structure: conv1 -> ReLU -> pool1 -> conv2 -> ReLU -> pool2 -> conv3 -> ReLU
    -> flatten -> fc1 -> ReLU -> fc2. One ``nn.ReLU`` module (``self.activation``)
    is shared by all four activation sites.
    """

    def __init__(self):
        super().__init__()

        # Single stateless ReLU instance reused after conv1, conv2, conv3 and fc1.
        self.activation = nn.ReLU()

        # Feature extractor: two conv + average-pool stages, then a 4x4 conv that
        # reduces the remaining 4x4 map to 1x1 with 120 channels.
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=0)
        self.pool1 = nn.AvgPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0)
        self.pool2 = nn.AvgPool2d(kernel_size=2, stride=2, padding=0)
        self.conv3 = nn.Conv2d(16, 120, kernel_size=4, stride=1, padding=0)

        # Named flatten_op so the attribute cannot be confused with a flatten method.
        self.flatten_op = nn.Flatten()

        # Classifier head; fc2 returns raw logits.
        self.fc1 = nn.Linear(120, 84)
        self.fc2 = nn.Linear(84, 10)

    def forward(self, x):
        """Return class logits of shape (N, 10) for a batch ``x`` of shape (N, 1, 28, 28)."""
        features = self.pool1(self.activation(self.conv1(x)))
        features = self.pool2(self.activation(self.conv2(features)))
        features = self.activation(self.conv3(features))

        hidden = self.activation(self.fc1(self.flatten_op(features)))
        return self.fc2(hidden)

# Instantiate the network, then place it on the selected device.
model = LeNetLike()
model = model.to(device)

# --- Global flag and function for dynamic model architecture logging ---
# Set to True once the architecture trace has been run.
_model_architecture_traced = False

def trace_model_architecture(model_instance, dummy_input_for_trace, experiment_log_dict):
    """
    Run one forward pass with temporary forward hooks and record every executed layer.

    Each hooked submodule contributes one entry per call, in execution order, so a
    module that ``forward`` reuses several times (e.g. a shared activation) appears
    several times. An entry holds the layer type, a name, the input/output shapes and,
    for Conv2d / Linear / AvgPool2d / MaxPool2d, their main constructor parameters.

    Args:
        model_instance: The ``nn.Module`` to trace.
        dummy_input_for_trace: Tensor fed to ``model_instance`` for the single tracing
            pass. It is used as given; this function does not move it between devices.
        experiment_log_dict: Mapping whose ``["model_architecture"]["layers"]`` entry is
            set to the list of traced layer dicts. ``["model_architecture"]`` must
            already exist or be auto-created (e.g. by a ``defaultdict(dict)``).

    Returns:
        None. The result is written into ``experiment_log_dict``.
    """
    activation_types = (nn.ReLU, nn.Tanh, nn.Sigmoid)
    container_types = (nn.Sequential, nn.ModuleList, nn.ModuleDict)

    model_trace_results = []
    hooks = []

    # Counter for uniquely naming repeated activation calls (e.g. relu_1, relu_2).
    activation_counters = defaultdict(int)

    # Module instance -> attribute name for the model's direct children
    # (e.g. 'conv1', 'pool1'), used to give traced layers readable names.
    named_children_map = {module: name for name, module in model_instance.named_children()}

    def forward_hook_fn(module, input, output):
        type_name = module.__class__.__name__
        layer_info = {"type": type_name}

        if isinstance(module, activation_types):
            # Activations are numbered per call so repeated uses stay distinguishable.
            activation_counters[type_name] += 1
            layer_info["name"] = f"{type_name.lower()}_{activation_counters[type_name]}"
            layer_info["activation"] = type_name
        elif module in named_children_map:
            layer_info["name"] = named_children_map[module]
        else:
            # Nested module that is not a direct child: fall back to its lowercase type.
            layer_info["name"] = type_name.lower()

        # Hooks receive the positional inputs as a tuple; the first one gives the shape.
        input_tensor_for_shape = input[0] if isinstance(input, tuple) else input
        layer_info["input_shape"] = str(list(input_tensor_for_shape.shape))

        # Layer-specific hyperparameters.
        if isinstance(module, nn.Conv2d):
            layer_info.update({
                "in_channels": module.in_channels,
                "out_channels": module.out_channels,
                "kernel_size": str(module.kernel_size),
                "stride": str(module.stride),
                "padding": str(module.padding)
            })
        elif isinstance(module, nn.Linear):
            layer_info.update({
                "in_features": module.in_features,
                "out_features": module.out_features
            })
        elif isinstance(module, (nn.AvgPool2d, nn.MaxPool2d)):
            layer_info.update({
                "kernel_size": str(module.kernel_size),
                "stride": str(module.stride),
                "padding": str(module.padding)
            })

        layer_info["output_shape"] = str(list(output.shape))
        model_trace_results.append(layer_info)

    try:
        for module in model_instance.modules():
            # Skip the root model (it is only the container) and empty containers.
            # Parameterless leaf layers such as pooling ARE hooked: filtering on
            # "has parameters" would silently drop them from the logged architecture.
            if module is model_instance:
                continue
            if isinstance(module, container_types) and not list(module.children()):
                continue
            hooks.append(module.register_forward_hook(forward_hook_fn))

        # One gradient-free forward pass fires the hooks in execution order.
        with torch.no_grad():
            model_instance(dummy_input_for_trace)
    finally:
        # Always detach the hooks, even when the forward pass raises, so they can
        # never linger on the model during training.
        for hook in hooks:
            hook.remove()

    experiment_log_dict["model_architecture"]["layers"] = model_trace_results


# --- Dynamic Model Architecture Logging using Hooks (Execution) ---
architecture_info = experiment_log["model_architecture"]
architecture_info["name"] = type(model).__name__

# Count only parameters that will receive gradient updates.
trainable_tensors = [p for p in model.parameters() if p.requires_grad]
total_params = sum(p.numel() for p in trainable_tensors)
architecture_info["total_trainable_parameters"] = total_params

# Trace the layers once with a random MNIST-shaped input (batch of one 1x28x28 image).
if not _model_architecture_traced:
    dummy_input_for_trace = torch.randn(1, 1, 28, 28).to(device)
    trace_model_architecture(model, dummy_input_for_trace, experiment_log)
    _model_architecture_traced = True

print(f"Total trainable parameters: {experiment_log['model_architecture']['total_trainable_parameters']}")


# --- 5. Loss Function and Optimizer ---
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

experiment_log["hyperparameters"]["optimizer_name"] = optimizer.__class__.__name__

# Copy the optimizer defaults into the log, unwrapping tensors/arrays into plain Python
# values so that json.dump can serialize them.
optimizer_params_cleaned = {}
for k, v in optimizer.defaults.items():
    if isinstance(v, torch.Tensor):
        optimizer_params_cleaned[k] = v.item() if v.numel() == 1 else v.tolist()
    elif isinstance(v, np.ndarray):
        # ndarrays expose their element count as .size; they have no .numel().
        optimizer_params_cleaned[k] = v.item() if v.size == 1 else v.tolist()
    else:
        optimizer_params_cleaned[k] = v
experiment_log["hyperparameters"]["optimizer_params"] = optimizer_params_cleaned
experiment_log["hyperparameters"]["loss_function_name"] = criterion.__class__.__name__


# --- 6. Training Loop ---
print("\n--- Starting Training ---")
# Per-epoch metric histories; each list receives one value per epoch.
experiment_log["training_results"]["train_losses_per_epoch"] = []
experiment_log["training_results"]["test_losses_per_epoch"] = []
experiment_log["training_results"]["test_accuracies_per_epoch"] = []
experiment_log["training_results"]["test_precisions_macro_per_epoch"] = []
experiment_log["training_results"]["test_recalls_macro_per_epoch"] = []
experiment_log["training_results"]["test_f1_scores_macro_per_epoch"] = []
experiment_log["training_results"]["test_roc_auc_macro_per_epoch"] = []


# Best-epoch tracking (selected by test accuracy).
best_test_accuracy = -1.0
best_epoch = -1
best_test_loss = float('inf')

# Wait for pending GPU work before reading the clock so the timings cover it.
if torch.cuda.is_available():
    torch.cuda.synchronize()
total_training_start_time = time.time()

for epoch in range(NUM_EPOCHS):
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    epoch_start_time = time.time()

    model.train()
    running_loss = 0.0
    for batch_idx, (data, targets) in enumerate(train_loader):
        data = data.to(device)
        targets = targets.to(device)

        outputs = model(data)
        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Progress report: running mean of the batch losses seen so far in this epoch.
        if (batch_idx + 1) % 100 == 0:
            avg_batch_loss = running_loss / (batch_idx + 1)
            print(f"Epoch: {epoch+1}/{NUM_EPOCHS}, Batch: {batch_idx+1}/{len(train_loader)}, Loss: {avg_batch_loss:.4f}")

    epoch_avg_loss = running_loss / len(train_loader)
    experiment_log["training_results"]["train_losses_per_epoch"].append(epoch_avg_loss)

    # --- 7. Evaluation after each epoch ---
    model.eval()
    test_running_loss = 0.0

    epoch_true_labels = []
    epoch_pred_labels = []
    epoch_pred_scores = []  # per-class probabilities, one row per test sample

    with torch.no_grad():
        for data, targets in test_loader:
            data = data.to(device)
            targets = targets.to(device)

            outputs = model(data)
            loss = criterion(outputs, targets)
            test_running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)

            epoch_true_labels.extend(targets.cpu().numpy())
            epoch_pred_labels.extend(predicted.cpu().numpy())
            # roc_auc_score needs multi-class scores that are probabilities summing to 1
            # per row; raw logits are rejected with a ValueError, so apply softmax first.
            epoch_pred_scores.extend(torch.softmax(outputs, dim=1).cpu().numpy())

    avg_test_loss = test_running_loss / len(test_loader)
    accuracy = float(skmetrics.accuracy_score(epoch_true_labels, epoch_pred_labels) * 100)

    experiment_log["training_results"]["test_losses_per_epoch"].append(avg_test_loss)
    experiment_log["training_results"]["test_accuracies_per_epoch"].append(accuracy)

    precision, recall, f1, _ = skmetrics.precision_recall_fscore_support(
        epoch_true_labels, epoch_pred_labels, average='macro', zero_division=0
    )
    experiment_log["training_results"]["test_precisions_macro_per_epoch"].append(float(precision))
    experiment_log["training_results"]["test_recalls_macro_per_epoch"].append(float(recall))
    experiment_log["training_results"]["test_f1_scores_macro_per_epoch"].append(float(f1))

    # ROC AUC stays best-effort: on failure a warning is printed and None is logged.
    try:
        roc_auc = float(skmetrics.roc_auc_score(epoch_true_labels, epoch_pred_scores, multi_class='ovr', average='macro'))
    except ValueError as e:
        roc_auc = None
        print(f"Warning: Could not compute ROC AUC for epoch {epoch+1}: {e}")
    experiment_log["training_results"]["test_roc_auc_macro_per_epoch"].append(roc_auc)


    # Track best model based on test accuracy
    if accuracy > best_test_accuracy:
        best_test_accuracy = accuracy
        best_epoch = epoch + 1
        best_test_loss = avg_test_loss

        # Store detailed metrics for the best epoch
        experiment_log["final_evaluation_metrics"]["best_epoch_details"] = {
            "epoch_number": best_epoch,
            "test_accuracy": float(best_test_accuracy),
            "test_loss": float(best_test_loss),
            "precision_macro_avg": float(precision),
            "recall_macro_avg": float(recall),
            "f1_score_macro_avg": float(f1),
            "roc_auc_macro_avg": roc_auc,
            "confusion_matrix_at_best_epoch": str(skmetrics.confusion_matrix(epoch_true_labels, epoch_pred_labels).tolist())
        }

    if torch.cuda.is_available():
        torch.cuda.synchronize()
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time

    print(f"Epoch {epoch+1} finished. Avg Training Loss: {epoch_avg_loss:.4f}, Test Loss: {avg_test_loss:.4f}, Test Acc: {accuracy:.2f}%, Time: {epoch_time:.2f}s\n")


if torch.cuda.is_available():
    torch.cuda.synchronize()
total_training_end_time = time.time()
total_training_time = total_training_end_time - total_training_start_time

# Both timings span the whole loop, i.e. training plus the per-epoch evaluation.
experiment_log["training_results"]["total_training_time_seconds"] = total_training_time
experiment_log["training_results"]["avg_time_per_epoch_seconds"] = total_training_time / NUM_EPOCHS


print("--- Training Complete ---")
print(f"Total training duration: {total_training_time:.2f} seconds")

# --- Final Metrics after training ---
# Score the test set once more with the weights from the last epoch.
model.eval()
final_true_labels = []
final_pred_labels = []
final_pred_scores = []  # per-class probabilities, one row per test sample
final_images = [] # Store images to display wrong predictions

with torch.no_grad():
    for data, targets in test_loader:
        data_cpu = data.cpu() # Keep on CPU for numpy conversion later
        targets_cpu = targets.cpu() # Keep on CPU for numpy conversion later

        data = data.to(device)
        targets = targets.to(device)
        outputs = model(data)
        _, predicted = torch.max(outputs, 1)

        final_true_labels.extend(targets_cpu.numpy())
        final_pred_labels.extend(predicted.cpu().numpy())
        # roc_auc_score needs multi-class scores that are probabilities summing to 1
        # per row; raw logits are rejected with a ValueError, so apply softmax first.
        final_pred_scores.extend(torch.softmax(outputs, dim=1).cpu().numpy())
        final_images.append(data_cpu) # Store the images

final_accuracy = float(skmetrics.accuracy_score(final_true_labels, final_pred_labels) * 100)
final_precision, final_recall, final_f1, _ = skmetrics.precision_recall_fscore_support(
    final_true_labels, final_pred_labels, average='macro', zero_division=0
)
final_precision = float(final_precision)
final_recall = float(final_recall)
final_f1 = float(final_f1)

# ROC AUC stays best-effort: None is logged when it cannot be computed.
try:
    final_roc_auc = float(skmetrics.roc_auc_score(final_true_labels, final_pred_scores, multi_class='ovr', average='macro'))
except ValueError:
    final_roc_auc = None
final_cm = str(skmetrics.confusion_matrix(final_true_labels, final_pred_labels).tolist())

experiment_log["final_evaluation_metrics"]["overall_accuracy_last_epoch"] = final_accuracy
experiment_log["final_evaluation_metrics"]["confusion_matrix_last_epoch"] = final_cm
experiment_log["final_evaluation_metrics"]["precision_macro_avg_last_epoch"] = final_precision
experiment_log["final_evaluation_metrics"]["recall_macro_avg_last_epoch"] = final_recall
experiment_log["final_evaluation_metrics"]["f1_score_macro_avg_last_epoch"] = final_f1
experiment_log["final_evaluation_metrics"]["roc_auc_macro_avg_last_epoch"] = final_roc_auc

precision_per_class, recall_per_class, f1_per_class, _ = skmetrics.precision_recall_fscore_support(
    final_true_labels, final_pred_labels, average=None, zero_division=0
)
# With average=None the arrays are ordered by the sorted labels present in the true or
# predicted labels, so derive the keys from those same labels (for MNIST: '0'..'9').
scored_labels = np.unique(final_true_labels + final_pred_labels)
per_class_metrics = {}
for class_label, class_precision, class_recall, class_f1 in zip(
    scored_labels, precision_per_class, recall_per_class, f1_per_class
):
    per_class_metrics[str(int(class_label))] = {
        "precision": float(class_precision),
        "recall": float(class_recall),
        "f1_score": float(class_f1)
    }
experiment_log["final_evaluation_metrics"]["per_class_metrics_last_epoch"] = per_class_metrics


experiment_log["notes_observations"] = "Initial run of LeNet-like model. Achieved good accuracy. Consider more epochs or data augmentation next."

# --- Save the experiment log to a JSON file ---
print(f"\nSaving experiment log to {LOG_FILE_PATH}")
per_epoch_keys = (
    "train_losses_per_epoch",
    "test_losses_per_epoch",
    "test_accuracies_per_epoch",
    "test_precisions_macro_per_epoch",
    "test_recalls_macro_per_epoch",
    "test_f1_scores_macro_per_epoch",
    "test_roc_auc_macro_per_epoch",
)
with open(LOG_FILE_PATH, 'w') as f:
    # Each per-epoch history list is replaced IN PLACE by its string form before dumping,
    # so the JSON holds e.g. "[0.31, 0.12]" as a single string. The in-memory log holds
    # strings from here on; the plotting code parses them back with ast.literal_eval.
    training_results = experiment_log["training_results"]
    for key in per_epoch_keys:
        history = training_results.get(key)
        if isinstance(history, list):
            training_results[key] = str(history)

    json.dump(experiment_log, f, indent=4)

# --- 8. Plotting Training and Test Loss/Accuracy ---
# The per-epoch histories were turned into strings when the log was saved, so parse
# them back into lists before plotting.
training_results_saved = experiment_log["training_results"]
train_losses_plot = ast.literal_eval(training_results_saved["train_losses_per_epoch"])
test_losses_plot = ast.literal_eval(training_results_saved["test_losses_per_epoch"])
test_accuracies_plot = ast.literal_eval(training_results_saved["test_accuracies_per_epoch"])

epoch_numbers = range(1, NUM_EPOCHS + 1)
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

# Left panel: training vs. test loss.
loss_ax.plot(epoch_numbers, train_losses_plot, label='Training Loss')
loss_ax.plot(epoch_numbers, test_losses_plot, label='Test Loss')
loss_ax.set_title('Loss over Epochs')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

# Right panel: test accuracy.
acc_ax.plot(epoch_numbers, test_accuracies_plot, label='Test Accuracy', color='green')
acc_ax.set_title('Accuracy over Epochs')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.legend()
acc_ax.grid(True)

fig.tight_layout()
plt.show()

# --- 9. Test with a few random samples (Optional) ---
print("\n--- Testing with a few random samples ---")
model.eval()
# The test loader is not shuffled, so these are the first five test images.
images, labels = next(iter(test_loader))

fig, axes = plt.subplots(1, 5, figsize=(10, 3))
for i, ax in enumerate(axes):
    img = images[i].to(device)
    true_label = labels[i].item()

    # The model expects a batch dimension, so add one for the single image.
    with torch.no_grad():
        output = model(img.unsqueeze(0))
    predicted_label = output.max(dim=1).indices.item()

    img_display = img.cpu().squeeze().numpy()
    title_color = 'green' if true_label == predicted_label else 'red'

    ax.imshow(img_display, cmap='gray')
    ax.set_title(f"True: {true_label}\nPred: {predicted_label}", color=title_color)
    ax.axis('off')
plt.tight_layout()
plt.show()


# --- 10. Display Wrong Predictions ---
print("\n--- Displaying Wrong Predictions (up to 20 samples) ---")
# Join the stored batches into one tensor so test-set positions can index it directly.
all_test_images = torch.cat(final_images, dim=0)

# Positions in the test set where the prediction differs from the true label.
wrong_indices = [i for i, (true, pred) in enumerate(zip(final_true_labels, final_pred_labels)) if true != pred]

if not wrong_indices:
    print("No wrong predictions found! Model achieved 100% accuracy.")
else:
    # Cap the display at 20 images; min() also keeps the count honest when fewer exist.
    max_wrong_to_show = 20
    num_wrong_to_show = min(len(wrong_indices), max_wrong_to_show)

    # Five columns, with as many rows as needed (ceiling division).
    num_cols = 5
    num_rows = (num_wrong_to_show + num_cols - 1) // num_cols

    fig_wrong, axes_wrong = plt.subplots(num_rows, num_cols, figsize=(num_cols * 2, num_rows * 2.5))
    axes_wrong = axes_wrong.flatten() # Flatten for easy iteration

    print(f"Found {len(wrong_indices)} wrong predictions. Displaying {num_wrong_to_show} of them.")

    for i, wrong_idx in enumerate(wrong_indices[:num_wrong_to_show]):
        img = all_test_images[wrong_idx].squeeze().numpy()
        true_label = final_true_labels[wrong_idx]
        predicted_label = final_pred_labels[wrong_idx]

        axes_wrong[i].imshow(img, cmap='gray')
        axes_wrong[i].set_title(f"True: {true_label}\nPred: {predicted_label}", color='red')
        axes_wrong[i].axis('off')

    # Remove the grid cells that received no image.
    for unused_ax in axes_wrong[num_wrong_to_show:]:
        fig_wrong.delaxes(unused_ax)

    plt.tight_layout()
    plt.suptitle("Misclassified Digits", y=1.02, fontsize=16) # Add a main title
    plt.show()

In [14]:
# Element-wise sum of two equal-length lists, materialised as a list for display.
result = [left + right for left, right in zip([1, 2, 3], [1, 2, 3])]
print(result)

[2, 4, 6]
