In [1]:
import json
from pathlib import Path
from shutil import copy2 as copy

In [2]:
from tqdm import tqdm
import cv2
import numpy as np
import pandas as pd

## Parking Annot to TXT

In [None]:
with open('amano-nvr-parking.json') as f:
    context = json.load(f)

In [None]:
dest = Path('/home/jiun/datasets/amano/raw/parking')
dest.mkdir(exist_ok=True, parents=True)

In [None]:
for info in context:
    name, *_ = info['name'].split('.')
    boxes = np.array([box['position'] for box in info['bounding_boxes']])
    pd.DataFrame(boxes).to_csv(str(dest.joinpath(f'{name}.txt')), header=None, index=None)

## Parking

## Model load

In [191]:
import torch
from torch.nn import functional as F
from torch.autograd import Variable
from torchvision import transforms

In [4]:
import sys
sys.path.append('/home/jiun/workspace/Classifier')

In [5]:
from models.model import Model

In [209]:
backbone = 'efficientnet-b0'
weight = '/home/jiun/datasets/weights/grid/eff0-128.pth'
shape = (128, 128)

In [210]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [211]:
model = Model.new(backbone, class_num=2, pretrained=True)

Loaded pretrained weights for efficientnet-b0


In [212]:
model = model.to(device)
model.load_state_dict(torch.load(weight, map_location=lambda s, l: s))
model.eval()
print('Loaded')

Loaded


## Inference

In [213]:
from PIL import Image
from IPython.display import display

def show(ary):
    display(Image.fromarray(ary))

In [214]:
parking_src = Path('/home/jiun/datasets/amano/raw/parking')
parkings = sorted(parking_src.glob('*.txt'))
images_src = Path('/home/jiun/datasets/amano/nvr-warp/test/images')
annots_src = Path('/home/jiun/datasets/amano/nvr-warp/test/annotations')
dest = Path('/home/jiun/datasets/amano/nvr-warp/test/parking-eff0-128')
dest.mkdir(exist_ok=True, parents=True)

In [215]:
mean, std = (.485, .456, .406), (.229, .224, .225)

In [216]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

In [217]:
for parking_file in tqdm(parkings):
    try:
        parking = pd.read_csv(str(parking_file), header=None).values
    except:
        parking = np.empty((0, 4), dtype=np.int16)
    
    images = sorted(images_src.glob(f'{parking_file.stem}*.jpg'))
    annots = sorted(annots_src.glob(f'{parking_file.stem}*.txt'))
    
    for image, annot in zip(images, annots):
        try:
            g = pd.read_csv(str(annot), header=None).values
        except:
            g = np.empty((0, 4), dtype=np.int16)
        
        img = cv2.imread(str(image))
        
        results = np.empty((0, 2), dtype=np.float)
        for x, y, w, h in parking:
            inputs = img[y:y+h, x:x+w]
            
            if y > 540:
                inputs = cv2.flip(inputs, 0)
            
            inputs = transform(inputs).to(device)
            inputs = Variable(inputs.unsqueeze(0), requires_grad=False)
            
            outputs = model(inputs)
            conf, pred = torch.max(F.softmax(outputs), 1)
            
            results = np.concatenate((
                results,
                np.array([(pred.item(), conf.item())]),
            ))
            
        pd.DataFrame(results).to_csv(str(dest.joinpath(annot.name)), header=None, index=None)

100%|██████████| 256/256 [05:20<00:00,  1.25s/it]


In [249]:
dest = Path('/home/jiun/datasets/amano/nvr-warp/test/parking-eff0-128')

In [250]:
images = sorted(images_src.glob('*.jpg'))
annots = sorted(annots_src.glob('*.txt'))
parkings = sorted(dest.glob('*.txt'))

In [251]:
def check(box, pos, ratio=.0):
    x1, y1, x2, y2 = box
    xx, yy = pos
    w, h = x2-x1, y2-y1
    return x1-w*ratio <= xx <= x2+w*ratio and y1-h*ratio <= yy <= y2+h*ratio

In [252]:
evaluator = Evaluator(n_class=2, method=Evaluator.compute_acc)

In [253]:
for image, annot, parking in tqdm(zip(images, annots, parkings), total=len(images)):
    img = cv2.imread(str(image))
    flag, *_ = image.stem.split('_CT_')
    
    try:
        g = pd.read_csv(str(annot), header=None).values
    except:
        g = np.empty((0, 4), dtype=np.int16)
    
    try:
        pp = pd.read_csv(str(parking_src.joinpath(f'{flag}.txt')), header=None).values
        pp[:, 2] += pp[:, 0]
        pp[:, 3] += pp[:, 1]
        p = pd.read_csv(str(parking), header=None).values
    except:
        pp = np.empty((0, 4), dtype=np.int16)
        p = np.empty((0, 4), dtype=np.int16)

    assert len(pp) == len(p)
    
    for x, y, w, h in g:
        cv2.rectangle(img, (int(x), int(y)), (int(w), int(h)), (0, 0, 255), 1)
    
    for klass, (x, y, w, h) in zip(p[:, 0], pp):
        cv2.rectangle(img, (int(x), int(y)), (int(w), int(h)), (0, 255, 0) if klass else (255, 0, 0), 1)
        box = np.array((x, y, x+w, y+h))
    
    # extract gt only parking lot defined
    p_index = []
    g_index = []
    for i, box in enumerate(pp):
        flag = False
        for j, pos in enumerate(zip((g[:, 0] + g[:, 2]) / 2, (g[:, 1] + g[:,3]) / 2)):
            if check(box, pos):
                p_index.append(i)
                g_index.append(j)
                flag = True
                break
    p_index = list(set(p_index))
    g_index = list(set(g_index))
    
    # extract only detection (classifier)
    det = pp[np.array(p_index, dtype=np.int)[p[:, 0][p_index].astype(np.bool)]]
    get = g[g_index]
    
#     for x, y, w, h in get:
#         cv2.rectangle(img, (int(x), int(y)), (int(w), int(h)), (0, 0, 255), 1)
    
#     for x, y, w, h in det:
#         cv2.rectangle(img, (int(x), int(y)), (int(w), int(h)), (0, 255, 0), 1)
    
#     show(img)
    
    evaluator.update((
        np.ones(np.size(det, 0), dtype=np.int),
        np.ones(np.size(det, 0), dtype=np.float32),
        det.astype(np.float32),
        None,
    ), (
        np.ones(np.size(get, 0), dtype=np.int),
        get.astype(np.float32),
        None,
    ))
    
    
    

100%|██████████| 1313/1313 [01:11<00:00, 18.46it/s]


In [254]:
evaluator.mAP



array([0.98398284, 0.98398284])

In [255]:
evaluator.dump()



(array([0.98398284, 0.98398284]),
 array([0.        , 0.99947964]),
 array([0.        , 0.98449513]))

## ToDetection format

In [238]:
parking_src = Path('/home/jiun/datasets/amano/raw/parking')
parkings = sorted(parking_src.glob('*.txt'))
images_src = Path('/home/jiun/datasets/amano/nvr-warp/test/images')
annots_src = Path('/home/jiun/datasets/amano/nvr-warp/test/annotations')
detection_src = Path('/home/jiun/datasets/amano/nvr-warp/results/ssd-vgg16')
dest = Path('/home/jiun/datasets/amano/nvr-warp/test/parking-eff0-128')
dest.mkdir(exist_ok=True, parents=True)

In [None]:
for parking_file in tqdm(parkings):
    try:
        parking = pd.read_csv(str(parking_file), header=None).values
    except:
        parking = np.empty((0, 4), dtype=np.int16)
    
    images = sorted(images_src.glob(f'{parking_file.stem}*.jpg'))
    annots = sorted(annots_src.glob(f'{parking_file.stem}*.txt'))
    
    for image, annot in zip(images, annots):
        try:
            g = pd.read_csv(str(annot), header=None).values
        except:
            g = np.empty((0, 4), dtype=np.int16)
        
        img = cv2.imread(str(image))
        
        results = np.empty(0, dtype=np.uint8)
        for x, y, w, h in parking:
            inputs = img[y:y+h, x:x+w]
            
            if y > 540:
                inputs = cv2.flip(inputs, 0)
            
            inputs = transform(inputs).to(device)
            inputs = Variable(inputs.unsqueeze(0), requires_grad=False)
            
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            
            results = np.append(results, preds.item())
        pd.DataFrame(results).to_csv(str(dest.joinpath(annot.name)), header=None, index=None)

In [241]:
from typing import Tuple, Union
from collections import defaultdict

import torch
import numpy as np


def compute_iou_(a, b):
    area = (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1])

    iw = torch.min(torch.unsqueeze(a[:, 2], dim=1), b[:, 2]) - torch.max(torch.unsqueeze(a[:, 0], 1), b[:, 0])
    ih = torch.min(torch.unsqueeze(a[:, 3], dim=1), b[:, 3]) - torch.max(torch.unsqueeze(a[:, 1], 1), b[:, 1])

    iw = torch.clamp(iw, min=0)
    ih = torch.clamp(ih, min=0)

    ua = torch.unsqueeze((a[:, 2] - a[:, 0]) * (a[:, 3] - a[:, 1]), dim=1) + area - iw * ih

    ua = torch.clamp(ua, min=1e-8)

    intersection = iw * ih

    IoU = intersection / ua

    return IoU


def compute_iou(box, boxes, box_area, boxes_area):
    y1 = np.maximum(box[0], boxes[:, 0])
    y2 = np.minimum(box[2], boxes[:, 2])
    x1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[3], boxes[:, 3])
    intersection = np.maximum(x2 - x1, 0) * np.maximum(y2 - y1, 0)
    union = box_area + boxes_area[:] - intersection[:]
    iou = intersection / union
    return iou


def compute_overlaps(boxes1, boxes2):
    area1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
    area2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

    overlaps = np.zeros((boxes1.shape[0], boxes2.shape[0]))
    for i in range(overlaps.shape[1]):
        overlaps[:, i] = compute_iou(boxes2[i], boxes1, area2[i], area1)
    return overlaps


class Evaluator:
    def __init__(self, n_class: int, sample_patch: int = 11, threshold: float = .5, method=None):
        self.n_class = n_class
        self.patch = np.linspace(0, 1, sample_patch)
        self.threshold = threshold
        self.method = method or self.compute_iou

        self.center_total = np.empty((0, 2), dtype=np.int)
        self.center_positive = np.empty((0, 2), dtype=np.int)

        self.TP, self.FP, self.FN = np.zeros((3, sample_patch, n_class), dtype=np.uint32)
        self.gt_counts, self.pd_counts = np.zeros((2, n_class), dtype=np.uint32)

    def update(self, predictions: Tuple[np.ndarray, Union[np.ndarray, None], np.ndarray, Union[np.ndarray, None]],
               groundtruths: Tuple[np.ndarray, np.ndarray, Union[np.ndarray, None]]) \
            -> None:
        """Update predictions and groundtruths for each frames.

        :param predictions: contains (class_ids, scores, bounding_boxes, masks)
        :param groundtruths: contains (class_ids, bounding_boxes, masks)
        :return:
        """
        pd_class_ids, pd_scores, pd_bboxes, pd_masks = predictions
        gt_class_ids, gt_bboxes, gt_masks = groundtruths

        if pd_scores is None:
            pd_scores = np.ones_like(pd_class_ids)

        if pd_bboxes.size > 0:
            iou = self.method(pd_bboxes, gt_bboxes, self.threshold)

        for klass in range(self.n_class):
            gt_number = np.sum(gt_class_ids == klass)

            self.gt_counts[klass] += gt_number
            self.pd_counts[klass] += (pd_class_ids == klass).sum()

            for p, patch in enumerate(self.patch):
                pd_mask = np.logical_and(pd_class_ids == klass, pd_scores >= patch)
                pd_number = np.sum(pd_mask)

                if pd_number == 0:
                    self.FN[p][klass] += gt_number
                    continue

                # X, Y distribution store
                centers = np.stack((pd_bboxes[:, ::2].mean(-1), pd_bboxes[:, 1::2].mean(-1))).T.astype(np.int)
                self.center_total = np.concatenate((self.center_total, centers))
                self.center_positive = np.concatenate((self.center_positive, centers[pd_mask]))

                true_positive = np.sum(iou[pd_mask][:, gt_class_ids == klass].any(axis=0))

                self.TP[p][klass] += true_positive
                self.FP[p][klass] += pd_number - true_positive
                self.FN[p][klass] += gt_number - true_positive

    @staticmethod
    def compute_acc(tar_boxes: np.ndarray, src_boxes: np.ndarray, threshold: float) \
            -> np.ndarray:
        results = np.empty((0, np.size(src_boxes, 0)), dtype=np.bool)
        xxs, yys = (src_boxes[:, 0] + src_boxes[:, 2])/2, (src_boxes[:, 1] + src_boxes[:, 3])/2
        for tar in tar_boxes:
            results = np.concatenate((results, np.expand_dims(np.array(tuple(check(tar, (x, y)) for x, y in zip(xxs, yys))), 0)))
        return results

    @staticmethod
    def compute_iou(tar_boxes: np.ndarray, src_boxes: np.ndarray, threshold: float) \
            -> np.ndarray:
        iou = compute_overlaps(tar_boxes, src_boxes) >= threshold
        axis, argm = np.arange(np.size(iou, 0)), iou.argmax(axis=1)
        outputs = np.zeros_like(iou)
        outputs[axis, argm] = iou[axis, argm]
        return outputs >= threshold

    @property
    def precision(self) \
            -> np.ndarray:
        return np.nan_to_num(self.TP / (self.TP + self.FP)).mean(axis=0)

    @property
    def recall(self) \
            -> np.ndarray:
        return np.nan_to_num(self.TP / (self.TP + self.FN)).mean(axis=0)

    @property
    def mAP(self) \
            -> np.ndarray:
        prev_recall, ap = np.zeros((2, self.n_class))

        for precision, recall in zip(self.precision[::-1], self.recall[::-1]):
            ap += precision * (recall - prev_recall)
            prev_recall = recall

        return ap

    def dump(self) \
            -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        return self.mAP, self.precision, self.recall
