In [1]:
# Import all libraries
import os

import Levenshtein
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from d3rlpy.algos import DiscreteDecisionTransformer, DiscreteDecisionTransformerConfig
from d3rlpy.dataset import MDPDataset
from d3rlpy.models import SGDFactory
from prettytable import PrettyTable

In [2]:
# Participant recording names; each one is the stem of a CSV file under Data/
participants = [
    'p1_2022-07-25', 'p2_2022-08-03', 'p3_2022-08-04', 'p4_2022-08-10',
    'p5_2022-08-12', 'p6_2022-08-15', 'p7_2022-08-15', 'p8_2022-08-19',
    'p9_2022-08-22', 'p10_2022-08-26', 'p11_2022-08-29', 'p12_2022-08-29',
    'p13_2022-08-29', 'p14_2022-08-31', 'p15_2022-09-02', 'p16_2022-09-05',
    'p17_2022-09-06', 'p18_2022-09-07', 'p19_2022-09-08', 'p20_2022-09-09',
]

# One DataFrame per participant, in the same order as `participants`
df_list = [
    pd.read_csv(f'Data/{participant}.csv', low_memory = False)
    for participant in participants
]

In [3]:
# Handle missing values: unlabelled rows get the neutral defaults.
# The result is assigned back to the column rather than calling
# `fillna(..., inplace = True)` on the selected column; that chained in-place
# form raises a FutureWarning on recent pandas and does not update the frame
# once Copy-on-Write is enabled.
for participant_df in df_list:
    participant_df['handover quality'] = participant_df['handover quality'].fillna('NEUTRAL')
    participant_df['handover type'] = participant_df['handover type'].fillna('NEITHER')

In [4]:
# Convert the string labels to integer codes (label encoding, not one-hot).
# 'handover quality' is mapped to a signed score that is later used as the reward.
arm_status_mapping = {
    'STATIONARY': 0,
    'REACHING': 1,
    'TUCKING': 2,
}
base_status_mapping = {
    'STATIONARY': 0,
    'TO OPERATOR': 1,
    'ROTATING': 2,
    'TO PARTICIPANT': 3,
}
handover_status_mapping = {
    'LEFT': 0,
    'MIDDLE': 1,
    'RIGHT': 2,
}
handover_quality_mapping = {
    'BAD': -10,
    'NEUTRAL': 0,
    'GOOD': 10,
}

# Column name -> mapping applied to that column
label_encodings = {
    'arm status': arm_status_mapping,
    'base status': base_status_mapping,
    'handover status': handover_status_mapping,
    'handover quality': handover_quality_mapping,
}

for participant_df in df_list:
    for column_name, encoding in label_encodings.items():
        # NOTE: labels missing from the mapping become NaN after .map()
        encoded_column = participant_df[column_name].map(encoding)
        participant_df[column_name] = encoded_column.astype('category')

In [5]:
# Split into train and test sets by participant (no validation set is built here).
# The split is positional: the first 80% of participants are used for training
# and the remainder for testing; nothing is shuffled.
np.random.seed(48)

split_index = int(len(df_list) * 0.8)

train_df_list = df_list[:split_index]
test_df_list = df_list[split_index:]

In [6]:
# Column positions are looked up on the first training frame and reused for
# every frame (assumes all participant CSVs share one column layout - TODO confirm).

# Get the arm observation states: columns 'gripper (x)' .. 'gripper (qw)' inclusive
gripper_x_column_index = train_df_list[0].columns.get_loc('gripper (x)')
gripper_qw_column_index = train_df_list[0].columns.get_loc('gripper (qw)')

train_arm_data = torch.cat([torch.tensor(train_df.iloc[:, gripper_x_column_index:gripper_qw_column_index + 1].values) for train_df in train_df_list], dim = 0)

# Get the handover observation states: columns 'handover_goal (x)' .. 'handover_goal (qw)' inclusive
# NOTE: despite its name, handover_z_column_index holds the position of the
# '(x)' column; the name is kept because other cells reference it.
handover_z_column_index = train_df_list[0].columns.get_loc('handover_goal (x)')
handover_qw_column_index = train_df_list[0].columns.get_loc('handover_goal (qw)')

train_handover_data = torch.cat([torch.tensor(train_df.iloc[:, handover_z_column_index:handover_qw_column_index + 1].values) for train_df in train_df_list], dim = 0)

# Get the remaining observation states: columns 'neutral (global)' .. 'right_hip (confidence)' inclusive
starting_observations_column_index = train_df_list[0].columns.get_loc('neutral (global)')
ending_observations_column_index = train_df_list[0].columns.get_loc('right_hip (confidence)')

# Build the shared observation tensor once; the arm and handover variants just
# prepend their own columns to it (torch.cat returns a new tensor each time).
train_base_observations = torch.cat([torch.tensor(train_df.iloc[:, starting_observations_column_index:ending_observations_column_index + 1].values) for train_df in train_df_list], dim = 0)
train_arm_observations = torch.cat((train_arm_data, train_base_observations), dim = 1)
train_handover_observations = torch.cat((train_handover_data, train_base_observations), dim = 1)

In [7]:
# Test-set observations, sliced with the column positions computed for the
# training frames (assumes the test CSVs share that column layout - TODO confirm).
test_arm_data = torch.cat([torch.tensor(test_df.iloc[:, gripper_x_column_index:gripper_qw_column_index + 1].values) for test_df in test_df_list], dim = 0)

test_handover_data = torch.cat([torch.tensor(test_df.iloc[:, handover_z_column_index:handover_qw_column_index + 1].values) for test_df in test_df_list], dim = 0)

# Build the shared observation tensor once and reuse it for the arm and
# handover variants instead of re-slicing every frame three times.
test_base_observations = torch.cat([torch.tensor(test_df.iloc[:, starting_observations_column_index:ending_observations_column_index + 1].values) for test_df in test_df_list], dim = 0)
test_arm_observations = torch.cat((test_arm_data, test_base_observations), dim = 1)
test_handover_observations = torch.cat((test_handover_data, test_base_observations), dim = 1)

In [8]:
# Get the action states (one candidate action column per model)
train_arm_status_action = torch.cat([torch.tensor(train_df['arm status'].values) for train_df in train_df_list])
train_base_status_action = torch.cat([torch.tensor(train_df['base status'].values) for train_df in train_df_list])
train_handover_status_action = torch.cat([torch.tensor(train_df['handover status'].values) for train_df in train_df_list])

# Get the rewards (the encoded 'handover quality' column)
train_rewards = torch.cat([torch.tensor(train_df['handover quality'].values) for train_df in train_df_list])

# Get the terminal flags.
# Flags are computed per participant and then concatenated, so that
#   * the last row of every participant closes an episode even when the next
#     participant happens to start with the same episode id, and
#   * the very last row of the data set is terminal (previously the final
#     episode was left without any terminal flag).
train_episode = torch.cat([torch.tensor(train_df['episode'].values) for train_df in train_df_list])

train_terminal_chunks = []
for train_df in train_df_list:
    participant_episode = torch.tensor(train_df['episode'].values)
    participant_terminals = torch.zeros_like(participant_episode)
    if participant_episode.numel() > 0:
        # Row k is terminal when the episode id changes between row k and row k + 1
        boundaries = torch.where(participant_episode[1:] != participant_episode[:-1])[0]
        participant_terminals[boundaries] = 1
        participant_terminals[-1] = 1
    train_terminal_chunks.append(participant_terminals)

train_terminals = torch.cat(train_terminal_chunks)
# Positions of all terminal rows (kept as a torch.where tuple, as before)
episode_indices = torch.where(train_terminals == 1)

In [9]:
# Get the action states (one candidate action column per model)
test_arm_status_action = torch.cat([torch.tensor(test_df['arm status'].values) for test_df in test_df_list])
test_base_status_action = torch.cat([torch.tensor(test_df['base status'].values) for test_df in test_df_list])
test_handover_status_action = torch.cat([torch.tensor(test_df['handover status'].values) for test_df in test_df_list])

# Get the rewards (the encoded 'handover quality' column)
test_rewards = torch.cat([torch.tensor(test_df['handover quality'].values) for test_df in test_df_list])

# Get the terminal flags.
# Flags are computed per participant and then concatenated, so that
#   * the last row of every participant closes an episode even when the next
#     participant happens to start with the same episode id, and
#   * the very last row of the data set is terminal (previously the final
#     episode was left without any terminal flag).
test_episode = torch.cat([torch.tensor(test_df['episode'].values) for test_df in test_df_list])

test_terminal_chunks = []
for test_df in test_df_list:
    participant_episode = torch.tensor(test_df['episode'].values)
    participant_terminals = torch.zeros_like(participant_episode)
    if participant_episode.numel() > 0:
        # Row k is terminal when the episode id changes between row k and row k + 1
        boundaries = torch.where(participant_episode[1:] != participant_episode[:-1])[0]
        participant_terminals[boundaries] = 1
        participant_terminals[-1] = 1
    test_terminal_chunks.append(participant_terminals)

test_terminals = torch.cat(test_terminal_chunks)
# Positions of all terminal rows (kept as a torch.where tuple, as before)
episode_indices = torch.where(test_terminals == 1)

In [10]:
# Longest episode length (in steps) found in the training data; used as the
# `max_timestep` bound of the Decision Transformer configuration.
#
# Every row is counted, including the terminal row of each episode. The
# previous loop skipped terminal rows and therefore produced (length - 1).
# NOTE(review): d3rlpy appears to number timesteps starting from 1, so the
# smaller value could index past the position-embedding table; a larger bound
# only enlarges that table - confirm against the installed d3rlpy version.
current_timestep = 0
max_timestep = 0

# Iterate over a NumPy view: looping over a torch tensor builds a tensor
# object per element and is far slower.
for is_terminal in train_terminals.numpy():
    current_timestep += 1
    max_timestep = max(max_timestep, current_timestep)
    if is_terminal:
        # The episode ends on this row; the next row starts a new count
        current_timestep = 0

In [None]:
# Dataset with arm status as action state.
# The torch tensors are converted to NumPy arrays before being handed to d3rlpy.
arm_status_arrays = {
    'observations': train_arm_observations.numpy(),
    'actions': train_arm_status_action.numpy(),
    'rewards': train_rewards.numpy(),
    'terminals': train_terminals.numpy(),
}

arm_status_dataset = MDPDataset(**arm_status_arrays)

In [14]:
def levenshtein_distance(seq1, seq2):
    """Return the Levenshtein (edit) distance between two label sequences.

    Each distinct element (compared through its ``str()`` form) is mapped to
    one unique character before the two strings are handed to
    ``Levenshtein.distance``. Concatenating the ``str()`` forms directly is
    ambiguous for multi-character tokens: ``[1, 0]`` and ``[10]`` would both
    become ``'10'``. For single-character tokens the result is unchanged.

    Parameters
    ----------
    seq1, seq2 : iterable
        Sequences of labels (e.g. integer action codes).

    Returns
    -------
    int
        Number of single-token insertions, deletions and substitutions needed
        to turn ``seq1`` into ``seq2``.
    """
    token_to_char = {}

    def encode(sequence):
        # One character per token; both sequences share the same vocabulary
        chars = []
        for item in sequence:
            token = str(item)
            if token not in token_to_char:
                code_point = len(token_to_char)
                # Skip the surrogate range, which cannot appear in a valid str
                if code_point >= 0xD800:
                    code_point += 0x800
                token_to_char[token] = chr(code_point)
            chars.append(token_to_char[token])
        return ''.join(chars)

    str1 = encode(seq1)
    str2 = encode(seq2)

    return Levenshtein.distance(str1, str2)

def model_plotting(data, data_type, dataset, prediction, action, action_state, participant_offset = None):
    """Plot predicted vs. recorded actions per episode and save one PNG per participant.

    Parameters
    ----------
    data : list of pandas.DataFrame
        One frame per participant; each must contain an 'episode' column.
    data_type : str
        Name of the split (e.g. 'test'); used in the figure title and file name.
    dataset : str
        Sub-folder name under 'Output Week 9 - S2/'.
    prediction, action : numpy.ndarray
        1-D arrays holding one value per row of all frames in ``data``
        concatenated in order.
    action_state : str
        'arm status', 'base status' or 'handover status'; selects the y-axis
        tick labels and is used as a sub-folder name.
    participant_offset : int, optional
        Position in the module-level ``participants`` list of the participant
        that belongs to ``data[0]``. When omitted it is 0 for
        ``data_type == 'train'`` and ``len(participants) - len(data)``
        otherwise (the held-out frames are the tail of the participant list).

    Notes
    -----
    Row positions are offset by the number of rows of all preceding frames,
    because ``prediction`` and ``action`` span every participant while each
    frame's own index restarts. Indexing with frame-local indices made every
    participant after the first read the first participant's rows.
    """
    if participant_offset is None:
        participant_offset = 0 if data_type == 'train' else len(participants) - len(data)

    # y-axis tick positions, tick labels and axis label per action column
    axis_styles = {
        'arm status': ([0, 1, 2], ['STATIONARY', 'REACHING', 'TUCKING'], 'Arm status'),
        'base status': ([0, 1, 2, 3], ['STATIONARY', 'TO OPERATOR', 'ROTATING', 'TO PARTICIPANT'], 'Base status'),
        'handover status': ([0, 1, 2], ['LEFT', 'MIDDLE', 'RIGHT'], 'Handover status'),
    }

    # savefig does not create missing folders
    output_dir = f'Output Week 9 - S2/{dataset}/{action_state}'
    os.makedirs(output_dir, exist_ok = True)

    row_offset = 0  # rows of `prediction`/`action` consumed by earlier participants

    for i, participant_df in enumerate(data):
        participant_name = participants[i + participant_offset]
        episode_column = participant_df['episode'].to_numpy()
        episode_ids = participant_df['episode'].unique()

        plt.figure(figsize = (20, 125))

        for j, episode in enumerate(episode_ids):
            # Positions of this episode's rows inside the concatenated arrays
            positions = row_offset + np.flatnonzero(episode_column == episode)
            incorrect_predictions = np.sum(prediction[positions] != action[positions])

            plt.subplot(len(episode_ids), 1, j + 1)
            plt.plot(prediction[positions], label = 'Prediction')
            plt.plot(action[positions], label = 'Action')

            if action_state in axis_styles:
                tick_positions, tick_labels, axis_label = axis_styles[action_state]
                plt.yticks(tick_positions, tick_labels)
                plt.ylabel(axis_label)

            plt.title(f'{participant_name} Episode {episode} - Incorrect Prediction: {incorrect_predictions} / {len(positions)}')
            plt.xlabel('Time step')
            plt.legend(loc = 'lower right')

        plt.suptitle(f'{data_type.title()} Set', fontsize = 20, color = 'red')
        plt.tight_layout(rect = [0, 0, 1, 0.98])
        plt.savefig(f'{output_dir}/{data_type}_' + participant_name + '.png')
        # Close the figure so that figures do not pile up in memory
        plt.close()

        row_offset += len(participant_df)

In [None]:
# Arm status
# Fall back to the CPU when no CUDA device is present instead of failing on a
# hard-coded 'cuda:0'.
arm_status_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

arm_status_config = DiscreteDecisionTransformerConfig(
    max_timestep = max_timestep,
    learning_rate = 0.0001,
    batch_size = 256,
    optim_factory = SGDFactory(momentum = 0.9, weight_decay = 1e-5),
)

# `config.create(...)` is the documented d3rlpy way to build the algorithm
# object; it returns a DiscreteDecisionTransformer for this config.
dt_arm_status = arm_status_config.create(device = arm_status_device)

# Train the model
dt_arm_status.fit(arm_status_dataset, n_steps = 500000, n_steps_per_epoch = 10000)

In [None]:
# Predict the action
# The stateful wrapper keeps the history of observations, actions and rewards
# it has seen, so it has to be reset at every episode boundary; otherwise the
# context and the remaining target return of one episode leak into the next.
dt_arm_status_wrapper = dt_arm_status.as_stateful_wrapper(target_return = 1000)

dt_arm_status_test_prediction = []
for observation, reward, terminal in zip(test_arm_observations.numpy(), test_rewards.numpy(), test_terminals.numpy()):
    dt_arm_status_test_prediction.append(dt_arm_status_wrapper.predict(observation, reward))
    if terminal:
        # Last row of an episode: start the next one with a clean history
        dt_arm_status_wrapper.reset()

dt_arm_status_test_prediction = np.array(dt_arm_status_test_prediction)

# Plot the accuracy of the model
model_plotting(data = test_df_list,
                data_type = 'test',
                dataset = 'instantaneous',
                prediction = dt_arm_status_test_prediction,
                action = test_arm_status_action.numpy(),
                action_state = 'arm status')

# Calculate the accuracy of the model: one Levenshtein distance per episode,
# grouped per participant.
#
# `test_arm_status_action` and `dt_arm_status_test_prediction` cover all test
# participants concatenated, whereas each frame's own index restarts. Row
# positions are therefore shifted by the number of rows of the preceding
# frames; using frame-local indices compared every participant after the
# first against the first participant's rows.
test_arm_distance_list = []
test_arm_indices_list = []

test_arm_actions = test_arm_status_action.numpy()
row_offset = 0

for test_df in test_df_list:
    episode_column = test_df['episode'].to_numpy()
    test_arm_distance = []
    test_arm_indices = []

    for episode in test_df['episode'].unique():
        # Positions of this episode's rows inside the concatenated arrays
        indices = (row_offset + np.flatnonzero(episode_column == episode)).tolist()
        test_arm_distance.append(levenshtein_distance(test_arm_actions[indices], dt_arm_status_test_prediction[indices]))
        test_arm_indices.append(indices)

    test_arm_distance_list.append(test_arm_distance)
    test_arm_indices_list.append(test_arm_indices)
    row_offset += len(test_df)

# Summary table: each statistic is computed per test participant and the
# per-participant values are then averaged (so e.g. 'Median Distance' is the
# mean over participants of each participant's median distance).
arm_table = PrettyTable(['Metrics', 'Test'])

participant_positions = range(len(test_df_list))

# Episode lengths (number of rows) per participant
test_arm_episode_lengths = [
    [len(episode_rows) for episode_rows in test_arm_indices_list[position]]
    for position in participant_positions
]

# Per-participant statistics of the edit distances ...
test_arm_result1 = [np.mean(test_arm_distance_list[position]) for position in participant_positions]
test_arm_result2 = [np.median(test_arm_distance_list[position]) for position in participant_positions]
test_arm_result3 = [np.std(test_arm_distance_list[position]) for position in participant_positions]
# ... and of the episode lengths
test_arm_result4 = [np.mean(test_arm_episode_lengths[position]) for position in participant_positions]
test_arm_result5 = [np.median(test_arm_episode_lengths[position]) for position in participant_positions]
test_arm_result6 = [np.std(test_arm_episode_lengths[position]) for position in participant_positions]

arm_result = {}
arm_result['Mean Distance'] = format(np.mean(test_arm_result1), '.2f')
arm_result['Median Distance'] = format(np.mean(test_arm_result2), '.2f')
arm_result['Std Distance'] = format(np.mean(test_arm_result3), '.2f')
arm_result['Mean Episode Length'] = format(np.mean(test_arm_result4), '.2f')
arm_result['Median Episode Length'] = format(np.mean(test_arm_result5), '.2f')
arm_result['Std Episode Length'] = format(np.mean(test_arm_result6), '.2f')

for metric_name in arm_result:
    arm_table.add_row([metric_name, arm_result[metric_name]])

print(arm_table)