In [1]:
import cv2
import json
import matplotlib.pyplot as plt
import numpy as np
import pickle
import torch
import torchvision
from PIL import Image
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
%matplotlib inline

In [2]:
# JSON file with per-clip ground truth; test_distance / test_velocity index it by clip id - 1.
PATH_GT = 'gt.json'
# Root folders of the clips; every clip is addressed as '<root><clip id>/'.
PATH_TRAIN = 'benchmark_velocity_train/clips/'
PATH_TEST = 'benchmark_velocity_test/clips/'
# Clip ids are iterated from 1 to N_*_CLIPS inclusive.
N_TRAIN_CLIPS = 1074
N_TEST_CLIPS = 269
# Presumably the frame width in pixels; MAX_WIDTH / 2 is used as the nominal centre column.
MAX_WIDTH = 1280
# Horizontal focal length - presumably in pixels; TODO confirm against the camera calibration.
FOCAL_X = 714.1526
# Vehicle width used to turn pixel widths into distances - presumably metres and the value
# produced by calculate_vehicle_width(); TODO confirm.
VEHICLE_WIDTH = 1.9766383787711062
# Pixel offset added to MAX_WIDTH / 2 for lateral distances; matches the value printed by
# calculate_displacement().
DISPLACEMENT = 48

### Vehicle Detection

In [3]:
def intersection_over_union(bound_1, bound_2):
    """Return the intersection-over-union (IoU) of two axis-aligned boxes.

    Parameters
    ----------
    bound_1, bound_2 : sequence
        Boxes given as ``((x_min, y_min), (x_max, y_max))``.

    Returns
    -------
    float
        ``intersection / union``. Returns ``0.0`` when the union has no
        positive area (e.g. both boxes are degenerate), instead of dividing
        by zero.
    """
    x_min_inter = max(bound_1[0][0], bound_2[0][0])
    y_min_inter = max(bound_1[0][1], bound_2[0][1])
    x_max_inter = min(bound_1[1][0], bound_2[1][0])
    y_max_inter = min(bound_1[1][1], bound_2[1][1])

    # Clamp each side at 0 so disjoint boxes yield an empty intersection
    # rather than a positive product of two negative extents.
    inter_width = max(0, x_max_inter - x_min_inter)
    inter_height = max(0, y_max_inter - y_min_inter)
    intersection = inter_width * inter_height

    area_1 = (bound_1[1][0] - bound_1[0][0]) * (bound_1[1][1] - bound_1[0][1])
    area_2 = (bound_2[1][0] - bound_2[0][0]) * (bound_2[1][1] - bound_2[0][1])

    union = area_1 + area_2 - intersection

    # Zero-area boxes would otherwise raise ZeroDivisionError (or give NaN
    # with NumPy scalars); no overlap can be claimed for them.
    if union <= 0:
        return 0.0

    return intersection / union

In [4]:
def predict_vehicle(model, img_path, score_threshold=0.5, vehicle_labels=(3, 6, 8)):
    """Detect vehicles in one image and return their bounding boxes.

    Parameters
    ----------
    model : callable
        Detection model called as ``model([image_tensor])``; it must return a
        list whose first element is a dict with ``'boxes'``, ``'scores'`` and
        ``'labels'`` tensors.
    img_path : str
        Path of the image file to analyse.
    score_threshold : float, optional
        A detection is kept only when its score is strictly greater than this.
    vehicle_labels : collection of int, optional
        Class ids that count as vehicles. The defaults are presumably the COCO
        ids for car / bus / truck - TODO confirm against the model's label map.

    Returns
    -------
    list
        One ``((x_min, y_min), (x_max, y_max))`` tuple per kept detection, in
        the order the model returned them.
    """
    # The context manager closes the file handle; convert('RGB') guarantees a
    # 3-channel input even for greyscale / RGBA / CMYK files.
    with Image.open(img_path) as img_file:
        img = torchvision.transforms.ToTensor()(img_file.convert('RGB'))

    pred = model([img])[0]

    # .cpu() is a no-op for CPU tensors and keeps .numpy() valid if the model
    # was moved to another device.
    boxes = pred['boxes'].detach().cpu().numpy()
    scores = pred['scores'].detach().cpu().numpy()
    labels = pred['labels'].detach().cpu().numpy()

    return [
        ((box[0], box[1]), (box[2], box[3]))
        for box, score, label in zip(boxes, scores, labels)
        if score > score_threshold and label in vehicle_labels
    ]

In [5]:
def detect_vehicle(start, end, test=True, save=False):
    """Run the pretrained Faster R-CNN detector on frame ``040.jpg`` of a range of clips.

    Parameters
    ----------
    start, end : int
        First and last clip id to process (both inclusive).
    test : bool, optional
        Read clips from ``PATH_TEST`` when true, otherwise from ``PATH_TRAIN``.
    save : bool, optional
        When true, pickle the result to ``pred_vehicle_test.pickle`` or
        ``pred_vehicle_train.pickle`` (chosen by ``test``) in the working directory.

    Returns
    -------
    list
        One entry per clip, each being the list returned by ``predict_vehicle``.
    """
    if test:
        clip_root, pickle_name = PATH_TEST, 'pred_vehicle_test.pickle'
    else:
        clip_root, pickle_name = PATH_TRAIN, 'pred_vehicle_train.pickle'

    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    detector.eval()

    detections = []
    # Inference only: no gradients are needed for any of the clips.
    with torch.no_grad():
        for clip_id in range(start, end + 1):
            frame_path = f'{clip_root}{clip_id}/imgs/040.jpg'
            detections.append(predict_vehicle(detector, frame_path))

    if save:
        with open(pickle_name, 'wb') as handle:
            pickle.dump(detections, handle)

    return detections

# detect_vehicle(1, N_TRAIN_CLIPS, test=False, save=True)
# detect_vehicle(1, N_TEST_CLIPS, save=True)

In [6]:
def load_pred_vehicle(test=True):
    """Load the cached detections written by ``detect_vehicle(..., save=True)``.

    Parameters
    ----------
    test : bool, optional
        Read ``pred_vehicle_test.pickle`` when true, otherwise
        ``pred_vehicle_train.pickle`` (both relative to the working directory).

    Returns
    -------
    object
        Whatever object was pickled into the selected file.
    """
    pickle_name = 'pred_vehicle_test.pickle' if test else 'pred_vehicle_train.pickle'
    # Only load pickle files this notebook produced itself: unpickling
    # untrusted data can execute arbitrary code.
    with open(pickle_name, 'rb') as handle:
        return pickle.load(handle)

In [7]:
def test_detect_vehicle():
    """Score the cached test-set detections against the annotated boxes.

    For every test clip, annotated boxes are paired with detections greedily
    by descending IoU; only pairs with IoU > 0.5 are eligible and every
    annotation / detection is used at most once. Annotations left without a
    partner are printed as failed detections.

    Returns
    -------
    tuple
        ``(mean IoU of the matched pairs, number of matched pairs,
        total number of annotated boxes)``.
    """
    matched_ious = []
    n_annotated = 0

    detections_per_clip = load_pred_vehicle()

    for clip_id in range(1, N_TEST_CLIPS + 1):
        detections = detections_per_clip[clip_id - 1]

        with open(f'{PATH_TEST}{clip_id}/annotation.json', "r") as handle:
            annotations = json.load(handle)

        n_annotated += len(annotations)

        # Rows are annotations, columns are detections; entries that do not
        # exceed the 0.5 threshold stay at 0 and can never be selected.
        iou_matrix = np.zeros((len(annotations), len(detections)))
        for row, entry in enumerate(annotations):
            box = entry['bbox']
            gt_bound = ((box['left'], box['top']), (box['right'], box['bottom']))
            for col, det_bound in enumerate(detections):
                overlap = intersection_over_union(gt_bound, det_bound)
                if overlap > 0.5:
                    iou_matrix[row][col] = overlap

        unmatched = list(range(len(annotations)))
        while unmatched and np.any(iou_matrix):
            row, col = np.unravel_index(np.argmax(iou_matrix), iou_matrix.shape)
            unmatched.remove(row)
            matched_ious.append(iou_matrix[row, col])
            # Retire both partners so neither can be matched again.
            iou_matrix[:, col] = 0
            iou_matrix[row, :] = 0

        for row in unmatched:
            print(f'Failed Detection: {clip_id}: {annotations[row]}')

    return np.mean(np.array(matched_ious)), len(matched_ious), n_annotated

In [8]:
# Evaluate the cached test-set detections: share of annotated boxes that found
# a matching detection, and the mean IoU over the matched pairs.
iou_score, count_detected, count_total = test_detect_vehicle()
print(f'Detection Rate: {count_detected / count_total * 100}% ({count_detected} / {count_total})')
print(f'Average IOU: {iou_score}')

Failed Detection: 61: {'bbox': {'top': 342.1967468262, 'right': 698.6245117188, 'left': 673.005859375, 'bottom': 363.7939758301}}
Failed Detection: 179: {'bbox': {'top': 341.252746582, 'right': 706.7437744141, 'left': 677.0107421875, 'bottom': 365.3073425293}}
Detection Rate: 99.46666666666667% (373 / 375)
Average IOU: 0.8510568270957872


### Task 1

In [9]:
def calculate_vehicle_width():
    """Fit a single vehicle width from the training clips by least squares.

    Annotated boxes are paired with cached detections greedily by descending
    IoU (threshold 0.5, each box used once). For every pair the detection's
    pixel width ``w`` and the first component of the annotated ``position``
    are collected, and the model ``position[0] = width * FOCAL_X / w`` is
    solved for ``width`` with ``np.linalg.lstsq``. Annotations without a
    partner are printed as failed detections.

    Returns
    -------
    float
        The fitted width (same unit as the annotated positions).
    """
    pixel_widths = []
    longitudinal = []

    detections_per_clip = load_pred_vehicle(test=False)

    for clip_id in range(1, N_TRAIN_CLIPS + 1):
        detections = detections_per_clip[clip_id - 1]

        with open(f'{PATH_TRAIN}{clip_id}/annotation.json', "r") as handle:
            annotations = json.load(handle)

        # Rows are annotations, columns are detections.
        iou_matrix = np.zeros((len(annotations), len(detections)))
        for row, entry in enumerate(annotations):
            box = entry['bbox']
            gt_bound = ((box['left'], box['top']), (box['right'], box['bottom']))
            for col, det_bound in enumerate(detections):
                overlap = intersection_over_union(gt_bound, det_bound)
                if overlap > 0.5:
                    iou_matrix[row][col] = overlap

        unmatched = list(range(len(annotations)))
        while unmatched and np.any(iou_matrix):
            row, col = np.unravel_index(np.argmax(iou_matrix), iou_matrix.shape)
            unmatched.remove(row)
            # Retire both partners so neither can be matched again.
            iou_matrix[:, col] = 0
            iou_matrix[row, :] = 0

            detection = detections[col]
            pixel_widths.append(detection[1][0] - detection[0][0])
            longitudinal.append(annotations[row]['position'][0])

        for row in unmatched:
            print(f'Failed Detection: {clip_id}: {annotations[row]}')

    # Single-column design matrix: one regressor FOCAL_X / w per matched pair.
    design = (FOCAL_X / np.array(pixel_widths))[:, np.newaxis]
    target = np.array(longitudinal)

    solution = np.linalg.lstsq(design, target, rcond=None)[0]

    return solution[0]

In [10]:
# Fit the vehicle width on the training clips and report it.
width_vehicle = calculate_vehicle_width()
print(f'Vehicle Width: {width_vehicle}')

Failed Detection: 347: {'velocity': [0.4624676038, -0.4259779911], 'bbox': {'top': 341.2297058105, 'right': 725.4488525391, 'bottom': 356.5171508789, 'left': 704.8684692383}, 'position': [66.3459097032, 2.3075873921]}
Failed Detection: 363: {'velocity': [5.0882967303, -0.467765399], 'bbox': {'top': 343.403137207, 'right': 656.3140869141, 'bottom': 358.0908508301, 'left': 639.3043212891}, 'position': [115.2258990375, -3.1657691644]}
Failed Detection: 429: {'velocity': [0.1501203173, -0.038742024], 'bbox': {'top': 335.2302856445, 'right': 738.0653076172, 'bottom': 394.3702087402, 'left': 650.09375}, 'position': [20.6826064221, -0.4108689989]}
Failed Detection: 525: {'velocity': [1.8354622848, 0.4049606419], 'bbox': {'top': 341.0624084473, 'right': 772.6825561523, 'bottom': 361.9103393555, 'left': 745.6542358398}, 'position': [49.9142001493, 4.5033125868]}
Failed Detection: 575: {'velocity': [0.0800550675, -0.0193297919], 'bbox': {'top': 302.6954040527, 'right': 742.5352783203, 'bottom': 

In [11]:
def calculate_displacement():
    """Grid-search the pixel offset ``c`` that minimises the lateral-distance MSE.

    Annotated training boxes are paired with cached detections greedily by
    descending IoU (threshold 0.5, each box used once). For every integer
    ``c`` in ``[0, 60)`` the lateral distance of each matched detection is
    predicted relative to the column ``MAX_WIDTH / 2 + c`` and compared with
    the second component of the annotated ``position``.

    The matching does not depend on ``c``, so the annotation files are read
    and the matching is performed once, before the search.

    Returns
    -------
    int or None
        The offset with the smallest mean squared error, or ``None`` when no
        detection could be matched or no candidate beats the initial bound
        ``MAX_WIDTH ** 2``.
    """
    pred_list = load_pred_vehicle(test=False)

    # (x_min, x_max, annotated lateral position) for every matched pair.
    matches = []

    for img_i in range(1, N_TRAIN_CLIPS + 1):
        pred = pred_list[img_i - 1]

        with open(f'{PATH_TRAIN}{img_i}/annotation.json', "r") as f:
            anno = json.load(f)

        # Rows are annotations, columns are detections.
        iou_matrix = np.zeros((len(anno), len(pred)))
        for i in range(len(anno)):
            bbox = anno[i]['bbox']
            bound_i = ((bbox['left'], bbox['top']), (bbox['right'], bbox['bottom']))
            for j in range(len(pred)):
                iou = intersection_over_union(bound_i, pred[j])
                if iou > 0.5:
                    iou_matrix[i][j] = iou

        anno_index = list(range(len(anno)))
        while len(anno_index) != 0 and np.any(iou_matrix):
            max_ind = np.unravel_index(np.argmax(iou_matrix), iou_matrix.shape)
            anno_index.remove(max_ind[0])
            # Retire both partners so neither can be matched again.
            iou_matrix[:, max_ind[1]] = 0
            iou_matrix[max_ind[0], :] = 0
            matches.append((
                pred[max_ind[1]][0][0],
                pred[max_ind[1]][1][0],
                anno[max_ind[0]]['position'][1],
            ))

    if len(matches) == 0:
        # Nothing to score; avoids taking the mean of an empty array.
        return None

    mse_min = MAX_WIDTH ** 2
    c_min = None

    for c in range(0, 60):
        center = MAX_WIDTH / 2 + c
        error = []

        for x_min, x_max, dist_true in matches:
            # Signed pixel gap between the centre column and the nearest box
            # edge; 0 when the box straddles the centre column.
            if x_min > center:
                dist_pred = x_min - center
            elif x_max < center:
                dist_pred = x_max - center
            else:
                dist_pred = 0
            # Convert pixels to world units using the assumed vehicle width.
            scale = VEHICLE_WIDTH / (x_max - x_min)
            dist_pred = scale * dist_pred
            error.append((dist_pred - dist_true) ** 2)

        mse = np.mean(np.array(error))
        if mse < mse_min:
            mse_min = mse
            c_min = c

    return c_min

# Run the grid search on the training clips and report the selected offset.
image_displacement = calculate_displacement()
print(f'Image Displacement: {image_displacement}')

Image Displacement: 48


In [12]:
def calculate_distance(x_min, x_max, c=DISPLACEMENT):
    """Estimate a vehicle's position from the horizontal extent of its box.

    Parameters
    ----------
    x_min, x_max : number
        Left and right pixel columns of the bounding box.
    c : number, optional
        Pixel offset added to ``MAX_WIDTH / 2`` to obtain the centre column.

    Returns
    -------
    tuple
        ``(dist_long, dist_lat)`` where
        ``dist_long = FOCAL_X * VEHICLE_WIDTH / (x_max - x_min)`` and
        ``dist_lat`` is the signed pixel gap between the centre column and
        the nearest box edge (0 when the box straddles the centre column),
        scaled by ``VEHICLE_WIDTH / (x_max - x_min)``.
    """
    center = MAX_WIDTH / 2 + c
    box_width = x_max - x_min

    longitudinal = FOCAL_X * VEHICLE_WIDTH / box_width

    # Default: the box covers the centre column, so there is no lateral gap.
    offset_px = 0
    if x_min > center:
        offset_px = x_min - center
    elif x_max < center:
        offset_px = x_max - center

    lateral = (VEHICLE_WIDTH / box_width) * offset_px

    return longitudinal, lateral

In [13]:
def get_distance(img_i, save=False):
    """Estimate and visualise the position of every cached detection in a test clip.

    Parameters
    ----------
    img_i : int
        1-based id of the test clip.
    save : bool, optional
        When true, also write the annotated frame to ``dist_<img_i>.jpg``.

    Returns
    -------
    list of dict
        One ``{'pred': box, 'dist': [dist_long, dist_lat]}`` entry per
        detection. The annotated frame ``040.jpg`` is displayed with
        matplotlib as a side effect.
    """
    detections = load_pred_vehicle()[img_i - 1]

    dist_dict = [
        {
            'pred': bound,
            'dist': list(calculate_distance(bound[0][0], bound[1][0])),
        }
        for bound in detections
    ]

    green = (0, 255, 0)
    img = cv2.imread(f'{PATH_TEST}{img_i}/imgs/040.jpg')
    for entry in dist_dict:
        corner_min, corner_max = entry['pred'][0], entry['pred'][1]
        top_left = (int(corner_min[0]), int(corner_min[1]))
        bottom_right = (int(corner_max[0]), int(corner_max[1]))
        dist_long, dist_lat = entry['dist']

        img = cv2.rectangle(img, top_left, bottom_right, green, 2)
        # The caption sits 10 px above the box's top-left corner.
        img = cv2.putText(
            img,
            f'pos:({dist_long:0.2f},{dist_lat:0.2f})m',
            (top_left[0], top_left[1] - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.3,
            green,
        )

    if save:
        cv2.imwrite(f'dist_{img_i}.jpg', img)

    # OpenCV images are BGR; matplotlib expects RGB.
    plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    plt.title(f'Image {img_i}')
    plt.show()

    return dist_dict

In [14]:
def get_distance_label(gt):
    """Bucket a ground-truth entry by the Euclidean norm of its ``position``.

    Returns ``0`` when the norm is below 20, ``1`` when it is below 45 and
    ``2`` otherwise.
    """
    distance = np.linalg.norm(np.array(gt["position"]))
    # Upper bounds of buckets 0 and 1; anything not below either is bucket 2.
    for label, upper_bound in enumerate((20, 45)):
        if distance < upper_bound:
            return label
    return 2
    
def calc_error(a, b):
    """Return the squared Euclidean distance between two vectors."""
    difference = np.array(a) - np.array(b)
    return np.linalg.norm(difference) ** 2

In [15]:
def test_distance(c=DISPLACEMENT):
    """Evaluate position estimates on the test clips, bucketed by distance.

    Annotated boxes are paired with cached detections greedily by descending
    IoU (threshold 0.5, each box used once). For every pair the position from
    ``calculate_distance`` is compared with the ground-truth ``position`` and
    the squared error is filed under the bucket from ``get_distance_label``.
    Annotations without a partner are printed as failed detections.

    Parameters
    ----------
    c : number, optional
        Pixel offset forwarded to ``calculate_distance``.

    Returns
    -------
    tuple
        ``(error_near, error_mid, error_far, error_avg)``: the mean squared
        error of each bucket and the unweighted mean of the three. A bucket
        with no samples yields NaN (and so does the average).
    """
    error = [[], [], []]

    pred_list = load_pred_vehicle()

    # The ground-truth file is the same for every clip: parse it once instead
    # of re-reading it on each loop iteration.
    with open(PATH_GT, "r") as f:
        gt_all = json.load(f)

    for img_i in range(1, N_TEST_CLIPS + 1):
        pred = pred_list[img_i - 1]

        with open(f'{PATH_TEST}{img_i}/annotation.json', "r") as f:
            anno = json.load(f)

        # Assumes gt entries are listed in the same order as the entries of
        # annotation.json, since both are indexed by the same row - TODO confirm.
        gt = gt_all[img_i - 1]

        # Rows are annotations, columns are detections.
        iou_matrix = np.zeros((len(anno), len(pred)))
        for i in range(len(anno)):
            bbox = anno[i]['bbox']
            bound_i = ((bbox['left'], bbox['top']), (bbox['right'], bbox['bottom']))
            for j in range(len(pred)):
                iou = intersection_over_union(bound_i, pred[j])
                if iou > 0.5:
                    iou_matrix[i][j] = iou

        anno_index = list(range(len(anno)))
        while len(anno_index) != 0 and np.any(iou_matrix):
            max_ind = np.unravel_index(np.argmax(iou_matrix), iou_matrix.shape)
            anno_index.remove(max_ind[0])
            # Retire both partners so neither can be matched again.
            iou_matrix[:, max_ind[1]] = 0
            iou_matrix[max_ind[0], :] = 0
            pos = calculate_distance(pred[max_ind[1]][0][0], pred[max_ind[1]][1][0], c=c)
            dist_label = get_distance_label(gt[max_ind[0]])
            error[dist_label].append(calc_error(pos, gt[max_ind[0]]['position']))

        for i in anno_index:
            print(f'Failed Detection: {img_i}: {anno[i]}')

    error_near = np.mean(np.array(error[0]))
    error_mid = np.mean(np.array(error[1]))
    error_far = np.mean(np.array(error[2]))
    error_avg = (error_near + error_mid + error_far) / 3

    return error_near, error_mid, error_far, error_avg

In [16]:
# Baseline: evaluate with c=0, i.e. the centre column is exactly MAX_WIDTH / 2.
mse_dist_near, mse_dist_mid, mse_dist_far, mse_dist_avg =  test_distance(c=0)
print(f'MSE Distance (Near): {mse_dist_near}')
print(f'MSE Distance (Mid): {mse_dist_mid}')
print(f'MSE Distance (Far): {mse_dist_far}')
print(f'MSE Distance (Average): {mse_dist_avg}')

Failed Detection: 61: {'bbox': {'top': 342.1967468262, 'right': 698.6245117188, 'left': 673.005859375, 'bottom': 363.7939758301}}
Failed Detection: 179: {'bbox': {'top': 341.252746582, 'right': 706.7437744141, 'left': 677.0107421875, 'bottom': 365.3073425293}}
MSE Distance (Near): 10.850509655459
MSE Distance (Mid): 26.138942835991205
MSE Distance (Far): 69.42299160761495
MSE Distance (Average): 35.47081469968838


In [17]:
# Same evaluation with the default offset c=DISPLACEMENT.
mse_dist_near, mse_dist_mid, mse_dist_far, mse_dist_avg =  test_distance()
print(f'MSE Distance (Near): {mse_dist_near}')
print(f'MSE Distance (Mid): {mse_dist_mid}')
print(f'MSE Distance (Far): {mse_dist_far}')
print(f'MSE Distance (Average): {mse_dist_avg}')

Failed Detection: 61: {'bbox': {'top': 342.1967468262, 'right': 698.6245117188, 'left': 673.005859375, 'bottom': 363.7939758301}}
Failed Detection: 179: {'bbox': {'top': 341.252746582, 'right': 706.7437744141, 'left': 677.0107421875, 'bottom': 365.3073425293}}
MSE Distance (Near): 9.925672269174072
MSE Distance (Mid): 22.441786483823694
MSE Distance (Far): 61.758771788200704
MSE Distance (Average): 31.375410180399488


### Task 2

In [18]:
def calculate_velocity(img_i, bound_end, mode='csrt', show_error=False):
    """Estimate a vehicle's velocity by tracking its box backwards through a test clip.

    The tracker is initialised on frame ``040.jpg`` with ``bound_end`` and
    then updated on frames ``039.jpg`` down to ``001.jpg``, stopping at the
    first frame where tracking fails. The velocity is the difference between
    the position at frame 040 and the position at the earliest tracked frame
    (both from ``calculate_distance``), divided by the elapsed time.

    Parameters
    ----------
    img_i : int
        1-based id of the test clip.
    bound_end : sequence
        Box in frame 040 as ``((x_min, y_min), (x_max, y_max))``.
    mode : {'csrt', 'median'}, optional
        Which OpenCV tracker to use.
    show_error : bool, optional
        Print a message when no frame could be tracked.

    Returns
    -------
    tuple
        ``(vel_long, vel_lat)``, or ``(None, None)`` when no frame could be
        tracked.

    Raises
    ------
    Exception
        If ``mode`` is not one of the supported values.
    """
    tracker = None
    if mode == 'csrt':
        tracker = cv2.TrackerCSRT_create()
    elif mode == 'median':
        tracker = cv2.TrackerMedianFlow_create()
    else:
        raise Exception('Invalid mode')

    img_path = f'{PATH_TEST}{img_i}/imgs/040.jpg'
    img_end = cv2.imread(img_path)
    # OpenCV trackers take boxes as (x, y, width, height).
    bbox_end = (bound_end[0][0], bound_end[0][1], bound_end[1][0] - bound_end[0][0], bound_end[1][1] - bound_end[0][1])
    ok = tracker.init(img_end, bbox_end)

    bbox_list = []

    if ok:
        # Walk backwards in time: 039, 038, ..., 001.
        for j in reversed(range(1, 40)):
            img_path = f'{PATH_TEST}{img_i}/imgs/{j:03d}.jpg'
            img = cv2.imread(img_path)
            ok, bbox = tracker.update(img)
            if not ok:
                break
            bbox_list.append(bbox)

    if len(bbox_list) == 0:
        if show_error:
            # The original message referenced an undefined name `i`, which
            # raised NameError; report the box that could not be tracked.
            print(f'Failed Tracking: {img_i}: {bound_end}')
        vel_long = None
        vel_lat = None
    else:
        # NOTE(review): bbox_list[-1] lies len(bbox_list) frames before frame
        # 040, yet the elapsed time is computed from len(bbox_list) + 1 frames
        # (at a rate of 20 per second) - confirm the extra frame is intended.
        t = (len(bbox_list) + 1) / 20
        dist_start = calculate_distance(bbox_list[-1][0], bbox_list[-1][0] + bbox_list[-1][2])
        dist_end = calculate_distance(bbox_end[0], bbox_end[0] + bbox_end[2])
        vel_long = (dist_end[0] - dist_start[0]) / t
        vel_lat = (dist_end[1] - dist_start[1]) / t

    return vel_long, vel_lat

In [19]:
def get_velocity(img_i, mode='csrt', show_error=False, save=False):
    """Estimate and visualise the velocity of every cached detection in a test clip.

    Parameters
    ----------
    img_i : int
        1-based id of the test clip.
    mode : str, optional
        Tracker name forwarded to ``calculate_velocity``.
    show_error : bool, optional
        Forwarded to ``calculate_velocity``.
    save : bool, optional
        When true, also write the annotated frame to ``vel_<img_i>.jpg``.

    Returns
    -------
    list of dict
        One ``{'pred': box, 'vel': [vel_long, vel_lat]}`` entry per detection;
        both velocity components are ``None`` when tracking failed. The
        annotated frame ``040.jpg`` is displayed with matplotlib as a side effect.
    """
    detections = load_pred_vehicle()[img_i - 1]

    vel_dict = [
        {
            'pred': bound,
            'vel': list(calculate_velocity(img_i, bound, mode, show_error)),
        }
        for bound in detections
    ]

    green = (0, 255, 0)
    img = cv2.imread(f'{PATH_TEST}{img_i}/imgs/040.jpg')
    for entry in vel_dict:
        corner_min, corner_max = entry['pred'][0], entry['pred'][1]
        top_left = (int(corner_min[0]), int(corner_min[1]))
        bottom_right = (int(corner_max[0]), int(corner_max[1]))
        vel_long, vel_lat = entry['vel']

        img = cv2.rectangle(img, top_left, bottom_right, green, 2)
        # Boxes whose tracking failed are drawn without a caption.
        if vel_long is None or vel_lat is None:
            continue
        img = cv2.putText(
            img,
            f'vel:({vel_long:0.2f},{vel_lat:0.2f})m/s',
            (top_left[0], top_left[1] - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.3,
            green,
        )

    if save:
        cv2.imwrite(f'vel_{img_i}.jpg', img)

    # OpenCV images are BGR; matplotlib expects RGB.
    plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    plt.title(f'Image {img_i}')
    plt.show()

    return vel_dict

In [20]:
def test_velocity(mode='csrt'):
    """Evaluate velocity estimates on the test clips, bucketed by distance.

    Annotated boxes are paired with cached detections greedily by descending
    IoU (threshold 0.5, each box used once). For every pair the velocity from
    ``calculate_velocity`` is compared with the ground-truth ``velocity`` and
    the squared error is filed under the bucket from ``get_distance_label``.
    Pairs whose tracking failed and annotations without a partner are printed
    and excluded from the errors.

    Parameters
    ----------
    mode : str, optional
        Tracker name forwarded to ``calculate_velocity``.

    Returns
    -------
    tuple
        ``(error_near, error_mid, error_far, error_avg)``: the mean squared
        error of each bucket and the unweighted mean of the three. A bucket
        with no samples yields NaN (and so does the average).
    """
    error = [[], [], []]

    pred_list = load_pred_vehicle()

    # The ground-truth file is the same for every clip: parse it once instead
    # of re-reading it on each loop iteration.
    with open(PATH_GT, "r") as f:
        gt_all = json.load(f)

    for img_i in range(1, N_TEST_CLIPS + 1):
        pred = pred_list[img_i - 1]

        with open(f'{PATH_TEST}{img_i}/annotation.json', "r") as f:
            anno = json.load(f)

        # Assumes gt entries are listed in the same order as the entries of
        # annotation.json, since both are indexed by the same row - TODO confirm.
        gt = gt_all[img_i - 1]

        # Rows are annotations, columns are detections.
        iou_matrix = np.zeros((len(anno), len(pred)))
        for i in range(len(anno)):
            bbox = anno[i]['bbox']
            bound_i = ((bbox['left'], bbox['top']), (bbox['right'], bbox['bottom']))
            for j in range(len(pred)):
                iou = intersection_over_union(bound_i, pred[j])
                if iou > 0.5:
                    iou_matrix[i][j] = iou

        anno_index = list(range(len(anno)))
        while len(anno_index) != 0 and np.any(iou_matrix):
            max_ind = np.unravel_index(np.argmax(iou_matrix), iou_matrix.shape)
            anno_index.remove(max_ind[0])
            # Retire both partners so neither can be matched again.
            iou_matrix[:, max_ind[1]] = 0
            iou_matrix[max_ind[0], :] = 0
            vel = calculate_velocity(img_i, pred[max_ind[1]], mode)
            if vel[0] is None or vel[1] is None:
                print(f'Failed Velocity: {img_i}: {anno[max_ind[0]]}')
            else:
                dist_label = get_distance_label(gt[max_ind[0]])
                error[dist_label].append(calc_error(vel, gt[max_ind[0]]['velocity']))

        for i in anno_index:
            print(f'Failed Detection: {img_i}: {anno[i]}')

    error_near = np.mean(np.array(error[0]))
    error_mid = np.mean(np.array(error[1]))
    error_far = np.mean(np.array(error[2]))
    error_avg = (error_near + error_mid + error_far) / 3

    return error_near, error_mid, error_far, error_avg

In [21]:
# Velocity evaluation with the CSRT tracker. These values are velocity errors,
# so they are labelled as such (the labels previously read "MSE Distance").
mse_vel_near, mse_vel_mid, mse_vel_far, mse_vel_avg = test_velocity(mode='csrt')
print(f'MSE Velocity (Near): {mse_vel_near}')
print(f'MSE Velocity (Mid): {mse_vel_mid}')
print(f'MSE Velocity (Far): {mse_vel_far}')
print(f'MSE Velocity (Average): {mse_vel_avg}')

Failed Detection: 61: {'bbox': {'top': 342.1967468262, 'right': 698.6245117188, 'left': 673.005859375, 'bottom': 363.7939758301}}
Failed Detection: 179: {'bbox': {'top': 341.252746582, 'right': 706.7437744141, 'left': 677.0107421875, 'bottom': 365.3073425293}}
MSE Distance (Near): 1.252496914990898
MSE Distance (Mid): 2.845494823895707
MSE Distance (Far): 7.641211359038976
MSE Distance (Average): 3.9130676993085274


In [22]:
# Velocity evaluation with the MedianFlow tracker. These values are velocity
# errors, so they are labelled as such (the labels previously read "MSE Distance").
mse_vel_near, mse_vel_mid, mse_vel_far, mse_vel_avg = test_velocity(mode='median')
print(f'MSE Velocity (Near): {mse_vel_near}')
print(f'MSE Velocity (Mid): {mse_vel_mid}')
print(f'MSE Velocity (Far): {mse_vel_far}')
print(f'MSE Velocity (Average): {mse_vel_avg}')

Failed Detection: 61: {'bbox': {'top': 342.1967468262, 'right': 698.6245117188, 'left': 673.005859375, 'bottom': 363.7939758301}}
Failed Detection: 179: {'bbox': {'top': 341.252746582, 'right': 706.7437744141, 'left': 677.0107421875, 'bottom': 365.3073425293}}
Failed Velocity: 253: {'bbox': {'top': 340.3983154297, 'right': 719.2253417969, 'left': 699.192199707, 'bottom': 356.7189941406}}
MSE Distance (Near): 0.14045779843641149
MSE Distance (Mid): 1.9556855704711311
MSE Distance (Far): 10.27920671591838
MSE Distance (Average): 4.125116694941974
