<a href="https://colab.research.google.com/github/harshitha-gummuluri/Code-Clause/blob/main/FaceMask_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import torch
from torch.utils.data import Dataset
import pandas as pd
from torchvision.io import read_image
from torchvision.transforms import Resize
from torch import nn
from torch.utils.data import DataLoader
from torch.optim import Optimizer
from torch.optim import Adam
from torch import Tensor
from sklearn.model_selection import train_test_split
import numpy as np
import time
import os
import matplotlib.pyplot as plt

In [6]:
# Dataset Class, used by DataLoader
class MaskImgDataset(Dataset):
    def __init__(self, data_frame):
        self.data_path = "/kaggle/input/face-mask-detection-dataset/images/"
        self.data_frame = data_frame
        self.transformations = Resize((100, 100))

    def __len__(self):
        return len(self.data_frame["File"])

    def __getitem__(self, index):
        img = read_image(self.data_path + self.data_frame["File"][index])
        label = self.data_frame["Label"][index]

        img = self.transformations(img).float()

        return img, label

In [10]:
# Define the structure and the steps of the model
class MaskDetector(nn.Module):
    def __init__(self, loss_function):
        super(MaskDetector, self).__init__()

        self.loss_function = loss_function

        self.conv2d_1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=(3,3), padding=(1,1)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2))
        )

        self.conv2d_2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=(3,3), padding=(1,1)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2))
        )

        self.conv2d_3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=(3,3), padding=(1,1), stride=(3,3)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2))
        )

        self.linearLayers = nn.Sequential(
            nn.Linear(in_features=2048, out_features=1024),
            nn.ReLU(),
            nn.Linear(in_features=1024, out_features=2)
        )

        for sequential in [self.conv2d_1, self.conv2d_2, self.conv2d_3, self.linearLayers]:
            for layer in sequential.children():
                if isinstance(layer, (nn.Linear, nn.Conv2d)):
                    nn.init.xavier_uniform_(layer.weight)

    def forward(self, x):
        out = self.conv2d_1(x)
        out = self.conv2d_2(out)
        out = self.conv2d_3(out)
        out = out.view(-1, 2048)
        out = self.linearLayers(out)

        return out
    def add_optimizer(self, optimizer):
        self.optimizer = optimizer



In [11]:
def plot_losses(loss_db):
    plt.plot(loss_db)
    plt.xlabel("Epoch")
    plt.ylabel("Loss")

In [12]:
# One epoch of training
def model_train_loop(model, train_DL):
    total_loss = 0
    total_correct = 0

    for inputs, labels in train_DL:  # iterate over batches
        # move inputs to GPU if used
        inputs, labels = inputs.to(device), labels.to(device)

        # forward pass
        pred = model(inputs)
        labels = labels.flatten()
        loss = model.loss_function(pred, labels)

        total_loss += loss.item()
        total_correct += (pred.argmax(dim = 1) == labels).sum()  # count correct predictions

        # backward pass and update parameters
        model.optimizer.zero_grad()
        loss.backward()
        model.optimizer.step()

    return total_loss, total_correct

# Train model for num_epochs
def model_train(model, train_DL, num_epochs):
    loss_db = []
    print("Started training")
    start = time.time()

    for i in range(num_epochs):
        loss, correct = model_train_loop(model, train_DL)
        loss_db.append(loss)

        print(f"Epoch {i}: Loss {loss:.4f} Accuracy {correct * 100 / len(train_DL.dataset) :.2f}%")

    print(f"Training took {time.time() - start:.4f}s")
    plot_losses(loss_db)

In [13]:
# Predict for validation dataset
def validate_model(model, val_DL, limit = float('inf')):
    total_correct = 0
    total = 0
    total_masked = 0
    total_masked_correct = 0

    for inputs, labels in val_DL:
        inputs, labels = inputs.to(device), labels.to(device)
        pred = model(inputs)
        labels = labels.flatten()

        # Count total correct predictions
        total_correct += (pred.argmax(dim = 1) == labels).sum()
        total += len(labels)

        # Count correctly predicted masked images (true positives)
        total_masked += labels.sum()
        total_masked_correct += ((pred.argmax(dim=1) == 1) & (labels == 1)).sum()

        # If limit is given, only predict for images upto limit
        if total >= limit: break

    print(f"Validation accuracy: {total_correct * 100 / total :.2f}%")
    print(f"Accuracy of identifying masked images: {total_masked_correct * 100 / total_masked :.2f}%")

In [14]:
# Use GPU if available
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


In [24]:
# Preparing data for training and validation

maskDF = pd.read_csv("/content/mask_df.csv")
# Split data into training and validation sets
train_maskDF, val_maskDF = train_test_split(maskDF, train_size=0.7, random_state=0, stratify=maskDF["Label"])
train_maskDF.reset_index(inplace = True, drop = True)
val_maskDF.reset_index(inplace = True, drop = True)

train_DS = MaskImgDataset(train_maskDF)
val_DS = MaskImgDataset(val_maskDF)

train_DL = DataLoader(train_DS, batch_size=32, shuffle=True, num_workers=2)
val_DL = DataLoader(val_DS, batch_size=32, num_workers=2)


In [23]:
# Configuring the loss function
# Weights are used because the data is unbalanced

num_non_mask_images = maskDF[maskDF["Label"] == 0].shape[0]
num_mask_images = maskDF[maskDF["Label"] == 1].shape[0]
total_images = num_non_mask_images + num_mask_images

normed_weights = [1 - num_non_mask_images/total_images, 1 - num_mask_images/total_images]
print("Weights for [unmasked, masked]:", normed_weights)

loss_function = nn.CrossEntropyLoss(weight = torch.tensor(normed_weights))

# initializing the model
model = MaskDetector(loss_function)
optimizer = Adam(model.parameters(), lr=0.00001)
model.add_optimizer(optimizer)
model.to(device)

Weights for [unmasked, masked]: [0.015228536906614965, 0.984771463093385]


MaskDetector(
  (loss_function): CrossEntropyLoss()
  (conv2d_1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (conv2d_2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (conv2d_3): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(3, 3), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (linearLayers): Sequential(
    (0): Linear(in_features=2048, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=2, bias=True)
  )
)

In [27]:
# Use model to predict for images outside the dataset
labels = ["No mask", "Masked"]
def predict_image(model, img_path, ax = None):
    img = read_image(img_path)
    re_img = Resize((100, 100))(img).reshape((1, 3, 100, 100)).float()
    re_img = re_img.to(device)
    pred = model(re_img)
    res = pred.argmax(dim = 1)

    ax.imshow(img.permute((1, 2, 0)))
    ax.set_title(labels[res], fontdict = {'fontsize' : 40})
    ax.axis('off')


In [28]:
def predict_real_images(model, img_dir):
    images = os.listdir(img_dir)
    fig, axs = plt.subplots(1, len(images), figsize = (50, 10))
    for i, img in enumerate(images):
        predict_image(model, os.path.join(img_dir, img), axs[i])


In [38]:
images=pd.read_csv( "/content/train_labels.csv")