**Seminar Deep Learning - AlexNet - Friedrch, Leon**

# Introduction



This notebook lets you test an implementation of the well-known 'AlexNet', inspect the learned kernels and see what effect these kernels have on the input images.

The input data are the images of 'Sweaty'. The dataset consists of roughly 5000 training and 1200 test images, each belonging to one of the following eight categories:

- 0: Ball
- 1: Post
- 2: Obstacle
- 3: L-line
- 4: X-line
- 5: T-line
- 6: 11m-point
- 7: Foot

The dataset can be found in the Moodle course 'Seminar-Deep-Learning-WS19'.
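`datasets.ImageFolder`, used in the training and test cells, takes the class labels from the names of the sub-folders of the image directory and numbers them in sorted order. The numbering above therefore only holds if the folder names sort in exactly that order. The following stdlib sketch mirrors that lookup so the mapping can be checked without loading any images; the helper `class_to_index` and the folder names in the example are hypothetical.

```python
import os
import tempfile


def class_to_index(root):
    """Map each sub-folder name of root to a label index, in sorted order.

    This mirrors how torchvision's ImageFolder numbers its classes:
    sub-directories are sorted by name and enumerated from 0.
    """
    classes = sorted(entry.name for entry in os.scandir(root) if entry.is_dir())
    return {name: idx for idx, name in enumerate(classes)}


if __name__ == '__main__':
    # hypothetical folder names, only for illustration
    with tempfile.TemporaryDirectory() as root:
        for name in ['post', 'ball', 'obstacle']:
            os.mkdir(os.path.join(root, name))
        print(class_to_index(root))  # {'ball': 0, 'obstacle': 1, 'post': 2}
```

In the notebook, `class_to_index(TRAIN_IMG_DIR)` or the `class_to_idx` attribute of an `ImageFolder` dataset shows the actual mapping.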


# Setup

Imports

In [0]:
import torch 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils import data
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms

import os

import numpy as np

import matplotlib.pyplot as plt
import matplotlib
from PIL import Image

Mount Google Drive

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Copy and unzip data

For this to work, the data must be stored as 'magmaDataSet.zip' in a folder called 'Daten_zip' in your Google Drive. Copying a single zip file turned out to be much faster than copying many individual images.
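The copy and unzip steps of the next cell can also be done in plain Python with `shutil` and `zipfile`, which avoids depending on the `cp` and `unzip` command-line tools. A minimal sketch; the function name `copy_and_extract` is hypothetical, and the paths to adapt are the same as in the cell below.

```python
import os
import shutil
import zipfile


def copy_and_extract(zip_path, work_dir, extract_dir):
    """Copy one zip archive into work_dir and extract it into extract_dir.

    Copying a single archive and unpacking it locally avoids transferring
    thousands of small image files one by one.
    """
    os.makedirs(work_dir, exist_ok=True)
    local_zip = shutil.copy(zip_path, work_dir)  # returns the path of the copy
    with zipfile.ZipFile(local_zip) as archive:
        archive.extractall(extract_dir)
    return local_zip
```

For the setup of this notebook the call would be `copy_and_extract('/content/gdrive/My Drive/Daten_zip/magmaDataSet.zip', 'zip_img', 'zip_img/input_data')`.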

In [0]:
# Change the path to the location of your data.
!cp -r "/content/gdrive/My Drive/Daten_zip/" ./zip_img 
!unzip zip_img/magmaDataSet.zip -d zip_img/input_data

Set directories

The constants below define the paths to the data and to the folders where the model checkpoints and the wrongly classified images are saved.

In [0]:
# modify this to point to your data directory
DATA_DIR = 'zip_img'
INPUT_DIR = DATA_DIR + '/input_data'
TRAIN_IMG_DIR = INPUT_DIR + '/train'
TEST_IMG_DIR = INPUT_DIR + '/test'
OUTPUT_DIR = DATA_DIR + '/output'
CHECKPOINT_DIR = OUTPUT_DIR + '/models'  # model checkpoints
SAVE_IMAGE_DIR = OUTPUT_DIR + '/images'  # wrongly classified images

# create the output directories (no error if they already exist)
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
os.makedirs(SAVE_IMAGE_DIR, exist_ok=True)

Define hyperparameters 

In [0]:
# define pytorch device 
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# define model parameters
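NUM_EPOCHS = 20 # original paper: 90
BATCH_SIZE = 128
MOMENTUM = 0.9  # only used by the SGD optimizer of the original paper
LR_DECAY = 0.0005  # used as weight_decay of SGD, not as a learning-rate decay
LR_INIT = 0.01  # initial learning rate of SGD (Adam uses 0.0001, see training cell)
IMAGE_DIM = 96 # pixels (original paper: 227); no resizing is applied, so the images must already have this size
NUM_CLASSES = 8 # original paper: 1000 classes for imagenet 2012 dataset
DEVICE_IDS = [0]  # GPUs to use (not used in this notebook)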
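MOMENTUM, LR_DECAY and LR_INIT only matter for the SGD setting of the original paper, which is commented out in the training cell. Despite its name, LR_DECAY is passed as `weight_decay`, i.e. an L2 penalty added to the gradient, not a learning-rate decay. The scalar sketch below follows the update rule documented for `torch.optim.SGD` without dampening or Nesterov momentum; the helper `sgd_step` is hypothetical.

```python
def sgd_step(param, grad, buf=None, lr=0.01, momentum=0.9, weight_decay=0.0005):
    """One SGD update of a single scalar parameter (no dampening, no Nesterov).

    buf is the momentum buffer (None before the first step).
    Returns the updated parameter and the updated buffer.
    """
    d_p = grad + weight_decay * param  # weight decay adds weight_decay * param to the gradient
    buf = d_p if buf is None else momentum * buf + d_p  # momentum buffer
    return param - lr * buf, buf


if __name__ == '__main__':
    # defaults correspond to LR_INIT, MOMENTUM and LR_DECAY; constant gradient for illustration
    w, buf = 1.0, None
    for step in range(3):
        w, buf = sgd_step(w, grad=0.5, buf=buf)
        print(step, w)
```

Because the buffer accumulates past gradients, the effective step grows over the first iterations even for a constant gradient.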

# Implementations of alexnet

Below are two different versions of the 'AlexNet'. The first one, called AlexNet, was built according to the paper 'ImageNet Classification with Deep Convolutional Neural Networks'. The second one, AlexNetPytorch, uses the same layer parameters as the PyTorch (torchvision) version of the 'AlexNet'.
Both implementations have the same overall structure but differ in the number of channels, the padding of the first convolution and the use of local response normalization.
For the PyTorch version, pretrained weights are available and can be used as well.

For visualization purposes, the networks can pass the input data through only a given number of convolutional layers. To do so, an additional parameter `layer` is passed to `forward()`, specifying after which convolutional layer (1 to 5) to stop; the output of that convolution is returned before its ReLU. The default value `0` means that all layers are used.
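The in_features of the first linear layer (256 * 1 * 1 for AlexNet, 256 * 2 * 2 for AlexNetPytorch) follow from the 96 x 96 input (IMAGE_DIM). Each convolution and max pooling with kernel k, stride s and padding p maps a side length n to floor((n + 2p - k) / s) + 1. The stdlib sketch below applies this formula to the layer parameters of both classes and returns the side length after each convolution, i.e. the spatial size returned by `forward(x, layer)` for layer 1 to 5, plus the size after the last pooling. The layer lists are transcribed by hand from the two classes.

```python
def out_size(n, kernel, stride=1, padding=0):
    """Side length after a Conv2d or MaxPool2d layer (dilation 1, floor mode)."""
    return (n + 2 * padding - kernel) // stride + 1


# (type, kernel, stride, padding) per size-changing layer; ReLU and LRN keep the size
ALEXNET = [('conv', 11, 4, 0), ('pool', 3, 2, 0), ('conv', 5, 1, 2), ('pool', 3, 2, 0),
           ('conv', 3, 1, 1), ('conv', 3, 1, 1), ('conv', 3, 1, 1), ('pool', 3, 2, 0)]
ALEXNET_PYTORCH = [('conv', 11, 4, 2), ('pool', 3, 2, 0), ('conv', 5, 1, 2), ('pool', 3, 2, 0),
                   ('conv', 3, 1, 1), ('conv', 3, 1, 1), ('conv', 3, 1, 1), ('pool', 3, 2, 0)]


def trace_sizes(layers, n=96):
    """Return the side length after each convolution and the final side length."""
    conv_sizes = []
    for kind, kernel, stride, padding in layers:
        n = out_size(n, kernel, stride, padding)
        if kind == 'conv':
            conv_sizes.append(n)
    return conv_sizes, n


if __name__ == '__main__':
    print(trace_sizes(ALEXNET))          # ([22, 10, 4, 4, 4], 1)
    print(trace_sizes(ALEXNET_PYTORCH))  # ([23, 11, 5, 5, 5], 2)
```

With the paper's 227 x 227 input, the same formula gives 55 after the first convolution and 6 after the last pooling, hence the 256 * 6 * 6 input features of the original network.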

In [0]:
class AlexNet(nn.Module):
    """
    Neural network model consisting of layers proposed by the AlexNet paper.
    """
    def __init__(self, num_classes=8):
        """
        Define and allocate layers for this neural net.
        Args:
            num_classes (int): number of classes to predict with this model
        """
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, stride=4, padding=0),
            nn.ReLU(),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(96, 256, 5, padding=2),
            nn.ReLU(),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(256, 384, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(384, 384, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(384, 256, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        # classifier is just a name for linear layers
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(in_features=(256 * 1 * 1), out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),  # not in-place: it would overwrite the ReLU output that backward() needs
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Linear(in_features=4096, out_features=num_classes),
        )
        self.init_bias()  # initialize weights and biases of the conv layers

    def init_bias(self):
        for layer in self.features:
            if isinstance(layer, nn.Conv2d):
                nn.init.normal_(layer.weight, mean=0, std=0.01)
                nn.init.constant_(layer.bias, 0)
        # The original paper initializes the biases of the 2nd, 4th and 5th conv layer with 1, but this led to worse results in my case.
        #nn.init.constant_(self.features[4].bias, 1)
        #nn.init.constant_(self.features[10].bias, 1)
        #nn.init.constant_(self.features[12].bias, 1)

    def forward(self, x, layer = 0):
        """
        Pass the input through the net.
        Args:
            x (Tensor): input tensor
            layer (int): number of convolutional layers, the input is passed through. Used for visualization.
        Returns:
            output (Tensor): output tensor
        """
        x = self.features[0](x) # nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, stride=4, padding=0)
        if layer == 1:
            return x
        x = self.features[1](x) # nn.ReLU()
        x = self.features[2](x) # nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2)
        x = self.features[3](x) # nn.MaxPool2d(kernel_size=3, stride=2)
        x = self.features[4](x) # nn.Conv2d(96, 256, 5, padding=2)
        if layer == 2:
            return x
        x = self.features[5](x) # nn.ReLU()
        x = self.features[6](x) # nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2)
        x = self.features[7](x) # nn.MaxPool2d(kernel_size=3, stride=2)
        x = self.features[8](x) # nn.Conv2d(256, 384, 3, padding=1)
        if layer == 3:
            return x
        x = self.features[9](x) # nn.ReLU()
        x = self.features[10](x) # nn.Conv2d(384, 384, 3, padding=1)
        if layer == 4:
            return x
        x = self.features[11](x) # nn.ReLU()
        x = self.features[12](x) # nn.Conv2d(384, 256, 3, padding=1)
        if layer == 5:
            return x
        x = self.features[13](x) # nn.ReLU()
        x = self.features[14](x) # nn.MaxPool2d(kernel_size=3, stride=2)

        x = x.view(-1, 256 * 1 * 1)  # reduce the dimensions for linear layer input

        return self.classifier(x)

In [0]:
class AlexNetPytorch(nn.Module):
    """
    Neural network model consisting of layers according to the pytorch alexnet. 
    """
    def __init__(self, num_classes=8):
        """
        Define and allocate layers for this neural net.
        Args:
            num_classes (int): number of classes to predict with this model
        """
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False),
            nn.Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False),
            nn.Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
            nn.ReLU(),
            nn.Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False),
        )
        # classifier is just a name for linear layers
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=256*2*2, out_features=4096, bias=True),
            nn.ReLU(),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=4096, out_features=4096, bias=True),
            nn.ReLU(),
            nn.Linear(in_features=4096, out_features=num_classes, bias=True),
        )

    def forward(self, x, layer = 0):
        """
        Pass the input through the net.
        Args:
            x (Tensor): input tensor
            layer (int): number of convolutional layers, the input is passed through. Used for visualization.
        Returns:
            output (Tensor): output tensor
        """
        x = self.features[0](x) # nn.Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
        if layer == 1:
            return x
        x = self.features[1](x) # nn.ReLU()
        x = self.features[2](x) # nn.MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
        x = self.features[3](x) # nn.Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
        if layer == 2:
            return x
        x = self.features[4](x) # nn.ReLU()
        x = self.features[5](x) # nn.MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
        x = self.features[6](x) # nn.Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        if layer == 3:
            return x
        x = self.features[7](x) # nn.ReLU()
        x = self.features[8](x) # nn.Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        if layer == 4:
            return x
        x = self.features[9](x) # nn.ReLU()
        x = self.features[10](x) # nn.Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        if layer == 5:
            return x
        x = self.features[11](x) # nn.ReLU()
        x = self.features[12](x) # nn.MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)

        x = x.view(-1, 256 * 2 * 2)  # reduce the dimensions for linear layer input

        return self.classifier(x)

# Select alexnet version

In this section, choose the version you want to use and comment out the line of the version you do not want to use.


In [0]:
alexnet = AlexNet(num_classes=NUM_CLASSES).to(device)
#alexnet = AlexNetPytorch(num_classes=NUM_CLASSES).to(device)

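**Use pretrained model**

Uncomment the cell below if you want to use the pretrained PyTorch version.

The pretrained torchvision model is downloaded, its first and last fully connected layers are replaced so that they match the smaller feature map (256 * 2 * 2 inputs) and the eight classes, and the weights are then copied to a new instance of 'AlexNetPytorch'. The two replaced layers start from a random initialization.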

In [0]:
#model = models.alexnet(pretrained=True)  # newer torchvision: models.alexnet(weights=models.AlexNet_Weights.IMAGENET1K_V1)
## replace the first fully connected layer (256 * 2 * 2 inputs instead of 256 * 6 * 6) and the output layer (NUM_CLASSES outputs)
#model.classifier[1] = nn.Linear(256 * 2 * 2, model.classifier[4].in_features)
#model.classifier[6] = nn.Linear(model.classifier[6].in_features, NUM_CLASSES)
## all parameter names and shapes now match AlexNetPytorch, so the state dict can be copied directly
#alexnet = AlexNetPytorch(num_classes=NUM_CLASSES)
#alexnet.load_state_dict(model.state_dict())
#alexnet.to(device)
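Instead of replacing layers in the torchvision model, the pretrained weights can also be copied by name: take every tensor of the pretrained state dict whose name and shape match the target model, and keep the target's own initialization for the rest (here the first and last fully connected layers). A sketch of this filtering with a hypothetical helper `merge_matching`; it only relies on the values having a `.shape`, so it works with PyTorch tensors as well as other array types.

```python
def merge_matching(source, target):
    """Return a copy of target in which every entry is replaced by the entry
    of source with the same key, provided both have the same shape.

    Keys that exist only in source, or whose shapes differ, are ignored.
    """
    merged = dict(target)
    for key, value in source.items():
        if key in merged and tuple(value.shape) == tuple(merged[key].shape):
            merged[key] = value
    return merged
```

On a fresh `AlexNetPytorch` instance this would be used as `alexnet.load_state_dict(merge_matching(models.alexnet(pretrained=True).state_dict(), alexnet.state_dict()))`; `classifier.1` and `classifier.6` are skipped because their shapes differ.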

# Train selected net

In [0]:
if __name__ == '__main__':
    # print the seed value
    seed = torch.initial_seed()
    print('Used seed : {}'.format(seed))

    print(alexnet)

    # create dataset and data loader for train images
    dataset = datasets.ImageFolder(TRAIN_IMG_DIR, transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]))
    dataloader = data.DataLoader(
        dataset,
        shuffle=True,
        pin_memory=True,
        num_workers=8,
        drop_last=True,
        batch_size=BATCH_SIZE)

    # create dataset and data loader for test images
    dataset_test = datasets.ImageFolder(TEST_IMG_DIR, transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]))
    dataloader_test = data.DataLoader(
        dataset_test,
        shuffle=False,
        pin_memory=True,
        num_workers=8,
        drop_last=True,
        batch_size=BATCH_SIZE)

    print('Datasets and dataloaders created.')
        

    # create optimizer
    optimizer = optim.Adam(params=alexnet.parameters(), lr=0.0001)
    # SGD setting proposed in the original paper (commented out below); it did not train in this setup
    #optimizer = optim.SGD(
        #params=alexnet.parameters(),
        #lr=LR_INIT,
        #momentum=MOMENTUM,
        #weight_decay=LR_DECAY)
    print('Optimizer created')

    # multiply the LR by 1/10 after every 30 epochs (never reached with NUM_EPOCHS = 20)
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)
    print('LR Scheduler created')

    # start training
    print('Starting training...')
    total_steps = 1
    loss_values = []
    accuracy_values = []
    loss_test = []
    accuracy_test = []
    for epoch in range(NUM_EPOCHS):
        alexnet.train()  # make sure dropout is active during training
        correct_predictions = 0
        steps = 0
        for imgs, classes in dataloader:
            imgs, classes = imgs.to(device), classes.to(device)

            # calculate the loss
            output = alexnet(imgs)
            loss = F.cross_entropy(output, classes)
            loss_values.append(loss.item())
            _, preds = torch.max(output, 1)
            temp_accuracy = 100 * (torch.sum(preds == classes).item() / classes.shape[0] )
            accuracy_values.append(temp_accuracy)
            correct_predictions += torch.sum(preds == classes).item()

            # update the parameters
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_steps += 1
            steps += imgs.shape[0]


        # Computing the test loss and accuracy here failed with "RuntimeError: CUDA out of memory".
        # Running the loop below in alexnet.eval() mode inside torch.no_grad() avoids storing the autograd graph and may help.
        #for imgs_test, classes_test in dataloader_test:
            #imgs_test, classes_test = imgs_test.to(device), classes_test.to(device)

            ## calculate the loss
            #output_test = alexnet(imgs_test)
            #loss_test_temp = F.cross_entropy(output_test, classes_test)
            #loss_test.append(loss_test_temp.item())
            #_, preds_test = torch.max(output_test, 1)
            #temp_accuracy = 100 * (torch.sum(preds_test == classes_test).item() / classes_test.shape[0] )
            #accuracy_test.append(temp_accuracy)

        # print the epoch, the mean training loss and the training accuracy of this epoch
        epoch_loss = np.mean(loss_values[-len(dataloader):])  # one loss value per batch of this epoch
        acc = 100 * correct_predictions / steps
        print('epoch: {} \t loss: {:.2f} \t accuracy: {:.2f}'.format(epoch, epoch_loss, acc))

        # save checkpoint every 20 epochs
        if ( epoch % 20 == 0):
            checkpoint_path = os.path.join(CHECKPOINT_DIR, 'alexnet_states_e{}.pkl'.format(epoch + 1))
            state = {
                'epoch': epoch,
                'total_steps': total_steps,
                'optimizer': optimizer.state_dict(),
                'model': alexnet.state_dict(),
                'seed': seed,
            }
            torch.save(state, checkpoint_path)

        lr_scheduler.step()
    checkpoint_path = os.path.join(CHECKPOINT_DIR, 'trained_model.pkl')
    state = {
        'epoch': epoch,
        'total_steps': total_steps,
        'optimizer': optimizer.state_dict(),
        'model': alexnet.state_dict(),
        'seed': seed,
    }
    torch.save(state, checkpoint_path)
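`StepLR` multiplies the learning rate by gamma once every step_size epochs, so the rate used in (0-based) epoch e is lr_0 * gamma^(e // step_size). A quick stdlib check of this closed form (the helper `step_lr` is hypothetical) shows that with NUM_EPOCHS = 20 and step_size = 30 the Adam learning rate of 0.0001 is never reduced in this notebook.

```python
def step_lr(initial_lr, epoch, step_size=30, gamma=0.1):
    """Learning rate used in a given epoch (0-based) under a StepLR schedule."""
    return initial_lr * gamma ** (epoch // step_size)


if __name__ == '__main__':
    # learning rates of the first and the last epoch for the settings above
    print(step_lr(0.0001, 0), step_lr(0.0001, 19))  # 0.0001 0.0001
    print(step_lr(0.0001, 30))  # first reduced value, about 1e-05
```

To see any decay within 20 epochs, step_size would have to be smaller than NUM_EPOCHS.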

Plot accuracy and loss on the training data (one value per batch)

In [0]:
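# plt.subplots() creates the figure and its axes together; a later add_subplot() would add a second, empty axes on top of the plot
fig_acc, ax_acc = plt.subplots()
ax_acc.plot(np.arange(len(accuracy_values)), accuracy_values)
ax_acc.set_title('Accuracy')
ax_acc.set_xlabel('Steps')
ax_acc.set_ylabel('Accuracy [%]')

fig_loss, ax_loss = plt.subplots()
ax_loss.plot(np.arange(len(loss_values)), loss_values)
ax_loss.set_title('Loss')
ax_loss.set_xlabel('Steps')
ax_loss.set_ylabel('Cross-entropy loss')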

# Calculate test error

Since computing the test loss and accuracy during training ran out of CUDA memory, the test error is calculated separately below.
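The cells below classify the test images one at a time (batch_size=1). The error rate itself only needs the arg-max of each row of logits compared with the labels, so it can also be computed for a whole batch at once. A NumPy sketch, assuming the logits of a batch have already been moved to the CPU as an array of shape (batch, NUM_CLASSES); the function name `error_rate` is hypothetical.

```python
import numpy as np


def error_rate(logits, labels):
    """Percentage of samples whose highest-scoring class differs from the label.

    logits: array of shape (batch, num_classes); labels: array of shape (batch,).
    """
    preds = np.argmax(logits, axis=1)          # predicted class per sample
    wrong = np.count_nonzero(preds != labels)  # number of misclassified samples
    return 100.0 * wrong / len(labels)


if __name__ == '__main__':
    logits = np.array([[2.0, 0.1, 0.3],
                       [0.2, 1.5, 0.1],
                       [0.9, 0.8, 0.1],
                       [0.1, 0.2, 3.0]])
    labels = np.array([0, 1, 1, 2])
    print(error_rate(logits, labels))  # 25.0
```

With PyTorch tensors, the equivalent expression is `(output.argmax(dim=1) != classes).float().mean() * 100`, evaluated inside `torch.no_grad()`.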

In [0]:
dataset = datasets.ImageFolder(TEST_IMG_DIR, transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
]))
dataset_show = datasets.ImageFolder(TEST_IMG_DIR, transforms.Compose([
    transforms.ToTensor(),
]))
print('Dataset created')
dataloader = data.DataLoader(
    dataset,
    shuffle=False,
    pin_memory=True,
    num_workers=8,
    drop_last=True,
    batch_size=1
)
dataloader_show = data.DataLoader(
    dataset_show,
    shuffle=False,
    pin_memory=True,
    num_workers=8,
    drop_last=True,
    batch_size=1
)

images = []
labels = []
images_show = []
labels_show = []

for i, l in dataloader:
    images.append(i.to(device))
    labels.append(l.to(device))

for i, l in dataloader_show:
    images_show.append(i.to(device))
    labels_show.append(l.to(device))


In [0]:
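errors = []      # indices of the wrongly classified images
wrong_pred = []  # predicted (wrong) labels
right_pred = []  # ground-truth labels
wrong_imgs = []  # unnormalized images for plotting

alexnet.eval()  # disable dropout for evaluation
with torch.no_grad():  # no gradients are needed, which saves memory
    for im in range(len(images)):
        output = alexnet(images[im])
        if output.argmax() != labels[im]:
            wrong_pred.append(output.argmax())
            right_pred.append(labels_show[im])
            errors.append(im)
            wrong_imgs.append(images_show[im])

error = 100 * len(wrong_imgs) / len(images_show)
print('Test error: {:.2f} %'.format(error))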

# Plot error images

To learn something about the problems the network had with the classification, it can be interesting to look at the wrongly classified images. They are plotted below and saved in the output folder SAVE_IMAGE_DIR.
Each image is labeled with the wrong prediction (w) and the ground truth (r).
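Beyond looking at single images, it can help to count which class pairs are confused most often. A stdlib sketch using `collections.Counter`, assuming the predicted and true labels are available as plain integers (e.g. `[p.item() for p in wrong_pred]` and `[r.item() for r in right_pred]` from the cell above) and reusing the class order from the introduction; the helper name `most_confused` is hypothetical.

```python
from collections import Counter

LABEL_NAMES = {0: 'ball', 1: 'post', 2: 'obstacle', 3: 'L-line',
               4: 'X-line', 5: 'T-line', 6: '11m-point', 7: 'foot'}


def most_confused(wrong, right, top=3):
    """Return the `top` most frequent (true label, predicted label) pairs with their counts."""
    pairs = Counter((LABEL_NAMES[r], LABEL_NAMES[w]) for w, r in zip(wrong, right))
    return pairs.most_common(top)


if __name__ == '__main__':
    # illustrative labels, not results of this notebook
    wrong = [4, 4, 3, 1]
    right = [5, 5, 5, 2]
    print(most_confused(wrong, right))
    # [(('T-line', 'X-line'), 2), (('T-line', 'L-line'), 1), (('obstacle', 'post'), 1)]
```

Pairs with equal counts are listed in the order they first occur, as documented for `Counter.most_common`.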

In [0]:
count = 0
label_names = {0: 'ball', 1: 'post', 2: 'obstacle', 3: 'L-line', 4: 'X-line', 5: 'T-line', 6: '11m-point', 7: 'robot_foot'}
for idx in range(len(wrong_imgs)):
    count += 1
    wrong_name = label_names[wrong_pred[idx].item()]
    right_name = label_names[right_pred[idx].item()]
    fig1, ax1 = plt.subplots()
    ax1.imshow(wrong_imgs[idx].cpu().squeeze().permute(1, 2, 0))
    # set the title before saving, so that it also appears in the saved file
    ax1.set_title('{}.\nw: {} \nr: {}'.format(count, wrong_name, right_name))
    fig1.savefig('{}/w_{}_r_{}_{}.png'.format(SAVE_IMAGE_DIR, wrong_name, right_name, count))

**Copy saved images to Google Drive**

Run this cell if you want to copy the wrongly classified images to your Google Drive.

In [0]:
!zip output_imgs.zip zip_img/output/images/*.png
!mkdir -p "/content/gdrive/My Drive/Daten/output"
!cp output_imgs.zip "/content/gdrive/My Drive/Daten/output"

# Visualize learned kernels

In this section, the learned kernels of the network can be visualized with the function plot_weights().

It plots the kernels of a given network and convolutional layer. If the chosen layer has three input channels, the three channels of each kernel can be plotted together as one RGB image (single_channel = False); otherwise each input channel of each kernel is plotted as a separate image (single_channel = True).
If the number of kernels times the number of input channels exceeds 500, only the first two input channels of the first 100 kernels are plotted.
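Kernel weights are small values around zero, so both plotting functions below rescale each kernel before `imshow`: they standardize it to zero mean and unit standard deviation, shift it by 0.5 and clip it to [0, 1]. The same steps as a standalone NumPy function; the name `kernel_to_image` and the channels-last output are choices of this sketch, and the guard against a zero standard deviation is an addition not present in the plotting functions.

```python
import numpy as np


def kernel_to_image(kernel):
    """Rescale one kernel for display.

    kernel: array of shape (channels, height, width) or (height, width).
    Returns float32 values in [0, 1]; 3D inputs are returned channels-last.
    """
    img = np.asarray(kernel, dtype=np.float32)
    std = img.std()
    img = (img - img.mean()) / (std if std > 0 else 1.0)  # standardize (avoid division by zero)
    img = np.clip(img + 0.5, 0.0, 1.0)                    # shift and clip to the displayable range
    if img.ndim == 3:
        img = img.transpose(1, 2, 0)                      # (C, H, W) -> (H, W, C) for imshow
    return img


if __name__ == '__main__':
    rng = np.random.default_rng(0)
    kernel = rng.normal(0.0, 0.01, size=(3, 11, 11))  # same shape as a first-layer AlexNet kernel
    img = kernel_to_image(kernel)
    print(img.shape, img.min() >= 0.0, img.max() <= 1.0)
```

Values more than half a standard deviation away from the mean are clipped, so this mapping emphasizes the sign pattern of a kernel rather than its exact magnitudes.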

In [0]:
def plot_filters_multi_channel(t):
    """Plot each three-channel kernel of t (kernels x 3 x height x width) as an RGB image."""
    # get the number of kernels
    num_kernels = t.shape[0]

    # define the number of columns and the number of rows needed to fit all kernels
    num_cols = 12
    num_rows = int(np.ceil(num_kernels / num_cols))

    # set the figure size
    fig = plt.figure(figsize=(num_cols, num_rows))

    # loop through all the kernels
    for i in range(num_kernels):
        ax1 = fig.add_subplot(num_rows, num_cols, i + 1)

        # convert the kernel tensor to a numpy array
        npimg = np.array(t[i].numpy(), np.float32)
        # standardize the kernel, shift it by 0.5 and clip it to [0, 1] for display
        npimg = (npimg - np.mean(npimg)) / np.std(npimg)
        npimg = np.minimum(1, np.maximum(0, (npimg + 0.5)))
        # channels last, as expected by imshow
        npimg = npimg.transpose((1, 2, 0))
        ax1.imshow(npimg)
        ax1.axis('off')
        ax1.set_title(str(i))

    plt.tight_layout()
    plt.savefig('myimage.png', dpi=100)
    plt.show()

In [0]:
def plot_filters_single_channel(t):
    """Plot every input channel of every kernel in t (kernels x channels x height x width) as a separate image."""
    # number of plots = number of kernels * kernel depth (input channels)
    nplots = t.shape[0] * t.shape[1]
    ncols = 12
    nrows = int(np.ceil(nplots / ncols))

    count = 0
    fig = plt.figure(figsize=(ncols, nrows))

    # loop through all the kernels and their channels
    for i in range(t.shape[0]):
        for j in range(t.shape[1]):
            count += 1
            ax1 = fig.add_subplot(nrows, ncols, count)
            # standardize the channel, shift it by 0.5 and clip it to [0, 1] for display
            npimg = np.array(t[i, j].numpy(), np.float32)
            npimg = (npimg - np.mean(npimg)) / np.std(npimg)
            npimg = np.minimum(1, np.maximum(0, (npimg + 0.5)))
            ax1.imshow(npimg)
            ax1.set_title(str(i) + ',' + str(j))
            ax1.axis('off')

    plt.tight_layout()
    plt.show()

In [0]:
def plot_weights(model, layer_num, single_channel=True):
    """Plot the kernels of model.features[layer_num], which must be a convolutional layer."""
    # extract the layer at the given index of model.features
    layer = model.features[layer_num]

    # check whether the layer is a convolutional layer
    if isinstance(layer, nn.Conv2d):
        # get the weight tensor (kernels x input channels x height x width)
        weight_tensor = layer.weight.data.cpu()

        if single_channel:
            if (weight_tensor.shape[0] * weight_tensor.shape[1]) > 500:
                print(weight_tensor.shape)
                # only plot the first two channels of the first 100 kernels
                weight_tensor = weight_tensor[0:100, 0:2, :, :]
            plot_filters_single_channel(weight_tensor)
        else:
            if weight_tensor.shape[1] == 3:
                plot_filters_multi_channel(weight_tensor)
            else:
                print("Plotting with single_channel=False requires kernels with three input channels")
    else:
        print("Can only visualize layers which are convolutional")

In [0]:
# visualize the weights of alexnet. The second parameter is the absolute index of the layer in alexnet.features.
# For an instance of AlexNet the convolutional layers have the indices 0, 4, 8, 10 and 12;
# for an instance of AlexNetPytorch they have the indices 0, 3, 6, 8 and 10.

plot_weights(alexnet, 0, single_channel=True)


# Visualize filtered images

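To see what effect the kernels of a layer have on an input image, you can plot the output of a selected convolutional layer for a selected input image below.

Select one of the test images (1230 images) with selected_image_idx.
Select the layer with selected_layer, given as its index in alexnet.features; it must be a convolutional layer (AlexNet: 0, 4, 8, 10 or 12; AlexNetPytorch: 0, 3, 6, 8 or 10).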
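`forward(x, layer)` counts only the convolutional layers (1 to 5), while selected_layer is an index into alexnet.features, which also contains ReLU, normalization and pooling layers. Passing selected_layer + 1 is therefore only correct for the first convolution; for example, index 4 (the second convolution of AlexNet) must map to layer = 2. The conversion is the position of the selected index among the convolution indices, plus one. A stdlib sketch on the layer type names of the two classes (written out by hand here; in the notebook they could come from `type(m).__name__` for `m in alexnet.features`); the helper `conv_number` is hypothetical.

```python
ALEXNET_LAYERS = ['Conv2d', 'ReLU', 'LocalResponseNorm', 'MaxPool2d',
                  'Conv2d', 'ReLU', 'LocalResponseNorm', 'MaxPool2d',
                  'Conv2d', 'ReLU', 'Conv2d', 'ReLU', 'Conv2d', 'ReLU', 'MaxPool2d']
ALEXNET_PYTORCH_LAYERS = ['Conv2d', 'ReLU', 'MaxPool2d', 'Conv2d', 'ReLU', 'MaxPool2d',
                          'Conv2d', 'ReLU', 'Conv2d', 'ReLU', 'Conv2d', 'ReLU', 'MaxPool2d']


def conv_number(layer_types, feature_index):
    """Translate an index into model.features to the `layer` argument of forward()."""
    conv_indices = [i for i, name in enumerate(layer_types) if name == 'Conv2d']
    if feature_index not in conv_indices:
        raise ValueError('index {} is not a convolutional layer'.format(feature_index))
    return conv_indices.index(feature_index) + 1


if __name__ == '__main__':
    print([conv_number(ALEXNET_LAYERS, i) for i in (0, 4, 8, 10, 12)])         # [1, 2, 3, 4, 5]
    print([conv_number(ALEXNET_PYTORCH_LAYERS, i) for i in (0, 3, 6, 8, 10)])  # [1, 2, 3, 4, 5]
```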

In [0]:
selected_image_idx = 100
selected_layer = 0

if isinstance(alexnet.features[selected_layer], nn.Conv2d):
    # forward() counts convolutional layers (1 to 5), so convert the index in alexnet.features
    conv_indices = [i for i, m in enumerate(alexnet.features) if isinstance(m, nn.Conv2d)]
    conv_number = conv_indices.index(selected_layer) + 1
    with torch.no_grad():
        output_for_layer = alexnet(images[selected_image_idx], conv_number)
    print(output_for_layer.shape)

    ax = plt.subplot()
    ax.imshow(images_show[selected_image_idx].cpu().squeeze().permute(1, 2, 0))
    ax.set_title('Original image')
    # plot one feature map per kernel of the selected layer
    for k in range(output_for_layer.shape[1]):
        fig1, ax1 = plt.subplots()
        image = output_for_layer[0][k].cpu()
        ax1.imshow(image)
        ax1.set_title('kernel: {}'.format(k))

else:
    print('Selected layer is not a convolutional layer')