In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.preprocessing import MinMaxScaler
from scipy import ndimage 
tf.compat.v1.set_random_seed(1)

In [None]:
"""Function that prepares the input X and output y for training a LSTM network


Args:
    sequence_1: the first sequence which gets converted into multiple subarrays of length: n_steps
    sequence_2: the second sequence, each n_steps'th element will be part of the output array
    n_steps: the amount of time steps used as an input into the LSTM for prediction

Returns:
    A tuple of 2 numpy arrays in the required format
    
    X.shape = (X.shape[0] - n_steps, n_steps)
    y.shape = (X.shape[0] - n_steps)

"""
def prepare_sequences(sequence_1, sequence_2, n_steps):
    X, y = list(), list()
    for i in range(len(sequence_1)):
        end_ix = i + n_steps

        if end_ix > len(sequence_1)-1:
            break

        seq_x = sequence_1[i:end_ix]
        X.append(seq_x)
        y.append(sequence_2[end_ix])
        
    return np.array(X), np.array(y)


def subsample(sequence, d_sample):
    return sequence[::d_sample]

def smooth(sequence, sigma):
    return ndimage.filters.gaussian_filter(sequence, sigma)

In [None]:
# set hyperparameters
n_steps = 500
n_features = 1
n_lstm_units = 50
n_dense_units = 1
n_epochs = 4

d_sample = 10
gauss_sigma = 10

In [None]:
# ------------------------------------------- Prepare Training Data -------------------------------------------
train_cur_master = np.loadtxt('../data/fobss_data/data/Ri Jumps 25A/battery/Battery_Current.csv', delimiter=';')
train_cur_master = train_cur_master[10000:150000,1]

train_volt_slave_0_cell_4 = np.loadtxt('../data/fobss_data/data/Ri Jumps 25A/cells/Slave_0_Cell_Voltages.csv', delimiter=';')
train_volt_slave_0_cell_4 = train_volt_slave_0_cell_4[10000:150000,4]

# subsample and smooth data 
train_cur_master = subsample(train_cur_master, d_sample)
train_cur_master = smooth(train_cur_master, gauss_sigma)

train_volt_slave_0_cell_4 = subsample(train_volt_slave_0_cell_4, d_sample)
train_volt_slave_0_cell_4 = smooth(train_volt_slave_0_cell_4, gauss_sigma)

# convert into X and y sequences
X_train, y_train = prepare_sequences(train_cur_master, train_volt_slave_0_cell_4, n_steps)
y_train = np.reshape(y_train, (-1, 1))

# fit and scale input
scaler_X = MinMaxScaler(feature_range = (0, 1))
scaler_X.fit(X_train)
X_train_scaled = scaler_X.transform(X_train)

# fit and scale output
scaler_y = MinMaxScaler(feature_range = (0, 1))
scaler_y.fit(y_train)
y_train_scaled = scaler_y.transform(y_train)

# reshape into correct input format
X_train_scaled = X_train_scaled.reshape(X_train_scaled.shape[0], X_train_scaled.shape[1], n_features)
print(X_train_scaled.shape, y_train_scaled.shape)

In [None]:
# ------------------------------------------- Initialize LSTM -------------------------------------------
model = keras.Sequential()

# Adding the first LSTM layer and some Dropout regularisation
model.add(layers.SimpleRNN(units = n_lstm_units, activation='relu', input_shape = (n_steps, n_features), return_sequences=True))
# model.add(layers.Dropout(0.2))

# Adding the output layer
model.add(layers.Dense(units = n_dense_units))

# Show model
model.summary()

In [None]:
# ------------------------------------------- Train LSTM -------------------------------------------
model.compile(optimizer = 'adam', loss = 'mean_squared_error')

# Fitting the RNN to the Training set
history = model.fit(X_train_scaled, y_train_scaled, epochs = n_epochs, verbose = 1)

# Save trained model
model.save("simpleRNN")

In [None]:
# ------------------------------------------- Visualize Training -------------------------------------------
loss = history.history['loss']

epochs = range(1,len(loss)+1)
plt.title('Training error with epochs')
plt.plot(epochs,loss,'bo',label='training loss')
plt.xlabel('epochs')
plt.ylabel('training error')
plt.show()

In [None]:
# ------------------------------------------- Prepare Test Data -------------------------------------------
test_cur_master = np.loadtxt('../data/fobss_data/data/Ri Jumps 25A/battery/Battery_Current.csv', delimiter=';')
test_cur_master = test_cur_master[80000:100000,1]
test_volt_slave_0_cell_4 = np.loadtxt('../data/fobss_data/data/Ri Jumps 25A/cells/Slave_0_Cell_Voltages.csv', delimiter=';')
test_volt_slave_0_cell_4 = test_volt_slave_0_cell_4[80000:100000, 4]

# subsample and smooth data 
test_cur_master = subsample(test_cur_master, d_sample)
test_cur_master = smooth(test_cur_master, gauss_sigma)

test_volt_slave_0_cell_4 = subsample(test_volt_slave_0_cell_4, d_sample)
test_volt_slave_0_cell_4 = smooth(test_volt_slave_0_cell_4, gauss_sigma)

# convert into X and y sequences
X_test, y_test = prepare_sequences(test_cur_master, test_volt_slave_0_cell_4, n_steps)
y_test = np.reshape(y_test, (-1, 1))

# scale input and output
X_test_scaled = scaler_X.transform(X_test)
y_test_scaled = scaler_y.transform(y_test)

# reshape into correct input format
X_test_scaled = X_test_scaled.reshape(X_test_scaled.shape[0], X_test_scaled.shape[1], n_features)

print(X_test_scaled.shape, y_test_scaled.shape)

In [None]:
# predict on test data
yhat_scaled = model.predict(X_test_scaled, verbose = 1)
yhat = scaler_y.inverse_transform(yhat_scaled)

# plot test results
plt.plot(yhat, color='red', label = 'predicted')
plt.plot(y_test, color='blue', label = 'measured')
plt.legend()
plt.show()