<a href="https://colab.research.google.com/github/AdminMas7er/JBG040-DC1-Group-14/blob/main/Final_notebook_sprint_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Introduction:
In this template, methods are provided to get you started on the task at hand (please see project description). Please implement your solution in the code cells marked with **TODO**. Most of the other code cells are hidden, feel free to explore and change these. These cells implement a basic pipeline for training your model but you may want to explore more complex procedures. **Make sure you run all cells before trying to implement your own solution!**



#Imports and definitions:

In [None]:
#@title
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import pandas as pd
import copy
import numpy as np
import requests
import io
from torch.utils.data import TensorDataset
import matplotlib.pyplot as plt
import torch
from scipy.ndimage import rotate
%matplotlib inline
from torch.utils.data import Dataset
import random
import torch
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchsummary import summary
from tqdm import tqdm
from scipy.special import softmax
import math

class BatchSampler():
  """
  Implements an iterable which given a torch dataset and a batch_size
  will produce batches of data of that given size. The batches are
  returned as tuples in the form (images, labels).
  Can produce balanced batches, where each batch will have an equal 
  amount of samples from each class in the dataset. If your dataset is heavily
  imbalanced, this might mean throwing away a lot of samples from 
  over-represented classes!
  """

  def __init__(self, batch_size, dataset, balanced=False):
    self.batch_size = batch_size
    self.dataset = dataset
    self.balanced = balanced
    if self.balanced:
      # Counting the ocurrence of the class labels:
      unique, counts = np.unique(self.dataset.targets, return_counts=True) 
      indexes = []
      # Sampling an equal amount from each class:
      for i in range(len(unique)):
        indexes.append(np.random.choice(np.where(self.dataset.targets == i)[0], size=counts.min(), replace=False))
      # Setting the indexes we will sample from later:
      self.indexes = np.concatenate(indexes)
    else:
      # Setting the indexes we will sample from later (all indexes):
      self.indexes = [i for i in range(len(dataset))]


  def __len__(self):
    return (len(self.indexes) // self.batch_size) + 1
  
  def shuffle(self):
    # We do not need to shuffle if we use the balanced sampling method.
    # Shuffling is already done when making the balanced samples.
    if not self.balanced:
      random.shuffle(self.indexes)
    
  def __iter__(self):
    remaining = False
    self.shuffle()
    # Go over the datset in steps of 'self.batch_size':
    for i in range(0, len(self.indexes), self.batch_size):
        imgs, labels = [], []
        # If our current batch is larger than the remaining data, we quit:
        if i + self.batch_size > len(self.indexes):
          remaining = True
          break
        # If not, we yield a complete batch:
        else:
          # Getting a list of samples from the dataset, given the indexes we defined:
          X_batch = [self.dataset[self.indexes[k]][0] for k in range(i, i + self.batch_size)]
          Y_batch = [self.dataset[self.indexes[k]][1] for k in range(i, i + self.batch_size)]
          # Stacking all the samples and returning the target labels as a tensor:
          yield torch.stack(X_batch).float(), torch.tensor(Y_batch).long()
    # If there is still data left that was not a full batch:
    if remaining:
      # Return the last batch (smaller than batch_size):
      X_batch = [self.dataset[self.indexes[k]][0] for k in range(i, len(self.indexes))]
      Y_batch = [self.dataset[self.indexes[k]][1] for k in range(i, len(self.indexes))]
      yield torch.stack(X_batch).float(), torch.tensor(Y_batch).long()

class ImageDataset(Dataset):
  """
  Creates a DataSet from numpy arrays while keeping the data 
  in the more efficient numpy arrays for as long as possible and only
  converting to torchtensors when needed (torch tensors are the objects used
  to pass the data through the neural network and apply weights).
  """
  
  def __init__(self, x, y, transform=None, target_transform=None):
    self.targets = y
    self.imgs = x
    self.transform = transform
    self.target_transform = target_transform

  def __len__(self):
    return len(self.targets)

  def __getitem__(self, idx):
    image = torch.from_numpy(self.imgs[idx] / 255).float()
    label = self.targets[idx]
    return image, label

def load_numpy_arr_from_url(url):
    """
    Loads a numpy array from surfdrive. 
    
    Input:
    url: Download link of dataset 
    
    Outputs:
    dataset: numpy array with input features or labels
    """
    
    response = requests.get(url)
    response.raise_for_status()

    return np.load(io.BytesIO(response.content)) 


class_labels = {0: 'Atelectasis',
                1: 'Effusion',
                2: 'Infiltration',
                3: 'No Finding',
                4: 'Nodule',
                5: 'Pneumothorax'}

# Downloading the data:
The following cells will download the pre-processed X-ray images with their accompanying labels.

The download (400 MB) may take a while.

In [None]:
#@title
# Downloading the labels of each image:
train_y = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/i6MvQ8nqoiQ9Tci/download')
test_y = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/wLXiOjVAW4AWlXY/download')

In [None]:
#@title
# Downloading the images:
train_x = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/4rwSf9SYO1ydGtK/download')
test_x = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/dvY2LpvFo6dHef0/download')

# Data augmentation

## Methods

In [None]:
def translate(img, label, shift=10, direction='right', roll=True):
    """
    Translates the image in the direction of 'direction' by 'shift' pixels.
    """
    assert direction in ['right', 'left', 'down', 'up'], 'Directions should be top|up|left|right'
    img = img.copy()
    if direction == 'right':
        right_slice = img[:,:, -shift:].copy()
        img[:,:, shift:] = img[:,:, :-shift]
        if roll:
            img[:,:,:shift] = 0
    if direction == 'left':
        left_slice = img[:,:, :shift].copy()
        img[:,:, :-shift] = img[:,:, shift:]
        if roll:
            img[:,:, -shift:] = 0
    if direction == 'down':
        down_slice = img[:,-shift:, :].copy()
        img[:,shift:, :] = img[:,:-shift,:]
        if roll:
            img[:,:shift, :] = 0
    if direction == 'up':
        upper_slice = img[:,:shift, :].copy()
        img[:,:-shift, :] = img[:,shift:, :]
        if roll:
            img[:,-shift:,:] = 0
    return img, label

def rotate_image(img, label, angle = 15):
    """
    Rotates the images by 'angle' degrees
    """
    img = rotate(input = img, angle = angle, axes = (1,2), reshape = False)
    return img, label

def gaussian_noise_image(train_image, train_label, noise):
    """
    Adds guassian noise to the input image
    """
    new_image = train_image + noise
    return new_image, train_label

## Application

In [None]:
def augment_datasets(train_x, train_y, length =5000 * 6):
  """
  Returns new images and labels arrays with balanced classes.
  The images are augmented using the earlier defined functions
  such that the classes are balanced.
  """
  augmentations = []

  # create two new arrays with the correct length 
  n = 0
  new_train_x, new_train_y = np.empty((length, 1, 128, 128)), np.empty(length)

  # extract the initial amount of values per class and how many images are still needed for each class
  # create a dictionary that can take into account how many images should be added to the new arrays per class 
  unique, counts = np.unique(train_y, return_counts=True) 
  augmentations_per_img = dict(zip(unique, [math.ceil((length/6) / i) for i in counts]))
  images_to_go = dict(zip(unique, [length/6] * 6 ))

  for i in range(0, train_y.size):
    # If the new arrays are not yet filled, and the class is not yet represented as often as it should be
    if sum(images_to_go.values()) > 0 and n < length:
      if images_to_go[ train_y[i] ] != 0:

        # save the current image and its label in the new sets
        new_train_x[n], new_train_y[n] = train_x[i], train_y[i]
        n += 1
        images_to_go[ train_y[i] ] -= 1

        # check how often the image should be augmented to reach the required amount of images in the new arrays
        augmentations_to_go = augmentations_per_img[ train_y[i]]
        methods = [0, 1, 3, 0, 1, 2, 5, 6]

        while augmentations_to_go > 0 and images_to_go[ train_y[i] ] > 0 and (n < length):

          # randomly assign one of the augmentation methods 
          augmentations_to_go -= 1
          images_to_go[ train_y[i] ] -= 1
          value = len(methods)
          delete = random.randrange( len(methods) )
          method = methods.pop( delete )

          if method == 0:
            image, label = translate( train_x[i], train_y[i], direction='right', shift=6 )

          elif method == 1:
            image, label = translate( train_x[i], train_y[i], direction='down', shift=6 )
            
          elif method == 3:
            image, label = translate( train_x[i], train_y[i], direction='left', shift=6 )

          elif method == 2:
            angle = random.choice([1, 2, 3, 359, 358, 357])  
            image, label = rotate_image( train_x[i], train_y[i], angle = angle )

          else:
            factor = random.choice([250, 230, 210, 190, 170])
            # this darkens a few pixels on the image
            noise = np.random.binomial(1, 0.99, size = train_x[i].shape) * factor
            image, label = gaussian_noise_image( train_x[i], train_y[i], noise )

          # add the new image to the array 
          new_train_x[n] = image
          new_train_y[n] = label
          n += 1

  return new_train_x, new_train_y


train_x, train_y = augment_datasets(train_x, train_y)

# Building torch dataset

In [None]:
"""Creates augmented image dataset"""

train_dataset = ImageDataset(train_x, train_y)
test_dataset = ImageDataset(test_x, test_y)

unique_labels = set(class_labels.keys())

# Plotting the data distribution:

In [None]:
#@title
# Plotting the label distribution in train/test set:
fig, ax = plt.subplots(ncols=2, figsize=[20,10])

unique, counts = np.unique(train_y, return_counts=True) 
ax[0].barh([class_labels[i] + f'\n({c})' for i, c in zip(unique, counts)], counts)
ax[0].set_title('Number of images per class in train-set')

unique, counts = np.unique(test_y, return_counts=True) 
ax[1].barh([class_labels[i] + f'\n({c})' for i, c in zip(unique, counts)], counts)
ax[1].set_title('Number of images per class in test-set');

#Plotting some samples:

In [None]:
#@title
# Plotting some images
unique_labels = set(class_labels.keys())
fig, ax = plt.subplots(ncols=len(unique_labels), figsize=[25,5])

for k, label in enumerate(unique_labels):
  ind = list(train_y).index(label)
  ax[k].imshow(train_x[ind].reshape(128,128), cmap='gray')
  ax[k].set_title(f'Class:{class_labels[train_y[ind]]}')

# Defining our model as a neural network:
**TODO** define your own model here, follow the structure as presented in the Pytorch tutorial (or see below as an example).

In [None]:
from torch.nn.modules.batchnorm import BatchNorm1d
class Net(nn.Module):   
    def __init__(self, n_classes):
        super(Net, self).__init__()

        self.cnn_layers = nn.Sequential(
            # Defining a 2D convolution layer
            nn.Conv2d(1, 64, kernel_size=4, stride=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=4),

            # Defining another 2D convolution layer
            nn.Conv2d(64, 32, kernel_size=4, stride=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3),

            # Defining another 2D convolution layer
            nn.Conv2d(32, 16, kernel_size=4, stride=1),
            nn.BatchNorm2d(16),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
        )

        self.linear_layers = nn.Sequential(
            nn.Linear(144, 256),
            nn.Linear(256, n_classes),
            nn.BatchNorm1d(6),
            nn.Softmax(dim=1) 
        )

    # Defining the forward pass    
    def forward(self, x):
        x = self.cnn_layers(x)
        # After our convolutional layers which are 2D, we need to flatten our
        # input to be 1 dimensional, as the linear layers require this.
        x = x.view(x.size(0), -1)
        x = self.linear_layers(x)
        return x

# Make sure your model instance is assigned to a variable 'model':
model = Net(n_classes = 6)

# Defining our loss and optimizer functions:
**TODO** Please define your own optimizer and loss function.

In [None]:
#Calculating the weights of the loss function
def calculate_weights(weights_deadly, weights_fact):
  """
  Computes the weights based on 
  """
  weights=[i*j for i,j in zip(weights_deadly,weights_fact)]
  sum_weights=sum(weights)
  weights=[i/sum_weights for i in weights]
  return weights

class_weights = torch.FloatTensor(calculate_weights([1.5,2,1.5,0.75,1.5,1.25],[4,4,5,5,5,5])).cuda()

In [None]:
#optimizer function
optimizer = optim.SGD(model.parameters(), lr=0.0009)
loss_function = nn.CrossEntropyLoss(class_weights, [4,4,5,5,5,5])

#Moving model to CUDA, verifying model structure and printing a summary:

In [None]:
# IMPORTANT! Set this to True to see actual errors regarding 
# the structure of your model (CUDA hides them)!
# Also make sure you set this to False again for actual model training
# as training your model with GPU-acceleration (CUDA) is much faster.
DEBUG = False

In [None]:
#@title
# Moving our model to the right device (CUDA will speed training up significantly!)
if torch.cuda.is_available() and not DEBUG:
  device = 'cuda'
  model.to(device)
  # Creating a summary of our model and its layers:
  summary(model, (1, 128, 128), device=device)
else:
  device='cpu'
  # Creating a summary of our model and its layers:
  summary(model, (1, 128, 128), device=device)

#Defining our training/testing and accuracy methods:

In [None]:
#@title
def train_model(model, train_sampler, optimizer, loss_function):
  # Lets keep track of all the losses:
  losses = []
  # Put the model in train mode:
  model.train()
  # Feed all the batches one by one:
  for batch in train_sampler:
    # Get a batch:
    x, y = batch
    # Making sure our samples are stored on the same device as our model: 
    x, y = x.to(device), y.to(device)
    # Get predictions:
    predictions = model.forward(x)
    loss = loss_function(predictions,y)
    losses.append(loss)
    # We first need to make sure we reset our optimizer at the start.
    # We want to learn from each batch seperately, 
    # not from the entire dataset at once.
    optimizer.zero_grad()
    # We now backpropagate our loss through our model:
    loss.backward()
    # We then make the optimizer take a step in the right direction.
    optimizer.step()
  return losses

def test_model(model, test_sampler, loss_function):
  # Setting the model to evaluation mode:
  model.eval()
  losses = []
  # We need to make sure we do not update our model based on the test data:
  with torch.no_grad():
    for (x, y) in test_sampler:
      # Making sure our samples are stored on the same device as our model:
      x = x.to(device)
      y = y.to(device)
      prediction = model.forward(x)
      loss = loss_function(prediction,y)
      losses.append(loss)
  return losses

def calculate_accuracy(model):
  model.eval()
  with torch.no_grad():
    correct = 0
    count = 0
    for (x, y) in test_sampler:
      # Making sure our samples are stored on the same device as our model:
      x = x.to(device)
      y = y.to(device)
      prediction = model.forward(x).argmax(axis=1)
      correct += sum(prediction == y)
      count += len(y)
  accuracy = (correct/count).detach().cpu().numpy()
  return float(accuracy)

#Training our model:

In [None]:
n_epochs = 50
batch_size = 32

In [None]:


#@title
# Lets now train and test our model for multiple epochs:
train_sampler = BatchSampler(batch_size=batch_size, dataset=train_dataset, balanced=False)
test_sampler = BatchSampler(batch_size=100, dataset=test_dataset, balanced=False)

mean_losses_train = []
mean_losses_test = []
accuracies = []
for e in range(n_epochs):
  # Training
  losses = train_model(model, train_sampler, optimizer, loss_function)
  # Calculating and printing statistics:
  mean_loss = sum(losses) / len(losses)
  mean_losses_train.append(mean_loss)
  print(f'\nEpoch {e + 1} training done, loss on train set: {torch.mean(mean_loss)}\n')

  # Testing:
  losses = test_model(model, test_sampler, loss_function)
  # Calculating and printing statistics:
  mean_loss = sum(losses) / len(losses)
  mean_losses_test.append(mean_loss)
  accuracy = calculate_accuracy(model)
  print(accuracy)
  print(f'\nEpoch {e + 1} testing done, loss on test set: {mean_loss}\n')

  # Saving the model with the highest accuracy
  if (e == 0):
    # lowest_loss = mean_loss
    highest_acc = accuracy
    best_model = copy.deepcopy(model)
    # weights = list(model.parameters())
  if (highest_acc < accuracy):
    highest_acc = accuracy
    best_model = copy.deepcopy(model)
  # Plotting the historic loss:
  fig, ax = plt.subplots()
  ax.plot(mean_losses_train, label='Train loss')
  ax.plot(mean_losses_test, label='Test loss')
  ax.legend()
  plt.show()
  
model = best_model

print(calculate_accuracy(best_model))

#Evaluation our model:
**TODO** write your own methods to evaluate the model. For example, calculate the accuracy of the model on the test-set:

## Accuracy

In [None]:
calculate_accuracy(model)

This accuracy isn't great. Your task is to find a better model that performs better at the classification task. Other methods of evaluation might tell you more why a particular model is not performing well (accuracy is a quite limited aggregated performance metric). 

## Saliency map

In [None]:
def get_saliency_scores(model, train_x, train_y, i=0):
  """"
  Returns the saliency of an image and also the corresponding
  original image and the label
  """
  
  device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
  model = model.to(device)
  model.eval()

  image = train_x[i]                # get the image to map, this must be a np.ndarray
  img = image                       # safe to use as the reference picture of the scan
  image.resize([1, 1, 128, 128])
  image = torch.from_numpy(image).float()

  image = image.to(device)
  image.requires_grad_()            # catch the gradient during the backpropagation

  output = model.forward(image)     # run the picture through the model
  output_idx = output.argmax()
  output_max = output[0, output_idx]
  output_max.backward()

  prediction = class_labels[ (torch.argmax(output, 1)[0]).item() ]
  saliency, _ = torch.max(image.grad.data.abs(), dim=1)
  saliency = saliency.reshape(128, 128)
  return saliency, img, prediction

def draw_saliency_map(model, train_x, train_y, i=0):
  saliency_map = 0
  index = i
  #Find a picture with a saliency map that is not black
  while ( saliency_map == 0 ):  
    index += 1
    saliency, img, prediction = get_saliency_scores(model, train_x, train_y, index)
    saliency_map = torch.sum(saliency)
    img = img.reshape(-1, 128, 128)
    
  #Draw the image and its saliency map

  fig, ax = plt.subplots(1, 2)
  ax[0].imshow(img.reshape(128, 128), cmap='gray')
  ax[0].axis('off')
  ax[1].imshow(saliency.cpu(), cmap='hot')
  ax[1].axis('off')
  plt.tight_layout()
  fig.suptitle(f'Image and Its Saliency Map. Class: $\it{class_labels[train_y[i]]}$, Prediction: $\it{prediction}$')
  plt.show()

In [None]:
draw_saliency_map(model, train_x, train_y, i=0)

## Certainty score

In [None]:
def image_to_tensor(image_index, dataset):
    """
    Returns the image as a tensor
    """
    image = [dataset[image_index][0]]
    # label optional, just to keep track
    label = [dataset[image_index][1]]
    # for final use, delete the 2nd object - the label (not present for "real" test image)
    return (torch.stack(image).float(), torch.tensor(label).long())

def prediction_scores(model, sample):
  """
  Predicts the label of a given input image
  """
  # Setting the model to evaluation mode:
  model.eval()
  with torch.no_grad():
    x, y = sample
    x = x.to(device)
    prediction = model.forward(x)
    #The ouput is normalized so it can be interpreted as probabilities
    return prediction


## Confusion matrix


In [None]:
@torch.no_grad()
def get_Y_pred(model, test_set):
  """Returns the predictions made the array 'test_set'"""	
  # we want this functions execution to omit gradient tracking
  model.eval()
  y_preds = torch.tensor([]).cuda()

  with torch.no_grad():
    loader = DataLoader(test_set, batch_size = 500)

  for batch in loader:
      images, labels = batch
      images, labels = images.cuda(), labels.cuda()
      preds = model(images)
      y_preds = torch.cat( (y_preds, preds) , dim=0)

  return y_preds.argmax(dim=1).cpu()
    
def get_matrix():
  """
  Return the confusion matrix
  """
  y_preds = get_Y_pred(model, test_dataset)
  matrix = confusion_matrix(test_dataset.targets, y_preds)
  return matrix

In [None]:
get_matrix()

## Recall, Precision, F1 score


In [None]:
def one_hot_encode(labels):
  """
  Creates a list of lists with the labels one hot encoded
  for every label
  """
  encoded_labels = []
  for k in range(len(labels)):
    encoded_labels.append([1 if labels[k] == i else 0 for i in range(6)])
  return torch.tensor(encoded_labels).to(device)

In [None]:

def calculate_stats():
  """
  Returns a dataframe contain the recall, precision,
  F1 and ROC AUC scores
  """
  y_preds = get_Y_pred(model, test_dataset)
  y_true = test_dataset.targets

  Precision = precision_score(y_true, y_preds, average=None)
  Recall = recall_score(y_true, y_preds, average=None)
  F1 = f1_score(y_true, y_preds, average=None)

  y_true_encoded = one_hot_encode(y_true)
  y_preds_encoded = one_hot_encode(y_preds)
  roc_auc = roc_auc_score(y_true_encoded.cpu(), y_preds_encoded.cpu(), multi_class='ovr', average=None) # one vs rest

  scores = pd.DataFrame( { "Recall" : Recall, "Precision" : Precision , "F1" : F1, 'roc auc' : roc_auc}, 
                        index=["Atelectasis", 'Effusion', 'Infiltration', 'No Finding', 'Nodule', 'Pneumothorax'])

  # https://vitalflux.com/micro-average-macro-average-scoring-metrics-multi-class-classification-python/
  scores.loc['mean (macro)', 'Recall'] =    recall_score(y_true, y_preds, average='macro')
  scores.loc['mean (macro)', 'Precision'] = precision_score(y_true, y_preds, average='macro')
  scores.loc['mean (macro)', 'F1'] =        f1_score(y_true, y_preds, average='macro')
  scores.loc['mean (macro)', 'roc auc'] =   roc_auc_score(y_true_encoded.cpu(), y_preds_encoded.cpu(), multi_class='ovr', average='macro')

  return scores


In [None]:
calculate_stats()

In [None]:
"""
Plots the receiver operating characteristic
"""

fpr = dict()
tpr = dict()
roc_auc = dict()

y_true = test_dataset.targets
y_preds = get_Y_pred(model, test_dataset)
y_true_encoded = one_hot_encode(y_true)
y_preds_encoded = one_hot_encode(y_preds) 

for i in range(6):
    fpr[i], tpr[i], _ = roc_curve(y_true_encoded[:, i].cpu(), y_preds_encoded[:, i].cpu())
    roc_auc[i] = np.round( auc(fpr[i], tpr[i]), 3)

plt.figure()
plt.plot(fpr[0], tpr[0], color='blue', lw=2, label= f'Atelect. (area = {roc_auc[0]})')
plt.plot(fpr[1], tpr[1], color='green', lw=2, label= f'Effusion. (area = {roc_auc[1]})')
plt.plot(fpr[2], tpr[2], color="darkorange", lw=2, label= f"Inftilt. (area = {roc_auc[2]})")
plt.plot(fpr[3], tpr[3], color='brown', lw=2, label= f'Nothing. (area = {roc_auc[3]})')
plt.plot(fpr[4], tpr[4], color='pink', lw=2, label= f'Nodule. (area = {roc_auc[4]})')
plt.plot(fpr[5], tpr[5], color='red', lw=2, label= f'Pneum. (area = {roc_auc[5]})')

plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver Operating Characteristic (ROX) curve")
plt.legend(loc="lower right")
plt.show();

## Running all evaluations

In [None]:
def evaluate():
  """"
  Performes all the evaluation methods on the model
  """
  model.eval()

  accuracy = calculate_accuracy()
  y_preds = get_Y_pred(model, test_dataset)
  matrix = get_matrix()
  scores = calculate_stats()

  return accuracy, y_preds, matrix, scores

## Saving the evaluations

In [None]:
def add_to_df(models_df, batch_size):
  """
  Adds all the performance metrics to the input dataframe
  """

  # calculate the performance metrics
  accuracy, y_preds, matrix, scores = evaluate()

  models_df.loc[run, 'n epochs'] = e
  models_df.loc[run, 'batch size'] = batch_size

  models_df.loc[run, 'accuracy'] = accuracy

  # Add the class with the least amount of predictions and the correspondig number of predictions
  avg = np.sum( matrix, axis=0 )
  models_df.loc[run, 'min predictions nr'] = avg.min()
  models_df.loc[run, 'min predictions group'] = str( [class_labels[i] for i in np.where( avg == avg.min() )[0]] )
  models_df.loc[run, 'matrix'] = str( matrix )

  # add the performance metrics to the dataframe
  for index, row in scores.iterrows():
    if index != 'mean (macro)':
      models_df.loc[run, [f'{index} Recall', f'{index} Precision', f'{index} F1', f'{index} roc'] ] = row['Recall'], row['Precision'], row['F1'], row['roc auc']
    else:
      models_df.loc[run, [f'{index} Recall etc', f'{index} roc'] ] = row['Recall'], row['roc auc']

#Hyperparameter tuning

This code is used to tune the hyperparameters. This takes a couple of hours to run therefore it is commented out. The correct parameters are already implemented in the code above. To run the code, just remove the # signs.

In [None]:
"""
This code is used to get all the combinations possible of batches,
learning rates and decay factors.
"""

# batches =       [8, 16, 32, 64, 100, 128, 200, 256]
# learningRates = [0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0009, 
#                  0.001, 0.003, 0.006, 0.009, 0.01, 0.03, 0.06, 0.09]
# decays =        [True, False]

# combinations = []

# # get all combinations of the three hyperparameters
# for lr in learningRates:
#   for batch in batches:
#     for decay in decays:
#       combination = [batch, lr, decay]
#       combinations.append(combination)

# models_df = pd.DataFrame()
# run = 0

In [None]:
"""
Runs all the possible combinations of parameters and saves the results to a dataframe.
"""

# device = 'cuda' if ( torch.cuda.is_available and not DEBUG) else 'cpu'
# n_epochs = 13
# loss_function = nn.CrossEntropyLoss()


# ############################################################################################
# for parameters in combinations:

#   model = Net(n_classes=6)  # create a model
#   model.to(device)

#   batch_size = parameters[0]

#   train_sampler = BatchSampler(batch_size = batch_size, dataset=train_dataset, balanced=False)
#   test_sampler = BatchSampler(batch_size = 100, dataset=test_dataset, balanced=False)

#   optimizer = optim.Adam(model.parameters(), lr = parameters[1])
# ############################################################################################  
#   mean_losses_train = []
#   mean_losses_test = []
#   accuracies = []

#   print(f'\n batch size {parameters[0]}, learning rate {parameters[1]}, and decay is {parameters[2]}')

#   for e in range(n_epochs):
#     # if weight decay is used, change the value in the optimizer each epoch
#     if parameters[2] and e > 0:
#       decay = parameters[1] / math.sqrt(e)
#       optimizer = optim.Adam(model.parameters(), lr = parameters[1], weight_decay = decay)

#     # Training:
#     losses = train_model(model, train_sampler, optimizer, loss_function)
#     mean_loss = sum(losses) / len(losses)
#     mean_losses_train.append(mean_loss)
  
#     # Testing:
#     losses = test_model(model, test_sampler, loss_function)
#     mean_loss = sum(losses) / len(losses)
#     mean_losses_test.append(mean_loss)
#     # Plotting the historic loss:
    
#   fig, ax = plt.subplots()
#   ax.plot(mean_losses_train, label='Train loss')
#   ax.plot(mean_losses_test, label='Test loss')
#   ax.legend()
#   plt.show()

#   add_to_df(models_df, batch_size)
#   models_df.loc[run, 'learning rate'] = parameters[1]
#   models_df.loc[run, 'weight decay'] = parameters[2]
#   run += 1

#   # save the data in a hyperparameter tuning csv file
#   models_df.to_csv('hyperparameter tuning.csv')

## Visualizations of the hyperparameter tuning

###Learning rate

In [None]:
"""
Makes barplots containing the accuracy of the model with different 
learning rates
"""
# data = pd.read_csv('hyperparameter tuning.csv')

# fig, ax = plt.subplots(nrows = 2, ncols = 2, squeeze=True, sharex=True)
# sns.set(rc={"figure.figsize":(25, 10)} , style="whitegrid")
# data['batch size'] = [int(i) for i in data['batch size']]

# sns.barplot(y = data['accuracy'], x=data['learning rate'] , ax=ax[0, 0] )
# sns.barplot(x = data['learning rate'], y=data['Atelectasis F1'] , ax=ax[0, 1])
# sns.barplot(x = data['learning rate'], y=data['Effusion F1'] , ax=ax[1, 0])
# sns.barplot(x = data['learning rate'], y=data['Infiltration F1'] , ax=ax[1, 1]);
# ax[1,0].set_xticklabels(labels = ax[1,0].get_xticklabels(), rotation=45);
# ax[1,1].set_xticklabels(labels = ax[1,1].get_xticklabels(), rotation=45);

### Batch size

In [None]:
"""
Creates barplots containing the accuracy of different batch sizes
"""

# data = pd.read_csv('hyperparameter tuning.csv')

# fig, ax = plt.subplots(nrows = 2, ncols = 2, squeeze=True, sharex=True)

# sns.set(rc={"figure.figsize":(25, 10)} , style="whitegrid")
# data['batch size'] = [int(i) for i in data['batch size']]

# sns.barplot(y = data['accuracy'], x=data['batch size'] , ax=ax[0, 0] )
# sns.barplot(x = data['batch size'], y=data['Atelectasis F1'] , ax=ax[0, 1])
# sns.barplot(x = data['batch size'], y=data['Effusion F1'] , ax=ax[1, 0])
# sns.barplot(x = data['batch size'], y=data['Infiltration F1'] , ax=ax[1, 1]);
# ax[1,0].set_xticklabels(labels = ax[1,0].get_xticklabels(), rotation=45);
# ax[1,1].set_xticklabels(labels = ax[1,1].get_xticklabels(), rotation=45);

### Weight decay

In [None]:
"""
Creates barplots containing the accuracy of different decay factors
"""

# data = pd.read_csv('hyperparameter tuning.csv')

# fig, ax = plt.subplots(nrows = 2, ncols = 2, squeeze=True, sharex=True)

# sns.set(rc={"figure.figsize":(25, 10)} , style="whitegrid")

# sns.barplot(y = data['accuracy'], x=data['weight decay'] , ax=ax[0, 0] )
# sns.barplot(x = data['weight decay'], y=data['Atelectasis F1'] , ax=ax[0, 1])
# sns.barplot(x = data['weight decay'], y=data['Effusion F1'] , ax=ax[1, 0])
# sns.barplot(x = data['weight decay'], y=data['Infiltration F1'] , ax=ax[1, 1]);
# ax[1,0].set_xticklabels(labels = ax[1,0].get_xticklabels(), rotation=45);
# ax[1,1].set_xticklabels(labels = ax[1,1].get_xticklabels(), rotation=45);

# Testing the final model 5 times

The model using the hyperparameters found is trained and evaluated 5 times. From this we obtained the average accuracy of 28.73%. To run this section, just comment out everything. It is important to note that this code only works when you run the hyperparameter tune before this code.

In [None]:
"""
This cell trains the model 5 times using the earlier found hyperparameters
"""
# n_epochs = 50 #change value to 1 for debugging reasons
# batch_size = 32

# DEBUG = False
# device = 'cuda' if ( torch.cuda.is_available and not DEBUG) else 'cpu'
# model.to(device)

# unique_labels = set(class_labels.keys())


# performanceScores = np.array()
# accuracies = np.array()
# matrices = np.array()


# for (i in range(0, 5)):
#   # download and augment the data
#   train_y = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/i6MvQ8nqoiQ9Tci/download')
#   test_y = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/wLXiOjVAW4AWlXY/download')

#   train_x = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/4rwSf9SYO1ydGtK/download')
#   test_x = load_numpy_arr_from_url('https://surfdrive.surf.nl/files/index.php/s/dvY2LpvFo6dHef0/download')

#   train_x, train_y = augment_datasets(train_x, train_y)

#   train_dataset = ImageDataset(train_x, train_y)
#   test_dataset = ImageDataset(test_x, test_y)


#   # initiate a new model, the loss function and the optimizer
#   model = Net(n_classes=6)  
  
#   train_sampler = BatchSampler(batch_size = batch_size, dataset=train_dataset, balanced=False)
#   test_sampler = BatchSampler(batch_size = 100, dataset=test_dataset, balanced=False)

#   optimizer = optim.Adam(model.parameters(), lr=0.0009)
#   loss_function = nn.CrossEntropyLoss( torch.Tensor( [0.1538, 0.2051, 0.1923, 0.0962, 0.1923, 0.1603] ).to( device ) )


#   # go through the training and testing process for each combination
#   mean_losses_train = []
#   mean_losses_test = []
#   accuracies = []

#   for e in range(n_epochs):

#     if parameters[2] and e > 0:
#       decay = parameters[1] / math.sqrt(e) # https://machinelearningmastery.com/adam-optimization-algorithm-for-deep-learning/
#       optimizer = optim.Adam(model.parameters(), lr = parameters[1], weight_decay = decay)

#     # Training:
#     losses = train_model(model, train_sampler, optimizer, loss_function)
#     mean_loss = sum(losses) / len(losses)
#     mean_losses_train.append(mean_loss)
  
#     # Testing:
#     losses = test_model(model, test_sampler, loss_function)
#     mean_loss = sum(losses) / len(losses)
#     mean_losses_test.append(mean_loss)
#     # Plotting the historic loss:
    

#   # extract the scores and append to the numpy arrays
#   scores = calculate_stats()

#   np.append( performanceScores, scores.to_numpy() )
#   np.append( accuracies, calculate_accuracy() )
#   np.append( matrices, get_matrix() ) 



In [None]:
"""
Creates a heatmap containing the average confusion matrix of the 5 training
attempts.
"""

# meanMatrix = np.mean( matrices, axis=0 )

# #draw the resulting matrix

# labels = ['Atelectasis', 'Effusion',  'Infiltration', 'No Finding', 'Nodule', 'Pneumothorax']
# sns.heatmap(meanMatrix,annot=True,cmap='Blues', fmt='g', xticklabels= labels , yticklabels= labels, xlabel)

In [None]:
"""
Creates a dataframe with the mean values for the five runs.
"""

# meanScores = np.mean( performanceScores, axis=0 )
# pd.DataFrame(np.round( meanScores, 4), 
#              index=["Atelectasis", 'Effusion', 'Infiltration', 'No Finding', 'Nodule', 'Pneumothorax', 'mean (macro)'], 
#              columns= [	'Recall',	'Precision',	'F1', 	'roc auc'])

In [None]:
"""
Calculates the mean accuracy
"""

# meanAcc = np.mean( accuracies, axis=0 )
# print(meanAcc)

# Uncertainty scores

This section is used to get the certainty score for a given input image. It returns an output string which represents the certainty of the perdicted label.

In [None]:
# to suppress scientific notation (exponential)
torch.set_printoptions(sci_mode=False)

In [None]:
def output_certainty(model, sample):
  """
  Gives a certainty of a prediction represented by a string.
  """
  scores = prediction_scores(model, sample)
  for i in scores.cpu().numpy()[0]:
      if i > 0.95:
          return "High certainty"
  for i in scores.cpu().numpy()[0]:
      if i > 0.8:
          return "Medium certainty"
  return "Low certainty"

def predicted_label(model, sample):
  """
  Returns the predicted label of a given input
  """
  scores = prediction_scores(model, sample)[0]
  max_score = 0
  max_i = 0
  for i in range(0, len(scores)):
      if scores[i]>max_score:
          max_score = scores[i]
          max_i = i
  return max_i

def evaluate_certainty(model, ind_min, ind_max, dataset):
    """
    Returns a table containing all the number of occurances of the three
    different certainty levels
    """
    matrix = pd.DataFrame()
    matrix['certainty'] = ['High certainty', 'Medium certainty', 'Low certainty']
    matrix['correct prediction'] = [0,0,0]
    matrix['wrong prediction'] = [0,0,0]
    # matrix = matrix.set_index('certainty')
    
    for i in range(ind_min, ind_max+1):
        sample = image_to_tensor(i, dataset)
        true_label = sample[1]
        pred_label = predicted_label(model, sample)
        output_cert = output_certainty(model, sample)
        cert_dict = {'High certainty':0, 'Medium certainty':1, 'Low certainty':2}
        output_cert_ind = cert_dict[output_cert]
        if true_label == pred_label:
            matrix['correct prediction'][output_cert_ind] +=1
        else:
            matrix['wrong prediction'][output_cert_ind] +=1
            
    return matrix

In [None]:
uncertainty_table = evaluate_certainty(model, 0, len(test_dataset)-1, test_dataset)
uncertainty_table['sum'] = uncertainty_table.sum(axis=1)
uncertainty_table['correct prediction %'] = uncertainty_table['correct prediction']/uncertainty_table['sum']*100
uncertainty_table

#Saving our model:

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
torch.save(model.state_dict(), '/content/gdrive/My Drive/weights_model.txt')