In [None]:
%load_ext autoreload
%autoreload 2

import math
import sys
from functools import partial

import matplotlib.pyplot as plt
import mpltex
import numpy as np
from ase.io import read
from rascal.representations import SphericalInvariants as SOAP
from rascal.utils.scorer import get_rmse
from sklearn.kernel_ridge import KernelRidge
from sklearn.linear_model import Ridge
from sklearn.metrics.pairwise import linear_kernel, polynomial_kernel
from sklearn.model_selection import train_test_split
from sklearn.utils import Bunch
from tqdm import tqdm

In [None]:
from rascal.representations import SphericalExpansion

In [None]:
from pylode import Density_Projection_Calculator as LODE
from pylode.utilities.precompute_lode import lode_get_features

In [None]:
tab10 = plt.rcParams["axes.prop_cycle"].by_key()["color"]
fsize = np.array([mpltex.acs._width, mpltex.acs._height])
kernel_func = partial(linear_kernel)

# General functions

In [None]:
def _get_feat_vec_dict(species, frame_feature, frame):
    """Extract features from pylode."""

    species_array = frame.get_chemical_symbols()

    n_species = len(species)

    # Build feature dictionary
    feat_vec_dict = {}
    for spe_cen in species:
        for spe_neigh in species:
            feat_vec_dict[(spe_cen, spe_neigh)] = 0.0

    # Fill feature dictionary with each component
    for i_at in range(n_atoms):
        spe_cen = species_array[i_at]
        for ispe_neigh in range(n_species):
            spe_neigh = species[ispe_neigh]
            feat_vec_dict[(spe_cen, spe_neigh)
                          ] += frame_feature[i_at, ispe_neigh]

    return feat_vec_dict

In [None]:
def construct_feature_vector(X_raw, frames, species):
    """Construct feature vector for point chargees."""
    X = np.zeros([n_frames_sel, 4])

    for i in range(len(frames)):
        feat_vec_dict_pylode = _get_feat_vec_dict(species=species,
                                                  frame_feature=X_raw[i],
                                                  frame=frames[i])
        X[i] = list(feat_vec_dict_pylode.values())

    return X

In [None]:
def build_kernel_matrix(kernel_func, X, Y, desc="Build kernel"):
    """Build kernel matrix for kernel ridge regression.

    Parameters
    ----------
    kernel_func : callable
        kernel function. i.e. `sklearn.metrics.pairwise.linear_kernel`
    X : ndarray of shape (n_sets_X, n_atoms, n_features)
        The first feature array.
    Y : ndarray of shape (n_sets_Y, n_atoms, n_features)
        The second feature array.
    desc : str
        Description for progress bar.

    Returns
    -------
    K : ndarray of shape (n_samples_X, n_samples_Y)

    Raises
    ------
    ValueError
        If predicted number of structures times `n_atoms` is not equal to
        the length of one of the feature arrays.
    """
    if X is Y:
        # For X==Y the kernel matrix is symmetric.
        # Only calculate upper trangle matrix and copy at the end.
        row, col = np.triu_indices(X.shape[0])
    else:
        row, col = np.indices((X.shape[0], Y.shape[0]))

    K = np.zeros([X.shape[0], Y.shape[0]])

    indices = [(r, c) for r, c in zip(row.flatten(), col.flatten())]
    for n, m in tqdm(indices, desc=desc):
        K[n, m] = np.sum(kernel_func(X[n], Y[m]))

    if X is Y:
        K += K.T
        K -= np.diag(np.diag(K)) / 2

    return K

In [None]:
def train_predict_krr(X, regularisation=1):
    """
    Train a KRR model based on given feature vector X.

    Parameters
    ----------
    X : ndarray of shape (n_sets_X, n_atoms, n_features)
        Feature array
    regularisation : float
        regularisation for regression

    Returns
    -------
    RMSE train : np.array
        root mean square deviation (%) of the training set 
        with respect to the standard deviation of the test set.
    RMSE test : np.array
        root mean square deviation (%) of the test set
        with respect to the standard deviation of the test set.
    """
    X_train = X[i_train]
    X_test = X[i_test]

    rmse_train = np.zeros(len(r_train_structures))
    rmse_test = np.zeros(len(r_train_structures))

    for i, n_train_structure in enumerate(r_train_structures):
        X_train_cur = X_train[:n_train_structure]
        Y_train_cur = Y_train[:n_train_structure]

        K_train = build_kernel_matrix(
            kernel_func,
            X=X_train_cur,
            Y=X_train_cur,
            desc=f"Build train kernel for" f" {n_train_structure} sets")

        K_test = build_kernel_matrix(
            kernel_func,
            X=X_test,
            Y=X_train_cur,
            desc=f"Build test kernel for" f" {n_train_structure} sets")

        krr = KernelRidge(alpha=regularisation, kernel="precomputed", gamma=1)
        krr.fit(K_train, Y_train_cur)

        Y_train_pred = krr.predict(K_train)
        Y_test_pred = krr.predict(K_test)

        rmse_train[i] = 100*get_rmse(Y_train_pred, Y_train_cur)
        rmse_test[i] = 100*get_rmse(Y_test_pred, Y_test)

        # Calculate % RMSE
        rmse_train[i] /= Y_train_cur.var()
        rmse_test[i] /= Y_train_cur.var()

    return rmse_train, rmse_test

In [None]:
def train_predict_lr(X, regularisation=1e-10):
    """
    Train a linear model based on feature vector X.

    Parameters
    ----------
    X : ndarray of shape (n_sets_X, n_atoms, n_features)
        Feature array
    regularisation : float
        regularisation for regression

    Returns
    -------
    RMSE train : np.array
        root mean square deviation (%) of the training set 
        with respect to the standard deviation of the test set.
    RMSE test : np.array
        root mean square deviation (%) of the test set
        with respect to the standard deviation of the test set.
    """

    X_train = X[i_train]
    X_test = X[i_test]

    rmse_train = np.zeros(len(r_train_structures))
    rmse_test = np.zeros(len(r_train_structures))

    for i, n_train_structure in enumerate(r_train_structures):
        X_train_cur = X_train[:n_train_structure]
        Y_train_cur = Y_train[:n_train_structure]

        clf = Ridge(alpha=regularisation)
        clf.fit(X_train_cur, Y_train_cur)

        Y_train_pred = clf.predict(X_train_cur)
        Y_test_pred = clf.predict(X_test)

        rmse_train[i] = 100*get_rmse(Y_train_pred, Y_train_cur)
        rmse_test[i] = 100*get_rmse(Y_test_pred, Y_test)

        # Calculate % RMSE
        rmse_train[i] /= Y_train_cur.var()
        rmse_test[i] /= Y_train_cur.var()

    return rmse_train, rmse_test

In [None]:
@mpltex.acs_decorator
def plot_rmsd(bunch_obj,
              method_keys,
              prop_keys,
              show_test=True,
              show_train=False,
              colors_test=None,
              colors_train=None,
              ylim=(1e-2, 1e2),
              fname=None):

    fmts = ["-", "--", ".-"]

    if type(method_keys) not in (list, tuple):
        method_keys = [method_keys]

    if type(prop_keys) not in (list, tuple):
        prop_keys = [prop_keys]

    if colors_test is None:
        colors_test = ["k", tab10[3]]
    elif colors_test not in (list, tuple):
        colors_test = [colors_test]

    if colors_train is None:
        colors_train = [tab10[0], tab10[1]]
    elif colors_train not in (list, tuple):
        colors_train = [colors_train]

    assert len(method_keys) == len(prop_keys)

    fig, ax = plt.subplots(constrained_layout=True)

    if show_test:
        handles_test = []
    if show_train:
        handles_train = []

    for i, (method, prop_key) in enumerate(zip(method_keys, prop_keys)):
        results_obj = bunch_obj[method]
        realiziations = results_obj.keys()

        if show_test:
            handles_test_cur = len(realiziations) * [None]
        if show_train:
            handles_train_cur = len(realiziations) * [None]

        for j, k in enumerate(realiziations):
            label_cutoff = rf"$r_\mathrm{{{prop_key}}}="
            label_cutoff += rf"{results_obj[k][prop_key]}\,\mathrm{{\AA}}$"

            if show_test:
                handles_test_cur[j] = ax.plot(
                    r_train_structures,
                    results_obj[k].rmse_test,
                    fmts[j],
                    c=colors_test[i],
                    label=rf"Test, {method}, {label_cutoff}")[0]

            if show_train:
                handles_train_cur[j] = ax.plot(
                    r_train_structures,
                    results_obj[k].rmse_train,
                    fmts[j],
                    c=colors_train[i],
                    label=rf"Train, {method}, {label_cutoff}")[0]

        if show_test:
            handles_test += handles_test_cur
        if show_train:
            handles_train += handles_train_cur

    ax.set_yscale("log")
    ax.set_xscale("log")

    ax.set_xlim(1e1, 2e3)
    ax.set_ylim(*ylim)

    ax.set_xlabel("training structures")
    ax.set_ylabel("\% RMSE")

    handles = []
    if show_test:
        handles += handles_test
    if show_train:
        handles += handles_train

    labels = [handle.get_label() for handle in handles]
    ax.legend(
        handles,
        labels,
        ncol=2,
        handlelength=1,
        frameon=True,
        edgecolor="None",
        fontsize=7,
    )

    if fname is not None:
        fig.savefig(fname, transparent=True)

    fig.show()

# Import and preprocess data 

In [None]:
input_file = "../datasets/point_charges_Training_set.xyz"
n_frames_sel = 200  # Number of sets used for analysis

frames = read(input_file, index=":")
n_frames = len(frames)
n_atoms = len(frames[0])

# Move atoms in unitcell
for frame in frames:
    frame.wrap()

# extract energy
Y = np.array([frame.info["energy"] for frame in frames])[:n_frames_sel]

# Create an object/dictionary for storing results
results = Bunch()

# Get atomic species in dataset
global_species = set()
for frame in frames:
    global_species.update(frame.get_chemical_symbols())
species_dict = {k: i for i, k in enumerate(global_species)}

species = list(species_dict.keys())

# Split Train and Test data

Here only for Y since the X depends on our applied model (SOAP, LODE)

In [None]:
f_train = 0.75  # factor of the train set picked from the total set
n_subsets = 5  # number of subsets picked on a logspace for scaling test
n_min = 15  # minimal number of training structures

i_train, i_test = train_test_split(
    np.arange(n_frames_sel), train_size=f_train, random_state=5
)
n_train = len(i_train)
n_test = len(i_test)

# Split energies
Y_train = Y[i_train]
Y_test = Y[i_test]

# Generate subset numbers for training curve
exp_min = math.log(n_min, 10)
exp_max = math.log(n_train, 10)
r_train_structures = np.logspace(exp_min, exp_max, num=n_subsets, endpoint=True)
r_train_structures = np.round(r_train_structures).astype(int)

# Remove doubled values
r_train_structures = np.unique(r_train_structures)

## Recalculate?

Recalculating the features (especially for LODE) may take a long time!

In [None]:
recalc_features_soap = True
recalc_regression_soap = False

recalc_features_lode = False
recalc_regression_lode = False

# SOAP

In [None]:
results.soap = Bunch()
r_cut = [3, 6, 9]
r_cut = [9]
regularisation_soap = 10e-10  # From paper

hypers_soap = dict(
    cutoff_smooth_width=0.5,
    soap_type="PowerSpectrum",  # nu = 2
    max_radial=8,
    max_angular=0,
    gaussian_sigma_type="Constant")

In [None]:
for cut in r_cut:
    results.soap[f"r_{cut}"] = Bunch()
    results.soap[f"r_{cut}"].cut = cut
    fname_precomputed = f"../datasets/precomputed/precomputed_soap_{cut}.npy"

    if recalc_features_soap:
        hypers_soap["interaction_cutoff"] = cut

        calculator = SOAP(**hypers_soap)
        soap_rep = calculator.transform(frames)
        X_raw = soap_rep.get_features(calculator)

        np.save(fname_precomputed, X_raw)

    else:
        X_raw = np.load(fname_precomputed)

    n_features = X_raw.shape[1]
    X = X_raw.reshape(n_frames, n_atoms, n_features)

    # Select only frames for analysis
    X = X[:n_frames_sel]

    # Only save
    results.soap[f"r_{cut}"].X = X

print(f"n_features_soap = {n_features}")

## Train krr model

In [None]:
for cut in r_cut:
    print(f"cut = {cut} Å")
    fname_precomputed = f"precomputed/precomputed_soap_{cut}"

    if recalc_regression_soap:
        rmse_train, rmse_test = train_predict_krr(
            X=results.soap[f"r_{cut}"].X,
            regularisation=regularisation_soap)

        np.save(f"{fname_precomputed}_train.npy", rmse_train)
        np.save(f"{fname_precomputed}_test.npy", rmse_test)

    else:
        rmse_train = np.load(f"{fname_precomputed}_train.npy")
        rmse_test = np.load(f"{fname_precomputed}_test.npy", )

    results.soap[f"r_{cut}"].rmse_train = rmse_train.copy()
    results.soap[f"r_{cut}"].rmse_test = rmse_test.copy()

# LODE model

In [None]:
results.lode = Bunch()
r_smearing = [3, 2, 1]
r_smearing = [1]
regularisation_lode = 10e-10  # From paper

# Note that the radial cutoff of the representation
# 2.0 is chosen to be smaller than the minimum distance
# between any pair of ions in the dataset (which is 2.5 Angstrom)

hypers_lode = dict(
    max_radial=1,  # currently, only 1 is supported
    max_angular=2,
    cutoff_radius=2,
    potential_exponent=1,  # currently, 1 is supported
    compute_gradients=False)

In [None]:
for smear in r_smearing:
    results.lode[f"r_{smear}"] = Bunch()
    results.lode[f"r_{smear}"].smear = smear
    fname_precomputed = f"../datasets/precomputed/precomputed_lode_{smear}.npy"

    if recalc_features_lode:
        hypers_lode["smearing"] = smear
        X = lode_get_features(frames, **hypers_lode)
        np.save(fname_precomputed, X)
    else:
        X = np.load(fname_precomputed)

    n_features = X.shape[2]
    results.lode[f"r_{smear}"].X = X

print(f"n_features_lode = {n_features}")

In [None]:
for smear in r_smearing:
    print(f"smearing = {smear} Å")
    fname_precomputed = f"precomputed/precomputed_lode_{smear}"

    if recalc_regression_lode:
        rmse_train, rmse_test = train_predict_krr(
            X=results.lode[f"r_{smear}"].X,
            regularisation=regularisation_lode)

        np.save(f"{fname_precomputed}_train.npy", rmse_train)
        np.save(f"{fname_precomputed}_test.npy", rmse_test)
    else:
        rmse_train = np.load(f"{fname_precomputed}_train.npy")
        rmse_test = np.load(f"{fname_precomputed}_test.npy", )

    results.lode[f"r_{smear}"].rmse_train = rmse_train.copy()
    results.lode[f"r_{smear}"].rmse_test = rmse_test.copy()

In [None]:
# plot_rmsd(results, ["soap", "lode"], ["cut", "smear"],
#           fname="RMSE_point_charges.pdf", ylim=(1e-3, 5e2))

# Simple Linear Regression

In [None]:
# Concententae 0.5 files
a = np.load("../datasets/precomputed/precomputed_lode_0.5_1.npy")
b = np.load("../datasets/precomputed/precomputed_lode_0.5_2.npy")

np.save("../datasets/precomputed/precomputed_lode_0.5.npy",
        np.concatenate([a,b], axis=0))

In [None]:
# precompute_lode.py -f point_charges_Training_set.xyz -i : -s 1 -n 8 -l 0 -rc 2.0 -e 'GTO' -o ../datasets/precomputed_lode_1_test
frames_feature_pylode = np.load("../datasets/precomputed/precomputed_lode_1.npy")

# precompute_lode.py -f ../datasets/point_charges_Training_set.xyz -i :200 -s 0.5 -e 'GTO' -n 1 -l 0 -rc 2.0 -o
frames_feature_pylode_0_5 = np.load("../datasets/precomputed/precomputed_lode_0.5.npy")

X_pylode = construct_feature_vector(frames_feature_pylode,
                                    frames[:n_frames_sel],
                                    species)

X_pylode_0_5 = construct_feature_vector(frames_feature_pylode_0_5,
                                        frames[:n_frames_sel],
                                        species)

# # sagpr_get_PS -f point_charges_Training_set.xyz -p -ele -sg 1 -rc 2.0 -l 0 -n 1 -sew 1 -nn -o LODE_test
X_tensoap = np.load("../../TENSOAP/bin/LODE_test.npy").sum(axis=1)

# # sagpr_get_PS -f point_charges_Training_set.xyz -p -ele -sg 1 -rc 2.0 -l 0 -n 8 -sew 1 -nn -o LODE_test
X_tensoap_8 = np.load("../../TENSOAP/bin/LODE_test_8.npy").sum(axis=1)

X_soap = results.soap[f"r_9"].X.sum(axis=1)

X_soap_lode = np.concatenate([X_soap, X_pylode], axis=1)
X_soap_lode_0_5 = np.concatenate([X_soap, X_pylode_0_5], axis=1)

In [None]:
results.rmse = Bunch()

results.rmse.pylode = train_predict_lr(X_pylode)[1]
results.rmse.pylode_0_5 = train_predict_lr(X_pylode_0_5)[1]
results.rmse.tensoap = train_predict_lr(X_tensoap)[1]
results.rmse.tensoap_8 = train_predict_lr(X_tensoap_8)[1]
results.rmse.soap = train_predict_lr(X_soap)[1]

results.rmse.soap_lode = train_predict_lr(X_soap_lode)[1]
results.rmse.soap_lode_0_5 = train_predict_lr(X_soap_lode_0_5)[1]

In [None]:
@mpltex.acs_decorator
def plot_simple_comp(fname=None, show_legend=True):

    fig, ax = plt.subplots(constrained_layout=False)

    plots = []

    p = ax.plot(r_train_structures,
                       results.rmse.soap,
                       ".-",
                       c='k',
                       label="SOAP $r=9\,\mathrm{{\AA}}, n=8, l=0$")[0]
    plots.append(p)

    p = ax.plot(r_train_structures,
                       results.rmse.pylode,
                       ".--",
                       c=tab10[0],
                       label="pyLODE $\sigma=1\,\mathrm{{\AA}}, n=1, l=0$")[0]
    plots.append(p)

    p = ax.plot(r_train_structures,
                       results.rmse.pylode_0_5,
                       ".-",
                       c=tab10[0],
                       label="pyLODE $\sigma=0.5\,\mathrm{{\AA}}, n=1, l=0$")[0]
    plots.append(p)

    p = ax.plot(r_train_structures,
                       results.rmse.tensoap,
                       ".--",
                       c=tab10[1],

                       label="LODE $\sigma=1\,\mathrm{{\AA}}, n=1, l=0$")[0]
    plots.append(p)

    p = ax.plot(r_train_structures,
                       results.rmse.tensoap_8,
                       ".-",
                       c=tab10[1],
                       label="LODE $\sigma=1\,\mathrm{{\AA}}, n=8, l=0$")[0]
    plots.append(p)
    
    p = ax.plot(r_train_structures,
                       results.rmse.soap_lode,
                       ".--",
                       c=tab10[2],
                       label="SOAP $\oplus$ pyLODE $\sigma=1\,\mathrm{{\AA}}$")[0]
    plots.append(p)

    p = ax.plot(r_train_structures,
                       results.rmse.soap_lode_0_5,
                       ".-",
                       c=tab10[2],
                       label="SOAP $\oplus$ pyLODE $\sigma=0.5\,\mathrm{{\AA}}$")[0]
    plots.append(p)

    ax.set_yscale("log")
    ax.set_xscale("log")

    ax.set_ylim(1e-2, 1e2)
    ax.set_xlim(1e1, 2e2)

    if show_legend:
        fig.legend(loc="upper right",
                  ncol=1,
                  frameon=True,
                  edgecolor="None",
                  handlelength=2,
                  bbox_to_anchor=(1.5, 1))
    else:
        for p in plots:
            ax.annotate(p.get_label(),
                        xy=(p.get_xdata()[2], p.get_ydata()[2]),
                        xycoords='data',
                        xytext=(0, 3),
                        fontsize=7,
                        textcoords='offset points',
                        horizontalalignment='left', verticalalignment='bottom')

    ax.set_xlabel("training structures")
    ax.set_ylabel("\% RMSE")

    if fname is not None:
        fig.savefig(fname, bbox_inches="tight", transparent=True,)

    fig.show()

plot_simple_comp(fname="lode_comparison.pdf")