In [1]:
import numpy as np
import pandas as pd
import os
from sklearn.metrics import classification_report, confusion_matrix
import logging
import torch
import chart_studio.plotly as py
import plotly.graph_objs as go
from plotly.offline import iplot, init_notebook_mode
# Using plotly + cufflinks in offline mode so that the DataFrame.iplot(...) call
# further down in this notebook can render its interactive chart inline.
import cufflinks
cufflinks.go_offline(connected=True)
init_notebook_mode(connected=True)

In [2]:
def contact_detection_accuracy(df, allowable_delay = 20):
    """Score a binary contact detector against ground truth, tolerating transition delays.

    The ``label`` signal is cut into segments at every row where its value
    changes (plus the first and last row). Within each segment the detector
    output (``majority_voting``) is given up to ``allowable_delay`` samples to
    reach the segment's state, and up to ``allowable_delay`` samples of slack
    before the next change; only the rows between those two points are scored.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``label``, ``majority_voting`` and ``time``.
        The input frame is not modified.
    allowable_delay : int, default 20
        Size (in samples) of the tolerance window at both ends of a segment.

    Returns
    -------
    tuple
        ``(df, TP, TN, FP, FN, contact_avg_delay, no_contact_avg_delay,
        ModelAccuracy, DetectionFailureRate, FalseAlarmRate, contact_delays,
        no_contact_delays)`` where ``df`` is a re-indexed copy with the extra
        columns ``label_diff``, ``TP``, ``TN``, ``FP`` and ``FN`` (marker value
        0.9 for TP/FN rows, 0.1 for TN/FP rows, ``None`` elsewhere). Rates are
        percentages; a rate or average that has no data is ``nan``.

    Raises
    ------
    ValueError
        If ``allowable_delay`` is smaller than 1.

    Notes
    -----
    Each delay list holds one entry per segment: the detection delay, or ``0``
    when the detector never reached the state inside the window. The averages
    only use strictly positive entries, so immediate detections (delay 0.0) are
    excluded together with the misses.
    """
    if allowable_delay < 1:
        raise ValueError("allowable_delay must be at least 1 sample")

    # reset_index returns a new frame, so the helper columns added below never
    # leak into the DataFrame owned by the caller.
    df = df.reset_index(drop=True)
    df['label_diff'] = df['label'].diff().fillna(0)
    df['TP'], df['TN'], df['FP'], df['FN'] = None, None, None, None

    # Plain arrays make the per-sample scans below much cheaper than Series lookups.
    labels = df['label'].to_numpy()
    votes = df['majority_voting'].to_numpy()
    times = df['time'].to_numpy()
    n_rows = len(df)

    contact_delays = []
    no_contact_delays = []
    tp_rows, tn_rows, fp_rows, fn_rows = [], [], [], []

    change_events = df.index[df['label_diff'] != 0].tolist()

    # Add first and last index if not present (nothing to add for an empty frame)
    if n_rows > 0:
        if 0 not in change_events:
            change_events.insert(0, 0)
        if n_rows - 1 not in change_events:
            change_events.append(n_rows - 1)

    for seg_start, seg_end in zip(change_events, change_events[1:]):
        if labels[seg_start] == 1:  # Starting a contact event
            target, delays = 1, contact_delays
            hit_rows, miss_rows = tp_rows, fn_rows
        else:  # Starting a contact-free event
            target, delays = 0, no_contact_delays
            hit_rows, miss_rows = tn_rows, fp_rows

        # Leading window: first sample at which the detector reports the new state.
        window_stop = min(seg_start + allowable_delay, n_rows)
        for first in range(seg_start, window_stop):
            if votes[first] == target:
                delays.append(times[first] - times[seg_start])
                break
        else:
            # Not detected inside the window: record exactly one placeholder.
            delays.append(0)
        # `first` is now the detection row, or the last row of the window on a miss.

        # Trailing window: walk back from the next change event to the last
        # sample at which the detector still reports this segment's state.
        for last in range(seg_end, max(seg_end - allowable_delay, 0), -1):
            if votes[last] == target:
                break

        for row in range(first, last):
            if votes[row] == labels[row]:
                hit_rows.append(row)
            else:
                miss_rows.append(row)

    # Write the plot markers once per column instead of once per row.
    for column, rows, marker in (('TP', tp_rows, 0.9), ('FN', fn_rows, 0.9),
                                 ('TN', tn_rows, 0.1), ('FP', fp_rows, 0.1)):
        if rows:
            df.loc[rows, column] = marker

    TP, TN, FP, FN = len(tp_rows), len(tn_rows), len(fp_rows), len(fn_rows)

    # Compute averages safely: an empty selection yields nan without a RuntimeWarning.
    positive_contact = [d for d in contact_delays if d > 0]
    positive_no_contact = [d for d in no_contact_delays if d > 0]
    contact_avg_delay = np.nanmean(positive_contact) if positive_contact else np.nan
    no_contact_avg_delay = np.nanmean(positive_no_contact) if positive_no_contact else np.nan

    # Guard every ratio so that a recording without one of the classes gives nan
    # instead of raising ZeroDivisionError.
    scored = TP + TN + FP + FN
    ModelAccuracy = (TP + TN) / scored * 100 if scored else np.nan
    DetectionFailureRate = FN / (TP + FN) * 100 if (TP + FN) else np.nan
    FalseAlarmRate = FP / (TN + FP) * 100 if (TN + FP) else np.nan

    return df, TP, TN, FP, FN, contact_avg_delay, no_contact_avg_delay, ModelAccuracy, DetectionFailureRate, FalseAlarmRate, contact_delays, no_contact_delays
 

# Load the labelled model output of one recording.
df = pd.read_csv('../DATA/Labeled_data/0213_2LCNN_Post1_ST/labeled_modelResult_data.csv')

# Score the detector with an allowable delay of 20 samples.
(
    df, TP, TN, FP, FN,
    contact_avg_delay, no_contact_avg_delay,
    ModelAccuracy, DetectionFailureRate, FalseAlarmRate,
    contact_delays, no_contact_delays,
) = contact_detection_accuracy(df, 20)

print('contact_delays:', contact_delays)
print('no_contact_delays:', no_contact_delays)
# Delays are multiplied by 1000 for the "ms" figures in the summary line.
print(
    f"Accuracy:{ModelAccuracy:.2f}%, "
    f"DetectionFailureRate:{DetectionFailureRate:.2f}%, "
    f"FalseAlarmRate:{FalseAlarmRate:.2f}%, "
    f"Contact Detection Delay: {contact_avg_delay*1000:.3f}ms, "
    f"No-Contact Detection Delay: {no_contact_avg_delay*1000:.3f}ms, "
    f"TP: {TP}, TN: {TN}, FP: {FP}, FN: {FN}"
)

# Plot results: the two signals as lines, the per-row outcome columns as markers.
plot_columns = ['time', 'label', 'majority_voting', 'TP', 'TN', 'FP', 'FN']
trace_modes = {
    'label': 'lines',
    'majority_voting': 'lines',
    'TP': 'markers',
    'TN': 'markers',
    'FP': 'markers',
    'FN': 'markers',
}
df[plot_columns].iplot(
    x='time',
    kind='scatter',
    mode=trace_modes,
    title='Contact Detection Analysis (Interactive)',
    xTitle='Time Index',
    yTitle='Signal',
    theme='white',
    symbol=[None, None, 'x', 'x', 'x', 'x'],
    size=5,
    colors=['blue', 'pink', 'green', 'lightgreen', 'orange', 'red'],
)


df


Mean of empty slice



contact_delays: [0, 0.054997861036099494, 0, 0, 0, 0, 0, 0, 0, 0.03500561800319346]
no_contact_delays: [0.0, 0.0, 0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0]
Accuracy:93.65%, DetectionFailureRate:0.00%, FalseAlarmRate:6.37%, Contact Detection Delay: 45.002ms, No-Contact Detection Delay: nanms, TP: 17, TN: 5406, FP: 368, FN: 0


Unnamed: 0,index,time,tau_J0,tau_J1,tau_J2,tau_J3,tau_J4,tau_J5,tau_J6,tau_J_d0,...,etau_J6,label,model_result_time,touch_type_idx,majority_voting,label_diff,TP,TN,FP,FN
0,1,0.000000,-0.002160,-18.597219,-0.098155,16.724924,0.198997,2.289623,-0.008373,0.0,...,0.008373,0,0.000000,0.0,0,0.0,,0.1,,
1,2,0.004995,0.037512,-18.632925,-0.173533,16.724924,0.198997,2.310680,-0.059643,0.0,...,0.059643,0,0.005373,0.0,0,0.0,,0.1,,
2,3,0.009997,0.037512,-18.632925,-0.173533,16.724924,0.198997,2.310680,-0.059643,0.0,...,0.059643,0,0.010769,0.0,0,0.0,,0.1,,
3,4,0.015017,-0.037866,-18.632925,-0.173533,16.760630,0.189842,2.279552,-0.028515,0.0,...,0.028515,0,0.017379,0.0,0,0.0,,0.1,,
4,5,0.019999,-0.037866,-18.632925,-0.173533,16.760630,0.189842,2.279552,-0.028515,0.0,...,0.028515,0,0.022239,0.0,0,0.0,,0.1,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5998,5999,29.990044,-0.148950,-18.712271,-0.213206,16.800303,0.235618,2.352795,0.052967,0.0,...,-0.052967,0,29.991934,0.0,0,0.0,,0.1,,
5999,6000,29.994999,-0.037866,-18.672598,-0.173533,16.760630,0.244773,2.341808,0.073109,0.0,...,-0.073109,0,29.997341,0.0,0,0.0,,0.1,,
6000,6001,29.999994,-0.037866,-18.672598,-0.173533,16.760630,0.244773,2.341808,0.073109,0.0,...,-0.073109,0,30.002277,0.0,0,0.0,,0.1,,
6001,6002,30.005002,-0.037866,-18.672598,-0.173533,16.760630,0.244773,2.341808,0.073109,0.0,...,-0.073109,0,30.007372,0.0,0,0.0,,0.1,,


In [4]:
import os
import pandas as pd

gesture_mapping = {"NC": 0, "ST": 1, "P": 2, "G": 3}

# The last "_"-separated token of the recording folder name is used as the
# ground-truth touch type (e.g. '0213_2LCNN_Post1_ST' -> 'ST').
folder_path = '../DATA/Labeled_data/0213_2LCNN_Post1_ST'
folder_name = os.path.basename(folder_path)
touch_type_from_folder = folder_name.split('_')[-1]
# Looked up once, up front: an unknown folder suffix fails here with a KeyError.
true_class = gesture_mapping[touch_type_from_folder]

# Load the labelled model output that lives inside that folder.
df = pd.read_csv(f'{folder_path}/labeled_modelResult_data.csv')

# Only rows where a contact is both labelled and detected take part in the
# touch-type evaluation; every other row is left out of y_true / y_pred.
detected_contact = (df['label'] == 1) & (df['majority_voting'] == 1)
y_pred = df.loc[detected_contact, 'touch_type_idx'].tolist()
y_true = [true_class] * len(y_pred)

# Confusion matrix over the full, fixed list of gesture classes
conf_matrix = confusion_matrix(y_true, y_pred, labels=list(gesture_mapping.values()))

# Overall accuracy: share of samples on the diagonal
total_samples = conf_matrix.sum()
true_positives = conf_matrix.diagonal().sum()
accuracy = (true_positives / total_samples) * 100 if total_samples > 0 else 0

# Per-class detection failure rate: share of each true class (matrix row) that
# was predicted as another class; classes without samples report 0.
row_totals = conf_matrix.sum(axis=1)
failure_rate = [
    ((row_totals[i] - conf_matrix[i, i]) / row_totals[i]) * 100
    if row_totals[i] > 0 else 0
    for i in range(len(conf_matrix))
]

print(f"Accuracy: {accuracy:.2f}%")
print(f"Detection Failure Rate (by class): {failure_rate}")
print("Confusion Matrix:\n", conf_matrix)


Accuracy: 88.24%
Detection Failure Rate (by class): [0, 11.76470588235294, 0, 0]
Confusion Matrix:
 [[ 0  0  0  0]
 [ 0 15  2  0]
 [ 0  0  0  0]
 [ 0  0  0  0]]


In [None]:
# Do not run; this is the original code.
# NOTE(review): LoadDatasets, DataLoader, cnnLSTM, LoadSeqDataset and
# majority_voting_last_n are not defined or imported in the visible part of this
# notebook — they must come from elsewhere before this cell could execute.
data_name, dof, batch_size = 'franka_main', 7 , 62
 
 
# Each inner list is unpacked further down as [num_layers, hidden_size, gap, seq_num].
# folder_name is built from data_name and batch_size; data_name and dof are
# re-bound to their own current values by this tuple assignment.
best_models_index, folder_name , data_name, dof  = ([   [1, 64, 3, 50],
                                                        [1, 64, 3, 80],
                                                        [1, 64, 3, 100],
                                                        [1, 64, 5, 50],
                                                        [1, 64, 5, 80],
                                                        [1, 64, 5, 100],
                                                        [1, 128, 3, 50],
                                                        [1, 128, 3, 80],
                                                        [1, 128, 3, 100],
                                                        [1, 128, 5, 50],
                                                        [1, 128, 5, 80],
                                                        [1, 128, 5, 100] ], f'{data_name}/trained_models/contact_localization/{batch_size}/', f'{data_name}', dof)
 
logging.info(f'trained model on {folder_name}, tested on {data_name}, with {dof} links')
 
#best_models_index = [[1, 128, 3, 80]]
# hyperparameters
 
n = 14  # Window size for majority voting
 
# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
 
if device.type == "cuda":
    print("Using GPU:", torch.cuda.get_device_name())
 
# Working directory with every 'AIModels' substring removed.
main_path = os.getcwd().replace('AIModels', '')
 
 
# Maps a link name to its integer class; 0 is reserved for 'no_contact'.
dict_label = {'link7': 7, 'link6':6, 'link5':5, 'link4':4, 'link3':3, 'link2':2, 'link1':1, 'no_contact': 0}
# Feature columns 'e0' .. 'e{dof-1}'.
selected_features = [f'e{i}' for i in range(dof)]#+[ f'de{i}' for i in range(robot_dof)]
 
# Load dataset ONCE outside the model loop
testing_datasets = LoadDatasets(os.getcwd().replace('AIModels', '') + f'/dataset/{data_name}/labeled_data/', dict_label)
test_datasetloader = DataLoader(testing_datasets, batch_size=1, shuffle=False)
 
# model file name -> accumulated {'y_pred': [...], 'y_true': [...]} over all links
file_predictions = {}
for link in dict_label.keys():
    for trial_dataset_path, label in test_datasetloader:
        # A trial belongs to this link when the link name occurs in its path.
        if link in trial_dataset_path[0]:
            print(link)
            for num_layers, hidden_size, gap, seq_num in best_models_index:
                for model_name in os.listdir(f'{main_path}/dataset/{folder_name}/'):                   
                    # Select the saved model whose file name encodes this configuration.
                    if f'hiddenSize{hidden_size}_seq_num{seq_num}_gap{gap}' in model_name:
                        if model_name not in file_predictions:
                            file_predictions[model_name] = {'y_pred': [], 'y_true': []}
                        # Load and evaluate model
                        # NOTE(review): num_features_joints is given seq_num here — confirm this is intended.
                        model = cnnLSTM(num_features_joints=seq_num, num_layers=num_layers, hidden_size=hidden_size, dropout=0.7, bidirectional=True)
                        model.to(device)
                        # NOTE(review): torch.load is called without map_location — confirm this
                        # works when the weights were saved on a different device than `device`.
                        model.load_state_dict(torch.load(f'{main_path}/dataset/{folder_name}/{model_name}'))
                        model.eval()
                        # NOTE(review): gap is fixed to 1 here; the `gap` value from
                        # best_models_index is only used for the file-name match above.
                        data = LoadSeqDataset(
                            file_path=trial_dataset_path[0], label=label[0],
                            selected_features=selected_features, seq_num=seq_num, gap=1
                        )
                        #data.sequences = data.sequences[len(data)//2:-1]
                        # Keep the sequences from index len(data) - len(data)//2 onwards, dropping the last one.
                        data.sequences = data.sequences[(len(data)-len(data)//2):-1]
 
                        # batch_size=len(data) requests all remaining sequences in a single batch.
                        test_loader = DataLoader(data, batch_size=len(data), shuffle=False)
                        # Run inference
                        df= pd.DataFrame()
                        for batch_idx, (seqs, labels) in enumerate(test_loader):
                            seqs = seqs.float().to(device)
                            with torch.no_grad():
                                predictions = model(seqs)
                                # argmax index shifted by +1 (presumably so classes line up with
                                # link numbers 1..7 in dict_label — TODO confirm).
                                predictions = torch.argmax(predictions, dim=1) + 1
 
                            df['time'] = data.data.time[len(data.data.time)-len(data)-1:-1]
                            df['label'] = labels.cpu().numpy()
                            df['model_out'] = predictions.cpu().detach().numpy()
                            # Force the model output to 0 wherever the label is 0.
                            # NOTE(review): this is chained-indexing assignment; pandas documents
                            # df.loc[mask, 'model_out'] = 0 as the reliable form.
                            df.model_out[df.label == 0] = 0
                            df['majority_voting'] = majority_voting_last_n(df['model_out'], n)
                            
                        #df.iplot(x='time', y=['model_out', 'majority_voting', 'label'], colors=[ 'lightblue', 'darkblue', 'red'], title='Link ' + str(labels.max().item()))
                        # Collect predictions for current model
                        # Only rows where both the label and the majority vote are non-zero are kept.
                        y_true_filtered = df.label[(df.label != 0) & (df.majority_voting != 0)].tolist()
                        y_pred_filtered = df.majority_voting[(df.label != 0) & (df.majority_voting != 0)].tolist()
                        file_predictions[model_name]['y_pred'].extend(y_pred_filtered)
                        file_predictions[model_name]['y_true'].extend(y_true_filtered)
                        break  # Stop scanning the model directory after the first file matching this configuration
                    
            # Leaves the trial loop: only the first matching trial per link is processed.
            break
 
# Per-model summary over the predictions accumulated across all links.
for model_name in file_predictions.keys():
    y_pred = file_predictions[model_name]['y_pred']
    y_true = file_predictions[model_name]['y_true']
    # No `labels` argument is passed, so the matrix is built from y_true / y_pred alone.
    matrix = confusion_matrix(y_pred=y_pred, y_true=y_true)
 
    total_samples = sum(sum(row) for row in matrix)
    logging.info(f'{model_name}, total samples: {total_samples}')
    #logging.info(f'\n {matrix}')
    true_positives = sum(matrix[i][i] for i in range(len(matrix)))
 
    # NOTE(review): neither division below is guarded against a zero denominator
    # (no samples at all, or a matrix row that sums to 0).
    overall_accuracy = (true_positives / total_samples) * 100
    failure_rate = [(sum(matrix[i]) - matrix[i][i]) / sum(matrix[i]) * 100 for i in range(len(matrix))]
    logging.info(f'acuracy= {overall_accuracy}, detection failure (links): {failure_rate}')
 
 