# F2 metric

In [1]:
import torch
import numpy as np
from scipy.ndimage import label

## Implementations found online

F2 by Rares Barbantan: https://www.kaggle.com/raresbarbantan/f2-metric

Auxiliary code

In [2]:
def iou(img_true, img_pred):
    """Intersection over union (IoU) of two masks.

    A pixel belongs to the intersection when the element-wise product of the two masks
    is positive, and to the union when their element-wise sum is positive.

    Arguments:
        img_true (numpy.ndarray): the ground-truth mask.
        img_pred (numpy.ndarray): the predicted mask, same shape as img_true.

    Returns:
        The ratio intersection / union; 0 when both masks are empty.
    """
    overlap = img_true * img_pred
    combined = img_true + img_pred
    intersection = np.sum(overlap > 0)
    # The tiny constant keeps the division defined when both masks are empty
    union = np.sum(combined > 0) + 0.0000000000000000001
    return intersection / union

F2 metric

In [3]:
thresholds = [0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95]

def f2_rares(masks_true, masks_pred):
    """F2 score averaged over the IoU levels in the module-level `thresholds`.

    At each threshold a target mask with no prediction reaching that IoU counts as a
    false negative, a predicted mask reaching it with at least one target counts as a
    true positive, and every other predicted mask counts as a false positive.

    Arguments:
        masks_true (sequence of numpy.ndarray): one mask per ground-truth object.
        masks_pred (sequence of numpy.ndarray): one mask per predicted object.

    Returns:
        float: the mean over all thresholds of 5*TP / (5*TP + 4*FN + FP).
    """
    # a correct prediction on no ships in image would have F2 of zero (according to formula),
    # but should be rewarded as 1
    if np.sum(masks_true) == np.sum(masks_pred) == 0:
        return 1.0

    # The IoU of a (target, prediction) pair does not depend on the threshold, so the
    # whole table is built once. It is indexed as ious[i][j]; a flat integer key such as
    # 100*i+j would collide as soon as there are more than 100 predictions.
    ious = [[iou(mt, mp) for mp in masks_pred] for mt in masks_true]
    n_pred = len(masks_pred)

    f2_total = 0
    for t in thresholds:
        # Targets that no prediction covers well enough
        fn = sum(1 for row in ious if not any(v >= t for v in row))
        # Predictions that cover at least one target well enough
        tp = sum(1 for j in range(n_pred) if any(row[j] >= t for row in ious))
        fp = n_pred - tp
        f2_total += (5*tp)/(5*tp + 4*fn + fp)

    return f2_total/len(thresholds)

F2 by Iafoss: https://www.kaggle.com/iafoss/unet34-submission-tta-0-699-new-public-lb

In [4]:
def f2_iafoss(true, pred):
    """F2 score averaged over ten IoU cut-offs (0.5, 0.55, ..., 0.95).

    A match requires an IoU strictly greater than the cut-off. There is no special
    case for empty inputs.

    Arguments:
        true (sequence of numpy.ndarray): one mask per ground-truth object.
        pred (sequence of numpy.ndarray): one mask per predicted object.

    Returns:
        float: the mean over the cut-offs of 5*TP / (5*TP + 4*FN + FP).
    """
    num_cutoffs = 10
    beta_sq = 4
    cutoffs = [0.5 + 0.05*k for k in range(num_cutoffs)]
    num_true = len(true)
    num_pred = len(pred)

    # iou_table[i][j] is the IoU between target i and prediction j
    iou_table = [[iou(mask, p) for p in pred] for mask in true]

    total = 0
    for cutoff in cutoffs:
        # Targets without any sufficiently overlapping prediction
        missed = sum(
            1
            for i in range(num_true)
            if not any(iou_table[i][j] > cutoff for j in range(num_pred))
        )
        # Predictions with at least one sufficiently overlapping target
        hits = sum(
            1
            for j in range(num_pred)
            if any(iou_table[i][j] > cutoff for i in range(num_true))
        )
        false_alarms = num_pred - hits
        total += ((beta_sq+1)*hits)/((beta_sq+1)*hits + beta_sq*missed + false_alarms)
    return total/num_cutoffs

F2 by Mark Ayzenshtadt: https://www.kaggle.com/markup/f2-metric-optimized

In [5]:
def f2_mark(masks_true, masks_pred):
    """F2 score averaged over the IoU levels in the module-level `thresholds`.

    Each target is paired with the first not-yet-used prediction whose IoU exceeds
    0.5; the IoUs of those pairs are then compared against every threshold.

    Arguments:
        masks_true (sequence of numpy.ndarray): one mask per ground-truth object.
        masks_pred (sequence of numpy.ndarray): one mask per predicted object.

    Returns:
        The mean F2 score. When the target is empty, 1.0 if the prediction is empty
        as well and 0.0 otherwise.
    """
    if np.sum(masks_true) == 0:
        return float(np.sum(masks_pred) == 0)

    matched_ious = []
    used_predictions = set()
    for mt in masks_true:
        for mp_idx, mp in enumerate(masks_pred):
            # A prediction can be paired with one target only
            if mp_idx in used_predictions:
                continue
            pair_iou = iou(mt, mp)
            if pair_iou > 0.5:
                matched_ious.append(pair_iou)
                used_predictions.add(mp_idx)
                break

    n_true = len(masks_true)
    n_pred = len(masks_pred)
    f2_total = 0
    for th in thresholds:
        tp = sum([value > th for value in matched_ious])
        fn = n_true - tp
        fp = n_pred - tp
        f2_total += (5*tp)/(5*tp + 4*fn + fp)

    return f2_total/len(thresholds)

Tests:

In [6]:
# Ground truth: the first mask is column 0 plus row 2 (9 pixels), the second mask is
# column 2
target = np.zeros((2, 5, 5))
target[0][:, 0] = 1
target[0][2, :] = 1
target[1][:, 2] = 1

# Predictions: the first ship is only partly recovered (IoU = 5/9), the second one is
# predicted exactly
prediction = np.zeros_like(target)
prediction[0][:, 0] = 1
prediction[1][:, 2] = 1

print("Target:\n", target)
print("Predictions:\n", prediction)

# The three implementations are evaluated on the same example
print("F2 Rares:\n", f2_rares(target, prediction))
print("F2 Mark:\n", f2_mark(target, prediction))
print("F2 Iafoss:\n", f2_iafoss(target, prediction))

Target:
 [[[1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [1. 1. 1. 1. 1.]
  [1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]]

 [[0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]]]
Predictions:
 [[[1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]]

 [[0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]]]
F2 Rares:
 0.6
F2 Mark:
 0.6
F2 Iafoss:
 0.6


# Metric implementation

In [7]:
def to_onehot_np(y, num_classes=None, axis=0, dtype="float32"):
    """Converts a class numpy.ndarray (integers) to a one hot numpy.ndarray.

    Modified from: https://github.com/keras-team/keras/blob/master/keras/utils/np_utils.py#L9

    Arguments:
        y (numpy.ndarray): array of integer values in the range
            [0, num_classes - 1] to be one hot encoded.
        num_classes (int, optional): total number of classes. If set to None,
            num_classes = max(y) + 1. Default: None.
        axis (int, optional): the axis where the one hot classes are encoded.
            E.g. when set to 1 and the size of y is (5, 5) the output is
            (5, num_classes, 5). Negative values count from the end of the output
            shape, so -1 places the classes on the last axis. Default: 0.
        dtype (str or numpy.dtype, optional): the output data type (float32,
            float64, int32...). Default: float32.

    Returns:
        A one hot representation of the input numpy.ndarray.

    Raises:
        ValueError: if y contains values outside [0, num_classes - 1], or if y is
            empty and num_classes is not given.
    """
    y = np.array(y, dtype="int")
    if not num_classes:
        if y.size == 0:
            raise ValueError("num_classes must be given when y is empty")
        num_classes = int(np.max(y)) + 1

    # The range is validated even when num_classes is inferred: a negative label would
    # otherwise be used as a negative index and silently mark the wrong class
    if y.size > 0 and (np.amax(y) > num_classes - 1 or np.amin(y) < 0):
        raise ValueError("y values outside range [0, {}]".format(num_classes - 1))

    input_shape = y.shape
    n_dims = len(input_shape)
    y = y.ravel()
    n = y.shape[0]
    output_shape = list(input_shape)
    output_shape.append(num_classes)

    # Position of the class axis in the output; negative values are resolved against
    # the output rank (input rank + 1)
    if axis < 0:
        axis += n_dims + 1
    axis_order = list(range(n_dims))
    # n_dims is the index of the class axis before transposing (it is appended last)
    axis_order.insert(axis, n_dims)

    categorical = np.zeros((n, num_classes), dtype=dtype)
    categorical[np.arange(n), y] = 1
    categorical = np.reshape(categorical, output_shape)

    return np.transpose(categorical, axis_order)

def split_ships(input, max_ships=30, on_max_error=False):
    """Takes a mask of ships and splits them into different individual masks.

    Uses a structuring element to define connected blobs (ships in this case),
    scipy.ndimage.label does all the work. Every non-zero pixel is treated as part of
    a ship.
    See: https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.label.html

    Arguments:
        input (numpy.ndarray): the mask of ships to split with size (H, W).
        max_ships (int, optional): maximum number of ships allowed in a single image.
            Default: 30.
        on_max_error (bool, optional): what to do when more than max_ships ships are
            found. If True, a ValueError is raised; if False, only the max_ships
            largest ships (in pixels) are kept and the rest are discarded as
            background. Default: False.

    Returns:
        numpy.ndarray: the masks of individual ships with size (n, H, W), where n is the
        number of ships. If there are no ships, returns an array of size (1, H, W) filled
        with zeros.

    Raises:
        ValueError: if more than max_ships ships are found and on_max_error is True.

    """
    # No blobs/ships, return empty mask
    if not np.any(input):
        return np.expand_dims(input, 0)

    # Labels blobs/ships in the image. num_ships counts the ships only, the background
    # (label 0) is not included
    labeled_ships, num_ships = label(input)
    if num_ships > max_ships:
        if on_max_error:
            raise ValueError(
                "too many ships found {}, expect a maximum of {}".format(
                    num_ships, max_ships
                )
            )
        else:
            # Size in pixels of every ship; index 0 of the bincount is the background,
            # which must not compete with the ships and is dropped
            ship_sizes = np.bincount(labeled_ships.ravel(), minlength=num_ships + 1)[1:]

            # Labels of the max_ships largest ships. The sort is stable so that, among
            # ships of equal size, the ones with the lowest label are kept and exactly
            # max_ships ships remain
            largest = np.argsort(-ship_sizes, kind="stable")[:max_ships] + 1
            keep = np.zeros(num_ships + 1, dtype=bool)
            keep[largest] = True

            # Everything else is set to background, the remaining objects are
            # relabeled so that the labels are consecutive again
            labeled_ships[~keep[labeled_ships]] = 0
            labeled_ships, num_ships = label(labeled_ships)

    # For convenience, each ship is isolated in an image. Achieving this is equivalent
    # to converting labeled_ships into its one hot form and then removing the first
    # channel which is the background
    out = to_onehot_np(labeled_ships, num_ships + 1)[1:]

    return out

Tests:

In [8]:
# A seeded generator keeps the random masks, and therefore the outputs of this cell,
# identical across runs
random_state = np.random.RandomState(0)
target = random_state.randint(2, size=(16, 16))
prediction = random_state.randint(2, size=(16, 16))
print("Target:\n", target)
print("Predictions:\n", prediction)

# By default only the largest blobs are kept when there are more than max_ships
print("Split ships target:\n", split_ships(target, max_ships=5).shape)
print()
print()

# With on_max_error=True the same situation is reported as an error instead
try:
    split_ships(prediction, max_ships=5, on_max_error=True)
except ValueError as e:
    print("valueError raised:\n", str(e))

Target:
 [[0 1 1 0 1 1 1 0 1 1 1 0 0 1 1 0]
 [1 0 1 1 0 0 0 0 1 1 1 1 0 1 0 0]
 [1 0 0 0 1 1 0 0 0 1 1 0 0 1 1 1]
 [1 1 1 1 0 1 0 1 0 1 1 0 1 1 1 0]
 [0 1 1 0 0 1 0 1 0 1 0 1 0 1 0 1]
 [0 0 1 0 1 0 0 1 1 0 0 1 0 1 1 0]
 [1 1 1 1 1 0 0 0 1 1 0 1 1 1 0 1]
 [1 1 0 1 1 1 0 0 1 0 0 0 1 0 0 0]
 [1 1 1 0 1 0 1 1 1 0 0 1 1 1 1 0]
 [1 1 1 0 0 1 0 1 1 1 0 0 1 1 1 1]
 [1 1 1 0 1 0 1 0 0 0 0 1 1 1 1 0]
 [0 0 0 1 1 0 1 0 1 1 1 1 1 1 0 0]
 [0 1 1 1 1 0 0 0 0 1 1 0 0 1 0 1]
 [1 1 0 0 1 0 0 0 1 1 1 0 0 1 1 1]
 [0 0 0 1 0 1 0 0 1 1 1 1 1 0 1 1]
 [1 1 1 1 0 0 1 1 1 0 1 0 0 1 1 0]]
Predictions:
 [[1 1 0 1 0 0 0 1 0 0 1 1 0 0 0 1]
 [0 0 1 0 0 1 1 0 0 0 1 1 1 1 1 0]
 [1 1 0 1 0 0 1 1 0 0 0 0 0 0 1 1]
 [1 1 1 1 1 0 0 0 1 0 0 1 0 1 1 1]
 [1 0 1 0 0 1 1 1 0 0 1 0 1 1 0 0]
 [1 1 1 0 0 1 0 1 0 0 1 1 1 1 0 1]
 [1 1 0 1 0 0 0 0 0 1 0 0 1 1 1 0]
 [0 1 1 1 0 0 0 0 1 0 0 0 0 1 1 0]
 [1 1 1 0 1 1 0 0 0 0 1 1 0 0 0 1]
 [1 0 1 1 1 0 1 0 0 1 1 0 0 0 0 1]
 [0 1 1 1 1 1 1 1 1 0 1 1 1 1 1 0]
 [0 1 0 0 0 1 1 1 1 0 0 0 1 0 1

In [9]:
# Target: a vertical ship in column 0 and a second ship made of column 2 plus the
# right part of row 2
target = np.zeros((5, 5))
target[:, [0, 2]] = 1
target[2, 2:] = 1

# Prediction: a mask with values other than 0 and 1
prediction = np.array(
    [
        [0, 0, 1, 1, 1, 0],
        [2, 2, 0, 0, 3, 3],
        [0, 0, 0, 0, 3, 3],
    ]
)

print("Target:\n", target)
print("Predictions:\n", prediction)
print("Split ships target:\n", split_ships(target))
print()
print()
print("Split ships prediction:\n", split_ships(prediction))

Target:
 [[1. 0. 1. 0. 0.]
 [1. 0. 1. 0. 0.]
 [1. 0. 1. 1. 1.]
 [1. 0. 1. 0. 0.]
 [1. 0. 1. 0. 0.]]
Predictions:
 [[0 0 1 1 1 0]
 [2 2 0 0 3 3]
 [0 0 0 0 3 3]]
Split ships target:
 [[[1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]]

 [[0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 1. 1.]
  [0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0.]]]


Split ships prediction:
 [[[0. 0. 1. 1. 1. 0.]
  [0. 0. 0. 0. 1. 1.]
  [0. 0. 0. 0. 1. 1.]]

 [[0. 0. 0. 0. 0. 0.]
  [1. 1. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 0.]]]


Metric class:

In [10]:
def f_score(
    prediction_masks, target_masks, beta=2, thresholds=np.arange(0.5, 1, 0.05)
):
    """F-beta score between predicted and target masks, averaged over IoU thresholds.

    Each target mask is paired with the first not-yet-used prediction mask whose IoU
    exceeds the smallest threshold. For every threshold, the pairs whose IoU exceeds
    it are the true positives; the remaining predictions are false positives and the
    remaining targets are false negatives.

    Arguments:
        prediction_masks (sequence of numpy.ndarray): one mask per predicted object.
        target_masks (sequence of numpy.ndarray): one mask per ground-truth object.
        beta (number, optional): the beta of the F-beta score. Default: 2.
        thresholds (sequence of float, optional): the IoU thresholds over which the
            score is averaged. Default: 0.5 to 0.95 in steps of 0.05.

    Returns:
        The mean over the thresholds of
        (1 + beta^2) * TP / ((1 + beta^2) * TP + beta^2 * FN + FP). When the target is
        empty, 1.0 if the prediction is empty as well and 0.0 otherwise.
    """
    # If the target is empty return 1 if the prediction is also empty; otherwise
    # return 0
    if np.sum(target_masks) == 0:
        return float(np.sum(prediction_masks) == 0)

    # The matching criterion is the same for every pair, compute it once
    min_threshold = np.min(thresholds)

    iou_arr = []
    pred_idx_found = set()
    for target in target_masks:
        for pred_idx, pred in enumerate(prediction_masks):
            # Skip prediction masks that have already been matched to a target mask
            if pred_idx in pred_idx_found:
                continue
            curr_iou = iou(pred, target)
            if curr_iou > min_threshold:
                iou_arr.append(curr_iou)
                # Matched a prediction with a target, remember the index so we don't
                # match it to another target mask
                pred_idx_found.add(pred_idx)
                break

    # F score computation
    beta_sq = beta * beta
    iou_np = np.array(iou_arr)
    num_predictions = len(prediction_masks)
    num_targets = len(target_masks)
    fscore_total = 0
    for th in thresholds:
        tp = np.sum(iou_np > th)
        fp = num_predictions - tp
        fn = num_targets - tp
        fscore_total += (1 + beta_sq) * tp / ((1 + beta_sq) * tp + beta_sq * fn + fp)

    return fscore_total / len(thresholds)


class AirbusFScoreApprox():
    """Accumulates the per-image F-beta score of binary ship segmentation masks.

    Every image passed to `add` is split into one mask per ship with `split_ships`,
    scored with `f_score`, and the score is appended to a history; `value` returns the
    mean of that history.

    Arguments:
        beta (number, optional): the beta of the F-beta score. Default: 2.
        thresholds (sequence of float, optional): the IoU thresholds over which the
            score of an image is averaged. Default: 0.5 to 0.95 in steps of 0.05.
        max_ships (int, optional): maximum number of ships per image, forwarded to
            `split_ships`. Default: 30.
        name (str, optional): a name for the metric. Default: "fscore_approx".
    """

    def __init__(
        self,
        beta=2,
        thresholds=np.arange(0.5, 1, 0.05),
        max_ships=30,
        name="fscore_approx",
    ):
        self.name = name
        self.thresholds = thresholds
        self.beta = beta
        self.max_ships = max_ships
        # One score per image seen since the last reset
        self.fscore_history = []

    def reset(self):
        """Discards every score accumulated so far."""
        self.fscore_history = []

    def add(self, predicted, target):
        """Scores a batch of predictions and appends one score per image.

        Arguments:
            predicted (torch.Tensor): binary (0/1) predicted masks. The last two
                dimensions are taken as (H, W); all leading dimensions are flattened
                into the batch.
            target (torch.Tensor): binary (0/1) ground-truth masks, same size as
                predicted.

        Raises:
            ValueError: if the sizes differ, if the tensors have fewer than two
                dimensions, if any value is not 0 or 1, or if a target image contains
                more than max_ships ships.
        """
        # Parameter check
        if predicted.size() != target.size():
            raise ValueError(
                "size mismatch, {} != {}".format(predicted.size(), target.size())
            )
        elif predicted.dim() < 2:
            raise ValueError(
                "expected at least 2 dimensions (H, W), got {}".format(predicted.dim())
            )
        elif tuple(predicted.unique(sorted=True)) not in [(0, 1), (0,), (1,)]:
            raise ValueError("predicted values are not binary")
        elif tuple(target.unique(sorted=True)) not in [(0, 1), (0,), (1,)]:
            raise ValueError("target values are not binary")

        # Convert to numpy and reshape to a batch of (H, W) images. A plain squeeze()
        # is not used because it would also remove a batch dimension of size 1, and
        # the loop below would then iterate over image rows instead of images
        predicted = predicted.detach().cpu().numpy()
        target = target.detach().cpu().numpy()
        image_shape = predicted.shape[-2:]
        predicted = predicted.reshape((-1,) + image_shape)
        target = target.reshape((-1,) + image_shape)

        for p, t in zip(predicted, target):
            # Split the segmentation mask into one mask per ship. Too many ships can
            # be predicted, especially during the early stages of training; in that
            # case only the largest ones are kept instead of raising an error
            predicted_ships = split_ships(p, max_ships=self.max_ships)

            # Note that here we want to fail if too many ships are found, it should
            # never happen
            target_ships = split_ships(t, max_ships=self.max_ships, on_max_error=True)
            score = f_score(
                predicted_ships,
                target_ships,
                beta=self.beta,
                thresholds=self.thresholds,
            )
            self.fscore_history.append(score)

    def value(self):
        """Returns the mean of the accumulated scores, or 0 if there are none."""
        if len(self.fscore_history) == 0:
            return 0
        else:
            return np.mean(self.fscore_history)


Tests:

In [11]:
# Ground truth: the first image holds two ships (column 0 and the right part of row
# 2), the second image holds one ship (column 2)
target = torch.zeros((2, 5, 5))
target[0][:, 0] = 1
target[0][2, 2:] = 1
target[1][:, 2] = 1

# Predictions: the second ship of the first image is one pixel short, everything else
# is predicted exactly
prediction = torch.zeros_like(target)
prediction[0][:, 0] = 1
prediction[0][2, 3:] = 1
prediction[1][:, 2] = 1

print("Target:\n", target)
print("Predictions:\n", prediction)

# Accumulate the scores of the batch and report their mean
f2 = AirbusFScoreApprox()
f2.add(prediction, target)
print("F2:\n", f2.value())

Target:
 tensor([[[1., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0.],
         [1., 0., 1., 1., 1.],
         [1., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0.]],

        [[0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.]]])
Predictions:
 tensor([[[1., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0.],
         [1., 0., 0., 1., 1.],
         [1., 0., 0., 0., 0.],
         [1., 0., 0., 0., 0.]],

        [[0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.],
         [0., 0., 1., 0., 0.]]])
F2:
 0.85


In [12]:
target = torch.zeros((8, 384, 384))
prediction = torch.zeros((8, 384, 384))
%timeit f2.add(prediction, target)

26.3 ms ± 463 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
