In [None]:
import json
import os

import tensorflow as tf

from tensorflow.keras.layers import Dense, Input, BatchNormalization, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras import optimizers
from tensorflow.keras.utils import plot_model
from tensorflow.keras import backend as K

from keras.models import load_model

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, LabelBinarizer

plt.rcParams['figure.dpi'] = 110

In [None]:
# Base directory of the frequency-domain HDF files. The loader looks for
# '<base_dir><signal>/<address>.hdf', so end a non-empty value with a path
# separator (e.g. 'data/'); '' resolves relative to the working directory.
base_dir = ''

In [None]:
# Tokens used to build the HDF names '<signal>_<state>_torque<load>_<rep>'.
# Torque tokens; presumably the load levels of the recordings — confirm units.
loads = ['05', '20', '25', '40']
# Suffixes of the repeated recordings available for every (state, load) pair.
reps = ['0', '1']
# Machine-state tokens. Their order must match the order in which the class
# ids are laid out when `labels` is built further down.
states = ['r1b', 'r2b', 'r3b', 'r4b', 'rs']

In [None]:
def addresses_generator(signal, loads, states, reps):
    """Build the HDF address names of one signal, grouped by load.

    Every name has the form ``<signal>_<state>_torque<load>_<rep>``.

    Args:
        signal: signal name used as the prefix, e.g. ``'Ia'``.
        loads: iterable of load tokens; they become the keys of the result.
        states: iterable of state tokens; one inner list is made per state.
        reps: iterable of repetition tokens; one name is made per repetition.

    Returns:
        dict mapping each load to a list (one entry per state, in the order of
        ``states``) of lists holding the names of all repetitions.
    """
    def names_for(load, state):
        # All repetitions of a single (load, state) combination.
        return [''.join([signal, '_', state, '_torque', load, '_', rep]) for rep in reps]

    return {load: [names_for(load, state) for state in states] for load in loads}

In [None]:
# Address tables (load -> per-state lists of HDF names) for the three signals.
Ia_addresses, Ib_addresses, Ic_addresses = (
    addresses_generator(signal_name, loads, states, reps)
    for signal_name in ('Ia', 'Ib', 'Ic')
)

In [None]:
def df_loader_from_addresses(addresses, base_dir, signal):
    """Read every addressed HDF file and stack the frames per load.

    Each address is read from ``<base_dir>/<signal>/<address>.hdf`` using the
    address itself as the HDF key.

    Args:
        addresses: dict mapping a load to a list of address groups, each group
            being a list of address names (the structure produced by
            ``addresses_generator``).
        base_dir: directory holding one sub-directory per signal. It may be
            given with or without a trailing separator; ``''`` resolves
            relative to the working directory.
        signal: name of the signal sub-directory, e.g. ``'Ia'``.

    Returns:
        dict mapping each load to one DataFrame containing all of its frames,
        concatenated in address order with a fresh 0..n-1 index.

    Raises:
        ValueError: from ``pd.concat`` when a load has no addresses at all.
    """
    results = {}
    for load, address_groups in addresses.items():
        # os.path.join inserts the separator only where needed, so base_dir
        # no longer has to end with '/' for the path to be valid.
        frames = [
            pd.read_hdf(os.path.join(base_dir, signal, address + '.hdf'), address)
            for group in address_groups
            for address in group
        ]
        # A single flat concat gives the same row order as concatenating per
        # group first, and ignore_index renumbers the rows once.
        results[load] = pd.concat(frames, ignore_index=True)
    return results

In [None]:
# Load the data of each signal into a dict of per-load DataFrames.
Ia_dict, Ib_dict, Ic_dict = (
    df_loader_from_addresses(address_table, base_dir, signal_name)
    for address_table, signal_name in (
        (Ia_addresses, 'Ia'),
        (Ib_addresses, 'Ib'),
        (Ic_addresses, 'Ic'),
    )
)

In [None]:
def train_test_splitter(signal_dict):
  """Split the data of every load into a 75 % train and a 25 % test part.

  The split is seeded with ``random_state=42``, so inputs with the same
  number of rows are shuffled and divided identically.

  Args:
    signal_dict: dict mapping a load to its data (anything accepted by
      ``train_test_split``).

  Returns:
    Tuple ``(train, test)`` of dicts that share the keys of ``signal_dict``.
  """
  train_parts, test_parts = {}, {}
  for load_key, load_data in signal_dict.items():
    train_parts[load_key], test_parts[load_key] = train_test_split(
        load_data, test_size=0.25, random_state=42)
  return train_parts, test_parts

In [None]:
# Per-load train/test dictionaries for each of the three signals.
(Ia_train, Ia_test), (Ib_train, Ib_test), (Ic_train, Ic_test) = (
    train_test_splitter(per_load_frames)
    for per_load_frames in (Ia_dict, Ib_dict, Ic_dict)
)

In [None]:
def _stack_loads(per_load):
  """Stack the arrays of all individual loads, ignoring an existing 'total'.

  Skipping the 'total' key keeps this cell idempotent: running it a second
  time no longer folds the previous aggregate back into the new one.
  """
  return np.concatenate([per_load[load] for load in per_load if load != 'total'])


# Add a 'total' entry (all loads stacked row-wise, in key order) to every split.
for split in (Ia_train, Ia_test, Ib_train, Ib_test, Ic_train, Ic_test):
  split['total'] = _stack_loads(split)

In [None]:
def train_test_scaler(signal_train, signal_test):
  """Min-max scale every split, fitting each scaler on the training data only.

  One ``MinMaxScaler`` is fitted per key of ``signal_train`` and then applied
  to both the training and the test data stored under that key. The input
  dicts are left untouched; the scaled data is returned in new dicts.

  Args:
    signal_train: dict mapping a key to its training data.
    signal_test: dict with the same keys holding the matching test data.

  Returns:
    Tuple ``(scaled_train, scaled_test)`` of new dicts with the same keys.

  Raises:
    KeyError: if a key of ``signal_train`` is missing from ``signal_test``.
  """
  scaled_train = {}
  scaled_test = {}
  for load, train_values in signal_train.items():
    # Fit on the training split only so no test statistics leak into scaling.
    scaler = MinMaxScaler().fit(train_values)
    scaled_train[load] = scaler.transform(train_values)
    scaled_test[load] = scaler.transform(signal_test[load])
  return scaled_train, scaled_test

In [None]:
# Scale the Ia data key by key; each scaler is fitted on the training split only.
Ia_train_scaled, Ia_test_scaled = train_test_scaler(Ia_train, Ia_test)

In [None]:
# Scale the Ib data key by key; each scaler is fitted on the training split only.
Ib_train_scaled, Ib_test_scaled = train_test_scaler(Ib_train, Ib_test)

In [None]:
# Scale the Ic data key by key; each scaler is fitted on the training split only.
Ic_train_scaled, Ic_test_scaled = train_test_scaler(Ic_train, Ic_test)

In [None]:
# Integer class ids laid out block by block in the order of `states`
# ('r1b', 'r2b', 'r3b', 'r4b', 'rs'), i.e. the last state gets class 0.
STATE_CLASS_IDS = [1, 2, 3, 4, 0]
# Rows per state within one load. Assumes every state contributes exactly
# 300 rows (all repetitions combined) — TODO confirm against the loaded frames.
SAMPLES_PER_STATE = 300

# np.repeat emits each id SAMPLES_PER_STATE times before moving to the next,
# giving one label per row of a single load's DataFrame.
labels = np.repeat(STATE_CLASS_IDS, SAMPLES_PER_STATE)
labels_df = pd.DataFrame({'target': labels})

In [None]:
# One-hot encode the class ids, then split them with the same test_size and
# random_state that train_test_splitter uses for the features. The label rows
# therefore line up with the feature rows only when a per-load frame has
# exactly as many rows as `labels` — check the shape printouts further down.
lables_bin = LabelBinarizer().fit_transform(labels_df['target'])
labels_bin_train, labels_bin_test = train_test_split(
    lables_bin,
    test_size=0.25,
    random_state=42,
)

In [None]:
# Every load reuses the same training-label array. 'total' repeats it once
# per load so that it matches the row-wise stacking of the per-load features.
# Deriving both from `loads` keeps the dict in sync if the load list changes.
labels_bin_train_dict = {load: labels_bin_train for load in loads}
labels_bin_train_dict['total'] = np.concatenate([labels_bin_train] * len(loads))

In [None]:
# Every load reuses the same test-label array. 'total' repeats it once per
# load so that it matches the row-wise stacking of the per-load features.
# Deriving both from `loads` keeps the dict in sync if the load list changes.
labels_bin_test_dict = {load: labels_bin_test for load in loads}
labels_bin_test_dict['total'] = np.concatenate([labels_bin_test] * len(loads))

In [None]:
def current_concatanator(Ia_train_scaled, Ib_train_scaled, Ic_train_scaled):
  """Join the three signals' 2-D arrays column-wise for every key.

  Keys are taken from the first dict; the other two must contain them too.

  Returns:
    dict mapping each key to one array whose columns are the Ia columns
    followed by the Ib columns and then the Ic columns.
  """
  return {
      key: np.concatenate(
          (ia_block, Ib_train_scaled[key], Ic_train_scaled[key]), axis=1)
      for key, ia_block in Ia_train_scaled.items()
  }

In [None]:
# Column-wise join of the scaled Ia, Ib and Ic training data for every key.
I_train_scaled = current_concatanator(Ia_train_scaled, Ib_train_scaled, Ic_train_scaled)

In [None]:
# Column-wise join of the scaled Ia, Ib and Ic test data for every key.
I_test_scaled = current_concatanator(Ia_test_scaled, Ib_test_scaled, Ic_test_scaled)

In [None]:
# Sanity check: for every key the training features and the training labels
# should report the same number of rows.
for load in Ia_train:
  feature_shape = I_train_scaled[load].shape
  label_shape = labels_bin_train_dict[load].shape
  print(load, '   ', feature_shape, label_shape)

In [None]:
# Sanity check: for every key the test features and the test labels should
# report the same number of rows.
for load in Ia_test:
  feature_shape = I_test_scaled[load].shape
  label_shape = labels_bin_test_dict[load].shape
  print(load, '   ', feature_shape, label_shape)

In [None]:
def model_creator(input_dim=9999, n_classes=5):
  """Build the fully connected classifier (not compiled).

  The network is an input layer, nine ``tanh`` Dense layers named
  ``HL1_1`` .. ``HL1_9`` with decreasing widths, and a ``softmax`` output
  layer named ``label``.

  Args:
    input_dim: number of input features. The default 9999 is the width this
      notebook has always used — presumably that of the concatenated
      Ia/Ib/Ic features; confirm against the shape printout.
    n_classes: number of output units; the default 5 matches the five class
      ids used for the labels.

  Returns:
    A Keras ``Model`` mapping the input tensor to the class probabilities.
  """
  hidden_units = (7500, 6000, 4500, 3000, 1500, 750, 500, 250, 50)

  # The shape is passed as a tuple, the form documented for Input; a bare
  # int is not accepted by every Keras version.
  features = Input(shape=(input_dim,), name='input1')

  hidden = features
  for depth, units in enumerate(hidden_units, start=1):
    hidden = Dense(units=units, activation='tanh', name='HL1_{}'.format(depth))(hidden)

  predicted_label = Dense(units=n_classes, activation='softmax', name='label')(hidden)

  return Model(inputs=features, outputs=predicted_label)

In [None]:
# Build one model instance to render its architecture diagram; the training
# loop below creates a fresh model for every run.
model = model_creator()
plot_model(model, show_shapes=True)

In [None]:
class SaveBestModel(tf.keras.callbacks.Callback):
    """Keras callback that remembers the weights of the best epoch.

    After every epoch the monitored metric is compared with the best value
    seen so far; on a strict improvement the model weights are copied into
    ``best_weights``. Nothing is written to disk.

    Args:
        save_best_metric: key of the epoch ``logs`` dict to monitor.
        this_max: True when larger metric values are better (e.g. accuracy),
            False when smaller values are better (e.g. loss).

    Attributes:
        best: best metric value seen so far (+/-inf before the first epoch).
        best_weights: weights captured at the best epoch, or None while no
            epoch has improved on the initial value (for instance when the
            metric is NaN in every epoch).
    """

    def __init__(self, save_best_metric='val_loss', this_max=False):
        # Let the base class set up the state Keras expects on a callback.
        super().__init__()
        self.save_best_metric = save_best_metric
        self.max = this_max
        self.best = float('-inf') if this_max else float('inf')
        self.best_weights = None

    def on_epoch_end(self, epoch, logs=None):
        """Capture the current weights if the monitored metric improved.

        Raises:
            KeyError: if the monitored metric is missing from ``logs``.
        """
        logs = logs or {}
        if self.save_best_metric not in logs:
            raise KeyError(
                "SaveBestModel: metric '{}' not found in epoch logs; available: {}".format(
                    self.save_best_metric, sorted(logs)))

        metric_value = logs[self.save_best_metric]
        if self.max:
            improved = metric_value > self.best
        else:
            improved = metric_value < self.best

        if improved:
            self.best = metric_value
            self.best_weights = self.model.get_weights()

In [None]:
# Training hyper-parameters used by the training loop.
# Initial learning rate handed to Adam (also used to derive its decay).
lr = 0.000001
# Number of epochs of every fit.
ep = 400
# How many times the complete train-and-evaluate sweep is repeated.
n_repetition = 1

# Result containers. The training loop appends to / inserts into them, so
# re-run this cell before re-running the loop to start from empty containers.
training_classification_accs = []
testing_classification_accs = []
histories = {}

In [None]:
# For every repetition, train one fresh model per key of Ia_train (each load
# plus 'total'), restore the weights of its best validation epoch, and
# evaluate it on the train and test data of every key.
train_keys = list(Ia_train.keys())

for i in range(n_repetition):
  temp_training_accs = []
  temp_testing_accs = []
  for load in train_keys:

    model = model_creator()
    best_model = SaveBestModel()
    # Current Keras optimizers reject the legacy `decay=` argument. An
    # InverseTimeDecay schedule with decay_steps=1 yields the same rate,
    # lr / (1 + decay * step), and is accepted by old and new versions.
    lr_schedule = optimizers.schedules.InverseTimeDecay(
        initial_learning_rate=lr, decay_steps=1, decay_rate=lr / ep)
    opt = optimizers.Adam(learning_rate=lr_schedule)
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

    temp_histories = model.fit(I_train_scaled[load], labels_bin_train_dict[load],
                               validation_split=0.25, epochs=ep, callbacks=[best_model])

    # The callback holds no weights if the monitored metric never improved
    # (e.g. NaN loss); keep the final weights instead of crashing.
    best_weights = getattr(best_model, 'best_weights', None)
    if best_weights is not None:
      model.set_weights(best_weights)
    else:
      print('warning: no best weights captured for', load, '- keeping the final weights')

    histories[load + '_' + str(i)] = temp_histories

    temp_temp_train = []
    temp_temp_test = []

    for evaluation_load in train_keys:
      # evaluate() returns [loss, accuracy]; index 1 is the accuracy.
      temp_temp_train.append(
          model.evaluate(I_train_scaled[evaluation_load], labels_bin_train_dict[evaluation_load])[1])
      temp_temp_test.append(
          model.evaluate(I_test_scaled[evaluation_load], labels_bin_test_dict[evaluation_load])[1])

    temp_training_accs.append(temp_temp_train)
    temp_testing_accs.append(temp_temp_test)

  training_classification_accs.append(temp_training_accs)
  testing_classification_accs.append(temp_testing_accs)

In [None]:
# One table per repetition. Rows: key the model was trained on;
# columns: key whose training data it was evaluated on.
for item in training_classification_accs:
  load_keys = list(Ia_train.keys())
  accuracy_table = pd.DataFrame(item, index=load_keys, columns=load_keys)
  print(accuracy_table, '\n')

In [None]:
# One table per repetition. Rows: key the model was trained on;
# columns: key whose test data it was evaluated on.
for item in testing_classification_accs:
  load_keys = list(Ia_train.keys())
  accuracy_table = pd.DataFrame(item, index=load_keys, columns=load_keys)
  print(accuracy_table, '\n')

In [None]:
# Loss curves of every training run stored in `histories`.
for item, run in histories.items():
  plt.plot(run.history['loss'])
  plt.plot(run.history['val_loss'])
  plt.title('model loss - {}'.format(item))
  plt.ylabel('loss')
  plt.xlabel('epoch')
  # The second curve comes from fit(validation_split=...), i.e. held-out
  # training data, not the separate test set — label it accordingly.
  plt.legend(['train', 'validation'], loc='upper left')
  plt.show()

In [None]:
# Accuracy curves of every training run stored in `histories`.
for item, run in histories.items():
  plt.plot(run.history['accuracy'])
  plt.plot(run.history['val_accuracy'])
  plt.title('model accuracy - {}'.format(item))
  plt.ylabel('accuracy')
  plt.xlabel('epoch')
  # The second curve comes from fit(validation_split=...), i.e. held-out
  # training data, not the separate test set — label it accordingly.
  plt.legend(['train', 'validation'], loc='upper left')
  plt.show()