# Lab 01 - Overfitting

In this lab, you must modify the convolutional neural network you built in the previous lab to counteract overfitting.

> **Hints**:
> - To copy your code from the previous lab to this module folder, open your notebook in the **Mod02** folder and save a copy of it. Then move the copy to this folder.
> - If you did not complete the previous lab, use the sample solution as a starting point.
> - Add data augmentation as you load the training data - for example by rotating or flipping the images. See the [PyTorch](https://pytorch.org/docs/stable/torchvision/transforms.html#transforms-on-pil-image) or [Keras](https://keras.io/preprocessing/image/) documentation for help with this.
> - Add at least one dropout layer to your CNN. See the [PyTorch](https://pytorch.org/docs/stable/nn.html#dropout-layers) or [Keras](https://keras.io/layers/core/) documentation for help with this.

In [0]:
# mount files:
# Mount Google Drive inside the Colab runtime; force_remount=True re-mounts
# even if the drive is already mounted.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
import os, os.path
# Course folder, given relative to the "My Drive" root of the mounted drive.
folder = 'Moocs/edx_Microsoft'
# Make the course folder the working directory so the relative paths used by
# later cells (e.g. "./data/classification/...") resolve against it.
os.chdir('/content/drive/My Drive/'+folder)

In [0]:
# setup:
# The "!" lines are IPython shell escapes, not Python statements.
# Install the pinned torch 1.0.1.post2 wheel; the URL points at the CPU build
# for CPython 3.6 on 64-bit Linux.
# NOTE(review): with a CPU-only wheel the CUDA branches further down are
# presumably never taken - confirm against the runtime actually used.
!pip install https://download.pytorch.org/whl/cpu/torch-1.0.1.post2-cp36-cp36m-linux_x86_64.whl
!pip install torchvision

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Report the torch version that was actually imported.
print("Libraries imported - ready to use PyTorch", torch.__version__)

In [0]:
# preparing data: now with Data-Augmentation
import os

def make_class_list(train_folder):
    """Return the class names found under *train_folder*, sorted by name.

    Each sub-directory of *train_folder* is treated as one class. Plain files
    that happen to live next to the class folders (for example ``.DS_Store``
    or a README) are ignored, so the returned list lines up with the
    directory-based labels that an image-folder dataset assigns.

    Args:
        train_folder: Path of the directory that contains one sub-directory
            per class.

    Returns:
        A sorted list of sub-directory names (empty if there are none).

    Raises:
        OSError: If *train_folder* does not exist or cannot be listed.
    """
    return sorted(
        entry
        for entry in os.listdir(train_folder)
        if os.path.isdir(os.path.join(train_folder, entry))
    )

def load_data(train_folder):
    """Build the image pipeline and the folder-backed dataset.

    The pipeline randomly flips each image horizontally (probability 0.5),
    converts it to a tensor and normalizes the R, G and B channels with
    mean 0.5 and std 0.5.

    Note: the same pipeline (including the random flip) is attached to the
    whole dataset, so it also applies to whatever part of it is later used
    for validation.

    Args:
        train_folder: Root directory passed to ``ImageFolder``.

    Returns:
        A ``(transformation, full_dataset)`` tuple.
    """
    pipeline_steps = [
        # Data augmentation: random left/right mirror
        transforms.RandomHorizontalFlip(0.5),
        # PIL image -> tensor
        transforms.ToTensor(),
        # Per-channel normalization
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
    transformation = transforms.Compose(pipeline_steps)

    # The transformation is applied lazily, each time a sample is read
    full_dataset = torchvision.datasets.ImageFolder(
        root=train_folder, transform=transformation
    )

    return transformation, full_dataset
    
def split_data(full_dataset, train_ratio=0.7):
    """Randomly split *full_dataset* into a training and a testing subset.

    Args:
        full_dataset: Any dataset supporting ``len()`` and integer indexing.
        train_ratio: Fraction of the samples assigned to the training subset
            (default 0.7, i.e. a 70% / 30% split). The training size is
            rounded down; the remainder goes to the testing subset.

    Returns:
        A ``(train_dataset, test_dataset)`` tuple of random, non-overlapping
        subsets that together cover the whole dataset.

    Raises:
        ValueError: If *train_ratio* is outside the range [0, 1].
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(
            "train_ratio must be between 0 and 1, got {!r}".format(train_ratio)
        )

    train_size = int(train_ratio * len(full_dataset))
    # Give every remaining sample to the test split so nothing is dropped
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(
        full_dataset, [train_size, test_size]
    )

    return train_dataset, test_dataset

def define_loader(train_dataset, test_dataset, size_of_batch=15):
    """Create the batch loaders for the training and testing datasets.

    The training loader reshuffles its samples at the start of every epoch,
    so batches differ from epoch to epoch; the testing loader keeps a fixed
    order so evaluation is repeatable.

    Args:
        train_dataset: Dataset used for training.
        test_dataset: Dataset used for validation/testing.
        size_of_batch: Number of samples per batch for both loaders
            (default 15).

    Returns:
        A ``(train_loader, test_loader)`` tuple of ``DataLoader`` objects.
    """
    # Training data: iterate in shuffled batches of `size_of_batch` images
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=size_of_batch,
        num_workers=0,
        shuffle=True
    )

    # Testing data: same batch size, deterministic order
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=size_of_batch,
        num_workers=0,
        shuffle=False
    )

    return train_loader, test_loader

def main1():
    """Run the data-preparation stage.

    Lists the classes, loads the dataset from the training folder, splits it
    and wraps both parts in loaders.

    Returns:
        A ``(train_loader, test_loader, class_list)`` tuple.
    """
    train_folder = "./data/classification/training"

    class_list = make_class_list(train_folder)
    # The transformation object itself is not needed at this stage
    _transformation, full_dataset = load_data(train_folder)
    train_loader, test_loader = define_loader(*split_data(full_dataset))

    return train_loader, test_loader, class_list

In [0]:
# the net:

# debug function - prints tensor's shape
def print_debug(num_of_print, x):
    """Print the label *num_of_print* followed by the shape of *x*."""
    print(num_of_print, x.shape, sep=' : ')

class Net(nn.Module):
    """LeNet-style convolutional classifier with a single dropout stage.

    The input is down-sampled twice by 2x2 max-pooling, then passed through
    three 5x5 convolutions (6, 16 and 120 channels) with pooling, sigmoid and
    batch normalization in between, and finally through two fully connected
    layers.

    ``fc1`` expects a 4 x 4 x 120 feature map, which is what the layer
    arithmetic yields for 128 x 128 inputs (128 -> 64 -> 32 -> 30 -> 15 ->
    13 -> 6 -> 4). Other input sizes will not match ``fc1``.

    Args:
        num_classes: Number of output classes (default 3).
    """

    def __init__(self, num_classes=3):
        super(Net, self).__init__()

        # Dropout over whole feature maps: each channel is zeroed with
        # probability 0.3 during training to help prevent overfitting
        self.drop = nn.Dropout2d(p=0.3)

        # Lenet variation
        # Initial down-sampling, applied twice in forward()
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5, stride=1, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)  # out_channels=6
        self.bn2_1 = nn.BatchNorm2d(num_features=6)

        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)  # out_channels=16
        self.bn2_2 = nn.BatchNorm2d(num_features=16)

        self.conv3 = nn.Conv2d(in_channels=16, out_channels=120, kernel_size=5, stride=1, padding=1)
        self.bn2_3 = nn.BatchNorm2d(num_features=120)

        self.fc1 = nn.Linear(in_features=4 * 4 * 120, out_features=84)
        self.bn1 = nn.BatchNorm1d(84)
        self.fc2 = nn.Linear(in_features=84, out_features=num_classes)

    def forward(self, x):
        """Return the per-class log-probabilities for a batch of images.

        Args:
            x: Image batch; indexed below as a 4-dimensional tensor with
               3 input channels (required by ``conv1``).

        Returns:
            Tensor of shape (batch, num_classes) holding log-softmax scores.
        """
        # Lenet variation: shrink the input by a factor of 4 per side first
        x = self.pool1(x)
        x = self.pool1(x)

        x = self.conv1(x)
        x = torch.sigmoid(self.pool2(x))
        x = self.bn2_1(x)

        # Drop some feature maps to prevent overfitting. The module is only
        # active in training mode, so a single application is enough;
        # wrapping it in a second F.dropout would stack an extra p=0.5
        # dropout on top of the intended 30%.
        x = self.drop(x)

        x = self.conv2(x)
        x = torch.sigmoid(self.pool3(x))
        x = self.bn2_2(x)

        x = self.conv3(x)
        x = self.bn2_3(x)

        # Flatten (channels, height, width) into one feature vector per image
        x = x.view(-1, x.shape[1] * x.shape[2] * x.shape[3])

        # print_debug(1, x)

        x = self.bn1(F.relu(self.fc1(x)))

        x = self.fc2(x)
        return torch.log_softmax(x, dim=1)
        
def main2_lenet():
    """Prepare the data and build the network on the best available device.

    Returns:
        A ``(model, device, train_loader, test_loader, class_list)`` tuple,
        where ``device`` is the string "cuda" or "cpu".
    """
    train_loader, test_loader, class_list = main1()

    # Prefer the GPU whenever torch reports one
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # One output unit per class found in the training folder
    model = Net(num_classes=len(class_list)).to(device)
    return model, device, train_loader, test_loader, class_list

In [0]:
# train and test:

def train(model, device, train_loader, optimizer, epoch, loss_criteria):
    """Train *model* for one epoch and return the average batch loss.

    Args:
        model: The network to train; it is switched to training mode.
        device: Device the batches are moved to before the forward pass.
        train_loader: Iterable yielding ``(data, target)`` batches.
        optimizer: Optimizer that updates the model parameters.
        epoch: Epoch number, used for progress output only.
        loss_criteria: Callable computing the loss from (output, target).

    Returns:
        The mean of the per-batch losses for this epoch.

    Raises:
        ValueError: If *train_loader* yields no batches, since no average
            can be computed in that case.
    """
    # Set the model to training mode
    model.train()
    train_loss = 0.0
    batch_count = 0
    print("Epoch:", epoch)

    # Process the images in batches (batch numbers start at 1)
    for batch_count, (data, target) in enumerate(train_loader, start=1):
        # Use the CPU or GPU as appropriate
        data, target = data.to(device), target.to(device)

        # Clear the gradients left over from the previous batch
        optimizer.zero_grad()

        # Push the data forward through the model layers
        output = model(data)

        # Get the loss
        loss = loss_criteria(output, target)
        batch_loss = loss.item()

        # Keep a running total
        train_loss += batch_loss

        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()

        # Print metrics so we see some progress
        print('\tTraining batch {} Loss: {:.6f}'.format(batch_count, batch_loss))

    if batch_count == 0:
        raise ValueError("train_loader yielded no batches; cannot compute an average loss")

    # return average loss for the epoch
    avg_loss = train_loss / batch_count
    print('Training set: Average loss: {:.6f}'.format(avg_loss))
    return avg_loss
               
def test(model, device, test_loader, loss_criteria):
    # Switch the model to evaluation mode (without backpropagate)
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        batch_count = 0
        for data, target in test_loader:
            batch_count += 1
            data, target = data.to(device), target.to(device)
            
            # Get the predicted classes for this batch
            output = model(data)
            
            # Calculate the loss for this batch
            test_loss += loss_criteria(output, target).item()
            
            # Calculate the accuracy for this batch
            _, predicted = torch.max(output.data, 1)
            correct += torch.sum(target==predicted).item()

    # Calculate the average loss and total accuracy for this epoch
    avg_loss = test_loss / batch_count
    print('Validation set: Average loss: {:.6f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
                                        avg_loss, correct, len(test_loader.dataset),
                                        100. * correct / len(test_loader.dataset)))
    
    # return average loss for the epoch
    return avg_loss

def run_all(model, device, train_loader, test_loader):
    """Train and validate the model for 25 epochs, recording the losses.

    Args:
        model: The network to train.
        device: Device used for training and validation.
        train_loader: Loader for the training batches.
        test_loader: Loader for the validation batches.

    Returns:
        A ``(epoch_nums, training_loss, validation_loss)`` tuple of lists,
        with one entry per epoch.
    """
    # Adagrad is the optimizer in use; Adam (lr=0.001) and SGD with Nesterov
    # momentum (lr=0.001, momentum=0.9, weight_decay=1e-6) were the other
    # options considered.
    optimizer = optim.Adagrad(model.parameters(), lr=0.01)

    # Loss used for both training and validation
    loss_criteria = nn.CrossEntropyLoss()

    epochs = 25

    # Per-epoch metrics
    epoch_nums, training_loss, validation_loss = [], [], []

    print('Training on', device)
    for epoch in range(1, epochs + 1):
        training_loss.append(
            train(model, device, train_loader, optimizer, epoch, loss_criteria))
        validation_loss.append(
            test(model, device, test_loader, loss_criteria))
        epoch_nums.append(epoch)

    return epoch_nums, training_loss, validation_loss
    
def main3():
    """Build the model, train it and return the results.

    Returns:
        A ``(epoch_nums, training_loss, validation_loss, model, test_loader,
        class_list)`` tuple.
    """
    model, device, train_loader, test_loader, class_list = main2_lenet()
    history = run_all(model, device, train_loader, test_loader)
    print('finish training')

    epoch_nums, training_loss, validation_loss = history
    return epoch_nums, training_loss, validation_loss, model, test_loader, class_list

In [0]:
# view loss graph:

%matplotlib inline
from matplotlib import pyplot as plt

# globals for other cells:
# running main3() here performs the whole training; its results are kept at
# module level because the plotting, saving and inference code further down
# reads them (model, test_loader, class_list and the loss histories) directly.
epoch_nums, training_loss, validation_loss, model, test_loader, class_list = main3()

def plot_loss():
    """Plot the training and validation loss per epoch.

    Reads the module-level ``epoch_nums``, ``training_loss`` and
    ``validation_loss`` lists.
    """
    # Training curve first, validation second - the legend relies on it
    for loss_history in (training_loss, validation_loss):
        plt.plot(epoch_nums, loss_history)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['training', 'validation'], loc='upper right')
    plt.show()


plot_loss()

In [0]:
# confusion matrix:

from sklearn.metrics import confusion_matrix
from matplotlib import pyplot as plt
import numpy as np

def plot_confusion_matrix():
    """Plot the confusion matrix of the model on the test set.

    Reads the module-level ``model``, ``test_loader`` and ``class_list``.
    Batches are sent to whichever device the model's parameters live on, and
    the matrix always has one row/column per entry of ``class_list`` so that
    the tick labels line up even when a class never occurs in the test split
    or in the predictions.
    """
    truelabels = []
    predictions = []
    model.eval()
    # Follow the model rather than re-checking CUDA availability
    device = next(model.parameters()).device
    print("Getting predictions from test set...")
    # Inference only: no gradients are needed
    with torch.no_grad():
        for data, target in test_loader:
            truelabels.extend(target.cpu().numpy())
            scores = model(data.to(device)).cpu().numpy()
            predictions.extend(scores.argmax(1))

    # Plot the confusion matrix
    class_indices = np.arange(len(class_list))
    cm = confusion_matrix(truelabels, predictions, labels=class_indices)
    plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
    plt.colorbar()
    plt.xticks(class_indices, class_list, rotation=85)
    plt.yticks(class_indices, class_list)
    plt.xlabel("Predicted Shape")
    plt.ylabel("True Shape")
    plt.show()


plot_confusion_matrix()

In [0]:
# save and deploy:

# File (relative to the current working directory) that receives the weights
model_file = 'my-classifier.pt'
# Only the state_dict (parameters and buffers) is saved, not the model object,
# so loading requires building a Net with a matching layout first.
torch.save(model.state_dict(), model_file)
print("Model saved")

# Delete the existing model variable
# (code below must rebuild the model from model_file, e.g. via get_model())
del model

In [0]:
# helper functions for classification:

# Helper function to resize image
def resize_image(src_img, size=(128,128), bg_color="white"):
    """Return *src_img* scaled to fit *size* and centered on a plain canvas.

    The image is shrunk (never enlarged) so that it fits inside *size* while
    keeping its aspect ratio, then pasted in the middle of a new RGB image
    filled with *bg_color*. The caller's image is left untouched.

    Args:
        src_img: Source PIL image.
        size: ``(width, height)`` of the returned image.
        bg_color: Fill color of the canvas around the scaled image.

    Returns:
        A new RGB PIL image of exactly *size*.
    """
    from PIL import Image

    # thumbnail() works in place, so scale a copy instead of the argument.
    # LANCZOS is the filter formerly exposed as ANTIALIAS, a name that
    # recent Pillow releases no longer provide.
    scaled = src_img.copy()
    scaled.thumbnail(size, Image.LANCZOS)

    # Create a new image of the right shape
    new_image = Image.new("RGB", size, bg_color)

    # Paste the rescaled image centered onto the new background
    offset = ((size[0] - scaled.size[0]) // 2, (size[1] - scaled.size[1]) // 2)
    new_image.paste(scaled, offset)

    # return the resized image
    return new_image

# Function to predict the class of an image
def predict_image(classifier, image_array):
    """Predict the class name of every image in *image_array*.

    Args:
        classifier: Model to use; it is switched to evaluation mode.
        image_array: Iterable of images accepted by ``transforms.ToTensor``.

    Returns:
        A numpy array with one class name (taken from the module-level
        ``class_list``) per input image.
    """
    classifier.eval()

    # Same tensor conversion and normalization as for the training images,
    # without the random augmentation
    to_normalized_tensor = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    # Preprocess every image and stack the results into one batch
    input_features = torch.stack(
        [to_normalized_tensor(image).float() for image in image_array]
    )

    # One row of per-class scores for each input image
    scores = classifier(input_features).data.numpy()

    # The predicted class is the one with the highest score in each row
    return np.array([class_list[np.argmax(row)] for row in scores])

print("Functions created - ready to use model for inference.")

In [0]:
# classification:

from random import randint
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
%matplotlib inline

def get_model():
    """Rebuild the classifier and load the weights saved in ``model_file``.

    The network is created with one output per entry of the module-level
    ``class_list`` so that its layout matches the model that was trained and
    saved, whatever the number of classes. The weights are mapped to the CPU
    while loading, so a file written from a GPU model can also be read on a
    machine without CUDA; the returned model lives on the CPU.

    Returns:
        A ``Net`` instance with the saved weights loaded.
    """
    model = Net(num_classes=len(class_list))
    model.load_state_dict(torch.load(model_file, map_location='cpu'))
    return model

def plot_classes(predictions, image_arrays):
    """Show the images side by side, each titled with its predicted class.

    Args:
        predictions: Sequence of predicted class names.
        image_arrays: Sequence of images, in the same order as *predictions*.
    """
    fig = plt.figure(figsize=(12, 8))
    num_images = len(predictions)
    # One subplot per prediction, laid out in a single row
    for idx, predicted_class in enumerate(predictions):
        axes = fig.add_subplot(1, num_images, idx + 1)
        plt.imshow(image_arrays[idx])
        axes.set_title(predicted_class)


def classify(test_image_files, test_folder, size, background_color, model):
    """Load, resize and classify the given image files.

    Args:
        test_image_files: File names, relative to *test_folder*.
        test_folder: Directory containing the image files.
        size: Target size passed to ``resize_image``.
        background_color: Canvas color passed to ``resize_image``.
        model: Classifier passed to ``predict_image``.

    Returns:
        A ``(predictions, image_arrays)`` tuple: the predicted class names
        and the list of resized images as numpy arrays.
    """
    # Every image is resized first - it must have the same size as the
    # images on which the model was trained
    image_arrays = [
        np.array(
            resize_image(
                Image.open(os.path.join(test_folder, file_name)),
                size,
                background_color,
            )
        )
        for file_name in test_image_files
    ]

    # The model expects a batch of one or more images, as during training
    predictions = predict_image(model, np.array(image_arrays))
    return predictions, image_arrays

def main4():
    """Reload the saved model, classify the test images and plot the results."""
    model = get_model()

    test_folder = './data/classification/test'
    predictions, image_arrays = classify(
        os.listdir(test_folder),   # every entry of the test folder
        test_folder,
        (128,128),                 # image size
        "white",                   # background color
        model,
    )
    plot_classes(predictions, image_arrays)

main4()