In [1]:
import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import rdFingerprintGenerator
from rdkit.ML.Descriptors import MoleculeDescriptors
from rdkit.Chem import Descriptors
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import RidgeClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score,root_mean_squared_error
from xgboost import XGBClassifier
import optuna
import sklearn
from sklearn.metrics import make_scorer, matthews_corrcoef, balanced_accuracy_score

In [None]:
MFPGEN = rdFingerprintGenerator.GetMorganGenerator(radius=2,fpSize=1024)
AFPGEN = rdFingerprintGenerator.GetAtomPairGenerator(fpSize=2048)
RFPGEN = rdFingerprintGenerator.GetRDKitFPGenerator(fpSize=2048)
TTPGEN = rdFingerprintGenerator.GetTopologicalTorsionGenerator(fpSize=2048)

def calculate_descriptors(mol: Chem.Mol, missingVal: float | None = 0.0) -> dict:
    """Calculate the full list of descriptors for a molecule.
    adapted from
    https://github.com/jonswain/tabpfn-tdc/blob/main/submission.py#L12
    """
    
    res = []
    for nm, fn in Descriptors._descList:
        try:
            if nm!="Ipc": # this one creates crazy values so we exclude it
                val = fn(mol)
        except:
            val = missingVal
        res.append(val)
    return res

def mol_feat(mol,morgan=True,tt=True,ap=True,descs=True,rdfp=True):
    """
    Extracts features by combining:
    - Morgan Fingerprints
    - RDKit Descriptors
    - Topological Torsion Fingerprints
    Returns a concatenated NumPy array of all features.
    """
    assert mol is not None, "Invalid molecule."
    features = []

    #combine the features:
    if morgan:
        morgan_fp = MFPGEN.GetFingerprintAsNumPy(mol)
        features.append(morgan_fp)
    if tt:
        torsion_fp = TTPGEN.GetFingerprintAsNumPy(mol)
        features.append(torsion_fp)
    if ap:
        ap_fp = AFPGEN.GetFingerprintAsNumPy(mol)
        features.append(ap_fp)
    if descs:
        rdkit_desc = calculate_descriptors(mol)  
        features.append(rdkit_desc)
    if rdfp:
        rdkit_fp = RFPGEN.GetFingerprintAsNumPy(mol)
        features.append(rdkit_fp)

    # Concatenate all features into a single vector
    combined_features = np.hstack(features)
    
    return combined_features


def build_classification_model(X,y,mode="RandomForest",model_options={"n_estimators":10,"max_depth":3,"random_state":123}):
    if mode == "RandomForest":
        model = RandomForestClassifier(**model_options)
    elif mode == "XGBoost":
        model = XGBClassifier(**model_options)
    elif mode == "Ridge":
        model = RidgeClassifier(**model_options)
    elif mode == "SVM":
        model = SVC(**model_options)
    else:
        print("mode not supported")
    model.fit(X,y)
    return model

def data_splitter(X,y,mode="Random",test_ratio=0.2,seed=123):
    if mode == "Random":
        X_train,X_test,y_train,y_test = train_test_split(X,y,test_size=test_ratio,random_state=seed)
    else:
        print("mode not supported")    
    return X_train,X_test,y_train,y_test

m = Chem.MolFromSmiles("c1ccccc1")

In [6]:
df_chembl = pd.read_csv("data/PDL1-CHEMBL.csv", sep=";")
mols = []
y = []

threshold = 6.0  

for i, row in df_chembl.iterrows():
    smiles = row["Smiles"]
    try:
        pchembl = float(row["pChEMBL Value"])
        mol = Chem.MolFromSmiles(smiles)
        if mol:
            y.append(1 if pchembl >= threshold else 0)
            mols.append(mol)
    except:
        continue  
        
X_chembl_ecfp = [mol_feat(m, descs=False, morgan=True, rdfp=False, ap=False, tt=False) for m in mols]
X_chembl_rdk = [mol_feat(m, descs=False, morgan=False, rdfp=True, ap=False, tt=False) for m in mols]
X_chembl_descs = [mol_feat(m, descs=True, morgan=False, rdfp=False, ap=False, tt=False) for m in mols]
y_chembl = np.array(y)

In [7]:
df_paper = pd.read_excel("data/pharmaceuticals-1710563-supplementary.xlsx")
smiles_paper = list(set(df_paper[df_paper["Unnamed: 2"] == "ACTIVE"]["Unnamed: 3"]))
smiles_paper_decoys = list(set(df_paper[df_paper["Unnamed: 2"] == "DECOY"]["Unnamed: 3"]))

mols = [Chem.MolFromSmiles(smi) for smi in smiles_paper+smiles_paper_decoys]
X_paper_ecfp = [mol_feat(m, descs=False, morgan=True, rdfp=False, ap=False, tt=False) for m in mols]
X_paper_rdk = [mol_feat(m, descs=False, morgan=False, rdfp=True, ap=False, tt=False) for m in mols]
X_paper_descs = [mol_feat(m, descs=True, morgan=False, rdfp=False, ap=False, tt=False) for m in mols]
y_paper = [1]*len(smiles_paper)+[0]*len(smiles_paper_decoys)

# Function for optimization of hyperparamaeters

In [32]:
optuna.logging.set_verbosity(optuna.logging.WARNING)
def xgb_objective(trial):
    xgb_max_depth = trial.suggest_int("xgb_max_depth", 2, 32, log=True)
    xgb_n_estimators = trial.suggest_int("xgb_n_estimators", 10, 1000, log=True)
    xgb_eta = trial.suggest_float("xgb_eta", 0, 1)
    xgb_gamma = trial.suggest_float("xgb_gamma", 0, 1)
    model = build_classification_model(X_train, y_train, mode="XGBoost",
                               model_options={"n_estimators":xgb_n_estimators, "max_depth":xgb_max_depth, "random_state":102, "eta":xgb_eta, "gamma":xgb_gamma})

    score = sklearn.model_selection.cross_val_score(model, X_train, y_train, n_jobs=-1, cv=5,scoring=make_scorer(matthews_corrcoef))
    score_avg = score.mean()
    return score_avg

def rf_objective(trial):
    rf_max_depth = trial.suggest_int("rf_max_depth", 2, 32, log=True)
    rf_n_estimators = trial.suggest_int("rf_n_estimators", 10, 1000, log=True)
    model = build_classification_model(X_train, y_train, mode="RandomForest",
                               model_options={"n_estimators":rf_n_estimators, "max_depth":rf_max_depth, "random_state":102})
    score = sklearn.model_selection.cross_val_score(model, X_train, y_train, n_jobs=-1, cv=5,scoring=make_scorer(matthews_corrcoef))
    score_avg = score.mean()
    return score_avg

def ridge_objective(trial):
    return 0

def svm_objective(trial):
    svm_gamma = trial.suggest_float("svm_gamma", 0.001, 1000, log=True)
    svm_C = trial.suggest_int("svm_C", 1, 1000, log=True)
    model = build_classification_model(X_train, y_train, mode="SVM",
                               model_options={"kernel":"rbf", "gamma":svm_gamma,"C":svm_C}) #svm doesnt have random state

    score = sklearn.model_selection.cross_val_score(model, X_train, y_train, n_jobs=-1, cv=5,scoring=make_scorer(matthews_corrcoef))
    score_avg = score.mean()
    return score_avg

In [29]:
def print_metrics(best_trial,X_train,y_train,X_test,y_test,modeltype,feature):
    params = study.best_trial.params
    if modeltype=="SVM":
        model = build_classification_model(X_train, y_train, mode="SVM",
                                       model_options={"kernel":"rbf", "gamma":params["svm_gamma"],"C":params["svm_C"]}) #svm doesnt have random state
    elif modeltype=="RandomForest":
        model = build_classification_model(X_train, y_train, mode="RandomForest",
                                       model_options={"n_estimators":params["rf_n_estimators"], "max_depth":params["rf_max_depth"], "random_state":102})
    
    elif modeltype=="XGB":
        model = build_classification_model(X_train, y_train, mode="XGBoost",
                                   model_options={"n_estimators":params["xgb_n_estimators"], "max_depth":params["xgb_max_depth"], "random_state":102, "eta":params["xgb_eta"], "gamma":params["xgb_gamma"]})
    elif modeltype=="Ridge":
        model = build_classification_model(X_train, y_train, mode="Ridge",model_options={})
    print(f"{modeltype} {feature} metrics:")
    score = sklearn.model_selection.cross_val_score(model, X_train, y_train, n_jobs=-1, cv=5,scoring=make_scorer(matthews_corrcoef))
    print("CV MCC ",round(score.mean(),3))
    
    score = sklearn.model_selection.cross_val_score(model, X_train, y_train, n_jobs=-1, cv=5,scoring=make_scorer(balanced_accuracy_score))
    print("CV,bAcc",round(score.mean(),3))
    
    y_pred = model.predict(X_test)
    print("test MCC ",round(matthews_corrcoef(y_test,y_pred),3))
    print("test bAcc",round(balanced_accuracy_score(y_test,y_pred),3))
    return

# Task 1:
Classification of chembl data
AND evaluation on not only test set but also patent data.
We reuse the best hyperparameters for task 2 and task 3

In [34]:
X_features = {"ecfp":X_chembl_ecfp,"rdk":X_chembl_rdk,"descs":X_chembl_descs}
model2optimizer = {"RandomForest":rf_objective,"XGB":xgb_objective,"SVM":svm_objective,"Ridge":ridge_objective}
y = y_chembl

In [None]:
for feature in ["ecfp","rdk","descs"]:
    X = X_features[feature]
    X_train, X_test, y_train, y_test = data_splitter(X, y,seed=123)
    for modeltype in ["Ridge","RandomForest","XGB","SVM"]:
        study = optuna.create_study(direction="maximize")
        study.optimize(model2optimizer[modeltype], n_trials=200, n_jobs=20)
        print(study.best_trial)
        print_metrics(study.best_trial,X_train,y_train,X_test,y_test,modeltype,feature)

# Task 2
Classification of patent data
AND evaluation on not only test set but also chembl data

In [36]:
X_features = {"ecfp":X_paper_ecfp,"rdk":X_paper_rdk,"descs":X_paper_descs}
y = y_paper

In [None]:
for feature in ["ecfp","rdk","descs"]:
    X = X_features[feature]
    X_train, X_test, y_train, y_test = data_splitter(X, y,seed=123)
    for modeltype in ["Ridge","RandomForest","XGB","SVM"]:
        study = optuna.create_study(direction="maximize")
        study.optimize(model2optimizer[modeltype], n_trials=200, n_jobs=20)
        print(study.best_trial)
        print_metrics(study.best_trial,X_train,y_train,X_test,y_test,modeltype,feature)

# Task 3
Classification on both of them

In [None]:
X_features = {"ecfp":X_paper_ecfp+X_chembl_ecfp,"rdk":X_paper_rdk+X_chembl_rdk,"descs":X_paper_descs+X_chembl_descs}
y = y_chembl+y_paper

In [None]:
for feature in ["ecfp","rdk","descs"]:
    X = X_features[feature]
    X_train, X_test, y_train, y_test = data_splitter(X, y,seed=123)
    for modeltype in ["Ridge","RandomForest","XGB","SVM"]:
        study = optuna.create_study(direction="maximize")
        study.optimize(model2optimizer[modeltype], n_trials=200, n_jobs=20)
        print(study.best_trial)
        print_metrics(study.best_trial,X_train,y_train,X_test,y_test,modeltype,feature)