In [None]:
import os
import shutil

import numpy as np
import pandas as pd

from pathlib import Path
from rdkit import Chem
from tqdm import tqdm

In [None]:
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments, does nothing."""
    return None


# Rebind the module attribute so that later calls made through
# ``warnings.warn`` are silently discarded for the rest of the session.
warnings.warn = warn

## Machine learning methods

Machine learning methods are chosen from different families for diversity.

In [None]:
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.neighbors import KNeighborsRegressor

from sklearn.preprocessing import MinMaxScaler

In [None]:
# Fixed seed for the estimators that draw random numbers while fitting
# (random forest, MLP), so that re-running the notebook reproduces the
# same predictions.
RANDOM_STATE = 42

# (estimator, method name) pairs. The method name is combined with the
# descriptor name to label the prediction columns in the benchmark loop.
ml_list = [
    (Ridge(), "RidgeRegression"),
    (KNeighborsRegressor(), "KNeighborsRegressor"),
    (SVR(), "SVR"),
    (RandomForestRegressor(random_state=RANDOM_STATE), "RandomForestRegressor"),
    (MLPRegressor(max_iter=500, random_state=RANDOM_STATE), "MLPRegressor"),
]

## Chemical descriptors

Descriptors are chosen from different families (topological, physico-chemical, transformer-based, etc.) for diversity.

More descriptors: https://molfeat.datamol.io/featurizers

In [None]:
from molfeat.trans import MoleculeTransformer
from molfeat.trans.fp import FPVecTransformer
from molfeat.calc.pharmacophore import Pharmacophore2D

In [None]:
# molfeat featurizer names included in the benchmark. Each name is used
# twice: to build the transformer and as the descriptor label.
featurizer_names = [
    "cats2d",  # fails sometimes
    "scaffoldkeys",
    "secfp",
    "atompair-count",
    "avalon",
    "ecfp-count",
    "ecfp",
    "erg",
    "estate",
    "fcfp-count",
    "fcfp",
    "maccs",
    "pattern",
    "rdkit",
    "topological-count",
    "topological",
]

# (transformer, descriptor name) pairs consumed by the benchmark loop.
descr_list = [
    (MoleculeTransformer(featurizer=name, dtype=float), name)
    for name in featurizer_names
]

# long
# descr_list += [
#     (MoleculeTransformer(featurizer='desc2D', dtype=float), "desc2D"),
#     (MoleculeTransformer(featurizer=Pharmacophore2D(factory='cats'), dtype=float), "pharm2D-cats"),
#     (MoleculeTransformer(featurizer=Pharmacophore2D(factory='gobbi'), dtype=float), "pharm2D-gobbi"),
#     (MoleculeTransformer(featurizer=Pharmacophore2D(factory='pmapper'), dtype=float), "pharm2D-pmapper"),
# ]


## Run benchmark

In [None]:
def parse_data(data_path):
    """Load a headerless CSV file as a list of ``(smiles, property)`` pairs.

    Parameters
    ----------
    data_path : str or path-like
        CSV file without a header row. Only the first two columns are
        used: column 0 (treated as the SMILES string by the rest of the
        notebook) and column 1 (the property value).

    Returns
    -------
    list of tuple
        One ``(column 0, column 1)`` tuple per row, in file order.
    """
    frame = pd.read_csv(data_path, header=None)

    # pair the two columns row by row
    return list(zip(frame[0], frame[1]))

### Input/output data path

In [None]:
# Input: directory with the prepared benchmark datasets.
benchmark_collection = Path("benchmark_collection_prepared").resolve()

# Output: directory for the model predictions. An existing directory is
# removed first, so every run starts from an empty folder.
prediction_collection = Path("benchmark_model_prediction").resolve()
if prediction_collection.exists():
    shutil.rmtree(prediction_collection)

prediction_collection.mkdir(parents=True, exist_ok=True)

### Model building

In [None]:
# Total number of entries inside all collection folders; used as the
# progress-bar total. Plain files lying next to the collection folders
# (e.g. ``.DS_Store``) are skipped, because calling ``os.listdir`` on them
# would raise ``NotADirectoryError``.
total_num = sum(
    len(os.listdir(os.path.join(benchmark_collection, coll_folder)))
    for coll_folder in os.listdir(benchmark_collection)
    if os.path.isdir(os.path.join(benchmark_collection, coll_folder))
)

In [None]:
# For every benchmark dataset: compute each descriptor set, scale it, fit
# every model on the training split and store the validation / test
# predictions as one CSV column per "descriptor|method" combination.
with tqdm(total=total_num) as progress_bar:

    # sorted() only fixes the processing order; the written files are the same
    for coll_folder in sorted(os.listdir(benchmark_collection)):
        coll_path = os.path.join(benchmark_collection, coll_folder)
        if not os.path.isdir(coll_path):
            # stray file next to the collection folders (e.g. .DS_Store)
            tqdm.write(f"Skipping non-directory entry: {coll_path}")
            continue

        for bench_name in sorted(os.listdir(coll_path)):

            # benchmark dataset
            bench_folder = os.path.join(coll_path, bench_name)
            if not os.path.isdir(bench_folder):
                # Not a dataset folder. It is still counted in total_num,
                # so tick the bar to let it reach its total.
                tqdm.write(f"Skipping non-directory entry: {bench_folder}")
                progress_bar.update(1)
                continue

            res_folder = os.path.join(prediction_collection, coll_folder, bench_name)
            os.makedirs(res_folder, exist_ok=True)
            val_csv = os.path.join(res_folder, f'{bench_name}_val.csv')
            test_csv = os.path.join(res_folder, f'{bench_name}_test.csv')

            # parse dataset
            data_train = parse_data(os.path.join(bench_folder, 'train.csv'))
            data_val = parse_data(os.path.join(bench_folder, 'val.csv'))
            data_test = parse_data(os.path.join(bench_folder, 'test.csv'))

            # split the (smiles, property) pairs once per dataset instead of
            # once per descriptor
            smi_train = [smi for smi, _ in data_train]
            smi_val = [smi for smi, _ in data_val]
            smi_test = [smi for smi, _ in data_test]
            y_train = [prop for _, prop in data_train]

            # result tables start with the true property values
            res_val = pd.DataFrame({'Y_TRUE': [prop for _, prop in data_val]})
            res_test = pd.DataFrame({'Y_TRUE': [prop for _, prop in data_test]})

            # calc 2D descriptors
            for descr_func, descr_name in descr_list:

                # A descriptor that fails on any split is skipped for this
                # dataset. Only Exception is caught, so Ctrl-C / kernel
                # interrupt still stops the run, and the failure is reported
                # instead of being dropped silently.
                try:
                    x_train = descr_func(smi_train)
                    x_val = descr_func(smi_val)
                    x_test = descr_func(smi_test)
                except Exception as err:
                    tqdm.write(
                        f"[{coll_folder}/{bench_name}] descriptor "
                        f"'{descr_name}' failed and was skipped: {err!r}"
                    )
                    continue

                # scale descriptors (scaler is fitted on the training split only)
                scaler = MinMaxScaler()
                x_train_scaled = scaler.fit_transform(x_train)
                x_val_scaled = scaler.transform(x_val)
                x_test_scaled = scaler.transform(x_test)

                # train machine learning model
                for model, method_name in ml_list:
                    model.fit(x_train_scaled, y_train)
                    column = f'{descr_name}|{method_name}'

                    # Both files are rewritten after every model, so the CSVs
                    # on disk always hold all columns produced so far.

                    # validation set prediction
                    res_val[column] = model.predict(x_val_scaled)
                    res_val.to_csv(val_csv, index=False)

                    # test set prediction
                    res_test[column] = model.predict(x_test_scaled)
                    res_test.to_csv(test_csv, index=False)

            # update progress bar
            progress_bar.update(1)