# Evaluation

This script consists of code that evaluates high level objectives including:
1. detection accuracy, F1 score
2. measurement errors

In [1]:
import cv2
import numpy as np
import os
import helper
import statistics as stat
import json
import pandas as pd
from tqdm import tqdm

%load_ext autoreload
%autoreload 2

## 1. Helper functions

In [3]:
def fit_polygons_to_rotated_bboxes(polygons):
    r"""
    convert polygons to rotated bboxes using cv2.minAreaRect().

    Args:
     - polygons (list): is a list of polygon points [x1, y1, x2, y2,...]
    """
    rbboxes = []
    for p in polygons:
        pts_x = p[::2]
        pts_y = p[1::2]
        pts = [[x, y] for x, y in zip(pts_x, pts_y)]
        pts = np.array(pts, np.float32)
        rect = cv2.minAreaRect(pts)  #  ((cx, cy), (w, h), a)
        rbboxes.append(rect)
    return rbboxes


def check_bbox_intersection(rect1, rect2):
    r"""
    check if there exist an overlap between two bounding boxes.

    Args:
    --------------
    - bbox_1: first bbox ((cx, cy), (w, h), a)
    - bbox_2: second bbox ((cx, cy), (w, h), a)
    """
    output = cv2.rotatedRectangleIntersection(rect1, rect2)

    print(output)


def convert_bbox_to_rbbox(bbox: list[float], angle=False):
    r"""
    convert bbox from format [x,y,w,h] to ((cx,cy), (w,h), a)
    """
    if not angle:
        return ((bbox[0], bbox[1]), (bbox[2], bbox[3]), 0)
    else:
        return ((bbox[0], bbox[1]), (bbox[2], bbox[3]), bbox[4])


def convert_bboxes_to_rbboxes(bboxes: list[list], angle=False):
    r"""
    convert a list of bboxes to a list of rbboxes
    """
    rbboxes = []
    for bbox in bboxes:
        rbboxes.append(convert_bbox_to_rbbox(bbox, angle))
    return rbboxes


def get_ground_truth_polygons(file_name: str) -> list:
    lines = []
    polygons = []
    with open(file_name, "r") as f:
        lines = f.readlines()

    # prepare polygons and texts
    for i, line in enumerate(lines):
        chars = line.split(" ")
        chars = list(map(float, chars))
        polygons.append(chars[1:])

    return polygons


def scale_up_polygons(polygons: list, scale_h: int, scale_w: int):
    for j, p in enumerate(polygons):
        scales = [scale_h if i % 2 != 0 else scale_w for i in range(len(p))]
        polygons[j] = [p[i] * scales[i] for i in range(len(p))]

    return polygons


def scale_up_bboxes(bboxes: list, scale_h: int, scale_w: int):
    output_bboxes = []

    for b in bboxes:
        cx = b[0][0] * scale_w  # scale x
        cy = b[0][1] * scale_h  # scale y
        w = b[1][0] * scale_w  # scale w
        h = b[1][1] * scale_h  # scale h
        a = b[2]
        output_bboxes.append(((cx, cy), (w, h), a))

    return output_bboxes


def get_predict_bboxes(file_name: str) -> tuple[list, list]:
    lines = []
    bboxes = []
    rbboxes = []
    with open(file_name, "r") as f:
        lines = f.readlines()

    # prepare polygons and texts
    is_rbboxes = False
    for line in lines:
        chars = line.split(" ")
        if chars[0] == "rotated_bbox\n":
            is_rbboxes = True
            continue
        chars = list(map(float, chars))
        if not is_rbboxes:
            bboxes.append(chars[1:])
        else:
            rbboxes.append(chars[1:])

    bboxes = convert_bboxes_to_rbboxes(bboxes)
    rbboxes = convert_bboxes_to_rbboxes(rbboxes, angle=True)
    return bboxes, rbboxes


def measurements_to_ratios(measurements: list):
    ratios = []
    for m in measurements:
        r1 = m[0][0] / m[0][1]
        r2 = m[1][0] / m[1][1]

        # make sure the ratio is always between 0 and 1
        if r1 > 1:
            r1 = 1 / r1
        if r2 > 1:
            r2 = 1 / r2

        r = (r1, r2)
        ratios.append(r)
    return ratios


def get_error_metric(measurements: list):
    ratios = measurements_to_ratios(measurements)

    errs = []
    for r in ratios:
        errs.append(abs(r[0] - r[1]))

    if len(errs) < 1:  # no measurement
        err_mean = 0
        err_median = 0
    else:
        err_mean = stat.mean(errs)
        err_median = stat.median(errs)

    if len(measurements) < 2:  # only one measurement
        err_stdev = 0
        err_var = 0
    else:
        err_stdev = stat.stdev(errs)
        err_var = stat.variance(errs)

    return (err_mean, err_stdev, err_var, err_median)


def evaluate(img_filename, label_filename, pred_filename) -> dict:
    r"""
    This is the evaluation code that returns a dictionary consisting of:
    (precision, recall, err_mean, err_stdev, err_var, err_median)
    Args
    -------------
    - img_filename
    - label_filename
    - pred_filename
    """

    # read image
    im = cv2.imread(img_filename)
    scale_h, scale_w = im.shape[0], im.shape[1]

    # Get ground truth rbboxes
    polygons_true = get_ground_truth_polygons(label_filename)
    polygons_true = scale_up_polygons(polygons_true, scale_h, scale_w)
    rbboxes_true = helper.fit_polygons_to_rotated_bboxes(polygons_true)

    # Get predict bboxes (detection output) and rbboexes (from mask)
    bboxes_pred, rbboxes_pred = get_predict_bboxes(pred_filename)
    bboxes_pred = scale_up_bboxes(bboxes_pred, scale_h, scale_w)

    # Now we have below three lists
    #
    # - masks_truth: ground truth masks converted into rbboxes
    # - bboxes_pred: predict bboxes
    # - masks_pred: predict masks converted into rbboxes
    #
    # We can then use cv2.rotatedRectangleIntersection(rect1, rect2) to see if check their pairwise intersections.
    #
    # test = cv2.rotatedRectangleIntersection(rect1, rect2)
    # test[0] == 1 indicates rect1 and rect2 are intersect; other wise not intersect

    total_stomata = 0
    total_detected = 0
    measurements = []  # measurement = ((h_true, w_true),(h_pred, w_pred))
    for r_true in rbboxes_true:
        total_stomata += 1
        for b_pred in bboxes_pred:  # check if stomata is detected
            test_detect = cv2.rotatedRectangleIntersection(r_true, b_pred)
            if test_detect[0] == 1:  # stomata is detected in prediction
                total_detected += 1
                for (
                    r_pred
                ) in rbboxes_pred:  # check if the predicted mask exists for the stomata
                    test_mask = cv2.rotatedRectangleIntersection(b_pred, r_pred)
                    if test_mask[0] == 1:  # find the corresponding mask
                        # compute height and weight
                        m = ((r_true[1][0], r_true[1][1]), (r_pred[1][0], r_pred[1][1]))
                        measurements.append(m)
                        break
                break

    precision = total_detected / len(bboxes_pred)
    recall = total_detected / total_stomata
    err_mean, err_stdev, err_var, err_median = get_error_metric(measurements)

    result = {
        "precision": precision,
        "recall": recall,
        "err_mean": err_mean,
        "err_stdev": err_stdev,
        "err_var": err_var,
        "err_median": err_median,
    }

    return result


def inference_to_dataframe(img_filename, pred_filename) -> tuple[str, pd.DataFrame]:
    r"""tp
    This is the inferencecode that convert model inference outputs to
    Args
    -------------
    - img_filename
    - label_filename
    - pred_filename
    """
    img_name = os.path.splitext(img_filename)[0].split("/")[-1]

    # read image
    im = cv2.imread(img_filename)
    scale_h, scale_w = im.shape[0], im.shape[1]

    # Get predict bounding bboxes
    bboxes_pred, rbboxes_pred = get_predict_bboxes(pred_filename)
    bboxes_pred = scale_up_bboxes(bboxes_pred, scale_h, scale_w)

    measurements = []  # measurement = (id, long_axis, short_axis, ratio)

    for idx, r_pred in enumerate(rbboxes_pred):
        # Get long and short axis
        if r_pred[1][0] > r_pred[1][1]:
            long_axis, short_axis = r_pred[1][0], r_pred[1][1]
        else:
            long_axis, short_axis = r_pred[1][1], r_pred[1][0]

        ratio = short_axis / long_axis
        measurements.append((idx, scale_h, scale_w, long_axis, short_axis, ratio))

    df = pd.DataFrame(
        measurements,
        columns=["id", "img_height", "img_width", "long_axis", "short_axis", "ratio"],
    )

    return img_name, df

In [None]:
data_dir = "../../../google-drive/stomaVDP/2023-all-337"
label_dir = os.path.join(data_dir, "labels", "train")
img_dir = os.path.join(data_dir, "images", "train")

img_file = "CK005.jpg"
img_filename = os.path.join(img_dir, img_file)
true_file = "CK005.txt"
true_filename = os.path.join(label_dir, true_file)

pred_filename = "../runs/predict-seg/exp45/labels/CK005.txt"

img_name, df = inference_to_dataframe(img_filename, pred_filename)
print(img_name)
df["img_name"] = img_name
df

In [4]:
img_dir = "/mnt/linux/abrc/abrc/dataset/stomaVDP/2023-all-new-337/images/val"
pred_dir = "../runs/predict-seg/exp2/labels"
output_file = "../runs/predict-seg/exp2/labels.csv"

pred_files = os.listdir(pred_dir)
img_files = os.listdir(img_dir)

imgs = []
for f in pred_files:
    filename = os.path.splitext(f)[0]
    imgs.append([img for img in img_files if filename in img][0])


print(f"{len(pred_files)} inference files matches with images...")
# print(pred_files)
print(f"{len(imgs)} images to process...")
# print(imgs)

result_dfs = []
for pred, img in tqdm(zip(pred_files, imgs)):
    img_filename = os.path.join(img_dir, img)
    pred_filename = os.path.join(pred_dir, pred)

    filename, df = inference_to_dataframe(img_filename, pred_filename)
    df["img_name"] = filename
    result_dfs.append(df)


df_output = pd.concat(result_dfs, axis=0, ignore_index=True)
df_output.to_csv(output_file)

69 inference files matches with images...
69 images to process...


69it [00:02, 27.40it/s]


## 2. Main function for one file (cell for development)

In [None]:
# This cell is only used for code development.
# Use next one indead.

data_dir = "../../../google-drive/stomaVDP/2023-all-337"
label_dir = os.path.join(data_dir, "labels", "train")
img_dir = os.path.join(data_dir, "images", "train")

img_file = "CK005.jpg"
img_file_name = os.path.join(img_dir, img_file)
ground_truth_file = "CK005.txt"
file_name = os.path.join(label_dir, ground_truth_file)

# Read image
im = cv2.imread(img_file_name)
scale_h, scale_w = im.shape[0], im.shape[1]

# Get ground-truth bboxes
polygons_true = get_ground_truth_polygons(file_name)
polygons_true = scale_up_polygons(polygons_true, scale_h, scale_w)
bboxes_true = helper.fit_polygons_to_rotated_bboxes(polygons_true)

# Get predict bboxes
predict_file = "../runs/predict-seg/exp45/labels/CK005.txt"
bboxes_pred, rbboxes_pred = get_predict_bboxes(predict_file)
bboxes_pred = scale_up_bboxes(bboxes_pred, scale_h, scale_w)

# Now we have below three lists
#
# - masks_truth: ground truth masks converted into rbboxes
# - bboxes_pred: predict bboxes
# - masks_pred: predict masks converted into rbboxes
#
# We can then use cv2.rotatedRectangleIntersection(rect1, rect2) to see if check their pairwise intersections.
#
# test = cv2.rotatedRectangleIntersection(rect1, rect2)
# test[0] == 1 indicates rect1 and rect2 are intersect; other wise not intersect

total_stomata = 0
total_detected = 0
measurements = []  # measurement = ((h_true, w_true),(h_pred, w_pred))
for b_true in bboxes_true:
    total_stomata += 1
    for b_pred in bboxes_pred:  # check if stomata is detected
        test_detect = cv2.rotatedRectangleIntersection(b_true, b_pred)
        if test_detect[0] == 1:  # stomata is detected in prediction
            total_detected += 1
            for (
                r_pred
            ) in rbboxes_pred:  # check if the predicted mask exists for the stomata
                test_mask = cv2.rotatedRectangleIntersection(b_pred, r_pred)
                if test_mask[0] == 1:  # find the corresponding mask
                    # compute height and weight
                    m = ((b_true[1][0], b_true[1][1]), (r_pred[1][0], r_pred[1][1]))
                    measurements.append(m)
                    break
            break

precision = total_detected / len(bboxes_pred)
recall = total_detected / total_stomata
err_mean, err_stdev, _, err_median = get_error_metric(measurements)
print("==== exp results ====")
print(f"precision: {precision}, recall: {recall}")
print(measurements)
print(f"err_mean: {err_mean}, err_stdev: {err_stdev}, err_median: {err_median}")

## Main function that iterate through validation set

In [5]:
data_dir = "/mnt/linux/abrc/abrc/dataset/stomaVDP/2023-all-new-337"
label_dir = os.path.join(data_dir, "labels", "val")
img_dir = os.path.join(data_dir, "images", "val")

pred_dir = "../runs/predict-seg/exp2/labels"
output_file = "../runs/predict-seg/exp2/results.json"


label_files = os.listdir(label_dir)
pred_files = os.listdir(pred_dir)
img_files = os.listdir(img_dir)

inter_files = list(set(label_files) & set(pred_files))

imgs = []
for f in inter_files:
    filename = os.path.splitext(f)[0]
    imgs.append([img for img in img_files if filename in img][0])


print(len(inter_files))
print(len(imgs))
print(inter_files)
print(imgs)
results = []
for label, img in tqdm(zip(inter_files, imgs)):
    img_filename = os.path.join(img_dir, img)
    label_filename = os.path.join(label_dir, label)
    pred_filename = os.path.join(pred_dir, label)

    result = evaluate(img_filename, label_filename, pred_filename)
    result["name"] = os.path.splitext(img)[0]
    results.append(result)

with open(output_file, "w") as f:
    json.dump(results, f, indent=4)

69
69
['10090.txt', 'u_tr_242.txt', '0615_08-04.3.5.txt', '1.txt', 'c_tr_91_cut_want.txt', '0615_07-10.2.1.txt', 'c_tr_97_cut_want.txt', '5.txt', '0615_03-14.2.2.txt', '34.txt', '0611_05-12.3.5.txt', 'u_tr_160.txt', '0615_03-05.2.2.txt', 'u_tr_95.txt', 'BE_20X007.txt', '0611_09-14.2.2.txt', 'u_tr_279.txt', 'u_tr_39.txt', 'c_tr_24_cut_want.txt', 'p_tr_102.txt', 'u_tr_299.txt', '0615_03-02.1.1.txt', '0611_12-11.3.2.txt', '0611_10-6.2.5.txt', 'u_tr_209.txt', 'p_tr_30.txt', '0611_12-03.3.4.txt', 'BE_20X002.txt', '197.txt', 'c_tr_41_cut_want.txt', '0615_09-8.3.4.txt', 'C1L006.txt', 'BE_20X015.txt', '0611_16-14.2.2.txt', '150.txt', 'p_tr_98.txt', 'u_tr_245.txt', '10394.txt', '0615_12-13.3.2.txt', 'u_tr_113.txt', '0615_03-13.1.3.txt', '0611_07-08.2.2.txt', 'C1L002.txt', '0615_10-02.1.1.txt', 'p_tr_53.txt', 'u_tr_149.txt', 'u_tr_55.txt', 'p_tr_36.txt', 'p_tr_10.txt', 'p_tr_112.txt', 'u_tr_212.txt', '0611_07-01.3.3.txt', '10356.txt', '190.txt', 'c_tr_5_cut_want.txt', 'p_tr_113.txt', 'BE_20X008.

69it [00:03, 21.84it/s]


In [1]:
import pandas as pd

output_file = "../runs/predict-seg/exp2/results.json"

df = pd.read_json(output_file)

In [3]:
df = df.sort_values(by="name")
df.to_csv("../runs/predict-seg/exp2/results.csv")