In [15]:
import numpy as np
import pandas as pd
import random
import math
import time
import os
import copy
import gc
import  matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score
from sklearn.utils.class_weight import compute_class_weight
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, models, transforms
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms

In [20]:
class CustomDataset(Dataset):
    """Image dataset driven by a table of file names (and, for training, labels).

    Every item is read from ``folder + filename``, converted to RGB and turned
    into a ``numpy`` array before the optional transform is applied.

    Args:
        dataset: Table accessed through ``.iloc[index].filename`` and, unless
            ``test`` is true, ``.sign.values`` (a pandas DataFrame in this
            notebook). ``sign`` holds the integer class labels.
        transform: Optional callable applied to the RGB array of each image.
        test: When true no labels are read and a zero tensor of length
            ``len(dataset)`` is used as a placeholder.
        folder: Prefix concatenated verbatim with each file name, so it has to
            end with a path separator.
    """

    def __init__(self, dataset, transform=None, test=False, folder='dataset/train/'):
        super(CustomDataset, self).__init__()
        self.dataset = dataset
        self.folder = folder
        if test:
            # No ground truth available: keep a placeholder so indexing still works.
            self.y_data = torch.zeros(len(dataset))
        else:
            self.y_data = torch.tensor(dataset.sign.values, dtype=torch.long)
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the underlying table."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return ``(image, label, filename)`` for the row at position ``index``."""
        # Look the row up once; the file name is needed both for loading and
        # as the third element of the returned tuple.
        filename = self.dataset.iloc[index].filename
        # The context manager releases the file handle as soon as the pixel
        # data has been copied into the array.
        with Image.open(self.folder + filename) as img:
            image = np.array(img.convert('RGB'))
        if self.transform is not None:
            image = self.transform(image)
        return image, self.y_data[index], filename

class img_CNN(nn.Module):
    """Image classifier: torchvision backbone(s) followed by a small dense head.

    For every single-backbone ``model_type`` the stock torchvision network is
    kept intact (including its 1000-way output layer) and a
    ``ReLU -> Dropout -> Linear(1000, 512) -> ReLU -> Dropout -> Linear(512, num_classes)``
    head is stacked on top of it. ``'Mixed'`` runs DenseNet-161 and
    ResNeXt-101 side by side, reduces each to 64 features and classifies the
    concatenated 128 features with one linear layer.

    Args:
        model_type: One of ``'VGG'``, ``'ResNext'``, ``'ResNet'``,
            ``'DenseNet'``, ``'GoogleNet'``, ``'Inception'``,
            ``'Wide ResNet'`` or ``'Mixed'``.
        num_classes: Size of the final output layer.
        pretrained: Forwarded to the torchvision constructors as
            ``pretrained=...``. Defaults to ``True`` (the previous behaviour).

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """

    # model_type -> name of the torchvision constructor (single-backbone variants).
    # Names are resolved lazily so an unknown model_type fails before any
    # network is built. 'ResNext' previously also tried resnext101_32x8d.
    _BACKBONES = {
        'VGG': 'vgg11',
        'ResNext': 'resnext50_32x4d',
        'ResNet': 'resnet18',
        'DenseNet': 'densenet161',
        'GoogleNet': 'googlenet',
        'Inception': 'inception_v3',
        'Wide ResNet': 'wide_resnet50_2',
    }
    # Width of the backbone output that feeds the head (the stock 1000-way layer).
    _BACKBONE_OUT = 1000

    def __init__(self, model_type, num_classes, pretrained=True):
        super(img_CNN, self).__init__()
        self.model_type = model_type
        if model_type == 'Mixed':
            self.model1 = models.densenet161(pretrained=pretrained)
            self.model2 = models.resnext101_32x8d(pretrained=pretrained)
            # Keep the original 1000-way layers and squeeze each branch to 64 features.
            self.model1.classifier = nn.Sequential(
                self.model1.classifier, nn.ReLU(), nn.Dropout(0.5), nn.Linear(1000, 64))
            self.model2.fc = nn.Sequential(
                self.model2.fc, nn.ReLU(), nn.Dropout(0.5), nn.Linear(1000, 64))
            self.conc_models = nn.Linear(128, num_classes)
        elif model_type in self._BACKBONES:
            build_backbone = getattr(models, self._BACKBONES[model_type])
            self.model = build_backbone(pretrained=pretrained)
            self.conc_models = nn.Sequential(
                nn.ReLU(), nn.Dropout(0.5), nn.Linear(self._BACKBONE_OUT, 512),
                nn.ReLU(), nn.Dropout(0.5), nn.Linear(512, num_classes))
        else:
            raise ValueError('Wrong model type!')

    @staticmethod
    def _main_logits(output):
        """Return the primary output tensor of a backbone.

        torchvision's Inception/GoogLeNet can return a namedtuple whose first
        field is the main logits (followed by auxiliary logits) while in
        training mode; every other backbone used here returns a plain tensor.
        """
        if isinstance(output, tuple):
            return output[0]
        return output

    def forward(self, image):
        """Run the backbone(s) and the head on a batch of images."""
        if self.model_type == 'Mixed':
            branch1 = self._main_logits(self.model1(image))
            branch2 = self._main_logits(self.model2(image))
            # Concatenate along the feature dimension.
            features = torch.cat((branch1, branch2), 1)
        else:
            features = self._main_logits(self.model(image))
        return self.conc_models(features)

def get_weightedAccuracyM(y_true, y_pred, num_classes, weights):
    """Weighted sum of per-class recall.

    For every class ``c`` in ``range(num_classes)`` the fraction of samples
    with true label ``c`` that were predicted correctly is multiplied by
    ``weights[c]``; the products are summed.

    Args:
        y_true: Array-like of true integer labels.
        y_pred: Array-like of predicted integer labels, same shape as ``y_true``.
        num_classes: Number of classes; labels are expected in ``range(num_classes)``.
        weights: Array-like with one weight per class.

    Returns:
        The weighted sum as a numpy float. A class that never occurs in
        ``y_true`` contributes 0 instead of turning the result into NaN.
    """
    # Accept plain lists as well as arrays; the comparisons below need arrays.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    weights = np.asarray(weights, dtype=float)

    correct = y_true == y_pred
    support = np.array([np.sum(y_true == c) for c in range(num_classes)], dtype=float)
    hits = np.array([np.sum((y_true == c) & correct) for c in range(num_classes)], dtype=float)

    # Divide only where the class is present; absent classes keep a recall of 0
    # (0/0 would otherwise yield NaN and poison the sum).
    recall = np.divide(hits, support, out=np.zeros_like(hits), where=support > 0)
    return np.sum(recall * weights)

In [6]:
def _build_pipeline(augmentations=()):
    """Compose ToPILImage -> Resize -> [augmentations] -> ToTensor -> Normalize.

    ``augmentations`` is an iterable of extra transforms inserted between the
    resize and the tensor conversion; it is empty for the validation pipeline.
    """
    steps = [
        transforms.ToPILImage(),
        transforms.Resize((128//2, 198//2)),
    ]
    steps.extend(augmentations)
    steps.append(transforms.ToTensor())
    # Channel mean/std documented by torchvision for its ImageNet-pretrained models.
    steps.append(transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]))
    return transforms.Compose(steps)


# Training augmentation: each call picks one of three AutoAugment policies.
# Manual RandomAffine / flip augmentations were tried earlier and are disabled.
data_transforms = {
    'train': _build_pipeline((
        transforms.RandomChoice((
            transforms.AutoAugment(transforms.AutoAugmentPolicy.IMAGENET),
            transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10),
            transforms.AutoAugment(transforms.AutoAugmentPolicy.SVHN),
        )),
    )),
    'val': _build_pipeline(),
}

In [8]:
# Label tables consumed by CustomDataset (it reads the `filename` and `sign` columns).
train = pd.read_csv('train_targets_noV.csv')
valid = pd.read_csv('valid_targets.csv')

# Both splits load their images from the same folder; only the transform differs.
train_dataset = CustomDataset(
    dataset=train,
    transform=data_transforms['train'],
    folder='dataset/train/',
)
valid_dataset = CustomDataset(
    dataset=valid,
    transform=data_transforms['val'],
    folder='dataset/train/',
)

# Training batches are shuffled; validation is evaluated one image at a time, in order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False)

dataloaders = {'train': train_loader, 'val': valid_loader}
dataset_sizes = {
    split: len(split_dataset)
    for split, split_dataset in (('train', train_dataset), ('val', valid_dataset))
}

In [9]:
# Collect unreachable Python objects first so any tensors they still hold are
# freed, then ask PyTorch to release its cached CUDA memory. (Doing it in the
# opposite order leaves memory of not-yet-collected tensors in the cache.)
gc.collect()
torch.cuda.empty_cache()
# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
model_type = 'ResNext'
num_classes = 5
# Per-class weights of the weighted-accuracy metric: reciprocals of hard-coded
# per-class fractions (presumably the class frequencies — TODO confirm source).
weights = 1 / np.array([0.4, 0.3, 0.09, 0.12, 0.06])
# Place the model on the same device the training loop moves its batches to,
# so the cell also runs on CPU-only machines.
model = img_CNN(model_type, num_classes).to(device)

# Best-on-validation bookkeeping, updated by the training loop.
best_model_wts_single = copy.deepcopy(model.state_dict())
best_res_single = 0
best_epoch_single = -1

# `classes` and `y` must be passed by keyword in current scikit-learn releases.
class_weights = compute_class_weight(
    'balanced', classes=np.unique(train.sign), y=train.sign.to_numpy())
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
# Class-weighted loss to counter the label imbalance.
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# Halve the learning rate every 5 epochs.
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

since = time.time()

num_epochs = 40
for epoch in tqdm(range(num_epochs)):
    print('-' * 10)
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))
    # Every epoch runs a training pass followed by a validation pass.
    for phase in ['train', 'val']:
        is_train = phase == 'train'
        # train(True) enables Dropout/BatchNorm updates, train(False) == eval().
        model.train(is_train)
        y_true = []
        y_pred = []
        for inputs, labels, _filenames in tqdm(dataloaders[phase]):
            inputs = inputs.to(device)
            labels = labels.to(device)
            # Gradients are only tracked (and weights only updated) while training.
            with torch.set_grad_enabled(is_train):
                outputs = model(inputs)
                preds = torch.argmax(outputs, dim=1)
                if is_train:
                    loss = criterion(outputs, labels)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
            y_true.extend(labels.cpu().tolist())
            y_pred.extend(preds.cpu().tolist())
        if is_train:
            # The learning-rate schedule advances once per epoch.
            scheduler.step()

        y_true_arr = np.array(y_true)
        y_pred_arr = np.array(y_pred)
        tmp_wAcc_score = get_weightedAccuracyM(
            y_true_arr, y_pred_arr, num_classes=num_classes, weights=weights)
        # Score of a perfect prediction; used to scale the printed value.
        max_wAcc_score = get_weightedAccuracyM(
            y_true_arr, y_true_arr, num_classes=num_classes, weights=weights)
        tmp_f1_score = f1_score(y_true, y_pred, average='macro')
        print('{} results for model {} - f1: {:.8f}, weghted accuracy: {:.8f}'.format(
            phase, model_type,
            tmp_f1_score, tmp_wAcc_score / max_wAcc_score))
        # Model selection uses the unscaled validation score; ties favour the later epoch.
        if phase == 'val' and tmp_wAcc_score >= best_res_single:
            best_res_single = tmp_wAcc_score
            best_epoch_single = epoch
            best_model_wts_single = copy.deepcopy(model.state_dict())
# Measured after the loop so the value also exists when num_epochs is 0.
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))

In [None]:
# Rebuild the architecture and restore the weights of the best validation epoch.
best_model = img_CNN(model_type, num_classes)
best_model.load_state_dict(best_model_wts_single)
# Switch to inference mode explicitly so Dropout/BatchNorm are in eval
# behaviour while the graph is traced, regardless of the exporter's defaults.
best_model.eval()
# Shape of one transformed training image; a batch dimension of 1 is prepended.
input_shape = train_dataset[0][0].shape
sample_input = torch.randn(1, *input_shape, requires_grad=True)
torch.onnx.export(best_model, sample_input, "best_model.onnx", verbose=True)