In [1258]:
reset -f

In [1259]:
import torch
import torch.nn as nn

import numpy as np
import matplotlib.pyplot as plt 

from pyDOE import lhs 

In [1260]:
torch.manual_seed(1234)
np.random.seed(1234)

In [1261]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Working on {device}")

Working on cpu


In [1262]:
def batch_generator(x, t, y, batch_size, dev = device):   
    #idx = torch.randperm(len(x))
    idx = torch.arange(len(x)) 
    
    # Ensure the batch size is not larger than the available data points
    num_batches = (len(x) + batch_size - 1) // batch_size  # calculate number of batches
    
    for i in range(num_batches): 
        
        batch_idx = idx[i * batch_size : min((i + 1) * batch_size, len(x))]  # Get batch indices
        
        batch_x = x[batch_idx].to(dev)
        batch_t = t[batch_idx].to(dev)
        batch_y = y[batch_idx].to(dev)
        
        # Yield the batch
        yield batch_x, batch_t, batch_y

In [1263]:
#functions for data normalization
def normalize(inputs): # to [-1, 1]
    inputs_min, inputs_max = inputs.min(), inputs.max()
    
    return 2*(inputs - inputs_min) / (inputs_max - inputs_min) - 1

def denormalize(inputs): # from [-1, 1]
    inputs_min, inputs_max = inputs.min(), inputs.max()
    
    return ((inputs + 1)/2)*(inputs_max - inputs_min) + inputs_min


def standardize(inputs):
    
    return  (inputs - inputs.mean())/(inputs.std())

def destandardize(inputs):
    
    return  inputs*inputs.std() + inputs.mean()

In [1264]:
#funtion for data a-dimensionalization

L = 3.85e5 # mean orbital radius of the moon
T = (2.361e6)/(2*np.pi)


In [1265]:
# Data Handling Function
def data_handler(x, t, y, n_ic, n_domain):
    
    idx = torch.randperm(len(x))
    x_shuff, t_shuff = x[idx], t[idx]
    
    # Bounds of space-time domain
    lb = np.array([[t_shuff.min(), x_shuff.min()]])
    ub = np.array([[t_shuff.max(), x_shuff.max()]])

    # Initial Conditions
    idx_x0 = torch.randperm(n_ic) 
    X0 = np.column_stack((x[idx_x0, 0], np.zeros(len(idx_x0))))
    Y0 = exactSolution(X0[:,1], X0[:,0])[:, None]
    
    # Collocation Points with LHS
    X_T_domain = lb + (ub - lb) * lhs(2, n_domain)
    Y_domain = exactSolution(X_T_domain[:,1], X_T_domain[:,0])[:, None]
    
    '''
    # Normalization of domain and initial conditions
    X0[:,0] = standardize(X0[:,0])
    Y0 = standardize(Y0)

    X_T_domain[:,1] = standardize(X_T_domain[:,1])
    X_T_domain[:,0] = standardize(X_T_domain[:,0])
    Y_domain = standardize(Y_domain)
    '''
    
    x0 = torch.tensor(X0[:, 0], requires_grad=True).view(-1,1).float().to(device)
    t0 = torch.tensor(X0[:, 1], requires_grad=True).view(-1,1).float().to(device)
    y0 = torch.tensor(Y0).float().to(device)

    x_domain = torch.tensor(X_T_domain[:, 0], requires_grad=True).view(-1,1).float().to(device)
    t_domain = torch.tensor(X_T_domain[:, 1], requires_grad=True).view(-1,1).float().to(device)
    y_domain = torch.tensor(Y_domain).float().to(device)

    x = torch.tensor(x, requires_grad=True).view(-1,1).float().to(device)
    t = torch.tensor(t, requires_grad=True).view(-1,1).float().to(device)
    y = torch.tensor(y).float().to(device)

    return x0, t0, y0, x_domain, t_domain, y_domain, x, t, y

In [1266]:
class PINN_Advection(nn.Module):
    
    def __init__(self, layers, loss_type, n_batch, *data):
        super(PINN_Advection, self).__init__() 

        (self.t0, self.x0, self.y0, self.t_domain, self.x_domain, self.y_domain, self.t, self.x, self.y) = data
        self.n_batch = n_batch
        self.losstype = loss_type

        self.layers = nn.ModuleList()
        self.activation = nn.Tanh() # Activation function

        for i in range(len(layers) - 1): 
            self.layers.append(nn.Linear(layers[i], layers[i+1]))  # Fully connected linear layer: the operation [out = w*input + b] will be made by hand with factorized weights

        # Random weight factorization memorization
        self.s_list = {}
        self.v_list = {}
        
        for i in range(len(self.layers)): 
            mean = 1.0
            std = 0.1
        
            w = self.layers[i].weight # weights are trained but used only now (and for backward procedure)
            
            # Generate scaling vector s and pivot matrix v for weight factorization
            s = mean + torch.normal(mean, std, size=(w.shape[-1],))
            s = torch.exp(s)
            self.s_list[f"s_{i}"] = nn.Parameter(s, requires_grad=True)
            self.register_parameter(f"s_{i}", self.s_list[f"s_{i}"])  # Register the parameter
            
            v = w / s
            self.v_list[f"v_{i}"] = nn.Parameter(v, requires_grad=True)
            self.register_parameter(f"v_{i}", self.v_list[f"v_{i}"]) # Register the parameter
   
        self.optimizer = None
        self.train_loss_history = []
        
        #For weighted loss procedures
        self.n_losses = 3
        self.loss_weights = torch.ones(self.n_losses) # Initialize losses weights
        self.f = 50
        #self.previous_residual = None
        #self.n_temporal_weights = int(np.sqrt(len(self.t_domain)))
        #self.temporal_weights = torch.ones(self.n_temporal_weights)  # Initialize temporal weights
        self.alpha = 0.9
        self.epsilon = 0.01
        
        self.c = 10 # flow velocity


    def get_factorized_weight(self, i):        
        b = self.layers[i].bias

        s = self.s_list[f"s_{i}"]
        v = self.v_list[f"v_{i}"]
        
        return s * v, b

        
    def forward(self, X): # Forward pass using random decomposed weights
        a = X.float()
        
        for i in range(len(self.layers)):
            
            kernel, b = self.get_factorized_weight(i)
            
            a = torch.matmul(a, kernel.T) + b 
            
            if i < len(self.layers) - 1:
                a = self.activation(a)
                    
        return a    

    
    def network_prediction(self, t, x):
        
        return self.forward(torch.cat((t, x), 1))
        
        
    def advection_prediction(self, t, x): # Compute the differential equation for Advection
        u = self.network_prediction(t, x)
        du_dt = self.get_derivative(u, t, 1)
        du_dx = self.get_derivative(u, x, 1)
        
        f =  du_dt + self.c*du_dx
        
        return f

    
    def get_derivative(self, y, x, n): # General formula to compute the n-th order derivative of y = f(x) with respect to x
        if n == 0:  # (n is the order if the derivative)
            return y
        else:
            dy_dx = torch.autograd.grad(y, x, torch.ones_like(y).to(device), create_graph=True, retain_graph=True, allow_unused=True)[0]
        
        return self.get_derivative(dy_dx, x, n - 1)
        

    def loss_IC(self, x, t, y):
        y_pred_IC = self.network_prediction(t, x)
        y_pred_IC.requires_grad_(True)
        
        if self.losstype == 'mse':
            loss_IC = torch.mean((y - y_pred_IC) ** 2).to(device)
            
        elif self.losstype == 'logcosh':
            loss_IC = torch.mean(torch.log(torch.cosh(y - y_pred_IC))).to(device)
    
        return loss_IC


    def loss_BC_symmetric(self):
        loss_BC_symmetric = 0.0
        y_pred = self.network_prediction(self.t, self.x)
        y_pred.requires_grad_(True) 
        
        if self.losstype == 'mse':
            loss_BC_symmetric += torch.mean((y_pred[:,0] - y_pred[:,-1])**2).to(device)
            loss_BC_symmetric += torch.mean((self.y[:,0] - y_pred[:,0])**2).to(device)
            loss_BC_symmetric += torch.mean((self.y[:,-1] - y_pred[:,-1])**2).to(device)
        
        elif self.losstype == 'logcosh':
            loss_BC_symmetric += torch.mean(torch.log(torch.cosh(y_pred[:,0] - y_pred[:,-1]))).to(device)
            loss_BC_symmetric += torch.mean(torch.log(torch.cosh(self.y[:,0] - y_pred[:,0]))).to(device)
            loss_BC_symmetric += torch.mean(torch.log(torch.cosh(self.y[:,-1] - y_pred[:,-1]))).to(device)
            
        return loss_BC_symmetric

    
    def loss_interior(self, x, t):
        f_pred = self.advection_prediction(x, t)
        f_pred.requires_grad_(True)
        
        if self.losstype == 'mse':
            loss_interior = torch.mean(torch.pow(f_pred,2)).to(device)
            
        elif self.losstype == 'logcosh':
            loss_interior = torch.mean(torch.log(torch.cosh(f_pred))).to(device)
                 
        return loss_interior


    def forward_temp_weights(self, loss_domain):
        loss_domain = torch.tensor(loss_domain)
        
        for i in range(1, self.n_temporal_weights):
            self.temporal_weights[i] = torch.exp(-self.epsilon * torch.sum(loss_domain[0:i-1]))

        return (self.temporal_weights*loss_domain).mean()

    
    def forward_loss_weights(self, losses):       
        losses_tensor = torch.stack(losses)

        parameters = list(self.parameters())
            
        # Create the gradient of each component of the loss respect to the parameters of the model
        grad_norms = []
        for l in losses_tensor: 
                
            if l.requires_grad != True:
                l = l.clone().detach().requires_grad_(True)
 
            grads = torch.autograd.grad(l, parameters, retain_graph=True, allow_unused=True)
            valid_grads = [g.view(-1) for g in grads if g != None]

            if valid_grads:  
                norm = torch.norm(torch.cat(valid_grads))
                    
            else:
                norm = torch.tensor(0.0, device=parameters[0].device)
                
            grad_norms.append(norm)

        grad_norms = torch.stack(grad_norms)
            
        for i in range(self.n_losses):
            lambda_hat = grad_norms.sum() / grad_norms[i]
            self.loss_weights[i] = self.alpha*self.loss_weights[i] + (1 - self.alpha)*lambda_hat
            
    
    def get_training_history(self):
        loss_his = np.array(self.train_loss_history)
        total_loss, loss_IC, loss_domain, loss_BC_symmetric = np.split(loss_his, 4, axis=1) 
        
        return total_loss, loss_IC, loss_domain, loss_BC_symmetric


    def train_network(self, epochs, optim, learning_rate, regularization):
        self.optimizer = torch.optim.Adam(self.parameters(), lr = learning_rate, weight_decay = regularization, amsgrad=True) 
       
        # Training loop
        for epoch in range(epochs):
            loss_IC, loss_domain, loss_BC_symmetric = 0.0, 0.0, 0.0

            self.optimizer.zero_grad()
            
            # Mini-batch loss for Initial Conditions (IC)
            for batch_x0, batch_t0, batch_y0 in batch_generator(self.x0, self.t0, self.y0, self.n_batch):
                loss_IC_batch = self.loss_IC(batch_x0, batch_t0, batch_y0)
                loss_IC += loss_IC_batch
            
            # Mini-batch loss for Domain Loss (interior) 
            #loss_domain_list = []
            for batch_x_domain, batch_t_domain, batch_y_domain in batch_generator(self.x_domain, self.t_domain, self.y_domain, self.n_batch): 
                loss_domain_batch = self.loss_interior(batch_x_domain, batch_t_domain)
                loss_domain += loss_domain_batch
                #loss_domain_list.append(loss_domain_batch)
            
            #loss_domain = self.adaptive_weights.forward_temp_weights(loss_domain_list)
            
            # training for Simmetric boundary (BC_symmetric)
            loss_BC_symmetric += self.loss_BC_symmetric()
            
            # Give a weigth to every singular loss term
            weighted_losses_tensor = self.loss_weights*torch.stack([loss_IC, loss_domain, loss_BC_symmetric])
            
            # Total loss for this epoch after having given the weigths
            total_loss = weighted_losses_tensor.sum()

            # Calculate gradients and retain graph in order to derive also all of the singular losses terms for the correspondant weigth update
            total_loss.backward(retain_graph = True)
            
            if epoch % self.f == 0:
                self.forward_loss_weights([loss_IC, loss_domain, loss_BC_symmetric]) # global weigths update routine using the non-weigthed losses
      
            self.train_loss_history.append([total_loss.cpu().detach(), weighted_losses_tensor[0].cpu().detach(), weighted_losses_tensor[1].cpu().detach(), weighted_losses_tensor[2].cpu().detach()]) #includes backward
                
            # Optimize the network parameters
            self.optimizer.step() 

            # Print out the loss every 100 epochs
            if epoch % 100 == 0:
                print(f'Epoch ({optim}): {epoch}, Total Loss: {total_loss.detach().cpu().numpy()}')

In [1267]:
def exactSolution(t, x):
    c = 10
    
    return np.sin(x - c*t)
    
x_len = 60
t_len = 60

x = np.linspace(0, 2*np.pi, x_len).reshape(-1,1) # Space domain (must be same size as time (for shuffle purposes))
t = np.linspace(0, 1, t_len).reshape(-1,1) # Time domain

X, T = np.meshgrid(x[:, 0], t[:, 0]) 

y_true = exactSolution(T, X) 

In [1268]:
layers = [2, 256, 256, 256, 256, 256, 1]
losstype = 'logcosh'
n_batch = 16
epochs = 2000
L_rate = 0.0005
lambda_reg = 0

x0, t0, y0, x_domain, t_domain, y_domain, x, t, y = data_handler(x, t, y_true, x_len, x_len*t_len)

model = PINN_Advection(layers, losstype, n_batch, t0, x0, y0, t_domain, x_domain, y_domain, t, x, y).to(device)

In [None]:
model.train_network(epochs, 'Adam', L_rate, lambda_reg)

Epoch (Adam): 0, Total Loss: 2.393630266189575
Epoch (Adam): 100, Total Loss: 19.63943862915039
Epoch (Adam): 200, Total Loss: 16.468795776367188
Epoch (Adam): 300, Total Loss: 13.89466667175293
Epoch (Adam): 400, Total Loss: 11.81021499633789
Epoch (Adam): 500, Total Loss: 10.137894630432129
Epoch (Adam): 600, Total Loss: 8.766680717468262
Epoch (Adam): 700, Total Loss: 8.107088088989258
Epoch (Adam): 800, Total Loss: 7.096914291381836
Epoch (Adam): 900, Total Loss: 6.264080047607422
Epoch (Adam): 1000, Total Loss: 5.594166278839111
Epoch (Adam): 1100, Total Loss: 5.051643371582031
Epoch (Adam): 1200, Total Loss: 4.6120452880859375


In [None]:
total_loss, loss_IC, loss_domain, loss_BC_symmetric = model.get_training_history() 

plt.figure(figsize=(10, 6))
plt.plot(total_loss, label='Total Loss')
plt.plot(loss_IC, label='Initial Condition Loss')
plt.plot(loss_domain, label='Domain Loss')
plt.plot(loss_BC_symmetric, label='Symmetric-boundary Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.yscale('log')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Get model predictions after training
n_valid = int(20)

x_valid = np.linspace(0, 2*np.pi, n_valid).reshape(-1,1) 
t_valid = np.linspace(0, 1, n_valid).reshape(-1,1) 

X_valid, T_valid = np.meshgrid(x_valid[:, 0], t_valid[:, 0]) 

y_true_valid = exactSolution(T_valid, X_valid) 

'''
x_valid = standardize(x_valid)
t_valid = standardize(t_valid)
y_true_valid = standardize(y_true_valid)

X_valid, T_valid = np.meshgrid(x_valid[:, 0], t_valid[:, 0])                                                                                                  
'''

idx = torch.randperm(n_valid)

x_val = x_valid[idx]
t_val = t_valid[idx]

X_val, T_val = np.meshgrid(x_val[:, 0], t_val[:, 0]) 

#y_true_val = exactSolution(T_val, X_val) 

t_pred_shuffle = torch.tensor(T_val.flatten(), requires_grad=True).view(-1, 1).float().to(device)
x_pred_shuffle = torch.tensor(X_val.flatten(), requires_grad=True).view(-1, 1).float().to(device)

t_pred = torch.tensor(T_valid.flatten(), requires_grad=True).view(-1, 1).float().to(device)
x_pred = torch.tensor(X_valid.flatten(), requires_grad=True).view(-1, 1).float().to(device)

# Predict with the trained model
y_pred_shuffle = model.network_prediction(t_pred_shuffle, x_pred_shuffle).float().detach().to(device).numpy()
y_pred = model.network_prediction(t_pred, x_pred).float().detach().to(device).numpy()

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(15, 8))

# Subplot 1: Plot true values (exact solution)
c1 = axes[0].contourf(X_valid, T_valid, y_true_valid, levels=50, cmap='magma', alpha=1)
axes[0].set_xlabel('X')
axes[0].set_ylabel('T')
fig.colorbar(c1, ax=axes[0], label='Exact solution')

# Subplot 2: Plot only predictions (estimated solution)
c2 = axes[1].scatter(X_val, T_val, c=y_pred_shuffle, cmap='magma', s=50, alpha=1)
axes[1].set_xlabel('X')
axes[1].set_ylabel('T')
fig.colorbar(c2, ax=axes[1], label='Estimated solution')

plt.tight_layout()
plt.show()

In [None]:
y_pred = y_pred.reshape(n_valid, n_valid)

plt.figure(figsize=(10, 8))
plt.contourf(X_valid, T_valid, np.abs(y_true_valid - y_pred), levels=100, cmap='viridis')
plt.xlabel(r'$\delta X$')
plt.ylabel(r'$\delta T$')
plt.colorbar(label='Prediction error')
plt.tight_layout()
plt.show()

In [None]:
# Plot Initial conditions and Boundary conditions
plt.figure(figsize=(10, 6))
plt.scatter(x0.detach().numpy(), t0.detach().numpy(), c=y0.flatten(), cmap='jet')
plt.colorbar(label='Conditions value')
plt.xlabel('Position (x)')
plt.ylabel('Time (t)')
plt.grid(True)
plt.tight_layout()
plt.show()