200 randomly sampled data points from the equation: $y=x_1^2+x_1+x_2^3+x_2^2$

In [None]:
import numpy as np
import plotly.graph_objects as go

from sklearn.preprocessing import Normalizer

In [None]:
# Fix NumPy's global seed so the sampled points (and every later np.random
# call in this notebook) are repeatable on Restart & Run All.
SEED = 42
N_SAMPLES = 200  # number of random (x1, x2) points drawn from [0, 1)
np.random.seed(SEED)

x1 = np.random.rand(N_SAMPLES)
x2 = np.random.rand(N_SAMPLES)
y = x1**2 + x1 + x2**3 + x2**2

# Analytic partial derivatives of y; later cells use them as gradient targets.
dydx1 = 2*x1 + 1
dydx2 = 3*x2**2 + 2*x2

# Show the sampled points as a 3-D mesh coloured by the value of y.
fig = go.Figure(data=[go.Mesh3d(x=x1, y=x2, z=y,
                                intensity=y,
                                colorscale='Portland',
                                opacity=0.8)])
fig.update_layout(scene=dict(xaxis_title='x1',
                             yaxis_title='x2',
                             zaxis_title='y'))
fig.show()

In [None]:
import tensorflow as tf
from tensorflow import keras

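A neural network (NN) model with a custom loss function: the usual MSE on $y$ plus an MSE term between the network's gradients with respect to its inputs and the analytic partial derivatives.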

In [None]:
# One row per sample, columns: [x1, x2, y, dy/dx1, dy/dx2].
#
# NOTE: sklearn's Normalizer is intentionally NOT applied here. It rescales
# each *row* to unit L2 norm, i.e. it divides the inputs, the target and the
# derivatives of one sample by a single per-sample factor. After that the
# derivative columns are no longer the derivatives of the target column with
# respect to the input columns, and predictions on raw inputs are on a
# different scale than the training data. The inputs are already in [0, 1),
# so the raw values are used directly.
data_set = np.column_stack([x1, x2, y, dydx1, dydx2]).astype(np.float32)
data_set = np.random.permutation(data_set)  # shuffle the rows

# All samples are used for training; no validation split is held out.
data_t = data_set[:]
x_train = data_t[:, 0:2]     # inputs,               shape (n, 2)
y_train = data_t[:, 2:3]     # target,               shape (n, 1)
dydx_train = data_t[:, 3:5]  # analytic derivatives, shape (n, 2)

# Small regression network: one sigmoid hidden layer, linear output.
model = keras.models.Sequential([
  keras.layers.Dense(10, activation='sigmoid', input_shape=(2,)),
  keras.layers.Dense(1)
])

def random_batch(x, y, dydx, batch_size=50):
  """Draw one random mini-batch from three row-aligned arrays.

  Row indices are sampled uniformly with replacement from range(len(x)),
  and the same indices are applied to all three arrays so the rows stay
  aligned.

  Args:
    x: array of inputs, indexed along its first axis.
    y: array of targets, aligned with `x`.
    dydx: array of derivative targets, aligned with `x`.
    batch_size: number of rows to draw.

  Returns:
    A tuple (x_batch, y_batch, dydx_batch).
  """
  picks = np.random.randint(0, len(x), size=batch_size)
  return tuple(arr[picks] for arr in (x, y, dydx))

def print_status_bar(iteration, total, loss, metrics=None):
  """Print a one-line, in-place training progress report.

  The line has the form "<iteration>/<total> - <name>: <value> - ...",
  starting with a carriage return so successive calls overwrite each other.
  A newline is emitted only once `iteration` reaches `total`.

  Args:
    iteration: number of samples processed so far in the epoch.
    total: number of samples in the epoch.
    loss: metric-like object exposing `.name` and `.result()`.
    metrics: optional list of further metric-like objects.
  """
  tracked = [loss] + list(metrics or [])
  summary = " - ".join(f'{m.name}: {m.result():.4f}' for m in tracked)
  end = '' if iteration < total else "\n"
  # Separate the counter from the first metric; without the " - " the
  # output ran together as e.g. "40/200mean: 0.1234".
  print(f'\r{iteration}/{total} - ' + summary, end=end)

# --- Training configuration -------------------------------------------------
n_epochs = 30
batch_size = 40
n_steps = len(x_train) // batch_size
grad_loss_weight = 1.0  # relative weight of the gradient-matching term
optimizer = tf.keras.optimizers.SGD(learning_rate = 0.01)
loss_fn = tf.keras.losses.mean_squared_error
mean_loss = tf.keras.metrics.Mean()
mean_mse = tf.keras.metrics.Mean(name='mse')
mean_mse_grad = tf.keras.metrics.Mean(name='mse_grad')
metrics = [keras.metrics.MeanAbsoluteError()]

# Per-epoch histories (one float per epoch each) for later plotting.
loss_change, metric_change = [], []
mse_change, mse_grad_change = [], []

# --- Training loop ----------------------------------------------------------
for epoch in range(1, n_epochs+1):
  print(f"Epoch {epoch}/{n_epochs}")
  for step in range(1, n_steps+1):
    x_batch, y_batch, dydx_batch = random_batch(x_train, y_train,
                                                dydx_train, batch_size)
    # Use one dtype throughout so the two loss terms can be combined.
    x_batch = tf.convert_to_tensor(x_batch, dtype=tf.float32)
    y_batch = tf.convert_to_tensor(y_batch, dtype=tf.float32)
    dydx_batch = tf.convert_to_tensor(dydx_batch, dtype=tf.float32)

    # The input-gradient is taken with an inner tape *inside* the outer tape.
    # Only then is the gradient-matching term part of the graph the outer tape
    # differentiates; computed outside, it is a constant and contributes
    # nothing to the weight update.
    with tf.GradientTape() as tape2:
      with tf.GradientTape() as tape:
        tape.watch(x_batch)
        y_pred = model(x_batch, training=True)
      # d(y_pred)/d(x): one row per sample, shape (batch_size, 2).
      gradients_to_input = tape.gradient(y_pred, x_batch)

      main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
      # Batch mean of the squared distance between predicted and analytic
      # gradients (sum over the two input dimensions).
      mse_grad = tf.reduce_mean(
          tf.reduce_sum(tf.square(gradients_to_input - dydx_batch), axis=1))
      loss = main_loss + grad_loss_weight * mse_grad
    gradients = tape2.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    mean_loss(loss)
    mean_mse(main_loss)
    mean_mse_grad(mse_grad)
    for metric in metrics:
      metric(y_batch, y_pred)
    print_status_bar(step*batch_size, len(y_train), mean_loss, metrics)
  print_status_bar(len(y_train), len(y_train), mean_loss, metrics)

  # Store plain floats, one per epoch, so every history has n_epochs entries.
  mse_change.append(float(mean_mse.result()))
  mse_grad_change.append(float(mean_mse_grad.result()))
  loss_change.append(float(mean_loss.result()))
  metric_change.append(float(metrics[-1].result()))

  for metric in [mean_loss, mean_mse, mean_mse_grad] + metrics:
    metric.reset_states()


A baseline NN model with the same architecture, trained with the plain MSE loss only (no gradient term).

In [None]:
# Baseline: same architecture as `model`, trained with Keras' built-in loop
# on the plain MSE loss so it can be compared against the custom-loss model.
model2 = keras.models.Sequential([
  keras.layers.Dense(10, activation='sigmoid', input_shape=(2,)),
  keras.layers.Dense(1)
])

# `metrics` is passed as a list, the form documented for Model.compile.
model2.compile(loss='mse', optimizer='sgd',
              metrics=['mae'])
# Reuse the custom loop's epoch count and batch size so both models see the
# same training budget and the histories have the same length.
history = model2.fit(x_train, y_train, epochs=n_epochs, batch_size=batch_size)

# Per-epoch training loss (MSE) and MAE of the baseline.
loss2 = history.history['loss']
metrics2 = history.history['mae']

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


In [None]:
def _per_epoch(values, n_epochs):
  """Return `values` as a list of `n_epochs` floats.

  Entries are converted with float() so scalar tensors are accepted. If
  `values` holds several entries per epoch (e.g. one per training step),
  consecutive entries are averaged so that exactly one value per epoch
  remains; a list that already has `n_epochs` entries is returned unchanged
  (as floats). The length must be a non-zero multiple of `n_epochs`.
  """
  arr = np.asarray([float(v) for v in values], dtype=float)
  return arr.reshape(n_epochs, -1).mean(axis=1).tolist()


fig = go.Figure()

# Every trace is drawn against the same epoch axis.
x = range(1, n_epochs+1)
epochs = list(x)

# Custom-loss model: the two components of its loss.
fig.add_trace(go.Scatter(x=epochs, y=_per_epoch(mse_change, n_epochs),
                    mode='lines',
                    name='the value of mse',))
fig.add_trace(go.Scatter(x=epochs, y=_per_epoch(mse_grad_change, n_epochs),
                    mode='lines',
                    name='the value of mse_grad',))

# Baseline model: plain MSE loss.
fig.add_trace(go.Scatter(x=epochs, y=_per_epoch(loss2, n_epochs),
                    mode='markers',
                    name='the value of mse2',))

# loss_change, metric_change and metrics2 are also available for plotting.
fig.update_layout(title='Loss and metrics of two models',
                  xaxis_title='epoch',
                  yaxis_title='value')

fig.show()

In [None]:
# Draw fresh random inputs and visualise the custom-loss model's predictions.
n_new = 20
x1_new = np.random.rand(n_new)
x2_new = np.random.rand(n_new)
# Inputs with a leading column of ones; not used by any visible cell.
x_new_b = np.c_[np.ones((n_new, 1)), x1_new, x2_new]

# Pass a single (n_new, 2) array: a Python list of tuples can be interpreted
# as a list of separate model inputs rather than as one batch.
# NOTE(review): the inputs are fed on their raw [0, 1) scale; this only matches
# training if x_train is on the same scale - confirm against the data cell.
x_test = np.column_stack((x1_new, x2_new))
y_test = model.predict(x_test)
y_test = y_test.ravel()  # predictions, flattened to shape (n_new,)
fig = go.Figure(data=[go.Mesh3d(x=x1_new, y=x2_new, z=y_test,
                                intensity=y_test,
                                colorscale='Portland',
                                opacity=0.8)])
fig.update_layout(scene=dict(xaxis_title='x1',
                             yaxis_title='x2',
                             zaxis_title='predicted y'))
fig.show()

