# Imports

In [1]:
# Import required libraries
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import random
import os
import time
from datetime import datetime
from tqdm import tqdm
import json
import wandb
from sklearn.datasets import fetch_openml
import glob
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score, roc_curve
from torchvision import datasets, transforms

from IPython import get_ipython
get_ipython().kernel.do_one_iteration = lambda: None




# Implement Classes

In [None]:
class ReLU:
    """ReLU activation function"""
    
    def forward(self, x):
        """Forward pass: f(x) = max(0, x)"""
        self.input = x
        return np.maximum(0, x)
    
    def backward(self, grad_output):
        """Backward pass: f'(x) = 1 if x > 0, else 0"""
        return grad_output * (self.input > 0)

class Tanh:
    """Tanh activation function"""
    
    def forward(self, x):
        """Forward pass: f(x) = tanh(x)"""
        self.output = np.tanh(x)
        return self.output
    
    def backward(self, grad_output):
        """Backward pass: f'(x) = 1 - tanh²(x)"""
        return grad_output * (1 - self.output ** 2)

class Sigmoid:
    """Sigmoid activation function"""
    
    def forward(self, x):
        """Forward pass: f(x) = 1 / (1 + exp(-x))"""
        # Clip x to prevent overflow
        x_clipped = np.clip(x, -500, 500)
        self.output = 1 / (1 + np.exp(-x_clipped))
        return self.output
    
    def backward(self, grad_output):
        """Backward pass: f'(x) = sigmoid(x) * (1 - sigmoid(x))"""
        return grad_output * self.output * (1 - self.output)

class Identity:
    """Identity activation function (no activation)"""
    
    def forward(self, x):
        """Forward pass: f(x) = x"""
        return x
    
    def backward(self, grad_output):
        """Backward pass: f'(x) = 1"""
        return grad_output


class Linear:
    """Linear (fully connected) layer with optional activation function"""
    
    def __init__(self, input_width, output_width, activation=Identity()):
        """
        Initialize linear layer
        """
        self.input_width = input_width
        self.output_width = output_width
        self.activation = activation
        
        # Initialize weights and biases using Xavier/Glorot initialization
        self.weights = np.random.randn(input_width, output_width) * np.sqrt(2.0 / input_width)
        self.biases = np.zeros(output_width)
        
        # Initialize cumulative gradients
        self.grad_weights = np.zeros_like(self.weights)
        self.grad_biases = np.zeros_like(self.biases)
        
        # Store data for backward pass
        self.input = None
        self.linear_output = None
        self.output = None
    
    def forward(self, x):
        """
        Forward pass through the linear layer
        """
        self.input = x
        
        # Linear transformation: y = xW + b
        self.linear_output = np.dot(x, self.weights) + self.biases
        
        # Apply activation function
        self.output = self.activation.forward(self.linear_output)
        
        return self.output
    
    def backward(self, grad_output):
        """
        Backward pass through the linear layer
        """
        # Gradient through activation function
        grad_activation = self.activation.backward(grad_output)
        
        # Gradients with respect to weights and biases
        if len(self.input.shape) == 1:
            # Single sample
            self.grad_weights += np.outer(self.input, grad_activation)
            self.grad_biases += grad_activation
        else:
            # Batch of samples
            self.grad_weights += np.dot(self.input.T, grad_activation)
            self.grad_biases += np.sum(grad_activation, axis=0)
        
        # Gradient with respect to input
        grad_input = np.dot(grad_activation, self.weights.T)
        
        return grad_input
    
    def zero_grad(self):
        """Reset accumulated gradients to zero"""
        self.grad_weights.fill(0)
        self.grad_biases.fill(0)
    
    def update(self, learning_rate):
        """Update parameters using accumulated gradients"""
        self.weights -= learning_rate * self.grad_weights
        self.biases -= learning_rate * self.grad_biases


class MSELoss:
    """Mean Squared Error loss function"""
    
    def forward(self, y_pred, y_true):
        """
        Compute MSE loss
        """
        self.y_pred = y_pred
        self.y_true = y_true
        
        # MSE = (1/n) * sum((y_pred - y_true)^2)
        loss = np.mean((y_pred - y_true) ** 2)
        return loss
    
    def backward(self):
        """
        Compute gradient of MSE loss
        """
        # d(MSE)/dy_pred = 2 * (y_pred - y_true) / n
        n = len(self.y_pred) if len(self.y_pred.shape) > 0 else 1
        return 2 * (self.y_pred - self.y_true) / n

class BCELoss:
    """Binary Cross Entropy loss function"""
    
    def forward(self, y_pred, y_true):
        """
        Compute BCE loss
        """
        self.y_pred = y_pred
        self.y_true = y_true
        
        # Clip predictions to prevent log(0)
        y_pred_clipped = np.clip(y_pred, 1e-15, 1 - 1e-15)
        self.y_pred_clipped = y_pred_clipped
        
        # BCE = -mean(y_true * log(y_pred) + (1 - y_true) * log(1 - y_pred))
        loss = -np.mean(y_true * np.log(y_pred_clipped) + (1 - y_true) * np.log(1 - y_pred_clipped))
        return loss
    
    def backward(self):
        """
        Compute gradient of BCE loss
        """
        # d(BCE)/dy_pred = -(y_true / y_pred - (1 - y_true) / (1 - y_pred)) / n
        n = len(self.y_pred) if len(self.y_pred.shape) > 0 else 1
        grad = -(self.y_true / self.y_pred_clipped - (1 - self.y_true) / (1 - self.y_pred_clipped)) / n
        return grad


class Model:
    """Neural network model class"""
    
    def __init__(self, layers, loss_fn, learning_rate=0.001):
        """
        Initialize model
        """
        self.layers = layers
        self.loss_fn = loss_fn
        self.learning_rate = learning_rate
        
        # Training metrics
        self.train_losses = []
        self.samples_seen = []
    
    def forward(self, x):
        """
        Forward pass through all layers
        """
        output = x
        for layer in self.layers:
            output = layer.forward(output)
        return output
    
    def backward(self, grad_output):
        """
        Backward pass through all layers
        """
        grad = grad_output
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
        return grad
    
    def train(self, x, y):
        """
        Perform forward pass, compute loss, and backpropagate
        """
        # Forward pass
        y_pred = self.forward(x)
        
        # Compute loss
        loss = self.loss_fn.forward(y_pred, y)
        
        # Backward pass
        grad_loss = self.loss_fn.backward()
        self.backward(grad_loss)
        
        return loss
    
    def zero_grad(self):
        """Reset gradients in all layers"""
        for layer in self.layers:
            layer.zero_grad()
    
    def update(self):
        """Update parameters and reset gradients"""
        for layer in self.layers:
            layer.update(self.learning_rate)
        self.zero_grad()
    
    def predict(self, x):
        """
        Make predictions without gradient computation
        """
        return self.forward(x)
    
    def save_to(self, path):
        """
        Save model parameters to file
        """
        params = {}
        for i, layer in enumerate(self.layers):
            params[f'layer_{i}_weights'] = layer.weights
            params[f'layer_{i}_biases'] = layer.biases
        
        np.savez(path, **params)
        print(f"Model saved to {path}")
    
    def load_from(self, path):
        """
        Load model parameters from file
        """
        try:
            params = np.load(path)
            
            # Check if architecture matches
            for i, layer in enumerate(self.layers):
                weight_key = f'layer_{i}_weights'
                bias_key = f'layer_{i}_biases'
                
                if weight_key not in params or bias_key not in params:
                    raise ValueError(f"Missing parameters for layer {i}")
                
                saved_weights = params[weight_key]
                saved_biases = params[bias_key]
                
                if saved_weights.shape != layer.weights.shape:
                    raise ValueError(f"Weight shape mismatch for layer {i}: "
                                   f"expected {layer.weights.shape}, got {saved_weights.shape}")
                
                if saved_biases.shape != layer.biases.shape:
                    raise ValueError(f"Bias shape mismatch for layer {i}: "
                                   f"expected {layer.biases.shape}, got {saved_biases.shape}")
                
                # Load parameters
                layer.weights = saved_weights.copy()
                layer.biases = saved_biases.copy()
            
            print(f"Model loaded from {path}")
            
        except Exception as e:
            raise RuntimeError(f"Error loading model: {e}")

# Training Function

In [None]:
def train_model(model, dataset, epochs=100, batch_size=32, grad_accumulation_steps=1, 
                patience=10, relative_loss_threshold=0.01):
    """
    Train the model with early stopping
    """
    # Create data loader
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    # Training history
    train_losses = []
    epoch_losses = []
    samples_seen = []
    sample_count = 0
    
    print(f"Starting training...")
    print(f"Model: {len(model.layers)} layers")
    print(f"Dataset size: {len(dataset)}")
    print(f"Batch size: {batch_size}")
    print(f"Gradient accumulation steps: {grad_accumulation_steps}")
    
    for epoch in range(epochs):
        epoch_loss = 0.0
        epoch_samples = 0

        for batch_idx, (coords_batch, labels_batch) in enumerate(dataloader):
            # Convert to numpy arrays
            x = coords_batch.numpy()
            y = labels_batch.numpy() # Reshape for proper broadcasting
            # Train on batch
            loss = model.train(x, y)
            
            # Accumulate metrics
            epoch_loss += loss * len(x)
            epoch_samples += len(x)
            sample_count += len(x)
            
            # Store loss and sample count for each batch
            train_losses.append(loss)
            samples_seen.append(sample_count)
            

            model.update()
        # Calculate average epoch loss
        avg_epoch_loss = epoch_loss / epoch_samples
        epoch_losses.append(avg_epoch_loss)
        
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_epoch_loss:.6f}")
        
        # Early stopping check
        if epoch >= patience:
            old_loss = epoch_losses[epoch - patience]
            current_loss = avg_epoch_loss
            
            # Check if loss improved by at least the threshold
            if current_loss >= (1- relative_loss_threshold) * old_loss:
                print(f"Early stopping triggered! Loss did not improve by {relative_loss_threshold*100}% "
                          f"over the last {patience} epochs.")
                break
    
    # Save training results
    history = {
        'train_losses': train_losses,
        'epoch_losses': epoch_losses,
        'samples_seen': samples_seen,
        'final_loss': epoch_losses[-1],
        'epochs_trained': len(epoch_losses)
    }
    
    print(f"\nTraining completed!")
    print(f"Final loss: {epoch_losses[-1]:.6f}")
    print(f"Epochs trained: {len(epoch_losses)}")

    timestamp = datetime.now().strftime("%H%M%S")
    run_dir = os.path.join("runs", f"run_{timestamp}")
    os.makedirs(run_dir, exist_ok=True)
    model_path = os.path.join(run_dir, 'model.npz')
    model.save_to(model_path)

    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 2, 1)
    plt.plot(samples_seen, train_losses, alpha=0.7)
    plt.xlabel('Samples Seen')
    plt.ylabel('Loss')
    plt.title('Training Loss vs Samples')
    plt.grid(True)
    
    plt.subplot(1, 2, 2)
    plt.plot(range(1, len(epoch_losses) + 1), epoch_losses)
    plt.xlabel('Epoch')
    plt.ylabel('Average Loss')
    plt.title('Epoch Loss')
    plt.grid(True)
    plt.suptitle("(sudershan.sarraf)", fontsize=16, fontweight='bold')
    
    plt.tight_layout()
    plot_path = os.path.join(run_dir, 'training_plot.png')
    plt.savefig(plot_path, dpi=150, bbox_inches='tight')
    plt.close()
    
    return history

# MLP Autoencoder Class

In [None]:
class MLPAutoencoder:
    """
    Multi-Layer Perceptron Autoencoder using the existing MLP framework
    """
    
    def __init__(self, input_dim, encoder_hidden_dims, latent_dim, 
                 decoder_hidden_dims=None, activation_fn=ReLU, learning_rate=0.001):
        """
        Initialize the MLPAutoencoder
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.learning_rate = learning_rate
        
        # If decoder dimensions not specified, mirror the encoder (reversed)
        if decoder_hidden_dims is None:
            decoder_hidden_dims = encoder_hidden_dims[::-1]
        
        # Build encoder layers
        encoder_layers = []
        
        # Input to first hidden layer
        prev_dim = input_dim
        for hidden_dim in encoder_hidden_dims:
            encoder_layers.append(Linear(prev_dim, hidden_dim, activation_fn()))
            prev_dim = hidden_dim
        
        # Last hidden layer to latent layer
        encoder_layers.append(Linear(prev_dim, latent_dim, activation_fn()))
        
        # Build decoder layers
        decoder_layers = []
        
        # Latent to first decoder hidden layer
        prev_dim = latent_dim
        for hidden_dim in decoder_hidden_dims:
            decoder_layers.append(Linear(prev_dim, hidden_dim, activation_fn()))
            prev_dim = hidden_dim
        
        # Last hidden layer to output (reconstruction)
        # Use sigmoid activation for output to ensure values are in [0,1] range
        decoder_layers.append(Linear(prev_dim, input_dim, Sigmoid()))
        
        # Create the complete autoencoder as a single model
        all_layers = encoder_layers + decoder_layers
        
        # Create model with MSE loss (appropriate for reconstruction)
        self.model = Model(all_layers, MSELoss(), learning_rate)
        
        # Store layer indices for encoder/decoder separation
        self.encoder_layers = encoder_layers
        self.decoder_layers = decoder_layers
        self.num_encoder_layers = len(encoder_layers)
        
        print(f"MLPAutoencoder created:")
        print(f"  Input dimension: {input_dim}")
        print(f"  Encoder architecture: {input_dim} -> {' -> '.join(map(str, encoder_hidden_dims))} -> {latent_dim}")
        print(f"  Decoder architecture: {latent_dim} -> {' -> '.join(map(str, decoder_hidden_dims))} -> {input_dim}")
        print(f"  Total parameters: {self.count_parameters()}")
    
    def encode(self, x):
        """
        Encode input to latent representation
        """
        # Forward pass through encoder layers only
        output = x
        for layer in self.encoder_layers:
            output = layer.forward(output)
        return output
    
    def decode(self, z):
        """
        Decode latent representation to reconstruction
        """
        # Forward pass through decoder layers only
        output = z
        for layer in self.decoder_layers:
            output = layer.forward(output)
        return output
    
    def get_latent_representation(self, x):
        """
        Get latent representation of input data
        """
        return self.encode(x)
    
    def count_parameters(self):
        """
        Count total number of parameters in the autoencoder
        """
        total_params = 0
        for layer in self.model.layers:
            total_params += layer.weights.size + layer.biases.size
        return total_params
    
    def analyze_latent_space(self, data, labels=None, num_samples=1000):
        """
        Analyze the latent space representation
        
        Args:
            data: Input data
            labels: Labels for the data (optional)
            num_samples (int): Number of samples to analyze
        """
        # Sample data
        if num_samples < len(data):
            indices = np.random.choice(len(data), num_samples, replace=False)
            sampled_data = [data[i] for i in indices]
            if labels is not None:
                sampled_labels = [labels[i] for i in indices]
        else:
            sampled_data = data
            sampled_labels = labels
        
        # Get latent representations
        latent_representations = []
        for sample in sampled_data:
            if isinstance(sample, (list, tuple)):
                x = sample[0].numpy()
            else:
                x = sample.numpy()
            
            x_flat = x.reshape(1, -1)
            latent = self.get_latent_representation(x_flat)
            latent_representations.append(latent.flatten())
        
        latent_matrix = np.array(latent_representations)
        
        # Visualize latent space (if 2D or can be reduced to 2D)
        if self.latent_dim == 2:
            plt.figure(figsize=(8, 6))
            if sampled_labels is not None:
                scatter = plt.scatter(latent_matrix[:, 0], latent_matrix[:, 1], 
                                    c=sampled_labels, cmap='tab10', alpha=0.7)
                plt.colorbar(scatter)
            else:
                plt.scatter(latent_matrix[:, 0], latent_matrix[:, 1], alpha=0.7)
            plt.xlabel('Latent Dimension 1')
            plt.ylabel('Latent Dimension 2')
            plt.title('2D Latent Space Visualization')
            plt.grid(True)
            plt.show()
        else:
            # Show statistics for higher dimensional latent spaces
            print(f"Latent space statistics (dimension: {self.latent_dim}):")
            print(f"Mean: {np.mean(latent_matrix, axis=0)}")
            print(f"Std: {np.std(latent_matrix, axis=0)}")
            print(f"Min: {np.min(latent_matrix, axis=0)}")
            print(f"Max: {np.max(latent_matrix, axis=0)}")
            
            # Plot histogram of latent dimensions
            fig, axes = plt.subplots(2, min(4, self.latent_dim//2 + 1), figsize=(12, 6))
            axes = axes.flatten()
            
            for i in range(min(8, self.latent_dim)):
                axes[i].hist(latent_matrix[:, i], bins=30, alpha=0.7)
                axes[i].set_title(f'Latent Dim {i+1}')
                axes[i].grid(True)
            
            plt.tight_layout()
            plt.show()
        
        return latent_matrix

# Loading MNIST Dataset

In [None]:
# Load MNIST dataset
def load_mnist_data():
    """
    Load MNIST dataset using torchvision
    """
    
    # Define transform to convert PIL Image to tensor and normalize
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))  # Normalize to [-1, 1]
    ])
    
    # Download and load training data
    train_dataset = datasets.MNIST(
        root='./Dataset', 
        train=True, 
        download=True, 
        transform=transform
    )
    
    # Download and load test data
    test_dataset = datasets.MNIST(
        root='./Dataset', 
        train=False, 
        download=True, 
        transform=transform
    )
    
    print(f"MNIST dataset loaded:")
    print(f"Training samples: {len(train_dataset)}")
    print(f"Test samples: {len(test_dataset)}")
    print(f"Image shape: {train_dataset[0][0].shape}")
    print(f"Number of classes: {len(train_dataset.classes)}")
    
    return train_dataset, test_dataset

# Load the datasets
train_data, test_data = load_mnist_data()

In [None]:
class MNISTAutoencoderDataset(Dataset):
    def __init__(self, mnist_dataset):
        self.mnist_dataset = mnist_dataset
    
    def __len__(self):
        return len(self.mnist_dataset)
    
    def __getitem__(self, idx):
        image, label = self.mnist_dataset[idx]
        # Convert from tensor to numpy and flatten
        x = image.numpy().flatten().astype(np.float32)
        # Normalize to [0, 1] range (from [-1, 1])
        x = (x + 1.0) / 2.0
        return torch.tensor(x), torch.tensor(x)  # For autoencoder, input and target are the same

train_dataset = MNISTAutoencoderDataset(train_data)
test_dataset = MNISTAutoencoderDataset(test_data)

print(f"Training set size: {len(train_dataset)}")
print(f"Test set size: {len(test_dataset)}")

In [None]:
def train_MLPA_on_MNIST(dataset):
    
    # Define autoencoder architecture
    input_dim = 784  # 28x28 images flattened
    encoder_hidden_dims = [128]
    latent_dim = 64
    
    # Create autoencoder model
    autoencoder = MLPAutoencoder(input_dim, encoder_hidden_dims, latent_dim, 
                                 None, activation_fn=ReLU, learning_rate=0.001)
    
    # Train the autoencoder
    history = train_model(autoencoder.model, dataset, epochs=100, batch_size=64, 
                          grad_accumulation_steps=1, patience=10, relative_loss_threshold=0.01)

    return autoencoder, history

def visualize_reconstructions(dataset, model, img_shape=(28, 28)):
        """
        Visualize original and reconstructed images
        """
        # Select one image from each digit class
        num_classes = 10
        selected_samples = []
        selected_labels = []

        # Get one sample from each class
        for digit in range(num_classes):
            # Find samples of this digit
            for i, (data, _) in enumerate(dataset):
                # Get the original label from the MNIST dataset
                original_label = dataset.mnist_dataset[i][1]
                if original_label == digit:
                    selected_samples.append(data.numpy())
                    selected_labels.append(digit)
                    break

        # Convert to numpy arrays
        selected_samples = np.array(selected_samples)

        # Get reconstructions
        reconstructions = []
        reconstruction_errors = []

        for i, sample in enumerate(selected_samples):
            # Forward pass through autoencoder
            reconstruction = model.predict(sample.reshape(1, -1))
            reconstructions.append(reconstruction.flatten())
            
            # Calculate reconstruction error (MSE)
            error = np.mean((sample - reconstruction.flatten()) ** 2)
            reconstruction_errors.append(error)

        # Plotting
        fig, axes = plt.subplots(2, num_classes, figsize=(15, 6))

        for i in range(num_classes):
            # Original image
            original = selected_samples[i].reshape(img_shape)
            axes[0, i].imshow(original, cmap='gray')
            axes[0, i].set_title(f'Original\nDigit {selected_labels[i]}')
            axes[0, i].axis('off')
            
            # Reconstructed image
            reconstructed = reconstructions[i].reshape(img_shape)
            axes[1, i].imshow(reconstructed, cmap='gray')
            axes[1, i].set_title(f'Class {selected_labels[i]}\nError: {reconstruction_errors[i]:.4f}')
            axes[1, i].axis('off')

        plt.tight_layout()
        plt.suptitle('Original vs Reconstructed Images(sudershan.sarraf)', y=1.02, fontsize=16)
        plt.show()

        # Print reconstruction errors
        print("\nReconstruction Errors by Class:")
        for i, error in enumerate(reconstruction_errors):
            print(f"Class {i}: {error:.6f}")

In [None]:
autoencoder, history = train_MLPA_on_MNIST(train_dataset)
visualize_reconstructions(test_dataset, autoencoder.model)

# Anamoly Dataset

In [None]:
class LFWAnomalyDataset(Dataset):
    """
    Dataset class for LFW anomaly detection
    Normal class: George W Bush
    Anomalous class: All other people
    """
    
    def __init__(self, lfw_path, normal_class="George_W_Bush", img_size=(250,250), 
                 max_normal_samples=None, max_anomaly_samples=1000):
        """
        Initialize LFW anomaly detection dataset
        """
        self.lfw_path = lfw_path
        self.normal_class = normal_class
        self.img_size = img_size
        
        self.images = []
        self.labels = []  # 0 = normal, 1 = anomaly
        self.person_names = []
        
        # Load normal class images (George W Bush)
        normal_path = os.path.join(lfw_path, normal_class)
        if os.path.exists(normal_path):
            normal_files = glob.glob(os.path.join(normal_path, "*.jpg"))
            if max_normal_samples:
                normal_files = normal_files[:max_normal_samples]
                
            print(f"Loading {len(normal_files)} normal images from {normal_class}")
            
            for img_path in normal_files:
                try:
                    img = Image.open(img_path).convert('L')
                    img = img.resize(self.img_size)
                    img_array = np.array(img).astype(np.float32) / 255.0
                    self.images.append(img_array)
                    self.labels.append(0)  # Normal
                    self.person_names.append(normal_class)
                except Exception as e:
                    print(f"Error loading {img_path}: {e}")
        
        # Load anomalous class images (all other people)
        anomaly_count = 0
        all_dirs = [d for d in os.listdir(lfw_path) if os.path.isdir(os.path.join(lfw_path, d))]
        
        print(f"Loading anomaly images from other people...")
        
        for person_dir in all_dirs:
            if person_dir == normal_class:
                continue
                
            person_path = os.path.join(lfw_path, person_dir)
            person_files = glob.glob(os.path.join(person_path, "*.jpg"))
            
            for img_path in person_files:
                if max_anomaly_samples and anomaly_count >= max_anomaly_samples:
                    break
                    
                try:
                    img = Image.open(img_path).convert('L')
                    img = img.resize(self.img_size)
                    img_array = np.array(img).astype(np.float32) / 255.0
                    self.images.append(img_array)
                    self.labels.append(1)  # Anomaly
                    self.person_names.append(person_dir)
                    anomaly_count += 1
                except Exception as e:
                    print(f"Error loading {img_path}: {e}")
            
            if max_anomaly_samples and anomaly_count >= max_anomaly_samples:
                break
        
        print(f"Dataset loaded: {len([l for l in self.labels if l == 0])} normal, "
              f"{len([l for l in self.labels if l == 1])} anomaly images")
        
        # Convert to numpy arrays
        self.images = np.array(self.images)
        self.labels = np.array(self.labels)
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        
        # Flatten image for autoencoder
        image_flat = image.flatten()
        
        return torch.FloatTensor(image_flat), torch.FloatTensor([label])
    
    def get_normal_only(self):
        """Return dataset with only normal samples for training"""
        normal_indices = np.where(self.labels == 0)[0]
        normal_images = self.images[normal_indices]
        
        class NormalOnlyDataset(Dataset):
            def __init__(self, images):
                self.images = images
            
            def __len__(self):
                return len(self.images)
            
            def __getitem__(self, idx):
                image = self.images[idx]
                image_flat = image.flatten()
                return torch.FloatTensor(image_flat), torch.FloatTensor(image_flat)
        
        return NormalOnlyDataset(normal_images)

# Load LFW dataset for anomaly detection
lfw_dataset_path = "Data/Q3/LFW_Dataset"

# Create the full dataset (normal + anomaly for evaluation)
full_dataset = LFWAnomalyDataset(
    lfw_path=lfw_dataset_path,
    normal_class="George_W_Bush",
    img_size=(250,250)
)

# Create training dataset (only normal samples)
train_dataset = full_dataset.get_normal_only()

print(f"Training dataset size: {len(train_dataset)}")
print(f"Full evaluation dataset size: {len(full_dataset)}")
print(f"Image dimensions: {250*250} (flattened GreyScale)")

In [None]:
def train_anomaly_autoencoder(train_dataset, latent_dim, img_size=(250,250)):
    """
    Train autoencoder on normal data only for anomaly detection
    """
    # Define autoencoder architecture for face images
    input_dim = img_size[0] * img_size[1] * 1  # Grayscale image flattened
    encoder_hidden_dims = [4000]
    
    print(f"Creating autoencoder for anomaly detection:")
    print(f"Input dimension: {input_dim}")
    print(f"Architecture: {input_dim} -> {encoder_hidden_dims} -> {latent_dim}")
    
    # Create autoencoder model
    autoencoder = MLPAutoencoder(
        input_dim=input_dim,
        encoder_hidden_dims=encoder_hidden_dims,
        latent_dim=latent_dim,
        activation_fn=ReLU,
        learning_rate=0.0001  # Lower learning rate for face images
    )
    
    # Train the autoencoder on normal data only
    print(f"\nTraining autoencoder on {len(train_dataset)} normal samples...")
    history = train_model(
        autoencoder.model, 
        train_dataset, 
        epochs=100,
        batch_size=32,
        grad_accumulation_steps=1,
        patience=10,
        relative_loss_threshold=0.01
    )
    
    return autoencoder, history

def evaluate_anomaly_detection(autoencoder, eval_dataset, img_size=(250,250)):
    """
    Evaluate anomaly detection performance
    """
    print("Evaluating anomaly detection performance...")
    
    reconstruction_errors = []
    true_labels = []
    
    # Calculate reconstruction errors for all samples
    for i in range(len(eval_dataset)):
        image_flat, label = eval_dataset[i]
        image_flat = image_flat.numpy().reshape(1, -1)
        true_labels.append(int(label.numpy()[0]))
        
        # Get reconstruction
        reconstruction = autoencoder.model.predict(image_flat)
        
        # Calculate MSE reconstruction error
        mse_error = np.mean((image_flat - reconstruction) ** 2)
        reconstruction_errors.append(mse_error)
        if i%1000 == 0:
            print(f"Evaluated {i+1} Samples...")
    
    reconstruction_errors = np.array(reconstruction_errors)
    true_labels = np.array(true_labels)
    
    print(f"Normal samples reconstruction error: {np.mean(reconstruction_errors[true_labels == 0]):.6f} ± {np.std(reconstruction_errors[true_labels == 0]):.6f}")
    print(f"Anomaly samples reconstruction error: {np.mean(reconstruction_errors[true_labels == 1]):.6f} ± {np.std(reconstruction_errors[true_labels == 1]):.6f}")
    
    # Find optimal threshold using ROC curve
    fpr, tpr, thresholds = roc_curve(true_labels, reconstruction_errors)
    
    # Optimal threshold: maximize (TPR - FPR)
    optimal_idx = np.argmax(tpr - fpr)
    optimal_threshold = thresholds[optimal_idx]
    
    print(f"Optimal threshold: {optimal_threshold:.6f}")
    
    # Calculate metrics using optimal threshold
    predictions = (reconstruction_errors > optimal_threshold).astype(int)
    
    # Calculate evaluation metrics
    auc_score = roc_auc_score(true_labels, reconstruction_errors)
    precision = precision_score(true_labels, predictions)
    recall = recall_score(true_labels, predictions)
    f1 = f1_score(true_labels, predictions)
    
    print(f"\nAnomaly Detection Results:")
    print(f"AUC Score: {auc_score:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")
    
    # Visualization
    plt.figure(figsize=(15, 5))
    
    # Plot 1: Reconstruction error distribution
    plt.subplot(1, 2, 1)
    normal_errors = reconstruction_errors[true_labels == 0]
    anomaly_errors = reconstruction_errors[true_labels == 1]
    
    plt.hist(normal_errors, bins=50, alpha=0.7, label='Normal (George W Bush)', color='blue')
    plt.hist(anomaly_errors, bins=50, alpha=0.7, label='Anomaly (Others)', color='red')
    plt.axvline(optimal_threshold, color='black', linestyle='--', label=f'Threshold: {optimal_threshold:.4f}')
    plt.xlabel('Reconstruction Error (MSE)')
    plt.ylabel('Frequency')
    plt.title('Reconstruction Error Distribution(sudershan.sarraf)')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    # Plot 2: ROC Curve
    plt.subplot(1, 2, 2)
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {auc_score:.4f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random')
    plt.scatter(fpr[optimal_idx], tpr[optimal_idx], color='red', s=100, 
                label=f'Optimal Point (TPR={tpr[optimal_idx]:.3f}, FPR={fpr[optimal_idx]:.3f})')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve(sudershan.sarraf)')
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Return results
    results = {
        'auc_score': auc_score,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'optimal_threshold': optimal_threshold,
        'reconstruction_errors': reconstruction_errors,
        'true_labels': true_labels,
        'predictions': predictions,
        'fpr': fpr,
        'tpr': tpr,
        'thresholds': thresholds,
        'optimal_idx': optimal_idx,
        'model': autoencoder
    }
    
    return results

def visualize_reconstructions_anomaly(autoencoder, eval_dataset, img_size=(250,250), n_samples=10):
    """
    Visualize reconstructions for both normal and anomaly samples
    """
    # Get indices for normal and anomaly samples
    normal_indices = []
    anomaly_indices = []
    
    for i in range(len(eval_dataset)):
        _, label = eval_dataset[i]
        if int(label.numpy()[0]) == 0 and len(normal_indices) < n_samples:
            normal_indices.append(i)
        elif int(label.numpy()[0]) == 1 and len(anomaly_indices) < n_samples:
            anomaly_indices.append(i)
        
        if len(normal_indices) >= n_samples and len(anomaly_indices) >= n_samples:
            break
    
    # Randomly select indices
    normal_indices = random.sample(normal_indices, min(n_samples, len(normal_indices)))
    anomaly_indices = random.sample(anomaly_indices, min(n_samples, len(anomaly_indices)))
    
    fig, axes = plt.subplots(4, n_samples, figsize=(2*n_samples, 8))
    
    # Plot normal samples
    for i, idx in enumerate(normal_indices):
        image_flat, _ = eval_dataset[idx]
        original = image_flat.numpy().reshape(img_size[0], img_size[1])
        
        # Get reconstruction
        reconstruction = autoencoder.model.predict(image_flat.numpy().reshape(1, -1))
        reconstruction = reconstruction.reshape(img_size[0], img_size[1])
        
        # Calculate reconstruction error
        mse_error = np.mean((image_flat.numpy() - reconstruction.flatten()) ** 2)
        
        # Plot original
        axes[0, i].imshow(np.clip(original, 0, 1))
        axes[0, i].set_title(f'Normal Original\nMSE: {mse_error:.4f}', fontsize=8)
        axes[0, i].axis('off')
        
        # Plot reconstruction
        axes[1, i].imshow(np.clip(reconstruction, 0, 1))
        axes[1, i].set_title('Normal Recon', fontsize=8)
        axes[1, i].axis('off')
    
    # Plot anomaly samples
    for i, idx in enumerate(anomaly_indices):
        image_flat, _ = eval_dataset[idx]
        original = image_flat.numpy().reshape(img_size[0], img_size[1])
        
        # Get reconstruction
        reconstruction = autoencoder.model.predict(image_flat.numpy().reshape(1, -1))
        reconstruction = reconstruction.reshape(img_size[0], img_size[1])
        
        # Calculate reconstruction error
        mse_error = np.mean((image_flat.numpy() - reconstruction.flatten()) ** 2)
        
        # Plot original
        axes[2, i].imshow(np.clip(original, 0, 1))
        axes[2, i].set_title(f'Anomaly Original\nMSE: {mse_error:.4f}', fontsize=8)
        axes[2, i].axis('off')
        
        # Plot reconstruction
        axes[3, i].imshow(np.clip(reconstruction, 0, 1))
        axes[3, i].set_title('Anomaly Recon', fontsize=8)
        axes[3, i].axis('off')
    
    # Add row labels
    axes[0, 0].text(-0.1, 0.5, 'Normal\nOriginal', transform=axes[0, 0].transAxes, 
                    rotation=90, va='center', ha='right', fontsize=10, fontweight='bold')
    axes[1, 0].text(-0.1, 0.5, 'Normal\nReconstructed', transform=axes[1, 0].transAxes, 
                    rotation=90, va='center', ha='right', fontsize=10, fontweight='bold')
    axes[2, 0].text(-0.1, 0.5, 'Anomaly\nOriginal', transform=axes[2, 0].transAxes, 
                    rotation=90, va='center', ha='right', fontsize=10, fontweight='bold')
    axes[3, 0].text(-0.1, 0.5, 'Anomaly\nReconstructed', transform=axes[3, 0].transAxes, 
                    rotation=90, va='center', ha='right', fontsize=10, fontweight='bold')
    
    plt.suptitle('Anomaly Detection: Reconstruction Comparison\n(sudershan.sarraf)', fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

In [None]:
# Run the complete anomaly detection pipeline

print("="*60)
print("ANOMALY DETECTION ON LFW DATASET")
print("="*60)
print("Normal class: George W Bush")
print("Anomaly class: All other people")
print("="*60)

# Step 1: Train autoencoder on normal data only
print("\nSTEP 1: Training autoencoder on normal data...")
anomaly_autoencoder, training_history = train_anomaly_autoencoder(
    train_dataset,
    2000, 
    img_size=(250,250)
)

In [None]:
# Step 2: Evaluate on full dataset (normal + anomaly)
print("\nSTEP 2: Evaluating anomaly detection performance...")
evaluation_results = evaluate_anomaly_detection(
    anomaly_autoencoder, 
    full_dataset, 
    img_size=(250, 250)
)

In [None]:

# Step 3: Visualize reconstructions for normal and anomaly samples
print("\nSTEP 3: Visualizing reconstructions...")
visualize_reconstructions_anomaly(anomaly_autoencoder, full_dataset, img_size=(250,250))

In [None]:
bottleneck = [64, 128, 256]
results = {}
for dim in bottleneck:
    print(f"\n{'='*20} Testing Bottleneck dimension: {dim} {'='*20}\\n")
    
    # Train autoencoder
    anomaly_autoencoder, training_history = train_anomaly_autoencoder(
        train_dataset,
        dim, 
        img_size=(250,250)
    )
    
    # Evaluate
    result = evaluate_anomaly_detection(
        anomaly_autoencoder, 
        full_dataset, 
        img_size=(250,250)
    )
    
    results[dim] = result

for dim, res in results.items():
    print(f"Latent dim {dim}: AUC={res['auc_score']:.4f}, Precision={res['precision']:.4f}, Recall={res['recall']:.4f}, F1={res['f1_score']:.4f}")

    # Plot all ROC curves together
plt.figure(figsize=(12, 5))
colors = ['blue', 'red', 'green']
for i, (dim, res) in enumerate(results.items()):
    plt.plot(res['fpr'], res['tpr'], color=colors[i], lw=2, 
                label=f'Latent dim {dim}')

plt.plot([0, 1], [0, 1], color='black', lw=1, linestyle='--', label='Random')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves Comparison(sudershan.sarraf)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

In [None]:
def display_images(image_flat, reconstruction, title, img_size=(250,250)):
    """
    Display original and reconstructed images with proper handling of RGB channels
    """
    # Ensure we have the right shape for RGB
    original = image_flat.reshape(img_size[0], img_size[1])
    reconstructed = reconstruction.reshape(img_size[0], img_size[1])
    
    # Debug prints to understand the data
    print(f"Original image stats: min={original.min():.4f}, max={original.max():.4f}, mean={original.mean():.4f}")
    print(f"Reconstructed image stats: min={reconstructed.min():.4f}, max={reconstructed.max():.4f}, mean={reconstructed.mean():.4f}")
    print(f"Original shape: {original.shape}, Reconstructed shape: {reconstructed.shape}")
    
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 3, 1)
    plt.imshow(np.clip(original, 0, 1))
    plt.title(f'Original Image')
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.imshow(np.clip(reconstructed, 0, 1))
    plt.title('Reconstructed Image')
    plt.axis('off')
    
    # Show the difference
    plt.subplot(1, 3, 3)
    diff = np.abs(original - reconstructed)
    plt.imshow(diff)
    plt.title('Absolute Difference')
    plt.axis('off')
    
    plt.suptitle(title, fontsize=14)
    plt.tight_layout()
    plt.show()

def comprehensive_visualization(autoencoder, history, eval_dataset, img_size=(250,250)):
    optimal_threshold = history['optimal_threshold']
    print(f"Using optimal threshold: {optimal_threshold:.6f}")
    
    tp = 0
    tn = 0
    fp = 0
    fn = 0
    
    for i in range(len(eval_dataset)):
        image_flat, label = eval_dataset[i]
        image_flat_np = image_flat.numpy()
        true_label = int(label.numpy()[0])
        
        # Get reconstruction
        reconstruction = autoencoder.model.predict(image_flat_np.reshape(1, -1))
        # Calculate MSE reconstruction error
        mse_error = np.mean((image_flat_np - reconstruction.flatten()) ** 2)
        prediction = int(mse_error > optimal_threshold)
        image_flat=image_flat_np
        
        if prediction == true_label:
                if true_label == 0 and tn == 0:
                    display_images(image_flat, reconstruction, title=f'True Negative - Reconstruction error: {mse_error:.6f}(sudershan.sarraf)', img_size=img_size)
                    tn = 1
                elif true_label == 1 and tp == 0:
                    display_images(image_flat, reconstruction, title=f'True Positive - Reconstruction error: {mse_error:.6f}(sudershan.sarraf)', img_size=img_size)
                    tp = 1
        else:
            if prediction == 1 and fp == 0:
                display_images(image_flat, reconstruction, title=f'False Positive - Reconstruction error: {mse_error:.6f}(sudershan.sarraf)', img_size=img_size)
                fp = 1
            elif prediction == 0 and fn == 0:
                display_images(image_flat, reconstruction, title=f'False Negative - Reconstruction error: {mse_error:.6f}(sudershan.sarraf)', img_size=img_size)
                fn = 1
        if tp and tn and fp and fn:
            break
    
    # Plot precision-recall curve
    precisions = []
    recalls = []
    thresholds_pr = []

    reconstruction_errors = history['reconstruction_errors']
    true_labels = history['true_labels']

    # Calculate precision-recall for different thresholds
    threshold_range = np.linspace(reconstruction_errors.min(), reconstruction_errors.max(), 1000)

    for threshold in threshold_range:
        predictions = (reconstruction_errors > threshold).astype(int)
        if np.sum(predictions) > 0:  # Avoid division by zero
            precision = precision_score(true_labels, predictions, zero_division=0)
            recall = recall_score(true_labels, predictions, zero_division=0)
            precisions.append(precision)
            recalls.append(recall)
            thresholds_pr.append(threshold)

    # Plot precision-recall curve
    plt.figure(figsize=(8, 6))
    plt.plot(recalls, precisions, color='blue', lw=2, label='Precision-Recall curve')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve(sudershan.sarraf)')
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.show()

comprehensive_visualization(results[128]['model'], results[128], full_dataset)