In [None]:
!pip install timm pandas requests

In [None]:
import torch
import torchvision
from torchvision import datasets as dsets
from torchvision import transforms
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, sampler, random_split
from torchvision import models

import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import os
import pandas as pd

from PIL import Image
import timm
from timm.loss import LabelSmoothingCrossEntropy # This is better than normal nn.CrossEntropyLoss

import sys
from tqdm import tqdm
import time
import copy

## Prepare Data

In [None]:
# Data augementation pipline
transform = transforms.Compose([transforms.RandomHorizontalFlip(0.9),
    transforms.Resize((384, 384)),
    transforms.ToTensor(), # ToTensor : [0, 255] -> [0, 1]
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Train transformation pipeline
train_transform = transforms.Compose([
    transforms.Resize((384, 384)),
    transforms.ToTensor(), # ToTensor : [0, 255] -> [0, 1]
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Train transformation pipeline
test_transform = transforms.Compose([
    transforms.Resize((384, 384)),
    transforms.ToTensor(), # ToTensor : [0, 255] -> [0, 1]
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])
train_data1 = dsets.ImageFolder('/content/drive/MyDrive/train/', transform)
test_data1 = dsets.ImageFolder('/content/drive/MyDrive/val/',  transform)
train_data2 = dsets.ImageFolder('/content/drive/MyDrive/train/', train_transform)
test_data2 = dsets.ImageFolder('/content/drive/MyDrive/val/', test_transform)

train_data = torch.utils.data.ConcatDataset([train_data1,train_data2])
test_data = torch.utils.data.ConcatDataset([test_data1,test_data2])

In [None]:
batch_size = 11

train_loader = DataLoader(train_data,
                          batch_size=batch_size,
                          shuffle=True, drop_last=True)

test_loader = DataLoader(test_data,
                         batch_size=batch_size,
                         shuffle=True, drop_last=True)

In [None]:
train_data_len = len(train_data)
test_data_len = len(test_data)

dataloaders = {
    "train": train_loader,
    "val": test_loader
}
dataset_sizes = {
    "train": train_data_len,
    "val": test_data_len
}

## Define Model

In [None]:
model = torch.hub.load('facebookresearch/deit:main', 'deit_base_patch16_384', pretrained=True)
print(model)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
for param in model.parameters(): #freeze model
    param.requires_grad = False

# Set uo classifier head
n_inputs = model.head.in_features
model.head = nn.Sequential(
    nn.Linear(n_inputs, 512),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(512, 4)
)
model = model.to(device)
print(model.head)

In [None]:
for param in model.parameters(): # During fine-tune
    param.requires_grad = True

In [None]:
criterion = LabelSmoothingCrossEntropy()
criterion = criterion.to(device)
optimizer = optim.Adam(model.head.parameters(), lr=0.0001)

# lr scheduler
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.97)

## Train Model

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print("-"*10)

        for phase in ['train', 'val']: # We do training and validation phase per epoch
            if phase == 'train':
                model.train() # model to training mode
            else:
                model.eval() # model to evaluate

            running_loss = 0.0
            running_corrects = 0.0

            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'): # no autograd makes validation go faster
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1) # used for accuracy
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'train':
                scheduler.step() # step at end of epoch

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc =  running_corrects.double() / dataset_sizes[phase]

            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, epoch_loss, epoch_acc))

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict()) # keep the best validation accuracy model
        print()
    time_elapsed = time.time() - since # slight error
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print("Best Val Acc: {:.4f}".format(best_acc))

    model.load_state_dict(best_model_wts)
    return model

In [None]:
model_ft = train_model(model, criterion, optimizer, exp_lr_scheduler) # now it is a lot faster
# I will come back after 10 epochs

## Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd

y_pred = []
y_true = []

# iterate over test data
for images, labels in test_loader:

        images = images.cuda()
        output = model(images) # Feed Network

        output = (torch.max(torch.exp(output), 1)[1]).data.cpu().numpy()
        y_pred.extend(output) # Save Prediction

        labels = labels.data.cpu().numpy()
        y_true.extend(labels) # Save Truth

# constant for classes
classes = ("High_risk","invalid", "Low_risk","Medium_risk",)

# Build confusion matrix
cf_matrix = confusion_matrix(y_true, y_pred)
df_cm = pd.DataFrame(cf_matrix / np.sum(cf_matrix, axis=1)[:, None], index = [i for i in classes],
                     columns = [i for i in classes])
plt.figure(figsize = (12,7))
sn.heatmap(df_cm, annot=True)
plt.savefig('output.png')