# Modeling LSTM Network to Predict Collision 

We first create the LSTM model that takes as its input the time-series data from the CVS experiment. This data contains the driver response for every subject and every scenario. The data fed to the LSTM covers the driver response from 3 seconds before the warning until the time the driver reacts. We will use this data to train the model to predict whether a collision takes place. Because the time series for each driver-scenario combination has a different length, we need to pad the data so that every input to the LSTM has the same size.

## Step 1

We first define the LSTM network, whose output is fed into a softmax layer to produce the classification output. The LSTM layer receives the driving data, which has the shape sequence_length × no_of_features. This data is passed in batches of size batch_size.

In [None]:
import torch 
import torch.nn as nn
from torch.autograd import Variable

# Defining the LSTM model as a class. The model we will train later will be the instance of this class.

class collisionClassifier(nn.Module):
    """LSTM followed by a linear layer and a softmax, used as a sequence classifier.

    The prediction for a padded sequence is read from the LSTM output at the
    last *real* time step (``sequence_length - 1``), so trailing padding does
    not influence the result.

    Args:
        no_of_features: number of input features per time step.
        features_in_hidden: size of the LSTM hidden state.
        output_features: number of output classes.
        max_sequence_length: padded sequence length; stored on the instance.
        no_of_layers: number of stacked LSTM layers.
    """

    def __init__(self, no_of_features, features_in_hidden, output_features, max_sequence_length, no_of_layers = 1):
        # nn.Module must be initialised before any sub-module is assigned.
        super().__init__()
        self.features_in_hidden = features_in_hidden
        self.no_of_layers = no_of_layers
        self.max_sequence_length = max_sequence_length
        # batch_first=True: inputs and outputs are (batch, time, features).
        self.lstm_cell = nn.LSTM(no_of_features, features_in_hidden, num_layers=no_of_layers,
                                 bias=True, batch_first=True)
        self.linear_block = nn.Linear(features_in_hidden, output_features)
        self.smc = nn.Softmax(dim = 1)

    def _zero_state(self, reference, batch_size):
        """Return zero-filled (hidden, cell) states on the device/dtype of *reference*."""
        shape = (self.no_of_layers, batch_size, self.features_in_hidden)
        return reference.new_zeros(shape), reference.new_zeros(shape)

    def forward(self, input_data, max_sequence_length, sequence_length):
        """Classify one padded sequence or a batch of padded sequences.

        Args:
            input_data: tensor of shape (time, features) for a single sequence
                or (batch, time, features) for a batch.
            max_sequence_length: accepted for backward compatibility with
                existing callers; the padded length is read from ``input_data``.
            sequence_length: number of real (unpadded) time steps; an int
                applied to every sequence, or one length per batch element.

        Returns:
            Tensor of shape (batch, output_features) holding softmax
            probabilities; a single sequence yields shape (1, output_features).

        Raises:
            ValueError: if ``input_data`` is not 2-D or 3-D, or if a length is
                outside ``1..time`` or does not match the batch size.

        Note:
            The output is already normalised. ``nn.CrossEntropyLoss`` applies
            log-softmax itself, so feeding this output into it normalises twice.
        """
        if input_data.dim() == 2:
            input_data = input_data.unsqueeze(0)
        elif input_data.dim() != 3:
            raise ValueError("input_data must be (time, features) or (batch, time, features)")

        batch_size, padded_length = input_data.size(0), input_data.size(1)

        last_step = torch.as_tensor(sequence_length, dtype=torch.long,
                                    device=input_data.device).reshape(-1) - 1
        if last_step.numel() == 1:
            last_step = last_step.expand(batch_size)
        elif last_step.numel() != batch_size:
            raise ValueError("sequence_length must be a scalar or have one entry per batch element")
        # A length of 0 would index -1 and silently read the last padded step.
        if bool((last_step < 0).any()) or bool((last_step >= padded_length).any()):
            raise ValueError("sequence_length must lie between 1 and the padded length")

        hidden, cellstate = self._zero_state(input_data, batch_size)
        output, _ = self.lstm_cell(input_data, (hidden, cellstate))

        # Pick, for every sequence, the LSTM output at its last real time step.
        batch_index = torch.arange(batch_size, device=input_data.device)
        last_output = output[batch_index, last_step]
        return self.smc(self.linear_block(last_output))

    # Original method name, kept so existing callers continue to work.
    forward_pass = forward

## Step 2

Now we import the data from the Excel sheet and scale it. Once the scaling is done, we split the data by scenario and pad each scenario so that all sequences have equal length.

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Load the feature sheet and keep only the underlying array of values.
filepath_features = "####################"
driving_data = pd.read_excel(filepath_features)
driving_data = driving_data.values

# Load the target/output sheet the same way.
filepath_output = "####################"
output_data = pd.read_excel(filepath_output)
output_data = output_data.values

# Rows are data points, columns are features.
no_of_data_points = driving_data.shape[0]
no_of_columns = driving_data.shape[1]

# Min-max scale with a target range of (0, 1), then add a leading axis of
# size 1 so the array is 1 x data_points x columns.
# NOTE(review): the scaler is fitted on all rows before the train/test split,
# so test-set statistics influence the scaling -- confirm this is acceptable.
scaler = MinMaxScaler(feature_range = (0,1))
scaledData = scaler.fit_transform(driving_data)
scaledData = scaledData.reshape([1, no_of_data_points, no_of_columns])

# Number of consecutive rows belonging to each scenario, in row order
# (consumed by paddingMethod to cut the flat data into scenarios).
data_split_vector = [#############################################]

def paddingMethod(scenario_data, data_split_vector):
    """Cut flat scenario data into per-scenario sequences, zero-padded to equal length.

    Args:
        scenario_data: array indexed as [0, row, feature]; only index 0 of the
            first axis is read.
        data_split_vector: number of consecutive rows belonging to each
            scenario, in row order.

    Returns:
        Float tensor of shape (scenarios, longest scenario, features). Rows past
        a scenario's own length are left at zero.

    Raises:
        ValueError: if ``data_split_vector`` is empty or holds a negative length.
        IndexError: if the lengths add up to more rows than ``scenario_data`` has.
    """
    if len(data_split_vector) == 0:
        raise ValueError("data_split_vector must contain at least one scenario length")
    if min(data_split_vector) < 0:
        raise ValueError("scenario lengths must be non-negative")
    if sum(data_split_vector) > scenario_data.shape[1]:
        raise IndexError("data_split_vector covers more rows than scenario_data provides")

    scenario_tensor = torch.zeros(
        (len(data_split_vector), max(data_split_vector), scenario_data.shape[2])
    ).float()

    start = 0
    for idx, scen_length in enumerate(data_split_vector):
        stop = start + scen_length
        # Copy the whole scenario in one slice instead of one tensor per row.
        scenario_tensor[idx, :scen_length, :] = torch.as_tensor(
            scenario_data[0, start:stop, :], dtype=torch.float32
        )
        start = stop
    return scenario_tensor

# Split the scaled data by scenario and zero-pad every scenario to the longest one.
input_data = paddingMethod(scaledData, data_split_vector)

## Step 3

Now, we will split the data into training and testing set.

In [None]:
# Number of independent events (scenarios) along the first axis.
# Tensor.size is a method, so it is called rather than subscripted.
independent_events = input_data.size(0)

# Share of events used for training.
# NOTE(review): the original computed `independent_events % 8`, a remainder in
# 0..7 that does not grow with the data; an 80/20 split is assumed here -- confirm.
train_fraction = 0.8
train_events = int(independent_events * train_fraction)

# The first `train_events` events train the model; the remainder is held out.
input_training_data = input_data[:train_events, :, :]
input_test_data = input_data[train_events:independent_events, :, :]

## Step 4

Training the model

In [None]:
# initialise the model

collision_predictor = collisionClassifier(no_of_features = input_data.size[2],features_in_hidden = 3,\
                                          output_size = 2, max_sequence_length = max(data_split_vector))
no_of_batches = ##############################
batch_size =##################################

def avg_error(linear_output, target):
    """Return the cross-entropy loss of ``linear_output`` against ``target``.

    Uses the default settings of the loss, so the result is averaged over
    the samples and returned as a scalar tensor.
    """
    # Functional counterpart of nn.CrossEntropyLoss() with default arguments.
    return nn.functional.cross_entropy(linear_output, target)

for j in range (0,(no_of_batches - 1)):
    pred_tensor = torch.zeros(batch_size,2)
    target_tensor =  # Need to create a tensor of the target data 
    for i in range (0,(batch_size - 1)):
        event_no = (batch_size*j)+i
        prediction = collision_predictor.forward(input_training_data[even_no,:,:], max(data_split_vector),\
                                                 data_split_vector[event_no])
        pred_tensor[i] = prediction
        target_tensor[i]= ###########################################
    error = avg_error(pred_tensor,target_tensor)
    collision_predictor.zero_grad()
    error.backward()
    optimizer = torch.optim.Adam(collision_predictor.parameters(), lr=0.001)
    optimizer.step()   
    
    

## Step 5

Testing the model 

In [None]:
# The objective of testing is to test the accuracy of the model in predicting whther collision takes place or not
# For this we need to first define the test data and the corresponding target data. This can then be fed into 
# forward function of the collisionClassifier class. This will generate a prediction which will be in the form 
# of a 1X2 tensor which can be hot coded to get as an output whether the collision will take place or not.

# input_test_data stores the test input as a tensor  
test_target = #####################################
test_predictions = np.zeros(test_target.shape[0])
event_no = #########################################
for i in range(0,input_test_data.shape(0)):
    output = collision_predictor.forward(input_tesst_data[i,:,:], max(data_split_vector), data_split_vector[event_no])
    if output[0,0] < output[0,1]:
        test_predictions[i] = 0
    else :
        test_predictions[i] = 1

# Testing the accuracy of model predictions on the test data set

# Counters must exist before they are incremented.
no_of_correct_predictions = 0
no_of_wrong_predictions = 0

for i in range(input_test_data.shape[0]):
    if test_predictions[i] == test_target[i]:
        no_of_correct_predictions += 1
    else:
        no_of_wrong_predictions += 1

# Fraction of test events classified correctly; NaN when there are no test events.
total_predictions = no_of_correct_predictions + no_of_wrong_predictions
if total_predictions:
    model_accuracy = no_of_correct_predictions / total_predictions
else:
    model_accuracy = float("nan")