# Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import glob
import csv
from sklearn.kernel_ridge import KernelRidge
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import cross_val_score
from Element_PI import VariancePersist
from Element_PI import VariancePersistv1
from scipy.stats import randint, uniform, rv_continuous
from scipy.stats._distn_infrastructure import rv_continuous
from sklearn.model_selection import GridSearchCV, ShuffleSplit

# (X) Elemental Persistence Images

In [None]:
# Define PI hyperparameters
pixelsx = 55
pixelsy = 55
spread = 3.0
Max = 3.5

# Get list of XYZ files
files = glob.glob('XYZ_DIRECTORY/*.xyz')
samples = len(files)

# Initialize X
X = np.zeros((samples, pixelsx * pixelsy))

# Iterate over files
for idx, file in enumerate(files):
    X[idx, :] = VariancePersistv1(file, pixelx=pixelsx, pixely=pixelsy, myspread=spread, myspecs={"maxBD": Max, "minBD": -0.10}, showplot=False)

# (y) Enthalpy Values Corresponding to X

In [None]:
# Read the CSV file
csv_file_path = "CSV_FILENAME.csv"  # Replace with the actual path to the CSV file
df = pd.read_csv(csv_file_path)

# Get the list of XYZ files in the xyz directory
molecules_directory = "./XYZ_DIRECTORY"  # Replace with the actual path to the xyz directory
xyz_files = [file for file in os.listdir(molecules_directory) if file.endswith(".xyz")]

# Filter the DataFrame to include only the rows where XYZ file names are present in the list of XYZ files
filtered_df = df[df['Filename'].isin(xyz_files)]

# Reindex the DataFrame to match the order of XYZ files obtained from the directory
ordered_df = filtered_df.set_index('Filename').reindex(xyz_files).reset_index()

# Print the ordered DataFrame
print(ordered_df)

# Extract the "H" column values from the ordered DataFrame
y = ordered_df['H'].values

# Print the energy values
print(y)

# Kernel Ridge Regression with Hyperparameter Scanning

In [None]:
plt.rc('text', usetex=False)

def test_learner(X, y, model):
    kf = KFold(n_splits=3, random_state=0, shuffle=True)
    kf.get_n_splits(X)

    errorlist = []
    mdscr_list = []
    r2_train_list = []  # List to store R-squared values of the training set across folds
    r2_test_list = []   # List to store R-squared values of the testing set across folds

    for fold, (train_index, test_index) in enumerate(kf.split(X), 1):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # Flatten the matrices to make them 2D arrays
        X_train_flat = np.array([matrix.flatten() for matrix in X_train])
        X_test_flat = np.array([matrix.flatten() for matrix in X_test])

        # Train the model on the entire training set
        model.fit(X_train_flat, y_train)

        # Calculate error on the entire testing set
        error = rmse(model.predict(X_test_flat), y_test)
        errorlist.append(error)

        # Calculate scores and errors for training set
        tr = model.score(X_train_flat, y_train)
        te = model.score(X_test_flat, y_test)

        # Store R-squared values for training and testing sets
        r2_train_list.append(tr)
        r2_test_list.append(te)

        # Print metrics for each fold
        print('Fold error {} for fold {}'.format(error, fold))
        print(tr, 'is the score for the training set')
        print(te, 'is the score for the testing set')
        print("Size of Training Set:", len(y_train))
        print("Size of Testing Set:", len(y_test))
        print()

        # Plotting for each fold
        plt.scatter(y_train.flatten(), model.predict(X_train_flat), color="b", label='Training Set')
        plt.scatter(y_test.flatten(), model.predict(X_test_flat), color="r", label='Testing Set')
        # Plot the diagonal line
        plt.plot([min(y_test.flatten()), max(y_test.flatten())], [min(y_test.flatten()), max(y_test.flatten())], '--', color='rebeccapurple')
        plt.legend(fontsize=12, loc="lower right")
        plt.ylabel('Predicted Enthalpy (kcal/mol)', fontsize=12)
        plt.xlabel('DFT Enthalpy (kcal/mol)', fontsize=12)
        plt.show()

        # Calculate R-squared score for the current fold
        fold_mdscr = (tr + te) / 2
        mdscr_list.append(fold_mdscr)

    # Print averages across all folds
    print('Final error {} +- {} '.format(np.average(errorlist), np.std(errorlist)))
    
    # Overall model score
    mdscr = np.mean(mdscr_list)
    print(mdscr, "is the score for the model")

    # Print mean of R-squared scores from each fold
    mean_mdscr = np.mean(mdscr_list)
    print(mean_mdscr, "is the mean of R-squared scores from each fold")

    # Print average R-squared values of training and testing sets across all folds
    avg_r2_train = np.mean(r2_train_list)
    avg_r2_test = np.mean(r2_test_list)
    print("Average R-squared on Training Set (across all folds):", avg_r2_train)
    print("Average R-squared on Testing Set (across all folds):", avg_r2_test)


def rmse(predictions, targets):
    return np.sqrt(((predictions - targets) ** 2).mean())

# ---------------------------------------------------------------
# Hyperparameter scan (alpha, gamma) with one-standard-error rule
# ---------------------------------------------------------------

def mean_test_r2_for_params(X, y, alpha, gamma, splits=None):
    """Compute mean test R^2 across provided CV splits for given alpha, gamma."""
    if splits is None:
        kf = KFold(n_splits=3, random_state=0, shuffle=True)
        splits = list(kf.split(X))
    r2_test_list = []
    for train_index, test_index in splits:
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # Flatten to 2D arrays (same as in test_learner)
        X_train_flat = np.array([matrix.flatten() for matrix in X_train])
        X_test_flat = np.array([matrix.flatten() for matrix in X_test])

        model = KernelRidge(alpha=alpha, kernel='laplacian', gamma=gamma)
        model.fit(X_train_flat, y_train)
        te = model.score(X_test_flat, y_test)  # R^2 on test
        r2_test_list.append(te)
    return float(np.mean(r2_test_list)), float(np.std(r2_test_list))


def run_hyperparameter_scan(X, y, alpha_grid, gamma_grid, folds=3):
    """Grid search over alpha and gamma; returns (best_params, results)."""
    kf = KFold(n_splits=folds, random_state=0, shuffle=True)
    splits = list(kf.split(X)) 

    best_score = -np.inf
    best_params = None
    results = []  # collect all results: (alpha, gamma, mean_r2, std_r2)

    for alpha in alpha_grid:
        for gamma in gamma_grid:
            mean_r2, std_r2 = mean_test_r2_for_params(X, y, alpha, gamma, splits=splits)
            results.append((alpha, gamma, mean_r2, std_r2))
            print(f"alpha={alpha:.6g}, gamma={gamma:.6g} -> mean test R^2={mean_r2:.4f} Â± {std_r2:.4f}")
            if mean_r2 > best_score:
                best_score = mean_r2
                best_params = (alpha, gamma)

    print("\nBest-by-mean params (raw scan):")
    print(f"alpha={best_params[0]:.6g}, gamma={best_params[1]:.6g} with mean test R^2={best_score:.4f}")
    return best_params, results


def select_params_one_se(results, folds=3):
    """
    Apply one-standard-error rule.
    results: list of tuples (alpha, gamma, mean_r2, std_r2)
    folds: number of CV folds used to compute mean/std
    Returns (alpha, gamma) chosen by the one-standard-error rule,
    preferring larger alpha and smaller gamma (simpler models).
    """
    # Best mean score and its SE
    best = max(results, key=lambda r: r[2])
    best_mean, best_std = best[2], best[3]
    best_se = best_std / np.sqrt(folds)
    threshold = best_mean - best_se

    # Candidates within one SE of the best
    candidates = [r for r in results if r[2] >= threshold]

    # Choose "simplest": larger alpha, then smaller gamma
    candidates.sort(key=lambda r: (r[0], -r[1]), reverse=True)
    chosen = candidates[0]
    print(f"One-SE choice: alpha={chosen[0]:.6g}, gamma={chosen[1]:.6g} "
          f"(mean R^2={chosen[2]:.4f}, std={chosen[3]:.4f}; threshold={threshold:.4f})")
    return chosen[0], chosen[1]


# Example grids (adjust as needed)
alpha_grid = np.logspace(-4, 0, 50)  # 1e-4 to 1, log-spaced
gamma_grid = np.logspace(-4, 0, 50)  # 1e-4 to 1, log-spaced

# Run scan, then apply one-standard-error selection
_, results = run_hyperparameter_scan(X, np.array(y), alpha_grid, gamma_grid, folds=3)
alpha_1se, gamma_1se = select_params_one_se(results, folds=3)

# Final model using one-SE params
model = KernelRidge(alpha=alpha_1se, kernel='laplacian', gamma=gamma_1se)
test_learner(X, np.array(y), model)

# Make Predictions on Large Database

In [None]:
# Directory containing XYZ files for validation
xyz_directory = "DATABASE_DIRECTORY"  # Replace witH actual directory
xyz_files = [os.path.join(xyz_directory, file) for file in os.listdir(xyz_directory) if file.endswith(".xyz")]

# Define PI hyperparameters
pixelsx = 55
pixelsy = 55
spread = 3.0
Max = 3.5

# Initialize a dictionary to store filename and predicted energy pairs
filename_energy_dict = {}

# Initialize list to store feature vectors (optional)
X_list = []

# Iterate over xyz files
for idx, xyz_file in enumerate(xyz_files):
    xyz_file_name = os.path.basename(xyz_file)

    try:
        # Generate Persistence Image vector using VariancePersistv1
        pi_vector = VariancePersistv1(
            xyz_file,
            pixelx=pixelsx,
            pixely=pixelsy,
            myspread=spread,
            myspecs={"maxBD": Max, "minBD": -0.10},
            showplot=False
        )

        # Append to X_list (optional, for further analysis)
        X_list.append(pi_vector)

        # Prediction
        predicted_energy = model.predict(pi_vector.reshape(1, -1))[0]

        # Store filename and predicted energy
        filename_energy_dict[xyz_file_name] = predicted_energy

    except Exception as e:
        # Print the filename that caused any error and continue
        print(f"Error processing '{xyz_file_name}': {e}")
        continue

# Sort the dictionary by predicted energy values (ascending)
sorted_filename_energy = sorted(
    filename_energy_dict.items(),
    key=lambda x: x[1],
    reverse=False
)

# Save ALL predictions into a CSV file
csv_filename = "NAME_OF_CSV_WITH_PREDICTIONS.csv"
with open(csv_filename, mode="w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(["Filename", "Predicted_Energy"])  # Header for columns
    for filename, energy in sorted_filename_energy:
        writer.writerow([filename, energy])

print(f"\nAll predictions saved to '{csv_filename}'!")