In [None]:
### Some useful imports

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, Dataset, random_split
import torchvision
import torchvision.transforms as transforms
from torchvision.transforms import Compose, ToTensor, Normalize, RandomRotation, ToPILImage
from torchvision import models
import torch.optim as optim

import matplotlib.pyplot as plt
import numpy as np

# We tested the difference between accuracy_score and f1_score 
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import StratifiedShuffleSplit

!pip install pycm livelossplot
%pylab inline

from livelossplot import PlotLosses
from pycm import *

import matplotlib.pyplot as plt

In [None]:
# Mount Google Drive at /content/gdrive/ so files stored under "My Drive"
# (the dataset archive, and model files written by train_model) are reachable.
# The google.colab module is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/gdrive/')

In [None]:
# Unzip the data archive stored on Google Drive into the current working directory
# (-q keeps unzip from printing one line per extracted file)
!unzip -q "/content/gdrive/My Drive/acse-miniproject.zip"

In [None]:
def set_seed(seed):
    """
    Use this to set ALL the random seeds to a fixed value 
    and take out any randomness from cuda kernels.
    
    seed, the seed number.

    Seeds Python's `random`, numpy, and torch (CPU and every CUDA device),
    then configures cuDNN for repeatable results.
    Always returns True.
    """
    # Import locally on purpose: the notebook never imports the stdlib `random`
    # module, and after `%pylab inline` the global name `random` refers to
    # `numpy.random`, so the stdlib generator would otherwise stay unseeded.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    # Do not let the cudnn auto-tuner pick (possibly different) convolution algorithms per run
    torch.backends.cudnn.benchmark = False
    # Ask cudnn for deterministic algorithms in case it gets re-enabled later
    torch.backends.cudnn.deterministic = True
    # Kept from the original notebook: cudnn is switched off altogether
    torch.backends.cudnn.enabled   = False

    return True

# Set the random seed once, at the beginning of the notebook, before any data is split or shuffled
set_seed(42)

In [None]:
# Select the device used by every later cell: 'cuda' when at least one usable GPU is visible, else 'cpu'
device = 'cuda' if (torch.cuda.device_count() > 0 and torch.cuda.is_available()) else 'cpu'
print("Cuda installed! Running on GPU!" if device == 'cuda' else "No GPU available!")

# 1. Get data

### Load classes

In [None]:
import json
import os
from os.path import join as pjoin
from scipy import misc
from PIL import Image

In [None]:
def load_mapping(fname):
    '''
    Read the class mapping stored in a json file.
    
    fname, the path of the json file.

    Returns the decoded json content (used in this notebook as a
    folder-name -> class-label lookup).
    '''
    with open(fname, mode="r") as handle:
        return json.loads(handle.read())

# Global lookup read by load_train_data to turn each class folder name into its label
classes = load_mapping("./mapping.json")

In [None]:
def load_train_data(data_dir):
    '''
    Load the training data and its labels from the file directory.
    Count the number of gray images to see their proportion and 
    change them to 3-channel images.
    
    data_dir, the directory of training data. It is expected to contain one
              folder per class, each with an "images" sub-folder.

    Every class folder name is looked up in the global `classes` mapping, so a
    folder that is missing from the mapping raises KeyError.

    Returns (count, all_images, gray_count, labels):
      count, the number of images loaded
      all_images, a list with one np.array per image
      gray_count, how many images were 2-D (single channel) on disk
      labels, a list with the label of each image, aligned with all_images
    '''
    gray_count = 0
    all_images = []
    labels = []
    
    # os.listdir returns entries in arbitrary, filesystem-dependent order; sorting keeps
    # the sample order (and therefore the seeded random_split) identical between runs.
    for guy in sorted(os.listdir(data_dir)):
        # Hidden entries (.DS_Store, .ipynb_checkpoints, ...) are not class folders
        if guy.startswith('.'):
            continue
        
        # Set the corresponding label for all images of this class
        tmp_label = classes[guy]
        
        # Path of the "images" folder of this class
        person_dir = pjoin(data_dir, guy, "images")
        for i in sorted(os.listdir(person_dir)):
            if i.startswith('.'):
                continue

            # Path of each image
            image_dir = pjoin(person_dir, i)
            # The context manager closes the file as soon as the pixels are copied out
            with Image.open(image_dir) as img:
                img2 = np.array(img)
            
            # Process the gray-image: replicate the single channel three times
            if len(img2.shape) == 2:
                img2 = np.stack((img2, img2, img2), axis=-1)
                gray_count += 1
        
            all_images.append(img2)
            labels.append(tmp_label)

    count = len(all_images)
    print(count)
    
    return count, all_images, gray_count, labels

class MiniDataset(Dataset):
    """Dataset that pairs an indexable collection of samples with their labels."""

    def __init__(self, data, targets, transform=None):
        """
        Store the samples, their labels and the optional transform.
        
        data, an indexable collection containing the data e.g. images
        targets, an indexable collection containing all the labels
        transform (callable, optional), optional transform to be applied
                on a sample.
        """
        self.transform = transform
        self.targets = targets
        self.data = data

    def __len__(self):
        """Number of samples, taken from the data collection."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return (sample, label) at idx; the sample is transformed when a transform is set."""
        item = self.data[idx]
        target = self.targets[idx]

        if not self.transform:
            return item, target

        return self.transform(item), target

# taken from: https://discuss.pytorch.org/t/torch-utils-data-dataset-random-split/32209/4 (Thanks ptrblk!)
class Subset2Dataset(Dataset):
    """Wrap a subset of (sample, label) pairs so it can carry its own transform."""

    def __init__(self, subset, transform=None):
        """
        Store the wrapped subset and the optional transform.
        
        subset, an indexable collection yielding (sample, label) pairs
        transform (callable, optional), optional transform to be applied
                on a sample.
        """
        self.transform = transform
        self.subset = subset
        
    def __getitem__(self, index):
        """Return (sample, label) at index; only the sample goes through the transform."""
        pair = self.subset[index]
        sample, label = pair
        if not self.transform:
            return sample, label
        return self.transform(sample), label
        
    def __len__(self):
        """Number of pairs in the wrapped subset."""
        return len(self.subset)

In [None]:
# Get the images(np.array) and labels of training data
# (this reads every image under ./train into memory)
all_count, images, gray_count, labels = load_train_data("./train")

# See the proportion of gray images
# NOTE: raises ZeroDivisionError when no image was found under ./train
prop = gray_count/all_count 

print("The total number of training data: %d\nThe proportion of gray images: %.3f"%(all_count,prop))

In [None]:
# NOTE: the triple-quoted string below is disabled code kept for reference only;
# it is a bare string expression and is never executed.
'''
# Calculate the mean and standard deviation of images
means = [(images[:, i, :, :]/255).mean() for i in range(3)]
std = [(images[:, i, :, :]/255).std() for i in range(3)]
'''

# The computation above is expensive, so the resulting per-channel values are
# hard-coded here and used directly
means = [0.48024578664981976, 0.44807218089384243, 0.3975477478649683]
stds = [0.27698640690882403, 0.26906448510256065, 0.28208190621058304]

# Augmentations available to the training pipeline (only augs[0] is used below)
augs = [
        transforms.RandomHorizontalFlip(0.5),
        ### Transforms we decide not to use
        # transforms.RandomRotation(10),
        # transforms.RandomCrop(224, padding=4)
]

# Transforms used for training set:
# np.array -> PIL image -> random horizontal flip -> resize -> tensor -> per-channel normalisation
train_transform = Compose([
    ToPILImage(), 
    augs[0],
    # transforms.ColorJitter(brightness=0.3),
    # augs[1],
    # augs[2],
    transforms.Resize(224),
    ToTensor(),
    transforms.Normalize(means, stds)
])

# Transforms used for validation set and test set
# (same pipeline as training, but without the random augmentation)
val_transform = Compose([
    ToPILImage(),
    transforms.Resize(224),
    ToTensor(),
    transforms.Normalize(means, stds),
])

# The dataset containing all the training data
# (no transform here: each split gets its own transform when it is wrapped in Subset2Dataset)
MiniTrain_train = MiniDataset(images, labels)

## Run the following cell only once for ***EACH RUNTIME***

In [None]:
# Split the whole training data to training, validation and test sets
# (the three sizes are hard-coded; random_split expects them to add up to len(MiniTrain_train))
train_size, val_size, test_size = 80000, 10000, 10000
train_subset, val_subset, test_subset = random_split(MiniTrain_train, [train_size, val_size, test_size])

# Do the corresponding transforms to the three subsets:
# augmentation only for the training split, plain preprocessing for the other two
train_set = Subset2Dataset(train_subset, transform=train_transform)
val_set   = Subset2Dataset(val_subset, transform=val_transform)
test_set  = Subset2Dataset(test_subset, transform=val_transform)

print("The length of training set: %d\n\
The length of validation set: %d\n\
The length of test set: %d\n"%(len(train_set), len(val_set), len(test_set)))

In [None]:
def load_test_data(data_dir):
    '''
    Load the test data from the file directory.
    Count the number of gray images to see their proportion and 
    change them to 3-channel images.
    
    data_dir, the directory of test data. It is expected to contain an
              "images" sub-folder holding the image files.

    Returns (count, all_images, gray_count, file_name):
      count, the number of images loaded
      all_images, a list with one np.array per image
      gray_count, how many images were 2-D (single channel) on disk
      file_name, a list with the lower-cased file name of each image,
                 aligned with all_images
    '''
    gray_count = 0
    all_images = []
    file_name = []

    # Path of "images" folder 
    person_dir = pjoin(data_dir, "images")
    # Sorted so that the image order (and the rows of the result file) is the same on every run
    for i in sorted(os.listdir(person_dir)):
        # Hidden entries (.ipynb_checkpoints, .DS_Store, ...) are not images
        if i.startswith('.'):
            continue

        # Path of each image
        image_dir = pjoin(person_dir, i)

        # The context manager closes the file as soon as the pixels are copied out
        with Image.open(image_dir) as img:
            img2 = np.array(img)

        # Process the gray-image: replicate the single channel three times
        if len(img2.shape) == 2:
            img2 = np.stack((img2, img2, img2), axis=-1)
            gray_count += 1

        all_images.append(img2)
        # Get the filenames of test images (stored lower-cased)
        file_name.append(i.lower())

    count = len(all_images)
    print(count)
    
    return count, all_images, gray_count, file_name

In [None]:
# Get the images(np.array) and filenames of test data
# NOTE: this rebinds all_count and gray_count, which previously described the training data
all_count, test_images, gray_count, file_name = load_test_data("./test")
# Placeholder labels (the test images have none): one zero per loaded image,
# so the label tensor always matches the number of images actually found
test_labels = torch.zeros([all_count], dtype=torch.int32)

# The dataset containing all the test data
MiniTest = MiniDataset(test_images, test_labels, transform=val_transform)

In [None]:
def train(model, optimizer, criterion, data_loader):
    '''
    Train the model for one pass over the provided training data.

    model, the neural network
    optimizer, optimizer used to update the weight parameters
    criterion, criterion used to calculate the loss
    data_loader, the training dataloader

    Uses the global `device` to place each batch.

    Returns (loss, score), both averaged over len(data_loader.dataset):
      loss, the sample-weighted mean of the batch losses (a detached tensor)
      score, the sample-weighted mean of the per-batch macro F1 score
             (the notebook logs this value under the name "accuracy")
    '''
    model.train()
    train_loss, train_accuracy = 0, 0
    for X, y in data_loader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        a2 = model(X)
        loss = criterion(a2, y)
        loss.backward()
        optimizer.step()

        # Accumulate detached values only: adding the graph-attached `loss` would keep
        # autograd history of every batch alive until the end of the epoch.
        train_loss += loss.detach()*X.size(0)
        y_pred = F.log_softmax(a2.detach(), dim=1).max(1)[1]
        train_accuracy += f1_score(y.cpu().numpy(), y_pred.cpu().numpy(),average='macro')*X.size(0)
        
    return train_loss/len(data_loader.dataset), train_accuracy/len(data_loader.dataset)
  
def validate(model, criterion, data_loader):
    '''
    Score the model on the provided data without updating it.

    model, the neural network
    criterion, criterion used to calculate the loss
    data_loader, the validation dataloader

    Uses the global `device` to place each batch.

    Returns (loss, score), both averaged over len(data_loader.dataset):
    the sample-weighted mean batch loss and the sample-weighted mean of the
    per-batch macro F1 score.
    '''
    model.eval()
    validation_loss = 0.
    validation_accuracy = 0.
    # Gradients are never needed here, so the whole pass runs under no_grad
    with torch.no_grad():
        for X, y in data_loader:
            X = X.to(device)
            y = y.to(device)
            logits = model(X)
            batch_n = X.size(0)
            validation_loss += criterion(logits, y)*batch_n
            # Index of the largest log-probability = predicted class
            _, y_pred = F.log_softmax(logits, dim=1).max(1)
            batch_f1 = f1_score(y.cpu().numpy(), y_pred.cpu().numpy(),average='macro')
            validation_accuracy += batch_f1*batch_n

    n_samples = len(data_loader.dataset)
    return validation_loss/n_samples, validation_accuracy/n_samples

def evaluate(model, data_loader, file_names=None):
    '''
    Evaluate the model with test data(no label) and output a .csv file.

    model, the trained model
    data_loader, the test dataloader. It must iterate the images in the same
                 order as file_names (i.e. shuffle=False), because rows are
                 matched to names purely by position.
    file_names, the file name of every test image, in loader order.
                Defaults to the global `file_name` list.

    Writes 'test_result.csv' (columns: Filename, Label) in the working
    directory, one row per predicted image. Uses the global `device`.
    '''
    # Local import: the notebook only imports csv in a later cell
    import csv

    if file_names is None:
        file_names = file_name

    model.eval()
    # newline='' is what the csv module expects from files it writes to
    with open('test_result.csv', 'w', newline='') as f:
        fnames = ['Filename', 'Label']
        writer = csv.DictWriter(f, fieldnames=fnames)    
        writer.writeheader()
        i = 0
        with torch.no_grad():
            for X, _ in data_loader:
                X = X.to(device)
                a2 = model(X)
                y_pred = F.log_softmax(a2, dim=1).max(1)[1]
                # Iterate over the predictions actually produced, so any batch size
                # (including a smaller last batch) is handled
                for label in y_pred.tolist():
                    writer.writerow({'Filename' : file_names[i], 'Label': label}) 
                    i += 1

In [None]:
def train_model(model, momentum=0.5, weight_decay=0, lr=0.02, n_epochs=5, batch_size=64, test_batch_size=1000, best_accuracy=0, save_file=False, model_save_name=None):

    '''
    Train the model on the global train_set and score it on the global val_set
    after every epoch, plotting both with livelossplot.

    model, the neural network
    momentum, the parameter that helps accelerate optimizer
    weight_decay, the parameter of L2-Regularization
    lr, the learning rate of optimizer
    n_epochs, the number of epochs
    batch_size, the batch size of training set
    test_batch_size, the batch size of the validation set
    best_accuracy, the validation score an epoch has to beat before the model is saved
    save_file, whether to save the model file of this training
    model_save_name, the file name (inside "/content/gdrive/My Drive/") to save to

    Returns the trained model, or None (after printing a message) when
    save_file is set but no model_save_name is given.
    '''
    if save_file and not model_save_name:
        print("No file name specified.")
        return
        
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0)
    validation_loader = DataLoader(val_set, batch_size=test_batch_size, shuffle=False, num_workers=0)

    liveloss = PlotLosses()
    for epoch in range(n_epochs):
        train_loss, train_accuracy = train(model, optimizer, criterion, train_loader)
        validation_loss, validation_accuracy = validate(model, criterion, validation_loader)
        val_score = validation_accuracy.item()

        # One entry per plotted curve; the 'val_' prefix marks the validation counterpart
        logs = {
            'log loss': train_loss.item(),
            'accuracy': train_accuracy.item(),
            'val_log loss': validation_loss.item(),
            'val_accuracy': val_score,
        }
        liveloss.update(logs)
        liveloss.draw()

        # Keep only checkpoints that improve on the best validation score so far
        if save_file and val_score > best_accuracy:
            best_accuracy = val_score
            path = f"/content/gdrive/My Drive/{model_save_name}"
            torch.save(model.state_dict(), path)
            print("Model %s saved (epoch: %s)" % (model_save_name, epoch + 1))
        
    return model

In [None]:
def test_model(model, test_batch_size=1000):
    '''
    Score the trained model on the global test_set and print the result.

    model, the neural network
    test_batch_size, the batch size used to iterate the test set
    '''
    model = model.to(device)
    test_loader = DataLoader(test_set, batch_size=test_batch_size, shuffle=False, num_workers=0)

    # validate() does the actual scoring; the criterion is only used to report the loss
    test_loss, test_accuracy = validate(model, nn.CrossEntropyLoss(), test_loader)

    loss_msg = "Avg. Test Loss: %1.3f" % test_loss.item()
    accuracy_msg = " Avg. Test Accuracy: %1.3f" % test_accuracy.item()
    print(loss_msg, accuracy_msg)
    print("")

In [None]:
import torchvision.models as models

# Import the model: ResNet-18 created with pretrained weights
resnet18 = models.resnet18(pretrained=True)
# Change the number of output classes: replace the final fully connected layer
# with a new one that keeps the same input width but has 200 outputs
fc_in = resnet18.fc.in_features
resnet18.fc = nn.Linear(fc_in, 200)
# Bare expression: shows the new layer as the cell output
resnet18.fc

In [None]:
# First 3 epochs for ResNet18
# (all other hyper-parameters use train_model's defaults; nothing is saved since save_file defaults to False)
resnet18_try = train_model(resnet18, n_epochs=3)

In [None]:
# Score the trained ResNet18 on the held-out test split.
# (The notebook never defines `resnet50`, so the original call raised NameError on a fresh run.)
test_model(resnet18_try)

In [None]:
import csv

# Predict the unlabeled test images and write test_result.csv.
# shuffle=False keeps the loader order aligned with the file_name list used by evaluate().
eval_loader = DataLoader(MiniTest, batch_size=1000, shuffle=False, num_workers=4)
# Use the model trained above (`resnet50` is never defined in this notebook)
evaluate(resnet18_try, eval_loader)