# Face Mask Detection by Live Webcam

In [None]:
import torch
import torch.nn as nn #its help to create and train Neural network
import torch.optim as optim #implementing various optimization algorithms
from torch.optim import lr_scheduler #learning rate scheduler, we can gradually decrease the learning rate value dynamically while training
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import pandas as pd
import shutil
import time
import copy

In [None]:
## [ 1 ] Data loading

In [None]:
# Working folder: train.csv / test.csv are read from here, and the train/ and
# test/ image folders are created underneath it.
experiments_path = 'C:/Users/SIRISHA/Desktop/Face-Mask-Detection-and-Authentication-main/experiements/dest_folder/'
# Root of the source images; files are looked up as <data_path>/<class>/<filename>.
data_path = 'C:/Users/SIRISHA/Desktop/Face-Mask-Detection-and-Authentication-main/experiements/data/' 

In [None]:
!pip install torchvision

## [ 2 ] Data pre-processing

In [None]:
#train_data = experiments_path+'train'
# train_data = datasets.ImageFolder(root = train_dir, 
#                                   transform = transforms.ToTensor())

# means = torch.zeros(3)
# stds = torch.zeros(3)

# for img, label in train_data:
#     means += torch.mean(img, dim = (1,2))
#     stds += torch.std(img, dim = (1,2))

# means /= len(train_data)
# stds /= len(train_data)
    
# print(f'Calculated means: {means}')
# print(f'Calculated stds: {stds}')

Now to actually load our data. As we are going to be using a pre-trained model we will need to ensure that our images are the same size and have the same normalization as those used to train the model - which we find on the torchvision models page.

For training we augment the data with a random resized crop; the test images are only resized and center-cropped, so evaluation is deterministic.

In [None]:

# Per-split preprocessing pipelines, keyed by the split / folder name.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),  # augmentation: random crop, resized to 224
        transforms.ToTensor(),  # convert the image to a tensor
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # per-channel (mean, std) normalization
    ]), 
    'test' : transforms.Compose([
        # No randomness at test time: fixed resize followed by a center crop.
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # same statistics as the train pipeline
    ])
}

In [None]:
# NOTE: `!pip install transforms` was removed. `transforms` in this notebook is
# the torchvision.transforms module imported at the top, not a PyPI package;
# installing an unrelated package of the same name is unnecessary and unsafe.

In [None]:
def get_train_files_path(experiments_path, data_path, phase):
    """Return the image paths and class labels listed in a split's CSV file.

    The CSV ``<experiments_path>/<phase>.csv`` must contain the columns
    ``class`` and ``filename``; each image path is built as
    ``<data_path>/<class>/<filename>``.

    Args:
        experiments_path: directory containing ``train.csv`` / ``test.csv``.
        data_path: root directory of the source images.
        phase: either ``'train'`` or ``'test'``.

    Returns:
        A ``(files_path, fonts_class)`` tuple of equally long lists, in CSV
        row order.

    Raises:
        ValueError: if ``phase`` is neither ``'train'`` nor ``'test'``.
    """
    if phase not in ('train', 'test'):
        # Raise instead of calling exit(): exit() would terminate the whole
        # interpreter / notebook kernel on a simple argument mistake.
        raise ValueError("phase can only have train and test as parameter values")

    file_path = os.path.join(experiments_path, phase + '.csv')
    split_df = pd.read_csv(file_path, delimiter=',')

    fonts_class = split_df['class'].tolist()
    files_path = [
        os.path.join(data_path, class_name, file_name)
        for class_name, file_name in zip(fonts_class, split_df['filename'])
    ]

    return files_path, fonts_class

In [None]:
def copy_images_to_path(file_path, file_class, destination_dir):
    """Copy one image into the per-class sub-folder of ``destination_dir``.

    Args:
        file_path: path of the source image.
        file_class: class label; used as the sub-folder name.
        destination_dir: root folder of the split (one sub-folder per class).
    """
    font_folder = os.path.join(destination_dir, file_class)
    # exist_ok=True creates the folder only when needed, without a separate
    # exists() check that could race with another process.
    os.makedirs(font_folder, exist_ok=True)

    print("File being copied from {}:{}".format(file_path, font_folder))
    shutil.copy(file_path, font_folder)

## [ 3 ] Data Splitting

In [None]:
# Image paths (X_*) and class labels (y_*) for each split, read from train.csv / test.csv.
X_train, y_train = get_train_files_path(experiments_path, data_path, phase='train')
X_test, y_test = get_train_files_path(experiments_path, data_path, phase='test')

In [None]:
# Root folders of the two splits, later read by datasets.ImageFolder.
train_dir = os.path.join(experiments_path, 'train')
test_dir = os.path.join(experiments_path, 'test')

# exist_ok=True makes the creation idempotent and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

In [None]:
import os

In [None]:
# Populate both ImageFolder roots. The test split has to be copied as well:
# otherwise the 'test' folder stays empty and the test dataset has nothing to load.
for file_path, font_class in zip(X_train, y_train):
    copy_images_to_path(file_path, font_class, train_dir)

for file_path, font_class in zip(X_test, y_test):
    copy_images_to_path(file_path, font_class, test_dir)

In [None]:
# One ImageFolder dataset per split, rooted at <experiments_path>/<split> and
# using that split's transform pipeline.
image_datasets = {x: datasets.ImageFolder(os.path.join(experiments_path, x), data_transforms[x]) for x in ['train', 'test']}

In [None]:
image_datasets['train']

In [None]:
image_datasets['test']

In [None]:
# One DataLoader per split: shuffled batches of 16 images, loaded by 4 worker
# processes (num_workers is the number of loader subprocesses, not a batch size).
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], 
                                             batch_size=16, 
                                             shuffle=True, 
                                             num_workers=4) 
               for x in ['train', 'test']}

In [None]:
dataloaders

In [None]:
# Class labels as discovered by ImageFolder for the training split.
class_names = image_datasets['train'].classes

In [None]:
class_names

In [None]:
# CUDA (Compute Unified Device Architecture) is Nvidia's parallel computing
# platform and API; use the first GPU when one is available, else the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

In [None]:
# Number of images per split; train_model divides by these to get per-epoch averages.
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'test']}

## [ 4 ] Visualizing images

In [None]:

def imshow(inp, title=None):
    """Display a normalized image tensor with matplotlib.

    The normalization with the mean/std constants below is reverted and the
    result is clipped to [0, 1] before plotting.

    Args:
        inp: image tensor; assumed to be laid out as (3, H, W) -- TODO confirm.
        title: optional figure title.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # Move the first axis to the end so the 3-element mean/std broadcast over it.
    image = inp.numpy().transpose((1, 2, 0))
    image = np.clip(channel_std * image + channel_mean, 0, 1)

    plt.figure(figsize=(20,20))
    plt.imshow(image)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)

In [None]:
# Get a batch of training data
# inputs contains up to 16 images because batch_size=16 for the dataloaders
inputs, classes = next(iter(dataloaders['train']))
# Make a grid from batch
out = torchvision.utils.make_grid(inputs)

In [None]:
# NOTE: `!pip install dataloaders` was removed. `dataloaders` is a dictionary
# defined in this notebook, not a package; there is nothing to install.

## [ 5 ] Training the model

In [None]:

def train_model(model, criterion, optimizer, scheduler, num_epochs=20):
    """Fine-tune ``model`` and return it loaded with its best test-phase weights.

    Every epoch runs a 'train' pass followed by a 'test' pass over the
    module-level ``dataloaders``. Batches are moved to the module-level
    ``device`` and per-epoch averages are computed with ``dataset_sizes``.

    Args:
        model: network to optimise; it is updated in place.
        criterion: loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: optimiser bound to the parameters of ``model``.
        scheduler: learning-rate scheduler, stepped once per epoch.
        num_epochs: number of train/test epochs to run.

    Returns:
        The same ``model`` object, with the state dict of the epoch that
        reached the highest test accuracy loaded.
    """
    since = time.time()
    best_acc = 0.0
    best_model = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        print("Epoch {}/{}".format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ['train', 'test']:
            is_train = phase == 'train'
            if is_train:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Gradients are only tracked while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # loss is a batch mean, so weight it by the batch size.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()

            if is_train:
                # The scheduler must be stepped after the optimizer updates of
                # the epoch; stepping it first skips the first value of the
                # learning-rate schedule.
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc:{:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # Keep a copy of the weights of the best-performing epoch.
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model = copy.deepcopy(model.state_dict())

            print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val acc: {:.4f}'.format(best_acc))

    model.load_state_dict(best_model)
    return model

### [ 5.1 ] ResNet Model

In [None]:
# Start from a pretrained ResNet-101 and swap its final fully-connected layer
# for one with a single output per class.
re_model_ft = models.resnet101(pretrained=True)
re_num_frts = re_model_ft.fc.in_features
re_model_ft.fc = nn.Linear(in_features=re_num_frts, out_features=len(class_names))
re_model_ft = re_model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# AdaGrad (Adaptive Subgradient Methods) is a variation of stochastic gradient
# optimization that adapts the learning rate of each parameter.
#optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.01, momentum=0.9)
optimizer_ft = optim.Adagrad(params=re_model_ft.parameters(), lr=0.001)
# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=7, gamma=0.1)

In [None]:
ResNetmodel_ft = train_model(re_model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=20)

In [None]:
# Persists the whole model object (not only its state_dict) to a fixed, machine-specific path.
torch.save(ResNetmodel_ft, 'C:/Users/SIRISHA/Desktop/Face-Mask-Detection-and-Authentication-main/maskmodel1_resnet101.pth')

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Evaluate the fine-tuned ResNet on the test split.
nb_classes = 2
# Rows are true classes, columns are predicted classes. Named `conf_mat` so the
# sklearn `confusion_matrix` function imported above is not overwritten.
conf_mat = np.zeros((nb_classes, nb_classes))
ResNetmodel_ft.eval()  # inference behaviour for dropout / batch-norm layers
with torch.no_grad():
    for inputs, classes in dataloaders['test']:
        outputs = ResNetmodel_ft(inputs.to(device))
        _, preds = torch.max(outputs, 1)
        # Count every (true, predicted) pair of the batch in one call.
        np.add.at(conf_mat, (classes.view(-1).cpu().numpy(), preds.view(-1).cpu().numpy()), 1)

plt.figure(figsize=(5,5))
print("============================================")
print("Normalized confusion matrix:")
row_totals = conf_mat.sum(axis=1, keepdims=True)
# A class without test samples keeps an all-zero row instead of dividing by zero.
normalized = np.divide(conf_mat, row_totals, out=np.zeros_like(conf_mat), where=row_totals > 0)
for row in normalized:
    # np.round replaces np.round_, which was removed in NumPy 2.0.
    print(np.round(row, decimals=4))
print("============================================")
class_names = ['with_mask', 'without_mask']
df_cm = pd.DataFrame(conf_mat, index=class_names, columns=class_names).astype(int)
heatmap = sns.heatmap(df_cm, annot=True, fmt="d")

heatmap.yaxis.set_ticklabels(heatmap.yaxis.get_ticklabels(), rotation=0, ha='right',fontsize=15)
heatmap.xaxis.set_ticklabels(heatmap.xaxis.get_ticklabels(), rotation=45, ha='right',fontsize=15)
plt.ylabel('True label')
plt.xlabel('Predicted label')

### [ 5.2 ] AlexNet Model

In [None]:
# Load a pretrained AlexNet through torch.hub (pinned to the pytorch/vision v0.6.0 tag).
AlexNet_model = torch.hub.load('pytorch/vision:v0.6.0', 'alexnet', pretrained=True)
# Switch to eval mode; as the last expression of the cell this also displays the model description.
AlexNet_model.eval()

In [None]:
# Replace the second fully-connected layer of the classifier head.
AlexNet_model.classifier[4] = nn.Linear(4096,1024)

# Replace the output layer. It must emit one logit per class, so it is sized
# with len(class_names), exactly like the ResNet and GoogLeNet heads.
AlexNet_model.classifier[6] = nn.Linear(1024, len(class_names))

In [None]:
AlexNet_model.eval()

In [None]:
# Select the first CUDA GPU when available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Show which device was selected.
print(device)
# Move AlexNet_model to the selected device.
AlexNet_model.to(device)

In [None]:
import torch.optim as optim
import torch.nn as nn
#Loss
criterion = nn.CrossEntropyLoss()
#Optimizer(SGD)
# optimizer = optim.SGD(AlexNet_model.parameters(), lr=0.001, momentum=0.9)

In [None]:
# AdaGrad optimiser over all AlexNet parameters.
optimizer_ft = optim.Adagrad(AlexNet_model.parameters(), lr=0.001)
# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
AlexNetmodel_ft = train_model(AlexNet_model, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=20)

In [None]:
# Persists the whole model object (not only its state_dict) to a fixed, machine-specific path.
torch.save(AlexNetmodel_ft, 'C:/Users/SIRISHA/Desktop/Face-Mask-Detection-and-Authentication-main/maskmodel2_Alexnet.pth')

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Evaluate the fine-tuned AlexNet on the test split.
nb_classes = 2
# Rows are true classes, columns are predicted classes. Named `conf_mat` so the
# sklearn `confusion_matrix` function imported above is not overwritten.
conf_mat = np.zeros((nb_classes, nb_classes))
AlexNetmodel_ft.eval()  # inference behaviour for dropout layers
with torch.no_grad():
    for inputs, classes in dataloaders['test']:
        outputs = AlexNetmodel_ft(inputs.to(device))
        _, preds = torch.max(outputs, 1)
        # Count every (true, predicted) pair of the batch in one call.
        np.add.at(conf_mat, (classes.view(-1).cpu().numpy(), preds.view(-1).cpu().numpy()), 1)

plt.figure(figsize=(5,5))
print("============================================")
print("Normalized confusion matrix:")
row_totals = conf_mat.sum(axis=1, keepdims=True)
# A class without test samples keeps an all-zero row instead of dividing by zero.
normalized = np.divide(conf_mat, row_totals, out=np.zeros_like(conf_mat), where=row_totals > 0)
for row in normalized:
    # np.round replaces np.round_, which was removed in NumPy 2.0.
    print(np.round(row, decimals=4))
print("============================================")
class_names = ['with_mask', 'without_mask']
df_cm = pd.DataFrame(conf_mat, index=class_names, columns=class_names).astype(int)
heatmap = sns.heatmap(df_cm, annot=True, fmt="d")

heatmap.yaxis.set_ticklabels(heatmap.yaxis.get_ticklabels(), rotation=0, ha='right',fontsize=15)
heatmap.xaxis.set_ticklabels(heatmap.xaxis.get_ticklabels(), rotation=45, ha='right',fontsize=15)
plt.ylabel('True label')
plt.xlabel('Predicted label')


### [ 5.3 ] GoogleNet Model

In [None]:
# Start from a pretrained GoogLeNet and swap its final fully-connected layer
# for one with a single output per class.
go_model_ft = models.googlenet(pretrained=True)
go_num_frts = go_model_ft.fc.in_features
go_model_ft.fc = nn.Linear(in_features=go_num_frts, out_features=len(class_names))
go_model_ft = go_model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# AdaGrad (Adaptive Subgradient Methods) is a variation of stochastic gradient
# optimization that adapts the learning rate of each parameter.
#optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.01, momentum=0.9)
optimizer_ft = optim.Adagrad(params=go_model_ft.parameters(), lr=0.001)
# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=7, gamma=0.1)

In [None]:
GoogleNetmodel_ft = train_model(go_model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=20)

In [None]:
# Persists the whole model object (not only its state_dict) to a fixed, machine-specific path.
torch.save(GoogleNetmodel_ft, 'C:/Users/SIRISHA/Desktop/Face-Mask-Detection-and-Authentication-main/maskmodel3_GoogleNet.pth')

In [None]:
# NOTE: `import GoogleNetmodel_ft` was removed. GoogleNetmodel_ft is the trained
# model variable returned by train_model, not a module; importing it raises
# ModuleNotFoundError (or would rebind the model name if such a module existed).

In [None]:
# NOTE: `!pip install GoogleNetmodel_ft` was removed. GoogleNetmodel_ft is a
# variable of this notebook, not an installable package.

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Evaluate the fine-tuned GoogLeNet on the test split.
nb_classes = 2
# Rows are true classes, columns are predicted classes. Named `conf_mat` so the
# sklearn `confusion_matrix` function imported above is not overwritten.
conf_mat = np.zeros((nb_classes, nb_classes))
GoogleNetmodel_ft.eval()  # inference behaviour for dropout / batch-norm layers
with torch.no_grad():
    for inputs, classes in dataloaders['test']:
        outputs = GoogleNetmodel_ft(inputs.to(device))
        _, preds = torch.max(outputs, 1)
        # Count every (true, predicted) pair of the batch in one call.
        np.add.at(conf_mat, (classes.view(-1).cpu().numpy(), preds.view(-1).cpu().numpy()), 1)

plt.figure(figsize=(5,5))
print("============================================")
print("Normalized confusion matrix:")
row_totals = conf_mat.sum(axis=1, keepdims=True)
# A class without test samples keeps an all-zero row instead of dividing by zero.
normalized = np.divide(conf_mat, row_totals, out=np.zeros_like(conf_mat), where=row_totals > 0)
for row in normalized:
    # np.round replaces np.round_, which was removed in NumPy 2.0.
    print(np.round(row, decimals=4))
print("============================================")
class_names = ['with_mask', 'without_mask']
df_cm = pd.DataFrame(conf_mat, index=class_names, columns=class_names).astype(int)
heatmap = sns.heatmap(df_cm, annot=True, fmt="d")

heatmap.yaxis.set_ticklabels(heatmap.yaxis.get_ticklabels(), rotation=0, ha='right',fontsize=15)
heatmap.xaxis.set_ticklabels(heatmap.xaxis.get_ticklabels(), rotation=45, ha='right',fontsize=15)
plt.ylabel('True label')
plt.xlabel('Predicted label')

### [ 5.4 ] VGG Model

In [None]:
### Define model
vg_model = models.vgg16(pretrained = True)

### Modifying last few layers and no of classes
# NOTE: cross_entropy loss takes unnormalized op (logits), then function itself applies softmax and calculates loss, so no need to include softmax here
vg_model.classifier = nn.Sequential(
    nn.Linear(25088, 4096, bias = True),
    nn.ReLU(inplace = True),
    nn.Dropout(0.4),
    nn.Linear(4096, 2048, bias = True),
    nn.ReLU(inplace = True),
    nn.Dropout(0.4),
    # One logit per class. The previous 200 outputs let the network predict
    # class indices that do not exist, which breaks the 2x2 confusion matrix
    # and the class_names lookup.
    nn.Linear(2048, len(class_names))
)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
torch.cuda.empty_cache()

vg_model.to(device)

criterion = nn.CrossEntropyLoss()

# AdaGrad (Adaptive Subgradient Methods) is a variation of stochastic gradient
# optimization that adapts the learning rate of each parameter.
#optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.01, momentum=0.9)
optimizer_ft = optim.Adagrad(params=vg_model.parameters(), lr=0.001)
# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=7, gamma=0.1)

In [None]:
Vggmodel_ft = train_model(vg_model, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=20)

In [None]:
# Persists the whole model object (not only its state_dict) to a fixed, machine-specific path.
torch.save(Vggmodel_ft, 'C:/Users/SIRISHA/Desktop/Face-Mask-Detection-and-Authentication-main/maskmodel4_vgg.pth')

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Evaluate the fine-tuned VGG on the test split.
nb_classes = 2
# Rows are true classes, columns are predicted classes. Named `conf_mat` so the
# sklearn `confusion_matrix` function imported above is not overwritten.
conf_mat = np.zeros((nb_classes, nb_classes))
Vggmodel_ft.eval()  # inference behaviour for dropout layers
with torch.no_grad():
    for inputs, classes in dataloaders['test']:
        outputs = Vggmodel_ft(inputs.to(device))
        _, preds = torch.max(outputs, 1)
        # Count every (true, predicted) pair of the batch in one call.
        np.add.at(conf_mat, (classes.view(-1).cpu().numpy(), preds.view(-1).cpu().numpy()), 1)

plt.figure(figsize=(5,5))
print("============================================")
print("Normalized confusion matrix:")
row_totals = conf_mat.sum(axis=1, keepdims=True)
# A class without test samples keeps an all-zero row instead of dividing by zero.
normalized = np.divide(conf_mat, row_totals, out=np.zeros_like(conf_mat), where=row_totals > 0)
for row in normalized:
    # np.round replaces np.round_, which was removed in NumPy 2.0.
    print(np.round(row, decimals=4))
print("============================================")
class_names = ['with_mask', 'without_mask']
df_cm = pd.DataFrame(conf_mat, index=class_names, columns=class_names).astype(int)
heatmap = sns.heatmap(df_cm, annot=True, fmt="d")

heatmap.yaxis.set_ticklabels(heatmap.yaxis.get_ticklabels(), rotation=0, ha='right',fontsize=15)
heatmap.xaxis.set_ticklabels(heatmap.xaxis.get_ticklabels(), rotation=45, ha='right',fontsize=15)
plt.ylabel('True label')
plt.xlabel('Predicted label')

## [ 6 ] Visualizing model

In [None]:
def visualize_model(model, num_images=6):
    """Print true vs. predicted class names for the first test images.

    Batches are read from the module-level ``dataloaders['test']`` and moved to
    the module-level ``device``; labels are translated with ``class_names``.
    Nothing is plotted: the results are written to stdout only.

    Args:
        model: network to evaluate. It is switched to eval mode for the call
            and its previous training/eval mode is restored afterwards.
        num_images: stop after this many images have been reported.
    """
    was_training = model.training
    model.eval()
    images_so_far = 0

    try:
        with torch.no_grad():
            for inputs, labels in dataloaders['test']:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                print(preds,"predicitons")

                for j in range(inputs.size(0)):
                    images_so_far += 1
                    print('true: {} predicted: {}'.format(class_names[labels[j]], class_names[preds[j]]))

                    if images_so_far == num_images:
                        return
    finally:
        # Restore the caller's mode on every exit path, including exceptions.
        model.train(mode=was_training)

In [None]:
visualize_model(ResNetmodel_ft)

In [None]:
visualize_model(AlexNetmodel_ft)

In [None]:
visualize_model(GoogleNetmodel_ft)

In [None]:
visualize_model(Vggmodel_ft)