In [2]:
import tensorflow as tf
import numpy as np

# samples, length, channels

np_pred = np.array(
    [[[1.0, 0.0],
     [1.0, 0.0],
     [1.0, 0.0],
     [0.0, 0.0],
     [0.0, 0.0],
     [0.0, 0.0],
     [1.0, 1.0],
     [1.0, 1.0]],
    [[0.0, 1.0],
     [0.0, 1.0],
     [0.0, 1.0],
     [1.0, 1.0],
     [1.0, 1.0],
     [1.0, 0.0],
     [1.0, 0.0],
     [1.0, 0.0],]])

np_truth = np.array(
    [[[1.0,  0.0],
     [1.0, 0.0],
     [1.0, 0.0],
     [0.0, 0.0],
     [0.0, 0.0],
     [0.0, 1.0],
     [0.0, 1.0],
     [0.0, 1.0],],
    [[0.0, 1.0],
     [0.0, 1.0],
     [0.0, 1.0],
     [1.0, 1.0],
     [1.0, 1.0],
     [1.0, 0.0],
     [1.0, 0.0],
     [1.0, 0.0]]])

In [3]:
from typing import Tuple
import numpy as np
from cup_scripts.metric import fscore_step_detection
from utils.util_functions import convert_float_to_binary_mask, convert_mask_to_cup_format

# original implementation f1-score

def cup_f1_score(y_pred: np.array, y_true: np.array) -> Tuple[float, float, float]:
    """takes two arrays containg (poss. multi-channel) time series of predictions and the respective ground truth;
    converts them from the mask to the cup format and calculates the score and metrics

    Args:
        y_pred (np.array): array of predicted masks on time-series
        y_true (np.array): array of actual masks on time-series

    Returns:
        Tuple[float, float, float]: Tuple of mean f-score, precision, recall
    """
    
    def process_channel(mask: np.array) -> list:
        print(f'mask: {mask}')
        # process one mask of shape (length,)
        binary_mask = convert_float_to_binary_mask(mask)
        steps = convert_mask_to_cup_format(binary_mask)
        steps = np.array(steps).tolist()
        print(f'steps: {steps}')
        return steps

    predictions = []
    ground_truth = []

    for mask_pred, mask_truth in zip(y_pred, y_true):
        print(f'mask_pred: {mask_pred}')
        multi_channel: bool = (len(y_pred[0].shape) > 1)
        channels = y_pred[0].shape[1] if multi_channel else 1

        for channel in range(channels):
            channel_pred = mask_pred[:, channel] if multi_channel else mask_pred
            channel_truth = mask_truth[:, channel] if multi_channel else mask_truth
            processed_pred = process_channel(mask=channel_pred)
            processed_truth = process_channel(mask=channel_truth)
            predictions.append(processed_pred)
            ground_truth.append(processed_truth)

    fscore, precision, recall = fscore_step_detection(y_pred=predictions, y_true=ground_truth)
    return fscore, precision, recall

In [5]:
cup_f1_score(np_pred, np_truth)

mask_pred: [[1. 0.]
 [1. 0.]
 [1. 0.]
 [0. 0.]
 [0. 0.]
 [0. 0.]
 [1. 1.]
 [1. 1.]]
mask: [1. 1. 1. 0. 0. 0. 1. 1.]
steps: [[0, 2], [6, 7]]
mask: [1. 1. 1. 0. 0. 0. 0. 0.]
steps: [[0, 2]]
mask: [0. 0. 0. 0. 0. 0. 1. 1.]
steps: [[6, 7]]
mask: [0. 0. 0. 0. 0. 1. 1. 1.]
steps: [[5, 7]]
mask_pred: [[0. 1.]
 [0. 1.]
 [0. 1.]
 [1. 1.]
 [1. 1.]
 [1. 0.]
 [1. 0.]
 [1. 0.]]
mask: [0. 0. 0. 1. 1. 1. 1. 1.]
steps: [[3, 7]]
mask: [0. 0. 0. 1. 1. 1. 1. 1.]
steps: [[3, 7]]
mask: [1. 1. 1. 1. 1. 0. 0. 0.]
steps: [[0, 4]]
mask: [1. 1. 1. 1. 1. 0. 0. 0.]
steps: [[0, 4]]


(0.6666666666666666, 0.625, 0.75)

In [6]:
def tf_cup_f1_score(y_pred: np.array, y_true: np.array) -> Tuple[float, float, float]:
    """takes two arrays containg (poss. multi-channel) time series of predictions and the respective ground truth;
    converts them from the mask to the cup format and calculates the score and metrics

    Args:
        y_pred (np.array): array of predicted masks on time-series
        y_true (np.array): array of actual masks on time-series

    Returns:
        Tuple[float, float, float]: Tuple of mean f-score, precision, recall
    """
    
    def process_channel(mask: np.array) -> list:
        print(mask)
        # process one mask of shape (length,)
        binary_mask = convert_float_to_binary_mask(mask)
        steps = convert_mask_to_cup_format(binary_mask)
        steps = np.array(steps).tolist()
        print(steps)
        return steps

    predictions = []
    ground_truth = []
    
    for mask_pred, mask_truth in zip(y_pred, y_true):
        multi_channel: bool = (len(y_pred[0].shape) > 1)
        channels = y_pred[0].shape[1] if multi_channel else 1

        for channel in range(channels):
            channel_pred = mask_pred[:, channel] if multi_channel else mask_pred
            channel_truth = mask_truth[:, channel] if multi_channel else mask_truth
            processed_pred = process_channel(mask=channel_pred)
            processed_truth = process_channel(mask=channel_truth)
            predictions.append(processed_pred)
            ground_truth.append(processed_truth)

    fscore, precision, recall = fscore_step_detection(y_pred=predictions, y_true=ground_truth)
    return fscore, precision, recall

In [4]:
# from utils.util_functions import shift_array

def tf_convert_float_to_binary_mask(mask: tf.Tensor, threshold: float = 0.5):
    # NOTE: the brackets need to remain to mark this operation
    # as assignment, leading to the bool-to-float-conversion
    # False -> 0.0, True -> 1.0 
    return tf.where(mask >= threshold, 1.0, 0.0)

def tf_shift_array(arr, num, fill=0):
    # https://stackoverflow.com/questions/30399534/shift-elements-in-a-numpy-array#30534478
    # cf. shift5
    shifted = tf.roll(arr, num, 0)
    if num > 0:
        result = tf.concat([tf.fill([num], fill), shifted[num:]], axis=0)
    elif num < 0:
        result = tf.concat([shifted[:num], tf.fill([abs(num)], fill)], axis=0)
    else:
        result = arr
    return result

def tf_convert_mask_to_cup_format(arr: tf.Tensor) -> tf.RaggedTensor:
    # create shifted array with orignal[n+1] = shifted[n]
    shifted_arr = tf_shift_array(arr, -1, fill=arr[-1])
    
    # step_changes must differ with in value with their successor
    # begin of step arr[n] = 0, arr[n+1] = 1
    # -> step starts at n+1
    # end of step arr[n] = 1, arr[n+1] = 0
    # -> step ends at n
    bool_arr = tf.where(arr == 1.0, True, False)
    bool_shifted_arr = tf.where(shifted_arr == 1.0, True, False)
    step_changes = tf.math.logical_xor(bool_arr, bool_shifted_arr)
    
    # create array with indices and apply boolean mask for selection
    changes = tf.cast(tf.squeeze(tf.where(step_changes), axis=1), dtype=tf.int32)
    # add start of first step at 0 if the prediction starts with a step
    if bool_arr[0]:
        changes = tf.concat([tf.constant([-1]), changes], axis=0)
    # add end of last step at len(arr) - 1 if the prediction ends with a step
    if bool_arr[-1]:
        changes = tf.concat([changes, tf.constant([bool_arr.shape[0] - 1])], axis=0)
    
    # explanation: see comments above and *.ipynb
    correct_starts = tf.tile(tf.constant([1, 0]), [int(tf.shape(changes)[0] / 2)])
    changes += correct_starts
    
    # convert array (vector) to matrix
    nested = tf.reshape(changes, (int(tf.shape(changes)[0] / 2), 2))
    
    # delete all steps of length one, i. e. start == end
    nested = nested[~(nested[:, 0] == nested[:, 1])]
    return nested

In [62]:
from model.loss_functions.tf_cup_f1_score import tf_fscore_step_detection

@tf.function
def tf_cup_f1_score(y_pred: tf.Tensor, y_true: tf.Tensor) -> Tuple[float, float, float]:
    def process_channel(mask: tf.Tensor) -> list:
        binary_mask = tf_convert_float_to_binary_mask(mask)
        steps = tf_convert_mask_to_cup_format(binary_mask)
        return steps
    
    predictions = tf.ragged.constant([[[0, 0]]], dtype=tf.int32)
    ground_truth = tf.ragged.constant([[[0, 0]]], dtype=tf.int32)
    
    channels = y_pred[0].shape[1]
    for pred_index in tf.range(y_pred.shape[0]):
        tf.autograph.experimental.set_loop_options(
            shape_invariants=[
                (predictions, tf.TensorShape([None, None, None])),
                (ground_truth, tf.TensorShape([None, None, None]))]
            )
        
        mask_pred = y_pred[pred_index]
        mask_truth = y_true[pred_index]
        
        for channel_index in tf.range(channels):
            tf.autograph.experimental.set_loop_options(
            shape_invariants=[
                (predictions, tf.TensorShape([None, None, None])),
                (ground_truth, tf.TensorShape([None, None, None]))]
            )
            channel_pred = mask_pred[:, channel_index] if channels > 0 else mask_pred
            channel_truth = mask_truth[:, channel_index] if channels > 0 else mask_truth
            processed_pred = process_channel(mask=channel_pred)
            processed_truth = process_channel(mask=channel_truth)
            
            predictions = tf.concat((predictions, [processed_pred]), 0)
            ground_truth = tf.concat((ground_truth, [processed_truth]), 0)

    # return predictions[1:], ground_truth[1:]
    fscore = tf_fscore_step_detection(y_pred=predictions[1:], y_true=ground_truth[1:])
    return fscore 

In [63]:
tf_cup_f1_score(tf.constant(np_pred), tf.constant(np_truth))

<tf.Tensor: shape=(), dtype=float32, numpy=0.6666667>

In [58]:

@tf.function        
def sample_function(sample):
    # _, length, channels -> _, channels, length
    sampleT = tf.transpose(sample)
    steps = tf.map_fn(channel_function, sampleT, fn_output_signature=tf.RaggedTensorSpec(shape=(1, None, None), dtype=tf.int32, ragged_rank=2, row_splits_dtype=tf.int64))
    return steps

@tf.function
def tf_process_channel(mask: tf.Tensor) -> tf.RaggedTensor:
    binary_mask = tf_convert_float_to_binary_mask(mask)
    steps = tf_convert_mask_to_cup_format(binary_mask)
    return steps

@tf.function
def channel_function(channel):
    steps = tf_process_channel(channel)
    tensor = tf.ragged.stack(steps)
    return tensor

for i, sample in enumerate(np_pred):
    # _, length, channels -> _, channels, length
    sampleT = tf.transpose(sample)
    for k, channel in enumerate(sampleT):
        steps = tf_process_channel(channel)
        
# outmost loop -> iterate over samples in batch


'''
e = tf.ragged.constant(
    [[[[0, 2],
   [6, 7]], [[6, 7]]],
     [[[3, 7]], [[0, 4]]]]
     )
(bs, ch, None, 2) -> (bs*ch, None, 2)
(2, 2, None, 2) -> (4, None, 2)
(batch_size, channels, steps je channel, step-bracket) 
'''

@tf.function
def conversion(input_tensor):
    t = tf.map_fn(fn=sample_function, elems=input_tensor, fn_output_signature=tf.RaggedTensorSpec(shape=(2, 1, None, None), dtype=tf.int32, ragged_rank=3, row_splits_dtype=tf.int64))
    batch_size = t.shape[0]
    channels = t.shape[1]
    collector = []
    for b in range(batch_size):
        for i in range(channels):
            collector.append(t[b, i, :, :, :])
    return tf.concat(collector, 0)

t = conversion(np_pred)
print(t.shape)
t

(4, None, None)


<tf.RaggedTensor [[[0, 2],
  [6, 7]], [[6, 7]], [[3, 7]], [[0, 4]]]>

In [9]:
THRESHOLD_IoU = 0.75

@tf.function
def _check_step_list(step_list):
    
    def check_assertions(x):
        assert (tf.shape(x)[0] == 2)[0], f'A step consists of a start and an end: {x}.'
        assert x[0] < x[1]
        return x
    tf.map_fn(check_assertions, step_list)

    '''for step in step_list:
        assert len(step) == 2, f"A step consists of a start and an end: {step}."
        start, end = step
        assert start < end, f"start should be before end: {step}."'''

@tf.function
def tf_inter_over_union(interval_1, interval_2):
    a = interval_1[0]
    b = interval_1[1]
    c = interval_2[0]
    d = interval_2[1]

    intersection = tf.math.maximum(0, tf.math.minimum(b, d) - tf.math.maximum(a, c))
    if intersection > 0:
        union = tf.math.maximum(b, d) - tf.math.minimum(a, c)
    else:
        union = (b - a) + (d - c)
    
    return intersection / union

@tf.function
def _tf_step_detection_precision(step_list_true, step_list_pred):
    # NOTE: no working implementation existing
    # _check_step_list(step_list_pred)
    
    if step_list_pred.shape[0] == 0:  # empty prediction
        return 0.0

    n_correctly_predicted = 0
    detected_index_set = set()  # set of index of detected true steps
    
    for pred_index in tf.range(step_list_pred.nrows()):
        step_pred = step_list_pred[pred_index]
        for true_index in tf.range(step_list_true.nrows()):
            step_true = step_list_true[true_index]
            if (true_index.ref() not in detected_index_set) and (
                tf_inter_over_union(step_pred, step_true) > THRESHOLD_IoU
            ):
                n_correctly_predicted += 1
                detected_index_set.add(true_index.ref())
                break
                
    return n_correctly_predicted / tf.cast(step_list_pred.nrows(), dtype=tf.int32) 

@tf.function
def _tf_step_detection_recall(step_list_true, step_list_pred):
    # NOTE: no working implementation exists
    # _check_step_list(step_list_pred)

    n_detected_true = 0
    predicted_index_set = set()  # set of indexes of predicted steps
    
    for true_index in tf.range(step_list_true.nrows()):
        step_true = step_list_true[true_index]
        for pred_index in tf.range(step_list_pred.nrows()):
            step_pred = step_list_pred[pred_index]
            if (pred_index.ref() not in predicted_index_set) and (
                tf_inter_over_union(step_pred, step_true) > THRESHOLD_IoU
            ):
                n_detected_true += 1
                predicted_index_set.add(pred_index.ref())
                break
    recall = n_detected_true / tf.cast(step_list_true.nrows(), dtype=tf.int32)
    return recall

@tf.function(autograph=True)
def tf_fscore_step_detection(y_true, y_pred) -> float:
    
    if y_true.shape[0] == 0:
        return 0.0
    
    fscore_list = tf.constant((0,), dtype=tf.float32)
    
    for index in tf.range(y_true.nrows()):
        tf.autograph.experimental.set_loop_options(
            shape_invariants=[(fscore_list, tf.TensorShape([None]))]
        )

        step_list_true = y_true[index]
        step_list_pred = y_pred[index]
        prec = _tf_step_detection_precision(step_list_true, step_list_pred)
        rec = _tf_step_detection_recall(step_list_true, step_list_pred)
        if prec + rec < 1e-6:
            fscore = tf.cast(0.0, tf.float32)
        else:
            fscore = tf.cast((2 * prec * rec) / (prec + rec), tf.float32)
        fscore_list = tf.concat((fscore_list, [fscore]), 0)
    return tf.math.reduce_mean(fscore_list[1:])

In [10]:
y_pred = [[[0, 2]], [[6, 7]], [[3, 7]], [[0, 4]]]
y_true = [[[0, 2],
  [6, 7]], [[6, 7]], [[3, 7]], [[0, 4]]]
tf_fscore_step_detection(tf.ragged.constant(y_true), tf.ragged.constant(y_pred))

<tf.Tensor: shape=(), dtype=float32, numpy=0.9166667>

In [11]:
def tf_fscore_step_detection(y_true: tf.Tensor, y_pred: tf.Tensor) -> float:
    
    if tf.shape(y_true)[0] == 0:
        return 0

    fscore_list = list()

    for (step_list_true, step_list_pred) in zip(y_true, y_pred):
        prec = _step_detection_precision(step_list_true, step_list_pred)
        rec = _step_detection_recall(step_list_true, step_list_pred)
        if prec + rec < 1e-6:
            fscore_list.append(0.0)
        else:
            fscore_list.append((2 * prec * rec) / (prec + rec))

    return np.mean(fscore_list)

In [12]:
@tf.function
def myfunc(x):
    new_list = list()
    for s in x:
        new_list.append(x)
    return new_list

myfunc(tf.constant([2, 3, 4]))

[<tf.Tensor: shape=(3,), dtype=int32, numpy=array([2, 3, 4])>]

In [13]:
import model.metric_functions.cup_f1_score as mmc

In [14]:
mmc.cup_f1_score(np_pred, np_truth)

(0.6666666666666666, 0.625, 0.75)

In [15]:
tf_cup_f1_score(np_pred, np_truth)

[1. 1. 1. 0. 0. 0. 1. 1.]
[[0, 2], [6, 7]]
[1. 1. 1. 0. 0. 0. 0. 0.]
[[0, 2]]
[0. 0. 0. 0. 0. 0. 1. 1.]
[[6, 7]]
[0. 0. 0. 0. 0. 1. 1. 1.]
[[5, 7]]
[0. 0. 0. 1. 1. 1. 1. 1.]
[[3, 7]]
[0. 0. 0. 1. 1. 1. 1. 1.]
[[3, 7]]
[1. 1. 1. 1. 1. 0. 0. 0.]
[[0, 4]]
[1. 1. 1. 1. 1. 0. 0. 0.]
[[0, 4]]


(0.6666666666666666, 0.625, 0.75)

In [16]:
import model.loss_functions.tf_cup_f1_score as mlt
mlt.tf_cup_f1_score(np_pred, np_truth)

<tf.Tensor: shape=(), dtype=float32, numpy=0.6666667>