# Train a neural network.

In [None]:
import tensorflow as tf
import tensorflow.keras as keras
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

Set parameters

In [None]:
# file with training input and output data: format is x,y
input_file = 'training.csv'

# names of input/output columns
inputs = ['mu', 'angle', 'threshold']
outputs = ['speed']#['low_speed', 'high_speed']

# these set the input/output dimensions of the network
input_size = len(inputs)
output_size = len(outputs)

Read data

In [None]:
data = pd.read_csv(input_file)
data.columns = inputs + outputs

x = np.array(data[inputs])
y = np.array(data[outputs])

Normalizing preprocessing layer from training data

In [None]:
normalizer = keras.layers.experimental.preprocessing.Normalization()
normalizer.adapt(x)

Penalize overpredictions more than underpredictions

In [None]:
def asymmetric_mse(y_true, y_pred):
    standard_mse = keras.losses.mse(y_true, y_pred)
    geq = keras.backend.any(keras.backend.greater(y_pred, y_true)) # true/false, are there overpredictions?
    geq_scale = keras.backend.switch(geq,5.0,1.0) # if there are overpredictions, scale up mse
    return geq_scale * standard_mse

Base model

In [None]:
model = keras.models.Sequential()
model.add(keras.layers.Dense(4, activation='linear',input_dim=input_size,kernel_regularizer=keras.regularizers.l2(0.0000001)))
model.add(keras.layers.Dense(20,activation='relu'))
model.add(keras.layers.Dense(20,activation='relu'))
model.add(keras.layers.Dense(output_size,activation='relu'))
model.compile(loss=asymmetric_mse,optimizer='adam')

Compile model

In [None]:
input_shape = x.shape[1:]
full_model_input = keras.Input(shape=input_shape)
normalized_input = normalizer(full_model_input)
full_model_output = model(normalized_input)
full_model = keras.Model(full_model_input, full_model_output)
full_model.compile(loss=asymmetric_mse,optimizer='adam',metrics=['mse'])

Split data

In [None]:
x_train, x_val, y_train, y_val = train_test_split(x, y, test_size=0.33, shuffle= True)

Train model

In [None]:
model_output = full_model.fit(x_train,y_train,epochs=400,batch_size=10,verbose=0,validation_data=(x_val,y_val)) # check validation

View training

In [None]:
#plt.yscale('log')
plt.plot(model_output.history['loss'])
plt.figure()
#plt.yscale('log')
plt.plot(model_output.history['mse'])
plt.figure()
#plt.yscale('log')
plt.plot(model_output.history['val_loss'])
plt.figure()
#plt.yscale('log')
plt.plot(model_output.history['val_mse'])
plt.figure()

View output

In [None]:
# plot speed vs angle given mu, threshold
mu = 0.009 # set mu
thresh = 1 # set threshold

# bug: mu = 0.009 is read as 0.0090..01
#plot_values = [i for i in x if i[0] == mu and i[2] == thresh] # x, y
plot_x = [i for i in x if np.isclose(i[0], mu) and i[2] == thresh] # x, y
# this is not generic enough...
pred_x = [[mu,angle,thresh] for angle in np.linspace(0,165,165)]
pred = full_model.predict(pred_x)

#plt.plot([i[1] for i in plot_values], [y[i] for i,v in enumerate(x) if v[0] == mu and v[2] == thresh])
plt.plot([i[1] for i in plot_x], [y[i] for i,v in enumerate(x) if np.isclose(v[0], mu) and v[2] == thresh])
plt.plot([i[1] for i in pred_x], pred)

Save model

In [None]:
full_model.save("1_speed_network.h5")

Find largest divergence between prediction and training data

In [None]:
divergence = (full_model.predict(x) - y)
print(max(divergence), x[np.where(divergence == max(divergence))[0]])