In [None]:
#### Importing Libraries

In [None]:
!pip install timm

# Libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from statistics import mean

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import cv2

import timm
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import StepLR

import albumentations as A
from albumentations.pytorch import ToTensorV2

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Dataset

#### Data Analysis
* concetto_CDT contains images of hand-drawn clocks
* train_csv contains the corresponding image ids and their labels
* test_csv contains the ids of the images whose labels need to be predicted

In [None]:
# Read the competition tables: `train` is used below with an `id` and a `tar` (label)
# column, `test` with an `id` column.
train = pd.read_csv('../input/concetto22/train.csv')
test = pd.read_csv('../input/concetto22/test.csv')

In [None]:
# Preview the first rows of the training table.
train.head()

In [None]:
# Preview the first rows of the test table.
test.head()

In [None]:
# This function adds an extra column having the path of the images
def append_path(df):
    """Add a ``path`` column holding the image file path for every row.

    Each value of the ``id`` column is converted to a string; a trailing ``.0``
    (the way a float id such as ``12.0`` prints) is replaced by the ``.tif``
    extension, and the dataset image directory is prepended.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with an ``id`` column. The index may be arbitrary (rows are read
        positionally, so a filtered or shuffled frame works as well).

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, modified in place with the new ``path`` column.
    """
    image_dir = '/kaggle/input/concetto22/concetto_CDT/concetto_CDT/'
    paths = []
    # Iterate over the values rather than df['id'][i]: the latter is a label
    # lookup and raises KeyError when the index is not 0..n-1.
    for raw_id in df['id'].tolist():
        file_name = str(raw_id)
        # Only the trailing '.0' is the float artefact; leave any other '.0' alone.
        if file_name.endswith('.0'):
            file_name = file_name[:-2] + '.tif'
        paths.append(image_dir + file_name)
    df['path'] = paths
    return df

In [None]:
# Attach the image path column to the training table and preview the result.
train = append_path(train)
train.head()

In [None]:
# Attach the image path column to the test table and preview the result.
test = append_path(test)
test.head()

#### Data visualization

In [None]:
# cv2.imread returns the channels in BGR order while matplotlib expects RGB,
# so convert before displaying to get the true colours.
plt.imshow(cv2.cvtColor(cv2.imread(train['path'][0]), cv2.COLOR_BGR2RGB))

#### Splitting the train data into train and val set

In [None]:
# Separate the label column from the remaining columns, then hold out 10% of the rows
# for validation; stratify=y keeps the class proportions the same in both splits.
X = train.drop(['tar'], axis=1)
y = train['tar']
x_train, x_val, y_train, y_val = train_test_split(X, y, test_size=0.1, random_state=42, stratify = y)

In [None]:
# Re-attach each label series to its feature rows by index, giving one frame per split.
train_data = pd.merge(x_train, y_train, right_index=True, left_index=True)
val_data = pd.merge(x_val, y_val, right_index=True, left_index=True)

In [None]:
#defining a configuration

class CFG:
    """Static settings read by the transform, model, optimizer and training cells."""

    # Switches
    train = True                     # main() only trains when this is truthy
    target_col = 'tar'               # label column read when scoring the results

    # Model
    model_name = 'efficientnet_b2'   # architecture name handed to timm.create_model
    target_size = 6                  # output width of the replacement classifier layer

    # Data pipeline
    size = 264                       # height and width passed to the resize/crop transforms
    batch_size = 12
    num_workers = 2

    # Optimisation
    epochs = 15
    lr = 1e-3
    weight_decay = 0

#### Dataset creation 
Custom datasets for the training and test data: each one takes the image paths, applies the transforms, and converts the images to tensors for further processing.

In [None]:
class TrainDataset(Dataset):
    """Dataset yielding ``(image, label)`` pairs for training and validation.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with a ``path`` column (image file paths) and a ``tar`` column
        (class labels).
    transform : callable, optional
        Called as ``transform(image=array)``; must return a mapping whose
        ``'image'`` entry is the transformed image.
    """

    def __init__(self, df, transform=None):
        self.df = df
        # Keep plain arrays so items are looked up by position, independent of
        # whatever index the DataFrame carries after splitting.
        self.file_names = df['path'].values
        self.labels = df['tar'].values
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Load item ``idx`` as an RGB image (transformed if requested) and a long label.

        Raises
        ------
        FileNotFoundError
            If the image file is missing or cannot be decoded.
        """
        file_path = self.file_names[idx]
        image = cv2.imread(file_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than later inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {file_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            image = self.transform(image=image)['image']
        label = torch.tensor(self.labels[idx]).long()
        return image, label
    

class TestDataset(Dataset):
    """Dataset yielding unlabeled images for inference.

    Parameters
    ----------
    df : pandas.DataFrame
        Table with a ``path`` column (image file paths).
    transform : callable, optional
        Called as ``transform(image=array)``; must return a mapping whose
        ``'image'`` entry is the transformed image.
    """

    def __init__(self, df, transform=None):
        self.df = df
        # Plain array so items are looked up by position, not by DataFrame index.
        self.file_names = df['path'].values
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Load item ``idx`` as an RGB image, transformed if requested.

        Raises
        ------
        FileNotFoundError
            If the image file is missing or cannot be decoded.
        """
        file_path = self.file_names[idx]
        image = cv2.imread(file_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than later inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {file_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            image = self.transform(image=image)['image']
        return image

# Transformation

#### Augmentation
Apply the necessary preprocessing: resize each image to the size accepted by the model and normalize it before converting it to a tensor.
Albumentations transforms such as RandomResizedCrop and HorizontalFlip are used to augment the training set.
Find more transforms in the [Albumentations documentation](https://albumentations.ai/docs/); the torchvision equivalents are listed [here](https://github.com/pytorch/vision/blob/main/torchvision/transforms/transforms.py)

In [None]:
# Transforms
def get_transforms(*, data):
    """Build the Albumentations pipeline for one data split.

    Parameters
    ----------
    data : str
        ``'train'`` for the augmenting pipeline (resize, random resized crop,
        horizontal flip) or ``'valid'`` for the deterministic one (resize only).
        Both end with the same normalization and tensor conversion.

    Returns
    -------
    albumentations.Compose

    Raises
    ------
    ValueError
        If ``data`` is neither ``'train'`` nor ``'valid'``. Previously such a
        value silently produced ``None``, i.e. a dataset with no transform.
    """
    if data not in ('train', 'valid'):
        raise ValueError(f"data must be 'train' or 'valid', got {data!r}")

    # Steps shared by both splits: resize first, normalize and tensorize last.
    steps = [A.Resize(CFG.size, CFG.size)]

    if data == 'train':
        # NOTE(review): positional (height, width) arguments match the Albumentations
        # API this notebook was written against; confirm for the installed version.
        steps.append(A.RandomResizedCrop(CFG.size, CFG.size))
        steps.append(A.HorizontalFlip(p=0.5))

    steps.append(
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        )
    )
    steps.append(ToTensorV2())
    return A.Compose(steps)

# Model

In [None]:
class CustomNet(nn.Module):
    """timm backbone whose classification layer is replaced for this task.

    Parameters
    ----------
    model_name : str
        Architecture name passed to ``timm.create_model``. Defaults to
        ``CFG.model_name``.
    pretrained : bool
        Forwarded to ``timm.create_model`` to request pretrained weights.
    """

    def __init__(self, model_name=CFG.model_name, pretrained=False):
        super().__init__()
        # Honour the model_name argument (it used to be ignored in favour of CFG.model_name).
        self.model = timm.create_model(model_name, pretrained=pretrained)
        # The head attribute is `classifier` for this architecture; other timm models
        # may call it `fc` -- check self.model.default_cfg["classifier"] when switching.
        n_features = self.model.classifier.in_features
        self.model.classifier = nn.Linear(n_features, CFG.target_size)

    def forward(self, x):
        """Return the backbone output for the batch ``x`` (CFG.target_size values per sample)."""
        return self.model(x)
    

In [None]:
# Instantiate the network with pretrained backbone weights.
# Note: train_loop() builds its own CustomNet instance and the inference cell rebinds `model`.
model = CustomNet(model_name=CFG.model_name, pretrained=True)

## Loss Function
The loss function used here is [CrossEntropyLoss](https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html).

In [None]:
def get_criterion():
    """Return the loss used for training: cross-entropy with default settings."""
    return nn.CrossEntropyLoss()

In [None]:
def get_score(y_true, y_pred):
    """Score predictions against the ground truth using sklearn's accuracy_score."""
    score = accuracy_score(y_true, y_pred)
    return score

## Optimizer

In [None]:
def get_optimizer(model):
    """Create an Adam optimizer over all parameters of ``model``.

    The learning rate and weight decay come from CFG; AMSGrad is disabled.
    """
    settings = {
        'lr': CFG.lr,
        'weight_decay': CFG.weight_decay,
        'amsgrad': False,
    }
    return Adam(model.parameters(), **settings)

## Scheduler
It is used for adjusting the learning rate (LR decay) between epochs as training progresses. Read more about schedulers [here](https://pytorch.org/docs/stable/optim.html#)

In [None]:
def get_scheduler(optimizer):
    """Create a step-decay schedule for ``optimizer``.

    The learning rate is multiplied by 0.1 after every 2 calls to
    ``scheduler.step()``.

    The ``verbose`` argument is intentionally not passed: it is deprecated in
    PyTorch and not accepted by newer releases. Read the current rate from
    ``scheduler.get_last_lr()`` or ``optimizer.param_groups`` when logging is needed.
    """
    return StepLR(optimizer, step_size=2, gamma=0.1)

## Utility Functions

In [None]:
class AverageMeter(object):
    """Tracks the most recent value together with a running weighted mean."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Set every statistic back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the running mean."""
        self.count += n
        self.sum += val * n
        self.val = val
        self.avg = self.sum / self.count
        
def train_fn(train_loader, model, criterion, optimizer, epoch, scheduler, device):
    """Train ``model`` for one pass over ``train_loader``.

    Parameters
    ----------
    train_loader : iterable of ``(images, labels)`` batches.
    model, criterion, optimizer : the network, loss and optimizer to use.
    epoch, scheduler : accepted for interface compatibility; not used here
        (the caller steps the scheduler once per epoch).
    device : device the batches are moved to.

    Returns
    -------
    float
        Mean loss per sample over the pass (``nan`` if the loader is empty).
    """
    model.train()  # switch to training mode
    running_loss = 0.0
    n_samples = 0
    for images, labels in tqdm(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        batch_size = labels.shape[0]

        # Clear gradients first so nothing left over from earlier use leaks in.
        optimizer.zero_grad()
        y_preds = model(images)
        loss = criterion(y_preds, labels)
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean; weight it by the batch size and
        # divide by the number of samples (not batches) at the end.
        running_loss += loss.item() * batch_size
        n_samples += batch_size

    return running_loss / n_samples if n_samples else float('nan')


def valid_fn(valid_loader, model, criterion, device):
    """Evaluate ``model`` on ``valid_loader`` without updating it.

    Parameters
    ----------
    valid_loader : iterable of ``(images, labels)`` batches.
    model, criterion : the network and the loss to evaluate.
    device : device the batches are moved to.

    Returns
    -------
    (float, numpy.ndarray)
        Mean loss per sample, and the softmax probabilities of every sample
        stacked in loader order (one row per sample).
    """
    model.eval()  # switch to evaluation mode
    preds = []
    running_loss = 0.0
    n_samples = 0

    with torch.no_grad():
        for images, labels in tqdm(valid_loader):
            images = images.to(device)
            labels = labels.to(device)
            batch_size = labels.shape[0]

            y_preds = model(images)
            loss = criterion(y_preds, labels)

            # Batch-mean loss weighted by batch size; divided by the sample count below.
            running_loss += loss.item() * batch_size
            n_samples += batch_size
            preds.append(y_preds.softmax(1).to('cpu').numpy())

    predictions = np.concatenate(preds)

    return running_loss / n_samples, predictions


def test_fun(test_loader, model, device):
    model.eval()
    preds = []
    test_df = pd.DataFrame()
    for step, (images) in enumerate(test_loader):
        images = images.to(device)
        with torch.no_grad():
            y_preds = model(images)
        preds.append(y_preds.softmax(1).to('cpu').numpy())
    predictions = np.concatenate(preds)
    pred = predictions.argmax(1)
    return pred

# Train loop

In [None]:
# Train loop
def train_loop(train_data, valid_data):
    """Train a fresh CustomNet and keep the epoch with the best validation score.

    Parameters
    ----------
    train_data, valid_data : pandas.DataFrame
        Frames with ``path`` and ``tar`` columns, as consumed by TrainDataset.

    Returns
    -------
    pandas.DataFrame
        ``valid_data`` itself, with a ``preds`` column added in place that holds
        the predicted class of every validation row at the best-scoring epoch.

    Side effects
    ------------
    Whenever the validation score improves, a checkpoint (model, optimizer and
    scheduler state plus the validation probabilities) is written to
    ``./<CFG.model_name>_best.pth``.

    Raises
    ------
    RuntimeError
        If no epoch was run (``CFG.epochs`` < 1), so no prediction exists.
    """
    # create dataset
    train_dataset = TrainDataset(train_data, transform=get_transforms(data='train'))
    valid_dataset = TrainDataset(valid_data, transform=get_transforms(data='valid'))

    # create dataloader
    train_loader = DataLoader(train_dataset, batch_size=CFG.batch_size, shuffle=True,
                              num_workers=CFG.num_workers, pin_memory=True, drop_last=True)
    valid_loader = DataLoader(valid_dataset, batch_size=CFG.batch_size, shuffle=False,
                              num_workers=CFG.num_workers, pin_memory=True, drop_last=False)

    # create model and transfer to device
    model = CustomNet(CFG.model_name, pretrained=True)
    model.to(device)

    # select optimizer, scheduler and criterion
    optimizer = get_optimizer(model)
    scheduler = get_scheduler(optimizer)
    criterion = get_criterion()

    checkpoint_path = './'+f'{CFG.model_name}_best.pth'
    # The validation loader is not shuffled, so predictions line up with these labels.
    valid_labels = valid_data['tar'].values

    best_score = -1.0
    best_preds = None

    # start training
    for epoch in range(CFG.epochs):
        avg_loss = train_fn(train_loader, model, criterion, optimizer, epoch, scheduler, device)
        avg_val_loss, preds = valid_fn(valid_loader, model, criterion, device)

        scheduler.step()

        # scoring
        score = get_score(valid_labels, preds.argmax(1))
        current_lr = optimizer.param_groups[0]['lr']
        print(f'Epoch {epoch+1} - avg_train_loss: {avg_loss:.4f}  '
              f'avg_val_loss: {avg_val_loss:.4f}  next_lr: {current_lr:.2e}')
        print("score: ", score)

        # save the best model
        if score > best_score:
            print('Score Improved')
            best_score = score
            # Keep the winning predictions in memory: reloading them from the
            # checkpoint needs a full (non weights-only) unpickle of a NumPy array.
            best_preds = preds
            print(f'Epoch {epoch+1} - Save Best Score: {best_score:.4f}')
            torch.save({'model': model.state_dict(),
                        'preds': preds,
                        'optimizer': optimizer.state_dict(),
                        'scheduler': scheduler.state_dict()},
                       checkpoint_path)

    if best_preds is None:
        raise RuntimeError('No epoch was run (check CFG.epochs); nothing to report.')

    valid_data['preds'] = best_preds.argmax(1)

    return valid_data

In [None]:
# main
def main():
    """Run training when CFG.train is set and report the best validation score."""

    def get_result(result_df):
        """Score the ``preds`` column against the label column, print and return the score."""
        preds = result_df['preds'].values
        labels = result_df[CFG.target_col].values
        score = get_score(labels, preds)
        # The score used to be computed and then dropped; surface it.
        print(f'Best validation score: {score:.4f}')
        return score

    if CFG.train:
        # train
        df = train_loop(train_data, val_data)
        get_result(df)

In [None]:
# Kick off training; in a notebook kernel __name__ is '__main__', so this runs on execution.
if __name__ == '__main__':
    main()

In [None]:
# Inference on the test set with the best checkpoint written by train_loop().
# shuffle=False keeps the predictions in the same row order as `test`.
test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False,
                         num_workers=CFG.num_workers, pin_memory=True, drop_last=False)

checkpoint_path = './'+f'{CFG.model_name}_best.pth'
# map_location lets a checkpoint saved on a GPU be restored on whatever `device` is now.
# The checkpoint also stores a NumPy array ('preds'), which the weights-only unpickler
# used by default in recent PyTorch rejects. The file is written by this notebook, so a
# full unpickle is acceptable here -- do not use this on checkpoints from untrusted sources.
try:
    check_point = torch.load(checkpoint_path, map_location=device, weights_only=False)
except TypeError:  # PyTorch versions that predate the weights_only argument
    check_point = torch.load(checkpoint_path, map_location=device)

# pretrained=False: every weight is overwritten by the checkpoint below,
# so downloading the pretrained backbone would be wasted work.
model = CustomNet(CFG.model_name, pretrained=False)
model.load_state_dict(check_point['model'])
model.to(device)
pred = test_fun(test_loader, model, device)

In [None]:
# Store the predicted class of each test row in the label column.
test['tar'] = pred

In [None]:
# Drop the helper `path` column and write the remaining columns as the submission file.
submission_df = test.drop(['path'], axis=1)
submission_df.to_csv("solution.csv", index=False)

In [None]:
# Display the submission table as a final sanity check.
submission_df

### Now click on the "Submit" button to submit the notebook
### NOTE: We expect everyone to generate such notebooks for your final submission. Only the teams with notebook submitted against their final submission will be considered for prize money!

### Things to try next:
* Try different architectures, optimizers, loss functions etc.
* Think of ways of tackling data imbalance problem.
* Try different image size
* Try Ensembling methods.
* Apply semi supervised learning.

### PS: This competition is hosted to promote learning. So we request you to publish your baseline models via Kaggle kernels and discuss on the discussion tab to help others learn. Thanks!