In [1]:
import numpy as np
import pandas as pd
from tqdm import tqdm
import os
import cv2 as cv
import tensorflow as tf
from PIL import Image

In [2]:
def watershed_transformation(prediction):
    kernel = np.array([[1, 1, 1], [1, -8, 1], [1, 1, 1]], dtype=np.float32)

    imgLaplacian = cv.filter2D(prediction, cv.CV_32F, kernel)
    sharp = np.float32(prediction)
    imgResult = sharp - imgLaplacian

    imgResult = np.clip(imgResult, 0, 255)
    imgResult = imgResult.astype('uint8')

    bw = cv.cvtColor(prediction, cv.COLOR_BGR2GRAY)
    _, bw = cv.threshold(bw, 40, 255, cv.THRESH_BINARY | cv.THRESH_OTSU)

    dist = cv.distanceTransform(bw, cv.DIST_L2, 3)
    cv.normalize(dist, dist, 0, 1.0, cv.NORM_MINMAX)
    _, thresholded = cv.threshold(dist, 0.2, 1.0, cv.THRESH_BINARY)

    kernel1 = np.ones((3, 3), dtype=np.uint8)
    dilated = cv.dilate(thresholded, kernel1)

    dist_8u = dilated.astype('uint8')
    contours, _ = cv.findContours(dist_8u, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)
    markers = np.zeros(dilated.shape, dtype=np.int32)

    for i in range(len(contours)):
        cv.drawContours(markers, contours, i, (i + 1), -1)

    cv.circle(markers, (5, 5), 3, (255, 255, 255), -1)
    cv.watershed(imgResult, markers)

    return markers


def post_processing(prediction):
    return watershed_transformation(prediction)

In [3]:
def dice_coef_logical(true_positives, false_positives, false_negatives):
    if true_positives + false_negatives + false_positives == 0:
        return 1
    return (2. * true_positives) / (2. * true_positives + false_positives + false_negatives)


def dice_coef(y_true, y_pred, smooth=1.0):
    y_true_f = y_true.flatten()
    y_pred_f = y_pred.flatten()
    intersection = np.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (np.sum(y_true_f) + np.sum(y_pred_f) + smooth)


def iou(y_true, y_pred, smooth=1.0):
    y_true_f = y_true.flatten()
    y_pred_f = y_pred.flatten()
    intersection = np.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (np.sum(y_true_f) + np.sum(y_pred_f) + smooth)


def compute_iou_matrix(markers, instances):
    labels = np.unique(markers)

    labels = labels[labels < 255]
    labels = labels[labels > 0]

    iou_matrix = np.zeros((len(labels), len(instances)), dtype=np.float32)

    for i, label in enumerate(labels):
        prediction_instance = (markers == label).astype(int)

        for j, ground_truth_instance in enumerate(instances):
            iou_value = iou(prediction_instance, ground_truth_instance)
            iou_matrix[i, j] = iou_value

    return iou_matrix


def compute_metric_at_thresholds(iou_matrix):
    dices = []
    if iou_matrix.shape == (0, 0):
        return 1
    for threshold in np.arange(0.5, 1, 0.05):
        true_positives = (iou_matrix.max(axis=1) > threshold).sum()
        false_positives = (iou_matrix.max(axis=1) <= threshold).sum()
        false_negatives = (iou_matrix.max(axis=0) <= threshold).sum()
        dices.append(dice_coef_logical(true_positives, false_positives, false_negatives))
    return np.average(dices)

In [4]:
def evaluate(dataset_path, predictions_path, test_df_path, output_name):
    filenames = pd.read_csv(test_df_path)

    metrics = []
    dices = []
    
    writer = tf.python_io.TFRecordWriter(
        os.path.join(os.path.dirname(predictions_path), output_name + '.tfrecords'))

    for ind, filename in tqdm(filenames.iterrows()):

        prediction = cv.imread(os.path.join(predictions_path, filename["image_name"]) + ".png")
        image = cv.imread(os.path.join(dataset_path, "images", filename["image_name"]) + ".png")
        mask = cv.imread(os.path.join(dataset_path, "masks", filename["image_name"]) + ".png")

        img_size = image.shape
        instances = []

        image_instances_path = os.path.join(dataset_path, "instance_masks", filename["image_name"])

        for instance_name in os.listdir(image_instances_path):
            if ".png" in instance_name and ".xml" not in instance_name:
                rgb_instance = cv.imread(os.path.join(image_instances_path, instance_name))
                bw_instance = cv.cvtColor(rgb_instance, cv.COLOR_BGR2GRAY)
                instances.append(bw_instance)

        markers = post_processing(prediction)
        iou_matrix = compute_iou_matrix(markers, instances)
        metric = compute_metric_at_thresholds(iou_matrix)
        metrics.append(metric)

        img_raw = Image.fromarray(np.uint8(image), 'RGB').tobytes()
        msk_raw = Image.fromarray(np.uint8(mask), 'RGB').tobytes()
        pred_raw = Image.fromarray(np.uint8(prediction), 'RGB').tobytes()

        dice_score = dice_coef(mask, (prediction / 255) > 0.5)

        dices.append(dice_score)
        
        example = tf.train.Example(features=tf.train.Features(feature={
            "img_height": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(img_size[1])])),
            "img_width": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(img_size[0])])),
            "dice_score": tf.train.Feature(float_list=tf.train.FloatList(value=[dice_score])),
            "img_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
            "mask_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[msk_raw])),
            "pred_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[pred_raw])),
            "metric": tf.train.Feature(float_list=tf.train.FloatList(value=[metric])),
            "img_name": tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(filename["image_name"])])),
            "msk_name": tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(filename["image_name"])])),
            "pred_name": tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(filename["image_name"])])),
        }))
        writer.write(example.SerializeToString())

    print("Metrics value - {0}".format(round(np.average(metrics), 4)))
    print("Average dice score - {0}".format(round(np.average(dices), 4)))

In [5]:
dataset_path = "../../test_data/preprocessed_data/20160103_66979721-be1b-4451-84e0-4a573236defd_rgb/"
prediction_path = "../../test_data/preprocessed_data/20160103_66979721-be1b-4451-84e0-4a573236defd_rgb/predictions/"

# Dataframe with test image names 
test_df_path = "../../forest_cutting/scripts/20160103_66979721-be1b-4451-84e0-4a573236defd_rgb_test.csv"
output_name = "prediction_metrics"

In [6]:
evaluate(dataset_path, prediction_path, test_df_path, output_name)

50it [00:00, 53.24it/s]

Metrics value - 0.2845
Average dice score - 0.641



