# Computer Vision and Pattern Recognition - Project 3 (CNN classifier)
#### Gaia Marsich [SM3500600]

* [Introduction](#intro)
* [1. Task 1](#1-bullet)
* [2. Task 2](#2-bullet)
* [3. Task 3](#3-bullet)
* [References](#ref)

## Introduction <a class="anchor" id="intro"></a>

This project requires the implementation of an image classifier based on convolutional neural networks. The provided dataset (from [Lazebnik et al., 2006]) contains 15 categories (office, kitchen, living room, bedroom, store, industrial, tall building, inside city, street, highway, coast, open country, mountain, forest, suburb) and is already divided into a training set and a test set.

First of all, let's do the imports:

In [1]:
import torch
from torch import nn

import os
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import random_split, DataLoader
import torch.optim as optim
import torch.nn.init as init
import numpy as np
from datetime import datetime

## Task 1 <a class="anchor" id="1-bullet"></a>

In [4]:
# set a seed for reproducibility
torch.manual_seed(0)

# Build the network

class CNN(nn.Module):
    """Small three-stage convolutional classifier.

    Layer stack: conv(3->8) - ReLU - maxpool - conv(8->16) - ReLU - maxpool -
    conv(16->32) - ReLU - fully connected. Every convolution uses a 3x3
    kernel, stride 1 and no padding; both poolings are 2x2 with stride 2.

    The fully connected layer expects 32 feature maps of 12x12, which is what
    the stack produces for 64x64 inputs (64 -> 62 -> 31 -> 29 -> 14 -> 12,
    using output = ((input - kernel_size + 2*padding)/stride) + 1 from [1]).

    ``forward`` returns raw class scores. The training code in this notebook
    passes them to ``nn.CrossEntropyLoss``, so the loss is deliberately not a
    layer of the model.

    Args:
        num_classes: Number of output scores of the fully connected layer.
            Defaults to 15.
    """

    def __init__(self, num_classes=15):
        super().__init__()  # initialise nn.Module before registering layers

        # Stage 1: 64x64 -> conv 62x62 -> pool 31x31
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=8, kernel_size=3, stride=1)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 2: 31x31 -> conv 29x29 -> pool 14x14 (29/2 is rounded down)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=1)
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 3: 14x14 -> conv 12x12 (no pooling afterwards)
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1)
        self.relu3 = nn.ReLU()

        # Fully connected layer. 32: number of channels; 12, 12: height and
        # width of the last feature map.
        self.fc = nn.Linear(32 * 12 * 12, num_classes)

        self.initialize_weights()

    def initialize_weights(self):
        """Re-initialise every convolutional and linear layer in place.

        Weights are drawn from a Gaussian with mean 0 and standard deviation
        0.01; biases are set to 0.
        """
        # TODO: the professor mentioned to avoid normalization, what should I do?
        for module in self.modules():  # nn.Module.modules() walks all sub-modules recursively
            if isinstance(module, (nn.Conv2d, nn.Linear)):
                init.normal_(module.weight, mean=0, std=0.01)
                init.constant_(module.bias, 0)

    def forward(self, x):
        """Compute the class scores for a batch of images.

        Args:
            x: Image batch with 3 channels; the layer sizes assume 64x64
                images.

        Returns:
            Tensor with one row per input image and one column per class.

        Raises:
            RuntimeError: If the image size does not produce the 32*12*12
                features the fully connected layer expects.
        """
        x = self.maxpool1(self.relu1(self.conv1(x)))
        x = self.maxpool2(self.relu2(self.conv2(x)))
        x = self.relu3(self.conv3(x))

        # Flatten per sample (keep the batch dimension). Reshaping with
        # view(-1, 32 * 12 * 12) would silently merge several samples into one
        # row when the input size is wrong; this way fc reports the mismatch.
        x = torch.flatten(x, start_dim=1)

        return self.fc(x)

In [5]:
# Split the provided training set in 85% for actual training set and 15% to be used as validation set

resized_train_path = '/Users/Gaia/Desktop/CVPR-project/CVPR-project/resized/train'

# ImageFolder takes the class labels from the sub-directory names;
# ToTensor converts each image to a float tensor.
dataset_train = ImageFolder(root=resized_train_path, transform=transforms.ToTensor())

train_size = int(0.85 * len(dataset_train))
val_size = len(dataset_train) - train_size  # remainder, so the two sizes always sum to the full set

# Use a dedicated, seeded generator for the split: with the global RNG the
# train/validation partition would depend on which cells were executed before
# this one (and how many times), which defeats the reproducibility seed.
split_generator = torch.Generator().manual_seed(0)
train_dataset, val_dataset = random_split(
    dataset_train, [train_size, val_size], generator=split_generator
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) # batch_size=32 required by the project
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False) # batch_size=32 required by the project


# Create the test loader
resized_test_path = '/Users/Gaia/Desktop/CVPR-project/CVPR-project/resized/test'

dataset_test = ImageFolder(root=resized_test_path, transform=transforms.ToTensor())

# No shuffling: the order of the test samples does not affect the evaluation
test_loader = DataLoader(dataset_test, batch_size=32, shuffle=False)

Let's instantiate a model...

In [7]:
# Build the classifier and display its layer structure
model = CNN()
print(model)

# Training configuration: cross-entropy objective minimised by SGD
learning_rate = 0.0002
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    params=model.parameters(),
    lr=learning_rate,
    momentum=0.9,  # SGD's default momentum is 0; a non-zero value is wanted here
)

CNN(
  (conv1): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1))
  (relu1): ReLU()
  (maxpool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1))
  (relu2): ReLU()
  (maxpool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (relu3): ReLU()
  (fc): Linear(in_features=4608, out_features=15, bias=True)
)


...and do the training:

In [8]:
def train_one_epoch(epoch_index, loader):
    """Train the global ``model`` for one epoch (one full pass over ``loader``).

    Uses the module-level ``model``, ``loss_function`` and ``optimizer``.

    Args:
        epoch_index: Index of the current epoch. Not used by the function;
            kept so existing call sites keep working.
        loader: Iterable yielding ``(inputs, labels)`` minibatches.

    Returns:
        The average loss per minibatch, as a Python float.

    Raises:
        ValueError: If ``loader`` yields no minibatches (there is nothing to
            average over).
    """
    running_loss = 0.
    num_batches = 0

    for inputs, labels in loader:  # one minibatch per iteration

        outputs = model(inputs)  # forward pass

        loss = loss_function(outputs, labels)  # compute the loss
        running_loss += loss.item()  # sum up the loss of the minibatches processed so far

        optimizer.zero_grad()  # gradients accumulate by default, so reset them first
        loss.backward()  # backward pass
        optimizer.step()  # update the weights

        num_batches += 1

    if num_batches == 0:
        raise ValueError('loader yielded no minibatches')

    return running_loss / num_batches  # average loss per minibatch


# Short training run: a handful of epochs, reporting the mean minibatch loss of each
EPOCHS = 5

print('Training loop...')
for epoch in range(EPOCHS):
    # one full pass over the training set
    train_loss = train_one_epoch(epoch_index=epoch, loader=train_loader)
    print(f'Epoch [{epoch + 1}/{EPOCHS}], Loss: {train_loss:.3f}')

Training loop...
Epoch [1/5], Loss: 2.708
Epoch [2/5], Loss: 2.708
Epoch [3/5], Loss: 2.708
Epoch [4/5], Loss: 2.708
Epoch [5/5], Loss: 2.708


In [None]:
# Method to train for just one epoch

def train_one_epoch(epoch_index, loader):
    """Train the global ``model`` for one epoch (one full pass over ``loader``).

    Uses the module-level ``model``, ``loss_function`` and ``optimizer``.

    Args:
        epoch_index: Index of the current epoch. Not used by the function;
            kept so existing call sites keep working.
        loader: Iterable yielding ``(inputs, labels)`` minibatches.

    Returns:
        The average loss per minibatch, as a Python float.

    Raises:
        ValueError: If ``loader`` yields no minibatches (there is nothing to
            average over).
    """
    running_loss = 0.
    num_batches = 0

    for inputs, labels in loader:  # one minibatch per iteration

        outputs = model(inputs)  # forward pass

        loss = loss_function(outputs, labels)  # compute the loss
        running_loss += loss.item()  # sum up the loss of the minibatches processed so far

        optimizer.zero_grad()  # gradients accumulate by default, so reset them first
        loss.backward()  # backward pass
        optimizer.step()  # update the weights

        num_batches += 1

    if num_batches == 0:
        raise ValueError('loader yielded no minibatches')

    return running_loss / num_batches  # average loss per minibatch



# Training
#
# For every epoch: train on the training set, measure loss and accuracy on the
# validation set, and save the weights whenever the validation loss improves.

EPOCHS = 48

best_validation_loss = np.inf

loss_train = [] # store the values of the loss for the training set
loss_val = [] # store the values of the loss for the validation set
accuracies_val = [] # store the accuracy (fraction in [0, 1]) on the validation set

for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch+ 1))

    # Switch the model to training mode and do a pass over the training data
    model.train(True)
    train_loss = train_one_epoch(epoch, train_loader)

    # If using dropout and/or batch normalization, we need the following, to set the model to evaluation mode, disabling dropout and using population
    # statistics for batch normalization.
    model.eval()

    running_validation_loss = 0.0
    val_batches = 0
    val_correct = 0
    val_total = 0

    with torch.no_grad():      # disable gradient computation and reduce memory consumption
        for vinputs, vlabels in val_loader:
            voutputs = model(vinputs)
            vloss = loss_function(voutputs, vlabels)
            # .item() keeps the running sum a plain float, like the training
            # loss, instead of a 0-dim tensor
            running_validation_loss += vloss.item()
            val_batches += 1

            _, vpredicted = torch.max(voutputs, 1) # index of the highest score = predicted class
            val_correct += (vpredicted == vlabels).sum().item()
            val_total += vlabels.size(0)

    if val_batches == 0:
        raise ValueError('val_loader yielded no minibatches')

    validation_loss = running_validation_loss / val_batches # average validation loss per minibatch
    loss_train.append(train_loss)
    loss_val.append(validation_loss)
    accuracies_val.append(val_correct / val_total)
    print('LOSS: train: {}; validation: {}'.format(train_loss, validation_loss))


    # Track best performance (based on validation), and save the model.
    # model_path always points to the most recently saved, i.e. best, weights.
    if validation_loss < best_validation_loss:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        best_validation_loss = validation_loss
        model_path = 'model_{}_{}'.format(timestamp, epoch)
        torch.save(model.state_dict(), model_path)



In [None]:
# Load the best model and evaluate the performance on the test set
# to load the best model, first instantiate a new model, then load its state
newModel = CNN()
newModel.load_state_dict(torch.load(model_path))
newModel.eval() # evaluation mode, as done for the validation pass during training

correct = 0
total = 0
with torch.no_grad(): # no gradients are needed for evaluation
    # test_loader is the loader built from the resized test folder
    # (the name cifar2_test_loader used previously is not defined in this notebook)
    for images, labels in test_loader:
        outputs = newModel(images)
        _, predicted = torch.max(outputs, 1) # index of the highest score = predicted class
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the test images: %d %%' % (
    100 * correct / total))

## Task 2 <a class="anchor" id="2-bullet"></a>

## Task 3 <a class="anchor" id="3-bullet"></a>

## References <a class="anchor" id="ref"></a>

[1] https://dingyan89.medium.com/calculating-parameters-of-convolutional-and-fully-connected-layers-with-keras-186590df36c6