# COMP432 Project by Maxime Mahdavian and Artem Chernigel

This Jupyter notebook is included for convenience. All of the tests we conducted were run on the Python files, so this notebook simply contains those scripts copied and pasted.


In [None]:
import os
import numpy as np
import matplotlib
import torch
import torchvision
import torch.nn as nn
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torchvision.transforms as tt
from torch.utils.data import random_split
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
import timeit
import datetime
import device_function
import ResNet as convnet
import argparse
import tensorflow
from keras_preprocessing.image import ImageDataGenerator
from keras.applications.vgg16 import VGG16
from keras.layers import Dense, Flatten, Dropout,BatchNormalization ,Activation
from keras.models import Model, Sequential
from keras.applications.nasnet import NASNetLarge
from keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
from keras.optimizer_v1 import Adam
from keras.metrics import BinaryAccuracy
from keras.metrics import Precision
from keras.metrics import Recall

# Resnet9

In [None]:
def get_device():
    """Pick the torch device to run on.

    Returns:
        ``torch.device('cuda')`` when ``torch.cuda.is_available()`` is true,
        otherwise ``torch.device('cpu')``.
    """
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)


def to_device(data, device):
    """Recursively move ``data`` onto ``device``.

    Args:
        data: An object exposing ``.to(device, non_blocking=...)``, or a
            list/tuple of such objects (nested to any depth).
        device: Target device handed to every ``.to`` call.

    Returns:
        The moved object. Lists and tuples both come back as a ``list``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved


class DeviceDataloader():
    """Wrap a dataloader so that each batch it yields is moved to a device."""

    def __init__(self, dataloader, device):
        # Keep references only; nothing is moved until iteration starts.
        self.device = device
        self.dataloader = dataloader

    def __iter__(self):
        """Yield the wrapped loader's batches, each passed through ``to_device``."""
        yield from (to_device(batch, self.device) for batch in self.dataloader)

    def __len__(self):
        """Return the number of batches reported by the wrapped dataloader."""
        n_batches = len(self.dataloader)
        return n_batches

In [None]:
def accuracy(output, label):
    """Compute the fraction of rows whose highest-scoring column matches the label.

    Args:
        output: Tensor of scores; the prediction for each row is the index of
            its maximum along dimension 1.
        label: Tensor of ground-truth labels compared element-wise with the
            predictions.

    Returns:
        A scalar tensor holding ``correct / number_of_predictions``.
    """
    predicted = torch.max(output, dim=1).indices
    n_correct = (predicted == label).sum().item()
    return torch.tensor(n_correct / len(predicted))


class ImageClassification(nn.Module):
    """Base class providing the per-batch and per-epoch steps of a classifier.

    Subclasses supply ``forward``; this class adds the loss/accuracy
    computation and the bookkeeping used while training and evaluating, which
    keeps the recording of statistics separate from the network definition.
    """

    def training_step(self, batch):
        """Run a forward pass on a training batch.

        Args:
            batch: An ``(image, label)`` pair from the dataloader.

        Returns:
            The cross-entropy loss tensor (still attached to the autograd graph).
        """
        image, label = batch
        output = self(image)
        return torch.nn.functional.cross_entropy(output, label)

    def testing_step(self, batch):
        """Run a forward pass on an evaluation batch.

        Args:
            batch: An ``(image, label)`` pair from the dataloader.

        Returns:
            ``{'loss': detached loss tensor, 'acc': accuracy tensor}``.
        """
        image, label = batch
        output = self(image)
        loss = torch.nn.functional.cross_entropy(output, label)
        acc = accuracy(output, label)
        return {'loss': loss.detach(), 'acc': acc}

    def testing_end(self, output):
        """Average the per-batch statistics produced by ``testing_step``.

        Every batch carries the same weight regardless of its size.

        Args:
            output: List of dictionaries with ``'loss'`` and ``'acc'`` tensors.

        Returns:
            ``{'loss': float, 'acc': float}`` with the mean loss and accuracy.
        """
        epoch_loss = torch.stack([x['loss'] for x in output]).mean()
        epoch_accuracy = torch.stack([x['acc'] for x in output]).mean()
        return {'loss': epoch_loss.item(), 'acc': epoch_accuracy.item()}

    def epoch_end(self, epoch, result, time, file):
        """Print the statistics of an epoch and optionally append them to a log.

        Args:
            epoch: Epoch number to display.
            result: Dictionary with the keys ``'lrs'`` (list of learning rates,
                the last one is shown), ``'test_loss'`` (printed as
                ``train_loss``), ``'loss'`` (printed as ``test_loss``) and
                ``'acc'``.
            time: Duration of the epoch, printed as-is.
            file: Writable text stream for logging, or ``None`` to only print.
        """
        message = (f"Epoch [{epoch}], last_lr: {result['lrs'][-1]:.5f}, train_loss: {result['test_loss']:.4f}, "
                   f"test_loss: {result['loss']:.4f}, acc: {result['acc']:.4f}, Epoch_time: {time}")
        print(message)
        # The training loop defaults ``file`` to None, so logging must be optional.
        if file is not None:
            file.write(message + "\n")


class ResNet(ImageClassification):
    """ResNet9-style convolutional classifier with two residual stages.

    Args:
        in_channels: Number of channels of the input images.
        num_class: Number of output classes (size of the final linear layer).

    The three pooled convolution blocks each halve the spatial size. The
    classifier then applies ``MaxPool2d(4)`` and a ``Linear(512, num_class)``,
    so the pooled feature map must be 1x1 for the flatten to yield 512 values.
    """

    def __init__(self, in_channels, num_class):
        super().__init__()

        # Stem: widen to 128 channels, then the first residual stage.
        self.conv1 = create_conv_layer(in_channels, 64)
        self.conv2 = create_conv_layer(64, 128, pool=True)
        self.res1 = nn.Sequential(
            create_conv_layer(128, 128),
            create_conv_layer(128, 128),
        )

        # Deeper features: widen to 512 channels, then the second residual stage.
        self.conv3 = create_conv_layer(128, 256, pool=True)
        self.conv4 = create_conv_layer(256, 512, pool=True)
        self.res2 = nn.Sequential(
            create_conv_layer(512, 512),
            create_conv_layer(512, 512),
        )

        # Head: pool, flatten and project to the class scores.
        self.classifier = nn.Sequential(
            nn.MaxPool2d(4),
            nn.Flatten(),
            nn.Linear(512, num_class),
        )

    def forward(self, inputs):
        """Return the class scores for a batch of images."""
        stem = self.conv2(self.conv1(inputs))
        stem = self.res1(stem) + stem  # skip connection around res1

        deep = self.conv4(self.conv3(stem))
        deep = self.res2(deep) + deep  # skip connection around res2

        return self.classifier(deep)


def create_conv_layer(in_channels, out_channels, activation='relu', pool=False, normalize=True, init=True):
    """Build one convolutional block as an ``nn.Sequential``.

    The block is a 3x3 convolution with padding 1, optionally followed (in this
    order) by batch normalization, an activation and a 2x2 max pooling.

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels of the convolution.
        activation: ``'relu'`` or ``'leaky'``; any other value adds no
            activation and skips the custom weight initialization.
        pool: Append ``nn.MaxPool2d(2)`` when true.
        normalize: Append ``nn.BatchNorm2d(out_channels)`` when true.
        init: Re-initialize the convolution weights with Kaiming uniform when
            true (only for the two known activations).

    Returns:
        ``nn.Sequential`` containing the selected layers.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)

    if init:
        if activation == 'relu':
            torch.nn.init.kaiming_uniform_(conv.weight, nonlinearity='relu')
        elif activation == 'leaky':
            # Uses kaiming_uniform_'s default nonlinearity setting.
            torch.nn.init.kaiming_uniform_(conv.weight)

    stack = [conv]

    if normalize:
        stack += [nn.BatchNorm2d(out_channels)]

    if activation == 'relu':
        stack += [nn.ReLU(inplace=True)]
    elif activation == 'leaky':
        stack += [nn.LeakyReLU(inplace=True)]

    if pool:
        stack += [nn.MaxPool2d(2)]

    return nn.Sequential(*stack)


@torch.no_grad()
def evaluate(model, loader):
    """Evaluate ``model`` on every batch of ``loader`` without tracking gradients.

    Args:
        model: Object providing ``eval()``, ``testing_step(batch)`` and
            ``testing_end(outputs)``.
        loader: Iterable of batches.

    Returns:
        Whatever ``model.testing_end`` returns for the list of per-batch results.
    """
    model.eval()  # switch to evaluation mode before the forward passes

    step_results = []
    for batch in loader:
        step_results.append(model.testing_step(batch))

    return model.testing_end(step_results)


def get_learning_rate(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Args:
        optimizer: Object exposing ``param_groups``, an iterable of
            dictionaries that each have an ``'lr'`` entry.

    Returns:
        The ``'lr'`` value of the first group, or ``None`` when there are no
        parameter groups.
    """
    return next((group['lr'] for group in optimizer.param_groups), None)


def cycle(epochs, max_lr, model, trn_dataloader, tst_dataloader, weight_decay=0, max_acc=65, grad_clip=None,
          opt_func=torch.optim.SGD, file=None):
    """Train ``model`` and evaluate it after every epoch.

    Args:
        epochs: Maximum number of epochs to run.
        max_lr: Learning rate passed to ``opt_func``. The one-cycle scheduler
            is currently disabled, so this function never changes it.
        model: Model providing ``training_step`` and ``epoch_end``; it is also
            handed to ``evaluate``.
        trn_dataloader: Iterable of training batches.
        tst_dataloader: Iterable of batches evaluated at the end of each epoch.
        weight_decay: Weight decay passed to ``opt_func``.
        max_acc: Training stops early once the evaluation accuracy reaches this
            value. Accuracy is reported as a fraction, so the default of 65 is
            never reached; pass e.g. ``0.65`` to enable early stopping.
        grad_clip: When truthy, gradients are clipped element-wise to
            ``[-grad_clip, grad_clip]`` before each optimizer step.
        opt_func: Optimizer constructor called as
            ``opt_func(params, max_lr, weight_decay=...)``.
        file: Log target forwarded to ``model.epoch_end``.

    Returns:
        One dictionary per completed epoch: the evaluation result extended
        with ``'test_loss'`` (the mean *training* loss of the epoch, despite
        its name) and ``'lrs'`` (learning rate recorded after every batch).
    """
    torch.cuda.empty_cache()
    history = []

    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs,
    #                                                 steps_per_epoch=len(trn_dataloader))

    # file.write(f'\nmax_lr: {max_lr}, weight_decay: {weight_decay}, grad_clip: {grad_clip}, optimizer: {opt_func}\n')
    # file.write('----------------------------------------------------------------------------------------------\n\n')
    for epoch in range(1, epochs + 1):
        start = timeit.default_timer()
        model.train()
        trn_loss = []
        lr_list = []
        for batch in trn_dataloader:
            loss = model.training_step(batch)
            # Keep only the detached value: storing the graph-attached tensor
            # would hold on to autograd history for every batch of the epoch.
            trn_loss.append(loss.detach())
            loss.backward()

            # Gradient clipping
            if grad_clip:
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)

            optimizer.step()
            optimizer.zero_grad()

            lr_list.append(get_learning_rate(optimizer))
            # scheduler.step()

        result = evaluate(model, tst_dataloader)
        result['test_loss'] = torch.stack(trn_loss).mean().item()
        result['lrs'] = lr_list
        end = timeit.default_timer() - start
        model.epoch_end(epoch, result, end, file)
        history.append(result)

        # Early stopping once the target accuracy is reached
        if result['acc'] >= max_acc:
            break
    return history


In [None]:
# ratio('dataset2/train', output='output', seed=40093125, ratio=(0.8, 0.2))

data_dir = 'dataset'
torch.manual_seed(0)

# Command line interface: the learning rate is mandatory and parsed as a float,
# so a missing or malformed value is reported by argparse instead of failing
# later with an obscure conversion error.
parser = argparse.ArgumentParser()
parser.add_argument("-lr", type=float, required=True, help='learning rate')
args = parser.parse_args()

# Hyperparameters and optimizer
batch_size = 200
epoch = 50                    # maximum number of epochs per run
max_lr = args.lr
grad_clip = 0.1               # element-wise gradient clipping value
weight_decay = 1e-4
max_acc = 0.65                # stop training once this accuracy (fraction) is reached
obj_func = torch.optim.SGD    # optimizer constructor handed to the training loop

# Log file; results of successive executions are appended
file = open('test.txt', 'a')


def show_batch(dataloader):
    """Display the first 64 images (at most) of the first batch in an 8-wide grid.

    Args:
        dataloader: Iterable yielding ``(images, label)`` pairs. Only the first
            batch is used; nothing is shown when the iterable is empty.
    """
    batches = iter(dataloader)
    try:
        images, _ = next(batches)
    except StopIteration:
        return

    _, ax = plt.subplots(figsize=(12, 12))
    ax.set_xticks([])
    ax.set_yticks([])

    # make_grid returns the grid channels-first; imshow expects channels last.
    grid = make_grid(images[:64], nrow=8)
    ax.imshow(grid.permute(1, 2, 0))
    plt.show()

def plot_graph(history, title):
    """Plot accuracy and loss curves of a run, save the figure and show it.

    Args:
        history: List of per-epoch dictionaries with ``'acc'`` and ``'loss'``.
            ``'test_loss'`` (the training loss) is optional and is plotted as
            ``None`` where it is missing.
        title: File name passed to ``plt.savefig``.
    """
    accuracy_curve = []
    eval_loss_curve = []
    train_loss_curve = []
    for entry in history:
        accuracy_curve.append(entry['acc'])
        eval_loss_curve.append(entry['loss'])
        train_loss_curve.append(entry.get('test_loss'))

    # Left panel: accuracy per epoch
    plt.subplot(1, 2, 1)
    plt.plot(accuracy_curve, '-x')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.title('Accuracy')

    # Right panel: evaluation loss and training loss per epoch
    plt.subplot(1, 2, 2)
    plt.plot(eval_loss_curve, '-x')
    plt.plot(train_loss_curve, '-o')
    plt.xlabel('epoch')
    plt.ylabel('Loss')
    plt.title('Training and test loss')
    plt.legend(['test loss', 'training loss'])

    plt.savefig(title)
    plt.show()


# print(os.listdir(data_dir))
# classes = os.listdir(data_dir + '/train')
# print(classes)

# Training-time data augmentation: pad the image by 4 pixels (reflecting the
# border), take a random 48x48 crop, flip it horizontally with probability 0.5
# and convert it to a tensor. The random choices are redrawn every time an
# image is loaded, so the model sees slightly different versions of the images
# at every epoch. No mean/variance normalization is applied here.
train_tfms = tt.Compose([
    tt.RandomCrop(size=48, padding=4, padding_mode='reflect'),
    tt.RandomHorizontalFlip(p=0.5),
    tt.ToTensor(),
])

# Evaluation images are only converted to tensors.
test_tfms = tt.Compose([
    tt.ToTensor(),
])

# Datasets: one image folder per split; only the training split is augmented
train_dataset = ImageFolder(root=f'{data_dir}/train', transform=train_tfms)
test_dataset = ImageFolder(root=f'{data_dir}/test', transform=test_tfms)
val_dataset = ImageFolder(root=f'{data_dir}/val', transform=test_tfms)

# DataLoaders: training batches are shuffled; the evaluation loaders use
# batches three times larger than the training batch
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=3, pin_memory=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size * 3,
                             num_workers=3, pin_memory=True)
validation_dataloader = DataLoader(val_dataset, batch_size=batch_size * 3,
                                   num_workers=3, pin_memory=True)

# show_batch(test_dataloader)

# Device used to train the model: cuda if a GPU is available, cpu otherwise
device = device_function.get_device()
print(device)

# Wrap every dataloader so that its batches are moved to the device
train_dataloader = device_function.DeviceDataloader(train_dataloader, device)
test_dataloader = device_function.DeviceDataloader(test_dataloader, device)
validation_dataloader = device_function.DeviceDataloader(validation_dataloader, device)
#
# Train three freshly initialised models with the same hyperparameters, logging
# and plotting every run. The log file is closed afterwards, even if a run fails.
try:
    for i in range(3):
        filename = f"test{i}.png"

        # Create the model and put it on the device
        model = device_function.to_device(convnet.ResNet(3, 7), device)

        # History keeps the loss and accuracy of every epoch, mostly to see how
        # the model evolves. It starts with an evaluation of the untrained model.
        history = [convnet.evaluate(model, validation_dataloader)]
        baseline = (f"Epoch [{0}], "
                    f"test_loss: {history[0]['loss']:.4f}, acc: {history[0]['acc']:.4f}, Epoch_time: {0}")
        print(baseline)
        file.write(baseline + "\n")

        start = timeit.default_timer()
        # cycle is the main training function
        history += convnet.cycle(epoch, max_lr, model, train_dataloader, validation_dataloader, max_acc=max_acc,
                                 grad_clip=grad_clip, weight_decay=weight_decay, opt_func=obj_func, file=file)

        # Orion helper function, need to comment it out when not doing cross-validation
        # report_objective(history[-1]['loss'])

        duration = timeit.default_timer() - start
        plot_graph(history, filename)
        elapsed = datetime.timedelta(seconds=duration)
        print(elapsed)
        file.write(str(elapsed) + "\n")
finally:
    file.close()


# VGGNET16

In [None]:

# Keras image generators for the three splits. The training and validation
# generators reserve 20% of the images in their directory for validation.
#
# Every augmentation / preprocessing option is currently disabled. The options
# that were tried on the training generator are:
#   rescale=1./255, rotation_range=5, width_shift_range=0.2,
#   height_shift_range=0.2, shear_range=0.2, horizontal_flip=True,
#   vertical_flip=True, fill_mode='nearest'
# and rescale=1./255 on the validation and test generators.
train_datagen = ImageDataGenerator(validation_split=0.2)

valid_datagen = ImageDataGenerator(validation_split=0.2)

test_datagen = ImageDataGenerator()

# Build the train/validation/test iterators. Each split reads through its own
# generator so that augmentation enabled on the training generator can never
# leak into the validation or test images. The training and validation
# generators share the same validation_split, so the two subsets stay
# complementary.
train_dataset = train_datagen.flow_from_directory(
    directory="face_recognition/train",
    target_size=(48, 48),
    class_mode="categorical",
    subset="training",
    batch_size=64,
)

valid_dataset = valid_datagen.flow_from_directory(
    directory="face_recognition/train",
    target_size=(48, 48),
    class_mode="categorical",
    subset="validation",
    batch_size=64,
)

test_dataset = test_datagen.flow_from_directory(
    directory="face_recognition/test",
    target_size=(48, 48),
    class_mode="categorical",
    batch_size=64,
)

# VGG16 is a simple and widely used convolutional architecture. It is loaded
# here without its classification head and with ImageNet-pretrained weights,
# for 48x48 RGB inputs.
base_model = VGG16(include_top=False, weights="imagenet", input_shape=(48, 48, 3))

# We want a deep network without spending much time training it, so every
# layer except the last four is frozen and keeps its pretrained weights.
for layer in base_model.layers[:-4]:
    layer.trainable = False

# Classification head stacked on top of the convolutional base:
#   Flatten            - squash the feature maps to a single dimension
#   BatchNormalization - like input normalization, but computed per minibatch
#                        on the internal activations
#   3 x (Dense(32, He-uniform init) -> BatchNormalization -> ReLU)
#   Dense(7, softmax)  - one unit per class to predict
# Dropout(0.5) layers (a regularizer that discourages co-adaptation) were tried
# before Flatten and before the second and third Dense layers; they are
# currently disabled.
model = Sequential([
    base_model,
    Flatten(),
    BatchNormalization(),
    Dense(32, kernel_initializer="he_uniform"),
    BatchNormalization(),
    Activation("relu"),
    Dense(32, kernel_initializer="he_uniform"),
    BatchNormalization(),
    Activation("relu"),
    Dense(32, kernel_initializer="he_uniform"),
    BatchNormalization(),
    Activation("relu"),
    Dense(7, activation="softmax"),
])

model.summary()

# Local import: the file-level imports only bring in BinaryAccuracy.
from keras.metrics import CategoricalAccuracy

# The model predicts one of 7 classes from one-hot labels, so accuracy has to
# compare the arg-max of the prediction with the arg-max of the label.
# BinaryAccuracy would instead threshold each of the 7 outputs independently
# and count every correctly rejected class as a hit, inflating the score.
metrics = [
    CategoricalAccuracy(name="accuracy"),
    Precision(name="precision"),
    Recall(name="recall")
]

# Adam adapts the step size of each parameter during training. It is selected
# by name, so Keras uses its default settings for it.
model.compile(optimizer="Adam", loss="categorical_crossentropy", metrics=metrics)

# Train for 5 epochs, reporting the metrics on the validation subset after each one
model.fit(train_dataset, validation_data=valid_dataset, epochs=5, verbose=1)
