In [1]:
import numpy as np
import torch 
import matplotlib.pyplot as plt
import enum

import os
from PIL import Image
import cv2 #pip install opencv-python
import torchvision.transforms as transforms
import matplotlib.image as mpimg
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet50
from torchvision import datasets, models, transforms
import torchvision
import copy
import sklearn.metrics
import time
import torch.utils
import torch.utils.data
from sklearn.metrics import confusion_matrix
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import classification_report

In [2]:
# Execute the companion notebooks inside this kernel so that the names they
# define become available to the cells below.
# NOTE(review): their contents are not visible from this notebook - presumably
# they provide helpers such as `imshow`; confirm before relying on them.
%run preprocess.ipynb
%run stinna.ipynb
%run extraFunctions.ipynb

## Datasets

Split data into train set, validation set and test set with ratio 80/10/10

In [6]:
# import splitfolders
# splitfolders.ratio(PATHbirdsWithBackground, output="output",seed=42, ratio=(0.8,0.1,0.1) )

In [3]:
# Root folders of the image sets. Each one is read with datasets.ImageFolder
# further down, so every folder is expected to hold one sub-folder per class.
PATH_TEST = "split_withbackground/test"
PATH_TRAIN = "split_withbackground/train"
PATH_VAL = "split_withbackground/val"
# Separate "feeder" image set, loaded alongside the train/val/test split.
PATH_FEEDER = "feeder-data"

ResNet accepts input images of shape `(3 x 224 x 224)`. They must be loaded into the range `[0,1]` and normalised using `mean = [0.485, 0.456, 0.406]` and `std = [0.229, 0.224, 0.225]` (https://pytorch.org/hub/pytorch_vision_resnet/). Our data already has the correct size, so here we only add `ToTensor()`, which converts the images from `(H x W x C)` in range `[0,255]` to `(C x H x W)` in range `[0.0,1.0]`, followed by the normalisation.

In [4]:
# Baseline (augmentation-free) preprocessing shared by all four datasets below:
# convert the image to a float tensor, then normalise each channel with the
# mean/std values quoted in the text above.
preprocess_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

Datasets from our imagefolders

In [8]:
# One ImageFolder dataset per image set, all using the baseline preprocessing.
dataset_test = datasets.ImageFolder(PATH_TEST, preprocess_transforms)
dataset_train = datasets.ImageFolder(PATH_TRAIN, preprocess_transforms)
dataset_val = datasets.ImageFolder(PATH_VAL, preprocess_transforms)
dataset_feeder = datasets.ImageFolder(PATH_FEEDER, preprocess_transforms)

# Number of images in the training and validation sets.
dataset_size_train = len(dataset_train)
dataset_size_val = len(dataset_val)

print('No of images in training set: {}'.format(len(dataset_train)))
print('No of images in validation set: {}'.format(len(dataset_val)))
print('No of images in test set: {}'.format(len(dataset_test)))
print('No of images in feeder set: {}'.format(len(dataset_feeder)))

# Class names as discovered by ImageFolder in the validation folder.
# NOTE(review): assumes the other sets expose the same classes in the same
# order - the commented-out prints below can be used to verify that.
class_labels = dataset_val.classes
# print('Labels: {}'.format(class_labels))
# print('Labels (feeder): {}'.format(dataset_feeder.classes))


No of images in training set: 835
No of images in validation set: 104
No of images in test set: 105
No of images in feeder set: 15079


DataLoaders (these are what we feed to the training and evaluation loops)

In [14]:
# Batched loaders (16 images per batch, 4 worker processes each).
# Training and validation batches are drawn in shuffled order; the test and
# feeder loaders keep the dataset order.
# NOTE(review): shuffling the validation loader is not needed for evaluation.
dataloader_train = torch.utils.data.DataLoader(dataset_train, batch_size=16, shuffle=True, num_workers=4)
dataloader_validation = torch.utils.data.DataLoader(dataset_val, batch_size=16, shuffle=True, num_workers=4)
dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=16, shuffle=False, num_workers=4)
dataloader_feeder = torch.utils.data.DataLoader(dataset_feeder, batch_size=16, shuffle=False, num_workers=4)


In [11]:
# for taking a little look at the data :) 
# inputs, classes = next(iter(dataloader_train))
# out = torchvision.utils.make_grid(inputs)
# imshow(out, title=[class_labels[x] for x in classes])

In [15]:
# Extracting a subset to be able to test our code before doing big boi training
subset1 = torch.utils.data.Subset(dataset_train, np.random.choice(len(dataset_train), 16, replace=False))
subset2 = torch.utils.data.Subset(dataset_val, [1,8,9,16, 60, 80, 98, 100, 103, 20,31, 40,50,70,90,88])
dataloader_tiny = DataLoader(subset1, batch_size=16, shuffle=True, num_workers=0)
dataloader_tiny_val = DataLoader(subset2, batch_size=16, shuffle=True, num_workers=0)

## Importing ResNet50 model 
and getting it ready for transfer learning!

Put the pedal to the metal and use GPU

In [9]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
# The model builders and the training loop below read this as a global.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
def load_and_prep_vgg16():
    """
    Build a VGG16 with its default pretrained weights, adapted to 7 classes.

    The last classifier layer is swapped for a new 4096 -> 7 linear layer. The
    convolutional feature extractor is frozen and the whole classifier is left
    trainable. The model is moved to the notebook-level ``device`` and returned.
    """
    vgg = torchvision.models.vgg16(weights=models.VGG16_Weights.DEFAULT)
    vgg.classifier[-1] = torch.nn.Linear(in_features=4096, out_features=7)

    # Feature extractor stays fixed; only the classifier head receives gradients.
    vgg.features.requires_grad_(False)
    vgg.classifier.requires_grad_(True)

    return vgg.to(device)

In [12]:
pretrained_weights = models.ResNet50_Weights.DEFAULT
def load_and_prep_resnet50(weights = pretrained_weights):
    """
    Build a ResNet50 prepared for transfer learning on our 7 classes.

    ``weights`` is forwarded to ``torchvision.models.resnet50`` (default: the
    default pretrained ResNet50 weights). Every layer is frozen except a newly
    created 7-way ``fc`` head, so initially only that head is trained. The
    model is moved to the notebook-level ``device`` and returned.
    """
    net = torchvision.models.resnet50(weights=weights)

    # Freeze the whole network first ...
    net.requires_grad_(False)

    # ... then attach the new head. A freshly created Linear layer is trainable
    # by default, so it is the only part left with requires_grad=True.
    net.fc = torch.nn.Linear(net.fc.in_features, 7)

    # Put the model on the GPU (when available)
    return net.to(device)


def unfreeze_layers(model):
    """Make every parameter of ``model`` trainable again (in place)."""
    model.requires_grad_(True)

def unfreeze_layer4(model: models.ResNet):
    """
    Make the parameters of ``model.layer4`` trainable (in place).

    ``model`` must expose a ``layer4`` sub-module, as torchvision ResNets do.
    All other parameters keep their current ``requires_grad`` setting.
    """
    model.layer4.requires_grad_(True)

def unfreeze_layer3(model: models.ResNet):
    """
    Make the parameters of ``model.layer3`` trainable (in place).

    ``model`` must expose a ``layer3`` sub-module, as torchvision ResNets do.
    All other parameters keep their current ``requires_grad`` setting.
    """
    model.layer3.requires_grad_(True)

def model_frozen_status(model):
    """Print, for each named parameter of ``model``, whether it is trainable."""
    status_lines = [
        f"{param_name} requires_grad={tensor.requires_grad}"
        for param_name, tensor in model.named_parameters()
    ]
    for status in status_lines:
        print(status)

def get_optimizer(model):
    """
    Return an SGD optimizer (lr 0.001, momentum 0.9) over all parameters of ``model``.

    Frozen parameters are handed over as well.
    """
    sgd_settings = {"lr": 0.001, "momentum": 0.9}
    return torch.optim.SGD(model.parameters(), **sgd_settings)

## Train function

In [16]:
def train_model(model, optimizer, num_epoch=5, train = dataloader_train, validation= dataloader_validation):
    """
    Train ``model`` and return it with the weights of its best epoch loaded.

    Every epoch performs one optimisation pass over ``train`` followed by one
    gradient-free pass over ``validation``, both with cross-entropy loss.
    Batches are moved to the notebook-level ``device``.

    Parameters
    ----------
    model : torch.nn.Module
        Network to train (modified in place).
    optimizer : torch.optim.Optimizer
        Optimizer that updates the parameters of ``model``.
    num_epoch : int
        Number of epochs to run.
    train, validation : DataLoader
        Loaders for the training and validation passes.

    Returns
    -------
    (model, data)
        ``model`` with the best weights loaded, and ``data``, a dict with the
        per-epoch lists ``train_loss``, ``val_loss``, ``train_acc``, ``val_acc``
        plus ``epochs`` and ``batch_size``.

    Notes
    -----
    An epoch becomes the new best when its validation accuracy is at least the
    best so far AND its validation loss is strictly lower than the best so far.
    Per-epoch loss and accuracy are averaged over the samples actually seen in
    the pass, so they are correct for any loader (e.g. the tiny subsets).
    """
    # A single criterion instance is enough; it holds no per-batch state.
    criterion = torch.nn.CrossEntropyLoss()

    acc_train = []
    loss_train = []
    acc_validation = []
    loss_validation = []
    best_acc = 0.0
    # Start at +inf so the first epoch always produces a checkpoint, even when
    # the validation loss never drops below 1.0.
    best_loss = float("inf")
    best_epoch = 0
    best_model_weight = copy.deepcopy(model.state_dict())

    since = time.time()
    for epoch in range(num_epoch):
        epoch_since = time.time()
        print('Epoch {}/{}'.format(epoch+1, num_epoch))
        print("-"*10)

        # training phase
        model.train()
        epoch_loss_train, epoch_acc_train = _run_epoch(model, train, criterion, optimizer)
        acc_train.append(epoch_acc_train)
        loss_train.append(epoch_loss_train)
        print('Train Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss_train, epoch_acc_train))

        # validation phase (no optimizer -> no gradients, no weight updates)
        model.eval()
        epoch_loss_val, epoch_acc_val = _run_epoch(model, validation, criterion)
        acc_validation.append(epoch_acc_val)
        loss_validation.append(epoch_loss_val)
        print('Val Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss_val, epoch_acc_val))

        if epoch_acc_val >= best_acc and epoch_loss_val < best_loss:
            best_acc = epoch_acc_val
            best_loss = epoch_loss_val
            best_epoch = epoch+1
            best_model_weight = copy.deepcopy(model.state_dict())

        time_epoch_val = time.time() - epoch_since
        print('Epoch time {:.0f}m {:.0f}s'.format(time_epoch_val // 60, time_epoch_val % 60))
        print("-"*10)
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print("Validation accuracies:")
    print(acc_validation)
    print("Training accuracies:")
    print(acc_train)
    print("Best model had accuracy {:.4f}, loss {:.4f} at epoch {}".format(best_acc, best_loss, best_epoch))
    data = {"train_loss": loss_train, "val_loss": loss_validation, "train_acc": acc_train, "val_acc": acc_validation, "epochs": num_epoch, "batch_size": train.batch_size}
    model.load_state_dict(best_model_weight)
    return model, data


def _run_epoch(model, loader, criterion, optimizer=None):
    """
    Run one pass over ``loader`` and return ``(mean loss, accuracy)`` as floats.

    With an ``optimizer`` the pass back-propagates and updates the weights;
    without one it runs gradient-free. Both figures are averaged over the
    samples seen; an empty loader yields ``(nan, nan)`` so that such a pass can
    never be selected as the best epoch.
    """
    is_training = optimizer is not None
    total_loss = 0.0
    total_correct = 0
    total_seen = 0
    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        if is_training:
            optimizer.zero_grad()

        with torch.set_grad_enabled(is_training):
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

        batch_size = inputs.size(0)
        # loss is the batch mean; weight it by the batch size to sum per sample.
        total_loss += loss.item() * batch_size
        total_correct += int((preds == labels).sum().item())
        total_seen += batch_size

    if total_seen == 0:
        return float("nan"), float("nan")
    return total_loss / total_seen, total_correct / total_seen


In [22]:
def labels_and_predictions(dataloader: DataLoader, model: torch.nn.Module, device) -> tuple[np.ndarray, np.ndarray]:
    '''
    Gets all labels and predictions for the images in the dataloader.

    The model is switched to evaluation mode for the duration of the call
    (so dropout / batch-norm behave deterministically) and its previous
    train/eval mode is restored afterwards.

    Parameters:
        dataloader: yields ``(inputs, labels)`` batches.
        model: network producing one score per class for every input.
        device: device the inputs are moved to before the forward pass.

    Returns:
        ``(labels, predictions)`` as 1-D numpy arrays in dataloader order; the
        prediction is the index of the highest score. Both arrays are empty
        when the dataloader yields no batches.
    '''
    label_chunks = []
    pred_chunks = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, classes in dataloader:
                outputs = model(inputs.to(device))
                _, preds = torch.max(outputs, 1)

                # Collect per-batch results on the CPU; concatenated once below.
                pred_chunks.append(preds.view(-1).cpu())
                label_chunks.append(classes.view(-1).cpu())
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)

    if label_chunks:
        lbllist = torch.cat(label_chunks)
        predlist = torch.cat(pred_chunks)
    else:
        lbllist = torch.zeros(0, dtype=torch.long)
        predlist = torch.zeros(0, dtype=torch.long)
    return lbllist.numpy(), predlist.numpy()

def top_k_accuracy(dataloader: DataLoader, model: torch.nn.Module, device, k):
    '''
    Gets top k accuracy for the images in the dataloader.

    A sample counts as correct when its true label is among the ``k`` classes
    with the highest scores. The model is evaluated in eval mode and its
    previous train/eval mode is restored afterwards.

    Parameters:
        dataloader: yields ``(inputs, labels)`` batches; labels are class indices.
        model: network producing one score per class for every input.
        device: device the batches are moved to.
        k: number of top-scoring classes to consider; values larger than the
           number of classes are clamped to it.

    Returns:
        Fraction of correct samples in ``[0, 1]``; ``0.0`` when the dataloader
        yields no samples.
    '''
    total_pred = 0
    correct_pred = 0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, classes in dataloader:
                inputs = inputs.to(device)
                classes = classes.to(device)
                outputs = model(inputs)

                # topk raises when k exceeds the number of available classes.
                _, preds = torch.topk(outputs, min(k, outputs.size(-1)))

                # Compare on a single device, all samples of the batch at once:
                # a row is a hit when any of its top-k indices equals the label.
                hits = (preds == classes.view(-1, 1)).any(dim=1)
                correct_pred += int(hits.sum().item())
                total_pred += classes.size(0)
    finally:
        model.train(was_training)

    if total_pred == 0:
        return 0.0
    return correct_pred / total_pred

In [18]:
def print_stats(data: dict, print_arrays=False):
    """
    Print a summary of a training run stored in ``data``.

    ``data`` must contain ``val_loss`` and ``val_acc`` (and ``train_acc`` when
    ``print_arrays`` is true). The optional keys ``epochs``, ``batch_size``,
    ``optimizer``, ``test_acc``, ``feeder_acc``, ``feed_acc`` and ``note`` are
    printed, in that order, when present.
    """
    if print_arrays:
        full_histories = (
            ("Training accuracies", "train_acc"),
            ("Validation Accuracies", "val_acc"),
            ("Validation Loss", "val_loss"),
        )
        for heading, key in full_histories:
            print(heading)
            print(data[key])

    # Epochs are reported 1-based, hence the +1 on the arg-indices.
    print("Lowest loss was {:.4f} at epoch {}".format(np.min(data["val_loss"]), np.argmin(data["val_loss"])+1))
    print("Highest accuracy was {:.4f} at epoch {}".format(np.max(data["val_acc"]),np.argmax(data["val_acc"])+1))

    plain_fields = (
        ("epochs", "Number of epochs run "),
        ("batch_size", "Batch size was "),
        ("optimizer", "Optimizer used: "),
    )
    for key, prefix in plain_fields:
        if key in data:
            print(prefix, data[key])

    # "feeder_acc" and "feed_acc" are both accepted spellings of the same figure.
    accuracy_fields = (
        ("test_acc", "Overall accuracy on test data {:.4f}"),
        ("feeder_acc", "Overall accuracy on feeder data {:.4f}"),
        ("feed_acc", "Overall accuracy on feeder data {:.4f}"),
    )
    for key, template in accuracy_fields:
        if key in data:
            print(template.format(data[key]))

    if "note" in data:
        print(data["note"])

In [None]:
def load_function(model_path: str, model_key: str = "model", info_key: str = "info", device=device):
    '''
    Load a saved ResNet50 checkpoint together with its info dict.

    e.g. model, info = load_function("Cycle.tar", model_key="model_cycle", info_key="model_cycle_data")
    or   model, info = load_function("Cycle.tar") if saved under "model" and "info"

    Parameters:
        model_path: path of a file written with ``torch.save``.
        model_key: key of the model state dict inside the saved dict.
        info_key: key of the info dict inside the saved dict.
        device: device the checkpoint tensors and the model are placed on.

    Returns:
        ``(model, info)`` - the model is returned in evaluation mode; call
        ``model.train()`` (``train_model`` does) before training it further.
    '''
    # NOTE(security): weights_only=False unpickles arbitrary Python objects and
    # can execute code embedded in the file - only load checkpoints you trust.
    loaded_info = torch.load(model_path, weights_only=False, map_location=device)

    # weights=None: every parameter is overwritten by the checkpoint below, so
    # fetching the pretrained ImageNet weights first would be wasted work.
    new_model = load_and_prep_resnet50(weights=None)
    new_model.load_state_dict(loaded_info[model_key])

    # Honour the requested device (the builder uses the notebook-level one).
    new_model = new_model.to(device)
    # Evaluation mode, so dropout / batch-norm do not distort predictions.
    new_model.eval()

    info = loaded_info[info_key]
    return new_model, info

def save_function(model_path, model, info, extra_info=None):
    '''
    Save the model weights together with run information.

    e.g. save_function("model_aug.tar", model, info, extra_info={"optimizer": "Adam", "test_acc": 0.97, "feeder_acc": 0.38})

    Parameters:
        model_path: destination handed to ``torch.save`` (path or file-like object).
        model: model whose ``state_dict()`` is stored under the key "model".
        info: dict of run information (e.g. the one returned by ``train_model``).
        extra_info: optional dict of additional entries; on duplicate keys these
            override ``info``. May be omitted.

    The merged information is stored under the key "info". Neither input dict
    is modified.
    '''
    all_info = dict(info)
    if extra_info:
        all_info.update(extra_info)
    torch.save({"model": model.state_dict(), "info": all_info}, model_path)

In [19]:
def accuracy(truel, predl):
    """
    Return the percentage (0-100) of predictions that equal the true labels.

    ``truel`` and ``predl`` may be numpy arrays or any array-like (e.g. lists)
    of equal shape.
    """
    # Coerce so that plain Python lists work too (they have no .size and do
    # not compare element-wise).
    truel = np.asarray(truel)
    predl = np.asarray(predl)
    return np.sum(predl == truel) / predl.size * 100

In [20]:
# Defining the transforms including augmentations (and also the basic ToTensor and normalisation)
# Augmentation pipeline 1: geometric augmentations followed by the same
# ToTensor + normalisation steps as the baseline preprocessing.
preprocess_with_augmentation1 = transforms.Compose([
    transforms.RandomRotation(degrees=10),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomResizedCrop(size=224,scale=(0.3,1)), # crop covers at least 30 % of the original image so the portion is never too small
    
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Training dataset and loader using augmentation pipeline 1
# (same folder as dataset_train; only the transform differs).
dataset_train_aug1 = datasets.ImageFolder(PATH_TRAIN, preprocess_with_augmentation1)
dataloader_train_aug1 = torch.utils.data.DataLoader(dataset_train_aug1, batch_size=16, shuffle=True, num_workers=4)

In [21]:
# Defining the transforms including augmentations (and also the basic ToTensor and normalisation)
# Augmentation pipeline 2: pipeline 1 preceded by a brightness jitter and a
# Gaussian blur, followed by the usual ToTensor + normalisation steps.
preprocess_with_augmentation2 = transforms.Compose([
    transforms.ColorJitter(brightness=0.1), # new in pipeline 2: random brightness change
    transforms.GaussianBlur(kernel_size=(5,5), sigma=(7, 9)), # new in pipeline 2: not wrapped in RandomApply, so every image is blurred
    transforms.RandomRotation(degrees=10),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomResizedCrop(size=224,scale=(0.3,1)), # crop covers at least 30 % of the original image so the portion is never too small
    
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Training dataset and loader using augmentation pipeline 2.
# NOTE(review): this loader uses 2 workers whereas the aug1 loader uses 4 - confirm this is intended.
dataset_train_aug2 = datasets.ImageFolder(PATH_TRAIN, preprocess_with_augmentation2)
dataloader_train_aug2 = torch.utils.data.DataLoader(dataset_train_aug2, batch_size=16, shuffle=True, num_workers=2)

In [22]:
# Defining the transforms including augmentations (and also the basic ToTensor and normalisation)
# Augmentation pipeline 3: pipeline 1 preceded by a Gaussian blur (i.e.
# pipeline 2 without the brightness jitter), followed by ToTensor + normalisation.
preprocess_with_augmentation3 = transforms.Compose([
    transforms.GaussianBlur(kernel_size=(5,5), sigma=(7, 9)), # not wrapped in RandomApply, so every image is blurred
    transforms.RandomRotation(degrees=10),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomResizedCrop(size=224,scale=(0.3,1)), # crop covers at least 30 % of the original image so the portion is never too small

    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Training dataset and loader using augmentation pipeline 3.
dataset_train_aug3 = datasets.ImageFolder(PATH_TRAIN, preprocess_with_augmentation3)
dataloader_train_aug3 = torch.utils.data.DataLoader(dataset_train_aug3, batch_size=16, shuffle=True, num_workers=2)

In [None]:
def plot_confusion_matrix(true_labels, pred_labels, class_labels, normalize=False, verbose=False):
    """
    Computes and plots the confusion matrix of the given true and predicted labels.

    Parameters
    ----------
    true_labels, pred_labels : array-like
        Ground-truth and predicted class labels, one entry per sample.
    class_labels : sequence of str
        Tick labels for the rows/columns; must contain one entry per row of
        the confusion matrix.
    normalize : bool
        When true, each row is divided by its total so that the cells show the
        fraction of that true class. Rows without samples stay all-zero.
    verbose : bool
        Currently unused; accepted for backward compatibility.

    Returns
    -------
    The matplotlib Axes holding the plot. The raw (un-normalised) matrix is
    also printed.
    """
    # compute confusion matrix
    cm = confusion_matrix(true_labels, pred_labels)
    print(cm)
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide only where the row has samples; avoids NaN cells (0/0).
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)

    # start plotting
    fig, ax = plt.subplots(figsize=(8,8))
    # Normalised cells lie in [0, 1]; raw counts need the colour scale to reach
    # the largest count, otherwise every cell >= 1 gets the same colour.
    vmax = 1 if normalize else max(cm.max(), 1)
    im = ax.imshow(cm, cmap=plt.cm.Greens, vmin=0, vmax=vmax)
    ax.figure.colorbar(im, ax=ax)
    # display and label all ticks, set titles
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=class_labels, yticklabels=class_labels,
           title="Normalized confusion matrix" if normalize else "Confusion matrix",
           ylabel="True label",
           xlabel="Predicted label"
           )

    # rotate labels
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Add annotations in each cell
    fmt = '.2f' if normalize else 'd' # format based on normalize setting
    thresh = cm.max() / 2. # when to switch from black to white text
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")

    fig.tight_layout()
    return ax

def plot_accuracies(train_accs, test_accs, legends=["Train", "Val"]):
    """
    Plot two per-epoch accuracy curves in one 6x6 figure and show it.

    ``train_accs`` and ``test_accs`` hold one value per epoch and must be of
    equal length; ``legends`` names the two curves in that order. Epochs are
    numbered from 1 on the x-axis.
    """
    epochs = np.arange(1, len(train_accs) + 1)
    fig, ax = plt.subplots(figsize=(6,6))
    for curve in (train_accs, test_accs):
        ax.plot(epochs, curve, '-')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Accuracy')
    ax.legend(legends)
    ax.set_title('Accuracy/Epoch')
    plt.show()

def plot_loss(train_loss, val_loss, legends=["Train", "Val"]):
    """
    Plot two per-epoch loss curves in one 6x6 figure and show it.

    ``train_loss`` and ``val_loss`` hold one value per epoch and must be of
    equal length; ``legends`` names the two curves in that order. Epochs are
    numbered from 1 on the x-axis.
    """
    epochs = np.arange(1, len(train_loss) + 1)
    fig, ax = plt.subplots(figsize=(6,6))
    for curve in (train_loss, val_loss):
        ax.plot(epochs, curve, '-')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.legend(legends)
    ax.set_title('Loss/Epoch')
    plt.show()