## 2. Mark Global Test Subset

In [None]:
import numpy as np
import pandas as pd
from rdkit import Chem, DataStructs
from rdkit.Chem import rdFingerprintGenerator
from rdkit.ML.Cluster import Butina
import os
from sklearn.feature_selection import mutual_info_regression

### Read Data

In [183]:
# Load the featurized, imputed dataset; the first CSV column becomes the index.
filename = "0_raw_data_sets/featurized_imputed_data.csv"
df = pd.read_csv(filename, index_col=0)

In [184]:
# Drop the 'misc' category. The explicit .copy() makes the filtered frame
# independent of the original, so the column assignments below do not
# trigger pandas' SettingWithCopyWarning.
df = df[df['BASE_Category'] != 'misc'].copy()

# Convert joule-based targets to calorie-based ones (1 cal = 4.184 J).
df["dH (kJ/mol)"] = df["dH (kJ/mol)"] / 4.184
df["dS (J/mol/K)"] = df["dS (J/mol/K)"] / 4.184

df = df.rename(columns={'dH (kJ/mol)': 'dH (kcal/mol)', 'dS (J/mol/K)': 'dS (cal/mol/K)'})
df = df.round(3)

# Making sure that everything is unique: each combination of the key columns
# may appear only once. Comparing the columns directly (instead of
# concatenating them into one string) avoids false collisions such as
# 'a' + 'bc' == 'ab' + 'c'; missing solvents compare equal to each other.
key_cols = ['BASE_State', 'BASE_Category', 'Canonical SMILES', 'Solvent']
duplicate_rows = df[df.duplicated(subset=key_cols, keep=False)]
assert duplicate_rows.empty, (
    f"{len(duplicate_rows)} rows share the same {key_cols} combination; "
    f"offending index values: {duplicate_rows.index.tolist()[:10]}"
)

In [185]:
# Report the name of every column in df that holds at least one missing value
has_missing = df.isnull().any()
missing_cols = has_missing[has_missing].index.tolist()
print("Columns with missing values:")
if missing_cols:
    print("\n".join(map(str, missing_cols)))

Columns with missing values:
Solvent
dH (kcal/mol)
dS (cal/mol/K)
Solvent_SMILES
Solvent_SMILES_2
SOLV_PARAM_s_g
SOLV_PARAM_b_g
SOLV_PARAM_e_g
SOLV_PARAM_l_g
SOLV_PARAM_a_g
SOLV_PARAM_c_g
SOLV_PARAM_visc at 298 K (cP)
SOLV_PARAM_dielectric constant


### Determine Global Test Set Based on Butina Clustering/Phase Stratification

In [None]:
def _within_bin_quota(cluster_counts, test_counts, total_counts, quota_fraction):
    """Return True if adding a cluster keeps every touched bin within its quota.

    cluster_counts: bin label -> number of cluster members in that bin.
    test_counts:    bin label -> number of rows already selected for the test set.
    total_counts:   bin label -> number of eligible rows in that bin.
    A bin's quota is quota_fraction * total_counts[bin].
    """
    return all(
        test_counts[bin_label] + n_in_cluster <= quota_fraction * total_counts.get(bin_label, 1)
        for bin_label, n_in_cluster in cluster_counts.items()
    )


def generate_global_test(df, test_fraction=0.10, phase_col='BASE_Monomer_State', smiles_col='Canonical SMILES', h_col='dH (kcal/mol)', s_col='dS (cal/mol/K)',
                         radius=2, nBits=2048, seed=42, n_h_bins=5, n_s_bins=5, cluster_cutoff=0.2, bin_tolerance=2.0):
    """Flag a structurally clustered, H/S-balanced subset of rows as the global test set.

    Rows are eligible when both ``h_col`` and ``s_col`` are present and the
    derived temperature ``h * 1000 / s - 273.15`` lies strictly between -500
    and 1000.  Eligible molecules are clustered with Butina clustering on
    Tanimoto distances between Morgan fingerprints, the clusters are visited in
    a seeded random order, and a whole cluster is added to the test set when
    (a) the test set would not exceed ``int(n_eligible * test_fraction)`` rows
    and (b) no quantile bin of ``h_col`` or ``s_col`` would hold more than
    ``bin_tolerance * test_fraction`` of that bin's eligible rows.

    Parameters
    ----------
    df : pandas.DataFrame
        Input table; it is not modified (a copy is returned).
    test_fraction : float
        Target fraction of eligible rows to flag.
    phase_col : str
        Accepted for backward compatibility; the selection does not use it.
    smiles_col : str
        Column holding the SMILES strings used for fingerprinting.
    h_col, s_col : str
        Columns holding the two target values used for eligibility and binning.
    radius, nBits : int
        Morgan fingerprint radius and bit-vector size.
    seed : int
        Seed for the cluster visiting order.
    n_h_bins, n_s_bins : int
        Number of quantile bins requested for ``h_col`` / ``s_col``
        (duplicate bin edges are dropped, so fewer bins may result).
    cluster_cutoff : float
        Distance cutoff passed to ``Butina.ClusterData``.
    bin_tolerance : float
        Multiplier on ``test_fraction`` giving the per-bin cap.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with a boolean ``Global_Test`` column.

    Raises
    ------
    ValueError
        If a SMILES string of an eligible row cannot be parsed by RDKit.
    """
    rng = np.random.RandomState(seed)
    df = df.copy()
    df['Global_Test'] = False

    # Derived temperature in deg C, used only as a sanity filter. It is kept as a
    # local Series so no scratch column is added to (or dropped from) the output.
    tc_celsius = df[h_col] * 1000 / df[s_col] - 273.15
    eligible_mask = (
        df[h_col].notna() & df[s_col].notna() & (tc_celsius < 1000) & (tc_celsius > -500)
    ).to_numpy(dtype=bool)
    # Work with row positions so a non-unique index cannot flag extra rows.
    eligible_pos = np.flatnonzero(eligible_mask)
    eligible_df = df.iloc[eligible_pos]
    n = len(eligible_df)

    if n == 0:
        # Nothing to bin or cluster; avoid qcut / division-by-zero failures.
        print("\nTotal eligible molecules: 0")
        print("Global test set size: 0")
        print("Global test fraction: n/a")
        return df

    # Quantile bins for H and S (integer labels, duplicate edges merged)
    h_bins = pd.qcut(eligible_df[h_col], q=n_h_bins, labels=False, duplicates='drop')
    s_bins = pd.qcut(eligible_df[s_col], q=n_s_bins, labels=False, duplicates='drop')
    h_bin_counts = h_bins.value_counts().to_dict()
    s_bin_counts = s_bins.value_counts().to_dict()

    # Generate fingerprints
    smiles = eligible_df[smiles_col].tolist()
    mols = [Chem.MolFromSmiles(s) for s in smiles]
    unparsable = [s for s, m in zip(smiles, mols) if m is None]
    if unparsable:
        raise ValueError(
            f"RDKit could not parse {len(unparsable)} SMILES string(s), e.g. {unparsable[0]!r}"
        )
    gen = rdFingerprintGenerator.GetMorganGenerator(radius=radius, fpSize=nBits)
    fps = [gen.GetFingerprint(m) for m in mols]

    # Flattened lower-triangle distance matrix (1 - Tanimoto similarity),
    # in the row-by-row order Butina.ClusterData expects.
    dists = []
    for i in range(1, n):
        sims = DataStructs.BulkTanimotoSimilarity(fps[i], fps[:i])
        dists.extend([1 - x for x in sims])

    # Cluster using Butina; each cluster is a tuple of positions into eligible_df
    clusters = Butina.ClusterData(dists, n, cluster_cutoff, isDistData=True)

    target_test_count = int(n * test_fraction)
    quota_fraction = bin_tolerance * test_fraction

    # Visit clusters in a seeded random order
    cluster_ids = list(range(len(clusters)))
    rng.shuffle(cluster_ids)

    test_counts_h = dict.fromkeys(h_bin_counts, 0)
    test_counts_s = dict.fromkeys(s_bin_counts, 0)
    is_test = np.zeros(len(df), dtype=bool)
    total_test = 0

    for cid in cluster_ids:
        members = list(clusters[cid])
        cluster_h = h_bins.iloc[members].value_counts()
        cluster_s = s_bins.iloc[members].value_counts()

        # Clusters are added whole or not at all, so similar molecules never
        # straddle the train/test boundary.
        fits = (
            total_test + len(members) <= target_test_count
            and _within_bin_quota(cluster_h, test_counts_h, h_bin_counts, quota_fraction)
            and _within_bin_quota(cluster_s, test_counts_s, s_bin_counts, quota_fraction)
        )
        if fits:
            is_test[eligible_pos[members]] = True
            total_test += len(members)
            for bin_label, n_in_cluster in cluster_h.items():
                test_counts_h[bin_label] += n_in_cluster
            for bin_label, n_in_cluster in cluster_s.items():
                test_counts_s[bin_label] += n_in_cluster

        if total_test >= target_test_count:
            break

    df['Global_Test'] = is_test
    n_test = is_test.sum()

    print(f"\nTotal eligible molecules: {n}")
    print(f"Global test set size: {n_test}")
    print(f"Global test fraction: {n_test / n:.2%}")

    return df

In [188]:
# Flag up to 10% of the eligible rows as the held-out global test set.
df = generate_global_test(df, test_fraction=0.10)


Total eligible molecules: 291
Global test set size: 29
Global test fraction: 9.97%


In [189]:
# Write the flagged table to disk; the feature-selection step reloads this file.
df.to_csv('0_raw_data_sets/2_global_test_flagged_data.csv')

### Feature selection and data splitting

In [194]:
# Load the dataset that carries the Global_Test flag
filename = "0_raw_data_sets/2_global_test_flagged_data.csv"
df_orig = pd.read_csv(filename, index_col=0)


# Create the output folder. exist_ok=True replaces the separate existence
# check, so there is no window between checking and creating.
output_dir = '2_split_datasets'
os.makedirs(output_dir, exist_ok=True)

# Columns belonging to each target, matched by their name prefix
entropy_cols = [col for col in df_orig.columns if "dS (" in col]
enthalpy_cols = [col for col in df_orig.columns if "dH (" in col]

# One table per task: rows missing that task's target are dropped, and the
# single-target tables also lose the other target's column(s).
df_split_dict = {
    "enthalpy": df_orig.dropna(subset=['dH (kcal/mol)']).drop(columns=entropy_cols),
    "entropy": df_orig.dropna(subset=['dS (cal/mol/K)']).drop(columns=enthalpy_cols),
    "has_both": df_orig.dropna(subset=['dH (kcal/mol)', 'dS (cal/mol/K)']),
}

In [195]:
# Calculate MI score and reduce and save datasets
all_top_features = []
top_k_features = 80  # number of highest-MI features kept for each single-target task

# The single-target tasks ("enthalpy", "entropy") must be processed before
# "has_both", which reuses the features they selected; dict insertion order
# guarantees this. Note that the loop rebinds the notebook-level name `df`.
for prop, df in df_split_dict.items():
    monomer_states_for_splitting = df['BASE_Monomer_State']
    polymn_cat = df['BASE_Category']
    smiles_for_splitting = df['Canonical SMILES']
    one_hot_encoding = pd.get_dummies(df[['BASE_Category', 'BASE_Monomer_State']])
    global_test_set_flag = df['Global_Test']

    # NOTE(review): the positional slices below assume the target(s) sit right
    # after the first column, the descriptors start at column 19 (20 when both
    # targets are present) and the last column is Global_Test - confirm against
    # the CSV layout.
    if "has_both" not in prop:
        # Mutual Information-based feature reduction
        target = df.iloc[:, 1]
        features = df.iloc[:, 19:-1]

        # NOTE(review): MI is computed on every row, including those flagged
        # Global_Test - confirm that using test rows for feature selection is intended.
        mutual_info = mutual_info_regression(features, target, random_state=42)
        mi_df = pd.DataFrame({'Feature': features.columns, 'MI_Score': mutual_info}).sort_values(by='MI_Score', ascending=False)

        top_features = mi_df.head(top_k_features)['Feature'].tolist()
        all_top_features += top_features

        reduced_features = features[top_features]
        reduced_df = pd.concat([target, monomer_states_for_splitting, polymn_cat, smiles_for_splitting, global_test_set_flag, one_hot_encoding, reduced_features], axis=1)
        reduced_df.to_csv(os.path.join(output_dir, f"{prop}_{top_k_features}_MI_reduced.csv"), index=True)
    else:
        # .copy() so that adding the Tc column does not write into a slice of df
        targets = df.iloc[:, 1:3].copy()
        features = df.iloc[:, 20:-1]

        targets['Tc (C)'] = targets['dH (kcal/mol)']*1000/targets['dS (cal/mol/K)'] - 273.15 # C
        targets = targets.round(3)

        # Union of the per-target selections, de-duplicated in first-seen order.
        # Unlike list(set(...)), this gives the same column order in every
        # kernel session, so the saved CSV is reproducible.
        merged_top_features = list(dict.fromkeys(all_top_features))
        reduced_features = features[merged_top_features]
        reduced_df = pd.concat([targets['Tc (C)'], monomer_states_for_splitting, polymn_cat, smiles_for_splitting, global_test_set_flag, one_hot_encoding, reduced_features], axis=1)
        reduced_df.to_csv(os.path.join(output_dir, f"{prop}_{len(merged_top_features)}_MI_reduced.csv"), index=True)