In [47]:
import torch 
import numpy as np
from typing import Dict, List, Optional
from scipy.stats import moment
from ase.io import read
from metatensor.torch import Labels, TensorBlock, TensorMap
from metatomic.torch import (
    AtomisticModel,
    ModelCapabilities,
    ModelEvaluationOptions,
    ModelMetadata,
    ModelOutput,
    System,
    load_atomistic_model,
    systems_to_torch,
)
from featomic.torch import SoapPowerSpectrum

structures = read('/Users/markusfasching/EPFL/Work/project-SOAP/scripts/SOAP-time-code/data/interfaces/250_275_fast/positions.lammpstrj', index='::50')
systems = systems_to_torch(structures[:1], dtype=torch.float64)

In [4]:

HYPER_PARAMETERS = {
    "cutoff": {
        "radius": 4, #4 #5 #6
        "smoothing": {"type": "ShiftedCosine", "width": 0.5},
    },
    "density": {
        "type": "Gaussian",
        "width": 0.25, #changed from 0.3
    },
    "basis": {
        "type": "TensorProduct",
        "max_angular": 2, #8
        "radial": {"type": "Gto", "max_radial": 2}, #6
    },
}
calculator = SoapPowerSpectrum(**HYPER_PARAMETERS)
centers = [8]
neighbors = [1]
selected_keys = Labels(
    names=["center_type", "neighbor_1_type", "neighbor_2_type"],
    values=torch.tensor([[i,j,k] for i in centers for j in neighbors for k in neighbors if j <=
        k], dtype=torch.int32),
)
selected_atoms = [i for i in range(10)]
selected_samples = Labels(
            names=["atom"],
            values=torch.tensor(selected_atoms, dtype=torch.int64).unsqueeze(-1),
        )


soap = calculator(
    systems,
    selected_samples=selected_samples,
    selected_keys=selected_keys,
)

soap = soap.keys_to_samples("center_type")
soap = soap.keys_to_properties(["neighbor_1_type", "neighbor_2_type"])
soap_block = soap.block()


In [22]:
def compute_cumulants_fwd(X: torch.Tensor, n_cumulants: int):
        """
        TorchScript-friendly computation of cumulants.

        X: (N, P) tensor
        n_cumulants: number of cumulants per feature
        returns: (N, P * n_cumulants) tensor
        """
        # ensure float
        X = X.float()
        N, P = X.shape  # Python ints

        # Preallocate output, per structure: N=1
        out = torch.empty((1, P * n_cumulants), dtype=X.dtype, device=X.device)

        # Temporary tensors reused per feature
        moments = torch.empty((n_cumulants,), dtype=X.dtype, device=X.device)
        c = torch.empty((n_cumulants,), dtype=X.dtype, device=X.device)

        jbase = 0
        for j in range(P):
            x = X[:, j]

            # mean
            m = torch.mean(x)
            centered = x - m

            # compute central moments μ_k = mean((x - m)^k) for k=1..n_cumulants
            k = 1
            while k <= n_cumulants:
                moments[k - 1] = torch.mean(centered ** k)
                k += 1

            # fill cumulant vector c
            # 1st cumulant = mean
            c[0] = m

            # 2nd cumulant = variance (μ2)
            if n_cumulants > 1:
                c[1] = moments[1 - 1]  # μ2

            # 3rd cumulant = μ3
            if n_cumulants > 2:
                c[2] = moments[2 - 1]  # μ3

            # 4th cumulant = μ4 − 3 μ2²
            if n_cumulants > 3:
                mu2 = moments[1]
                mu4 = moments[3 - 1]
                c[3] = mu4 - 3.0 * (mu2 * mu2)

            # 5th cumulant = μ5 − 10 μ2 μ3
            if n_cumulants > 4:
                mu2 = moments[1]
                mu3 = moments[2]
                mu5 = moments[5 - 1]
                c[4] = mu5 - 10.0 * mu2 * mu3

            # broadcast c to N rows without extra Python list
            # c_row: (1, n_cumulants) then expanded to (N, n_cumulants)
            c_row = c.unsqueeze(0)  # no new allocation for repeated view
            # write into output slice
            out[:, jbase:jbase + n_cumulants] = c_row

            jbase += n_cumulants

        return out

In [26]:
def compute_cumulants(X, n_cumulants):

    X = np.asarray(X)
    N, P = X.shape
    
    cumulant_matrix = []
    for j in range(P):
        x = X[:, j]
        m = np.mean(x)
        centered = x - m

        # Compute central moments up to n_cumulants
        mu = np.array([moment(centered, moment=i) for i in range(1, n_cumulants + 1)])
        c = np.zeros(n_cumulants)
        
        # First cumulants (mean, variance, skewness, kurtosis, ...)
        c[0] = m
        if n_cumulants > 1:
            c[1] = mu[1]                 # variance
        if n_cumulants > 2:
            c[2] = mu[2]                 # 3rd central moment
        if n_cumulants > 3:
            c[3] = mu[3] - 3 * mu[1]**2  # 4th cumulant (kurtosis-related)
        # higher orders could follow recursion, but are rarely stable
        if n_cumulants > 4:
            c[4] = mu[4] - 10 * mu[1] * mu[2]
        # Broadcast cumulant values to N samples
        cumulant_matrix.append(np.tile(c, (N, 1)))
    
    # Concatenate all cumulant blocks for each feature
    X_cumulant = np.hstack(cumulant_matrix)
    return X_cumulant

In [27]:
new = compute_cumulants(soap_block.values, 3)

In [28]:
new.shape

(4, 81)

In [70]:
root = '/Users/markusfasching/EPFL/Work/project-SOAP/scripts/SOAP-time-code/results/icewaterinterfacemeltfast_lf/v0/SOAP/533/test/CumulantPCA/interval_1/lag_0/sigma_0/ridge_a1e-05/SOAP_533_[8]'
model_soap = load_atomistic_model(f'{root}/model_soap.pt', extensions_directory=f'{root}/extensions')
model_soap.eval()
nl_options = model_soap.requested_neighbor_lists()[0]
options = {"features": ModelOutput(per_atom=True)}
selected_samples = Labels(
            names=["system", "atom"],
            values=torch.tensor([[0,i] for i in selected_atoms], dtype=torch.int64),
        )
eval_options = ModelEvaluationOptions(
    length_unit='angstrom',
    outputs=options,
    selected_atoms=selected_samples,
    )



In [64]:
from vesin.metatomic import NeighborList
calculator = NeighborList(nl_options, length_unit="Angstrom")
for system in systems:
    neighbors = calculator.compute(system)
    system.add_neighbor_list(nl_options, neighbors)


ValueError: the neighbors list for NeighborListOptions(cutoff=5.000000, full_list=False, strict=False) already exists in this system

In [71]:
cv = model_soap(systems, options=eval_options, check_consistency=True) #, selected_atoms=selected_atoms)

[1, 1]
<__torch__.torch.classes.metatensor.Labels object at 0x31168b770>
 0  0
 0  3
 0  6
 0  9
[ CPUIntType{4,2} ]
<__torch__.torch.classes.metatensor.Labels object at 0x31167b680>
 0  0
[ CPUIntType{1,2} ]
[1, 1]
