# Machine Learning Lab - Steel Industry
## Part 4: Anomaly Detection with Deep Learning

In this part, we will use convolutional neural networks (CNN) with PyTorch to detect anomalies in industrial images, using the MVTec AD dataset.

### Objectives:
- Understand the principles of anomaly detection with Deep Learning
- Implement a CNN model with PyTorch
- Evaluate and visualize the results

### Methods covered:
- Convolutional autoencoders with PyTorch
- Transfer Learning
- Anomaly detection metrics

⚠️ **Important**: Before starting, activate the GPU:
1. Menu "Runtime" > "Change runtime type"
2. Select "T4 GPU" in the "Hardware accelerator" dropdown
3. Click "Save"

In [1]:
# Import necessary packages
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc
import torch.nn.functional as F
from torchvision.models import resnet18
import torch.distributions as dist

# GPU check
print("GPU available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU name:", torch.cuda.get_device_name(0))

# Configuration
sns.set_theme(style='whitegrid')
%matplotlib inline
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

GPU available: True
GPU name: Tesla T4
Using device: cuda


### 1. Loading and preparing the MVTec AD data (Bottle dataset)

The MVTec AD dataset contains industrial images of different objects, with normal and abnormal examples.

❓ Questions about the data:
- Why use only "normal" images for training?
- How does this reflect industrial reality?
- What is the purpose of data augmentation in this context?

In [4]:
# Create directory for the dataset
!mkdir -p mvtec_data

# Download only the bottle dataset
!wget -P mvtec_data https://www.mydrive.ch/shares/38536/3830184030e49fe74747669442f0f282/download/420938113-1629952094/bottle.tar.xz
!cd mvtec_data
!tar -xf bottle.tar.xz

# Check dataset structure
print("\nDataset structure:")
!ls -R mvtec_data/bottle | grep ":$" | sed -e 's/:$//' -e 's/[^-][^\\/]*\\//  /g' -e 's/^/  /'

# Create a custom Dataset
class MVTecDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = [os.path.join(root_dir, f) for f in os.listdir(root_dir)]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image

# Define transformations
train_transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Downscale from 256x256 to 128x128
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.05, contrast=0.05),
    transforms.ToTensor(),
    transforms.Grayscale(num_output_channels=3)
])

test_transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Downscale from 256x256 to 128x128
    transforms.ToTensor(),
    transforms.Grayscale(num_output_channels=3)
])

# Create datasets with new paths
train_dataset = MVTecDataset('mvtec_data/bottle/train/good', transform=train_transform)
test_normal_dataset = MVTecDataset('mvtec_data/bottle/test/good', transform=test_transform)
# /!\\ Complete the '...' to create a test_anomaly_dataset sourced from mvtec_data/bottle/test/broken_large images /!\\
test_anomaly_dataset = MVTecDataset("mvtec_data/bottle/test/broken_large")

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)  # Batch size changed from 32 to 16
test_normal_loader = DataLoader(test_normal_dataset, batch_size=16, shuffle=False)
test_anomaly_loader = DataLoader(test_anomaly_dataset, batch_size=16, shuffle=False)

print("\nNumber of images:")
print(f"Training (normal): {len(train_dataset)}")
print(f"Test (normal): {len(test_normal_dataset)}")
print(f"Test (anomalies): {len(test_anomaly_dataset)}")


Dataset structure:
sed: -e expression #2, char 20: unknown option to `s'

Number of images:
Training (normal): 209
Test (normal): 20
Test (anomalies): 20


### 2. Creating the Autoencoder model with PyTorch

❓ Questions about the architecture:
- Why choose an encoder-decoder architecture?
- What does the latent space represent in our industrial context?
- How does the bottleneck force the learning of relevant features?

In [5]:
class ImprovedAutoencoder(nn.Module):
    def __init__(self):
        super(ImprovedAutoencoder, self).__init__()

        # Reduced number of filters
        self.enc1 = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),  # 64 -> 32
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2),
            nn.Conv2d(32, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2)
        )

        self.pool1 = nn.MaxPool2d(2, 2)

        self.enc2 = nn.Sequential(
            nn.Conv2d(32, 64, 3, padding=1),  # 128 -> 64
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2)
        )

        self.pool2 = nn.MaxPool2d(2, 2)

        self.enc3 = nn.Sequential(
            nn.Conv2d(64, 128, 3, padding=1),  # 256 -> 128
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
            nn.Conv2d(128, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2)
        )

        self.pool3 = nn.MaxPool2d(2, 2)

        # Bottleneck
        self.bottleneck = nn.Sequential(
            nn.Conv2d(128, 256, 3, padding=1),  # 512 -> 256
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),
            nn.Conv2d(256, 256, 3, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2)
        )

        # Decoder (update dimensions accordingly)
        self.up3 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.dec3 = nn.Sequential(
            nn.Conv2d(256, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
            nn.Conv2d(128, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2)
        )

        self.up2 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.dec2 = nn.Sequential(
            nn.Conv2d(128, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2)
        )

        self.up1 = nn.ConvTranspose2d(64, 32, 2, stride=2)
        self.dec1 = nn.Sequential(
            nn.Conv2d(64, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2),
            nn.Conv2d(32, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2),
            nn.Conv2d(32, 3, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Encoding
        e1 = self.enc1(x)
        p1 = self.pool1(e1)

        e2 = self.enc2(p1)
        p2 = self.pool2(e2)

        e3 = self.enc3(p2)
        p3 = self.pool3(e3)

        # Bottleneck
        b = self.bottleneck(p3)

        # Decoding with skip connections
        d3 = self.up3(b)
        d3 = torch.cat([d3, e3], dim=1)
        d3 = self.dec3(d3)

        d2 = self.up2(d3)
        d2 = torch.cat([d2, e2], dim=1)
        d2 = self.dec2(d2)

        d1 = self.up1(d2)
        d1 = torch.cat([d1, e1], dim=1)
        d1 = self.dec1(d1)

        return d1, b

# Enhanced loss with structural component
class EnhancedLoss(nn.Module):
    def __init__(self, alpha=0.7, beta=0.3):
        super(EnhancedLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.mse = nn.MSELoss()
        self.l1 = nn.L1Loss()

    def forward(self, output, target, encoded_output, encoded_target):
        # MSE for global reconstruction
        mse_loss = self.mse(output, target)

        # L1 for fine details
        l1_loss = self.l1(output, target)

        # MSE on the latent space
        # /!\\ Complete the '...' to create a feature loss as an mse between encoded_output and encoded_target /!\\
        feature_loss = self.mse(encoded_output, encoded_target)

        # Combine losses
        total_loss = self.alpha * (0.5 * mse_loss + 0.5 * l1_loss) + self.beta * feature_loss
        return total_loss

# Update datasets with new transformations
train_dataset = MVTecDataset('mvtec_data/bottle/train/good', transform=train_transform)
test_normal_dataset = MVTecDataset('mvtec_data/bottle/test/good', transform=test_transform)
test_anomaly_dataset = MVTecDataset('mvtec_data/bottle/test/broken_large', transform=test_transform)

# Create and move model to the appropriate device
model = ImprovedAutoencoder().to(device)
criterion = EnhancedLoss(alpha=0.7, beta=0.3)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

In [6]:
# Improved training function
def train_improved_epoch(model, dataloader, criterion, optimizer):
    model.train()
    total_loss = 0

    for batch in dataloader:
        batch = batch.to(device)
        optimizer.zero_grad()

        # Forward pass with encoded features
        reconstructed, encoded = model(batch)
        with torch.no_grad():
            _, target_encoded = model(batch)

        # Compute combined loss
        loss = criterion(reconstructed, batch, encoded, target_encoded.detach())

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Explicit memory release
        del reconstructed, encoded, target_encoded
        torch.cuda.empty_cache()

    return total_loss / len(dataloader)

# Training with early stopping
n_epochs = 100
best_loss = float('inf')
patience = 10
patience_counter = 0
train_losses = []

for epoch in range(n_epochs):
    loss = train_improved_epoch(model, train_loader, criterion, optimizer)
    train_losses.append(loss)

    # Learning rate scheduling
    scheduler.step(loss)

    # Early stopping
    if loss < best_loss:
        best_loss = loss
        patience_counter = 0
        # Save the best model
        torch.save(model.state_dict(), 'best_autoencoder.pth')
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print(f'Early stopping at epoch {epoch+1}')
        break

    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{n_epochs}], Loss: {loss:.4f}')

# Load the best model for evaluation
model.load_state_dict(torch.load('best_autoencoder.pth'))

Epoch [5/100], Loss: 0.0134
Epoch [10/100], Loss: 0.0049
Epoch [15/100], Loss: 0.0040
Epoch [20/100], Loss: 0.0037
Epoch [25/100], Loss: 0.0043
Epoch [30/100], Loss: 0.0028
Epoch [35/100], Loss: 0.0031
Epoch [40/100], Loss: 0.0026
Epoch [45/100], Loss: 0.0030
Epoch [50/100], Loss: 0.0028
Epoch [55/100], Loss: 0.0027
Epoch [60/100], Loss: 0.0024
Epoch [65/100], Loss: 0.0022
Early stopping at epoch 69


<All keys matched successfully>

### 3. Evaluation and anomaly detection

In [7]:
@torch.no_grad()
def compute_reconstruction_error(model, dataloader):
    model.eval()
    errors = []

    for batch in dataloader:
        batch = batch.to(device)
        reconstructed, encoded = model(batch)  # Unpack tuple
        error = torch.mean((batch - reconstructed) ** 2, dim=(1,2,3))
        errors.extend(error.cpu().numpy())

    return np.array(errors)

# Compute reconstruction errors
normal_errors = compute_reconstruction_error(model, test_normal_loader)
anomaly_errors = compute_reconstruction_error(model, test_anomaly_loader)

# Visualization of error distribution
plt.figure(figsize=(10, 6))
plt.hist(normal_errors, bins=50, alpha=0.5, label='Normal')
plt.hist(anomaly_errors, bins=50, alpha=0.5, label='Anomaly')
plt.title('Distribution of reconstruction errors')
plt.xlabel('MSE')
plt.ylabel('Number of samples')
plt.legend()
plt.show()

# Compute and display ROC curve
y_true = np.concatenate([np.zeros(len(normal_errors)), np.ones(len(anomaly_errors))])
# /!\\ Complete the '...' to create y_scores as the concatenation of normal_errors and anomaly_errors /!\\
y_scores = np.concatenate([normal_errors, anomaly_errors])

fpr, tpr, thresholds = roc_curve(y_true, y_scores)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'PIL.Image.Image'>

### 4. Results visualization

In [None]:
@torch.no_grad()
def plot_results(model, dataloader, n=5):
    model.eval()

    # Get a batch
    batch = next(iter(dataloader))
    batch = batch[:n].to(device)
    reconstructed, _ = model(batch)

    # Convert for display
    batch = batch.cpu()
    reconstructed = reconstructed.cpu()

    plt.figure(figsize=(20, 4))
    for i in range(n):
        # Original
        plt.subplot(2, n, i + 1)
        img = batch[i].permute(1, 2, 0).numpy()
        plt.imshow(img, cmap='gray')  # Use cmap='gray' for grayscale images
        plt.title('Original')
        plt.axis('off')

        # Reconstruction
        plt.subplot(2, n, i + n + 1)
        img = reconstructed[i].permute(1, 2, 0).numpy()
        plt.imshow(img, cmap='gray')  # Use cmap='gray' for grayscale images
        # /!\\ Complete the '...' to add the title 'Reconstruction' to the plot /!\\
        plt.title('Reconstruction')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Display results
print("Normal examples:")
plot_results(model, test_normal_loader)

print("\nExamples with anomalies:")
plot_results(model, test_anomaly_loader)

❓ Questions about the classic autoencoder:

- Why are the reconstructions almost identical to the original images?
- How to explain the small difference between the reconstruction errors of normal and abnormal images?
- Did the autoencoder really learn the "normal" features of the bottles?
- Is this "too good" reconstruction desirable for anomaly detection?
- What happens if the autoencoder simply learns to "copy" the input?

### 5. Improvement with a Variational Autoencoder (VAE)

To improve anomaly detection, we will use a VAE which should:
- Better regularize the latent space
- Learn a more robust distribution of normal images
- Better "repair" anomalies in the reconstructions

In [None]:
vae_train_transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.05, contrast=0.05),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x)
])

# /!\ Complete the '...' to transform the input image to 128x128 resolution during the transformation /!\
vae_test_transform = transforms.Compose([
    ...,
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x)
])

# Create new datasets for the VAE
vae_train_dataset = MVTecDataset('mvtec_data/bottle/train/good', transform=vae_train_transform)
vae_test_normal_dataset = MVTecDataset('mvtec_data/bottle/test/good', transform=vae_test_transform)
vae_test_anomaly_dataset = MVTecDataset('mvtec_data/bottle/test/broken_large', transform=vae_test_transform)

# Create dataloaders for the VAE
vae_train_loader = DataLoader(vae_train_dataset, batch_size=16, shuffle=True)
vae_test_normal_loader = DataLoader(vae_test_normal_dataset, batch_size=16, shuffle=False)
vae_test_anomaly_loader = DataLoader(vae_test_anomaly_dataset, batch_size=16, shuffle=False)
# Definition of the VAE
class VariationalAutoencoder(nn.Module):
    def __init__(self, latent_dim=64):  # Reduced latent dimension
        super().__init__()

        # Encoder inspired by the autoencoder that worked well
        self.encoder = nn.Sequential(
            # First block
            nn.Conv2d(3, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            # Second block
            nn.Conv2d(32, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            # Third block
            nn.Conv2d(64, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )

        # mu and logvar projections
        self.fc_mu = nn.Linear(128 * 16 * 16, latent_dim)
        self.fc_var = nn.Linear(128 * 16 * 16, latent_dim)

        # Symmetric decoder
        self.decoder_input = nn.Linear(latent_dim, 128 * 16 * 16)

        self.decoder = nn.Sequential(
            # First block
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(128, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),

            # Second block
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(128, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),

            # Third block
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(64, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),

            # Final layer
            nn.Conv2d(32, 3, 1),
            nn.Sigmoid()
        )

    def encode(self, x):
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        return self.fc_mu(x), self.fc_var(x)

    def reparameterize(self, mu, logvar):
        if self.training:
            std = torch.exp(0.5 * logvar)
            eps = torch.randn_like(std)
            return mu + eps * std
        return mu

    def decode(self, z):
        z = self.decoder_input(z)
        z = z.view(z.size(0), 128, 16, 16)
        return self.decoder(z)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

# Loss function with a very low KL weight
class VAELoss(nn.Module):
    def __init__(self, kld_weight=0.0001):  # Very low KL weight
        super().__init__()
        self.kld_weight = kld_weight

    def forward(self, recon_x, x, mu, logvar):
        recon_loss = F.mse_loss(recon_x, x, reduction='mean')
        kld_loss = -0.5 * torch.mean(1 + logvar - mu.pow(2) - logvar.exp())
        return recon_loss + self.kld_weight * kld_loss

In [None]:
# Training parameters
vae_model = VariationalAutoencoder().to(device)
vae_criterion = VAELoss()
vae_optimizer = optim.Adam(vae_model.parameters(), lr=0.0001)

# VAE training function
def train_vae_epoch(model, dataloader, criterion, optimizer):
    model.train()
    total_loss = 0

    for batch in dataloader:
        batch = batch.to(device)
        optimizer.zero_grad()

        recon_batch, mu, logvar = model(batch)
        loss = criterion(recon_batch, batch, mu, logvar)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)

# VAE training
print("Starting VAE training...")
n_epochs = 100
best_loss = float('inf')
vae_losses = []

for epoch in range(n_epochs):
    loss = train_vae_epoch(vae_model, vae_train_loader, vae_criterion, vae_optimizer)
    vae_losses.append(loss)

    if loss < best_loss:
        best_loss = loss
        torch.save(vae_model.state_dict(), 'best_vae.pth')

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{n_epochs}], Loss: {loss:.4f}')

In [None]:
# Visualization of VAE results
@torch.no_grad()
def plot_vae_results(model, dataloader, n=5):
    model.eval()
    batch = next(iter(dataloader))
    batch = batch[:n].to(device)
    recon, _, _ = model(batch)

    batch = batch.cpu()
    recon = recon.cpu()

    plt.figure(figsize=(20, 4))
    for i in range(n):
        # Original
        plt.subplot(2, n, i + 1)
        plt.imshow(batch[i].permute(1, 2, 0), cmap='gray')
        plt.title('Original')
        plt.axis('off')

        # Reconstruction
        plt.subplot(2, n, i + n + 1)
        plt.imshow(recon[i].permute(1, 2, 0), cmap='gray')
        plt.title('Reconstruction VAE')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

print("\nVAE results on normal images:")
plot_vae_results(vae_model, vae_test_normal_loader)

print("\nVAE results on images with anomalies:")
# /!\ Complete the '...' to display the input and reconstructed images (vae_test_anomaly_loader) via the variational auto-encoder /!\
plot_vae_results(...)

❓ Questions about the advantages of the VAE:
- Why are VAE reconstructions more "blurry"?
- How is this "blurriness" actually an advantage for anomaly detection?
- How does the VAE handle unusual features differently?

In [None]:
@torch.no_grad()
def evaluate_vae(model, normal_loader, anomaly_loader):
    model.eval()

    normal_errors = []
    anomaly_errors = []

    # For normal images
    for batch in normal_loader:
        batch = batch.to(device)
        recon, mu, logvar = model(batch)
        # Pixel-wise reconstruction error
        recon_error = F.mse_loss(recon, batch, reduction='none')
        # Mean over channels only (keeps spatial dimensions)
        recon_error = recon_error.mean(dim=1)
        # Local maximum to detect local anomalies
        recon_error = F.max_pool2d(recon_error, kernel_size=3, stride=1, padding=1)
        # Mean over the image
        recon_error = recon_error.mean(dim=(1,2))

        normal_errors.extend(recon_error.cpu().numpy())

    # For images with anomalies
    for batch in anomaly_loader:
        batch = batch.to(device)
        recon, mu, logvar = model(batch)
        recon_error = F.mse_loss(recon, batch, reduction='none')
        recon_error = recon_error.mean(dim=1)
        recon_error = F.max_pool2d(recon_error, kernel_size=3, stride=1, padding=1)
        recon_error = recon_error.mean(dim=(1,2))

        anomaly_errors.extend(recon_error.cpu().numpy())

    # Error normalization
    all_errors = np.concatenate([normal_errors, anomaly_errors])
    error_mean = np.mean(all_errors)
    error_std = np.std(all_errors)
    normal_errors = (normal_errors - error_mean) / error_std
    anomaly_errors = (anomaly_errors - error_mean) / error_std

    # Compute metrics
    y_true = np.concatenate([np.zeros(len(normal_errors)), np.ones(len(anomaly_errors))])
    y_scores = np.concatenate([normal_errors, anomaly_errors])

    fpr, tpr, thresholds = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)

    return {
        'normal_errors': normal_errors,
        'anomaly_errors': anomaly_errors,
        'AUC': roc_auc,
        'FPR': fpr,
        'TPR': tpr
    }

# Improved results visualization
metrics = evaluate_vae(vae_model, vae_test_normal_loader, vae_test_anomaly_loader)

# ROC curve
plt.figure(figsize=(8, 6))
plt.plot(metrics['FPR'], metrics['TPR'], color='darkorange', lw=2,
         label=f'ROC curve (AUC = {metrics[\"AUC\"]:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve of the VAE')
plt.legend(loc="lower right")
plt.show()

# Error distribution
plt.figure(figsize=(10, 6))
plt.hist(metrics['normal_errors'], bins=50, alpha=0.7, label='Normal', density=True)
plt.hist(metrics['anomaly_errors'], bins=50, alpha=0.7, label='Anomaly', density=True)
plt.title('Distribution of normalized reconstruction errors')
plt.xlabel('Reconstruction error (normalized)')
plt.ylabel('Density')
plt.legend()
plt.grid(True, alpha=0.3)
# /!\ Complete the '...' to display the plot /!\
plt.show()

### Analysis of Anomaly Reconstructions

❓ In-depth questions:
- How does the VAE "force" the learning of useful features?
- Why is the normal distribution constraint in the latent space important?
- How could anomaly detection be further improved?

### Conclusion and perspectives
❓ Final questions:
- Which model should be chosen for a real industrial application?
- How to adapt these models to other types of anomalies?
- What further improvements could be made?