In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import cross_validate
from sklearn.ensemble import RandomForestRegressor

from sklearn.metrics import mean_absolute_error

import random

## 1. Dataset prep

In [2]:
import pickle
with open('outputs/non_constant_columns.pkl', 'rb') as f:
    non_constant_columns = pickle.load(f)

In [None]:
source_energy_levels_df = pd.read_csv('outputs/processed_source_dataset.csv')
source_energy_levels_df.head()

In [None]:
source_df = pd.read_pickle('outputs/source_descriptors_processed.pkl')
source_df.head()

In [None]:
source_df = pd.concat((source_df, source_energy_levels_df[['HOMO_DFT']]), axis=1)
del source_energy_levels_df
source_df.head()

In [None]:
X = source_df[non_constant_columns]
y = source_df['HOMO_DFT']
X.shape, y.shape

In [7]:
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.33, random_state=0)

In [None]:
target_df = pd.read_pickle('outputs/target_descriptors_calculated_n_processed.pkl')
target_df = target_df[target_df['HOMO_UPS'].notna()]
target_df = target_df[target_df['Type'] != 'External Validation']
len(target_df)

In [9]:
n_dev_set = 10
dev_set_size = len(target_df)

n_test_set = 5
test_set_size = 5000

In [None]:
X_devs = []
y_devs = []
for i in range(n_dev_set):
    X_dev = X_train[i*dev_set_size:i*dev_set_size+dev_set_size]
    y_dev = y_train[i*dev_set_size:i*dev_set_size+dev_set_size]
    X_devs.append(X_dev)
    y_devs.append(y_dev)
    print(f'Dev set {i}, size {X_dev.shape}, label size {y_dev.shape}')

In [None]:
X_tests = []
y_tests = []
for i in range(n_test_set):
    X_test = X_val[i*test_set_size:i*test_set_size+test_set_size]
    y_test = y_val[i*test_set_size:i*test_set_size+test_set_size]
    X_tests.append(X_test)
    y_tests.append(y_test)
    print(f'Test set {i}, size {X_test.shape}, label size {y_test.shape}')

In [12]:
def calculate_mae(model, X_tests, y_tests, feature_indices=None):
    maes = []
    for i in range(5):
        if feature_indices is None:
            preds = model.predict(X_tests[i])
            mae = mean_absolute_error(y_tests[i], preds)
            maes.append(mae)
        else:
            preds = model.predict(X_tests[i][X_tests[i].columns[feature_indices]])
            mae = mean_absolute_error(y_tests[i], preds)
            maes.append(mae)

    return np.round(np.min(maes), 4), np.round(np.mean(maes), 4), np.round(np.max(maes), 4)

In [None]:
dev_results = []
test_results = []
diff = []
for i in range(n_dev_set):
    rf = RandomForestRegressor(random_state=0)
    
    cv_results = cross_validate(rf, X_devs[i], y_devs[i], scoring='neg_mean_absolute_error', return_estimator=True, n_jobs=-1)
    dev_results.append(np.mean(cv_results['test_score'])*-1)
    
    rf = RandomForestRegressor(random_state=0)
    rf.fit(X_devs[i], y_devs[i])

    min_mae, mean_mae, max_mae = calculate_mae(rf, X_tests, y_tests)
    
    test_results.append((min_mae, mean_mae, max_mae))

    diff.append(abs(mean_mae - np.mean(cv_results['test_score'])*-1))
    
df = pd.DataFrame({'Dev': dev_results, 'Test': test_results, 'Diff': diff})
df

In [None]:
print(f'Average difference between 5-fold CV error and test error is {np.mean(diff):.3f} eV')

In [None]:
from sklearn.model_selection import RepeatedKFold
results = []
for k in [3, 5, 10]:
    for n_rep in [3, 5, 10]:
        cv = RepeatedKFold(n_splits=k, n_repeats=n_rep, random_state=0)
        dev_results = []
        test_results = []
        diff = []
        for i in range(n_dev_set):
            rf = RandomForestRegressor(random_state=0)
            cv_results = cross_validate(rf, X_devs[i], y_devs[i], scoring='neg_mean_absolute_error', return_estimator=True, cv=cv, n_jobs=-1)
            dev_results.append(np.mean(cv_results['test_score'])*-1)
            
            rf = RandomForestRegressor(random_state=0)
            rf.fit(X_devs[i], y_devs[i])

            min_mae, mean_mae, max_mae = calculate_mae(rf, X_tests, y_tests)
            
            test_results.append((min_mae, mean_mae, max_mae))

            diff.append(abs(mean_mae - np.mean(cv_results['test_score'])*-1))
            
        results.append((np.mean(diff), k, n_rep))
        print(np.mean(diff))

In [None]:
results

## 2. Feature selection

In [None]:
from sklearn.feature_selection import RFE
from sklearn.model_selection import RepeatedKFold

from collections import defaultdict
dev_results = defaultdict(list)
test_results = defaultdict(list)

cv = RepeatedKFold(n_splits=10, n_repeats=5, random_state=0)

for i in range(n_dev_set):
    
    rf = RandomForestRegressor(random_state=0)
    selector = RFE(rf, n_features_to_select=1, step=1)
    selector = selector.fit(X_devs[i], y_devs[i])
    sorted_feature_indices = np.argsort(selector.ranking_)
    print(f'Feature selection {i} completed.')

    for n_features_to_select in range(1, dev_set_size+1):
        selected_feature_indices = sorted_feature_indices[:n_features_to_select]
        
        rf = RandomForestRegressor(random_state=0)
        cv_results = cross_validate(
            rf,
            X_devs[i][X_devs[i].columns[selected_feature_indices]], y_devs[i],
            scoring='neg_mean_absolute_error',
            return_estimator=True,
            cv=cv,
            n_jobs=-1)
        dev_mae = np.mean(cv_results['test_score'])*-1

        rf = RandomForestRegressor(random_state=0)
        rf.fit(X_devs[i][X_devs[i].columns[selected_feature_indices]], y_devs[i])
        _, test_mae, _ = calculate_mae(rf, X_tests, y_tests, selected_feature_indices)

        print(i, n_features_to_select, dev_mae, test_mae)
        
        dev_results[i].append(dev_mae)
        test_results[i].append(test_mae)

In [None]:
# Each row is a dev set
# Each columns are number of features selected, first column being a single feature
# Values are test set errors
test_results_df = pd.DataFrame(test_results).T
test_results_df

In [None]:
plt.figure(dpi=300)
plt.plot(np.arange(1, dev_set_size+1), np.mean(test_results_df, axis=0))
plt.xlabel('Number of features')
plt.ylabel('Average Test MAE (eV)')
plt.minorticks_on()
plt.grid(True, which='both', alpha=0.1)
plt.xlim(0.5, dev_set_size)

In [None]:
pd.DataFrame(np.mean(test_results_df, axis=0)).sort_values(by=0).head()

In [None]:
optimum_number_of_features = np.argmin(np.mean(test_results_df, axis=0)) + 1
optimum_number_of_features

In [None]:
# minimum error
np.mean(test_results_df[optimum_number_of_features-1])

## 3. Hyper-parameter tuning

In [None]:
import optuna
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import RepeatedKFold

cv = RepeatedKFold(n_splits=10, n_repeats=5, random_state=0)

def objective(trial):
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 10, 700),
        'max_depth': trial.suggest_int('max_depth', 2, 25),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
        'max_features': trial.suggest_float('max_features', 0.3, 1.0)
        }

    rf = RandomForestRegressor(random_state=0, **params)
    cv_results = cross_validate(
        rf,
        X_selected, y,
        scoring='neg_mean_absolute_error',
        return_estimator=True,
        cv=cv,
        n_jobs=-1)
    return np.mean(cv_results['test_score'])*-1

In [None]:
all_test_errors = []
untuned_test_error = []
all_cv_errors = []
for i in range(10):
    cv_errors = []
    print(f'Tuning dev set {i}')
    rf = RandomForestRegressor(random_state=0)
    selector = RFE(rf, n_features_to_select=optimum_number_of_features, step=1)
    selector = selector.fit(X_devs[i], y_devs[i])
    X_selected = selector.transform(X_devs[i])
    y = y_devs[i]
    print(X_selected.shape)
    print(y.shape)

    # Untuned CV error
    rf = RandomForestRegressor(random_state=0)
    cv_results = cross_validate(
        rf,
        X_selected, y,
        scoring='neg_mean_absolute_error',
        return_estimator=True,
        cv=cv,
        n_jobs=-1)
    cv_errors.append(np.mean(cv_results['test_score'])*-1)

    # Untuned test error
    rf = RandomForestRegressor(random_state=0)
    rf.fit(X_selected, y)
    # Calculate the corresponding test error
    maes = []
    for test_set_id in range(len(X_tests)):
        X_test_selected = selector.transform(X_tests[test_set_id])
        preds = rf.predict(X_test_selected)
        mae = mean_absolute_error(y_tests[test_set_id], preds)
        maes.append(mae)
    untuned_test_error.append(np.mean(maes))

    # Tuning
    study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=0))
    study.optimize(objective, n_trials=50)

    # Only cv error is used/calculated during tuning
    # Now we will calculate the corresponding test error for each trial
    lowest_value_so_far = None
    best_params_so_far = None
    best_params = []
    test_errors = []
    for trial in study.get_trials():
        cv_errors.append(trial.value)
        if lowest_value_so_far is None or trial.value < lowest_value_so_far:
            best_params_so_far = trial.params
            lowest_value_so_far = trial.value
            # Train rf with best params so far
            rf = RandomForestRegressor(random_state=0, **best_params_so_far)
            rf.fit(X_selected, y)
            # Calculate the corresponding test error
            maes = []
            for test_set_id in range(len(X_tests)):
                X_test_selected = selector.transform(X_tests[test_set_id])
                preds = rf.predict(X_test_selected)
                mae = mean_absolute_error(y_tests[test_set_id], preds)
                maes.append(mae)
            test_error = np.mean(maes)
        best_params.append(best_params_so_far)
        test_errors.append(test_error)
    all_test_errors.append(test_errors)
    all_cv_errors.append(cv_errors)

In [None]:
pd.DataFrame(all_cv_errors)

In [None]:
tuning_results = pd.DataFrame(all_test_errors)
tuning_results.insert(0, "Untuned", untuned_test_error)
tuning_results

In [None]:
x_ticks = list(np.arange(0, 50+1, 5))

plt.figure(dpi=300)
plt.plot(list(np.mean(tuning_results, axis=0)), label='Test set error')
plt.plot(list(np.mean(all_cv_errors, axis=0)), label='CV error')
plt.xlabel('Number of trials')
plt.ylabel('Average Test MAE (eV)')
plt.minorticks_on()
plt.grid(True, which='both', alpha=0.1)

plt.xticks([0] + x_ticks[1:], ['Untuned'] + x_ticks[1:])
plt.xlim(-0.5, 50.5)

plt.legend()

In [None]:
# No need to add 1 here
# Because index 0 is untuned
optimum_number_of_trials = np.argmin(np.mean(tuning_results, axis=0))
optimum_number_of_trials

In [None]:
x_ticks = list(np.arange(0, 50+1, 5))

plt.figure(dpi=300)
plt.plot(list(np.mean(tuning_results, axis=0)), label='Test set error')
plt.xlabel('Number of trials')
plt.ylabel('Average Test MAE (eV)')
plt.minorticks_on()
plt.grid(True, which='both', alpha=0.1)

plt.xticks([0] + x_ticks[1:], ['Untuned'] + x_ticks[1:])
plt.xlim(-0.5, 50.5)

In [None]:
optimum_number_of_features, optimum_number_of_trials

In [31]:
import csv

# Open a CSV file for writing
with open('outputs/homo_optimum_features_and_trials.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow([optimum_number_of_features, optimum_number_of_trials])