<a href="https://colab.research.google.com/github/gstdl/Brazilian_e-Commerce_Exploartory_Data_Analysis_and_Machine_Learning/blob/master/Zodiac_Sings_Image_Classification_(CNN_%26_Transfer_Learning)_PyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Intro
## Dataset Download

The dataset is available in [Google Drive](https://drive.google.com/file/d/1Bjope31ZX9p4jXyaYK6CazIOGcYhOjI6/view?usp=sharing) and [Kaggle](https://www.kaggle.com/elderyouth/chinese-zodiac-signs). For my convenience, I prefer downloading the dataset directly from Kaggle to Google Colab. 



In [0]:
# Calls google.colab's `upload()` so the Kaggle API key can be provided.
# The next cell copies a file named kaggle.json, so the uploaded key should use that name.
# Note: the assignment rebinds `upload` from the imported function to the call's return value.
from google.colab.files import upload
upload = upload() # Upload Kaggle API Key

In [0]:
# Install the Kaggle CLI, place the uploaded API key under ~/.kaggle, then download and unzip the dataset.
# NOTE(review): the unzip path below contains a literal `{/content}` directory, which suggests the
# `-v{/content}` value passed to `kaggle config set` is taken verbatim -- confirm before changing either line.
!pip install kaggle # Installs Kaggle
!mkdir kaggle # Makes new directory 'kaggle'
!mkdir ../root/.kaggle # Makes new directory '../root/.kaggle'
!cp kaggle.json ~/.kaggle/kaggle.json # Copies kaggle API key to the new directory
!kaggle config set -n path -v{/content}
!chmod 600 /root/.kaggle/kaggle.json
!kaggle datasets download -d elderyouth/chinese-zodiac-signs # Downloads the dataset as a zip file
!unzip /content/{/content}/datasets/elderyouth/chinese-zodiac-signs/chinese-zodiac-signs.zip # Unzips the dataset

**Early Debugging**

During my initial training I had several problems. The code below took care of the problems.

In [0]:
import os

from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True # Allows the image loader to read truncated images

to_delete = [
             'signs/train/snake/00000116.jpg',
             'signs/train/monkey/00000585.jpg'
] # Problematic files

# Remove the problematic files. The existence check keeps the cell idempotent:
# re-running it after the files are already gone no longer raises FileNotFoundError.
for delete in to_delete:
    if os.path.exists(delete):
        os.remove(delete) # Delete problematic files
        print('deleting: ',delete)
    else:
        print('already removed: ',delete)

**Import Libraries**

In [0]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

import matplotlib.pyplot as plt

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
import numpy as np

**Check GPU availability**

In [0]:
# Flag used by later cells to decide whether tensors and models are moved to the GPU.
train_on_gpu = torch.cuda.is_available()

# Report which device the notebook is going to use.
print('CUDA is available!  Training on GPU ...' if train_on_gpu
      else 'CUDA is not available.  Training on CPU ...')

# Deep Learning
## Developing my own Convolutional Neural Network

## Transformations and Data Preparation

In [0]:
data_dir = 'signs'

batch_size = 20

# Training pipeline: random augmentation (rotation, crop, flip), then scale to [-1, 1]
train_transforms = transforms.Compose([transforms.RandomRotation(30),
                                       transforms.RandomResizedCrop(224),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.ToTensor(),
                                       transforms.Normalize([0.5, 0.5, 0.5],
                                                           [0.5, 0.5, 0.5])])

# Validation/test pipeline: deterministic resize + center crop.
# A random crop here would make the validation loss (used for checkpointing and
# early stopping) and the test metrics change from run to run on identical weights.
test_valid_transforms = transforms.Compose([transforms.Resize(255),
                                      transforms.CenterCrop(224),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.5, 0.5, 0.5],
                                                           [0.5, 0.5, 0.5])])

# One ImageFolder per split; the sub-directory names become the class labels
train_data = datasets.ImageFolder(data_dir + '/train', transform=train_transforms)
valid_data = datasets.ImageFolder(data_dir + '/valid', transform=test_valid_transforms)
test_data = datasets.ImageFolder(data_dir + '/test', transform=test_valid_transforms)

# Only the training loader is shuffled
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_data, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size)

# specify the image classes
classes = train_loader.dataset.classes

### Defining helper function to un-normalize an array and display an image

In [0]:
def imshow(img):
    """Display a channels-first image that was normalized with mean 0.5 and std 0.5.

    The normalization is undone and the axes are reordered from
    (channel, height, width) to (height, width, channel) before plotting.
    """
    restored = img * 0.5 + 0.5  # undo Normalize(0.5, 0.5)
    channels_last = np.transpose(restored, (1, 2, 0))
    plt.imshow(channels_last)

### Checking Transformation Results & Displaying Images

In [0]:
# obtain one batch of training images
dataiter = iter(train_loader)
# use the built-in next(); the iterator's .next() method is not available in current PyTorch
images, labels = next(dataiter)
images = images.numpy() # convert images to numpy for display

# plot the images in the batch, along with the corresponding labels
fig = plt.figure(figsize=(25, 4))
# display up to 20 images (a batch may hold fewer) on a 2 x 10 grid;
# the grid size must be an int, which `20/2` is not
for idx in range(min(20, len(images))):
    ax = fig.add_subplot(2, 20 // 2, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])

In [0]:
# obtain one batch of validation images
dataiter = iter(valid_loader)
# use the built-in next(); the iterator's .next() method is not available in current PyTorch
images, labels = next(dataiter)
images = images.numpy() # convert images to numpy for display

# plot the images in the batch, along with the corresponding labels
fig = plt.figure(figsize=(25, 4))
# display up to 20 images (a batch may hold fewer) on a 2 x 10 grid;
# the grid size must be an int, which `20/2` is not
for idx in range(min(20, len(images))):
    ax = fig.add_subplot(2, 20 // 2, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])

In [0]:
# obtain one batch of test images
dataiter = iter(test_loader)
# use the built-in next(); the iterator's .next() method is not available in current PyTorch
images, labels = next(dataiter)
images = images.numpy() # convert images to numpy for display

# plot the images in the batch, along with the corresponding labels
fig = plt.figure(figsize=(25, 4))
# display up to 20 images (a batch may hold fewer) on a 2 x 10 grid;
# the grid size must be an int, which `20/2` is not
for idx in range(min(20, len(images))):
    ax = fig.add_subplot(2, 20 // 2, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])

## Checking Array Size & Calculating Planned Layers Outputs

In [0]:
# Shape of the most recently loaded image batch; the next cell builds a random tensor of this shape.
images.shape

In [0]:
# Dry run: push a random tensor shaped like the image batch through the planned
# conv / max-pool stack and print the shape after every stage.
i = torch.randn(images.shape)
print(i.shape)

planned_layers = [
    nn.Conv2d(3, 16, 3, 1, 1),
    nn.MaxPool2d(7, 7),
    nn.Conv2d(16, 32, 3, 1, 1),
    nn.MaxPool2d(4, 4),
    nn.Conv2d(32, 64, 3, 1, 1),
    nn.MaxPool2d(2, 2),
]
for layer in planned_layers:
    i = layer(i)
    print(i.shape)

## Defining CNN Architecture


In [0]:
class Net(nn.Module):
    """Small CNN: three conv + max-pool stages followed by two linear layers.

    The final layer produces 12 raw scores (no activation is applied to them).
    """

    def __init__(self):
        super().__init__()
        # convolutions: 3x3 kernels, stride 1, padding 1 (spatial size is kept)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        # max pooling with window == stride: 7, then 4, then 2
        self.pool1 = nn.MaxPool2d(kernel_size=7, stride=7)
        self.pool2 = nn.MaxPool2d(kernel_size=4, stride=4)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        # classifier: 64 * 4 * 4 -> 512 -> 12
        self.fc1 = nn.Linear(in_features=64 * 4 * 4, out_features=512)
        self.fc2 = nn.Linear(in_features=512, out_features=12)
        # dropout with p=0.5, applied before each linear layer
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Return the 12 class scores for a batch of images."""
        # feature extractor: conv -> relu -> pool, three times
        stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
        )
        for conv, pool in stages:
            x = pool(F.relu(conv(x)))
        # flatten the feature maps to one vector of 64 * 4 * 4 values per row
        flat = x.view(-1, 64 * 4 * 4)
        # hidden layer with relu, dropout before and after
        hidden = F.relu(self.fc1(self.dropout(flat)))
        # output layer: raw scores, no activation
        return self.fc2(self.dropout(hidden))

# Build the network and show its layer summary
model = Net()
print(model)

# Put the parameters on the GPU when one is available
if train_on_gpu:
    model = model.cuda()

## Defining Loss Function & Optimizer

In [0]:
# Loss: cross-entropy over the raw class scores
criterion = nn.CrossEntropyLoss()

# Optimizer: Adam over all model parameters, learning rate 1e-4
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)

## Training The Model

In [0]:
# number of epochs to train the model
n_epochs = 200
# stop early once the best validation loss is unchanged for this many consecutive epochs
patience = 10

# best validation loss seen so far (np.inf: the np.Inf alias was removed in NumPy 2.0)
valid_loss_min = np.inf

running_train_loss = []
running_valid_loss = []
running_min_valid_loss = [] # best-so-far validation loss after each epoch

from time import time
end_time = 0
cumulative_training_time = [] # training time tracker

for epoch in range(1, n_epochs+1):
    # track training start time
    start = time()

    # per-epoch loss accumulators (sum of per-sample losses)
    train_loss = 0.0
    valid_loss = 0.0

    # train the model
    model.train()
    for data, target in train_loader:
        if train_on_gpu:
            data, target = data.cuda(), target.cuda()
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass
        output = model(data)
        # calculate the batch loss
        loss = criterion(output, target)
        # backward pass
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # criterion returns the batch mean, so weight it by the batch size
        train_loss += loss.item()*data.size(0)

    # training duration of this epoch in seconds (validation time is excluded)
    end = time() - start
    end_time += end
    cumulative_training_time.append(end_time)

    # validate the model; no gradients are needed, so skip building the autograd graph
    model.eval()
    with torch.no_grad():
        for data, target in valid_loader:
            if train_on_gpu:
                data, target = data.cuda(), target.cuda()
            # compute predicted outputs by passing inputs to the model
            output = model(data)
            # calculate the batch loss
            loss = criterion(output, target)
            # accumulate the summed validation loss
            valid_loss += loss.item()*data.size(0)

    # calculate average losses
    train_loss = train_loss/len(train_loader.dataset)
    running_train_loss.append(train_loss)
    valid_loss = valid_loss/len(valid_loader.dataset)
    running_valid_loss.append(valid_loss)

    # print training/validation statistics
    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f} \tTraining Time: {:.3f}s'.format(
        epoch, train_loss, valid_loss, end))

    # save model if validation loss has decreased (or tied the best value)
    if valid_loss <= valid_loss_min:
        print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_loss_min,
        valid_loss))
        torch.save(model.state_dict(), 'model_zodiac.pt')
        valid_loss_min = valid_loss

    # early stopping: the best loss has been identical for the last `patience` + 1 epochs,
    # i.e. `patience` consecutive epochs brought no improvement
    running_min_valid_loss.append(valid_loss_min)
    recent_best = running_min_valid_loss[-(patience + 1):]
    if len(recent_best) == patience + 1 and len(set(recent_best)) == 1:
        break

## Plotting The Training Loss


In [0]:
# x axis: one point per completed epoch (1-based)
x = range(1, 1 + len(running_train_loss))
fig, ax = plt.subplots(figsize=(17, 5))

# left axis: training and validation loss curves
for series, legend_name in ((running_train_loss, "Train Loss"),
                            (running_valid_loss, "Validation Loss")):
    ax.plot(x, series, label=legend_name)
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.legend()

# right axis: cumulative training time
ax_2 = ax.twinx()
ax_2.plot(x, cumulative_training_time, color='green', alpha=0.5, linestyle='-.')
ax_2.set_ylabel('time (s)', color='tab:green')

fig.tight_layout()

## Testing On New Data

In [0]:
# Restore the checkpoint written by the training loop, which saves whenever the
# validation loss is <= the best value so far (so this is the latest model at the minimum).
model.load_state_dict(torch.load('model_zodiac.pt')) # Load the saved model with the lowest validation loss

In [0]:
# track test loss and per-class hit counts
test_loss = 0.0
class_correct = [0.0] * len(classes)
class_total = [0.0] * len(classes)

model.eval()
# iterate over test data; gradients are not needed for evaluation
with torch.no_grad():
    for data, target in test_loader:
        if train_on_gpu:
            data, target = data.cuda(), target.cuda()
        # compute predicted outputs by passing inputs to the model
        output = model(data)
        # calculate the batch loss
        loss = criterion(output, target)
        # criterion returns the batch mean, so weight it by the batch size
        test_loss += loss.item()*data.size(0)
        # the highest score per row is the predicted class
        _, pred = torch.max(output, 1)
        # compare predictions to true label (kept 1-D, so a batch of one still indexes correctly)
        correct = pred.eq(target.view_as(pred)).cpu()
        # iterate over the actual batch length: the last batch can be shorter than batch_size
        for i in range(target.size(0)):
            label = target[i].item()
            class_correct[label] += correct[i].item()
            class_total[label] += 1

# average test loss
test_loss = test_loss/len(test_loader.dataset)
print('Test Loss: {:.6f}\n'.format(test_loss))

for i in range(len(classes)):
    if class_total[i] > 0:
        print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % (
            classes[i], 100 * class_correct[i] / class_total[i],
            np.sum(class_correct[i]), np.sum(class_total[i])))
    else:
        print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))

print('\nTest Accuracy (Overall): %2d%% (%2d/%2d)' % (
    100. * np.sum(class_correct) / np.sum(class_total),
    np.sum(class_correct), np.sum(class_total)))

## Visualize Sample Test Results

In [0]:
# obtain one batch of test images
dataiter = iter(test_loader)
# use the built-in next(); the iterator's .next() method is not available in current PyTorch
images, labels = next(dataiter)

# move model inputs to cuda, if GPU available
if train_on_gpu:
    images = images.cuda()

# get sample outputs; eval mode disables dropout, and no gradients are needed
model.eval()
with torch.no_grad():
    output = model(images)
# the highest score per row is the predicted class
_, preds_tensor = torch.max(output, 1)
# kept 1-D (no squeeze) so a batch of one can still be indexed
preds = preds_tensor.cpu().numpy()

# convert images to numpy array for visualization
images = images.cpu().numpy()

# plot the images in the batch, along with predicted and true labels;
# show up to 20 images on a 2 x 10 grid (the grid size must be an int)
fig = plt.figure(figsize=(25, 4))
for idx in range(min(20, len(images))):
    ax = fig.add_subplot(2, 20 // 2, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    # title: predicted (true); green when they match, red otherwise
    ax.set_title("{} ({})".format(classes[preds[idx]], classes[labels[idx]]),
                 color=("green" if preds[idx]==labels[idx].item() else "red"))

# Transfer Learning

## Transformations and Data Preparation

All pretrained models in PyTorch expect the images to be normalized with the following mean and standard deviation.

In [0]:
# Per-channel normalization statistics used for the pretrained model's inputs
mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

With that being said, we will need to re-transform the data.

In [0]:
data_dir = 'signs'

batch_size = 20

# Training pipeline: random augmentation, then normalization with the pretrained-model statistics
train_transforms = transforms.Compose([transforms.RandomRotation(30),
                                       transforms.RandomResizedCrop(224),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.ToTensor(),
                                       transforms.Normalize(mean=mean,
                                                           std=std)])

# Validation/test pipeline: deterministic resize + center crop.
# A random crop here would make the validation loss (used for checkpointing and
# early stopping) and the test metrics change from run to run on identical weights.
test_valid_transforms = transforms.Compose([transforms.Resize(255),
                                      transforms.CenterCrop(224),
                                      transforms.ToTensor(),
                                      transforms.Normalize(mean=mean,
                                                           std=std)])

# One ImageFolder per split; the sub-directory names become the class labels
train_data = datasets.ImageFolder(data_dir + '/train', transform=train_transforms)
valid_data = datasets.ImageFolder(data_dir + '/valid', transform=test_valid_transforms)
test_data = datasets.ImageFolder(data_dir + '/test', transform=test_valid_transforms)

# Only the training loader is shuffled
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_data, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size)

# specify the image classes
classes = train_loader.dataset.classes

Besides that, we will also need to define a new helper function based on the new means and standard deviations.

Formula for normalization is shown below.

> `input[channel] = (input[channel] - mean[channel]) / std[channel]`

In [0]:
def imshow(img, mean = mean, std = std):
    """Un-normalize a channels-first image and display it.

    Parameters
    ----------
    img : array of shape (channels, height, width), normalized per channel as
        ``(value - mean[channel]) / std[channel]``.
    mean, std : per-channel statistics used for the normalization.

    The input is left untouched: the previous version wrote the un-normalized
    values back into ``img``, so showing the same array twice corrupted it.
    Values are clipped to [0, 1] because augmentation and rounding can push
    them slightly outside the range matplotlib accepts for float images.
    """
    img = np.asarray(img)
    # reshape the statistics to (channels, 1, 1) so they broadcast over height and width
    channel_mean = np.asarray(mean).reshape(-1, 1, 1)
    channel_std = np.asarray(std).reshape(-1, 1, 1)
    restored = np.clip(img * channel_std + channel_mean, 0.0, 1.0)
    plt.imshow(np.transpose(restored, (1, 2, 0)))  # channels-first -> channels-last

Let's check whether our new imshow function gives the expected result.

In [0]:
# obtain one batch of training images
dataiter = iter(train_loader)
# use the built-in next(); the iterator's .next() method is not available in current PyTorch
images, labels = next(dataiter)
images = images.numpy() # convert images to numpy for display

# plot the images in the batch, along with the corresponding labels
fig = plt.figure(figsize=(25, 4))
# display up to 20 images (a batch may hold fewer) on a 2 x 10 grid;
# the grid size must be an int, which `20/2` is not
for idx in range(min(20, len(images))):
    ax = fig.add_subplot(2, 20 // 2, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])

## Loading Pretrained Model

The pretrained model we will be using for this classification is ResNet-18 model from [“Deep Residual Learning for Image Recognition”](https://arxiv.org/pdf/1512.03385.pdf). I do not have any particular reason for using this model.

In [0]:
# Build torchvision's ResNet-18 with pretrained weights; this rebinds `model`,
# replacing the CNN used earlier in the notebook.
# NOTE(review): newer torchvision releases reportedly prefer a `weights=` argument over
# `pretrained=True` -- confirm against the installed version.
model = models.resnet18(pretrained=True)
# bare expression: displays the architecture as the cell output
model

## Defining Model Output

In [0]:
# Freeze the pretrained parameters so only the new classifier head is trained
for param in model.parameters():
    param.requires_grad = False

from collections import OrderedDict
# New classifier head: 512 -> 128 -> 32 -> 12.
# Every entry needs a unique key: with both activations named 'relu', the
# OrderedDict kept a single entry and the ReLU between fc2 and fc3 was silently dropped.
# ReLU has no parameters, so state_dict keys are unaffected by the rename.
resnet_fc = nn.Sequential(OrderedDict([
                          ('fc1', nn.Linear(512, 128)),
                          ('relu1', nn.ReLU()),
                          ('dropout1', nn.Dropout(p=0.5)),
                          ('fc2', nn.Linear(128, 32)),
                          ('relu2', nn.ReLU()),
                          ('dropout2', nn.Dropout(p=0.5)),
                          ('fc3', nn.Linear(32, 12)),
                          ]))

# Replace the network's final layer; the new layers are created with requires_grad=True
model.fc = resnet_fc

# move tensors to GPU if CUDA is available
if train_on_gpu:
    model.cuda()

# bare expression: displays the modified architecture as the cell output
model

## Defining Loss Function & Optimizer

In [0]:
# Loss: cross-entropy over the raw class scores
criterion = nn.CrossEntropyLoss()

# Optimizer: Adam over all model parameters, learning rate 1e-4
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)

## Training The Model

In [0]:
# number of epochs to train the model
n_epochs = 200
# stop early once the best validation loss is unchanged for this many consecutive epochs
patience = 10

# best validation loss seen so far (np.inf: the np.Inf alias was removed in NumPy 2.0)
valid_loss_min = np.inf

running_train_loss = []
running_valid_loss = []
running_min_valid_loss = [] # best-so-far validation loss after each epoch

from time import time
end_time = 0
cumulative_training_time = [] # training time tracker

for epoch in range(1, n_epochs+1):
    # track training start time
    start = time()

    # per-epoch loss accumulators (sum of per-sample losses)
    train_loss = 0.0
    valid_loss = 0.0

    # train the model
    model.train()
    for data, target in train_loader:
        if train_on_gpu:
            data, target = data.cuda(), target.cuda()
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass
        output = model(data)
        # calculate the batch loss
        loss = criterion(output, target)
        # backward pass
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # criterion returns the batch mean, so weight it by the batch size
        train_loss += loss.item()*data.size(0)

    # training duration of this epoch in seconds (validation time is excluded)
    end = time() - start
    end_time += end
    cumulative_training_time.append(end_time)

    # validate the model; no gradients are needed, so skip building the autograd graph
    model.eval()
    with torch.no_grad():
        for data, target in valid_loader:
            if train_on_gpu:
                data, target = data.cuda(), target.cuda()
            # compute predicted outputs by passing inputs to the model
            output = model(data)
            # calculate the batch loss
            loss = criterion(output, target)
            # accumulate the summed validation loss
            valid_loss += loss.item()*data.size(0)

    # calculate average losses
    train_loss = train_loss/len(train_loader.dataset)
    running_train_loss.append(train_loss)
    valid_loss = valid_loss/len(valid_loader.dataset)
    running_valid_loss.append(valid_loss)

    # print training/validation statistics
    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f} \tTraining Time: {:.3f}s'.format(
        epoch, train_loss, valid_loss, end))

    # save model if validation loss has decreased (or tied the best value)
    if valid_loss <= valid_loss_min:
        print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_loss_min,
        valid_loss))
        torch.save(model.state_dict(), 'model_resnet_zodiac.pt')
        valid_loss_min = valid_loss

    # early stopping: the best loss has been identical for the last `patience` + 1 epochs,
    # i.e. `patience` consecutive epochs brought no improvement
    running_min_valid_loss.append(valid_loss_min)
    recent_best = running_min_valid_loss[-(patience + 1):]
    if len(recent_best) == patience + 1 and len(set(recent_best)) == 1:
        break

## Plotting The Training Loss

In [0]:
# x axis: one point per completed epoch (1-based)
x = range(1, 1 + len(running_train_loss))
fig, ax = plt.subplots(figsize=(17, 5))

# left axis: training and validation loss curves
for series, legend_name in ((running_train_loss, "Train Loss"),
                            (running_valid_loss, "Validation Loss")):
    ax.plot(x, series, label=legend_name)
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.legend()

# right axis: cumulative training time
ax_2 = ax.twinx()
ax_2.plot(x, cumulative_training_time, color='green', alpha=0.5, linestyle='-.')
ax_2.set_ylabel('time (s)', color='tab:green')

fig.tight_layout()

## Testing On New Data

In [0]:
# Restore the checkpoint written by the training loop, which saves whenever the
# validation loss is <= the best value so far (so this is the latest model at the minimum).
model.load_state_dict(torch.load('model_resnet_zodiac.pt')) # Load the saved model with the lowest validation loss

In [0]:
# track test loss and per-class hit counts
test_loss = 0.0
class_correct = [0.0] * len(classes)
class_total = [0.0] * len(classes)

model.eval()
# iterate over test data; gradients are not needed for evaluation
with torch.no_grad():
    for data, target in test_loader:
        if train_on_gpu:
            data, target = data.cuda(), target.cuda()
        # compute predicted outputs by passing inputs to the model
        output = model(data)
        # calculate the batch loss
        loss = criterion(output, target)
        # criterion returns the batch mean, so weight it by the batch size
        test_loss += loss.item()*data.size(0)
        # the highest score per row is the predicted class
        _, pred = torch.max(output, 1)
        # compare predictions to true label (kept 1-D, so a batch of one still indexes correctly)
        correct = pred.eq(target.view_as(pred)).cpu()
        # iterate over the actual batch length: the last batch can be shorter than batch_size
        for i in range(target.size(0)):
            label = target[i].item()
            class_correct[label] += correct[i].item()
            class_total[label] += 1

# average test loss
test_loss = test_loss/len(test_loader.dataset)
print('Test Loss: {:.6f}\n'.format(test_loss))

for i in range(len(classes)):
    if class_total[i] > 0:
        print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % (
            classes[i], 100 * class_correct[i] / class_total[i],
            np.sum(class_correct[i]), np.sum(class_total[i])))
    else:
        print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))

print('\nTest Accuracy (Overall): %2d%% (%2d/%2d)' % (
    100. * np.sum(class_correct) / np.sum(class_total),
    np.sum(class_correct), np.sum(class_total)))

## Visualize Sample Test Results

In [0]:
# obtain one batch of test images
dataiter = iter(test_loader)
# use the built-in next(); the iterator's .next() method is not available in current PyTorch
images, labels = next(dataiter)

# move model inputs to cuda, if GPU available
if train_on_gpu:
    images = images.cuda()

# get sample outputs; eval mode disables dropout, and no gradients are needed
model.eval()
with torch.no_grad():
    output = model(images)
# the highest score per row is the predicted class
_, preds_tensor = torch.max(output, 1)
# kept 1-D (no squeeze) so a batch of one can still be indexed
preds = preds_tensor.cpu().numpy()

# convert images to numpy array for visualization
images = images.cpu().numpy()

# plot the images in the batch, along with predicted and true labels;
# show up to 20 images on a 2 x 10 grid (the grid size must be an int)
fig = plt.figure(figsize=(25, 4))
for idx in range(min(20, len(images))):
    ax = fig.add_subplot(2, 20 // 2, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    # title: predicted (true); green when they match, red otherwise
    ax.set_title("{} ({})".format(classes[preds[idx]], classes[labels[idx]]),
                 color=("green" if preds[idx]==labels[idx].item() else "red"))

# Download Final Model to Local Computer

In [0]:
from google.colab.files import download

# Hand both saved checkpoints (own CNN first, then the ResNet variant) to google.colab's download()
for checkpoint in ('model_zodiac.pt', 'model_resnet_zodiac.pt'):
    download(checkpoint)