## Model generator main file

In [None]:
import numpy as np
import re
import unidecode
import os
import pickle
import sys

import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline

from keras                  import regularizers
from keras                  import Input, Model, Sequential
from keras.layers           import Bidirectional, Activation, TimeDistributed, Dense, RepeatVector, Embedding, Dropout, BatchNormalization
from keras.layers.recurrent import LSTM, GRU, SimpleRNN
from keras.utils            import np_utils
from keras.callbacks        import EarlyStopping, TensorBoard, ModelCheckpoint

Import data and make folders

In [None]:
# Working directories; everything lives under ``data_path``.
data_path = 'data'
temp_path = data_path + '/temp'
backup_path = data_path + '/backup'
weights_path = data_path + '/weights'
train_history = data_path + '/train_history'

# exist_ok=True makes folder creation idempotent and avoids the
# check-then-create race of a separate isdir()/mkdir() pair.
for _folder in (data_path, temp_path, backup_path, weights_path, train_history):
    os.makedirs(_folder, exist_ok=True)

# NOTE: pickle.load can execute arbitrary code while deserialising; only load
# pickle files that come from a trusted source.
# The ``with`` statement closes each file on exit, so no explicit close() is needed.
with open(data_path + '/joint_angle_data.pickle', 'rb') as file:
    joint_angle_data = pickle.load(file)

with open(data_path + '/power_data.pickle', 'rb') as file:
    power_data = pickle.load(file)
    


## Recurrent neural networks

Different model functions

In [None]:
def setModelToLSTM(self, multiple=True):
    """Build and compile a bidirectional-LSTM model and store it on ``self.model``.

    Layer dimensions are read from the training arrays already stored on the
    instance: axes 1 and 2 of ``self.X_train`` give the number of time steps
    and features, and axis 1 of ``self.Y_train`` gives the number of outputs.

    Args:
        multiple: Forwarded to the LSTM layer as ``return_sequences``.
    """
    x_shape = self.X_train.shape
    n_timesteps = x_shape[1]
    n_features = x_shape[2]
    n_outputs = self.Y_train.shape[1]

    self.model = Sequential()
    net = self.model

    # Recurrent core: 64 LSTM units, wrapped so the sequence is read in both directions.
    lstm_layer = LSTM(
        64,
        return_sequences=multiple,
        kernel_initializer='random_uniform',
        bias_initializer='zero',
        input_shape=(n_timesteps, n_features),
    )
    net.add(Bidirectional(lstm_layer))

    # Dense head: a small ReLU layer followed by a linear output layer.
    net.add(Dense(16, activation='relu'))
    net.add(Dense(n_outputs))

    # NOTE(review): 'accuracy' is tracked alongside a mean-squared-error loss;
    # confirm that metric is meaningful for this target.
    net.compile(loss='mean_squared_error', optimizer='adam', metrics=['accuracy'])


Other functions

In [None]:
def series_to_supervised(dataX, dataY, n_input, n_output):
    """Cut two aligned series into non-overlapping windows of ``n_input`` samples.

    Args:
        dataX: 2-D array-like of shape (samples, features). Any number of
            feature columns is accepted.
        dataY: Array-like of targets, shape (samples, k) or (samples,), where
            ``k`` must be 1 or ``n_output``.
        n_input: Window length in samples (must be >= 1).
        n_output: Number of columns in the returned target array.

    Returns:
        Tuple ``(X, Y)`` of float arrays:
        ``X`` has shape (samples // n_input, n_input, features) and holds the
        consecutive windows of ``dataX``;
        ``Y`` has shape (len(dataY) // n_input, n_output) and holds, for each
        window, the target row at the window's first sample (broadcast across
        ``n_output`` columns when ``dataY`` has a single column).

    Raises:
        ValueError: If ``n_input`` < 1, ``dataX`` is not 2-D, or the target
            columns cannot be broadcast to ``n_output``.

    Samples left over after the last complete window are dropped.

    NOTE(review): the per-window target is the window's *first* sample, which
    is what the previous loop-based version produced in the cases where it
    ran to completion; confirm this is the intended label.
    """
    if n_input < 1:
        raise ValueError("n_input must be a positive integer")

    # np.array copies, so the returned arrays never alias the caller's data.
    features = np.array(dataX, dtype=float)
    targets = np.array(dataY, dtype=float)
    if features.ndim != 2:
        raise ValueError("dataX must be 2-D: (samples, features)")
    if targets.ndim == 1:
        targets = targets.reshape(-1, 1)

    # Keep only complete windows, then fold the sample axis into (window, step).
    n_x_windows = features.shape[0] // n_input
    X = features[:n_x_windows * n_input].reshape(
        n_x_windows, n_input, features.shape[1]
    )

    # One target row per window, taken at the window's first sample.
    n_y_windows = targets.shape[0] // n_input
    window_starts = targets[:n_y_windows * n_input:n_input]
    Y = np.broadcast_to(window_starts, (n_y_windows, n_output)).copy()

    return X, Y

In [None]:
def setUpData(self, seq_length, dataX, dataY, n_test_ratio):
    """Window the raw series and store a train/test split on the instance.

    The data is windowed with ``series_to_supervised`` using ``seq_length``
    as the window length and ``dataY.shape[1]`` as the number of outputs.
    The leading part of the windows becomes the training set and the
    remainder the test set; no shuffling is done.

    Args:
        seq_length: Window length in samples; also stored as ``self.seq_length``.
        dataX: Input series handed to ``series_to_supervised``.
        dataY: Target series; must expose a 2-element ``shape``.
        n_test_ratio: Fraction of the windows reserved for the test set.
    """
    # Reset any previous split before building the new one.
    self.X_train, self.Y_train, self.X_test, self.Y_test = [], [], [], []
    self.seq_length = seq_length

    windows, targets = series_to_supervised(
        dataX, dataY, n_input=seq_length, n_output=dataY.shape[1]
    )

    # Cut points are computed per array because their lengths may differ.
    train_fraction = 1 - n_test_ratio
    x_cut = int(len(windows) * train_fraction)
    y_cut = int(len(targets) * train_fraction)

    self.X_train = windows[:x_cut, :, :]
    self.Y_train = targets[:y_cut, :]
    self.X_test = windows[x_cut:, :, :]
    self.Y_test = targets[y_cut:, :]
    

In [None]:
def trainModel(self, name, epochs=10, batch_size=1, validation_split=0.1):
    """Fit ``self.model`` on the stored training data and persist the results.

    During training the best checkpoints (by validation loss) are written to
    ``weights_path``, and TensorBoard logs go to ``./logs``. Afterwards the
    final weights are saved as ``model_<name>.hdf5`` in ``weights_path`` and
    the Keras training history as ``history_<name>.pickle`` in
    ``train_history``.

    Args:
        name: Tag embedded in every file name written by this run.
        epochs: Maximum number of epochs to run (early stopping may end sooner).
        batch_size: Number of training samples per gradient update.
        validation_split: Fraction of the training data held out for validation.
    """
    # Checkpoint file name template; Keras fills in the epoch and loss fields.
    filepath = weights_path + "/weights-" + name + "-{epoch:02d}-{loss:.4f}.hdf5"

    # Callbacks: stop after 5 epochs without val_loss improvement, log for
    # TensorBoard, and keep only checkpoints that improve val_loss.
    es = EarlyStopping(monitor='val_loss', patience=5, verbose=0)
    tb = TensorBoard(log_dir='./logs', histogram_freq=0, batch_size=32)
    mc = ModelCheckpoint(filepath, monitor='val_loss', verbose=0, save_best_only=True, mode='auto')

    # Train the model
    history = self.model.fit(
        self.X_train,
        self.Y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=validation_split,
        callbacks=[es, tb, mc],
    )

    # Save the final weights
    filename = "model_" + name + ".hdf5"
    self.model.save_weights(weights_path + '/' + filename)

    # Save the per-epoch history; the ``with`` block closes the file on exit.
    filename = "history_" + name + ".pickle"
    with open(train_history + '/' + filename, 'wb') as f:
        pickle.dump(history.history, f)

Model generator

In [None]:
class Generator:
    """Bundle the module-level helper functions as methods of one object.

    Each attribute below binds a plain function whose first parameter is
    ``self``, so it can be called as a method on a ``Generator`` instance.
    """

    # Data preparation and training entry points.
    set_up_data = setUpData
    train_model = trainModel
    
    # Model builders; each one is expected to assign ``self.model``.
    # NOTE(review): setModelToRNN and setModelToGRU are not defined in the
    # visible part of this file - confirm they exist before this class body runs.
    set_model_to_RNN = setModelToRNN
    set_model_to_GRU = setModelToGRU
    set_model_to_LSTM = setModelToLSTM
    

## Training the network

In [None]:
# Build a generator, window the data, and run one LSTM training pass.
gen = Generator()
# Window length handed to set_up_data (original note: 12 ms per length unit).
seqLleng = 1 #12ms/leng unit
# NOTE(review): batch_size is not passed to train_model below, so it has no
# effect on training as written.
batch_size = 8
# Fraction of the windows held back as the test set.
n_test_ratio = 0.02
print((power_data.shape))
gen.set_up_data(seqLleng, joint_angle_data, power_data, n_test_ratio)
# Show the shapes of the resulting train/test arrays.
print(gen.X_train.shape)
print(gen.Y_train.shape)
print(gen.X_test.shape)
print(gen.Y_test.shape)
# multiple=False is forwarded as return_sequences=False to the LSTM layer.
gen.set_model_to_LSTM(multiple=False)
# Tag used in the names of the weight and history files written by training.
name = 'test_run'
gen.train_model(name = name)