In [1]:
!pip install torchmetrics



In [2]:
!pip install --upgrade albumentations opencv-python



In [3]:
import os
from PIL import Image
from pathlib import Path, PosixPath
from typing import List
from xml.etree import ElementTree

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from google.colab import drive
from torch.utils.data import Dataset, DataLoader
from torchmetrics.detection import MeanAveragePrecision as MAP
from torchvision.ops import nms
from torchvision.transforms import ToTensor
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, fasterrcnn_resnet50_fpn

In [4]:
!pip install kaggle



In [5]:
!mkdir ~/.kaggle

mkdir: cannot create directory ‘/root/.kaggle’: File exists


In [6]:
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle

In [8]:
!chmod 600 ~/.kaggle/kaggle.json

In [9]:
!kaggle datasets download -d andrewmvd/face-mask-detection

Downloading face-mask-detection.zip to /content
 94% 375M/398M [00:02<00:00, 136MB/s]
100% 398M/398M [00:02<00:00, 157MB/s]


In [10]:
!unzip face-mask-detection.zip &

Archive:  face-mask-detection.zip
  inflating: annotations/maksssksksss0.xml  
  inflating: annotations/maksssksksss1.xml  
  inflating: annotations/maksssksksss10.xml  
  inflating: annotations/maksssksksss100.xml  
  inflating: annotations/maksssksksss101.xml  
  inflating: annotations/maksssksksss102.xml  
  inflating: annotations/maksssksksss103.xml  
  inflating: annotations/maksssksksss104.xml  
  inflating: annotations/maksssksksss105.xml  
  inflating: annotations/maksssksksss106.xml  
  inflating: annotations/maksssksksss107.xml  
  inflating: annotations/maksssksksss108.xml  
  inflating: annotations/maksssksksss109.xml  
  inflating: annotations/maksssksksss11.xml  
  inflating: annotations/maksssksksss110.xml  
  inflating: annotations/maksssksksss111.xml  
  inflating: annotations/maksssksksss112.xml  
  inflating: annotations/maksssksksss113.xml  
  inflating: annotations/maksssksksss114.xml  
  inflating: annotations/maksssksksss115.xml  
  inflating: annotations/maksssk

In [11]:
anno_path = Path("annotations")
file_path = Path("images")

In [12]:
all_labels = set()

for xml_file in anno_path.rglob("*xml"):
    xml = ElementTree.parse(xml_file)
    for element in xml.findall("object"):
        label = element.find("name").text
        all_labels.add(label)

In [13]:
labels_dict = {label: i for i, label in enumerate(all_labels, start=1)}

In [14]:
labels_dict

{'mask_weared_incorrect': 2, 'with_mask': 1, 'without_mask': 3}

In [15]:
train_transform = A.Compose([
    A.SmallestMaxSize(224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2)
], bbox_params=A.BboxParams(format="coco", label_fields=['category_ids'], min_visibility=0.3))

test_transform = A.Compose([
    A.SmallestMaxSize(224),
], bbox_params=A.BboxParams(format="coco", label_fields=['category_ids'], min_visibility=0.3))

In [16]:
class DetectionDataset(Dataset):
    
    def __init__(self, anno_files: List[PosixPath], file_path, labels_dict, transform=None):
        self.anno_files = anno_files
        self.file_path = file_path
        self.to_tensor = ToTensor()
        self.labels_dict = labels_dict
        self.transform = transform
    
    def __len__(self):
        return len(self.anno_files)
    
    def __getitem__(self, index):
        anno_file = self.anno_files[index]
        xml =  ElementTree.parse(anno_file)
        img_name = xml.find("filename").text
        img = Image.open(self.file_path / img_name)
        img = img.convert('RGB')
        img = np.array(img)
        labels = []
        bboxes = []
        height = img.shape[0]
        width = img.shape[1]
        for element in xml.findall("object"):
            label = element.find("name").text
            label_id = self.labels_dict[label]
            labels.append(label_id)
            bndbox = element.find("bndbox")
            xmin = int(bndbox.find("xmin").text)
            ymin = int(bndbox.find("ymin").text)
            xmax = min(int(bndbox.find("xmax").text), width)
            ymax = min(int(bndbox.find("ymax").text), height)
            bboxes.append([xmin, ymin, xmax - xmin, ymax - ymin])
        if self.transform is not None:
            img_dict = self.transform(image=img, bboxes=bboxes, category_ids=labels)
            bboxes = img_dict["bboxes"]
            img = self.to_tensor(img_dict["image"])
            labels = torch.LongTensor(img_dict["category_ids"])
        else:
            img = self.to_tensor(img)
            labels = torch.LongTensor(labels)
        areas = [width * height for _, _, width, height in bboxes]
        bboxes = [
            [xmin, ymin, xmin + width, ymin + height]
            for xmin, ymin, width, height in bboxes
        ]
        bboxes = torch.FloatTensor(bboxes)
        image_id = torch.tensor([index])
        areas = torch.FloatTensor(areas)
        return {
            "image": img, 
            "labels": labels,
            "bndboxes": bboxes,
            "areas": areas,
            "image_id": image_id
        }

In [17]:
anno_files = list(anno_path.rglob("*xml"))
idxs = np.arange(len(anno_files))
np.random.seed(0)
train_idxs = np.random.choice(idxs, int(0.8 * len(idxs)), replace=False)
test_idxs = [i for i in idxs if i not in train_idxs]

In [18]:
train_files = [anno_files[i] for i in train_idxs]
test_files = [anno_files[i] for i in test_idxs]

In [19]:
len(train_files), len(test_files)

(682, 171)

In [20]:
train_dataset = DetectionDataset(train_files, file_path, labels_dict, train_transform)
test_dataset = DetectionDataset(train_files, file_path, labels_dict, test_transform)

In [21]:
model = fasterrcnn_resnet50_fpn(pretrained=True)

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth


  0%|          | 0.00/160M [00:00<?, ?B/s]

In [22]:
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, len(labels_dict) + 1)

In [23]:
model.to("cuda");

RuntimeError: ignored

In [24]:
EPOCHES = 20
BATCH_SIZE = 6
LR = 1e-4
NMS_THRESH = 0.5

In [25]:
def collate_fn(batch):
    images = []
    targets = []
    for image_dict in batch:
        images.append(image_dict["image"])
        targets.append({
            "labels": image_dict["labels"],
            "area": image_dict["areas"],
            "boxes": image_dict["bndboxes"],
            "image_id": image_dict["image_id"]
        })
    return images, targets

In [26]:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

optimizer = optim.Adam(model.parameters(), LR)

In [27]:
def fastrcnn_loss(class_logits, box_regression, labels, regression_targets):

    labels = torch.cat(labels, dim=0)
    regression_targets = torch.cat(regression_targets, dim=0)

    classification_loss = F.cross_entropy(class_logits, labels, weight=[])

    # get indices that correspond to the regression targets for
    # the corresponding ground truth labels, to be used with
    # advanced indexing
    sampled_pos_inds_subset = torch.where(labels > 0)[0]
    labels_pos = labels[sampled_pos_inds_subset]
    N, num_classes = class_logits.shape
    box_regression = box_regression.reshape(N, box_regression.size(-1) // 4, 4)

    box_loss = F.smooth_l1_loss(
        box_regression[sampled_pos_inds_subset, labels_pos],
        regression_targets[sampled_pos_inds_subset],
        beta=1 / 9,
        reduction="sum",
    )
    box_loss = box_loss / labels.numel()

    return classification_loss, box_loss

In [28]:
torchvision.models.detection.roi_heads.fastrcnn_loss = fastrcnn_loss

In [29]:
def apply_nms(orig_prediction, iou_thresh):

    keep = nms(orig_prediction['boxes'], orig_prediction['scores'], iou_thresh)

    # Keep indices from nms
    final_prediction = orig_prediction
    final_prediction['boxes'] = final_prediction['boxes'][keep]
    final_prediction['scores'] = final_prediction['scores'][keep]
    final_prediction['labels'] = final_prediction['labels'][keep]

    return final_prediction

In [None]:
for e in range(EPOCHES):
    model.train()
    for i, batch in enumerate(train_loader):
        optimizer.zero_grad()
        images, targets = batch
        images = [image.to("cuda") for image in images]
        targets = [{k: v.to("cuda") for k, v in targ_dict.items()} for targ_dict in targets]
        out = model(images, targets)
        loss = out["loss_classifier"] + out["loss_box_reg"] + out["loss_objectness"] + out["loss_rpn_box_reg"]
        loss.backward()
        optimizer.step()
        if i % 100 == 0:
            print(f"Epoch: {e + 1}/{EPOCHES}, iter: {i + 1}/{len(train_loader)}, loss: {loss.item():.3f}")
    model.eval()
    metric = MAP()
    for batch in test_loader:
        with torch.no_grad():
            images, targets = batch
            images = [image.to("cuda") for image in images]
            preds = model(images)
            preds = [apply_nms(p, NMS_THRESH) for p in preds]
            preds = [{k: v.to("cpu") for k, v in pred_dict.items()} for pred_dict in preds]
            metric.update(preds, targets)
    score = metric.compute()
    print(f"Epoch: {e + 1}/{EPOCHES}, metric: {score['map']}")

In [None]:
metric = MAP()
model.eval()
for batch in test_loader:
    with torch.no_grad():
        images, targets = batch
        images = [image.to("cuda") for image in images]
        preds = model(images)
        preds = [apply_nms(p, NMS_THRESH) for p in preds]
        preds = [{k: v.to("cpu") for k, v in pred_dict.items()} for pred_dict in preds]
        metric.update(preds, targets)
score = metric.compute()
print(f"metric: {score['map']}")