In this task you will write code that builds
[ResNet18](https://arxiv.org/abs/1512.03385).
ResNet18 is a deep neural network designed for image classification.

## I will write here more.

## Import required packages

In [4]:
import math

import numpy as np
import torch
from torch import nn
from torchvision import datasets, transforms

from tqdm.auto import tqdm
import matplotlib.pyplot as plt

## Preliminaries

In [None]:
def conv1x1(in_planes, out_planes, stride=1):
    """
    Build a pointwise (1x1) two-dimensional convolution.

    Args:
        in_planes  (int): the number of input channels;
        out_planes (int): the number of output channels;
        stride     (int, default=1): stride.

    Return:
        A two-dimensional convolutional layer (`nn.Conv2d`) with
        `in_planes` input channels, `out_planes` output channels,
        kernel size 1, stride size `stride`, 0 padding and
        without bias parameter.
    """
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        padding=0,
        bias=False,
    )

def conv3x3(in_planes, out_planes, stride=1):
    """
    Build a 3x3 two-dimensional convolution that preserves spatial size.

    Args:
        in_planes  (int): the number of input channels;
        out_planes (int): the number of output channels;
        stride     (int, default=1): stride.

    Return:
        A two-dimensional convolutional layer (`nn.Conv2d`) with
        `in_planes` input channels, `out_planes` output channels,
        kernel size 3, stride size `stride`, padding 1 and
        without bias parameter.

    Note:
        Padding is 1 (not 0): with a 3x3 kernel this keeps the spatial
        size unchanged for `stride=1`, which is required for the output
        to be summed with a residual connection of the same size.
    """
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )


## Resnet18 Basic Block

![resnet_bb.svg](resnet_bb.svg)

In [None]:
class BasicBlock(nn.Module):
    """
    Basic residual block of the ResNet architecture.

    The block has two computational paths: a feedforward path and
    a residual path (see the picture above).
    The residual path is the identity, or the `downsample` module
    when one is given (needed whenever the feedforward path changes
    the number of channels or the spatial size).
    The feedforward path is the consecutive application of
    the following five layers:
    1. 3x3 convolution, `in_planes` -> `planes` channels, stride `stride`,
       padding 1, no bias;
    2. Batch normalisation layer with `planes` features;
    3. Activation function;
    4. 3x3 convolution, `planes` -> `planes` channels, stride 1,
       padding 1, no bias;
    5. Batch normalisation layer with `planes` features.

    The outputs of the residual and feedforward paths are summed
    and the activated sum is returned.

    The activation is one of ReLU, LeakyReLU or PReLU.

    Note:
        ReLU and LeakyReLU are created with `inplace=True`, so the
        result is stored in the input tensor, which can reduce
        memory consumption.
    """

    def __init__(self, in_planes, planes, stride=1, downsample=None, relu_type='relu'):
        """
        Args:
            in_planes  (int): number of input channels to the block;
            planes     (int): number of output channels of the block;
            stride     (int, default=1): stride for the first convolutional layer;
            downsample (nn.Module, default=None): module applied to the input
                                                  on the residual connection;
            relu_type  (str, default='relu'): type of activation function,
                                              one of 'relu', 'leaky_relu', 'prelu'.
        """

        super(BasicBlock, self).__init__()

        assert relu_type in ['relu', 'leaky_relu', 'prelu']

        self.downsample = downsample

        # The convolutions are built directly (3x3, padding 1, no bias) so
        # that the block is self-contained.
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)

        # Two separate activation modules: PReLU has learnable parameters,
        # so the two activation sites must not share one instance.
        self.relu1 = self._make_activation(relu_type, planes)
        self.relu2 = self._make_activation(relu_type, planes)

    @staticmethod
    def _make_activation(relu_type, planes):
        """Return a new activation module for `relu_type` over `planes` channels."""
        if relu_type == 'relu':
            return nn.ReLU(inplace=True)
        if relu_type == 'leaky_relu':
            return nn.LeakyReLU(inplace=True)
        if relu_type == 'prelu':
            return nn.PReLU(num_parameters=planes)
        raise ValueError(f"Unknown relu_type: {relu_type!r}")

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): input batch with `in_planes` channels.

        Return:
            Activated sum of the feedforward and residual paths.
        """
        # Residual path: identity unless a downsample module was supplied.
        residual = x if self.downsample is None else self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # Out-of-place sum: `residual` may be `x` itself, which must not be modified.
        out = out + residual
        out = self.relu2(out)

        return out

## Activation functions

![Activation functions](activations.drawio.svg)

## ResNet18

In [None]:
class ResNet(nn.Module):
    """ResNet classifier assembled from an input stem, four groups of blocks and a linear head."""

    def __init__(
        self,
        block,
        layers,
        in_planes=64,
        num_classes=100,
        relu_type='relu'
    ):
        """
        ResNet18 comprises an input layer and four consecutive
        groups of Basic blocks (layers) that are outputs of the
        `self._make_layer` method.
        The first group (not the input layer!) keeps the number of
        channels, `in_planes` -> `in_planes` (64 -> 64 by default),
        and works with stride 1. Each of the next three groups doubles
        the number of channels (64 -> 128, 128 -> 256, 256 -> 512)
        and halves the spatial size (stride 2).
        The next layer averages the input tensor over
        the spatial dimensions. For a batch of shape
        [B,C,H,W],  B -- the number of elements in the batch;
                    C -- the number of output channels;
                    H, W -- height and width, spatial sizes,
        the averaged tensor has size [B,C,1,1].
        The final layer is a linear projection from the C-dimensional space
        into the `num_classes`-dimensional one.

        Args:
            block       (callable): block constructor called as
                                    `block(in_planes, planes, stride, downsample, relu_type=...)`;
            layers      (sequence of 4 ints): number of blocks in each group;
            in_planes   (int, default=64): number of channels after the input layer;
            num_classes (int, default=100): number of output classes;
            relu_type   (str, default='relu'): 'relu', 'leaky_relu' or 'prelu'.
        """

        super(ResNet, self).__init__()

        self.in_planes = in_planes
        self.relu_type = relu_type
        # Bound method, not a bare name: the factory is defined on this class.
        self.downsample_block = self.downsample_basic_block

        # `self.in_planes` is updated by `_make_layer`, so the widths are
        # derived from the constructor argument.
        width = in_planes

        self.input_layer = self.make_input_layer(3, width, relu_type)
        self.layer1 = self._make_layer(block, width, layers[0])
        self.layer2 = self._make_layer(block, 2 * width, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 4 * width, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 8 * width, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(8 * width, num_classes)

        # Initialise modules
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    @staticmethod
    def _activation(relu_type, num_channels):
        """Return a new activation module for `relu_type` over `num_channels` channels."""
        if relu_type == 'relu':
            return nn.ReLU(inplace=True)
        if relu_type == 'leaky_relu':
            return nn.LeakyReLU(inplace=True)
        if relu_type == 'prelu':
            return nn.PReLU(num_parameters=num_channels)
        raise ValueError(f"Unknown relu_type: {relu_type!r}")

    def _make_layer(self, block, planes, blocks, stride=1):
        """
        Args:
            block  (callable): Basic block constructor to use;
            planes (int): The number of output channels;
            blocks (int): How many Basic blocks to repeat;
            stride (int, default=1): stride of the first block.

        Return:
            torch.nn.modules.container.Sequential

        _make_layer method creates `blocks` copies of the `block`.
        The first `block` has `self.in_planes` input channels
        and `planes` output channels;
        the other `blocks-1` blocks have the same number of
        input and output channels, namely `planes`.

        A downsample module is put on the residual connection of the
        first block whenever that block changes the tensor shape, i.e.
        when `stride != 1` or `self.in_planes != planes`.
        After the layer is created `self.in_planes` is set to `planes`,
        since the number of input channels of the next layer equals the
        number of output channels of the current one.
        """

        # Define downsample
        downsample = None
        if stride != 1 or self.in_planes != planes:
            downsample = self.downsample_block(self.in_planes, planes, stride)

        # Define layers
        layers = [
            block(self.in_planes, planes, stride, downsample, relu_type=self.relu_type)
        ]
        self.in_planes = planes
        for _ in range(1, blocks):
            layers.append(block(self.in_planes, planes, relu_type=self.relu_type))

        return nn.Sequential(*layers)

    def downsample_basic_block(self, in_planes, out_planes, stride):
        """
        Args:
            in_planes  (int): the number of input channels;
            out_planes (int): the number of output channels;
            stride     (int): stride.

        Return:
            nn.Sequential of two layers:
            1. 1x1 convolution, `in_planes` -> `out_planes` channels,
               stride `stride`, no padding, no bias;
            2. Batch normalisation with `out_planes` features.
        """
        return nn.Sequential(
            nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(out_planes),
        )

    def make_input_layer(self, in_channels, out_channels, relu_type):
        """
        Args:
            in_channels  (int): the number of input channels;
            out_channels (int): the number of output channels;
            relu_type    (str): type of an activation function.

        Return:
            nn.Sequential of layers:
            1. 2D convolution layer with `in_channels` input channels,
                `out_channels` output channels, `kernel_size` 7,
                `stride` 2, `padding` 3, `bias` False
                (padding 3 makes the output exactly half the input
                size for even inputs);
            2. Batch normalisation layer with out_channels features;
            3. Activation function;
            4. Maximum pooling with `kernel_size` 3, `stride` 2,
                `padding` 1, `dilation` 1, `ceil_mode` False
                (halves the spatial size once more).
        """
        return nn.Sequential(
            nn.Conv2d(
                in_channels, out_channels, kernel_size=7, stride=2, padding=3, bias=False
            ),
            nn.BatchNorm2d(out_channels),
            self._activation(relu_type, out_channels),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False),
        )

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): batch of 3-channel images, shape [B,3,H,W].

        Return:
            Tensor of shape [B, num_classes] with class logits.
        """
        x = self.input_layer(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        # [B,C,1,1] -> [B,C] before the linear projection.
        x = torch.flatten(x, 1)
        return self.fc(x)

In [None]:
def calc_accuracy(trues, logits):
    """Return the fraction of rows of `logits` whose arg-max equals the label in `trues`."""
    return (trues == np.argmax(logits, axis=1)).mean()
  
def train_epoch(net, dl, criterion, optimizer, device='cuda'):
    """
    Run one optimisation pass over `dl`.

    Args:
        net: model to train (switched to train mode);
        dl: iterable yielding `(images, trues)` batches;
        criterion: loss called as `criterion(logits, trues)`;
        optimizer: optimizer stepping the parameters of `net`;
        device (default='cuda'): device the batches are moved to.

    Return:
        List with the loss value (Python float) of every batch.
    """
    net.train()
    batch_losses = []
    for images, trues in dl:
        logits = net(images.to(device))
        loss = criterion(logits, trues.to(device))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
    return batch_losses


def inference_dl(net, dl, device):
    """
    Run `net` over every batch of `dl` without tracking gradients.

    Args:
        net: model to evaluate (switched to eval mode);
        dl: iterable yielding `(images, trues)` batches;
        device: device the images are moved to before the forward pass.

    Return:
        Tuple `(all_trues, all_logits)`: the labels and the model outputs
        of all batches concatenated along the first dimension. Labels are
        returned as yielded by `dl`; logits stay on the model's device.
    """
    net.eval()
    all_trues = list()
    all_logits = list()
    # The loop must run inside the no_grad context so that no autograd
    # graph is built (and kept alive) for the collected logits.
    with torch.no_grad():
        for images, trues in dl:
            logits = net(images.to(device))

            all_trues.append(trues)
            all_logits.append(logits)

    all_trues = torch.cat(all_trues)
    all_logits = torch.cat(all_logits)

    return all_trues, all_logits

In [None]:
# Training hyper-parameters shared by the cells below.
batch_size = 128
num_workers = 16
lr = 1e-3
num_epochs = 20
# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# resnet = timm.create_model("resnet18")

# Preprocessing applied to every image: conversion to a tensor followed by
# per-channel normalisation.
# NOTE(review): the mean/std values look like the commonly used ImageNet
# statistics rather than CIFAR-100 ones -- confirm this is intended.
all_transforms = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
    ])

# CIFAR-100 train/test splits, downloaded to ../data if not present.
train_ds = datasets.CIFAR100('../data', train=True, download=True, transform=all_transforms)
test_ds = datasets.CIFAR100('../data', train=False, download=True, transform=all_transforms)

# Only the training loader is shuffled.
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers)

# ResNet18 layout: four groups of two Basic blocks each.
relu_type = "prelu"
net = ResNet(BasicBlock, [2, 2, 2, 2], relu_type=relu_type)
net.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=lr)

In [None]:
valid_accuracy = list()  # accuracy on the test split, one entry per evaluation
train_losses = list()    # per-batch training losses of all epochs, concatenated
data = list()            # (trues, logits) numpy pairs, one entry per evaluation


def _evaluate_on_test():
    """Evaluate `net` on `test_dl`; return numpy `(trues, logits, accuracy)`."""
    epoch_trues, epoch_logits = inference_dl(net, test_dl, device)

    epoch_trues = epoch_trues.cpu().numpy()
    epoch_logits = epoch_logits.detach().cpu().numpy()

    return epoch_trues, epoch_logits, calc_accuracy(epoch_trues, epoch_logits)


# Baseline evaluation of the untrained network, reported as epoch -1.
print('Epoch:', -1)
trues, logits, accuracy = _evaluate_on_test()
valid_accuracy.append(accuracy)

print('Train loss:', 0)
print('Valid accuracy:', accuracy)

data.append((trues, logits))


for epoch in range(num_epochs):

    print('Epoch:', epoch)

    train_epoch_losses = train_epoch(
        net,
        train_dl,
        criterion=criterion,
        optimizer=optimizer,
        device=device
    )
    train_losses += train_epoch_losses

    trues, logits, accuracy = _evaluate_on_test()
    valid_accuracy.append(accuracy)

    print('Train loss:', np.mean(train_epoch_losses))
    print('Valid accuracy:', accuracy)

    data.append((trues, logits))

In [None]:
# One wide figure per curve: per-batch training loss first, then the
# validation accuracy recorded after every evaluation.
for curve in (train_losses, valid_accuracy):
    plt.figure(figsize=(30, 5))
    plt.plot(curve)
    plt.grid()
    plt.show()

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix of the most recent evaluation stored in `data`.
trues, _last_logits = data[-1]
preds = _last_logits.argmax(axis=1)

cn = confusion_matrix(trues, preds)

fig, ax = plt.subplots(figsize=(10, 10))
cm_display = ConfusionMatrixDisplay(cn)
cm_display.plot(ax=ax)
plt.show()


In [None]:
# Distribution of all logit values of the most recent evaluation.
trues, logits = data[-1]

plt.figure(figsize=(20, 10))
plt.hist(logits.ravel(), bins=100)
plt.grid()
plt.show()

In [None]:
# Split the logits into those of the true class of each sample (`pos`)
# and those of all other classes (`neg`).
# The number of classes is taken from the logits themselves: a hard-coded
# class count would produce a mask whose length does not match the rows.
num_classes = logits.shape[1]
true_class_mask = np.arange(num_classes)[None, :] == np.asarray(trues)[:, None]

pos = list(logits[true_class_mask])
neg = list(logits[~true_class_mask])

plt.figure(figsize=(20, 10))
plt.hist(pos, bins=100, alpha=0.5, color='green')
plt.hist(neg, bins=100, alpha=0.5, color='red')
plt.grid()
plt.show()

In [None]:
from sklearn.metrics import roc_auc_score
from sklearn.metrics import average_precision_score, precision_recall_curve


def to_binary(trues, logits, class_num):
    """
    Reduce a multi-class problem to "class `class_num` vs the rest".

    Return:
        Tuple of the boolean labels `trues == class_num` and the
        column `class_num` of `logits`.
    """
    return trues == class_num, logits[:, class_num]

def remove_part(trues, logits, fraq=0.5, rng=None):
    """
    Randomly drop part of the positive samples, keeping all negatives.

    Args:
        trues  (np.ndarray): binary labels (0/1 or boolean);
        logits (np.ndarray): scores aligned with `trues` along the first axis;
        fraq   (float, default=0.5): probability of keeping each positive sample;
        rng    (default=None): optional random generator exposing
                               `random(size=...)` (e.g. `np.random.default_rng(seed)`)
                               for reproducible sub-sampling; the global
                               `np.random` state is used when omitted.

    Return:
        Tuple `(trues, logits)` with all negatives first, followed by the
        kept positives.
    """
    draw = np.random.random if rng is None else rng.random

    neg_mask = trues == 0
    # Each positive is kept independently with probability `fraq`.
    pos_mask = (trues == 1) & (draw(size=trues.shape) < fraq)

    kept_trues = np.concatenate([trues[neg_mask], trues[pos_mask]])
    kept_logits = np.concatenate([logits[neg_mask], logits[pos_mask]], axis=0)

    return kept_trues, kept_logits

def binary_accuracy(trues, logits, threshold=0.0):
    """Return the share of samples whose decision `logits > threshold` equals `trues`."""
    decisions = logits > threshold
    return (decisions == trues).mean()

def roc_auc(trues, logits):
    """Return sklearn's `roc_auc_score` for labels `trues` scored by `logits`."""
    return roc_auc_score(y_true=trues, y_score=logits)

def ap(trues, logits):
    """Return sklearn's `average_precision_score` for labels `trues` scored by `logits`."""
    return average_precision_score(y_true=trues, y_score=logits)


def find_threshold_f1(trues, logits, eps=1e-9):
    """
    Pick the decision threshold with the highest F1 score.

    The candidates are the thresholds returned by sklearn's
    `precision_recall_curve`; `eps` guards the F1 denominator
    against division by zero.

    Return:
        The selected threshold as a Python float.
    """
    precision, recall, thresholds = precision_recall_curve(trues, logits)
    f1 = (2 * precision * recall) / (precision + recall + eps)
    best_idx = np.argmax(f1)
    return float(thresholds[best_idx])



def calc_metrics(trues, logits, class_num=2, fraq=0.1):
    """
    Compute multi-class accuracy and one-vs-rest metrics for a single class.

    The one-vs-rest problem for `class_num` is built with `to_binary`,
    then its positives are randomly sub-sampled with `remove_part`
    before the binary metrics are computed.

    Args:
        trues     (np.ndarray): integer class labels;
        logits    (np.ndarray): model outputs, one column per class;
        class_num (int, default=2): class treated as the positive one;
        fraq      (float, default=0.1): probability of keeping each
                                        positive sample.

    Return:
        dict with keys `accuracy`, `acc_{class_num}_th_0`,
        `acc_{class_num}_th_opt`, `rocauc_{class_num}` and `ap_{class_num}`.
    """
    bin_trues, bin_logits = to_binary(trues, logits, class_num)
    bin_trues, bin_logits = remove_part(bin_trues, bin_logits, fraq=fraq)

    threshold = find_threshold_f1(bin_trues, bin_logits, eps=1e-9)
    print('Best f1 threshold:', threshold)

    result = {
        # Multi-class accuracy over the full, un-sampled data.
        'accuracy': calc_accuracy(trues, logits),
        # Binary accuracy at the default and at the F1-optimal threshold.
        f'acc_{class_num}_th_0': binary_accuracy(bin_trues, bin_logits, threshold=0.0),
        f'acc_{class_num}_th_opt': binary_accuracy(bin_trues, bin_logits, threshold=threshold),
        f'rocauc_{class_num}': roc_auc(bin_trues, bin_logits),
        f'ap_{class_num}': ap(bin_trues, bin_logits),
    }

    #result.update(classification_metrics(bin_trues, bin_logits > threshold))

    return result

In [None]:
import pandas as pd

# One row of metrics per stored evaluation. `data` holds the pre-training
# evaluation plus one entry per epoch, so iterate over `data` itself rather
# than over `range(num_epochs)`, which would miss the final epoch.
all_metrics = [calc_metrics(trues, logits) for trues, logits in data]

all_metrics = pd.DataFrame(all_metrics).reset_index(drop=True)

all_metrics.plot(figsize=(30,5))
plt.grid()
plt.show()

In [None]:
all_metrics

In [None]:
def classification_metrics(trues, preds, eps=1e-9):
    """
    Compute confusion-matrix counts and derived binary metrics.

    Both inputs are binarised with the rule `value > 0.5`; `eps` is
    added to every denominator to avoid division by zero.

    Return:
        dict with the counts (`tp`, `tn`, `fp`, `fn`), `accuracy`,
        `recall`/`sensitivity`, `precision`/`ppv`, `specificity`, `npv`,
        `f1`, `ss` (same formula as `f1`, applied to sensitivity and
        specificity), and the number/share of positive labels and positive
        predictions together with the sample `count`.
    """
    actual = np.array(trues) > 0.5
    predicted = np.array(preds) > 0.5

    def _harmonic(a, b):
        # Harmonic-mean style combination used for both `f1` and `ss`.
        return 2 * a * b / (a + b + eps)

    tp = (actual & predicted).sum()
    tn = (~actual & ~predicted).sum()
    fp = (~actual & predicted).sum()
    fn = (actual & ~predicted).sum()

    recall = tp / (tp + fn + eps)
    precision = tp / (tp + fp + eps)
    specificity = tn / (tn + fp + eps)
    npv = tn / (tn + fn + eps)

    n_actual = int(actual.sum())
    n_predicted = int(predicted.sum())

    return {
        'tp': tp,
        'tn': tn,
        'fp': fp,
        'fn': fn,
        'accuracy': (actual == predicted).mean(),
        'recall': recall,
        'precision': precision,
        'sensitivity': recall,
        'specificity': specificity,
        'ppv': precision,
        'npv': npv,
        'f1': _harmonic(recall, precision),
        'ss': _harmonic(recall, specificity),
        'trues_sum': n_actual,
        'trues_percent': n_actual / actual.shape[0],
        'preds_sum': n_predicted,
        'preds_percent': n_predicted / predicted.shape[0],
        'count': actual.shape[0],
    }

In [None]:
all_metrics