In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch.optim as optim
import torch
from torch.utils.data import DataLoader
from Common import NeuralNet, MultiVariatePoly
import time
torch.autograd.set_detect_anomaly(True)
torch.manual_seed(128)
# torch.manual_seed
import pandas as pd

In [18]:
class Pinns:
    def __init__(self, n_int_, n_sb_, n_tb_):
        # * Done
        self.n_int = n_int_
        self.n_sb = n_sb_
        self.n_tb = n_tb_

        self.n_phases = 16
        self.alpha_f = 0.005
        self.alpha_s = 0.08
        self.h_f = 5
        self.h_s = 6
        self.T_hot = 4
        self.T_0 = 1
        self.U_f = 1 

        # Extrema of the solution domain (t,x) in [0,0.1]x[-1,1]
        self.domain_extrema = torch.tensor([[0, 8],  # Time dimension
                                            [0, 1]])  # Space dimension
        # Number of space dimensions
        self.space_dimensions = 1

        # Parameter to balance role of data and PDE
        self.lambda_u = 10

        # F Dense NN to approximate the solution of the underlying heat equation
        self.approximate_solution = NeuralNet(input_dimension=self.domain_extrema.shape[0], output_dimension=1,
                                              n_hidden_layers=4,
                                              neurons=20,
                                              regularization_param=0.,
                                              regularization_exp=2.,
                                              retrain_seed=42)
        

        # FF Dense NN to approximate the T_f we wish to infer
        self.approximate_coefficient = NeuralNet(input_dimension=self.domain_extrema.shape[0], output_dimension=1,
                                                 n_hidden_layers=4,
                                                 neurons=20,
                                                 regularization_param=0.,
                                                 regularization_exp=2.,
                                                 retrain_seed=42)

                                    
        '''self.approximate_solution = MultiVariatePoly(self.domain_extrema.shape[0], 3)'''

        # Generator of Sobol sequences
        self.soboleng = torch.quasirandom.SobolEngine(dimension=self.domain_extrema.shape[0])

        # Training sets S_sb, S_tb, S_int as torch dataloader
        self.training_set_sb, self.training_set_tb, self.training_set_int = self.assemble_datasets()

    ################################################################################################
    # Function to linearly transform a tensor whose value are between 0 and 1
    # to a tensor whose values are between the domain extrema
    def convert(self, tens):
        # * Done
        assert (tens.shape[1] == self.domain_extrema.shape[0])
        return tens * (self.domain_extrema[:, 1] - self.domain_extrema[:, 0]) + self.domain_extrema[:, 0]
    
    # Initial condition T_f(x,t = 0) = T_s(x,t=0) = T_0
    # def initial_condition(self, x):
    #     return torch.tensor(self.T_0)

    ################################################################################################
    # Function returning the input-output tensor required to assemble the training set S_tb corresponding to the temporal boundary

    def get_fluid_velocity(self, inputs):
        # * Done
        U_f = torch.full(inputs.shape, -1)
    
        for i, t in enumerate(inputs):
            # Charging Phase
            if (t <=1) or (t > 4 and t<=5): U_f[i] = 1
            # Idle Phase
            elif (t > 1 and t <= 2) or (t > 3 and t <= 4) or (t > 5 and t <= 6): U_f[i] = 0
            # Discharging Phase
            elif (t > 2 and t <= 3) or (t > 6 and t <= 7): U_f[i] = -1
           
        return U_f
    

    def add_training_data(self, path):
        # ! NOT COMPLETE
        
        df_meas = pd.read_csv(path)
        
        df_meas_tensor = torch.tensor(df_meas.values, dtype = torch.float)
        # df_meas = df_meas.drop(columns=['Unnamed: 0'])
        input_data = df_meas_tensor[:, 0:2] 

        output_data = torch.tensor(df_meas["tf"].values, dtype = torch.float).reshape(-1,1)
 
        return input_data, output_data


    def add_temporal_boundary_points(self):
        # * Done
        # t0 = self.domain_extrema[0, 0]
        # input_tb = self.soboleng.draw(self.n_tb)    # input_sb has two columns (t, x) both with random numbers in the two respective domains
        # input_tb[:, 0] = torch.full(input_tb[:, 0].shape, t0)   # overwrite the entier column of time with t0
        # output_tb = torch.full(input_tb.shape, self.T_0)
        t0 = self.domain_extrema[0, 0]
        input_tb = self.convert(self.soboleng.draw(self.n_tb))
        input_tb[:, 0] = torch.full(input_tb[:, 0].shape, t0)
        # print("Input temporal boundary shape: ", input_tb.shape)
        # output_tb = self.initial_condition(input_tb[:, 1]).reshape(-1, 1)
        output_tb = torch.randn(self.n_tb, 1).fill_(self.T_0).reshape(-1, 1)
        # print( "Output temporal boundary shape: ", output_tb.shape)
        return input_tb, output_tb



    # Function returning the input-output tensor required to assemble the training set S_sb corresponding to the spatial boundary
    def add_spatial_boundary_points(self):
        # * Done

        def time_shift_array(array_x0, array_xL, delta_t):
            array_x0_cloned = torch.clone(array_x0)
            array_xL_cloned = torch.clone(array_xL) 

            # Shift the time for soboleng time components to be in [1,2]
            array_x0_cloned[:, 0] = array_x0_cloned[:, 0] + delta_t
            array_xL_cloned[:, 0] = array_xL_cloned[:, 0] + delta_t
            array_merged=  torch.cat((array_x0_cloned, array_xL_cloned), dim=0) # Input data for the charging phase with t in [0,1]

            return array_merged

        # Obtain the input spatuial boundary points
        ############################################################
        # Get x-coordinates of the extrema of the domain
        x0 = self.domain_extrema[1, 0]
        xL = self.domain_extrema[1, 1]

        # input_sb = self.convert(self.soboleng.draw(self.n_sb))

        input_sb = self.soboleng.draw(self.n_sb)

        # Scale the Soboleng points from [0,8]
        scaled_input_sb = self.domain_extrema[0,1] * input_sb[:, 1]
        # print(scaled_input_sb)

        input_sb_0 = torch.clone(input_sb)
        input_sb_0[:, 1] = torch.full(input_sb_0[:, 1].shape, x0)

        input_sb_L = torch.clone(input_sb)
        input_sb_L[:, 1] = torch.full(input_sb_L[:, 1].shape, xL)

        # print("Input spatial charge shape: ", input_sb_0)
        # print("Input spatial charge shape: ", input_sb_L)

        # print(torch.cat([input_sb_0, input_sb_L], 0))

        ############################################################

        '''Implementation of one Cycle (charging -> idle -> discharging -> idle)'''

        # * Charging Phase
        # input_sb_0_charge = torch.clone(input_sb_0)
        # input_sb_L_charge = torch.clone(input_sb_L) 
        # input_sb_charge =  torch.cat((input_sb_0_charge, input_sb_L_charge), dim=0) # Input data for the charging phase with t in [0,1]

        input_sb_charge = time_shift_array(input_sb_0, input_sb_L, 0)
        # print("Input spatial charge shape: ", input_sb_charge)
        charge_output_0 = torch.full(input_sb_0[:, 1].shape, self.T_hot).reshape(-1,1)
        charge_output_L = torch.full(input_sb_0[:, 1].shape, 0).reshape(-1,1)
        charge_ouput = torch.cat((charge_output_0, charge_output_L), dim=0)
    
        # * Discharging Phase
        # input_sb_0_discharge = torch.clone(input_sb_0)
        # input_sb_L_discharge = torch.clone(input_sb_L) 

        # # Shift the time for soboleng time components to be in [1,2]
        # input_sb_0_discharge[:, 0] = input_sb_0_discharge[:, 0] + 1
        # input_sb_L_discharge[:, 0] = input_sb_L_discharge[:, 0] + 1
        # input_sb_discharge=  torch.cat((input_sb_0_discharge, input_sb_L_discharge), dim=0) # Input data for the charging phase with t in [0,1]
        input_sb_discharge = time_shift_array(input_sb_0, input_sb_L, 2)
        discharge_output_0 = torch.full(input_sb_0[:, 1].shape, 0).reshape(-1,1)
        discharge_output_L = torch.full(input_sb_0[:, 1].shape, self.T_0).reshape(-1,1) #T_0 is the same as T_cold
        discharge_output = torch.cat((discharge_output_0, discharge_output_L), dim=0)

        # print(discharge_output)

        # * Idle Phase 1
        input_sb_idle1 =  time_shift_array(input_sb_0, input_sb_L, 1)
        idle_output_0 = torch.full(input_sb_0[:, 1].shape, 0).reshape(-1,1)
        idle_output_L = torch.full(input_sb_0[:, 1].shape, 0).reshape(-1,1)
        idle_output = torch.cat((idle_output_0, idle_output_L), dim=0)


         # * Idle Phase 2
        input_sb_idle2 =  time_shift_array(input_sb_0, input_sb_L, 3)
        # idle_output_0 = torch.full(input_sb_0[:, 1].shape, 0).reshape(-1,1)
        # idle_output_L = torch.full(input_sb_0[:, 1].shape, 0).reshape(-1,1)
        idle_output = torch.cat((idle_output_0, idle_output_L), dim=0)

        cat_input_1 =  torch.cat([input_sb_charge, input_sb_idle1, input_sb_discharge, input_sb_idle2], 0)
        cat_output_1 = torch.cat([charge_ouput, idle_output, discharge_output, idle_output], 0)

        # print("Input spatial boundary shape: ", cat_input_1.shape)
        # print("Output spatial boundary shape: ", cat_output_1.shape)
        # print("Input spatial boundary: ", cat_input_1)
        # print("Output spatial boundary: ", cat_output_1)

        '''Implementation of second Cycle (charging -> idle -> discharging -> idle)'''

        cat_input_2 =  torch.cat([input_sb_charge, input_sb_idle1, input_sb_discharge, input_sb_idle2], 0)
        cat_input_2[:, 0] = cat_input_2[:, 0] + 4
        cat_output_2 = torch.cat([charge_ouput, idle_output, discharge_output, idle_output], 0)

        # print("Input spatial boundary shape: ", cat_input_2.shape)
        # print("Output spatial boundary shape: ", cat_output_2.shape)
        # print("Input spatial boundary: ", cat_input_2)
        # print("Output spatial boundary: ", cat_output_2)

        return torch.cat([cat_input_1.requires_grad_(True), cat_input_2.requires_grad_(True)], 0), torch.cat([cat_output_1, cat_output_2], 0)

    #  Function returning the input-output tensor required to assemble the training set S_int corresponding to the interior domain where the PDE is enforced
    def add_interior_points(self):
        # * Done
        input_int = self.convert(self.soboleng.draw(self.n_int))
        # input_int = self.soboleng.draw(self.n_int)
        output_int = torch.zeros((input_int.shape[0], 1))
        return input_int, output_int

    # Function returning the training sets S_sb, S_tb, S_int and S_data as dataloader
    def assemble_datasets(self):
        # * Done
        # Additionally add the measurement data to the dataset
        # input_data, output_data = self.add_training_data() # S_data 
        input_sb, output_sb = self.add_spatial_boundary_points()   # S_sb
        input_tb, output_tb = self.add_temporal_boundary_points()  # S_tb
        input_int, output_int = self.add_interior_points()         # S_int

        # training_set_data = DataLoader(torch.utils.data.TensorDataset(input_data, output_data), batch_size= output_data.shape[0], shuffle=False)
        training_set_sb = DataLoader(torch.utils.data.TensorDataset(input_sb, output_sb), batch_size=self.n_phases*self.space_dimensions*self.n_sb, shuffle=False)
        training_set_tb = DataLoader(torch.utils.data.TensorDataset(input_tb, output_tb), batch_size=self.n_tb, shuffle=False)
        training_set_int = DataLoader(torch.utils.data.TensorDataset(input_int, output_int), batch_size=self.n_int, shuffle=False)

        return training_set_sb, training_set_tb, training_set_int, # training_set_data

    ################################################################################################
    # Function to compute the terms required in the definition of the TEMPORAL boundary residual
    # Takes as input the temporal boundary points and returns the prediction of the neural network at the temporal boundary points
    def apply_initial_condition(self, input_tb):
        # * Done
        u_pred_tb = self.approximate_solution(input_tb)
        #print(u_pred_tb)
        return u_pred_tb

    def get_phase_arr(self, input_arr, cycle, phase):

            #Phase 1 == Charging 
            #Phase 2 == Idle
            #Phase 3 == Discharging
            #Phase 4 == Idle

            points_per_phase = 2*int(input_arr.shape[0]/(self.n_phases)) # First part is x0 then xL
            if cycle == 1:
                if phase == 1:
                    return input_arr[:points_per_phase, :]
                elif phase == 2:
                    return input_arr[points_per_phase:2*points_per_phase, :]
                elif phase == 3:
                    return input_arr[2*points_per_phase:3*points_per_phase, :]
                elif phase == 4:
                    return input_arr[3*points_per_phase:4*points_per_phase, :]
            elif cycle == 2:
                if phase == 1:
                    return input_arr[4*points_per_phase: 5*points_per_phase, :]
                elif phase == 2:
                    return input_arr[5*points_per_phase: 6*points_per_phase, :]
                elif phase == 3:
                    return input_arr[6*points_per_phase: 7*points_per_phase, :]
                elif phase == 4:   
                    return input_arr[7*points_per_phase: , :]


    # Function to compute the terms required in the definition of the SPATIAL boundary residual
    # Takes as input the spatial boundary points and returns the prediction of the neural network at the temporal boundary points
    def apply_boundary_conditions(self, input_sb):
        #* Done
        
        
        '''Cycle 1'''
        input_sb_charging_0 = self.get_phase_arr(input_sb, 1, 1)[:int(input_sb.shape[0]/16)]
        input_sb_charging_L = self.get_phase_arr(input_sb, 1, 1)[int(input_sb.shape[0]/16):]

        # print(input_sb_charging_0)
        # print(input_sb_charging_L)

        input_sb_idle_0 = self.get_phase_arr(input_sb, 1, 2)[:int(input_sb.shape[0]/16)]
        input_sb_idle_L = self.get_phase_arr(input_sb, 1, 2)[int(input_sb.shape[0]/16):]

        # print(input_sb_idle_0)
        # print(input_sb_idle_L)

        input_sb_discharging_0 = self.get_phase_arr(input_sb, 1, 3)[:int(input_sb.shape[0]/16)]
        input_sb_discharging_L = self.get_phase_arr(input_sb, 1, 3)[int(input_sb.shape[0]/16):]

        input_sb_idle_0_1 = self.get_phase_arr(input_sb, 1, 4)[:int(input_sb.shape[0]/16)]
        input_sb_idle_L_1 = self.get_phase_arr(input_sb, 1, 4)[int(input_sb.shape[0]/16):]

        '''Cycle 2'''
        input_sb_charging_0_2 = self.get_phase_arr(input_sb, 2, 1)[:int(input_sb.shape[0]/16)]
        input_sb_charging_L_2 = self.get_phase_arr(input_sb, 2, 1)[int(input_sb.shape[0]/16):]

        input_sb_idle_0_2 = self.get_phase_arr(input_sb, 2, 2)[:int(input_sb.shape[0]/16)]
        input_sb_idle_L_2 = self.get_phase_arr(input_sb, 2, 2)[int(input_sb.shape[0]/16):]

        input_sb_discharging_0_2 = self.get_phase_arr(input_sb, 2, 3)[:int(input_sb.shape[0]/16)]
        input_sb_discharging_L_2 = self.get_phase_arr(input_sb, 2, 3)[int(input_sb.shape[0]/16):]

        input_sb_idle_0_3 = self.get_phase_arr(input_sb, 2, 4)[:int(input_sb.shape[0]/16)]
        input_sb_idle_L_3 = self.get_phase_arr(input_sb, 2, 4)[int(input_sb.shape[0]/16):]


        ''' Get boundary condition for Charging state '''
        # Cycle 1
        Tf_sb_0_charging_1 = self.approximate_solution(input_sb_charging_0).reshape(-1,1) #*
        Tf_sb_L_charging_1 = self.approximate_solution(input_sb_charging_L).reshape(-1,1)
        grad_Tf_sb_L_charging_1 = torch.autograd.grad(Tf_sb_L_charging_1.sum(), input_sb_charging_L, create_graph=True)[0][:, 1].reshape(-1,1) #*

        # Cycle 2
        Tf_sb_0_charging_2 = self.approximate_solution(input_sb_charging_0_2).reshape(-1,1)
        Tf_sb_L_charging_2 = self.approximate_solution(input_sb_charging_L_2).reshape(-1,1)
        grad_Tf_sb_L_charging_2 = torch.autograd.grad(Tf_sb_L_charging_2.sum(), input_sb_charging_L_2, create_graph=True)[0][:, 1].reshape(-1,1)

        ''' Get boundary condition for Discharging state '''
        # Cycle 1
        Tf_sb_0_discharging_1 = self.approximate_solution(input_sb_discharging_0).reshape(-1,1)
        grad_Tf_sb_0_discharging_1 = torch.autograd.grad(Tf_sb_0_discharging_1.sum(), input_sb_discharging_0, create_graph=True)[0][:, 1].reshape(-1,1)
        Tf_sb_L_discharging_1 = self.approximate_solution(input_sb_discharging_L).reshape(-1,1)
        grad_Tf_sb_L_discharging_1 = torch.autograd.grad(Tf_sb_L_discharging_1.sum(), input_sb_discharging_L, create_graph=True)[0][:, 1].reshape(-1,1)

        # Cycle 2
        Tf_sb_0_discharging_2 = self.approximate_solution(input_sb_discharging_0_2).reshape(-1,1)
        grad_Tf_sb_0_discharging_2 = torch.autograd.grad(Tf_sb_0_discharging_2.sum(), input_sb_discharging_0_2, create_graph=True)[0][:, 1].reshape(-1,1)
        Tf_sb_L_discharging_2 = self.approximate_solution(input_sb_discharging_L_2).reshape(-1,1)
        grad_Tf_sb_L_discharging_2 = torch.autograd.grad(Tf_sb_L_discharging_2.sum(), input_sb_discharging_L_2, create_graph=True)[0][:, 1].reshape(-1,1)

        ''' Get boundary condition for Idle state '''
        # Cycle 1
        # First Idle in Cycle 1
        Tf_sb_0_idle_1 = self.approximate_solution(input_sb_idle_0).reshape(-1,1)
        Tf_sb_L_idle_1 = self.approximate_solution(input_sb_idle_L).reshape(-1,1)
        grad_Tf_sb_0_idle_1 = torch.autograd.grad(Tf_sb_0_idle_1.sum(), input_sb_idle_0, create_graph=True)[0][:, 1].reshape(-1,1) # *
        grad_Tf_sb_L_idle_1 = torch.autograd.grad(Tf_sb_L_idle_1.sum(), input_sb_idle_L, create_graph=True)[0][:, 1].reshape(-1,1) # *
        
        # Second Idle in Cycle 1
        Tf_sb_0_idle_2 = self.approximate_solution(input_sb_idle_0_1).reshape(-1,1)
        Tf_sb_L_idle_2 = self.approximate_solution(input_sb_idle_L_1).reshape(-1,1)
        grad_Tf_sb_0_idle_2 = torch.autograd.grad(Tf_sb_0_idle_2.sum(), input_sb_idle_0_1, create_graph=True)[0][:, 1].reshape(-1,1) # *
        grad_Tf_sb_L_idle_2 = torch.autograd.grad(Tf_sb_L_idle_2.sum(), input_sb_idle_L_1, create_graph=True)[0][:, 1].reshape(-1,1) # *

        # Cycle 2
        # First Idle in Cycle 2
        Tf_sb_0_idle_3 = self.approximate_solution(input_sb_idle_0_2).reshape(-1,1)
        Tf_sb_L_idle_3 = self.approximate_solution(input_sb_idle_L_2).reshape(-1,1)
        grad_Tf_sb_0_idle_3 = torch.autograd.grad(Tf_sb_0_idle_3.sum(), input_sb_idle_0_2, create_graph=True)[0][:, 1].reshape(-1,1) # *
        grad_Tf_sb_L_idle_3 = torch.autograd.grad(Tf_sb_L_idle_3.sum(), input_sb_idle_L_2, create_graph=True)[0][:, 1].reshape(-1,1) # *
        
        # Second Idle in Cycle 2
        Tf_sb_0_idle_4 = self.approximate_solution(input_sb_idle_0_3).reshape(-1,1)
        Tf_sb_L_idle_4 = self.approximate_solution(input_sb_idle_L_3).reshape(-1,1)
        grad_Tf_sb_0_idle_4 = torch.autograd.grad(Tf_sb_0_idle_4.sum(), input_sb_idle_0_3, create_graph=True)[0][:, 1].reshape(-1,1) # *
        grad_Tf_sb_L_idle_4 = torch.autograd.grad(Tf_sb_L_idle_4.sum(), input_sb_idle_L_3, create_graph=True)[0][:, 1].reshape(-1,1) # *

        cat_tensor = torch.cat([
            # Cycle 1
            Tf_sb_0_charging_1, grad_Tf_sb_L_charging_1, grad_Tf_sb_0_idle_1, grad_Tf_sb_L_idle_1, grad_Tf_sb_0_discharging_1, Tf_sb_L_discharging_1,  grad_Tf_sb_0_idle_2, grad_Tf_sb_L_idle_2,
            # Cycle  2
            Tf_sb_0_charging_2, grad_Tf_sb_L_charging_2, grad_Tf_sb_0_idle_3, grad_Tf_sb_L_idle_3 , grad_Tf_sb_0_discharging_2, grad_Tf_sb_L_discharging_2, grad_Tf_sb_0_idle_4, grad_Tf_sb_L_idle_4
        ], 0)

        print(cat_tensor.shape)
        print("Cat tensor: ", cat_tensor)

        # print(cat_tensor)
        return cat_tensor



    # Function to compute the PDE residuals
    def compute_pde_residual(self, input_int):
        input_int.requires_grad = True
        # Obtain the prediction from the neural network
        Tf = self.approximate_solution(input_int).reshape(-1,)
        Ts = self.approximate_coefficient(input_int).reshape(-1,)
        
        # grad compute the gradient of a "SCALAR" function L with respect to some input nxm TENSOR Z=[[x1, y1],[x2,y2],[x3,y3],...,[xn,yn]], m=2
        # it returns grad_L = [[dL/dx1, dL/dy1],[dL/dx2, dL/dy2],[dL/dx3, dL/dy3],...,[dL/dxn, dL/dyn]]
        # Note: pytorch considers a tensor [u1, u2,u3, ... ,un] a vectorial function
        # whereas sum_u = u1 + u2 + u3 + u4 + ... + un as a "scalar" one

        # In our case ui = u(xi), therefore the line below returns:
        # grad_u = [[dsum_u/dx1, dsum_u/dy1],[dsum_u/dx2, dsum_u/dy2],[dsum_u/dx3, dL/dy3],...,[dsum_u/dxm, dsum_u/dyn]]
        # and dsum_u/dxi = d(u1 + u2 + u3 + u4 + ... + un)/dxi = d(u(x1) + u(x2) u3(x3) + u4(x4) + ... + u(xn))/dxi = dui/dxi

        grad_tf = torch.autograd.grad(Tf.sum(), input_int, create_graph=True)[0]
        grad_tf_t = grad_tf[:,0]
        grad_tf_x = grad_tf[:,1]
        
        grad_tf_xx = torch.autograd.grad(grad_tf_x.sum(), input_int, create_graph=True)[0][:,1]
        
        U_f = self.get_fluid_velocity(input_int[:, 0])
        
        residual = (grad_tf_t + U_f*grad_tf_x) - (self.alpha_f*grad_tf_xx - self.h_f*(Tf - Ts))
        return residual.reshape(-1, )

    # Function to compute the total loss (weighted sum of spatial boundary loss, temporal boundary loss and interior loss)
    def compute_loss(self, inp_train_sb, u_train_sb, inp_train_tb, u_train_tb, inp_train_int, verbose=True):

        u_pred_sb = self.apply_boundary_conditions(inp_train_sb)
        u_pred_tb = self.apply_initial_condition(inp_train_tb)

        inp_train_data, u_train_data = self.add_training_data("Task2/DataSolution.txt")
        u_pred_meas = self.approximate_solution(inp_train_data) 

        print("u_pred_sb: ", u_pred_sb.shape)
        print("u_train_sb: ", u_train_sb.shape)
        print("u_pred_tb: ", u_pred_tb.shape)
        print("u_train_tb: ", u_train_tb.shape)

        print("u_pred_tb: ", u_pred_tb)
        print("u_train_tb: ", u_train_tb)

        print("u_pred_meas: ", u_pred_meas.shape)
        print("u_train_data: ", u_train_data.shape)

        assert (u_pred_sb.shape[1] == u_train_sb.shape[1])
        assert (u_pred_tb.shape[1] == u_train_tb.shape[1])
        assert (u_pred_meas.shape[1] == u_train_data.shape[1])

        # Interior point loss
        r_int = self.compute_pde_residual(inp_train_int)

        # Initial Condition loss
        r_tb = u_train_tb - u_pred_tb

        # Measurement loss
        r_meas = u_train_data - u_pred_meas

        # Spatial Boundary loss
        r_sb = u_train_sb - u_pred_sb

        # loss_sb = torch.mean(abs(res_Tf_sb_0) ** 2) + torch.mean(abs(res_Ts_sb_0) ** 2) + torch.mean(abs(res_Ts_sb_L) ** 2) + torch.mean(abs(res_Tf_sb_L) ** 2) # ! To be changed

        # loss_sb = torch.mean(abs(u_train_sb - u_pred_sb) ** 2)
        loss_sb = torch.mean(abs(r_sb) ** 2)
        loss_tb = torch.mean(abs(r_tb) ** 2)
        loss_int = torch.mean(abs(r_int) ** 2) 
        loss_meas = torch.mean(abs(r_meas) ** 2)

        loss_u = loss_sb + loss_tb + loss_meas

        loss = torch.log10(self.lambda_u * loss_u + 1*loss_int)
        if verbose: print("Total loss: ", round(loss.item(), 4), "| PDE Loss: ", round(torch.log10(loss_u).item(), 4), "| Function Loss: ", round(torch.log10(loss_int).item(), 4))

        return loss

    ################################################################################################
    def fit(self, num_epochs, optimizer, verbose=True):
        history = list()

        # Loop over epochs
        for epoch in range(num_epochs):
            if verbose: print("################################ ", epoch, " ################################")

            for j, ((inp_train_sb, u_train_sb), (inp_train_tb, u_train_tb), (inp_train_int, u_train_int)) in enumerate(zip(self.training_set_sb, self.training_set_tb, self.training_set_int)):
                def closure():
                    optimizer.zero_grad()
                    loss = self.compute_loss(inp_train_sb, u_train_sb, inp_train_tb, u_train_tb, inp_train_int, verbose=verbose)
                    loss.backward()

                    history.append(loss.item())
                    return loss

                optimizer.step(closure=closure)

        print('Final Loss: ', history[-1])

        return history

    ################################################################################################
    def plotting(self):
        inputs = self.soboleng.draw(100000)
        inputs = self.convert(inputs)

        output = self.approximate_solution(inputs).reshape(-1, )
        # exact_output = self.exact_solution(inputs).reshape(-1, )

        fig, axs = plt.subplots(1, 2, figsize=(16, 8), dpi=150)
        # im1 = axs[0].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=exact_output.detach(), cmap="jet")
        axs[0].set_xlabel("x")
        axs[0].set_ylabel("t")
        # plt.colorbar(im1, ax=axs[0])
        axs[0].grid(True, which="both", ls=":")
        im2 = axs[1].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=output.detach(), cmap="jet")
        axs[1].set_xlabel("x")
        axs[1].set_ylabel("t")
        plt.colorbar(im2, ax=axs[1])
        axs[1].grid(True, which="both", ls=":")
        axs[0].set_title("Exact Solution")
        axs[1].set_title("Approximate Solution")

        plt.show()

        # err = (torch.mean((output - exact_output) ** 2) / torch.mean(exact_output ** 2)) ** 0.5 * 100
        # print("L2 Relative Error Norm: ", err.item(), "%")


In [19]:
# Solve the heat equation:
# u_t = u_xx, (t,x) in [0, 0.1]x[-1,1]
# with zero dirichlet BC and
# u(x,0)= -sin(pi x)

n_int = 256
n_sb = 64
n_tb = 64

pinn = Pinns(n_int, n_sb, n_tb)

In [20]:
n_epochs = 1
optimizer_LBFGS = optim.LBFGS(pinn.approximate_solution.parameters(),
                              lr=float(0.5),
                              max_iter=50000,
                              max_eval=50000,
                              history_size=150,
                              line_search_fn="strong_wolfe",
                              tolerance_change=1.0 * np.finfo(float).eps)
optimizer_ADAM = optim.Adam(pinn.approximate_solution.parameters(),
                            lr=float(0.001))

In [21]:
hist = pinn.fit(num_epochs=n_epochs,
                optimizer=optimizer_LBFGS,
                verbose=True)

plt.figure(dpi=150)
plt.grid(True, which="both", ls=":")
plt.plot(np.arange(1, len(hist) + 1), hist, label="Train Loss")
plt.xscale("log")
plt.legend()

################################  0  ################################
torch.Size([1024, 1])
Cat tensor:  tensor([[ 0.0000],
        [ 0.5835],
        [ 0.4645],
        ...,
        [-0.3017],
        [-0.3076],
        [-0.3229]], grad_fn=<CatBackward0>)
u_pred_sb:  torch.Size([1024, 1])
u_train_sb:  torch.Size([1024, 1])
u_pred_tb:  torch.Size([64, 1])
u_train_tb:  torch.Size([64, 1])
u_pred_tb:  tensor([[0.1424],
        [0.3229],
        [0.0736],
        [0.2070],
        [0.0127],
        [0.1698],
        [0.1148],
        [0.2582],
        [0.0967],
        [0.2308],
        [0.1553],
        [0.3596],
        [0.1295],
        [0.2890],
        [0.0451],
        [0.1868],
        [0.0600],
        [0.1965],
        [0.1360],
        [0.3056],
        [0.1623],
        [0.3789],
        [0.1063],
        [0.2441],
        [0.1225],
        [0.2732],
        [0.0293],
        [0.1779],
        [0.0858],
        [0.2185],
        [0.1488],
        [0.3410],
        [0.0669],
   

In [None]:
# Plot the input training points
input_sb_, output_sb_ = pinn.add_spatial_boundary_points()
input_tb_, output_tb_ = pinn.add_temporal_boundary_points()
input_int_, output_int_ = pinn.add_interior_points()

plt.figure(figsize=(16, 8), dpi=150)
plt.scatter(input_sb_[:, 1].detach().numpy(), input_sb_[:, 0].detach().numpy(), label="Boundary Points")
plt.scatter(input_int_[:, 1].detach().numpy(), input_int_[:, 0].detach().numpy(), label="Interior Points")
plt.scatter(input_tb_[:, 1].detach().numpy(), input_tb_[:, 0].detach().numpy(), label="Initial Points")
plt.xlabel("x")
plt.ylabel("t")
plt.legend()
plt.show()

In [32]:
input, output = pinn.add_training_data("Task2/DataSolution.txt")

In [33]:
display(input)

tensor([[0.0000, 0.0200],
        [0.0100, 0.0200],
        [0.0200, 0.0200],
        ...,
        [7.9800, 0.9800],
        [7.9900, 0.9800],
        [8.0000, 0.9800]])

In [34]:
display(output)

tensor([2.8750, 3.2501, 3.4717,  ..., 1.1124, 1.1085, 1.1103])