# Imports

In [1]:
import os
import copy
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
from time import time
from sklearn import preprocessing
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score 
from matplotlib import pyplot as plt

import torchvision
from torchvision import models, transforms
from torchvision.io import read_image

import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import Dataset

# Global variables 

In [2]:
## Training split: label index (CSV) and image directory
PATH_IMG = "../input/classifonlyhanddataset/output/output"
PATH_LABELS = "../input/classifonlyhanddataset/index_label.csv"

## Validation split: label index (CSV) and image directory
PATH_IMG_VALID = "../input/classifonlyhanddataset/output_validation/output_validation"
PATH_LABELS_VALID = "../input/classifonlyhanddataset/index_label_validation.csv"

## Images are resized to INPUT_SIZE x INPUT_SIZE and served BATCH_SIZE at a time;
## the classifier head has N_CLASS outputs
N_CLASS = 5
BATCH_SIZE = 256
INPUT_SIZE = 100

# Data functions

In [3]:
class HandGestureDataset(Dataset):
    """Image dataset whose labels come from a CSV index.

    The CSV is read with pandas and must provide an ``index`` column (holding
    ``label_prefix + <file name>``) and a ``label`` column. Images are read from
    ``img_dir`` with ``read_image``.

    Args:
        annotations_file: path of the CSV label index.
        img_dir: directory containing the image files.
        transform: optional callable applied to the decoded image.
        label_prefix: string prepended to a file name to build the key looked
            up in the ``index`` column (defaults to the historical "output/").

    Note:
        ``__len__`` is the number of CSV rows while items are addressed through
        the directory listing, so both are assumed to describe the same files.
    """

    def __init__(self, annotations_file, img_dir, transform=None, label_prefix="output/"):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.label_prefix = label_prefix
        # List the directory once and sort it: os.listdir returns entries in
        # arbitrary order, and calling it on every __getitem__ costs O(n) per
        # sample and makes the idx -> file mapping unstable between calls.
        self.img_files = sorted(os.listdir(img_dir))

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the ``idx``-th file of the sorted listing.

        ``Series.item()`` raises ``ValueError`` unless exactly one CSV row
        matches the file.
        """
        file_name = self.img_files[idx]
        image = read_image(os.path.join(self.img_dir, file_name))
        key = self.label_prefix + file_name
        label = self.img_labels.loc[self.img_labels["index"] == key, "label"].item()
        if self.transform is not None:
            image = self.transform(image)
        return image, label

In [4]:
def prepare_data_vgg(data_type):
    """Build the training and validation DataLoaders.

    Args:
        data_type: dataset selector; only "custom" is supported.

    Returns:
        ``(train_loader, valid_loader)`` built from ``HandGestureDataset`` with
        the module-level paths, both batched by ``BATCH_SIZE`` and shuffled.

    Raises:
        ValueError: if ``data_type`` is not "custom" (previously this fell
            through to the ``return`` and failed with ``UnboundLocalError``).
    """
    if data_type != "custom":
        raise ValueError(f"unsupported data_type {data_type!r}; expected 'custom'")

    ## Parameters fitting vgg/imagenet (only used by the optional Normalize step)
    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]

    ## pytorch transformer objects
    ## Training pipeline: random augmentations, then resize + tensor conversion
    transformVGGTrain=transforms.Compose([
            transforms.ToPILImage(),

            transforms.ColorJitter(brightness=0.5, hue=0.5),
            transforms.RandomPerspective(distortion_scale=0.5, p=0.2),
            transforms.RandomAffine(degrees=(0, 5), translate=(0, 0.18), scale=(0.7, 1)),
#             transforms.RandomSolarize(threshold=192.0),
            transforms.RandomAdjustSharpness(sharpness_factor=3),
            transforms.RandomAutocontrast(),
            transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.01, 2)),
            transforms.RandomRotation(degrees=(-30, 30)),

            ## too much
#             transforms.RandomPosterize(bits=2),
#             transforms.RandomInvert(),
#             transforms.RandomEqualize(),

            transforms.Resize(size=(INPUT_SIZE, INPUT_SIZE)),
            transforms.ToTensor(),
#             transforms.Normalize(mean, std) ## test with and without
        ])
    ## Validation pipeline: no augmentation, only resize + tensor conversion
    transformVGGValid=transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(size=(INPUT_SIZE, INPUT_SIZE)),
            transforms.ToTensor(),
#             transforms.Normalize(mean, std) ## test with and without
        ])

    ## Custom dataset
    VGG_dataset_train = HandGestureDataset(PATH_LABELS, PATH_IMG, transformVGGTrain)
    VGG_dataset_valid = HandGestureDataset(PATH_LABELS_VALID, PATH_IMG_VALID, transformVGGValid)
    VGG_trainloader = torch.utils.data.DataLoader(VGG_dataset_train, batch_size=BATCH_SIZE, pin_memory=True, shuffle=True)
    VGG_validloader = torch.utils.data.DataLoader(VGG_dataset_valid, batch_size=BATCH_SIZE, pin_memory=True, shuffle=True)

    return VGG_trainloader, VGG_validloader

# Loading data into pytorch dataset and dataloader objects

In [5]:
## Build both loaders from the custom hand-gesture dataset
VGG_trainloader, VGG_validloader = prepare_data_vgg(data_type="custom")

In [6]:
# for img in next(iter(VGG_trainloader)):
#     for i in img[:20]:
#         i = i.permute(1,2,0)
#         plt.imshow(np.array(i))
#         plt.show()

# Model functions

In [7]:
def accuracy(yhat,y):
    """Fraction of samples whose arg-max prediction matches the target.

    ``y`` may hold class indices (1-D, or a single column) or per-class scores
    (in which case its arg-max along dim 1 is used). Returns a 0-dim float64
    tensor.
    """
    targets_are_indices = y.dim() == 1 or y.size(1) == 1
    predicted = torch.argmax(yhat, 1)
    if targets_are_indices:
        hits = predicted.view(y.size(0), -1) == y.view(-1, 1)
    else:
        hits = predicted.view(-1) == torch.argmax(y, 1).view(-1)
    return hits.double().mean()

def _fit_label_encoder(label_encoder, loader):
    """Fit ``label_encoder`` once on every label served by ``loader``."""
    label_table = getattr(getattr(loader, "dataset", None), "img_labels", None)
    if label_table is not None:
        ## Cheap path: the dataset exposes its label table, no image decoding needed
        all_labels = list(label_table["label"])
    else:
        ## Fallback: one pass over the loader to collect the labels
        all_labels = []
        for _, y in loader:
            all_labels.extend(np.asarray(y).ravel().tolist())
    label_encoder.fit(all_labels)
    return label_encoder


def _run_epoch(model, loader, loss_fn, label_encoder, optimizer=None):
    """Run one pass over ``loader``; returns ``(mean_loss, mean_accuracy)`` as floats.

    Weights are updated only when ``optimizer`` is given. An empty loader
    yields ``(nan, nan)``.
    """
    total_loss, total_acc, n_seen = 0.0, 0.0, 0
    for x, y in loader:
        x = x.to(device)
        ## transform (not fit_transform): the class -> index mapping stays fixed
        y = torch.as_tensor(label_encoder.transform(y)).to(device)
        if optimizer is not None:
            optimizer.zero_grad()
        yhat = model(x)
        batch_loss = loss_fn(yhat, y)
        if optimizer is not None:
            batch_loss.backward()
            optimizer.step()
        ## .item() keeps plain floats instead of tensors tied to the autograd graph
        total_loss += batch_loss.item() * len(x)
        total_acc += accuracy(yhat, y).item() * len(x)
        n_seen += len(x)
    if n_seen == 0:
        return float("nan"), float("nan")
    return total_loss / n_seen, total_acc / n_seen


def train(model, epochs, train_loader, valid_loader, learning_rate, patience, label_encoder, feature_extract=False):
    """Train ``model`` with Adam and cross-entropy, with early stopping on validation loss.

    Args:
        model: network to train; moved to the module-level ``device``.
        epochs: maximum number of epochs.
        train_loader: iterable of ``(images, labels)`` training batches.
        valid_loader: iterable of ``(images, labels)`` validation batches.
        learning_rate: learning rate given to Adam.
        patience: patience forwarded to ``EarlyStopping``.
        label_encoder: object with ``fit`` / ``transform``; it is fitted ONCE on
            the training labels so every batch shares the same class -> index
            mapping (refitting per batch shifts indices whenever a batch lacks
            a class).
        feature_extract: when True, only parameters with ``requires_grad`` are
            handed to the optimizer.

    Returns:
        ``model`` loaded with the weights that reached the best validation accuracy.
    """
    ## Early stopping variables
    es = EarlyStopping(patience=patience)
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    model = model.to(device)

    ## Training only the parameters where we require gradient since we are fine-tuning
    trainable = [(name, param) for name, param in model.named_parameters() if param.requires_grad]
    print("params to learn:")
    for name, _ in trainable:
        print("\t", name)
    params_to_update = [param for _, param in trainable] if feature_extract else model.parameters()

    ## Setting up our optimizer
    optim = torch.optim.Adam(params_to_update, lr=learning_rate)

    ## Setting up our loss function
    loss = nn.CrossEntropyLoss()

    ## One stable label mapping for the whole run
    _fit_label_encoder(label_encoder, train_loader)

    ## Running the train loop
    print(f"running {getattr(model, 'name', type(model).__name__)}")
    for epoch in range(epochs):
        model.train()
        train_loss, train_acc = _run_epoch(model, train_loader, loss, label_encoder, optim)
        print("epoch :", epoch, end="")
        print(", train_loss: ", train_loss, end="")
        print(", train_acc: ", train_acc, end="")

        model.eval()
        with torch.no_grad():
            valid_loss, valid_acc = _run_epoch(model, valid_loader, loss, label_encoder)
        print(", valid_loss: ", valid_loss, end="")
        print(", valid_acc: ", valid_acc)

        ## Keep the weights of the best epoch (by validation accuracy)
        if valid_acc > best_acc:
            best_acc = valid_acc
            best_model_wts = copy.deepcopy(model.state_dict())
        ## Early stopping (on validation loss)
        if es.step(valid_loss):
            break
    print('Best val Acc: {:4f}'.format(best_acc))
    ## Returns the best model
    model.load_state_dict(best_model_wts)
    return model

def set_parameter_requires_grad(model, feature_extract):
    """Freeze the ``features`` parameters of ``model`` and unfreeze all others.

    Does nothing when ``feature_extract`` is falsy. A parameter counts as part
    of the feature extractor when its name contains "features".
    """
    if not feature_extract:
        return
    for param_name, param in model.named_parameters():
        param.requires_grad = "features" not in param_name

# Loading the model and modifying the classifier part
### Maybe we could try modifying only the last classifier layer?

In [8]:
TB_PATH = "/tmp/logs/sceance2"
## Use the first GPU when available, otherwise fall back to CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## Loading vgg16 model pretrained on imagenet
vgg = models.vgg16(pretrained=True)

## Modifies the vgg network classifier layers to fit our problem
# vgg.classifier[0] = nn.Linear(25088, 8192)
# vgg.classifier[3] = nn.Linear(8192, 1024)
# vgg.classifier[6] = nn.Linear(1024, N_CLASS)

# vgg.classifier = nn.Sequential(nn.Linear(25088, 512), # test 2048
#                                nn.ReLU(), 
#                                nn.Dropout(0.45),       # test 0.5
#                                nn.Linear(512, 100),
#                                nn.ReLU(), 
#                                nn.Dropout(0.45),
#                                nn.Linear(100, N_CLASS),                   
#                                nn.Sigmoid())

## The head ends on a plain Linear layer and outputs raw logits: the training
## loop uses nn.CrossEntropyLoss, which applies log-softmax itself, so a trailing
## nn.Softmax would normalise twice and flatten the gradients.
## argmax-based predictions are unaffected, and since Softmax holds no
## parameters the state_dict keys stay the same.
vgg.classifier = nn.Sequential(nn.Linear(25088, 100),
                               nn.ReLU(),
                               nn.Dropout(0.45),
                               nn.Linear(100, N_CLASS))

## Note: .eval() also switches the model to evaluation mode before printing it
print(vgg.eval())

## Freezes the "features" parameters and sets requires_grad to True on the others
set_parameter_requires_grad(vgg, True)

# Implementing early stopping

In [9]:
class EarlyStopping(object):
    """Signal when a monitored metric has stopped improving.

    Args:
        mode: 'min' if lower metric values are better, 'max' if higher are.
        min_delta: margin by which a value must beat the best one to count as
            an improvement; absolute, or a percentage of the best value when
            ``percentage`` is True.
        patience: number of consecutive non-improving steps after which
            ``step`` returns True. ``0`` disables early stopping.
        percentage: interpret ``min_delta`` as a percentage of the best value.

    Raises:
        ValueError: if ``mode`` is neither 'min' nor 'max'.
    """

    def __init__(self, mode='min', min_delta=0, patience=10, percentage=False):
        self.mode = mode
        self.min_delta = min_delta
        self.patience = patience
        self.best = None
        self.num_bad_epochs = 0
        self.is_better = None
        self._init_is_better(mode, min_delta, percentage)
        if patience == 0:
            # Early stopping disabled: never report a stop.
            self.is_better = lambda a, b: True
            self.step = lambda a: False

    def step(self, metrics):
        """Record a new metric value; return True when training should stop."""
        # NaN is checked first: storing a NaN as the initial ``best`` would make
        # every later comparison False, so no value could ever be an improvement.
        if np.isnan(metrics):
            return True
        if self.best is None:
            self.best = metrics
            return False
        if self.is_better(metrics, self.best):
            self.num_bad_epochs = 0
            self.best = metrics
        else:
            self.num_bad_epochs += 1
        return self.num_bad_epochs >= self.patience

    def _init_is_better(self, mode, min_delta, percentage):
        """Install the ``is_better(value, best)`` predicate for the given settings."""
        if mode not in {'min', 'max'}:
            raise ValueError('mode ' + mode + ' is unknown!')
        if not percentage:
            if mode == 'min':
                self.is_better = lambda a, best: a < best - min_delta
            if mode == 'max':
                self.is_better = lambda a, best: a > best + min_delta
        else:
            if mode == 'min':
                self.is_better = lambda a, best: a < best - (
                            best * min_delta / 100)
            if mode == 'max':
                self.is_better = lambda a, best: a > best + (
                            best * min_delta / 100)

# Training only the modified parts of the classifier

In [10]:
## Fine-tuning the model on our data
le = preprocessing.LabelEncoder()
vgg.name = "VGG"  ## name printed by train() when the run starts

best_model = train(
    vgg,
    500,
    VGG_trainloader,
    VGG_validloader,
    learning_rate=3e-4,  ## learning rate for Adam optimizer
    patience=10,         ## metric for earlystopping : val_loss
    label_encoder=le,
)

# Checking which classes are not correctly classified in valid set

In [11]:
yhats = []
ys = []

## Fit the encoder once on the full training label table so that every batch
## shares the same class -> index mapping; refitting on each batch shifts the
## indices whenever a batch does not contain every class.
le.fit(VGG_trainloader.dataset.img_labels["label"])

## Make sure dropout is disabled while predicting
best_model.eval()
with torch.no_grad():
    for x,y in VGG_validloader:
        x = x.to(device)
        y = torch.as_tensor(le.transform(y))
        yhat = best_model(x)
        ## Keep everything on the CPU for the reporting step
        yhats.append(yhat.cpu())
        ys.append(y)

In [12]:
## Concatenate the per-batch tensors instead of stacking them with np.array:
## the last batch is usually smaller, which makes the stacked array ragged.
## One report over the whole validation set also replaces the fragmented
## per-batch reports.
valid_scores = torch.cat(yhats)
valid_targets = torch.cat(ys)

print(classification_report(valid_targets.numpy(), valid_scores.argmax(dim=1).numpy()))

# Checking which kind of image could be the problem

In [13]:
LABELS = ['FINGER', 'FIST', 'LEFT', 'PALM', 'RIGHT'] 

## Show up to 50 images per validation batch, titled with the predicted class
best_model.eval()
with torch.no_grad():
    for x, _ in VGG_validloader:
        ## Predicted class index per image, brought back to the CPU once per batch
        predicted = best_model(x.to(device)).argmax(dim=1).cpu()
        ## Never index past the end of a batch shorter than 50 samples
        for i in range(min(50, len(x))):
            plt.imshow(x[i].permute(1,2,0))
            str_ = LABELS[predicted[i].item()]
            plt.title("Pred:" + str_)
            plt.show()

# Saving the model in .pth and .onnx extension

In [14]:
## Persist only the weights (state_dict) of the best model
PATH = "./"
weights_file = os.path.join(PATH,"best_model.pth")
torch.save(best_model.state_dict(), weights_file)

In [15]:
## Drop both references to the trained network
del vgg, best_model

In [16]:
# model = models.vgg16(pretrained=True)
# model.classifier[0] = nn.Linear(25088, 8192)
# model.classifier[3] = nn.Linear(8192, 1024)
# model.classifier[6] = nn.Linear(1024, N_CLASS)
# model.load_state_dict(torch.load(os.path.join(PATH,"vgg.pth"), map_location='cpu'))
# model.eval() 

# dummy_input = torch.randn(BATCH_SIZE, 3, INPUT_SIZE, INPUT_SIZE)  
# torch.onnx.export(model,   
#                   dummy_input, 
#                   "vgg.onnx",
#                   export_params=True,
#                   do_constant_folding=True, 
#                   input_names = ['modelInput'],
#                   output_names = ['modelOutput'])