In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torch.nn.utils.parametrizations import weight_norm
import time
import pandas as pd


# Set initial random seed for reproducibility
init_seed = 0
np.random.seed(init_seed)
torch.manual_seed(init_seed)
torch.cuda.manual_seed(init_seed)
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False

def fwd_gradients(Y, x):
    """
    Compute the gradient of Y with respect to x.
    Parameters:
        Y (torch.Tensor): Output tensor.
        x (torch.Tensor): Input tensor, gradients will be computed with respect to this.
    Returns:
        torch.Tensor: Gradient of Y with respect to x.
    """
    dummy = torch.ones_like(Y)
    G = torch.autograd.grad(Y, x, dummy, create_graph=True)[0]
    return G

class Net(torch.nn.Module):
    """
    Define the neural network architecture.
    """
    def __init__(self, layer_dim, X, device):
        """
        Initialize the neural network.
        Parameters:
            layer_dim (list): List containing the number of neurons in each layer.
            X (numpy.ndarray): Input data, used for normalization.
            device (torch.device): Device (CPU or GPU).
        """
        super().__init__()
        # Calculate the mean and standard deviation for input normalization
        self.X_mean = torch.from_numpy(X.mean(0, keepdims=True)).float().to(device)
        self.X_std = torch.from_numpy(X.std(0, keepdims=True)).float().to(device)
        
        self.num_layers = len(layer_dim)
        temp = []
        
        # Create the layers
        for l in range(1, self.num_layers):
            layer = torch.nn.Linear(layer_dim[l-1], layer_dim[l])
            layer = weight_norm(layer, name='weight', dim=0)  # Apply weight normalization
            torch.nn.init.normal_(layer.weight)  # Initialize weights using normal distribution
            temp.append(layer)
        
        self.layers = torch.nn.ModuleList(temp)
        
    def forward(self, x):
        """
        Forward pass through the network.
        Parameters:
            x (torch.Tensor): Input tensor.
        Returns:
            torch.Tensor: Output tensor after passing through the network.
        """
        # Normalize the input using the computed mean and std
        x = (x - self.X_mean) / self.X_std
        for i in range(0, self.num_layers-1):
            x = self.layers[i](x)
            if i < self.num_layers-2:
                x = torch.nn.functional.silu(x)  # Apply activation function (silu)
        return x

class TSONN:
    """
    Physics-Informed Neural Network (PINN) for solving transient temperature field problems.
    """
    def __init__(self, layers, device):
        """
        Initialize the TSONN model.
        Parameters:
            layers (list): List containing the number of neurons in each layer.
            device (torch.device): Device (CPU or GPU).
        """
        self.Nx = 101  # Number of spatial grid points
        self.Nt = 101  # Number of temporal grid points
        self.layers = layers
        self.device = device
        
        # Define the grid for spatial and temporal variables
        t = torch.linspace(0.0, 5.0, self.Nt)
        x = torch.linspace(0.0, 10.0, self.Nx)
        xx, tt = torch.meshgrid(x, t, indexing='ij')
        
        # Create the reference grid (X_ref) for the model input
        self.X_ref = torch.cat([xx.reshape(-1,1), tt.reshape(-1,1)], dim=1).to(self.device).requires_grad_(True)
        
        # Boundary and initial condition points
        self.X_ic = torch.cat([xx[:, [0]], tt[:, [0]]], dim=1).to(self.device)
        self.u_ic = torch.full_like(xx[:, [0]], -10, dtype=torch.float32, device=self.device)
        self.X_lbc = torch.cat([xx[[0]], tt[[0]]], dim=0).T.to(self.device)
        self.X_lbc.requires_grad = True
        self.X_ubc = torch.cat([xx[[-1]], tt[[-1]]], dim=0).T.to(self.device)
        self.X_ubc.requires_grad = True
        
        # Log dictionary to keep track of losses and metrics
        self.log = {'losses':[], 'losses_b':[], 'losses_i':[], 'losses_f':[], 'losses_s':[], 'mse_exact':[], 'time':[]}
        self.min_loss = 1
        
        # Initialize the neural network model
        self.model = Net(layers, self.X_ref.cpu().detach().numpy(), self.device).to(self.device)

    def exact_solution(self, z, t, alpha=0.016, beta_deg=33, L=10, psi_d=-10, c=0.104):
        """
        Exact solution of the transient infiltration problem.
        Parameters:
            z (torch.Tensor): Spatial grid points.
            t (torch.Tensor): Temporal grid points.
            alpha, beta_deg, L, psi_d, c: Parameters for the exact solution.
        Returns:
            torch.Tensor: Exact solution at the given (z, t) points.
        """
        beta = torch.tensor(beta_deg * torch.pi / 180.0, dtype=torch.float32, device=self.device)
        z = z.to(self.device, dtype=torch.float32)
        t = t.to(self.device, dtype=torch.float32)
        alpha = torch.tensor(alpha, dtype=torch.float32, device=self.device)
        psi_d = torch.tensor(psi_d, dtype=torch.float32, device=self.device)
        L = torch.tensor(L, dtype=torch.float32, device=self.device)
        c = torch.tensor(c, dtype=torch.float32, device=self.device)
        
        # Compute part of the exact solution
        part_1 = (1 - torch.exp(alpha * psi_d)) * (1 - torch.exp(-alpha * torch.cos(beta) * z)) / (1 - torch.exp(-alpha * torch.cos(beta) * L))
        sum_terms = 0
        for k in range(1, 9999):
            lambda_k = k * torch.pi / L
            mu_k = (alpha**2 / 4 + lambda_k**2) / c
            sum_terms += ((-1) ** k) * (lambda_k / mu_k) * torch.sin(lambda_k * z) * torch.exp(-mu_k * t)
        
        part_2 = (2 * (1 - torch.exp(alpha * psi_d)) / (L * c)) * torch.exp(alpha * torch.cos(beta) * (L - z) / 2) * sum_terms
        u_true = 1/alpha * torch.log(part_1 + part_2 + torch.exp(alpha * psi_d))
        
        return u_true

    def Msei(self):
        """
        Initial condition loss (mean squared error between predicted and initial condition values).
        """
        u = self.model(self.X_ic)
        msei = F.mse_loss(u, self.u_ic)
        return msei
    
    def Mseb(self):
        """
        Boundary condition loss (mean squared error between predicted and boundary condition values).
        """
        u_lbc = self.model(self.X_lbc)
        u_ubc = self.model(self.X_ubc)
        mseb = F.mse_loss(u_lbc, torch.full_like(u_lbc, -10, device=self.device)) + \
               F.mse_loss(u_ubc, torch.full_like(u_ubc, 0, device=self.device))
        return mseb
    
    def TimeStepping(self):
        """
        Perform one time step, store the current solution.
        """
        u = self.model(self.X_ref)
        self.U0 = u.detach()
    
    def Msef(self):
        """
        PDE residual loss (based on the differential equation).
        """
        beta = torch.tensor(33 * torch.pi / 180.0, dtype=torch.float32, device=self.device)
        u = self.model(self.X_ref)
        u_xt = fwd_gradients(u, self.X_ref)
        u_x = u_xt[:,0:1]
        u_t = u_xt[:,1:2]
        u_xx = fwd_gradients(u_x, self.X_ref)[:,0:1]
        
        # Compute the PDE residual
        f = u_t * 0.016 * 0.39 / 0.06 - u_xx - 0.016 * torch.cos(beta) * u_x - 0.016 * u_x ** 2
        dt = 0.5
        msef = 1/dt**2 * ((u - self.U0 + dt * f)**2).mean()
        return msef
    
    def Mses(self):
        """
        Relative L2 error between predicted solution and exact solution.
        """
        z = self.X_ref[:, 0:1]
        t = self.X_ref[:, 1:2]
        u_true = self.exact_solution(z, t)
        u_pred = self.model(self.X_ref)
        mses = torch.norm(u_pred - u_true) / torch.norm(u_true)
        return mses

    def Loss(self):
        """
        Total loss (sum of initial, boundary, PDE, and relative error losses).
        """
        msei = self.Msei()
        mseb = self.Mseb()
        msef = self.Msef()
        loss = msei + mseb + msef
        return loss, msei, mseb, msef

    def ResidualPoint(self):
        """
        Generate random points for the residual calculation.
        """
        t = torch.rand((10000, 1), device=self.device) * 5
        z = torch.rand((10000, 1), device=self.device) * 10
        self.X = torch.cat([z, t], dim=1).requires_grad_(True)

    def train(self, epoch):
        """
        Train the model for a given number of epochs.
        """
        if len(self.log['time']) == 0:
            t_start = time.time()
        else:
            t_start = time.time() - self.log['time'][-1]
        
        best_mse = float('inf')
        
        for i in range(epoch):
            def closure():
                self.optimizer.zero_grad()
                self.loss, self.loss_i, self.loss_b, self.loss_f = self.Loss()
                self.loss.backward()
                return self.loss
            
            self.optimizer = torch.optim.LBFGS(self.model.parameters(), max_iter=100)
            self.ResidualPoint()
            self.TimeStepping()
            self.optimizer.step(closure)
            self.loss_s = self.Mses()
            
            z = self.X_ref[:, 0:1]
            t = self.X_ref[:, 1:2]
            u_true = self.exact_solution(z, t)
            u_pred = self.model(self.X_ref)
            mse_exact = F.mse_loss(u_pred, u_true)
            self.log['mse_exact'].append(mse_exact.item())
            
            t_end = time.time()
            elapsed = t_end - t_start
            self.log['time'].append(elapsed)
            t_start = time.time()
            
            self.log['losses'].append(self.loss.item())
            self.log['losses_f'].append(self.loss_f.item())
            self.log['losses_b'].append(self.loss_b.item())
            self.log['losses_i'].append(self.loss_i.item())
            self.log['losses_s'].append(self.loss_s.item())
            
            if mse_exact.item() < best_mse:
                best_mse = mse_exact.item()
                torch.save(self.model.state_dict(), 'best_model.pth')
            
            if (self.loss != self.loss) or ((i > 1) and (self.loss.item() > 3 * self.log['losses'][-2])):
                if i == 0:
                    self.model = Net(self.layers, self.X_ref.cpu().detach().numpy(), self.device).to(self.device)
                    continue
                else:
                    self.model.load_state_dict(torch.load('best_model.pth'))
                    print('Loading best model')
                    self.ResidualPoint()
                    self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
                    self.scheduler = lr_scheduler.StepLR(self.optimizer, step_size=100, gamma=0.9)
                    continue
                
            self.TimeStepping()
            
            if (i+1) % 1 == 0 or (i+1) == epoch:
                print(f'Epoch {i+1}/{epoch} | Loss: {self.loss.item():.4e} | PDE Loss: {self.loss_f.item():.4e} | Relative Error: {self.loss_s.item():.4e} | Exact Solution MSE: {mse_exact.item():.4e}')

if __name__ == '__main__':
    t1 = time.time()
    torch.set_num_threads(1)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    layers = [2, 128, 128, 128, 128, 1]
    nn = TSONN(layers, device)
    nn.train(100)
    t2 = time.time()
    print(f'Total training time: {t2 - t1:.2f} seconds')

    # Plot the loss and error evolution during training
    plt.figure(figsize=(10,6))
    plt.plot(nn.log['time'], np.log10(nn.log['losses']), label='Loss')
    plt.plot(nn.log['time'], np.log10(nn.log['losses_s']), label='Relative L2 Error')
    plt.plot(nn.log['time'], np.log10(nn.log['losses_f']), label='PDE Loss')
    plt.plot(nn.log['time'], np.log10(nn.log['losses_b']), label='Boundary Loss')
    plt.plot(nn.log['time'], np.log10(nn.log['losses_i']), label='Initial Loss')
    plt.plot(nn.log['time'], np.log10(nn.log['mse_exact']), label='Exact Solution MSE')
    plt.xlabel('Time (seconds)')
    plt.ylabel('Log10(Loss)')
    plt.legend()
    plt.title('Loss and Error During Training')
    plt.savefig('loss_and_error.png')
    plt.close()

    # Compare the predicted and exact solutions
    XX = nn.X_ref[:, 0].cpu().detach().numpy().reshape(nn.Nx, nn.Nt)
    TT = nn.X_ref[:, 1].cpu().detach().numpy().reshape(nn.Nx, nn.Nt)
    z = nn.X_ref[:, 0:1]
    t = nn.X_ref[:, 1:2]
    u_pred = nn.model(nn.X_ref).cpu().detach().numpy().reshape(nn.Nx, nn.Nt)
    u_true = nn.exact_solution(z, t).cpu().detach().numpy().reshape(nn.Nx, nn.Nt)
    error = np.abs(u_true - u_pred)

    # Plot heatmaps for reference solution, predicted solution, and absolute error
    fig, axs = plt.subplots(1, 3, figsize=(18, 5))
    c1 = axs[0].pcolor(TT, XX, u_true, cmap='jet')
    fig.colorbar(c1, ax=axs[0])
    axs[0].set_xlabel('$t$')
    axs[0].set_ylabel('$x$')
    axs[0].set_title('Reference solution')
    c2 = axs[1].pcolor(TT, XX, u_pred, cmap='jet')
    fig.colorbar(c2, ax=axs[1])
    axs[1].set_xlabel('$t$')
    axs[1].set_ylabel('$x$')
    axs[1].set_title('iTSONN solution')
    c3 = axs[2].pcolor(TT, XX, error, cmap='jet')
    fig.colorbar(c3, ax=axs[2])
    axs[2].set_xlabel('$t$')
    axs[2].set_ylabel('$x$')
    axs[2].set_title('Absolute error')
    plt.tight_layout()
    plt.savefig('solution_comparison.png')
    plt.close()

    # Plot solution comparisons at specific time points
    fig, axs = plt.subplots(1, 3, figsize=(18, 5))
    time_indices = [0, 50, 100]
    for idx, t_idx in enumerate(time_indices):
        axs[idx].plot(XX[:,0], u_true[:,t_idx], label='Reference solution', color='blue')
        axs[idx].plot(XX[:,0], u_pred[:,t_idx], '--', label='iTSONN solution', color='red')
        axs[idx].set_xlabel('$x$')
        axs[idx].set_ylabel('$u$')
        axs[idx].set_title(f'$t = {TT[0, t_idx]:.2f}$')
        axs[idx].legend()
    plt.tight_layout()
    plt.savefig('solution_at_specific_times.png')
    plt.close()

    # Save the true and predicted solutions to Excel
    u_true_flat = u_true.flatten()
    u_pred_flat = u_pred.flatten()
    x_flat = XX.flatten()
    t_flat = TT.flatten()
    data_all = {
        'x': x_flat,
        't': t_flat,
        'True_Solution': u_true_flat,
        'Predicted_Solution': u_pred_flat,
        'Absolute_Error': np.abs(u_true_flat - u_pred_flat)
    }
    df_all = pd.DataFrame(data_all)
    df_all.to_excel('true_vs_predicted_solution.xlsx', index=False)
    print("Saved true and predicted solutions to 'true_vs_predicted_solution.xlsx'")

    # Save solutions at specific time points
    for t_idx in time_indices:
        t_value = TT[0, t_idx]
        data_time = {
            'x': XX[:, 0],
            'True_Solution': u_true[:, t_idx],
            'Predicted_Solution': u_pred[:, t_idx],
            'Absolute_Error': np.abs(u_true[:, t_idx] - u_pred[:, t_idx])
        }
        df_time = pd.DataFrame(data_time)
        filename = f'solution_at_t_{t_value:.2f}.xlsx'
        df_time.to_excel(filename, index=False)
        print(f"Saved solutions at t={t_value:.2f} to '{filename}'")


  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


Epoch 1/100 | Loss: 2.9156e+00 | PDE Loss: 1.3194e+00 | Relative Error: 8.5111e-01 | Exact Solution MSE: 3.0296e+01
Epoch 2/100 | Loss: 1.6870e+00 | PDE Loss: 5.5506e-01 | Relative Error: 7.8030e-01 | Exact Solution MSE: 2.5465e+01
Epoch 3/100 | Loss: 1.2037e+00 | PDE Loss: 2.8559e-01 | Relative Error: 7.2681e-01 | Exact Solution MSE: 2.2093e+01
Epoch 4/100 | Loss: 1.0197e+00 | PDE Loss: 1.7337e-01 | Relative Error: 6.8117e-01 | Exact Solution MSE: 1.9406e+01
Epoch 5/100 | Loss: 9.5446e-01 | PDE Loss: 1.1988e-01 | Relative Error: 6.4049e-01 | Exact Solution MSE: 1.7157e+01
Epoch 6/100 | Loss: 9.0666e-01 | PDE Loss: 1.0978e-01 | Relative Error: 6.0352e-01 | Exact Solution MSE: 1.5233e+01
Epoch 7/100 | Loss: 8.7419e-01 | PDE Loss: 7.5702e-02 | Relative Error: 5.7030e-01 | Exact Solution MSE: 1.3602e+01
Epoch 8/100 | Loss: 8.5594e-01 | PDE Loss: 8.8218e-02 | Relative Error: 5.3861e-01 | Exact Solution MSE: 1.2133e+01
Epoch 9/100 | Loss: 8.1655e-01 | PDE Loss: 6.9106e-02 | Relative Error: 