In [1]:
import os
import sys
import numpy as np
import torch as pt
from torch.nn.functional import normalize
import scipy
from tqdm import tqdm
import random
from sklearn.model_selection import train_test_split
import logging
import sys
from torch.utils.data import TensorDataset, DataLoader
from torch.func import hessian, vmap
from torch.optim.lr_scheduler import ReduceLROnPlateau

device = pt.device("cuda" if pt.cuda.is_available() else "cpu")

sys.path.append(os.path.abspath('../'))

from src.useful_functions import *
from src.PWDs_module import generate_PWDistances_torch

# For reproducibility
np.random.seed(0)


# Read directory paths
read_dirs_paths('dir_paths_.txt', globals())



 
Created variables:
inp_dir = /scratch/htc/fsafarov/structures/8ef5_july_2025/8ef5/
dcd_dir = /scratch/htc/fsafarov/mOR_dcd_files/npat/
out_dir = /scratch/htc/fsafarov/ISOKANN_PINN/output/


In [2]:
class MLP(pt.nn.Module):

    def __init__(self, Nodes, enforce_positive=0, act_fun='sigmoid', LeakyReLU_par=0.01):

        super(MLP, self).__init__()

        self.input_size    = Nodes[0]
        self.output_size   = Nodes[-1]
        self.Nhiddenlayers = len(Nodes)-2
        self.Nodes         = Nodes

        dims_in = Nodes[:-1]
        dims_out = Nodes[1:]

        if act_fun == 'sigmoid':
            self.activation  = pt.nn.Sigmoid()  # #
        elif act_fun == 'relu':
            self.activation  = pt.nn.ReLU()
        elif act_fun == 'leakyrelu': 
            self.activation  = pt.nn.LeakyReLU(LeakyReLU_par)
        elif act_fun == 'gelu': 
            self.activation  = pt.nn.GELU()

            
        layers = []

        for i, (dim_in, dim_out) in enumerate(zip(dims_in, dims_out)):
            layers.append(torch.nn.Linear(dim_in, dim_out))

            if i < self.Nhiddenlayers:
                layers.append(self.activation)

        self._layers = torch.nn.Sequential(*layers)
    

    def forward(self, x):
        """
            MLP forward pass
        """
        return self._layers(x)




In [None]:
import torch



def nabla_chi(model, x):
    """Gradient ∇χ(x), assuming scalar χ: x(B,D) → (B,D)"""
    x.requires_grad_(True)
    chi = model(x)
    grad_chi = pt.autograd.grad(chi.sum(), x, create_graph=True, retain_graph=True)[0]
    return grad_chi  # (B, D)

def laplacian_operator(model, x):
    """Δχ = trace(Hessian χ), scalar χ: x(B,D) → scalar"""
    H = hessian(model)(x)  # (B, D, D)
    return pt.trace(H, dim1=-2, dim2=-1)  # (B,)

def generator_action(model, x, forces_fn, D):  # forces_fn(x) → b(x) (B,D)
   
    grad_chi = nabla_chi(model, x)
    lap_chi = vmap(laplacian_operator, in_dims=(None, 0))(model, x)  # vmap over batch
    b = forces_fn(x)
    return (b * grad_chi).sum(-1) + D * lap_chi  # (B,)

def trainNN(model, lr=1e-3, wd=1e-5, Nepochs=10000, batch_size=1024, patience=50,
            X=None, Y=None, forces_fn=None, kB=1.0, T=300, gamma=1.0, c1=0.01, c2=0.01,
            split=0.2):
    model.to(device)
    D = kB * T / gamma  # Diffusion const

    if X is not None:
        X = pt.tensor(X, dtype=pt.float32, device=device)
        X_train, X_val, _, _ = train_test_split(X, Y, test_size=split)

    optimizer = pt.optim.Adam(model.parameters(), lr=lr, weight_decay=wd)
    scheduler = ReduceLROnPlateau(optimizer, patience=10, factor=0.5)
    mse_loss = pt.nn.MSELoss()

    best_loss, patience_counter = float('inf'), 0
    train_losses, val_losses = [], []

    for epoch in tqdm(range(Nepochs)):
        model.train()
        train_loss = 0.0
        perm = pt.randperm(X_train.size(0), device=device)

        for i in range(0, X_train.size(0), batch_size):
            optimizer.zero_grad()
            idx = perm[i:i+batch_size]
            batch_x = X_train[idx]

            L_chi = generator_action(model, batch_x, forces_fn, D)
            chi = model(batch_x)
            residual = L_chi + c1 * chi.squeeze() - c2 * (1 - chi.squeeze())  # (B,)
            pde_loss = mse_loss(residual, pt.zeros_like(residual))
            reg_loss = mse_loss(chi.squeeze(), chi.squeeze() * 0) + mse_loss(1-chi.squeeze(), (1-chi.squeeze()) * 0)  # Soft [0,1]
            loss = pde_loss + 0.01 * reg_loss

            loss.backward()
            pt.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()

        # Val
        model.eval()
        with pt.no_grad():
            L_chi_val = generator_action(model, X_val, forces_fn, D)
            chi_val = model(X_val)
            res_val = L_chi_val + c1 * chi_val.squeeze() - c2 * (1 - chi_val.squeeze())
            val_loss = mse_loss(res_val, pt.zeros_like(res_val)).item()

        train_losses.append(train_loss / (X_train.size(0)//batch_size))
        val_losses.append(val_loss)
        scheduler.step(val_loss)

        if val_loss < best_loss:
            best_loss = val_loss
            patience_counter = 0
            pt.save(model.state_dict(), 'best_chi.pth')
        else:
            patience_counter += 1
            if patience_counter > patience:
                break

    logging.info(f"Best val loss: {best_loss}")
    return train_losses, val_losses





D0 = pt.load(out_dir + 'PWDistances_0.pt', map_location=device)
dim = D0.shape[-1]
# Usage ex.
net = MLP([dim, 256, 256, 256, 1], act_fun='gelu').to(device)
forces_fn = lambda x: -potential_grad_fn(pt.tensor(x)) / gamma  # User impl
train_losses, val_losses = trainNN(net, X=D0, Y=D0, forces_fn=forces_fn)


  X = pt.tensor(X, dtype=pt.float32, device=device)
  0%|          | 0/10000 [00:02<?, ?it/s]


RuntimeError: NVML_SUCCESS == r INTERNAL ASSERT FAILED at "/pytorch/c10/cuda/CUDACachingAllocator.cpp":1016, please report a bug to PyTorch. 