# Implementation of the ResNet as sole network 
Self-implemented, based on tutorials about ResNets.

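The ResNet trained here has 110 layers (3 stages × 18 residual blocks = 54 blocks with 2 convolutions each, plus the first convolution and the final fully connected layer) --> ResNet-110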

Goal: compare a single ResNet with BlockDrop in both testing AND TRAINING

In [1]:
import os
from timeit import default_timer as timer

import numpy as np
import torch
import torch.nn as nn
import torch.utils.data as torchdata
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import tqdm

# Prefer the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Fix the random seeds (CPU generator and, when running on CUDA, the GPU
# generators as well) so that repeated runs start from the same state.
torch.manual_seed(42)
if device.type == "cuda":
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)
    

# Model of ResNet

In [2]:
# Re-implemented by hand, based on the ResNet tutorial: https://pytorch-tutorial.readthedocs.io/en/latest/tutorial/chapter03_intermediate/3_2_2_cnn_resnet_cifar10/
# 3x3 convolution: 
def conv3x3(in_channels, out_channels, stride=1):
    """Build a bias-free 3x3 2D convolution layer.

    The layer uses a padding of 1, so with ``stride=1`` the spatial size of
    the input is preserved.

    Args:
        in_channels: number of channels of the input feature map.
        out_channels: number of channels produced by the convolution.
        stride: stride of the convolution (default 1).

    Returns:
        The configured ``nn.Conv2d`` module.
    """
    conv_kwargs = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_channels, out_channels, **conv_kwargs)

# Residual block consists of following steps:
# 1. 3x3 conv 2. Batch Normalization 3. ReLU 4. 3x3 Convolution 5. Batch Normalization 
# at the end, addition of the identity function (= skipping function = input of layer)
class ResidualBlock(nn.Module):
    """Basic residual block: two 3x3 conv + batch-norm stages plus a shortcut.

    The forward pass computes
    ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``, where the
    shortcut is the identity unless a ``downsample`` module is given.

    Args:
        num_in: number of channels of the block input.
        num_out: number of channels produced by both convolutions.
        stride: stride of the first convolution; the second one always uses
            a stride of 1.
        downsample: optional module applied to the block input before the
            addition. It is needed whenever the main branch changes the
            tensor shape (``stride != 1`` or ``num_in != num_out``) so that
            both summands match.
    """

    def __init__(self, num_in, num_out, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3(num_in, num_out, stride)
        self.bn1 = nn.BatchNorm2d(num_out)
        self.activation = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(num_out, num_out)
        self.bn2 = nn.BatchNorm2d(num_out)
        self.downsample = downsample

    def forward(self, x):
        """Apply the residual block to ``x`` and return the activated sum."""
        # Keep the block input for the shortcut connection.
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.activation(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Compare against None explicitly: the truthiness of a module depends
        # on whether it defines __len__ (nn.Sequential does), which is not
        # what "a shortcut projection was supplied" means.
        if self.downsample is not None:
            residual = self.downsample(residual)
        out += residual
        return self.activation(out)

# ResNet consists of following steps:
# 1. normal convolutional layer (input, conv, batch_norm, relu) 
# 2. x residual blocks (split into three stages; the second and third stage downsample with stride 2)
# 3. avg. pooling 
# 4. fully connected layer: gives back the final output
class ResNet(nn.Module):
    """ResNet made of a conv stem, three residual stages, pooling and a classifier.

    The stem maps 3 input channels to 16. The three stages produce 16, 32 and
    64 channels; the second and third stage start with a stride-2 block.
    The result is average-pooled with an 8x8 window, flattened and fed to a
    linear layer with 64 input features.

    Args:
        block: class used to build the residual blocks. It is called as
            ``block(in_channels, out_channels, stride, downsample)`` for the
            first block of a stage and ``block(out_channels, out_channels)``
            for the remaining ones.
        layers: sequence with the number of blocks for each of the three stages.
        num_classes: size of the output of the final linear layer.
    """

    def __init__(self, block, layers, num_classes=10):
        super().__init__()
        # Channel count feeding the next stage; make_layer keeps it up to date.
        self.in_channels = 16

        # Stem: 3x3 convolution -> batch norm -> ReLU.
        self.conv = conv3x3(3, 16)
        self.bn = nn.BatchNorm2d(16)
        self.activation = nn.ReLU(inplace=True)

        # Three residual stages; stages two and three halve the resolution.
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)

        # Head: 8x8 average pooling followed by the linear classifier.
        self.avg_pool = nn.AvgPool2d(8)
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Assemble one stage consisting of ``blocks`` residual blocks.

        Only the first block receives ``stride`` and, if the stage changes the
        resolution or the channel count, a conv + batch-norm projection for
        its shortcut. ``self.in_channels`` is set to ``out_channels``.
        """
        changes_shape = stride != 1 or self.in_channels != out_channels
        shortcut = None
        if changes_shape:
            shortcut = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))

        first_block = block(self.in_channels, out_channels, stride, shortcut)
        self.in_channels = out_channels
        remaining_blocks = [block(out_channels, out_channels) for _ in range(1, blocks)]
        return nn.Sequential(first_block, *remaining_blocks)

    def forward(self, x):
        """Run the network on ``x`` and return the output of the linear layer."""
        stem = self.activation(self.bn(self.conv(x)))
        features = self.layer3(self.layer2(self.layer1(stem)))
        pooled = self.avg_pool(features)
        # Flatten everything but the batch dimension for the linear layer.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)


# Training of ResNet
Procedure based on PyTorch tutorial by Thorben

Steps:
1. Move input data to device 
2. Run the model on the input data
3. Calculate the loss
4. Perform backpropagation
5. Update the parameters

In [3]:
def train_resnet(model, optimizer, data_loader, loss_module, num_epochs=1):
    """Train ``model`` in place for ``num_epochs`` passes over ``data_loader``.

    The model is switched to training mode once, then every batch is moved to
    the module-level ``device``, run through the model, scored with
    ``loss_module`` and used for one optimizer step.

    Args:
        model: network to train; it is called directly on each input batch.
        optimizer: optimizer holding the parameters of ``model``.
        data_loader: iterable with a length that yields ``(inputs, labels)`` batches.
        loss_module: callable ``loss_module(preds, labels)`` returning a scalar
            tensor that supports ``backward()``.
        num_epochs: number of passes over ``data_loader``.

    Returns:
        None. The parameters of ``model`` are updated in place.
    """
    model.train()
    for epoch in range(num_epochs):
        print("Epoch ", epoch)
        for data_inputs, data_labels in tqdm.tqdm(data_loader):
            ## Step 1: Move input data to device
            data_inputs = data_inputs.to(device, non_blocking=True)
            data_labels = data_labels.to(device, non_blocking=True)

            ## Step 2: Run the model on the input data
            # The logits keep their (batch, classes) shape: squeezing dim 1
            # would only ever act on a single-class output and would then
            # break the shape expected by the loss.
            preds = model(data_inputs)

            ## Step 3: Calculate the loss
            loss = loss_module(preds, data_labels)

            ## Step 4: Perform backpropagation
            # Gradients accumulate by default, so reset them first.
            optimizer.zero_grad()
            loss.backward()

            ## Step 5: Update the parameters
            optimizer.step()


# Evaluation metric

In [4]:
## compute accuracy
def get_accuracy(preds, target, batch_size):
    """Return the percentage of samples whose top-scoring class equals the target.

    Args:
        preds: tensor of class scores; the predicted class is the index of
            the maximum along dimension 1.
        target: tensor of reference class indices; the predictions are
            reshaped to its shape before the comparison.
        batch_size: number the count of correct predictions is divided by.

    Returns:
        ``100 * correct / batch_size`` as a Python float.
    """
    _, predicted = torch.max(preds, dim=1)
    hits = predicted.view_as(target).eq(target).sum()
    percentage = 100.0 * hits / batch_size
    return percentage.item()

# Testing
For each batch:
    1. execute model on batch 
    2. evaluate accuracy

In [5]:
def test_resnet(model, data_loader):
    """Evaluate ``model`` on ``data_loader`` and report the overall accuracy.

    Correct predictions and samples are counted over all batches, so the
    result is the exact fraction of correctly classified samples even when
    the last batch is smaller than the others. Gradients are disabled during
    the evaluation. The function does not change the train/eval mode of
    ``model``; the caller is expected to call ``model.eval()`` beforehand.

    Args:
        model: network to evaluate; it is called directly on each input batch.
        data_loader: iterable with a length that yields ``(inputs, labels)`` batches.

    Returns:
        The accuracy in percent as a float (it is also printed).

    Raises:
        ValueError: if ``data_loader`` yields no samples.
    """
    num_correct = 0
    num_samples = 0
    with torch.no_grad():
        for data_inputs, data_labels in tqdm.tqdm(data_loader):
            data_inputs, data_labels = data_inputs.to(device), data_labels.to(device)
            preds = model(data_inputs)
            # Predicted class = index of the highest score along dimension 1.
            _, predicted = torch.max(preds, dim=1)
            num_correct += (predicted.view_as(data_labels) == data_labels).sum().item()
            num_samples += len(data_labels)
    if num_samples == 0:
        raise ValueError("data_loader yielded no samples; cannot compute an accuracy")
    accuracy = 100.0 * num_correct / num_samples
    print(f"Accuracy of the model: {accuracy:4.2f}%")
    return accuracy


# Combining all components

1. Create model
2. Get and transform data
3. Train model
4. Test model
5. Save model

In [None]:
root='data/'

## Step 1: Create model
# 3 stages x 18 residual blocks x 2 convolutions, plus the first convolution
# and the fully connected layer = 110 layers.
layer_config = [18, 18, 18]
# Alternative: layer_config = [5, 5, 5] gives the 32-layer ResNet.
model_name = "110_layer"
batch_size = 256
model = ResNet(ResidualBlock, layer_config)
model.to(device)

# Create the checkpoint directory before the long training run, so that a
# missing or unwritable path fails now instead of after the last epoch.
save_dir = 'cv/trained_rnet/' + model_name
os.makedirs(save_dir, exist_ok=True)


## Step 2: Data
# Image preprocessing used for training: pad by 4 pixels, random horizontal
# flip and a random 32x32 crop. The test images are only converted to tensors.
transform = transforms.Compose([
    transforms.Pad(4),
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32),
    transforms.ToTensor()])

# download=False: the CIFAR-10 files must already be present under `root`.
trainset = datasets.CIFAR10(root=root, train=True, download=False, transform=transform)
testset = datasets.CIFAR10(root=root, train=False, download=False, transform=transforms.ToTensor())

# Use more loader workers only when CUDA is actually available. (A
# torch.device object is always truthy, so it cannot be used as the test.)
num_workers = 4 if torch.cuda.is_available() else 1

trainloader = torchdata.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=num_workers) #50000 images
# One image per batch: the test time below is reported per image.
testloader = torchdata.DataLoader(testset, batch_size=1, shuffle=False, num_workers=num_workers) #10000 images

## Step 3: Training
# parameters:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) #lr parameter as proposed in BlockDrop implementation
loss = nn.CrossEntropyLoss()
num_epochs = 100

start_training_single = timer()
train_resnet(model, optimizer, trainloader, loss, num_epochs)
end_training_single = timer()
training_time_single = end_training_single - start_training_single

## Step 4: Testing
model.eval()
start_testing_single = timer()
test_resnet(model, testloader)
end_testing_single = timer()
testing_time_single = end_testing_single - start_testing_single

## Step 5: Save model
state_dict = model.state_dict()
torch.save(state_dict, save_dir + '/Epoch_%d.t7'%(num_epochs))


"Training in s: %.2f, Testing per image in ms: %.2f"%(training_time_single, 1000*testing_time_single/len(testset))