In [88]:
import sys
sys.path.append('../')

import os
from pathlib import Path
import pandas as pd
import plotly.graph_objs as go
from plotly.offline import iplot
import numpy as np

%load_ext autoreload
%autoreload 2
from _util.make_folder_dataset import MakeFolderDataset


#load environment variables from .env file in repo root
%load_ext dotenv
%dotenv

#DATASET_REPO_ROOT_PATH=<absolute-path-to-dataset-repo-root-folder>
# Path(None) would fail with an opaque TypeError, so name the missing variable explicitly.
if os.environ.get("DATASET_REPO_ROOT_PATH") is None:
    raise RuntimeError(
        "Environment variable DATASET_REPO_ROOT_PATH is not set; "
        "define it (e.g. in the .env file) as the absolute path to the dataset repo root folder."
    )
dataset_repo_root_path = Path(os.environ["DATASET_REPO_ROOT_PATH"])
test_data_path = dataset_repo_root_path / "testData"

# Maps a contact type name to the integer class id that is compared against
# the 'contact_class_prediction' column of the model results.
labels_map = {'hard':0,'pvc_tube':1, 'soft':2}

# Evaluation windows applied after each contact onset:
# a time span (same unit as the 'time' column) and a number of predictions.
evaluation_period_after_contact_sec = 0.3
evaluation_predictions_after_contact = 10


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [89]:
def get_next_contact_time(df, excl_from_time):
    """Find the first contact sample that follows a no-contact sample.

    A row qualifies when ``has_contact == 1`` and its ``time`` is strictly
    greater than both ``excl_from_time`` and the time of the first row with
    ``has_contact == 0``. Contact rows located before the first no-contact
    row are therefore never reported.

    Args:
        df: DataFrame with at least the columns ``time`` and ``has_contact``.
        excl_from_time: only rows with ``time`` greater than this are considered.

    Returns:
        ``(time, index_label)`` of the first qualifying row, or ``(None, None)``
        when no row qualifies. This includes the case where ``df`` holds no
        ``has_contact == 0`` row at all (previously an ``IndexError``).
    """
    no_contact_times = df.loc[df['has_contact'] == 0, 'time']
    if no_contact_times.empty:
        # Without any no-contact row there is no reference point to search from.
        return None, None
    first_no_contact_time = no_contact_times.iloc[0]

    is_candidate = (
        (df['has_contact'] == 1)
        & (df['time'] > first_no_contact_time)
        & (df['time'] > excl_from_time)
    )
    candidate_times = df.loc[is_candidate, 'time']
    if candidate_times.empty:
        return None, None
    return candidate_times.iloc[0], candidate_times.index[0]


def get_contact_duration(df, time):
    """Return the extent of the contact segment surrounding ``time``.

    The segment starts on the row right after the last ``has_contact == 0``
    row whose ``time`` is smaller than ``time`` and ends on the row right
    before the first ``has_contact == 0`` row whose ``time`` is larger.
    Rows are addressed by position, so the result does not depend on the
    index labels of ``df`` being consecutive integers.

    Args:
        df: DataFrame with at least the columns ``time`` and ``has_contact``;
            rows are expected to be in chronological order.
        time: a time value lying inside the contact segment of interest.

    Returns:
        Tuple ``(duration, start_time, end_time)`` with
        ``duration == end_time - start_time``.
    """
    times = df['time'].to_numpy()
    no_contact = (df['has_contact'] == 0).to_numpy()

    released_before = np.flatnonzero(no_contact & (times < time))
    released_after = np.flatnonzero(no_contact & (times > time))

    # No no-contact row before `time`: the contact is already active on the first row.
    start_pos = int(released_before[-1]) + 1 if released_before.size else 0
    # No no-contact row after `time`: the contact lasts until the last row.
    # The clamp keeps a position of -1 from silently wrapping around to the last row.
    end_pos = max(int(released_after[0]) - 1, 0) if released_after.size else len(df) - 1

    start_time = df['time'].iloc[start_pos]
    end_time = df['time'].iloc[end_pos]
    return (end_time - start_time), start_time, end_time


def _collect_evaluation_positions(results, period_sec, max_predictions):
    """Gather the row positions of both evaluation windows for every contact onset."""
    times = results['time'].to_numpy()
    # NaN != -1 evaluates to True, so rows without any prediction stay in the
    # selection here; value_counts() drops them when the predictions are counted.
    not_rejected = (results['contact_class_prediction'] != -1).to_numpy()

    time_window_positions = []
    first_n_positions = []
    last_contact_end_time = -1
    while True:
        contact_time, _ = get_next_contact_time(results, last_contact_end_time)
        if contact_time is None:
            break
        _, _, last_contact_end_time = get_contact_duration(results, contact_time)

        after_onset = not_rejected & (times >= contact_time)
        in_time_window = after_onset & (times <= contact_time + period_sec)
        time_window_positions += np.flatnonzero(in_time_window).tolist()
        first_n_positions += np.flatnonzero(after_onset)[:max_predictions].tolist()
    return time_window_positions, first_n_positions


def _count_predictions(rows, true_label):
    """Return (number of predictions equal to true_label, number of predictions != -1)."""
    counts = rows['contact_class_prediction'].value_counts()
    num_true = counts.get(true_label, 0)
    num_predicted = counts[counts.index != -1].sum()
    return num_true, num_predicted


def _format_accuracy(num_true, num_predicted):
    """Format the accuracy as a percentage string, guarding against zero predictions."""
    if num_predicted == 0:
        return "NO AVAILABLE PREDICTIONS"
    return f"{str((num_true/num_predicted)*100)}%"


def evaluation(path, inst: MakeFolderDataset, verbose: bool = False):
    """Evaluate the class predictions of one recorded instance and plot them.

    For every contact onset found in ``inst.model_results`` two sets of rows
    are selected, both restricted to ``contact_class_prediction != -1``:
    rows within ``evaluation_period_after_contact_sec`` after the onset, and
    the first ``evaluation_predictions_after_contact`` rows after the onset.
    Rows are collected per contact, so a row covered by the windows of two
    contacts is counted twice. Rows are selected by position, which keeps the
    selection correct even if the index labels are not 0..n-1.

    Args:
        path: folder of the instance; only ``path.name`` is used (plot title
            and report).
        inst: instance providing ``model_results`` (columns ``time``,
            ``has_contact``, ``contact_class_prediction``,
            ``correctly_classified``), ``true_label`` (columns ``time``,
            ``DATA0``) and ``contact_type`` (a key of ``labels_map``).
        verbose: when True, print the prediction counts and accuracies of
            both windows. Defaults to False (plot only).

    Returns:
        None. The figure is rendered with ``iplot``.
    """
    results = inst.model_results
    time_window_positions, first_n_positions = _collect_evaluation_positions(
        results, evaluation_period_after_contact_sec, evaluation_predictions_after_contact)

    true_label = labels_map[inst.contact_type]
    # Each lookup falls back to 0 independently; a class missing from one
    # window must not zero the count of the other window.
    num_true_time, num_predicted_time = _count_predictions(
        results.iloc[time_window_positions], true_label)
    num_true_nof_predictions, num_predicted_nof_predictions = _count_predictions(
        results.iloc[first_n_positions], true_label)

    if verbose:
        print(f"instance: {path.name} (target class: {inst.contact_type})\n")

        print(
            f"evaluated model results up to {evaluation_period_after_contact_sec}sec after first contact time, with classification result != -1 (prediction was actually made)")
        print("correctly classified predictions: ", num_true_time)
        print("total predictions:", num_predicted_time)
        print("accuracy: ", _format_accuracy(num_true_time, num_predicted_time))
        print()

        print(
            f"evaluated first {evaluation_predictions_after_contact} predictions per contact, with classification result != -1 (prediction was actually made)")
        print("correctly classified predictions: ", num_true_nof_predictions)
        print("total predictions:", num_predicted_nof_predictions)
        print("accuracy: ", _format_accuracy(
            num_true_nof_predictions, num_predicted_nof_predictions))

    # Correctness markers are drawn for rows where a class was predicted or no contact is present.
    model_results_classification_made = results[(
        results["contact_class_prediction"] != -1) | (results["has_contact"] == 0)]

    trace1 = go.Scatter(
        x=inst.true_label['time'], y=inst.true_label['DATA0'], name='contact')
    trace2 = go.Scatter(
        x=model_results_classification_made['time'], y=model_results_classification_made['correctly_classified'], name='prediction correctness', mode="markers")
    trace3 = go.Scatter(
        x=results['time'], y=results['contact_class_prediction'], name=f"prediction<br>({str(labels_map)})", mode="markers", marker=dict(color="#aaaaaa"))
    # Raw predictions are listed before the correctness markers, as in the original trace order.
    data = [trace1, trace3, trace2]
    layout = go.Layout(title=f'(instance {path.name})',
                       xaxis=dict(title='time(sec)'),
                       yaxis=dict(title='Y-axis'))
    fig = go.Figure(data=data, layout=layout)
    iplot(fig)

In [90]:
# Load every selected test-data folder, align the model output stored in its
# model_result.csv with the true contact labels, and evaluate each instance.
instances: list[tuple[Path, MakeFolderDataset]] = []
for p in test_data_path.iterdir():
    # Only the directory with the exact name given by the last comparison passes;
    # the two `!=` checks are redundant while that `==` check is present.
    # NOTE(review): this looks like a temporary single-instance filter - confirm it is intended.
    if p.is_dir() and p.name != "_ignore" and p.name != "static_dynamic" and p.name =="GRUModel_sliding_left_offset50ms_dropout_a2_hard_HV15":
        instance = MakeFolderDataset(p.absolute())
        instance.extract_robot_data()
        instance.get_labels_all()

        # The first row of the file is skipped and the column names are assigned manually below.
        instance.model_results = pd.read_csv(str((p / "model_result.csv").absolute()), header=None, skiprows=1)
        headers = ["Time_sec", "Time_nsec", "prediction_duration", "contact", "contact_class_prediction"]
        # Every column beyond the five fixed ones is named as part of a (hard, pvc_tube, soft) triple.
        # assumes the number of extra columns is a multiple of 3 - TODO confirm
        n_missinig_headers = instance.model_results.shape[1] - len(headers)
        for i in range(int(n_missinig_headers / 3)):
            headers.append(f"prob_hard_{i}")
            headers.append(f"prob_pvc_tube_{i}")
            headers.append(f"prob_soft_{i}")
        instance.model_results.columns = headers

        # Time relative to the instance's init_time.
        # NOTE(review): 'Time_nsec' is added without any scaling; presumably the column
        # already holds fractional seconds rather than nanoseconds - confirm against the CSV writer.
        instance.model_results['time'] = instance.model_results['Time_sec'] + instance.model_results['Time_nsec'] - instance.init_time
        instance.true_label = instance.true_label[["time", "DATA0"]]

        # Match each model result to the nearest true-label row (at most 0.02 apart in 'time');
        # reset_index() carries the label row's index along in the 'index' column used for the join below.
        x = pd.merge_asof(left=instance.model_results, right=instance.true_label.reset_index(), on='time', tolerance=0.02, direction='nearest')
        # Left join from the true labels, so every label row is kept even without a matched model result.
        instance.model_results = instance.true_label.merge(x, how='left', left_index=True, right_on='index').set_index('index')
        # The '_x' columns are the ones coming from true_label (left side of the merge).
        instance.model_results.rename(columns={"DATA0_x": "has_contact", "time_x": "time"}, inplace=True)

        # correctly_classified: NaN when there is no prediction or no contact,
        # otherwise 1 if the predicted class equals the instance's class id, else 0.
        instance.model_results['correctly_classified'] = np.where(
            instance.model_results['contact_class_prediction'].isna(),
            np.nan,
            np.where(
                instance.model_results['has_contact'] == 1,
                np.where(instance.model_results['contact_class_prediction'] ==  labels_map[instance.contact_type], 1, 0),
                np.nan
            )
        )

        instances.append((p, instance))

# Evaluate in a deterministic order: sorted by folder name.
instances = sorted(instances, key=lambda i: i[0].name)
for inst in instances:
    evaluation(inst[0], inst[1])
