Imports

In [819]:
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:False"

import f90nml
import numpy as np
from pint import UnitRegistry; AssignQuantity = UnitRegistry().Quantity
from QLCstuff2 import getNQLL
import reference_solution as refsol
from scipy.fft import rfft  #, irfft
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.func import grad, jacrev

torch.set_default_dtype(torch.float64)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


Data preparation/pre-processing

In [820]:
# Read in GI parameters
inputfile = "GI parameters - Reference limit cycle (for testing).nml"
GI=f90nml.read(inputfile)['GI']
nx_crystal = GI['nx_crystal']
L = GI['L']
D = GI['D']
D_units = GI['D_units']
D = AssignQuantity(D,D_units)# Compute Nqll_eq
NBAR = GI['Nbar']
NSTAR = GI['Nstar']

# Define t range
RUNTIME = 5
NUM_T_STEPS = RUNTIME + 1

# Define initial conditions
Ntot_init_1D = np.ones(nx_crystal)
Nqll_init_1D = getNQLL(Ntot_init_1D,NSTAR,NBAR)

# Define x, t pairs for training
X_QLC = np.linspace(-L,L,nx_crystal)
t_points = np.linspace(0, RUNTIME, NUM_T_STEPS)
x, t = np.meshgrid(X_QLC, t_points)
training_set = torch.tensor(np.column_stack((x.flatten(), t.flatten()))).to(device)

In [821]:
def compute_loss_gradients(xs, ys):
    """
    Computes gradients needed to compute collocation point loss. 
    Expects batched input with requires_grad=TRUE.
    Args:
        xs: xs[0] = x, xs[1] = t
        ys: ys[0] = Ntot, ys[1] = Nqll

    Returns:
        (dNtot_dt, dNqll_dt, dNqll_dxx)
    """
    # batch_size = len(xs)
    xs.requires_grad_(True)  # Enable gradients for input (shape: (batch_size, 2))
    # TODO - I'm suspicious of these gradients

    # ---- Compute First-Order Derivatives ----
    # We'll compute the following:
    #   Ntot_t = ∂Ntot/∂t, and
    #   Nqll_t = ∂Nqll/∂t
    #
    # Here, the input is [x, t] so t is at index 1.

    # Create grad_outputs tensor matching shape of model output
    grad_outputs = torch.zeros_like(ys).to(device)
    # print(f"xs: {xs}, requires grad: {xs.requires_grad}")
    # print(f"ys: {ys}")
    # print(f"grad outputs (pre calculation1): {grad_outputs}")

    # 1. Compute gradient of Ntot:
    grad_outputs[:, 0] = 1.0  # We want gradients for the first output (Ntot)
    #print(f"grad outputs (pre calculation2): {grad_outputs}")
    grad_Ntot = torch.autograd.grad(
        outputs=ys,
        inputs=xs,
        grad_outputs=grad_outputs,
        create_graph=True,
        retain_graph=True,
        materialize_grads=True      # default is false
    )[0]
    #
    # print(f"grad output: {grad_Ntot}")
    Ntot_t = grad_Ntot[:, 1]   # Extract partial w.r.t. t

    # 2. Compute gradient of Nqll:
    grad_outputs.zero_()      # Reset grad_outputs to zeros
    grad_outputs[:, 1] = 1.0  # Now we want gradients for the second output (Nqll)
    grad_Nqll = torch.autograd.grad(
        outputs=ys,
        inputs=xs,
        grad_outputs=grad_outputs,
        create_graph=True,      # Needed to compute the second-order derivative
        retain_graph=True,      # Retain graph for subsequent derivative computations
        materialize_grads=True 
    )[0]
    Nqll_t = grad_Nqll[:, 1]   # Extract partial w.r.t. t

    # ---- Compute Second-Order Derivative ----
    # Now, compute the second derivative of Nqll with respect to x.
    # Note that from grad_Nqll we already have ∂Nqll/∂x:
    Nqll_x = grad_Nqll[:, 0]

    # Compute the second derivative: d²Nqll/dx²
    # We take the gradient of Nqll_x with respect to the initial inputs.
    grad_Nqll_x = torch.autograd.grad(
        outputs=Nqll_x,
        inputs=xs,
        grad_outputs=torch.ones_like(Nqll_x),
        create_graph=True,
        retain_graph=True,
        materialize_grads=True 
    )[0]
    Nqll_xx = grad_Nqll_x[:, 0]  # Extract ∂²Nqll/∂x² (derivative with respect to x)

    # print("Gradients returned:")
    # print(f"Ntot_t: {Ntot_t}")
    # print(f"Nqll_t: {Nqll_t}")
    # print(f"Nqll_xx: {Nqll_xx}")
    # Return relevant gradients
    return Ntot_t.detach(), Nqll_t.detach(), Nqll_xx.detach()
    # TODO - verify why detach() is important here

In [822]:
def get_misc_params():
    # TODO - make sigma I calculation work for user-specified collocation points, ie. for random resampling
    # Supersaturation reduction at center
    c_r = GI['c_r']

    # Thickness of monolayers
    h_pr = GI['h_pr']
    h_pr_units = GI['h_pr_units']
    h_pr = AssignQuantity(h_pr,h_pr_units)
    h_pr.ito('micrometer')

    # Deposition velocity
    nu_kin = GI['nu_kin']
    nu_kin_units = GI['nu_kin_units']
    nu_kin = AssignQuantity(nu_kin,nu_kin_units)

    # Difference in equilibrium supersaturation between microsurfaces I and II
    sigma0 = torch.tensor(GI['sigma0']).to(device)

    # Supersaturation at facet corner
    sigmaI_corner = GI['sigmaI_corner']

    # Time constant for freezing/thawing
    tau_eq = GI['tau_eq']
    tau_eq_units = GI['tau_eq_units']
    tau_eq = AssignQuantity(tau_eq,tau_eq_units)

    # Compute omega_kin
    nu_kin_mlyperus = nu_kin/h_pr
    nu_kin_mlyperus.ito('1/microsecond')
    omega_kin = torch.tensor(nu_kin_mlyperus.magnitude * tau_eq.magnitude).to(device)

    # Compute sigmaI
    sigmaI = torch.tensor(sigmaI_corner*(c_r*(X_QLC/L)**2+1-c_r))
    # Concatenate a copy of sigmaI for each timestep to prevent shape inconsistencies later
    sigmaI = torch.cat([sigmaI]*NUM_T_STEPS).to(device)
    
    # sigma0, sigmaI, omega_kin = params
    return sigma0, sigmaI, omega_kin

In [823]:
def f1d_solve_ivp_dimensionless(Ntot, Nqll, dNqll_dxx, scalar_params):
    """
    Adapted from QLCstuff2, this function computes the right-hand side of
    the two objective functions that make up the QLC system.
    
    Returns:
        [dNtot_dt, dNqll_dt]
    """
    sigma0, sigmaI, omega_kin = scalar_params
    
    # Diffusion term based on FT
    # Dcoefficient1 = np.pi**2 / (L)**2  
    # bj_list = rfft(Nqll)
    # cj_list = bj_list*J2_LIST
    # dy = -Dcoefficient1 * irfft(cj_list)    # This is actually a second derivative...

    # Ntot deposition
    m = (Nqll - (NBAR - NSTAR))/(2*NSTAR)
    sigma_m = (sigmaI - m * sigma0)
    dNtot_dt = omega_kin * sigma_m
    dNtot_dt += dNqll_dxx # Second derivative acquired elsewhere instead of from FT diffusion term
    # NQLL    
    dNqll_dt = dNtot_dt - (Nqll - (NBAR - NSTAR*torch.sin(2*np.pi*Ntot)))
    
    # Package for output
    return dNtot_dt, dNqll_dt

In [824]:
def QLC_model(xs, params, epochs, epoch, print_every, print_gradients):
    """Defines QLC model. Acts as collocation point loss function.

    Args:
        xs: xs[0] = x, xs[1] = t
        params: output of get_misc_params()

    Returns:
        (Ntot-loss, Nqll-loss)
    """
    # model predicts output of training_set as batch
    ys = model.forward(xs)
    
    # Calculate and extract gradients
    dNtot_dt, dNqll_dt, dNqll_dxx = compute_loss_gradients(xs, ys)
    if (((epoch+1) % print_every) == 0) and print_gradients:
        print(f"Gradients: {dNtot_dt}, {dNqll_dt}, {dNqll_dxx}.")
    

    # Compute expected output
    Ntot, Nqll = ys[:, 0], ys[:, 1]
    #with torch.no_grad():
    dNtot_dt_rhs, dNqll_dt_rhs = f1d_solve_ivp_dimensionless(Ntot, Nqll, dNqll_dxx, params)
    
    # dNtot_dt = Nqll*surface_diff_coefficient + w_kin*sigma_m
    # dNqll_dt = dNtot/dt - (Nqll - Nqll_eq)
    cat_test = torch.stack((dNtot_dt - dNtot_dt_rhs, dNqll_dt - dNqll_dt_rhs), axis=0)
    
    # Return loss as tensor of shape (2, len(xs))
    # [dNtot_dt - dNtot_dt_rhs, dNqll_dt - dNqll_dt_rhs]
    return torch.square(cat_test)

Define IcePINN class

In [825]:
# Difference between nn._ and nn.functional._ is that functional is stateless: 
# https://stackoverflow.com/questions/63826328/torch-nn-functional-vs-torch-nn-pytorch
class IcePINN(nn.Module):
    def __init__(self, num_hidden_layers, hidden_layer_size):
        super().__init__()
        self.fc_in = nn.Linear(2, hidden_layer_size)

        self.fc_hidden = nn.ModuleList()
        for _ in range(num_hidden_layers-1):
            self.fc_hidden.append(nn.Linear(hidden_layer_size, hidden_layer_size))
            
        self.fc_out = nn.Linear(hidden_layer_size, 2)

    def forward(self, x):
        x = F.tanh(self.fc_in(x))
        for layer in self.fc_hidden:
            x = F.tanh(layer(x))
        x = self.fc_out(x)
        return x

Training loop

In [826]:
def train_PINN(model: IcePINN, optimizer, training_set, epochs, print_every, print_gradients=False):

    print(f"Commencing PINN training for {epochs} epochs.")
    # Retrieve miscellaneous params for loss calculation
    params = get_misc_params()

    for epoch in range(epochs):
        # evaluate collocation point loss
        loss = QLC_model(training_set, params, epochs, epoch, print_every, print_gradients)

        if ((epoch+1) % print_every) == 0:
            print(f"Loss at epoch [{epoch+1}/{epochs}]: Ntot = {torch.sum(loss[0]).item()}, Nqll = {torch.sum(loss[1]).item()}.")
            
        # backward and optimize
        loss.backward(torch.ones_like(loss)) # Computes loss gradients
        optimizer.step() # Adjusts weights accordingly
        optimizer.zero_grad() # Zeroes gradients so they don't affect future computations

Instantiate model and optimizers

In [827]:
model = IcePINN(num_hidden_layers=8, hidden_layer_size=80).to(device)

# Initialize model weights with HE initialization
def init_HE(m):
		if type(m) == nn.Linear:
			nn.init.kaiming_normal_(m.weight)
model.apply(init_HE)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

Train model

In [831]:
train_PINN(model, optimizer, training_set, epochs=50_000, print_every=1_000, print_gradients=False)

Commencing PINN training for 50000 epochs.
Loss at epoch [1000/50000]: Ntot = 416.4197447532467, Nqll = 94.53013603193837.
Loss at epoch [2000/50000]: Ntot = 62.62763990596765, Nqll = 25.1242108725045.
Loss at epoch [3000/50000]: Ntot = 27.606846569329196, Nqll = 3.237237347338591.
Loss at epoch [4000/50000]: Ntot = 44.389341465075915, Nqll = 11.761409727795654.
Loss at epoch [5000/50000]: Ntot = 83.88388630861203, Nqll = 35.93509465450294.
Loss at epoch [6000/50000]: Ntot = 275.59624409060837, Nqll = 68.08752844189573.
Loss at epoch [7000/50000]: Ntot = 497.6367500454741, Nqll = 113.96570470022823.
Loss at epoch [8000/50000]: Ntot = 481.4249911163278, Nqll = 79.66138078082624.
Loss at epoch [9000/50000]: Ntot = 7.504982340077781, Nqll = 8.239047315449213.
Loss at epoch [10000/50000]: Ntot = 59.63038006933137, Nqll = 50.358539464634376.
Loss at epoch [11000/50000]: Ntot = 2.202062821184125, Nqll = 0.8111125419116947.
Loss at epoch [12000/50000]: Ntot = 58.05300472004168, Nqll = 21.1098

Save model

In [829]:
print('Finished Training')
PATH = './models/TestPINN.pth'
torch.save(model.state_dict(), PATH)

Finished Training


Evaluate Model (visually?)

In [830]:
# loaded_model = IcePINN()
# loaded_model.load_state_dict(torch.load(PATH)) # it takes the loaded dictionary, not the path file itself
# loaded_model.to(device)
# loaded_model.eval()