# Exercise 27 Solution - Autoencoder

### Task
Train an autoencoder (on a dataset composed of circles) and apply it to the task of anomaly detection (using a dataset composed of circles and squares)
- Generate a normal and anomalous dataset (modify the data generation parameters if needed)
- Train the autoencoder on only the normal dataset (modify the neural network and training parameters if needed)
- Apply the autoencoder to the anomalous dataset and compare the reconstruction with reconstructions obtained with normal data
- Improve the autoencoder through the denoising autoencoder extension (corrupt the training data and modify the loss function)

### Learning goals
- Understand the autoencoder architecture and its training
- Familiarize yourself with the autoencoder implementation
- Use an autoencoder for anomaly detection

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import time
from torchinfo import summary

In [None]:
# Fix the random seeds so that data generation and weight initialisation repeat across runs
np.random.seed(2)
torch.manual_seed(2)
torch.backends.cudnn.deterministic = True

# Pick the compute device in order of preference: Apple MPS, then CUDA, then CPU
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Hyperparameters

**data generation parameters**

In [None]:
N = 128  # grid points per axis; every sample is an N x N field
domainLength = 1  # side length of the square domain
numberOfCircles = 5  # number of shapes placed in every sample
radius = 0.1  # circle radius (also used as half-width of the squares in anomalous samples)

numberOfSamples = 20  # number of normal samples to generate (200 noted as an alternative)
numberOfAnomolousSamples = 128  # number of samples generated per anomaly degree

**neural network parameters**

In [None]:
depth = 4  # number of downsampling (and upsampling) levels of the autoencoder
numberOfFilters = 3  # channels after the first level; doubled at every further level
convolutionalLayers = 2  # convolutions per level

**training parameters**

In [None]:
epochs = 200  # number of passes over the training set
lr = 5e-3  # learning rate passed to the Adam optimizer
batchSize = 64  # mini-batch size of the training DataLoader
regularization = 1e-4  # passed to Adam as weight_decay

## Data generation

**helper functions to generate normal and anomalous data**

In [None]:
def generateNonOverlappingCirclesInDomain(N, domainLength, numberOfCircles, radius, maxAttempts=10000):
    """Generate one normal sample: a square field containing non-overlapping circles.

    The field is sampled on an N x N grid over [0, domainLength]^2. Background
    points hold 1 and points inside a circle hold -1.

    Args:
        N: number of grid points per axis.
        domainLength: side length of the square domain.
        numberOfCircles: number of circles to place.
        radius: radius of every circle.
        maxAttempts: maximum number of random placements tried per circle.

    Returns:
        numpy array of shape (N, N) holding the values 1 and -1.

    Raises:
        RuntimeError: if a circle cannot be placed within maxAttempts tries
            (previously such a configuration looped forever).
    """
    domain = np.ones((N, N))
    axis = np.linspace(0, domainLength, N)
    x, y = np.meshgrid(axis, axis)

    for circle in range(numberOfCircles):
        for _ in range(maxAttempts):
            # centres are drawn such that the circle lies completely inside the domain
            xc = np.random.uniform(radius, domainLength - radius)
            yc = np.random.uniform(radius, domainLength - radius)

            mask = (x - xc) ** 2 + (y - yc) ** 2 < radius ** 2
            if not np.any(domain[mask] == -1):
                break  # candidate shares no grid point with an earlier circle
        else:
            raise RuntimeError(
                "could not place circle {} without overlap after {} attempts".format(circle, maxAttempts))
        domain[mask] = -1
    return domain


def generateAnomolousDataInDomain(N, domainLength, numberOfCircles, radius, numberOfSquares, maxAttempts=10000):
    """Generate one anomalous sample: non-overlapping shapes of which some are squares.

    The field is sampled on an N x N grid over [0, domainLength]^2. Background
    points hold 1 and points inside a shape hold -1. The first numberOfSquares
    shapes are axis-aligned squares of half-width radius, the remaining ones
    are circles of the given radius.

    Args:
        N: number of grid points per axis.
        domainLength: side length of the square domain.
        numberOfCircles: total number of shapes (squares and circles) to place.
        radius: circle radius and square half-width.
        numberOfSquares: how many of the shapes are squares (0 yields circles only).
        maxAttempts: maximum number of random placements tried per shape.

    Returns:
        numpy array of shape (N, N) holding the values 1 and -1.

    Raises:
        RuntimeError: if a shape cannot be placed within maxAttempts tries
            (previously such a configuration looped forever).
    """
    domain = np.ones((N, N))
    axis = np.linspace(0, domainLength, N)
    x, y = np.meshgrid(axis, axis)

    for i in range(numberOfCircles):
        for _ in range(maxAttempts):
            # centres are drawn such that the shape lies completely inside the domain
            xc = np.random.uniform(radius, domainLength - radius)
            yc = np.random.uniform(radius, domainLength - radius)

            if i >= numberOfSquares:
                mask = (x - xc) ** 2 + (y - yc) ** 2 < radius ** 2
            else:
                mask = ((x > xc - radius) & (x < xc + radius)
                        & (y > yc - radius) & (y < yc + radius))
            if not np.any(domain[mask] == -1):
                break  # candidate shares no grid point with an earlier shape
        else:
            raise RuntimeError(
                "could not place shape {} without overlap after {} attempts".format(i, maxAttempts))
        domain[mask] = -1
    return domain

**data generation**

In [None]:
import os

os.makedirs("data", exist_ok=True)  # ensure the data directory exists

# normal data: numberOfSamples fields that contain circles only
normalSamples = torch.zeros((numberOfSamples, 1, N, N))
for i in range(numberOfSamples):
    normalSamples[i, 0] = torch.from_numpy(
        generateNonOverlappingCirclesInDomain(N, domainLength, numberOfCircles, radius)).to(torch.float32)
torch.save(normalSamples, "data/normalData.pt")

# anomalous data: one dataset per degree j, where j is the number of squares
sampleList = []
for j in range(numberOfCircles + 1):  # 0 is not anomalous
    anomalySamples = torch.zeros((numberOfAnomolousSamples, 1, N, N))
    for i in range(numberOfAnomolousSamples):
        anomalySamples[i, 0] = torch.from_numpy(
            generateAnomolousDataInDomain(N, domainLength, numberOfCircles, radius, j)).to(torch.float32)
    torch.save(anomalySamples, "data/anomalyData" + str(j) + ".pt")
    sampleList.append(anomalySamples)

# show the first sample of every degree; the panel count follows the number of
# generated degrees instead of a fixed 6, squeeze=False keeps ax two-dimensional
fig, ax = plt.subplots(1, len(sampleList), figsize=(10, 3), squeeze=False)
for i, samples in enumerate(sampleList):
    ax[0, i].imshow(samples[0, 0], origin='lower', cmap='jet', interpolation='none', vmin=-1, vmax=1)
    ax[0, i].axis('off')
plt.show()

## Noise functions (Exercise 27.2)

In [None]:
def add_gaussian_noise(sample, noise_level=0.1):
    """Return a copy of sample with additive Gaussian noise, clipped to [-1, 1].

    The noise is standard normal scaled by noise_level.
    """
    perturbed = sample + torch.randn_like(sample) * noise_level
    return perturbed.clamp(-1, 1)

def add_salt_pepper_noise(sample, salt_prob=0.1, pepper_prob=0.1):
    """Return a copy of sample corrupted with salt and pepper noise.

    One uniform random number is drawn per entry: entries whose draw is below
    pepper_prob become -1 (pepper), entries whose draw is above 1 - salt_prob
    become 1 (salt). Salt is applied last.
    """
    draw = torch.rand_like(sample)
    peppered = sample.masked_fill(draw < pepper_prob, -1)  # pepper noise (black pixels)
    return peppered.masked_fill(draw > (1 - salt_prob), 1)  # salt noise (white pixels)

def add_masking_noise(sample, mask_prob=0.1):
    """Return a copy of sample in which random entries are set to -1.

    Every entry is masked independently with probability mask_prob.
    """
    return sample.masked_fill(torch.rand_like(sample) < mask_prob, -1)

In [None]:
# Scratch cell: example of the Boolean mask used by the masking noise.
# The global `sample` is only assigned in the following cell, so the mask is built
# from normalSamples to keep a fresh top-to-bottom run from raising a NameError.
torch.rand_like(normalSamples[0:1]) < 0.1

In [None]:
# Visualization of noise functions
sample = normalSamples[0:1]  # first normal sample, batch dimension kept

# Corrupt the same sample with each noise type
gaussian_noisy = add_gaussian_noise(sample, noise_level=0.1)
salt_pepper_noisy = add_salt_pepper_noise(sample, salt_prob=0.1, pepper_prob=0.1)
masking_noisy = add_masking_noise(sample, mask_prob=0.2)

# One panel per field, all drawn with the same colour range
panels = [
    ('Original', sample),
    ('Gaussian Noise', gaussian_noisy),
    ('Salt & Pepper Noise', salt_pepper_noisy),
    ('Masking Noise', masking_noisy),
]
fig, ax = plt.subplots(1, 4, figsize=(12, 3))
for axis, (title, field) in zip(ax, panels):
    axis.imshow(field[0, 0], origin='lower', vmin=-1, vmax=1, cmap='jet', interpolation='none')
    axis.set_title(title)
    axis.axis('off')

plt.tight_layout()
plt.show()

# Summary statistics of the clean and the corrupted fields
print(f"Original - Min: {sample.min():.3f}, Max: {sample.max():.3f}, Mean: {sample.mean():.3f}")
print(f"Gaussian - Min: {gaussian_noisy.min():.3f}, Max: {gaussian_noisy.max():.3f}, Mean: {gaussian_noisy.mean():.3f}")
print(f"Salt & Pepper - Min: {salt_pepper_noisy.min():.3f}, Max: {salt_pepper_noisy.max():.3f}, Mean: {salt_pepper_noisy.mean():.3f}")
print(f"Masking - Min: {masking_noisy.min():.3f}, Max: {masking_noisy.max():.3f}, Mean: {masking_noisy.mean():.3f}")

## Dataset definition

**load generated data into PyTorch dataset**

In [None]:
class normalDataset(Dataset):
    """Dataset of normal samples read from a tensor file written with torch.save.

    Args:
        path: location of the saved tensor; defaults to the file written by
            the data generation cell.
    """

    def __init__(self, path="data/normalData.pt"):
        # The file holds a plain tensor, so the restricted unpickler is sufficient
        # and avoids executing arbitrary pickled code.
        self.data = torch.load(path, weights_only=True)

    def __len__(self):
        """Return the number of samples (size of the first tensor dimension)."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample(s) selected by idx from the stored tensor."""
        return self.data[idx]


class anomalyDataset(Dataset):
    """Dataset of anomalous samples of one degree, read from a saved tensor file.

    Args:
        degree: anomaly degree; selects the file "anomalyData<degree>.pt".
        directory: folder containing the file; defaults to the folder used by
            the data generation cell.
    """

    def __init__(self, degree, directory="data"):
        # The file holds a plain tensor, so the restricted unpickler is sufficient
        # and avoids executing arbitrary pickled code.
        self.data = torch.load(directory + "/anomalyData" + str(degree) + ".pt", weights_only=True)

    def __len__(self):
        """Return the number of samples (size of the first tensor dimension)."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample(s) selected by idx from the stored tensor."""
        return self.data[idx]

## Autoencoder architecture definition

In [None]:
class autoencoder(torch.nn.Module):
    """Convolutional autoencoder without skip connections.

    The encoder consists of `depth` levels, each with `convolutionalLayers`
    blocks of 3x3 convolution, batch normalisation and PReLU, followed by a
    2x2 max pooling. The channel count is `numberOfFilters` after the first
    level and doubles at every further level. The decoder mirrors the encoder
    with nearest-neighbour upsampling and ends with a single channel.

    Compared with the earlier version, the bottleneck layers are created once
    (instead of once per level) and one pooling layer is created per level
    (instead of one per convolution). The forward computation only ever used
    those layers, so the network function is unchanged, but no unused
    parameters are registered any more. NOTE(review): state dicts saved with
    the earlier version contain the extra bottleneck entries and will not load
    with strict key matching.

    Args:
        depth: number of downsampling (and upsampling) levels.
        numberOfFilters: number of channels after the first level.
        convolutionalLayers: number of convolution blocks per level.
        bottleneckConvolutions: whether convolution blocks are applied at the
            bottleneck between encoder and decoder.
    """

    def __init__(self, depth, numberOfFilters, convolutionalLayers, bottleneckConvolutions=True):
        super().__init__()

        self.depth = depth
        self.convolutionalLayers = convolutionalLayers
        self.bottleneckConvolutions = bottleneckConvolutions

        # channel counts per level: 1 input channel, then numberOfFilters * 2 ** level
        FilterSizes = [1] + [numberOfFilters * 2 ** level for level in range(depth)]

        self.convDown = torch.nn.ModuleList()
        self.batchNormDown = torch.nn.ModuleList()
        self.activationDown = torch.nn.ModuleList()
        self.downsample = torch.nn.ModuleList()

        self.convBottleneck = torch.nn.ModuleList()
        self.batchNormBottleneck = torch.nn.ModuleList()
        self.activationBottleneck = torch.nn.ModuleList()

        self.convUp = torch.nn.ModuleList()
        self.batchNormUp = torch.nn.ModuleList()
        self.activationUp = torch.nn.ModuleList()
        self.upsample = torch.nn.ModuleList()

        for i in range(depth):
            # downsampling: the first convolution of a level changes the channel count
            for j in range(convolutionalLayers):
                inChannels = FilterSizes[i] if j == 0 else FilterSizes[i + 1]
                self.convDown.append(
                    torch.nn.Conv2d(inChannels, FilterSizes[i + 1], kernel_size=3, stride=1, padding=1))
                self.batchNormDown.append(torch.nn.BatchNorm2d(FilterSizes[i + 1]))
                self.activationDown.append(torch.nn.PReLU(init=0.2))
            # one pooling layer per level, matching the indexing in forward
            self.downsample.append(torch.nn.MaxPool2d(kernel_size=2, stride=2))

            # upsampling: channel counts are traversed in reverse order
            for j in range(convolutionalLayers):
                inChannels = FilterSizes[-i - 1] if j == 0 else FilterSizes[-i - 2]
                self.convUp.append(
                    torch.nn.Conv2d(inChannels, FilterSizes[-i - 2], kernel_size=3, stride=1, padding=1))
                self.batchNormUp.append(torch.nn.BatchNorm2d(FilterSizes[-i - 2]))
                self.activationUp.append(torch.nn.PReLU(init=0.2))
            self.upsample.append(torch.nn.Upsample(scale_factor=2,
                                                   mode='nearest'))  # nearest instead of bilinear as field is not continuous

        # layers at bottleneck: built once and only when they are actually used
        if bottleneckConvolutions:
            for j in range(convolutionalLayers):
                self.convBottleneck.append(
                    torch.nn.Conv2d(FilterSizes[-1], FilterSizes[-1], kernel_size=3, stride=1, padding=1))
                self.batchNormBottleneck.append(torch.nn.BatchNorm2d(FilterSizes[-1]))
                self.activationBottleneck.append(torch.nn.PReLU(init=0.2))

    def forward(self, x):
        """Encode and decode x and return the reconstruction.

        Each of the `depth` poolings halves the two trailing dimensions and
        each upsampling doubles them again.
        """
        y = x

        # downsampling
        for i in range(self.depth):
            for j in range(self.convolutionalLayers):
                k = i * self.convolutionalLayers + j
                y = self.activationDown[k](self.batchNormDown[k](self.convDown[k](y)))
            y = self.downsample[i](y)

        # bottleneck
        if self.bottleneckConvolutions:
            for j in range(self.convolutionalLayers):
                y = self.activationBottleneck[j](self.batchNormBottleneck[j](self.convBottleneck[j](y)))

        # upsampling
        for i in range(self.depth):
            y = self.upsample[i](y)
            for j in range(self.convolutionalLayers):
                k = i * self.convolutionalLayers + j
                y = self.activationUp[k](self.batchNormUp[k](self.convUp[k](y)))

        return y

## Training

**data preparation including training/validation split**

In [None]:
dataset = normalDataset()
# 90 % of the normal samples are used for training, 10 % for validation
datasetTraining, datasetValidation = torch.utils.data.random_split(dataset, [0.9, 0.1])
dataloaderTraining = DataLoader(datasetTraining, batch_size=batchSize, shuffle=True)
# A single batch holds the whole validation set, whatever its size
# (DataLoader requires batch_size >= 1, hence the max for an empty split).
dataloaderValidation = DataLoader(datasetValidation, batch_size=max(1, len(datasetValidation)), shuffle=False)

**neural network instantiation**

In [None]:
model = autoencoder(depth, numberOfFilters, convolutionalLayers, bottleneckConvolutions=True)
summary(model, (1, 1, N, N))
# The bottleneck holds numberOfFilters * 2 ** (depth - 1) channels on a grid of
# (N / 2 ** depth) ** 2 points, compared with N ** 2 values of the single-channel input.
bottleneckValues = numberOfFilters * 2 ** (depth - 1) * (N / 2 ** depth) ** 2
print("achieved reduction in bottleneck: {:.2f}".format(bottleneckValues / N ** 2))
model = model.to(device)  # move model to gpu, summary only works on cpu

**optimizer and history**

In [None]:
# Adam optimizer over all model parameters; `regularization` is applied as weight decay
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=regularization)

# Cost per epoch, one entry for every epoch; filled in by the training loop
costHistoryTrain = np.zeros(epochs)
costHistoryValidation = np.zeros(epochs)

**training loop**

In [None]:
start = time.perf_counter()
start0 = start
bestCost = 1e10

for epoch in range(epochs):

    model.train()
    for sample in dataloaderTraining:
        optimizer.zero_grad(set_to_none=True)

        # Denoising autoencoder: only the network input is corrupted, the clean
        # sample stays the reconstruction target.
        # Choose which noise to apply (uncomment one of the following lines)
        # noisySample = add_gaussian_noise(sample, noise_level=0.1)
        noisySample = add_salt_pepper_noise(sample, salt_prob=0.1, pepper_prob=0.1)
        # noisySample = add_masking_noise(sample, mask_prob=0.1)

        sample = sample.to(device)  # move clean target to gpu
        noisySample = noisySample.to(device)  # move corrupted input to gpu

        # Mean squared error over all entries of the batch; for single-channel
        # 128 x 128 samples this equals the former sum / len(sample) / 128 ** 2.
        cost = torch.mean((model(noisySample) - sample) ** 2)
        costHistoryTrain[epoch] += cost.item()

        cost.backward()

        optimizer.step()
    costHistoryTrain[epoch] /= len(dataloaderTraining)  # average over the batches

    model.eval()
    sample = next(iter(dataloaderValidation))  # get one batch of validation data
    with torch.no_grad():
        sample = sample.to(device)
        # validation uses clean inputs: plain reconstruction error
        cost = torch.mean((model(sample) - sample) ** 2)
        costHistoryValidation[epoch] = cost.item()

    if (epoch % 10 == 0):
        # time elapsed since the previous report
        elapsed_time = time.perf_counter() - start
        string = "Epoch: {}/{}\t\tCost (Train): {:.3E}\t\tCost (Validation): {:.3E}\nEpoch time: {:.2f}"
        print(string.format(epoch, epochs - 1, costHistoryTrain[epoch], costHistoryValidation[epoch], elapsed_time))
        start = time.perf_counter()

    # early stopping: keep a checkpoint of the weights with the lowest validation cost
    if bestCost > costHistoryValidation[epoch]:
        bestCost = costHistoryValidation[epoch]
        torch.save(model.state_dict(), "model")
        bestEpoch = epoch

print("Total elapsed time: {:.2f}".format(time.perf_counter() - start0))
print("best epoch: {:d}".format(bestEpoch))
# restore the best checkpoint; a state dict contains tensors only, so the
# restricted unpickler is sufficient
model.load_state_dict(torch.load("model", map_location=device, weights_only=True))  # early stopping


## Post-processing

In [None]:
model.eval()  # switch to evaluation mode for the cells below
# Report the lowest validation and the lowest training cost found in the histories
print("validation cost: {:.2e} training cost: {:.2e}".format(np.min(costHistoryValidation), np.min(costHistoryTrain)))

**training history**

In [None]:
# Training (black) and validation (red) cost per epoch on a logarithmic axis
fig, ax = plt.subplots()
ax.grid()
for history, lineStyle in ((costHistoryTrain, 'k'), (costHistoryValidation, 'r')):
    ax.plot(history, lineStyle)
ax.set_yscale('log')
plt.show()

**prediction of normal training sample**

In [None]:
sample = next(iter(dataloaderTraining))
sample = sample.to(device)  # move sample to gpu
index = 0

# Evaluate the model once and without building an autograd graph
with torch.no_grad():
    prediction = model(sample)
original = sample[index, 0].cpu()
reconstruction = prediction[index, 0].cpu()

# Panels: input, reconstruction, squared error.
# The fields take values in [-1, 1], so the colour range starts at -1 as in the
# data generation plots; vmin=0 would clip the shapes and negative predictions.
fig, ax = plt.subplots(1, 3)
ax[0].imshow(original, origin='lower', vmin=-1, vmax=1, cmap='jet', interpolation='none')
ax[1].imshow(reconstruction, origin='lower', vmin=-1, vmax=1, cmap='jet', interpolation='none')
ax[2].imshow((original - reconstruction) ** 2, origin='lower', cmap='jet', interpolation='none')
for i in range(3):
    ax[i].axis('off')
plt.show()

**prediction of normal validation sample**

In [None]:
sample = next(iter(dataloaderValidation))
sample = sample.to(device)  # move sample to gpu
index = 0

# Evaluate the model once and without building an autograd graph
with torch.no_grad():
    prediction = model(sample)
original = sample[index, 0].cpu()
reconstruction = prediction[index, 0].cpu()

# Panels: input, reconstruction, squared error.
# The fields take values in [-1, 1], so the colour range starts at -1 as in the
# data generation plots; vmin=0 would clip the shapes and negative predictions.
fig, ax = plt.subplots(1, 3)
ax[0].imshow(original, origin='lower', vmin=-1, vmax=1, cmap='jet')
ax[1].imshow(reconstruction, origin='lower', vmin=-1, vmax=1, cmap='jet')
ax[2].imshow((original - reconstruction) ** 2, origin='lower', cmap='jet')
for i in range(3):
    ax[i].axis('off')
plt.show()

**prediction of anomalous (previously unseen & out of distribution) data**

In [None]:
# change degree to choose number of squares
anomaly_dataset = anomalyDataset(degree=1)
print(len(anomaly_dataset))
anomalyPrediction = model(anomaly_dataset.data.to(device))  # move anomaly dataset to gpu

index = 0
fig, ax = plt.subplots(1, 3)
ax[0].imshow(anomaly_dataset[index, 0].detach().cpu(), origin='lower', vmin=0, vmax=1, cmap='jet')
ax[1].imshow(anomalyPrediction[index, 0].detach().cpu(), origin='lower', vmin=0, vmax=1, cmap='jet')
ax[2].imshow((anomaly_dataset[index, 0].detach().cpu() - anomalyPrediction[index, 0].detach().cpu()) ** 2, origin='lower',
             vmin=0, vmax=1, cmap='jet')
for i in range(3):
    ax[i].axis('off')
plt.show()

**compute reconstruction error for varying degree of anomaly (i.e., number of squares)**

In [None]:
# Mean squared reconstruction error per sample, one row per anomaly degree.
# The number of degrees follows the data generation (0 .. numberOfCircles squares)
# and the cell no longer depends on a dataset left over from a previous cell.
errorList = []
for degree in range(numberOfCircles + 1):
    anomaly_dataset = anomalyDataset(degree)
    with torch.no_grad():  # inference only, no autograd graph needed
        anomalyPrediction = model(anomaly_dataset.data.to(device)).cpu()  # evaluate on gpu, bring result back
    errorList.append(torch.mean((anomaly_dataset.data - anomalyPrediction) ** 2, dim=(1, 2, 3)).numpy())
errors = np.stack(errorList).astype(np.float64)

**histogram of reconstruction errors**

In [None]:
# Common bin edges for all degrees, derived from the complete error array
numberOfBins = 100
_, bins = np.histogram(errors, bins=numberOfBins)
fig, ax = plt.subplots()

# One semi-transparent histogram per anomaly degree (row of errors)
histogramStyles = [
    ('r', 'no squares, 5 circles'),
    ('b', '1 square, 4 circles'),
    ('gray', '2 squares, 3 circles'),
    ('k', '3 squares, 2 circles'),
    ('orange', '4 squares, 1 circle'),
    ('magenta', '5 squares, no circles'),
]
for degree, (colour, description) in enumerate(histogramStyles):
    ax.hist(errors[degree], bins=bins, color=colour, alpha=0.7, label=description)

ax.set_xlabel("reconstruction error")
ax.set_ylabel("number of structures")

legend = ax.legend()
fig.tight_layout()
plt.show()
