In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from keras import Sequential
from keras.layers import LSTM, Dense

# Raw data loading

In [None]:
raw_df = pd.read_csv('car_data.csv')
print(raw_df.shape)

In [None]:
raw_df.head()

# Data preprocessing

In [None]:
def LSTM_preprocessing(df, scaler, look_back=16, look_ahead=8, test_size=0.25):
    raw_X = df[['use_percent','number_of_process','is_recording','framerate','resolution_height','resolution_width','speed','red_intensity','green_intensity','blue_intensity','is_buzzing','ultrasonicOn']]
    raw_X = scaler.fit_transform(raw_X)
    raw_Y = df['voltage']
    X = []
    y = []
    for i in range(len(raw_X) - look_back - look_ahead + 1):
        X.append(raw_X[i : i + look_back])
        y.append(raw_Y[i + look_back : i + look_back + look_ahead])
    X_train, X_test, Y_train, Y_test = train_test_split(np.array(X), np.array(y), test_size=test_size)
    return X_train, X_test, Y_train, Y_test

def divisors(x):
    res = []
    for i in range(1,x,1):
        if x % i==0:
            res.append(i)
    return res

df = raw_df.drop('timestamp', axis=1)
scaler = MinMaxScaler(feature_range=(0,1))
X_train, X_test, Y_train, Y_test = LSTM_preprocessing(df, scaler)
print('X_train  :',X_train.shape)
print('Y_train  :',Y_train.shape)
print('X_test   :',X_test.shape)
print('Y_test   :',Y_test.shape)
divisors = divisors(X_train.shape[0])
print('Batch sizes suitable such as they are a divisor of the training set size : ', divisors)

# LSTM Model


In [None]:
def SoC_forecasting_model(input_shape, output_shape, stateful=True,batch_size=None, verbose=True):
    assert (input_shape[0] % batch_size == 0) or batch_size is None, 'Batch size should be a multiplier of the number of entries'
    SoC_model = Sequential(name ='SoC_Forecasting_Model')
    SoC_model.add(LSTM(32, input_shape=(input_shape[1], input_shape[2]), batch_size=batch_size, recurrent_dropout=0.2, return_sequences=True, stateful=stateful, name='LSTM_input'))
    SoC_model.add(LSTM(32, recurrent_dropout=0.2, return_sequences=True, stateful=stateful, name='LSTM_hidden1'))
    SoC_model.add(LSTM(32, recurrent_dropout=0.2, return_sequences=True, stateful=stateful, name='LSTM_hidden2'))
    SoC_model.add(LSTM(32, recurrent_dropout=0.2, return_sequences=True, stateful=stateful, name='LSTM_hidden3'))
    SoC_model.add(LSTM(32, recurrent_dropout=0.2, stateful=stateful, name='LSTM_hidden_last'))
    SoC_model.add(Dense(output_shape[1], name='Dense_output'))
    SoC_model.compile(loss='mse', optimizer='adam', metrics=['mae'])
    if verbose :
        print(SoC_model.summary())
    return SoC_model

In [None]:
from tqdm import tqdm
def stateful_fit(model, X_train, Y_train, batch_size, epochs=10, verbose=True):
    assert X_train.shape[0] % batch_size == 0, 'Batch size should be a multiplier of the number of entries'
    for i in range(epochs):
        for j in tqdm(range(0, X_train.shape[0]//batch_size), desc=f"Epoch : {i+1}/{10}", ncols=100, colour='green', disable=not verbose):
            X_batch = X_train[j*batch_size : (j+1)*batch_size]
            Y_batch = Y_train[j*batch_size : (j+1)*batch_size]
            model.train_on_batch(X_batch, Y_batch)
        model.reset_states()

### Creating the model

In [None]:
model = SoC_forecasting_model(X_train.shape, Y_train.shape, batch_size=6)

# Training

## Stateful section

Section to train the model if it is stateful

In [None]:
stateful_fit(model, X_train, Y_train, 6)

## Stateless training

Section to train the model if it is stateless, measuring MSE as loss but also MAE as metric.

In [None]:
history = model.fit(X_train,Y_train, epochs=10, batch_size=6, validation_data=(X_test, Y_test), shuffle=False)
for key in history.history.keys():
    plt.plot(history.history[key])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(history.history.keys(), loc='upper right')
plt.show()

In [None]:
metrics = model.evaluate(X_test, Y_test)
print('MSE : ', metrics[0])
print('MAE : ', metrics[1])

# Saving model

In [None]:
model.save('4WD_Car_SoC_Forecasting_model_deep')