In [None]:
import os
import numpy as np
import torch
import torch.utils.data
import PIL
from PIL import Image
import pandas as pd
import torchvision
import pycocotools
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import xml.etree.ElementTree as ET
import glob
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets, models
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from sklearn.model_selection import train_test_split
from torch_snippets import Report
import time
import torchvision.transforms.functional as F
import torchvision.transforms.transforms as T

In [None]:
#pip install torch_snippets

In [None]:
#get annotation
def get_xmlfilenames(dire):
  """Return the sorted names of the ``.xml`` entries found directly in ``dire``.

  Only bare names are returned (not joined with ``dire``), and the match is a
  case-sensitive suffix test on ``".xml"``.
  """
  return sorted(entry for entry in os.listdir(dire) if entry.endswith(".xml"))

In [None]:
# Class name <-> integer id lookup tables used when reading annotations.
class_str2num = {'with_mask': 1, 'without_mask': 2, 'mask_weared_incorrect': 3}
class_num2str = {num: name for name, num in class_str2num.items()}

def parse_xml(xml_file):
    """Read one annotation file.

    Returns:
        ``(filename, bboxes, labels)`` where ``filename`` is the text of the
        ``<filename>`` element, ``bboxes`` is a list of
        ``[xmin, ymin, xmax, ymax]`` integer lists (one per ``<object>``) and
        ``labels`` holds the matching integer class ids from ``class_str2num``.

    Raises:
        KeyError: if an object's ``<name>`` is not a key of ``class_str2num``.
    """
    root = ET.parse(xml_file).getroot()
    filename = root.find('filename').text

    bboxes, labels = [], []
    for obj in root.iter('object'):
        # Read the four corners in the order the box list expects them.
        corners = [int(obj.find("bndbox/" + tag).text)
                   for tag in ("xmin", "ymin", "xmax", "ymax")]
        bboxes.append(corners)
        labels.append(int(class_str2num[obj.find("name").text]))
    return filename, bboxes, labels

In [None]:
# Get xml files
def xml_to_dict(xml_path):
    """Summarise one annotation file as a flat dict.

    The dict carries the path that was passed in (under ``"filename"``), the
    image width/height/depth from ``<size>``, and the label plus box corners
    (``x1, y1, x2, y2``) of the ``<object>`` element matched by ``root.find``.
    """
    root = ET.parse(xml_path).getroot()

    def _int_at(path):
        # Integer value of the element located at ``path``.
        return int(root.find(path).text)

    record = {"filename": xml_path}
    record["image_width"] = _int_at("./size/width")
    record["image_height"] = _int_at("./size/height")
    record["image_channels"] = _int_at("./size/depth")
    record["label"] = root.find("./object/name").text
    record["x1"] = _int_at("./object/bndbox/xmin")
    record["y1"] = _int_at("./object/bndbox/ymin")
    record["x2"] = _int_at("./object/bndbox/xmax")
    record["y2"] = _int_at("./object/bndbox/ymax")
    return record

In [None]:
# Convert human readable str label to int.
label_dict = dict(with_mask=1, without_mask=2, mask_weared_incorrect=3)
# Inverse lookup: integer class id back to its human-readable name.
reverse_label_dict = {class_id: name for name, class_id in label_dict.items()}


class MaskedFaceDataset(torch.utils.data.Dataset):
    """Detection dataset backed by one folder of images and ``.xml`` annotations.

    Indexing is driven by the sorted annotation files: item ``idx`` is the
    image named inside annotation ``idx`` together with a target dict holding
    ``boxes`` (float32, rows of ``[xmin, ymin, xmax, ymax]``), ``labels``
    (int64), ``image_id`` (int), ``area`` and ``iscrowd`` (int64 zeros).
    """

    def __init__(self, root, transforms=None):
        """
        Args:
            root: directory that contains both the images and the annotations.
            transforms: optional callable ``(image, target) -> (image, target)``.
        """
        self.root = root
        self.transforms = transforms
        self.imgs = sorted(glob.glob(os.path.join(root, "*.png")))
        self.antns = get_xmlfilenames(root)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for annotation number ``idx``."""
        # The annotation names the image file it belongs to.
        filename, boxes, labels = parse_xml(os.path.join(self.root, self.antns[idx]))
        img = Image.open(os.path.join(self.root, filename)).convert('RGB')

        # reshape(-1, 4) keeps the (N, 4) layout even when the annotation lists
        # no objects; without it the column indexing below raises IndexError.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        # Clip boxes to the image and drop any that end up with no extent.
        w, h = img.size
        boxes[:, 0::2].clamp_(min=0, max=w)
        boxes[:, 1::2].clamp_(min=0, max=h)
        keep = (boxes[:, 3] > boxes[:, 1]) & (boxes[:, 2] > boxes[:, 0])
        boxes = boxes[keep]
        labels = labels[keep]

        # Area is derived from the clipped boxes so it always agrees with the
        # coordinates that are actually returned.
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # suppose all instances are not crowd
        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)

        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': int(idx),
            'area': area,
            'iscrowd': iscrowd,
        }
        if self.transforms is not None:
            img, target = self.transforms(img, target)
        return img, target

    def __len__(self):
        """Number of annotation files found in ``root``."""
        return len(self.antns)

In [None]:
#Transform helper
class Compose:
    """Chain several ``(image, target)`` transforms into a single callable."""

    def __init__(self, transforms = None):
        """
        Args:
            transforms: list of callables, each mapping ``(image, target)`` to
                ``(image, target)``. Omitted or ``None`` means no transforms.
        """
        # A literal ``[]`` default would be one list shared by every instance
        # created without arguments; build a fresh list per instance instead.
        self.transforms = [] if transforms is None else transforms

    def __call__(self, image, target):
        """Apply every transform in order and return the final ``(image, target)``."""
        for t in self.transforms:
            image, target = t(image, target)
        return image, target

In [None]:
#transforms class
class ToTensor(torch.nn.Module):
    """Turn a PIL image into a tensor; the target is handed back unchanged."""

    def forward(self, image, target = None):
        # Convert to a tensor first, then let convert_image_dtype apply its
        # default output dtype.
        as_tensor = F.convert_image_dtype(F.pil_to_tensor(image))
        return as_tensor, target
class RandomHorizontalFlip(T.RandomHorizontalFlip):
    """Random horizontal flip that also mirrors the target's boxes.

    The flip is taken when a fresh ``torch.rand(1)`` draw is below ``self.p``
    (presumably the probability stored by the parent class — confirm).
    """

    def forward(self, image, target = None):
        if not (torch.rand(1) < self.p):
            # No flip this time: hand both inputs back untouched.
            return image, target

        flipped = F.hflip(image)
        if target is not None:
            img_width, _ = F.get_image_size(flipped)
            boxes = target["boxes"]
            # Mirror the x-coordinates; swapping the columns keeps xmin <= xmax.
            boxes[:, [0, 2]] = img_width - boxes[:, [2, 0]]
        return flipped, target

In [None]:
def get_transform(train):
    """Build the transform pipeline for a dataset split.

    ``ToTensor`` is always applied; a 50% horizontal flip is added only when
    ``train`` compares equal to ``True``.
    """
    pipeline = [ToTensor()]
    # Augmentations are for the training split only (more can be added here).
    if train == True:
        pipeline.append(RandomHorizontalFlip(0.5))
    return Compose(pipeline)

In [None]:
# Build the datasets. NOTE(review): test_ds and val_ds both read the 'val'
# folder, so they contain the same samples — confirm this is intended.
train_ds = MaskedFaceDataset('/content/drive/MyDrive/Colab Notebooks/MaskedFace/train', transforms =get_transform(train=True) )
test_ds = MaskedFaceDataset('/content/drive/MyDrive/Colab Notebooks/MaskedFace/val', transforms = get_transform(train=False))
val_ds = MaskedFaceDataset('/content/drive/MyDrive/Colab Notebooks/MaskedFace/val', transforms = get_transform(train=False))

# Shuffled index lists, used only by the optional sub-sampling below.
# They are built from train_ds / test_ds: the names train_dataset and
# test_dataset do not exist at this point and raised NameError on a fresh kernel.
torch.manual_seed(1)
train_indices = torch.randperm(len(train_ds)).tolist()
test_indices = torch.randperm(len(test_ds)).tolist()

# Please feel free to use more samples if you have enough resources
n = 500
#train_ds = torch.utils.data.Subset(train_ds, train_indices[:n])
#test_ds = torch.utils.data.Subset(test_ds, test_indices[:n])
#val_ds=torch.utils.data.Subset(val_ds, test_indices[:n])

In [None]:
# Collate image-target pairs into a tuple.
def collate_fn(batch):
    """Regroup a list of samples field by field.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``.
    """
    columns = zip(*batch)
    return tuple(columns)
# Create the DataLoaders from the Datasets.
# All loaders use batches of 4 and the tuple-based collate_fn; only the
# training loader shuffles.
train_dl = DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=collate_fn)
val_dl = DataLoader(val_ds, batch_size=4, shuffle=False, collate_fn=collate_fn)
test_dl = DataLoader(test_ds, batch_size=4, shuffle=False, collate_fn=collate_fn)

In [None]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
def get_object_detection_model(num_classes = 4,
                               feature_extraction = True):
    """Return a pretrained Faster R-CNN whose box predictor outputs ``num_classes``.

    Args:
        num_classes: number of outputs of the replacement box predictor.
        feature_extraction: when it compares equal to ``True``, every parameter
            of the loaded model gets ``requires_grad = False`` before the
            predictor is replaced.
    """
    detector = fasterrcnn_resnet50_fpn(pretrained = True)

    if feature_extraction == True:
        for weight in detector.parameters():
            weight.requires_grad = False

    # Swap in a new predictor head sized for num_classes, keeping the input
    # width of the head that shipped with the model.
    old_head = detector.roi_heads.box_predictor
    detector.roi_heads.box_predictor = FastRCNNPredictor(
        old_head.cls_score.in_features, num_classes)
    return detector

In [None]:
# Train and validate using batches
def unbatch(batch, device):
    """Split ``batch`` into images and targets and move tensors to ``device``.

    Returns ``(images, targets)`` as two lists. Inside each target dict only
    ``torch.Tensor`` values are moved; every other value is kept as is.
    """
    images, targets = batch
    images_on_device = [img.to(device) for img in images]

    targets_on_device = []
    for tgt in targets:
        moved = {}
        for key, value in tgt.items():
            moved[key] = value.to(device) if isinstance(value, torch.Tensor) else value
        targets_on_device.append(moved)

    return images_on_device, targets_on_device
def train_batch(batch, model, optimizer, device):
    """Run one optimisation step on ``batch``.

    Returns ``(total_loss, loss_dict)`` where ``total_loss`` is the sum of the
    values in the dict the model returns.
    """
    model.train()
    images, targets = unbatch(batch, device = device)
    optimizer.zero_grad()
    loss_dict = model(images, targets)
    total_loss = sum(loss_dict.values())
    total_loss.backward()
    optimizer.step()
    return total_loss, loss_dict
@torch.no_grad()
def validate_batch(batch, model, optimizer, device):
    """Compute the losses for ``batch`` with gradient tracking disabled.

    The model is deliberately switched to train mode here, presumably because
    the detection model only returns its loss dict in that mode — confirm
    against the torchvision documentation. No backward pass or optimizer step
    is performed.
    """
    model.train()
    images, targets = unbatch(batch, device = device)
    optimizer.zero_grad()
    loss_dict = model(images, targets)
    total_loss = sum(loss_dict.values())
    return total_loss, loss_dict

In [None]:
#train model function which save data in train log report
def train_fasterrcnn(model,
                     optimizer,
                     n_epochs,
                     train_loader,
                     test_loader,
                     log = None,
                     keys = None,
                     device = "cpu"):
    """Train ``model`` for ``n_epochs`` and log train/validation losses.

    Args:
        model: detection model accepted by ``train_batch`` / ``validate_batch``.
        optimizer: optimizer stepping the model's parameters.
        n_epochs: number of passes over ``train_loader``.
        train_loader: loader yielding training batches.
        test_loader: loader yielding validation batches, or ``None`` to skip
            validation.
        log: existing ``Report`` to append to; a new ``Report(n_epochs)`` is
            created when ``None``.
        keys: loss names; defaults to the four Faster R-CNN loss names. It is
            currently not used beyond being given that default.
        device: device the model and batches are moved to.

    Returns:
        The ``Report`` holding the recorded losses.
    """
    if log is None:
        log = Report(n_epochs)
    if keys is None:
        # FasterRCNN loss names.
        keys = ["loss_classifier",
                "loss_box_reg",
                "loss_objectness",
                "loss_rpn_box_reg"]
    model.to(device)
    for epoch in range(n_epochs):
        n_train = len(train_loader)
        for ix, batch in enumerate(train_loader):
            loss, _ = train_batch(batch, model, optimizer, device)
            # Fractional position inside the epoch, e.g. 0.25 for a quarter of epoch 0.
            log.record(pos = epoch + (ix + 1) / n_train,
                       trn_loss = loss.item(), end = "\r")
        if test_loader is not None:
            n_val = len(test_loader)
            for ix, batch in enumerate(test_loader):
                loss, _ = validate_batch(batch, model, optimizer, device)
                log.record(pos = epoch + (ix + 1) / n_val,
                           val_loss = loss.item(), end = "\r")
        # Report once per epoch. Doing this after the loop reported only the
        # final epoch and raised NameError on `epoch` when n_epochs was 0.
        log.report_avgs(epoch + 1)
    return log

In [None]:
# Create the faster rcnn model with 3 classes  and  background.

# Fine-tune the whole network (feature_extraction=False leaves nothing frozen).
model = get_object_detection_model(num_classes=4, feature_extraction=False)

# SGD over every parameter that still requires gradients.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)

# Use the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Train the model over 1 epoch.
log = train_fasterrcnn(
    model=model,
    optimizer=optimizer,
    n_epochs=1,
    train_loader=train_dl,
    test_loader=test_dl,
    log=None,
    keys=None,
    device=device,
)



EPOCH: 1.000  val_loss: 0.440  trn_loss: 0.502  (122.83s - 0.00s remaining)


In [None]:
n_epochs=10
# Start a fresh report sized for this run. The `log` returned by the earlier
# one-epoch run was created with Report(1), so reusing it produced negative
# "remaining" times and mixed this run's epoch positions with the old records.
log = Report(n_epochs)
# Report about ten times per run; max(1, ...) keeps the modulo valid when
# n_epochs is below 10 (n_epochs // 10 would be 0).
report_every = max(1, n_epochs // 10)
loss_names = ['loss_classifier','loss_box_reg','loss_objectness','loss_rpn_box_reg']
for epoch in range(n_epochs):
    _n = len(train_dl)
    for ix, inputs in enumerate(train_dl):
        loss, losses = train_batch(inputs, model, optimizer, device)
        # loc_loss holds 'loss_classifier' and regr_loss holds 'loss_box_reg';
        # the log key names are kept as they were.
        loc_loss, regr_loss, loss_objectness, loss_rpn_box_reg = \
            [losses[k] for k in loss_names]
        pos = (epoch + (ix+1)/_n)
        log.record(pos, trn_loss=loss.item(), trn_loc_loss=loc_loss.item(),
                   trn_regr_loss=regr_loss.item(), trn_objectness_loss=loss_objectness.item(),
                   trn_rpn_box_reg_loss=loss_rpn_box_reg.item(), end='\r')

    _n = len(test_dl)
    for ix, inputs in enumerate(test_dl):
        loss, losses = validate_batch(inputs, model, optimizer, device)
        loc_loss, regr_loss, loss_objectness, loss_rpn_box_reg = \
            [losses[k] for k in loss_names]
        pos = (epoch + (ix+1)/_n)
        log.record(pos, val_loss=loss.item(), val_loc_loss=loc_loss.item(),
                   val_regr_loss=regr_loss.item(), val_objectness_loss=loss_objectness.item(),
                   val_rpn_box_reg_loss=loss_rpn_box_reg.item(), end='\r')
    if (epoch+1) % report_every == 0:
        log.report_avgs(epoch+1)



EPOCH: 1.000  val_rpn_box_reg_loss: 0.009  trn_loc_loss: 0.112  val_objectness_loss: 0.010  trn_regr_loss: 0.201  trn_objectness_loss: 0.008  trn_rpn_box_reg_loss: 0.010  val_regr_loss: 0.220  val_loc_loss: 0.129  trn_loss: 0.417  val_loss: 0.404  (239.57s - 0.00s remaining)
EPOCH: 2.000  val_rpn_box_reg_loss: 0.008  trn_loc_loss: 0.095  val_objectness_loss: 0.010  trn_regr_loss: 0.180  trn_objectness_loss: 0.006  trn_rpn_box_reg_loss: 0.009  val_regr_loss: 0.194  val_loc_loss: 0.119  trn_loss: 0.290  val_loss: 0.321  (350.89s - -175.45s remaining)
EPOCH: 3.000  val_rpn_box_reg_loss: 0.008  trn_loc_loss: 0.088  val_objectness_loss: 0.008  trn_regr_loss: 0.168  trn_objectness_loss: 0.005  trn_rpn_box_reg_loss: 0.008  val_regr_loss: 0.194  val_loc_loss: 0.116  trn_loss: 0.269  val_loss: 0.326  (461.18s - -307.45s remaining)
EPOCH: 4.000  val_rpn_box_reg_loss: 0.008  trn_loc_loss: 0.082  val_objectness_loss: 0.009  trn_regr_loss: 0.160  trn_objectness_loss: 0.004  trn_rpn_box_reg_loss: 0.

## Helper methods for Prediction

In [None]:
@torch.no_grad()
def predict_batch(batch, model, device):
    """Run inference on one batch.

    Moves the model to ``device``, switches it to eval mode and decodes each
    raw prediction with a score threshold of 0.8 and an NMS IoU threshold of
    0.3. Returns ``(images_on_cpu, decoded_predictions)`` as two lists.
    """
    model.to(device)
    model.eval()
    images, _ = unbatch(batch, device = device)

    decoded = []
    for raw in model(images):
        decoded.append(decode_prediction(raw, score_threshold=0.8, nms_iou_threshold=0.3))

    images_on_cpu = [img.cpu() for img in images]
    return images_on_cpu, decoded
def predict(model, data_loader, device = "cpu"):
    """Run ``predict_batch`` over every batch of ``data_loader``.

    Returns ``(images, predictions)``: two flat lists accumulated in loader
    order.
    """
    all_images, all_predictions = [], []
    for batch in data_loader:
        batch_images, batch_predictions = predict_batch(batch, model, device)
        all_images.extend(batch_images)
        all_predictions.extend(batch_predictions)
    return all_images, all_predictions

In [None]:
def decode_prediction(prediction,
                      score_threshold ,
                      nms_iou_threshold ):
    """Filter one raw prediction and convert it to NumPy arrays.

    Inputs
        prediction: dict with tensor entries "boxes", "scores" and "labels"
        score_threshold: float or None; entries scoring at or below it are dropped
        nms_iou_threshold: float or None; IoU threshold passed to torchvision NMS
    Returns
        dict with keys "boxes", "labels" and "scores", each a NumPy array
    """
    boxes, scores, labels = (prediction[key] for key in ("boxes", "scores", "labels"))

    # Step 1: confidence filter (skipped when the threshold is None).
    if score_threshold is not None:
        confident = scores > score_threshold
        boxes, scores, labels = boxes[confident], scores[confident], labels[confident]

    # Step 2: non-maximum suppression on whatever survived step 1.
    if nms_iou_threshold is not None:
        kept = torchvision.ops.nms(boxes = boxes, scores = scores,
                                   iou_threshold = nms_iou_threshold)
        boxes, scores, labels = boxes[kept], scores[kept], labels[kept]

    decoded = {}
    decoded["boxes"] = boxes.cpu().numpy()
    decoded["labels"] = labels.cpu().numpy()
    decoded["scores"] = scores.cpu().numpy()
    return decoded

In [None]:
def countpred (predictions, num_classes = 3):
  """Count the predicted objects per class for every image.

  Args:
      predictions: sequence of dicts, each with a ``'labels'`` entry holding
          the integer class ids predicted for one image.
      num_classes: number of foreground classes; ids ``1..num_classes`` map to
          columns ``0..num_classes-1``.

  Returns:
      Integer array of shape ``(len(predictions), num_classes)``.

  Raises:
      IndexError: if a label is larger than ``num_classes``.
  """
  P_sums = np.zeros((len(predictions), num_classes), dtype=int)

  for i, prediction in enumerate(predictions):
      labels = np.asarray(prediction['labels']).astype(np.int64, copy=False).ravel()
      # Ignore ids below 1 (e.g. background 0). Indexing with `label - 1`
      # would turn 0 into -1 and silently count it as the last class.
      labels = labels[labels >= 1]
      counts = np.bincount(labels, minlength=num_classes + 1)
      if counts.shape[0] > num_classes + 1:
          raise IndexError(
              f"label {counts.shape[0] - 1} is out of range for num_classes={num_classes}")
      P_sums[i] = counts[1:]
  return P_sums

In [None]:
def counttrue (dataset):
    """Count the ground-truth objects per class for every sample of ``dataset``.

    Each row of the returned int64 array is
    ``[n_label_1, n_label_2, n_label_3]`` for one sample, where label 1 is
    mask on correctly, 2 is no mask and 3 is mask worn incorrectly.
    """
    per_image_counts = []
    for idx in range(len(dataset)):
        # Only the target annotations are needed; the image is discarded.
        _, target = dataset[idx]
        labels = target['labels']
        per_image_counts.append(
            [(labels == class_id).sum().item() for class_id in (1, 2, 3)])

    return np.array(per_image_counts, dtype=np.int64)

# Final method

In [None]:
def count_masks(dataset):
    """Compare predicted and true per-class counts and print the mean error.

    Uses the notebook-level ``model`` and ``device``. For every image the
    absolute count error of each class is divided by ``max(true_count, 1)``
    and expressed in percent; these are averaged over classes, then over
    images.

    Returns:
        ``(T_list, MAPE)``: the true-count array and the overall percentage.
    """
    loader = torch.utils.data.DataLoader(dataset, batch_size = 2, shuffle = False, collate_fn = collate_fn)

    _, predictions = predict(model, loader, device)
    P_list = countpred(predictions)
    T_list = counttrue(dataset)

    nims, ncls = T_list.shape[0], T_list.shape[1]
    # One averaged percentage error per image.
    mape = np.zeros(nims)
    for i in range(nims):
        per_class = np.zeros(ncls)
        for t in range(ncls):
            true_n, pred_n = T_list[i][t], P_list[i][t]
            # max(true_n, 1) avoids dividing by zero when a class is absent.
            per_class[t] = np.abs((true_n - pred_n) / np.max([true_n, 1])) * 100
        mape[i] = np.mean(per_class)
    MAPE = np.mean(mape)

    print(f'''
            MAPE : { round(MAPE, 2) } %
            ''')

    return T_list , MAPE

In [None]:
# Score the trained model on the 'val' folder (deterministic transforms only).
test_dataset = MaskedFaceDataset(
    '/content/drive/MyDrive/Colab Notebooks/MaskedFace/val',
    transforms=get_transform(train=False),
)
T_list, MAPE = count_masks(test_dataset)

 
            MAPE : 12.8 % 
            


In [None]:
# Overall MAPE (in percent) returned by count_masks.
MAPE

12.798514146994336

In [None]:
# Ground-truth counts per image; columns correspond to class ids 1, 2 and 3.
T_list

array([[15,  0,  1],
       [ 3,  3,  0],
       [ 1,  0,  0],
       [ 2,  0,  0],
       [ 9,  0,  0],
       [ 9,  0,  0],
       [ 2,  0,  0],
       [13,  0,  0],
       [ 1,  0,  0],
       [ 2,  2,  0],
       [ 4,  1,  1],
       [ 0,  1,  0],
       [ 4,  0,  0],
       [ 2,  0,  0],
       [ 0,  2,  2],
       [ 4,  4,  0],
       [17,  1,  1],
       [ 1,  0,  0],
       [ 3,  3,  0],
       [ 6,  0,  0],
       [ 7,  0,  0],
       [ 1,  0,  0],
       [ 1,  0,  0],
       [ 1,  1,  0],
       [10,  0,  0],
       [ 1,  0,  0],
       [10,  0,  0],
       [ 2, 12,  0],
       [ 1,  0,  0],
       [12,  0,  0],
       [ 0,  1,  0],
       [13,  0,  0],
       [53,  0,  0],
       [ 2,  0,  0],
       [ 1,  0,  0],
       [15,  5,  1],
       [ 2,  0,  0],
       [ 2,  0,  0],
       [19, 16,  0],
       [ 3,  7,  4],
       [ 1,  1,  1],
       [ 1,  0,  0],
       [ 1,  0,  0],
       [ 9,  0,  0],
       [ 0,  1,  0],
       [ 0,  0,  1],
       [11,  2,  0],
       [ 1,  

### Saving model

In [None]:
# Persist the evaluation results as .npy files in the working directory.
np.save(file='MAPE.npy', arr=MAPE)
np.save(file='Truelist.npy', arr=T_list)

In [None]:
# Save the full model object.
torch.save(obj=model, f='Q7_model.pth')

In [None]:
# Save only the model's state dict.
torch.save(obj=model.state_dict(), f='model_weights.pth')