# Image classification Pytorch

In this notebook we will put together what we have learned in the previous lecture.


# Pytorch implementation

In [None]:
# libraries to interface with the os
import os
from os.path import join
from glob import glob

from collections import defaultdict # dictionary with default values

# standard scientific libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme(style="whitegrid", font_scale=1.5)

# progress bar
from tqdm.autonotebook import tqdm

# sklearn
from sklearn.model_selection import train_test_split

# pytorch utilities
import torch
from torch import cuda
from torch import nn # layers, networks, losses, ...
from torch.optim import Adam # optimizer
from torch.optim.lr_scheduler import ReduceLROnPlateau # scheduler
from torch.utils.data import Dataset, DataLoader # base class for Datasets and Dataloaders
# pytorch utility module for images
import torchvision
import torchvision.transforms as T

# Python Image Library (instead of opencv)
from PIL import Image

In [None]:
# Build one dataframe per class, each with the columns:
#    X     -> filepaths of the images
#    y     -> label encoded class (0/1)
#    class -> data class as string

data_folder = "Brain_Tumor_Detection"


def _class_frame(subfolder, label, class_name):
    """Return a dataframe listing (in sorted order) every file found in `data_folder/subfolder`."""
    paths = sorted(glob(join(data_folder, subfolder, "*")))
    return pd.DataFrame({"X": paths, "y": label, "class": class_name})


df_no_imgs  = _class_frame("no", 0, "NO")     # no tumor
df_yes_imgs = _class_frame("yes", 1, "YES")   # yes tumor

# just keep N samples per class (a non-positive N keeps everything)
N = 500
if N > 0:
    df_no_imgs, df_yes_imgs = (frame.iloc[:N, :] for frame in (df_no_imgs, df_yes_imgs))

print("# of images WITHOUT tumor =", len(df_no_imgs))
print("# of images WITH    tumor =", len(df_yes_imgs))

df_no_imgs.head()

In [None]:
# train/val/test split
# (done in two steps: first carve out a stratified hold-out set,
#  then divide that hold-out set into validation and test)

df = pd.concat([df_no_imgs, df_yes_imgs], ignore_index=True)

val_size  = .15
test_size = .1
holdout_size = val_size + test_size

# step 1: train vs. (validation + test)
df_train, df_test = train_test_split(
    df,
    test_size=holdout_size,
    stratify=df["y"],
    random_state=100,
)
# step 2: validation vs. test, expressed as a fraction of the hold-out set
df_val, df_test = train_test_split(
    df_test,
    test_size=test_size / holdout_size,
    stratify=df_test["y"],
    random_state=100,
)

for title, subset in (
    ("# Train      =", df_train),
    ("# Validation =", df_val),
    ("# Test       =", df_test),
):
    print(title, len(subset))

## Pytorch Dataset and Dataloader

A Dataset maps keys to data (think of it as a list or a dictionary). 
It is a subclass of `torch.utils.data.Dataset` and must implement the methods:
- `__init__`: run once when the Dataset is instantiated
- `__len__`: return the number of samples in the dataset
- `__getitem__`: load and return a sample from the dataset given its index

You can retrieve features and labels only one sample at a time from a Dataset. In order to use minibatches during training, there is the `DataLoader` utility, which handles batching (and shuffling) for you. It can be constructed just by passing a `Dataset`.

In [None]:
class CustomDataset(Dataset):
    """
    Pytorch dataset which replicates Keras `.flow_from_dataframe()` utility.

    Every item is a pair `(X, y)`: the image read from disk, converted to RGB
    and passed through `transform`, together with its label.
    """
    def __init__(self, df, transform):
        """
        df : DataFrame
            Dataframe with the columns:
            - X: filenames of the images to load
            - y: image label (already label-encoded)
        transform : torchvision transformations
            Torchvision transformations to be applied to the image.
        """
        self.df = df
        # plain python lists, so that items are looked up by position
        # regardless of the index carried by the dataframe
        self.X = df.loc[:, "X"].tolist()
        self.y = df.loc[:, "y"].tolist()
        self.transform = transform

    def __len__(self):
        """Return the length of the dataset."""
        return len(self.df)

    def __getitem__(self, index):
        """Return the element (X,y) given the index."""
        # open image using PIL (since Pytorch already integrates with it);
        # the context manager closes the underlying file as soon as
        # the pixel data has been read
        with Image.open(self.X[index]) as image:
            # images can be stored as grayscale, palette, RGBA, ...
            # so convert everything to RGB: the transformed image
            # then always has exactly 3 channels
            X = image.convert("RGB")
        # apply image transformations
        X = self.transform(X)
        y = self.y[index]
        return X, y


In [None]:
SIZE = (128, 128)

# training pipeline: random augmentations first, then resize and tensor conversion
train_transform = T.Compose([
    T.ColorJitter(brightness=.2, contrast=.2),
    T.RandomHorizontalFlip(p=.5),
    T.RandomVerticalFlip(p=.5),
    T.RandomRotation(degrees=30),
    T.Resize(size=SIZE),
    T.ToTensor(), # convert to pytorch tensor
])

# validation and test pipeline: resize and tensor conversion only (no data augmentation)
test_transform = T.Compose([
    T.Resize(size=SIZE),
    T.ToTensor(),
])

# one dataset per split
train_dataset = CustomDataset(df_train, transform=train_transform)
val_dataset   = CustomDataset(df_val,   transform=test_transform)
test_dataset  = CustomDataset(df_test,  transform=test_transform)

# one dataloader per split: same settings everywhere, only the training data is shuffled
loader_options = {"batch_size": 32, "num_workers": 2}
train_loader = DataLoader(train_dataset, shuffle=True,  **loader_options)
val_loader   = DataLoader(val_dataset,   shuffle=False, **loader_options)
test_loader  = DataLoader(test_dataset,  shuffle=False, **loader_options)

In [None]:
# show a handful of (augmented) training images, drawn in random order

ncols = 7

fig, axs = plt.subplots(ncols=ncols, figsize=(20,7))

# dedicated loader yielding one shuffled image at a time
preview_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=2)
to_pil = T.ToPILImage()

# zip stops as soon as every axis has received an image
for (x,y),ax in zip(preview_loader, axs):
    # the loader returns a batch of shape (BATCH, C, H, W): keep its only image
    x = x[0]
    ax.imshow(to_pil(x), cmap="gray")
    ax.set_axis_off()
    ax.grid(False)

## Neural Network with Pytorch

NNs are created by subclassing `nn.Module`. They must contain the methods:
- `__init__`, where you initialize the network layers
- `forward`, which defines the operations on the input data

In [None]:
class PytorchModel(nn.Module):
    """
    Neural Network with 3 convolutional blocks (conv + max pooling + dropout)
    followed by a dense part for classification.

    Parameters
    ----------
    input_size : tuple (height, width)
        Spatial size of the input images (which must have 3 channels).
    num_class : int [default=2]
        Number of output classes.

    The forward pass returns class probabilities (softmax over dim 1),
    with shape (BATCH, num_class).
    """
    def __init__(self, input_size, num_class=2):
        super().__init__() # initialize the parent class

        self.height, self.width = input_size

        # define all the layers

        # pytorch uses (C, H, W) order
        self.conv_block1 = self._conv_block(in_channels=3,  out_channels=16)
        self.conv_block2 = self._conv_block(in_channels=16, out_channels=32)
        self.conv_block3 = self._conv_block(in_channels=32, out_channels=64)
        self.flatten = nn.Flatten()
        # there are 3 maxpool layers with kernel=2, so each side of the
        # input image is reduced by a factor 2**3 = 8 (rounding down).
        # The parentheses matter: `h//8 * w//8` would be evaluated as
        # `((h//8) * w) // 8`, which is wrong when `w` is not a multiple of 8.
        flat_features = (self.height // 8) * (self.width // 8) * 64
        self.dense_block = nn.Sequential(
            nn.Linear(in_features=flat_features, out_features=512),
            nn.ReLU(),
            nn.Dropout(.25),
            nn.Linear(in_features=512, out_features=128),
            nn.ReLU(),
            nn.Dropout(.25),
            nn.Linear(128, num_class)
        )
        self.output = nn.Softmax(dim=1)

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one block: 5x5 'same' convolution + ReLU + 2x2 max pooling + dropout."""
        return nn.Sequential(
            # need to specify input and output channels
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=5, padding="same"),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Dropout(.25)
        )

    def forward(self, x):
        # define what happens to an input passed to the model
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.flatten(x)
        x = self.dense_block(x)
        out = self.output(x)

        return out


In [None]:
# pytorch does not pick the device for you: select the gpu when one is
# available (cpu otherwise) and move the model there explicitly
device_name = "cuda" if cuda.is_available() else "cpu"
device = torch.device(device_name)
print("Found device:", device, "\n\n")

# `.to()` moves the parameters and returns the module itself
model = PytorchModel(input_size=SIZE).to(device)

print(model)

In [None]:
# optimizer, scheduler and loss function

optimizer = Adam(model.parameters(), lr=1e-03)
# halve the learning rate when the monitored quantity has not decreased for 5 epochs
scheduler = ReduceLROnPlateau(optimizer, mode="min", patience=5, factor=.5)


class ProbabilityCrossEntropyLoss(nn.Module):
    """
    Cross-entropy loss for a model whose output is already a probability
    distribution (i.e. the model ends with a softmax layer).

    `nn.CrossEntropyLoss` expects raw logits and applies a log-softmax
    internally; feeding it probabilities applies the softmax twice and
    squashes the loss and its gradients. Here the logarithm of the
    probabilities is taken directly and passed to the negative log-likelihood.
    """
    def __init__(self, eps=1e-12):
        super().__init__()
        # lower bound for the probabilities, to avoid log(0)
        self.eps = eps
        self.nll = nn.NLLLoss()

    def forward(self, probabilities, target):
        """
        probabilities : tensor (BATCH, NUM_CLASS) of class probabilities
        target        : tensor (BATCH,) of class indices
        Return the mean cross-entropy over the batch.
        """
        log_probabilities = torch.log(probabilities.clamp_min(self.eps))
        return self.nll(log_probabilities, target)


loss_function = ProbabilityCrossEntropyLoss()

### Early Stopping and metrics

Since in Pytorch the training loop is written by the user, we must implement our own early stopping. Here I create a class that must be updated at every epoch by calling its `.step()` method, which returns `True` when the training should be stopped.

In [None]:
class EarlyStopping():
    """
    Early Stopping for Pytorch training. It replicates Keras early stopping functionalities.
    Inspiration from:
    - https://gist.github.com/stefanonardo/693d96ceb2f531fa05db530f3e21517d
    - https://github.com/Bjarten/early-stopping-pytorch/blob/master/pytorchtools.py
    """

    def __init__(self, patience=10, min_delta=0, mode="min", save_best_weights=True):
        """
        patience : int [default=10]
            Number of epochs without improvement to wait before stopping the training.
        min_delta : float [default=0]
            Minimum change in the metric to be counted as an improvement.
        mode : str [default="min"]
            One of "min" or "max". Identify whether an improvement
            consists of a smaller or bigger value in the metric.
        save_best_weights : bool [default=True]
            Whether to save the model state when there is an improvement or not.

        Raises
        ------
        ValueError
            If `mode` is neither "min" nor "max".
        """
        if mode not in ("min", "max"):
            raise ValueError(f"mode must be 'min' or 'max', got {mode!r}")

        # save initialization arguments
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        # the starting score is the worst possible one for the given mode,
        # so that the first call to `.step()` always counts as an improvement
        self.best_score = float("inf") if mode == "min" else -float("inf")

        self.save_best_weights = save_best_weights
        self.best_weights = None
        # keep track of the number of iterations without improvements
        self.n_iter_no_improvements = 0
        # whether to stop the training or not
        self.stop_training = False

    def _is_better(self, new, old):
        """Return True if `new` improves on `old` by more than `min_delta`."""
        if self.mode == "min":
            return new < old - self.min_delta
        return new > old + self.min_delta

    def step(self, metric, model=None):
        """
        Method to be called at each epoch to update the progress.

        Parameters
        ----------
        metric : float
            Metric value at the epoch.
        model : pytorch NN
            Pytorch model whose weights are stored in case of improvements.

        Returns
        -------
        bool
            True if the training should be stopped.
        """
        # if there is an improvement, update `best_score` and save model weights
        if self._is_better(metric, self.best_score):
            self.best_score = metric
            self.n_iter_no_improvements = 0
            if self.save_best_weights and model is not None:
                # `state_dict()` returns references to the live tensors, which
                # keep changing while training goes on: store a copy instead
                self.best_weights = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }
        # otherwise update counter
        else:
            self.n_iter_no_improvements += 1

        # if there are no improvements for `patience` epochs, stop the training
        # (set the flag to True)
        if self.n_iter_no_improvements >= self.patience:
            self.stop_training = True
            print(f"Early Stopping: monitored quantity did not improved in the last {self.patience} epochs.")

        return self.stop_training

early_stopping = EarlyStopping(patience=15, save_best_weights=True)

Also the calculation of the metrics is left to the user, since the training loop must be written from scratch. I will show how to do it with the Accuracy.

However, there is also the [torchmetrics](https://torchmetrics.readthedocs.io/en/latest/) library, which provides ready-made metrics. As an example, I will use the AUC score.

In [None]:
!pip install torchmetrics

In [None]:
from torchmetrics import AUROC

# example using torchmetrics
# metrics need to be in the same device as the model
#
# The metric receives the probability of class 1 and the 0/1 labels, i.e. a
# binary task. The `task` argument requires torchmetrics >= 0.11; the old
# `compute_on_step` argument was removed from the library and is not needed:
# a metric accumulates every batch it is given until `.reset()` is called.
train_auc = AUROC(task="binary").to(device)
val_auc   = AUROC(task="binary").to(device)

## Training Loop

The training loop must be written from scratch. This adds complexity to the code, but allows for greater flexibility. The computation of the metrics is also left to the user.

In [None]:
epochs = 100

N_train_inputs  = len(train_loader.sampler)
N_train_batches = len(train_loader)

N_val_inputs  = len(val_loader.sampler)
N_val_batches = len(val_loader)

history = defaultdict(list) # to keep track of the metrics during training


# loop over the epochs
for epoch in range(epochs):

    ##################
    # training phase #
    ##################

    model.train()
    # metrics to keep track of during this epoch
    train_acc  = 0
    train_loss = 0

    # this is just to have a progress bar during each epoch
    pbar = tqdm(total=N_train_batches, desc=f"Epoch {epoch+1}/{epochs}")
    pbar_metrics = {}

    # loop over minibatches
    for inputs, labels in train_loader:
        # data tensors must be in the same device as the model
        inputs = inputs.to(device)
        labels = labels.to(device)

        # --------------
        # learning phase
        # --------------

        # set the gradients of all tensors to zero.
        optimizer.zero_grad()

        # FORWARD PASS
        # (gradient is tracked automatically)
        outputs = model(inputs) # outputs.shape = (BATCH_SIZE, NUM_CLASS) = (32, 2)
        # BACKWARD PASS (compute the gradient of the loss w.r.t. every parameter - back propagation)
        loss = loss_function(outputs, labels)
        loss.backward()
        # OPTIMIZATION STEP (updates model parameters - gradient descent)
        optimizer.step()

        # ---------------------------------------------------
        # now compute the loss and metrics for this minibatch
        # ---------------------------------------------------

        # get class prediction
        classes = torch.argmax(outputs, 1)

        # get the batch loss and update the epoch loss
        batch_loss  = loss.item() # average value per batch
        train_loss += batch_loss*len(inputs)

        # same for accuracy
        batch_acc   = float((torch.sum(classes==labels)))
        train_acc  += batch_acc

        # torchmetrics keeps track and updates everything by itself
        # (detached: the metric does not need the computational graph)
        batch_auc = float(train_auc(outputs[:,1].detach(), labels)) # torchmetrics

        # this is just to have a fancy progress bar which displays metrics updates during each minibatch
        pbar_metrics = {
            "loss": batch_loss,
            "acc" : batch_acc / len(inputs),
            "auc" : batch_auc
        }
        pbar.set_postfix(pbar_metrics)
        pbar.update()

    # update progress bar to show the metrics at the end of the training epoch
    pbar_metrics["loss"] = train_loss / N_train_inputs
    pbar_metrics["acc"]  = train_acc / N_train_inputs
    pbar_metrics["auc"]  = float(train_auc.compute()) # torchmetrics
    pbar_metrics["lr"]   = optimizer.param_groups[0]["lr"]
    pbar.set_postfix(pbar_metrics)
    # forget the accumulated batches, otherwise the AUC of the next epoch
    # would also include the predictions made during this one
    train_auc.reset()

    ####################
    # validation phase #
    ####################

    model.eval()
    # validation metrics
    val_acc = 0
    val_loss = 0

    # do not store the gradient since here we are not training
    with torch.no_grad():
        # loop over validation minibatches
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            # the loss must be evaluated on the validation batch: reusing `loss`
            # here would just repeat the value of the last training batch
            val_batch_loss = loss_function(outputs, labels)

            classes = torch.argmax(outputs, 1)

            # update validation metrics during each minibatch
            val_loss += val_batch_loss.item()*len(inputs)
            val_acc  += float((torch.sum(classes==labels)))
            val_auc.update(outputs[:,1], labels) # torchmetrics (accumulate only)

        # update progress bar to show also validation metrics
        pbar_metrics["val_loss"] = val_loss / N_val_inputs
        pbar_metrics["val_acc"]  = val_acc / N_val_inputs
        pbar_metrics["val_auc"]  = float(val_auc.compute()) # torchmetrics
    # same as for the training metric: start from scratch at the next epoch
    val_auc.reset()
    pbar.set_postfix(pbar_metrics)
    pbar.close()

    # save metrics for each epoch in `history` dictionary, to plot them later
    # (since I already have the current epoch score in `pbar_metrics`, just copy from it)
    for k,v in pbar_metrics.items():
        history[k].append(v)

    # update the state of the scheduler
    # (to see if lr should be decreased)
    scheduler.step(pbar_metrics["val_loss"])
    # also update the custom early stopping to see if training must be stopped
    if early_stopping.step(pbar_metrics["val_loss"], model):
        # if stopping, load best model weights
        # (they are None when the early stopping does not save them)
        if early_stopping.best_weights is not None:
            model.load_state_dict(early_stopping.best_weights)
        break


In [None]:
# save model
# (only the weights, i.e. the state dict, are stored: to load them back
#  the model class is needed, see below)
# torch.save does not create missing folders, so make sure the output folder exists
os.makedirs("results", exist_ok=True)
torch.save(model.state_dict(), "results/pytorch_model.pt")

# to load
# (`map_location` allows loading on a machine without the device used for training)
#model = PytorchModel(input_size=SIZE)
#model.load_state_dict(torch.load("results/pytorch_model.pt", map_location=device))

In [None]:
from utils import plot_history, print_metrics

In [None]:
!pygmentize -g utils.py

In [None]:
# Plot the per-epoch values collected in `history` during training for the
# "acc" and "auc" metrics, on a 2x2 grid of subplots.
# NOTE(review): `plot_history` comes from the local `utils` module, which is not
# shown here; how the remaining `history` keys (loss, lr, val_*) are used should
# be confirmed in utils.py.
plot_history(history, ["acc", "auc"], nrows=2, ncols=2, figsize=(20,8))

## Predictions on the test set

In order to make predictions on new data, a similar loop as for the validation phase must be written.

In [None]:
# inference mode: dropout is disabled
model.eval()
predictions = []

with torch.no_grad(): # do not store gradient
    # the dataset is read directly, so images arrive one at a time
    for image,label in tqdm(test_dataset):
        # the model wants a batch: (C, H, W) -> (1, C, H, W)
        batch = image.unsqueeze(0).to(device)
        probabilities = model(batch)
        # keep only the probability of class 1
        predictions.extend(probabilities[:,1].tolist())

# numpy arrays of the predicted probabilities and of the true labels
predictions = np.array(predictions)
y_true = np.array(test_dataset.y)

In [None]:
%load_ext autoreload
%autoreload 2

from utils import print_metrics

# `predictions` holds the predicted probability of class 1 for every test image,
# `y_true` the corresponding labels; the returned value is discarded.
# NOTE(review): `print_metrics` is defined in the local `utils` module, which is
# not shown here — presumably it prints classification metrics; confirm the
# expected argument order (predictions first, labels second) in utils.py.
_ = print_metrics(predictions, y_true)

In a real application it could be useful to look at the distribution of the predicted probabilities, to see how *confident* the model is.

In [None]:
# histogram of the predicted probabilities of class 1:
# values piled up near 0 and 1 correspond to confident predictions
fig, ax = plt.subplots(figsize=(10,5))
_ = ax.hist(predictions, bins=25)
ax.set_xlabel("Prediction Probability")

Also inspecting wrong predictions can be useful.

In [None]:
# plot some misclassified examples
nrows = 2
ncols = 5

fig, axs = plt.subplots(nrows=nrows, ncols=ncols, figsize=(30,10))

# threshold the probabilities once, so that the selection of the wrong
# predictions and the class shown in the titles always agree
# (a probability of exactly 0.5 is assigned to class 0)
predicted_classes = (predictions>0.5).astype(int)
wrong_idx = np.argwhere(predicted_classes != y_true)

# hide every axis first: when there are fewer wrong predictions
# than subplots, the unused ones stay blank instead of showing an empty frame
for ax in axs.flatten():
    ax.axis("off")
    ax.grid(False)

# zip stops at the shorter sequence: at most nrows*ncols examples are shown
for idx,ax in zip(wrong_idx.flatten(), axs.flatten()):
    im, y = test_dataset[int(idx)]
    ax.imshow(T.ToPILImage()(im))
    pred = int(predicted_classes[idx])
    ax.set_title(f"Pred = {pred} | True = {y}")