Import Packages

In [None]:
###pytorch packages
import torch
#from torchvision.models.detection import ssd300_vgg16
import torchvision
from torchvision import transforms as torchtrans  
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import Dataset, DataLoader

###imports for creating custom backbone
from torch import nn
from torch.nn import functional as F
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

###helper libraries
#from engine import train_one_epoch, evaluate
#import utils
import imutils
#import transforms as T
from torchvision import transforms as T

###for mAP, IoU metrics & confusion matrix
from torchmetrics.detection import MeanAveragePrecision, IntersectionOverUnion

###to process coco format json
from pycocotools.coco import COCO

'''###for image augmentations
from albumentations import Resize, Compose
from albumentations.pytorch.transforms import ToTensorV2
from albumentations.augmentations.transforms import Normalize'''

###onnx related importings
import torch.onnx
import onnx
import onnxruntime

###openvino related imports
import openvino as ov
from openvino.tools import mo

###other packages
import os
import json
import numpy as np
import cv2
from PIL import Image, ImageDraw
from imutils.video import VideoStream
from imutils.video import FPS
import imutils
import random
import time
from tqdm.auto import tqdm
import glob as glob
import math
import pandas as pd

###matplotlib for visualization
import matplotlib.pyplot as plt
import matplotlib.patches as patches

###for ignoring warnings
'''import warnings
warnings.filterwarnings('ignore')'''

plt.style.use('ggplot')

Dataset Handler

In [None]:
###custom dataset that loads each image with its COCO annotations, resizes the image and rescales the boxes to match
class DDWDataset(Dataset):
    """Detection dataset backed by a COCO-format annotation file.

    Every sample is an RGB image resized to ``(re_width, re_height)`` and
    scaled to the range [0, 1], together with a target dictionary holding
    the rescaled bounding boxes, labels, image id and box areas.

    Args:
        root: directory that contains the image files.
        re_width: width every image is resized to.
        re_height: height every image is resized to.
        annotation: path of the COCO JSON annotation file.
        transforms: optional callable applied to the resized image.
    """

    ###init function storing the parameters and loading the COCO index
    def __init__(self, root, re_width, re_height, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.re_height = re_height
        self.re_width = re_width
        self.ids = list(sorted(self.coco.imgs.keys()))

    ###function to load and pre-process a single sample
    def __getitem__(self, index):
        """Return ``(image, target)`` for the sample at position ``index``.

        ``target`` has the keys ``boxes`` (float32, shape (N, 4) in
        [xmin, ymin, xmax, ymax] order), ``labels`` (int64), ``image_id``
        and ``area``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        coco = self.coco
        ###image id and all annotations that belong to it
        img_id = self.ids[index]
        ann_ids = coco.getAnnIds(imgIds=img_id)
        coco_annotation = coco.loadAnns(ann_ids)

        ###path of the input image
        path = os.path.join(self.root, coco.loadImgs(img_id)[0]['file_name'])
        img = cv2.imread(path)
        ###cv2.imread returns None instead of raising, so fail early with a clear message
        if img is None:
            raise FileNotFoundError(f"Could not read image '{path}'")
        ###cv2 loads BGR; convert to RGB and to float32 before scaling
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        ###resize every image to one common size
        image_resized = cv2.resize(img, (self.re_width, self.re_height))
        ###scale the raw pixel intensities to the range [0, 1]
        image_resized /= 255.0

        ###original size, needed to rescale the boxes to the resized image
        image_width = img.shape[1]
        image_height = img.shape[0]

        ###COCO boxes are [xmin, ymin, width, height];
        ###the detector expects [xmin, ymin, xmax, ymax]
        boxes = []
        labels = []
        for ann in coco_annotation:
            ###use the category of this annotation (not of the first one)
            ###so images with objects of different classes are labelled correctly
            labels.append(int(ann['category_id']))

            xmin = ann['bbox'][0]
            ymin = ann['bbox'][1]
            xmax = ann['bbox'][0] + ann['bbox'][2]
            ymax = ann['bbox'][1] + ann['bbox'][3]

            xmin_final = (xmin/image_width)*self.re_width
            xmax_final = (xmax/image_width)*self.re_width
            ymin_final = (ymin/image_height)*self.re_height
            ymax_final = (ymax/image_height)*self.re_height

            boxes.append([xmin_final, ymin_final, xmax_final, ymax_final])

        ###reshape keeps the (N, 4) layout even when the image has no annotation
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        img_id = torch.tensor([img_id])
        ###areas of the rescaled boxes (already a tensor because boxes is one)
        areas = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        ###'iscrowd' is deliberately not part of the target

        ann_dict = {}
        ann_dict["boxes"] = boxes
        ann_dict["labels"] = labels
        ann_dict["image_id"] = img_id
        ann_dict["area"] = areas

        ###apply the optional transforms; without them return the resized array as is
        if self.transforms is not None:
            img_re = self.transforms(image_resized)
        else:
            img_re = image_resized

        return img_re, ann_dict

    ###number of images in the dataset
    def __len__(self):
        return len(self.ids)

Transformations

In [None]:
###In my case, just added ToTensor, can add different transformations like resize, translate, etc.
#from albumentations.pytorch import ToTensorV2
def get_transform():
    """Return the transform pipeline, which currently only holds ``ToTensor``."""
    ###further transforms can be added to this list
    pipeline = [torchvision.transforms.ToTensor()]
    return torchvision.transforms.Compose(pipeline)

Utils & Helper Functions

In [None]:
###this class keeps track of the training and validation loss values and helps to get the average for each epoch as well
class Averager:
    """Running mean of the values passed to :meth:`send`."""

    def __init__(self):
        ###sum of the received values and how many were received
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        """Add ``value`` to the running total and count it."""
        self.iterations += 1
        self.current_total += value

    @property
    def value(self):
        """Mean of the received values, or 0 when nothing was received."""
        if self.iterations == 0:
            return 0
        return 1.0 * self.current_total / self.iterations

    def reset(self):
        """Forget everything received so far."""
        self.iterations = 0.0
        self.current_total = 0.0

###Collate function for the DataLoader: images can contain different numbers of objects, so the samples are grouped into tuples instead of being stacked into one tensor
def collate_fn(batch):
    """Transpose a list of samples into one tuple per sample field."""
    columns = zip(*batch)
    return tuple(columns)

Training Configurations

In [None]:
###--- training hyper-parameters ---
BATCH_SIZE = 4      ###samples per batch, adjust to the available GPU memory
RESIZE_TO = 416     ###images are resized to RESIZE_TO x RESIZE_TO
NUM_EPOCHS = 50     ###total number of epochs to train for
START_EPOCH = 0     ###first epoch index; replaced by the checkpoint's epoch when training is resumed

###--- computation device: GPU when available, CPU otherwise ---
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

###--- dataset locations (image directory + COCO JSON annotation file) ---
TRAIN_DIR = "G:\\Projects\\ADAS\\DDW\\Data\\Usable_Data\\train"
TRAIN_ANN_DIR = "G:\\Projects\\ADAS\\DDW\\Data\\Usable_Data\\train\\train_3_annotations.coco.json"
VALID_DIR = "G:\\Projects\\ADAS\\DDW\\Data\\Usable_Data\\valid"
VALID_ANN_DIR = "G:\\Projects\\ADAS\\DDW\\Data\\Usable_Data\\valid\\valid_3_annotations.coco.json"

###--- classes: index 0 is reserved for background, 1: awake, 2: drowsy ---
CLASSES = ['background', 'awake', 'drowsy']
NUM_CLASSES = len(CLASSES)

###--- outputs ---
OUT_DIR = "G:\\Projects\\ADAS\\DDW\\Output"   ###location to save model and plots
MODEL_NAME = 'frcnn_custombackbone_model'     ###name prefix of the saved checkpoints

###plots / checkpoints are written whenever (epoch + 1) is a multiple of these values
SAVE_PLOTS_EPOCH = 4
SAVE_MODEL_EPOCH = 4

DataLoaders

In [None]:
###build the training / validation datasets (square resize to RESIZE_TO) and wrap them in data loaders
train_dataset = DDWDataset(
    root=TRAIN_DIR,
    re_width=RESIZE_TO,
    re_height=RESIZE_TO,
    annotation=TRAIN_ANN_DIR,
    transforms=get_transform(),
)
valid_dataset = DDWDataset(
    root=VALID_DIR,
    re_width=RESIZE_TO,
    re_height=RESIZE_TO,
    annotation=VALID_ANN_DIR,
    transforms=get_transform(),
)

###only the training data is shuffled; both loaders load in the main process (num_workers=0)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn,
)
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_fn,
)

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of validation samples: {len(valid_dataset)}\n")

FRCNN-Resnet50 Model Function

In [None]:
def create_model_frcnn_resnet50(num_classes):
    """Build a Faster R-CNN ResNet50-FPN detector with a new box head.

    The detector is created with ``pretrained=False`` and limited to one
    detection per image; its box predictor is then replaced by a
    ``FastRCNNPredictor`` sized for ``num_classes``.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        pretrained=False,
        box_detections_per_img=1,
    )

    ###feature size fed into the existing classification layer
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    ###swap in a head with the required number of classes
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)

    return detector

FRCNN-CustomBackbone Model Function

In [None]:
###to create a custom backbone for frcnn with resnet block
###creates 2 Residual block of ResNet in each sequential block
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a skip connection.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by the block.
        use_1x1conv: when true, the skip path goes through a 1x1 convolution
            (with the same stride) instead of being the identity.
        strides: stride of the first 3x3 convolution and of the 1x1 convolution.
    """

    def __init__(
        self, in_channels, out_channels, use_1x1conv=True, strides=1
    ):
        super().__init__()
        ###main path
        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, padding=1, stride=strides
        )
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, padding=1
        )
        ###optional projection on the skip path
        self.conv3 = (
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=strides)
            if use_1x1conv
            else None
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Apply the block to ``x`` and return the activated sum of both paths."""
        shortcut = x
        out = self.conv1(x)
        out = F.relu(self.bn1(out))
        out = self.conv2(out)
        out = self.bn2(out)
        if self.conv3 is not None:
            shortcut = self.conv3(shortcut)
        out += shortcut
        return F.relu(out)

###creating a resnet block
def create_resnet_block(input_channels, output_channels, num_residuals,):
    """Return a list of ``num_residuals`` residual blocks.

    The first block maps ``input_channels`` to ``output_channels`` with
    stride 2; every following block keeps ``output_channels``.
    """
    return [
        ResidualBlock(input_channels, output_channels, use_1x1conv=True, strides=2)
        if position == 0
        else ResidualBlock(output_channels, output_channels)
        for position in range(num_residuals)
    ]

###creating a custom resnet class with totally 5 sequential blocks
class CustomResNet(nn.Module):
    """Small ResNet-style classifier made of five sequential blocks.

    ``block1`` is a conv / batch-norm / ReLU / max-pool stem; ``block2`` to
    ``block5`` each hold two residual blocks and widen the channels
    16 -> 32 -> 64 -> 128 -> 256. A linear layer maps the pooled 256
    features to ``num_classes`` outputs.
    """

    def __init__(self, num_classes=3):   ###with 3 classes
        super().__init__()
        stem_layers = [
            nn.Conv2d(3, 16, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        ]
        self.block1 = nn.Sequential(*stem_layers)
        self.block2 = nn.Sequential(*create_resnet_block(16, 32, 2))
        self.block3 = nn.Sequential(*create_resnet_block(32, 64, 2))
        self.block4 = nn.Sequential(*create_resnet_block(64, 128, 2))
        self.block5 = nn.Sequential(*create_resnet_block(128, 256, 2))

        self.linear = nn.Linear(256, num_classes)

    ###forward pass through the 5 blocks, global average pooling and the linear layer
    def forward(self, x):
        for stage in (self.block1, self.block2, self.block3, self.block4, self.block5):
            x = stage(x)
        batch_size, _, _, _ = x.shape
        pooled = F.adaptive_avg_pool2d(x, 1).reshape(batch_size, -1)
        return self.linear(pooled)

###finally creating model class by combining other functions     
def create_model_frcnn_custombackbone(num_classes):
    """Build a Faster R-CNN whose backbone is the five blocks of ``CustomResNet``.

    Args:
        num_classes: number of classes passed to ``FasterRCNN``.

    Returns:
        The assembled ``FasterRCNN`` model.
    """
    ###only the convolutional blocks are reused; the linear layer is left out
    feature_net = CustomResNet(num_classes=3)   ###with 3 classes
    backbone = nn.Sequential(
        feature_net.block1,
        feature_net.block2,
        feature_net.block3,
        feature_net.block4,
        feature_net.block5,
    )
    ###FasterRCNN reads the channel count of the feature map from this attribute
    backbone.out_channels = 256

    ###RPN anchors: 5 sizes x 3 aspect ratios on the single feature map
    anchor_generator = AnchorGenerator(
        sizes=((32, 64, 128, 256, 512),),
        aspect_ratios=((0.5, 1.0, 2.0),),
    )

    ###RoI pooling on the feature map named '0' (the name used when the backbone returns a plain tensor)
    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0'],
        output_size=7,
        sampling_ratio=2,
    )

    ###assemble the detector around the custom backbone
    return FasterRCNN(
        backbone=backbone,
        num_classes=num_classes,
        rpn_anchor_generator=anchor_generator,
        box_roi_pool=roi_pooler,
    )

Training and Validation Functions

In [None]:
###function for running training iterations
def train(train_data_loader, model):
    """Run one pass over ``train_data_loader`` and update the model.

    Uses the module-level ``optimizer``, ``DEVICE``, ``train_loss_hist``,
    ``train_loss_list`` and ``train_itr``. Every batch loss is appended to
    ``train_loss_list`` and sent to ``train_loss_hist``.

    Returns:
        The global ``train_loss_list``.
    """
    print('Training...')
    global train_itr
    global train_loss_list

    ###progress bar over the batches
    prog_bar = tqdm(train_data_loader, total=len(train_data_loader))

    for images, targets in prog_bar:
        optimizer.zero_grad()

        ###move the batch to the computation device; the loader's collate_fn
        ###delivers targets as a sequence of per-image dicts
        images = [image.to(DEVICE) for image in images]
        targets = [
            {key: tensor.to(DEVICE) for key, tensor in target.items()}
            for target in targets
        ]

        ###the model returns a dict of losses; their sum is optimised
        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())
        loss_value = losses.item()

        train_loss_list.append(loss_value)
        train_loss_hist.send(loss_value)

        losses.backward()
        optimizer.step()
        train_itr += 1

        ###show the current loss beside the progress bar
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")

    return train_loss_list

###function for running validation iterations
def validate(valid_data_loader, model):
    """Compute the loss of every batch in ``valid_data_loader`` without updating the model.

    Uses the module-level ``DEVICE``, ``val_loss_hist``, ``val_loss_list``
    and ``val_itr``. Every batch loss is appended to ``val_loss_list`` and
    sent to ``val_loss_hist``.

    Returns:
        The global ``val_loss_list``.
    """
    print('Validating...')
    global val_itr
    global val_loss_list

    ###progress bar over the batches
    prog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))

    for images, targets in prog_bar:
        ###move the batch to the computation device; the loader's collate_fn
        ###delivers targets as a sequence of per-image dicts
        images = [image.to(DEVICE) for image in images]
        targets = [
            {key: tensor.to(DEVICE) for key, tensor in target.items()}
            for target in targets
        ]

        ###forward pass only, no gradients are recorded
        with torch.no_grad():
            loss_dict = model(images, targets)

        losses = sum(loss_dict.values())
        loss_value = losses.item()

        val_loss_list.append(loss_value)
        val_loss_hist.send(loss_value)
        val_itr += 1

        ###show the current loss beside the progress bar
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")

    return val_loss_list

TorchMetrics Functions - mAP & IoU

In [None]:
###functions for getting mAP & IoU using torchmetrics
def meanAvgPrecision(data_loader, model, type):
    """Compute the torchmetrics 'map' value of every batch in ``data_loader``.

    A fresh ``MeanAveragePrecision`` metric is used per batch. The value
    (0 when it is NaN) is appended to the global list and sent to the
    global averager selected by ``type``.

    Args:
        data_loader: loader yielding ``(images, targets)`` batches.
        model: detector called as ``model(images)``.
        type: ``'train'`` or ``'valid'``; selects the message and which globals are updated.

    Returns:
        ``mAP_train_list`` or ``mAP_valid_list`` depending on ``type``;
        ``None`` for any other ``type``.
    """
    for_train = type == 'train'
    for_valid = type == 'valid'

    if for_train:
        print("Calculating mAP for training data...")
    elif for_valid:
        print("Calculating mAP for validation data...")

    global mAP_train_itr
    global mAP_train_list
    global mAP_valid_itr
    global mAP_valid_list

    for images, targets in data_loader:
        images = [image.to(DEVICE) for image in images]
        targets = [
            {key: tensor.to(DEVICE) for key, tensor in target.items()}
            for target in targets
        ]

        ###predictions only, no gradients are recorded
        with torch.no_grad():
            model_pred = model(images)

        metric_mAP = MeanAveragePrecision(iou_type="bbox")
        metric_mAP.update(model_pred, targets)
        ###only the 'map' entry of the result is used; for the other entries see
        ###https://torchmetrics.readthedocs.io/en/stable/detection/mean_average_precision.html
        mAP_val = metric_mAP.compute().get('map').item()
        ###replace NaN by 0 so it does not propagate into the averages
        if math.isnan(mAP_val):
            mAP_val = 0

        ###store the value on the train or the validation side
        if for_train:
            mAP_train_list.append(mAP_val)
            mAP_train_hist.send(mAP_val)
            mAP_train_itr += 1
        elif for_valid:
            mAP_valid_list.append(mAP_val)
            mAP_valid_hist.send(mAP_val)
            mAP_valid_itr += 1

    ###return the list that matches the requested type
    if for_train:
        return mAP_train_list
    if for_valid:
        return mAP_valid_list
    return None

def InteroverUnion(data_loader, model, type):
    """Compute the torchmetrics 'iou' value of every batch in ``data_loader``.

    A fresh ``IntersectionOverUnion`` metric is used per batch. The value
    (0 when it is NaN) is appended to the global list and sent to the
    global averager selected by ``type``.

    Args:
        data_loader: loader yielding ``(images, targets)`` batches.
        model: detector called as ``model(images)``.
        type: ``'train'`` or ``'valid'``; selects the message and which globals are updated.

    Returns:
        ``IoU_train_list`` or ``IoU_valid_list`` depending on ``type``;
        ``None`` for any other ``type``.
    """
    for_train = type == 'train'
    for_valid = type == 'valid'

    if for_train:
        print("Calculating IoU for training data...")
    elif for_valid:
        print("Calculating IoU for validation data...")

    global IoU_train_itr
    global IoU_train_list
    global IoU_valid_itr
    global IoU_valid_list

    for images, targets in data_loader:
        images = [image.to(DEVICE) for image in images]
        targets = [
            {key: tensor.to(DEVICE) for key, tensor in target.items()}
            for target in targets
        ]

        ###predictions only, no gradients are recorded
        with torch.no_grad():
            model_pred = model(images)

        metric_IoU = IntersectionOverUnion()
        ###the result dict is read through its single 'iou' entry
        IoU_val = metric_IoU(model_pred, targets).get('iou').item()
        ###replace NaN by 0 so it does not propagate into the averages
        if math.isnan(IoU_val):
            IoU_val = 0

        ###store the value on the train or the validation side
        if for_train:
            IoU_train_list.append(IoU_val)
            IoU_train_hist.send(IoU_val)
            IoU_train_itr += 1
        elif for_valid:
            IoU_valid_list.append(IoU_val)
            IoU_valid_hist.send(IoU_val)
            IoU_valid_itr += 1

    ###return the list that matches the requested type
    if for_train:
        return IoU_train_list
    if for_valid:
        return IoU_valid_list
    return None
    

Load Checkpoint to Continue Training Process

In [None]:
###function to Load Checkpoint to Continue Training Process
def load_last_checkpoint(model, optimizer, ckpt_path, map_location=None):
    """Restore model, optimizer and metric histories from a checkpoint file.

    ``model`` and ``optimizer`` must already be constructed; this routine
    only updates their states in place.

    Args:
        model: module whose ``load_state_dict`` receives ``checkpoint['state_dict']``.
        optimizer: optimizer whose ``load_state_dict`` receives ``checkpoint['optimizer']``.
        ckpt_path: path of the checkpoint file.
        map_location: forwarded to ``torch.load``; pass a device to open a
            checkpoint on a different device than it was saved on.
            ``None`` keeps the default behaviour of ``torch.load``.

    Returns:
        ``(model, optimizer, start_epoch, train_loss_list, val_loss_list,
        mAP_train_list, IoU_train_list, mAP_valid_list, IoU_valid_list)``.
        When no file exists at ``ckpt_path``, ``start_epoch`` is 0 and all
        lists are empty, and ``model`` / ``optimizer`` are returned untouched.
    """
    ###defaults for the "no checkpoint" case, so the return below never
    ###refers to an unassigned name
    start_epoch = 0
    train_loss_list = []
    val_loss_list = []
    mAP_train_list = []
    IoU_train_list = []
    mAP_valid_list = []
    IoU_valid_list = []

    if os.path.isfile(ckpt_path):
        print("Loading Checkpoint '{}'".format(ckpt_path))
        checkpoint = torch.load(ckpt_path, map_location=map_location)
        start_epoch = checkpoint['epoch']
        model.load_state_dict(checkpoint['state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        train_loss_list = checkpoint['train_loss_list']
        val_loss_list = checkpoint['val_loss_list']
        mAP_train_list = checkpoint['mAP_train_list']
        IoU_train_list = checkpoint['IoU_train_list']
        mAP_valid_list = checkpoint['mAP_valid_list']
        IoU_valid_list = checkpoint['IoU_valid_list']
        print("Loaded Checkpoint '{}' (epoch {})".format(ckpt_path, start_epoch))
    else:
        print("No Checkpoint found at '{}'".format(ckpt_path))

    return model, optimizer, start_epoch, train_loss_list, val_loss_list, mAP_train_list, IoU_train_list, mAP_valid_list, IoU_valid_list

Graph/Plot Functions

In [None]:
###function to plot graphs for losses and metrics like mAP & IoU for both training and validation data
def plot_metrics_graph(type_list, type_color, type_name, OUT_DIR, epoch):
    """Plot one series against its index and save the figure as a PNG.

    Args:
        type_list: values to plot (one per iteration).
        type_color: line colour.
        type_name: y-axis label, also used in the file name.
        OUT_DIR: base output directory.
        epoch: zero-based epoch index; ``epoch + 1`` is used in the file name.
    """
    ###one figure with a single axis per call
    fig, axis = plt.subplots()
    axis.plot(type_list, color=type_color)
    axis.set_xlabel('iterations')
    axis.set_ylabel(type_name)

    target_png = f"{OUT_DIR}\\FRCNN_CustomBackbone_Outputs\\Model_Graphs_FRCNN_CustomBackbone\\{type_name}_{epoch+1}.png"
    fig.savefig(target_png)

    ###close every open figure so they do not pile up across calls
    plt.close('all')


Training and Validation Process

In [None]:
###Training driver: builds the model and optimizer, optionally resumes from the newest checkpoint,
###then for every epoch runs training, validation and the mAP / IoU metrics and periodically saves
###a checkpoint and the plots
if __name__ == '__main__':
    ###iteration counters, incremented once per processed batch by the helper functions
    train_itr = 1
    val_itr = 1
    mAP_train_itr = 1
    IoU_train_itr = 1
    mAP_valid_itr = 1
    IoU_valid_itr = 1
    
    ###per-iteration loss and metric values of the whole run, used for the plots and stored in the checkpoint
    train_loss_list = []
    val_loss_list = []
    mAP_train_list = []
    IoU_train_list = []
    mAP_valid_list = []
    IoU_valid_list = []

    ###running averages, reset at the start of every epoch
    train_loss_hist = Averager()
    val_loss_hist = Averager()
    mAP_train_hist = Averager()
    IoU_train_hist = Averager()
    mAP_valid_hist = Averager()
    IoU_valid_hist = Averager()
    
    ###initialize the model and move model to the computation device
    model = create_model_frcnn_custombackbone(num_classes=NUM_CLASSES)
    model = model.to(DEVICE)
    ###only parameters that require gradients are optimised
    params = [p for p in model.parameters() if p.requires_grad]
    ###define the optimizer
    optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)

    ###look for previously saved checkpoints to continue the training process
    last_ckpt_path_list = glob.glob(OUT_DIR+"\\FRCNN_CustomBackbone_Outputs\\Model_Graphs_FRCNN_CustomBackbone\\*.pt")
    #print("last_ckpt_path_list = = =",last_ckpt_path_list)
    if(len(last_ckpt_path_list) > 0):
        last_ckpt_path = max(last_ckpt_path_list, key=os.path.getctime)   ###checkpoint with the greatest ctime, i.e. the latest one

        ###restore model, optimizer, start epoch and the histories from that checkpoint
        model, optimizer, START_EPOCH, train_loss_list, val_loss_list, mAP_train_list, IoU_train_list, mAP_valid_list, IoU_valid_list = load_last_checkpoint(model, optimizer, last_ckpt_path)
        
    ###start the training epochs
    for epoch in range(START_EPOCH, NUM_EPOCHS):
        
        print(f"\nEPOCH {epoch+1} of {NUM_EPOCHS}")
        
        ###reset the loss and metric averages for the current epoch
        train_loss_hist.reset()
        val_loss_hist.reset()
        mAP_train_hist.reset()
        IoU_train_hist.reset()
        mAP_valid_hist.reset()
        IoU_valid_hist.reset()
        
        ###start timer and carry out training and validation
        start = time.time()

        ###model.eval() is called further down in every epoch for the metrics, so from the second
        ###loop index on the model has to be switched back to training mode before train()/validate()
        if(epoch > 0):
            model.train()
        ###training and validation losses; note that validate() runs while the model is still in training mode
        train_loss = train(train_loader, model)
        val_loss = validate(valid_loader, model)

        ###put the model to evaluate state for calculating the metrics - mAP & IoU
        model.eval()
        ###mAP & IoU on both the training and the validation data
        mAP_train = meanAvgPrecision(train_loader, model, 'train')
        IoU_train = InteroverUnion(train_loader, model, 'train')
        mAP_valid = meanAvgPrecision(valid_loader, model, 'valid')
        IoU_valid = InteroverUnion(valid_loader, model, 'valid')
        
        ###epoch averages taken from the Averager instances
        print(f"Epoch #{epoch+1} train loss: {train_loss_hist.value:.3f}")   
        print(f"Epoch #{epoch+1} validation loss: {val_loss_hist.value:.3f}")
        print(f"Epoch #{epoch+1} mAP_train: {mAP_train_hist.value:.3f}")   
        print(f"Epoch #{epoch+1} iou_train: {IoU_train_hist.value:.3f}")
        print(f"Epoch #{epoch+1} mAP_valid: {mAP_valid_hist.value:.3f}")   
        print(f"Epoch #{epoch+1} iou_valid: {IoU_valid_hist.value:.3f}") 
        
        end = time.time()
        print(f"Took {((end - start) / 60):.3f} minutes for epoch {epoch+1}")

        ###checkpoint content: 'epoch' holds the NEXT epoch index (epoch + 1) so that a resumed run does not
        ###repeat the epoch that just finished; the loss and metric lists are stored so the plots can be continued
        model_state = {'epoch': epoch + 1, 'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict(),
                       'train_loss_list' : train_loss_list, 'val_loss_list' : val_loss_list, 'mAP_train_list' : mAP_train_list,
                       'IoU_train_list' : IoU_train_list, 'mAP_valid_list' : mAP_valid_list, 'IoU_valid_list' : IoU_valid_list}
        
        if (epoch+1) % SAVE_MODEL_EPOCH == 0:   ###save the checkpoint every SAVE_MODEL_EPOCH epochs
            torch.save(model_state, f"{OUT_DIR}\\FRCNN_CustomBackbone_Outputs\\Model_Graphs_FRCNN_CustomBackbone\\{MODEL_NAME}_ckpt_{epoch+1}.pt")   ###saved with the .pt extension (not .pth)
            print('SAVING MODEL COMPLETE...\n')
        
        if (epoch+1) % SAVE_PLOTS_EPOCH == 0:   ###every SAVE_PLOTS_EPOCH epochs save one figure per tracked series (6 in total)
            plot_metrics_graph(train_loss, 'red', 'train_loss', OUT_DIR, epoch)
            plot_metrics_graph(val_loss, 'blue', 'valid_loss', OUT_DIR, epoch)
            plot_metrics_graph(mAP_train, 'red', 'mAP_train', OUT_DIR, epoch)
            plot_metrics_graph(mAP_valid, 'blue', 'mAP_valid', OUT_DIR, epoch)
            plot_metrics_graph(IoU_train, 'red', 'IoU_train', OUT_DIR, epoch)
            plot_metrics_graph(IoU_valid, 'blue', 'IoU_valid', OUT_DIR, epoch)

            print('SAVING PLOTS COMPLETE...')

        ###the last epoch is always saved; when NUM_EPOCHS is a multiple of the intervals above,
        ###the same files are simply written a second time
        if (epoch+1) == NUM_EPOCHS:   ###save loss plots and model once at the end
            torch.save(model_state, f"{OUT_DIR}\\FRCNN_CustomBackbone_Outputs\\Model_Graphs_FRCNN_CustomBackbone\\{MODEL_NAME}_ckpt_{epoch+1}.pt")   ###saved with the .pt extension (not .pth)
            print('SAVING MODEL FOR LAST EPOCH COMPLETE...\n')
            
            plot_metrics_graph(train_loss, 'red', 'train_loss', OUT_DIR, epoch)
            plot_metrics_graph(val_loss, 'blue', 'valid_loss', OUT_DIR, epoch)
            plot_metrics_graph(mAP_train, 'red', 'mAP_train', OUT_DIR, epoch)
            plot_metrics_graph(mAP_valid, 'blue', 'mAP_valid', OUT_DIR, epoch)
            plot_metrics_graph(IoU_train, 'red', 'IoU_train', OUT_DIR, epoch)
            plot_metrics_graph(IoU_valid, 'blue', 'IoU_valid', OUT_DIR, epoch)

            print('SAVING PLOTS FOR LAST EPOCH COMPLETE...')
            

IoU, Avg IoU, Precision, Recall Calculations

In [None]:
###function to calculate IoU, Avg IoU, Precision, Recall
def IoU_Calc(grndtrth, pred):
    """Return the intersection over union of two boxes.

    Both boxes are indexable as ``[xmin, ymin, xmax, ymax]``. Widths and
    heights are computed as ``max - min + 1``.
    """
    ###corners of the intersection rectangle
    left = max(grndtrth[0], pred[0])
    top = max(grndtrth[1], pred[1])
    right = min(grndtrth[2], pred[2])
    bottom = min(grndtrth[3], pred[3])

    ###intersection area, 0 when the boxes do not overlap
    overlap_w = max(0, right - left + 1)
    overlap_h = max(0, bottom - top + 1)
    interArea = overlap_w * overlap_h

    ###areas of the ground-truth and the predicted box
    grndtrth_Area = (grndtrth[2] - grndtrth[0] + 1) * (grndtrth[3] - grndtrth[1] + 1)
    pred_Area = (pred[2] - pred[0] + 1) * (pred[3] - pred[1] + 1)

    ###union = both areas minus the part counted twice
    union_area = float(grndtrth_Area + pred_Area - interArea)
    return interArea / union_area

def IoU_Precision_Recall_F1(metrics_dict, CLASSES):
    """Finalize per-class and averaged metrics in ``metrics_dict`` (in place).

    For every class other than "background" the following keys are read:
    ``<cls>_IoU`` (summed IoU), ``<cls>_Count``, ``<cls>_TP``, ``<cls>_FP``
    and ``<cls>_FN``. They are used to write:

    * ``<cls>_IoU``       = summed IoU / Count
    * ``<cls>_Precision`` = TP / (TP + FP)
    * ``<cls>_Recall``    = TP / (TP + FN)
    * ``<cls>_F1``        = 2 / (1/Precision + 1/Recall)

    ``Avg_IoU``, ``Avg_Precision``, ``Avg_Recall`` and ``Avg_F1`` are the mean
    of the per-class values over all non-background classes. Every value is
    rounded to 2 decimals. A ratio whose denominator is zero is reported as
    0.0 instead of raising ZeroDivisionError.

    Args:
        metrics_dict: dict holding the raw counters described above.
        CLASSES: sequence of class names; "background" entries are ignored.

    Returns:
        The same ``metrics_dict`` object, updated.
    """
    def _ratio(numerator, denominator):
        ###a class without samples/detections yields 0.0 rather than a ZeroDivisionError
        return round(numerator / denominator, 2) if denominator else 0.0

    fg_classes = [cls for cls in CLASSES if cls != "background"]

    for cls in fg_classes:
        ###each class IoU = summed IoU / number of groundtruth samples
        if (cls+"_IoU" in metrics_dict) and (cls+"_Count" in metrics_dict):
            metrics_dict[cls+"_IoU"] = _ratio(metrics_dict[cls+"_IoU"], metrics_dict[cls+"_Count"])
        ###each class precision = ((TP) / (TP + FP))
        if (cls+"_TP" in metrics_dict) and (cls+"_FP" in metrics_dict):
            tp = metrics_dict[cls+"_TP"]
            metrics_dict[cls+"_Precision"] = _ratio(tp, tp + metrics_dict[cls+"_FP"])
        ###each class recall = ((TP) / (TP + FN))
        ###FN is when the model doesn't detect a groundtruth (score < threshold or no box at all)
        if (cls+"_TP" in metrics_dict) and (cls+"_FN" in metrics_dict):
            tp = metrics_dict[cls+"_TP"]
            metrics_dict[cls+"_Recall"] = _ratio(tp, tp + metrics_dict[cls+"_FN"])
        ###each class F1 = (2 / ((1 / precision) + (1 / recall)))
        if (cls+"_Precision" in metrics_dict) and (cls+"_Recall" in metrics_dict):
            precision = metrics_dict[cls+"_Precision"]
            recall = metrics_dict[cls+"_Recall"]
            if precision and recall:
                metrics_dict[cls+"_F1"] = round((2 / ((1 / precision) + (1 / recall))), 2)
            else:
                ###harmonic mean is 0 as soon as one of the two terms is 0
                metrics_dict[cls+"_F1"] = 0.0

    ###average of every metric over all non-background classes
    for metric in ("IoU", "Precision", "Recall", "F1"):
        per_class = [metrics_dict.get(cls+"_"+metric, 0) for cls in fg_classes]
        metrics_dict["Avg_"+metric] = round(sum(per_class) / len(per_class), 2) if per_class else 0.0

    return metrics_dict

Model Testing - Test Images

In [None]:
###Evaluate the trained PyTorch model on the test images:
###  - run every test image through the model and keep the highest scoring detection above the threshold
###  - compare it with the (single) COCO groundtruth annotation of that image to count TP / FP / FN and sum IoU
###  - save the annotated images and a one-row CSV with the final metrics
all_time = time.time()
###set the computation device (falls back to CPU when CUDA is not available)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
###load the model and the trained weights
model = create_model_frcnn_custombackbone(num_classes=3).to(device)
###the checkpoint is a dict (weights are stored under 'state_dict'), so it can't be passed to load_state_dict() directly
model.load_state_dict(torch.load(
    OUT_DIR+"\\FRCNN_CustomBackbone_Outputs\\Model_Graphs_FRCNN_CustomBackbone\\"+MODEL_NAME+"_ckpt_48.pt", map_location=device
)['state_dict'])   ###saved as pt and not pth because collides with python path library
###get the model into evaluation mode
model.eval()

###csv name to store metrics
out_csv = MODEL_NAME + "_MetricsCSV_ckpt48.csv"
###directory where all the images and its annotation json are present
DIR_TEST = "G:\\Projects\\ADAS\\DDW\\Data\\Usable_Data\\test"
test_images = glob.glob(f"{DIR_TEST}/*.jpg")
print(f"Test instances: {len(test_images)}")

###read the coco json to get all the ids, classes/annotations, boxes to make as groundtruth for metrics calculation : TP, FP, FN, Precision, Recall, F1 etc.
coco = COCO(DIR_TEST + "\\test_3_annotations.coco.json")

ids_l = list(sorted(coco.imgs.keys()))
###[0] is used everywhere below because only the first annotation of every image is taken as groundtruth
imgname_l = [coco.loadImgs([i])[0]['file_name'] for i in ids_l]
###query annotations by the image id itself (not by using the id as a position in ids_l)
ann_ids_l = [coco.getAnnIds(imgIds=i)[0] for i in ids_l]
ann_cls_l = [coco.loadAnns(i)[0]['category_id'] for i in ann_ids_l]
bbox_l = [coco.loadAnns(i)[0]['bbox'] for i in ann_ids_l]

###classes: 0 index is reserved for background
CLASSES = ['background', 'awake', 'drowsy']
###define the detection threshold, any detection having score below this will be discarded
detection_threshold = 0.75
###dictionary to store count, TP, FP, FN, IoU, Precision, Recall, F1 of each class and avg of all
###(insertion order is kept as it defines the column order of the csv)
metrics_dict = {}
for cls in CLASSES:
    if cls != "background":
        for suffix in ("_Count", "_TP", "_FP", "_FN", "_IoU", "_Precision", "_Recall", "_F1"):
            metrics_dict[cls+suffix] = 0
for suffix in ("IoU", "Precision", "Recall", "F1"):
    metrics_dict["Avg_"+suffix] = 0

###loop through all test images
for i, image_path in enumerate(test_images):
    each_img_time = time.time()
    ###get the image file name for groundtruth lookup and for saving output later on
    image_name = os.path.basename(image_path)
    print("Image Name = ", image_name)
    ###read the image using cv2
    image = cv2.imread(image_path)
    ###image size (only needed if the groundtruth box has to be rescaled)
    hght, wdth, *_ = image.shape
    ###resize the image probably to the model trained size to make the detections faster
    #image = cv2.resize(image, (RESIZE_TO, RESIZE_TO))
    ###take a copy of image to draw boxes and classes name with score
    orig_image = image.copy()
    ###BGR to RGB
    image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
    ###make the pixel range between 0 and 1
    image /= 255.0
    ###bring color channels to front followed by height and width
    image = np.transpose(image, (2, 0, 1)).astype(np.float32)
    ###convert to tensor on the same device as the model
    image = torch.tensor(image, dtype=torch.float).to(device)
    ###add batch dimension
    image = torch.unsqueeze(image, 0)

    ###get the index of the matching image name from coco ids list to get other info like class, box for current image(which acts as groundtruth)
    cur_id_index = imgname_l.index(image_name)
    cur_ann_cls = ann_cls_l[cur_id_index]
    ###COCO box is [x, y, width, height]; build a new [x1, y1, x2, y2] list so the annotation stored in coco/bbox_l is not modified in place
    ###no rescaling to the training size is done because the boxes are compared with the model outputs as they are
    gt_x, gt_y, gt_w, gt_h = bbox_l[cur_id_index]
    cur_box = [gt_x, gt_y, gt_x + gt_w, gt_y + gt_h]

    ###inference only, so no gradients are needed
    with torch.no_grad():
        ###Note : the image is passed at its original size
        outputs = model(image)

    ###load all detection to CPU for further operations
    outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]

    draw_boxes = []
    ###draw boxes, classes with score on images if above threshold and if boxes available
    if len(outputs[0]['boxes']) != 0:
        ###converting tensors to numpy
        boxes = outputs[0]['boxes'].data.numpy()
        scores = outputs[0]['scores'].data.numpy()
        labels = outputs[0]['labels'].data.numpy()
        ###getting the max score above threshold from multiple boxes as only 1 box is required (0 when nothing reaches the threshold)
        max_score = scores.max(where=(scores >= detection_threshold),initial=0)
        max_score_index = np.where(scores==max_score)
        if max_score >= detection_threshold:
            ###select box, score and label with the SAME index so they always belong to the same detection
            draw_boxes = boxes[max_score_index].astype(np.int32)
            draw_scores = scores[max_score_index]
            pred_classes = [CLASSES[k] for k in labels[max_score_index]]
        else:
            ###if no single box matching threshold then empty
            draw_boxes, draw_scores, pred_classes = [], [], []

        ###draw the bounding boxes and write the class name with score on top of it
        for j, box in enumerate(draw_boxes):
            pred_cls = pred_classes[j]

            ###get the counts of TP, FP for each class in dict (score is already >= threshold)
            if CLASSES[cur_ann_cls] == pred_cls:   ###TP case
                metrics_dict[pred_cls+'_TP'] += 1
            else:   ###FP case
                metrics_dict[pred_cls+'_FP'] += 1
            ###sum the IoU per class, it is averaged after the loop
            ###NOTE(review): the sum is stored under the predicted class but later divided by the groundtruth count of that class
            cur_IoU = IoU_Calc(cur_box, box)
            metrics_dict[pred_cls+'_IoU'] = round((metrics_dict[pred_cls+'_IoU'] + cur_IoU),2)

            ###draw groundtruth on images
            cv2.rectangle(orig_image,
                        (int(cur_box[0]), int(cur_box[1])),
                        (int(cur_box[2]), int(cur_box[3])),
                        (0, 0, 255), 2)
            cv2.putText(orig_image, CLASSES[cur_ann_cls],
                        (int(cur_box[0]), int(cur_box[3]+10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255),
                        2, lineType=cv2.LINE_AA)

            ###draw model predictions on image
            cv2.rectangle(orig_image,
                        (int(box[0]), int(box[1])),
                        (int(box[2]), int(box[3])),
                        (0, 255, 0), 2)
            cv2.putText(orig_image, pred_cls,
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0),
                        2, lineType=cv2.LINE_AA)
            cv2.putText(orig_image, str(round(draw_scores[j],2)),
                        (int(box[0]), int(box[1]-20)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0),
                        2, lineType=cv2.LINE_AA)

        #cv2.imshow('Prediction', orig_image)
        #cv2.waitKey(1)

        ###get the counts of FN, when model doesn't detect a groundtruth due to < threshold
        if len(draw_boxes) == 0:
            metrics_dict[CLASSES[cur_ann_cls]+'_FN'] += 1

        ###save the predicted images
        path = OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\Test_Predictions_pt_FRCNN_CustomBackbone\\" + image_name
        cv2.imwrite(path, orig_image)

    ###get the counts of FN, when model doesn't detect a groundtruth at all
    if len(outputs[0]['boxes']) == 0:
        metrics_dict[CLASSES[cur_ann_cls]+'_FN'] += 1
    ###get the counts of each groundtruth class in dict
    metrics_dict[CLASSES[cur_ann_cls]+'_Count'] += 1

    print(f"Image {i+1} Done with Time : ", str(time.time() - each_img_time))
    print('-'*50)

###alternative (not used): if the background class is neglected and the model predicts every image above threshold,
###one class's FP is the other class's FN, so the FN of each class could be taken from the FP of the other class
###turn the raw counts/sums into IoU, Precision, Recall, F1 per class and their averages
metrics_dict = IoU_Precision_Recall_F1(metrics_dict, CLASSES)
print("\nDifferent Metrics = ", metrics_dict)

###save the metrics in csv using pandas
df = pd.DataFrame([metrics_dict])
df.to_csv(OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\Test_Predictions_pt_FRCNN_CustomBackbone\\" + out_csv, index=False, header=True)

#cv2.destroyAllWindows()
print('\nTEST PREDICTIONS COMPLETE WITH TIME : ',str(time.time() - all_time))

Model Testing - Video/Webcam

In [None]:
###Run the trained PyTorch model on a video stream/webcam, draw the best detection on every frame,
###show it in a window and save the annotated stream to an mp4 file
###set the computation device (falls back to CPU when CUDA is not available)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
###load the model and the trained weights
model = create_model_frcnn_custombackbone(num_classes=3).to(device)
###the checkpoint is a dict (weights are stored under 'state_dict'), so it can't be passed to load_state_dict() directly
model.load_state_dict(torch.load(
    OUT_DIR+"\\FRCNN_CustomBackbone_Outputs\\Model_Graphs_FRCNN_CustomBackbone\\"+MODEL_NAME+"_ckpt_48.pt", map_location=device
)['state_dict'])   ###saved as pt and not pth because collides with python path library
###get the model into evaluation mode
model.eval()

###classes: 0 index is reserved for background
CLASSES = ['background', 'awake', 'drowsy']
###define the detection threshold, any detection having score below this will be discarded
detection_threshold = 0.75

###initialize the video stream
vid_strm = cv2.VideoCapture("https://192.168.0.104:8080/video")   ###from android app webcam else give src=0 for local cam
###save the video stream used for predictions
###NOTE(review): the writer is opened for 1920x1080 frames while frames are written at their captured size - confirm the stream delivers that size
vid_strm_save = cv2.VideoWriter(OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\Video_Output_pt_FRCNN_CustomBackbone\\" + MODEL_NAME + "ckpt48_webcam.mp4",cv2.VideoWriter_fourcc(*'MP4V'),
                         10,(1920,1080))

###initialize the FPS counter
fps = FPS().start()

###try/finally so that the stream, the writer and the window are released even if a frame fails during inference
try:
    while vid_strm.isOpened():
        ###read/get the frame/image from video stream/webcam
        val, image = vid_strm.read()
        #image = cv2.resize(image, (RESIZE_TO, RESIZE_TO))   ###if resized, model will run faster and fps increases
        ###if no frame then exit the process
        if not val or image is None:
            break
        ###take a copy of image to draw boxes and classes name with score
        orig_image = image.copy()
        ###BGR to RGB
        image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
        ###make the pixel range between 0 and 1
        image /= 255.0
        ###bring color channels to front followed by height and width
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)
        ###convert to tensor on the same device as the model
        image = torch.tensor(image, dtype=torch.float).to(device)
        ###add batch dimension
        image = torch.unsqueeze(image, 0)

        ###inference only, so no gradients are needed
        with torch.no_grad():
            ###Note : the frame is passed at its original size
            outputs = model(image)

        ###load all detection to CPU for further operations
        outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]

        ###draw boxes, classes with score on images if above threshold and if boxes available
        if len(outputs[0]['boxes']) != 0:
            ###converting tensors to numpy
            boxes = outputs[0]['boxes'].data.numpy()
            scores = outputs[0]['scores'].data.numpy()
            labels = outputs[0]['labels'].data.numpy()
            ###getting the max score above threshold from multiple boxes as only 1 box is required (0 when nothing reaches the threshold)
            max_score = scores.max(where=(scores >= detection_threshold),initial=0)
            max_score_index = np.where(scores==max_score)
            if max_score >= detection_threshold:
                ###select box, score and label with the SAME index so they always belong to the same detection
                draw_boxes = boxes[max_score_index].astype(np.int32)
                draw_scores = scores[max_score_index]
                pred_classes = [CLASSES[k] for k in labels[max_score_index]]
            else:
                ###if no single box matching threshold then empty
                draw_boxes, draw_scores, pred_classes = [], [], []

            ###draw the bounding boxes and write the class name with score on top of it
            for j, box in enumerate(draw_boxes):
                cv2.rectangle(orig_image,
                            (int(box[0]), int(box[1])),
                            (int(box[2]), int(box[3])),
                            (0, 255, 0), 2)
                cv2.putText(orig_image, pred_classes[j],
                            (int(box[0]), int(box[1]-5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0),
                            2, lineType=cv2.LINE_AA)
                cv2.putText(orig_image, str(round(draw_scores[j],2)),
                            (int(box[0]), int(box[1]-20)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0),
                            2, lineType=cv2.LINE_AA)

        ###show the output frame
        cv2.imshow("Outout_Frame", orig_image)
        vid_strm_save.write(orig_image)

        ###if the 'q' key was pressed, break from the loop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        ###update the FPS counter
        fps.update()
finally:
    ###stop the timer
    fps.stop()
    ###cleanup of all objects created
    vid_strm.release()
    vid_strm_save.release()
    cv2.destroyAllWindows()

###display FPS information
print("Elapsed Time: {:.2f}".format(fps.elapsed()))
print("Approx. FPS: {:.2f}".format(fps.fps()))

ONNX Conversion

In [None]:
###converting the .pt to onnx
###Note: conversion should happen without cuda device on cpu as it throws error "https://github.com/pytorch/pytorch/issues/72175"

###the model is intentionally kept on CPU for the export (no .to(device))
model = create_model_frcnn_custombackbone(num_classes=3)
###the checkpoint is a dict (weights are stored under 'state_dict')
###map_location='cpu' so that a checkpoint saved from a CUDA run also loads on a CPU-only machine and the weights stay on CPU for the export
model.load_state_dict(torch.load(
    OUT_DIR+"\\FRCNN_CustomBackbone_Outputs\\Model_Graphs_FRCNN_CustomBackbone\\"+MODEL_NAME+"_ckpt_48.pt",
    map_location=torch.device('cpu'))['state_dict'])   ###saved as pt and not pth because collides with python path library
###get the model into evaluation mode
model.eval()

###dummy input tensor used to trace the model (batch of 1 RGB image of RESIZE_TO x RESIZE_TO)
onnx_input = torch.randn(1, 3, RESIZE_TO, RESIZE_TO, requires_grad=True)
#model_onnx_input = model(onnx_input)
###path to save onnx model
onnx_path = OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\ONNX_FRCNN_CustomBackbone\\" + MODEL_NAME + "_ckpt48_onnx_varsize.onnx"

###export the model to onnx for variable length axes
###NOTE(review): only one output name is given and its axes 2/3 are declared dynamic, while the ONNX test cells read three outputs positionally (outputs[0], outputs[1], outputs[2]) - confirm the "output" entry of dynamic_axes is intended
torch.onnx.export(model,                     ###model being run
                  onnx_input,                ###model input (or a tuple for multiple inputs)
                  onnx_path,                 ###where to save the model (can be a file or file-like object)
                  export_params=True,        ###store the trained parameter weights inside the model file
                  opset_version=11,          ###the ONNX version to export the model to(keep 11 as 10 throws error with onnx==1.11)
                  verbose=True,              ###prints the model conversion/debug process
                  input_names = ['input'],   ###the model's input names
                  output_names = ['output'], ###the model's output names
                  do_constant_folding=True,  ###whether to execute constant folding for optimization
                  dynamic_axes={"input": {2: "H", 3: "W"},
                               "output": {2: "H", 3: "W"},
                                })           ###make the onnx to accept different sizes of images

ONNX Model Testing - Test Images

In [None]:
###Evaluate the exported ONNX model on the test images:
###  - run every test image through ONNX Runtime and keep the highest scoring detection above the threshold
###  - compare it with the (single) COCO groundtruth annotation of that image to count TP / FP / FN and sum IoU
###  - save the annotated images and a one-row CSV with the final metrics
all_time = time.time()
###ONNX model file, used both for onnx.load() and for the runtime session
onnx_model_path = OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\ONNX_FRCNN_CustomBackbone\\"+MODEL_NAME+"_ckpt48_onnx_varsize.onnx"
###Load the ONNX model
onnx_model = onnx.load(onnx_model_path)
###Create an ONNX runtime session with both CUDA and CPU to run the onnx using cuda if available
ort_session = onnxruntime.InferenceSession(onnx_model_path,
                                           providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])

###csv name to store metrics
out_csv = MODEL_NAME + "_ONNX_MetricsCSV_ckpt48.csv"
###directory where all the images and its annotation json are present
DIR_TEST = "G:\\Projects\\ADAS\\DDW\\Data\\Usable_Data\\test"
test_images = glob.glob(f"{DIR_TEST}/*.jpg")
print(f"Test instances: {len(test_images)}")

###read the coco json to get all the ids, classes/annotations, boxes to make as groundtruth for metrics calculation : TP, FP, FN, Precision, Recall, F1 etc.
coco = COCO(DIR_TEST + "\\test_3_annotations.coco.json")

ids_l = list(sorted(coco.imgs.keys()))
###[0] is used everywhere below because only the first annotation of every image is taken as groundtruth
imgname_l = [coco.loadImgs([i])[0]['file_name'] for i in ids_l]
###query annotations by the image id itself (not by using the id as a position in ids_l)
ann_ids_l = [coco.getAnnIds(imgIds=i)[0] for i in ids_l]
ann_cls_l = [coco.loadAnns(i)[0]['category_id'] for i in ann_ids_l]
bbox_l = [coco.loadAnns(i)[0]['bbox'] for i in ann_ids_l]

###classes: 0 index is reserved for background
CLASSES = ['background', 'awake', 'drowsy']
###define the detection threshold, any detection having score below this will be discarded
detection_threshold = 0.75
###dictionary to store count, TP, FP, FN, IoU, Precision, Recall, F1 of each class and avg of all
###(insertion order is kept as it defines the column order of the csv)
metrics_dict = {}
for cls in CLASSES:
    if cls != "background":
        for suffix in ("_Count", "_TP", "_FP", "_FN", "_IoU", "_Precision", "_Recall", "_F1"):
            metrics_dict[cls+suffix] = 0
for suffix in ("IoU", "Precision", "Recall", "F1"):
    metrics_dict["Avg_"+suffix] = 0

###loop through all test images
for i, image_path in enumerate(test_images):
    each_img_time = time.time()
    ###get the image file name for groundtruth lookup and for saving output later on
    image_name = os.path.basename(image_path)
    print("Image Name = ", image_name)
    ###read the image using cv2
    image = cv2.imread(image_path)
    ###image size (only needed if the groundtruth box has to be rescaled)
    hght, wdth, *_ = image.shape
    ###resize the image if onnx expects fix size to which it was converted to make the detections faster
    #image = cv2.resize(image, (RESIZE_TO, RESIZE_TO))
    ###take a copy of image to draw boxes and classes name with score
    orig_image = image.copy()
    ###BGR to RGB
    image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
    ###bring color channels to front followed by height and width
    image = np.transpose(image, (2, 0, 1)).astype(np.float32)
    ###convert to tensor, kept on CPU because it is converted back to numpy for ONNX Runtime below
    image = torch.tensor(image, dtype=torch.float)
    ###make the pixel range between 0 and 1
    image /= 255.0
    ###add batch dimension
    image = torch.unsqueeze(image, 0)

    ###get the index of the matching image name from coco ids list to get other info like class, box for current image(which acts as groundtruth)
    cur_id_index = imgname_l.index(image_name)
    cur_ann_cls = ann_cls_l[cur_id_index]
    ###COCO box is [x, y, width, height]; build a new [x1, y1, x2, y2] list so the annotation stored in coco/bbox_l is not modified in place
    ###no rescaling to the training size is done because the boxes are compared with the model outputs as they are
    gt_x, gt_y, gt_w, gt_h = bbox_l[cur_id_index]
    cur_box = [gt_x, gt_y, gt_x + gt_w, gt_y + gt_h]

    ###Run the ONNX model on input image
    inputs = {"input": image.numpy()}
    outputs = ort_session.run(None, inputs)

    draw_boxes = []
    ###draw boxes, classes with score on images if above threshold and if boxes available
    if len(outputs[0]) != 0:
        ###outputs are read positionally: [0] boxes, [1] labels, [2] scores
        boxes = outputs[0]
        labels = outputs[1]
        scores = outputs[2]
        ###getting the max score above threshold from multiple boxes as only 1 box is required (0 when nothing reaches the threshold)
        max_score = scores.max(where=(scores >= detection_threshold),initial=0)
        max_score_index = np.where(scores==max_score)
        if max_score >= detection_threshold:
            ###select box, score and label with the SAME index so they always belong to the same detection
            draw_boxes = boxes[max_score_index].astype(np.int32)
            draw_scores = scores[max_score_index]
            pred_classes = [CLASSES[k] for k in labels[max_score_index]]
        else:
            ###if no single box matching threshold then empty
            draw_boxes, draw_scores, pred_classes = [], [], []

        ###draw the bounding boxes and write the class name with score on top of it
        for j, box in enumerate(draw_boxes):
            pred_cls = pred_classes[j]

            ###get the counts of TP, FP for each class in dict (score is already >= threshold)
            if CLASSES[cur_ann_cls] == pred_cls:   ###TP case
                metrics_dict[pred_cls+'_TP'] += 1
            else:   ###FP case
                metrics_dict[pred_cls+'_FP'] += 1
            ###sum the IoU per class, it is averaged after the loop
            ###NOTE(review): the sum is stored under the predicted class but later divided by the groundtruth count of that class
            cur_IoU = IoU_Calc(cur_box, box)
            metrics_dict[pred_cls+'_IoU'] = round((metrics_dict[pred_cls+'_IoU'] + cur_IoU),2)

            ###draw groundtruth on images
            cv2.rectangle(orig_image,
                        (int(cur_box[0]), int(cur_box[1])),
                        (int(cur_box[2]), int(cur_box[3])),
                        (0, 0, 255), 2)
            cv2.putText(orig_image, CLASSES[cur_ann_cls],
                        (int(cur_box[0]), int(cur_box[3]+10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255),
                        2, lineType=cv2.LINE_AA)

            ###draw model predictions on images
            cv2.rectangle(orig_image,
                        (int(box[0]), int(box[1])),
                        (int(box[2]), int(box[3])),
                        (0, 255, 0), 2)
            cv2.putText(orig_image, pred_cls,
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0),
                        2, lineType=cv2.LINE_AA)
            cv2.putText(orig_image, str(round(draw_scores[j],2)),
                        (int(box[0]), int(box[1]-20)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0),
                        2, lineType=cv2.LINE_AA)

        #cv2.imshow('Prediction', orig_image)
        #cv2.waitKey(1)

        ###get the counts of FN, when model doesn't detect a groundtruth due to < threshold
        if len(draw_boxes) == 0:
            metrics_dict[CLASSES[cur_ann_cls]+'_FN'] += 1

        ###save the predicted images
        path = OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\Test_Predictions_ONNX_FRCNN_CustomBackbone\\" + image_name
        cv2.imwrite(path, orig_image)

    ###get the counts of FN, when model doesn't detect a groundtruth at all
    if len(outputs[0]) == 0:
        metrics_dict[CLASSES[cur_ann_cls]+'_FN'] += 1
    ###get the counts of each groundtruth class in dict
    metrics_dict[CLASSES[cur_ann_cls]+'_Count'] += 1

    print(f"Image {i+1} Done with Time : ", str(time.time() - each_img_time))
    print('-'*50)

###alternative (not used): if the background class is neglected and the model predicts every image above threshold,
###one class's FP is the other class's FN, so the FN of each class could be taken from the FP of the other class
###turn the raw counts/sums into IoU, Precision, Recall, F1 per class and their averages
metrics_dict = IoU_Precision_Recall_F1(metrics_dict, CLASSES)
print("\nDifferent Metrics = ", metrics_dict)

###save the metrics in csv using pandas
df = pd.DataFrame([metrics_dict])
df.to_csv(OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\Test_Predictions_ONNX_FRCNN_CustomBackbone\\" + out_csv, index=False, header=True)

#cv2.destroyAllWindows()
print('\nTEST PREDICTIONS COMPLETE WITH TIME : ',str(time.time() - all_time))

ONNX Model Testing - Video/Webcam

In [None]:
###ONNX model file, the same file is used for onnx.load() and for the runtime session
###NOTE(review): the export cell writes the model under "\\FRCNN_CustomBackbone_Outputs\\ONNX_FRCNN_CustomBackbone\\" - confirm that "\\ONNX\\" is the intended directory here
onnx_model_path = OUT_DIR + "\\ONNX\\"+MODEL_NAME+"_ckpt48_onnx_varsize.onnx"
###Load the ONNX model
onnx_model = onnx.load(onnx_model_path)
###Create an ONNX runtime session with both CUDA and CPU to run the onnx using cuda if available
ort_session = onnxruntime.InferenceSession(onnx_model_path,
                                           providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])

###classes: 0 index is reserved for background
CLASSES = ['background', 'awake', 'drowsy']
###define the detection threshold, any detection having score below this will be discarded
detection_threshold = 0.75

###initialize the video stream
vid_strm = cv2.VideoCapture("https://192.168.0.104:8080/video")   ###from android app webcam else give src=0 for local cam
###save the video stream used for predictions
###NOTE(review): the writer is opened for 1920x1080 frames - confirm the stream delivers that size
vid_strm_save = cv2.VideoWriter(OUT_DIR + "\\Video_Output_ONNX\\" + MODEL_NAME + "ckpt48_onnx_webcam.mp4",cv2.VideoWriter_fourcc(*'MP4V'),
                         10,(1920,1080))

###initialize the FPS counter
fps = FPS().start()

#while True:
while(vid_strm.isOpened()):
    ###read/get the frame/image from video stream/webcam
    val, image = vid_strm.read()
    #image = cv2.resize(image, (RESIZE_TO, RESIZE_TO))   ###if resized, model will run faster and fps increases
    ###if no frame then exit the process
    if image is None:
        break
    ###take a copy of image to draw boxes and classes name with score
    orig_image = image.copy()
    ###BGR to RGB
    image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
    ###bring color channels to front followed by width and height
    image = np.transpose(image, (2, 0, 1)).astype(np.float32)   ###np.float isn't supported in numpy==1.22 so np.float64 or float32 which is equivalant
    ###convert to tensor
    image = torch.tensor(image, dtype=torch.float)#.cuda()   ###numpy doesn't support cuda at line 84 while converting image to numpy
    ###make the pixel range between 0 and 1
    image /= 255.0
    ###add batch dimension
    image = torch.unsqueeze(image, 0)

    ###Run the ONNX model on input image
    inputs = {"input": image.numpy()}
    outputs = ort_session.run(None, inputs)
    
    ###draw boxes, classes with score on images if above threshold and if boxes available
    if len(outputs[0]) != 0:
        boxes = outputs[0]#.data.numpy()
        scores = outputs[2]#.data.numpy()
        ###getting the max score above threshold from multiple boxes as only 1 box is required, works for custom backbone where single prediction can't be defined unlike pretrained backbone(given as parameter)
        max_score = scores.max(where=(scores >= detection_threshold),initial=0)
        max_score_index = np.where(scores==max_score)
        ###filter out boxes according to `detection_threshold`, Note: similar to NMS but in FRCNN, NMS is internal
        #boxes = boxes[scores >= detection_threshold].astype(np.int32)   ###works for frcnn with pretrained backbones having only 1 detection(given as parameter)
        boxes = boxes[max_score_index].astype(np.int32) if(max_score >= detection_threshold) else []   ###if no single box matching threshold then empty
        draw_boxes = boxes.copy()
        ###get all the predicited class names
        pred_classes = [CLASSES[i] for i in outputs[1]]
        
        ###draw the bounding boxes and write the class name with score on top of it
        for j, box in enumerate(draw_boxes):
            
            cv2.rectangle(orig_image,
                        (int(box[0]), int(box[1])),
                        (int(box[2]), int(box[3])),
                        (0, 255, 0), 2)
            cv2.putText(orig_image, pred_classes[j], 
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 
                        2, lineType=cv2.LINE_AA)
            cv2.putText(orig_image, str(round(scores[j],2)), 
                        (int(box[0]), int(box[1]-20)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 
                        2, lineType=cv2.LINE_AA)
        
    ###show the output frame
    cv2.imshow("Outout_Frame", orig_image)
    vid_strm_save.write(orig_image)
    
    ###if the 'q' key was pressed, break from the loop
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    ###update the FPS counter
    fps.update()

###stop the timer and display FPS information
fps.stop()

print("Elapsed Time: {:.2f}".format(fps.elapsed()))
print("Approx. FPS: {:.2f}".format(fps.fps()))

###cleanup of all objects created
vid_strm.release()
vid_strm_save.release()
cv2.destroyAllWindows()

OpenVINO Conversion

In [None]:
###conversion of onnx to openvino=2023.1: read the fp32 onnx export and write it as an OpenVINO IR (.xml + .bin)

###onnx saved path
onnx_path = OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\ONNX_FRCNN_CustomBackbone\\" + MODEL_NAME + "_ckpt48_onnx_varsize.onnx"
###path to save openvino model(consists of .xml & .bin)
ir_path = OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\OpenVINO_FRCNN_CustomBackbone\\" + MODEL_NAME + "_ckpt48_openvino_varsize.xml"

###convert the onnx model into an in-memory OpenVINO model (no compression happens at this step)
ov_model = ov.convert_model(onnx_path)
#ov_model = mo.convert_model(onnx_path, input_shape=[1,3,416,416], compress_to_fp16=True)   ###only mo supports input_shape and other options
#ov_model = mo.convert_model(help=True)   ###to get different options available
###save with compression to fp16(to improve performance but accuracy will be affected a bit) instead of fp32(onnx model is of fp32)
###compress_to_fp16 is an argument of ov.save_model (True is also its default); pass compress_to_fp16=False here to keep fp32 weights,
###see "https://docs.openvino.ai/2023.1/openvino_docs_MO_DG_FP16_Compression.html"
ov.save_model(ov_model, ir_path, compress_to_fp16=True)

OpenVINO Model Testing - Test Images

In [None]:
###Purpose of this cell: run the OpenVINO IR on every test image, keep the single best detection (highest score >= detection_threshold),
###compare it with the COCO groundtruth to accumulate Count/TP/FP/FN/IoU per class, save the annotated images and write the metrics to a csv
all_time = time.time()
###intialize the OpenVINO Core package
core = ov.Core()
###read the OpenVINO model(.xml)
model = core.read_model(model=OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\OpenVINO_FRCNN_CustomBackbone\\" + MODEL_NAME + "_ckpt48_openvino_varsize.xml")
###compile the OpenVINO model
compiled_model = core.compile_model(model=model, device_name="CPU")   ###check for available devices in system using "from openvino.runtime import Core; print(Core().available_devices)" and GPU.1 means NVIDIA and GPU.0 means Intel
                                                                      ###Note: if GPU/GPU.0 used then throws unknown error as "[GPU] get_tensor() is called for dynamic shape without upper bound" and GPU.1 is NVIDIA so not supported by Intel's OpenVINO
#input_layer = compiled_model.input(0) #(0..2)   ###check the input layer
#output_layer = compiled_model.output(0) #(0..2)   ###get the outputs from compiled openvino model

###csv name to store metrics
out_csv = MODEL_NAME + "_OpenVINO_MetricsCSV_ckpt48.csv"
###directory where all the images and its annotation json are present
DIR_TEST = "G:\\Projects\\ADAS\\DDW\\Data\\Usable_Data\\test"
test_images = glob.glob(f"{DIR_TEST}/*.jpg")
print(f"Test instances: {len(test_images)}")

###read the coco json to get all the ids, classes/annotations, boxes to make as groundtruth for metrics calculation : TP, FP, FN, Precision, Recall, F1 etc.
coco = COCO(DIR_TEST + "\\test_3_annotations.coco.json")

ids_l = list(sorted(coco.imgs.keys()))
imgname_l = [coco.loadImgs([i])[0]['file_name'] for i in ids_l]   ###[0] used because for 1 id only 1 image and 1 class annotation, if multiple then [0]...[n]
###query the annotations by the image id itself; using the id as a position in ids_l is only correct when the ids are exactly 0..n-1
ann_ids_l = [coco.getAnnIds(imgIds=i)[0] for i in ids_l]
ann_cls_l = [coco.loadAnns(i)[0]['category_id'] for i in ann_ids_l]
bbox_l = [coco.loadAnns(i)[0]['bbox'] for i in ann_ids_l]

###classes: 0 index is reserved for background
CLASSES = ['background', 'awake', 'drowsy']
###define the detection threshold, any detection having score below this will be discarded
detection_threshold = 0.75
###dictionary to store count, TP, FP, FN, IoU, Precision, Recall, F1 of each class and avg of all
###(insertion order is kept as it defines the column order of the csv)
metrics_dict = {}
for cls in CLASSES:
    if cls == "background":
        continue
    for metric in ("Count", "TP", "FP", "FN", "IoU", "Precision", "Recall", "F1"):
        metrics_dict[cls+"_"+metric] = 0
for metric in ("IoU", "Precision", "Recall", "F1"):
    metrics_dict["Avg_"+metric] = 0

###loop through all test images
for i, image_path in enumerate(test_images):
    each_img_time = time.time()
    ###get the image file name for saving output later on and for the groundtruth lookup
    image_name = os.path.basename(image_path)
    print("Image Name = ", image_name)
    ###read the image using cv2
    image = cv2.imread(image_path)
    ###cv2.imread returns None instead of raising when the file can't be decoded, so fail with a clear message
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    ###get image size for further groundtruth box resizing before image resize
    hght, wdth, *_ = image.shape
    ###resize the image if openvino expects fix size to which it was converted to make the detections faster
    #image = cv2.resize(image, (RESIZE_TO, RESIZE_TO))
    ###take a copy of image to draw boxes and classes name with score
    orig_image = image.copy()
    ###BGR to RGB
    image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB)
    ###bring color channels to front followed by height and width
    image = np.transpose(image, (2, 0, 1)).astype(np.float32)
    ###convert to tensor
    image = torch.tensor(image, dtype=torch.float)
    ###make the pixel range between 0 and 1
    image /= 255.0
    ###add batch dimension
    image = torch.unsqueeze(image, 0)

    ###get the index of the matching image name from coco ids list to get other info like class, box for current image(which acts as groundtruth)
    cur_id_index = imgname_l.index(image_name)
    cur_ann_cls = ann_cls_l[cur_id_index]
    gt_class = CLASSES[cur_ann_cls]
    ###convert the coco [x, y, width, height] box into [x1, y1, x2, y2] in a NEW list so the annotation held by coco/bbox_l isn't modified
    ###no rescaling to the training size is done because the image is fed to the model at its original size
    box_x, box_y, box_w, box_h = bbox_l[cur_id_index]
    cur_box = [box_x, box_y, box_x + box_w, box_y + box_h]

    ###run the compiled OpenVINO model ONCE per image and collect its 3 outputs (boxes, labels, scores) into outputs list
    results = compiled_model(image.numpy())
    outputs = [results[compiled_model.output(out_ov)] for out_ov in range(3)]
    
    ###draw boxes, classes with score on images if above threshold and if boxes available
    if len(outputs[0]) != 0:
        boxes = outputs[0]
        labels = outputs[1]
        scores = outputs[2]
        ###getting the max score above threshold from multiple boxes as only 1 box is required, works for custom backbone where single prediction can't be defined unlike pretrained backbone(given as parameter)
        max_score = scores.max(where=(scores >= detection_threshold),initial=0)
        max_score_index = np.where(scores==max_score)
        ###boxes, labels and scores are filtered with the SAME index so that the class used for TP/FP and the drawn score
        ###always belong to the selected box (previously they were taken from position 0 of the unfiltered outputs)
        if max_score >= detection_threshold:
            draw_boxes = boxes[max_score_index].astype(np.int32)
            draw_scores = scores[max_score_index]
            pred_classes = [CLASSES[k] for k in labels[max_score_index]]
        else:   ###if no single box matching threshold then empty
            draw_boxes, draw_scores, pred_classes = [], [], []
        
        ###update the metrics and draw the groundtruth/prediction for every selected box
        for j, box in enumerate(draw_boxes):
            pred_class = pred_classes[j]
            
            ###get the counts of TP, FP for each class in dict
            if gt_class == pred_class:   ###TP case(score is already > threshold)
                metrics_dict[pred_class+'_TP'] += 1
            else:   ###FP case(score is already > threshold)
                metrics_dict[pred_class+'_FP'] += 1
            ###accumulate the IoU of the predicted class, rounded to 2 decimals after each addition
            cur_IoU = IoU_Calc(cur_box, box)
            metrics_dict[pred_class+'_IoU'] = round((metrics_dict[pred_class+'_IoU'] + cur_IoU),2)

            ###draw groundtruth on images
            cv2.rectangle(orig_image,
                        (int(cur_box[0]), int(cur_box[1])),
                        (int(cur_box[2]), int(cur_box[3])),
                        (0, 0, 255), 2)
            cv2.putText(orig_image, gt_class, 
                        (int(cur_box[0]), int(cur_box[3]+10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 
                        2, lineType=cv2.LINE_AA)
            
            ###draw model predictions on images
            cv2.rectangle(orig_image,
                        (int(box[0]), int(box[1])),
                        (int(box[2]), int(box[3])),
                        (0, 255, 0), 2)
            cv2.putText(orig_image, pred_class, 
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 
                        2, lineType=cv2.LINE_AA)
            cv2.putText(orig_image, str(round(draw_scores[j],2)), 
                        (int(box[0]), int(box[1]-20)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 
                        2, lineType=cv2.LINE_AA)
        
        #cv2.imshow('Prediction', orig_image)
        #cv2.waitKey(1)

        ###get the counts of FN, when model doesn't detects a groundtruth due to < threshold
        if len(draw_boxes) == 0:
            metrics_dict[gt_class+'_FN'] += 1
        
        ###save the predicted images
        path = OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\Test_Predictions_OpenVINO_FRCNN_CustomBackbone\\" + image_name
        cv2.imwrite(path, orig_image)
    else:
        ###get the counts of FN, when model doesn't detects a groundtruth at all
        metrics_dict[gt_class+'_FN'] += 1
    ###get the counts of each groundtruth class in dict
    metrics_dict[gt_class+'_Count'] += 1

    print(f"Image {i+1} Done with Time : ", str(time.time() - each_img_time))
    print('-'*50)

'''
###1 class's FP is another class's FN in binary classification if background class is neglected, so swap the FPs for FNs only when model predicts all above threshold
metrics_dict[str(CLASSES[1])+"_FN"] = metrics_dict.get(CLASSES[2]+'_FP')
metrics_dict[str(CLASSES[2])+"_FN"] = metrics_dict.get(CLASSES[1]+'_FP')
'''
###pass the accumulated counters through the IoU_Precision_Recall_F1 helper (defined elsewhere in this notebook) and keep the dict it returns
metrics_dict = IoU_Precision_Recall_F1(metrics_dict, CLASSES)
print("\nDifferent Metrics = ", metrics_dict)

###save the metrics in csv using pandas (single row, one column per metrics_dict key)
df = pd.DataFrame([metrics_dict]) 
df.to_csv (OUT_DIR + "\\FRCNN_CustomBackbone_Outputs\\Test_Predictions_OpenVINO_FRCNN_CustomBackbone\\" + out_csv, index=False, header=True)

#cv2.destroyAllWindows()
print('\nTEST PREDICTIONS COMPLETE WITH TIME : ',str(time.time() - all_time))

OpenVINO Model Testing - Video/Webcam

In [None]:
###Purpose of this cell: run the OpenVINO IR on a video/webcam stream, draw the single best detection
###(highest score >= detection_threshold) on every frame, show it, save it to an mp4 and report the achieved FPS

###intialize the OpenVINO Core package
core = ov.Core()
###read the OpenVINO model(.xml)
###NOTE(review): this reads from OUT_DIR\OpenVINO\ while the conversion cell saves the IR under OUT_DIR\FRCNN_CustomBackbone_Outputs\OpenVINO_FRCNN_CustomBackbone\ - confirm the model exists at this path
model = core.read_model(model=OUT_DIR + "\\OpenVINO\\" + MODEL_NAME + "_ckpt48_openvino_varsize.xml")
###compile the OpenVINO model
compiled_model = core.compile_model(model=model, device_name="CPU")   ###check for available devices in system using "from openvino.runtime import Core; print(Core().available_devices)" and GPU.1 means NVIDIA and GPU.0 means Intel
                                                                      ###Note: if GPU or GPU.0/GPU used then throws unknown error and GPU.1 is NVIDIA so not supported by Intel's OpenVINO
#input_layer = compiled_model.input(0) #(0..2)   ###check the input layer
#output_layer = compiled_model.output(0) #(0..2)   ###get the outputs from compiled openvino model

###classes: 0 index is reserved for background
CLASSES = ['background', 'awake', 'drowsy']
###define the detection threshold, any detection having score below this will be discarded
detection_threshold = 0.75

###initialize the video stream
vid_strm = cv2.VideoCapture("https://192.168.0.104:8080/video")   ###from android app webcam else give src=0 for local cam
###(width, height) of the saved video; every written frame must have exactly this size
out_frame_size = (1920, 1080)
###save the video stream used for predictions
vid_strm_save = cv2.VideoWriter(OUT_DIR + "\\Video_Output_OpenVINO\\" + MODEL_NAME + "ckpt48_openvino_webcam.mp4",cv2.VideoWriter_fourcc(*'MP4V'),
                         10,out_frame_size)

###initialize the FPS counter
fps = FPS().start()

###try/finally so that the camera, the video file and the windows are released even if a frame fails
try:
    while vid_strm.isOpened():
        ###read/get the frame/image from video stream/webcam
        val, image = vid_strm.read()
        #image = cv2.resize(image, (RESIZE_TO, RESIZE_TO))   ###if resized, model will run faster and fps increases
        ###if the read failed or no frame was returned then exit the process
        if not val or image is None:
            break
        ###take a copy of image to draw boxes and classes name with score
        orig_image = image.copy()
        ###BGR to RGB
        image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB)
        ###bring color channels to front followed by height and width
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)
        ###convert to tensor
        image = torch.tensor(image, dtype=torch.float)
        ###make the pixel range between 0 and 1
        image /= 255.0
        ###add batch dimension
        image = torch.unsqueeze(image, 0)

        ###run the compiled OpenVINO model ONCE per frame and collect its 3 outputs (boxes, labels, scores) into outputs list
        results = compiled_model(image.numpy())
        outputs = [results[compiled_model.output(out_ov)] for out_ov in range(3)]

        ###draw boxes, classes with score on images if above threshold and if boxes available
        if len(outputs[0]) != 0:
            boxes = outputs[0]
            labels = outputs[1]
            scores = outputs[2]
            ###getting the max score above threshold from multiple boxes as only 1 box is required
            max_score = scores.max(where=(scores >= detection_threshold),initial=0)
            max_score_index = np.where(scores==max_score)
            ###boxes, labels and scores are filtered with the SAME index so that the drawn class name and score
            ###always belong to the drawn box (previously they were taken from position 0 of the unfiltered outputs)
            if max_score >= detection_threshold:
                draw_boxes = boxes[max_score_index].astype(np.int32)
                draw_scores = scores[max_score_index]
                pred_classes = [CLASSES[k] for k in labels[max_score_index]]
            else:   ###if no single box matching threshold then empty
                draw_boxes, draw_scores, pred_classes = [], [], []

            ###draw the bounding boxes and write the class name with score on top of it
            for box, pred_class, score in zip(draw_boxes, pred_classes, draw_scores):
                cv2.rectangle(orig_image,
                            (int(box[0]), int(box[1])),
                            (int(box[2]), int(box[3])),
                            (0, 255, 0), 2)
                cv2.putText(orig_image, pred_class, 
                            (int(box[0]), int(box[1]-5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 
                            2, lineType=cv2.LINE_AA)
                cv2.putText(orig_image, str(round(score,2)), 
                            (int(box[0]), int(box[1]-20)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 
                            2, lineType=cv2.LINE_AA)

        ###show the output frame
        cv2.imshow("Outout_Frame", orig_image)
        ###the writer only accepts frames of the size it was opened with, so resize when the stream delivers another resolution
        frame_h, frame_w = orig_image.shape[:2]
        if (frame_w, frame_h) != out_frame_size:
            vid_strm_save.write(cv2.resize(orig_image, out_frame_size))
        else:
            vid_strm_save.write(orig_image)

        ###if the 'q' key was pressed, break from the loop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        ###update the FPS counter
        fps.update()
finally:
    ###stop the timer
    fps.stop()
    ###cleanup of all objects created
    vid_strm.release()
    vid_strm_save.release()
    cv2.destroyAllWindows()

###display FPS information
print("Elapsed Time: {:.2f}".format(fps.elapsed()))
print("Approx. FPS: {:.2f}".format(fps.fps()))