# Realtime Medical Mask Detection Based on Faster RCNN
Based on `Faster R-CNN`, we train a model on our mask dataset and use data augmentation to preprocess the data. Mean average precision (mAP) is used to evaluate model performance, and we compare models trained with and without data augmentation. In the last part, we connect our detection model to the computer's camera to achieve real-time detection.

This notebook is a tutorial for our project. It gives detailed explanations of the `data augmentation`, `training` and `mAP evaluation` processes.

In [None]:
#Load Google Drive
# Mounts Google Drive at /content/drive using the google.colab helper.
from google.colab import drive
drive.mount('/content/drive')

## Data Augmentation

In [None]:
# Install the pascal_voc_writer package (presumably required by the augmentation script run in the next cell - TODO confirm).
!pip install pascal_voc_writer

In [None]:
# Run the data augmentation script located in ../data_augmentation.
!python ../data_augmentation/augment.py

## Initialization & Config

In [None]:
# This project is developed from the Faster RCNN tutorial notebook on: https://www.kaggle.com/daniel601/pytorch-fasterrcnn
# This Python 3 environment comes with many helpful analytics libraries installed
# For example, here's several helpful packages to load
import numpy as np
import pandas as pd
import os
import cv2
from PIL import Image
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
import torch
import torchvision
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Config settings (relative paths, resolved against the current working directory)
# Dataset root: the code below reads its "images" and "annotations" sub-folders.
data_path = r'../data_augmentation/output'
# Working/output directory: created if missing and chdir'ed into further down.
output_path = r'../results'
# Folder that receives the per-epoch model checkpoints.
model_path = f'{output_path}/trained_models'


#----------------following functions are used to read annotations---------------------
def generate_box(obj):
  """Read the corner coordinates of one annotated object.

  Looks up the ``xmin``, ``ymin``, ``xmax`` and ``ymax`` elements through
  ``obj.find`` and converts the text of each one to ``int``.

  Returns:
    ``[xmin, ymin, xmax, ymax]`` as a list of ints.
  """
  corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')
  return [int(obj.find(tag).text) for tag in corner_tags]

#transfer class name into numbers
#We have three classes in mask dataset: with_mask/without_mask/mask_weared_incorrect
def generate_label(obj):
  """Translate the class name of one annotated object into its integer id.

  The name is read from the text of the object's ``name`` element.

  Returns:
    1 for "with_mask", 2 for "without_mask", 3 for "mask_weared_incorrect",
    and ``None`` for any other name.
  """
  #Start from one, since FastRCNNPredictor considers label 0 as background.
  label_ids = {
      "with_mask": 1,
      "without_mask": 2,
      "mask_weared_incorrect": 3,
  }
  return label_ids.get(obj.find('name').text)

def generate_target(image_id, file): 
  """Build the detection target dictionary for one annotation file.

  Args:
    image_id: identifier stored in the target (wrapped in a tensor as-is).
    file: path to an XML annotation file; every ``<object>`` element in it
      becomes one box/label pair.

  Returns:
    dict with
      "boxes":    float32 tensor of shape (N, 4), rows are
                  [xmin, ymin, xmax, ymax]; (0, 4) when the file has no objects
      "labels":   int64 tensor with the N class ids from ``generate_label``
      "image_id": tensor holding ``image_id``
  """
  with open(file) as f:
    soup = BeautifulSoup(f.read(), 'xml')
  objects = soup.find_all('object')

  # Bounding boxes for objects
  # In coco format, bbox = [xmin, ymin, width, height]
  # In pytorch, the input should be [xmin, ymin, xmax, ymax]
  boxes = [generate_box(obj) for obj in objects]
  # NOTE: generate_label returns None for unknown class names, which makes the
  # tensor conversion below fail.
  labels = [generate_label(obj) for obj in objects]

  if boxes:
    boxes = torch.as_tensor(boxes, dtype=torch.float32)
  else:
    # Keep the (N, 4) layout for images without objects; converting an empty
    # list would give a 1-D tensor of shape (0,).
    boxes = torch.zeros((0, 4), dtype=torch.float32)

  # To transfer Annotation into dictionary format
  target = {}
  target["boxes"] = boxes
  target["labels"] = torch.as_tensor(labels, dtype=torch.int64)
  target["image_id"] = torch.tensor([image_id])
  return target
#----------------functions above are used to read annotations---------------------

#Generate the sample list and split datasets into trainset and testset
# Both listings are sorted; the split below pairs entries by position, which
# assumes image and annotation file names sort into the same order.
allimgs_list = sorted(os.listdir(data_path + "/images"))
alllabels_list = sorted(os.listdir(data_path + "/annotations"))
#test_size represents the percentage of test set data in all images
#i.e. test_size = 0.25 means training_set : test_set = 3 : 1
trainset_imgs, testset_imgs, trainset_labels, testset_labels = train_test_split(
    allimgs_list,
    alllabels_list,
    test_size=0.25,
)

class MaskDataset(object):
    """Indexable dataset pairing each image file with its annotation file.

    ``set_imgs`` and ``set_labels`` are parallel lists of file names that live
    in ``data_path + "/images"`` and ``data_path + "/annotations"``.
    ``transforms`` is an optional callable applied to the loaded image.
    """

    def __init__(self, transforms, set_imgs, set_labels):
      self.transforms = transforms
      # The two lists are stored as given; entry i of one belongs to entry i
      # of the other.
      self.imgs = set_imgs
      self.labels = set_labels

    def __getitem__(self, idx):
      """Return ``(image, target)`` for position ``idx``."""
      image_file = os.path.join(data_path+"/images", self.imgs[idx])
      annotation_file = os.path.join(data_path+"/annotations", self.labels[idx])

      img = Image.open(image_file).convert("RGB")
      # The dataset index doubles as the image id stored in the target.
      target = generate_target(idx, annotation_file)

      if self.transforms is None:
        return img, target
      return self.transforms(img), target

    def __len__(self):
      """Number of samples, i.e. the number of image file names."""
      return len(self.imgs)

def collate_fn(batch):
    """Regroup a batch of samples field by field.

    ``batch`` is a sequence of equally structured samples (here ``(img, target)``
    pairs). The result is a tuple holding one tuple per field, e.g.
    ``((img0, img1, ...), (target0, target1, ...))``.
    """
    columns = zip(*batch)
    return tuple(columns)

#Define transformer, more augmentations can be added into the compose function
data_transform = transforms.Compose([
        transforms.ToTensor(),
    ])

trainset = MaskDataset(data_transform, trainset_imgs, trainset_labels)
testset = MaskDataset(data_transform, testset_imgs, testset_labels)

# NOTE(review): the training loader does not pass shuffle=True, so batches are
# visited in the same order every epoch - confirm this is intended.
data_loader = torch.utils.data.DataLoader(trainset, batch_size=4, collate_fn=collate_fn)
# batch_size=1 and shuffle=False keep test predictions in the same order as
# testset.labels.
test_loader = torch.utils.data.DataLoader(testset, batch_size=1, shuffle=False, collate_fn=collate_fn)
# Report the dataset sizes themselves: len(data_loader)*4 over-counts whenever
# the last training batch is not full.
print('training set:', len(trainset))
print('test size:', len(testset))
#Check whether GPU is available
# Printed explicitly: a bare expression in the middle of a cell shows nothing.
print('CUDA available:', torch.cuda.is_available())

#Read labels in test annotations mainly used for mAP computation
def parse_rec(filename):
  """Parse one annotation XML file into a list of per-object dictionaries.

  Every ``<object>`` element yields a dict with the keys
    'name'      - text of the ``name`` element
    'pose'      - text of the ``pose`` element
    'truncated' - ``truncated`` converted to int
    'difficult' - ``difficult`` converted to int
    'bbox'      - [xmin, ymin, xmax, ymax] as ints, read from ``bndbox``

  Returns:
    The list of those dicts in document order.
  """
  corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')
  with open(filename) as f:
    soup = BeautifulSoup(f.read(), 'xml')
    parsed = []
    for node in soup.find_all('object'):
      entry = {
          'name': node.find('name').text,
          'pose': node.find('pose').text,
          'truncated': int(node.find('truncated').text),
          'difficult': int(node.find('difficult').text),
      }
      box_node = node.find('bndbox')
      entry['bbox'] = [int(box_node.find(tag).text) for tag in corner_tags]
      parsed.append(entry)
  return parsed

#change the program path
# NOTE(review): data_path, output_path and model_path are relative paths; after
# this chdir they are resolved against the new working directory - confirm the
# directory layout keeps them valid.
os.makedirs(output_path, exist_ok=True)
os.chdir(output_path)

#Record testset label data to get prepared for mAP evaluation
# Mode 'w' discards any previous content; one annotation base name per line
# (the part of the file name before the first '.').
with open('imagesetfile.txt', 'w') as f:
  for ann_file in testset.labels:
    f.write(ann_file.split('.')[0]+'\n')

# read list of images
annopath = data_path+ r'/annotations/{}.xml'
with open('imagesetfile.txt', 'r') as f:
  lines = f.readlines()

# recs maps image name -> list of ground-truth objects parsed from its XML file
recs = {}
for i, imagename in enumerate(lines):
  imagename = imagename.strip('\n')
  recs[imagename] = parse_rec(annopath.format(imagename))
  if i % 100 == 0: #progress bar
    print( 'Reading annotation for {:d}/{:d}'.format(i,len(lines)))

## Model

In [None]:
def get_model_instance_segmentation(num_classes): 
    """Create a Faster R-CNN (ResNet-50 FPN) detector with a resized box head.

    Builds torchvision's ``fasterrcnn_resnet50_fpn`` with ``pretrained=True``
    and swaps its box predictor for a ``FastRCNNPredictor`` with
    ``num_classes + 1`` outputs; the extra output is the background class.

    Args:
      num_classes: number of object classes, not counting background.

    Returns:
      The model with the replaced box predictor.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    # The new head must accept the same number of input features as the old one.
    old_head = detector.roi_heads.box_predictor
    feature_count = old_head.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(feature_count, num_classes+1)
    return detector

# Three object classes: with_mask / without_mask / mask_weared_incorrect
model = get_model_instance_segmentation(3)
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

## mAP Evaluation Function

In [None]:
#Here we define two methods to compute AP value
#Thanks for mAP tutorial on 
def voc_ap(rec, prec, use_07_metric=False):
    """Compute average precision (AP) from recall and precision arrays.

    Args:
      rec: array of recall values.
      prec: array of precision values, aligned with ``rec``.
      use_07_metric: when true, use the VOC 2007 11-point method (and print
        the running value for every threshold); otherwise integrate the area
        under the precision envelope (default).

    Returns:
      The AP value.
    """
    if use_07_metric:
        # 11 point metric: average the best precision reachable at
        # recall >= t for t = 0.0, 0.1, ..., 1.0
        ap = 0.
        for t in np.arange(0., 1.1, 0.1):
            reached = rec >= t
            p = np.max(prec[reached]) if np.sum(reached) != 0 else 0
            ap = ap + p / 11.
            print('use 11 point metric')
            print('t =', t, 'ap =', ap)
        return ap

    # correct AP calculation
    # sentinel values at both ends of the curve
    mrec = np.concatenate(([0.], rec, [1.]))
    mpre = np.concatenate(([0.], prec, [0.]))

    # precision envelope: running maximum taken from the right-hand side
    mpre = np.maximum.accumulate(mpre[::-1])[::-1]

    # positions where recall changes value
    steps = np.where(mrec[1:] != mrec[:-1])[0]

    # sum (\Delta recall) * prec over those positions
    return np.sum((mrec[steps + 1] - mrec[steps]) * mpre[steps + 1])

#store predictions for testset
def pred_stor(model, test_loader, testset_labels):
  """Run the model over the test set and store its detections per class.

  ``class1.txt``, ``class2.txt`` and ``class3.txt`` in the current working
  directory are emptied first. Every detection then appends one line to
  ``class<label>.txt``::

      <image name> <score> <xmin> <ymin> <xmax> <ymax>

  Args:
    model: detection model; called with a list of image tensors and expected
      to return one dict per image with 'boxes', 'labels' and 'scores'.
    test_loader: iterable of ``(imgs, annotations)`` batches, visited in the
      same order as ``testset_labels``.
    testset_labels: annotation file names; the part before the first '.' is
      written as the image name.

  Returns:
    None. The model is left in eval mode.
  """
  for class_id in range(1, 4):#clear out the original file
    with open(f'class{class_id}.txt', 'w'):
      pass

  # Run the inputs on whatever device the model's weights live on.
  first_param = next(model.parameters(), None)
  run_device = first_param.device if first_param is not None else torch.device('cpu')

  model.eval()
  num_test = 0
  # Inference only: skip autograd bookkeeping.
  with torch.no_grad():
    for imgs, _ in test_loader:
      imgs = [img.to(run_device) for img in imgs]
      output = model(imgs)#output=[{'boxes':tensor([],device) 'labels': 'scores': }*batchsize]
      for pre_dic in output:
        filename = testset_labels[num_test].split('.')[0]
        if filename.count('m') == 2:#sometimes the file name may accidentally be saved as 'mmaksssksksss' with number
          filename = filename[1:]#(why it happens remained unclear)
          print('!!!!!!!!!!!!!!!!!!!!!double m occurred!!!!!!!!!!!!!!!!!!!!!')
        # Plain Python numbers give the same text as formatting the tensor
        # elements one by one.
        boxes = pre_dic['boxes'].tolist()
        labels = pre_dic['labels'].tolist()
        scores = pre_dic['scores'].tolist()
        for label, score, box in zip(labels, scores, boxes):
          with open("class{}.txt".format(label), 'a') as f_class:
            f_class.write("{} {} {} {} {} {}\n".format(filename, score, box[0], box[1], box[2], box[3]))
        num_test += 1
  return None


#Main evaluation function, which will read prediction and gt values to compute Recall, Precision, AP
def voc_eval(detpath,
             annopath,
             imagesetfile,
             recs,
             classname,
             ovthresh=0.5,
             use_07_metric=False):
    """rec, prec, ap = voc_eval(detpath,
                                annopath,
                                imagesetfile,
                                recs,
                                classname,
                                [ovthresh],
                                [use_07_metric])

    Top level function that does the PASCAL VOC evaluation for one class.

    detpath: Path to detections
        detpath.format(classname) path to the .txt file with that class'
        predictions, one per line: "<image> <score> <xmin> <ymin> <xmax> <ymax>"
    annopath: Path pattern of the annotations. Kept for interface
        compatibility; it is not read here because the ground truth is
        taken from `recs`.
    imagesetfile: text file with one image name per line
    recs: dict mapping image name -> list of ground-truth objects, each a
        dict with at least 'name', 'difficult' and 'bbox'
    classname: '1', '2' or '3' select with_mask / without_mask /
        mask_weared_incorrect; any other value is used as the annotation
        name as-is
    [ovthresh]: IOU Overlap (default = 0.5)
    [use_07_metric]: Whether to use VOC07's 11 point AP calculation(default False)

    Returns (rec, prec, ap): cumulative recall and precision arrays ordered
    by decreasing confidence, and the average precision. When the detection
    file holds no detections, rec and prec are empty and ap is 0.0.
    """
    # class id (as used in the detection file names) -> annotation class name
    class_names = {
        '1': 'with_mask',
        '2': 'without_mask',
        '3': 'mask_weared_incorrect',
    }
    name = class_names.get(classname, classname)

    # read list of images
    with open(imagesetfile, 'r') as f:
        imagenames = [line.strip('\n') for line in f]

    # first load gt: keep only the objects of this class for every image
    class_recs = {}
    npos = 0
    for imagename in imagenames:
        R = [obj for obj in recs[imagename] if obj['name'] == name]
        bbox = np.array([x['bbox'] for x in R])
        # builtin bool: the np.bool alias no longer exists in current NumPy
        difficult = np.array([x['difficult'] for x in R]).astype(bool)
        det = [False] * len(R)  # "already matched" flag per ground-truth box
        npos += int(np.sum(~difficult))  # difficult objects are not counted
        class_recs[imagename] = {'bbox': bbox, 'difficult': difficult, 'det': det}

    # read dets -- the predicted output of this class
    with open(detpath.format(classname), 'r') as f:
        splitlines = [x.strip().split(' ') for x in f if x.strip()]

    if not splitlines:
        # Nothing was predicted for this class: there is nothing to sort or
        # index, and the AP of an empty precision/recall curve is 0.
        return np.zeros(0), np.zeros(0), 0.0

    image_ids = [x[0].split('.')[0] for x in splitlines]  # Image ID
    confidence = np.array([float(x[1]) for x in splitlines])  # detection score
    BB = np.array([[float(z) for z in x[2:]] for x in splitlines])  # bounding box values

    # sort detections by decreasing confidence
    sorted_ind = np.argsort(-confidence)
    BB = BB[sorted_ind, :]
    image_ids = [image_ids[x] for x in sorted_ind]

    # go down dets and mark TPs and FPs
    nd = len(image_ids)
    tp = np.zeros(nd)
    fp = np.zeros(nd)
    for d in range(nd):
        if image_ids[d].count('m') == 2:#in case of accidents(how it happens remained unclear)
            image_ids[d] = image_ids[d][1:]
        R = class_recs[image_ids[d]]  # ground truth of that image

        # predictions are stored as (x_min, y_min, x_max, y_max)
        bb = BB[d, :].astype(float)
        ovmax = -np.inf
        BBGT = R['bbox'].astype(float)

        if BBGT.size > 0:
            # compute overlaps
            # intersection
            ixmin = np.maximum(BBGT[:, 0], bb[0])
            iymin = np.maximum(BBGT[:, 1], bb[1])
            ixmax = np.minimum(BBGT[:, 2], bb[2])
            iymax = np.minimum(BBGT[:, 3], bb[3])
            iw = np.maximum(ixmax - ixmin + 1., 0.)
            ih = np.maximum(iymax - iymin + 1., 0.)
            inters = iw * ih

            # union
            uni = ((bb[2] - bb[0] + 1.) * (bb[3] - bb[1] + 1.) +
                   (BBGT[:, 2] - BBGT[:, 0] + 1.) *
                   (BBGT[:, 3] - BBGT[:, 1] + 1.) - inters)

            overlaps = inters / uni
            ovmax = np.max(overlaps)  # Maximum overlap
            jmax = np.argmax(overlaps)  # ground truth of the maximum overlap

        # Compute TP and FP numbers
        if ovmax > ovthresh:
            # a match with a "difficult" ground truth counts as neither TP nor FP
            if not R['difficult'][jmax]:
                if not R['det'][jmax]:
                    tp[d] = 1.
                    R['det'][jmax] = True  # To mark as been detected
                else:
                    # this ground truth was already claimed by a more
                    # confident detection, so this one is a duplicate
                    fp[d] = 1.
        else:
            fp[d] = 1.

    # compute precision recall
    fp = np.cumsum(fp)
    tp = np.cumsum(tp)
    # NOTE: with npos == 0 (no ground truth of this class) this division yields
    # nan values, as it did before.
    rec = tp / float(npos)

    # avoid divide by zero in case the first detection matches a difficult
    # ground truth
    prec = tp / np.maximum(tp + fp, np.finfo(np.float64).eps)
    ap = voc_ap(rec, prec, use_07_metric)

    return rec, prec, ap

## Train Model

In [None]:
#Load TensorBoard to visualize training process
# TensorBoard is pointed at the 'runs' directory (a relative path).
%reload_ext tensorboard
%tensorboard --logdir='runs'

In [None]:
#Set the epoch numbers
num_epochs = 25

#Load trained model(if you don't want to train from scratch)
#model.load_state_dict(torch.load('../results/trained_models/Epoch_24_model.pt'))   

# parameters
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)#Try ADAM optimizer if you can
# and a learning rate scheduler
# NOTE(review): lr_scheduler.step() is never called below, so the learning rate
# stays at its initial value for the whole run. Confirm whether the decay is
# wanted before enabling it, since it changes the training dynamics.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

len_dataloader = len(data_loader)
writer = SummaryWriter()
os.makedirs(model_path, exist_ok=True)
for epoch in range(num_epochs):
    epoch_loss = 0.0
    mAP = []  # per-class AP values of this epoch
    # pred_stor() at the end of the previous epoch leaves the model in eval
    # mode, so switch back once per epoch.
    model.train()
    for i, (imgs, annotations) in enumerate(data_loader, start=1):
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]

        # in train mode the model returns a dict of losses
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step() 

        # Log a plain float: accumulating the tensor itself would keep every
        # iteration's autograd history referenced for the whole epoch.
        loss_value = losses.item()
        print(f'Epoch: {epoch+1}/{num_epochs}, Iteration: {i}/{len_dataloader}, Loss: {loss_value}')
        total_iter = epoch*len_dataloader + i
        writer.add_scalar('iteration_loss',loss_value,total_iter)
        epoch_loss += loss_value
    torch.save(model.state_dict(),model_path+'/Epoch_{}_model.pt'.format(epoch))
    writer.add_scalar('epoch_loss',epoch_loss,epoch)
    pred_stor(model, test_loader, testset_labels)#prediction results are stored in class1/2/3.txt files

    #Compute AP for each class
    # pred_stor() and the image-set file both write into the current working
    # directory, so the evaluation reads from there as well.
    for c in range(1,4):
      class_name = str(c)
      rec, prec, ap = voc_eval('class{}.txt', annopath,
                             'imagesetfile.txt', recs, class_name)
      mAP.append(ap)
    #Print mAP
    meanap = float(sum(mAP)/len(mAP))
    writer.add_scalar('mAP',meanap,epoch)
    print('*************************************************')
    print(f'Epoch: {epoch+1}/{num_epochs}, mAP: {meanap}')
    print(f'AP_class1: {mAP[0]}, AP_class2: {mAP[1]}, AP_class3: {mAP[2]}')
    print('*************************************************')
# make sure all pending events reach the log files
writer.close()

## Function to plot image

In [None]:
#Define plot function
def plot_image(img_tensor, annotation, block=True):
  """Show an image with its bounding boxes drawn on top.

  Args:
    img_tensor: image tensor; it is permuted with (1, 2, 0) before display,
      so presumably channel-first (C, H, W) - TODO confirm.
    annotation: dict with "boxes" (rows of [xmin, ymin, xmax, ymax]) and
      "labels" (one class id per box).
    block: forwarded to ``plt.show``.

  Box colours: label 1 green, label 2 red, label 3 yellow; any other label
  is drawn in blue.
  """
  label_colors = {1: 'g', 2: 'r', 3: 'y'}

  _, ax = plt.subplots(1)
  img = img_tensor.cpu().data

  # Display the image
  ax.imshow( np.array( img.permute(1, 2, 0) ) )
  # Hide the axes once, also for images without any box.
  ax.axis("off")

  for box, label in zip( annotation["boxes"], annotation["labels"] ):
    # Hand matplotlib plain floats; tensors may live on the GPU or carry
    # autograd state.
    if torch.is_tensor(box):
      box = box.detach().cpu().tolist()
    xmin, ymin, xmax, ymax = (float(v) for v in box)
    edge_color = label_colors.get(int(label), 'b')
    # Create a Rectangle patch and add it to the Axes
    rect = patches.Rectangle((xmin,ymin),(xmax-xmin),(ymax-ymin),linewidth=1,edgecolor=edge_color,facecolor='none')
    ax.add_patch(rect)

  plt.show(block=block)

In [None]:
count=0
model.eval()
# Inference only: no_grad avoids building autograd graphs, so the predictions
# handed to plot_image carry no gradient state.
with torch.no_grad():
  for imgs, annotations in test_loader:
    imgs = list(img.to(device) for img in imgs)
    annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
    preds = model(imgs)

    for i in range(len(imgs)):
      print("Prediction")
      plot_image(imgs[i], preds[i])
      print("Target")
      plot_image(imgs[i], annotations[i])
    count += 1
    if count == 20:#We will check 20 batches in test set.
      break

## Load Model

In [None]:
model2 = get_model_instance_segmentation(3)
# Join directory and file name explicitly: model_path has no trailing
# separator, so plain concatenation points at a non-existent file.
checkpoint_file = os.path.join(model_path, 'Epoch_24_model.pt')#Modify by model name
# map_location lets a checkpoint saved on the GPU be loaded on a CPU-only machine.
model2.load_state_dict(torch.load(checkpoint_file, map_location=device))
model2.eval()
model2.to(device)

In [None]:
# `imgs` is the last batch left over from the visualisation loop above.
pred2 = model2(imgs)
print("Predict with loaded model")
# Show the predictions for the first image of that batch.
plot_image(imgs[0], pred2[0])

## Real-time Mask Detection

Since Colab cannot connect to the local camera on your computer, you have to set up a local environment and run `camera.py`.