# Residual Networks

In [26]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from resnets_utils import *

from torch.utils.data import DataLoader, sampler, TensorDataset

import h5py

## 2 - Building a Residual Network

### 2.1 - The identity block

In [4]:
class identity_block(nn.Module):
    
    def __init__(self, f, filters, in_channels):
        super(identity_block, self).__init__()
        F1, F2, F3 = filters
        
        self.conv2d_1 = nn.Conv2d(in_channels, F1, kernel_size=1, padding="valid")
        self.bn_1 = nn.BatchNorm2d(F1)
        
        # Second component
        self.conv2d_2 = nn.Conv2d(F1, F2, kernel_size=f, padding="same")
        self.bn_2 = nn.BatchNorm2d(F2)
        
        # Third component
        self.conv2d_3 = nn.Conv2d(F2, F3, kernel_size=1, padding="valid")
        self.bn_3 = nn.BatchNorm2d(F3)
        
    def forward(self, x):
        x_copy = x
        
        ## Main Path
        x = F.relu(self.bn_1(self.conv2d_1(x)))
        x = F.relu(self.bn_2(self.conv2d_2(x)))
        x = self.bn_3(self.conv2d_3(x))
        
        ## Combine two paths
        x = F.relu(x + x_copy)
        
        return x

### 2.2 - Convolutional block

In [5]:
class convolutional_block(nn.Module):
    
    def __init__(self, f, filters, in_channels, s=2):
        super(convolutional_block, self).__init__()
        F1, F2, F3 = filters
        
        self.conv2d_1 = nn.Conv2d(in_channels, F1, kernel_size=1, stride=s, padding="valid")
        self.bn_1 = nn.BatchNorm2d(F1)
        
        # Second component
        self.conv2d_2 = nn.Conv2d(F1, F2, kernel_size=f, padding="same")
        self.bn_2 = nn.BatchNorm2d(F2)
        
        # Third component
        self.conv2d_3 = nn.Conv2d(F2, F3, kernel_size=1, padding="valid")
        self.bn_3 = nn.BatchNorm2d(F3)
        
        # Shortcut component
        self.conv2d_shortcut = nn.Conv2d(in_channels, F3, kernel_size=1, stride=s, padding="valid")
        self.bn_shortcut = nn.BatchNorm2d(F3)
        
    def forward(self, x):
        x_copy = x
        
        ## Main Path
        x = F.relu(self.bn_1(self.conv2d_1(x)))
        x = F.relu(self.bn_2(self.conv2d_2(x)))
        x = self.bn_3(self.conv2d_3(x))
        
        ## Shortcut path
        x_copy = self.bn_shortcut(self.conv2d_shortcut(x_copy))
        
        ## Combine two paths
        x = F.relu(x + x_copy)
        
        return x

### 2.3 - ResNet50

In [6]:
num_classes = 6

In [55]:
ResNet50 = nn.Sequential(
    nn.ZeroPad2d(3),
    
    # Stage 1
    nn.Conv2d(3, 64, kernel_size=7, stride=2),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),

    # Stage 2
    convolutional_block(3, [64, 64, 256], in_channels=64, s=1),
    identity_block(3, [64, 64, 256], in_channels=256),
    identity_block(3, [64, 64, 256], in_channels=256),

    # Stage 3
    convolutional_block(3, [128, 128, 512], in_channels=256, s=2),
    identity_block(3, [128, 128, 512], in_channels=512),
    identity_block(3, [128, 128, 512], in_channels=512),
    identity_block(3, [128, 128, 512], in_channels=512),

    # Stage 4
    convolutional_block(3, [256, 256, 1024], in_channels=512, s=2),
    identity_block(3, [256, 256, 1024], in_channels=1024),
    identity_block(3, [256, 256, 1024], in_channels=1024),
    identity_block(3, [256, 256, 1024], in_channels=1024),
    identity_block(3, [256, 256, 1024], in_channels=1024),
    identity_block(3, [256, 256, 1024], in_channels=1024),

    # Stage 5
    convolutional_block(3, [512, 512, 2048], in_channels=1024, s=2),
    identity_block(3, [512, 512, 2048], in_channels=2048),
    identity_block(3, [512, 512, 2048], in_channels=2048),

    # Average Pooling
    nn.AvgPool2d(kernel_size=2, stride=2),

    # Output Layer
    nn.Flatten(),
    nn.Linear(2048, num_classes)
)

In [56]:
params = list(ResNet50.parameters())
print(len(params))

214


In [57]:
# Load datasets
X_train_orig, Y_train, X_test_orig, Y_test, classes = load_dataset()

# Swap axes to make (N, C, H, W)
X_train_orig = np.transpose(X_train_orig, (0, 3, 1, 2))
X_test_orig = np.transpose(X_test_orig, (0, 3, 1, 2))

# Make (N,)
Y_train = Y_train.ravel()
Y_test = Y_test.ravel()

# Normalize values to [0, 1]
X_train = X_train_orig / 255
X_test = X_test_orig / 255

print(f"Number of training exampels: {X_train.shape[0]}")
print(f"Number of test exampels: {X_test.shape[0]}")
print(f"Shape of X_train: {X_train.shape}")
print(f"Shape of Y_train: {Y_train.shape}")
print(f"Shape of X_test: {X_test.shape}")
print(f"Shape of Y_test: {Y_test.shape}")

Number of training exampels: 1080
Number of test exampels: 120
Shape of X_train: (1080, 3, 64, 64)
Shape of Y_train: (1080,)
Shape of X_test: (120, 3, 64, 64)
Shape of Y_test: (120,)


In [63]:
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
Y_train_tensor = torch.tensor(Y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
Y_test_tensor = torch.tensor(Y_test, dtype=torch.long)

train_set = TensorDataset(X_train_tensor, Y_train_tensor)
test_set = TensorDataset(X_test_tensor, Y_test_tensor)

train_loader = DataLoader(train_set, batch_size=4, shuffle=True)
test_loader = DataLoader(test_set, batch_size=4, shuffle=True)

In [64]:
NUM_EPOCH = 10
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(ResNet50.parameters(), lr=1e-4, momentum=0.9)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.7)

In [65]:
def evaluate(model, test_loader):
    num_correct = 0
    num_total = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            predictions = model(inputs)
            _, y_pred = torch.max(predictions, 1)
            num_correct += (y_pred == targets).sum().item()
            num_total += targets.shape[0]
    
    return num_correct/num_total
    
    
def train(model, loss_fn, optimizer, train_loader, test_loader, epochs=1, verbose=True):
    for epoch in range(epochs):
        running_loss = 0.
        for i, (inputs, targets) in enumerate(train_loader):
            optimizer.zero_grad()
            predictions = model(inputs)
            loss = loss_fn(predictions, targets)
            running_loss += loss.item()
            loss.backward()
            optimizer.step()
        running_loss /= len(train_loader)
    
        if verbose:
            acc = evaluate(model, test_loader)
            print(f"Epoch: {epoch + 1} | Avg Loss: {loss.item()} | Test accuracy: {acc}")
        
        scheduler.step()

In [66]:
train(ResNet50, loss_fn, optimizer, train_loader, test_loader, epochs=NUM_EPOCH)

Epoch: 1 | Avg Loss: 0.7381016612052917 | Test accuracy: 0.5166666666666667
Epoch: 2 | Avg Loss: 0.5188310146331787 | Test accuracy: 0.625
Epoch: 3 | Avg Loss: 0.9953516721725464 | Test accuracy: 0.7583333333333333
Epoch: 4 | Avg Loss: 0.19300423562526703 | Test accuracy: 0.8416666666666667
Epoch: 5 | Avg Loss: 0.22030915319919586 | Test accuracy: 0.875
Epoch: 6 | Avg Loss: 0.801445484161377 | Test accuracy: 0.8583333333333333
Epoch: 7 | Avg Loss: 0.021620137616991997 | Test accuracy: 0.8833333333333333
Epoch: 8 | Avg Loss: 0.021780110895633698 | Test accuracy: 0.8916666666666667
Epoch: 9 | Avg Loss: 1.2491304874420166 | Test accuracy: 0.925
Epoch: 10 | Avg Loss: 0.46594473719596863 | Test accuracy: 0.8916666666666667
