<h2>Fine-Tuning the VGG-11 Neural Net Image Model (CNN) for Masked-Face Recognition </h2>

**Avyay Kuchibotla**

**********
<h5>Below, we will fine-tune a pretrained image model on our own images of people with and without masks, "teaching" the model to "recognize" whether a person is wearing a mask.
<br/>

A simple use case for this would be face-unlocking your iPhone with a mask on during the COVID days (when this project was first written).
</h5>



### Imports

**PyTorch (Torch)** - ML framework with tensor computations, strong GPU acceleration (find yourself a nice NVIDIA runtime), and building blocks for deep neural nets (layers, loss functions, optimizers)

**TorchVision** - PyTorch's computer-vision library: image datasets (such as ImageNet, on which our pretrained model was trained), model architectures with pretrained weights (such as the VGG-11 model used in this tutorial), and image data transforms

In [None]:
# Required imports; can be found in accompanying requirements.txt
# For this project, we will be using the PyTorch framework

# ... to stay consistent between Python 2 & 3
from __future__ import print_function, division

# ... PyTorch imports
import torch
import torch.nn as nn # neural nets
import torch.optim as optim # optimizers

# ... TorchVision (used for image datasets and manipulation for pre-training)
import torchvision
from torchvision import datasets, models, transforms

# ... for data handling
import numpy as np

# ... general utilities
import time, os, copy

In [None]:
# Check that PyTorch & TorchVision have been successfully installed
print( "PyTorch", torch.__version__ )
print( "TorchVision", torchvision.__version__ )

### Downloading our fine-tuning dataset from Kaggle

I have uploaded a dataset of training, validation, and testing images of people with and without masks to Kaggle (completely public). Be sure to set up a Kaggle API key first (saved as `~/.kaggle/kaggle.json`, with `chmod 600` permissions)!

Then you can either run the function below, which installs the Kaggle API client, downloads the dataset, and unzips it into your working directory, or download it manually <a href = "https://www.kaggle.com/datasets/squanch/maskimageset"> here </a>!
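If you'd rather create the key file from Python than by hand, here is a minimal sketch. It assumes the standard `kaggle.json` shape (`{"username": ..., "key": ...}`) that Kaggle gives you from your account settings; `writeKaggleCredentials` and its `configDir` parameter are hypothetical names for illustration, and the credentials in the usage line are placeholders.

```python
import json
import os
import stat

def writeKaggleCredentials(username, key, configDir=os.path.expanduser('~/.kaggle')):
    '''Hypothetical helper: write kaggle.json and restrict it to owner read/write (chmod 600).'''
    os.makedirs(configDir, exist_ok=True)
    credentialsPath = os.path.join(configDir, 'kaggle.json')

    # Same JSON shape as the file downloaded from your Kaggle account page
    with open(credentialsPath, 'w') as credentialsFile:
        json.dump({'username': username, 'key': key}, credentialsFile)

    # Equivalent to `chmod 600`: the owner can read and write, nobody else can do anything
    os.chmod(credentialsPath, stat.S_IRUSR | stat.S_IWUSR)
    return credentialsPath

# Usage (placeholder credentials; paste your own):
# writeKaggleCredentials('your-username', 'your-api-key')
```

On Windows, `os.chmod` can only toggle the read-only flag, so the permission step matters mainly on Linux and macOS.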

In [None]:
# BEFORE RUNNING THIS CELL MAKE SURE YOU HAVE A KAGGLE API KEY
# Place it in ~/.kaggle/kaggle.json (and chmod 600 it if you like to be safe)
# NOTE: If you're on Windows, the key goes in C:\Users\<your-username>\.kaggle\kaggle.json (and re-examine your OS choices)

# This is over-engineered! I suggest doing this manually for your own edification and enrichment

import importlib.util, subprocess, sys, zipfile

def downloadAndUnzipData():

    # If data is already present, return
    if os.path.exists( './mask_image_set/' ):
        print( 'Dataset already exists in the working directory.' )
        return

    # Install the Kaggle API client if it isn't installed yet
    # (find_spec checks for the package without importing it, since importing kaggle can try to
    # authenticate immediately; pip.main() no longer exists in modern pip, so run pip as a module)
    if importlib.util.find_spec( 'kaggle' ) is None:
        subprocess.run( [sys.executable, '-m', 'pip', 'install', 'kaggle'], check = True )

    # Now use a shell subprocess to download the dataset
    downloadCommand = "kaggle datasets download -d squanch/maskimageset" # Hope you're a Rick & Morty fan! As I was 5 years ago

    commandResult = subprocess.run( downloadCommand,
                                    shell = True,
                                    stdout = subprocess.PIPE,
                                    stderr = subprocess.PIPE,
                                    text = True
                                    )

    # Fail if the command errored OR the zip file never showed up
    if commandResult.returncode != 0 or not os.path.exists( './maskimageset.zip' ):
        raise Exception( "Data failed to download. Please check your API key, or try manually.\n" + commandResult.stderr )

    # Now unzip the file
    try:
        with zipfile.ZipFile( './maskimageset.zip' ) as zipObj:
            zipObj.extractall()

    except zipfile.BadZipFile as unzipError:
        raise Exception( "Data failed to unzip. Unzip it yourself." ) from unzipError

    # Remove the original zipped file from the working directory
    if os.path.exists( './maskimageset.zip' ):
        os.remove( './maskimageset.zip' )

    # And we're done! There should be a completely hydrated folder called 'mask_image_set'

In [None]:
# Call the function and watch the data downloading magic happen!
# NOTE: I changed the folder names to 'trainingData', 'testData', 'validationData'; I don't like unclear names anymore

downloadAndUnzipData()

### Setting up the PyTorch DataLoader & Dataset

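**Dataset** - the structure that organizes the images and labels, mapping an index to an (image, label) pair; here, `ImageFolder` builds one from a directory of class subfolders

**DataLoader** - iterable that wraps a Dataset and yields (optionally shuffled) batches of data for each model-training step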
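`datasets.ImageFolder` infers labels from the directory layout: each subfolder of a split (for example `trainingData/`) is one class, and classes are indexed in sorted order of their folder names, which is what torchvision's `find_classes` does. The stdlib-only sketch below mimics that mapping and computes how many batches a DataLoader yields per epoch. `findClasses` and `batchesPerEpoch` are hypothetical helpers, and the image count is a placeholder, not the real size of this dataset.

```python
import math
import os

def findClasses(splitDirectory):
    '''Hypothetical helper mirroring ImageFolder: one class per subfolder, indexed in sorted order.'''
    classNames = sorted(entry.name for entry in os.scandir(splitDirectory) if entry.is_dir())
    return {className: index for index, className in enumerate(classNames)}

def batchesPerEpoch(numImages, batchSize, dropLast=False):
    '''Number of batches per epoch; the last batch is smaller unless it is dropped.'''
    return numImages // batchSize if dropLast else math.ceil(numImages / batchSize)

# Placeholder numbers: 1,000 training images with BATCH_SIZE = 64
print(batchesPerEpoch(1000, 64))  # 16 (15 full batches of 64, plus one batch of 40)
```

As a sanity check, `findClasses(getFolderPath('trainingData'))` should agree with `ImageDatasets['trainingData'].class_to_idx` once the datasets are built.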

In [None]:
# Let's set some settings!

settings = {
    'DATASET_PATH': './mask_image_set/', # Should be local if you've performed the above, otherwise change it
    'NUM_CLASSES': '2', # Masked or non-masked
    'BATCH_SIZE': '64',
    'NUM_EPOCHS': '25', # How many full passes over the training data do you want?
    'FEATURE_EXTRACTION': 'ON', # Only update reshaped layer parameters (otherwise set to empty string)
    'IMAGE_SIZE': '224', # Standardize all input images to a square image of 224px
    'RUNTIME_DEVICE': 'cuda:0' if torch.cuda.is_available() else 'cpu' # Use CUDA if available!
}

os.environ.update( settings )
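One catch with storing settings in `os.environ`: every value is a string, so `bool(os.environ.get('FEATURE_EXTRACTION', ''))` is `True` for *any* non-empty value, including `'OFF'` or `'False'`. Only the empty string turns feature extraction off. If you'd prefer an explicit on/off switch, a small parser like the hypothetical `readFlag` below (my own helper, not part of the original notebook) avoids the surprise without touching your real environment.

```python
import os

def readFlag(name, environ=os.environ, default='OFF'):
    '''Hypothetical helper: interpret a setting as an explicit on/off switch.'''
    value = environ.get(name, default).strip().lower()
    return value in ('1', 'on', 'true', 'yes')

# The pitfall: any non-empty string is truthy, so 'OFF' would still turn feature extraction ON
print(bool('OFF'))  # True

# The helper reads the intent instead
print(readFlag('FEATURE_EXTRACTION', environ={'FEATURE_EXTRACTION': 'OFF'}))  # False
print(readFlag('FEATURE_EXTRACTION', environ={'FEATURE_EXTRACTION': 'ON'}))   # True
```

To use it, you would replace the default argument of `setGradientRequirements` with `readFlag('FEATURE_EXTRACTION')`.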

In [None]:
# Get training and validation data from their respective folders!

# Per-channel ImageNet mean and standard deviation, which the pretrained weights expect (used by Normalize below)
normalizationMean, normalizationSTD = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

# DataTransforms for trainingData (random augmentation) and validationData (deterministic resize + center crop)
# (changed folder names because we don't like names like 'train', 'test', 'val')
DataTransforms = {
    'trainingData': transforms.Compose( [
        transforms.RandomResizedCrop( int( os.environ.get('IMAGE_SIZE', '224') ) ), # Random Crop
        transforms.RandomHorizontalFlip(), # Random Horizontal Flip
        transforms.ToTensor(), # Conversion to Tensor
        transforms.Normalize( normalizationMean, normalizationSTD )
    ]),

    'validationData': transforms.Compose( [
        transforms.Resize( int( os.environ.get('IMAGE_SIZE', '224') ) ),
        transforms.CenterCrop( int( os.environ.get('IMAGE_SIZE', '224') )),
        transforms.ToTensor(),
        transforms.Normalize( normalizationMean, normalizationSTD )
    ])
}

# FOLDER_NAMES to eliminate redundancy moving forward ( NOTE: I changed folder names )
FOLDER_NAMES = ('trainingData', 'validationData')

getFolderPath = lambda fileFolder : os.path.join( os.environ.get('DATASET_PATH'), fileFolder ) 

# Map our DataTransforms and create trainingData and validationData datasets
ImageDatasets = {
    currentFolder: datasets.ImageFolder( getFolderPath( currentFolder ),
                                         DataTransforms[ currentFolder ] )
    for currentFolder in FOLDER_NAMES
}


# Create trainingData and validationData DataLoaders
ImageDataLoaders = {
    currentFolder: torch.utils.data.DataLoader( ImageDatasets[ currentFolder ],
                                                batch_size = int( os.environ.get('BATCH_SIZE') ),
                                                shuffle = True,
                                                num_workers = 4 )
    for currentFolder in FOLDER_NAMES
}
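Both transform pipelines above end with `ToTensor()`, which scales 8-bit pixel values from 0 to 255 down to 0.0 to 1.0, followed by `Normalize(mean, std)`, which computes `(x - mean[c]) / std[c]` for each channel `c`. Matching the ImageNet statistics matters because the pretrained VGG-11 weights were learned on inputs normalized this way. Here is that arithmetic for a single pixel in plain Python; `normalizePixel` is a hypothetical helper for illustration.

```python
normalizationMean = [0.485, 0.456, 0.406]
normalizationSTD = [0.229, 0.224, 0.225]

def normalizePixel(rgb, mean=normalizationMean, std=normalizationSTD):
    '''Hypothetical helper: ToTensor-style scaling, then per-channel Normalize, for one RGB pixel.'''
    scaled = [channel / 255.0 for channel in rgb]                       # ToTensor: [0, 255] -> [0.0, 1.0]
    return [(value - m) / s for value, m, s in zip(scaled, mean, std)]  # Normalize: (x - mean) / std

# A pure white pixel lands roughly 2.2 to 2.6 standard deviations above the ImageNet mean
print([round(v, 3) for v in normalizePixel((255, 255, 255))])  # [2.249, 2.429, 2.64]
```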

### Initializing our model

This pretrained VGG-11 classifier (the batch-normalized variant, `vgg11_bn`) has learned to classify the 1,000 ImageNet classes! We replace its final layer to set up a simple binary classification (masked/non-masked).
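Concretely, torchvision's VGG-11 classifier head ends with `Linear(4096, 1000)` at index 6, and we swap it for a `Linear(4096, 2)`. With feature extraction on, that new layer is the only part that trains. The plain-Python arithmetic below (`linearParameterCount` is a hypothetical helper) shows how small the trainable part becomes.

```python
def linearParameterCount(inFeatures, outFeatures):
    '''Weights (in x out) plus one bias per output unit, as in nn.Linear with bias=True.'''
    return inFeatures * outFeatures + outFeatures

originalHead = linearParameterCount(4096, 1000)  # ImageNet's 1,000-class output layer
binaryHead = linearParameterCount(4096, 2)       # masked / non-masked output layer

print(originalHead)  # 4097000
print(binaryHead)    # 8194
```

Training about 8 thousand parameters instead of the whole network is a big part of why feature extraction is fast, even on modest hardware.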

In [None]:
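# If feature extraction is turned on, freeze every existing layer; the final layer we replace below
# gets fresh parameters that still require gradients, so it is the only part that trains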
def setGradientRequirements( model, feature_extraction_on = bool( os.environ.get('FEATURE_EXTRACTION', '') ) ):
    if feature_extraction_on:
        for parameter in model.parameters():
            parameter.requires_grad = False

In [None]:
# Now let's do some setup!

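# Our pre-trained VGG-11 image classifier (with batch normalization), loaded with ImageNet weights
# (on torchvision older than 0.13, use models.vgg11_bn( pretrained = True ) instead)
classificationModel = models.vgg11_bn( weights = models.VGG11_BN_Weights.IMAGENET1K_V1 )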

# Set gradient requirements (using the function defined above)
setGradientRequirements( model = classificationModel )

# Index of the final Linear layer in VGG's classifier head, and its number of input features (4096)
lastLayer = 6
numInputFeatures = classificationModel.classifier[ lastLayer ].in_features

# Replace the last layer with one that outputs NUM_CLASSES (2) scores (we're performing binary classification)
classificationModel.classifier[ lastLayer ] = nn.Linear( numInputFeatures, int( os.environ.get('NUM_CLASSES', '2') ) )

# Create a Torch device based on CUDA availability and move our model there!
runtimeDevice = torch.device( os.environ.get("RUNTIME_DEVICE", 'cpu') )
classificationModel.to( runtimeDevice )

### Model Optimization

Here we set up our model optimizer and loss function. In deep learning, a loss function measures how far our predictions are from the "ground truth"; a smaller loss means better performance. At each training step, backpropagation (`loss.backward()`) computes the gradient of the loss with respect to every trainable parameter, and the optimizer then uses those gradients to update the parameters so that the loss decreases.

We use **stochastic gradient descent (SGD) with momentum** as our optimizer and **cross-entropy** as our loss function.
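To make the two pieces concrete: for one image with raw model scores (logits) `z` and true class `y`, cross-entropy is `-log(softmax(z)[y])`, which is what `nn.CrossEntropyLoss` computes per sample before averaging over the batch. SGD with momentum, in the form PyTorch documents for `optim.SGD` with no dampening, no Nesterov, and no weight decay, keeps a velocity `v = momentum * v + grad` and updates `param = param - lr * v`. The plain-Python sketch below implements both on toy numbers; the function names are my own, not PyTorch APIs.

```python
import math

def crossEntropy(logits, target):
    '''Cross-entropy for one sample: -log(softmax(logits)[target]), computed stably.'''
    largest = max(logits)  # subtract the max logit so exp() cannot overflow
    logSumExp = largest + math.log(sum(math.exp(z - largest) for z in logits))
    return logSumExp - logits[target]

def sgdMomentumStep(param, grad, velocity, lr=0.001, momentum=0.9):
    '''One SGD-with-momentum update (no dampening, no Nesterov, no weight decay).'''
    velocity = momentum * velocity + grad
    return param - lr * velocity, velocity

# Equal scores for both classes: the model is 50/50, so the loss is log(2), about 0.693
print(crossEntropy([0.0, 0.0], target=0))

# A confident, correct prediction gives a much smaller loss
print(crossEntropy([4.0, -4.0], target=0))

# Minimize f(w) = w**2 (gradient 2w), starting from w = 5
w, v = 5.0, 0.0
for step in range(500):
    w, v = sgdMomentumStep(w, 2 * w, v, lr=0.01)
print(w)  # very close to 0
```

Momentum lets consistent gradient directions build up speed across steps, which is why it usually converges faster than plain SGD at the same learning rate.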

In [None]:
# Let's set up our model optimizer and loss metric

# Stochastic gradient descent (model optimizer)
modelOptimizer = optim.SGD(
    [ parameter for parameter in classificationModel.parameters() if parameter.requires_grad ], # Only the parameters being trained
    lr = 0.001, # Learning rate (play around with this for best accuracy)
    momentum = 0.9
)

# Cross entropy (loss metric)
lossMetric = nn.CrossEntropyLoss()

### Now the fun stuff -- let's train!

The function below uses all of the setup we have done to train the model, printing the training and validation loss & accuracy after every epoch (one full pass over the training data). At the end, it restores the weights from the epoch with the highest validation accuracy and reports how long training took. Feel free to change `NUM_EPOCHS` in the settings to train for more or fewer epochs.
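One detail worth spelling out: `CrossEntropyLoss` returns the *mean* loss of a batch, and the final batch of an epoch is usually smaller than `BATCH_SIZE`. That is why the loop multiplies each batch loss by `modelInputs.size(0)` before summing, then divides by the dataset size: the result is the exact per-image average. A plain-Python sketch with made-up batch losses (hypothetical numbers, not results from this dataset):

```python
def epochAverageLoss(batchMeanLosses, batchSizes):
    '''Exact per-sample mean loss over an epoch, given each batch's mean loss and its size.'''
    totalLoss = sum(loss * size for loss, size in zip(batchMeanLosses, batchSizes))
    return totalLoss / sum(batchSizes)

batchMeanLosses = [0.60, 0.50, 0.20]  # made-up values for illustration
batchSizes = [64, 64, 22]             # 150 images with BATCH_SIZE = 64: the last batch is partial

weighted = epochAverageLoss(batchMeanLosses, batchSizes)
naive = sum(batchMeanLosses) / len(batchMeanLosses)  # over-weights the small final batch
print(round(weighted, 4), round(naive, 4))  # 0.4987 0.4333
```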

In [None]:
# Now let's train our model! 

def trainModel( model, dataloaders, criterion, optimizer, num_epochs ):

    '''
    Function to train a model using particular datasets, optimizer, and loss function

    Args:
    - model: a pre-defined neural network model
    - dataloaders: a dictionary containing the training and validation data loaders
    - criterion: the loss function used to measure the error of the model's output
    - optimizer: the optimization algorithm used to adjust the model's parameters
    - num_epochs: the number of times the entire dataset is passed through the model during training 

    Returns:
    - model: the trained model
    '''
    
    # Process startTime
    startTime = time.time()

    # Validation Accuracy History - % of correctly classified images at 
    # the end of each training epoch
    accuracyLog = []
    
    # Highest model accuracy and set of model weights
    bestModelWeights = copy.deepcopy( model.state_dict() )
    highestAccuracy = 0.0

    # Training Epochs
    for currentEpoch in range(1, (num_epochs + 1) ):

      # Log currentEpoch to console
      logText = 'Epoch {}/{}'.format(currentEpoch, num_epochs)
      print( logText )
      print( '-' * len( logText ) ) # Number of dashes changes with text length

      # Each epoch has a training and validation phase
      PHASES = ('Training', 'Validation')

      # Reset the currentFolder to trainingData in each epoch
      currentFolder = 'trainingData'
      
      # Toggle between training phase and validation phase
      for currentPhase in PHASES:
          
          # If we are in the training phase, set the model to training mode
          if currentPhase == 'Training':
              model.train()

          # Otherwise, set the model to evaluation mode
          else:
              model.eval()
              currentFolder = 'validationData' # Toggle currentFolder to validationData

          # Track the number of correct predictions and the running (summed) loss
          # over the current phase's data
          numCorrect, runningLoss = 0, 0.0

          # Iterate through the datasets
          for modelInputs, targetOutput in dataloaders[ currentFolder ]:
              
              # Move the inputs and target labels to the runtime device
              modelInputs, targetOutput = modelInputs.to( runtimeDevice ), targetOutput.to( runtimeDevice )

              # Zero the parameter gradients at the beginning of each iteration 
              optimizer.zero_grad()

              # Forward pass
              # Only track gradient history in the training phase (not in the evaluation phase)
              with torch.set_grad_enabled( currentPhase == 'Training' ):
                  
                  # Obtain model outputs
                  modelOutputs = model( modelInputs )

                  # Compute the batch loss and the predicted class (index of the highest score)
                  LossFunction = criterion( modelOutputs, targetOutput )
                  _, modelPredictions = torch.max( modelOutputs, 1 )

                  # In the training phase, backpropagate the loss and update the weights
                  if currentPhase == 'Training':
                      LossFunction.backward()
                      optimizer.step()

              # Update our runningLoss and our number of correct predictions
              runningLoss += LossFunction.item() * modelInputs.size(0)
              numCorrect += torch.sum( modelPredictions == targetOutput.data)

          # The loss over the epoch is the loss so far / the size of the dataset
          epochLoss = runningLoss / len( dataloaders[ currentFolder ].dataset )

          # The epoch accuracy is the number of correct predictions / the size of the dataset
          epochAccuracy = numCorrect.double() / len( dataloaders[ currentFolder ].dataset )

          # Print to console
          print( '{} | Loss: {:.4f} Accuracy: {:.4f}'.format( currentPhase, epochLoss, epochAccuracy ) )

          # In the validation phase, keep a deep copy of the best weights seen so far
          if currentPhase == 'Validation':

            # Update the highestAccuracy and bestModelWeights if we have encountered better accuracy
            if epochAccuracy > highestAccuracy:
              highestAccuracy = epochAccuracy
              bestModelWeights = copy.deepcopy( model.state_dict() )
          
            # Add the epoch's accuracy to the accuracyLog
            accuracyLog.append( epochAccuracy )

      print() # Blank line

    # Total elapsedTime
    elapsedTime = time.time() - startTime

    # Log process time and model performance to console
    print('\n\nTraining time: {:.0f}m {:.0f}s'.format( elapsedTime // 60, elapsedTime % 60 ) )
    print('Highest validation accuracy: {:.4f}%'.format( highestAccuracy * 100 ) )

    # Load the bestModelWeights
    model.load_state_dict( bestModelWeights )

    # Return the model
    return model

In [None]:
fineTunedModel = trainModel( 
    model = classificationModel, 
    dataloaders = ImageDataLoaders, 
    criterion = lossMetric, 
    optimizer = modelOptimizer, 
    num_epochs = int( os.environ.get('NUM_EPOCHS') ) 
)

### ... And let's see how we did! (Testing)

In [None]:
# Add the testData transforms to our DataTransforms dictionary. Evaluation should be deterministic,
# so reuse the validation transforms (resize + center crop) rather than the random training augmentations

# NOTE: This takes forever on a CPU! I highly suggest you get a nice NVIDIA T4 runtime & breeze through this

DataTransforms[ 'testData' ] = DataTransforms[ 'validationData' ]

# Create our test dataset
ImageDatasets[ 'testData' ] = datasets.ImageFolder( getFolderPath( 'testData' ),
                                                    DataTransforms['testData'] )

# Create our test DataLoader (no need to shuffle for evaluation)
ImageDataLoaders[ 'testData' ] = torch.utils.data.DataLoader( ImageDatasets['testData'],
                                                              batch_size = int( os.environ.get('BATCH_SIZE') ),
                                                              shuffle = False,
                                                              num_workers = 4 )

In [None]:
# Now let's assume this model has been trained (which it was in Colab, but migrated locally)
# And test our model!

def testModel( model, dataloader ):
  
  '''
  Function to test the performance of a trained model on a test dataset

  Arguments:
  - model (nn.Module): trained image classification model to be tested
  - dataloader (DataLoader): PyTorch DataLoader object for the test dataset

  Returns:
  - testAccuracy (float): the classification accuracy of the model
  '''

  # First, we set the model to evaluation mode
  model.eval()

  # Track the # of correct predictions and the # of total predictions
  numCorrect, numPredictions = 0, 0


  with torch.no_grad(): # Gradients are not needed in inference/validation

    # Iterate over the test dataset   
    for modelInputs, targetOutput in dataloader:

      # Move the modelInputs and targetOutputs to the RuntimeDevice
      modelInputs, targetOutput = modelInputs.to( runtimeDevice ), targetOutput.to( runtimeDevice )

      # Predict!
      modelOutput = model( modelInputs )
      _, modelPrediction = torch.max( modelOutput, 1 )

      # Update the numCorrect and numPredictions
      numCorrect += (modelPrediction == targetOutput).sum().item()
      numPredictions += targetOutput.size(0)

  # Calculate testAccuracy ( return object )
  testAccuracy = numCorrect / numPredictions

  # Log testAccuracy to console
  print( 'Test Accuracy: {:.2f}%'.format( testAccuracy * 100 ) )

  return testAccuracy

In [None]:
testModel( model = fineTunedModel,
            dataloader = ImageDataLoaders['testData'] )