In [86]:
import os
import warnings
import pandas as pd
from pandas.core.common import SettingWithCopyWarning
warnings.simplefilter(action='ignore', category=SettingWithCopyWarning)
import numpy as np
import pickle
import xgboost as XGB
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import MACCSkeys

from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
import joblib

from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, r2_score, roc_curve, auc
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.metrics import mean_absolute_error, mean_squared_error
from dgllife.utils import RandomSplitter

In [127]:
def FingerprintTransform(mollist:list,
                         fp_type:str):
    if fp_type == 'MACCS':
        fplist = [[i for i in MACCSkeys.GenMACCSKeys(mol)] for mol in mollist]
#         maccsname = [f'MACCS_{i}' for i in range(167)]
    elif fp_type == 'ECFP':
        fplist = [Chem.AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=2048) for mol in mollist]
    return np.array(fplist)

def mol_dataset(df, fp_type, label=None):
    data = FingerprintTransform([Chem.MolFromSmiles(smiles) 
                                 for smiles in df['SMILES']], fp_type)
    if label:
        label = df['label'].values
    return data, label

def plot_result(label, predict, score, task_type, output_dir):
    if task_type == 'classification':
        fpr, tpr, threshold = roc_curve(label, predict)
        roc_auc = auc(fpr, tpr)
        plt.figure()
        lw = 2
        plt.figure(figsize=(10, 10))
        plt.plot(fpr, tpr, color='darkorange',
                 lw=lw, label='ROC curve (area = %0.4f)' % score)
        plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver operating characteristic example')
        plt.legend(loc='lower right')
    else:
        plt.plot([min(label), max(label)], [min(label), max(label)])
        plt.scatter(predict, label, label='{} {:.4f}'.format('R2', score))
        plt.legend(loc='lower right')
    plt.savefig(output_dir)
    plt.clf()
    return

def cal_metric(model, test_data, test_label, task_type, output_dir=None):
    if task_type == 'classification':
        pred = model.predict_proba(test_data)[:,-1]
        pred_label = np.array([1 if i > 0.5 else 0 for i in pred])
        result = [roc_auc_score(test_label, pred), 
                  accuracy_score(test_label, pred_label),
                  f1_score(test_label, pred_label), 
                  precision_score(test_label, pred_label), 
                  recall_score(test_label, pred_label)]
        score = result[0]
#         print('AUC: %.3f, Acc: %.3f, F1: %.3f, Precision: %.3f, Recall: %.3f' %
#               (result[0], result[1], result[2], result[3], result[4]))
        
    elif task_type == 'regression':
        pred = model.predict(test_data)
        result = [r2_score(test_label, pred),
                  mean_squared_error(test_label, pred)**0.5,
                  mean_absolute_error(test_label, pred), 
                  ef(test_label, pred, 200), 
                  hit(test_label, pred, 200)]
        score = result[-1]
#         print('MAE: %.3f, RMSE: %.3f, R2: %.3f' %
#               (result[0], result[1], result[2]))
#     print('-'*20+'+'*20+'-'*20)
    if output_dir:
        plot_result(test_label, pred, score, task_type, output_dir)
    return pred, result

def CalMetric(test_label, pred, task_type, 
              output_dir=None, thre=0.5):
    if task_type == 'classification':
        pred_label = np.array([1 if i > thre else 0 for i in pred])
        result = [roc_auc_score(test_label, pred), 
                  accuracy_score(test_label, pred_label),
                  f1_score(test_label, pred_label), 
                  precision_score(test_label, pred_label), 
                  recall_score(test_label, pred_label)]
    elif task_type == 'regression':
        result = [r2_score(test_label, pred),
                  mean_squared_error(test_label, pred),
                  mean_absolute_error(test_label, pred), 
                  ef(test_label, pred, 200), 
                  hit(test_label, pred, 200)]
    if output_dir:
        score = result[0]
        plot_result(test_label, pred, score, task_type, output_dir)
    return result

def ef(label, predict, n):
    n_label = label[predict.argsort()[-n:]]
    n_pos = len(np.where(n_label >= 6)[0])
    all_pos = len(np.where(n_label >= 6)[0])
    return (n_pos / n) / (all_pos / len(label) + 1e-8)

def hit(label, predict, n):
    n_label = label[predict.argsort()[-n:]]
    n_pos = len(np.where(n_label >= 6)[0])
    return n_pos / n

In [89]:
seed = 0
path = os.getcwd()
task = 'MIC.csv'
dataset = 'Mtb_MIC'
df = pd.read_csv(task)  
cols = ['MAE', 'RMSE', 'R2']
df_clean = df[['Label_reg', 'SMILES']]
df_clean.columns = ['label', 'SMILES']
X_train_, _, X_test_ = RandomSplitter.train_val_test_split(
                            df_clean, 
                            frac_train=0.8, 
                            frac_val=0.1,
                            frac_test=0.1,
                            random_state=0)
X_train_ = df_clean.loc[X_train_.indices]
X_test_ = df_clean.loc[X_test_.indices]
perf_result, perf_name = [], []

for fp in ['ECFP', 'MACCS']:
    rf_path = os.path.join(path, f'{dataset}/RF_{fp}')
    lr_path = os.path.join(path, f'{dataset}/LR_{fp}')
    gbdt_path = os.path.join(path, f'{dataset}/GBDT_{fp}')
    xgb_path = os.path.join(path, f'{dataset}/XGB_{fp}')

    X_train, Y_train = mol_dataset(X_train_, fp, label=True)
    X_test, Y_test = mol_dataset(X_test_, fp, label=True)

    lr = LinearRegression(n_jobs=-1)
    lr.fit(X_train, Y_train)
    pred, result = cal_metric(lr, X_test, Y_test, 'regression', 
                    os.path.join(lr_path, 'result.png'))
    X_test_.loc[:, f'LR_{fp}'] = pred
    perf_result.append(result)
    perf_name.append(f'LR_{fp}')


rf = RandomForestRegressor(random_state=seed, n_jobs=-1)
rf.fit(X_train, Y_train)  
pred, result = cal_metric(rf, X_test, Y_test, 'regression', 
                os.path.join(rf_path, 'result.png'))
X_test_.loc[:, f'RF_{fp}'] = pred
perf_result.append(result)
perf_name.append(f'RF_{fp}')

gbdt = GradientBoostingRegressor(random_state=seed)
gbdt.fit(X_train, Y_train)
pred, result = cal_metric(gbdt, X_test, Y_test, 'regression', 
                os.path.join(gbdt_path, 'result.png'))
X_test_.loc[:, f'GBDT_{fp}'] = pred
perf_result.append(result)
perf_name.append(f'GBDT_{fp}')

xgb=XGB.XGBRegressor(max_depth=3, 
                        n_estimators=100, 
                        learning_rate=0.1,
                        n_jobs=-1)
xgb.fit(X_train, Y_train)
pred, result = cal_metric(xgb, X_test, Y_test, 'regression', 
                os.path.join(xgb_path, 'result.png'))
X_test_.loc[:, f'XGB_{fp}'] = pred
perf_result.append(result)
perf_name.append(f'XGB_{fp}')

joblib.dump(rf, os.path.join(rf_path, 'model.pkl'))
joblib.dump(lr, os.path.join(lr_path, 'model.pkl'))
joblib.dump(gbdt, os.path.join(gbdt_path, 'model.pkl'))
joblib.dump(xgb, os.path.join(xgb_path, 'model.pkl'))

if 'overall.csv' in os.listdir(path=f'{path}/{dataset}'):

    df_pred = pd.read_csv(f'{path}/{dataset}/prediction.csv',
                            index_col=0)
    df_pred['XGB_ECFP'] = X_test_['XGB_ECFP']
    df_pred['XGB_MACCS'] = X_test_['XGB_MACCS']
    df_pred.to_csv(f'{path}/{dataset}/prediction.csv')

    df_overall = pd.read_csv(f'{path}/{dataset}/overall.csv', 
                                index_col=0)
    df_overall.loc['XGB_ECFP'] = perf_result[0]
    df_overall.loc['XGB_MACCS'] = perf_result[-1]
    df_overall.to_csv(f'{path}/{dataset}/overall.csv')
else:
    X_test_.to_csv(f'{path}/{dataset}/prediction.csv')
    pd.DataFrame(np.array(perf_result), 
                    index=perf_name,
                    columns=cols
                ).to_csv(f'{path}/{dataset}/overall.csv')

<Figure size 432x288 with 0 Axes>

# Prediction

In [52]:
label = [0,0]
test = pd.DataFrame(np.array([label, smiles]).T,
                    columns = ['label', 'SMILES'])
X_test, _ = mol_dataset(test, 'ECFP', label=False)
pred = rf_ecfp.predict(X_test)
test['Predict_RF_ECFP_reg'] = pred

X_test, _ = mol_dataset(test, 'MACCS', label=False)
pred = rf_maccs.predict(X_test)
test['Predict_RF_MACCS_reg'] = pred

In [54]:
test.to_csv('test.csv', index=None)

In [1]:
seed = 0
path = os.getcwd()
task = 'MIC.csv'
dataset = 'Mtb_MIC'
df = pd.read_csv(task)  
df_clean = df[['Label_reg', 'SMILES']]
df_clean.columns = ['label', 'SMILES']
X_train_ = df_clean


X_train, Y_train = mol_dataset(X_train_, 'ECFP', label=True)
rf_ecfp = RandomForestRegressor(random_state=seed, n_jobs=-1)
rf_ecfp.fit(X_train, Y_train)
joblib.dump(rf_ecfp, os.path.join('Mtb_MIC/rf_reg.pkl'))

X_train, Y_train = mol_dataset(X_train_, 'MACCS', label=True)
rf_maccs = RandomForestRegressor(random_state=seed, n_jobs=-1)
rf_maccs.fit(X_train, Y_train)
joblib.dump(rf_maccs, os.path.join('Mtb_MIC/rf_maccs_reg.pkl'))

In [40]:
test_data = pd.read_csv('db_smiles.csv')

X_test, _ = mol_dataset(test_data, 'ECFP', label=False)
pred = rf_ecfp.predict(X_test)
test_data['Predict_RF_ECFP_reg'] = pred

X_test, _ = mol_dataset(test_data, 'MACCS', label=False)
pred = rf_maccs.predict(X_test)
test_data['Predict_RF_MACCS_reg'] = pred



# CurrMG prediction

In [40]:
import pandas as pd
import numpy as np
import torch
import torch.nn.functional as F
import dgl
from dgllife.utils import smiles_to_bigraph, CanonicalAtomFeaturizer, CanonicalBondFeaturizer


def read_smiles(file_dir: str):
    df = pd.read_csv(file_dir)
    return df['SMILES'].values


def Predict_sample(smiles: str,
                   model: torch.nn):
    node_featurizer = CanonicalAtomFeaturizer(atom_data_field='h')
    edge_featurizer = CanonicalBondFeaturizer(bond_data_field='h',
                                              self_loop=True)
    graph = smiles_to_bigraph(smiles=smiles, 
                              add_self_loop=True,
                              node_featurizer=node_featurizer,
                              edge_featurizer=edge_featurizer)
    if graph is None:
        return 'invalid'
    else:
        n_feat = graph.ndata['h']
        e_feat = graph.edata['h']
        try:
            return model(graph, n_feat).detach().numpy()[0][0]
        except:
            try:
                return model(graph, n_feat, e_feat).detach().numpy()[0][0]
            except:
                print(smiles)
                return 0

def Predict(file_dir: str,
            save_dir: str, 
            model_dir: str):
    try:
        smiles_list = read_smiles(file_dir)
    except:
        return 'Load Failed!'
    pre_all = []
#     model = torch.load('.\\model\\' + func + '_gat_model.pkl')
    model = torch.load(model_dir, map_location='cpu')
    model.eval()
    pre_ = []
    for smiles in smiles_list:
        pre = Predict_sample(smiles, model)
        pre_.append(pre)
    df_out = pd.DataFrame(np.array(pre_).T, index=smiles_list, columns=['PREDICT'])
#     df_out.to_csv(save_dir, float_format='%.3f')

    return df_out

In [55]:
df_curr = Predict('test.csv', '', 
                  '/home/gyw/CurrMG/Mtb_2/0_GCN_ts/models.pth')
test['Predict_GCN_reg'] = df_curr['PREDICT'].values
df_curr = Predict('test.csv', '', 
                  '/home/gyw/CurrMG/Mtb_2/0_GAT_mce/models.pth')
test['Predict_GAT_reg'] = df_curr['PREDICT'].values
df_curr = Predict('test.csv', '', 
                  '/home/gyw/CurrMG/Mtb_2/0_MPNN_ab/models.pth')
test['Predict_MPNN_reg'] = df_curr['PREDICT'].values
df_curr = Predict('test.csv', '', 
                  '/home/gyw/CurrMG/Mtb_2/0_AttentiveFP_mce/models.pth')
test['Predict_AttentiveFP_reg'] = df_curr['PREDICT'].values