### Importing modules

In [None]:
!pip install tensorflow

In [None]:
!pip install keras-tuner --upgrade

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder,StandardScaler
from tensorflow.keras.utils import to_categorical
import tensorflow as tf
from tensorflow import keras, lite
from tensorflow.keras.layers import LSTM, Dense , Dropout, Flatten, ConvLSTM2D, RepeatVector
import keras_tuner as kt

### Data preparation

In [None]:
train_set= pd.read_csv("Data/train_motion_data.csv")
test_set=pd.read_csv("Data/test_motion_data.csv")

In [None]:
train_set

In [None]:
test_set

In [None]:
# Global constants
TIMESTEPS =20
FEATURES =6
LABELS=2
N_RECORDS =10

#CNN constants
N_COLUMNS = 4
N_LENGTH = TIMESTEPS //N_COLUMNS
N_ROWS =1

In [None]:
# Splitting the samples by class

df_train_normal=train_set.loc[train_set['Class']=="NORMAL"]
df_train_slow=train_set.loc[train_set['Class']=="SLOW"]
df_train_aggressive=train_set.loc[train_set['Class']=="AGGRESSIVE"]


In [None]:
df_test_normal=test_set.loc[test_set['Class']=="NORMAL"]
df_test_slow=test_set.loc[test_set['Class']=="SLOW"]
df_test_aggressive=test_set.loc[test_set['Class']=="AGGRESSIVE"]

### Plotting 3 driving behaviours against accX , accY and accZ

In [None]:
plt.plot(df_train_normal['Timestamp'],df_train_normal['AccX'],label="Acceleration X")
plt.plot(df_train_normal['Timestamp'],df_train_normal['AccY'],label="Acceleration Y")
plt.plot(df_train_normal['Timestamp'],df_train_normal['AccZ'],label="Acceleration Z")

plt.title("Normal Driving Behaviour")
plt.xlabel("Timestamp")
plt.ylabel("Acceleration in m/s^2")
plt.legend(loc='upper left')
plt.show()

In [None]:
plt.plot(df_train_slow['Timestamp'],df_train_slow['AccX'],label="Acceleration X")
plt.plot(df_train_slow['Timestamp'],df_train_slow['AccY'],label="Acceleration Y")
plt.plot(df_train_slow['Timestamp'],df_train_slow['AccZ'],label="Acceleration Z")

plt.title("Slow Driving Behaviour")
plt.xlabel("Timestamp")
plt.ylabel("Acceleration in m/s^2")
plt.legend(loc='upper left')
plt.show()

In [None]:
plt.plot(df_train_aggressive['Timestamp'],df_train_aggressive['AccX'],label="Acceleration X")
plt.plot(df_train_aggressive['Timestamp'],df_train_aggressive['AccY'],label="Acceleration Y")
plt.plot(df_train_aggressive['Timestamp'],df_train_aggressive['AccZ'],label="Acceleration Z")

plt.title("Aggressive Driving Behaviour")
plt.xlabel("Timestamp")
plt.ylabel("Acceleration in m/s^2")
plt.legend(loc='upper left')
plt.show()

### Plotting 3 driving behaviours against GyroX , GyroY and GyroZ

In [None]:
plt.plot(df_train_normal['Timestamp'],df_train_normal['GyroX'],label="Gyro X")
plt.plot(df_train_normal['Timestamp'],df_train_normal['GyroY'],label="Gyro Y")
plt.plot(df_train_normal['Timestamp'],df_train_normal['GyroZ'],label="Gyro Z")

plt.title("Normal Driving Behaviour (Gyroscope)")
plt.xlabel("Timestamp")
plt.ylabel("In degree/s")
plt.legend(loc='upper left')
plt.show()

In [None]:
plt.plot(df_train_slow['Timestamp'],df_train_slow['GyroX'],label="Gyro X")
plt.plot(df_train_slow['Timestamp'],df_train_slow['GyroY'],label="Gyro Y")
plt.plot(df_train_slow['Timestamp'],df_train_slow['GyroZ'],label="Gyro Z")

plt.title("Slow Driving Behaviour (Gyroscope)")
plt.xlabel("Timestamp")
plt.ylabel("In degree/s")
plt.legend(loc='upper left')
plt.show()

In [None]:
plt.plot(df_train_aggressive['Timestamp'],df_train_aggressive['GyroX'],label="Gyro X")
plt.plot(df_train_aggressive['Timestamp'],df_train_aggressive['GyroY'],label="Gyro Y")
plt.plot(df_train_aggressive['Timestamp'],df_train_aggressive['GyroZ'],label="Gyro Z")

plt.title("Aggressive Driving Behaviour (Gyroscope)")
plt.xlabel("Timestamp")
plt.ylabel("In degree/s")
plt.legend(loc='upper left')
plt.show()

### Removing N_RECORDS data from first and last of each set that is caused due to class switching

In [None]:
df_train_slow= df_train_slow.iloc[N_RECORDS:]
df_train_slow= df_train_slow.iloc[:-N_RECORDS]

df_train_normal= df_train_normal.iloc[N_RECORDS:]
df_train_normal= df_train_normal.iloc[:-N_RECORDS]

df_train_aggressive= df_train_aggressive.iloc[N_RECORDS:]
df_train_aggressive= df_train_aggressive.iloc[:-N_RECORDS]



In [None]:
df_test_slow= df_test_slow.iloc[N_RECORDS:]
df_test_slow= df_test_slow.iloc[:-N_RECORDS]

df_test_normal= df_test_normal.iloc[N_RECORDS:]
df_test_normal= df_test_normal.iloc[:-N_RECORDS]

df_test_aggressive= df_test_aggressive.iloc[N_RECORDS:]
df_test_aggressive= df_test_aggressive.iloc[:-N_RECORDS]


In [None]:
# Checking data per label

print('----Train----')
print("Slow samples: "+str(df_train_slow.__len__()))
print("Normal samples: "+str(df_train_normal.__len__()))
print("Aggressive samples: "+str(df_train_aggressive.__len__()))

df_train_slow=df_train_slow.tail(1300)
df_train_normal=df_train_normal.tail(1160)
df_train_aggressive=df_train_aggressive.tail(1080)


In [None]:
print('----Test----')
print("Slow samples: "+str(df_test_slow.__len__()))
print("Normal samples: "+str(df_test_normal.__len__()))
print("Aggressive samples: "+str(df_test_aggressive.__len__()))

df_test_slow=df_test_slow.tail(1250)
df_test_normal=df_test_normal.tail(970)
df_test_aggressive=df_test_aggressive.tail(790)

In [None]:
#separating features from labels

#train
X_train_normal = df_train_normal.iloc[:,:FEATURES]
X_train_slow = df_train_slow.iloc[:,:FEATURES]
X_train_aggressive = df_train_aggressive.iloc[:,:FEATURES]

#test
X_test_normal = df_test_normal.iloc[:,:FEATURES]
X_test_slow = df_test_slow.iloc[:,:FEATURES]
X_test_aggressive = df_test_aggressive.iloc[:,:FEATURES]

#train
Y_train_normal=df_train_normal.Class
Y_train_slow=df_train_slow.Class
Y_train_aggressive=df_train_aggressive.Class

Y_test_normal=df_test_normal.Class
Y_test_slow=df_test_slow.Class
Y_test_aggressive=df_test_aggressive.Class
X_train_normal




In [None]:
# combine all the samples into a train and test dataset

X_train=pd.concat([X_train_normal,X_train_aggressive])
Y_train=pd.concat([Y_train_normal,Y_train_aggressive])

X_test=pd.concat([X_test_normal,X_test_aggressive])
Y_test=pd.concat([Y_test_normal,Y_test_aggressive])
'''X_train'''
Y_train


In [None]:
# Mapping labels to numbers
labelEncoder = LabelEncoder()
Y_train=labelEncoder.fit_transform(Y_train)
Y_test=labelEncoder.transform(Y_test)

In [None]:
# Converting to one hot encoded vectors
Y_train = to_categorical(Y_train, num_classes=LABELS)
Y_test = to_categorical(Y_test, num_classes=LABELS)

In [None]:
# scaling data
scaler = StandardScaler(with_mean = True , with_std=True)
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
#Reshaping the input for the Conv_LSTM network (Features)
train_samples = X_train.shape[0]//TIMESTEPS
X_train =X_train.reshape(train_samples, N_COLUMNS, N_ROWS, N_LENGTH, FEATURES)


test_samples = X_test.shape[0]//TIMESTEPS
X_test =X_test.reshape(test_samples, N_COLUMNS, N_ROWS, N_LENGTH, FEATURES)

print('Train features shape: '+str(X_train.shape[0]))
print('Test features shape:' + str(X_test.shape[0]))

In [None]:
Y_train=Y_train[::TIMESTEPS]
Y_test=Y_test[::TIMESTEPS]

print('Train labels shape: '+str(Y_train.shape[0]))
print('Test labels shape: '+str(Y_test.shape[0]))

In [None]:
tf.random.set_seed(42)

# Build the model by assigning
# the number of layers and number of neurons for each layer
# the learning rate
# the number of epochs
# ------------------------------------------------------------------------
# To get these optimal parameters I will use the Keras Tuner library
# so it will get the optimal parameters for the NN
# and pass it to the model_builder function
def model_builder(hp):
    model = tf.keras.Sequential()

    model.add(
        ConvLSTM2D(
            filters=hp.Int('filter_units', min_value=16, max_value=64, step=16),
            kernel_size=(1,3), 
            activation='relu', 
            input_shape=(N_COLUMNS, N_ROWS, N_LENGTH, FEATURES)
            )
        )
        
    model.add(Dropout(hp.Float('dropout_0_rate', min_value=0, max_value=0.5, step=0.1)))

    model.add(Flatten())

    model.add(Dense(hp.Int('dense_units', min_value=32, max_value=512, step=32), activation='relu'))

    model.add(Dense(LABELS, activation='softmax'))
    

    # Tune the learning rate for the optimizer
    # Choose an optimal value from 0.1, 0.01, 0.001, or 0.0001
    hp_learning_rate = hp.Choice('learning_rate', values=[1e-1, 1e-2, 1e-3, 1e-4])

    model.compile(
        loss=tf.keras.losses.BinaryCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(learning_rate=hp_learning_rate),
        metrics=[
            tf.keras.metrics.BinaryAccuracy(name='accuracy'),
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall')
        ]
    )

    return model

In [None]:
# To be able to get the optimal parameters for our network
# we have to create a Tuner with the following setup
# 'objective' -> will use to create test models
# 'max_epochs' -> the maximum number of epochs to train one model
# 'factor' -> the reduction factor for the number of epochs and number of models for each bracket
# 'directory' -> will save the training history there
tuner= kt.Hyperband(model_builder,
                   objective='val_accuracy',
                   max_epochs=50,
                   factor=3,
                   directory='bin_conv_lstm_dir',
                   project_name='driving_behaviour')

In [None]:
#Stop early function
stop_early=tf.keras.callbacks.EarlyStopping(monitor='val_loss',patience=5)

In [None]:
#considering the top_performing mode

tuner.search(
    x=X_train,
    y=Y_train,
    epochs=50,
    validation_data=(X_test, Y_test),
    callbacks=[stop_early],
    shuffle=True
)

In [None]:
best_mode = tuner.get_best_models()[0]