# Regression NN - v1.1

Required libraries:

Requires the latest pip
pip install --upgrade pip

Current stable release for CPU and GPU
pip install tensorflow

Use seaborn for pairplot
pip install -q seaborn

Pandas library
pip install pandas

Use some functions from tensorflow_docs
pip install -q git+https://github.com/tensorflow/docs

Numpy (if not already installed)
pip install numpy

Matplotlib (if not already installed)
pip install matplotlib

In [None]:
# Imports
# -----------

# required libraries
import pathlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from enum import Enum, auto
# seaborn
import seaborn as sns
# tensorflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# tensorflow_docs
import tensorflow_docs as tfdocs
import tensorflow_docs.plots
import tensorflow_docs.modeling

In [None]:
# Simple enum class to determine study type
class STUDY_TYPES(Enum):
    """Selects which rows of the dataset take part in the study.

    The values are the ones ``auto()`` assigns to the first two members.
    """

    # no row filtering is applied for this study type
    ALL = 1
    # rows whose 'fail' column equals 1.0 are removed before training
    NOT_FAILED = 2

In [None]:
# Define inputs and features
# columns fed to the network as inputs
INPUT_PARAMS = ['a', 'b', 'time']
# column holding the failure flag of each sample
HAS_FAILED_FEATURE = ['fail']
STRESS_FEATURES = ['sx', 'sy', 'sz', 'sxy', 'sxz', 'syz']
# displacement columns, grouped by node: ux_1, uy_1, uz_1, ux_2, ..., uz_8
DISPLACEMENT_FEATURES = [
    'u{axis}_{node}'.format(axis=axis, node=node)
    for node in range(1, 9)
    for axis in ['x', 'y', 'z']
]

# when True, the 'fail' column is added to the predicted features
INCLUDE_HAS_FAILED_FEATURE = False

# Define training sample (fraction of the rows used for training)
TRAIN_SAMPLE_FRAC = 0.8

# Define range of data to analyse, as the (start, stop) bounds of a row slice.
# A stop of None keeps every row; a stop of -1 would silently drop the last row.
DATA_RANGE = (0, None)  # all

# Define split data type (which rows take part in the study)
STUDY = STUDY_TYPES.NOT_FAILED

# Define paths
# NOTE(review): absolute, machine-specific location of the pickled dataset
DATASET_PATH = 'C:\\Users\\igorp\\University of South Florida\\Mao, Wenbin - Myocardium (organized)\\Active\\Guccione_oneElem_study\\pickleData\\data.pickle'

In [None]:
# Load dataset
# -----------------

# NOTE: unpickling can execute arbitrary code; only load files from a trusted source
# the frame read from disk stays untouched in raw_dataset ...
raw_dataset = pd.read_pickle(DATASET_PATH)
# ... and every later step works on an independent deep copy of it
dataset = raw_dataset.copy(deep=True)
# preview the last five rows
dataset.tail(5)

In [None]:
# For the NOT_FAILED study, discard every sample flagged as failed
if STUDY is STUDY_TYPES.NOT_FAILED:
    print("len before:", len(dataset))
    # Filter with a boolean mask rather than dropping by index label: a
    # label-based drop would also remove non-failed rows that happen to share
    # an index label with a failed one. The index is then renumbered from 0.
    dataset = dataset.loc[dataset['fail'] != 1.0].reset_index(drop=True)
    print("len after:", len(dataset))

In [None]:
# Extract inputs / features

# FEATURES lists the columns the network has to predict
if INCLUDE_HAS_FAILED_FEATURE:
    FEATURES = np.hstack((HAS_FAILED_FEATURE, DISPLACEMENT_FEATURES, STRESS_FEATURES))
else:
    FEATURES = np.hstack((DISPLACEMENT_FEATURES, STRESS_FEATURES))

# Build the set of wanted columns once instead of re-stacking the arrays for
# every column of the dataset; everything else is dropped
columns_to_keep = set(INPUT_PARAMS) | set(FEATURES.tolist())
data_to_drop = [v for v in dataset.columns if v not in columns_to_keep]
dataset = dataset.drop(data_to_drop, axis=1)

# Crop dataset to the configured row range
dataset = dataset[DATA_RANGE[0]:DATA_RANGE[1]]

dataset.tail()

In [None]:
# Discard every row that holds at least one missing value
dataset = dataset.dropna(axis=0, how='any')
# per-column count of the missing values that remain
dataset.isnull().sum()

In [None]:
# Split data into train and test datasets
# A TRAIN_SAMPLE_FRAC share of the rows is sampled (fixed seed) for training;
# the rows that were not sampled form the test set
train_dataset = dataset.sample(frac=TRAIN_SAMPLE_FRAC, random_state=0)
in_training_set = dataset.index.isin(train_dataset.index)
test_dataset = dataset.loc[~in_training_set]

In [None]:
# Inspect data: pairwise plots of the input parameters, with a KDE on the diagonal.
# The columns come from INPUT_PARAMS so the plot follows the configured inputs.
sns.pairplot(train_dataset[INPUT_PARAMS], diag_kind="kde")

In [None]:
# Obtain data statistics
# describe() the training rows, discard the output columns and transpose, so
# that each remaining column becomes one row of statistics
train_stats = train_dataset.describe().drop(columns=FEATURES).T
train_stats

In [None]:
# split inputs and features
# labels (network outputs): everything that is not an input parameter
train_labels = train_dataset.drop(columns=INPUT_PARAMS)
test_labels = test_dataset.drop(columns=INPUT_PARAMS)

# inputs: what is left once the feature columns are removed
train_dataset = train_dataset.drop(columns=FEATURES)
test_dataset = test_dataset.drop(columns=FEATURES)

In [None]:
# normalize data
def norm(x):
    """Standardize ``x`` with the statistics stored in ``train_stats``.

    Subtracts ``train_stats['mean']`` and divides the result by
    ``train_stats['std']``; both are read from the module-level frame.
    """
    centered = x - train_stats['mean']
    return centered / train_stats['std']

# standardize the train and test inputs with the same training statistics
normed_train_data, normed_test_data = map(norm, (train_dataset, test_dataset))

# (to work on raw values instead, bind these two names to train_dataset and
# test_dataset rather than to the norm() results)

normed_train_data.tail()

In [None]:
# Define build model

def build_model():
    """Build and compile the fully connected regression network.

    The network takes ``len(INPUT_PARAMS)`` inputs, goes through eight ReLU
    hidden layers and ends in a linear layer with ``len(FEATURES)`` outputs.
    It is compiled with RMSprop, a mean-squared-error loss and the 'mae' and
    'mse' metrics.

    Returns:
        The compiled ``keras.Sequential`` model.
    """
    hidden_widths = [60, 80, 100, 120, 140, 120, 100, 80]

    # only the first layer declares the input shape
    stack = [
        layers.Dense(hidden_widths[0], activation='relu', input_shape=[len(INPUT_PARAMS)])
    ]
    for width in hidden_widths[1:]:
        stack.append(layers.Dense(width, activation='relu'))
    # linear output layer: one unit per predicted feature
    stack.append(layers.Dense(len(FEATURES), activation='linear'))

    model = keras.Sequential(stack)

    optimizer = tf.keras.optimizers.RMSprop(0.000001)
    loss_function = tf.keras.losses.MeanSquaredError()

    model.compile(
        loss=loss_function,
        optimizer=optimizer,
        metrics=['mae', 'mse'],
    )

    return model

In [None]:
# create and inspect model
# build a fresh, compiled network from build_model() ...
model = build_model()
# ... and show its layer-by-layer summary
model.summary()

In [None]:
# try out model
# smoke test: run the model on the last two normalized training rows and
# show the raw output
example_batch = normed_train_data[-2:]

example_result = model.predict(example_batch)
example_result

In [None]:
# train model based on epochs
EPOCHS = 1000

# Early stopping monitors val_loss; the patience parameter is the amount of
# epochs to check for improvement
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)

# Train on the normalized inputs: the evaluation and prediction cells feed the
# model normed_test_data, so fitting on the raw train_dataset would train and
# test the network on differently scaled inputs.
# validation_split holds out the given fraction of the training data for validation.
history = model.fit(
    normed_train_data,
    train_labels,
    epochs=EPOCHS,
    validation_split=0.2,
    verbose=0,
    callbacks=[early_stop, tfdocs.modeling.EpochDots()],
)

In [None]:
# plot the 'mae' metric recorded in the training history
plotter = tfdocs.plots.HistoryPlotter(smoothing_std=2)
histories = {'History': history}
plotter.plot(histories, metric="mae")
plt.ylabel('MAE')

In [None]:
# evaluate model based on testing data

# the result is unpacked as the loss followed by the two compiled metrics
evaluation = model.evaluate(normed_test_data, test_labels, verbose=2)
loss, mae, mse = evaluation

report = "Testing set Mean Abs Error: {:5.2f}".format(mae)
print(report)

In [None]:
# run the model on the normalized test inputs and show the raw output
test_predictions = model.predict(normed_test_data)
test_predictions

In [None]:
# Wrap the raw predictions in a frame that lines up with test_labels.
# Reusing the labels' index keeps row-wise arithmetic between the two frames
# aligned (test_labels keeps the dataset's row labels, not 0..n-1), and reusing
# the labels' columns names each output after the label column in the same
# position.
test_predictions = pd.DataFrame(
    test_predictions,
    columns=test_labels.columns,
    index=test_labels.index,
)
test_predictions.tail()

In [None]:
def plot_prediction_vs_true_value(feature, title=None, lims=None):
    """Scatter the predicted values of ``feature`` against the true ones.

    Reads the module-level ``test_labels`` and ``test_predictions`` frames and
    draws on a new figure with equal aspect ratio and identical x/y limits.

    Args:
        feature: name of the column to plot.
        title: title of the axes.
        lims: ``[low, high]`` limits applied to both axes. When omitted, the
            limits span the smallest and largest value found in either the
            true or the predicted column.
    """
    true_values = test_labels[feature]
    predicted = test_predictions[feature]

    fig, a = plt.subplots()
    a.scatter(true_values, predicted)
    a.set_xlabel('True Values [{fe}]'.format(fe=feature))
    a.set_ylabel('Predictions [{fe}]'.format(fe=feature))
    a.set_title(title)

    # only derive the limits from the data when the caller gave none
    if lims is None:
        lims = [
            np.min([true_values.min(), predicted.min()]),
            np.max([true_values.max(), predicted.max()]),
        ]
    a.set_xlim(lims)
    a.set_ylim(lims)
    a.set_aspect('equal')

In [None]:
# one prediction-vs-truth scatter per feature, titled with the feature name
for feature_name in FEATURES:
    plot_prediction_vs_true_value(feature_name, title=feature_name)

In [None]:
def plot_error_hist(feature, title=None):
    """Draw a 25-bin histogram of the prediction error of ``feature``.

    The error is ``test_predictions[feature] - test_labels[feature]``, taken
    row by row from the module-level frames.

    Args:
        feature: name of the column to plot.
        title: title of the axes.
    """
    # Subtract the underlying arrays: subtracting the Series would align them
    # on their index, and the two frames do not necessarily share one
    # (test_labels keeps the dataset's row labels).
    error = test_predictions[feature].to_numpy() - test_labels[feature].to_numpy()
    fig, a = plt.subplots()

    a.hist(error, bins=25)
    a.set_xlabel("Prediction Error {fe}".format(fe=feature))
    a.set_title(title)
    _ = a.set_ylabel("Count")

In [None]:
# one error histogram per feature, titled with the feature name
for feature_name in FEATURES:
    plot_error_hist(feature_name, title=feature_name)

In [None]:
# Predict
# values of the 'a' and 'b' inputs, held constant over the whole sweep
A = 1.2
B = 2.5

# 100 evenly spaced 'time' samples from 0 to 0.2 (both ends included)
t_space = np.linspace(0, 0.2, num=100)
# constant columns with one entry per time sample
a_space = np.full(t_space.shape, A)
b_space = np.full(t_space.shape, B)

d = {'a': a_space, 'b': b_space, 'time': t_space}

# one row per time sample, columns 'a', 'b', 'time'
pred_inp = pd.DataFrame(d)

In [None]:
# Normalize the sweep with the training statistics, like the evaluation cell
# does for normed_test_data, instead of feeding raw values to the model.
# The columns are then put in the order of the normalized training inputs.
curve_inputs = norm(pred_inp)[normed_train_data.columns]
test_predictions = model.predict(curve_inputs)
# name each output after the label column in the same position
test_predictions = pd.DataFrame(test_predictions, columns=train_labels.columns)
test_predictions.tail()

In [None]:
def plot_curve(feature, title=None):
    """Scatter the predicted ``feature`` against the 'time' column of ``pred_inp``.

    Reads the module-level ``pred_inp`` and ``test_predictions`` frames and
    draws on a new figure.

    Args:
        feature: name of the predicted column to plot.
        title: title of the axes.
    """
    _, axes = plt.subplots()
    axes.scatter(pred_inp['time'], test_predictions[feature])

    axes.set_xlabel('time [s]')
    axes.set_ylabel('Predictions [{fe}]'.format(fe=feature))
    axes.set_title(title)

    # NOTE(review): an equal aspect ratio ties the time axis to the feature's
    # scale - confirm this is wanted for every feature
    axes.set_aspect('equal')

In [None]:
# predicted 'ux_8' column over the time sweep
plot_curve('ux_8')

In [None]:
# predicted 'sx' column over the time sweep
plot_curve('sx')