Autoencoder Anomaly Testing
===

This is rebuilt from the "Collecting Network Statistics" notebook. The goal of this notebook is to collect together a set of in-distribution and out-of-distribution images and confirm that the model can distinguish them with a high degree of accuracy.


## Setup

We begin by importing our dependencies.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import math

from model import SplitAutoencoder,ExtensibleEncoder,ExtensibleDecoder
from CustomDataSet import CustomDataSet,CustomDataSetWithError
import os

import GPUtil

Set our seed and other configurations for reproducibility.

In [None]:
#seed = 42
seed = 2662
torch.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

if torch.cuda.is_available():
    platform = "cuda"
else:
    platform = "cpu"
#platform = "cpu"
print(platform)

We set the batch size, the number of training epochs, and the learning rate. Batch size has to be reasonably low as we can't fit a huge number of these images into VRAM on my laptop.

Image size can be set here as I'm automatically resizing the images in my extraction code.

In [None]:
width = 256
height = 256

image_size = width * height

batch_size = 4

#meta-parameters
l2_decay = 0.0
dropout_rate = 0.0
code_sides = [10]
convolution_filters = 32

model_path = "../../Data/OPTIMAM_NEW/model" + str(width) + "_" + str(code_sides[0]) + "_" + str(convolution_filters) + "_" + str(dropout_rate) + "_" + str(l2_decay) + ".pt"

#image_count = 500
image_count = -1

## Gather Base Distribution Information

First we run the model on the entire original distribution and gather statistics on the loss values, encodings etc.

In [None]:
from torchvision.transforms import ToTensor,Grayscale
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(0.0,65535.0)
    ])

#root_dir = "../../Data/OPTIMAM_NEW/png_images/casewise/ScreeningMammography/"+str(width)
root_dir = "../../Data/OPTIMAM_NEW/png_images/lesions/"
train_dataset = CustomDataSet(root_dir, transform)
if (image_count == -1):
    train_dataset_subset = train_dataset
    image_count = len(train_dataset)
else:
    train_dataset_subset = torch.utils.data.Subset(train_dataset, numpy.random.choice(len(train_dataset), image_count, replace=False))
    
train_loader = torch.utils.data.DataLoader(
    train_dataset_subset, shuffle=False
)


In [None]:
#  use gpu if available
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device(platform)

code_size = code_sides[0] * code_sides[0]

# mean-squared error loss
criterion = nn.MSELoss()

losses = [None] * len(train_dataset_subset)
encodings = [None] * len(train_dataset_subset)

In [None]:
# reload the saved model
model = torch.load(model_path,map_location=device)
model.eval()

We run our autoencoder on the entire dataset and store the encodings

In [None]:
with torch.no_grad():
    count = 0
    for batch_features in train_loader:
        # load it to the active device
        batch_features = batch_features.to(device)

        # compute reconstructions
        code = model.encoder(batch_features)
        outputs = model.decoder(code)
        
        code_reshaped = code.detach().cpu().numpy()[0]
        code_reshaped.reshape(code_size)

        encodings[count] = code_reshaped

        # compute training reconstruction loss
        error_criterion = criterion(outputs,batch_features)

        losses[count] = error_criterion.cpu().numpy()

        count = count + 1

And calculate the encoding statistics:

In [None]:
print(len(encodings))
print(len(encodings[0]))
print(len(losses))

In [None]:
feature_means = np.mean(encodings,axis=0)
feature_stds = np.std(encodings,axis=0)
print(len(feature_means))
print(len(feature_stds))

mse_min = np.amin(losses)
mse_max = np.amax(losses)
mse_mean = np.mean(losses)
print("MSE Min/Mean/Max:" + str(mse_min) + "/" + str(mse_mean) + "/" + str(mse_max))

Now we save the compiled statistics to an excel file.

In [None]:
with torch.no_grad():
    np_losses = np.asarray(losses)

    np_compiled = np.concatenate((np_losses[:, np.newaxis], encodings), axis=1)

    suffix =  "_" + str(width) + "_" + str(code_sides[0]) + "_" + str(convolution_filters) + "_" + str(dropout_rate) + "_" + str(l2_decay)
    
    np.savetxt('encodings' + suffix + '.csv', encodings, delimiter=',',fmt='%10.5f',newline='\n')
    np.savetxt('losses' + suffix + '.csv', np_losses, delimiter=',',fmt='%10.5f',newline='\n')
    np.savetxt('combined' + suffix + '.csv', np_compiled, delimiter=',',fmt='%10.5f',newline='\n')

## Adversarials

We have 2 Datasets (mammographic and non-mammographic) and 3 DataLoaders - Clean Mammo, Distorted Mammo, and Non-Mammo. The goal here is to build an analogously large set of OOD images and test to what degree the autoencoder is capable of detecting the distortions.

The first method for doing this builds a large set of all the datasets classified into In-Distribution and Out-Of-Distribution and determine the accuracy rating of the model as a classifier. The second generates a set of distorted mammographic images at specified distances from the distribution, along with a value roughly analogous to that distortion level. This second method is intended to determine the range in distribution space at which the model becomes able to distinguish, as well as the degree of "grey area" between in and out of distribution (as detected).

In [None]:
with torch.no_grad():
    trigger_chance = 0.8

    PIL_transforms = torchvision.transforms.RandomApply([
        torchvision.transforms.RandomAffine(degrees=15,translate=(0.4,0.4),shear=60),
        torchvision.transforms.RandomVerticalFlip(),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.GaussianBlur(kernel_size=5),
        torchvision.transforms.ColorJitter(brightness=0.8,contrast=0.8)
        ],p=trigger_chance)

    adversarial_transform = torchvision.transforms.Compose([
        PIL_transforms,
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(0.0,65535.0),
         torchvision.transforms.RandomErasing(p=trigger_chance),
        ])

    adversarial_image_count = image_count
    adversarial_dataset = CustomDataSetWithError(root_dir, adversarial_transform)
    
    #adversarial_subset = torch.utils.data.Subset(adversarial_dataset, np.random.choice(len(adversarial_dataset), adversarial_image_count, replace=False))

    adversarial_loader = torch.utils.data.DataLoader(
        adversarial_dataset, shuffle=True
    )
    

Build the first (mixed) set:

In [None]:
with torch.no_grad():
    adversarial_iterator = iter(adversarial_loader)
    genuine_iterator = iter(train_loader)
    mixed_set_scale = 10
    
    mixed_set = []
    mixed_set_np = []
    mixed_class_set = []
    mixed_error_set = []
    for i in range(mixed_set_scale):
        r = torch.rand(1)
        if(r.item() > 0.5):
            adversarial_t = next(adversarial_iterator)
            adversarial = adversarial_t[0].cpu()
            adversarial_np = adversarial.numpy().reshape(width,height)
            adv_error = adversarial_t[1]
            mixed_class_set.append(0)
            mixed_set_np.append(adversarial_np)
            mixed_set.append(adversarial)
            mixed_error_set.append(adv_error)
        else:
            genuine = next(genuine_iterator).cpu()
            genuine_np = genuine.numpy().reshape(width,height)
            mixed_class_set.append(1)
            mixed_set.append(genuine)
            mixed_set_np.append(genuine_np)
            mixed_error_set.append(0.0) # genuine, so no drift
        
    mixed_code_set = []
    mixed_reconstruction_set = []

Now run the model on the mixed set:

In [None]:
with torch.no_grad():
    for mixed_item in mixed_set:
        mixed_example = mixed_item.to(device)
        
        n_code = model.encoder(mixed_example)
        reconstruction = model(mixed_example)
        
        mixed_code_set.append(n_code.cpu())
        mixed_reconstruction_set.append(reconstruction.cpu())

Next, measure the loss and feature statistics for the adversarials:            

In [None]:
mean_squared_errors = []
for image_n in range(mixed_set_scale):
    print(np.shape(mixed_reconstruction_set[image_n]))
    recon = mixed_reconstruction_set[image_n][0][0]
    original = mixed_set[image_n][0][0]
    t_se = 0.0
    q = []
    n = 0
    for y in range(height):
        p = []
        for x in range(width):
            base_pixel = original[y][x].item()
            recon_pixel = recon[y][x].item()
            #specifically ignore all-black pixels
            p.append(se)
            if (base_pixel != 0.0):
                se = (recon_pixel - base_pixel) ** 2
                t_se = t_se + se
                n = n + 1
        q.append(p)
    #print(q)
    
    mse = t_se / n
    
    print("n:" + str(n))
    print("TSE:" + str(t_se))
    print("MSE:" + str(mse))
    print("\n")  

In [None]:
g = np.ma.masked_equal(feature_stds,0)
print(feature_means)
for i in range(mixed_set_scale):
    adv_divergence = (mixed_code_set[i] - feature_means) / g
    print(np.max(adv_divergence,axis=1))

And plot the results:

In [None]:
with torch.no_grad():
    number = mixed_set_scale
    #number = 5
    plt.figure(figsize=(25, 9))
    for index in range(number):
        # display original
        ax = plt.subplot(3, number, index + 1)
        test_examples = mixed_set
        copyback = test_examples[index].cpu()
        plt.imshow(copyback.reshape(height, width))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        # display codes
        ax = plt.subplot(3, number, index + number + 1)
        codes = mixed_code_set
        code_copyback = codes[index].cpu()
        plt.imshow(code_copyback.reshape(code_sides[0],code_sides[0]))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        # display reconstruction
        ax = plt.subplot(3, number, index + (number*2) + 1)
        reconstruction = mixed_reconstruction_set
        recon_copyback = reconstruction[index].cpu()
        plt.imshow(recon_copyback.reshape(height, width))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    out_path = "adv_output"+ str(width) + "_"  + str(code_sides[0]) + "_" + str(convolution_filters) + "_" + str(dropout_rate) + "_" + str(l2_decay) +".png"
    plt.savefig(out_path)
    plt.show()