<h1>CSC 249 Model Training</h1>



In [None]:
# import packages
import cv2
import os
import glob 
import numpy as np
import pandas as pd
import torchvision
import torch
import torch.nn as nn
import torch.nn.functional as F
import tensorflow as tf
from torch.autograd import Variable
import torch.utils.data as Data
from matplotlib import pyplot as plt
from torchvision import transforms, models
from GPUtil import showUtilization as gpu_usage
from numba import cuda
from PIL import Image
from copy import copy
import splitfolders
from torchsummary import summary
# import scheduler for learning rate change
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.cuda.amp import GradScaler, autocast
import collections
#import tensorflowjs

# resampling reference: https://github.com/ufoym/imbalanced-dataset-sampler
from torchsampler import ImbalancedDatasetSampler

In [None]:
# you might not need this if your testing set has been divided into publictest and privatetest
def split():
    """Move a set of test images from the "public" folder to the "private" folder.

    The file names to move are taken from the listing of ``folder_path``; each
    name is then moved from ``old_path`` to ``new_path``.

    NOTE(review): the names are listed from one directory but moved out of
    another - presumably ``old_path`` holds files with the same names; confirm
    before running, since ``shutil.move`` raises if a source file is missing.
    """
    import os, os.path, shutil

    folder_path = "C:/Users/jixua/OneDrive/Desktop/Machine learning package/archive/test/surprise"
    old_path = "C:/Users/jixua/OneDrive/Desktop/Machine learning package/test_data_2/public"
    new_path = "C:/Users/jixua/OneDrive/Desktop/Machine learning package/test_data_2/private"
    images = os.listdir(folder_path)
    print(images)

    # Create the destination once, instead of re-checking on every iteration.
    os.makedirs(new_path, exist_ok=True)

    for image in images:
        old_image_path = os.path.join(old_path, image)
        new_image_path = os.path.join(new_path, image)
        shutil.move(old_image_path, new_image_path)

In [None]:
# check whether a sample training image is stored as grayscale or colored
import cv2
file = "C:/Users/jixua/OneDrive/Desktop/Machine learning package/archive/train/angry/Training_3908.jpg"

# IMREAD_UNCHANGED keeps the channel layout stored in the file; the default
# flag always decodes to 3 channels, which made the grayscale branch unreachable.
image = cv2.imread(file, cv2.IMREAD_UNCHANGED)
# cv2.imread returns None (it does not raise) when the file cannot be read.
if image is not None:
    if len(image.shape) == 2:
        print ('grayscale')
    elif len(image.shape) == 3:
        print ('Colored')
else:
    print("cannot find image") 

In [None]:
# adding gaussian noise transforms
class AddGaussianNoise(object):
    """Transform that adds Gaussian noise ``N(mean, std**2)`` to a tensor.

    Args:
        mean: offset added to every element.
        std: standard deviation of the sampled noise.
    """

    def __init__(self, mean=0.0, std=1.0):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        """Return ``tensor`` plus freshly sampled noise of the same shape."""
        # Sample on the tensor's own device so the transform also works for
        # tensors that do not live on the CPU.
        noise = torch.randn(tensor.size(), device=tensor.device)
        return tensor + noise * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

In [None]:
# preprocessing the data, creating dataset and dataloader
# This cell defines the image transforms, the ImageFolder datasets and the
# DataLoaders used by the rest of the notebook.
# load in data directory
data_dir = 'C:/Users/jixua/OneDrive/Desktop/Machine learning package'
classes = os.listdir(data_dir + "/archive/train")
#print(classes)

# split the training set into training set and validation set (0.9 : 0.1)
# splitfolders.ratio(data_dir + '/archive/train', output = 'data', seed = 1337, ratio = (0.9, 0.1, 0))

# normalize according to pytorch pretrained model
# NOTE(review): in the visible code `normalize` is only referenced from commented-out lines.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
#mu, st = 0, 255
    
# resnet50 is for 224
# data augmentation for training data; the pipeline ends with FiveCrop(40), so
# every sample becomes a stack of 5 single-channel 40x40 tensors
train_image_transform = transforms.Compose([
    #transforms.Resize([256,256]),
    #transforms.RandomResizedCrop(224),
    #transforms.RandomAffine(degrees = 0, translate = (0.1, 0.1)),
    #transforms.RandomVerticalFlip(),
    #transforms.GaussianBlur(kernel_size = 3),
    #transforms.RandomRotation(10),
    #transforms.RandomAffine(degrees = 180, translate = (0.1, 0.1)),
    #transforms.RandomHorizontalFlip(),
    #transforms.Grayscale(num_output_channels=3),
    #transforms.ToTensor(),
    #normalize,
    transforms.Grayscale(),
    # NOTE(review): the upper scale bound is above 1.0 - confirm this is intended
    transforms.RandomResizedCrop([48,48], scale = (0.8, 1.2)),
    transforms.RandomApply([transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5)], p=0.5),
    transforms.RandomApply([transforms.RandomAffine(0, translate=(0.1, 0.1))], p=0.5),
    transforms.RandomApply([transforms.RandomRotation(10)], p=0.5), 
    transforms.RandomHorizontalFlip(),
    transforms.FiveCrop(40),
    # FiveCrop returns a tuple of crops; convert each one and stack them into one tensor
    transforms.Lambda(lambda crops : torch.stack([transforms.ToTensor()(crop) for crop in crops])),
    # random erasing is applied to every crop independently
    transforms.Lambda(lambda tensors: torch.stack([transforms.RandomErasing()(t) for t in tensors])),
        
    #transforms.Lambda(lambda tensors: torch.stack([normalize(t) for t in tensors])),
    #transforms.Lambda(lambda tensors: torch.stack([transforms.RandomApply([AddGaussianNoise(0.1, 0.02)], p=0.5)(t) for t in tensors])),
])

# resize-only transform (224x224, no grayscale conversion, no crops); no normalization is applied
no_augmentation_transform = transforms.Compose([
    #transforms.ten
    transforms.Resize([224, 224]),
    #transforms.Grayscale(),
    transforms.ToTensor(),
])
    
    
# single grayscale image per sample, no multi-crop
no_crop_transform = transforms.Compose([
    transforms.Resize(48),
    transforms.Grayscale(),
    transforms.ToTensor(),
])
    
# validation: deterministic ten-crop, so every sample becomes a stack of 10 crops
valid_image_transform = transforms.Compose([
    transforms.Resize(48),
    transforms.Grayscale(),
    transforms.TenCrop(40),
    transforms.Lambda(lambda crops : torch.stack([transforms.ToTensor()(crop) for crop in crops])),
    #transforms.Lambda(lambda tensors: torch.stack([normalize(t) for t in tensors])),
])
    
# test: same steps as validation, with Grayscale applied before Resize
test_image_transform = transforms.Compose([
    transforms.Grayscale(),
    transforms.Resize(48),
    transforms.TenCrop(40),
    transforms.Lambda(lambda crops : torch.stack([transforms.ToTensor()(crop) for crop in crops])),
    #transforms.Lambda(lambda tensors: torch.stack([normalize(t) for t in tensors])),
])
    
    
# train set is reading from the train folder
train_set = torchvision.datasets.ImageFolder(data_dir + '/archive/train', transform = train_image_transform)


# valid set has no random augmentation (only resize, grayscale and ten-crop)
valid_set = torchvision.datasets.ImageFolder(data_dir + '/test_data/public', transform = valid_image_transform)

#train_set_temp, valid_set = Data.random_split(valid_data_set, [train_size, valid_size])
    

# batch_size might affect cuda memory
#train_loader = Data.DataLoader(dataset = train_set, sampler=ImbalancedDatasetSampler(train_set), batch_size = 256, num_workers=0)
train_loader = Data.DataLoader(dataset = train_set, batch_size = 256, shuffle = True, num_workers=0)
valid_loader = Data.DataLoader(dataset = valid_set, batch_size = 256, shuffle = True, num_workers=0)

# create testing set
# testing set is reading from the test_data/private folder
test_set_no_aug = torchvision.datasets.ImageFolder(data_dir + '/test_data/private', transform = no_crop_transform)
    
test_set = torchvision.datasets.ImageFolder(data_dir + '/test_data/private', transform = test_image_transform)
    
# NOTE(review): the evaluation loaders are shuffled, so the batch order differs between runs
test_loader = Data.DataLoader(dataset = test_set, batch_size = 256, shuffle = True, num_workers=0)
    
test_loader_no_aug = Data.DataLoader(dataset = test_set_no_aug, batch_size = 128, shuffle = True, num_workers=0)
    
# get label class
labels_class = train_set.classes

# get number of samples in each class (keyed by class index)
class_counts = collections.Counter(train_set.targets)
print(dict(class_counts))

# generate loss weight: inverse class frequency, ordered by class index.
# Derived from the dataset itself so the weights cannot drift from the data
# the way a hard-coded list of counts can.
class_weights = torch.tensor([1 / class_counts[class_index] for class_index in range(len(labels_class))])
print(class_weights)
print(labels_class)

    #return train_set, train_loader, valid_loader, test_loader, test_image_transform, valid_image_transform, 
    #labels_class, class_weights, no_augmentation_transform, test_loader_no_aug

In [None]:
#train_set, train_loader, valid_loader, test_loader, test_image_transform, valid_image_transformation, labels_class, class_weights, no_augmentation_transform, test_loader_2 = preprocessing()

# is_available must be *called*: the bare function object is always truthy,
# which selected 'cuda' even on machines without a GPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# visualize some of the training data and testing validing data
def show_data(dataloader):
    """Plot a grid of crops taken from the first batch of ``dataloader``.

    The batch is expected to be 5-D (batch, crops, channels, height, width);
    the crop dimension is folded into the batch dimension and at most the
    first 100 crops are drawn, 8 per row. Does nothing for an empty loader.
    """
    first_batch = next(iter(dataloader), None)
    if first_batch is None:
        return

    images, labels = first_batch
    bs, ncrops, c, h, w = images.size()
    flat_images = images.view(-1, c, h, w)

    fig, ax = plt.subplots(figsize=(12, 12))
    ax.set_xticks([])
    ax.set_yticks([])
    grid = torchvision.utils.make_grid(flat_images[:100], nrow=8)
    # make_grid returns channels-first data; imshow wants channels last
    ax.imshow(grid.permute(1,2,0))

In [None]:
# Preview the first batch of the training and of the validation loader.
#print(len(train_loader))
show_data(train_loader)
show_data(valid_loader)


In [None]:
# output the histogram of the training-set class distribution
data_dir = 'C:/Users/jixua/OneDrive/Desktop/Machine learning package'
x_dict = dict(collections.Counter(train_set.targets))
# per-class sample counts, in the order the classes appear in the counter
x = list(x_dict.values())
print(x)

x = pd.Series(x)
x_label = labels_class
plt.figure(figsize = (5, 5))
ax = x.plot(kind = 'bar')
ax.set_title('Training data set distribution')
ax.set_ylabel('number')
ax.set_xticklabels(x_label, rotation = 'horizontal')

rects = ax.patches

# write each class count just above its bar
labels = x
for rect, label in zip(rects, labels):
    height = rect.get_height()
    text_x = rect.get_x() + rect.get_width() / 10
    ax.text(text_x, height + 20, label)
plt.savefig('Comparison.png', facecolor = 'w')


In [None]:
# set up the model, using pytorch pretrained model, 3 choices (resnet50, vgg16, and densenet)
def model_set_up(name):
    """Build a pretrained torchvision model adapted to 1-channel input and 7 classes.

    Args:
        name: one of 'resnet', 'vgg', 'densenet', 'wide_resnet', 'regnet',
            'efficientnet'.

    Returns:
        (net, optimizer, criterion, device): the model moved to ``device``, an
        SGD optimizer over all of its parameters, a cross-entropy criterion
        and the selected device.

    Raises:
        ValueError: if ``name`` is not one of the supported model names.
    """
    # is_available must be called; the bare function object is always truthy.
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    if name == 'resnet':
        net = models.resnet50(pretrained = True)
        num_fcs = net.fc.in_features
        net.fc = nn.Sequential(
                            nn.Linear(num_fcs, 4096),
                            nn.LeakyReLU(),
                            nn.Linear(4096, 1024),
                            nn.LeakyReLU(),
                            nn.Linear(1024, 7))
        # the replacement first conv is freshly initialised (it is not pretrained)
        net.conv1 = torch.nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)

    elif name == 'vgg':
        net = models.vgg16_bn(pretrained = True)
        num_fcs = net.classifier[6].in_features
        net.classifier[6] = nn.Linear(num_fcs, 7)
        net.features[0] = torch.nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)

    elif name == 'densenet':
        net = models.densenet161(pretrained=True)
        net.features[0] = torch.nn.Conv2d(1, 96, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        num_fcs = net.classifier.in_features
        net.classifier = nn.Linear(num_fcs, 7)

    elif name == 'wide_resnet':
        net = models.wide_resnet50_2(pretrained = True)
        net.conv1 = torch.nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        num_fcs = net.fc.in_features
        net.fc = nn.Linear(num_fcs, 7)

    elif name == 'regnet':
        net = models.regnet_x_32gf(pretrained = True)
        net.stem[0] = torch.nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
        num_fcs = net.fc.in_features
        net.fc = nn.Linear(num_fcs, 7)

    elif name == "efficientnet":
        net = models.efficientnet_b7(pretrained = True)
        net.features[0][0] = torch.nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        num_fcs = net.classifier[1].in_features
        net.classifier[1] = nn.Linear(num_fcs, 7)

    else:
        # previously an unknown name surfaced as an UnboundLocalError on `net`
        raise ValueError(f"unknown model name: {name!r}")

    # unfreeze the base model
    for param in net.parameters():
        param.requires_grad = True

    # define criterion, optimizer
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0001, nesterov=True)
    criterion = nn.CrossEntropyLoss()

    # move the net to the selected device
    net = net.to(device)

    print(net)

    return net, optimizer, criterion, device

In [None]:
# Build the VGG16-BN variant; this also rebinds the notebook-level `device`.
net, optimizer, criterion, device = model_set_up('vgg')
#sum(p.numel() for p in net.parameters() if p.requires_grad)

In [None]:
# NOTE(review): this rebinds `summary`, shadowing the `torchsummary.summary`
# imported in the first cell.
from torchinfo import summary
# get the summary of the model
# NOTE(review): the summary uses 48x48 inputs while the data pipeline yields 40x40 crops - confirm.
summary(net, input_size=(256, 1, 48, 48))

  $x = \lambda x_{i} + (1 - \lambda)x_{j}$
  
  $y = \lambda y_{i} + (1 - \lambda)y_{j}$

In [None]:
def label_smooth(true_labels: torch.Tensor, classes: int, smoothing=0.0):
    """Turn integer class labels into smoothed target distributions.

    Args:
        true_labels: 1-D integer (long) tensor of class indices.
        classes: number of classes.
        smoothing: probability mass moved off the true class, ``0 <= smoothing < 1``.
            With 0 the result is the plain one-hot encoding.

    Returns:
        Float tensor of shape ``(len(true_labels), classes)`` on the same device
        as ``true_labels``: the true class gets ``1 - smoothing`` and every
        other class gets ``smoothing / (classes - 1)``.
    """
    assert 0 <= smoothing < 1
    confidence = 1.0 - smoothing
    # With a single class there are no "other" classes to spread mass over.
    off_value = smoothing / (classes - 1) if classes > 1 else 0.0
    with torch.no_grad():
        # Build the targets directly on the labels' device; no one-hot / CPU
        # round trip is needed just to recover the label indices.
        true_dist = torch.full((true_labels.size(0), classes), off_value,
                               device=true_labels.device)
        true_dist.scatter_(1, true_labels.unsqueeze(1), confidence)
    return true_dist

# mixup paper reference: https://doi.org/10.48550/arXiv.1710.09412
def mixup_data(x, y, alpha=0.2):
    """Mix every sample of a batch with another, randomly chosen sample (mixup).

    Args:
        x: batch of inputs, batch dimension first.
        y: labels aligned with ``x``.
        alpha: Beta-distribution parameter; ``alpha <= 0`` disables mixing (lam = 1).

    Returns:
        (mixed_x, y_a, y_b, lam) where ``mixed_x = lam * x + (1 - lam) * x[perm]``,
        ``y_a`` is ``y`` and ``y_b`` is ``y[perm]``.
    """
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    # Put the permutation on the batch's own device instead of forcing CUDA,
    # so the function also works on CPU-only machines.
    index = torch.randperm(batch_size).to(x.device)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam


def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Blend the loss against both mixup targets with weights ``lam`` and ``1 - lam``."""
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

In [None]:
def train_model(net, epochs, optimizer, criterion, name):
    """Train ``net`` with mixup, label smoothing and automatic mixed precision.

    Reads the notebook-level ``train_loader``, ``valid_loader`` and ``device``.
    Training batches are expected as (batch, crops, C, H, W); the crops are
    folded into the batch dimension and the labels repeated accordingly.
    Validation predictions are averaged over the crops of each image. The
    model with the best validation accuracy is saved with ``torch.save`` and
    the loss/accuracy curves are written to ``curve_{name}.pdf``.

    Args:
        net: model to train (already on ``device``).
        epochs: number of epochs; also used as ``T_max`` of the cosine schedule.
        optimizer: optimizer over ``net``'s parameters.
        criterion: loss function; must accept class-probability targets because
            smoothed labels are passed to it. ``None`` selects
            ``nn.CrossEntropyLoss()``.
        name: file name used for the checkpoint and the curve plot.

    Returns:
        The trained ``net`` (last-epoch weights, not necessarily the best ones).
    """
    scaler = GradScaler()
    # Honour the caller's criterion instead of silently replacing it.
    if criterion is None:
        criterion = nn.CrossEntropyLoss()

    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = epochs)

    train_losses = []
    val_losses = []
    train_accuracy = []
    val_accuracy = []
    best_accuracy = 0.0
    for epoch in range(epochs):
        net.train()
        running_loss_train = 0.0
        train_total = 0
        train_correct = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # FiveCrop adds a crop dimension: fold it into the batch
            bs, ncrops, c, h, w = inputs.shape
            inputs = inputs.view(-1, c, h, w)
            # repeat the labels so that every crop carries its image's label
            labels = torch.repeat_interleave(labels, repeats = ncrops, dim = 0)

            inputs, labels_a, labels_b, lam = mixup_data(inputs, labels, 0.2)
            soft_labels_a = label_smooth(labels_a, classes = 7, smoothing = 0.1)
            soft_labels_b = label_smooth(labels_b, classes = 7, smoothing = 0.1)

            # Only the forward pass and the loss run under autocast; backward
            # and the optimizer step belong outside of it.
            with autocast():
                outputs = net(inputs)
                loss = mixup_criterion(criterion, outputs, soft_labels_a, soft_labels_b, lam)

            optimizer.zero_grad()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss_train += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            train_total += labels.size(0)
            train_correct += torch.sum(preds == labels).item()

        # The loss was accumulated per crop, so average over the number of
        # crops seen (dividing by the image count inflated it by ncrops).
        epoch_train_loss = running_loss_train / max(train_total, 1)
        epoch_train_acc = train_correct / max(train_total, 1)
        train_losses.append(epoch_train_loss)
        train_accuracy.append(epoch_train_acc)

        running_loss_val = 0.0
        val_total = 0
        val_correct = 0
        net.eval()
        with torch.no_grad():
            for val_inputs, val_labels in valid_loader:
                val_inputs = val_inputs.to(device)
                val_labels = val_labels.to(device)

                bs, ncrops_val, c, h, w = val_inputs.shape
                prediction = net(val_inputs.view(-1, c, h, w))

                # average over crops
                prediction = prediction.view(bs, ncrops_val, -1)
                prediction = torch.sum(prediction, dim = 1) / ncrops_val

                val_loss = criterion(prediction, val_labels)

                # the validation loss is a per-image mean: weight it by the image count
                running_loss_val += val_loss.item() * bs
                _, predicted = torch.max(prediction, 1)
                val_total += val_labels.size(0)
                val_correct += torch.sum(predicted == val_labels).item()

        epoch_val_loss = running_loss_val / max(val_total, 1)
        epoch_val_acc = val_correct / max(val_total, 1)
        val_losses.append(epoch_val_loss)
        val_accuracy.append(epoch_val_acc)

        curr_lr = optimizer.param_groups[0]['lr']
        scheduler.step()

        print(f'Epoch {epoch + 1} \t \
            Training Loss: {epoch_train_loss} \t \
            Validation Loss: {epoch_val_loss} \t \
            Training Accuracy: {epoch_train_acc} \t \
            Validation Accuracy: {epoch_val_acc} \t \
            LR: {curr_lr}\n' )

        # save the best model
        if epoch_val_acc > best_accuracy:
            torch.save(net, f'C:/Users/jixua/OneDrive/Desktop/Machine learning package/{name}')
            best_accuracy = epoch_val_acc

    # learning curve
    fig = plt.figure(figsize = (10,8))

    ax1 = fig.add_subplot(221)
    ax1.plot(train_losses, 'r', label = 'train_loss')
    ax1.plot(val_losses, 'g', label = 'val_loss')
    ax1.legend(loc = "upper right")
    ax1.set_title("Learning curve")

    # accuracy curve
    ax2 = fig.add_subplot(223)
    ax2.plot(train_accuracy, 'r', label = 'train_accuracy')
    ax2.plot(val_accuracy, 'g', label = 'val_accuracy')
    ax2.legend(loc = "upper right")
    ax2.set_title("Accuracy curve")

    # save the plot
    fig.savefig(f"curve_{name}.pdf")
    return net

In [None]:
# NOTE(review): `net` was built with model_set_up('vgg') above, but the checkpoint
# is saved under an efficientnet name - presumably a leftover from another run; confirm.
model = train_model(net, 150, optimizer, criterion, 'efficientnet_tmax150')

In [None]:
# caclculate accuracy on test loader
def accuracy(model, device, crop, testloader = test_loader):
    """Evaluate ``model`` on ``testloader`` and return the fraction of correct predictions.

    Args:
        model: network to evaluate; it is moved to ``device`` and set to eval mode.
        device: device on which the evaluation runs.
        crop: pass ``True`` when batches are (batch, crops, C, H, W); the
            outputs are then averaged over the crops of each image.
        testloader: iterable of ``(images, labels)`` batches.

    Returns:
        ``correct / total`` as a float; the counts are also printed.
    """
    model = model.to(device)
    model.eval()
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for images, labels in testloader:
            images = images.to(device)
            labels = labels.to(device)

            if crop == True:
                # fold the crop dimension into the batch for the forward pass
                bs, ncrops, c, h, w = images.shape
                images = images.view(-1, c, h, w)

            outputs = model(images)

            if crop == True:
                # back to (batch, crops, classes), then mean over the crops
                outputs = outputs.view(bs, ncrops, -1).sum(dim=1) / ncrops

            predicted = torch.max(outputs.data, 1)[1]
            n_seen += labels.size()[0]
            n_correct += torch.sum(predicted == labels.data).item()

    print(f'correct: {n_correct}')
    print(f'total: {n_seen}')       
    print(f'Accuracy of the network on the {n_seen} testing images: {100 * n_correct / n_seen} %')
    return (n_correct / n_seen)

In [None]:
# load model
# load a fully pickled model and report its ten-crop accuracy on the test loader
# NOTE(review): the path is relative, while train_model saves to an absolute
# directory - confirm the working directory contains this file.
trained_model = torch.load('resnet50_elite_test2_tmax150')
accuracy1 = accuracy(trained_model, device, True, test_loader)
#print(accuracy1)

In [None]:
# this function is only for our testing purpose
def test(model):
    """Classify one hard-coded sample image and print the top prediction.

    This helper is for manual testing only. The image is passed through
    ``test_image_transform`` (ten crops) and the model outputs are averaged
    over the crops before the softmax.

    Args:
        model: trained network; it is moved to the notebook-level ``device``
            and switched to eval mode.
    """
    model = model.to(device)
    # Inference must run in eval mode so dropout / batch-norm behave deterministically.
    model.eval()

    # testing on new images, maybe transform - special_case_dataloader
    image = Image.open('C:/Users/jixua/OneDrive/Desktop/Machine learning package/archive/special_case/happy/zhuzhu_box.jpg')
    image = image.convert("RGB")
    image_tensor = test_image_transform(image)
    # add the batch dimension: (1, crops, C, H, W)
    image_batch = image_tensor.unsqueeze(0).to(device)
    bs, ncrops, c, h, w = image_batch.shape

    with torch.no_grad():
        outputs = model(image_batch.view(-1, c, h, w))

    # average the outputs over the crops
    outputs = outputs.view(bs, ncrops, -1)
    outputs = torch.sum(outputs, dim=1) / ncrops

    # because the last layer is linear 
    h_x = torch.nn.functional.softmax(outputs, dim = 1).data.squeeze()
    probs, idx = h_x.sort(0, True)

    display(image)
    print('predicted classes: {}. probability:{:.3f}'.format(labels_class[idx[0].item()], probs[0]))

In [None]:
# Run the single-image smoke test with a pickled ensemble model.
# NOTE(review): torch.load unpickles arbitrary objects - only load trusted files.
test(torch.load("ensemble_model6_tmax150"))

In [None]:
# visualize some results
def visualize(model, num_images = 10):
    """Show the first ``num_images`` test crops together with their predicted class.

    Batches from the notebook-level ``test_loader`` are flattened from
    (batch, crops, C, H, W) to individual crops, so consecutive panels are
    crops of the same image and each one gets its own prediction.

    Args:
        model: trained network; moved to the notebook-level ``device``.
        num_images: number of crops to draw, laid out two per row.
    """
    if num_images <= 0:
        return

    model = model.to(device)
    model.eval()

    # Size the grid from num_images instead of a fixed 5x2 layout, which
    # failed for anything above 10 panels.
    n_cols = 2
    n_rows = (num_images + n_cols - 1) // n_cols

    images_number = 0
    fig = plt.figure()
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            bs, ncrops, c, h, w = images.shape
            images = images.view(-1, c, h, w)

            prediction = torch.nn.functional.softmax(model(images), dim = 1)
            _, predicted = torch.max(prediction, 1)

            images = images.cpu()
            for i in range(images.size(0)):
                images_number += 1
                ax = plt.subplot(n_rows, n_cols, images_number)
                ax.axis('off')
                ax.set_title(f'predicted: {labels_class[predicted[i]]}')
                # imshow expects channels last
                plt.imshow(images[i].permute(1,2,0))
                # stop once the requested number of panels has been drawn
                if images_number == num_images:
                    return
                

In [None]:
# Load a pickled ensemble model and preview its predictions on test crops.
model = torch.load('ensemble_model5_2_tmax150')
visualize(model)

In [None]:
# does not work on tencrop
# reference: https://medium.com/intelligentmachines/implementation-of-class-activation-map-cam-with-pytorch-c32f7e414923
# Feature maps captured by `hook_feature`, one numpy array per forward pass.
features_blobs = []
def hook_feature(module, input, output):
    """Forward hook that stores the hooked layer's output as a numpy array."""
    features_blobs.append(output.data.cpu().numpy())

def CAM(features_conv, weight_softmax, class_idx):
    """Compute class activation maps for the requested classes.

    Args:
        features_conv: feature maps of shape (1, channels, h, w) as a numpy array.
        weight_softmax: classifier weights of shape (num_classes, channels).
        class_idx: iterable of class indices to produce maps for.

    Returns:
        One uint8 map per entry of ``class_idx``, each scaled to 0..255 and
        resized to 256x256.
    """
    size_unsample = (256, 256)
    bz, nc, h, w = features_conv.shape
    flat_features = features_conv.reshape((nc, h*w))
    output_cam = []
    for idx in class_idx:
        # Weight the channels with *this* class's weights; indexing with the
        # whole class_idx list only worked for a single class.
        cam = weight_softmax[idx].dot(flat_features)
        cam = cam.reshape(h, w)
        cam = cam - np.min(cam)
        peak = np.max(cam)
        # a flat map has no peak: emit zeros instead of dividing by zero
        cam_img = cam / peak if peak > 0 else np.zeros_like(cam)

        cam_img = np.uint8(255 * cam_img)
        output_cam.append(cv2.resize(cam_img, size_unsample))
    return output_cam

In [None]:
# reference: https://notebook.community/
def draw_CAM(model, path, index, valid_transform):
    """Predict the class of one image and save/show its class activation map.

    The model is moved to the CPU, a forward hook captures the output of the
    module named ``layer4`` and the map is computed from the second-to-last
    parameter tensor of the model. The overlay is written to
    ``CAM/CAM{index}.jpg`` and displayed.

    NOTE(review): ``params[-2]`` is presumably the weight of a final linear
    layer with a bias whose input width equals the hooked layer's channel
    count - confirm for the architecture that is passed in.

    Args:
        model: trained network exposing a ``layer4`` sub-module.
        path: path of the image file.
        index: suffix used in the output file name.
        valid_transform: transform producing a single (C, H, W) tensor.
    """
    finalconv_name = 'layer4' # subject to change(depend on the network architecture)
    model = model.to('cpu') # you can change to gpu if you have memory left 
    model.eval()

    # Drop feature maps left over from earlier calls so the map below always
    # belongs to this image.
    features_blobs.clear()
    hook_handle = model._modules.get(finalconv_name).register_forward_hook(hook_feature)

    # get the softmax weight
    params = list(model.parameters())
    weight_softmax = np.squeeze(params[-2].data.cpu().numpy())

    # process the image and run through the network, return the highest probability and label_class index
    image = Image.open(path)
    image = image.convert("RGB")
    image_tensor = valid_transform(image)
    image_batch = image_tensor.unsqueeze(0).to('cpu')

    try:
        with torch.no_grad():
            logit = model(image_batch)
    finally:
        # Remove the hook so repeated calls do not stack hooks on the model.
        hook_handle.remove()

    h_x = torch.nn.functional.softmax(logit, dim = 1).data.squeeze()
    probs, idx = h_x.sort(0, True)

    # only formatting here does the correct rounding
    print('predicted classes: {}. probability:{:.3f}'.format(labels_class[idx[0].item()], probs[0])) 

    img = cv2.imread(path)
    height, width, _ = img.shape
    CAMs = CAM(features_blobs[-1], weight_softmax, [idx[0].item()])

    heatmap = cv2.applyColorMap(cv2.resize(CAMs[0], (width, height)), cv2.COLORMAP_JET)
    result = heatmap * 0.4 + img * 0.5
    # cv2.imwrite does not create missing directories
    os.makedirs('CAM', exist_ok=True)
    cv2.imwrite(f'CAM/CAM{index}.jpg', result)

    plt.imshow(Image.open(f'CAM/CAM{index}.jpg'))
    plt.axis('off')
    plt.show()

In [None]:
# Draw the class activation map of one private-test image with the pickled 'resnet50' model.
draw_CAM(torch.load('resnet50'), 'C:/Users/jixua/OneDrive/Desktop/Machine learning package/test_data/private/happy/PrivateTest_2028370.jpg', 5, no_augmentation_transform)

In [None]:
from statistics import mode
# calculate majority-vote accuracy on the test loader
def max_voting(model1, model2, model3, model4, device, crop):
    """Evaluate a four-model majority-vote ensemble on the notebook-level ``test_loader``.

    Every model predicts a class per image; the class predicted most often is
    the ensemble prediction (ties are resolved by ``torch.mode``).

    Args:
        model1, model2, model3, model4: trained networks.
        device: device on which the evaluation runs.
        crop: truthy when batches are (batch, crops, C, H, W); the outputs
            are then averaged over the crops of each image.

    Returns:
        Fraction of correctly classified images; the counts are also printed.
    """
    ensemble = [member.to(device) for member in (model1, model2, model3, model4)]
    for member in ensemble:
        member.eval()

    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            if crop:
                bs, ncrops, c, h, w = images.shape
                images = images.view(-1, c, h, w)

            votes = []
            for member in ensemble:
                outputs = member(images)
                if crop:
                    # average the outputs over the crops of each image
                    outputs = outputs.view(bs, ncrops, -1)
                    outputs = torch.sum(outputs, dim=1) / ncrops
                _, predicted = torch.max(outputs.data, 1)
                votes.append(predicted)

            # Per-sample vote across models. statistics.mode on a list of
            # tensors never voted: tensors hash by identity, so each one was
            # "unique" and the first model's predictions were returned.
            final_output = torch.stack(votes, dim=0).mode(dim=0).values

            test_total += labels.size()[0]
            test_correct += torch.sum(final_output == labels.data).item()
    print(f'correct: {test_correct}')
    print(f'total: {test_total}')       
    print(f'Accuracy of the network on the {test_total} testing images: {100 * test_correct / test_total} %')
    return (test_correct / test_total)

In [None]:
# nvidia autocast scaler does not work when require_grad is false
def train_model_no_autocast(net, epochs, name):
    """Train ``net`` with mixup and label smoothing in full precision (no AMP).

    Creates its own SGD optimizer, cross-entropy criterion and cosine
    schedule. Reads the notebook-level ``train_loader`` and ``valid_loader``.
    Training batches are expected as (batch, crops, C, H, W); validation
    predictions are averaged over the crops of each image. The model with the
    best validation accuracy is saved with ``torch.save`` and the curves are
    written to ``curve_{name}.pdf``.

    Args:
        net: model to train; batches are moved to the device of its parameters.
        epochs: number of epochs; also used as ``T_max`` of the cosine schedule.
        name: file name used for the checkpoint and the curve plot.

    Returns:
        The trained ``net`` (last-epoch weights, not necessarily the best ones).
    """
    # Follow the model's own device instead of hard-coding 'cuda'.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    # define criterion, optimizer
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0001, nesterov=True)
    criterion = nn.CrossEntropyLoss()

    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = epochs)

    train_losses = []
    val_losses = []
    train_accuracy = []
    val_accuracy = []
    best_accuracy = 0.0
    for epoch in range(epochs):
        net.train()
        running_loss_train = 0.0
        train_total = 0
        train_correct = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # FiveCrop adds a crop dimension: fold it into the batch
            bs, ncrops, c, h, w = inputs.shape
            inputs = inputs.view(-1, c, h, w)
            # repeat the labels so that every crop carries its image's label
            labels = torch.repeat_interleave(labels, repeats = ncrops, dim = 0)

            inputs, labels_a, labels_b, lam = mixup_data(inputs, labels, 0.2)

            outputs = net(inputs)

            soft_labels_a = label_smooth(labels_a, classes = 7, smoothing = 0.1)
            soft_labels_b = label_smooth(labels_b, classes = 7, smoothing = 0.1)

            # mixup loss against both (smoothed) targets
            loss = mixup_criterion(criterion, outputs, soft_labels_a, soft_labels_b, lam)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss_train += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            train_total += labels.size(0)
            train_correct += torch.sum(preds == labels).item()

        # Average over the number of crops actually seen rather than assuming
        # exactly 5 crops per image.
        epoch_train_loss = running_loss_train / max(train_total, 1)
        epoch_train_acc = train_correct / max(train_total, 1)
        train_losses.append(epoch_train_loss)
        train_accuracy.append(epoch_train_acc)

        running_loss_val = 0.0
        val_total = 0
        val_correct = 0
        net.eval()
        with torch.no_grad():
            for val_inputs, val_labels in valid_loader:
                val_inputs = val_inputs.to(device)
                val_labels = val_labels.to(device)

                bs, ncrops_val, c, h, w = val_inputs.shape
                prediction = net(val_inputs.view(-1, c, h, w))

                # average over crops
                prediction = prediction.view(bs, ncrops_val, -1)
                prediction = torch.sum(prediction, dim = 1) / ncrops_val

                val_loss = criterion(prediction, val_labels)

                # per-image mean loss weighted by the image count (replaces
                # the fixed division by 10 crops)
                running_loss_val += val_loss.item() * bs
                _, predicted = torch.max(prediction, 1)
                val_total += val_labels.size(0)
                val_correct += torch.sum(predicted == val_labels).item()

        epoch_val_loss = running_loss_val / max(val_total, 1)
        epoch_val_acc = val_correct / max(val_total, 1)
        val_losses.append(epoch_val_loss)
        val_accuracy.append(epoch_val_acc)

        curr_lr = optimizer.param_groups[0]['lr']
        scheduler.step()

        print(f'Epoch {epoch + 1} \t \
            Training Loss: {epoch_train_loss} \t \
            Validation Loss: {epoch_val_loss} \t \
            Training Accuracy: {epoch_train_acc} \t \
            Validation Accuracy: {epoch_val_acc} \t \
            LR: {curr_lr}\n' )

        # save the best model
        if epoch_val_acc > best_accuracy:
            torch.save(net, f'C:/Users/jixua/OneDrive/Desktop/Machine learning package/{name}')
            best_accuracy = epoch_val_acc

    # learning curve
    fig = plt.figure(figsize = (10,8))

    ax1 = fig.add_subplot(221)
    ax1.plot(train_losses, 'r', label = 'train_loss')
    ax1.plot(val_losses, 'g', label = 'val_loss')
    ax1.legend(loc = "upper right")
    ax1.set_title("Learning curve")

    # accuracy curve
    ax2 = fig.add_subplot(223)
    ax2.plot(train_accuracy, 'r', label = 'train_accuracy')
    ax2.plot(val_accuracy, 'g', label = 'val_accuracy')
    ax2.legend(loc = "upper right")
    ax2.set_title("Accuracy curve")

    # save the plot
    fig.savefig(f"curve_{name}.pdf")
    return net

In [None]:
# Load the six pickled member models from the current working directory.
# NOTE(review): torch.load unpickles arbitrary objects - only load trusted files.
model1 = torch.load('resnet50_elite_test2_tmax150')
model2 = torch.load('vgg16_elite_test_tmax150')
model3 = torch.load('densenet_elite_test_tmax150')
model4 = torch.load('wide_resnet')
model5 = torch.load('regnet_tmax150')
model6 = torch.load('efficientnet_tmax150')
#accuracy = max_voting(model1, model2, model3, device, True)
#accuracy = max_voting(model1, model2, model3, model4, device, True)

In [None]:
class MyEnsemble(nn.Module):
    """Stacked ensemble: six backbones feed one MLP classifier head.

    Every backbone receives the same input batch. Their outputs are flattened,
    concatenated along the feature axis, passed through a ReLU and then through
    a three-layer MLP that produces the class scores.

    Args:
        model1..model6: backbone modules. They are stored on the ensemble and
            **modified in place**: each one's ``fc`` attribute is replaced with
            ``nn.Identity()``. For a backbone whose forward pass does not use an
            attribute named ``fc`` the assignment only registers an unused
            submodule and leaves its output unchanged.
            NOTE(review): such backbones presumably contribute their class
            scores rather than penultimate features -- confirm this is intended.
        num_classes: width of the final linear layer (default 7).
        in_features: total width of the concatenated, flattened backbone
            outputs. Must match what the six backbones actually emit; the
            default 6637 is the value this notebook was written with.
    """

    def __init__(self, model1, model2, model3, model4, model5, model6,
                 num_classes=7, in_features=6637):
        super().__init__()
        # All six backbones are explicit arguments; nothing is read from
        # notebook globals.
        self.model1 = model1
        self.model2 = model2
        self.model3 = model3
        self.model4 = model4
        self.model5 = model5
        self.model6 = model6

        # Remove the last linear layer (`fc`) so the backbone output passes
        # through unchanged.
        for backbone in (self.model1, self.model2, self.model3,
                         self.model4, self.model5, self.model6):
            backbone.fc = nn.Identity()

        # New classifier on top of the concatenated backbone outputs.
        # Layer order (and therefore state-dict keys 0, 2, 4) is unchanged.
        self.classifier = nn.Sequential(
            nn.Linear(in_features, 4096),
            nn.LeakyReLU(),
            nn.Linear(4096, 1024),
            nn.LeakyReLU(),
            nn.Linear(1024, num_classes),
        )

    def forward(self, x):
        """Return class scores of shape ``(x.size(0), num_classes)``."""
        # The first backbone gets a clone so that an in-place op inside it
        # cannot alter the tensor handed to the remaining backbones.
        outputs = [self.model1(x.clone())]
        outputs.extend(
            backbone(x)
            for backbone in (self.model2, self.model3, self.model4,
                             self.model5, self.model6)
        )
        # Flatten each output to (batch, features) before concatenating.
        flat = [out.view(out.size(0), -1) for out in outputs]
        x = torch.cat(flat, dim=1)
        return self.classifier(F.relu(x))





In [None]:
# Turn off gradient tracking for every parameter of the six loaded backbones.
for backbone in (model1, model2, model3, model4, model5, model6):
    for weight in backbone.parameters():
        weight.requires_grad = False

# Wrap the backbones in the ensemble and move it to the target device.
model = MyEnsemble(model1, model2, model3, model4, model5, model6).to(device)

# NOTE(review): 50 is presumably the epoch count and the string the name used
# for the saved checkpoint / curve file -- confirm against train_model_no_autocast.
model = train_model_no_autocast(model, 50, 'ensemble_model6_tmax150')

In [None]:
# Evaluate two saved checkpoints with accuracy(), which is defined outside this cell.
# NOTE(review): the third argument (True) is presumably the flag selecting the
# multi-crop test path -- confirm against accuracy().
#model = torch.load('ensemble_model3_tmax150')
#accuracy_ensemble = accuracy(model, device, True)
#accuracy(torch.load("resnet50_elite_test7"), device, False, test_loader_no_aug)
# NOTE(review): despite its name, accuracy_resnet50_all holds the result for the
# "ensemble_model6_tmax150" checkpoint here; a later cell reassigns it.
accuracy_resnet50_all = accuracy(torch.load("ensemble_model6_tmax150"), device, True)
accuracy_test = accuracy(torch.load("resnet50_elite_test2_tmax150"), device, True)

In [None]:
# Score every saved checkpoint so the models can be compared in the plot below.
# Skipped checkpoints: "resnet50_elite_test4" (no smoothing) and "ensemble_model2".
torch.cuda.empty_cache()


def _score_checkpoint(checkpoint, *accuracy_args):
    """Load the model saved at `checkpoint` and hand it to accuracy().

    `accuracy_args` are forwarded unchanged after `device`.
    """
    return accuracy(torch.load(checkpoint), device, *accuracy_args)


# ResNet-50 variants
accuracy_resnet50_all = _score_checkpoint("resnet50_elite_test2_tmax150", True)
accuracy_resnet50_crop = _score_checkpoint("resnet50_elite_test5", True)
accuracy_resnet50_no_mixup_smoothing = _score_checkpoint("resnet50_elite_test6", True)
# These two are evaluated with the flag off and on test_loader_no_aug.
accuracy_resnet50_no_augmentation_cosine = _score_checkpoint("resnet50_elite_test7", False, test_loader_no_aug)
accuracy_resnet50_no_augmentation_reduce = _score_checkpoint("resnet50_elite_test8", False, test_loader_no_aug)
accuracy_resnet50_class_weight = _score_checkpoint("resnet_elite_INS", True)

# Other architectures and the ensemble
accuracy_vgg16 = _score_checkpoint("vgg16_elite_test_tmax150", True)
accuracy_densenet = _score_checkpoint("densenet_elite_test_tmax150", True)
accuracy_wide_resnet = _score_checkpoint("wide_resnet", True)
accuracy_ensemble = _score_checkpoint("ensemble_model3_tmax150", True)

In [None]:
class MyEnsemble_3(nn.Module):
    """Three-backbone ensemble with a single linear classifier on top.

    Every backbone receives the same input batch. Their outputs are flattened,
    concatenated along the feature axis, passed through a ReLU and then through
    one linear layer that produces the class scores.

    Args:
        model1, model2, model3: backbone modules. They are stored on the
            ensemble and **modified in place**: each one's ``fc`` attribute is
            replaced with ``nn.Identity()``. For a backbone whose forward pass
            does not use an attribute named ``fc`` the assignment leaves its
            output unchanged.
        model4: unused; accepted only so existing four-model call sites keep
            working.
        num_classes: width of the classifier output (default 7).
        in_features: total width of the concatenated, flattened backbone
            outputs. Must match what the three backbones actually emit; the
            default 2062 is the value this notebook was written with.
    """

    def __init__(self, model1, model2, model3, model4=None, num_classes=7,
                 in_features=2062):
        # Zero-argument super() always initialises this class's own nn.Module base.
        super().__init__()
        self.model1 = model1
        self.model2 = model2
        self.model3 = model3

        # Remove the last linear layer (`fc`) so the backbone output passes
        # through unchanged.
        for backbone in (self.model1, self.model2, self.model3):
            backbone.fc = nn.Identity()

        # New classifier on top of the concatenated backbone outputs.
        self.classifier = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return class scores of shape ``(x.size(0), num_classes)``."""
        # The first backbone gets a clone so that an in-place op inside it
        # cannot alter the tensor handed to the remaining backbones.
        outputs = [self.model1(x.clone()), self.model2(x), self.model3(x)]
        # Flatten each output to (batch, features) before concatenating.
        flat = [out.view(out.size(0), -1) for out in outputs]
        x = torch.cat(flat, dim=1)
        return self.classifier(F.relu(x))

In [None]:
# we decided to use excel instead :)
# Line plot comparing the accuracy of every evaluated model; the order of
# accuracy_dist_1 must match the order of x_label.
accuracy_dist_1 = [accuracy_resnet50_no_augmentation_reduce, accuracy_resnet50_no_augmentation_cosine, 
                accuracy_resnet50_no_mixup_smoothing, accuracy_resnet50_crop, accuracy_resnet50_all, 
                accuracy_resnet50_class_weight, accuracy_vgg16, accuracy_densenet, accuracy_wide_resnet, 
                accuracy_ensemble]
accuracy_dist = pd.Series(accuracy_dist_1)

x_label = ["resnet50 reduce LR", "resnet50 cosine annealing", "+ augmentation", "+ N-Crop", "+ mixup and smoothing", " + loss weight","vgg16", "densenet161", "wide_resnet50", "ensemble model"]
assert len(x_label) == len(accuracy_dist), "one tick label is required per plotted accuracy"

plt.figure(figsize = (7, 7))
ax = accuracy_dist.plot(kind = 'line')
ax.set_title('Final models Comparison')
ax.set_ylabel('Accuracy')
# The Series is drawn against its positional index (0..n-1), so the ticks must sit
# at those positions -- not at the accuracy values themselves.
ax.set_xticks(list(range(len(accuracy_dist))))
ax.set_xticklabels(x_label, rotation = 30, horizontalalignment='right')

# Kept so the module-level name stays defined; nothing in this cell uses it.
rects = ax.patches

plt.tight_layout()
plt.savefig('Comparison_result.png', facecolor = 'w')

In [None]:
# Accumulate the confusion matrix of the saved ensemble over the test set.
from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd

torch.cuda.empty_cache()
model = torch.load("ensemble_model6_tmax150")
# Inference only: use evaluation behaviour for layers that differ between
# train and eval mode.
model.eval()
classes = labels_class
len_class = len(classes)
# Rows are true labels, columns are predicted labels. The array reuses the name
# `confusion_matrix` (shadowing the sklearn function imported above) because the
# next cell reads it under that name.
confusion_matrix = np.zeros((len_class, len_class))

# iterate over all test data; no_grad avoids building autograd graphs
with torch.no_grad():
    for data in test_loader:
        images, labels = data
        images = images.to(device)

        # Each sample arrives as a stack of crops: fold the crop axis into the
        # batch axis for the forward pass.
        bs, ncrops, c, h, w = images.shape
        images = images.view(-1, c, h, w)

        outputs = model(images)

        # Restore the crop axis and average the scores over the crops.
        outputs = outputs.view(bs, ncrops, -1)
        outputs = torch.sum(outputs, dim=1) / ncrops
        _, predicted = torch.max(outputs, 1)

        # Convert to plain Python ints once per batch instead of indexing the
        # numpy array with individual tensors.
        for t, p in zip(labels.view(-1).tolist(), predicted.view(-1).tolist()):
            confusion_matrix[t, p] += 1




In [None]:
# Render the accumulated confusion matrix as an annotated heatmap and save it.
class_names = list(classes)
df_cm = pd.DataFrame(confusion_matrix, index = class_names, columns = class_names)
plt.figure(figsize = (15,10))
# The cells hold whole-number counts stored as floats; '.0f' prints them in full
# instead of seaborn's default two-significant-digit format.
sn.heatmap(df_cm, annot=True, fmt='.0f')
plt.xlabel("Predicted label")
plt.ylabel("True label")
plt.savefig('confusion_matrix.jpg')