In [None]:
!pip install pycocotools --quiet
!git clone https://github.com/pytorch/vision.git
!git checkout v0.3.0

!cp vision/references/detection/utils.py ./
!cp vision/references/detection/transforms.py ./
!cp vision/references/detection/coco_eval.py ./
!cp vision/references/detection/engine.py ./
!cp vision/references/detection/coco_utils.py ./

In [None]:
import sys
import os

import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
from matplotlib import patches
import random
from sklearn.model_selection import train_test_split

import cv2
import torch
import torchvision
from torchvision import datasets, models, transforms

import torch.nn as nn

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from torch.utils import data as torch_data
from torchvision import transforms as T
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torchvision.models.detection import FasterRCNN

from engine import evaluate

import time
from xml.etree import ElementTree as ET

In [None]:
def create_X_y(images_dir, annotations_dir):
    """Build parallel lists of image paths and annotation paths.

    Each directory is listed and sorted by file name independently, so the
    i-th image is paired with the i-th annotation purely by sort position.

    Args:
        images_dir: Directory containing the image files.
        annotations_dir: Directory containing the annotation files.

    Returns:
        A tuple ``(X, y)`` where ``X`` is the sorted list of image paths and
        ``y`` is the sorted list of annotation paths, each joined onto its
        directory.
    """
    def _sorted_paths(directory):
        # List, sort in place, then prefix every entry with its directory.
        entries = os.listdir(directory)
        entries.sort()
        return [os.path.join(directory, entry) for entry in entries]

    X = _sorted_paths(images_dir)
    y = _sorted_paths(annotations_dir)
    return X, y

In [None]:
# Input directories of the dataset: the image files and their annotation files
# (the annotations are parsed as XML by MaskDataset).
images_dir = '/kaggle/input/office-masks/Images/Images/'
annotations_dir = '/kaggle/input/office-masks/Annotations/Annotations/'

In [None]:
# Create X and y: X holds the image paths and y the annotation paths, both
# sorted by file name so that position i in X corresponds to position i in y.
X, y = create_X_y(images_dir, annotations_dir)

In [None]:
class MaskDataset(torch.utils.data.Dataset):
    """Object-detection dataset of images with Pascal VOC style XML annotations.

    Every item is a pair ``(image, target)``. The image is read with OpenCV,
    converted to RGB float32, resized to ``(height, width)`` and scaled to
    ``[0, 1]``. The target is a dict with the keys ``boxes``
    (``[xmin, ymin, xmax, ymax]`` rescaled to the resized image), ``labels``,
    ``area``, ``iscrowd`` and ``image_id``.
    """

    def __init__(self, images, annotations, X, y, width, height, T=None):
        """Store the sample lists and the output geometry.

        Args:
            images: Base directory joined in front of every entry of ``X``.
            annotations: Base directory joined in front of every entry of ``y``.
            X: Image file names or paths, one per sample.
            y: Annotation file names or paths, aligned with ``X`` by position.
            width: Width the images are resized to.
            height: Height the images are resized to.
            T: Optional transform called as ``T(image=..., bboxes=..., labels=...)``
                that returns a mapping with ``image``, ``bboxes`` and ``labels``.
        """
        self.T = T
        self.images = images
        self.annotations = annotations
        self.width = width
        self.height = height

        self.imgs = X
        self.annotate = y

        # Slot 0 is a placeholder so the named classes start at label 1
        # (presumably the detector's background class - confirm). It must be an
        # explicit string: the bare name `_` is whatever the kernel last bound
        # it to, which can be an image array after a later unpacking.
        self.classes = ['background', 'with_mask', 'without_mask', 'mask_weared_incorrect']

    def __len__(self):
        """Return the number of samples."""
        return len(self.imgs)

    def __getitem__(self, index):
        """Load one sample.

        Args:
            index: Position of the sample in ``X`` / ``y``.

        Returns:
            ``(image, target)``. Without a transform the image is a float32
            NumPy array; with a transform it is whatever the transform returns
            under ``'image'``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        image_name = self.imgs[index]
        # os.path.join keeps only image_name when it is already an absolute path.
        image_path = os.path.join(self.images, image_name)

        # Reading and converting the image
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError('Could not read image: {}'.format(image_path))
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        # The interpolation flag must be passed by keyword: the third positional
        # parameter of cv2.resize is `dst`, not `interpolation`.
        img_size = cv2.resize(img_rgb, (self.width, self.height), interpolation=cv2.INTER_AREA)
        img_size /= 255.0

        # Annotation file
        annotation_filename = self.annotate[index]
        annotation_path = os.path.join(self.annotations, annotation_filename)
        root = ET.parse(annotation_path).getroot()

        # Original image size, needed to rescale boxes to the resized image
        wt = img.shape[1]
        ht = img.shape[0]

        boxes = []
        labels = []
        for member in root.findall('object'):
            labels.append(self.classes.index(member.find('name').text))

            bndbox = member.find('bndbox')
            xmin = int(float(bndbox.find('xmin').text))
            xmax = int(float(bndbox.find('xmax').text))
            ymin = int(float(bndbox.find('ymin').text))
            ymax = int(float(bndbox.find('ymax').text))

            # Corrected box coordinates for the resized image, clipped to its bounds
            xmin_cor = np.clip((xmin / wt) * self.width, 0, self.width)
            xmax_cor = np.clip((xmax / wt) * self.width, 0, self.width)
            ymin_cor = np.clip((ymin / ht) * self.height, 0, self.height)
            ymax_cor = np.clip((ymax / ht) * self.height, 0, self.height)

            boxes.append([xmin_cor, ymin_cor, xmax_cor, ymax_cor])

        # reshape keeps an (N, 4) layout even when the file lists no objects,
        # so the column slicing below never fails on an empty annotation.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.long)

        if self.T:
            # Plain Python lists are handed to the transform.
            sample = self.T(
                image=img_size,
                bboxes=boxes.tolist(),
                labels=labels.tolist()
            )

            img_size = sample['image']
            # Take boxes AND labels from the transform output so that the two
            # stay aligned if the transform drops or reorders boxes.
            boxes = torch.as_tensor(sample['bboxes'], dtype=torch.float32).reshape(-1, 4)
            labels = torch.as_tensor(sample['labels'], dtype=torch.long)

        # Derived fields are computed from the final boxes.
        area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

        # For not a crowd
        iscrowd = torch.zeros((boxes.shape[0], ), dtype=torch.long)

        target = {}
        target['boxes'] = boxes
        target['labels'] = labels
        target['area'] = area
        target['iscrowd'] = iscrowd
        target['image_id'] = torch.tensor([index])

        return img_size, target
    
# Check dataset: build it without a transform so items stay NumPy images
dataset = MaskDataset(images_dir, annotations_dir, X, y, 450, 350)
print('Length of dataset:', len(dataset), '\n')

# Getting the image and target for a test index
img, target = dataset[5]
print('Image shape:', img.shape, '\n', 'Target:', target)

In [None]:
# Initialize a dictionary to store the counts of each annotated class.
# Slot 0 of dataset.classes is a placeholder, so only the named classes from
# index 1 onwards are counted; that also gives exactly one wedge per colour.
class_names = dataset.classes[1:]
class_counts = {name: 0 for name in class_names}

# Iterate over the dataset and count the occurrences of each class label.
# The sample is indexed rather than unpacked into `_`: at notebook level `_` is
# a global, and rebinding it to an image array leaks into any later code that
# reads `_`.
for i in range(len(dataset)):
    image_labels = dataset[i][1]['labels']
    for label in image_labels:
        name = dataset.classes[int(label)]
        if name in class_counts:
            class_counts[name] += 1

# Extract the class labels and counts
labels = [str(label) for label in class_counts.keys()]  # Convert labels to strings
counts = list(class_counts.values())

# Set up the subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
background_color = '#faf9f4'
ax1.set_facecolor(background_color)
ax2.set_facecolor(background_color)

# Plot the pie chart (one colour per named class)
colors = ['blue', 'green', 'red']
ax1.pie(counts, wedgeprops=dict(width=0.3, edgecolor='w'), labels=labels,
        colors=colors, radius=1, startangle=120, autopct='%1.2f%%')
ax1.set_title('Class Distribution (Pie Chart)')

# Plot the bar chart
ax2.bar(labels, counts, color='maroon', width=0.4)
ax2.set_xlabel('Class Labels')
ax2.set_ylabel('Counts')
ax2.set_title('Class Distribution (Bar Chart)')

plt.show()

In [None]:
def plot_bbox(img, target, ax):
    """Draw an image and its labelled bounding boxes on a matplotlib axes.

    Args:
        img: Image accepted by ``ax.imshow``.
        target: Mapping with ``'boxes'`` (each ``[xmin, ymin, xmax, ymax]``)
            and ``'labels'``, paired by position.
        ax: Axes to draw on.

    Label 1 is drawn in green as 'with mask', label 2 in red as
    'without mask', and every other label in blue as 'mask weared incorrect'.
    """
    # display the image
    ax.imshow(img)

    for box, label in zip(target['boxes'], target['labels']):
        left, top = box[0], box[1]
        box_width = box[2] - box[0]
        box_height = box[3] - box[1]

        # choose the colour and caption for this class
        if label == 1:
            colour, caption = 'g', 'with mask'
        elif label == 2:
            colour, caption = 'r', 'without mask'
        else:
            colour, caption = 'b', 'mask weared incorrect'

        outline = patches.Rectangle(
            (left, top), box_width, box_height,
            linewidth=1, edgecolor=colour, facecolor='none',
        )
        ax.annotate(caption, (left, top), color=colour)

        # add the patch to the Axes
        ax.add_patch(outline)
# create a single plot with a 1x1 grid
fig, ax = plt.subplots(1, 1, figsize=(12, 8))

# plotting the image with bounding boxes; `dataset` was built without a
# transform, so the image is the resized NumPy array scaled to [0, 1]
img, target = dataset[6]
plot_bbox(img, target, ax)    

In [None]:
def get_transform(train):
    """Return the albumentations pipeline for a dataset split.

    Args:
        train: Whether the pipeline is for the training split. Both settings
            currently yield the same pipeline: a single ``ToTensorV2`` step.

    Returns:
        An ``A.Compose`` whose bounding boxes use the ``pascal_voc`` format and
        whose class labels travel in the ``labels`` field.
    """
    bbox_settings = {'format': 'pascal_voc', 'label_fields': ['labels']}
    steps = [ToTensorV2(p=1.0)]
    # No split-specific augmentation is defined, so `train` does not alter the result.
    return A.Compose(steps, bbox_params=bbox_settings)

In [None]:
def collate_func(batch):
    """Regroup a batch of per-sample tuples into per-field tuples.

    Args:
        batch: Iterable of samples, e.g. ``[(image, target), ...]``.

    Returns:
        A tuple holding one tuple per field, e.g. ``((images...), (targets...))``.
        An empty batch yields an empty tuple.
    """
    columns = zip(*batch)
    return tuple(columns)

In [None]:
# Build an untransformed dataset over all samples; it is only used to read the
# annotation labels that drive the stratified split.
data = MaskDataset(
    images=images_dir,
    annotations=annotations_dir,
    X=X,
    y=y,
    width=450,
    height=350,
)

# One entry per sample: the label tensor stored in that sample's target dict
labels = [data[position][1]['labels'] for position in range(len(data))]

In [None]:
# Preview only the first few per-image label tensors instead of dumping all of them
labels[:5]

In [None]:
# `labels` holds one label tensor per image, with a varying number of entries,
# but stratification needs exactly one class per sample. Each image is reduced
# to the highest class id it contains (0 when it has no objects), so an image
# counts towards its highest-numbered class - presumably the rarer ones; confirm
# against the class distribution.
stratify_key = [
    int(torch.as_tensor(image_labels).max()) if len(image_labels) else 0
    for image_labels in labels
]

# Split into train & temp
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42, stratify=stratify_key)

# Split into valid & test
X_valid, X_test, y_valid, y_test = train_test_split(X_valid, y_valid, test_size=0.3, random_state=42)

In [None]:
# One MaskDataset per split; every split resizes images to 450x350 and gets the
# pipeline returned by get_transform for that split.
train_data = MaskDataset(
    images_dir, annotations_dir, X_train, y_train,
    width=450, height=350, T=get_transform(train=True),
)

valid_data = MaskDataset(
    images_dir, annotations_dir, X_valid, y_valid,
    width=450, height=350, T=get_transform(train=False),
)

test_data = MaskDataset(
    images_dir, annotations_dir, X_test, y_test,
    width=450, height=350, T=get_transform(train=False),
)

print(
    'Length of training set:', len(train_data),
    '\nLength of validation set:', len(valid_data),
    '\nLength of test set:', len(test_data),
)

In [None]:
# Training configuration read by the data loaders, the model and the optimizer.
# num_classes matches the four entries of MaskDataset.classes (placeholder slot
# plus three named classes).
CFG = dict(
    n_epochs=20,
    lr=0.0001,
    batch_size=8,
    num_workers=2,
    num_classes=4,
    momentum=0.9,
)

In [None]:
# Options shared by all three loaders; only the shuffling differs per split.
_loader_options = dict(
    batch_size=CFG['batch_size'],
    num_workers=CFG['num_workers'],
    collate_fn=collate_func,
)

# Training batches are reshuffled every epoch
train_dataloader = torch_data.DataLoader(train_data, shuffle=True, **_loader_options)

# Validation and test batches keep the dataset order
valid_dataloader = torch_data.DataLoader(valid_data, shuffle=False, **_loader_options)
test_dataloader = torch_data.DataLoader(test_data, shuffle=False, **_loader_options)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Number of output classes for the detection head, taken from the config
num_classes = CFG['num_classes']

In [None]:
# Load fasterrcnn model
def fasterrcnn_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) with a box head for ``num_classes``.

    The detector is created with ``pretrained=False`` and its box predictor is
    replaced by a ``FastRCNNPredictor`` that keeps the input feature size of
    the original head.

    Args:
        num_classes: Number of classes the new box predictor outputs.

    Returns:
        The detector with the replaced box predictor.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False)
    # Reuse the feature width expected by the stock classification layer
    stock_head = detector.roi_heads.box_predictor
    feature_width = stock_head.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(feature_width, num_classes)
    return detector

# Get a model
def load_model(weights_path='/kaggle/input/office-masks/Kaggle_model_weights-2.pth'):
    """Load the detector from a checkpoint and put it in evaluation mode.

    The checkpoint may be either a fully pickled model (as written by
    ``torch.save(model, path)``) or a ``state_dict``. The architecture is only
    built when a ``state_dict`` has to be loaded into it, so no throw-away
    model is constructed when the checkpoint already contains the module.

    Args:
        weights_path: Path of the checkpoint file.

    Returns:
        The model on ``device``, in evaluation mode.
    """
    # NOTE(security): torch.load unpickles the file, which can execute
    # arbitrary code; only load checkpoints from a trusted source.
    checkpoint = torch.load(weights_path, map_location=device)

    if isinstance(checkpoint, dict):
        # A state_dict: build the matching architecture and load the weights.
        model = fasterrcnn_model(num_classes)
        model.load_state_dict(checkpoint)
    else:
        # A fully pickled module: use it as is.
        model = checkpoint

    model = model.to(device)
    model.eval()
    return model
model = load_model()

In [None]:
# Display the loaded model's representation as the cell output
model

In [None]:
# SGD with momentum over all model parameters; hyper-parameters come from CFG
optimizer = torch.optim.SGD(model.parameters(), lr = CFG['lr'], momentum = CFG['momentum'])
# NOTE(review): loss_fn is not referenced by any code visible in this notebook;
# train_model sums the loss dict returned by the model instead.
loss_fn = nn.CrossEntropyLoss()
n_epochs = CFG['n_epochs']

In [None]:
def train_model(
    model,
    optimizer,
    n_epochs,
    train_dataloader,
    valid_dataloader,
    device,
    save_path='/kaggle/working/Custom_model_weights.pth',
):
    """Train a detection model, report losses per epoch and save the model.

    Each epoch runs one optimisation pass over ``train_dataloader``, one
    gradient-free loss pass over ``valid_dataloader``, then calls ``evaluate``
    on the validation loader and prints the mean losses together with the mean
    epoch duration so far. After the last epoch the whole model object is
    written to ``save_path``.

    Args:
        model: Model called as ``model(images, annotations)`` that returns a
            dict of loss tensors.
        optimizer: Optimizer over the model's parameters.
        n_epochs: Number of epochs to run.
        train_dataloader: Iterable of ``(images, annotations)`` batches; must
            support ``len()``.
        valid_dataloader: Same batch structure, used for validation.
        device: Device the images and annotation tensors are moved to.
        save_path: File the trained model is saved to.

    Returns:
        None.
    """
    def _to_device(images, annotations):
        # Move every image and every tensor of every target dict to `device`.
        images = [image.to(device) for image in images]
        annotations = [{key: value.to(device) for key, value in t.items()} for t in annotations]
        return images, annotations

    dur = []
    start_time = time.time()
    for epoch in range(n_epochs):

        t0 = time.time()
        train_loss = 0.0
        valid_loss = 0.0
        model.train()

        for images, annotations in train_dataloader:
            images, annotations = _to_device(images, annotations)

            optimizer.zero_grad()

            output = model(images, annotations)
            loss = sum(output.values())
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # max(..., 1) avoids a ZeroDivisionError when a loader has no batches
        train_loss /= max(len(train_dataloader), 1)

        # The model is deliberately NOT switched to eval mode for this pass:
        # the forward call with targets is expected to return the loss dict
        # (presumably only available in training mode - confirm for the model
        # in use). No gradients are computed, so there is nothing to zero.
        with torch.no_grad():
            for images, annotations in valid_dataloader:
                images, annotations = _to_device(images, annotations)
                output = model(images, annotations)
                valid_loss += sum(output.values()).item()
        valid_loss /= max(len(valid_dataloader), 1)

        evaluate(model, valid_dataloader, device=device)
        dur.append(time.time() - t0)

        print('Epoch: {}, Training Loss: {:.4f}, Validation Loss: {:.4f}, Time: {:.4f}'.format(epoch, train_loss, valid_loss, np.mean(dur)))

    torch.save(model, save_path)
    print('Training finished, took {:.2f}s'.format(time.time() - start_time))

In [None]:
# Run the full training loop; this also saves the model when it finishes
train_model(model, optimizer, n_epochs, train_dataloader, valid_dataloader, device)

In [None]:
def prediction_filter(prefinal_pred, threshold):
    """Keep only the detections whose score is above ``threshold``.

    The input dict is left untouched; a new dict is returned, so the raw and
    the filtered predictions can be used side by side.

    Args:
        prefinal_pred: Prediction dict with ``'boxes'``, ``'scores'`` and
            ``'labels'`` tensors aligned along their first dimension.
        threshold: Minimum score (exclusive) a detection needs to be kept.

    Returns:
        A shallow copy of ``prefinal_pred`` in which ``'boxes'``, ``'scores'``
        and ``'labels'`` contain only the detections that passed.
    """
    filter_mask = prefinal_pred['scores'] > threshold

    filtered = dict(prefinal_pred)
    for key in ('boxes', 'scores', 'labels'):
        filtered[key] = prefinal_pred[key][filter_mask]
    return filtered

In [None]:
def apply_nms(prefinal_pred, threshold):
    """Apply non-maximum suppression to a prediction dict.

    The function depends only on its arguments (it does not read any
    notebook-level variable) and leaves the input dict untouched.

    Args:
        prefinal_pred: Prediction dict with ``'boxes'``, ``'scores'`` and
            ``'labels'`` tensors aligned along their first dimension.
        threshold: IoU threshold passed to ``torchvision.ops.nms``.

    Returns:
        A shallow copy of ``prefinal_pred`` in which ``'boxes'``, ``'scores'``
        and ``'labels'`` contain only the detections selected by NMS.
    """
    # indices of the bboxes to keep
    keep = torchvision.ops.nms(prefinal_pred['boxes'], prefinal_pred['scores'], threshold)

    kept = dict(prefinal_pred)
    for key in ('boxes', 'scores', 'labels'):
        kept[key] = prefinal_pred[key][keep]
    return kept

In [None]:
# function to convert a torch tensor back to a PIL image
def torch_to_pil(img):
    """Convert an image tensor to an RGB PIL image.

    Args:
        img: Image accepted by ``T.ToPILImage``.

    Returns:
        The converted image in ``'RGB'`` mode.
    """
    to_pil = T.ToPILImage()
    pil_image = to_pil(img)
    return pil_image.convert('RGB')

In [None]:
# pick one image from the test set (test_data applies the evaluation transform)
img, target = test_data[2]
# put the model in evaluation mode
model.eval()
# no gradients are needed for inference; the model takes a list of images and
# returns one result per image, so [0] selects the result for this image
with torch.no_grad():
    prediction = model([img.to(device)])[0]

# compare the number of raw detections with the number of annotated boxes
print('Predicted number of boxes: ', len(prediction['labels']))
print('Real number of boxes: ', len(target['labels']))

In [None]:
# pick one image from the test set
img, target = test_data[2]

# put the model in evaluation mode
model.eval()
with torch.no_grad():
    prediction = model([img.to(device)])[0]

# create a 2x2 grid of subplots: ground truth, raw output, score-filtered, NMS
fig, axs = plt.subplots(2, 2, figsize=(22, 22))

# plot each image with bounding boxes in a separate subplot
print('Expected Output: ', len(target['labels']))
plot_bbox(torch_to_pil(img), target, axs[0][0]) 
axs[0][0].set_title('Expected Output', fontsize = 18)

prediction = {x: y.cpu() for x, y in prediction.items()}
print('Model Output: ', len(prediction['labels']))
plot_bbox(torch_to_pil(img), prediction, axs[1][0])
axs[1][0].set_title('Model Output', fontsize = 18)

# Each stage receives its own shallow copy, so `prediction`,
# `filtered_prediction` and `nms_prediction` stay three distinct results even
# if a stage rewrites the dict it is given.
filtered_prediction = prediction_filter(dict(prediction), threshold = 0.5)
print('Predicted Filtered Outputs: ', len(filtered_prediction['labels']))
plot_bbox(torch_to_pil(img), filtered_prediction, axs[0][1])
axs[0][1].set_title('Filtered Predictions', fontsize = 18)

nms_prediction = apply_nms(dict(filtered_prediction), threshold = 0.5)
print('NMS Applied Model Output', len(nms_prediction['labels']))
plot_bbox(torch_to_pil(img), nms_prediction, axs[1][1])
axs[1][1].set_title('NMS Prediction', fontsize = 18)

plt.show()

In [None]:
# put the model in evaluation mode once; it does not change inside the loop
model.eval()

for i in range(len(test_data)):
    img, target = test_data[i]

    with torch.no_grad():
        prediction = model([img.to(device)])[0]

    # create a 2x2 grid of subplots: ground truth, raw output, score-filtered, NMS
    fig, axs = plt.subplots(2, 2, figsize=(22, 22))

    # plot each image with bounding boxes in a separate subplot
    plot_bbox(torch_to_pil(img), target, axs[0][0])
    print('Expected Output: ', len(target['labels']))
    axs[0][0].set_title('Expected Output', fontsize = 18)

    prediction = {x: y.cpu() for x, y in prediction.items()}
    print('Model Output: ', len(prediction['labels']))
    plot_bbox(torch_to_pil(img), prediction, axs[1][0])
    axs[1][0].set_title('Model Output', fontsize = 18)

    # Each stage receives its own shallow copy, so the raw, filtered and NMS
    # results stay distinct even if a stage rewrites the dict it is given.
    filtered_prediction = prediction_filter(dict(prediction), threshold = 0.5)
    print('Predicted Filtered Outputs: ', len(filtered_prediction['labels']))
    plot_bbox(torch_to_pil(img), filtered_prediction, axs[0][1])
    axs[0][1].set_title('Filtered Predictions', fontsize = 18)

    nms_prediction = apply_nms(dict(filtered_prediction), threshold = 0.5)
    print('NMS Applied Model Output', len(nms_prediction['labels']))
    plot_bbox(torch_to_pil(img), nms_prediction, axs[1][1])
    axs[1][1].set_title('NMS Prediction', fontsize = 18)

    plt.show()
    # release the figure so one large figure per test image does not accumulate
    plt.close(fig)