In [1]:
import tensorflow as tf
from tensorflow.keras import layers, activations


class Net:
    """Define the base neural network model: Fully Connected Network

    Model defined with tf.keras.Sequential() module

    Args:
    - n_input (2 or 3)
    - n_hidden_layer
    - n_hidden_neuron
    - n_output
    """
    def __init__(self, n_input, n_hidden_layer, n_hidden_neuron, n_output):
        """Constructor: build our neural network architecture"""
        self.features = tf.keras.Sequential()   # use sequential to build the model

        self.features.add(tf.keras.Input(shape=(n_input,)))    #input

        for i in range(n_hidden_layer): #for loop to add the n_hidden_layers (with n_hidden_neuron each)
            self.features.add(layers.Dense(n_hidden_neuron, activation="swish", kernel_initializer='glorot_uniform', bias_initializer='zeros'))

        self.features.add(layers.Dense(n_output,  kernel_initializer='glorot_uniform', bias_initializer='zeros'))   #output


    # return the number of parameters in the network    # TODO: change -> mettere un metodo più furbo...
    def num_parameters(self):
        """return the number of parameters in the network"""
        tot = 0
        for layer in self.features.layers:
            (dim1,dim2) = layer.get_weights()[0].shape
            (dim3,) = layer.get_weights()[1].shape
            tot+= dim1*dim2
            tot+= dim3
        return tot

    # return a list of all dimension of W and b in each layer
    def get_dimensions(self):
        """return a list of all dimension of W and b in each layer"""
        architecture = []
        for layer in self.features.layers:
            (dim1,dim2) = layer.get_weights()[0].shape # dims of matrix W
            (dim3,) = layer.get_weights()[1].shape  # dim of vector bias
            architecture.append((dim1,dim2,dim3))
        return architecture

    #decorator to speed up, alternatively directly call to Net.features(input)
    @tf.function
    def forward(self, x):
        """forward pass of the input x"""
        return self.features(x)

    # return a list of all the matrix W and bias vectors in the network (e.g. shape=(8,))
    def get_parameters(self):
        return self.features.trainable_weights

In [8]:
import numpy as np
import os

import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions

class BayesNN:
    """
    Define PINN-Bayesian network as a collection of NNs with physics-constraint

    Args:
    - num_neural_networks
    """
    def __init__(self, num_neural_networks, pde_noise, nFeature, nLayer, nNeuron, nOutput_vel, param2loss):
        """"Constructor"""

        self.num_neural_networks = num_neural_networks # number of Neural Networks used

        # w_i ~ StudentT(w_i | mu=0, lambda=shape/rate, nu=2*shape)
        # for efficiency, represent StudentT params using Gamma params
        # Nick shape  = 0.5, rate = 10
        self.w_prior_shape = 1.     # TODO: perchè poi inizializziamo w con una normale inizial. di una Neural Network?
        self.w_prior_rate = 0.05    # cioè con Normal(0, sigma) per matrice W e zero per Bias?

        # noise variance 1e-6: beta ~ Gamma(beta | shape, rate)
        ## Nick shape = 10 rate = 10*C0*args*.t**k0 = 10*0.2*0.005^3 = 2.5e-7
        self.beta_prior_shape = 2.
        self.beta_prior_rate = pde_noise

        # for the equation loglikelihood
        self.var_eq = 1e-4
        self.lamda = np.ones((1,))  # TODO: dove lo uso? posso toglierlo? dopo aver aggiunto param2loss

        self.architecture_nn_act_times = []
        self.architecture_nn_cond_velocity = []

        # build "self.num_neural_networks" instances for both act_times and cond_velocity
        instances_nn_act_times = []
        instances_nn_cond_velocity = []

        for i in range(self.num_neural_networks):
            new_instance_time = Net(nFeature, nLayer, nNeuron, 1)
            new_instance_vel = Net(nFeature, nLayer, nNeuron, nOutput_vel)
                # save the dimension of one network
            if(i==0):
                self.architecture_nn_act_times = new_instance_time.get_dimensions()
                self.architecture_nn_cond_velocity = new_instance_vel.get_dimensions()

            # TODO: sistemare prior -> initialization (senza questo pezzo uso una classica inizial. di un Neural Network)
            ### w_i ~ StudentT(w_i | mu=0, lambda=shape/rate, nu=2*shape)
            # initialization_vector = tfd.StudentT(df = self.w_prior_shape, loc = 0.0,
            #                  scale = self.w_prior_rate).sample(sample_shape=(new_instance.num_parameters(),))
            # new_instance.update_weights(initialization_vector.numpy())

            instances_nn_act_times.append(new_instance_time) # append the i-th NN
            instances_nn_cond_velocity.append(new_instance_vel)

        # store all the Neural Networks
        self.nnets_times = instances_nn_act_times
        self.nnets_vel = instances_nn_cond_velocity

        # store the arg "param2loss", used in criterion()
        self.param2loss = param2loss
    ###############################################################################################################################


    # get the neural networks of index=idx
    def __getitem__(self, idx):
        return (self.nnets_times[idx], self.nnets_vel[idx])


    # forward pass of all the n_samples neural networks
    def forward(self, inputs):
        output_times = []
        output_vel = []
        for i in range(self.n_samples):
            output_times.append(self.nnets_times[i].features(inputs))
            output_vel.append(self.nnets_vel[i].features(inputs))
        output_times = tf.stack(output_times)
        output_vel = tf.stack(output_vel)
        return output_times, output_vel

    def get_trainable_weights(self):
        weights_times = []
        weights_vel = []
        for i in range(self.num_neural_networks):
            weights_times.append(self.nnets_times[i].features.trainable_weights)
            weights_vel.append(self.nnets_vel[i].features.trainable_weights)

        return weights_times, weights_vel

    # ### TODO: use "save" instead of "save_weights"!
    # # save all the neural networks
    # def save_networks(self, path):
    #     for i in range(self.n_samples):
    #         save_path = os.path.join(path, "weights_"+str(i)+".h5")
    #         self.nnets[i].features.save_weights(save_path)
    #
    # # load all the neural networks
    # def load_networks(self, path):
    #     for i in range(self.n_samples):
    #         load_path = os.path.join(path,"weights_"+str(i)+".h5")
    #         self.nnets[i].features.load_weights(load_path)

    # compute the log joint probability = loglikelihood + log_prior_w + log_prior_log_beta
    @tf.function    # decorator @tf.function to speed up the computation
    def log_joint(self, index, output, target):
        """Log joint probability or unnormalized posterior for single model
        instance. Ignoring constant terms for efficiency.

        Args:
            index (int): model index, 0, 1, ..., "n_samples"
            output (Tensor): y_pred from our model
            target (Tensor): y true
        Returns:
            Log joint probability (zero-dim tensor)
        """
        # Normal(target | output, 1 / beta * I)
        #print('output.size = ',output.size(0))
        ntrain = output.shape[0]


        output = tf.dtypes.cast(output, dtype=tf.float64)
        ### train on data for JUST at -> target[:,0]
        ### TODO: mettere un coeff n_exact/n_coll davanti?
        log_likelihood = -0.50*tf.math.exp(self.nnets[index].log_beta)*tf.reduce_sum((target[:,0] - output[:,0])**2) \
                            + 0.50 * (tf.size(target[:,0], out_type = tf.dtypes.float64)) * self.nnets[index].log_beta


        # log prob of prior of weights, i.e. log prob of studentT
        log_prob_prior_w = 0.
        for param in self.nnets[index].features.trainable_weights:
            log_prob_prior_w += (tf.reduce_sum( tf.math.log1p( 0.5 / self.w_prior_rate * (param**2) ) ) )
        log_prob_prior_w *= -(self.w_prior_shape + 0.5)
        #print(log_prob_prior_w)

        # log prob of prior of log noise-precision (NOT noise precision)
        # noise prior (beta_shape - 1)*log_beta
        log_prob_prior_log_beta = ((self.beta_prior_shape-1) * self.nnets[index].log_beta-  self.beta_prior_rate * (tf.math.exp(self.nnets[index].log_beta)) )
        #print(log_prob_prior_log_beta)
        log_likelihood=tf.dtypes.cast(log_likelihood, dtype=tf.float64)
        log_prob_prior_w=tf.dtypes.cast(log_prob_prior_w, dtype=tf.float64)
        log_prob_prior_log_beta=tf.dtypes.cast(log_prob_prior_log_beta, dtype=tf.float64)
        return (log_likelihood + log_prob_prior_w + log_prob_prior_log_beta)

    # compute the derivative of at and v wrt x and y
    @tf.function    # decorator @tf.function to speed up the computation
    def _gradients(self,index,x,y,z):
        with tf.GradientTape(persistent = True) as t1:
            t1.watch(x)
            t1.watch(y)
            if(self.n_input == 3):
                t1.watch(z)
                net_in = tf.concat((x,y,z),1)
            else:
                net_in = tf.concat((x,y),1) #input for NN
            output = self.nnets[index].features(net_in) #output after forward(input)
            at = output[:,0]    #activation time
            v = output[:,1]     #local speed


        at_x = t1.gradient(at, x)
        v_x = t1.gradient(v, x)
        at_y = t1.gradient(at, y)
        v_y = t1.gradient(v, y)
        if(self.n_input == 3):
            at_z = t1.gradient(at, z)
            v_z = t1.gradient(v, z)
        else:
            at_z = None
            v_z = None
        del t1
        return at_x,v_x,at_y,v_y,at_z,v_z,v,output.shape[0]

    # compute the loss and logloss of Physics Constrain (PDE constraint)
    def criterion(self,index,x,y,z,ntrain_exact):
        """
        Compute the loss and logloss of PDE constraint:
        here we are using the Eikonal eq  =>  || Grad(at) || = 1/v
        (we can rewrite it in a residual form as:
        res = ||Grad(at)||*v - 1 = 0 )
        We are also penalizing big gradients for the conduction_velocity

        Args:
            index (int): model index, 0, 1, ..., "n_samples"
            x,y: batch of collocation data
            ntrain_exact: number of exact data we have
        Returns:
            logloss1+logloss2: total log loss of pde
            loss_1: loss of first constraint (Eikonal)
            loss_2: loss of second constraint (No High gradients in velocity)
        """
        # compute the derivatives
        at_x,v_x, at_y,v_y, at_z,v_z, v,output_size = self._gradients(index,x,y,z)

        # Eikonal eq constraint: res = ||Grad(at)||*v - 1 = 0
        v = tf.dtypes.cast(v, tf.float64)
        v = tf.reshape(v, shape=(v.shape[0],1))

        if(self.n_input == 3):
            # pde loss -> for collocation points
            loss_1 = tf.math.multiply( (tf.math.sqrt(tf.math.square(at_x) + tf.math.square(at_y)+ tf.math.square(at_z))) , v) -1
            # penalizing high gradients of V -> param2loss * Grad(v) small enough
            loss_2 = self.param2loss*tf.math.sqrt(tf.math.square(v_x) + tf.math.square(v_y) + tf.math.square(v_z))
        else:
            # pde loss -> for collocation points
            loss_1 = tf.math.multiply( (tf.math.sqrt(tf.math.square(at_x) + tf.math.square(at_y))) , v) -1
            # penalizing high gradients of V -> param2loss * Grad(v) small enough
            loss_2 = self.param2loss*tf.math.sqrt(tf.math.square(v_x) + tf.math.square(v_y))

        # TODO: sistemare il fattore ntrain_exact/output_size... è giusto? serve?
        # log loss for a Gaussian
        logloss1 = ntrain_exact / output_size * (
                            - 0.5 * self.nnets[index].beta_eq
                            * tf.reduce_sum((loss_1 - tf.zeros_like(loss_1))**2)
                            )
        # log loss for a Gaussian
        logloss2 = ntrain_exact / output_size * (
                            - 0.5 * self.nnets[index].beta_eq
                            * tf.reduce_sum((loss_2 - tf.zeros_like(loss_2))**2)
                            )

        return (logloss1+logloss2), loss_1, loss_2


    def predict(self, inputs):
        """
        Prediction at data_test = inputs.
        Args:
            input (Tensor): [N, 2], test input
        Returns:
            y (Tensor): shape=[n_samples,2] prediction of (at,v) for all n_samples neural networks
        """
        y = self.forward(inputs)
        return y

    def mean_and_variances(self, inputs):
        return None


In [9]:
bnn = BayesNN(5, 0.01, 2, 3, 20, 1,1)

In [11]:
tim,vel = bnn.get_trainable_weights()