In [None]:
import ast
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import KFold
import keras.backend as K
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

import pandas as pd
import numpy as np
from datetime import datetime
import os

# os.environ["CUDA_VISIBLE_DEVICES"] = "2"  

# gpus = tf.config.experimental.list_physical_devices('GPU')
# if gpus:
#   try:
#     tf.config.experimental.set_virtual_device_configuration(
#         gpus[0],
#         [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=7999)])
#   except RuntimeError as e:
#     print(e)
    
# tf.random.set_seed(1234)


# Conventional methods

In [None]:
# Load the HEA dataset from CSV
file_path = "/home1/nhuynh2023/datasets/PDF_HEA_Gibbs/HEA_Dataset_with_embeddings.csv"
dataset = pd.read_csv(file_path)

# The embedding column is read back as text; literal_eval turns each entry into a Python list
dataset["embedding"] = dataset["embedding"].apply(ast.literal_eval)

# Column names that follow the PDF naming pattern ("g(r)_*" or "r_*")
pdf_columns = [col for col in dataset.columns if col.startswith("g(r)_") or col.startswith("r_")]

# Drop the identifier, target, prompt metadata, active-site and element columns.
# "embedding" is dropped explicitly as well (it holds lists, so select_dtypes would
# discard it anyway) to keep this cell consistent with the transformer pipeline below.
numerical_features = dataset.drop(columns=["ID","Gibbs", "prompt", "n_tokens", 'active_site_1', 'active_site_2', 'Fe', 'Co', 'Ni', 'Cu', 'Zn', 'embedding'])

# Keep numeric columns only (expected to be the PDF features)
numerical_features = numerical_features.select_dtypes(include=[np.number])

# Debug: Check remaining columns
print("Remaining columns after dropping irrelevant ones:")
print(numerical_features.columns.tolist())

# Target variable
y = dataset["Gibbs"].values

# Split BEFORE fitting any preprocessing: the scaler and PCA must only see training
# rows, otherwise test-set statistics leak into the features used for evaluation.
X_idx = np.arange(len(y))
X_train_idx, X_test_idx = train_test_split(X_idx, test_size=0.2, random_state=42)

# Standardize numerical features (statistics from training rows, applied to all rows)
scaler = StandardScaler()
scaler.fit(numerical_features.iloc[X_train_idx])
pdf_features_scaled = scaler.transform(numerical_features)

# Reduce dimensionality with PCA (components learned from training rows only)
m = 50  # Number of PCA components
assert m <= min(len(X_train_idx), pdf_features_scaled.shape[1]), (
    "PCA needs n_components <= min(n_training_samples, n_features)"
)
# random_state pins the result when scikit-learn picks a randomized SVD solver
pca = PCA(n_components=m, random_state=42)
pca.fit(pdf_features_scaled[X_train_idx])
# pdf_pca still covers every row, so downstream indexing by X_train_idx / X_test_idx is unchanged
pdf_pca = pca.transform(pdf_features_scaled)

# Debug: Output shapes
print("Shape of pdf_pca:", pdf_pca.shape)
print("Shape of y:", y.shape)

In [None]:
# Slice the PCA-reduced features and the targets into train/test partitions
X_train, X_test = pdf_pca[X_train_idx], pdf_pca[X_test_idx]
y_train, y_test = y[X_train_idx], y[X_test_idx]

# Baseline regressors, each paired with the hyperparameter grid to search.
# An empty grid means the estimator is fitted as-is, with no search.
models = dict(
    RandomForest=dict(
        model=RandomForestRegressor(random_state=42),
        param_grid=dict(
            n_estimators=[50, 100],
            max_depth=[5, 10, None],
            min_samples_leaf=[1, 5],
        ),
    ),
    GradientBoosting=dict(
        model=GradientBoostingRegressor(random_state=42),
        param_grid=dict(
            n_estimators=[50, 100],
            learning_rate=[0.05, 0.1],
            max_depth=[3, 5],
            min_samples_leaf=[1, 5],
        ),
    ),
    SVR=dict(
        model=SVR(),
        param_grid=dict(
            C=[0.1, 1.0, 10.0],
            epsilon=[0.01, 0.1],
            kernel=["rbf"],
        ),
    ),
    LinearRegression=dict(
        model=LinearRegression(),
        param_grid=dict(),  # nothing to tune
    ),
)

# Report regression metrics for one fitted model
def evaluate_model(y_true, y_pred, model_name):
    """Print and return MAE, RMSE and R² for a set of predictions.

    Args:
        y_true: Ground-truth target values.
        y_pred: Predicted target values, aligned with ``y_true``.
        model_name: Label used in the printed header.

    Returns:
        dict with keys ``"MAE"``, ``"RMSE"`` and ``"R²"``.
    """
    metrics = {
        "MAE": mean_absolute_error(y_true, y_pred),
        # RMSE is the square root of the mean squared error
        "RMSE": np.sqrt(mean_squared_error(y_true, y_pred)),
        "R²": r2_score(y_true, y_pred),
    }
    print(f"{model_name} Performance:")
    for label, value in metrics.items():
        print(f"{label}: {value:.4f}")
    return metrics

# Fit every configured model (grid-searching where a grid is given) and score it on the test split
results = {}
for model_name, config in models.items():
    print(f"\nTraining {model_name}...")
    model, param_grid = config["model"], config["param_grid"]

    if not param_grid:
        # Empty grid (Linear Regression): nothing to search, fit the estimator directly
        model.fit(X_train, y_train)
        best_model = model
    else:
        # 5-fold cross-validated grid search, ranked by negative MSE
        grid_search = GridSearchCV(
            estimator=model,
            param_grid=param_grid,
            cv=5,
            scoring="neg_mean_squared_error",
            n_jobs=-1,
        )
        grid_search.fit(X_train, y_train)
        best_model = grid_search.best_estimator_
        print(f"Best parameters for {model_name}: {grid_search.best_params_}")

    # Score the selected estimator on the held-out rows and keep its metrics
    y_pred = best_model.predict(X_test)
    results[model_name] = evaluate_model(y_test, y_pred, model_name)

# One row per model, one column per metric
results_df = pd.DataFrame(results).transpose()
print("\nPerformance Comparison:")
print(results_df)

# Persist the comparison table
results_csv_path = "/home1/nhuynh2023/Projects/PDF_HEA_Gibbs/conventional_methods/conventional_models_performance.csv"
results_df.to_csv(results_csv_path)

## PDF and LLM embeddings

In [None]:
# Load the HEA dataset from CSV
file_path = "/home1/nhuynh2023/datasets/PDF_HEA_Gibbs/HEA_Dataset_with_embeddings.csv"
dataset = pd.read_csv(file_path)

# The embedding column is read back as text; literal_eval turns each entry into a Python list
dataset["embedding"] = dataset["embedding"].apply(ast.literal_eval)

# Column names that follow the PDF naming pattern ("g(r)_*" or "r_*")
pdf_columns = [col for col in dataset.columns if col.startswith("g(r)_") or col.startswith("r_")]

# Drop the identifier, target, prompt metadata, active-site, element and embedding columns
numerical_features = dataset.drop(columns=["ID", "Gibbs", "prompt", "n_tokens", 'active_site_1', 'active_site_2', 'Fe', 'Co', 'Ni', 'Cu', 'Zn', 'embedding'])

# Keep numeric columns only (expected to be the PDF features)
numerical_features = numerical_features.select_dtypes(include=[np.number])

# Debug: Check remaining columns
print("Remaining columns after dropping irrelevant ones:")
print(numerical_features.columns.tolist())

# Convert embeddings to a 2-D array (one row per sample)
embeddings_array = np.vstack(dataset["embedding"].values)

# Target variable
y = dataset["Gibbs"].values

# Split BEFORE fitting any preprocessing: the scaler and PCA must only see training
# rows, otherwise test-set statistics leak into the features used for evaluation.
# The split cell that follows repeats this exact call (same size, same random_state),
# so it reproduces the identical partition.
X_idx = np.arange(len(y))
X_train_idx, X_test_idx = train_test_split(X_idx, test_size=0.2, random_state=42)

# Standardize numerical features (statistics from training rows, applied to all rows)
scaler = StandardScaler()
scaler.fit(numerical_features.iloc[X_train_idx])
pdf_features_scaled = scaler.transform(numerical_features)

# Reduce dimensionality with PCA (components learned from training rows only)
m = 50  # Number of PCA components
assert m <= min(len(X_train_idx), pdf_features_scaled.shape[1]), (
    "PCA needs n_components <= min(n_training_samples, n_features)"
)
# random_state pins the result when scikit-learn picks a randomized SVD solver
pca = PCA(n_components=m, random_state=42)
pca.fit(pdf_features_scaled[X_train_idx])
# pdf_pca still covers every row, so downstream indexing by X_train_idx / X_test_idx is unchanged
pdf_pca = pca.transform(pdf_features_scaled)

# Debug: Output shapes
print("Shape of pdf_pca:", pdf_pca.shape)
print("Shape of embeddings:", embeddings_array.shape)
print("Shape of y:", y.shape)

In [None]:
# Partition row indices 80/20 into train and test; the fixed seed keeps the split repeatable
n_samples = len(y)
X_idx = np.arange(n_samples)
X_train_idx, X_test_idx = train_test_split(
    X_idx,
    test_size=0.2,
    random_state=42,
)

# Custom Dataset
class HEAGibbsDataset(Dataset):
    """Torch dataset yielding ``(pdf_pca_row, embedding_row, target)`` float32 tensors.

    Args:
        indices: Row indices selecting the samples that belong to this dataset.
        pdf_source: Optional array of PCA features indexed by ``indices``.
            Defaults to the notebook-level ``pdf_pca``.
        embedding_source: Optional array of embedding vectors indexed by ``indices``.
            Defaults to the notebook-level ``embeddings_array``.
        target_source: Optional array of targets indexed by ``indices``.
            Defaults to the notebook-level ``y``.

    Passing the sources explicitly removes the dependency on notebook globals,
    which makes the class reusable and testable on its own; calling it with
    ``indices`` alone behaves exactly as before.
    """

    def __init__(self, indices, pdf_source=None, embedding_source=None, target_source=None):
        # Fall back to the notebook-level arrays; they are looked up at call time,
        # so the values current when the dataset is built are the ones used.
        if pdf_source is None:
            pdf_source = pdf_pca
        if embedding_source is None:
            embedding_source = embeddings_array
        if target_source is None:
            target_source = y

        self.pdf_pca = torch.tensor(pdf_source[indices], dtype=torch.float32)
        self.embeddings = torch.tensor(embedding_source[indices], dtype=torch.float32)
        self.y = torch.tensor(target_source[indices], dtype=torch.float32)

    def __len__(self):
        """Number of samples selected by ``indices``."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the ``(pdf_pca, embedding, target)`` triple at position ``idx``."""
        return self.pdf_pca[idx], self.embeddings[idx], self.y[idx]

# Build one dataset per split and wrap each in a mini-batch loader.
# Only the training loader shuffles; the test loader keeps a fixed order.
train_dataset, test_dataset = HEAGibbsDataset(X_train_idx), HEAGibbsDataset(X_test_idx)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)



In [None]:
# Define Transformer Model
class TransformerRegressor(nn.Module):
    """Transformer encoder that regresses one scalar per sample.

    Each of the ``m`` PCA components is lifted from a scalar to a ``d_model``-wide
    token, and the ``d``-dimensional embedding vector is projected to a single
    extra token placed at the front of the sequence. A learned positional
    parameter is added, the sequence is encoded, and the first token's output
    feeds a linear regression head.

    Args:
        m: Number of PCA components (sequence length is ``m + 1``).
        d: Dimension of the embedding vector.
        d_model: Token width inside the transformer.
        nhead: Number of attention heads.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, m, d, d_model=64, nhead=8, num_layers=2):
        super().__init__()
        # Scalar PCA component -> d_model token
        self.pca_embed = nn.Linear(in_features=1, out_features=d_model)
        # Embedding vector -> single d_model token
        self.embed_proj = nn.Linear(in_features=d, out_features=d_model)
        # Learned positional term, zero-initialised, one slot per token
        self.pos_encoder = nn.Parameter(torch.zeros(1, m + 1, d_model))
        # Stack of batch-first encoder layers
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True),
            num_layers=num_layers,
        )
        # Maps the first token's encoding to a scalar
        self.regression_head = nn.Linear(in_features=d_model, out_features=1)

    def forward(self, pdf_pca, embeddings):
        """Predict one value per sample.

        Args:
            pdf_pca: PCA features, expected as ``(batch_size, m)``.
            embeddings: Embedding vectors, expected as ``(batch_size, d)``.

        Returns:
            Tensor of predictions with the trailing singleton dimension removed.
        """
        component_tokens = self.pca_embed(pdf_pca[..., None])   # (batch_size, m, d_model)
        summary_token = self.embed_proj(embeddings)[:, None]    # (batch_size, 1, d_model)
        # Sequence layout: [embedding token, PCA tokens], plus positional term
        sequence = torch.cat((summary_token, component_tokens), dim=1) + self.pos_encoder
        encoded = self.transformer_encoder(sequence)            # (batch_size, m+1, d_model)
        # The first position (the embedding token) acts as the readout token
        return self.regression_head(encoded[:, 0]).squeeze(-1)

# Seed torch's global RNG so weight initialisation is repeatable on a fresh
# "Restart & Run All"; shuffled loaders built without an explicit generator also
# draw their seed from this RNG. 42 matches the random_state used for the split.
torch.manual_seed(42)

# Instantiate model
d = embeddings_array.shape[1]  # Embedding dimension
model = TransformerRegressor(m=m, d=d, d_model=64, nhead=8, num_layers=2)

# Optimizer and criterion: Adam on all model parameters, mean squared error loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()



In [None]:
# Training loop that tracks the epoch with the lowest test loss and saves its weights.
# NOTE(review): the "best" epoch is chosen on the test loader, so the reported best
# test loss is optimistic; a separate validation split would give a cleaner estimate.
num_epochs = 200
best_test_loss = float('inf')
best_model_state = None

for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0
    for pdf_pca_batch, embeddings_batch, y_batch in train_loader:
        optimizer.zero_grad()
        y_pred = model(pdf_pca_batch, embeddings_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # Average of the per-batch mean losses
    train_loss /= len(train_loader)

    # Evaluation
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for pdf_pca_batch, embeddings_batch, y_batch in test_loader:
            y_pred = model(pdf_pca_batch, embeddings_batch)
            loss = criterion(y_pred, y_batch)
            test_loss += loss.item()
        test_loss /= len(test_loader)

    # Save best model
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        # state_dict() hands back references to the live parameter tensors, which
        # later optimizer steps overwrite in place. Clone them so the snapshot
        # really holds this epoch's weights rather than the final epoch's.
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        print(f"Epoch {epoch+1}: New best test loss: {best_test_loss:.4f}")

    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")

# Save the best model
if best_model_state is not None:
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'/home1/nhuynh2023/Projects/PDF_HEA_Gibbs/models/PDF_GPT/model_{timestamp}_loss_{best_test_loss:.4f}.pth'
    # Make sure the target directory exists so a long training run is not lost at save time
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    torch.save(best_model_state, filename)
    print(f"Best model saved to {filename} with Test Loss: {best_test_loss:.4f}")
else:
    print("No model saved (no improvement found).")

In [None]:

# Restore saved weights into the existing model and switch it to inference mode.
# NOTE(review): this is a fixed checkpoint path from an earlier run, not the
# `filename` written by the training cell above; confirm that is intended.
checkpoint_path = "/home1/nhuynh2023/Projects/PDF_HEA_Gibbs/models/PDF_GPT/model_20250226_141802_loss_0.0014.pth"
checkpoint_state = torch.load(checkpoint_path)
model.load_state_dict(checkpoint_state)
model.eval()

# Run a model over a loader and gather its predictions next to the true targets.
# NOTE: this reuses the name of the earlier evaluate_model(y_true, y_pred, model_name)
# from the conventional-methods section and replaces it once this cell has run.
def evaluate_model(model, data_loader):
    """Collect predictions and ground-truth targets for every batch in a loader.

    Args:
        model: Module called as ``model(pdf_pca_batch, embeddings_batch)``;
            it is put into evaluation mode and left there.
        data_loader: Iterable yielding ``(pdf_pca_batch, embeddings_batch, y_batch)``.

    Returns:
        Tuple ``(predictions, actuals)`` of NumPy arrays, concatenated over batches.
    """
    model.eval()
    # Gradients are not needed for inference
    with torch.no_grad():
        per_batch = [
            (model(pdf_batch, embedding_batch).cpu().numpy(), target_batch.cpu().numpy())
            for pdf_batch, embedding_batch, target_batch in data_loader
        ]
    # Stitch the per-batch pieces back into one array each
    predictions = np.concatenate([batch_pred for batch_pred, _ in per_batch])
    actuals = np.concatenate([batch_true for _, batch_true in per_batch])
    return predictions, actuals

# Run the model over the training loader first, then the test loader
(train_predictions, train_actuals), (test_predictions, test_actuals) = [
    evaluate_model(model, loader) for loader in (train_loader, test_loader)
]

# Summarise prediction quality with three standard regression metrics
def calculate_metrics(predictions, actuals):
    """Return ``(mae, rmse, r2)`` for predictions measured against actual values.

    Args:
        predictions: Predicted values.
        actuals: Ground-truth values, aligned with ``predictions``.

    Returns:
        Tuple of mean absolute error, root mean squared error and R-squared.
    """
    return (
        mean_absolute_error(actuals, predictions),
        np.sqrt(mean_squared_error(actuals, predictions)),  # RMSE = sqrt(MSE)
        r2_score(actuals, predictions),
    )

# Score both splits
train_mae, train_rmse, train_r2 = calculate_metrics(
    predictions=train_predictions, actuals=train_actuals
)
test_mae, test_rmse, test_r2 = calculate_metrics(
    predictions=test_predictions, actuals=test_actuals
)

# Report one summary line per split
for summary_line in (
    f"Training MAE: {train_mae:.4f}, RMSE: {train_rmse:.4f}, R²: {train_r2:.4f}",
    f"Test MAE: {test_mae:.4f}, RMSE: {test_rmse:.4f}, R²: {test_r2:.4f}",
):
    print(summary_line)