In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np

CIFAR10 with ResNet-18 Architecture
<br>
By: Danny Ryngler 12/4/20

In [4]:
# Run on the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [14]:
# Preprocessing applied to every image: convert to a tensor, then normalize
# each of the three channels with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

In [15]:
# CIFAR-10 training split; downloaded into ./data when it is not already there.
train_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    transform=transform,
    download=True,
)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100.0%

Extracting ./data/cifar-10-python.tar.gz to ./data


In [16]:
# CIFAR-10 test split, preprocessed with the same transform as the training split.
test_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    transform=transform,
    download=True,
)

Files already downloaded and verified


In [18]:
# Mini-batches of 32 training images, drawn in shuffled order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    shuffle=True,
    batch_size=32,
)

In [19]:
# Mini-batches of 32 test images, kept in dataset order.
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    shuffle=False,
    batch_size=32,
)

In [20]:
# Display names for the 10 classes; position i is the name printed for label i.
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

## The identity block

<img src="images/IdentityBlock.png" style="width:375px;height:550px;">
<caption><center><font color='purple'><u>Figure 1:</u> Identity block. The skip connection "skips over" 2 layers.</font></center></caption>

In [26]:
class IdentityBlock(nn.Module):
    """
    Implementation of the identity block (with 3x3 convolutions) as defined in Figure 1.

    The shortcut is the unmodified input, so the main path has to preserve the
    input's shape for the addition to work: `output_channels` must equal
    `input_channels`, and `strides` should be left at 1.

    Arguments:
    input_channels -- Integer, number of channels of the input tensor
    output_channels -- Integer, number of channels produced by both convolutions
    strides -- Integer, stride of the first convolution. Default = 1

    Returns:
    X -- output of the identity block, tensor of shape (m, n_C, n_H, n_W)
    """

    def __init__(self, input_channels, output_channels, strides=1):
        super(IdentityBlock, self).__init__()

        # Main 1
        self.conv1 = nn.Conv2d(input_channels, output_channels, kernel_size=3, padding=1, stride=strides)
        self.bn1 = nn.BatchNorm2d(output_channels)

        # Main 2
        self.conv2 = nn.Conv2d(output_channels, output_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(output_channels)

    def forward(self, X):
        X_shortcut = X

        # Main 1: conv -> batch norm -> ReLU
        X = F.relu(self.bn1(self.conv1(X)))

        # Main 2: conv -> batch norm. No ReLU here: the non-linearity is applied
        # once, after the shortcut is added, so the residual branch can contribute
        # negative values as well as positive ones.
        X = self.bn2(self.conv2(X))

        # Shortcut: add the untouched input, then activate
        X = F.relu(X + X_shortcut)

        return X

## The convolutional block

The ResNet "convolutional block" is used when the input and output dimensions don't match up. The difference from the identity block is that there is a CONV2D layer in the shortcut path:

<img src="images/ConvolutionalBlock.png" style="width:500px;height:550px;">
<caption><center><font color='purple'><u>Figure 2:</u> Convolutional block.</font></center></caption>

In [27]:
class ConvolutionBlock(nn.Module):
    """
    Implementation of the Convolutional block (with 3x3 convolutions) as defined in Figure 2.
    Uses a 1x1 convolution on the shortcut so that the shortcut and the main path
    have the same number of channels (and the same spatial size when strides > 1).

    Arguments:
    input_channels -- Integer, number of channels of the input tensor
    output_channels -- Integer, number of channels of the output tensor
    strides -- Integer, stride of the first main-path convolution and of the
               shortcut convolution. Default = 1

    Returns:
    X -- output of the convolutional block, tensor of shape (m, n_C, n_H, n_W)
    """
    def __init__(self, input_channels, output_channels, strides=1):
        super(ConvolutionBlock, self).__init__()

        # Main 1
        self.conv1 = nn.Conv2d(input_channels, output_channels, kernel_size=3, padding=1, stride=strides)
        self.bn1 = nn.BatchNorm2d(output_channels)

        # Main 2
        self.conv2 = nn.Conv2d(output_channels, output_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(output_channels)

        # Shortcut: 1x1 projection with the same stride as conv1
        # NOTE(review): no batch norm follows this projection; the reference ResNet
        # normalizes the projected shortcut -- confirm against Figure 2.
        self.conv_shortcut = nn.Conv2d(input_channels, output_channels, kernel_size=1, stride=strides)

    def forward(self, X):
        X_shortcut = X

        # Main 1: conv -> batch norm -> ReLU
        X = F.relu(self.bn1(self.conv1(X)))

        # Main 2: conv -> batch norm. No ReLU here: the non-linearity is applied
        # once, after the shortcut is added, so the residual branch can contribute
        # negative values as well as positive ones.
        X = self.bn2(self.conv2(X))

        # Shortcut: project the input to the main path's shape, add, then activate
        X_shortcut = self.conv_shortcut(X_shortcut)
        X = F.relu(X + X_shortcut)

        return X


## ResNet model (18 layers)

<img src="images/ResNet-18.png" style="width:400px;height:1000px;">
<caption><center><font color='purple'><u>Figure 3:</u> ResNet-18 architecture.</font></center></caption>

In [43]:
class ResNet18(nn.Module):
    """
    Implementation of the ResNet-18 Architecture in Figure 3. Has the following architecture:
    CONV2D -> BATCHNORM -> RELU -> MAXPOOL -> IDBLOCK x2 -> (CONVBLOCK -> IDBLOCK) x3
    -> AVGPOOL -> FLATTEN -> FC

    Arguments:
    input_shape -- shape of the images of the dataset as (m, n_C, n_H, n_W), ex (1,3,32,32).
                   Only the channel count, input_shape[1], is read.
    classes -- integer, number of classes

    Returns:
    X -- output of the Residual Network, tensor of shape (m, classes); these are the
         raw outputs of the final linear layer (no softmax is applied)
    """

    def __init__(self, input_shape, classes):
        super(ResNet18, self).__init__()

        # Stage 1
        self.conv_s1 = nn.Conv2d(input_shape[1], 64, kernel_size=7, stride=2, padding=3)
        self.bn_s1 = nn.BatchNorm2d(64)
        self.relu_s1 = nn.ReLU()
        self.maxpool_s1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Each stage holds two residual blocks. They are wrapped in nn.Sequential so
        # both are registered; assigning the same attribute twice would keep only
        # the second block.
        # NOTE(review): the projection blocks use the default stride of 1, as in the
        # original code; the reference ResNet-18 halves the resolution at stages 2-4
        # (strides=2) -- confirm against Figure 3.

        # Block 1
        self.ResBlock_1 = nn.Sequential(
            IdentityBlock(64, 64),
            IdentityBlock(64, 64),
        )

        # Block 2: the first block changes the channel count, the second keeps it
        self.ResBlock_2 = nn.Sequential(
            ConvolutionBlock(64, 128),
            IdentityBlock(128, 128),
        )

        # Block 3
        self.ResBlock_3 = nn.Sequential(
            ConvolutionBlock(128, 256),
            IdentityBlock(256, 256),
        )

        # Block 4
        self.ResBlock_4 = nn.Sequential(
            ConvolutionBlock(256, 512),
            IdentityBlock(512, 512),
        )

        # Average Pooling: collapses each feature map to a single value
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Fully Connected Layers
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(512, classes)

    def forward(self, X):
        # Stage 1
        X = self.conv_s1(X)
        X = self.bn_s1(X)
        X = self.relu_s1(X)
        X = self.maxpool_s1(X)

        # Block 1
        X = self.ResBlock_1(X)

        # Block 2
        X = self.ResBlock_2(X)

        # Block 3
        X = self.ResBlock_3(X)

        # Block 4
        X = self.ResBlock_4(X)

        # Average Pooling
        X = self.avg_pool(X)

        # Fully Connected Layers
        X = self.flatten(X)
        X = self.fc1(X)

        return X

In [44]:
# Build the network for 3-channel inputs and 10 output classes, then move it to `device`.
model = ResNet18(input_shape=[-1, 3, 32, 32], classes=10)
model = model.to(device)

In [45]:
# Training objective: cross-entropy between the model outputs and the batch labels.
criterion = nn.CrossEntropyLoss()

In [46]:
# Named step size: the training cell's plot title reads `learning_rate`, so it is
# defined here instead of being passed as an anonymous literal.
learning_rate = 0.01
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [47]:
# Number of full passes over the training loader.
num_epochs = 5

In [48]:
# Number of mini-batches per epoch; shown in the training progress message.
n_total_steps = len(train_loader)

In [None]:
# Train the model, recording the loss of every mini-batch in `costs`.
costs = []

# Make sure batch norm is in training mode, even if an evaluation cell
# switched the model to eval mode before this cell is (re-)run.
model.train()

for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # keep track of costs (one entry per iteration)
        costs.append(loss.item())

        if (i + 1) % 20 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{n_total_steps}], Loss: {loss.item():.4f}')

    # plot all costs recorded so far, once per epoch
    plt.plot(costs)
    plt.ylabel('cost')
    plt.xlabel('iterations')
    # Read the learning rate from the optimizer itself so this cell does not depend
    # on a separately defined variable.
    plt.title(f"Learning rate = {optimizer.param_groups[0]['lr']}")
    plt.show()

In [None]:
# Accuracy: overall and per class, measured on the test loader.

# Evaluation mode makes batch norm use its running statistics instead of the
# statistics of each test batch (and stops it from updating them).
model.eval()

num_classes = len(classes)
with torch.no_grad():
    n_correct = 0
    n_samples = 0
    n_class_correct = [0] * num_classes
    n_class_samples = [0] * num_classes
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        # max returns (value, index)
        _, predicted = torch.max(outputs, 1)
        n_samples += labels.size(0)
        n_correct += (predicted == labels).sum().item()

        # Walk over the samples actually present in this batch: the last batch
        # can hold fewer samples than the loader's batch size.
        for label, pred in zip(labels.tolist(), predicted.tolist()):
            if label == pred:
                n_class_correct[label] += 1
            n_class_samples[label] += 1

    acc = 100.0 * n_correct / n_samples
    print(f'Accuracy of the network: {acc} %')

    for i in range(num_classes):
        acc = 100.0 * n_class_correct[i] / n_class_samples[i]
        print(f'Accuracy of {classes[i]}: {acc} %')