Step 1: Import all the libraries needed to build the object detection model.


In [None]:
import os
import numpy as np
import torch
import torchvision
import pycocotools
import pandas as pd
from PIL import Image
from torch.utils.data import Subset, DataLoader
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
print(torchvision.__version__)
!git clone https://github.com/pytorch/vision.git
!cp vision/references/detection/utils.py ./
!cp vision/references/detection/transforms.py ./
!cp vision/references/detection/coco_eval.py ./
!cp vision/references/detection/engine.py ./
!cp vision/references/detection/coco_utils.py ./
from engine import train_one_epoch, evaluate
import utils
import transforms as TF

Step 2: Download the dataset from the link below. It contains images of people wearing a face mask, not wearing a face mask, and wearing a face mask incorrectly.

In [None]:
import requests

# Location of the MaskedFace archive; replace with your own URL if needed.
url = 'https://empslocal.ex.ac.uk/people/staff/ad735/ECMM426/MaskedFace.zip'

# A timeout keeps the cell from hanging forever on a stalled connection.
r = requests.get(url, allow_redirects=True, timeout=60)
# Fail loudly on HTTP errors instead of saving an error page as dataset.zip.
r.raise_for_status()

# The context manager guarantees the archive is flushed and closed before unzipping.
with open('dataset.zip', 'wb') as archive:
    archive.write(r.content)

Unzip the Dataset

In [None]:
!unzip dataset.zip

Step 3: Separate the XML annotation files from the images and move them into their own directory. Do this for both the train and val folders.

In [None]:
import os
import shutil

def separate_xml_files(source_dir, destination_dir):
    """Move every ``.xml`` file found directly in ``source_dir`` into ``destination_dir``.

    The destination directory is created when it does not exist yet. Only
    entries whose name ends in ``.xml`` are moved, and each move is reported
    on stdout.
    """
    # Make sure there is somewhere to move the annotations to.
    if not os.path.exists(destination_dir):
        os.makedirs(destination_dir)

    # Pick out the annotation files, keeping the directory listing order.
    xml_names = [name for name in os.listdir(source_dir) if name.endswith('.xml')]

    for xml_name in xml_names:
        origin = os.path.join(source_dir, xml_name)
        landing = os.path.join(destination_dir, xml_name)
        shutil.move(origin, landing)
        print(f"Moved {xml_name} to {destination_dir}")

if __name__ == "__main__":
    # Training split: the XML files leave /content/train and are collected
    # in an "annotations" folder directly under /content.
    x = '/content'
    source_directory = '/content/train'
    destination_directory = os.path.join(x, "annotations")

    separate_xml_files(source_dir=source_directory, destination_dir=destination_directory)

In [None]:
import os
import shutil

def separate_xml_files(source_dir, destination_dir):
    """Move every ``.xml`` file found directly in ``source_dir`` into ``destination_dir``.

    Args:
        source_dir: directory that is scanned (non-recursively) for ``.xml`` files.
        destination_dir: directory that receives the files; created if missing.

    Each move is reported on stdout. Files are processed in sorted order so
    the output is the same on every run.
    """
    # exist_ok avoids the check-then-create race and makes re-running the cell harmless.
    os.makedirs(destination_dir, exist_ok=True)

    for file_name in sorted(os.listdir(source_dir)):
        if not file_name.endswith('.xml'):
            continue

        source_file = os.path.join(source_dir, file_name)
        destination_file = os.path.join(destination_dir, file_name)

        # shutil.move falls back to copy + delete when the destination is on a
        # different filesystem, where os.rename would raise OSError. It also
        # matches the earlier definition of this function in the notebook.
        shutil.move(source_file, destination_file)
        print(f"Moved {file_name} to {destination_dir}")

# Validation split: the XML files are gathered in an "annotations" folder
# created inside /content/val/ itself.
source_directory = '/content/val/'
destination_directory = os.path.join(source_directory, "annotations")
separate_xml_files(source_dir=source_directory, destination_dir=destination_directory)

In [None]:
!mv /content/val/annotations /content/sample_data/

Step 4: Create the dataset classes for the train and val splits (MaskDataset and MaskDataset1); the train and val datasets are instances of these classes.

In [None]:
## import os
from PIL import Image
from xml.etree import ElementTree as ET
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms.functional import to_tensor
from torchvision.transforms import transforms

class MaskDataset(Dataset):
    """Training split of the face-mask detection data.

    Images are read from ``<root_dir>/<image_subdir>`` and the matching XML
    annotation (same base name, ``.xml`` extension) from
    ``<root_dir>/<annot_subdir>``.

    Each item is ``(image, target)`` where ``target`` is a dict with
    ``'boxes'`` (float32, shape ``(N, 4)``, ``[xmin, ymin, xmax, ymax]``) and
    ``'labels'`` (int64, shape ``(N,)``).

    Args:
        root_dir: directory containing the image and annotation folders.
        transform: optional callable applied to the RGB PIL image.
        image_subdir: image folder relative to ``root_dir``.
        annot_subdir: annotation folder relative to ``root_dir``.
    """

    # Class names used in the XML <name> element mapped to numeric label ids.
    LABEL_MAP = {'with_mask': 1, 'without_mask': 2, 'mask_weared_incorrect': 3}

    def __init__(self, root_dir, transform=None, image_subdir='train', annot_subdir='annotations'):
        self.image_dir = os.path.join(root_dir, image_subdir)
        self.annot_dir = os.path.join(root_dir, annot_subdir)
        self.transform = transform
        # Sorted so that an index always refers to the same image.
        self.image_names = sorted(os.listdir(self.image_dir))

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        image_name = self.image_names[idx]
        image_path = os.path.join(self.image_dir, image_name)
        # convert() returns an independent image, so the file can be closed right away.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform:
            image = self.transform(image)

        # Swap only the real extension; str.replace would also rewrite a
        # '.png' occurring elsewhere in the name and ignore other image types.
        annot_file_name = os.path.splitext(image_name)[0] + '.xml'
        target = self._parse_annotation(os.path.join(self.annot_dir, annot_file_name))

        return image, target

    @classmethod
    def _parse_annotation(cls, annot_path):
        """Read one XML annotation file and return its ``boxes``/``labels`` target dict.

        Raises:
            ValueError: if an object carries a class name missing from ``LABEL_MAP``
                (silently skipping the label would leave boxes and labels misaligned).
        """
        root = ET.parse(annot_path).getroot()

        boxes = []
        labels = []
        for obj in root.findall('object'):
            class_name = obj.find('name').text
            if class_name not in cls.LABEL_MAP:
                raise ValueError(f"Unknown class name {class_name!r} in {annot_path}")

            bndbox = obj.find('bndbox')
            boxes.append([int(bndbox.find(tag).text) for tag in ('xmin', 'ymin', 'xmax', 'ymax')])
            labels.append(cls.LABEL_MAP[class_name])

        return {
            # reshape keeps the (N, 4) layout even when the image has no objects.
            'boxes': torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            # Explicit dtype: an empty list would otherwise become a float tensor.
            'labels': torch.tensor(labels, dtype=torch.int64),
        }

In [None]:
## import os
from PIL import Image
from xml.etree import ElementTree as ET
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms.functional import to_tensor
from torchvision.transforms import transforms

class MaskDataset1(Dataset):
    """Validation split of the face-mask detection data.

    Images are read from ``<root_dir>/<image_subdir>`` and the matching XML
    annotation (same base name, ``.xml`` extension) from
    ``<root_dir>/<annot_subdir>``.

    Each item is ``(image, target)`` where ``target`` is a dict with
    ``'boxes'`` (float32, shape ``(N, 4)``, ``[xmin, ymin, xmax, ymax]``) and
    ``'labels'`` (int64, shape ``(N,)``).

    Args:
        root_dir: directory containing the image and annotation folders.
        transform: optional callable applied to the RGB PIL image.
        image_subdir: image folder relative to ``root_dir``.
        annot_subdir: annotation folder relative to ``root_dir``.
    """

    # Class names used in the XML <name> element mapped to numeric label ids.
    LABEL_MAP = {'with_mask': 1, 'without_mask': 2, 'mask_weared_incorrect': 3}

    def __init__(self, root_dir, transform=None, image_subdir='val',
                 annot_subdir='sample_data/annotations'):
        self.image_dir = os.path.join(root_dir, image_subdir)
        self.annot_dir = os.path.join(root_dir, annot_subdir)
        self.transform = transform
        # Sorted so that an index always refers to the same image.
        self.image_names = sorted(os.listdir(self.image_dir))

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        image_name = self.image_names[idx]
        image_path = os.path.join(self.image_dir, image_name)
        # convert() returns an independent image, so the file can be closed right away.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform:
            image = self.transform(image)

        # Swap only the real extension; str.replace would also rewrite a
        # '.png' occurring elsewhere in the name and ignore other image types.
        annot_file_name = os.path.splitext(image_name)[0] + '.xml'
        target = self._parse_annotation(os.path.join(self.annot_dir, annot_file_name))

        return image, target

    @classmethod
    def _parse_annotation(cls, annot_path):
        """Read one XML annotation file and return its ``boxes``/``labels`` target dict.

        Raises:
            ValueError: if an object carries a class name missing from ``LABEL_MAP``
                (silently skipping the label would leave boxes and labels misaligned).
        """
        root = ET.parse(annot_path).getroot()

        boxes = []
        labels = []
        for obj in root.findall('object'):
            class_name = obj.find('name').text
            if class_name not in cls.LABEL_MAP:
                raise ValueError(f"Unknown class name {class_name!r} in {annot_path}")

            bndbox = obj.find('bndbox')
            boxes.append([int(bndbox.find(tag).text) for tag in ('xmin', 'ymin', 'xmax', 'ymax')])
            labels.append(cls.LABEL_MAP[class_name])

        return {
            # reshape keeps the (N, 4) layout even when the image has no objects.
            'boxes': torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            # Explicit dtype: an empty list would otherwise become a float tensor.
            'labels': torch.tensor(labels, dtype=torch.int64),
        }



In [None]:
def collate_fn(batch):
    """Regroup a list of ``(image, target)`` samples into ``(images, targets)`` tuples."""
    return tuple(zip(*batch))


# Preprocessing applied to every image: tensor conversion followed by
# channel-wise normalisation with the mean/std listed below.
# NOTE(review): torchvision detection models also normalise their inputs
# internally — confirm that normalising here as well is intended.
train_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Training dataset rooted at /content, wrapped in a shuffled loader.
train_dataset = MaskDataset('/content', transform=train_transforms)
dataloader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn,
)

In [None]:
# Fetch a batch of data from the dataloader
# Sanity check: pull one batch from the training loader. With the collate
# function used above, `img` is a tuple of images and `target` a tuple of
# the matching annotation dicts.
img, target = next(iter(dataloader))

# Shape of the first image tensor in the batch
print(img[0].shape)

# Full target dict ('boxes' and 'labels') of that same image
print(target[0])

Step 5: Build the Faster R-CNN model

In [None]:
import torchvision # Imports the torchvision library
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor # Imports the Faster R-CNN predictor module from torchvision
from torchvision.models.detection.rpn import AnchorGenerator
num_classes = 4  # three mask classes plus one background class


def get_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a new box-prediction head.

    Args:
        num_classes: number of outputs of the new head, background included.

    Returns:
        The torchvision detection model with ``roi_heads.box_predictor``
        replaced by a ``FastRCNNPredictor`` sized for ``num_classes``.
    """
    # `pretrained=True` is deprecated since torchvision 0.13; the `weights`
    # argument is its replacement and "DEFAULT" loads the pretrained
    # detection weights for this architecture.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

    # Keep the pretrained feature extractor and swap only the classification head.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model


model = get_model(num_classes)

Step 6: Define the optimizer as AdamW and also define the learning rate scheduler with step size as 5 and gamma as 0.5

In [None]:
# Only parameters that still require gradients are handed to the optimizer.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.AdamW(params, lr=1e-4, amsgrad=True, weight_decay=1e-6)

# Learning-rate schedule: multiply the rate by 0.5 every 5 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Step 7: Define the train_one_epoch function and start training the model

In [None]:
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq):
    """Run one training pass of ``model`` over ``data_loader``.

    Note: this definition replaces the ``train_one_epoch`` imported from
    ``engine`` at the top of the notebook.

    Args:
        model: detection model that returns a dict of losses when called as
            ``model(images, targets)`` in training mode.
        optimizer: optimizer updating the model parameters.
        data_loader: iterable yielding ``(images, targets)`` batches.
        device: device the images and target tensors are moved to.
        epoch: epoch index, used only in the progress message.
        print_freq: print the loss every ``print_freq`` batches; ``0`` or
            ``None`` disables progress output.

    Returns:
        Mean total loss per batch as a float (``nan`` if the loader is empty).

    Raises:
        RuntimeError: if a batch produces a non-finite loss.
    """
    import math

    model.train()
    running_loss = 0.0
    num_batches = 0

    for batch_idx, (images, targets) in enumerate(data_loader):
        # Move the batch to the training device.
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        # Stop before a NaN/inf loss is back-propagated into the weights.
        loss_value = losses.item()
        if not math.isfinite(loss_value):
            raise RuntimeError(
                f'Non-finite loss {loss_value} at epoch {epoch}, batch {batch_idx}'
            )

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        running_loss += loss_value
        num_batches += 1

        # Log the plain number rather than the tensor repr (device, grad_fn, ...).
        if print_freq and batch_idx % print_freq == 0:
            print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss_value:.4f}')

    return running_loss / num_batches if num_batches else float('nan')

In [None]:
num_epochs = 20

# Put the model on the training device and rebuild the training loader
# with a batch size of 4.
model.to(device)
dataloader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn,
)

# One pass over the training data per epoch, then advance the LR schedule.
for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, dataloader, device, epoch, print_freq=20)
    lr_scheduler.step()

Step 8: Testing. Define the test dataset and its data loader, and set up the loader parameters.

In [None]:
import torchvision.ops as ops
# Validation dataset (same preprocessing as for training) and an
# unshuffled loader over it.
test_dataset = MaskDataset1('/content', transform=train_transforms)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=4,
    shuffle=False,
    num_workers=4,
    collate_fn=collate_fn,
)
print(len(test_dataset))

# Grab the first minibatch.
images, targets = next(iter(test_dataloader))



85


Step 9: Define the count_masks function, which returns the predictions as an N×3 array holding, for each image, the number of people wearing a mask properly, not wearing a mask, and wearing a mask incorrectly. It also calculates the MAPE score.

In [None]:
import numpy as np

def count_masks(dataset, model, device, score_threshold=0.0):
    """Count predicted and true objects per class for every image and score them with MAPE.

    Args:
        dataset: iterable yielding ``(images, targets)`` batches (a data
            loader); every target dict must contain a ``'labels'`` tensor.
        model: detection model returning, in eval mode, one dict per image
            with a ``'labels'`` tensor (and ``'scores'`` when a threshold is used).
        device: device the images are moved to before inference.
        score_threshold: when greater than 0, only detections whose score is
            at least this value are counted. The default of 0.0 counts every
            detection the model returns.

    Returns:
        ``(pred_counts, true_counts, average_mape)`` where both count arrays
        are int64 with shape ``(num_images, 3)`` (columns are label ids 1, 2, 3)
        and ``average_mape`` is the mean over images of the per-image mean
        absolute percentage error, in percent. A true count of 0 uses a
        denominator of 1. ``average_mape`` is ``nan`` when there are no images.
    """
    class_ids = range(1, 4)  # label ids used by the dataset classes
    model.eval()
    pred_rows = []
    true_rows = []

    with torch.no_grad():  # inference only, no gradients needed
        for images, targets in dataset:
            images = [img.to(device) for img in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                pred_labels = output['labels'].cpu().numpy()
                if score_threshold > 0:
                    # Drop low-confidence detections before counting.
                    keep = output['scores'].cpu().numpy() >= score_threshold
                    pred_labels = pred_labels[keep]
                pred_rows.append([np.sum(pred_labels == class_id) for class_id in class_ids])

                true_labels = target['labels'].cpu().numpy()
                true_rows.append([np.sum(true_labels == class_id) for class_id in class_ids])

    # reshape keeps the (N, 3) layout even when no image was processed.
    final_pred_counts = np.array(pred_rows, dtype=np.int64).reshape(-1, 3)
    final_true_counts = np.array(true_rows, dtype=np.int64).reshape(-1, 3)

    if len(final_true_counts) == 0:
        # Nothing to average; avoids numpy's "mean of empty slice" warning.
        return final_pred_counts, final_true_counts, float('nan')

    # Per-class absolute percentage error, averaged per image and then over images.
    abs_pct_error = np.abs(final_pred_counts - final_true_counts) / np.maximum(final_true_counts, 1)
    average_mape = np.mean(abs_pct_error.mean(axis=1) * 100)

    return final_pred_counts, final_true_counts, average_mape
# Evaluate on the validation loader, then show the true and predicted count tables.
final_pred_counts, final_true_counts, average_mape = count_masks(
    dataset=test_dataloader, model=model, device=device
)
print(final_true_counts, final_pred_counts)

In [None]:
# Ground-truth object counts, one row per validation image.
print('True Counts [With Mask, Without Mask, Mask Wearing incorrectly] :' , final_true_counts)

In [None]:
# Counts derived from the model's detections, one row per validation image.
print('Final Predicted Counts [With Mask, Without Mask, Mask Wearing incorrectly] :' , final_pred_counts)

In [59]:
# Average MAPE over all validation images, in percent (lower is better).
print(f'MAPE SCORE :{average_mape} % ')

MAPE SCORE :16.334645771361256 % 


Step 10: Display some images of the prediction to see how well the model performed

In [None]:
# matplotlib is needed for the figures; it is imported here because no other
# cell in the notebook brings `plt` into scope.
import matplotlib.pyplot as plt

# Inverse of the Normalize step in `train_transforms`; these values must match
# the mean/std used there so the images are shown in their original colours.
_NORM_MEAN = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
_NORM_STD = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

# Box colour per predicted label id: 1 -> green, 2 -> red, 3 -> yellow.
label_color = ['green', 'red', 'yellow']

model.eval()
for images, targets in test_dataloader:
    images = [image.to(device) for image in images]
    # Gradients are only irrelevant for the forward pass; plotting stays outside.
    with torch.no_grad():
        outputs = model(images)

    for i, image in enumerate(images):
        boxes = outputs[i]['boxes'].cpu().numpy()
        labels = outputs[i]['labels'].cpu().numpy()

        # Undo the normalisation and clamp to [0, 1] so imshow gets valid RGB values.
        picture = (image.cpu() * _NORM_STD + _NORM_MEAN).clamp(0, 1).permute(1, 2, 0)

        # Show the input image with the predicted bounding boxes drawn on top.
        fig, ax = plt.subplots(1)
        ax.imshow(picture)

        for box, label in zip(boxes, labels):
            x1, y1, x2, y2 = box.astype(int)
            ax.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1, fill=False,
                                       edgecolor=label_color[label - 1], linewidth=2))

        plt.show()
        # One figure per image: close it so memory does not grow with the dataset.
        plt.close(fig)
