Burgers equation in time with state-parameter - Implementing the FNO model

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import prevision as prv
import tensorflow as tf

In [None]:
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
  0.01,    
  decay_steps=100,
  decay_rate=8,
  staircase=True)

def custom_loss(y_true, y_pred):
    return tf.keras.losses.cosine_similarity(y_true,y_pred) + 10*tf.keras.losses.huber(y_true,y_pred)

def get_optimizer():
    return tf.keras.optimizers.Adam(lr_schedule)

def get_callbacks(name):
    return [
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=1000, min_delta=0.001),
        ]

def compile_and_fit(model, name, x_train, y_train, x_test, y_test, optimizer=None, max_epochs=1000, batch_size=32):

    if optimizer is None:
        optimizer = get_optimizer()

    model.compile(optimizer=optimizer,
                loss=custom_loss,
                metrics=[
                  tf.keras.losses.MeanSquaredError("auto", "mean_squared_error")])

    model.summary()

    history = model.fit(
            x_train, y_train,
            epochs=max_epochs,
            validation_data=(x_test,y_test),
            callbacks=get_callbacks(name),
            batch_size = batch_size,
            verbose=2)

    return history

# Loading training dataset
folder = 'Burgers_time_cx'
input_train=np.load('../data/' + folder + '/input_train.npy')
input_test=np.load('../data/' + folder + '/input_test.npy')
output_train=np.load('../data/' + folder + '/output_train.npy')
output_test=np.load('../data/' + folder + '/output_test.npy')

INPUTDIM = (input_train.shape[1],input_train.shape[2])
OUTPUTDIM = (output_train.shape[1],)

verbose = True
p_dim = 512+128
n=3
k_max = 14

kernel_reg = 0.0001
dropout = 0.005

# Creting model
input_layer = layers.Input(shape = INPUTDIM, name= 'input_layer')
input_layer_flat = layers.Reshape((INPUTDIM[0]*INPUTDIM[1],)) (input_layer)
P_layer = layers.Dense(p_dim, activation='relu', kernel_regularizer = regularizers.l2(kernel_reg), name='P_layer') (input_layer_flat)
P_layer = layers.Dropout(dropout) (P_layer)
# Repeat the custom module 'n' times
for i in range(n):
    if verbose:
        print('Creating Fourier Layer ' +str(i))
    if i ==0:
        fourier_module_output = prv.Fourier_Layer(name='fourier_layer_'+str(i), k_max=k_max)(P_layer)
    else:
        fourier_module_output = prv.Fourier_Layer(name='fourier_layer_'+str(i), k_max=k_max)(fourier_module_output)

output_layer = layers.Dense(OUTPUTDIM[0], activation='linear', kernel_regularizer = regularizers.l2(kernel_reg), name='output_layer') (fourier_module_output)
output_layer= layers.Dropout(dropout) (output_layer)

if verbose:
    print('-------------------------------------------------------')
model = tf.keras.Model(inputs=input_layer, outputs = output_layer, name = 'Burgers_time_FNO')
if verbose:
    model.summary()

history = compile_and_fit(model, model.name, x_train=input_train, y_train=output_train, x_test=input_test, y_test=output_test, batch_size=256 , max_epochs=5000)

bc     = history.history['loss']
val_bc = history.history['val_loss']

plt.semilogx(range(np.shape(bc)[0]),bc, linewidth=2,)
plt.semilogx(range(np.shape(val_bc)[0]),val_bc,'--', linewidth=2,)
plt.title('FNO learning')
plt.ylim([-1,1])
plt.grid(True)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend(('Loss training set','Loss validation set'))
plt.show()