In [None]:
import sys; sys.path.insert(0, '..')
import os
import time
import keras
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt
from tabulate import tabulate
sys.path.insert(1, '../utils')
from utils.df_utils import df_wrapper
from dataclasses import dataclass
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from keras.optimizers import RMSprop, Adam, SGD
from sklearn.preprocessing import MinMaxScaler
from keras.utils import to_categorical
from utils.declarations import training_files, testing_files, POSE_MAP

In [None]:
# Passed to df_wrapper.concat_sensor_data for every recording that is loaded
# (presumably the number of sensor streams to combine -- confirm in utils.df_utils).
AMOUNT_OF_SENSORS = 3
# Key into testing_files: the recording selected as x_test / y_test, which is
# used both as validation data during training and for the final evaluation.
VALIDATION_TESTSET = "004"

In [None]:
# Build one df_wrapper per training recording: wrap the CSV, run
# concat_sensor_data on it and attach labels from the annotation file.
x_train_arr = []
for key in training_files:
    print(key)
    entry = training_files[key]
    elem = df_wrapper(entry.csv_file)
    elem.concat_sensor_data(AMOUNT_OF_SENSORS)
    elem.align_poses(entry.annot_file, POSE_MAP)
    x_train_arr.append(elem)

# Features are every column except the timestamp and the label;
# targets are the 'Pose' column. All recordings are stacked in load order.
x_train = pd.concat(
    [wrapper.df.drop(columns=[' TimeStamp (s)', 'Pose']) for wrapper in x_train_arr]
)
y_train = pd.concat([wrapper.df['Pose'] for wrapper in x_train_arr])

In [None]:
# Load every test recording and keep its full frame and its labels, keyed by
# the same key that testing_files uses.
x_test_dict = dict()
y_test_dict = dict()
for key in testing_files:
    elem = df_wrapper(testing_files[key].csv_file)
    elem.concat_sensor_data(AMOUNT_OF_SENSORS)
    elem.align_poses(testing_files[key].annot_file, POSE_MAP)
    x_test_dict[key] = elem.df
    # reset_index(drop=True) returns a *new* Series numbered 0..n-1, so the
    # Series object that belongs to elem.df is never mutated in place.
    y_test_dict[key] = elem.df["Pose"].reset_index(drop=True)

# Fail with an explicit message when the configured recording was not loaded.
if VALIDATION_TESTSET not in x_test_dict:
    raise KeyError(
        f"VALIDATION_TESTSET {VALIDATION_TESTSET!r} is not a key of testing_files; "
        f"available keys: {list(x_test_dict)}"
    )

# Features / labels of the single recording used for validation and evaluation.
x_test = x_test_dict[VALIDATION_TESTSET].drop([' TimeStamp (s)', 'Pose'], axis=1)
y_test = y_test_dict[VALIDATION_TESTSET]

In [None]:
# Extract the underlying ndarrays from the pandas containers
x_train_numpy, x_test_numpy = x_train.values, x_test.values
y_train_numpy, y_test_numpy = y_train.values, y_test.values

# One-hot encode the integer pose labels (9 classes), one row per sample.
# Note: y_train / y_test are reassigned again later, when the windowed
# labels are built from y_train_numpy / y_test_numpy.
y_train, y_test = (
    to_categorical(labels, 9) for labels in (y_train_numpy, y_test_numpy)
)


In [None]:
# Sanity check: (number of samples, number of feature columns)
print(np.shape(x_train_numpy))

In [None]:
from scipy import stats

# Default number of consecutive samples grouped into one window.
NUM_TIMESTAMPS = 100


def _resolve_window_size(num_timestamps):
    """Return the window length to use, falling back to NUM_TIMESTAMPS when None."""
    size = NUM_TIMESTAMPS if num_timestamps is None else int(num_timestamps)
    if size <= 0:
        raise ValueError(
            f"num_timestamps must be a positive integer, got {num_timestamps!r}"
        )
    return size


def create_3d_array(array, num_timestamps=None):
    """Split a sequence of samples into consecutive, non-overlapping windows.

    Windows start at the very first sample (index 0), so no row is skipped;
    a trailing remainder shorter than the window length is discarded.

    Args:
        array: Indexable, sliceable sequence of samples (for example a 2-D
            ndarray with one row per sample).
        num_timestamps: Samples per window. ``None`` (the default) uses the
            module-level ``NUM_TIMESTAMPS`` at call time.

    Returns:
        A list of windows; each window is a list of ``num_timestamps``
        samples. Wrapping the result in ``np.array`` yields an array of
        shape ``(n_windows, num_timestamps, ...)``.

    Raises:
        ValueError: If the window length is not positive.
    """
    size = _resolve_window_size(num_timestamps)
    n_windows = len(array) // size
    return [list(array[k * size:(k + 1) * size]) for k in range(n_windows)]


def create_2d_y_array(array, num_timestamps=None, num_classes=9):
    """Build one one-hot label per window from per-sample integer labels.

    The labels are cut into the same windows as ``create_3d_array`` produces
    (same start, same length, same discarded remainder), so row ``k`` of the
    result belongs to window ``k`` of the features. Each window is labelled
    with its most frequent value; on a tie the smallest label wins.

    Args:
        array: 1-D sequence of integer class labels, one per sample.
        num_timestamps: Samples per window. ``None`` (the default) uses the
            module-level ``NUM_TIMESTAMPS`` at call time.
        num_classes: Width of the one-hot vectors.

    Returns:
        Integer ndarray of shape ``(n_windows, num_classes)`` with exactly
        one ``1`` per row.

    Raises:
        ValueError: If the window length is not positive, or a window's
            majority label is not an integer in ``[0, num_classes)``.
    """
    size = _resolve_window_size(num_timestamps)
    n_windows = len(array) // size
    encoding = np.zeros((n_windows, num_classes), dtype=int)

    for k in range(n_windows):
        window = np.asarray(array[k * size:(k + 1) * size])
        # np.unique returns sorted values, and argmax picks the first maximum,
        # so ties resolve to the smallest label.
        values, counts = np.unique(window, return_counts=True)
        majority = values[np.argmax(counts)]
        label = int(majority)
        if label != majority or not 0 <= label < num_classes:
            raise ValueError(
                f"label {majority!r} in window {k} is not an integer in [0, {num_classes})"
            )
        encoding[k, label] = 1

    return encoding

In [None]:
# Cut features and labels into fixed-length windows. Both helpers take the
# window length from the module-level NUM_TIMESTAMPS, so it is not passed here.
x_train = np.array(create_3d_array(x_train_numpy))
y_train = create_2d_y_array(y_train_numpy)  # already returns an ndarray

x_test = np.array(create_3d_array(x_test_numpy))
y_test = create_2d_y_array(y_test_numpy)

# Every feature window needs exactly one label.
assert len(x_train) == len(y_train), "train windows and labels differ in length"
assert len(x_test) == len(y_test), "test windows and labels differ in length"

print(np.shape(x_train))
print(np.shape(y_train))
print(np.shape(x_test))
print(np.shape(y_test))

In [None]:
from keras.layers import LSTM
from tensorflow.keras import layers

# Adam optimiser with a very small learning rate
OPTIM = Adam(learning_rate=0.00001)

# A single LSTM layer that reads one window (timesteps x features), followed
# by a dense head with dropout between layers and a 9-way softmax output.
model = Sequential([
    LSTM(1024, activation='sigmoid', input_shape=[x_train.shape[1], x_train.shape[2]]),
    Dense(1024, activation='relu'),
    Dropout(rate=0.2),
    Dense(700, activation='relu'),
    Dropout(rate=0.2),
    Dense(100, activation='relu'),
    Dropout(rate=0.2),
    Dense(9, activation='softmax'),
])

# Targets are one-hot vectors, hence categorical cross-entropy.
model.compile(
    loss=keras.losses.CategoricalCrossentropy(),
    optimizer=OPTIM,
    metrics=['accuracy'],
)

In [None]:
# Train for 50 epochs in batches of 256 windows; the (x_test, y_test) windows
# are passed as validation data.
model.fit(
    x=x_train,
    y=y_train,
    batch_size=256,
    epochs=50,
    validation_data=(x_test, y_test),
)

In [None]:
# Tabulate the metric history stored on the model by the last fit call
# (one column per metric) and draw it as line curves.
history = pd.DataFrame.from_dict(model.history.history)
history.plot.line()

In [None]:

from sklearn.metrics import confusion_matrix

# Model outputs for every test window (kept for the confusion matrix),
# then the compiled loss and accuracy on the same windows.
predictions = model.predict(x=x_test)
model.evaluate(x=x_test, y=y_test)

In [None]:
# Collapse the one-hot targets and the softmax outputs to class indices.
y_test_1d = np.argmax(y_test, axis=1)
predictions_1d = np.argmax(predictions, axis=1)

# Pin the label set to every class index. Without it the matrix only covers
# classes that actually occur, so row/column i would stop meaning class i
# whenever a class is missing from both the targets and the predictions.
class_labels = np.arange(y_test.shape[1])
cm = confusion_matrix(y_test_1d, predictions_1d, labels=class_labels)

# fmt="d" prints plain integer counts; the default format switches to
# scientific notation for counts of 100 or more.
ax = sns.heatmap(
    data=cm,
    cmap="YlGnBu",
    annot=True,
    fmt="d",
    xticklabels=class_labels,
    yticklabels=class_labels,
)
# confusion_matrix puts true classes on rows and predicted classes on columns.
ax.set(xlabel="Predicted pose", ylabel="True pose", title="Confusion matrix");