## Importing libraries

Baisc

In [1]:
import time
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

Torch

In [2]:
# From torch
import torch
## nn
import torch.nn as nn
import torch.nn.functional as F
## optim
import torch.optim as optim
from torch.optim import lr_scheduler
## utils
from torch.utils.data import random_split
from torch.utils.data.dataset import Dataset
## torchvision
import torchvision
from torchvision import datasets, models, transforms

SKLearn

In [3]:
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

  import pandas.util.testing as tm


## Loading and transformation

#### General

In [5]:
cls2id = {"Happy": 0, "Sad": 1, "Fear": 2}
id2cls = ["Happy", "Sad", "Fear"]

BATCHSIZE = 10
PATH = "../data/aithon2020_level2_traning.csv"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

#### Definin functions

In [6]:
def load_data(PATH):
    data     = pd.read_csv(PATH)
    try:
        labels   = data["emotion"]
        data     = data.drop(["emotion"], axis = 1)
    except:
        labels = None
    images   = np.array(data.values).reshape(len(data.values), 48, 48)
    images   = images/255
    return images, labels
    
def loader(PATH):
    images, labels = load_data(PATH)
    images = torch.tensor(images)
    images = images.view(images.shape[0], -1, images.shape[1], images.shape[2])

    if labels is not None:
        target = []
        for label in labels.values:
            target.append(cls2id[label])
        target = torch.tensor(target)
    else:
        target = None
    
    return images, target

def data_split(X, Y, test_size, shuffle = True):
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size = test_size, shuffle = shuffle)
    return(X_train, X_test, Y_train, Y_test)

def create_batch(X, Y, batch_size = 1):
    batch_x = [X[i: i + batch_size] for i in range(0, len(X), batch_size)]
    batch_y = [Y[i: i + batch_size] for i in range(0, len(Y), batch_size)] 
    return list(zip(batch_x, batch_y))

#### Main

In [11]:
# train_length      = round(0.7 * datalen)
# test_length       = round(0.3 * datalen)

## Loading images
images, target = loader(PATH)
## Train test split
train_X, test_X, train_Y, test_Y = data_split(images, target, test_size = 0.3) 
## Train loader 
trainloader = create_batch(train_X, train_Y, batch_size = BATCHSIZE)
## Test loader
testloader = create_batch(test_X, test_Y, batch_size = BATCHSIZE)

TypeError: take(): argument 'index' (position 1) must be Tensor, not numpy.ndarray

In [9]:
target

tensor([2, 2, 1,  ..., 2, 1, 1])

In [12]:
images

tensor([[[[0.9059, 0.8314, 0.6118,  ..., 0.1725, 0.1059, 0.0627],
          [0.8980, 0.6863, 0.5804,  ..., 0.1059, 0.1373, 0.1059],
          [0.8392, 0.6118, 0.6157,  ..., 0.1098, 0.0863, 0.1098],
          ...,
          [0.9451, 0.9608, 0.9804,  ..., 0.2235, 0.3961, 0.5725],
          [0.9647, 0.9804, 0.9882,  ..., 0.3059, 0.4118, 0.6353],
          [0.9804, 0.9843, 0.9804,  ..., 0.3451, 0.4314, 0.5961]]],


        [[[0.2157, 0.2157, 0.2157,  ..., 0.4549, 0.3882, 0.2902],
          [0.2157, 0.2157, 0.2157,  ..., 0.4941, 0.4510, 0.3529],
          [0.2157, 0.2157, 0.2157,  ..., 0.5020, 0.5333, 0.4471],
          ...,
          [0.2196, 0.2196, 0.2196,  ..., 0.1176, 0.1216, 0.1529],
          [0.2235, 0.2235, 0.2235,  ..., 0.1255, 0.1216, 0.2000],
          [0.2235, 0.2235, 0.2235,  ..., 0.1333, 0.1176, 0.2235]]],


        [[[0.0784, 0.0667, 0.0745,  ..., 0.7333, 0.6902, 0.6353],
          [0.0863, 0.0667, 0.0667,  ..., 0.7647, 0.7059, 0.6706],
          [0.0667, 0.0667, 0.0706,  ..

## Models

Used 3 types of model

    1. RESNET101
    2. VGG19
    3. XGBOOST

Finally used bagging to ensemble those models

#### Defining functions

Gradient freezer

1. RESNET101

In [11]:
class RESNET(nn.Module):
    def __init__(self, criterion = None, optimizer = None, learning_rate = 0.001, image_dimention = 1, categories = 3):
        super(RESNET, self).__init__()
        ## Defining networt
         # Defaulf input image dimention is 1
         # Default output categories is 3
        self.pretrained = models.resnet101(pretrained = True)
        self.pretrained.conv1 = nn.Conv2d(image_dimention, 64, kernel_size = (3, 3), stride=(2,2), padding=(3,3), bias=False)
        num_ftrs = self.pretrained.fc.in_features
        self.pretrained.fc = nn.Linear(num_ftrs, categories)
        
        ## Defining optimizer and loss function
         # Default loss function is cross entropy
         # Default optimizer is SGD
         # Default learning rate is 0.001
        if criterion:
            self.criterion = criterion
        else:
            self.criterion = nn.CrossEntropyLoss()
        if optimizer:
            self.optimizer = optimizer
        else:
            self.optimizer = optim.SGD(self.pretrained.parameters(), lr = learning_rate, momentum = 0.9)
        
    def forward(self, x):
        x = self.pretrained.forward(x)
        return x
        
    def train(self, traindata, valdata = None, numberEpoch = 10, DEBUG = True):
        trainlen = sum(list(batch[0].shape[0] for batch in traindata))
        total_batch = len(traindata)
        ## Loop over the dataset multiple times
        for epoch in range(numberEpoch): 
            running_corrects = 0.0
            running_loss     = 0.0
            if DEBUG:
                pbar = tqdm(enumerate(traindata, 0), total = total_batch, desc = "Loss 0, Completed", ncols = 800)
            if not DEBUG:
                pbar = enumerate(traindata, 0)
            for count, data in pbar:
                inputs, labels = data[0].to(device), data[1].to(device)
                batch  = inputs.shape[0]
                inputs = inputs.type(torch.cuda.FloatTensor)
                
                ## zero the parameter gradients
                self.optimizer.zero_grad()
                
                ## forward + backward + optimize
                outputs = self.forward(inputs)
                _, preds = torch.max(outputs, 1)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                
                ## Calculating statistics
                running_loss += loss.item() * batch
                running_corrects += torch.sum(preds == labels.data)
                
                ## Showing statistics
                if DEBUG:
                    pbar.set_description("Loss %.3f, Completed" %(running_loss/trainlen))
            if DEBUG:
                epoch_loss = running_loss/trainlen
                epoch_acc  = running_corrects/trainlen
                print('Epoch %d completed, average loss: %.3f, accuracy: %.3f' %(epoch + 1, epoch_loss, epoch_acc))
            
                if valdata:
                    val_loss, val_acc = self.evaluate(valdata)
                    print('Validation, average loss: %.3f, accuracy: %.3f' %(val_loss, val_acc))
                
    def evaluate(self, testdata):
        running_corrects = 0.0
        running_loss     = 0.0
        testlen = sum(list(batch[0].shape[0] for batch in testdata))
        for data in testdata:
            inputs, labels = data[0].to(device), data[1].to(device)
            batch  = inputs.shape[0]
            inputs = inputs.type(torch.cuda.FloatTensor)            
            ## Forward
            outputs = self.forward(inputs)
            _, preds = torch.max(outputs, 1)
            ## Loss and accuracy
            loss = self.criterion(outputs, labels)
            running_loss += loss.item() * batch
            running_corrects += torch.sum(preds == labels.data)
            
        loss = running_loss/testlen
        acc  = running_corrects/testlen
        return loss, acc
        
    def predict(self, testdata, ID = None):
        predicted_labels = []
        for data in testdata:
            inputs, labels = data[0].to(device), data[1].to(device)
            batch  = inputs.shape[0]
            inputs = inputs.type(torch.cuda.FloatTensor)            
            ## Forward
            outputs = self.forward(inputs)
            _, preds = torch.max(outputs, 1)
            predicted_labels += preds.tolist()
        if ID:
            return([ID[label] for label in predicted_labels])
        return predicted_labels

2. VGG19

In [12]:
class VGGNET(nn.Module):
    def __init__(self, criterion = None, optimizer = None, learning_rate = 0.001, image_dimention = 1, categories = 3):
        super(VGGNET, self).__init__()
        ## Defining networt
         # Defaulf input image dimention is 1
         # Default output categories is 3
        self.pretrained = models.vgg19(pretrained = True)
        self.pretrained.features[0] = nn.Conv2d(image_dimention, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        num_ftrs = self.pretrained.classifier[6].in_features
        self.pretrained.classifier[6] = nn.Linear(num_ftrs, categories)
        
        ## Defining optimizer and loss function
         # Default loss function is cross entropy
         # Default optimizer is SGD
         # Default learning rate is 0.001
        if criterion:
            self.criterion = criterion
        else:
            self.criterion = nn.CrossEntropyLoss()
        if optimizer:
            self.optimizer = optimizer
        else:
            self.optimizer = optim.SGD(self.pretrained.parameters(), lr = learning_rate, momentum = 0.9)
        
    def forward(self, x):
        x = self.pretrained.forward(x)
        return x
        
    def train(self, traindata, valdata = None, numberEpoch = 10, DEBUG = True):
        
        trainlen = sum(list(batch[0].shape[0] for batch in traindata))
        total_batch = len(traindata)
        ## Loop over the dataset multiple times
        for epoch in range(numberEpoch): 
            running_corrects = 0.0
            running_loss     = 0.0
            if DEBUG:
                pbar = tqdm(enumerate(traindata, 0), total = total_batch, desc = "Loss 0, Completed", ncols = 800)
            else:
                pbar = enumerate(traindata, 0)
            for count, data in pbar:
                inputs, labels = data[0].to(device), data[1].to(device)
                batch  = inputs.shape[0]
                inputs = inputs.type(torch.cuda.FloatTensor)
                
                ## zero the parameter gradients
                self.optimizer.zero_grad()
                
                ## forward + backward + optimize
                outputs = self.forward(inputs)
                _, preds = torch.max(outputs, 1)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                
                ## Calculating statistics
                running_loss += loss.item() * batch
                running_corrects += torch.sum(preds == labels.data)
                
                ## Showing statistics
                if DEBUG:
                    pbar.set_description("Loss %.3f, Completed" %(running_loss/trainlen))
            if DEBUG:
                epoch_loss = running_loss/trainlen
                epoch_acc  = running_corrects/trainlen
                print('Epoch %d completed, average loss: %.3f, accuracy: %.3f' %(epoch + 1, epoch_loss, epoch_acc))
            
                if valdata:
                    val_loss, val_acc = self.evaluate(valdata)
                    print('Validation, average loss: %.3f, accuracy: %.3f' %(val_loss, val_acc))
                
    def evaluate(self, testdata):
        running_corrects = 0.0
        running_loss     = 0.0
        testlen = sum(list(batch[0].shape[0] for batch in testdata))
        with torch.no_grad():
            for data in testdata:
                inputs, labels = data[0].to(device), data[1].to(device)
                batch  = inputs.shape[0]
                inputs = inputs.type(torch.cuda.FloatTensor)            
                ## Forward
                outputs = self.forward(inputs)
                _, preds = torch.max(outputs, 1)
                ## Loss and accuracy
                loss = self.criterion(outputs, labels)
                running_loss += loss.item() * batch
                running_corrects += torch.sum(preds == labels.data)
            
        loss = running_loss/testlen
        acc  = running_corrects/testlen
        return loss, acc
        
    def predict(self, testdata, ID = None):
        predicted_labels = []
        for data in testdata:
            inputs, labels = data[0].to(device), data[1].to(device)
            batch  = inputs.shape[0]
            inputs = inputs.type(torch.cuda.FloatTensor)            
            ## Forward
            outputs = self.forward(inputs)
            _, preds = torch.max(outputs, 1)
            predicted_labels += preds.tolist()
        if ID:
            return([ID[label] for label in predicted_labels])
        return predicted_labels

3. XGBOOST

In [13]:
model_XGB = XGBClassifier(max_depth = 3000)

## Training

RESNET

In [22]:
model_resnet = RESNET()
model_resnet = model_resnet.to(device)

Number of layers::  10
Layers Freezed  ::  0


In [59]:
model_resnet.train(trainloader, valdata = testloader, numberEpoch = 25, DEBUG = True)

VGG19

In [14]:
model_vgg = VGGNET()
model_vgg = model_vgg.to(device)

In [None]:
model_vgg.train(trainloader, valdata = testloader, numberEpoch = 25, DEBUG = False)

XGBoost

In [None]:
model_XGB.fit(train_X, train_Y)

## Prediction and bagging

In [56]:
## Resnet
prediction_resnet = model_resnet.predict(testloader)
## VGG
prediction_vgg    = model_vgg.predict(testloader)
## XGB
prediction_xgb    = model_XGB.predict(test_X)