# Anomaly Detection with Convolutional Autoencoders using Reconstruction Loss

This is my work for constructing and testing a Convolutional Autoencoder (CAE) that helps detect Alzheimer's Disease early using images from three linked conditions. Specifically, I'm using the reconstruction loss values that my CAE has for each class to detect anomalies.

First, I did all relevant imports for the project and seeded everything.

In [None]:
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm
from sklearn.manifold import TSNE
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torchvision.utils import make_grid
from torch.amp import GradScaler, autocast


def set_seed(seed):
    """
    Seeds the random number generators used in this notebook.

    Parameters:
    seed (int): value passed to Python's `random`, NumPy's global generator,
                `torch.manual_seed` and `torch.cuda.manual_seed`
    """
    random.seed(seed)
    np.random.seed(seed)
    for seed_torch in (torch.manual_seed, torch.cuda.manual_seed):
        seed_torch(seed)

set_seed(0)

## Set Percent of Subset for Training the EncoderForClassifier
train_set_pct = 0.4

Next, I mounted my Google Drive, copied the zipped dataset from it into the Colab runtime, and unzipped it there. All data was obtained from the article "Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning" published in the journal Cell.

Citation for Data Usage:
Kermany D, Goldbaum M, Cai W et al. Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning. Cell. 2018; 172(5):1122-1131. doi:10.1016/j.cell.2018.02.010.

In [None]:
## Mounting the Google Drive
from google.colab import drive
drive.mount('/content/drive')

!rsync -ah -progress "/content/drive/MyDrive/Colab Notebooks/Kermany Data/ZhangLabData.zip" "/content"
!unzip /content/ZhangLabData.zip

PATH_TRAIN = "/content/CellData/OCT/train"
PATH_TEST = "/content/CellData/OCT/test"

## Preparing the Dataset

I defined the transformations, a class for loading the images which is used later, and built the train and test set for the optical coherence tomography (OCT) images. I also split off a random subset of the train data to train my EncoderForClassifier which is defined later.

In [None]:
## Normalisation constants: Normalize(mean=0.5, std=0.5) maps ToTensor's [0, 1] range to [-1, 1]
GRAY_MEAN = [0.5]
GRAY_STD = [0.5]

## Training pipeline: single-channel grayscale, 128x128, random horizontal flip, normalised.
## transforms.Grayscale replaces the former `Lambda(lambda x: x.convert("L"))`: it performs the
## same PIL conversion but, unlike an anonymous lambda, can be pickled when DataLoader
## worker processes are started with the "spawn" method.
train_transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((128, 128)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=GRAY_MEAN, std=GRAY_STD),
    ]
)

## Evaluation pipeline: identical to the training one but without the random flip
val_transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(mean=GRAY_MEAN, std=GRAY_STD),
    ]
)

class ImageLoaderDataset(torch.utils.data.Dataset):
    """
    Class for Loading Images

    Serves every file found directly inside one folder as a transformed,
    label-free grayscale image. It is a map-style dataset (``__len__`` and
    ``__getitem__``), so it can be handed straight to a DataLoader.
    """

    def __init__(self, path_to_folder, transform):
        """
        Initializes instance of the ImageLoaderDataset Class

        Parameters:
        self (ImageLoaderDataset): instance of the class
        path_to_folder (str): the path to the data
        transform (transforms): the transformation to the data
        """
        self.path_to_folder = path_to_folder
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs.
        self.training_files = sorted(os.listdir(path_to_folder))
        self.transform = transform

    def __len__(self):
        """
        Returns the length of the data files

        Parameter:
        self (ImageLoaderDataset): instance of the class

        Returns:
        int: the length of training files
        """
        return len(self.training_files)

    def __getitem__(self, idx):
        """
        Gets the image at an index in the training files

        Parameters:
        self (ImageLoaderDataset): instance of the class
        idx (int): the index of the image you are returning

        Returns:
        the image at that index after `transform` has been applied to it
        """
        sample = self.training_files[idx]
        path_to_sample = os.path.join(self.path_to_folder, sample)

        # Force single-channel grayscale before handing the image to the transform
        image = Image.open(path_to_sample).convert("L")
        image = self.transform(image)

        return image

## Using Image Folder to Build the Train and Test Dataset ##
train_set = ImageFolder(PATH_TRAIN, transform=train_transform)
## The test split is only used for evaluation, so it gets the deterministic
## val_transform (no random flip) instead of the augmenting train_transform.
test_set = ImageFolder(PATH_TEST, transform=val_transform)

## Assigning Each Class an index ##
data_classes = dict(enumerate(train_set.classes))

## Choosing a Random Subset for Pretraining ##
## (train_set_pct of the training images, sampled without replacement)
random_indices = random.sample(range(len(train_set)), int(len(train_set) * train_set_pct))
train_set = Subset(train_set, random_indices)

print(f"Training with {len(train_set)} samples")

## Defining ResidualBlocks & The Encoder

Next, I defined a class for ResidualBlocks and the other key encoding classes: the EncoderBlocks, the Encoder, and the EncoderForClassifier, which I will later train on the subset of the data.

In [None]:
class ResidualBlock(nn.Module):
    """
    ResidualBlock: a class that represents a block of residual connections
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 dropout_p=0.0):
        """
        Initializes the ResidualBlock class.

        Parameters:
        self (Residual Block): instance of the class
        in_channels (int): number of input channels
        out_channels (int): number of output channels
        dropout_p (float): dropout probability. NOTE: accepted so callers can pass it,
                           but this block does not create or apply any dropout layer.
        """
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding="same")
        self.norm1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding="same")
        self.norm2 = nn.BatchNorm2d(out_channels)

        # The skip path needs a 1x1 convolution only when the channel count changes;
        # otherwise the input is added back unchanged.
        self.identity_conv = nn.Identity()
        if in_channels != out_channels:
            self.identity_conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1)

    def forward(self, x):
        """
        Forward function for ResidualBlock. Applies a convolution from the input channels to
        the output channels with stride=1, a BatchNorm2d and the activation function; then a
        second convolution from output channels to output channels, a BatchNorm2d and the
        activation again. Finally, the (possibly 1x1-projected) input is added to the result.

        Parameters:
        self (ResidualBlock): instance of the class
        x: the input to the block

        Returns:
        x: the output after adding the residual
        """
        residual = x

        x = self.conv1(x)
        x = self.norm1(x)
        x = F.relu(x)

        x = self.conv2(x)
        x = self.norm2(x)
        x = F.relu(x)

        # The skip connection is added after the second ReLU
        x = x + self.identity_conv(residual)

        return x

class EncoderBlock(nn.Module):
    """
    EncoderBlock: a class that defines the structure and ResidualBlocks that make up the Encoder
    of my CAE.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 residual_blocks_per_group=1,
                 downsample=True,
                 dropout_p=0.0):
        """
        Initializes an instance of the EncoderBlock class.

        Parameters:
        self (EncoderBlock): instance of the class
        in_channels (int): number of input channels
        out_channels (int): number of output channels
        residual_blocks_per_group (int): number of ResidualBlocks per group, set to 1
        downsample (bool): True if downsampling, else False
        dropout_p (float): dropout probability forwarded to every ResidualBlock, set to 0.0
        """
        super().__init__()

        self.encoder_block = nn.ModuleList([])
        for i in range(residual_blocks_per_group):
            # Only the first residual block changes the channel count
            in_c = in_channels if i == 0 else out_channels
            self.encoder_block.append(
                ResidualBlock(in_c, out_channels, dropout_p)
            )

        if downsample:
            # Stride-2 convolution: halves the spatial size (rounding up for odd sizes)
            self.encoder_block.append(
                nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=2, padding=1)
            )

    def forward(self, x):
        """
        Forward function for the EncoderBlock that specifies how input flows through the network.

        Parameters:
        self (EncoderBlock): instance of the class
        x: the input passed to the layer

        Returns:
        x: the output after every layer of the block has been applied in order
        """
        for block in self.encoder_block:
            x = block(x)

        return x

class Encoder(nn.Module):
    """
    Encoder: encoder class for the CAE, which has a sequence of EncoderBlocks which go through a
    channel pattern and which downsample.
    """

    def __init__(self,
                 in_channels,
                 channel_pattern=None,
                 residual_blocks_per_group=1,
                 dropout_p=0.0):
        """
        Initializes the Encoder class

        Parameters:
        self (Encoder): instance of the class
        in_channels (int): number of input channels
        channel_pattern (array): the pattern of number of channels for the encoder; must hold
                                 exactly four entries, one per block.
                                 If None, default is [32,64,128,256]
        residual_blocks_per_group (int): the number of residual blocks per group, set to 1
        dropout_p (float): dropout probability forwarded to every block, set to 0.0

        Raises:
        ValueError: if channel_pattern does not contain exactly four entries
        """
        super().__init__()

        self.channel_pattern = channel_pattern
        if channel_pattern is None:
            self.channel_pattern = [32,64,128,256]

        # Exactly four blocks are built below; a longer pattern would otherwise be
        # silently truncated and a shorter one would fail with an opaque IndexError.
        if len(self.channel_pattern) != 4:
            raise ValueError(
                f"channel_pattern must have exactly 4 entries, got {len(self.channel_pattern)}"
            )

        self.blocks1 = EncoderBlock(in_channels,
                                    self.channel_pattern[0],
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    downsample=True,
                                    dropout_p=dropout_p)

        self.blocks2 = EncoderBlock(self.channel_pattern[0],
                                    self.channel_pattern[1],
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    downsample=True,
                                    dropout_p=dropout_p)

        self.blocks3 = EncoderBlock(self.channel_pattern[1],
                                    self.channel_pattern[2],
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    downsample=True,
                                    dropout_p=dropout_p)

        self.blocks4 = EncoderBlock(self.channel_pattern[2],
                                    self.channel_pattern[3],
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    downsample=True,
                                    dropout_p=dropout_p)

    def forward(self, x):
        """
        The forward function for the Encoder that specifies how input flows through the
        network. It goes through all 4 encoder blocks.

        Parameters:
        self (Encoder): instance of the class
        x: the input passed to the encoder

        Returns:
        x: the feature map produced by the last encoder block
        """
        x = self.blocks1(x)
        x = self.blocks2(x)
        x = self.blocks3(x)
        x = self.blocks4(x)

        return x

class EncoderForClassifier(nn.Module):
    """
    EncoderForClassifier: This is the encoder which learns the classification, being
    trained later on just a percent of the data. I will use this to load pretrained weights
    to the encoder of the CAE later.
    """

    def __init__(self,
                 num_classes=4,
                 in_channels=1,
                 channel_pattern=None,
                 residual_blocks_per_group=1,
                 dropout_p=0.0,
                 image_size=128):
        """
        Initializes the EncoderForClassifier.

        Parameters:
        self (EncoderForClassifier): instance of the class
        num_classes (int): number of classes, set to four since the OCT dataset has four
        in_channels (int): number of input channels
        channel_pattern (array): the pattern of number of channels for the encoder.
                                 If None, default is [32,64,128,256]
        residual_blocks_per_group (int): the number of residual blocks per group, set to 1
        dropout_p (float): dropout probability forwarded to the encoder, set to 0.0
        image_size (int): side length of the square input images, set to 128. Only used
                          to size the linear head.
        """
        super().__init__()

        self.encoder = Encoder(in_channels,
                               channel_pattern=channel_pattern,
                               residual_blocks_per_group=residual_blocks_per_group,
                               dropout_p=dropout_p)

        # The encoder downsamples four times with a stride-2, kernel-3, padding-1
        # convolution, each mapping a side of n pixels to ceil(n / 2): 128 -> 8.
        feature_side = image_size
        for _ in range(4):
            feature_side = (feature_side + 1) // 2

        self.head = nn.Linear(self.encoder.channel_pattern[-1] * feature_side * feature_side,
                              num_classes)

    def forward(self, x):
        """
        Forward function for the EncoderForClassifier

        Parameters:
        self (EncoderForClassifier): instance of the class
        x: input passed to EncoderForClassifier

        Returns:
        x: one row of `num_classes` class scores (logits) per input image
        """
        x = self.encoder(x)
        x = x.flatten(1)  # keep the batch dimension, flatten channels and spatial dims
        x = self.head(x)

        return x


## Train Classifier on Subset

Here, I wrote my training script and trained the EncoderForClassifier on a subset of the data. I also saved the state dict of the encoder.

In [None]:
def train(model, device, epochs, optimizer, loss_fn, trainloader, valloader):
    """
    Training function for the EncoderForClassifier

    Parameters:
    model: the model being trained
    device: the device being used
    epochs (int): number of epochs
    optimizer: the optimizer
    loss_fn: the loss function
    trainloader: the data loader for the training data
    valloader: the data loader for the test data

    Returns:
    log_training: the training log that specifies the epoch, training loss, training
    accuracy, validation loss, and validation accuracy (each averaged over the batches
    of that epoch).
    model: the model
    """
    log_training = {"epoch": [],
                    "training_loss": [],
                    "training_acc": [],
                    "validation_loss": [],
                    "validation_acc": []}

    model = model.to(device)

    # Mixed precision is tied to the device that was actually requested: it is used
    # on CUDA and switched off elsewhere instead of being hard-coded to "cuda".
    device_type = torch.device(device).type
    use_amp = device_type == "cuda"
    scaler = GradScaler(device_type, enabled=use_amp)  # gradient scaler for mixed precision

    for epoch in range(1, epochs + 1):
        print(f"Starting Epoch {epoch}")
        training_losses, training_accuracies = [], []
        validation_losses, validation_accuracies = [], []

        model.train()  # Turn On BatchNorm and Dropout
        for image, label in tqdm(trainloader):
            image, label = image.to(device), label.to(device)
            optimizer.zero_grad()

            # Mixed precision training block
            with autocast(device_type=device_type, dtype=torch.bfloat16, enabled=use_amp):
                out = model(image)
                ### CALCULATE LOSS ###
                loss = loss_fn(out, label)

            # Scale the loss and backpropagate
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            training_losses.append(loss.item())

            ### CALCULATE ACCURACY ###
            predictions = torch.argmax(out, dim=1)
            accuracy = (predictions == label).sum() / len(predictions)
            training_accuracies.append(accuracy.item())

        model.eval()  # Turn Off Batchnorm
        with torch.no_grad():  # no gradients are needed anywhere in validation
            for image, label in tqdm(valloader):
                image, label = image.to(device), label.to(device)

                out = model(image)
                ### CALCULATE LOSS ###
                loss = loss_fn(out, label)

                validation_losses.append(loss.item())

                ### CALCULATE ACCURACY ###
                predictions = torch.argmax(out, dim=1)
                accuracy = (predictions == label).sum() / len(predictions)
                validation_accuracies.append(accuracy.item())

        training_loss_mean, training_acc_mean = np.mean(training_losses), np.mean(training_accuracies)
        valid_loss_mean, valid_acc_mean = np.mean(validation_losses), np.mean(validation_accuracies)

        log_training["epoch"].append(epoch)
        log_training["training_loss"].append(training_loss_mean)
        log_training["training_acc"].append(training_acc_mean)
        log_training["validation_loss"].append(valid_loss_mean)
        log_training["validation_acc"].append(valid_acc_mean)

        print("Training Loss:", training_loss_mean)
        print("Training Acc:", training_acc_mean)
        print("Validation Loss:", valid_loss_mean)
        print("Validation Acc:", valid_acc_mean)

    return log_training, model

## Build the classifier whose encoder weights are saved below.
## NOTE(review): dropout_p=0.2 is passed down to ResidualBlock, which never applies it.
model = EncoderForClassifier(
    channel_pattern=[64,128,256,256],
    residual_blocks_per_group=2,
    dropout_p=0.2,
)

## Training configuration ##
device = "cuda" if torch.cuda.is_available() else "cpu"
epochs = 8
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.01)
loss_fn = nn.CrossEntropyLoss()

## Only the training loader is shuffled ##
trainloader = DataLoader(train_set, batch_size=64, shuffle=True, num_workers=16)
testloader = DataLoader(test_set, batch_size=64, shuffle=False, num_workers=16)

training_kwargs = dict(model=model,
                       device=device,
                       epochs=epochs,
                       optimizer=optimizer,
                       loss_fn=loss_fn,
                       trainloader=trainloader,
                       valloader=testloader)
log_training, model = train(**training_kwargs)

## Save only the encoder's weights; the file name carries the subset percentage ##
torch.save(model.encoder.state_dict(), f"deepEncoder{int(train_set_pct*100)}.pt")

### Defining my CAE

I defined the main Convolutional Autoencoder (CAE) model. Of course, I had to define relevant classes including the class for upsampling and the Decoder (along with the DecoderBlock) for this task.

In [None]:
class UpsampleBlock(nn.Module):
    """
    UpsampleBlock: Class for Upsampling for the CAE since Residual Connections alone don't
    change image resolution
    """

    def __init__(self,
                 in_channels,
                 out_channels):
        """
        Initializes UpsampleBlock

        Parameters:
        self (UpsampleBlock): instance of the class
        in_channels (int): number of input channels
        out_channels (int): number of output channels
        """
        super().__init__()

        # Double the spatial size, then apply a size-preserving 3x3 convolution
        self.up = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding="same")
        )

    def forward(self, x):
        """
        Forward function for UpsampleBlock that applies upsampling to the input

        Parameters:
        self (UpsampleBlock): instance of the class
        x: input passed into model

        Returns:
        the upsampled (and convolved) input
        """
        return self.up(x)

class DecoderBlock(nn.Module):
    """
    DecoderBlock: a class that details the structure of residual blocks in the decoder.
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 residual_blocks_per_group=1,
                 upsample=True,
                 dropout_p=0.0):
        """
        Initializes the DecoderBlock.

        Parameters:
        self (DecoderBlock): instance of the class
        in_channels (int): number of input channels
        out_channels (int): number of output channels
        residual_blocks_per_group (int): number of ResidualBlocks per group, set to 1
        upsample (bool): True if upsampling, else False
        dropout_p (float): dropout probability forwarded to every ResidualBlock, set to 0.0
        """
        super().__init__()

        # NOTE: the attribute is called `encoder_block` although it holds decoder layers;
        # the name is kept because it is part of the state-dict keys.
        self.encoder_block = nn.ModuleList([])
        for i in range(residual_blocks_per_group):
            # Only the first residual block changes the channel count
            in_c = in_channels if i == 0 else out_channels
            self.encoder_block.append(
                ResidualBlock(in_c, out_channels, dropout_p)
            )

        if upsample:
            self.encoder_block.append(
                UpsampleBlock(out_channels, out_channels)
            )

    def forward(self, x):
        """
        Forward function for the DecoderBlock that specifies how input
        flows through the network's layers.

        Parameters:
        self (DecoderBlock): instance of the class
        x: the input passed to the layer

        Returns:
        x: the output after every layer of the block has been applied in order
        """
        for block in self.encoder_block:
            x = block(x)

        return x

class Decoder(nn.Module):
    """
    Decoder: decoder class for the CAE, which has a sequence of DecoderBlocks which go through a
    channel pattern and which upsample.
    """

    def __init__(self,
                 out_channels,
                 channel_pattern=None,
                 residual_blocks_per_group=1,
                 dropout_p=0.0):
        """
        Initializes the Decoder class

        Parameters:
        self (Decoder): instance of the class
        out_channels (int): number of output channels
        channel_pattern (array): the encoder's channel pattern (four entries); the decoder
                                 walks it in reverse. If None, default is [32,64,128,256]
        residual_blocks_per_group (int): the number of residual blocks per group, set to 1
        dropout_p (float): dropout probability forwarded to every block, set to 0.0

        Raises:
        ValueError: if channel_pattern does not contain exactly four entries
        """
        super().__init__()

        self.channel_pattern = channel_pattern
        if channel_pattern is None:
            self.channel_pattern = [32,64,128,256]

        # Exactly four blocks are built below; reject patterns that would otherwise be
        # silently truncated or fail with an opaque IndexError.
        if len(self.channel_pattern) != 4:
            raise ValueError(
                f"channel_pattern must have exactly 4 entries, got {len(self.channel_pattern)}"
            )

        # Stored reversed: the decoder goes from the widest layer back to the narrowest
        self.channel_pattern = list(reversed(self.channel_pattern))

        self.blocks1 = DecoderBlock(self.channel_pattern[0],
                                    self.channel_pattern[1],
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    upsample=True,
                                    dropout_p=dropout_p)

        self.blocks2 = DecoderBlock(self.channel_pattern[1],
                                    self.channel_pattern[2],
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    upsample=True,
                                    dropout_p=dropout_p)

        self.blocks3 = DecoderBlock(self.channel_pattern[2],
                                    self.channel_pattern[3],
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    upsample=True,
                                    dropout_p=dropout_p)

        self.blocks4 = DecoderBlock(self.channel_pattern[3],
                                    out_channels,
                                    residual_blocks_per_group=residual_blocks_per_group,
                                    upsample=True,
                                    dropout_p=dropout_p)

    def forward(self, x):
        """
        The forward function for the Decoder that specifies how input flows through the
        network. It goes through all 4 decoder blocks.

        Parameters:
        self (Decoder): instance of the class
        x: the encoded features passed to the decoder

        Returns:
        x: the output of the last decoder block
        """
        x = self.blocks1(x)
        x = self.blocks2(x)
        x = self.blocks3(x)
        x = self.blocks4(x)

        return x
class CAE(nn.Module):
    """
    CAE: Convolutional Autoencoder
    """

    def __init__(self,
                 in_channels,
                 channel_pattern=None,
                 residual_blocks_per_group=1,
                 dropout_p=0.0):
        """
        Initializes the CAE class

        Parameters:
        self (CAE): instance of the class
        in_channels (int): number of input channels (also the number of channels of the
                           reconstruction)
        channel_pattern (array): the pattern of number of channels for the encoder.
                                 If None, default is [32,64,128,256]
        residual_blocks_per_group (int): the number of residual blocks per group, set to 1
        dropout_p (float): dropout probability forwarded to encoder and decoder, set to 0.0
        """
        super().__init__()

        self.channel_pattern = channel_pattern
        if channel_pattern is None:
            self.channel_pattern = [32,64,128,256]

        self.encoder = Encoder(in_channels=in_channels,
                               channel_pattern=self.channel_pattern,
                               residual_blocks_per_group=residual_blocks_per_group,
                               dropout_p=dropout_p)

        self.decoder = Decoder(out_channels=in_channels,
                               channel_pattern=self.channel_pattern,
                               residual_blocks_per_group=residual_blocks_per_group,
                               dropout_p=dropout_p)

    def forward(self, x):
        """
        The forward function for the CAE. Goes through the encoder and decoder.

        Parameters:
        self (CAE): instance of class
        x: input

        Returns:
        enc: the encoded features
        dec: the decoded image
        """
        # The encoder runs without gradient tracking, so backpropagation
        # only reaches the decoder's weights.
        with torch.no_grad():
            enc = self.encoder(x)

        dec = self.decoder(enc)
        dec = torch.tanh(dec)  # squash the reconstruction into [-1, 1]

        return enc, dec

### Training the CAE ###

Next, I trained the CAE.

In [None]:
def train(model,
          train_set,
          test_set,
          batch_size,
          training_iterations,
          evaluation_iterations,
          verbose=False):
    """
    Training Script for the CAE.

    Parameters:
    model: the model; calling it on a batch must return (encoded, reconstruction)
    train_set: the train set (yields images only, no labels)
    test_set: the test set (yields images only, no labels)
    batch_size: the batch size
    training_iterations: the number of training iterations (optimizer steps)
    evaluation_iterations: an evaluation runs every this many steps, starting at the first step
    verbose: currently unused

    Returns:
    model: the model
    train_losses: mean training loss over the steps since the previous evaluation
    evaluation_losses: mean loss over the test set at each evaluation
    encoded_data_per_eval: the encoded data (currently never filled, so always an empty list)

    Raises:
    ValueError: if the training loader yields no batches
    """
    print("Training Model!")
    print(model)

    ## Set Device, the Loss Function, the DataLoaders, and the Optimizer ##
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    huber_loss = torch.nn.HuberLoss(reduction='mean', delta=1.0)
    trainloader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=8)
    testloader = DataLoader(test_set, batch_size=batch_size, shuffle=True, num_workers=8)
    optimizer = optim.Adam(model.parameters(), lr=0.0005)

    # Without this guard the `while` loop below would spin forever on an empty loader
    if len(trainloader) == 0:
        raise ValueError("train_set produced no batches")

    train_loss = []
    evaluation_loss = []
    train_losses = []
    evaluation_losses = []
    encoded_data_per_eval = []

    ## Progress Bar ##
    pbar = tqdm(range(training_iterations))

    keep_training = True
    step_counter = 0
    while keep_training:

        # Keep cycling over the training data until enough steps have been taken
        for images in trainloader:

            images = images.to(device)
            encoded, reconstruction = model(images)
            loss = huber_loss(reconstruction, images)
            train_loss.append(loss.item())

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

            optimizer.step()
            optimizer.zero_grad()

            if step_counter % evaluation_iterations == 0:

                model.eval()

                # Evaluation needs no gradients: skipping them saves memory and keeps
                # the tensors plotted below free of autograd history.
                with torch.no_grad():
                    for images in testloader:

                        images = images.to(device)
                        encoded, reconstruction = model(images)
                        loss = huber_loss(reconstruction, images)
                        evaluation_loss.append(loss.item())

                train_loss = np.mean(train_loss)
                evaluation_loss = np.mean(evaluation_loss)

                train_losses.append(train_loss)
                evaluation_losses.append(evaluation_loss)

                print("Training Loss", train_loss)
                print("Evaluation Loss", evaluation_loss)

                train_loss = []
                evaluation_loss = []
                model.train()

                # Show up to four inputs (top row) above their reconstructions (bottom row);
                # `images` / `reconstruction` hold the last evaluated batch here.
                images = images[:4].detach().cpu()
                gens = reconstruction[:4].detach().cpu()

                # Undo the [-1, 1] normalisation for display
                images = (images + 1) / 2
                gens = torch.clamp(((gens + 1) / 2), 0, 1)

                combined = torch.cat([images, gens], dim=0)

                grid = make_grid(combined, nrow=4)

                plt.imshow(grid.permute(1, 2, 0))
                plt.axis('off')
                plt.show()

            step_counter += 1
            pbar.update(1)

            if step_counter >= training_iterations:
                print("Completed Training.")
                keep_training = False
                break

    pbar.close()

    ## Storing Encoded Data ##
    encoded_data_per_eval = [np.array(i) for i in encoded_data_per_eval]

    print("Final Training Loss", train_losses[-1])
    print("Final Evaluation Loss", evaluation_losses[-1])

    return model, train_losses, evaluation_losses, encoded_data_per_eval


## The CAE must use the same encoder layout as the classifier so the saved weights fit ##
conv_model = CAE(1, channel_pattern=[64, 128, 256, 256], residual_blocks_per_group=2)

## Load the encoder weights saved after training the classifier ##
conv_model.encoder.load_state_dict(torch.load("deepEncoder" + str(int(train_set_pct*100)) +".pt", weights_only=True))

## The CAE only ever sees NORMAL images. The folders are derived from PATH_TRAIN / PATH_TEST
## so they no longer depend on the current working directory.
train_set = ImageLoaderDataset(os.path.join(PATH_TRAIN, "NORMAL"), transform=train_transform)
test_set = ImageLoaderDataset(os.path.join(PATH_TEST, "NORMAL"), transform=val_transform)

conv_model, train_losses, evaluation_losses, conv_encoded_data_per_eval = train(conv_model,
                                                                                train_set,
                                                                                test_set,
                                                                                batch_size=32,
                                                                                training_iterations=15000,
                                                                                evaluation_iterations=2500)



I saved the CAE's state dictionary.

In [None]:
## Save the trained CAE itself (`conv_model`), not the encoder of the classifier `model` ##
torch.save(conv_model.state_dict(), "cae18_"+ str(train_set_pct*100)+"_lD.pt")

# Building Boxplots

I wanted to build boxplots to compare the range, mean, and other key features of the reconstruction loss by class for my CAE.

I evaluated the model and saved the losses for each retinal disease to lists. I created a data frame using a dictionary that linked each class label to that class's losses.

I saved the losses to a txt file as well, in case I want to use it later. But, the main thing I did was using seaborn and created plots for the normal reconstruction losses in comparison with each retinal condition.

I also created a plot with all three conditions and the normal losses to compare them.

In [None]:
## One label-free test dataset per class folder, all using the deterministic val_transform ##
_oct_test_root = "/content/CellData/OCT/test"
normal_set, dme_set, drusen_set, cnv_set = (
    ImageLoaderDataset(f"{_oct_test_root}/{class_folder}", transform=val_transform)
    for class_folder in ("NORMAL", "DME", "DRUSEN", "CNV")
)

def eval(model, dataset, device=None):
    """
    Computes the per-image reconstruction loss of a model over a dataset.

    NOTE: this function shadows Python's builtin `eval`; the name is kept because
    later cells call it.

    Parameters:
    model: the model; calling it on a batch must return (encoded, reconstruction)
    dataset: dataset yielding image tensors only (no labels)
    device: device to run on. If None, uses "cuda" when available, else "cpu"

    Returns:
    losses (list of float): mean absolute error between each image and its
    reconstruction, one value per image in dataset order
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    model.eval()
    model = model.to(device)

    # reduction='none' keeps one value per pixel so the loss can be averaged per image
    l1_loss = torch.nn.L1Loss(reduction='none')

    loader = DataLoader(dataset, batch_size=32)

    losses = []
    with torch.no_grad():
        for images in loader:
            images = images.to(device)
            _, dec = model(images)

            # Average over channel, height and width -> one loss per image
            per_image_loss = l1_loss(dec, images).mean(dim=[1,2,3])
            losses.extend(per_image_loss.cpu().tolist())

    return losses

## Per-image reconstruction losses for every class ##
normal_losses = eval(conv_model, normal_set)
cnv_losses = eval(conv_model, cnv_set)
drusen_losses = eval(conv_model, drusen_set)
dme_losses = eval(conv_model, dme_set)

import pandas as pd

## Initialize Data to Dicts ##
d = {'CNV': cnv_losses,
     'DME': dme_losses,
     'DRUSEN': drusen_losses,
     'Normal': normal_losses
     }
## Creates Dataframe. ##
## Each column is wrapped in a Series so classes with different numbers of images
## are padded with NaN instead of making the DataFrame constructor raise.
df = pd.DataFrame({class_name: pd.Series(class_losses) for class_name, class_losses in d.items()})

print(df)
df.to_csv('losses by class'+str(int(train_set_pct*100))+'.txt', sep='\t', index=False)

## Subset percentage as it appears in every title and file name ##
pct_tag = str(int(train_set_pct*100))

## Losses and colours for each "condition vs. Normal" comparison ##
cn = {'CNV': cnv_losses, 'Normal': normal_losses}
cnn = {"CNV": "red", "Normal": "purple"}

dn = {'DRUSEN': drusen_losses, 'Normal': normal_losses}
dn_palette = {'DRUSEN': 'green', 'Normal': 'purple'}

dmn = {'DME': dme_losses, 'Normal': normal_losses}
dmn_palette = {'DME': 'blue', 'Normal': 'purple'}

## One boxplot per condition, drawn, saved and shown in the order CNV, DRUSEN, DME ##
for condition, pair_losses, pair_palette in (
    ("CNV", cn, cnn),
    ("DRUSEN", dn, dn_palette),
    ("DME", dmn, dmn_palette),
):
    sns.boxplot(data=pair_losses, width=0.3, palette=pair_palette)
    plt.xlabel("Class")
    plt.ylabel("Loss")
    plt.title(condition + " and Normal Loss for CAE trained on " + pct_tag + "% of data")
    plt.savefig(condition + "-NORMAL_LossDetection" + pct_tag + ".jpeg")
    plt.show()

## All four classes side by side ##
sns.boxplot(data=df, palette="Set1")
plt.xlabel("Class")
plt.ylabel("Loss")
plt.title("Loss Detection for CAE trained on " + pct_tag + "% of data")
plt.savefig("LossDetection" + pct_tag + ".jpeg")
plt.show()

# Detecting Anomalies

I used the threshold of three standard deviations away from the normal mean to flag an anomaly.

So, for every image I computed the z-score of its reconstruction loss relative to the mean and standard deviation of the normal-class losses. I then flagged the image as an anomaly when that z-score was at least 3, i.e. when its loss was three or more standard deviations above the normal mean.

I also saved this data (as to how much of each class was marked as an anomaly) to a text file.

In [None]:
## Reference statistics: every z-score is measured against the NORMAL class losses ##
normal_mean = np.mean(normal_losses)
n_std = np.std(normal_losses)


def _count_anomalies(losses, mean, std, threshold=3):
    """
    Counts how many losses lie at least `threshold` standard deviations above `mean`.

    Parameters:
    losses (list of float): per-image losses of one class
    mean (float): mean of the reference (normal) losses
    std (float): standard deviation of the reference (normal) losses
    threshold (float): minimum z-score that counts as an anomaly, set to 3

    Returns:
    int: number of losses whose z-score is >= threshold
    """
    z_scores = (np.asarray(losses, dtype=float) - mean) / std
    return int(np.count_nonzero(z_scores >= threshold))


anomaly_count_cnv = _count_anomalies(cnv_losses, normal_mean, n_std)
anomaly_count_dme = _count_anomalies(dme_losses, normal_mean, n_std)
anomaly_count_drusen = _count_anomalies(drusen_losses, normal_mean, n_std)

## One report line per class. The value is now a real percentage (0-100), matching
## the "%" in the message; previously the raw fraction was printed.
anomaly = []
for class_name, anomaly_count, class_losses in (
    ("CNV", anomaly_count_cnv, cnv_losses),
    ("DME", anomaly_count_dme, dme_losses),
    ("DRUSEN", anomaly_count_drusen, drusen_losses),
):
    # An empty class reports 0 instead of dividing by zero
    detected_pct = 100 * anomaly_count / len(class_losses) if len(class_losses) else 0.0
    line = f"The % we can detect as anomaly for {class_name} images is {detected_pct:.2f}"
    print(line)
    anomaly.append(line)

## The report name follows train_set_pct like every other output file of this notebook
## (it used to be hard-coded to "anomaly_report_100.txt"); one line is written per class.
with open(f"anomaly_report_{int(train_set_pct*100)}.txt", "w") as file:
    file.write("\n".join(anomaly) + "\n")