## Setup

In [30]:
!pip install torch
!pip install torchvision
!pip install numpy
!pip install pandas
!pip install thop
!pip install tqdm
!pip install fiftyone
!pip install opencv-python
!pip install matplotlib
!pip install pillow



In [31]:
import os
import sys
import time
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.init as init
import torchvision
import fiftyone as fo
import fiftyone.zoo as foz
import fiftyone.utils.coco as fouc
from tqdm import tqdm
from itertools import product

## Data

In [5]:
class FiftyOneTorchDataset(torch.utils.data.Dataset):
    """A class to construct a PyTorch dataset from a FiftyOne dataset.

    Args:
        fiftyone_dataset: a FiftyOne dataset or view that will be used for training or testing
        transforms (None): a list of PyTorch transforms to apply to images and targets when loading
        gt_field ("ground_truth"): the name of the field in fiftyone_dataset that contains the
            desired labels to load
        classes (None): a list of class strings that are used to define the mapping between
            class names and indices. If None, it will use all classes present in the given fiftyone_dataset.
    """

    def __init__(
        self,
        fiftyone_dataset,
        transforms=None,
        gt_field="ground_truth",
        classes=None,
    ):
        self.samples = fiftyone_dataset
        self.transforms = transforms
        self.gt_field = gt_field

        self.img_paths = self.samples.values("filepath")

        self.classes = classes
        if not self.classes:
            # Get list of distinct labels that exist in the view
            self.classes = self.samples.distinct(
                "%s.detections.label" % gt_field
            )

        if self.classes[0] != "background":
            self.classes = ["background"] + self.classes

        self.labels_map_rev = {c: i for i, c in enumerate(self.classes)}

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        sample = self.samples[img_path]
        metadata = sample.metadata
        img = Image.open(img_path).convert("RGB")
        width, height = img.size

        boxes = []
        labels = []
        area = []
        iscrowd = []
        if sample[self.gt_field] is not None:
            detections = sample[self.gt_field].detections
            for det in detections:
                category_id = self.labels_map_rev[det.label]
                coco_obj = fouc.COCOObject.from_label(
                    det, metadata, category_id=category_id,
                )
                x, y, w, h = coco_obj.bbox
                boxes.append([(x + w / 2) / width, (y + h / 2) / height, w / width, h / height]) # normalized (xc, yc, w, h)
                labels.append(coco_obj.category_id)
                area.append(coco_obj.area)
                iscrowd.append(coco_obj.iscrowd)

        target = {}
        target["boxes"] = torch.as_tensor(boxes, dtype=torch.float32)
        target["labels"] = torch.as_tensor(labels, dtype=torch.int64)
        target["image_id"] = torch.as_tensor([idx])
        target["area"] = torch.as_tensor(area, dtype=torch.float32)
        target["iscrowd"] = torch.as_tensor(iscrowd, dtype=torch.int64)

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        return len(self.img_paths)

    def get_classes(self):
        return self.classes

In [6]:
class Transformtsz:
    def __init__(self, resize):
        self.resize = resize
    def __call__(self, image, boxes):
        image = torchvision.transforms.functional.resize(image, self.resize)
        image = torchvision.transforms.functional.to_tensor(image)
        return image, boxes

In [7]:
def collate(batch, grid_size=7, n_classes=80):
    images = []
    gt = []
    for item in batch:
        images.append(item[0].unsqueeze(0))

        fmap = torch.zeros(1, grid_size, grid_size, 5*2+n_classes)
        bboxes = item[1]["boxes"]
        labels = item[1]["labels"]

        used_col_row = {(r, c): 0 for r,c in list(product(range(7), repeat=2))}
        for bbox, label in zip(bboxes, labels):
            col = int(bbox[1] * grid_size)
            row = int(bbox[0] * grid_size)
            cell_size = 1 / grid_size
            row_interval = (cell_size*row, cell_size*(row+1))
            col_interval = (cell_size*col, cell_size*(col+1))

            # if more than 2 bboxes in one cell then skip
            if used_col_row[(row, col)] == 2:
                continue

            used_col_row[(row, col)] += 1

            if used_col_row[(row, col)] == 1:
                # bbox center coords relative to grid cell
                fmap[0, row, col, 0]  = (bbox[0] - row_interval[0]) / (row_interval[1] - row_interval[0])
                fmap[0, row, col, 1]  = (bbox[1] - col_interval[0]) / (col_interval[1] - col_interval[0])
                fmap[0, row, col, 2:4]  = bbox[2:] # bbox w and h relative to image size
                fmap[0, row, col, 4] = 1 # confindece
            elif used_col_row[(row, col)] == 2:
                # bbox center coords relative to grid cell
                fmap[0, row, col, 5]  = (bbox[0] - row_interval[0]) / (row_interval[1] - row_interval[0])
                fmap[0, row, col, 6]  = (bbox[1] - col_interval[0]) / (col_interval[1] - col_interval[0])
                fmap[0, row, col, 7:9]  = bbox[2:] # bbox w and h relative to image size
                fmap[0, row, col, 9] = 1 # confindece
            # set classes probabilities
            fmap[0, row, col, label - 1 + 10] = 1
        gt.append(fmap)
    
    images = torch.cat(images, 0)
    detections = torch.cat(gt, 0)
    return (images, detections)

In [6]:
###### delete
def convert_label_matrix_to_bboxes(label_matrix, S=7, C=5, B=2, img_size=448):
    height, width, _ = label_matrix.shape

    bboxes = []
    predicted_classes = []
    confidence = []

    for i in range(height):
        for j in range(width):
            cell_label = label_matrix[i, j]

            if cell_label[C] != 0:
                cell_x, cell_y, width_cell, height_cell = cell_label[C+1:C+5]

                xmin = (j + cell_x - width_cell / 2).item() / S * img_size
                ymin = (i + cell_y - height_cell / 2).item() / S * img_size
                xmax = (j + cell_x + width_cell / 2).item() / S * img_size
                ymax = (i + cell_y + height_cell / 2).item() / S * img_size

                bboxes.append([xmin, ymin, xmax, ymax])
                predicted_classes.append(torch.argmax(cell_label[0:C]).item())
                confidence.append(cell_label[C])

    return np.array(bboxes), np.array(predicted_classes), np.array(confidence)

In [39]:
# cv_dir = os.getcwd()
# data_dir = os.path.join(cv_dir, "data")
data_dir = '/kaggle/working/data'
fo.config.dataset_zoo_dir = data_dir

In [40]:
fo.config.dataset_zoo_dir

'/kaggle/working/data'

In [10]:
def load_coco(max_samples, data_dir):
    dataset = foz.load_zoo_dataset(
    "coco-2017",
    splits = ["train", "validation", "test"],
    label_types = ["detections"],
    # classes = classes
    max_samples = max_samples,
    dataset_dir=data_dir)
    
    dataset.compute_metadata()
    return dataset

In [11]:
def ttsplit(dataset):
    train_data = dataset.match_tags("train")
    test_data = dataset.match_tags("test")
    val_data = dataset.match_tags("validation")
    return train_data, test_data, val_data

In [12]:
def get_torch(dataset):
    classes = dataset.distinct(
    "ground_truth.detections.label"
    )
    torch_dataset = FiftyOneTorchDataset(dataset, transforms=Transformtsz(resize=(448, 448)), classes=classes)
    return dataset

In [13]:
def get_loader(torch_dataset):
    data_loader = torch.utils.data.DataLoader(torch_dataset, batch_size=1, shuffle=False)
    return data_loader

## Model

In [14]:
class YoloBackbone(nn.Module):
    def __init__(self):
        super(YoloBackbone, self).__init__()
        conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            # nn.BatchNorm2d(64),
            nn.LeakyReLU(0.1, inplace=True)
        )
        pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        conv2 = nn.Sequential(
            nn.Conv2d(64, 192, kernel_size=3, padding=1),
            # nn.BatchNorm2d(192),
            nn.LeakyReLU(0.1, inplace=True)
        )
        pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        conv3 = nn.Sequential(
            nn.Conv2d(192, 128, kernel_size=1),
            # nn.BatchNorm2d(128),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            # nn.BatchNorm2d(256),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(256, 256, kernel_size=1),
            # nn.BatchNorm2d(256),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            # nn.BatchNorm2d(512),
            nn.LeakyReLU(0.1, inplace=True)
        )
        pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        conv4_part = nn.Sequential(
            nn.Conv2d(512, 256, kernel_size=1),
            # nn.BatchNorm2d(256),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            # nn.BatchNorm2d(512),
            nn.LeakyReLU(0.1, inplace=True)
        )
        conv4_modules = []
        for _ in range(4):
            conv4_modules.append(conv4_part)
        conv4 = nn.Sequential(
            *conv4_modules,
            nn.Conv2d(512, 512, kernel_size=1),
            # nn.BatchNorm2d(512),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(512, 1024, kernel_size=3, padding=1),
            # nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1, inplace=True)
        )
        pool4 = nn.MaxPool2d(kernel_size=2, stride=2)
        conv5 = nn.Sequential(
            nn.Conv2d(1024, 512, kernel_size=1),
            # nn.BatchNorm2d(512),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(512, 1024, kernel_size=3, padding=1),
            # nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(1024, 512, kernel_size=1),
            # nn.BatchNorm2d(512),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(512, 1024, kernel_size=3, padding=1),
            # nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1, inplace=True)
        )
        self.net = nn.Sequential(conv1, pool1, conv2, pool2, conv3, pool3, conv4, pool4, conv5)

    def forward(self, X):
        return self.net(X)

In [15]:
class Yolo(nn.Module):
    def __init__(self, backbone: YoloBackbone = YoloBackbone(), backbone_out_channels=1024, n_classes=80):
        self.n_classes = n_classes
        super(Yolo, self).__init__()
        self.backbone = backbone
        self.head = nn.Sequential(
            nn.Conv2d(backbone_out_channels, 1024, kernel_size=3, padding=1),
            # nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(1024, 1024, kernel_size=3, padding=1, stride=2),
            # nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(1024, 1024, kernel_size=3, padding=1),
            # nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Conv2d(1024, 1024, kernel_size=3, padding=1),
            # nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Flatten(),
            nn.Linear(7*7*1024, 4096),
            nn.Dropout(0.5),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Linear(4096, 7 * 7 *(2 * 5 + self.n_classes)),
            nn.Sigmoid(),
            nn.Unflatten(1, (7, 7, (2 * 5 + self.n_classes)))
        )
        self.net = nn.Sequential(self.backbone, self.head)

    def forward(self, X):
        return self.net(X)

## Utilis

#### IoU

In [16]:
def is_intersect(self, boxA, boxB):
    if boxA[0] > boxB[2]:
        return False  
    if boxA[1] > boxB[3]:
        return False  
    if boxA[2] < boxB[0]:
        return False 
    if boxA[3] < boxB[1]:
        return False  
    return True


def get_intersection(self, boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    return (xB - xA + 1) * (yB - yA + 1)


def get_union(self, boxA, boxB):
    area_A = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    area_B = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
    return area_A + area_B


def transform_x1y1wh_to_x1y1x2y2(self, box):
    x1 = round(box[0], 2)
    y1 = round(box[1], 2)
    x2 = round(box[0] + box[2], 2)
    y2 = round(box[1] + box[3], 2)
    return [x1, y1, x2, y2]


def get_IoU(self, boxA, boxB):
    # x1y1wh -> x1y1x2y2
    boxA = self.transform_x1y1wh_to_x1y1x2y2(boxA)
    boxB = self.transform_x1y1wh_to_x1y1x2y2(boxB)
    if self.is_intersect(boxA, boxB) is False:
        return 0
    inter = self.get_intersection(boxA, boxB)
    union = self.get_union(boxA, boxB)
    iou = inter / (union - inter)
    return iou

#### Loss

In [17]:
def yolo_loss(yhat, y, lambda_coord=5, lambda_noobj=0.5, n_classes=80):
    """
    Args:
        yhat: [#, 7, 7, 30]
        y: [#, 7, 7, 30]
    Returns:
        loss: [#]
    """
    with torch.no_grad():
        # arrange cell xidx, yidx
        # [7, 7]
        cell_xidx = (torch.arange(49) % 7).reshape(7, 7)
        cell_yidx = (torch.div(torch.arange(49), 7, rounding_mode='floor')).reshape(7, 7)
        # transform to [7, 7, 2]
        cell_xidx.unsqueeze_(-1)
        cell_yidx.unsqueeze_(-1)
        cell_xidx.expand(7, 7, 2)
        cell_yidx.expand(7, 7, 2)
        # move to device
        cell_xidx = cell_xidx.to(yhat.device)
        cell_yidx = cell_yidx.to(yhat.device)

    def calc_coord(val):
        with torch.no_grad():
            # transform cell relative coordinates to image relative coordinates
            x = (val[..., 0] + cell_xidx) / 7.0
            y = (val[..., 1] + cell_yidx) / 7.0

            return (x - val[..., 2] / 2.0,
                x + val[..., 2] / 2.0,
                y - val[..., 3] / 2.0,
                y + val[..., 3] / 2.0)

    y_area = y[..., :10].reshape(-1, 7, 7, 2, 5)
    yhat_area = yhat[..., :10].reshape(-1, 7, 7, 2, 5)

    y_class = y[..., 10:].reshape(-1, 7, 7, n_classes)
    yhat_class = yhat[..., 10:].reshape(-1, 7, 7, n_classes)

    with torch.no_grad():
        # calculate IoU
        x_min, x_max, y_min, y_max = calc_coord(y_area)
        x_min_hat, x_max_hat, y_min_hat, y_max_hat = calc_coord(yhat_area)

        wi = torch.min(x_max, x_max_hat) - torch.max(x_min, x_min_hat)
        wi = torch.max(wi, torch.zeros_like(wi))
        hi = torch.min(y_max, y_max_hat) - torch.max(y_min, y_min_hat)
        hi = torch.max(hi, torch.zeros_like(hi))

        intersection = wi * hi
        union = (x_max - x_min) * (y_max - y_min) + (x_max_hat - x_min_hat) * (y_max_hat - y_min_hat) - intersection
        iou = intersection / (union + 1e-6) # add epsilon to avoid nan

        _, res = iou.max(dim=3, keepdim=True)

    # [#, 7, 7, 5]
    # responsible bounding box (having higher IoU)
    yhat_res = torch.take_along_dim(yhat_area, res.unsqueeze(3), 3).squeeze_(3)
    y_res = y_area[..., 0, :5]

    with torch.no_grad():
        # calculate indicator matrix
        have_obj = y_res[..., 4] > 0
        no_obj = ~have_obj

    return ((lambda_coord * ( # coordinate loss
          (y_res[..., 0] - yhat_res[..., 0]) ** 2 # X
        + (y_res[..., 1] - yhat_res[..., 1]) ** 2 # Y
        + (torch.sqrt(y_res[..., 2]) - torch.sqrt(yhat_res[..., 2])) ** 2  # W
        + (torch.sqrt(y_res[..., 3]) - torch.sqrt(yhat_res[..., 3])) ** 2) # H
        # confidence
        + (y_res[..., 4] - yhat_res[..., 4]) ** 2
        # class
        + ((y_class - yhat_class) ** 2).sum(dim=3)) * have_obj
        # noobj
        + ((y_area[..., 0, 4] - yhat_area[..., 0, 4]) ** 2 + \
        (y_area[..., 1, 4] - yhat_area[..., 1, 4]) ** 2) * no_obj * lambda_noobj).sum(dim=(1, 2))

#### NMS

In [184]:
def nms(pred, threshold=0.5):

    with torch.no_grad():
        pred = pred.reshape((-1, 30))
        nms_data = [[] for _ in range(80)]
        for i in range(pred.shape[0]):
            cell = pred[i]
            score, idx = torch.max(cell[10:30], dim=0)
            idx = idx.item()
            x, y, w, h, iou = cell[0:5].cpu().numpy()

            nms_data[idx].append([i, x, y, w, h, iou, score.item()])
            x, y, w, h, iou = cell[5:10].cpu().numpy()
            nms_data[idx].append([i, x, y, w, h, iou, score.item()])

        ret = torch.zeros_like(pred)
        flag = torch.zeros(pred.shape[0], dtype=torch.bool)
        for c in range(80):
            c_nms_data = np.array(nms_data[c])

            keep_index = _nms(c_nms_data, threshold)
            keeps = c_nms_data[keep_index]

            for keep in keeps:
                i, x, y, w, h, iou, score = keep
                i = int(i)

                last_score, _ = torch.max(ret[i][10:30], dim=0)
                last_iou = ret[i][4]

                if score * iou > last_score * last_iou:
                    flag[i] = False
                if flag[i]: continue

                ret[i][0:5] = torch.tensor([x, y, w, h, iou])
                ret[i][10:30] = 0
                ret[i][10 + c] = score

                flag[i] = True

        return ret
    
    
def _nms(data, threshold):

    if len(data) == 0:
        return []

    cell_idx = data[:, 0]
    x = data[:, 1]
    y = data[:, 2]
    xidx = cell_idx % 7
    yidx = cell_idx // 7
    x = (x + xidx) / 7.0
    y = (y + yidx) / 7.0
    w = data[:, 3]
    h = data[:, 4]
    x1 = x - w / 2
    y1 = y - h / 2
    x2 = x + w / 2
    y2 = y + h / 2

    score_area = data[:, 5]

    areas = w * h

    order = score_area.argsort()[::-1]
    keep = []

    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w = np.maximum(0.0, xx2 - xx1)
        h = np.maximum(0.0, yy2 - yy1)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter)

        inds = np.where(ovr <= threshold)[0]
        order = order[inds + 1]

    return keep

#### mAP

In [19]:
def calculate_mAP(self, classAP_data):
    AP_50_per_class, PR_50_pts_per_class = {}, {}
    (
        num_true_per_class,
        num_positive_per_class,
        num_TP_50_per_class,
        num_FP_50_per_class,
    ) = ({}, {}, {}, {})
    mAP_50, mAP_75, mAP_5095 = 0, 0, 0
    valid_num_classes = 0 + 1e-8

    for res in classAP_data:
        if res["total_positive"] > 0:
            valid_num_classes += 1
            AP_50_per_class[res["class"]] = res["AP_50"]
            PR_50_pts_per_class[res["class"]] = {
                "mprec": res["prec_50"],
                "mrec": res["rec_50"],
            }
            num_true_per_class[res["class"]] = res["total_true"]
            num_positive_per_class[res["class"]] = res["total_positive"]
            num_TP_50_per_class[res["class"]] = res["total_TP_50"]
            num_FP_50_per_class[res["class"]] = res["total_FP_50"]
            mAP_50 += res["AP_50"]
            mAP_75 += res["AP_75"]
            mAP_5095 += res["AP_5095"]

    mAP_50 /= valid_num_classes
    mAP_75 /= valid_num_classes
    mAP_5095 /= valid_num_classes

    res = {
        "AP_50_PER_CLASS": AP_50_per_class,
        "PR_50_PTS_PER_CLASS": PR_50_pts_per_class,
        "NUM_TRUE_PER_CLASS": num_true_per_class,
        "NUM_POSITIVE_PER_CLASS": num_positive_per_class,
        "NUM_TP_50_PER_CLASS": num_TP_50_per_class,
        "NUM_FP_50_PER_CLASS": num_FP_50_per_class,
        "mAP_50": round(mAP_50, 4),
        "mAP_75": round(mAP_75, 4),
        "mAP_5095": round(mAP_5095, 4),
    }
    return res

In [20]:
def mean_average_precision(
    pred_boxes, true_boxes, iou_threshold=0.5, box_format="midpoint", num_classes=5
):
    """
    Calculates mean average precision
    Parameters:
        pred_boxes (list): list of lists containing all bboxes with each bboxes
        specified as [train_idx, class_prediction, prob_score, x1, y1, x2, y2]
        true_boxes (list): Similar as pred_boxes except all the correct ones
        iou_threshold (float): threshold where predicted bboxes is correct
        box_format (str): "midpoint" or "corners" used to specify bboxes
        num_classes (int): number of classes
    Returns:
        float: mAP value across all classes given a specific IoU threshold
    """

    # list storing all AP for respective classes
    average_precisions = []

    # used for numerical stability later on
    epsilon = 1e-6

    for c in range(num_classes):
        detections = []
        ground_truths = []

        # Go through all predictions and targets,
        # and only add the ones that belong to the
        # current class c
        for detection in pred_boxes:
            if detection[1] == c:
                detections.append(detection)

        for true_box in true_boxes:
            if true_box[1] == c:
                ground_truths.append(true_box)

        # find the amount of bboxes for each training example
        # Counter here finds how many ground truth bboxes we get
        # for each training example, so let's say img 0 has 3,
        # img 1 has 5 then we will obtain a dictionary with:
        # amount_bboxes = {0:3, 1:5}
        amount_bboxes = Counter([gt[0] for gt in ground_truths])

        # We then go through each key, val in this dictionary
        # and convert to the following (w.r.t same example):
        # ammount_bboxes = {0:torch.tensor[0,0,0], 1:torch.tensor[0,0,0,0,0]}
        for key, val in amount_bboxes.items():
            amount_bboxes[key] = torch.zeros(val)

        # sort by box probabilities which is index 2
        detections.sort(key=lambda x: x[2], reverse=True)
        TP = torch.zeros((len(detections)))
        FP = torch.zeros((len(detections)))
        total_true_bboxes = len(ground_truths)

        # If none exists for this class then we can safely skip
        if total_true_bboxes == 0:
            continue

        for detection_idx, detection in enumerate(detections):
            # Only take out the ground_truths that have the same
            # training idx as detection
            ground_truth_img = [
                bbox for bbox in ground_truths if bbox[0] == detection[0]
            ]

            num_gts = len(ground_truth_img)
            best_iou = 0

            for idx, gt in enumerate(ground_truth_img):
                iou = intersection_over_union(
                    torch.tensor(detection[3:]),
                    torch.tensor(gt[3:]),
                    box_format=box_format,
                )

                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = idx

            if best_iou > iou_threshold:
                # only detect ground truth detection once
                if amount_bboxes[detection[0]][best_gt_idx] == 0:
                    # true positive and add this bounding box to seen
                    TP[detection_idx] = 1
                    amount_bboxes[detection[0]][best_gt_idx] = 1
                else:
                    FP[detection_idx] = 1

            # if IOU is lower then the detection is a false positive
            else:
                FP[detection_idx] = 1

        TP_cumsum = torch.cumsum(TP, dim=0)
        FP_cumsum = torch.cumsum(FP, dim=0)
        recalls = TP_cumsum / (total_true_bboxes + epsilon)
        precisions = torch.divide(TP_cumsum, (TP_cumsum + FP_cumsum + epsilon))
        precisions = torch.cat((torch.tensor([1]), precisions))
        recalls = torch.cat((torch.tensor([0]), recalls))
        # torch.trapz for numerical integration
        average_precisions.append(torch.trapz(precisions, recalls))

    return sum(average_precisions) / (len(average_precisions) + 1e-6)

#### Weights initialization

In [21]:
def weight_init(m):
    '''
    Usage:
        model = Model()
        model.apply(weight_init)
    '''
    if isinstance(m, nn.Conv1d):
        init.normal_(m.weight.data)
        if m.bias is not None:
            init.normal_(m.bias.data)
    elif isinstance(m, nn.Conv2d):
        init.xavier_normal_(m.weight.data)
        if m.bias is not None:
            init.normal_(m.bias.data)
    elif isinstance(m, nn.Conv3d):
        init.xavier_normal_(m.weight.data)
        if m.bias is not None:
            init.normal_(m.bias.data)
    elif isinstance(m, nn.ConvTranspose1d):
        init.normal_(m.weight.data)
        if m.bias is not None:
            init.normal_(m.bias.data)
    elif isinstance(m, nn.ConvTranspose2d):
        init.xavier_normal_(m.weight.data)
        if m.bias is not None:
            init.normal_(m.bias.data)
    elif isinstance(m, nn.ConvTranspose3d):
        init.xavier_normal_(m.weight.data)
        if m.bias is not None:
            init.normal_(m.bias.data)
    elif isinstance(m, nn.BatchNorm1d):
        init.normal_(m.weight.data, mean=1, std=0.02)
        init.constant_(m.bias.data, 0)
    elif isinstance(m, nn.BatchNorm2d):
        init.normal_(m.weight.data, mean=1, std=0.02)
        init.constant_(m.bias.data, 0)
    elif isinstance(m, nn.BatchNorm3d):
        init.normal_(m.weight.data, mean=1, std=0.02)
        init.constant_(m.bias.data, 0)
    elif isinstance(m, nn.Linear):
        init.xavier_normal_(m.weight.data)
        init.normal_(m.bias.data)
    elif isinstance(m, nn.LSTM):
        for param in m.parameters():
            if len(param.shape) >= 2:
                init.orthogonal_(param.data)
            else:
                init.normal_(param.data)
    elif isinstance(m, nn.LSTMCell):
        for param in m.parameters():
            if len(param.shape) >= 2:
                init.orthogonal_(param.data)
            else:
                init.normal_(param.data)
    elif isinstance(m, nn.GRU):
        for param in m.parameters():
            if len(param.shape) >= 2:
                init.orthogonal_(param.data)
            else:
                init.normal_(param.data)
    elif isinstance(m, nn.GRUCell):
        for param in m.parameters():
            if len(param.shape) >= 2:
                init.orthogonal_(param.data)
            else:
                init.normal_(param.data)

#### Other

In [22]:
class Accumulator(object):
    """
    Sum a list of numbers over time
    from: https://github.com/dsgiitr/d2l-pytorch/blob/master/d2l/base.py
    """
    def __init__(self, n):
        self.data = [0.0] * n
    def add(self, *args):
        self.data = [a + b for a, b in zip(self.data, args)]
    def reset(self):
        self.data = [0] * len(self.data)
    def __getitem__(self, i):
        return self.data[i]


In [23]:
class Timer(object):
    """Record multiple running times."""
    def __init__(self):
        self.times = []
        self.start()

    def start(self):
        """Start the timer"""
        self.start_time = time.time()

    def stop(self):
        """Stop the timer and record the time in a list"""
        self.times.append(time.time() - self.start_time)
        return self.times[-1]

    def avg(self):
        """Return the average time"""
        return sum(self.times)/len(self.times)

    def sum(self):
        """Return the sum of time"""
        return sum(self.times)

    def cumsum(self):
        """Return the accumuated times"""
        return np.array(self.times).cumsum().tolist()

## Train

In [24]:
def train(net, train_iter, test_iter, num_epochs, lr, momentum=0.9, weight_decay=5e-4, accum_batch_num=1, save_path='./chkpt', load=None, load_epoch=-1, pretrained=False):
    '''
    Train net work. Some notes for load & load_epoch:
    :param load: the file of model weights to load
    :param load_epoch: num of epoch already completed (minus 1). should be the same with the number in auto-saved file name.
    '''

    def print_and_log(msg, log_file):
        print(msg)
        with open(log_file, 'a', encoding='utf8') as f:
            f.write(msg + '\n')

    def update_lr(opt, lr):
        for param_group in opt.param_groups:
            param_group['lr'] = lr

    os.makedirs(save_path, exist_ok=True)
    log_file = os.path.join(save_path, f'log-{time.time_ns()}.txt')

    if load:
        net.load_state_dict(torch.load(load))
    elif pretrained:
        net.head.apply(weight_init)
    else:
        # init params
        net.apply(weight_init)

    if not torch.cuda.is_available():
        net = net.to(torch.device('cpu'))
        devices = [torch.device('cpu')]
    else:
        net = net.to(torch.device('cuda'))
        devices = [torch.device('cuda')]

    # define optimizer
    if isinstance(lr, float):
        tlr = lr
    else: tlr = 0.001

    optimizer = torch.optim.SGD(net.parameters(), lr=tlr, momentum=momentum, weight_decay=weight_decay)

    # visualization

    num_batches = len(train_iter)
    # train
    for epoch in range(num_epochs - load_epoch - 1):
        # adjust true epoch number according to pre_load
        epoch = epoch + load_epoch + 1

        # define metrics: train loss, sample count
        metrics = Accumulator(2)
        # define timer
        timer = Timer()

        # train
        net.train()

        # set batch accumulator
        accum_cnt = 0
        accum = 0
        loop = tqdm(train_iter, leave=True)

        for batch_idx, (X, y) in enumerate(loop):
            timer.start()

            X, y = X.to(devices[0]), y.to(devices[0])
            yhat = net(X)
            
            loss_val = yolo_loss(yhat, y)
            # print(loss_val)

            # backward to accumulate gradients
            loss_val.sum().backward()
            # step
            optimizer.step()
            # clear
            optimizer.zero_grad()


            # update metrics
            with torch.no_grad():
                metrics.add(loss_val.sum().cpu(), X.shape[0])
            train_l = м[0] / metrics[1]

            timer.stop()

            # log & visualization
            if (batch_idx + 1) % (num_batches // 5) == 0 or batch_idx == num_batches - 1:
                print_and_log("epoch: %d, batch: %d / %d, loss: %.4f, time: %.4f" % (epoch, batch_idx + 1, num_batches, train_l.item(), timer.sum()), log_file)

        # redefine metrics: test loss, test sample count
        metrics = Accumulator(2)
        # redefine timer
        timer = Timer()
        # test
        net.eval()

        with torch.no_grad():
            timer.start()

            for batch in test_iter:
                X, y = batch
                X, y = X.to(devices[0]), y.to(devices[0])
                yhat = net(X)

                loss_val = yolo_loss(yhat, y)
                metrics.add(loss_val.sum().cpu(), X.shape[0])

            timer.stop()

            test_l = metrics[0] / metrics[1]
            print_and_log("epoch: %d, test loss: %.4f, time: %.4f" % (epoch + 1, test_l.item(), timer.sum()), log_file)

        # save model
        if epoch % 5 == 0:
            torch.save(net.state_dict(), os.path.join(save_path, f'./{time.time_ns()}-epoch-{epoch}.pth'))

In [41]:
dataset = load_coco(15000, data_dir)
dataset.persistent = True
# dataset = fo.Dataset.from_dir(dataset_dir='/kaggle/working/', dataset_type=fo.types.COCODetectionDataset,)
classes = dataset.distinct("ground_truth.detections.label")
train_data = dataset.match_tags("train")
test_data = dataset.match_tags("test")
val_data = dataset.match_tags("validation")
train_dataset = FiftyOneTorchDataset(train_data, transforms=Transformtsz(resize=(448, 448)), classes=classes)
val_dataset_test = FiftyOneTorchDataset(val_data, transforms=Transformtsz(resize=(448, 448)), classes=classes)
test_dataset_test = FiftyOneTorchDataset(test_data, transforms=Transformtsz(resize=(448, 448)), classes=classes)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=8, shuffle=False, collate_fn=collate)#, sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(val_dataset_test, batch_size=8, shuffle=False, collate_fn=collate)#, sampler=train_sampler)
test_loader = torch.utils.data.DataLoader(test_dataset_test, batch_size=8, shuffle=False, collate_fn=collate)#, sampler=train_sampler)

Downloading split 'train' to '/kaggle/working/data/train' if necessary
Found annotations at '/kaggle/working/data/raw/instances_train2017.json'
Sufficient images already downloaded
Existing download of split 'train' is sufficient
Downloading split 'validation' to '/kaggle/working/data/validation' if necessary
Found annotations at '/kaggle/working/data/raw/instances_val2017.json'
Only found 5000 (<15000) samples matching your requirements
Sufficient images already downloaded
Existing download of split 'validation' is sufficient
Downloading split 'test' to '/kaggle/working/data/test' if necessary
Found test info at '/kaggle/working/data/raw/image_info_test2017.json'
Sufficient images already downloaded
Existing download of split 'test' is sufficient
Loading 'coco-2017' split 'train'
 100% |█████████████| 15000/15000 [1.9m elapsed, 0s remaining, 114.4 samples/s]      
Loading 'coco-2017' split 'validation'
 100% |███████████████| 5000/5000 [38.0s elapsed, 0s remaining, 138.6 samples/s]   

In [152]:
#resnet18 = torchvision.models.resnet18(pretrained=True)
# net = Yolo() # classical YoloV1 with our backbone
# resnet 18 backbone
# remove avg pool and fc
resnet18 = torchvision.models.resnet18(weights='ResNet18_Weights.IMAGENET1K_V1')
backbone = nn.Sequential(*list(resnet18.children())[:-2])
for param in backbone.parameters():
    param.requires_grad = False
net = Yolo(backbone, backbone_out_channels=512)

In [None]:
train(net, train_iter=train_loader, test_iter=test_loader, num_epochs=20, lr=0.0001)

In [None]:
!zip -r coco15k.zip kaggle/working/data
display(FileLink('coco15k.zip'))

## Testing

In [191]:
resnet18 = torchvision.models.resnet18(weights='ResNet18_Weights.IMAGENET1K_V1')
backbone = nn.Sequential(*list(resnet18.children())[:-2])
net = Yolo(backbone, backbone_out_channels=512)
net.load_state_dict(torch.load('/kaggle/working/chkpt/1714178438389879385-epoch-20.pth'))
net.eval()

Yolo(
  (backbone): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
 

In [206]:
class RawDataforTest(torch.utils.data.Dataset):
    
    def __init__(self, fiftyone_dataset, transforms=None, gt_field="ground_truth", classes=None,):
        
        self.samples = fiftyone_dataset
        self.transforms = transforms
        self.gt_field = gt_field

        self.img_paths = self.samples.values("filepath")

        self.classes = classes
        if not self.classes:
            # Get list of distinct labels that exist in the view
            self.classes = self.samples.distinct(
                "%s.detections.label" % gt_field
            )

        if self.classes[0] != "background":
            self.classes = ["background"] + self.classes

        self.labels_map_rev = {c: i for i, c in enumerate(self.classes)}

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        
        img_path = self.img_paths[idx]
        sample = self.samples[img_path]
        metadata = sample.metadata
        img = Image.open(img_path).convert("RGB")
        width, height = img.size

        if sample[self.gt_field] is not None:
            detections = sample[self.gt_field].detections
            for det in detections:
                category_id = self.labels_map_rev[det.label]
                coco_obj = fouc.COCOObject.from_label(
                    det, metadata, category_id=category_id,
                )   
                x, y, w, h = coco_obj.bbox
                
                # update labels from absolute to relative
                h, w = float(h), float(w)

                ret_targets = []
                ret_targets.append({
                        'xmin': float(x) / w,
                        'ymin': float(y) / h,
                        'xmax': float(x+w) / w,
                        'ymax': float(x+h) / h,
                        'category': category_id,
                })
                
                
            img = torchvision.transforms.functional.resize(img, (448, 448))
            #img = torchvision.transforms.ToTensor(img)
            img = torchvision.transforms.functional.to_tensor(img)
            #if self.transforms is not None:
                #img, target = self.transforms(img, target)
        
            return img, json.dumps(ret_targets)

In [None]:
from enum import Enum
import json


class InterpolationMethod(Enum):
    Interpolation_11 = 1
    Interpolation_101 = 2


class CalculationMetrics():
    def __init__(self, IoU: float, confidence: float, mustbe_FP: bool):#, is_difficult: bool):

        self.IoU = IoU
        self.confidence = confidence
        self.mustbe_FP = mustbe_FP
        #self.is_difficult = is_difficult


def compare_metrics(metrics1: CalculationMetrics, metrics2: CalculationMetrics):
    if metrics1.confidence == metrics2.confidence:
        return metrics2.IoU - metrics1.IoU
    return metrics2.confidence - metrics1.confidence


class ObjectDetectionMetricsCalculator():

    def __init__(self, num_classes: int, confidence_thres: float):

        # initialize data
        self.data = [{"data": [], "detection": 0, "truth": 0} for _ in range(num_classes)]
        self.confidence_thres = confidence_thres


    def add_image_data(self, pred: torch.Tensor, truth: str):

        pred = pred.reshape(-1, 30)
        truth = json.loads(truth)

        choose_truth_index = [None for _ in range(pred.shape[0])]
        iou = [0 for _ in range(pred.shape[0])]

        for i in range(pred.shape[0]):
            score, cat = pred[i][10:30].max(dim=0)
            confidence = pred[i][4]
            # filter by confidence threshold
            if confidence * score < self.confidence_thres: continue
            
            x, y, w, h = pred[i][0:4]
            # calculate cell index
            xidx = i % 7
            yidx = i // 7
            # transform cell relative coordinates to image relative coordinates
            xhat = (x + xidx) / 7.0
            yhat = (y + yidx) / 7.0

            xmin_hat = xhat - w / 2
            xmax_hat = xhat + w / 2
            ymin_hat = yhat - h / 2
            ymax_hat = yhat + h / 2

            for j in range(len(truth)):
                bbox = truth[j]
                # judge whether is same class
                if cat != bbox['category']: continue
                # calculate IoU
                xmin, ymin, xmax, ymax = bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']
                wi = min(xmax, xmax_hat) - max(xmin, xmin_hat)
                wi = max(wi, 0)
                hi = min(ymax, ymax_hat) - max(ymin, ymin_hat)
                hi = max(hi, 0)
                intersection = wi * hi
                union = (xmax - xmin) * (ymax - ymin) + (xmax_hat - xmin_hat) * (ymax_hat - ymin_hat) - intersection
                this_iou = intersection / (union + 1e-6)
                # determine whether to choose this ground truth
                if iou[i] is None: choose = True
                elif iou[i] < this_iou: choose = True
                else: choose = False
                # if choose, assign value
                if choose:
                    iou[i] = this_iou
                    choose_truth_index[i] = j
        # init a bool array for judging mustbe_FP later
        truth_chosen = [False for _ in range(len(truth))]
        # sort according to IoU
        sort_idx = np.argsort(iou)[::-1]
        # add into metrics
        for i in sort_idx:
            score, cat = pred[i][10:30].max(dim=0)
            confidence = pred[i][4]
            # filter by confidence threshold
            if confidence * score < self.confidence_thres: continue

            truth_index = choose_truth_index[i]
            if truth_index == None: 
                mustbe_FP = True
                is_difficult = False
            elif truth_chosen[truth_index]:
                mustbe_FP = True
                #is_difficult = truth[choose_truth_index[i]]['difficult']
            else: 
                mustbe_FP = False
                truth_chosen[choose_truth_index[i]] = True
                #is_difficult = truth[choose_truth_index[i]]['difficult']

            self.data[cat]['data'].append(CalculationMetrics(iou[i], float(confidence * score), mustbe_FP))#, is_difficult))

            # update detection statistics
            self.data[cat]['detection'] += 1
        # update ground truth statistics
        for bbox in truth:
            #if bbox['difficult']: continue
            self.data[bbox['category']]['truth'] += 1


    def calculate_precision_recall(self, iou_thres: float, class_idx: int) -> list:

        ret = []
        # retrieve count
        truth_cnt = self.data[class_idx]['truth']
        # accumulated TP
        acc_TP = 0
        # accumulated difficult count
        #acc_difficult = 0
        # sort metrics by confidence
        data = sorted(self.data[class_idx]['data'], key=cmp_to_key(compare_metrics))
        for i, metrics in enumerate(data):
            if metrics.IoU >= iou_thres and not metrics.mustbe_FP: #and not metrics.is_difficult:
                acc_TP += 1
            #if metrics.is_difficult:
                #acc_difficult += 1
            if i + 1 - acc_difficult > 0:
                ret.append({
                    'precision': acc_TP / (i + 1) #- acc_difficult),
                    'recall': acc_TP / truth_cnt
                })
        
        return ret


    def calculate_average_precision(self, iou_thres: float, class_idx: int, itpl_option: InterpolationMethod) -> float:

        prl = self.calculate_precision_recall(iou_thres=iou_thres, class_idx=class_idx)

        if itpl_option == InterpolationMethod.Interpolation_11:
            intp_pts = [0.1 * i for i in range(11)]
        elif itpl_option == InterpolationMethod.Interpolation_101:
            intp_pts = [0.01 * i for i in range(101)]
        else:
            raise Exception('Unknown Interpolation Method')

        max_dict = {}
        gmax = 0

        for pr in prl[::-1]:
            gmax = max(gmax, pr['precision'])
            max_dict[pr['recall']] = gmax

        if len(max_dict) < 1: return 0.

        max_keys = max_dict.keys()
        max_keys = sorted(max_keys)

        key_ptr = len(max_keys) - 2
        last_key = max_keys[-1]

        AP = 0

        for query in intp_pts[::-1]:
            if key_ptr < 0:
                if query > last_key:
                    ans = 0
                else:
                    ans = max_dict[last_key]
            else:
                if query > last_key:
                    ans = 0
                elif query > max_keys[key_ptr]:
                    ans = max_dict[last_key]
                else:
                    while key_ptr >= 0:
                        if query > max_keys[key_ptr]:
                            break
                        last_key = max_keys[key_ptr]
                        key_ptr -= 1
                    ans = max_dict[last_key]
            AP += ans

        AP /= len(intp_pts)
        return AP


    def calculate_mAP(self, iou_thres: float, itpl_option: InterpolationMethod) -> float:
        mAP = 0
        for c in range(len(self.data)):
            mAP += self.calculate_average_precision(iou_thres, c, itpl_option)
        mAP /= len(self.data)

        return mAP

In [208]:
def test_and_draw_mAP(net, test_iter_raw, device):
        net.eval()
        print("Changed to eval")
        net.to(device)
        calc = ObjectDetectionMetricsCalculator(80, 0.1)
        for i, (X, YRaw) in enumerate(test_iter_raw):
            if i % 1000 = 0:
                print(f'Calculating {i+1}...')
            #to_tensor = torchvision.transforms.ToTensor()
            #X = to_tensor(img).unsqueeze_(0).to(device)
            X = X.to(device)
            YHat = net(X)
            for yhat, yraw in zip(YHat, YRaw):
                yhat = nms(yhat)
                calc.add_image_data(yhat.cpu(), yraw)
        print("Finished calculating")
        print("mAP on validation:", calculate_mAP(0.5, InterpolationMethod.Interpolation_11))
        #for i in range(80):
            #draw_precision_recall(calc.calculate_precision_recall(0.5, i), i)

In [240]:
from IPython import display

raw = RawDataforTest(val_data)
iter_raw = get_loader(raw)
test_and_draw_mAP(net, iter_raw, torch.device('cuda'))

Changed to eval
Calculating 1...
Calculating 2...
Calculating 3...
Calculating 4...
Finished calculating
mAP on validation: 0.16416070
