<a href="https://colab.research.google.com/github/avyayk/ml_notebooks/blob/main/maskClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Fine-Tuning VGG11 Image Classifier to Recognize Particular Image Features

In this project, we will fine-tune a pretrained convolutional neural network (CNN) image classifier for a task-specific function: classifying whether the people in an image are wearing masks.

##Step 1: Set up the environment 

In Colab, select 'Runtime' > 'Change runtime type' > 'Hardware accelerator' > 'GPU'.

In [None]:
## Requisite Imports ## 

# Utilities ( stay consistent between Python 2 & 3 )
from __future__ import print_function 
from __future__ import division

# PyTorch
import torch
import torch.nn as nn # Neural Networks
import torch.optim as optim # Optimizers

# NumPy for data handling
import numpy as np

# Torchvision for image datasets and manipulation
import torchvision
from torchvision import datasets, models, transforms

# Pyplot to generate plots (unused)
#import matplotlib.pyplot as plt

# More utilities
import time
import os
import copy

# Print the versions of PyTorch and Torchvision used in this project
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)

PyTorch Version:  1.13.1+cu116
Torchvision Version:  0.14.1+cu116


##Step 2: Set up the training and validation data

Data Source: https://drive.google.com/drive/folders/1J7zq8j03w1R4DzcIFiLIDuOgCxlOeugy?usp=sharing 

Copy the data into your own Google Drive and set the file path and folder names accordingly. In this notebook, I have renamed the folders to 'trainingData', 'validationData', and 'testData' (used later).
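
`datasets.ImageFolder`, used later in this notebook, expects one subfolder per class inside each split folder and assigns integer labels in sorted order of the subfolder names. The sketch below builds a throwaway directory tree and derives the same kind of mapping with the standard library only. The class folder names `with_mask` and `without_mask` are assumptions for illustration and may differ from the names in the shared dataset.

```python
import os
import tempfile

# Hypothetical class folder names; check the actual names in your copy of the data.
SPLITS = ('trainingData', 'validationData', 'testData')
CLASS_FOLDERS = ('without_mask', 'with_mask')

def build_layout(root):
    """Create root/<split>/<class>/ folders, mirroring the layout ImageFolder reads."""
    for split in SPLITS:
        for class_name in CLASS_FOLDERS:
            os.makedirs(os.path.join(root, split, class_name), exist_ok=True)

def class_to_index(split_path):
    """Map each class subfolder to an integer label, in sorted name order."""
    class_names = sorted(
        entry.name for entry in os.scandir(split_path) if entry.is_dir()
    )
    return {name: index for index, name in enumerate(class_names)}

with tempfile.TemporaryDirectory() as root:
    build_layout(root)
    mapping = class_to_index(os.path.join(root, 'trainingData'))
    print(mapping)
```

Knowing which label index belongs to which class matters later when interpreting the model's predictions.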

In [None]:
## Mount Google Drive ##

from google.colab import drive
drive.mount('/content/gdrive', force_remount = True )

Mounted at /content/gdrive


In [None]:
# Extract the image data from the zipped imageset (only do this once)
### NOTE: the author flagged this zip path as wrong; point it at the location of the zip in your own Drive

from zipfile import ZipFile
with ZipFile('/content/gdrive/My Drive/mask_classification/mask_image_set.zip', 'r') as zipObject:
   # Extract all the contents of zip file in current directory
   zipObject.extractall()

In [None]:
## Some definitions... ##

# directoryPath to imageset of people wearing masks (and lambda for convenience)
## NOTE: This is MY directoryPath, not necessarily yours
directoryPath = '/content/gdrive/My Drive/mask_image_set/mask_image_set'
get_folder_path = lambda fileFolder : os.path.join( directoryPath, fileFolder )

# NUMBER_OF_CLASSES in the dataset (masked / non-masked)
NUMBER_OF_CLASSES = 2

# Training batch size (# of samples processed before the model is updated)
## Change depending on how much memory we have
BATCH_SIZE = 64

# Number of training epochs
NUMBER_OF_EPOCHS = 25

# Only update reshaped layer parameters (True); otherwise finetune the entire model (False)
FEATURE_EXTRACTION_ON = True

# Image size for the network input (we will perform a square resize)
IMAGE_SIZE = 224

In PyTorch, data handling is organized around the Dataset and DataLoader classes.

Dataset: the abstract structure that organizes all the images and their labels.

DataLoader: the iterator that yields a batch of data for the model at each training step.
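
As a minimal sketch of how these two pieces fit together, the example below wraps synthetic tensors in a `TensorDataset` and iterates over them with a `DataLoader`. The random data, the 8x8 image size, and the batch size of 4 are assumptions chosen to keep the example small; the notebook itself uses `ImageFolder` datasets instead.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Synthetic stand-in data: 10 RGB "images" of size 8x8 with binary labels.
images = torch.randn(10, 3, 8, 8)
labels = torch.tensor([0, 1] * 5)

# A Dataset pairs each sample with its label and supports indexing and len().
toy_dataset = TensorDataset(images, labels)

# A DataLoader groups samples into batches (the last batch may be smaller).
toy_loader = DataLoader(toy_dataset, batch_size=4, shuffle=True)

batch_shapes = []
for batch_images, batch_labels in toy_loader:
    batch_shapes.append((tuple(batch_images.shape), tuple(batch_labels.shape)))
    print(batch_images.shape, batch_labels.shape)
```

With 10 samples and a batch size of 4, the loader yields three batches, and the final one holds the 2 leftover samples.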

In [None]:
## Input the 'trainingData' and 'validationData' from their respective folders ##

# Common mean and standard deviation selection for ImageNet normalization (for DataTransforms)
normalizationMean, normalizationSTD = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

# DataTransforms for trainingData and validationData
DataTransforms = {
    'trainingData': transforms.Compose( [
        transforms.RandomResizedCrop( IMAGE_SIZE ), # Random Crop
        transforms.RandomHorizontalFlip(), # Random Horizontal Flip
        transforms.ToTensor(), # Conversion to Tensor
        transforms.Normalize( normalizationMean, normalizationSTD )
    ]),

    'validationData': transforms.Compose( [
        transforms.Resize( IMAGE_SIZE ),
        transforms.CenterCrop( IMAGE_SIZE ),
        transforms.ToTensor(),
        transforms.Normalize( normalizationMean, normalizationSTD )
    ])
}

# FOLDER_NAMES to eliminate redundancy moving forward
FOLDER_NAMES = ('trainingData', 'validationData')

# Map our DataTransforms and create trainingData and validationData datasets
ImageDatasets = { 
    currentFolder: datasets.ImageFolder( get_folder_path( currentFolder ), \
                                      DataTransforms[ currentFolder ] ) \
    for currentFolder in FOLDER_NAMES
}


# Create trainingData and validationData DataLoaders
ImageDataLoaders = {
    currentFolder: torch.utils.data.DataLoader( ImageDatasets[ currentFolder ], \
                                             batch_size = BATCH_SIZE, \
                                             shuffle = True, \
                                             num_workers = 4 ) \
                    for currentFolder in FOLDER_NAMES
}

# Detect if we have a GPU available, and set the RuntimeDevice accordingly
RuntimeDevice = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
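
The `transforms.Normalize` step in the cell above subtracts a per-channel mean and divides by a per-channel standard deviation. The pure-Python sketch below works through that arithmetic for a single RGB pixel; the helper names `normalize_pixel` and `denormalize_pixel` are hypothetical and exist only for this illustration.

```python
# ImageNet channel statistics, as used in this notebook's DataTransforms.
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

def normalize_pixel(rgb, mean=MEAN, std=STD):
    """Apply (value - mean) / std to each channel of one RGB pixel in [0, 1]."""
    return [(value - m) / s for value, m, s in zip(rgb, mean, std)]

def denormalize_pixel(rgb, mean=MEAN, std=STD):
    """Invert the normalization, e.g. before displaying an image."""
    return [value * s + m for value, m, s in zip(rgb, mean, std)]

# A pixel equal to the channel means maps to zero in every channel.
print(normalize_pixel(MEAN))

# A pure white pixel (1.0 after ToTensor) maps to a value above 2 in every channel.
white = normalize_pixel([1.0, 1.0, 1.0])
print([round(value, 3) for value in white])
```

The inverse function is handy when plotting normalized tensors, since displaying them directly gives distorted colors.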



##Step 3: Model Initialization
Many deep models, some with hundreds of layers, have been trained on ImageNet, a large dataset containing images from 1000 classes. Models trained on such a large dataset have already acquired a great deal of general visual knowledge. By fine-tuning one of them on our own data, we hope the model learns the new task by combining what it learns from the task-specific data with the visual knowledge it gained from ImageNet.

To initialize the model, we can take advantage of Torchvision, a package that provides many deep model architectures along with their pretrained parameters. As mentioned above, the pretrained model classifies images into 1000 classes. Here we only want two classes, so after downloading the model we change the output dimension of the last layer to two.
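
The freeze-then-replace idea can be sketched on a tiny stand-in network without downloading any weights. The three-layer `toy_model` below is hypothetical and only mimics the shape of the problem (a body followed by a 1000-class output layer); it is not the VGG model used in this notebook.

```python
import torch.nn as nn

# Hypothetical stand-in for a pretrained network: a small body plus a
# final layer that outputs 1000 classes.
toy_model = nn.Sequential(
    nn.Linear(32, 16),    # plays the role of the pretrained feature layers
    nn.ReLU(),
    nn.Linear(16, 1000),  # plays the role of the original 1000-class output layer
)

# Freeze every existing parameter so backpropagation leaves them unchanged.
for parameter in toy_model.parameters():
    parameter.requires_grad = False

# Replace the last layer; a freshly constructed layer has requires_grad=True.
in_features = toy_model[2].in_features
toy_model[2] = nn.Linear(in_features, 2)

trainable = [name for name, p in toy_model.named_parameters() if p.requires_grad]
print(trainable)
```

Only the new layer's weight and bias remain trainable, which is what feature extraction relies on.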

In [None]:
# If feature_extraction_on, freeze every existing parameter; the replacement last layer created afterwards stays trainable
def set_gradient_requirements( model, feature_extraction_on = FEATURE_EXTRACTION_ON ):
    if feature_extraction_on:
        for parameter in model.parameters():
            parameter.requires_grad = False

In [None]:
# Load the pre-trained CNN model that we will fine-tune; we are using a VGG11 (with batch normalization) image classifier
ImageClassifier = models.vgg11_bn( pretrained = True ) # pretrained = True will initialize the model with parameters learned from ImageNet

# Last fully-connected layer of VGG model (Layer #6)
LAST_CONNECTED_LAYER = 6

# Choose to freeze or unfreeze gradients of the model parameters during the training process
## If feature extraction is on, the pre-trained parameters are frozen: no gradients are computed for them and they are not updated during backpropagation
set_gradient_requirements( ImageClassifier, FEATURE_EXTRACTION_ON )

# Number of input features to the last fully-connected layer
numInputFeatures = ImageClassifier.classifier[ LAST_CONNECTED_LAYER ].in_features

# Change the dimension of the last layer to NUMBER_OF_CLASSES (in our case, 2)
ImageClassifier.classifier[ LAST_CONNECTED_LAYER ] = nn.Linear( numInputFeatures, NUMBER_OF_CLASSES )

# Move the model to our device
ImageClassifier.to( RuntimeDevice )



VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU(inplace=True)
    (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): ReLU(inplace=True)
    (14): MaxPool2d(ke

##Step 4: Optimization Tools

In deep learning, we use a loss metric to evaluate how close the predicted labels are to the ground truth. A smaller loss means better performance.
<br/>
<br/>
To minimize the loss, at each training step we backpropagate through the network to compute the gradients, and the optimizer then uses those gradients to update the model parameters.
<br/>
<br/>
Here we will use stochastic gradient descent as our optimizer and cross entropy as our loss metric.
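
To make the cross-entropy loss concrete, here is a small pure-Python sketch for a single two-class prediction. It applies a softmax to the raw scores and takes the negative log-probability of the correct class, which is the per-sample quantity `nn.CrossEntropyLoss` computes before averaging over a batch. The example scores and the choice of index 0 as the correct class are assumptions for illustration.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    shift = max(logits)  # subtracting the max keeps exp() numerically stable
    exps = [math.exp(value - shift) for value in logits]
    total = sum(exps)
    return [value / total for value in exps]

def cross_entropy(logits, target_index):
    """Negative log-probability assigned to the correct class."""
    return -math.log(softmax(logits)[target_index])

# Same scores, different outcomes depending on which class is correct.
confident_right = cross_entropy([3.0, -1.0], target_index=0)
unsure = cross_entropy([0.0, 0.0], target_index=0)
confident_wrong = cross_entropy([3.0, -1.0], target_index=1)
print(confident_right, unsure, confident_wrong)
```

The loss is small for a confident correct prediction, equals ln 2 for a 50/50 guess, and grows large for a confident mistake.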

In [None]:
# Create the ModelOptimizer (we will be using Stochastic Gradient Descent)
ModelOptimizer = optim.SGD( ImageClassifier.parameters(), 
                            lr = 0.001, # Play with lr for the best accuracy!!
                            momentum = 0.9 ) 

# We will be using Cross-Entropy as our LossFunction
LossFunction = nn.CrossEntropyLoss()
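
As a rough illustration of what SGD with momentum does on each step, the sketch below applies the update to a single scalar parameter using the same `lr` and `momentum` values as above. It assumes the plain momentum form (no dampening, weight decay, or Nesterov term), and the quadratic objective is a made-up example rather than anything from this notebook.

```python
def sgd_momentum_step(param, grad, velocity, lr=0.001, momentum=0.9):
    """One update for a single scalar parameter.

    velocity <- momentum * velocity + grad
    param    <- param - lr * velocity
    """
    velocity = momentum * velocity + grad
    param = param - lr * velocity
    return param, velocity

# Minimize f(w) = (w - 3)^2, whose gradient is 2 * (w - 3).
w, v = 0.0, 0.0
for _ in range(2000):
    gradient = 2.0 * (w - 3.0)
    w, v = sgd_momentum_step(w, gradient, v)
print(round(w, 4))
```

The velocity term accumulates past gradients, so consistent gradient directions produce larger effective steps than the raw learning rate alone.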

##Step 5: Training

In [None]:
## Function to train the model ## 

def train_model( model, dataloaders, criterion, optimizer, num_epochs ):

    '''
    Function to train a model using particular datasets, optimizer, and loss function

    Args:
    - model: a pre-defined neural network model
    - dataloaders: a dictionary containing the training and validation data loaders
    - criterion: the loss function used to measure the error of the model's output
    - optimizer: the optimization algorithm used to adjust the model's parameters
    - num_epochs: the number of times the entire dataset is passed through the model during training 

    Returns:
    - model: the trained model
    '''
    
    # Process startTime
    startTime = time.time()

    # Validation Accuracy History - % of correctly classified images at 
    # the end of each training epoch
    accuracyLog = []
    
    # Highest model accuracy and set of model weights
    bestModelWeights = copy.deepcopy( model.state_dict() )
    highestAccuracy = 0.0

    # Training Epochs
    for currentEpoch in range( 1, num_epochs + 1 ):

      # Log currentEpoch to console
      logText = 'Epoch {}/{}'.format( currentEpoch, num_epochs )
      print( logText )

      print( '-' * len( logText ) ) # Number of dashes changes with text length

      # Each epoch has a training and validation phase
      PHASES = ('Training', 'Validation')

      # Reset the currentFolder to trainingData in each epoch
      currentFolder = 'trainingData'
      
      # Toggle between training phase and validation phase
      for currentPhase in PHASES:
          
          # If we are in the training phase, set the model to training mode
          if currentPhase == 'Training':
              model.train()

          # Otherwise, set the model to evaluation mode
          else:
              model.eval()
              currentFolder = 'validationData' # Toggle currentFolder to validationData

          # Continually track the number of correct predictions
          # and the accumulated loss of the model in the current phase
          numCorrect, runningLoss = 0, 0.0

          # Iterate through the datasets
          for modelInputs, targetOutput in dataloaders[ currentFolder ]:
              
              # Write the inputs and outputs to the RuntimeDevice
              modelInputs, targetOutput = modelInputs.to( RuntimeDevice ), targetOutput.to( RuntimeDevice )

              # Zero the parameter gradients at the beginning of each iteration 
              optimizer.zero_grad()

              # Forward pass
              # Track gradient history only in the training phase (not in the evaluation phase)
              with torch.set_grad_enabled( currentPhase == 'Training' ):
                  
                  # Obtain model outputs
                  modelOutputs = model( modelInputs )

                  # Compute the batch loss and the predicted class for each input
                  batchLoss = criterion( modelOutputs, targetOutput )
                  _, modelPredictions = torch.max( modelOutputs, 1 )

                  # If we are in the training phase, backpropagate the loss and update the parameters
                  if currentPhase == 'Training':
                      batchLoss.backward()
                      optimizer.step()

              # Update our runningLoss and our number of correct predictions
              # (the criterion returns a per-batch mean, so weight it by the batch size)
              runningLoss += batchLoss.item() * modelInputs.size(0)
              numCorrect += torch.sum( modelPredictions == targetOutput.data )

          # The loss over the epoch is the loss so far / the size of the dataset
          epochLoss = runningLoss / len( dataloaders[ currentFolder ].dataset )

          # The epoch accuracy is the number of correct predictions / the size of the dataset
          epochAccuracy = numCorrect.double() / len( dataloaders[ currentFolder ].dataset )

          # Print to console
          print( '{} | Loss: {:.4f} Accuracy: {:.4f}'.format( currentPhase, epochLoss, epochAccuracy ) )

          # If we are in the validation phase, keep a copy of the best weights so far
          if currentPhase == 'Validation':

            # Update the highestAccuracy and bestModelWeights if we have encountered better accuracy
            if epochAccuracy > highestAccuracy:
              highestAccuracy = epochAccuracy
              bestModelWeights = copy.deepcopy( model.state_dict() )
          
            # Add the epoch's accuracy to the accuracyLog
            accuracyLog.append( epochAccuracy )

    # Total elapsedTime
    elapsedTime = time.time() - startTime

    # Log process time and model performance to console
    print('\n\nTraining complete in {:.0f}m {:.0f}s!'.format( elapsedTime // 60, elapsedTime % 60 ) )
    print('Highest validation accuracy: {:.4f}%'.format( highestAccuracy * 100 ) )

    # Load the bestModelWeights
    model.load_state_dict( bestModelWeights )

    # Return the model
    return model
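
The training loop multiplies each batch loss by the batch size before dividing by the dataset size. A short pure-Python sketch shows why: the last batch is usually smaller, so a plain average of batch losses would over-weight it. The loss values and the dataset size of 150 are hypothetical numbers chosen for the arithmetic.

```python
def epoch_average(batch_losses, batch_sizes):
    """Per-sample average loss over an epoch from per-batch mean losses."""
    running_loss = 0.0
    for loss, size in zip(batch_losses, batch_sizes):
        running_loss += loss * size  # undo the per-batch mean
    return running_loss / sum(batch_sizes)

# Hypothetical numbers: 150 samples with batch size 64 gives batches of 64, 64 and 22.
losses = [0.50, 0.40, 1.00]
sizes = [64, 64, 22]

weighted = epoch_average(losses, sizes)
naive = sum(losses) / len(losses)
print(round(weighted, 4), round(naive, 4))
```

Here the small final batch has a high loss, so the naive average overstates the epoch loss compared with the size-weighted one.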

In [None]:
## Fine-Tune the ImageClassifier model ##

ImageClassifier = train_model( model = ImageClassifier, 
                               dataloaders = ImageDataLoaders, 
                               criterion = LossFunction, 
                               optimizer = ModelOptimizer, 
                               num_epochs = NUMBER_OF_EPOCHS )

Epoch 1/25
----------
Training Loss: 0.6198 Acc: 0.6417
Validation Loss: 0.5250 Acc: 0.7600

Epoch 2/25
----------
Training Loss: 0.4665 Acc: 0.8233
Validation Loss: 0.4185 Acc: 0.8200

Epoch 3/25
----------
Training Loss: 0.4185 Acc: 0.8100
Validation Loss: 0.3753 Acc: 0.8450

Epoch 4/25
----------
Training Loss: 0.3488 Acc: 0.8483
Validation Loss: 0.3501 Acc: 0.8550

Epoch 5/25
----------
Training Loss: 0.3522 Acc: 0.8567
Validation Loss: 0.3309 Acc: 0.8700

Epoch 6/25
----------
Training Loss: 0.3614 Acc: 0.8467
Validation Loss: 0.3218 Acc: 0.8750

Epoch 7/25
----------
Training Loss: 0.3290 Acc: 0.8617
Validation Loss: 0.3116 Acc: 0.8750

Epoch 8/25
----------
Training Loss: 0.3395 Acc: 0.8483
Validation Loss: 0.3026 Acc: 0.8800

Epoch 9/25
----------
Training Loss: 0.3315 Acc: 0.8567
Validation Loss: 0.2949 Acc: 0.8850

Epoch 10/25
----------
Training Loss: 0.3345 Acc: 0.8550
Validation Loss: 0.2902 Acc: 0.8800

Epoch 11/25
----------
Training Loss: 0.3339 Acc: 0.8467
Validation L

In [None]:
## Now test the model using our testData ##

# load test dataset and create test dataloader as in step 2

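# Add the testData transforms to our DataTransforms dictionary, reusing the trainingData transforms
# NOTE: the trainingData transforms include random cropping and flipping, so the measured test
# accuracy varies between runs; the deterministic validationData transforms are the usual choice
DataTransforms[ 'testData' ] = DataTransforms[ 'trainingData' ]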

# Create our test dataset
ImageDatasets[ 'testData' ] = datasets.ImageFolder( get_folder_path( 'testData' ),
                                                    DataTransforms['testData'] )

# Create our test DataLoader

ImageDataLoaders[ 'testData' ] = torch.utils.data.DataLoader( ImageDatasets['testData'],
                                                              batch_size = BATCH_SIZE,
                                                              shuffle = True,
                                                              num_workers = 4 )


In [None]:
## Function to test our model (very similar to train_model) ##

def test_model( model, dataloader ):
  
  '''
  Function to test the performance of a trained model on a test dataset

  Arguments:
  - model (nn.Module): trained image classification model to be tested
  - dataloader (DataLoader): PyTorch DataLoader object for the test dataset

  Returns:
  - testAccuracy (float): the classification accuracy of the model
  '''

  # First, we set the model to evaluation mode
  model.eval()

  # Track the # of correct predictions and the # of total predictions
  numCorrect, numPredictions = 0, 0


  with torch.no_grad(): # Gradients are not needed in inference/validation

    # Iterate over the test dataset   
    for modelInputs, targetOutput in dataloader:

      # Move the modelInputs and targetOutputs to the RuntimeDevice
      modelInputs, targetOutput = modelInputs.to( RuntimeDevice ), targetOutput.to( RuntimeDevice )

      # Predict!
      modelOutput = model( modelInputs )
      _, modelPrediction = torch.max( modelOutput, 1 )

      # Update the numCorrect and numPredictions
      numCorrect += (modelPrediction == targetOutput).sum().item()
      numPredictions += targetOutput.size(0)

  # Calculate testAccuracy ( return object )
  testAccuracy = numCorrect / numPredictions

  # Log testAccuracy to console
  print( 'Test Accuracy: {:.2f}%'.format( testAccuracy * 100 ) )

  # Final return
  return testAccuracy
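
The accuracy computation in `test_model` boils down to taking the highest-scoring class for each image and counting matches with the labels. The pure-Python sketch below mirrors that logic on a handful of made-up scores; the helper names and the example values are hypothetical.

```python
def predict_classes(batch_logits):
    """Pick the index of the largest score in each row, like the indices from torch.max(..., 1)."""
    return [max(range(len(row)), key=row.__getitem__) for row in batch_logits]

def accuracy(batch_logits, targets):
    """Fraction of rows whose predicted class matches the target label."""
    predictions = predict_classes(batch_logits)
    num_correct = sum(int(p == t) for p, t in zip(predictions, targets))
    return num_correct / len(targets)

# Hypothetical scores for four images and two classes.
logits = [[2.1, -0.3], [0.2, 1.7], [-1.0, 0.4], [0.9, 0.8]]
targets = [0, 1, 0, 0]
print(accuracy(logits, targets))
```

Three of the four predictions match their labels in this example, so the function reports an accuracy of 0.75.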

In [None]:
## Let's Drive! ##

test_model( model = ImageClassifier,
            dataloader = ImageDataLoaders['testData'] )

Test Accuracy: 85.50%


0.855