# COMP9517 Project: Individual Component
By Andrew Timkov (z5169762)

## Imports

In [None]:
import os
import json
import math
import random
import pickle
import cv2
import matplotlib.pyplot as plt
import numpy as np
from scipy import ndimage as ndi
from scipy.ndimage.measurements import label
from skimage.feature import canny
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler, MinMaxScaler

## Constants

In [None]:
# File the trained classifier is pickled to / loaded from
MODEL_FILE = "model.dat"
# File the fitted feature scaler is pickled to / loaded from
SCALER_FILE = "scaler.dat"
# Size every crop is resized to before features are extracted
HOG_WINDOW_SIZE = (128, 128)
# Heatmap values below this are zeroed in generate_heatmap(). Each positive
# window adds 5 to the heatmap, so (before blurring/dilation) a value of 10
# corresponds to two overlapping detections.
HEATMAP_THRESH = 10
# OpenCV conversion code applied to every image after it is loaded
# (BGR -> HSV); HOG and the colour histograms are computed on the result
COLOR_SPACE = cv2.COLOR_BGR2HSV
# Default matplotlib figure size (width, height)
plt.rcParams['figure.figsize'] = [16, 8]

## Global Variables

In [None]:
# HOG descriptor configured from the file "hog.xml"
hog = cv2.HOGDescriptor("hog.xml")
# Trained classifier; assigned by train_classifier()
svm = None
# Fitted feature scaler; assigned by train_classifier()
scaler = None

## Feature Extraction

In [None]:
# Resize any image crop to the fixed HOG window size (128x128)
def resize(img):
    """Return `img` scaled to HOG_WINDOW_SIZE.

    The input is returned unchanged (same object) when its first two
    dimensions already equal HOG_WINDOW_SIZE. The window is square, so the
    (rows, cols) order of `img.shape` versus the (width, height) order
    expected by cv2.resize does not matter here.
    """
    if img.shape[:2] != HOG_WINDOW_SIZE:
        # The interpolation flag has to be passed by keyword: the third
        # positional parameter of cv2.resize is `dst`, so a positional
        # cv2.INTER_AREA is not used as the interpolation method.
        return cv2.resize(img, HOG_WINDOW_SIZE, interpolation=cv2.INTER_AREA)
    return img

# Retrieve a feature vector for a given image
# Feature vector will always include the HOG descriptor,
# but HSV and RGB histograms can be disabled
def get_features(img, use_hsv=True, use_rgb=True):
    """Build the 1-D feature vector used by the classifier for one crop.

    The crop is first resized to the HOG window size. The vector is the HOG
    descriptor, optionally followed by three 256-bin histograms of the
    input channels (`use_hsv`) and three 256-bin histograms of the image
    after an HSV -> RGB conversion (`use_rgb`).

    `img` is expected to be a 3-channel HSV image (the RGB histograms are
    obtained with cv2.COLOR_HSV2RGB).
    """
    img = resize(img)

    # np.ravel copes with both layouts the descriptor may come back in,
    # depending on the OpenCV version: a column vector (N, 1) or a flat (N,)
    feature_vector = np.ravel(hog.compute(img))

    if use_hsv:
        h, _ = np.histogram(img[:, :, 0], 256, [0, 256])
        s, _ = np.histogram(img[:, :, 1], 256, [0, 256])
        v, _ = np.histogram(img[:, :, 2], 256, [0, 256])
        feature_vector = np.concatenate((feature_vector, h, s, v))

    if use_rgb:
        rgb = cv2.cvtColor(img, cv2.COLOR_HSV2RGB)
        r, _ = np.histogram(rgb[:, :, 0], 256, [0, 256])
        g, _ = np.histogram(rgb[:, :, 1], 256, [0, 256])
        b, _ = np.histogram(rgb[:, :, 2], 256, [0, 256])
        feature_vector = np.concatenate((feature_vector, r, g, b))

    return feature_vector
    

In [None]:
# Hard-coded crop areas used as negative (non-vehicle) training samples.
# Every entry is (x1, x2, y1, y2) in pixel coordinates.
nonmatch_bounds = [
    (720, 784, 260, 324),    # middle where sky meets trees/ground
    # NOTE(review): read as (x1, x2, y1, y2) this crop spans y 580-708, which
    # looks low in the frame for "sky" - x and y may be swapped; confirm.
    (60, 188, 580, 708),     # middle just sky
    (1024, 1280, 200, 456),  # right side road+trees
    (0, 256, 240, 496),      # left side road + trees + other lane
    (520, 648, 440, 568),    # middle of road (likely lane lines)
    (700, 764, 470, 534),    # middle of road (likely just plain road)
]
# Hard-coded band of negative crops across the region where a lot of
# false positives were occurring.
# NOTE(review): these crops are 64 wide by 128 tall (not square like the
# ones above) - confirm that this is intended.
nonmatch_bounds.extend(
    (left, left + 64, 340, 468) for left in range(0, 1280 - 128, 256)
)

# Check if two given rectangles are overlapping
def is_overlapping(a, b):
    """Return True when rectangles `a` and `b` share some interior area.

    Both rectangles are (x1, x2, y1, y2) tuples. Rectangles that only touch
    along an edge or at a corner are not considered overlapping.
    """
    a_left, a_right, a_top, a_bottom = a
    b_left, b_right, b_top, b_bottom = b

    # one rectangle starts at or after the point where the other one ends
    apart_horizontally = a_left >= b_right or b_left >= a_right
    apart_vertically = a_top >= b_bottom or b_top >= a_bottom

    return not (apart_horizontally or apart_vertically)

# retrieve feature vectors, annotations and images from
# a list of labels and data files
def parse_data(img_files, label_files, pos_data=None):
    """Load image/annotation pairs and extract training features from them.

    Parameters:
        img_files:   paths of the images to load.
        label_files: paths of the JSON annotation files, paired with
                     `img_files` by position.
        pos_data:    accepted for compatibility with callers; not used.

    Returns a tuple (match_data, nonmatch_data, annotations, imgs):
        match_data:    feature vectors of the annotated (positive) crops.
        nonmatch_data: feature vectors of the `nonmatch_bounds` crops that
                       do not overlap a positive crop of the same image.
        annotations:   one list of (x1, x2, y1, y2) boxes per loaded image,
                       including boxes that were skipped for training.
        imgs:          the loaded images, converted with COLOR_SPACE.

    Pairs are processed in random order. A pair is skipped when its label
    file is missing, its annotation is null, or its image cannot be read.
    """
    match_data, nonmatch_data, annotations, imgs = [], [], [], []
    pairs = list(zip(img_files, label_files))
    random.shuffle(pairs)
    for img_file, label_file in pairs:
        if not os.path.isfile(label_file):
            continue
        with open(label_file, 'r') as f:
            labels = json.load(f)
        if labels is None:
            continue

        img = cv2.imread(img_file)
        if img is None:
            # cv2.imread returns None for a missing or unreadable file
            continue
        img = cv2.cvtColor(img, COLOR_SPACE)
        imgs.append(img)
        img_annotations = []
        nonmatches = nonmatch_bounds.copy()

        for entry in labels:
            bbox = entry['bbox']
            x1 = math.floor(bbox['left'])
            x2 = math.ceil(bbox['right'])
            y1 = math.floor(bbox['top'])
            y2 = math.ceil(bbox['bottom'])
            img_annotations.append((x1, x2, y1, y2))
            if x1 >= x2 or y1 >= y2:
                continue

            # grow (or shrink) the box vertically so that the crop is
            # square: its height becomes equal to its width
            dim_diff = (x2 - x1) - (y2 - y1)
            y1 -= math.floor(dim_diff / 2)
            y2 += math.ceil(dim_diff / 2)
            if y1 < 0 or y2 > img.shape[0]:
                continue

            match_data.append(get_features(img[y1:y2, x1:x2]))
            # a negative crop must not contain any part of a vehicle
            nonmatches = [n for n in nonmatches
                          if not is_overlapping(n, (x1, x2, y1, y2))]

        for x1, x2, y1, y2 in nonmatches:
            nonmatch_data.append(get_features(img[y1:y2, x1:x2]))

        annotations.append(img_annotations)

    return (match_data, nonmatch_data, annotations, imgs)

def get_data(file_dir, get_pos=False):
    """Gather the frame and annotation paths of every clip and parse them.

    Each sub-directory of `file_dir` is treated as one clip, contributing
    the frame "imgs/040.jpg" and the label file "annotation.json". When
    `get_pos` is true the clip directory names are also handed to
    parse_data(). Returns whatever parse_data() returns.
    """
    root = os.path.abspath(file_dir)
    clip_names = [name for name in os.listdir(root)
                  if os.path.isdir(os.path.join(root, name))]
    img_files = [os.path.join(root, name, "imgs", "040.jpg")
                 for name in clip_names]
    label_files = [os.path.join(root, name, "annotation.json")
                   for name in clip_names]

    if not get_pos:
        return parse_data(img_files, label_files)
    return parse_data(img_files, label_files, clip_names)

# Get supplementary data (not used at the moment)
def get_supp_data(file_dir):
    """Extract training features from the supplementary dataset.

    Reads "annotation.json" in `file_dir` and processes entries 1000-1999
    of the list it contains. Entries whose image file is missing or cannot
    be read are skipped.

    Returns a tuple (match_data, nonmatch_data, annotations, imgs). Unlike
    parse_data(), `annotations` is a single flat list of (x1, x2, y1, y2)
    boxes across all images rather than one list per image.
    """
    match_data, nonmatch_data, annotations, imgs = [], [], [], []
    file_dir = os.path.abspath(file_dir)
    with open(os.path.join(file_dir, "annotation.json"), 'r') as f:
        label_data = json.load(f)

    for entry in label_data[1000:2000]:
        img_file = os.path.join(file_dir, entry['file_name'])
        if not os.path.isfile(img_file):
            continue
        img = cv2.imread(img_file)
        if img is None:
            # cv2.imread returns None when the file cannot be decoded
            continue
        img = cv2.cvtColor(img, COLOR_SPACE)
        imgs.append(img)
        nonmatches = nonmatch_bounds.copy()

        for box in entry['bbox']:
            x1 = math.floor(box['left'])
            x2 = math.ceil(box['right'])
            y1 = math.floor(box['top'])
            y2 = math.ceil(box['bottom'])
            annotations.append((x1, x2, y1, y2))
            if x1 >= x2 or y1 >= y2:
                continue

            # grow (or shrink) the box vertically so that the crop is
            # square: its height becomes equal to its width
            dim_diff = (x2 - x1) - (y2 - y1)
            y1 -= math.floor(dim_diff / 2)
            y2 += math.ceil(dim_diff / 2)
            if y1 < 0 or y2 > img.shape[0]:
                continue

            match_data.append(get_features(img[y1:y2, x1:x2]))
            # a negative crop must not contain any part of a vehicle
            nonmatches = [n for n in nonmatches
                          if not is_overlapping(n, (x1, x2, y1, y2))]

        for x1, x2, y1, y2 in nonmatches:
            nonmatch_data.append(get_features(img[y1:y2, x1:x2]))

    return (match_data, nonmatch_data, annotations, imgs)

## Model Training

In [None]:
# Build and fit a feature scaler (standardisation or normalisation)
def init_scaler(match_data, nonmatch_data, scaler="standardise"):
    """Fit a scaler on the positive and negative feature vectors combined.

    `scaler` selects the type: "standardise" gives a StandardScaler, any
    other value gives a MinMaxScaler. The fitted scaler is returned.
    """
    scaler_cls = StandardScaler if scaler == "standardise" else MinMaxScaler
    all_samples = np.concatenate((match_data, nonmatch_data))
    return scaler_cls().fit(all_samples)
    
def scale(scaler, data):
    """Apply the already-fitted `scaler` to `data` and return the result."""
    transformed = scaler.transform(data)
    return transformed
    

In [None]:
# train the classifier
# use_supp: whether or not to use supplementary data
# get_stored: whether or not to load pickled classifier
def train_classifier(use_supp=False, get_stored=False, scaler_type="standardise"):
    """Train (or load) the classifier and scaler into the module globals.

    Parameters:
        use_supp:    also train on the supplementary dataset.
        get_stored:  load the pickled classifier and scaler when both
                     MODEL_FILE and SCALER_FILE exist, instead of training.
        scaler_type: "standardise" for standardisation, anything else for
                     min-max normalisation (see init_scaler).

    After training, the classifier and scaler are pickled to MODEL_FILE and
    SCALER_FILE and the accuracy on the training data itself is printed.
    """
    global scaler, svm
    if get_stored:
        # NOTE: unpickling can execute arbitrary code - only load model and
        # scaler files that were produced by this notebook.
        if os.path.isfile(os.path.abspath(MODEL_FILE)):
            with open(MODEL_FILE, 'rb') as f:
                svm = pickle.load(f)
            if os.path.isfile(os.path.abspath(SCALER_FILE)):
                with open(SCALER_FILE, 'rb') as f:
                    scaler = pickle.load(f)
                return

    # os.path.join keeps the dataset paths usable on every platform
    match_data, nonmatch_data, _, _ = get_data(
        os.path.join("benchmark_velocity_train", "clips"))
    if use_supp:
        supp_match_data, supp_nonmatch_data, _, _ = get_supp_data(
            "benchmark_velocity_supp")
        match_data = np.concatenate((match_data, supp_match_data))
        nonmatch_data = np.concatenate((nonmatch_data, supp_nonmatch_data))
    if not get_stored or scaler is None:
        scaler = init_scaler(match_data, nonmatch_data, scaler_type)
        with open(SCALER_FILE, 'wb') as f:
            pickle.dump(scaler, f)

    match_data = scale(scaler, match_data)
    nonmatch_data = scale(scaler, nonmatch_data)
    # label 1 = vehicle crop, label 0 = background crop
    match_labels = np.ones(match_data.shape[0], dtype=int)
    nonmatch_labels = np.zeros(nonmatch_data.shape[0], dtype=int)
    training_data = np.concatenate((match_data, nonmatch_data))
    training_labels = np.concatenate((match_labels, nonmatch_labels))

    svm = LinearSVC(dual=False)
    svm.fit(training_data, training_labels)
    with open(MODEL_FILE, 'wb') as f:
        pickle.dump(svm, f)

    # accuracy on the training set, reported separately per class
    match_res = svm.predict(match_data)
    nonmatch_res = svm.predict(nonmatch_data)
    false_neg = np.sum(match_res != 1)
    false_pos = np.sum(nonmatch_res == 1)
    print("Positive data result:", 1 - (false_neg / float(match_data.shape[0])))
    print("Negative data result:", 1 - (false_pos / float(nonmatch_data.shape[0])))

# test the classifier on the dataset in a given folder
def test_classifier(data_dir):
    """Print the per-class accuracy of the trained classifier on `data_dir`.

    The module-level `scaler` and `svm` must already have been set by
    train_classifier(). "Positive data result" is the fraction of vehicle
    crops predicted as 1; "Negative data result" is the fraction of
    background crops not predicted as 1.
    """
    match_data, nonmatch_data, _, _ = get_data(data_dir)

    match_data = scale(scaler, match_data)
    nonmatch_data = scale(scaler, nonmatch_data)

    match_res = svm.predict(match_data)
    nonmatch_res = svm.predict(nonmatch_data)
    false_neg = np.sum(match_res != 1)
    false_pos = np.sum(nonmatch_res == 1)
    print("Positive data result:", 1 - (false_neg / float(match_data.shape[0])))
    print("Negative data result:", 1 - (false_pos / float(nonmatch_data.shape[0])))

In [None]:
# Train from scratch on the main training set only (no stored model, no
# supplementary data), using standardised features
train_classifier(use_supp=False, get_stored=False, scaler_type="standardise")

In [None]:
# Report the classifier's per-class accuracy on the test clips
# NOTE(review): the path is written with Windows-style backslashes
test_classifier("benchmark_velocity_test\\clips\\")

## Classification & Evaluation

In [None]:
# returns coordinates of the sliding windows in an image
def get_sliding_windows(img):
    """List the (x1, x2, y1, y2) search windows for a 3-dimensional image.

    Square windows of side 256, 128, 64 and 32 are produced. For each size,
    up to four rows of windows are laid out with 50% overlap both
    horizontally and vertically; a row that would extend past the bottom of
    the image ends that size. The first row starts at y=230 for the largest
    size and is moved down by half of the new side length each time the
    size is halved.
    """
    img_h, img_w, _ = img.shape
    overlap = 0.5
    shrink = 0.5
    rows_per_size = int((1 / (1 - overlap)) + 2)

    windows = []
    top = 230
    side = 256

    while side >= 32:
        stride = int(side * (1 - overlap))
        for row in range(rows_per_size):
            y1 = top + int(row * side * (1 - overlap))
            y2 = y1 + side
            if y2 > img_h:
                break
            windows.extend(
                (x1, x1 + side, y1, y2)
                for x1 in range(0, img_w - side + 1, stride)
            )
        side = int(side * shrink)
        # the offset is computed from the already-reduced side length
        top += int(side * shrink)

    return windows

In [None]:
# calculates the IoU / Jaccard index of two boxes
def intersection_over_union(a, b):
    """Return the intersection-over-union of boxes `a` and `b`.

    Both boxes are (x1, x2, y1, y2) tuples. The result is 0 when the boxes
    do not overlap, and also when both boxes have zero area (where the
    ratio would otherwise be a division by zero).
    """
    ax1, ax2, ay1, ay2 = a
    bx1, bx2, by1, by2 = b
    a_area = (ax2 - ax1) * (ay2 - ay1)
    b_area = (bx2 - bx1) * (by2 - by1)

    inter_width = min(ax2, bx2) - max(ax1, bx1)
    inter_height = min(ay2, by2) - max(ay1, by1)
    if inter_width > 0 and inter_height > 0:
        inter_area = inter_width * inter_height
    else:
        inter_area = 0

    union_area = a_area + b_area - inter_area
    if union_area == 0:
        # two degenerate (zero-area) boxes: define the index as 0
        return 0
    return inter_area / union_area

# vehicle prediction evaluation
# pred: list of predicted boxes
# gts: list of ground truth boxes
def evaluate(pred, gts, mode='tp'):
    """Score predicted boxes against ground-truth boxes.

    All boxes are (x1, x2, y1, y2) tuples. `mode` selects the metric:
        'tp':      fraction of ground-truth boxes overlapped by at least
                   one prediction (0 when there are no ground-truth boxes).
        'fp':      fraction of predictions that overlap no ground-truth
                   box (0 when there are no predictions).
        'jaccard': mean IoU over every overlapping (prediction,
                   ground-truth) pair (0 when no pair overlaps).
    Any other value returns the tuple (tp, fp, jaccard).
    """
    # true positives
    if mode == 'tp':
        if len(gts) == 0:
            # nothing to detect: report 0 instead of dividing by zero,
            # in line with the empty-input results of the other modes
            return 0
        num_detected = sum(
            any(is_overlapping(p, gt) for p in pred) for gt in gts
        )
        return num_detected / len(gts)
    # false positives
    if mode == 'fp':
        if len(pred) == 0:
            return 0
        num_fp = sum(
            not any(is_overlapping(p, gt) for gt in gts) for p in pred
        )
        return num_fp / len(pred)
    # jaccard index
    if mode == 'jaccard':
        num_detected = 0
        jaccard_sum = 0
        for p in pred:
            jaccards = [intersection_over_union(p, gt)
                        for gt in gts if is_overlapping(p, gt)]
            num_detected += len(jaccards)
            jaccard_sum += sum(jaccards)
        if num_detected == 0:
            return 0
        return jaccard_sum / num_detected
    # any other mode: all three metrics
    return tuple(evaluate(pred, gts, mode=m) for m in ('tp', 'fp', 'jaccard'))

In [None]:
def generate_heatmap(img, windows):
    """Build a thresholded detection heatmap with the same shape as `img`.

    Every window (x1, x2, y1, y2) adds 5 to the pixels it covers. The map
    is then median-blurred and dilated (both with an 11x11 neighbourhood)
    and values below HEATMAP_THRESH are set to 0. The result is uint8.
    """
    # Accumulate in a wide integer type and saturate at 255 afterwards, so
    # that many overlapping windows cannot wrap around the uint8 range.
    heat = np.zeros(img.shape, dtype=np.int32)
    # Add 5 to pixel intensity for each window
    for x1, x2, y1, y2 in windows:
        heat[y1:y2, x1:x2] += 5
    heatmap = np.clip(heat, 0, 255).astype(np.uint8)
    heatmap = cv2.medianBlur(heatmap, ksize=11)
    heatmap = cv2.dilate(heatmap, np.ones((11, 11), dtype=np.uint8))
    # heatmap threshold filtering
    heatmap[heatmap < HEATMAP_THRESH] = 0
    return heatmap

# returns the RGB version of the image and a list of bounding boxes
# representing vehicle detections. Detections can be shown with show_img=True
def detect(img, show_img=False):
    """Detect vehicles in an HSV image with the trained classifier.

    Every sliding window is classified; the positive windows are merged
    through a heatmap and each connected heatmap region becomes one
    bounding box. Regions wider than 300 pixels and more than three times
    as wide as they are tall are discarded.

    Returns (rgb_img, bboxes): the image converted HSV -> RGB with the kept
    boxes drawn in green, and the boxes as (x1, x2, y1, y2) tuples of
    Python ints. The image is displayed when `show_img` is true.
    """
    windows = get_sliding_windows(img)
    features = [get_features(img[y1:y2, x1:x2]) for x1, x2, y1, y2 in windows]
    predictions = svm.predict(scale(scaler, features))
    windows = [windows[i] for i in np.argwhere(predictions == 1)[:, 0]]
    heatmap = generate_heatmap(img, windows)
    labelled_heatmap, obj_count = label(heatmap)
    bboxes = []

    img = cv2.cvtColor(img, cv2.COLOR_HSV2RGB)

    # labels produced by `label` start at 1; 0 is the background
    for obj_id in range(1, obj_count + 1):
        # the heatmap keeps the image's channel axis, hence three index arrays
        y_vals, x_vals, _ = np.nonzero(labelled_heatmap == obj_id)
        # plain ints, so that the drawing call and callers do not receive
        # numpy integer scalars
        x1, x2 = int(x_vals.min()), int(x_vals.max())
        y1, y2 = int(y_vals.min()), int(y_vals.max())
        dx = x2 - x1
        dy = y2 - y1
        # filter out obvious wide false positives
        if dx > 300 and dx > 3 * dy:
            continue
        bboxes.append((x1, x2, y1, y2))
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 3)

    if show_img:
        plt.imshow(img)
        plt.axis('off')
        plt.show()
    return (img, bboxes)

# runs the detection and evaluation for each test file in the given folder
def run_evaluations(data_dir, show_img=False):
    """Run detect() and evaluate() on every image found under `data_dir`.

    The three metrics (true-positive rate, false-positive rate, Jaccard
    index) are printed for each image, followed by their averages over all
    images. When no image could be loaded, a message is printed instead of
    the averages. `show_img` is forwarded to detect().
    """
    _, _, annotations, imgs = get_data(data_dir)
    evaluations = []
    for im, im_annotations in zip(imgs, annotations):
        # detect() works on a copy so the loaded image stays untouched
        _, bboxes = detect(im.copy(), show_img=show_img)
        evaluation = evaluate(bboxes, im_annotations, mode='all')
        print("True Positive:", evaluation[0])
        print("False Positive:", evaluation[1])
        print("Jaccard Index:", evaluation[2])
        evaluations.append(evaluation)

    if not evaluations:
        # avoid dividing by zero when the folder yielded no images
        print("No images were evaluated.")
        return

    print("\n")
    print("True Positives:", sum([e[0] for e in evaluations]) / len(evaluations))
    print("False Positives:", sum([e[1] for e in evaluations]) / len(evaluations))
    print("Jaccard Index:", sum([e[2] for e in evaluations]) / len(evaluations))


In [None]:
# Detect and evaluate on every test clip, displaying each annotated image
# NOTE(review): the path is written with Windows-style backslashes
run_evaluations("benchmark_velocity_test\\clips\\", show_img=True)

In [None]:
def calc_distance(img):
    # Estimate a distance `d` from the pixel width of a bounding box.
    # Despite its name, `img` is unpacked as a box (x1, x2, y1, y2).
    # NOTE(review): in the lines visible here `d` is computed but never
    # returned, and fy, x0, y0 and car_height are unused - the function
    # looks unfinished; confirm.
    x1, x2, y1, y2 = img
    # presumably camera intrinsics (focal lengths and principal point, in
    # pixels) - TODO confirm
    fx = 714.1526
    fy = 710.3725
    x0 = 713.85
    y0 = 327
    # presumably assumed real-world vehicle dimensions - TODO confirm units
    car_width = 1.8
    car_height = 1.5
    
    # assumed real width times fx, divided by the box width in pixels
    d = car_width * fx / (x2 - x1)
    
    