In [None]:
# importing the libraries
import pandas as pd
import numpy as np
import math
# for reading and displaying images
from skimage.io import imread
import matplotlib.pyplot as plt
%matplotlib inline
# for creating validation set
from sklearn.model_selection import train_test_split
# for evaluating the model
from sklearn.metrics import accuracy_score
from tqdm import tqdm
# PyTorch libraries and modules
import torch
import torch.nn as nn
import torchvision
from torch.autograd import Variable
from torch.nn import Linear, ReLU, CrossEntropyLoss, Sequential, Conv2d, MaxPool2d, Module, Softmax, BatchNorm2d, Dropout
from torch.optim import Adam, SGD
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2

import time

from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

from sklearn.preprocessing import LabelEncoder

# Check GPU

In [None]:
cuda = torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")
torch.backends.cudnn.benchmark = True
print("Using {}: {}".format(device, torch.cuda.get_device_name(0)))

# Global Variables

In [None]:
IMAGES_DIR = 'input/images/'
TRAIN_CSV = 'input/split/train.csv'
VALID_CSV = 'input/split/valid.csv'
BATCH_SIZE = 256

LABEL_ENCODER = LabelEncoder()
LABEL_ENCODER.fit(pd.read_csv("input/train.csv")['category'])



# 1. Load Data

In [None]:
def get_category_dict():
    train_data = pd.read_csv('input/train.csv')
    train_y = train_data['category'].values
    categories = list(set(train_y))
    categories_dict = { categories[i]: i for i in range(27)}
    return categories_dict

class ProductDataset(Dataset):

    def __init__(self, df, transforms=None, X=None, Y=None):
        self.df = df
        self.category_dict = get_category_dict()
        self.transforms=transforms
        
        self.X = X
        self.Y = Y

        # print('init:', len(self.X), len(self.Y))

    def __len__(self):
        # print('len: ', len(self.X))
        return len(self.X)
    
    def __getitem__(self, idx):

        # print('idx:', idx)

        image_path = IMAGES_DIR + str(self.X[idx]) + '.jpg'
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transforms:
            transformed = self.transforms(image=image)
            image = transformed['image']
        
        # label_text = self.Y[idx]
        # label = self.category_dict[label_text]
        # label = torch.from_numpy(np.array([label]).astype(np.int8))
        
        return image, self.Y[idx]

## 1.1 Split data

In [None]:

df = pd.read_csv("input/train.csv")

X = list(df['id'])
y = list(df['category'])

X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=11)

y_train, y_valid = LABEL_ENCODER.transform(y_train), LABEL_ENCODER.transform(y_valid)


train_df = None
test_df = None

len(X_valid), len(y_valid)








In [None]:
transforms_train = A.Compose([
    # A.Flip(),
    A.ShiftScaleRotate(rotate_limit=1.0, p=0.8),

    # Pixels
    A.OneOf([
        A.IAAEmboss(p=1.0),
        A.IAASharpen(p=1.0),
        A.Blur(p=1.0),
    ], p=0.5),

    # Affine
    # A.OneOf([
    #     A.ElasticTransform(p=1.0),
    #     A.IAAPiecewiseAffine(p=1.0)
    # ], p=0.5),

    A.Normalize(p=1.0),
    ToTensorV2(p=1.0),
])

transforms_test = A.Compose([
    A.Normalize(p=1.0),
    ToTensorV2(p=1.0),
])


train_dataset = ProductDataset(train_df, transforms_train, X_train, y_train)
test_dataset = ProductDataset(test_df, transforms_test, X_valid, y_valid)

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)





# 2. Model

In [None]:
class Net(nn.Module):
    
    def __init__(self, num_classes=27):
        super().__init__()
        
        self.backbone = torchvision.models.wide_resnet101_2(pretrained=True)
        
        in_features = self.backbone.fc.in_features

        self.logit = nn.Linear(in_features, num_classes)
        
    def forward(self, x):
        batch_size, C, H, W = x.shape
        
        x = self.backbone.conv1(x)
        x = self.backbone.bn1(x)
        x = self.backbone.relu(x)
        x = self.backbone.maxpool(x)

        x = self.backbone.layer1(x)
        x = self.backbone.layer2(x)
        x = self.backbone.layer3(x)
        x = self.backbone.layer4(x)
        
        x = F.adaptive_avg_pool2d(x,1).reshape(batch_size,-1)
        x = F.dropout(x, 0.25, self.training)

        x = self.logit(x)

        return x

In [None]:
model = Net()
optimizer = Adam(model.parameters(), lr=0.0001)
criterion = CrossEntropyLoss()

model = model.cuda()
criterion = criterion.cuda()


# 3. Train

In [None]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

def calculate_accuracy(y_pred, y):
    top_pred = y_pred.argmax(1, keepdim = True)
    correct = top_pred.eq(y.view_as(top_pred)).sum()
    acc = correct.float() / y.shape[0]
    return acc

def train(model, iterator, optimizer, criterion, device):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for (x, y) in iterator:
        
        x = x.to(device)
        y = y.to(device, dtype=torch.int64)
        
        optimizer.zero_grad()
                
        y_pred = model(x)
        
        
        loss = criterion(y_pred, y)
        
        acc = calculate_accuracy(y_pred, y)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

def evaluate(model, iterator, criterion, device):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():

        for (x, y) in iterator:
            
            x = x.to(device)
            y = y.to(device, dtype=torch.int64)
                    
            y_pred = model(x)
            
            loss = criterion(y_pred, y)
            
            acc = calculate_accuracy(y_pred, y)
            
            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)



In [None]:
max_epochs = 20
train_accuracy_list = []
train_loss_list = []
valid_acc_list = []
valid_loss_list = []
best_valid_loss = float('inf')

for epoch in range(max_epochs):

    start_time = time.monotonic()

    # train
    train_loss, train_acc = train(model, train_dataloader, optimizer, criterion, device)
    
    # valid
    valid_loss, valid_acc = evaluate(model, test_dataloader, criterion, device)

    # save best model
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'output/cnn-model.pt')

    # Track the accuracy
    train_accuracy_list.append(train_acc)
    train_loss_list.append(train_loss)
    valid_acc_list.append(valid_acc)
    valid_loss_list.append(valid_loss)
        
    # print epoch info
    end_time = time.monotonic()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')
        




In [None]:
model.load_state_dict(torch.load('./output/cnn-model.pt'))

test_loss, test_acc = evaluate(model, test_dataloader, criterion, device)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

In [None]:
def get_predictions(model, iterator, device):

    model.eval()

    images = []
    labels = []
    probs = []

    with torch.no_grad():

        for (x, y) in iterator:

            x = x.to(device)

            y_pred = model(x)

            y_prob = F.softmax(y_pred, dim = -1)
            top_pred = y_prob.argmax(1, keepdim = True)

            images.append(x.cpu())
            labels.append(y.cpu())
            probs.append(y_prob.cpu())

    images = torch.cat(images, dim = 0)
    labels = torch.cat(labels, dim = 0)
    probs = torch.cat(probs, dim = 0)

    return images, labels, probs

In [None]:
images, labels, probs = get_predictions(model, test_dataloader, device)

In [None]:
pred_labels = torch.argmax(probs, 1)


In [None]:
def plot_confusion_matrix(labels, pred_labels, classes):
    
    fig = plt.figure(figsize = (15, 15));
    ax = fig.add_subplot(1, 1, 1);
    cm = confusion_matrix(labels, pred_labels);


    for i in range(27):
        total = sum(cm[i])

        for j in range(27):
            cm[i][j] = (cm[i][j] / total) * 100
        # if cm[i][i] > 15:
        #     cm[i][i] = 15



    cm = ConfusionMatrixDisplay(cm, display_labels = classes);
    cm.plot(values_format = 'd', cmap = 'Blues', ax = ax)
    plt.xticks(rotation = 20)



In [None]:
plot_confusion_matrix(labels, pred_labels, get_category_dict().keys()) 