In [None]:
import cv2
import time
import numpy as np
from pathlib import Path

from tools import readPredictions, getFrames, readLines, readImage, line_to_points

# Base pixel tolerance; bench_one_frame divides it by |sin(angle)| of each
# ground-truth lane to obtain that lane's x-distance threshold.
pixel_thresh = 20
# Minimum per-line accuracy for a line to be counted as "well detected"
# in bench_a_list_of_video.
pt_thresh = 0.85

In [None]:
def get_radian(xs, y_samples):
    '''
    Angle (in radians) of the straight line fitted through the given points.

    xs: x coordinates of the points
    y_samples: y coordinates of the points, fitted as a degree-1 polynomial of xs
    return: arctan of the fitted slope, or 0 when there are fewer than two points
    '''
    if len(xs) <= 1:
        # Not enough points for a fit; fall back to a zero angle.
        return 0
    gradient = np.polyfit(xs, y_samples, 1)[0]
    return np.arctan(gradient)

In [None]:
def line_accuracy(pred, gt, thresh) -> float:
    '''
    Score one predicted line against one ground-truth lane.

    pred: predicted line parameters, unpacked into line_to_points
          (presumably slope and intercept - confirm against tools)
    gt: ground truth lane, indexed as gt[0] = x coordinates and
        gt[1] = y samples
    thresh: a sample counts as a hit when its absolute x deviation is
            strictly below this value
    return: number of hits divided by the number of y samples
    '''
    truth_xs, ys = gt[0], gt[1]
    # Evaluate the predicted line between the first and last y sample.
    fitted_xs = line_to_points(*pred, ys[0], ys[-1])
    within = np.abs(truth_xs - fitted_xs) < thresh
    hits = np.sum(np.where(within, 1., 0.))
    return hits / len(ys)

In [None]:
def bench_one_frame(pred, gt) -> list:
    '''
    Score every predicted line of one frame against the ground-truth lanes.

    pred: predicted lines = ((ls, li) (rs, ri)); each entry is handed to
          line_accuracy as the parameters of one line
    gt: ground truth lines = [[(x1,x2,...), (y1,y2,...)], [...], ...]

    return: a list with one accuracy per predicted line: the best
            line_accuracy over all ground-truth lanes, or 0. when there
            are no ground-truth lanes
    '''
    # Per-lane tolerance: pixel_thresh scaled by the lane's angle.
    # NOTE(review): get_radian returns 0 for lanes with fewer than two
    # points, which makes the divisor below zero - confirm this is intended.
    radians = [get_radian(lane[0], lane[1]) for lane in gt]
    threshs = [pixel_thresh / np.abs(np.sin(radian)) for radian in radians]

    line_accs = []
    for pred_lane in pred:
        accs = [line_accuracy(pred_lane, lane, thresh) for lane, thresh in zip(gt, threshs)]
        # Keep only the best-matching ground-truth lane for this prediction.
        line_accs.append(np.max(accs) if len(accs) > 0 else 0.)
    return line_accs

In [None]:
def drawPredictions(img, predicted_lanes):
    '''
    Mark every predicted lane on img with small circles.

    img: image handed to cv2.circle for drawing
    predicted_lanes: iterable of line parameters; each entry is unpacked
                     into line_to_points
    '''
    marker_color = (0, 0, 255)
    for lane_params in predicted_lanes:
        # 300 and 590 are presumably the y range to sample - confirm
        # against tools.line_to_points.
        sampled = line_to_points(*lane_params, 300, 590, x_only=False)
        for pt in sampled.astype(np.int32):
            cv2.circle(img, pt, 3, marker_color)

In [None]:
def bench_one_video(pred_file: str, gt_dir: Path, show: bool = False, wait_time: int = 0):
    '''
    Benchmark the predictions of one video against its ground-truth frames.

    pred_file: path of the prediction file, read with readPredictions
    gt_dir: directory listed by getFrames; the annotation of a frame is
            looked up as the frame name minus its last four characters
            plus ".lines.txt"
    show: whether to display every judged frame in a "judge" window
    wait_time: value passed to cv2.waitKey after each displayed frame

    return: flat list of per-line accuracies over all processed frames
    '''
    predReader = readPredictions(pred_file)
    accuracy = []
    # Text positions for the accuracy of the first and second predicted line.
    label_origins = ((400, 100), (1000, 100))

    try:
        for frameName in getFrames(str(gt_dir)):
            pred_lanes = np.array(next(predReader))
            gt_lanes = [np.array(line).T for line in readLines(str(gt_dir / (frameName[:-4] + ".lines.txt")))]
            a = bench_one_frame(pred_lanes, gt_lanes)
            accuracy += a

            if show:
                img = readImage(str(gt_dir / frameName), annotation=True)
                drawPredictions(img, pred_lanes)
                # zip stops at the shorter sequence, so a frame with fewer
                # than two predicted lines no longer raises IndexError.
                for acc, origin in zip(a, label_origins):
                    cv2.putText(img, f"{acc:.2f}", origin, cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255),2)
                cv2.imshow("judge", img)
                time.sleep(0.5)
                # Pressing 'q' stops judging this video early.
                if cv2.waitKey(wait_time) & 0xFF == ord('q'):
                    break
    finally:
        # Only touch the GUI when a window may have been opened, and close
        # it even if judging failed midway.
        if show:
            cv2.destroyAllWindows()
    return accuracy

In [None]:
def bench_a_list_of_video(pred_dir: str, video_list: str, show: bool = False):
    '''
    Benchmark every video named in a list file and report the overall result.

    ## Parameters
    `pred_dir`: the directory containing prediction files.
    `video_list`: the text file containing the paths of .MP4 directories.
    `show`: whether to visualize judging process.

    ## Returns
    The average per-line accuracy over all scored lines, or NaN when no
    line was scored.
    '''
    pred_dir = Path(pred_dir)
    with open(video_list, "r") as fin:
        # Skip blank lines: an empty entry would turn into Path(".") whose
        # name is empty, yielding a bogus ".prediction.txt" lookup.
        videoDirs = [Path(entry) for entry in (line.strip() for line in fin) if entry]

    accuracy = []
    for videoDir in videoDirs:
        pred_file = str(pred_dir / (videoDir.name + ".prediction.txt"))
        print(pred_file)
        accuracy += bench_one_video(pred_file, videoDir, show)

    # NOTE(review): the first 10 line scores are discarded; the reason is
    # not visible here - confirm this is intended.
    accuracy = np.array(accuracy[10:])
    total_lines = len(accuracy)
    if total_lines == 0:
        # Nothing to average; avoid the 0/0 division and the empty-mean warning.
        print("well detected lines: n/a (0/0)")
        return float("nan")

    good_line_number = np.sum(np.where(accuracy > pt_thresh, 1, 0))
    # ".2%" multiplies by 100, so the printed number really is a percentage.
    print(f"well detected lines: {good_line_number / total_lines:.2%} ({good_line_number}/{total_lines})")
    return np.average(accuracy)

In [None]:
# Benchmark the first video list with visualization enabled.
# Paths are assembled with pathlib so the separator is right on every OS
# (a literal backslash is only a path separator on Windows).
pred_dir = str(Path("..") / "prediction")
video_list = str(Path("..") / "video_list.txt")
bench_a_list_of_video(pred_dir, video_list, show=True)

In [None]:
# Scratch call to the imported helper to inspect its output.
# NOTE(review): arguments are presumably (slope, intercept, y_start, y_end) -
# confirm against tools.line_to_points.
line_to_points(1, 1, 50, 20)

In [None]:
# Benchmark the second video list without visualization.
# Paths are assembled with pathlib so the separator is right on every OS
# (a literal backslash is only a path separator on Windows).
pred_dir = str(Path("..") / "prediction")
video_list = str(Path("..") / "video_list2.txt")
bench_a_list_of_video(pred_dir, video_list)