In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
import os
# Show the working directory; the relative CSV path used below is resolved against it
print(os.getcwd())

c:\Users\vashi\Aresty Projects\Python Projects\EyeTrackingMLVSC\LSTM Path Predictor\Path Predictor


Load Data Into Dataframe

In [2]:
# Load the combined experiment recordings from CSV
file_name = 'combinedData.csv'

# Columns of the raw file that are kept for the rest of the notebook
selected_columns = [
    'timeSinceStart', 'trial_id', 'room_id', 'pos_x', 'pos_z', 'Participant_ID',
    'gate', 'distanceToGate', 'front_row_pos_x', 'back_row_pos_x', 'distance_to_front_1',
    'distance_to_front_2', 'distance_to_front_3', 'distance_to_front_4', 'distance_to_front_5',
    'distance_to_back_1', 'distance_to_back_2', 'distance_to_back_3', 'distance_to_back_4',
    'distance_to_back_5', 'row_direction', 'WorldPosX', 'WorldPosY', 'WorldPosZ',
]

# Select and rename in a single non-mutating chain. Renaming with inplace=True on a
# frame obtained by column selection mutates a derived object (which can raise
# SettingWithCopyWarning on some pandas versions) and makes the cell harder to re-run.
experiment_data = (
    pd.read_csv(file_name)[selected_columns]
    .rename(columns={
        'timeSinceStart': 'time_since_start',
        'Participant_ID': 'participant_id',
        'distanceToGate': 'distance_to_gate',
    })
)


Choose the variables used for prediction and preprocess the data for the model

In [3]:
# Names of the variables used as model input and output
cat_vars_list = ['gate', 'row_direction'] # Categorical variables (features can be added when more data is gathered)
cont_vars_list = ['time_since_start', 'distance_to_gate'] # Continuous variables (features can be added when more data is gathered)
pos_labels = ['pos_x', 'pos_z']

# Information used for labels (coordinates)
label_vars = experiment_data[pos_labels]

# Use one-hot encoding for the categorical variables
one_hot_encoder = OneHotEncoder(sparse_output=False)
categorical_vars = one_hot_encoder.fit_transform(experiment_data[cat_vars_list]) # Can add categorical variables if necessary

# Scale the continuous variables to [0, 1].
# NOTE(review): encoder and scaler are fitted on the full dataset, i.e. before the
# train/test split performed later in the notebook.
scaler = MinMaxScaler()
continuous_vars = scaler.fit_transform(experiment_data[cont_vars_list]) # Can add continuous variables if necessary

# Combine the categorical features, continuous features and labels column-wise.
# The two label columns come last; the sequence builder relies on that ordering.
combined_features = np.hstack([categorical_vars, continuous_vars, label_vars])

# Build the frame on experiment_data's own index so the id columns assigned below
# align row-for-row even if experiment_data does not carry a default RangeIndex.
combined_df = pd.DataFrame(combined_features, index=experiment_data.index)
combined_df['trial_id'] = experiment_data['trial_id']
combined_df['participant_id'] = experiment_data['participant_id']

# One 2D array (frames x columns) per trial
all_trial_data = []

# Split by participant id and trial id to get each individual trial.
# Sequences must never span two different trials.
grouped = combined_df.groupby(['participant_id', 'trial_id'])

for (p_id, t_id), data in grouped:
    # to_numpy() always yields a 2D array. No np.squeeze afterwards: squeezing would
    # collapse a one-row trial from (1, n_columns) to (n_columns,) and break every
    # consumer that indexes trials as [frame][column].
    all_trial_data.append(data.drop(columns=['participant_id', 'trial_id']).to_numpy())

print("Number of trials of data: ", len(all_trial_data))
print("Example number of sequences (changes per trial): ",len(all_trial_data[0]))
print("Number of features: ",len(all_trial_data[0][0]))


Number of trials of data:  252
Example number of sequences (changes per trial):  767
Number of features:  8


Machine Learning Import Statements

In [4]:
import tensorflow as tf
import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

Create Sequences used by RNN

In [5]:
# Create sequences of the data that will be used as input for the RNN
def create_sequence(trial_data, seq_length):
    """Build sliding input windows and their target labels for one trial.

    The trial is front-padded with ``seq_length`` copies of its first row, so
    one (window, label) pair is produced for every original row. Each window
    holds the ``seq_length`` rows preceding the target row with the last two
    columns removed; each label is the last two columns of the target row.

    Args:
        trial_data: 2D array-like of shape (rows, columns) for a single trial.
        seq_length: Number of rows in each input window.

    Returns:
        A ``(features, labels)`` tuple of Python lists of NumPy arrays.
    """
    # Repeat the first row so the earliest targets still get a full-length window
    padding = np.array([trial_data[0]] * seq_length)
    padded = np.vstack([padding, trial_data])

    features = []
    labels = []
    for target_idx in range(seq_length, len(padded)):
        window = np.array(padded[target_idx - seq_length:target_idx])
        target_row = np.array(padded[target_idx])
        features.append(window[:, :-2])   # every column except the last two
        labels.append(target_row[-2:])    # the last two columns only
    return features, labels

def create_sequence_list(trial_data_array, seq_length):
    """Build the windows and labels of several trials and stack them.

    Args:
        trial_data_array: Iterable of per-trial 2D arrays.
        seq_length: Number of rows in each input window.

    Returns:
        A ``(x_data, y_data)`` tuple of NumPy arrays holding the windows and
        labels of all trials, concatenated in trial order.
    """
    x_data = []
    y_data = []
    for trial in trial_data_array:
        trial_features, trial_labels = create_sequence(trial, seq_length)
        x_data.extend(trial_features)
        y_data.extend(trial_labels)
    return np.array(x_data), np.array(y_data)


# How many consecutive frames are fed to the RNN to predict the next position
SEQUENCE_LENGTH = 25

# Split whole trials into training (first 80%) and test (remaining 20%) sets
test_split_index = int(len(all_trial_data) * 0.8)
training_sequences, test_sequences = (
    all_trial_data[:test_split_index],
    all_trial_data[test_split_index:],
)

# Turn each set of trials into stacked input windows and labels
x_train, y_train = create_sequence_list(training_sequences, SEQUENCE_LENGTH)
x_test, y_test = create_sequence_list(test_sequences, SEQUENCE_LENGTH)

# Report the shape of every resulting array
for shape_label, shape_source in (
    ("X Training Data Shape: ", x_train),
    ("Y Training Data Shape: ", y_train),
    ("X Test Data Shape: ", x_test),
    ("Y Test Data Shape: ", y_test),
):
    print(shape_label, shape_source.shape)




X Training Data Shape:  (130073, 25, 6)
Y Training Data Shape:  (130073, 2)
X Test Data Shape:  (39049, 25, 6)
Y Test Data Shape:  (39049, 2)


Create the RNN model

In [6]:
# Per-sample input shape: (sequence length, number of input features).
# The batch dimension is not part of this tuple. The "- 2" removes the two
# trailing label columns that each trial row carries.
input_shape = (SEQUENCE_LENGTH, len(all_trial_data[0][0])-2)

# RNN: one LSTM layer for memory followed by a Dense layer that outputs
# the predicted pos_x and pos_z
model = Sequential([
    LSTM(
        100,
        activation='relu',
        input_shape=input_shape,
        recurrent_dropout=0.5,
        return_sequences=False,
    ),
    Dense(2),
])
model.compile(optimizer='adam', loss='mse')

Train Model

In [7]:
# Fit the model on the training windows/labels for 5 epochs, 128 samples per batch
model.fit(x_train, y_train, epochs=5, batch_size=128)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x18665a25d30>

Store Model

In [None]:
# Write the current model to disk under the given file name
model_file_name = 'PathPredictionRNN.h5'
model.save(model_file_name)

Load Model

In [11]:
# Rebind `model` to the model stored in 'PathPredictionRNN.h5'; this replaces
# whatever model object is currently held under that name
from tensorflow.keras.models import load_model
model = load_model('PathPredictionRNN.h5')

Evaluate Model

In [None]:
# Score the model on the test windows/labels, processed 128 samples per batch
model.evaluate(x_test, y_test, batch_size=128)

Graph predictions of test data

In [8]:
import plotly.express as px
import plotly.graph_objects as go

# Function to graph trial
def graph_trial(test_sequence, x_range=(-2, 2), z_range=(-1, 6)):
    """Plot the model's predicted path against the recorded path of one trial.

    The trial is cut into input windows with ``create_sequence`` (window length
    ``SEQUENCE_LENGTH``), the global ``model`` predicts one (x, z) position per
    window, and predictions and recorded positions are drawn as two marker
    traces on the same figure.

    Args:
        test_sequence: 2D array of one trial whose last two columns are the
            recorded x and z positions.
        x_range: (min, max) limits of the x axis. Defaults to (-2, 2).
        z_range: (min, max) limits of the z axis. Defaults to (-1, 6).

    Returns:
        A ``plotly.graph_objects.Figure`` with a 'Predicted Path' and an
        'Actual Path' trace.
    """
    # Create the input windows and matching recorded positions for this trial
    windows, targets = create_sequence(test_sequence, SEQUENCE_LENGTH)
    x_data = np.array(windows)
    actual = np.array(targets)

    # Predict one position for every window of this trial
    prediction = np.asarray(model.predict(x_data))

    # Column 0 holds x and column 1 holds z, for predictions and recorded values alike
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=prediction[:, 0], y=prediction[:, 1], name='Predicted Path', mode='markers'))
    fig.add_trace(go.Scatter(x=actual[:, 0], y=actual[:, 1], name='Actual Path', mode='markers'))
    fig.update_xaxes(range=list(x_range))
    fig.update_yaxes(range=list(z_range))
    fig.update_layout(
        title='Predicted Path vs Actual Path',
        xaxis_title='X Position',
        yaxis_title='Z Position',
        height= 1000
    )
    return fig

# Build one figure per test trial
graphs = [graph_trial(sequence) for sequence in test_sequences]

# Display only the first five figures
for fig in graphs[:5]:
    fig.show()

