In [2]:
# Imports
import torch
import torchvision
import torch.nn as nn  # All neural network modules, nn.Linear, nn.Conv2d, BatchNorm, Loss functions
import torch.optim as optim  # For all Optimization algorithms, SGD, Adam, etc.
import torch.nn.functional as F  # All functions that don't have any parameters
from torch.utils.data import (
    DataLoader,
)  # Gives easier dataset managment and creates mini batches
import torchvision.datasets as datasets  # Has standard datasets we can import in a nice way
import torchvision.transforms as transforms  # Transformations we can perform on our dataset
from tqdm import tqdm
import os
import time
import copy

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# Network architecture - DON'T CHANGE
# -*- coding:utf-8 -*-
import torch.nn as nn
import torch
import torch.nn.functional as F

cfg = {
    12: [1, 1, 1, 1, 1],
    18: [1, 2, 2, 2, 1],
    20: [1, 2, 4, 1, 1],
    28: [1, 3, 6, 1, 1],
    36: [2, 4, 8, 2, 1],
    64: [3, 8, 16, 3, 1],
}

block2channels = {
    0: 16,
    1: 32,
    2: 64,
    3: 128,
    4: 256
}


def conv3x3(in_channels, out_channels, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride,
                     padding=1)


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ResBlock, self).__init__()
        self.conv2d_1 = conv3x3(in_channels, out_channels)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu1 = nn.ReLU()
        self.conv2d_2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu2 = nn.ReLU()
        self.conv2d_3 = conv3x3(out_channels, out_channels)
        self.bn3 = nn.BatchNorm2d(out_channels)
        self.relu3 = nn.ReLU()

    def forward(self, x):
        net = self.conv2d_1(x)
        net = self.bn1(net)
        net = self.relu1(net)

        net = self.conv2d_2(net)
        net = self.bn2(net)
        net = self.relu2(net)

        net = self.conv2d_3(net)
        net = self.bn3(net)
        net = self.relu3(net)

        if x.size(1) < net.size(1):
            x = F.pad(x, x.view(1) - net.view(1))

        # if the num of channels of the input is larger than the outputs' - don't use the residual connection
        if x.size(1) > net.size(1):
            pass
        else:
            net = net + x
        return net


class DownSampleBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DownSampleBlock, self).__init__()
        self.conv2d = conv3x3(in_channels, out_channels, stride=2)
        self.relu = nn.ReLU()
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        return self.bn(self.relu(self.conv2d(x)))


class UpSampleBlock(nn.Module):
    def __init__(self):
        super(UpSampleBlock, self).__init__()
        self.us = nn.Upsample(scale_factor=2, mode='bilinear')

    def forward(self, x):
        return self.us(x)


class BackboneBlock(nn.Module):
    def __init__(self, in_channels, out_channels, repetitions, keep_res=False):
        super(BackboneBlock, self).__init__()
        self.keep_res = keep_res
        if keep_res is False:
            self.down_sample_block = DownSampleBlock(in_channels, out_channels)
            self.res_blocks = nn.ModuleList([ResBlock(out_channels, out_channels)] * repetitions)
        else:
            self.res_blocks = nn.ModuleList([conv3x3(in_channels, out_channels)] +
                                            [ResBlock(out_channels, out_channels)] * repetitions)

    def forward(self, x):
        if self.keep_res is False:
            net = self.down_sample_block(x)
        else:
            net = x
        for res_block in self.res_blocks:
            net = res_block(net)
        return net


class HE_Classifier(nn.Module):
    def __init__(self, input_size=512, input_channels=3, sphereface_size=12, net_dropout_prob=0.1):
        super(HE_Classifier, self).__init__()
        self.input_size = input_size

        res_blocks = cfg[sphereface_size]

        self.block1 = BackboneBlock(input_channels, block2channels[0], res_blocks[0], keep_res=True)
        self.block2 = BackboneBlock(block2channels[0], block2channels[1], res_blocks[1])
        self.block3 = BackboneBlock(block2channels[1], block2channels[2], res_blocks[2])
        self.block4 = BackboneBlock(block2channels[2], block2channels[3], res_blocks[3])
        self.block5 = BackboneBlock(block2channels[3], block2channels[4], res_blocks[4])
        self.sphereface_blocks = nn.ModuleList([self.block1, self.block2, self.block3, self.block4, self.block5])

        f_size = input_size // (2 ** 4)
        self._gap = nn.AvgPool2d((f_size, f_size), stride=1)
        self._final_1x1_conv = nn.Conv2d(in_channels=block2channels[4], out_channels=2, kernel_size=1)
        self.net_dropout = torch.nn.Dropout(p=net_dropout_prob)

    def get_num_trainable_params(self):
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

    def get_num_params(self):
        return sum(p.numel() for p in self.parameters())

    def get_im_size(self):
        return self.input_size

    def forward(self, x):
        # encode
        for block in self.sphereface_blocks:
            x = block(x)

        # predict class
        x = self.net_dropout(x)
        x = self._gap(x)
        x = self._final_1x1_conv(x)
        return x
    
    
net = HE_Classifier(input_size=512, input_channels=3, sphereface_size=12)

print(net.get_num_params())
print(net.get_num_trainable_params())


2755538
2755538


In [6]:
# Unfreeze only last 1x1 conv layer, has 2 trainable parameters
# In total, model has 80 parameters
count = 0
for param in net.parameters():
    if count < 78:
        param.requires_grad = False
    else:
        param.requires_grad = True
    count += 1

In [8]:
# Data augmentation and normalization for training, loading the data

input_size = 512
data_dir = "./Data"

data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

print("Initializing Datasets and Dataloaders...")


# Create training and validation datasets
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'val']}
# Create training and validation dataloaders
dataloaders_dict = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=1, shuffle=False, num_workers=4, drop_last=False) for x in ['train', 'val']}

print("Done")

print(len(image_datasets["train"]))
print(len(image_datasets["val"]))

Initializing Datasets and Dataloaders...
Done
10
422


In [9]:
# Load model weights as checkpoint

print('Loading model...')

checkpoint = torch.load("./trained_model.pt", map_location=torch.device('cpu'))
net.load_state_dict(checkpoint['model_state_dict'], strict=True)
print(checkpoint.keys())

net = net.to(device)

Loading model...
dict_keys(['epoch', 'model_state_dict'])


In [10]:
# Focal loss (from https://pytorch.org/vision/stable/_modules/torchvision/ops/focal_loss.html)
import torch
import torch.nn.functional as F

from torchvision.utils import _log_api_usage_once


def sigmoid_focal_loss(
    inputs: torch.Tensor,
    targets: torch.Tensor,
    alpha: float = 0.25,
    gamma: float = 2,
    reduction: str = "none",
) -> torch.Tensor:
    """
    Loss used in RetinaNet for dense detection: https://arxiv.org/abs/1708.02002.

    Args:
        inputs (Tensor): A float tensor of arbitrary shape.
                The predictions for each example.
        targets (Tensor): A float tensor with the same shape as inputs. Stores the binary
                classification label for each element in inputs
                (0 for the negative class and 1 for the positive class).
        alpha (float): Weighting factor in range (0,1) to balance
                positive vs negative examples or -1 for ignore. Default: ``0.25``.
        gamma (float): Exponent of the modulating factor (1 - p_t) to
                balance easy vs hard examples. Default: ``2``.
        reduction (string): ``'none'`` | ``'mean'`` | ``'sum'``
                ``'none'``: No reduction will be applied to the output.
                ``'mean'``: The output will be averaged.
                ``'sum'``: The output will be summed. Default: ``'none'``.
    Returns:
        Loss tensor with the reduction option applied.
    """
    # Original implementation from https://github.com/facebookresearch/fvcore/blob/master/fvcore/nn/focal_loss.py


    if not torch.jit.is_scripting() and not torch.jit.is_tracing():
        #print("PANIC")
        _log_api_usage_once(sigmoid_focal_loss)
    p = torch.sigmoid(inputs)
    ce_loss = F.binary_cross_entropy_with_logits(inputs, targets, reduction="none")
    p_t = p * targets + (1 - p) * (1 - targets)
    loss = ce_loss * ((1 - p_t) ** gamma)

    if alpha >= 0:
        alpha_t = alpha * targets + (1 - alpha) * (1 - targets)
        loss = alpha_t * loss

    # Check reduction option and return loss accordingly
    if reduction == "none":
        pass
    elif reduction == "mean":
        loss = loss.mean()
    elif reduction == "sum":
        loss = loss.sum()
    else:
        raise ValueError(
            f"Invalid Value for arg 'reduction': '{reduction} \n Supported reduction modes: 'none', 'mean', 'sum'"
        )
    return loss

In [None]:
# Initialising variables for training
optimizer1 = optim.RAdam(net.parameters(), lr=0.001) # this is the optimizer for the first 80 epochs
optimizer2 = optim.RAdam(net.parameters(), lr=0.0001) # this is the optimizer for the last 30 epochs
batch_size = 32
num_epochs = 111



In [12]:
# Training loop
def train_model(model, dataloaders, optimizer1, optimizer2, num_epochs=25, is_inception=False):
    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        
            
        if epoch < 80:
            optimizer = optimizer1
        else:
            optimizer = optimizer2

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    # Get model outputs and calculate loss
                    # Special case for inception because in training it has an auxiliary output. In train
                    #   mode we calculate the loss by summing the final output and the auxiliary output
                    #   but in testing we only consider the final output.
                    if is_inception and phase == 'train':
                        # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                        outputs, aux_outputs = model(inputs)
                        loss1 = sigmoid_focal_loss(outputs, labels)
                        loss2 = sigmoid_focal_loss(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = sigmoid_focal_loss(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

In [13]:
# Train and evaluate

model_ft, hist = train_model(net, dataloaders_dict, optimizer1, optimizer2, num_epochs=num_epochs, is_inception=False)



Epoch 0/109
----------


ValueError: Target size (torch.Size([1])) must be the same as input size (torch.Size([1, 2, 1, 1]))