<a href="https://colab.research.google.com/github/bopeng-sue/Optimal-Biofluid-Matrices-for-Human-Exposome-Biomonitoring/blob/main/hyperparameter_tf_nn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import numpy as np
import pandas as pd
from rdkit import Chem, RDLogger
from rdkit.Chem import AllChem, MACCSkeys, Descriptors
from rdkit.ML.Descriptors import MoleculeDescriptors
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import (
    precision_score,
    recall_score,
    f1_score,
    balanced_accuracy_score,
    roc_auc_score,
    classification_report,
    roc_curve,
    precision_recall_curve,
    auc
)
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tqdm import tqdm
from pathlib import Path
from imblearn.over_sampling import SMOTE
import warnings
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials, space_eval
import matplotlib.pyplot as plt
import seaborn as sns
import joblib

warnings.filterwarnings('ignore')
RDLogger.DisableLog('rdApp.*')

In [5]:
print("GPUs available:", tf.config.list_physical_devices('GPU'))

GPUs available: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [6]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU is configured for use.")
    except RuntimeError as e:
        print(f"GPU configuration error: {e}")


GPU is configured for use.


In [7]:
df = pd.read_excel('dataset_1211.xlsx')
df.head(2)

Unnamed: 0.1,Unnamed: 0,Biomonitor Compound PubChem CID,Blood study numbers,Urine study numbers,Tendency for the selection of blood samples,Biospecimen,CAS Number,Name,SMILES,hhlb(hours),...,JGI10,JGT,VE1_D,VE3_D,VR1_D,VR2_D,SRW5,AMW,WTPT-3,XLogP
0,0,70,0,2,0.0,Urine,816-66-0,4-Methyl-2-oxovaleric acid,CC(C)CC(=O)C(=O)O,0.61,...,0.0,0.601944,0.046505,-2.761367,35.180772,3.908975,0.0,6.845421,7.15891,0.904
1,1,89,0,1,0.0,Urine,484-78-6,Hydroxykynurenine,C1=CC(=C(C(=C1)O)N)C(=O)CC(C(=O)O)N,0.43,...,0.0,0.556317,0.130944,-3.25278,192.246721,12.01542,0.0,8.002847,14.772289,-2.464


# feature function

In [8]:
def padel_descriptor(df: pd.DataFrame) -> np.ndarray:
    """
    Extracts Padel descriptors from a given DataFrame starting from column index 16
    to the end. It replaces NaN values with 0 and infinite values with 1e10.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing feature columns.

    Returns
    -------
    np.ndarray
        A NumPy array of transformed feature values.
    """
    # Slice columns from index 16 to the end
    X = df.iloc[:, 16:].values

    # Replace NaNs with 0
    X = np.nan_to_num(X, nan=0.0)

    # Replace infinities with a large finite value
    X[np.isinf(X)] = 1e10

    return X

In [9]:
def maccs_descriptor(smiles_list):
    """
    Generates MACCS fingerprints for a list of SMILES strings and returns them as a DataFrame.

    Parameters
    ----------
    smiles_list : list of str
        A list of SMILES strings for which MACCS keys will be generated.

    Returns
    -------
    pd.DataFrame
        A DataFrame of MACCS bit values (0 or 1) with column names 'MACCS_1' through 'MACCS_167'.
    """
    # Generate MACCS fingerprints and convert them to a list of bits
    maccs_fingerprints = [
        list(MACCSkeys.GenMACCSKeys(Chem.MolFromSmiles(smiles)).ToBitString())
        for smiles in smiles_list
    ]

    # Convert the list of bits to a DataFrame
    # MACCSkeys are 167 bits long, hence 167 columns
    X = pd.DataFrame(maccs_fingerprints, columns=[f'MACCS_{i}' for i in range(1, 168)])

    # Convert strings to integers (0 and 1)
    X = X.astype(int)

    return X

In [10]:
def ecfp_descriptor(smiles_list, radius=2, n_bits=1024):
    """
    Generates ECFP (Extended-Connectivity Fingerprints) for a list of SMILES strings
    and returns them as a NumPy array.

    Parameters
    ----------
    smiles_list : list of str
        A list of SMILES strings for which ECFP fingerprints will be generated.
    radius : int, optional (default=2)
        The fingerprint radius to use. For ECFP4, radius=2 is standard.
    n_bits : int, optional (default=1024)
        Length of the bit vector.

    Returns
    -------
    np.ndarray
        A 2D NumPy array where each row corresponds to the ECFP bit vector of the input SMILES.
    """
    def calculate_ecfp(smiles, radius=radius, n_bits=n_bits):
        mol = Chem.MolFromSmiles(smiles)
        if mol is not None:
            fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius=radius, nBits=n_bits, useFeatures=False)
            return np.array(fp)
        else:
            return np.zeros(n_bits, dtype=int)

    # Apply ECFP calculation to the input list of SMILES
    ecfp_fingerprints = np.array([calculate_ecfp(s) for s in smiles_list])
    return ecfp_fingerprints

In [11]:
def fcfp_descriptor(smiles_list, radius=2, n_bits=1024):
    """
    Generates FCFP (Feature-based Circular Fingerprints) for a list of SMILES strings
    and returns them as a NumPy array.

    Parameters
    ----------
    smiles_list : list of str
        A list of SMILES strings for which FCFP fingerprints will be generated.
    radius : int, optional (default=2)
        The fingerprint radius to use.
    n_bits : int, optional (default=1024)
        Length of the bit vector.

    Returns
    -------
    np.ndarray
        A 2D NumPy array where each row corresponds to the FCFP bit vector of the input SMILES.
    """
    def calculate_fcfp(smiles, radius=radius, n_bits=n_bits):
        mol = Chem.MolFromSmiles(smiles)
        if mol is not None:
            fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius=radius, nBits=n_bits, useFeatures=True)
            return np.array(fp)
        else:
            return np.zeros(n_bits, dtype=int)

    # Apply FCFP calculation to the input list of SMILES
    fcfp_fingerprints = np.array([calculate_fcfp(s) for s in smiles_list])
    return fcfp_fingerprints

In [12]:
def rdkit_descriptors_to_X(df: pd.DataFrame, smiles_col='SMILES', correlation_threshold=0.95) -> np.ndarray:
    """
    Compute RDKit molecular descriptors for each SMILES in the given DataFrame,
    remove descriptors with zero variance and highly correlated descriptors,
    and return the final feature matrix X.

    Parameters
    ----------
    df : pd.DataFrame
        A DataFrame containing at least a 'SMILES' column.
    smiles_col : str, optional
        The name of the column containing SMILES strings.
    correlation_threshold : float, optional
        Threshold above which descriptors are considered highly correlated and removed.

    Returns
    -------
    np.ndarray
        A 2D NumPy array containing the filtered RDKit descriptors.
    """
    # Convert SMILES to RDKit Mol objects
    mol_list = [Chem.MolFromSmiles(s) for s in df[smiles_col]]

    # Define the descriptor names
    descriptor_names = [desc_name[0] for desc_name in Descriptors._descList]

    # Create a descriptor calculator
    calculator = MoleculeDescriptors.MolecularDescriptorCalculator(descriptor_names)

    # Calculate descriptors for each molecule, if mol is None use NaNs
    descriptors_list = [
        calculator.CalcDescriptors(mol) if mol is not None else [np.nan]*len(descriptor_names)
        for mol in mol_list
    ]

    # Convert the list of descriptors to a DataFrame
    descriptors_df = pd.DataFrame(descriptors_list, columns=descriptor_names)

    # Replace NaNs with 0
    descriptors_df = descriptors_df.fillna(0)

    # Step 1: Remove descriptors with zero variance
    variance = descriptors_df.var()
    zero_variance_columns = variance[variance == 0].index
    descriptors_df = descriptors_df.drop(columns=zero_variance_columns)

    # Step 2: Remove descriptors with correlation exceeding the threshold
    corr = descriptors_df.corr().abs()
    to_drop = []

    for i, col1 in enumerate(corr.columns):
        for col2 in corr.columns[i+1:]:
            if corr.loc[col1, col2] > correlation_threshold:
                to_drop.append(col2)

    # Remove duplicates if any
    to_drop = list(set(to_drop))

    # Drop the highly correlated descriptors
    descriptors_df = descriptors_df.drop(columns=to_drop, errors='ignore')

    # Convert the final descriptors DataFrame to a NumPy array
    X = descriptors_df.values

    return X

# Hyperparameter_NN

Descriptor Processor

In [13]:
def compute_descriptors(df: pd.DataFrame, descriptor_type: str, smiles_col: str='SMILES') -> np.ndarray:
    """
    Computes the specified molecular descriptor for the given DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing at least the 'SMILES' column.
    descriptor_type : str
        Type of descriptor to compute. One of ['padel', 'maccs', 'ecfp', 'fcfp', 'rdkit'].
    smiles_col : str, optional
        Name of the column containing SMILES strings.

    Returns
    -------
    np.ndarray
        Feature matrix as a NumPy array.
    """
    if descriptor_type.lower() == 'padel':
        X = padel_descriptor(df)
    elif descriptor_type.lower() == 'maccs':
        X = maccs_descriptor(df[smiles_col].tolist()).values
    elif descriptor_type.lower() == 'ecfp':
        X = ecfp_descriptor(df[smiles_col].tolist())
    elif descriptor_type.lower() == 'fcfp':
        X = fcfp_descriptor(df[smiles_col].tolist())
    elif descriptor_type.lower() == 'rdkit':
        X = rdkit_descriptors_to_X(df, smiles_col=smiles_col, correlation_threshold=0.95)
    else:
        raise ValueError("Invalid descriptor_method. Choose from 'padel', 'maccs', 'fcfp', 'ecfp', 'rdkit'.")
    return X

Machine Learning Pipeline

In [14]:
def run_ml_pipeline(
    X: np.ndarray,
    y: np.ndarray,
    descriptor_type: str,
    model_output_path: str='models/',
    max_evals: int=2000,
    random_state: int=42
) -> dict:
    """
    Runs the machine learning pipeline for a given descriptor type using TensorFlow,
    trains the best neural network model, evaluates it, and returns performance metrics.

    Parameters
    ----------
    X : np.ndarray
        Feature matrix.
    y : np.ndarray
        Target vector.
    descriptor_type : str
        Descriptor type being processed.
    model_output_path : str, optional
        Directory path to save the models. Defaults to 'models/'.
    max_evals : int, optional
        Maximum number of evaluations for hyperparameter optimization. Defaults to 50.
    random_state : int, optional
        Random state for reproducibility. Defaults to 42.

    Returns
    -------
    dict
        A dictionary containing performance metrics.
    """
    # Create output directory if it doesn't exist
    Path(model_output_path).mkdir(parents=True, exist_ok=True)

    # ---------------------------
    # 1. Data Splitting
    # ---------------------------

    # Initialize the first split: train + temp (validation + test)
    sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.4, random_state=random_state)
    for train_index, temp_index in sss1.split(X, y):
        X_train, X_temp = X[train_index], X[temp_index]
        y_train, y_temp = y[train_index], y[temp_index]

    # Initialize the second split: validation + test from temp
    sss2 = StratifiedShuffleSplit(n_splits=1, test_size=2/3, random_state=random_state)  # 1/3 validation, 2/3 test
    for val_index, test_index in sss2.split(X_temp, y_temp):
        X_val, X_test = X_temp[val_index], X_temp[test_index]
        y_val, y_test = y_temp[val_index], y_temp[test_index]

    print(f"Dataset sizes for {descriptor_type}:")
    print(f"Training set: X_train: {X_train.shape}, y_train: {y_train.shape}")
    print(f"Validation set: X_val: {X_val.shape}, y_val: {y_val.shape}")
    print(f"Test set: X_test: {X_test.shape}, y_test: {y_test.shape}")

    # ---------------------------
    # 2. Apply SMOTE
    # ---------------------------

    smote = SMOTE(random_state=random_state)
    X_train_res, y_train_res = smote.fit_resample(X_train, y_train)

    print(f"After SMOTE, training set sizes for {descriptor_type}:")
    print(f"X_train_res: {X_train_res.shape}, y_train_res: {y_train_res.shape}")

    # ---------------------------
    # 3. Hyperparameter Optimization
    # ---------------------------

    def objective(space):
        model = Sequential([
            Dense(int(space['units1']), activation='relu', input_shape=(X_train_res.shape[1],)),
            Dropout(space['dropout1']),
            Dense(int(space['units2']), activation='relu'),
            Dropout(space['dropout2']),
            Dense(1, activation='sigmoid')
        ])

        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=space['learning_rate']),
                      loss='binary_crossentropy', metrics=['accuracy'])

        early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

        history = model.fit(
            X_train_res, y_train_res,
            validation_data=(X_val, y_val),
            epochs=50,
            batch_size=int(space['batch_size']),
            callbacks=[early_stopping],
            verbose=0
        )

        val_loss = min(history.history['val_loss'])
        return {'loss': val_loss, 'status': STATUS_OK, 'model': model}

    # Define the search space
    space = {
        'units1': hp.quniform('units1', 64, 256, 16),
        'dropout1': hp.uniform('dropout1', 0.1, 0.5),
        'units2': hp.quniform('units2', 32, 128, 16),
        'dropout2': hp.uniform('dropout2', 0.1, 0.5),
        'batch_size': hp.quniform('batch_size', 16, 64, 8),
        'learning_rate': hp.loguniform('learning_rate', np.log(0.0001), np.log(0.01))
    }

    trials = Trials()
    best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=max_evals, trials=trials, rstate=np.random.default_rng(random_state))

    # Retrieve the best model
    best_model = None
    for trial in trials.trials:
        if trial['result']['status'] == STATUS_OK:
            best_model = trial['result']['model']
            break

    # ---------------------------
    # 4. Evaluation on Test Set
    # ---------------------------

    y_pred_probs = best_model.predict(X_test).flatten()
    y_pred = (y_pred_probs > 0.5).astype(int)

    # Calculate metrics
    precision = precision_score(y_test, y_pred, average='binary')
    recall = recall_score(y_test, y_pred, average='binary')
    f1 = f1_score(y_test, y_pred, average='binary')
    balanced_acc = balanced_accuracy_score(y_test, y_pred)
    auc_score = roc_auc_score(y_test, y_pred_probs)

    print(f"\nNeural Network Model Performance on Test Set ({descriptor_type}):")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1-Score:  {f1:.4f}")
    print(f"Balanced Accuracy: {balanced_acc:.4f}")
    print(f"AUC-ROC:   {auc_score:.4f}")

    # Detailed classification report
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))

    # ---------------------------
    # 5. Save the TensorFlow Model
    # ---------------------------

    model_path = f'{model_output_path}/best_nn_model_{descriptor_type}.h5'
    best_model.save(model_path)
    print(f"Neural Network model saved to {model_path}\n")

    # ---------------------------
    # 6. Compile Performance Metrics
    # ---------------------------

    results_nn = {
        'Model Type': 'Neural Network',
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1,
        'Balanced Accuracy': balanced_acc,
        'AUC-ROC': auc_score,
        'Model Path': model_path
    }

    return results_nn



In [15]:
def run_descriptor_model_screening(
    df: pd.DataFrame,
    target_col: str,
    smiles_col: str='SMILES',
    descriptor_method: str='padel',
    model_output_path: str='models/',
    max_evals: int=1000,
    random_state: int=42
) -> dict:
    """
    Runs the descriptor model screening for a given descriptor method.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing the data.
    target_col : str
        The name of the target column.
    smiles_col : str, optional
        The name of the SMILES column. Defaults to 'SMILES'.
    descriptor_method : str, optional
        The descriptor method to use. One of ['padel', 'maccs', 'fcfp', 'ecfp', 'rdkit'].
    model_output_path : str, optional
        Directory path to save the models and plots. Defaults to 'models/'.
    max_evals : int, optional
        Maximum number of evaluations for hyperparameter optimization. Defaults to 50.
    random_state : int, optional
        Random state for reproducibility. Defaults to 42.

    Returns
    -------
    dict
        A dictionary containing performance metrics and paths.
    """
    print(f"Starting model screening for descriptor method: {descriptor_method}")

    # ---------------------------
    # 1. Extract and Binarize Target
    # ---------------------------
    y = (df[target_col] > 0.5).astype(int).values

    # ---------------------------
    # 2. Compute Features X
    # ---------------------------
    try:
        X = compute_descriptors(df, descriptor_type=descriptor_method, smiles_col=smiles_col)
    except ValueError as e:
        print(f"Error computing descriptors: {e}")
        return {}

    # ---------------------------
    # 3. Run Machine Learning Pipeline
    # ---------------------------
    results = run_ml_pipeline(
        X=X,
        y=y,
        descriptor_type=descriptor_method,
        model_output_path=model_output_path,
        max_evals=max_evals,
        random_state=random_state
    )

    return results

def main():
    """
    Main function to execute the machine learning pipeline for each descriptor type.
    """

    # Define descriptor methods you want to test
    descriptor_methods = [
        # 'padel', 'maccs','fcfp','ecfp',
                            'rdkit']

    # Dictionary to store the results for each descriptor method
    all_results = {}


    for method in descriptor_methods:
        results = run_descriptor_model_screening(
            df=df,
            target_col='Tendency for the selection of blood samples',
            smiles_col='SMILES',
            descriptor_method=method,
            model_output_path='models/',
            max_evals=1000,
            random_state=42
        )
        all_results[method] = results

    # ---------------------------
    # 4. Save All Results to a CSV
    # ---------------------------

    # Convert results dictionary to a DataFrame for saving
    results_df = pd.DataFrame(all_results).T  # Transpose for better readability
    results_csv_path = 'models/all_descriptor_results.csv'
    results_df.to_csv(results_csv_path)
    print(f"All results saved to {results_csv_path}")

    # ---------------------------
    # 5. Display Summary
    # ---------------------------

    print("\nSummary of All Descriptor Methods:")
    print(results_df)

if __name__ == "__main__":
    main()



Starting model screening for descriptor method: rdkit
Dataset sizes for rdkit:
Training set: X_train: (321, 149), y_train: (321,)
Validation set: X_val: (71, 149), y_val: (71,)
Test set: X_test: (144, 149), y_test: (144,)
After SMOTE, training set sizes for rdkit:
X_train_res: (396, 149), y_train_res: (396,)
100%|██████████| 1000/1000 [1:41:39<00:00,  6.10s/trial, best loss: 0.5288572311401367]
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 323ms/step





Neural Network Model Performance on Test Set (rdkit):
Precision: 0.7642
Recall:    0.9205
F1-Score:  0.8351
Balanced Accuracy: 0.7370
AUC-ROC:   0.7885

Classification Report:
              precision    recall  f1-score   support

           0       0.82      0.55      0.66        56
           1       0.76      0.92      0.84        88

    accuracy                           0.78       144
   macro avg       0.79      0.74      0.75       144
weighted avg       0.78      0.78      0.77       144

Neural Network model saved to models//best_nn_model_rdkit.h5

All results saved to models/all_descriptor_results.csv

Summary of All Descriptor Methods:
           Model Type Precision    Recall  F1-Score Balanced Accuracy  \
rdkit  Neural Network  0.764151  0.920455  0.835052          0.737013   

        AUC-ROC                      Model Path  
rdkit  0.788454  models//best_nn_model_rdkit.h5  
