## Physics Informed Neural Networks to Approximate Solution of PDEs

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import torch.optim as optim
import torch
from torch.utils.data import DataLoader
from Common import NeuralNet
import time
torch.manual_seed(128)
from matplotlib.animation import FuncAnimation
import matplotlib.cm as cm
import scienceplots
import os
import logging
plt.style.use(['science','no-latex','ieee','bright'])

# plt.style.use(['science','no-latex','grid','bright','ieee'])
from matplotlib.colors import LogNorm


logging.getLogger('matplotlib.font_manager').disabled = True


In [3]:
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    print ("MPS device not found.")

In [4]:
from PIL import Image

def save_gif_PIL(outfile, files, fps=60, loop=0):
    "Helper function for saving GIFs"
    imgs = [Image.open(file) for file in files]
    imgs[0].save(fp=outfile, format='GIF', append_images=imgs[1:], save_all=True, duration=int(1000/fps), loop=loop)

In [5]:
def save_plot_to_folder(plot, folder_name, file_name):
    """
    Save a Matplotlib plot to a specified folder.

    Parameters:
    - plot: The Matplotlib plot object.
    - folder_name: The name of the folder where the plot will be saved.
    - file_name: The name of the file (including the extension, e.g., "plot.png").

    Example usage:
    save_plot_to_folder(my_plot, "20hz", "loss_plot.png")
    """
    # Create the folder if it doesn't exist
    if not os.path.exists(str(folder_name)):
        os.makedirs(str(folder_name))

    # Save the plot inside the folder
    plot_filename = os.path.join(str(folder_name), file_name)
    plot.savefig(plot_filename,dpi=600,bbox_inches='tight')

    print(f"Plot saved to '{plot_filename}'")
    
    
    

In [15]:
def create_fdm_comparison_plots(fdm_file_path, pinn_model, num_time_steps, c,c_baseline):
    """
    Create comparison plots between FDM and PINN solutions and calculate L2 error.

    Parameters:
    - fdm_file_path (str): File path to the FDM data file (npz format).
    - pinn_model: The trained PINN model.
    - num_time_steps (int): Number of time steps to extract from the FDM model.
    - c: Velocity parameter.
    """

    # Load FDM data
    fdm = np.load(fdm_file_path)
    x_fdm, t_fdm, U_fdm = fdm['X'], fdm['T'], fdm['U']

    # Since the size of fdm tensor is huge, select only 100 time steps on which we will compare both the results. 
    # Determine the indices for the selected time steps
    selected_indices = np.round(np.linspace(0, len(t_fdm) - 1000, num_time_steps)).astype(int)

    # Extract selected time steps and corresponding solution
    selected_time_steps = t_fdm[selected_indices]
    # selected_time_steps = selected_time_steps[:-1]
    selected_solution = U_fdm[:, selected_indices]

    # Print the shape of each array
    print("Shape of x_fdm:", x_fdm.shape)
    print("Shape of selected_time_steps:", selected_time_steps.shape)
    print("Shape of selected_solution:", selected_solution.shape)

    # Flatten the arrays for scatter plot
    flat_x = x_fdm.reshape(-1, 1).repeat(len(selected_time_steps), axis=1).flatten()
    flat_t = np.tile(selected_time_steps, len(x_fdm))
    flat_u = selected_solution.flatten()
    
    # Meshgrid for scatter plot
    X_input = np.meshgrid(selected_time_steps, x_fdm)

    # Reshape the meshgrid to a 2D array
    X_input = np.column_stack((X_input[0].flatten(), X_input[1].flatten()))

    # Print the shape of X_input
    print("Shape of X_input:", X_input.shape)
    
    # Compute PINN solution on the FDM data points. [t,x]
    pinn_sol_fdm_data = pinn_model.approximate_solution(torch.tensor(X_input,dtype=torch.float32).to(device))
    pinn_sol_fdm_data = pinn_sol_fdm_data.detach().cpu().reshape(-1,)
    
    # Create folders if they don't exist
    folder_path = f'fdm_comp/c_{c}'
    os.makedirs(folder_path, exist_ok=True)
    
    # Define the font dictionary
    font = {'size': 18}
    # 
        # Set font size for ticks
    plt.rc('xtick', labelsize=16)
    plt.rc('ytick', labelsize=16)

    # Scatter plot for FDM solution
    plt.figure(figsize=(5, 5), dpi=600)
    plt.scatter(flat_x, flat_t, c=flat_u, cmap='jet', antialiased=True)
    plt.colorbar()
    plt.title('FDM', fontsize=16)
    plt.xlabel('x', fontsize=16)
    plt.ylabel('t', fontsize=16)
    plt.xlim([0, 3.14])
    plt.ylim([0, 2 * 3.14])
    plt.savefig(os.path.join(folder_path, 'fdm_solution.png'), dpi=600)
    plt.close()

    # Scatter plot for PINN solution
    plt.figure(figsize=(5, 5), dpi=600)
    plt.scatter(flat_x, flat_t, c=pinn_sol_fdm_data, cmap='jet', antialiased=True)
    plt.colorbar()
    plt.title('PINN', fontsize=16)
    plt.xlabel('x', fontsize=16)
    plt.ylabel('t', fontsize=16)
    plt.xlim([0, 3.14])
    plt.ylim([0, 2 * 3.14])
    plt.savefig(os.path.join(folder_path, 'pinn_solution.png'), dpi=600)
    plt.close()

    # Scatter plot for the difference with colorbar limit
    plt.figure(figsize=(5, 5), dpi=600)
    difference_scatter = plt.scatter(flat_x, flat_t, c=flat_u - pinn_sol_fdm_data.numpy(), cmap='jet', antialiased=True, vmin=-0.01, vmax=0.01)
    cbar = plt.colorbar(difference_scatter)
#     cbar.set_clim(-0.01, 0.01)  # Adjust the color limits as needed
    plt.title('Difference', fontsize=16)
    plt.xlabel('x', fontsize=16)
    plt.ylabel('t', fontsize=16)
    plt.xlim([0, 3.14])
    plt.ylim([0, 2 * 3])
    plt.savefig(os.path.join(folder_path, f'difference_{c_baseline}.png'), dpi=600)
    plt.close()
    # Calculate L2 error
    err = (np.mean((pinn_sol_fdm_data.numpy() - flat_u) ** 2) / np.mean(flat_u ** 2)) ** 0.5 * 100
    print(f'L2 Relative Error Norm for c={c}: {err:.2f}%')

    # Save FDM and PINN solutions, and the difference
    np.savez(os.path.join(folder_path, 'fdm_data.npz'), x=flat_x, t=flat_t, u=flat_u)
    np.savez(os.path.join(folder_path, 'pinn_data.npz'), x=flat_x, t=flat_t, u=pinn_sol_fdm_data.numpy())
    np.savez(os.path.join(folder_path, 'difference_data.npz'), x=flat_x, t=flat_t, difference=flat_u - pinn_sol_fdm_data.numpy())


In [11]:
class Pinns:
    def __init__(self, n_int_, n_sb_, n_tb_,t_max,c,loadPretrained=False,model_path=None):
        self.n_int = n_int_
        self.n_sb = n_sb_
        self.n_tb = n_tb_
        self.files = []
        self.c = c
        self.font = {'family': 'Times New Roman', 'style': 'italic', 'weight': 'normal', 'size': 20}
        self.tmax = t_max
        #iteration number
        self.iter = 0
        # Extrema of the solution domain (t,x) in [0,0.1]x[-1,1]
        self.domain_extrema = torch.tensor([[0, t_max],  # Time dimension
                                            [0, np.pi]]) # Space dimension
        
#         print(f"Domain Extreme Shape",self.domain_extrema.shape )
        self.t_max = t_max
        # Number of space dimensions
        self.space_dimensions = 1

        # Parameter to balance role of data and PDE
        self.lambda_u = 10
        if loadPretrained:
            self.path = f'plots/forward/pinn/{self.c}/tl'
        else:
            self.path = f'plots/forward/pinn/{self.c}'

        self.model_load_path = model_path
        
        self.checkpath()

        # F Dense NN to approximate the solution of the underlying heat equation
        self.approximate_solution = NeuralNet(input_dimension=self.domain_extrema.shape[0], output_dimension=1,
                                              n_hidden_layers=5,
                                              neurons=64,
                                              regularization_param=0.,
                                              regularization_exp=2.,
                                              retrain_seed=42)
        self.approximate_solution = self.approximate_solution.to(device)
       
        if loadPretrained:
            print("Loading Pretrained Model....")
            self.approximate_solution.load_state_dict(torch.load(model_path))
            self.approximate_solution.eval()
            
        '''self.approximate_solution = MultiVariatePoly(self.domain_extrema.shape[0], 3)'''

        # Generator of Sobol sequences
        self.soboleng = torch.quasirandom.SobolEngine(dimension=self.domain_extrema.shape[0])

        # Training sets S_sb, S_tb, S_int as torch dataloader
        self.training_set_sb_0,self.training_set_sb_L, self.training_set_tb, self.training_set_int = self.assemble_datasets()
        
        # Build and plot geometry: 
        self.plotGeometry()
        
        self.lbfgs = optim.LBFGS(self.approximate_solution.parameters(),
                              lr=float(1),
                              max_iter=50000,
                              max_eval=50000,
                              history_size=150,
                              line_search_fn="strong_wolfe",
                              tolerance_change=1.0 * np.finfo(float).eps)
        
        self.adam = optim.Adam(self.approximate_solution.parameters(),
                            lr=float(0.005))

            
        self.training_loss = []
        
    ################################################################################################
    # Function to linearly transform a tensor whose value are between 0 and 1
    # to a tensor whose values are between the domain extrema
    def convert(self, tens):
        assert (tens.shape[1] == self.domain_extrema.shape[0])
        return tens * (self.domain_extrema[:, 1] - self.domain_extrema[:, 0]) + self.domain_extrema[:, 0]

    # Initial condition to solve the heat equation u0(x)=-sin(pi x)
    def initial_condition(self, x,):
        return torch.sin(x)

    # Exact solution for the heat equation ut = u_xx with the IC above
    def exact_solution(self, inputs):
        t = inputs[:, 0]
        x = inputs[:, 1]

        u = torch.sin(x)*(torch.sin(t) + torch.cos(t))
#         u = 0.5*(torch.sin(x+2*t) + torch.sin(x-2*t))
        return u

    ################################################################################################
    # Function returning the input-output tensor required to assemble the training set S_tb corresponding to the temporal boundary
    def add_temporal_boundary_points(self):
        t0 = self.domain_extrema[0, 0]
        tL = self.domain_extrema[0, 0]
    
        input_tb_0 = self.convert(self.soboleng.draw(self.n_tb))
        input_tb_0[:, 0] = torch.full(input_tb_0[:, 0].shape, t0)
        output_tb_0 = self.initial_condition(input_tb_0[:, 1]).reshape(-1, 1)
    
        return input_tb_0, output_tb_0

    # Function returning the input-output tensor required to assemble the training set S_sb corresponding to the spatial boundary
    def add_spatial_boundary_points_minima(self):

        x0 = self.domain_extrema[1, 0]
        
        input_sb = self.convert(self.soboleng.draw(self.n_sb))
        
#         print(f"Spatial Boundary Shape: ",input_sb.shape)
        input_sb_0 = torch.clone(input_sb)
        input_sb_0[:, 1] = torch.full(input_sb_0[:, 1].shape, x0)

        output_sb_0 = torch.zeros((input_sb.shape[0], 1))

        return input_sb_0, output_sb_0
    
     # Function returning the input-output tensor required to assemble the training set S_sb corresponding to the spatial boundary
    def add_spatial_boundary_points_extrema(self):

        xL = self.domain_extrema[1, 1]

        input_sb = self.convert(self.soboleng.draw(self.n_sb))

        input_sb_L = torch.clone(input_sb)
        input_sb_L[:, 1] = torch.full(input_sb_L[:, 1].shape, xL)

        output_sb_L = torch.zeros((input_sb.shape[0], 1))
#         print(f'Shape of boundary points L: {output_sb_L.shape}')
# 
        return input_sb_L,output_sb_L
    
  
    #  Function returning the input-output tensor required to assemble the training set S_int corresponding to the interior domain where the PDE is enforced
    def add_interior_points(self):
        input_int = self.convert(self.soboleng.draw(self.n_int))
        output_int = torch.zeros((input_int.shape[0], 1))
#         print(f'Shape of int points: {output_int.shape}')
        return input_int, output_int

    # Function returning the training sets S_sb, S_tb, S_int as dataloader
    def assemble_datasets(self):
        input_sb_0, output_sb_0 = self.add_spatial_boundary_points_minima()   # S_sb_0
        input_sb_L, output_sb_L = self.add_spatial_boundary_points_extrema()
        input_tb, output_tb = self.add_temporal_boundary_points()             # S_tb
        input_int, output_int = self.add_interior_points()         # S_int
        
         # Move each dataset to the CUDA device
        input_sb_0 = input_sb_0.to(device)
        output_sb_0 = output_sb_0.to(device)
        input_sb_L = input_sb_L.to(device)
        output_sb_L = output_sb_L.to(device)
        input_tb = input_tb.to(device)
        output_tb = output_tb.to(device)
        input_int = input_int.to(device)
        output_int = output_int.to(device)
        
        training_set_sb_0 = DataLoader(torch.utils.data.TensorDataset(input_sb_0, output_sb_0), batch_size=self.n_sb, shuffle=False)
        training_set_sb_L = DataLoader(torch.utils.data.TensorDataset(input_sb_L, output_sb_L), batch_size=self.n_sb, shuffle=False)

        training_set_tb = DataLoader(torch.utils.data.TensorDataset(input_tb, output_tb), batch_size=self.n_tb, shuffle=False)
        training_set_int = DataLoader(torch.utils.data.TensorDataset(input_int, output_int), batch_size=self.n_int, shuffle=False)
                
        return training_set_sb_0,training_set_sb_L, training_set_tb, training_set_int

    ################################################################################################
    # Function to compute the terms required in the definition of the TEMPORAL boundary residual
    def apply_initial_condition(self, input_tb):
        u_pred_tb = self.approximate_solution(input_tb)
        return u_pred_tb

    # Function to compute the terms required in the definition of the SPATIAL boundary residual
    def apply_boundary_conditions(self, input_sb):
        u_pred_sb = self.approximate_solution(input_sb)
        return u_pred_sb
    
    # Function to compute the terms required in the definition of the SPATIAL boundary residual
    def apply_boundary_conditions_neuman(self, input_sb):
        input_sb.requires_grad = True
        u = self.approximate_solution(input_sb)
        grad_u = torch.autograd.grad(u.sum(), input_sb, create_graph=True)[0]
        grad_u_x = grad_u[:, 1]

        return grad_u_x
    
     # Function to compute the terms required in the definition of the SPATIAL boundary residual
    def apply_boundary_conditions_open_0(self, input_sb):
        input_sb.requires_grad = True
        u = self.approximate_solution(input_sb)
        grad_u = torch.autograd.grad(u.sum(), input_sb, create_graph=True)[0]
        grad_u_x = grad_u[:, 1]
        grad_u_t = grad_u[:, 0]
        k = grad_u_t - grad_u_x
        return k
    
    def apply_boundary_conditions_open_L(self, input_sb):
        input_sb.requires_grad = True
        u = self.approximate_solution(input_sb)
        grad_u = torch.autograd.grad(u.sum(), input_sb, create_graph=True)[0]
        grad_u_x = grad_u[:, 1]
        grad_u_t = grad_u[:, 0]
        k = grad_u_t + grad_u_x
        return k
    
    def apply_initial_condition_derivative(self,input_tb):
        input_tb.requires_grad = True
        u = self.approximate_solution(input_tb)
        grad_u = torch.autograd.grad(u.sum(), input_tb, create_graph=True)[0]
        grad_u_t = grad_u[:, 0]
        return grad_u_t

    # Function to compute the PDE residuals
    def compute_pde_residual(self, input_int):
        input_int.requires_grad = True
        u = self.approximate_solution(input_int)

        # grad compute the gradient of a "SCALAR" function L with respect to some input nxm TENSOR Z=[[x1, y1],[x2,y2],[x3,y3],...,[xn,yn]], m=2
        # it returns grad_L = [[dL/dx1, dL/dy1],[dL/dx2, dL/dy2],[dL/dx3, dL/dy3],...,[dL/dxn, dL/dyn]]
        # Note: pytorch considers a tensor [u1, u2,u3, ... ,un] a vectorial function
        # whereas sum_u = u1 + u2 + u3 + u4 + ... + un as a "scalar" one

        # In our case ui = u(xi), therefore the line below returns:
        # grad_u = [[dsum_u/dx1, dsum_u/dy1],[dsum_u/dx2, dsum_u/dy2],[dsum_u/dx3, dL/dy3],...,[dsum_u/dxm, dsum_u/dyn]]
        # and dsum_u/dxi = d(u1 + u2 + u3 + u4 + ... + un)/dxi = d(u(x1) + u(x2) u3(x3) + u4(x4) + ... + u(xn))/dxi = dui/dxi
        grad_u = torch.autograd.grad(u.sum(), input_int, create_graph=True)[0]
        grad_u_t = grad_u[:, 0]
        grad_u_x = grad_u[:, 1]
        grad_u_xx = torch.autograd.grad(grad_u_x.sum(), input_int, create_graph=True)[0][:, 1]
        grad_u_tt = torch.autograd.grad(grad_u_t.sum(), input_int, create_graph=True)[0][:, 0]

        residual = grad_u_tt - ((self.c)**2)*grad_u_xx
        return residual.reshape(-1, )

    # Function to compute the total loss (weighted sum of spatial boundary loss, temporal boundary loss and interior loss)
    def compute_loss(self, inp_train_sb_0, u_train_sb_0, inp_train_sb_L, u_train_sb_L, inp_train_tb, u_train_tb, inp_train_int, verbose=False):
           
#         Ct*(1-t_symbol/Te)+1
        
        lambdaT = 1e-2*(1-(inp_train_tb[:,0]/self.t_max)) + 1



        u_pred_sb_0 = self.apply_boundary_conditions(inp_train_sb_0).reshape(-1,1)
        u_pred_sb_L = self.apply_boundary_conditions(inp_train_sb_L).reshape(-1,1)
    
        u_pred_tb = self.apply_initial_condition(inp_train_tb)

                

        u_pred_tb_derivative = self.apply_initial_condition_derivative(inp_train_tb).reshape(-1,1)


        assert (u_pred_sb_0.shape[1] == u_train_sb_0.shape[1])
        assert (u_pred_sb_L.shape[1] == u_train_sb_L.shape[1])
        assert (u_pred_tb.shape[1] == u_train_tb.shape[1])
        assert (u_pred_tb_derivative.shape[1] == u_train_tb.shape[1])

        # PDE Loss 
        r_int = self.compute_pde_residual(inp_train_int)
        # rint = r_int*lambdaT
        
        # Boundary and Initial Conditions   

        # Spatial Boundary        
        r_sb_0 = u_train_sb_0 - u_pred_sb_0
        r_sb_L = u_train_sb_L - u_pred_sb_L
        
        
        r_tb = u_train_tb - u_pred_tb
        
        r_tb_derivate = u_train_tb - u_pred_tb_derivative

        loss_sb_0 = torch.mean(r_sb_0 ** 2)
        loss_sb_L = torch.mean(r_sb_L ** 2)
        
        loss_tb = torch.mean(lambdaT*(r_tb) ** 2)
        loss_int = torch.mean((r_int) ** 2)
                
        loss_tb_derivative = torch.mean(lambdaT*(r_tb_derivate) ** 2)

        loss = (loss_sb_0 + loss_sb_L)  +  (loss_tb_derivative + loss_tb) + loss_int 
        
        if not self.iter % 500:
            if verbose: 
                print("Total loss: ", round(loss.item(), 5), "| IC loss: ", round(loss_tb.item(), 4), "| BC loss: ", round(loss_sb_0.item(), 4),"| BC loss at L: ", round(loss_sb_L.item(), 4), "| Coll. Loss: ", round((loss_int).item(), 4),"| T.B Deriv. Loss: ", round((loss_tb_derivative).item(), 4))
        self.iter += 1
        return loss

    ################################################################################################
    def fit(self, num_epochs, optimizer, verbose=True):
        history = list()

        # Loop over epochs
        for epoch in range(num_epochs):
            
            if verbose and ((epoch % 500) ==  0): 
                print("################################ ", epoch, " ################################")

            for j, ((inp_train_sb_0, u_train_sb_0),(inp_train_sb_L, u_train_sb_L), (inp_train_tb, u_train_tb), (inp_train_int, u_train_int)) in enumerate(zip(self.training_set_sb_0, self.training_set_sb_L, self.training_set_tb, self.training_set_int)):
                def closure():
                    optimizer.zero_grad()
                    loss = self.compute_loss(inp_train_sb_0, u_train_sb_0,inp_train_sb_L,u_train_sb_L, inp_train_tb, u_train_tb, inp_train_int, verbose=verbose)
                    loss.backward()

                    history.append(loss.item())
                    return loss

                optimizer.step(closure=closure)
                

        print('Final Loss: ', history[-1])

        return history
    
    def train(self,num_epochs=5,verbose=True):
        
        print("Training started...")
        self.training_loss = self.fit(num_epochs=5000,optimizer=self.adam,verbose=verbose)
        print("Adam Completed....")
        
        # Training with L-BFGS optimizer
        lbfgs_loss = self.fit(num_epochs=num_epochs, optimizer=self.lbfgs, verbose=verbose)
        self.training_loss.append(lbfgs_loss)

        print('Plotting Loss...')
        
        #  Save Model Weights
        torch.save(self.approximate_solution.state_dict(), f'models/pinn_c_{self.c}_wave.pt')
        return self.training_loss

            

    ################################################################################################
    """
    Function to check the path where to store the model and plots for each iteration.
    """
    def checkpath(self):
        if not os.path.exists(self.path):
            print("Creating folder: ",self.path)
            os.makedirs(self.path)
        if not os.path.exists(f'{self.path}/animation'):
            print(f'Creating Folder {self.path}/animation')
            os.makedirs(f'{self.path}/animation')
            
        if not os.path.exists('models'):
            os.makedirs('models')
    
    def plotLoss(self,data):
        # plt.figure(dpi=600)
        plt.figure(figsize=(3,3),dpi=600)
        plt.title(f'1D Wave for c = {c}')
        plt.grid(True, which="both", ls=":")
        plt.plot(data, label=f'Train Loss for c = {c}')
        plt.xlabel("Epoch",fontdict=self.font)
        plt.ylabel("Loss",fontdict=self.font)
        plt.tick_params(labelsize=11)
        plt.yscale("log")
        plt.legend()
        save_plot_to_folder(plt, self.path, 'loss')
        
            

    def plotGeometry(self,save_plot=False):
        # Plot the input training points
        input_sb_, output_sb_ = self.add_spatial_boundary_points_minima()
        input_sb_extrema, output_sb_extrema = self.add_spatial_boundary_points_extrema()

        input_tb_, output_tb_ = self.add_temporal_boundary_points()
        input_int_, output_int_ = self.add_interior_points()

        plt.figure(figsize=(5,5),dpi=600)
        font = {'family': 'Times New Roman', 'style': 'italic', 'weight': 'normal', 'size': 12}


        plt.scatter(input_sb_[:, 1].detach().numpy(), input_sb_[:, 0].detach().numpy(), label="Boundary Points x=0")
        plt.scatter(input_sb_extrema[:, 1].detach().numpy(), input_sb_extrema[:, 0].detach().numpy(), label="Boundary Points x=L")
        #
        plt.scatter(input_int_[:, 1].detach().numpy(), input_int_[:, 0].detach().numpy(), label="Interior Points")
        plt.scatter(input_tb_[:, 1].detach().numpy(), input_tb_[:, 0].detach().numpy(), label="Initial Points")
        plt.xlabel("x",fontdict=font)
        plt.ylabel("t",fontdict=font)
        plt.tick_params(labelsize=14)

        plt.legend(loc='center left', bbox_to_anchor=(1, 0.9), fontsize=14)
        plt.close()
        
        if save_plot:
            save_plot_to_folder(plt, self.path, "geometry")
    
    def plotting(self):
        inputs = self.soboleng.draw(90000)
        inputs = self.convert(inputs)

        output = self.approximate_solution(inputs).reshape(-1, )
        exact_output = self.exact_solution(inputs).reshape(-1, )

              # Create a figure with the specified size and dpi
#         plt.figure(dpi=600)

        # Create a scatter plot for the approximate solution
        im2 = plt.scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=output.detach(), cmap="jet")
        # Set the labels, color bar, grid, and title
        plt.xlabel("x",fontdict=self.font)
        plt.ylabel("t",fontdict=self.font)
        plt.axis('equal')
        plt.colorbar(im2)
        plt.grid(True, which="both", ls=":")
        plt.title("Approximate Solution")
        plt.xlim([0,3.14])
        plt.ylim([0,6.28])

        # Show the plot
        plt.show()
        
#         # Calculate the absolute difference between exact_output and output
#         diff = (exact_output.detach() - output.detach())
        
#         im3 = axs[2].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=diff, cmap="viridis")  # Use a different colormap
# #         axs[2].set_xlim(0,3.14)
# #         axs[2].set_ylim(0,6.28) 
#         axs[2].set_xlabel("x")
#         axs[2].set_ylabel("t")
#         axs[2].set_title("Difference")
#         plt.colorbar(im3, ax=axs[2])
#         axs[2].grid(True, which="both", ls=":")


#         err = (torch.mean((output - exact_output) ** 2) / torch.mean(exact_output ** 2)) ** 0.5 * 100
#         print("L2 Relative Error Norm: ", err.item(), "%")
        


In [12]:
# Solve the heat equation:
# u_t = u_xx, (t,x) in [0, 0.1]x[-1,1]
# with zero dirichlet BC and
# u(x,0)= -sin(pi x)

n_int = 512

n_sb = 32
n_tb = 64
tmax = 2*np.pi
c = 1
try:
    del(pinn)
except Exception as e:
    print(e)
pinn = Pinns(n_int, n_sb, n_tb,tmax,c=c,loadPretrained=False,model_path='models/pinn_c_1_wave.pt')

In [13]:
%%time
loss = pinn.train()

Training started...
################################  0  ################################
Total loss:  25.0608 | IC loss:  1.7743 | BC loss:  11.2865 | BC loss at L:  5.5053 | Coll. Loss:  3.1267 | T.B Deriv. Loss:  3.3679
################################  500  ################################
Total loss:  0.04793 | IC loss:  0.0018 | BC loss:  0.0037 | BC loss at L:  0.0089 | Coll. Loss:  0.0322 | T.B Deriv. Loss:  0.0014
################################  1000  ################################
Total loss:  0.03822 | IC loss:  0.0006 | BC loss:  0.0069 | BC loss at L:  0.0128 | Coll. Loss:  0.0175 | T.B Deriv. Loss:  0.0004
################################  1500  ################################
Total loss:  0.02751 | IC loss:  0.003 | BC loss:  0.0097 | BC loss at L:  0.0111 | Coll. Loss:  0.0033 | T.B Deriv. Loss:  0.0004
################################  2000  ################################
Total loss:  0.0255 | IC loss:  0.007 | BC loss:  0.0061 | BC loss at L:  0.0089 | Coll. Lo

In [16]:
fdm_path = '../fdm/wave_data_1.npz'
create_fdm_comparison_plots(fdm_path, pinn_model=pinn, num_time_steps=300, c=pinn.c,c_baseline='base')

Shape of x_fdm: (315,)
Shape of selected_time_steps: (300,)
Shape of selected_solution: (315, 300)
Shape of X_input: (94500, 2)
L2 Relative Error Norm for c=1: 0.22%


In [41]:
from helper_new import create_fdm_comparison_plots

In [42]:
create_fdm_comparison_plots

<function helper_new.create_fdm_comparison_plots(fdm_file_path, pinn_model, num_time_steps, c)>

In [52]:
fdm_path = 'fdm/wave_data_4.npz'
create_fdm_comparison_plots(fdm_path, pinn_model=pinn_6, num_time_steps=300, c=4)

Shape of x_fdm: (315,)
Shape of selected_time_steps: (300,)
Shape of selected_solution: (315, 300)
Shape of X_input: (94500, 2)
L2 Relative Error Norm for c=4: 3.16%


In [None]:
# PINN with TL
n_int = 512

n_sb = 32
n_tb = 64
tmax = 2*np.pi
c = 4
try:
    del(pinn_7)
except Exception as e:
    print(e)
pinn_7 = Pinns(n_int, n_sb, n_tb,tmax,c=c,loadPretrained=True,model_path='models/pinn_c_2_wave.pt')

In [None]:
loss_7 = pinn_7.train()

In [None]:
loss_6 = pinn_6.training_loss

In [None]:
# plt.figure(dpi=600)
plt.figure(figsize=(3,3),dpi=600)
plt.title(f'1D Wave for c = {c}')
plt.grid(True, which="both", ls=":")
plt.plot(loss_6, label=f'Without Transfer Learning')
plt.plot(loss_7,label='Transfer Learning Source c = 2')

plt.xlabel("Epoch",fontdict=font)
plt.ylabel("Loss",fontdict=font)
plt.tick_params(labelsize=11)
plt.yscale("log")
plt.legend()
plt.savefig(f'{pinn_6.path}/loss.png')


In [None]:
# compare results of c=2 with and without TL
# Compare
# Generate data for approximate and exact solutions
inputs = pinn_7.soboleng.draw(8000)
inputs = pinn_7.convert(inputs).to(device)
output = pinn_7.approximate_solution(inputs).reshape(-1, ) # c=1


# Create a figure with subplots
fig, (ax1) = plt.subplots(1, 1, figsize=(5, 5), dpi=600)

# Font settings
font = {'family': 'Times New Roman', 'style': 'italic', 'weight': 'normal', 'size': 20}

# Plot the approximate solution on the left subplot (ax1)
im1 = ax1.scatter(inputs[:, 1].detach().cpu(), inputs[:, 0].detach().cpu(), c=output.detach().cpu(), cmap="jet", antialiased=True)
ax1.set_xlabel("x", fontdict=font)
ax1.set_ylabel("t", fontdict=font)
ax1.grid(True, which="both", ls=":", alpha=0.6)
ax1.set_title("PINN Solution with Transfer Learning.",fontsize=18)
ax1.set_xlim([0, 3.14])
ax1.set_ylim([0, 6.28])
ax1.tick_params(axis='x', which='both', labelsize=18)
ax1.tick_params(axis='y', which='both', labelsize=18)

# Create a colorbar outside of the subplots
cbar_ax = fig.add_axes([0.93, 0.15, 0.02, 0.7])  # [x, y, width, height]
cbar = plt.colorbar(im1, cax=cbar_ax)
cbar.set_label("PINN Solution", rotation=90, fontsize=12)

# Adjust the spacing between subplots
plt.subplots_adjust(wspace=0.4)

fig.savefig(f'{pinn_7.path}/approximate_solution_with_tl.png')


# PINN without Temporal Loss Weighting

In [None]:
class Pinn_WTLW:
    def __init__(self, n_int_, n_sb_, n_tb_,t_max,c,loadPretrained=False,model_path=None):
        self.n_int = n_int_
        self.n_sb = n_sb_
        self.n_tb = n_tb_
        self.files = []
        self.c = c
        self.font = {'family': 'Times New Roman', 'style': 'italic', 'weight': 'normal', 'size': 12}
        self.tmax = t_max
        #iteration number
        self.iter = 0
        # Extrema of the solution domain (t,x) in [0,0.1]x[-1,1]
        self.domain_extrema = torch.tensor([[0, t_max],  # Time dimension
                                            [0, np.pi]]) # Space dimension
        
        print(f"Domain Extreme Shape",self.domain_extrema.shape )
        self.t_max = t_max
        # Number of space dimensions
        self.space_dimensions = 1

        # Parameter to balance role of data and PDE
        self.lambda_u = 10
        if loadPretrained:
            self.path = f'plots/forward/pinn_wtlw/{self.c}/tl'
        else:
            self.path = f'plots/forward/pinn_wtlw/{self.c}'

        self.model_load_path = model_path
        
        self.checkpath()

        # F Dense NN to approximate the solution of the underlying heat equation
        self.approximate_solution = NeuralNet(input_dimension=self.domain_extrema.shape[0], output_dimension=1,
                                              n_hidden_layers=5,
                                              neurons=64,
                                              regularization_param=0.,
                                              regularization_exp=2.,
                                              retrain_seed=42)
        self.approximate_solution = self.approximate_solution.to(device)
       
        if loadPretrained:
            print("Loading Pretrained Model....")
            self.approximate_solution.load_state_dict(torch.load(model_path))
            self.approximate_solution.eval()
            
        '''self.approximate_solution = MultiVariatePoly(self.domain_extrema.shape[0], 3)'''

        # Generator of Sobol sequences
        self.soboleng = torch.quasirandom.SobolEngine(dimension=self.domain_extrema.shape[0])

        # Training sets S_sb, S_tb, S_int as torch dataloader
        self.training_set_sb_0,self.training_set_sb_L, self.training_set_tb, self.training_set_int = self.assemble_datasets()
        
        # Build and plot geometry: 
        self.plotGeometry()
        
        self.optimizer = optim.LBFGS(self.approximate_solution.parameters(),
                              lr=float(1),
                              max_iter=50000,
                              max_eval=50000,
                              history_size=150,
                              line_search_fn="strong_wolfe",
                              tolerance_change=1.0 * np.finfo(float).eps)
        self.training_loss = []
        
    ################################################################################################
    # Function to linearly transform a tensor whose value are between 0 and 1
    # to a tensor whose values are between the domain extrema
    def convert(self, tens):
        assert (tens.shape[1] == self.domain_extrema.shape[0])
        return tens * (self.domain_extrema[:, 1] - self.domain_extrema[:, 0]) + self.domain_extrema[:, 0]

    # Initial condition to solve the heat equation u0(x)=-sin(pi x)
    def initial_condition(self, x,):
        return torch.sin(x)

    # Exact solution for the heat equation ut = u_xx with the IC above
    def exact_solution(self, inputs):
        t = inputs[:, 0]
        x = inputs[:, 1]

        u = torch.sin(x)*(torch.sin(t) + torch.cos(t))
#         u = 0.5*(torch.sin(x+2*t) + torch.sin(x-2*t))
        return u

    ################################################################################################
    # Function returning the input-output tensor required to assemble the training set S_tb corresponding to the temporal boundary
    def add_temporal_boundary_points(self):
        t0 = self.domain_extrema[0, 0]
        tL = self.domain_extrema[0, 0]
    
        input_tb_0 = self.convert(self.soboleng.draw(self.n_tb))
        input_tb_0[:, 0] = torch.full(input_tb_0[:, 0].shape, t0)
        output_tb_0 = self.initial_condition(input_tb_0[:, 1]).reshape(-1, 1)
    
        return input_tb_0, output_tb_0

    # Function returning the input-output tensor required to assemble the training set S_sb corresponding to the spatial boundary
    def add_spatial_boundary_points_minima(self):

        x0 = self.domain_extrema[1, 0]
        
        input_sb = self.convert(self.soboleng.draw(self.n_sb))
        
#         print(f"Spatial Boundary Shape: ",input_sb.shape)
        input_sb_0 = torch.clone(input_sb)
        input_sb_0[:, 1] = torch.full(input_sb_0[:, 1].shape, x0)

        output_sb_0 = torch.zeros((input_sb.shape[0], 1))

        return input_sb_0, output_sb_0
    
     # Function returning the input-output tensor required to assemble the training set S_sb corresponding to the spatial boundary
    def add_spatial_boundary_points_extrema(self):

        xL = self.domain_extrema[1, 1]

        input_sb = self.convert(self.soboleng.draw(self.n_sb))

        input_sb_L = torch.clone(input_sb)
        input_sb_L[:, 1] = torch.full(input_sb_L[:, 1].shape, xL)

        output_sb_L = torch.zeros((input_sb.shape[0], 1))
        print(f'Shape of boundary points L: {output_sb_L.shape}')
# 
        return input_sb_L,output_sb_L
    
  
    #  Function returning the input-output tensor required to assemble the training set S_int corresponding to the interior domain where the PDE is enforced
    def add_interior_points(self):
        input_int = self.convert(self.soboleng.draw(self.n_int))
        output_int = torch.zeros((input_int.shape[0], 1))
        print(f'Shape of int points: {output_int.shape}')
        return input_int, output_int

    # Function returning the training sets S_sb, S_tb, S_int as dataloader
    def assemble_datasets(self):
        input_sb_0, output_sb_0 = self.add_spatial_boundary_points_minima()   # S_sb_0
        input_sb_L, output_sb_L = self.add_spatial_boundary_points_extrema()
        input_tb, output_tb = self.add_temporal_boundary_points()             # S_tb
        input_int, output_int = self.add_interior_points()         # S_int
        
         # Move each dataset to the CUDA device
        input_sb_0 = input_sb_0.to(device)
        output_sb_0 = output_sb_0.to(device)
        input_sb_L = input_sb_L.to(device)
        output_sb_L = output_sb_L.to(device)
        input_tb = input_tb.to(device)
        output_tb = output_tb.to(device)
        input_int = input_int.to(device)
        output_int = output_int.to(device)
        
        training_set_sb_0 = DataLoader(torch.utils.data.TensorDataset(input_sb_0, output_sb_0), batch_size=self.n_sb, shuffle=False)
        training_set_sb_L = DataLoader(torch.utils.data.TensorDataset(input_sb_L, output_sb_L), batch_size=self.n_sb, shuffle=False)

        training_set_tb = DataLoader(torch.utils.data.TensorDataset(input_tb, output_tb), batch_size=self.n_tb, shuffle=False)
        training_set_int = DataLoader(torch.utils.data.TensorDataset(input_int, output_int), batch_size=self.n_int, shuffle=False)
                
        return training_set_sb_0,training_set_sb_L, training_set_tb, training_set_int

    ################################################################################################
    # Function to compute the terms required in the definition of the TEMPORAL boundary residual
    def apply_initial_condition(self, input_tb):
        u_pred_tb = self.approximate_solution(input_tb)
        return u_pred_tb

    # Function to compute the terms required in the definition of the SPATIAL boundary residual
    def apply_boundary_conditions(self, input_sb):
        u_pred_sb = self.approximate_solution(input_sb)
        return u_pred_sb
    
    # Function to compute the terms required in the definition of the SPATIAL boundary residual
    def apply_boundary_conditions_neuman(self, input_sb):
        input_sb.requires_grad = True
        u = self.approximate_solution(input_sb)
        grad_u = torch.autograd.grad(u.sum(), input_sb, create_graph=True)[0]
        grad_u_x = grad_u[:, 1]

        return grad_u_x
    
     # Function to compute the terms required in the definition of the SPATIAL boundary residual
    def apply_boundary_conditions_open_0(self, input_sb):
        input_sb.requires_grad = True
        u = self.approximate_solution(input_sb)
        grad_u = torch.autograd.grad(u.sum(), input_sb, create_graph=True)[0]
        grad_u_x = grad_u[:, 1]
        grad_u_t = grad_u[:, 0]
        k = grad_u_t - grad_u_x
        return k
    
    def apply_boundary_conditions_open_L(self, input_sb):
        input_sb.requires_grad = True
        u = self.approximate_solution(input_sb)
        grad_u = torch.autograd.grad(u.sum(), input_sb, create_graph=True)[0]
        grad_u_x = grad_u[:, 1]
        grad_u_t = grad_u[:, 0]
        k = grad_u_t + grad_u_x
        return k
    
    def apply_initial_condition_derivative(self,input_tb):
        input_tb.requires_grad = True
        u = self.approximate_solution(input_tb)
        grad_u = torch.autograd.grad(u.sum(), input_tb, create_graph=True)[0]
        grad_u_t = grad_u[:, 0]
        return grad_u_t

    # Function to compute the PDE residuals
    def compute_pde_residual(self, input_int):
        input_int.requires_grad = True
        u = self.approximate_solution(input_int)

        # grad compute the gradient of a "SCALAR" function L with respect to some input nxm TENSOR Z=[[x1, y1],[x2,y2],[x3,y3],...,[xn,yn]], m=2
        # it returns grad_L = [[dL/dx1, dL/dy1],[dL/dx2, dL/dy2],[dL/dx3, dL/dy3],...,[dL/dxn, dL/dyn]]
        # Note: pytorch considers a tensor [u1, u2,u3, ... ,un] a vectorial function
        # whereas sum_u = u1 + u2 + u3 + u4 + ... + un as a "scalar" one

        # In our case ui = u(xi), therefore the line below returns:
        # grad_u = [[dsum_u/dx1, dsum_u/dy1],[dsum_u/dx2, dsum_u/dy2],[dsum_u/dx3, dL/dy3],...,[dsum_u/dxm, dsum_u/dyn]]
        # and dsum_u/dxi = d(u1 + u2 + u3 + u4 + ... + un)/dxi = d(u(x1) + u(x2) u3(x3) + u4(x4) + ... + u(xn))/dxi = dui/dxi
        grad_u = torch.autograd.grad(u.sum(), input_int, create_graph=True)[0]
        grad_u_t = grad_u[:, 0]
        grad_u_x = grad_u[:, 1]
        grad_u_xx = torch.autograd.grad(grad_u_x.sum(), input_int, create_graph=True)[0][:, 1]
        grad_u_tt = torch.autograd.grad(grad_u_t.sum(), input_int, create_graph=True)[0][:, 0]

        residual = grad_u_tt - ((self.c)**2)*grad_u_xx
        return residual.reshape(-1, )

    # Function to compute the total loss (weighted sum of spatial boundary loss, temporal boundary loss and interior loss)
    def compute_loss(self, inp_train_sb_0, u_train_sb_0, inp_train_sb_L, u_train_sb_L, inp_train_tb, u_train_tb, inp_train_int, verbose=True):
           
#         Ct*(1-t_symbol/Te)+1
        
#         lambdaT = 1e-2*(1-(inp_train_tb[:,0]/self.t_max)) + 1
      

        u_pred_sb_0 = self.apply_boundary_conditions(inp_train_sb_0).reshape(-1,1)
        u_pred_sb_L = self.apply_boundary_conditions(inp_train_sb_L).reshape(-1,1)
    
        u_pred_tb = self.apply_initial_condition(inp_train_tb)

                

        u_pred_tb_derivative = self.apply_initial_condition_derivative(inp_train_tb).reshape(-1,1)


        assert (u_pred_sb_0.shape[1] == u_train_sb_0.shape[1])
        assert (u_pred_sb_L.shape[1] == u_train_sb_L.shape[1])
        assert (u_pred_tb.shape[1] == u_train_tb.shape[1])
        assert (u_pred_tb_derivative.shape[1] == u_train_tb.shape[1])

        # PDE Loss 
        r_int = self.compute_pde_residual(inp_train_int)
        # rint = r_int*lambdaT
        
        # Boundary and Initial Conditions   

        # Spatial Boundary        
        r_sb_0 = u_train_sb_0 - u_pred_sb_0
        r_sb_L = u_train_sb_L - u_pred_sb_L
        
        
        r_tb = u_train_tb - u_pred_tb
        
        r_tb_derivate = u_train_tb - u_pred_tb_derivative

        loss_sb_0 = torch.mean(r_sb_0 ** 2)
        loss_sb_L = torch.mean(r_sb_L ** 2)
        
        loss_tb = torch.mean((r_tb) ** 2)
        loss_int = torch.mean((r_int) ** 2)
                
        loss_tb_derivative = torch.mean((r_tb_derivate) ** 2)

        loss = 1e-3*(loss_sb_0 + loss_sb_L + loss_tb_derivative + loss_tb ) + loss_int
        
        if not self.iter % 50:
            if verbose: 
                print("Total loss: ", round(loss.item(), 4), "| IC loss: ", round(loss_tb.item(), 4), "| BC loss: ", round(loss_sb_0.item(), 4),"| BC loss at L: ", round(loss_sb_L.item(), 4), "| Coll. Loss: ", round((loss_int).item(), 4),"| T.B Deriv. Loss: ", round((loss_tb_derivative).item(), 4))
#                 inputs = self.soboleng.draw(100000)
#                 inputs = self.convert(inputs).to(device)

#                 output = self.approximate_solution(inputs).reshape(-1, )
#                 exact_output = self.exact_solution(inputs).reshape(-1, )

#                 # Create a figure with the specified size and dpi
#                 plt.figure(figsize=(4, 4), dpi=150)


#                 # Create a scatter plot for the approximate solution
#                 im2 = plt.scatter(inputs[:, 1].detach().cpu(), inputs[:, 0].detach().cpu(), c=output.detach().cpu(), cmap="jet")

#                 # Set the labels, color bar, grid, and title
#                 plt.xlabel("x",fontdict=self.font)
#                 plt.ylabel("t",fontdict=self.font)
#                 plt.xlim([0,3.14])
#                 plt.ylim([0,self.tmax])
#                 plt.tick_params(labelsize=11)
#                 plt.colorbar(im2)
#                 plt.grid(True, which="both", ls=":")
#                 plt.title(f"Approximate Solution at Step: {self.iter}")


#                 file = f'{self.path}/animation/pinn_%.8i.png'%(self.iter+1)
#                 plt.savefig(file, bbox_inches='tight', pad_inches=0.1, dpi=600, facecolor="white")
#                 plt.close()
#                 self.files.append(file)
        self.iter += 1
        return loss

    ################################################################################################
    def fit(self, num_epochs, optimizer, verbose=True):
        history = list()

        # Loop over epochs
        for epoch in range(num_epochs):
            if verbose: print("################################ ", epoch, " ################################")

            for j, ((inp_train_sb_0, u_train_sb_0),(inp_train_sb_L, u_train_sb_L), (inp_train_tb, u_train_tb), (inp_train_int, u_train_int)) in enumerate(zip(self.training_set_sb_0, self.training_set_sb_L, self.training_set_tb, self.training_set_int)):
                def closure():
                    optimizer.zero_grad()
                    loss = self.compute_loss(inp_train_sb_0, u_train_sb_0,inp_train_sb_L,u_train_sb_L, inp_train_tb, u_train_tb, inp_train_int, verbose=verbose)
                    loss.backward()

                    history.append(loss.item())
                    return loss

                optimizer.step(closure=closure)
                

        print('Final Loss: ', history[-1])

        return history
    
    def train(self,num_epochs=5,verbose=True):
        self.training_loss = self.fit(num_epochs=num_epochs,optimizer=self.optimizer,verbose=verbose)
        print('Plotting Loss...')
        self.plotLoss(self.training_loss)
        #  Save Model Weights
        torch.save(self.approximate_solution.state_dict(), f'models/pinn_c_{self.c}_wave.pt')
        return self.training_loss

            

    ################################################################################################
    """
    Function to check the path where to store the model and plots for each iteration.
    """
    def checkpath(self):
        if not os.path.exists(self.path):
            print("Creating folder: ",self.path)
            os.makedirs(self.path)
        if not os.path.exists(f'{self.path}/animation'):
            print(f'Creating Folder {self.path}/animation')
            os.makedirs(f'{self.path}/animation')
            
        if not os.path.exists('models'):
            os.makedirs('models')
    
    def plotLoss(self,data):
        # plt.figure(dpi=600)
        plt.figure(figsize=(3,3),dpi=600)
        plt.title(f'1D Wave for c = {c}')
        plt.grid(True, which="both", ls=":")
        plt.plot(data, label=f'Train Loss for c = {c}')
        plt.xlabel("Epoch",fontdict=self.font)
        plt.ylabel("Loss",fontdict=self.font)
        plt.tick_params(labelsize=11)
        plt.yscale("log")
        plt.legend()
        save_plot_to_folder(plt, self.path, 'loss')
        
            

    def plotGeometry(self,save_plot=True):
        # Plot the input training points
        input_sb_, output_sb_ = self.add_spatial_boundary_points_minima()
        input_sb_extrema, output_sb_extrema = self.add_spatial_boundary_points_extrema()

        input_tb_, output_tb_ = self.add_temporal_boundary_points()
        input_int_, output_int_ = self.add_interior_points()

        plt.figure(figsize=(10,5),dpi=600)
        font = {'family': 'Times New Roman', 'style': 'italic', 'weight': 'normal', 'size': 12}
        plt.scatter(input_sb_[:, 1].detach().numpy(), input_sb_[:, 0].detach().numpy(), label="Boundary Points x=0")
        plt.scatter(input_sb_extrema[:, 1].detach().numpy(), input_sb_extrema[:, 0].detach().numpy(), label="Boundary Points x=L")
        #
        plt.scatter(input_int_[:, 1].detach().numpy(), input_int_[:, 0].detach().numpy(), label="Interior Points")
        plt.scatter(input_tb_[:, 1].detach().numpy(), input_tb_[:, 0].detach().numpy(), label="Initial Points")
        plt.xlabel("x",fontdict=font)
        plt.ylabel("t",fontdict=font)
        plt.tick_params(labelsize=11)

        plt.legend(loc='center left', bbox_to_anchor=(1, 0.9), fontsize=11)
#         plt.show()
        
        if save_plot:
            save_plot_to_folder(plt, self.path, "geometry")
    
    def plotting(self):
        inputs = self.soboleng.draw(90000)
        inputs = self.convert(inputs)

        output = self.approximate_solution(inputs).reshape(-1, )
        exact_output = self.exact_solution(inputs).reshape(-1, )

              # Create a figure with the specified size and dpi
#         plt.figure(dpi=600)

        # Create a scatter plot for the approximate solution
        im2 = plt.scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=output.detach(), cmap="jet")
        # Set the labels, color bar, grid, and title
        plt.xlabel("x",fontdict=self.font)
        plt.ylabel("t",fontdict=self.font)
        plt.axis('equal')
        plt.colorbar(im2)
        plt.grid(True, which="both", ls=":")
        plt.title("Approximate Solution")
        plt.xlim([0,3.14])
        plt.ylim([0,6.28])

        # Show the plot
        plt.show()
        
#         # Calculate the absolute difference between exact_output and output
#         diff = (exact_output.detach() - output.detach())
        
#         im3 = axs[2].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=diff, cmap="viridis")  # Use a different colormap
# #         axs[2].set_xlim(0,3.14)
# #         axs[2].set_ylim(0,6.28) 
#         axs[2].set_xlabel("x")
#         axs[2].set_ylabel("t")
#         axs[2].set_title("Difference")
#         plt.colorbar(im3, ax=axs[2])
#         axs[2].grid(True, which="both", ls=":")


#         err = (torch.mean((output - exact_output) ** 2) / torch.mean(exact_output ** 2)) ** 0.5 * 100
#         print("L2 Relative Error Norm: ", err.item(), "%")
        


In [None]:
# Solve the heat equation:
# u_t = u_xx, (t,x) in [0, 0.1]x[-1,1]
# with zero dirichlet BC and
# u(x,0)= -sin(pi x)

n_int = 512

n_sb = 32
n_tb = 64
tmax = 2*np.pi
c = 1
try:
    del(pinn1)
except Exception as e:
    print(e)
pinn1 = Pinn_WTLW(n_int, n_sb, n_tb,tmax,c=c,loadPretrained=False,model_path='models/pinn_c_1_wave.pt')

In [None]:
pinn1.train()

In [None]:
# Generate data for approximate and exact solutions
inputs = pinn1.soboleng.draw(120000)
inputs = pinn1.convert(inputs).to(device)
output = pinn1.approximate_solution(inputs).reshape(-1, )
exact_output = pinn1.exact_solution(inputs).reshape(-1, )


# # Create a figure for the left subplot
# fig1, ax1 = plt.subplots(figsize=(3, 6),dpi=600)
# font = {'family': 'Times New Roman', 'style': 'italic', 'weight': 'normal', 'size': 12}
# im1 = ax1.scatter(inputs[:, 1].detach().cpu(), inputs[:, 0].detach().cpu(), c=output.detach().cpu(), cmap="jet", antialiased=True)
# ax1.set_xlabel("x", fontdict=font)
# ax1.set_ylabel("t", fontdict=font)
# ax1.grid(True, which="both", ls=":", alpha=0.6)
# ax1.set_title("PINN Solution")
# ax1.set_xlim([0, 3.14])
# ax1.set_ylim([0, 6.28])
# cbar_ax1 = fig1.add_axes([0.93, 0.15, 0.02, 0.7])
# cbar1 = plt.colorbar(im1, cax=cbar_ax1)
# cbar1.set_label("Approximate Solution", rotation=90, fontsize=12)

# # Create a figure for the right subplot
# fig2, ax2 = plt.subplots(figsize=(3, 6),dpi=600)
# im2 = ax2.scatter(inputs[:, 1].detach().cpu(), inputs[:, 0].detach().cpu(), c=exact_output.detach().cpu(), cmap="jet", antialiased=True)
# ax2.set_xlabel("x", fontdict=font)
# ax2.set_ylabel("t", fontdict=font)
# ax2.grid(True, which="both", ls=":", alpha=0.6)
# ax2.set_title("Exact Solution")
# ax2.set_xlim([0, 3.14])
# ax2.set_ylim([0, 6.28])
# cbar_ax2 = fig2.add_axes([0.93, 0.15, 0.02, 0.7])
# cbar2 = plt.colorbar(im2, cax=cbar_ax2)
# cbar2.set_label("Exact Solution", rotation=90, fontsize=12)

# # Save the figures
# fig1.savefig(f'{pinn1.path}/approximate_solution.png')
# fig2.savefig(f'{pinn1.path}/exact_solution.png')

# # Close the figures
# plt.close(fig1)
# plt.close(fig2)

In [None]:
# Difference between true and analytical solution. 
# Calculate the difference between the approximate and exact solutions
# difference = (((output - exact_output) ** 2) /(exact_output ** 2)) ** 0.5 
difference = (exact_output  - output)

# Create a figure with a single subplot
fig, ax = plt.subplots(1, 1,figsize=(5,5),dpi=600)

# Font settings
font = {'family': 'Times New Roman', 'style': 'italic', 'weight': 'normal', 'size': 12}

# Plot the difference using a colormap
im = ax.scatter(inputs[:, 1].detach().cpu(), inputs[:, 0].detach().cpu(), c=difference.detach().cpu(), cmap="viridis", antialiased=False)
ax.set_xlabel("$x$", fontdict=font)
ax.set_ylabel("$t$", fontdict=font)
ax.grid(True, which="both", ls=":", alpha=0.6)
ax.set_title("Difference between PINN and Exact Solution")
ax.set_xlim([0, 3.14])


ax.set_ylim([0, 6.28])

# Create a colorbar outside of the subplot
cbar_ax = fig.add_axes([0.93, 0.15, 0.02, 0.7])  # [x, y, width, height]
cbar = plt.colorbar(im, cax=cbar_ax)
cbar.set_label("Difference", rotation=90, fontsize=12)
# Set the minimum and maximum values for the colorbar
# im.set_clim(0, 0.02)

# Save the plot as a high-resolution image (e.g., PNG or PDF)
# plt.savefig("difference_plot.png", dpi=300, bbox_inches="tight")
fig.savefig(f'{pinn1.path}/difference_c1_withoutLossWeighting.png')

# Show the plot (optional)
plt.show()

In [None]:
pinn1.path

In [None]:
err = (torch.mean((output - exact_output) ** 2) / torch.mean(exact_output ** 2)) ** 0.5 * 100
print("L2 Relative Error Norm: ", err.item(), "%")

# MISC

In [None]:
from PIL import Image

# Assuming imgs is a list of Image objects
imgs = [Image.open(file) for file in pinn.files]

# Create a 2x3 grid for 6 images
fig, axes = plt.subplots(4, 4, figsize=(12, 8))
fig.suptitle("Solution at every 50 steps", fontsize=14)

# Loop through the images and display them on the grid
for i, ax in enumerate(axes.ravel()):
    if i < len(imgs):
        ax.imshow(imgs[i])
        ax.set_title(f"Step {i * 50}")
        ax.axis("off")

# Adjust spacing and display the plot
plt.tight_layout()
plt.savefig("solution_plot_c_2.png", dpi=300)  # Save the plot as a PNG file
# plt.show()

In [None]:
save_plot_to_folder(plt, 10, "soln_50_steps.png")

## Plot at different timesteps

In [None]:

# Assuming 'pinn' is the instance of your physics-informed neural network (PINN) class
# and 'device' is the device on which you want to perform computations (e.g., 'cuda' or 'cpu').

# Number of time steps
num_time_steps = 6

# Set the font configuration
font = {'family': 'Times New Roman', 'style': 'normal', 'weight': 'normal', 'size': 14}
# plt.rc('font', **font)

# Lists to store handles and labels for legend
legend_handles = []
legend_labels = []

for i in range(num_time_steps):
    # Create a tensor with 1000 rows and 2 columns
    input_shape = (1000, 2)
    time = i * 1.5  # Time step from 0 to 3.14 with a step of 0.5

    # Create the first column with the current time value
    time_data = time * torch.ones(input_shape[0], 1)

    # Create the second column with values from 0 to 3.14
    spatial_data = torch.linspace(0, 3.14, input_shape[0]).unsqueeze(1)

    # Concatenate the two columns to create the input tensor
    input_tensor = torch.cat((time_data, spatial_data), dim=1).to(device)

    # Calculate the output and exact_output for this input tensor
    output = pinn.approximate_solution(input_tensor)
    exact_output = pinn.exact_solution(input_tensor)

    # Set up a new figure and axis for each subplot
    fig, ax = plt.subplots(figsize=(3, 3), dpi=600)

    # Set the y-axis limits
    ax.set_ylim(-1.5, 1.5)
    ax.set_xlim(0, 3.14)

    # Create the plot for 'output' and 'exact_output'
    plot_output, = ax.plot(spatial_data, output.detach().cpu(), label=f'PINN Solution')
    legend_handles.append(plot_output)

    plot_exact_output, = ax.plot(spatial_data, exact_output.detach().cpu(), label=f'Exact Solution')
    legend_handles.append(plot_exact_output)

    # Set the title and labels for x and y axes
    ax.set_title(f"Temporal Loss Weighting",fontsize=12)
    ax.set_xlabel("x")
    ax.set_ylabel("u(x, t)")

    # Add a legend to distinguish the lines
    ax.legend()

    # Save each plot separately with 600 dpi
    plt.savefig(f'timesteps/lossWeighting/output_plot_{i+1}_lossWeighting.png', dpi=600)

# Display the plots
plt.show()

In [None]:
fig, ax = plt.subplots()
input_shape = (1000, 2)
spatial_data = torch.linspace(0, 3.14, input_shape[0]).unsqueeze(1)
time = 3.14
# Create the first column with the current time value
time_data = time * torch.ones(input_shape[0], 1)
# Concatenate the two columns to create the input tensor
input_tensor = torch.cat((time_data, spatial_data), dim=1).to(device)
output = pinn.approximate_solution(input_tensor)
plt.plot(output.detach().cpu())

In [None]:
def anim_1D(save = False, myxlim = (0, 3.14) , myylim = (-2,2)):
    """
    Function allowing to display an annimation based on calculation result with a given time step. This function can be used to save the images sequence in the current directory.
    
    The y parameter is a list containing several functions to display : y = [ [f_1(x)], ... , [f_n(x)] ].
    
    (x:np.ndarray (format 1D), y:np.ndarray (format 2D), pas_de_temps:float , pas_d_images:int, save:bool , myxlim:tuple , myylim:tuple) -> plot (+ .mp4)
    """
    
    input_shape = (1000, 2)
    spatial_data = torch.linspace(0, 3.14, input_shape[0]).unsqueeze(1)
    # Create the first column with the current time value
#     time_data = time * torch.ones(input_shape[0], 1)
    # Concatenate the two columns to create the input tensor
    input_tensor = torch.cat((time_data, spatial_data), dim=1).to(device)
    output = pinn_5.approximate_solution(input_tensor)
    
    
    fig = plt.figure()
    ax = plt.axes(xlim= myxlim , ylim= myylim)
    line, = ax.plot([], [])
    ax.set_title("t = 0 s", fontname = "serif", fontsize = 16)
    ax.set_xlabel("x [m]", fontname = "serif", fontsize = 14)
    ax.set_ylabel("$u$ [m]", fontname = "serif", fontsize = 14)
    def init():
        line.set_data([],[])
        return line,
    
    # animation function.  This is called sequentially
    def animate(i):
        time_data = 0.1*i * torch.ones(input_shape[0], 1)
        # Concatenate the two columns to create the input tensor
        input_tensor = torch.cat((time_data, spatial_data), dim=1).to(device)
        line.set_data(np.linspace(0,3.14,input_shape[0]), pinn.approximate_solution(input_tensor).detach().cpu())
        ax.set_title("$u(x)$ à t = {} s".format(np.round(i*0.1, 4)), fontname = "serif", fontsize = 16)
        return line,
        
    # call the animator.  blit=True means only re-draw the parts that have changed.
    anim = FuncAnimation(fig, animate, init_func=init, frames=72, interval=10, blit=True)

    if save:
        anim.save('animation.gif', writer='pillow')  # Change the filename and writer as needed


    return anim
    

In [None]:
anim_1D(save=True)

In [None]:
# Load fdm 
fdm = np.load('fdm/wave_data.npz')
x,t,U_fdm = fdm['X'],fdm['T'],fdm['U']

# Convert to PyTorch tensor
fdm_result = torch.tensor(U_fdm)

In [None]:
 """Generates exact solution for the heat equation for the given values of x and t."""
# Number of points in each dimension:
x_dim, t_dim = (500, 500)

# Bounds of 'x' and 't':
x_min, t_min = (0, 0)
x_max, t_max = (np.pi, 2*np.pi)
# Create tensors:
t = np.linspace(t_min, t_max, num=t_dim).reshape(t_dim, 1)
x = np.linspace(x_min, x_max, num=x_dim).reshape(x_dim, 1)
usol = np.zeros((x_dim, t_dim)).reshape(x_dim, t_dim)

# Create the input tensor with pairs of (x, t):
X = torch.tensor(np.column_stack((np.repeat(x, t_dim, axis=0), np.tile(t, (x_dim, 1)).reshape(-1, 1))),dtype=torch.float32)

print(X.shape)
solution = pinn.approximate_solution(X.to(device))
# Save solution:
# np.savez("heat_eq_data", x=x, t=t, usol=usol)

In [None]:
# Scatter plot
fig = plt.figure()
ax = fig.add_subplot(111)
sc = ax.scatter(X[:, 0], X[:, 1], c=solution.detach().cpu(), cmap='jet')

ax.set_xlabel('X')
ax.set_ylabel('Time')
# ax.set_zlabel('U')
ax.set_title('Scatter Plot of (X, Time, U)')

# Add colorbar
cbar = fig.colorbar(sc)
cbar.set_label('U')

plt.show()