### Imports

In [1]:
import numpy as np
import pandas as pd
import os
from bs4 import BeautifulSoup
from PIL import Image

# PyTorch
import torch
from torch.utils.data import DataLoader

import torchvision
from torchvision import transforms, datasets, models
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

# Plots
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# TypeHinting
from typing import List

  from .autonotebook import tqdm as notebook_tqdm


### Initialize Helper Functions

In [2]:
def generate_box(obj)->List[int]:
    xmin = int(obj.find('xmin').text)
    ymin = int(obj.find('ymin').text)
    xmax = int(obj.find('xmax').text)
    ymax = int(obj.find('ymax').text)
    
    return [xmin, ymin, xmax, ymax]


def generate_label(obj)->int:
    if obj.find("name").text == "with_mask":
        return 1
    elif obj.find("name").text == "mask_weared_incorrect":
        return 2
    return 0


def generate_target(image_id:int, file:str)->dict:
    """Generates the targets of the given iamge

    Args:
        image_id (int): Index of the file
        file (str): File path

    Returns:
        dict: Labelled targets of the input file (*.xml)
    """
    with open(file) as f:
        data = f.read()
        soup = BeautifulSoup(data, 'xml')
        objects = soup.find_all('object')
        
        num_objs = len(objects)
        
        # Get bounding boxes for objects
        # Notes:
        # CoCo format: bbox-> [xmin, ymin, width, height]
        # PyTorch format: bbox-> [xmin, ymin, xmax, ymax]
        boxes = []
        labels = []
        
        for i in objects:
            boxes.append(generate_box(i))
            labels.append(generate_label(i))
        
        # Converts data into a tensor, sharing data and preserving autograd history if possible.
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        
        # Labels (In my case, I only one class: target class or background)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        
        # Tensorise img_id
        img_id = torch.tensor([image_id])
        
        # Annotation in dictionary format
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = img_id
        
        return target

In [3]:
class MaskDataset:
    def __init__(self, transforms):
        # Load all image files, sorting to ensure they are aligned
        
        curr_dir = os.getcwd()
        self.img_path = os.path.join(curr_dir, "../data/images/")
        self.label_path = os.path.join(curr_dir, "../data/annotations/")
        
        self.transforms = transforms
        self.imgs = list(sorted(os.listdir(self.img_path)))
        self.labels = list(sorted(os.listdir(self.label_path)))
        
    def __getitem__(self, index):
        # Load images and annotation masks
        file_image = 'maksssksksss'+ str(index) + '.png'
        file_label = 'maksssksksss'+ str(index) + '.xml'
        
        img_path = os.path.join(self.img_path, file_image)
        label_path = os.path.join(self.label_path, file_label)
        
        img = Image.open(img_path).convert("RGB")
        # Generate label
        target = generate_target(index, label_path)
        
        # Check if transforms is loaded
        if self.transforms is not None:
            img = self.transforms(img)
        
        # Returns the image and labels
        return img, target
    
    def __len__(self):
        return len(self.imgs)
    

### Define Transformation and `collate_fn()`

In [4]:
data_transform = transforms.Compose([
    transforms.ToTensor(),
])

In [5]:
def collate_fn(batch):
    return tuple(zip(*batch))

In [6]:
dataset = MaskDataset(data_transform)
data_loader = DataLoader(dataset, batch_size=4, num_workers=0, collate_fn=collate_fn)

In [7]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

### Define Model

In [8]:
def get_model_instance_segmentation(num_classes:int):
    
    # Load an instance segmentation model pre-trained on COCO
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    
    # Get the number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    
    # Replace pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    
    return model

In [9]:
model = get_model_instance_segmentation(3)



In [10]:
for imgs, annotations in data_loader:
    imgs = list(img.to(device) for img in imgs)
    annotations = [{k:v.to(device) for k, v in t.items()} for t in annotations]
    
    # Test / Sanity check for the first iteration
    print(annotations)
    break

[{'boxes': tensor([[ 79., 105., 109., 142.],
        [185., 100., 226., 144.],
        [325.,  90., 360., 141.]]), 'labels': tensor([0, 1, 0]), 'image_id': tensor([0])}, {'boxes': tensor([[321.,  34., 354.,  69.],
        [224.,  38., 261.,  73.],
        [299.,  58., 315.,  81.],
        [143.,  74., 174., 115.],
        [ 74.,  69.,  95.,  99.],
        [191.,  67., 221.,  93.],
        [ 21.,  73.,  44.,  93.],
        [369.,  70., 398.,  99.],
        [ 83.,  56., 111.,  89.]]), 'labels': tensor([1, 1, 1, 1, 1, 1, 1, 1, 0]), 'image_id': tensor([1])}, {'boxes': tensor([[ 68.,  42., 105.,  69.],
        [154.,  47., 178.,  74.],
        [238.,  34., 262.,  69.],
        [333.,  31., 366.,  65.]]), 'labels': tensor([1, 1, 1, 2]), 'image_id': tensor([2])}, {'boxes': tensor([[ 52.,  53.,  73.,  76.],
        [ 72.,  53.,  92.,  75.],
        [112.,  51., 120.,  68.],
        [155.,  60., 177.,  83.],
        [189.,  59., 210.,  80.],
        [235.,  57., 257.,  78.],
        [289.,  60.

### Train Model

In [11]:
num_epochs = 10
model.to(device)

# parameters
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                            momentum = 0.9, weight_decay=0.0005)

len_dataloader = len(data_loader)
batch_size = 4

for epoch in range(num_epochs):
    
    # Set to training mode
    model.train()
    i = 0
    epoch_loss = 0
    
    for imgs, annotations in data_loader:
        i += 1
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k:v.to(device) for k, v in t.items()} for t in annotations]
        loss_dict = model([imgs[0]], [annotations[0]])
        losses = sum(loss for loss in loss_dict.values())
        
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        epoch_loss += losses
        
        if i%10 == 0 or i == len(data_loader):
            print(f"Epoch: {epoch + 1}, Step: {i} / {int(len_dataloader)}, losses: {losses:.4f}")
    print(f"Epoch: {epoch + 1} / {num_epochs}, Epoch loss: {epoch_loss}")

Epoch: 1, Step: 10 / 214, losses: 1.6696
Epoch: 1, Step: 20 / 214, losses: 0.9823
Epoch: 1, Step: 30 / 214, losses: 0.5432
Epoch: 1, Step: 40 / 214, losses: 1.8423
Epoch: 1, Step: 50 / 214, losses: 0.1456
Epoch: 1, Step: 60 / 214, losses: 0.0611
Epoch: 1, Step: 70 / 214, losses: 0.4439
Epoch: 1, Step: 80 / 214, losses: 0.3607
Epoch: 1, Step: 90 / 214, losses: 0.0666
Epoch: 1, Step: 100 / 214, losses: 0.4841
Epoch: 1, Step: 110 / 214, losses: 0.2758
Epoch: 1, Step: 120 / 214, losses: 0.3792
Epoch: 1, Step: 130 / 214, losses: 0.7358
Epoch: 1, Step: 140 / 214, losses: 0.7931
Epoch: 1, Step: 150 / 214, losses: 0.2166
Epoch: 1, Step: 160 / 214, losses: 0.3321
Epoch: 1, Step: 170 / 214, losses: 0.6620
Epoch: 1, Step: 180 / 214, losses: 0.0069
Epoch: 1, Step: 190 / 214, losses: 0.5310
Epoch: 1, Step: 200 / 214, losses: 0.7132
Epoch: 1, Step: 210 / 214, losses: 0.4327
Epoch: 1, Step: 214 / 214, losses: 0.3064
Epoch: 1 / 10, Epoch loss: 91.98088073730469
Epoch: 2, Step: 10 / 214, losses: 0.7383

### Save Model

In [12]:
torch.save(model.state_dict(), "../model/mask_detection_fasterrcnn.pt")