In [None]:
from time import perf_counter 
import math
import numpy as np
import matplotlib.pyplot as plt
import scipy.optimize as sopt

import tensorflow as tf

In [None]:
# Code to use the scipy optimizers
# DO NOT modify this cell

# use float64 by default
tf.keras.backend.set_floatx("float64")

# Reshape 1D arrays to 2D arrays
def reshape_2d(x):
    return tf.reshape(x,(x.shape[0],1))

# Construct a function that can be minimized by scipy optimize
# starting from a tensorflow model
def function_factory(model, x_train, y_train, validation_data=None,
                     iprint=-1):
    """A factory to create a function required by scipy.optimize.
    Args:
        model [in]: an instance of `tf.keras.Model` or its subclasses.
        loss [in]: a function with signature loss_value = loss(pred_y, true_y).
        x_train [in]: input for training data.
        y_train [in]: output for training data.
        validation_data [in]: tuple (x_val,y_val) with validation data.
        iprint [in]: sets the frequency with which the loss info is printed out
    Returns:
        A function that has a signature of:
            loss_value, gradients = f(model_parameters)
    """

    # obtain the shapes of all trainable parameters in the model
    shapes = tf.shape_n(model.trainable_variables)
    n_tensors = len(shapes)

    # we'll use tf.dynamic_stitch and tf.dynamic_partition later, so we need to
    # prepare required information first
    count = 0
    idx = []  # stitch indices
    part = []  # partition indices

    for i, tensor in enumerate(model.trainable_variables):
        n = np.product(tensor.shape)
        idx.append(tf.reshape(
            tf.range(count, count+n, dtype=tf.int32), tensor.shape))
        part.extend([i]*n)
        count += n

    part = tf.constant(part)
    
    loss_fun = tf.keras.losses.MeanSquaredError()

    @tf.function
    def assign_new_model_parameters(params_1d):
        """A function updating the model's parameters with a 1D tf.Tensor.
        Args:
            params_1d [in]: a 1D tf.Tensor representing the model's trainable parameters.
        """
        params = tf.dynamic_partition(params_1d, part, n_tensors)
        for i, (shape, param) in enumerate(zip(shapes, params)):
            model.trainable_variables[i].assign(tf.reshape(param, shape))

    # function to calculate loss value and gradient
    @tf.function
    def tf_tape_grad(params_1d):

        # update the parameters in the model
        assign_new_model_parameters(params_1d)       

        if not (validation_data is None):
          # compute validation loss
          loss_val = model.loss(validation_data[0], validation_data[1])
          # store validation value so we can retrieve later        
          tf.py_function(value_and_grad.hist_loss_val.append,
                        inp=[loss_val], Tout=[])         
              
        # use GradientTape so that we can calculate the gradient of loss w.r.t. parameters
        with tf.GradientTape() as g:
            loss = model.loss(x_train,y_train)

        # calculate gradients and convert to 1D tf.Tensor
        grads = g.gradient(loss, model.trainable_variables)
        grads = tf.dynamic_stitch(idx, grads)

        # increment iteration counter
        value_and_grad.iter.assign_add(1)

        # print out iteration & loss
        if (iprint>=1 and value_and_grad.iter%iprint == 0):
            if validation_data is None:
                tf.print("Loss function eval:", value_and_grad.iter, "loss:", loss)
            else:
                tf.print("Loss function eval:", value_and_grad.iter, "loss:", loss,\
                         "validation loss:", loss_val)
        # store loss value so we can retrieve later
        tf.py_function(value_and_grad.hist_loss.append,
                       inp=[loss], Tout=[])       

        return loss, grads

    # create function that will be returned by this factory
    def value_and_grad(params_1d):
        """A function that can be used by optimizer.
        This function is created by function_factory.
        Args:
           params_1d [in]: a 1D tf.Tensor.
        Returns:
            A scalar loss and the gradients w.r.t. the `params_1d`.
        """
        return [vv.numpy().astype(np.float64) for vv in tf_tape_grad(tf.constant(params_1d, dtype=tf.float64))]

    # store this information as members so we can use it outside the scope
    value_and_grad.iter = tf.Variable(0)
    value_and_grad.idx = idx
    value_and_grad.part = part
    value_and_grad.shapes = shapes
    value_and_grad.assign_new_model_parameters = assign_new_model_parameters
    value_and_grad.hist_loss = []
    value_and_grad.hist_loss_val = []

    return value_and_grad


# Minimize the loss function using a scipy optimizer
def model_fit(model, x_t, y_t, validation_data=None, epochs=1000, iprint=-1,
              figname=None):
    """ 
    Fit a DNN model using scipy optimizers 
  
  
    Parameters: 
    model: tensorflow DNN model
    x_t: input training data
    y_t: output training data
    validation_data: tuple (x_val,y_val) with validation data
    epochs: maximum number of iterations in optimizer
    iprint: frequency for printing the loss function information. 
            Do not print anything if negative. Otherwise, print
            a line every iprint iteration.
    figname [str]: file name to save the figure of the training loss  
    """

    value_and_grad = function_factory(model, x_t, y_t, validation_data, iprint)

    # convert initial model parameters to a 1D tf.Tensor
    init_params = tf.dynamic_stitch(value_and_grad.idx, model.trainable_variables)

    if (iprint>=1):
        print()

    # train the model
    method = 'L-BFGS-B'
    results = sopt.minimize(fun=value_and_grad, x0=init_params,
                            jac=True, method=method,
                            options={'maxiter': epochs})

    print("\nConvergence information:")
    print('loss:', results.fun)
    # Computing the validation loss  
    if not (validation_data is None):
        val_loss = model.loss(validation_data[0], validation_data[1])
        tf.print('validation loss:', val_loss)
        # tf.make_ndarray(tf.make_tensor_proto(val_loss))    
    print('number function evaluations:', results.nfev)
    print('number iterations:', results.nit)
    print('success flag:', results.success)
    print('convergence message:', results.message)

    value_and_grad.assign_new_model_parameters(results.x)

    # Plot history of loss
    plt.figure()   
    plt.plot(value_and_grad.hist_loss, label='loss')
    if not (validation_data is None): 
        plt.plot(value_and_grad.hist_loss_val, label='validation')
    plt.legend()
    plt.xlabel('epoch')
    plt.yscale('log')
    if validation_data is None:    
        plt.title('Training loss')
    else:
        plt.title('Training and validation losses')

    if not (figname is None):
        plt.savefig(figname,dpi=300)

    return results

### Code for data generation.

In [None]:
# Functions for data generation.
# DO NOT modify this cell.

# u
def u_fun(x):
    return np.cos(x**2)

# du
def du_fun(x):
    return -2*x*np.sin(x**2)

# k
def k_fun(x):
    return 1 + x**2

def a_fun(x):
    return x

# d/dx( k du/dx ) + a du/dx + u
def f_fun(x):
    return (-8*x**2 - 2)*np.sin(x**2) + (-4*x**4 - 4*x**2 + 1)*np.cos(x**2)

def generate_data(n_u_obs, n_k_obs, n_a_obs, n_phys):
    """
    Generate n_u_obs data examples for u, n_k_obs examples for k, 
    n_a_obs examples for a, n_phys examples for ode
    """
    
    # u observation data
    x_u_obs = reshape_2d(np.linspace(-1,1,n_u_obs))
    u_obs = u_fun(x_u_obs)
    u_obs = reshape_2d(u_obs)

    # k observation data
    x_k_obs = reshape_2d(np.linspace(-1,1,n_k_obs))
    k_obs = k_fun(x_k_obs)
    k_obs = reshape_2d(k_obs)

    # a observation data
    x_a_obs = reshape_2d(np.linspace(-1,1,n_a_obs))
    a_obs = a_fun(x_a_obs)
    a_obs = reshape_2d(a_obs)

    # Physics data
    x_phys = reshape_2d(np.linspace(-1,1,n_phys))
    y_phys = f_fun(x_phys)
    y_phys = reshape_2d(y_phys)
    
    X_data = [x_u_obs,x_k_obs,x_a_obs,x_phys]
    Y_data = [u_obs,k_obs,a_obs,y_phys]    
    
    return X_data, Y_data

# Questions 1, 2, 3, 4

In [None]:
class PI_NN(tf.keras.models.Model):
    def __init__(self):
        super(PI_NN, self).__init__()
        
        # 1. Define tf keras layers
        
        # 2. Define the loss function to use later on
        
        # delete the line below when your implementation is complete
        pass
            
    
    def call(self, inputs):
        # Forward pass for model, inputs is of size B x 1, where B is batch size
        
        # delete the line below when your implementation is complete
        pass
        
        # 1. Use layers defined in __init__ for the forward pass
        
        # 2. Split the output from the final layer into u, k, a:
        # u, k, a = tf.split(final_layer_output, num_or_size_splits=3, axis=1)
        
        return u, k, a

    def get_derivatives(self, x_input):
        x_input = tf.convert_to_tensor(x_input)
        
        # Compute u, k, a, du/dx (denoted as variable u_x) and the LHS of ODE (denoted as variable phys)
        
        # delete the line below when your implementation is complete
        pass
    
        return u, k, a, u_x, phys
    
    def loss(self, X, Y):
        # u = self(X[0])[0]
        # loss_u = self.loss_fun(Y[0], u)
        
        # k = self(X[1])[1]
        # loss_k = ...
        
        # a = ...
        # loss_a = ...
        
        # rhs_ode = ... (hint: use self.get_derivatives)
        # loss_ode = ...
        
        # loss = weight_u*loss_u + ...
        
        # delete the line below when your implementation is complete
        pass
        
        return loss

# Question 5

In [None]:
# mode 1
# a lot of data for $k$, $a$, $f$. Limited observation for $u$. Learn $u$.

# Initialize model
model = PI_NN()
model.build((1,1))

# Generate training data
X_train, Y_train = generate_data(2, 16, 16, 16)
X_val, Y_val     = generate_data(31, 31, 31, 31)

# TO DO: use the model_fit function to train the model


In [None]:
# Plot the solution

x_true = reshape_2d(np.linspace(-1,1,32))
x_test = reshape_2d(np.linspace(-1,1,128))

u, k, a, u_x, phys = model.get_derivatives(x_test)

plt.plot(x_test,u,'r',label='u')
plt.plot(x_true,u_fun(x_true),'go',label='true')
plt.title('Exact solution and DNN model')
plt.legend()
plt.show()

plt.plot(x_test,k,'r',label='k')
plt.plot(x_true,k_fun(x_true),'go',label='true')
plt.legend()
plt.show()

plt.plot(x_test,a,'r',label='a')
plt.plot(x_true,a_fun(x_true),'go',label='true')
plt.legend()
plt.show()

plt.plot(x_test,u_x,'r',label='ux')
plt.plot(x_true,du_fun(x_true),'go',label='true')
plt.legend()
plt.show()

plt.plot(x_test,phys,'r',label='ode')
plt.plot(x_true,f_fun(x_true),'go',label='true')
plt.legend()

In [None]:
# Plot the error

plt.plot(x_test, u - u_fun(x_test),'r',label='u')
plt.title('Error')
plt.legend()
plt.show()

plt.plot(x_test, k - k_fun(x_test),'r',label='k')
plt.legend()
plt.show()

plt.plot(x_test, a - a_fun(x_test),'r',label='a')
plt.legend()
plt.show()

plt.plot(x_test, u_x - du_fun(x_test),'r',label='ux')
plt.legend()
plt.show()

plt.plot(x_test, phys - f_fun(x_test),'r',label='ode')
plt.legend()

# Question 6

In [None]:
# mode 2
# a lot of data for $u$ and $f$. Limited observation for $k$ and $a$. Learn $k$ and $a$
model = PI_NN()
model.build((1,1))

X_train, Y_train = generate_data(16, 4, 4, 16)
X_val, Y_val = generate_data(31, 31, 31, 31)

# TO DO: use the model_fit function to train the model


In [None]:
# Plot the solution
u, k, a, u_x, _ = model.get_derivatives(x_test)

plt.plot(x_test,k,'r',label='k')
plt.plot(x_true,k_fun(x_true),'go',label='true')
plt.legend()
plt.show()

plt.plot(x_test,a,'r',label='a')
plt.plot(x_true,a_fun(x_true),'go',label='true')
plt.legend()
plt.show()


In [None]:
plt.plot(x_test, u - u_fun(x_test),'r',label='u')
plt.title('Error')
plt.legend()
plt.show()

plt.plot(x_test, k - k_fun(x_test),'r',label='k')
plt.legend()
plt.show()

plt.plot(x_test, a - a_fun(x_test),'r',label='a')
plt.legend()
plt.show()

plt.plot(x_test, u_x - du_fun(x_test),'r',label='ux')
plt.legend()
plt.show()

plt.plot(x_test, phys - f_fun(x_test),'r',label='ode')
plt.legend()
plt.show()

# Question 7

In [None]:
# mode 3
# Hybrid. Combine partial observations for all variables
model = PI_NN()
model.build((1,1))

X_train, Y_train = generate_data(8, 8, 8, 8)
X_val, Y_val = generate_data(31, 31, 31, 31)

# TO DO: use the model_fit function to train the model


In [None]:
# Plot the solution
u, k, a, u_x, phys = model.get_derivatives(x_test)

plt.plot(x_test,u,'r',label='u')
plt.plot(x_true,u_fun(x_true),'go',label='true')
plt.title('Exact solution and DNN model')
plt.legend()
plt.show()

plt.plot(x_test,k,'r',label='k')
plt.plot(x_true,k_fun(x_true),'go',label='true')
plt.legend()
plt.show()

plt.plot(x_test,a,'r',label='a')
plt.plot(x_true,a_fun(x_true),'go',label='true')
plt.legend()
plt.show()

plt.plot(x_test,u_x,'r',label='ux')
plt.plot(x_true,du_fun(x_true),'go',label='true')
plt.legend()
plt.show()

plt.plot(x_test,phys,'r',label='ode')
plt.plot(x_true,f_fun(x_true),'go',label='true')
plt.legend()
plt.show()

In [None]:
plt.plot(x_test, u - u_fun(x_test),'r',label='u')
plt.title('Error')
plt.legend()
plt.show()

plt.plot(x_test, k - k_fun(x_test),'r',label='k')
plt.legend()
plt.show()

plt.plot(x_test, a - a_fun(x_test),'r',label='a')
plt.legend()
plt.show()

plt.plot(x_test, u_x - du_fun(x_test),'r',label='ux')
plt.legend()
plt.show()

plt.plot(x_test, phys - f_fun(x_test),'r',label='ode')
plt.legend()
plt.show()