In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.integrate import solve_ivp
from scipy.interpolate import interp1d

from sklearn.model_selection import KFold
from random import shuffle

import time
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
%matplotlib qt

In [None]:
#

In [None]:
tf.keras.backend.set_floatx('float64')

In [None]:
model = keras.Sequential(
    [
        keras.Input(shape=(2,)),
        layers.Dense(6, name="layer1", activation='elu'),
        layers.Dense(1, name="layer2"),
    ]
)


In [None]:
# Generate solution

In [None]:
#
N_n = int(2)
c = 0.4
k = 4.0
def oscilator(t, y, f):
    f_val = f(t)
    return np.array([y[1], -c*y[1]-k*y[0]+f_val])
t_final = 200.0
n_eval = int(1000)
t_span = np.array([0.0, t_final])
t_eval = np.linspace(0, t_final, num=n_eval)
y0 = np.array([1.0, 0.0])
#
f_ext = np.vstack(( np.zeros((200,1)), np.ones((400,1))*1.5, np.ones((400,1))*4.0 ) )
f_ext += np.random.normal(loc = 0.0, scale = 0.02, size=np.shape(f_ext))
f_interp = interp1d(t_eval, f_ext, kind='linear', axis=0)

In [None]:
func_1 = lambda t,y: oscilator(t,y,f_interp)
sol = solve_ivp(func_1, t_span, y0, t_eval=t_eval)

In [None]:
plt.plot(sol.t, sol.y[0,:])

In [None]:
# transform to tensorflow
t_target = tf.constant(sol.t)
y_target = tf.constant(np.transpose(sol.y) )
f_ext_tf = tf.constant(f_ext)

In [None]:
# interpolation in tf
f_ext_interp_np = interp1d(t_eval, f_ext, kind='linear', axis=0)
f_ext_interp = lambda t: tf.expand_dims(tf.constant(f_ext_interp_np(t)), axis=0)

In [None]:
# only displacements
y_target = tf.expand_dims(tf.constant(np.transpose(sol.y[0,:])), axis=1)

In [None]:
# shifted

In [None]:
y_targ_2 = y_target[1:n_eval,:]
y_shift = y_target[0:n_eval-1,:] 
x = tf.concat([y_shift, f_ext_tf[0:n_eval-1,:]], axis=1)

In [None]:
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
              loss=tf.keras.losses.MeanSquaredError())

In [None]:
model.fit(x = x, y = y_targ_2, batch_size = 50, epochs = 200)

In [None]:
y_pred = model.predict(x)

In [None]:
fig = plt.figure()
ax = plt.gca()
ax.plot(y_pred)
ax.plot(y_targ_2.numpy())

In [None]:
#

In [None]:
y_pred = [tf.expand_dims(y_shift[0,:], axis=0)]
for i in range(len(y_shift)):
    y_pred.append(model.predict(y_pred[i]))

In [None]:
# Neural ODE

In [None]:
import sys
sys.path.append('..')
import neural_ode.NeuralODE
import neural_ode.ODESolvers

In [None]:
class SimpleModel(tf.keras.Model):
    def __init__(self, dyn_dim = 1, external_dim=1):
        super().__init__()
        w_init = tf.random_normal_initializer(mean=-1.0, stddev=0.05)
        self.w = tf.Variable(
            initial_value = w_init(shape=(dyn_dim*2, dyn_dim), dtype="float64"),
            trainable=True, name='W' )
        self.dyn_dim = dyn_dim
        w_ext = tf.Variable(
            initial_value = w_init(shape=(external_dim, dyn_dim), dtype="float64"),
            trainable=True, name='W_ext' )
        self.w_ext = w_ext

    def call(self, inputs):
        y_dyn = inputs[:,:self.dyn_dim*2]
        x_ext = inputs[:,self.dyn_dim*2:]
        vels = inputs[:,self.dyn_dim:self.dyn_dim*2]
        accs = tf.matmul(y_dyn, self.w) + tf.matmul(x_ext, self.w_ext)
        return tf.concat([vels, accs], axis=1)


model_2 = SimpleModel()

In [None]:
model_2.variables

In [None]:
n_ode = neural_ode.NeuralODE.NeuralODE(model_2, 2, n_external=1)

In [None]:
n_epoch = 20
n_ode.fit(t_target, y_target, n_epoch=n_epoch, n_batch=5, 
          adjoint_method=False, missing_derivative=[0], adjust_initial=False, x_external=f_ext_tf)

In [None]:
n_ode.model.variables

In [None]:
n_ode.model.variables[0].assign(np.array([[-k],[-c]]))
n_ode.model.variables[1].assign(np.array([[1.0]]))

In [None]:
y0 = tf.concat([y_target[0,0], (y_target[1,0]-y_target[0,0])/(t_target[1]-t_target[0])], axis=0)
sol_model = n_ode.forward_solve(t_target, y0, x_external=f_ext_interp)

In [None]:
fig, ax = plt.subplots()
plt.plot(t_target.numpy(), y_target.numpy())
ax.plot(sol_model['t'].numpy(), sol_model['y'][:,0].numpy() )

# Test

In [None]:
#
N_n = int(2)
c1 = 0.2
k1 = 4.0
def oscilator(t, y, f):
    f_val = f(t)
    return np.array([y[1], -c1*y[1]-k1*y[0]+f_val])
t_final = 200.0
n_eval = int(1000)
t_span = np.array([0.0, t_final])
t_eval = np.linspace(0, t_final, num=n_eval)
y0 = np.array([1.0, 0.0])

In [None]:
func_1 = lambda t,y: oscilator(t,y,f_interp)
sol_test = solve_ivp(func_1, t_span, y0, t_eval=t_eval)

In [None]:
plt.plot(sol_test.t, sol_test.y[0,:])

In [None]:
# transform to tensorflow
t_test = tf.constant(sol_test.t)
y_test = tf.expand_dims(tf.constant(np.transpose(sol_test.y[0,:])), axis=1)

In [None]:
# usual model

In [None]:
# shift
y_test_out = y_test[1:n_eval,:]
y_test_inp = y_test[0:n_eval-1,:] 
x_test = tf.concat([y_test_inp, f_ext_tf[0:n_eval-1,:]], axis=1)

In [None]:
y_pred_test = model.predict(x_test)

In [None]:
# neural ODE model

In [None]:
y0 = tf.concat([y_test[0,0], (y_test[1,0]-y_test[0,0])/(t_target[1]-t_target[0])], axis=0)
sol_model_test = n_ode.forward_solve(t_target, y0, x_external=f_ext_interp)

In [None]:
y_pred_test_2 = sol_model_test['y'][::n_ode.n_ref,0].numpy()

In [None]:
y_test_np = np.reshape(y_test.numpy(), (len(y_test.numpy())) )

In [None]:
def resudual_func(y_targ, y_pr):
    dict_res = {}
    dict_res['residual'] = np.abs(y_targ - y_pr)
    dict_res['residual_rel'] =  dict_res['residual']/(np.abs(y_targ)+np.abs(y_pr)+ 1e-5 )*2
    return dict_res

In [None]:
res1 = resudual_func(y_test_out, y_pred_test)

In [None]:
res2 = resudual_func(y_test_np, y_pred_test_2)

In [None]:
fig, ax = plt.subplots()
ax.plot(y_test_out.numpy())
ax.plot(y_pred_test)
ax.plot(res1['residual'])
ax.hlines(0.2, 0, 1000, color='r')

In [None]:
fig, ax = plt.subplots()
ax.plot(y_test_np)
ax.plot(y_pred_test_2)
ax.plot(res2['residual'])
ax.hlines(0.2, 0, 1000, color='r')