In [None]:
%load_ext tensorboard

In [None]:
import os
import datetime

from collections import defaultdict

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tqdm import tqdm
from scipy.integrate import solve_ivp
from sklearn.gaussian_process.kernels import RBF
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error

from assets.ml.DeepONetwork import TrunkNN
from assets.ml.DeepONetwork import BranchNN
from assets.ml.DeepONetwork import DeepONET
from assets.ml.DeepONetwork import DeepOPINN
from assets.ml.DeepONetwork import LogPINNLossesCallback

In [None]:
def createSamples(lengthScale, k, domain):
    kernel = RBF(length_scale=lengthScale)
    GP = GaussianProcessRegressor(kernel=kernel)
    u_sample = np.zeros((k, 100))
    for i in range(k):
        n = np.random.randint(0, 10000)
        u_sample[i, :] = GP.sample_y(domain.reshape(-1, 1), random_state=n).flatten()

    return u_sample

In [None]:
def solutions(u, t):
    sols = []
    for i in tqdm(range(u.shape[0]), desc="Generating Solutions", ncols=90):
        sol = solve_ivp(lambda var_t, var_s: np.interp(var_t, t.flatten(), u[i, :]), t_span=[0, 1], y0=[0], t_eval=t.flatten(), method='RK45')
        sols.append(sol.y[0].reshape(-1, 1))

    return np.array(sols)

In [None]:
def generate_dataset(N, length_scale, ODE_solve=False):
    """Generate dataset for Physics-informed DeepONet training.
    
    Args:
    ----
    N: int, number of u(·) profiles
    length_scale: float, length scale for RBF kernel
    ODE_solve: boolean, indicate whether to compute the corresponding s(·)
    
    Outputs:
    --------
    X: the dataset for t, u(·) profiles, and u(t)
    y: the dataset for the corresponding ODE solution s(·)
    """
    
    # Create random fields
    random_field = createSamples(length_scale, N, domain=np.linspace(0, 1, 100))
    
    # Compile dataset
    X = np.zeros((N*100, 100+2))
    y = np.zeros((N*100, 1))

    for i in tqdm(range(N), desc="Generating Data", ncols = 80):
        u = np.tile(random_field[i, :], (100, 1))
        t = np.linspace(0, 1, 100).reshape(-1, 1)

        # u(·) evaluated at t
        u_t = np.diag(u).reshape(-1, 1)

        # Update overall matrix
        X[i*100:(i+1)*100, :] = np.concatenate((t, u, u_t), axis=1)

        # Solve ODE
        if ODE_solve:
            sol = solve_ivp(lambda var_t, var_s: np.interp(var_t, t.flatten(), random_field[i, :]), 
                            t_span=[0, 1], y0=[0], t_eval=t.flatten(), method='RK45')
            y[i*100:(i+1)*100, :] = sol.y[0].reshape(-1, 1)
        
    return X, y

In [None]:
# Create training dataset
N_train = 1000
length_scale_train = 0.5
X_train, y_train = generate_dataset(N_train, length_scale_train, ODE_solve=True)

# Create validation dataset
N_val = 500
length_scale_test = 0.3
X_val, y_val = generate_dataset(N_val, length_scale_test, ODE_solve=True)

# Create testing dataset
N_test = 500
length_scale_test = 0.2
X_test, y_test = generate_dataset(N_test, length_scale_test, ODE_solve=True)

In [None]:
X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
X_val = tf.convert_to_tensor(X_val, dtype=tf.float32)
X_test = tf.convert_to_tensor(X_test, dtype=tf.float32)

In [None]:
# Restructure the trainging dataset
t_train = X_train[:, :1]
g_train = X_train[:, 1:-1]
gt_train = X_train[:, -1:]

X_train = [g_train, t_train, gt_train]

# Restructure the validation dataset
t_val = X_val[:, :1]
g_val = X_val[:, 1:-1]
gt_val = X_val[:, -1:]

X_val = [g_val, t_val, gt_val]

# Restructure the test dataset
t_test = X_test[:, :1]
g_test = X_test[:, 1:-1]
gt_test = X_test[:, -1:]

X_test = [g_test, t_test, gt_test]

In [None]:
domain = np.linspace(0, 1, 100)

fig, axs = plt.subplots(2, 2, figsize=(16, 16))

index = np.random.choice(np.arange(0, N_train), size=4, replace=False)

for i in range(4):
    ax = axs[i // 2][i % 2]
    ax.plot(domain, g_train[index[i]*100], label = f"Profile at index {index[i]}")
    ax.plot(domain, y_train[index[i]*100:(index[i]+1)*100], "-.", label = f"Solution at index {index[i]}")
    ax.set_xlabel('t', fontsize=12)
    ax.set_ylabel('u(t)', fontsize=12)
    ax.legend()
    ax.grid(True, color = 'lightgrey')

plt.show()

In [None]:
log_dir = os.path.join(".", "assets", "logs", "fits", f"DeepONET_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_images=True, profile_batch= '10, 110')

In [None]:
%tensorboard --logdir ./assets/logs --port=8081

In [None]:
modelPath = os.path.join(".", "assets", "ml", "DeepONET", "deepONET.h5")
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath=modelPath, monitor='val_loss', save_best_only=True, save_weights_only=True)

In [None]:
learning_rate_callback = tf.keras.callbacks.ReduceLROnPlateau(factor=0.9, patience=50, min_lr=1e-9)
_callbacks = [tensorboard_callback, model_checkpoint_callback, learning_rate_callback]
callbacks = tf.keras.callbacks.CallbackList(_callbacks, add_history=False)

In [None]:
branchHiddenLayers = [tf.keras.layers.Dense(20, activation = 'tanh', name = f"branchNETDense_layer{i+1}") for i in range(5)]
trunkHiddenLayers = [tf.keras.layers.Dense(20, activation = 'tanh', name = f"trunkNETDense_layer{i+1}") for i in range(5)]


branchNET = BranchNN(hiddenLayers=branchHiddenLayers, input_shape=(100,))
trunkNET = TrunkNN(hiddenLayers=trunkHiddenLayers, input_shape=(1,))
deepONET = DeepONET(branchNN=branchNET, trunkNN=trunkNET)

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3, beta_1=0.9, beta_2=0.999)
deepONET.compile(optimizer=optimizer, loss='mse', metrics=['mae', tf.keras.losses.MeanAbsolutePercentageError(), tf.keras.metrics.RootMeanSquaredError()])

In [None]:
deepONET.build(input_shape=[(None, 100), (None, 1)])

In [None]:
deepONET.summary()

In [None]:
deepONET.fit(x=X_train, y=y_train, epochs=5000, verbose=False, batch_size=100, validation_data=(X_val, y_val), callbacks=_callbacks)

In [None]:
deepONET.load_weights(filepath=modelPath)

testDomain = np.linspace(0, 1, 100)
testU = createSamples(lengthScale=0.2, k=1, domain=testDomain)
groundTruth = solutions(u=testU, t=testDomain).reshape(-1, 1)

print(testDomain.shape)
print(testU.shape)
print(groundTruth.shape)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(10, 9))

ax.plot(testDomain, testU.reshape(100, ), label = "Profile")
ax.plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")

ax.set_xlabel("$x$")
ax.set_ylabel("$f(x)$")
ax.tick_params(axis='x', labelcolor = 'tab:blue')
ax.legend()
ax.grid(True, color='lightgrey')

plt.show()

In [None]:
testBranchInput = np.array([testU for i in range(testDomain.shape[0])]).reshape(-1, 100)
print(testBranchInput.shape)
testTrunkInput = testDomain.reshape(-1, 1)
print(testTrunkInput.shape)

preds = deepONET.predict([testBranchInput, testTrunkInput])
print(preds.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10, 9))

ax[0].plot(testDomain, testU.reshape(100, ), label = "Profile")
ax[0].plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")
ax[0].plot(testDomain, preds.reshape(100, ), linestyle = ":", label = "Prediction")

ax[0].set_xlabel("$x$")
ax[0].set_ylabel("$f(x)$")
ax[0].tick_params(axis='x', labelcolor = 'tab:blue')
ax[0].legend()
ax[0].grid(True, color='lightgrey')

ax[1].plot(testDomain, preds - groundTruth, ".", color = "tab:red", label = "Residuals")
ax[1].axhline(0, linestyle = ":", color = "k")

ax[1].set_xlabel("$x$")
ax[1].set_ylabel("Residual")
ax[1].tick_params(axis='x', labelcolor = 'tab:blue')
ax[1].legend()
ax[1].grid(True, color='lightgrey')

plt.show()

In [None]:
pred_test = deepONET.predict(X_test)

fig, axs = plt.subplots(2, 2, figsize=(16, 16))
index = np.random.choice(np.arange(0, N_test), size=4, replace=False)
for i in range(4):
    ax = axs[i // 2, i % 2]
    ax.plot(domain, g_test[index[i]*100], label = f"Profile at index {index[i]}")
    ax.plot(domain, y_test[index[i]*100:(index[i]+1)*100], "-.", label = f"Solution at index {index[i]}")
    ax.plot(domain, pred_test[index[i]*100:(index[i]+1)*100], "-.", label = f"Prediction at index {index[i]}")
    ax.set_xlabel('t', fontsize=12)
    ax.set_ylabel('u(t)', fontsize=12)
    ax.legend()
    ax.grid(True, color = 'lightgrey')

plt.show()

In [None]:
branchOutput = deepONET.branchNN(testBranchInput)
print(branchOutput.shape)

trunkOutput = deepONET.trunkNN(testTrunkInput)
print(trunkOutput.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10,9))

for i in range(20):
    ax[0].plot(testDomain, branchOutput[:, i], '-', color = "C0", alpha = 0.5)
    ax[0].plot(testDomain, trunkOutput[:, i], '-', color = "C1", alpha = 0.5)

ax[0].grid(True, color = 'lightgrey')

for i in range(100):
    ax[1].plot(branchOutput[i, :], '-', color = "C0", alpha = 0.5)
    ax[1].plot(trunkOutput[i, :], '-', color = "C1", alpha = 0.5)

ax[1].grid(True, color = 'lightgrey')

plt.show()

In [None]:
testDomain = np.linspace(0, 1, 100)
testU = createSamples(lengthScale=0.05, k=1, domain=testDomain)
groundTruth = solutions(u=testU, t=testDomain).reshape(-1, 1)

print(testDomain.shape)
print(testU.shape)
print(groundTruth.shape)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(10, 9))

ax.plot(testDomain, testU.reshape(100, ), label = "Profile")
ax.plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")

ax.set_xlabel("$x$")
ax.set_ylabel("$f(x)$")
ax.tick_params(axis='x', labelcolor = 'tab:blue')
ax.legend()
ax.grid(True, color='lightgrey')

plt.show()

In [None]:
testBranchInput = np.array([testU for i in range(testDomain.shape[0])]).reshape(-1, 100)
print(testBranchInput.shape)
testTrunkInput = testDomain.reshape(-1, 1)
print(testTrunkInput.shape)

preds = deepONET.predict([testBranchInput, testTrunkInput])
print(preds.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10, 9))

ax[0].plot(testDomain, testU.reshape(100, ), label = "Profile")
ax[0].plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")
ax[0].plot(testDomain, preds.reshape(100, ), linestyle = ":", label = "Prediction")

ax[0].set_xlabel("$x$")
ax[0].set_ylabel("$f(x)$")
ax[0].tick_params(axis='x', labelcolor = 'tab:blue')
ax[0].legend()
ax[0].grid(True, color='lightgrey')

ax[1].plot(testDomain, preds - groundTruth, ".", color = "tab:red", label = "Residuals")
ax[1].axhline(0, linestyle = ":", color = "k")

ax[1].set_xlabel("$x$")
ax[1].set_ylabel("Residual")
ax[1].tick_params(axis='x', labelcolor = 'tab:blue')
ax[1].legend()
ax[1].grid(True, color='lightgrey')

plt.show()

In [None]:
branchOutput = deepONET.branchNN(testBranchInput)
print(branchOutput.shape)

trunkOutput = deepONET.trunkNN(testTrunkInput)
print(trunkOutput.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10,9))

for i in range(20):
    ax[0].plot(testDomain, branchOutput[:, i], '-', color = "C0", alpha = 0.5)
    ax[0].plot(testDomain, trunkOutput[:, i], '-', color = "C1", alpha = 0.5)

ax[0].grid(True, color = 'lightgrey')

for i in range(100):
    ax[1].plot(branchOutput[i, :], '-', color = "C0", alpha = 0.5)
    ax[1].plot(trunkOutput[i, :], '-', color = "C1", alpha = 0.5)

ax[1].grid(True, color = 'lightgrey')

plt.show()

In [None]:
log_dir = os.path.join(".", "assets", "logs", "fits", f"DeepOPINN_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_images=True, profile_batch=True, )

fileWriter = tf.summary.create_file_writer(os.path.join(log_dir, "metrics"))
fileWriter.set_as_default()

In [None]:
modelPath = os.path.join(".", "assets", "ml", "DeepOPINN", "deepOPINN.h5")
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath=modelPath, monitor='val_loss', save_best_only=True, save_weights_only=True)

In [None]:
pinnCallback = LogPINNLossesCallback()

In [None]:
learning_rate_callback = tf.keras.callbacks.ReduceLROnPlateau(factor=0.9, patience=50, min_lr=1e-9)
_callbacks = [tensorboard_callback, model_checkpoint_callback, learning_rate_callback, pinnCallback]
callbacks = tf.keras.callbacks.CallbackList(_callbacks, add_history=False)

In [None]:
branchHiddenLayers = [tf.keras.layers.Dense(20, activation = 'tanh', name = f"branchNETDense_layer{i+1}") for i in range(5)]
trunkHiddenLayers = [tf.keras.layers.Dense(20, activation = 'tanh', name = f"trunkNETDense_layer{i+1}") for i in range(5)]

branchNET = BranchNN(hiddenLayers=branchHiddenLayers, input_shape=(100,))
trunkNET = TrunkNN(hiddenLayers=trunkHiddenLayers, input_shape=(1,))
deepOPINN = DeepOPINN(branchNN=branchNET, trunkNN=trunkNET)

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3, beta_1=0.9, beta_2=0.999)
deepOPINN.compile(optimizer=optimizer, loss='mse', metrics=['mae', tf.keras.losses.MeanAbsolutePercentageError(), tf.keras.metrics.RootMeanSquaredError()])

In [None]:
deepOPINN.build(input_shape=[(None, 100), (None, 1)])

In [None]:
deepOPINN.summary()

In [None]:
deepOPINN.fit(x=X_train, y=y_train, epochs=5000, verbose=True, batch_size=100, validation_data=(X_val, y_val), callbacks=_callbacks)

In [None]:
fig, ax = plt.subplots(2, 2, figsize=(16, 16))

ax[0][0].plot(deepOPINN.lossTracker.loss_history['traditional_loss'], label = "Traditional Loss")
ax[0][0].grid(True, color = "lightgrey")
ax[0][0].legend()
ax[0][0].set_yscale('log')

ax[0][1].plot(deepOPINN.lossTracker.loss_history['IC_loss'], label = "IC Loss")
ax[0][1].grid(True, color = "lightgrey")
ax[0][1].legend()
ax[0][1].set_yscale('log')

ax[1][0].plot(deepOPINN.lossTracker.loss_history['physics_loss'], label = "Physics Loss")
ax[1][0].grid(True, color = "lightgrey")
ax[1][0].legend()
ax[1][0].set_yscale('log')

ax[1][1].plot(deepOPINN.lossTracker.loss_history['total_loss'], label = "Combined Loss")
ax[1][1].grid(True, color = "lightgrey")
ax[1][1].legend()
ax[1][1].set_yscale('log')

plt.show()

In [None]:
deepOPINN.load_weights(filepath=modelPath)

testDomain = np.linspace(0, 1, 100)
testU = createSamples(lengthScale=0.2, k=1, domain=testDomain)
groundTruth = solutions(u=testU, t=testDomain).reshape(-1, 1)

print(testDomain.shape)
print(testU.shape)
print(groundTruth.shape)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(10, 9))

ax.plot(testDomain, testU.reshape(100, ), label = "Profile")
ax.plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")

ax.set_xlabel("$x$")
ax.set_ylabel("$f(x)$")
ax.tick_params(axis='x', labelcolor = 'tab:blue')
ax.legend()
ax.grid(True, color='lightgrey')

plt.show()

In [None]:
testBranchInput = np.array([testU for i in range(testDomain.shape[0])]).reshape(-1, 100)
print(testBranchInput.shape)
testTrunkInput = testDomain.reshape(-1, 1)
print(testTrunkInput.shape)

preds = deepOPINN.predict([testBranchInput, testTrunkInput])
print(preds.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10, 9))

ax[0].plot(testDomain, testU.reshape(100, ), label = "Profile")
ax[0].plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")
ax[0].plot(testDomain, preds.reshape(100, ), linestyle = ":", label = "Prediction")

ax[0].set_xlabel("$x$")
ax[0].set_ylabel("$f(x)$")
ax[0].tick_params(axis='x', labelcolor = 'tab:blue')
ax[0].legend()
ax[0].grid(True, color='lightgrey')

ax[1].plot(testDomain, preds - groundTruth, ".", color = "tab:red", label = "Residuals")
ax[1].axhline(0, linestyle = ":", color = "k")

ax[1].set_xlabel("$x$")
ax[1].set_ylabel("Residual")
ax[1].tick_params(axis='x', labelcolor = 'tab:blue')
ax[1].legend()
ax[1].grid(True, color='lightgrey')

plt.show()

In [None]:
pred_test = deepOPINN.predict(X_test)

fig, axs = plt.subplots(2, 2, figsize=(16, 16))
index = np.random.choice(np.arange(0, N_test), size=4, replace=False)
for i in range(4):
    ax = axs[i // 2, i % 2]
    ax.plot(domain, g_test[index[i]*100], label = f"Profile at index {index[i]}")
    ax.plot(domain, y_test[index[i]*100:(index[i]+1)*100], "-.", label = f"Solution at index {index[i]}")
    ax.plot(domain, pred_test[index[i]*100:(index[i]+1)*100], "-.", label = f"Prediction at index {index[i]}")
    ax.set_xlabel('t', fontsize=12)
    ax.set_ylabel('u(t)', fontsize=12)
    ax.legend()
    ax.grid(True, color = 'lightgrey')

plt.show()

In [None]:
branchOutput = deepOPINN.branchNN(testBranchInput)
print(branchOutput.shape)

trunkOutput = deepOPINN.trunkNN(testTrunkInput)
print(trunkOutput.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10,9))

for i in range(20):
    ax[0].plot(testDomain, branchOutput[:, i], '-', color = "C0", alpha = 0.5)
    ax[0].plot(testDomain, trunkOutput[:, i], '-', color = "C1", alpha = 0.5)

ax[0].grid(True, color = 'lightgrey')

for i in range(100):
    ax[1].plot(branchOutput[i, :], '-', color = "C0", alpha = 0.5)
    ax[1].plot(trunkOutput[i, :], '-', color = "C1", alpha = 0.5)

ax[1].grid(True, color = 'lightgrey')

plt.show()

In [None]:
testDomain = np.linspace(0, 1, 100)
testU = createSamples(lengthScale=0.05, k=1, domain=testDomain)
groundTruth = solutions(u=testU, t=testDomain).reshape(-1, 1)

print(testDomain.shape)
print(testU.shape)
print(groundTruth.shape)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(10, 9))

ax.plot(testDomain, testU.reshape(100, ), label = "Profile")
ax.plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")

ax.set_xlabel("$x$")
ax.set_ylabel("$f(x)$")
ax.tick_params(axis='x', labelcolor = 'tab:blue')
ax.legend()
ax.grid(True, color='lightgrey')

plt.show()

In [None]:
testBranchInput = np.array([testU for i in range(testDomain.shape[0])]).reshape(-1, 100)
print(testBranchInput.shape)
testTrunkInput = testDomain.reshape(-1, 1)
print(testTrunkInput.shape)

preds = deepOPINN.predict([testBranchInput, testTrunkInput])
print(preds.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10, 9))

ax[0].plot(testDomain, testU.reshape(100, ), label = "Profile")
ax[0].plot(testDomain, groundTruth, linestyle = "-.", label = "Ground Truth")
ax[0].plot(testDomain, preds.reshape(100, ), linestyle = ":", label = "Prediction")

ax[0].set_xlabel("$x$")
ax[0].set_ylabel("$f(x)$")
ax[0].tick_params(axis='x', labelcolor = 'tab:blue')
ax[0].legend()
ax[0].grid(True, color='lightgrey')

ax[1].plot(testDomain, preds - groundTruth, ".", color = "tab:red", label = "Residuals")
ax[1].axhline(0, linestyle = ":", color = "k")

ax[1].set_xlabel("$x$")
ax[1].set_ylabel("Residual")
ax[1].tick_params(axis='x', labelcolor = 'tab:blue')
ax[1].legend()
ax[1].grid(True, color='lightgrey')

plt.show()

In [None]:
branchOutput = deepOPINN.branchNN(testBranchInput)
print(branchOutput.shape)

trunkOutput = deepOPINN.trunkNN(testTrunkInput)
print(trunkOutput.shape)

In [None]:
fig, ax = plt.subplots(2, 1, figsize=(10,9))

for i in range(20):
    ax[0].plot(testDomain, branchOutput[:, i], '-', color = "C0", alpha = 0.5)
    ax[0].plot(testDomain, trunkOutput[:, i], '-', color = "C1", alpha = 0.5)

ax[0].grid(True, color = 'lightgrey')

for i in range(100):
    ax[1].plot(branchOutput[i, :], '-', color = "C0", alpha = 0.5)
    ax[1].plot(trunkOutput[i, :], '-', color = "C1", alpha = 0.5)

ax[1].grid(True, color = 'lightgrey')

plt.show()