In [5]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import time
import scipy.io
import math
import os
import matplotlib.gridspec as gridspec
from plotting import newfig
from mpl_toolkits.axes_grid1 import make_axes_locatable
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras import layers, activations
from scipy.interpolate import griddata
from eager_lbfgs import lbfgs, Struct
from pyDOE import lhs
from keras.engine.saving import save_model

In [6]:
layer_sizes = [2, 20, 20, 20, 20, 20, 20, 20, 20, 1]

sizes_w = []
sizes_b = []
for i in range(len(layer_sizes) - 1): 
    sizes_w.append(layer_sizes[i] * layer_sizes[i+1]) 
    sizes_b.append(layer_sizes[i+1])

In [7]:
def set_weights(model, w, sizes_w, sizes_b):
        for i, layer in enumerate(model.layers[0:]):
            start_weights = sum(sizes_w[:i]) + sum(sizes_b[:i])
            end_weights = sum(sizes_w[:i+1]) + sum(sizes_b[:i])
            weights = w[start_weights:end_weights]
            w_div = int(sizes_w[i] / sizes_b[i])
            weights = tf.reshape(weights, [w_div, sizes_b[i]])
            biases = w[end_weights:end_weights + sizes_b[i]]
            weights_biases = [weights, biases]
            layer.set_weights(weights_biases)


In [8]:
def get_weights(model):
        w = []
        for layer in model.layers[0:]:
            weights_biases = layer.get_weights()
            weights = weights_biases[0].flatten()
            biases = weights_biases[1]
            w.extend(weights)
            w.extend(biases)

        w = tf.convert_to_tensor(w)
        return w

In [9]:
def neural_net(layer_sizes):
    model = Sequential()
    model.add(layers.InputLayer(input_shape=(layer_sizes[0],)))
    for width in layer_sizes[1:-1]:
        model.add(layers.Dense(
            width, activation=tf.nn.tanh,
            kernel_initializer="glorot_normal"))
    model.add(layers.Dense(
            layer_sizes[-1], activation=None,
            kernel_initializer="glorot_normal"))
    return model

In [10]:
u_model = neural_net(layer_sizes)

u_model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 20)                60        
_________________________________________________________________
dense_1 (Dense)              (None, 20)                420       
_________________________________________________________________
dense_2 (Dense)              (None, 20)                420       
_________________________________________________________________
dense_3 (Dense)              (None, 20)                420       
_________________________________________________________________
dense_4 (Dense)              (None, 20)                420       
_________________________________________________________________
dense_5 (Dense)              (None, 20)                420       
_________________________________________________________________
dense_6 (Dense)              (None, 20)                4

In [11]:
def loss(x_f_batch, t_f_batch,
             x0, t0, u0, x_lb,
             t_lb, x_ub, t_ub, col_weights, u_weights, b_weights_lb, b_weights_ub):

    f_u_pred = f_model(x_f_batch, t_f_batch)
    u0_pred = u_model(tf.concat([x0, t0],1))
    u_lb_pred, _ = u_x_model(x_lb, t_lb)
    u_ub_pred, _ = u_x_model(x_ub, t_ub)
    
    # Compute weighted losses
    mse_0_u = tf.reduce_mean(tf.square(u_weights * (u0 - u0_pred)))
    mse_b_u = tf.reduce_mean(tf.square(b_weights_lb * (u_lb_pred - 0))) + \
              tf.reduce_mean(tf.square(b_weights_ub * (u_ub_pred - 0)))
    mse_f_u = tf.reduce_mean(tf.square(col_weights * f_u_pred))

    return  mse_0_u + mse_b_u + mse_f_u , mse_0_u, mse_f_u

In [12]:
@tf.function
def f_model(x,t):
    u = u_model(tf.concat([x,t], 1))
    u_x = tf.gradients(u,x)
    u_xx = tf.gradients(u_x, x)
    u_t = tf.gradients(u,t)
    f_u = u_t + u*u_x - (0.01/tf.constant(math.pi))*u_xx

    return f_u

In [13]:
@tf.function
def u_x_model(x,t):
    u = u_model(tf.concat([x,t],1))
    u_x = tf.gradients(u,x)
    return u,u_x

In [14]:
@tf.function
def grad(model, x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights, b_weights_lb, b_weights_ub):
    with tf.GradientTape(persistent=True) as tape:
        #tape.watch(col_weights)
        #tape.watch(u_weights)
        loss_value, mse_0, mse_f = loss(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub,
                                        col_weights, u_weights, b_weights_lb, b_weights_ub)
        grads = tape.gradient(loss_value, u_model.trainable_variables)
        #print(grads)
        grads_col = tape.gradient(loss_value, col_weights)
        grads_u = tape.gradient(loss_value, u_weights)
        
        grads_lb = tape.gradient(loss_value, b_weights_lb)
        grads_ub = tape.gradient(loss_value, b_weights_ub)

    return loss_value, mse_0, mse_f, grads, grads_col, grads_u, grads_lb, grads_ub

In [15]:
def fit(x_f, t_f, x0, t0, u0, x_lb, t_lb, x_ub, t_ub,
        col_weights, u_weights, b_weights_lb, b_weights_ub, tf_iter, newton_iter):
    # Built in support for mini-batch, set to N_f (i.e. full batch) by default
    batch_sz = N_f
    n_batches =  N_f // batch_sz
    start_time = time.time()
    
    # Separate optimizers for each weight category
    tf_optimizer = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.90)
    tf_optimizer_coll = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.90)
    tf_optimizer_u = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.90)
    tf_optimizer_lb = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.90)
    tf_optimizer_ub = tf.keras.optimizers.Adam(lr = 0.005, beta_1=.90)

    print("starting Adam training")

    for epoch in range(tf_iter):
        for i in range(n_batches):

            x0_batch = x0#[i*batch_sz:(i*batch_sz + batch_sz),]
            t0_batch = t0#[i*batch_sz:(i*batch_sz + batch_sz),]
            u0_batch = u0#[i*batch_sz:(i*batch_sz + batch_sz),]

            x_f_batch = x_f[i*batch_sz:(i*batch_sz + batch_sz),]
            t_f_batch = t_f[i*batch_sz:(i*batch_sz + batch_sz),]

            # Compute loss gradients
            loss_value, mse_0, mse_f, grads, grads_col, grads_u, grads_lb, grads_ub = grad(u_model,
                x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch,
                x_lb, t_lb, x_ub, t_ub,
                col_weights, u_weights, b_weights_lb, b_weights_ub)
            
            # Apply gradients conditionally
            if grads is not None:
                tf_optimizer.apply_gradients(zip(grads, u_model.trainable_variables))
            if grads_col is not None:
                tf_optimizer_coll.apply_gradients(zip([-grads_col], [col_weights]))
            if grads_u is not None:
                tf_optimizer_u.apply_gradients(zip([-grads_u], [u_weights]))
            if grads_lb is not None:
                tf_optimizer_lb.apply_gradients(zip([-grads_lb], [b_weights_lb]))
            if grads_ub is not None:
                tf_optimizer_ub.apply_gradients(zip([-grads_ub], [b_weights_ub]))
            
#             tf_optimizer.apply_gradients(zip(grads, u_model.trainable_variables))
#             tf_optimizer_coll.apply_gradients(zip([-grads_col], [col_weights]))
#             tf_optimizer_u.apply_gradients(zip([-grads_u], [u_weights]))
#             tf_optimizer_lb.apply_gradients(zip([-grads_lb], [b_weights_lb]))
#             tf_optimizer_ub.apply_gradients(zip([-grads_ub], [b_weights_ub]))
            
        if epoch % 10 == 0:
            elapsed = time.time() - start_time
            print('It: %d, Time: %.2f' % (epoch, elapsed))
            tf.print(f"mse_0: {mse_0}  mse_f: {mse_f}   total loss: {loss_value}")
            start_time = time.time()


    #l-bfgs-b optimization
    print("Starting L-BFGS training")

    loss_and_flat_grad = get_loss_and_flat_grad(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub,
                                                col_weights, u_weights, b_weights_lb, b_weights_ub)

    lbfgs(loss_and_flat_grad,
      get_weights(u_model),
      Struct(), maxIter=newton_iter, learningRate=0.8)

In [16]:
# L-BFGS implementation from https://github.com/pierremtb/PINNs-TF2.0
def get_loss_and_flat_grad(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub, col_weights, u_weights, b_weights_lb, b_weights_ub):
    def loss_and_flat_grad(w):
        with tf.GradientTape() as tape:
            set_weights(u_model, w, sizes_w, sizes_b)
            loss_value, _, _ = loss(x_f_batch, t_f_batch, x0_batch, t0_batch, u0_batch, x_lb, t_lb, x_ub, t_ub,
                                    col_weights, u_weights, b_weights_lb, b_weights_ub)
        grad = tape.gradient(loss_value, u_model.trainable_variables)
        grad_flat = []
        for g in grad:
            grad_flat.append(tf.reshape(g, [-1]))
        grad_flat = tf.concat(grad_flat, 0)
        #print(loss_value, grad_flat)
        return loss_value, grad_flat

    return loss_and_flat_grad

In [17]:
def predict(X_star):
    X_star = tf.convert_to_tensor(X_star, dtype=tf.float32)
    u_star, _ = u_x_model(X_star[:,0:1],
                     X_star[:,1:2])

    f_u_star = f_model(X_star[:,0:1],
                 X_star[:,1:2])

    return u_star.numpy(), f_u_star.numpy()


In [19]:
import json


# Helper function to set weights based on scenario
def enable_adaptive(enable_col, enable_u, enable_b, N_f, N0, N_b):
    col_weights = tf.Variable(
        tf.reshape(
            tf.repeat(100.0 if enable_col else 1.0, N_f),
            (N_f, -1)
        )
    )
    u_weights = tf.Variable(
        tf.random.uniform([N0, 1]) if enable_u else tf.ones([N0, 1])
    )
    b_weights_lb = tf.Variable(
        tf.random.uniform([N_b, 1]) if enable_b else tf.ones([N_b, 1])
    )
    b_weights_ub = tf.Variable(
        tf.random.uniform([N_b, 1]) if enable_b else tf.ones([N_b, 1])
    )
    return col_weights, u_weights, b_weights_lb, b_weights_ub

# Directory to save trained models
base_dir = "trained_models"
os.makedirs(base_dir, exist_ok=True)

# Load data (from Raissi et al.)
data = scipy.io.loadmat('burgers_shock.mat')
t = data['t'].flatten()[:, None]
x = data['x'].flatten()[:, None]
Exact = data['usol']
Exact_u = np.real(Exact)

# Parameters
lb = np.array([-1.0])  # x lower boundary
ub = np.array([1.0])   # x upper boundary
N0 = 100               # Initial condition points
N_b = 25               # Boundary points per side
N_f = 10000            # Collocation points

# Generate initial, boundary, and collocation points
idx_x = np.random.choice(x.shape[0], N0, replace=False)
x0 = x[idx_x, :]
u0 = tf.cast(Exact_u[idx_x, 0:1], dtype=tf.float32)

idx_t = np.random.choice(t.shape[0], N_b, replace=False)
tb = t[idx_t, :]

X_f = lb + (ub - lb) * lhs(2, N_f)
x_f = tf.convert_to_tensor(X_f[:, 0:1], dtype=tf.float32)
t_f = tf.convert_to_tensor(np.abs(X_f[:, 1:2]), dtype=tf.float32)

# Generate point vectors for training
X0 = np.concatenate((x0, 0 * x0), 1)  # (x0, 0)
X_lb = np.concatenate((0 * tb + lb[0], tb), 1)  # (lb[0], tb)
X_ub = np.concatenate((0 * tb + ub[0], tb), 1)  # (ub[0], tb)

x0 = tf.cast(X0[:, 0:1], dtype=tf.float32)
t0 = tf.cast(X0[:, 1:2], dtype=tf.float32)
x_lb = tf.convert_to_tensor(X_lb[:, 0:1], dtype=tf.float32)
t_lb = tf.convert_to_tensor(X_lb[:, 1:2], dtype=tf.float32)
x_ub = tf.convert_to_tensor(X_ub[:, 0:1], dtype=tf.float32)
t_ub = tf.convert_to_tensor(X_ub[:, 1:2], dtype=tf.float32)

# Training and saving loop
epoch_scenarios = [100, 1000, 10000]
sub_scenarios = [
    {"name": "all_off", "enable_col": False, "enable_u": False, "enable_b": False},
    {"name": "col_on", "enable_col": True, "enable_u": False, "enable_b": False},
    {"name": "boundary_on", "enable_col": False, "enable_u": False, "enable_b": True},
    {"name": "initial_on", "enable_col": False, "enable_u": True, "enable_b": False},
]

for epochs in epoch_scenarios:
    for sub_scenario in sub_scenarios:
        print(f"Training for {epochs} epochs with scenario: {sub_scenario['name']}")

        # Set weights for the current sub-scenario
        col_weights, u_weights, b_weights_lb, b_weights_ub = enable_adaptive(
            sub_scenario["enable_col"],
            sub_scenario["enable_u"],
            sub_scenario["enable_b"],
            N_f,
            N0,
            N_b,
        )

        # Train the model incrementally
        fit(x_f, t_f, x0, t0, u0, x_lb, t_lb, x_ub, t_ub, 
            col_weights, u_weights, b_weights_lb, b_weights_ub, 
            tf_iter=epochs, newton_iter=epochs)

        # Save model and configuration
        model_dir = os.path.join(base_dir, f"{epochs}_epochs", sub_scenario["name"])
        os.makedirs(model_dir, exist_ok=True)

        u_model.save(os.path.join(model_dir, f"{epochs}_{sub_scenario}"))

        config = {
            "epochs": epochs,
            "sub_scenario": sub_scenario["name"],
            "col_active": sub_scenario["enable_col"],
            "boundary_active": sub_scenario["enable_b"],
            "initial_active": sub_scenario["enable_u"],
        }
        with open(os.path.join(model_dir, "config.json"), "w") as f:
            json.dump(config, f)

# Generate mesh for predictions
X, T = np.meshgrid(x, t)
X_star = np.hstack((X.flatten()[:, None], T.flatten()[:, None]))
u_star = Exact_u.T.flatten()[:, None]

# Get predictions
u_pred, f_u_pred = predict(X_star)

# Compute L2 error
error_u = np.linalg.norm(u_star - u_pred, 2) / np.linalg.norm(u_star, 2)
print('Error u: %e' % error_u)

U_pred = griddata(X_star, u_pred.flatten(), (X, T), method='cubic')
FU_pred = griddata(X_star, f_u_pred.flatten(), (X, T), method='cubic')


Training for 100 epochs with scenario: all_off
starting Adam training
It: 0, Time: 1.16
mse_0: 0.04189290106296539  mse_f: 0.02813652716577053   total loss: 0.07099688053131104
It: 10, Time: 0.29
mse_0: 0.28682583570480347  mse_f: 0.05965794622898102   total loss: 0.5980369448661804
It: 20, Time: 0.29
mse_0: 0.254004567861557  mse_f: 0.04674641042947769   total loss: 0.3049846291542053
It: 30, Time: 0.28
mse_0: 0.15541259944438934  mse_f: 0.03489159047603607   total loss: 0.19237393140792847
It: 40, Time: 0.29
mse_0: 0.08774000406265259  mse_f: 0.09928335249423981   total loss: 0.19226044416427612
It: 50, Time: 0.30
mse_0: 0.10414940118789673  mse_f: 0.05521158501505852   total loss: 0.16451409459114075
It: 60, Time: 0.31
mse_0: 0.09072147309780121  mse_f: 0.06681305170059204   total loss: 0.16157615184783936
It: 70, Time: 0.28
mse_0: 0.0826394259929657  mse_f: 0.07312148064374924   total loss: 0.1591460406780243
It: 80, Time: 0.27
mse_0: 0.0852038562297821  mse_f: 0.0681005120

In [None]:
#plotting script in the style of Raissi et al

######################################################################
############################# Plotting ###############################
######################################################################

X0 = np.concatenate((x0, 0*x0), 1) # (x0, 0)
X_lb = np.concatenate((0*tb + lb[0], tb), 1) # (lb[0], tb)
X_ub = np.concatenate((0*tb + ub[0], tb), 1) # (ub[0], tb)
X_u_train = np.vstack([X0, X_lb, X_ub])

fig, ax = newfig(1.3, 1.0)
ax.axis('off')

####### Row 0: h(t,x) ##################
gs0 = gridspec.GridSpec(1, 2)
gs0.update(top=1-0.06, bottom=1-1/3, left=0.15, right=0.85, wspace=0)
ax = plt.subplot(gs0[:, :])

h = ax.imshow(U_pred.T, interpolation='nearest', cmap='YlGnBu',
              extent=[lb[1], ub[1], lb[0], ub[0]],
              origin='lower', aspect='auto')
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig.colorbar(h, cax=cax)


line = np.linspace(x.min(), x.max(), 2)[:,None]
ax.plot(t[25]*np.ones((2,1)), line, 'k--', linewidth = 1)
ax.plot(t[50]*np.ones((2,1)), line, 'k--', linewidth = 1)
ax.plot(t[75]*np.ones((2,1)), line, 'k--', linewidth = 1)

ax.set_xlabel('$t$')
ax.set_ylabel('$x$')
leg = ax.legend(frameon=False, loc = 'best')
#    plt.setp(leg.get_texts(), color='w')
ax.set_title('$u(t,x)$', fontsize = 10)

####### Row 1: h(t,x) slices ##################
gs1 = gridspec.GridSpec(1, 3)
gs1.update(top=1-1/3, bottom=0, left=0.1, right=0.9, wspace=0.5)

ax = plt.subplot(gs1[0, 0])
ax.plot(x,Exact_u[:,25], 'b-', linewidth = 2, label = 'Exact')
ax.plot(x,U_pred[25,:], 'r--', linewidth = 2, label = 'Prediction')
ax.set_xlabel('$x$')
ax.set_ylabel('$u(t,x)$')
ax.set_title('$t = %.2f$' % (t[25]), fontsize = 10)
ax.axis('square')
ax.set_xlim([-1.1,1.1])
ax.set_ylim([-1.1,1.1])

ax = plt.subplot(gs1[0, 1])
ax.plot(x,Exact_u[:,50], 'b-', linewidth = 2, label = 'Exact')
ax.plot(x,U_pred[50,:], 'r--', linewidth = 2, label = 'Prediction')
ax.set_xlabel('$x$')
ax.set_ylabel('$u(t,x)$')
ax.axis('square')
ax.set_xlim([-1.1,1.1])
ax.set_ylim([-1.1,1.1])
ax.set_title('$t = %.2f$' % (t[50]), fontsize = 10)
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.3), ncol=5, frameon=False)

ax = plt.subplot(gs1[0, 2])
ax.plot(x,Exact_u[:,75], 'b-', linewidth = 2, label = 'Exact')
ax.plot(x,U_pred[75,:], 'r--', linewidth = 2, label = 'Prediction')
ax.set_xlabel('$x$')
ax.set_ylabel('$u(t,x)$')
ax.axis('square')
ax.set_xlim([-1.1,1.1])
ax.set_ylim([-1.1,1.1])
ax.set_title('$t = %.2f$' % (t[75]), fontsize = 10)

#show u_pred across domain
fig, ax = plt.subplots()

ec = plt.imshow(U_pred.T, interpolation='nearest', cmap='rainbow',
            extent=[0.0, 1.0, -1.0, 1.0],
            origin='lower', aspect='auto')

ax.autoscale_view()
ax.set_xlabel('$t$')
ax.set_ylabel('$x$')
cbar = plt.colorbar(ec)
cbar.set_label('$u(x,t)$')
plt.title("Predicted $u(x,t)$",fontdict = {'fontsize': 14})
plt.show()

# Show F_U_pred across domain, should be close to 0
fig, ax = plt.subplots()

ec = plt.imshow(FU_pred.T, interpolation='nearest', cmap='rainbow',
            extent=[0.0, math.pi/2, -5.0, 5.0],
            origin='lower', aspect='auto')

ax.autoscale_view()
ax.set_xlabel('$x$')
ax.set_ylabel('$t$')
cbar = plt.colorbar(ec)
cbar.set_label('$\overline{f}_u$ prediction')
plt.show()

# collocation point weights
plt.scatter(t_f, x_f, c = col_weights.numpy(), s = col_weights.numpy()/5)
plt.show()