In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from pprint import pprint
import h5py
import time
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torch.nn.init as init

<h3>Geometry</h3>

In [None]:
# DEVICE
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# PINN DEFAULT SETTING :
PDE_batch_size  = 4096
IC_batch_size   = 4096
BC_batch_size   = 4096

# GEOMETRY

# ANALOGO TEMPO TAU DEL METODO PSEUDOSPETTRALE (DOVE HO USATO LE STESSE LUNGHEZZE DEL CONDENSATORE)
epsilon = 5.7 * 8.85 * 1e-9   # STESSO epsilon usato nel metodo pseudospettrale
lato, L, d, V0 = 1e3, 1e2, 20, 1
tau = 2.144                   # STESSO tau usato nel metodo pseudospettrale
tau_pde = tau
_sigma = epsilon / tau

# la pde sotto è SOLO in z e t
x_min = -lato/2
x_max = +lato/2
y_min = -lato/2
y_max = +lato/2
z_min = -L/2
z_max = +L/2
factor_x, factor_y, factor_z = x_max-x_min, y_max-y_min, z_max-z_min

t_min = 0
t_max = 5 * tau              # almost complete charge of capacitor
factor_t = (t_max-t_min)
c_time_err = 1e-2
logt_min = np.log( t_min + c_time_err )
logt_max = np.log( t_max + c_time_err )
factor_logt = logt_max-logt_min


print(f"logt_min, logt_max : {logt_min}, {logt_max}")
print(f"tau : {tau} - epsilon : {epsilon} - sigma : {_sigma}")


V0 = 1
E0 = V0 / ( z_max - z_min )


flag_norm_log = True

if flag_norm_log == True:    # with log-transform
    # NORMALIZATION of TIME
    def Norm_time(time_coords):
        if isinstance(time_coords, torch.Tensor):
            return ( torch.log( time_coords + c_time_err ) ) / factor_logt
        else:
            return ( np.log( time_coords + c_time_err ) ) / factor_logt
    # INVERSE OF NORMALIZATION of TIME
    def Inv_Norm_time(norm_time_coords):
        if isinstance(norm_time_coords, torch.Tensor):
            return ( torch.exp( norm_time_coords * factor_logt ) - c_time_err )
        else:
            return ( np.exp( norm_time_coords * factor_logt  ) - c_time_err )

else:     # with linear-transform
    def Norm_time(time_coords):
        if isinstance(time_coords, torch.Tensor):
            return (time_coords) / (factor_t)
        else:
            return (time_coords) / (factor_t)
    
    def Inv_Norm_time(norm_time_coords):
        if isinstance(norm_time_coords, torch.Tensor):
            return norm_time_coords * (factor_t)
        else:
            return norm_time_coords * (factor_t)

# NORMALIZATION OF SPACE
def Norm_z(z_coord):
    return ( z_coord - z_min ) / factor_z
        
def Inv_Norm_z(norm_z_coord):
    return norm_z_coord * factor_z + z_min


def Norm_x(x_coord):
    return ( x_coord ) / factor_x
def Inv_Norm_x(norm_x_coord):
    return norm_x_coord * factor_x
def Norm_y(y_coord):
    return ( y_coord ) / factor_y
def Inv_Norm_y(norm_y_coord):
    return norm_y_coord * factor_y


norm_t_min, norm_x_min, norm_y_min, norm_z_min = Norm_time(t_min), Norm_x(x_min), Norm_y(y_min), Norm_z(z_min)
norm_t_max, norm_x_max, norm_y_max, norm_z_max = Norm_time(t_max), Norm_x(x_max), Norm_y(y_max), Norm_z(z_max)
print("normalized coordinates:")
print(f"REAL =>  time : [{t_min},{t_max}], space : [{x_min},{x_max}]x[{y_min},{y_max}]x[{z_min},{z_max}]")
print(f"NORM =>  time : [{norm_t_min},{norm_t_max}], space : [{norm_x_min},{norm_x_max}]x[{norm_y_min},{norm_y_max}]x[{norm_z_min},{norm_z_max}]")

In [None]:
# verifica che le conversioni diano l'identità sia rispetto al tempo che spazio
print("z")
VVV = torch.rand((1,6), device=device)
print(VVV - Norm_z(Inv_Norm_z(VVV)))
print(VVV - Inv_Norm_z(Norm_z(VVV)))
print(VVV)
print(Norm_z(Inv_Norm_z(VVV)))
print(Inv_Norm_z(Norm_z(VVV)))
print("t")
VVV = torch.rand((1,6), device=device)
print(VVV - Norm_time(Inv_Norm_time(VVV)))
print(VVV - Inv_Norm_time(Norm_time(VVV)))
print(VVV)
print(Norm_time(Inv_Norm_time(VVV)))
print(Inv_Norm_time(Norm_time(VVV)))

<h1>Sigma ML</h1>

In [None]:
flag_train_sigma = True    # se == True, allora allena la neural network della sigma; 
                           # se == False, la prende dal file "sigma_model_plane1D.pth"

In [None]:
def get_sigmoid(value):
    # SMOOTH :
    #mu = factor_z/2 * 2.5
    #sigmoid = 1 / ( 1 + torch.exp( 14 - mu * value ))
    # NON SMOOTH :
    mu = factor_z/2 * 3.4
    sigmoid = 1 / ( 1 + torch.exp( 17 - mu * value ))
    return sigmoid

def generate_sigma_coords(n_points=PDE_batch_size):
    t_coords = norm_t_min + (norm_t_max - norm_t_min) * torch.rand(n_points, 1, device=device)
    z_coords = norm_z_min + (norm_z_max - norm_z_min) * torch.rand(n_points, 1, device=device)
    points = torch.cat( [t_coords, z_coords], dim=-1 )
    mask = (points[:,1] > Norm_z( z_max-L/2 )) | (points[:,1] < Norm_z( z_min+L/2 ))
    sigma_coords = torch.zeros_like(points[:,0].unsqueeze(-1), device=device)
    sigma_coords[mask] = torch.ones_like(sigma_coords[mask], device=device)
    sigma_coords = sigma_coords * get_sigmoid( torch.abs(points[:,1].unsqueeze(-1) - Norm_z( z_max-L/2 )) ) \
                                * get_sigmoid( torch.abs(points[:,1].unsqueeze(-1) - Norm_z( z_min+L/2 )) )
    return points, sigma_coords


if flag_train_sigma == True:
    # PLOTs
    points, sigma_coords = generate_sigma_coords(10000)
    # plane XZ
    z_coords = Inv_Norm_z( points[:,1] )
    z_coords = z_coords.detach().cpu().numpy()
    sigma_coords = sigma_coords.detach().cpu().numpy()    
    plt.scatter( z_coords, sigma_coords, c="green", s=1 )
    plt.axvline(x=-10, color="red", linestyle="--", linewidth=1.5)
    plt.axvline(x=10,  color="red", linestyle="--", linewidth=1.5)
    plt.show()

In [None]:
###### SIGMA DNN model
# è una neural network normale con ML classico (prende i dati da generate_sigma_coords(...) sopra)

class DNN_model(nn.Module):
    
    def __init__(self, input_dim, n_nodes, n_layers, n_batches, dropout):
        super().__init__()  
        # DNN part
        self.n_batches = n_batches
        self.dropout = dropout
        layers = [nn.Linear(input_dim, n_nodes), nn.Tanh(), nn.Dropout(self.dropout)]
        for _ in range(n_layers - 1):
            layers.append(nn.Linear(n_nodes, n_nodes))
            layers.append(nn.Tanh())
            layers.append(nn.Dropout(self.dropout))
        layers.append(nn.Linear(n_nodes, 1))
        layers.append(nn.Tanh())
        self.network = nn.Sequential(*layers)
        
    def forward(self, x):
        return self.network(x)
  
    def train_model(self, optimizer, loss_function, epochs=500, batch_size=4096, validation_split=0.1):
        print("Training in progress...")
        start_time = time.time()
        n_train_batches = int( (1 - validation_split) * self.n_batches)
        n_val_batches   = self.n_batches - n_train_batches
        print(f"  number of epochs             : {epochs}")
        print(f"  number of train batches      : {n_train_batches}")
        print(f"  number of validation batches : {n_val_batches}")
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=20)
        history = []
        
        for epoch in range(epochs):
            start = time.time()
            
            # === TRAINING STEP ===
            self.train()
            start_data_tr = time.time()
            data_total_loss = 0            
            for indx_batch in range(n_train_batches):
                optimizer.zero_grad()
                data_coords, target = generate_sigma_coords(batch_size)
                data_coords.requires_grad=True
                predictions = self.forward(data_coords)
                loss = loss_function( predictions, target )
                loss.backward()
                optimizer.step()
                data_total_loss += loss.item()
            data_total_loss /= n_train_batches
            end_data_tr = time.time()

            # === VALIDATION STEP ===
            self.eval()
            start_val = time.time()
            data_val_loss = 0
            with torch.no_grad():
                for indx_batch in range(n_val_batches):
                    optimizer.zero_grad()
                    data_coords, target = generate_sigma_coords(batch_size)
                    data_coords.requires_grad=True
                    predictions = self.forward(data_coords)
                    loss = loss_function( predictions, target )
                    data_val_loss += loss.item()
                data_val_loss /= n_val_batches
            end_val = time.time()
            scheduler.step(data_val_loss)

            end = time.time()
            print(f"Epoch {epoch+1}/{epochs} - data loss: {data_total_loss} - val Loss: {data_val_loss}")
            print(f"    time : {end - start} s  -  data: {end_data_tr - start_data_tr} s - val: {end_val - start_val} s")
            print(f" ")
            history.append([ data_total_loss, data_val_loss ])
            if data_total_loss < 1e-7:
                # save model
                torch.save(self.state_dict(), "sigma_model_plane1D.pth")
                print(f"Done!  (time : {time.time() - start_time})")
                return history

        # save model
        torch.save(self.state_dict(), "sigma_model_plane1D.pth")
        print(f"Done!  (time : {time.time() - start_time})")
        return history


In [None]:
# construction and training of DNN SIGMA
sigma_model = 0
input_dim = 2     # (t, z)
n_nodes = 64
n_layers = 5
n_batches = 96
dropout = 0
sigma_model = DNN_model( input_dim, n_nodes, n_layers, n_batches, dropout ).to(device)

if flag_train_sigma == True:
    optimizer = torch.optim.Adam(sigma_model.parameters(), lr=0.001, weight_decay=1e-8)
    loss_function = nn.MSELoss()
    history_sigma = sigma_model.train_model(optimizer, loss_function, epochs=400)
    print(sigma_model)
    
    # history of training
    plt.plot([pair[0] for pair in history_sigma], label="Training")
    plt.plot([pair[1] for pair in history_sigma], label="Validation")
    plt.legend(title="Error")
    plt.xlabel("Epoch")
    plt.ylabel("MSE")
    plt.grid(alpha=0.2)
    plt.yscale("log")
    plt.show()

In [None]:
if flag_train_sigma == False:  # se non ho allenato la sigma, allora prende i pesi che avevo salvato dal file
    sigma_model.load_state_dict(torch.load("sigma_model_plane1D.pth"))

# PLOTS
sampled_times = np.linspace(Norm_time(t_min), Norm_time(t_max), 6)
n_samples = 100000
plt.figure(figsize=(16,10))
sigma_model.eval()

for i_plot, n_t in enumerate(sampled_times, 1):
    real_t = Inv_Norm_time(n_t)
    X_pred, sigma_true = generate_sigma_coords(n_samples)
    X_pred[:,0] = n_t * torch.ones_like( X_pred[:,0], device=device )
    X = X_pred.detach().cpu().numpy()
    # predicted result
    with torch.no_grad():
        sigma_pred = sigma_model.forward(X_pred)
    sigma_pred = sigma_pred.detach().cpu().numpy()
    # true result
    sigma_true = sigma_true.detach().cpu().numpy()
    
    plt.subplot(3, len(sampled_times), i_plot)
    plt.title(f"Pred {real_t:3f} ns")
    im0=plt.scatter(Inv_Norm_z(X[:, 1]), sigma_pred, s=2)
    plt.colorbar(im0)
    
    plt.subplot(3, len(sampled_times), len(sampled_times) + i_plot)
    plt.title(f"True {real_t:3f} ns")
    im1=plt.scatter(Inv_Norm_z(X[:, 1]), sigma_true, s=2)
    plt.colorbar(im1)

    plt.subplot(3, len(sampled_times), 2*len(sampled_times) + i_plot)
    plt.title(f"Error {real_t:3f} ns")
    im2=plt.scatter(Inv_Norm_z(X[:, 1]), np.abs(sigma_true-sigma_pred), s=2)

plt.tight_layout()
plt.show()


sigma_model.eval()
n_points = 10000
t_coords = norm_t_min * torch.ones(n_points, 1, device=device)
z_coords = norm_z_min + (norm_z_max - norm_z_min) * torch.rand(n_points, 1, device=device)
coords_sample = torch.cat( [t_coords, z_coords], dim=-1 )
sigma_sample  = sigma_model( coords_sample )
z_coords = z_coords.detach().cpu().numpy()
sigma_sample = sigma_sample.detach().cpu().numpy()
plt.scatter( Inv_Norm_z(z_coords), sigma_sample, c="green", s=1 )


<h1>PINN</h1>

<h3>Initial conditions</h3>

In [None]:
sigma_model.eval()

def gen_IC_points(model=None, batch_size=IC_batch_size, oversample_factor=4):
    # Oversample
    M = batch_size * oversample_factor
    t_coords = norm_t_min * torch.ones(M, 1, device=device)  # initial time
    z_coords = norm_z_min + (norm_z_max - norm_z_min) * torch.rand(M, 1, device=device)
    coords = torch.cat([t_coords, z_coords], dim=-1)
    z_factor = (z_coords - norm_z_min) / (norm_z_max - norm_z_min)
    V_target = V0 * z_factor
    # Importance sampling if model is not None
    if model is not None:
        coords.requires_grad=True
        V_pred = model(coords)
        grads = torch.autograd.grad(
            V_pred, coords,
            grad_outputs=torch.ones_like(V_pred),
            create_graph=False, retain_graph=False
        )[0]
        E_pred = ( (grads[:,1].unsqueeze(-1)/factor_x)**2 ) ** (1/2)
        weights = torch.abs(V_pred - V_target) + 10*torch.abs(E_pred - E0*torch.ones_like(E_pred)) + 1
        weights = torch.clamp(weights.squeeze(-1), min=1e-12)
        probs = weights / torch.sum(weights)
        # Resampling
        idx = torch.multinomial(probs, batch_size, replacement=False)
        coords_batch = coords[idx]
        values_batch = V_target[idx]
    else:
        # uniform sampling if model is None
        coords_batch = coords[:batch_size]
        values_batch = V_target[:batch_size]
    #
    return coords_batch.detach(), values_batch.detach()
    

def compute_IC_loss(model, n_points=IC_batch_size, loss_function=nn.MSELoss()):
    coords, values = gen_IC_points(model=model, batch_size=n_points)
    coords.requires_grad = True
    values_pred = model.forward(coords)
    loss = loss_function( values_pred, values )
    return loss


# PLOT IC
points, IC_values = gen_IC_points(batch_size=10000)
print(f"initial time : {Inv_Norm_time(np.unique(points[:len(points)//2,0].detach().cpu().numpy()))}")
# plane XZ
z_coords = Inv_Norm_z( points[:,1] )
z_coords = z_coords.detach().cpu().numpy()
IC_values = IC_values.detach().cpu().numpy()
fig = plt.scatter(z_coords, IC_values, s=1, c="blue")
plt.title("Initial condition")
plt.xlabel("z")
plt.ylabel("V")
plt.show()

<h3>Boundary condition</h3>

In [None]:
def gen_BC_points(n_points=BC_batch_size):
    # minimum z-coord
    t_coords = norm_t_min + (norm_t_max - norm_t_min) * torch.rand(n_points, 1, device=device)
    z_coords = norm_z_min * torch.ones(n_points, 1, device=device)       
    points2 = torch.cat( [t_coords, z_coords], dim=-1 )
    values2 = 0 * torch.ones_like( t_coords, device=device )
    # maximum z-coord
    t_coords = norm_t_min + (norm_t_max - norm_t_min) * torch.rand(n_points, 1, device=device)
    z_coords = norm_z_max * torch.ones(n_points, 1, device=device)       
    points3 = torch.cat( [t_coords, z_coords], dim=-1 )
    values3 = V0 * torch.ones_like( t_coords, device=device )
    return torch.cat( [points2, points3], dim=0 ), torch.cat( [values2, values3], dim=0 )

def compute_BC_loss(model, n_points=BC_batch_size, loss_function=nn.MSELoss()):
    coords, values = gen_BC_points(n_points)
    coords.requires_grad = True
    values_pred = model.forward(coords)
    loss = loss_function( values_pred, values )
    return loss

# PLOT BC
points, BC_values = gen_BC_points(10000)
z_coords = Inv_Norm_z( points[:,1] )
z_coords = z_coords.detach().cpu().numpy()
BC_values = BC_values.detach().cpu().numpy()
fig = plt.scatter(z_coords, BC_values, s=3, c="blue")
plt.title("Boundary condition")
plt.xlabel("z")
plt.ylabel("V")
plt.colorbar(fig)
plt.show()

<h3>PDE</h3>

In [None]:
eps1 = epsilon * 1e8
sigma1 = _sigma * 1e8

class QSM_PDE:
    
    def gen_PDE_points(self, model=None, batch_size=PDE_batch_size, oversample_factor=5):
        if model is not None:
            # Oversample
            M = batch_size * oversample_factor
            t_coords = norm_t_min + (norm_t_max - norm_t_min) * torch.rand(M, 1, device=device)
            z_coords = norm_z_min + (norm_z_max - norm_z_min) * torch.rand(M, 1, device=device)
            coords = torch.cat([t_coords, z_coords], dim=-1)
            coords.requires_grad=True
            V_pred = model(coords)
            grads = torch.autograd.grad(
                V_pred, coords,
                grad_outputs=torch.ones_like(V_pred),
                create_graph=False, retain_graph=False
            )[0]
            dV_dt = grads[:, 0:1]
            dV_dx = grads[:, 1:]
            grad_norm = torch.norm(dV_dx, dim=1, keepdim=True)
            # Importance sampling
            weights = ( grad_norm**2 + 5 * dV_dt**2 ) ** 0.5 + torch.abs( V_pred ) + 10
            weights = weights.squeeze(-1)
            weights = torch.clamp(weights, min=1e-12)  # evitare zero
            probs = weights / torch.sum(weights)
            # Resampling
            idx = torch.multinomial(probs, batch_size, replacement=False)
            coords_batch = coords[idx]
            return coords_batch.detach()
        else:
            t_coords = norm_t_min + (norm_t_max - norm_t_min) * torch.rand(n_points, 1, device=device)
            z_coords = norm_z_min + (norm_z_max - norm_z_min) * torch.rand(n_points, 1, device=device)
            coords = torch.cat([t_coords, z_coords], dim=-1)
            return coords
        
    
    def compute_PDE(self, coords, pred_func, flag=False, flag_norm_log=flag_norm_log):
        if flag_norm_log == True:
            factorrr = torch.exp(coords[:,0].unsqueeze(-1) * factor_logt) * factor_logt
        else:
            factorrr = factor_t
        sigma_model.eval()
        sigma_comp = sigma_model.forward(coords)
        u = pred_func[:,0]
        u_z  = self.get_derivative(u, coords, 1)[:,1].unsqueeze(-1) / factor_z
        u_zz = self.get_derivative(u_z, coords, 1)[:,1].unsqueeze(-1) / factor_z
        Delta_u = u_zz
        Delta_u_t = self.get_derivative(Delta_u, coords, 1)[:,0].unsqueeze(-1)
        div_sigma_grad_u = self.get_derivative(sigma_comp, coords, 1)[:,1].unsqueeze(-1) * u_z / factor_z \
                            + sigma_comp * self.get_derivative(u_z, coords, 1)[:,1].unsqueeze(-1) / factor_z
        P1 = Delta_u_t / factorrr * eps1
        P2 = div_sigma_grad_u * sigma1
        eq = P1 + P2
        if flag == True:
            # se è True stampo ogni parte della pde
            u_t = self.get_derivative(u, coords, 1)[:,0] / factorrr
            print(f"u_t in [{(u_t).min()}, {(u_t).max()}] , mean = {torch.mean((u_t))}")
            print(f"Eps in [{(P1).min()}, {(P1).max()}] , mean = {torch.mean((P1))}")
            print(f"Div in [{(P2).min()}, {(P2).max()}] , mean = {torch.mean((P2))}")
            print(f"Eq  in [{(eq).min()}, {(eq).max()}] , mean = {torch.mean((eq))}")
            print(" ")
        return eq.squeeze(-1)*1e3, P1.squeeze(-1)*1e3, P2.squeeze(-1)*1e3
        
    
    def get_derivative(self, y, x, n: int = 1):
        if n == 0:
            return y
        else:
            dy_dx = torch.autograd.grad(y, x, torch.ones_like(y).to(y.device), create_graph=True, retain_graph=True, allow_unused=True)[0]              
        return self.get_derivative(dy_dx, x, n - 1)


    def compute_PDE_loss(self, model, n_points=PDE_batch_size, loss_function=nn.MSELoss(), flag=False, flag_norm_log=True):
        coords = self.gen_PDE_points(model=model, batch_size=n_points)
        coords.requires_grad = True
        values_pred = model.forward(coords)
        pde_pred, _, _ = self.compute_PDE(coords, values_pred, flag=flag, flag_norm_log=flag_norm_log)
        loss = loss_function( pde_pred, torch.zeros_like(pde_pred, dtype=torch.float32, device=device) )
        return loss

<h3>PINN model</h3>

In [None]:
"""
############ VERSIONE SENZA SKIP CONNECTION E STAN ACTIVATION FUNCTIONS
class PINN_model(nn.Module):
    def __init__(self, input_dim, n_nodes, n_layers, n_batches, dropout):
        super().__init__()
        # PDE part
        self._PDE = QSM_PDE()
        self.history = []
        # DNN part
        self.n_batches = n_batches
        self.dropout   = dropout
        layers = [nn.Linear(input_dim, n_nodes), nn.Tanh(), nn.Dropout(self.dropout)]
        for _ in range(n_layers - 1):
            layers.append(nn.Linear(n_nodes, n_nodes))
            layers.append(nn.Tanh())
            layers.append(nn.Dropout(self.dropout))
        layers.append(nn.Linear(n_nodes, 1))
        layers.append(nn.Tanh())
        # network
        self.network = nn.Sequential(*layers)
        # Xavier initialization
        for layer in self.network:
            if isinstance(layer, nn.Linear):
                init.xavier_normal_(layer.weight, gain=1.0)
                init.zeros_(layer.bias)

    def forward(self, coords):
        return self.network(coords)
"""

class SelfScaledTanh(nn.Module):
    def __init__(self, size, init_beta=0.0):
        super().__init__()
        self.beta = nn.Parameter(torch.full((size,), init_beta, dtype=torch.float32))

    def forward(self, x):
        return torch.tanh(x) + self.beta * x * torch.tanh(x)
    

class PINN_model(nn.Module):
    def __init__(self, input_dim, n_nodes, n_layers, n_batches, dropout):
        super().__init__()
        self.n_batches = n_batches
        self.dropout = dropout
        validation_split = 0.1
        self.n_train_batches = int((1 - validation_split) * self.n_batches)
        self.n_val_batches = self.n_batches - self.n_train_batches
        self.n_layers = n_layers
        self.n_nodes = n_nodes
        self.history = []
        self.optimizer = 0
        self._PDE = QSM_PDE()
        
        # ==== LAYERS ====
        self.input_layer = nn.Linear(input_dim, n_nodes)
        self.initial_activation = SelfScaledTanh(n_nodes)
        self.hidden_layers = nn.ModuleList()
        for _ in range(n_layers - 1):
            self.hidden_layers.append(
                nn.Sequential(
                    nn.Linear(n_nodes, n_nodes),
                    SelfScaledTanh(n_nodes),
                    nn.Dropout(self.dropout)
                )
            )

        self.output_layer = nn.Linear(n_nodes, 1)
        self.final_activation = SelfScaledTanh(1)

    def forward(self, coords):
        x = self.initial_activation(self.input_layer(coords))
        residual = x
        for i, layer in enumerate(self.hidden_layers):
            out = layer(x)
            if i % 2 == 1:
                x = out + residual
                residual = x
            else:
                x = out
        x = self.output_layer(x)
        return self.final_activation(x)
        
    
    def train_model(self, optimizer, patience = 10, loss_function = nn.MSELoss(), epochs = 200, validation_split = 0.1):
        print("Start :")
        start_time = time.time()   
        self.history = []
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.95, patience=patience)
        # number of batches for training and validation
        n_train_batches = self.n_train_batches
        n_val_batches   = self.n_val_batches
        print(f"  number of epochs             : {epochs}")
        print(f"  number of train batches      : {n_train_batches}")
        print(f"  number of validation batches : {n_val_batches}")
        print(" ")        
        print("Training in progress...")
        print(" ")

        weight_IC, weight_BC, weight_PDE = 1, 1, 1

        for epoch in range(epochs):
            start = time.time()
            
            # === TRAINING STEP ===
            self.train()
            start_tr = time.time()
            IC_total_loss, BC_total_loss, PDE_total_loss = 0, 0, 0            
            for indx_batch in range(n_train_batches):
                optimizer.zero_grad()
                IC_loss  = compute_IC_loss(model=self)
                BC_loss  = compute_BC_loss(model=self)
                if epoch % 50 == 0 and indx_batch == n_train_batches - 1:
                    # ogni 50 epoche stampo gli errori di ogni parte della pde
                    PDE_loss = self._PDE.compute_PDE_loss(model=self, flag=True )
                else:
                    PDE_loss = self._PDE.compute_PDE_loss(model=self, flag=False)
                Loss = weight_IC * IC_loss + weight_BC * BC_loss + weight_PDE * PDE_loss
                Loss.backward()
                optimizer.step()
                IC_total_loss   += IC_loss.item()
                BC_total_loss   += BC_loss.item()
                PDE_total_loss  += PDE_loss.item()
            IC_total_loss  /= n_train_batches
            BC_total_loss  /= n_train_batches
            PDE_total_loss /= n_train_batches
            end_tr = time.time()

            # === VALIDATION STEP ===
            self.eval()
            start_val = time.time()
            IC_val_loss, BC_val_loss, PDE_val_loss = 0, 0, 0  
            for indx_batch in range(n_val_batches):
                optimizer.zero_grad()
                IC_val_loss  += (compute_IC_loss(model=self) ).item()
                BC_val_loss  += ( compute_BC_loss(model=self) ).item()
                PDE_val_loss += ( self._PDE.compute_PDE_loss(model=self) ).item()
            IC_val_loss   /= n_val_batches
            BC_val_loss   /= n_val_batches
            PDE_val_loss  /= n_val_batches
            scheduler.step( weight_IC * IC_val_loss + weight_BC * BC_val_loss + weight_PDE * PDE_val_loss )
            end_val = time.time()
            
            end = time.time()

            
            lambda_eff = weight_PDE/weight_IC * PDE_total_loss/IC_total_loss
            current_lr = optimizer.param_groups[0]['lr']
            # === PRINT LOSSES ===
            print(f"Epoch {epoch+1}/{epochs}")
            print(f"  Training losses   ==>  IC: {IC_total_loss} - BC: {BC_total_loss} - pde: {PDE_total_loss}")
            print(f"  Validation losses ==>  IC: {IC_val_loss} - BC: {BC_val_loss} - pde: {PDE_val_loss}")
            print(f"  time : {end - start} s  - train: {end_tr - start_tr} s  - val: {end_val - start_val} s")
            print(f"     ( w_IC : {weight_IC} - w_BC : {weight_BC} - w_pde : {weight_PDE} - lambda_eff : {lambda_eff} )")
            print(f"     ( learning rate : {current_lr})")
            self.history.append([ IC_total_loss, PDE_total_loss, IC_val_loss, PDE_val_loss,
                            weight_IC, weight_PDE, lambda_eff, current_lr, BC_total_loss, BC_val_loss, weight_BC ])
            print(f" ")
            print(f" ")



            # PLOTs
            if epoch % 100 == 0: # and epoch>0:
                sampled_times = Norm_time( t_min + np.array([ 0, 0.1, 0.2, 0.3, 0.7, 1 ]) * (t_max - t_min) )
                n_samples = 20000
                plt.figure(figsize=(10,4))
                self.eval()
                for i_plot, n_t in enumerate(sampled_times, 1):
                    real_t = Inv_Norm_time(n_t)
                    X_pred = self._PDE.gen_PDE_points(batch_size=n_samples)
                    X_pred[:,0] = n_t * torch.ones_like(X_pred[:,0], device=device)
                    X_Sample = X_pred.detach().cpu().numpy()
                    X_pred.requires_grad=True
                    V_pred = model.forward(X_pred)
                    
                    E_pred = ( (self._PDE.get_derivative(V_pred, X_pred, 1)[:,1].unsqueeze(-1)/factor_z)**2 ) ** (1/2)
                    V_pred = V_pred.detach().cpu().numpy()
                    E_pred = E_pred.detach().cpu().numpy()

                    v_min = 0
                    v_max = V0
                    plt.subplot(2, len(sampled_times), i_plot)
                    vett = Inv_Norm_x(norm_x_min + (norm_x_max - norm_x_min) * torch.rand(len(X_Sample[:,0]), 1, device=device))
                    vett = vett.detach().cpu().numpy()
                    im0=plt.scatter(vett, Inv_Norm_z(X_Sample[:, 1]), c=V_pred, s=2, cmap='plasma', vmin=v_min, vmax=v_max)
                    # linea rossa condensatore piastre
                    x_vals = x_min + (x_max - x_min) * np.random.rand(1000)
                    z_val  = z_max - L/2 + d/2
                    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
                    z_val  = z_min+L/2 - d/2
                    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
                    plt.title(f"Q {real_t:.2f} ns")
                    plt.colorbar(im0)
                    plt.tight_layout()

                    v_min = 0
                    v_max = 0.05
                    plt.subplot(2, len(sampled_times), len(sampled_times) + i_plot)
                    vett = Inv_Norm_x(norm_x_min + (norm_x_max - norm_x_min) * torch.rand(len(X_Sample[:,0]), 1, device=device))
                    vett = vett.detach().cpu().numpy()
                    im1=plt.scatter(vett, Inv_Norm_z(X_Sample[:, 1]), c=E_pred, s=2, cmap='plasma', vmin=v_min, vmax=v_max)
                    # linea rossa condensatore piastre
                    x_vals = x_min + (x_max - x_min) * np.random.rand(1000)
                    z_val  = z_max - L/2 + d/2
                    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
                    z_val  = z_min+L/2 - d/2
                    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
                    plt.title(f"E {real_t:.2f} ns")
                    plt.colorbar(im1)
                plt.tight_layout()
                plt.show()
                
                # initial condition
                points = self._PDE.gen_PDE_points(batch_size=10000)
                points[:,0] = norm_t_min * torch.ones_like(points[:,0], device=device)
                points.requires_grad=True
                IC_values = get_IC_values(points)
                IC_pred = self.forward(points)
                E_pred = ( (self._PDE.get_derivative(IC_pred, points, 1)[:,1].unsqueeze(-1)/factor_z)**2 ) ** (1/2)
                z_coords = points[:,1].detach().cpu().numpy()
                IC_pred = IC_pred.detach().cpu().numpy()
                IC_values = IC_values.detach().cpu().numpy()
                E_pred = E_pred.detach().cpu().numpy()
                fig, axs = plt.subplots(1, 2, figsize=(12, 4))
                axs[0].scatter(Inv_Norm_z(z_coords), IC_values, c="green", s=1, label="IC_values")
                axs[0].scatter(Inv_Norm_z(z_coords), IC_pred, c="red", s=1, label="IC_pred")
                axs[0].set_title("V Initial condition")
                axs[0].set_xlabel("z")
                axs[0].legend()
                axs[1].scatter(Inv_Norm_z(z_coords), E_pred, c="red", s=1, label="E_pred")
                axs[1].set_title("E Initial condition")
                axs[1].set_ylim(0,0.015)
                axs[1].set_xlabel("z")
                axs[1].legend()
                plt.tight_layout()
                plt.show()

                # final condition
                points = self._PDE.gen_PDE_points(batch_size=10000)
                points[:,0] = norm_t_max * torch.ones_like(points[:,0], device=device)
                points.requires_grad=True
                FC_pred = self.forward(points)
                E_pred = ( (self._PDE.get_derivative(FC_pred, points, 1)[:,1].unsqueeze(-1) / factor_z)**2 ) ** (1/2)
                z_coords = points[:,1].detach().cpu().numpy()
                FC_pred = FC_pred.detach().cpu().numpy()
                E_pred = E_pred.detach().cpu().numpy()
                fig, axs = plt.subplots(1, 2, figsize=(12, 4))
                axs[0].scatter(Inv_Norm_z(z_coords), FC_pred, c="red", s=1, label="FC_pred")
                axs[0].set_title("V Final Condition")
                axs[0].set_ylim(0,1)
                axs[0].set_xlabel("z")
                axs[0].legend()
                axs[1].scatter(Inv_Norm_z(z_coords), E_pred, c="red", s=1, label="E_pred")
                axs[1].set_title("E Final Condition")
                axs[1].set_xlabel("z")
                axs[1].set_ylim(0,0.05)
                axs[1].legend()
                plt.tight_layout()
                plt.show()

                # pde error
                points = self._PDE.gen_PDE_points(batch_size=10000)
                points.requires_grad=True
                V_pred = self.forward(points)
                pde_pred, _, _ = self._PDE.compute_PDE( points, V_pred )
                z_coords = points[:,1].detach().cpu().numpy()
                pde_pred = pde_pred.detach().cpu().numpy()
                plt.scatter(Inv_Norm_z(z_coords), pde_pred, c="red", s=1, label="pde error")
                plt.xlabel("z")
                plt.legend()
                plt.tight_layout()
                plt.show()

        # END EPOCHS
        self.history = torch.stack([torch.tensor(h) for h in self.history]).detach().cpu().numpy()
        print(f"Done!  (time : {time.time() - start_time})")
        return self.history

<h3>Training</h3>

In [None]:
###### construction of PINN
model = 0
input_dim = 2     # (t, z)
n_nodes = 64
n_layers = 6
n_batches = 32
dropout = 0
model = PINN_model( input_dim, n_nodes, n_layers, n_batches, dropout).to(device)

# training of PINN
optimizer = torch.optim.Adam(model.parameters(), lr=0.003, weight_decay=1e-9)
loss_function = nn.MSELoss()
history = model.train_model(optimizer, patience=20, loss_function=loss_function, epochs=10000)
print(model)

In [None]:
torch.save(model.state_dict(), "model_c1D.pth")
model.load_state_dict(torch.load("model_c1D.pth"))

In [None]:
history = model.history

# PLOTS OF TRAINING AND VALIDATION STEPS
# plot IC
plt.plot([pair[0] for pair in history], label="Training")
plt.plot([pair[2] for pair in history], label="Validation")
plt.legend(title="IC error")
plt.xlabel("Epoch")
plt.ylabel("Loss values (MSE)")
plt.grid(alpha=0.2)
plt.yscale("log")
plt.savefig("plot_c1D_IC.jpg", format="jpg", dpi=300)
plt.show()
# plot BC
plt.plot([pair[8] for pair in history], label="Training")
plt.plot([pair[9] for pair in history], label="Validation")
plt.legend(title="BC error")
plt.xlabel("Epoch")
plt.ylabel("Loss values (MSE)")
plt.grid(alpha=0.2)
plt.yscale("log")
plt.savefig("plot_c1D_BC.jpg", format="jpg", dpi=300)
plt.show()
# plot PDE CONDITIONS
plt.plot([np.maximum(pair[1]/1e3, 1e-10) for pair in history], label="Training")
plt.plot([np.maximum(pair[3]/1e3, 1e-10) for pair in history], label="Validation")
plt.legend(title="PDE error")
plt.xlabel("Epoch")
plt.ylabel("Loss values (MSE)")
plt.grid(alpha=0.2)
plt.yscale("log")
plt.savefig("plot_c1D_PDE.jpg", format="jpg", dpi=300)
plt.show()
# plot LAMBDA EFFECTIVE
plt.plot([pair[6] for pair in history], label="lambda eff")
plt.legend(title="Lambda effective")
plt.xlabel("Epoch")
plt.ylabel("values")
plt.grid(alpha=0.2)
plt.yscale("log")
plt.show()
# plot LEARNING RATE
plt.plot([pair[7] for pair in history], label="lr")
plt.legend(title="Learning rate")
plt.xlabel("Epoch")
plt.ylabel("values")
plt.grid(alpha=0.2)
plt.yscale("log")
plt.tight_layout()
plt.show()

In [None]:
# PREDICTIONs
sampled_times = Norm_time( t_min + np.array([ 0, 0.1, 0.3, 0.7, 1, 1.5 ]) * (t_max - t_min) )
n_samples = 20000
plt.figure(figsize=(16,14))
model.eval()
sigma_model.eval()
pde_class = QSM_PDE()

for i_plot, n_t in enumerate(sampled_times, 1):
    real_t = Inv_Norm_time(n_t)
    X_pred = pde_class.gen_PDE_points(batch_size=n_samples)
    X_pred[:,0] = n_t * torch.ones_like(X_pred[:,0], device=device)
    X_Sample = X_pred.detach().cpu().numpy()
    X_pred.requires_grad=True
    V_pred = model.forward(X_pred)
    
    E_pred = ( (pde_class.get_derivative(V_pred, X_pred, 1)[:,1].unsqueeze(-1) / factor_z)**2 ) ** (1/2)
    V_pred = V_pred.detach().cpu().numpy()
    E_pred = E_pred.detach().cpu().numpy()

    v_min = 0
    v_max = V0
    plt.subplot(4, len(sampled_times), i_plot)
    vett = Inv_Norm_x(norm_x_min + (norm_x_max - norm_x_min) * torch.rand(len(X_Sample[:,0]), 1, device=device))
    vett = vett.detach().cpu().numpy()
    im0=plt.scatter(vett, Inv_Norm_z(X_Sample[:, 1]), c=V_pred, s=2, cmap='plasma', vmin=0, vmax=V0)
    # linea rossa condensatore piastre
    x_vals = x_min + (x_max - x_min) * np.random.rand(1000)
    z_val  = z_max - L/2 + d/2
    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
    z_val  = z_min + L/2 - d/2
    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
    plt.title(f"V(t={real_t:.2f} ns)")
    plt.xlabel("x coordinate [$\\mu$m]")
    if i_plot == 1:
        plt.ylabel("z coordinate [$\\mu$m]")
    plt.xlim(x_min, x_max)
    plt.ylim(z_min, z_max)
    plt.colorbar(im0)
    plt.tight_layout()

    plt.subplot(4, len(sampled_times), len(sampled_times) + i_plot)
    im2=plt.scatter(Inv_Norm_z(X_Sample[:, 1]), V_pred, s=1)
    plt.title(f"t = {real_t:.2f} ns")
    if i_plot == 1:
        plt.ylabel("Potential [V]")
    plt.xlabel("z coordinate [$\\mu$m]")
    plt.tight_layout()

    v_min = 0
    v_max = (V0 - 0) / ( norm_z_max - norm_z_min ) / factor_z
    plt.subplot(4, len(sampled_times), 2*len(sampled_times) + i_plot)
    vett = Inv_Norm_x(norm_x_min + (norm_x_max - norm_x_min) * torch.rand(len(X_Sample[:,0]), 1, device=device))
    vett = vett.detach().cpu().numpy()
    im1=plt.scatter(vett, Inv_Norm_z(X_Sample[:, 1]), c=E_pred, s=2, cmap='plasma', vmin=0, vmax=0.06)
    # linea rossa condensatore piastre
    x_vals = x_min + (x_max - x_min) * np.random.rand(1000)
    z_val  = z_max - L/2 + d/2
    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
    z_val  = z_min + L/2 - d/2
    plt.scatter(x_vals, np.full_like(x_vals, z_val), c="red", s=1)
    plt.title(f"E(t={real_t:.2f} ns)")
    plt.xlabel("x coordinate [$\\mu$m]")
    if i_plot == 1:
        plt.ylabel("z coordinate [$\\mu$m]")
    plt.xlim(x_min, x_max)
    plt.ylim(z_min, z_max)
    plt.colorbar(im1)
    plt.tight_layout()

    plt.subplot(4, len(sampled_times), 3*len(sampled_times) + i_plot)
    im3=plt.scatter(Inv_Norm_z(X_Sample[:, 1]), E_pred, s=1)
    plt.ylim(0,0.06)
    plt.title(f"t = {real_t:.2f} ns")
    if i_plot == 1:
        plt.ylabel("Electric field [V/$\\mu$m]")
    plt.xlabel("z coordinate [$\\mu$m]")
    plt.tight_layout()

plt.savefig("plot_c1D.jpg", format="jpg", dpi=300)
plt.show()