## Calling packages and assigning variables
Here I load the required packages and assign variables. Note that the paths to the collected data will need to be changed to replicate this work on another system.

In [2]:
if not os.path.isdir("src/misvm"):
    !pip install -e git+https://github.com/garydoranjr/misvm.git#egg=misvm

In [3]:
import os
import pickle
from ast import literal_eval

import misvm
import numpy as np
import pandas as pd
import rdkit
from mordred import Calculator, descriptors
from rdkit import Chem
from rdkit.Chem import MACCSkeys
from sklearn.impute import SimpleImputer
from sklearn.model_selection import StratifiedKFold
from tpot import TPOTClassifier

# Parent molecules; later cells read its 'smiles' and 'Ames' columns.
# NOTE: these absolute paths are machine-specific and must be edited to run elsewhere.
data = pd.read_csv("/home/samuel/honours_redo/1-data/selected_molecules.csv")

# Metabolite predictions: the phase II file followed by the CYP file, stacked row-wise.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0; like
# append, it keeps each file's original row index.
metabolite_data = pd.concat([
    pd.read_csv("/home/samuel/honours_redo/publication_work/biotransformer_output_phaseII.csv"),
    pd.read_csv("/home/samuel/honours_redo/publication_work/biotransformer_output_cyp1.csv"),
])

# validation_data = pd.read_csv()
# validation_metabolite_data = pd.read_csv()

# Cleaning Biotransformer data
This section ensures that each metabolite is labelled with its root parent molecule rather than its direct precursor. In short, if the recorded "parent molecule" is itself a metabolite, it is replaced and the parent of that molecule is recorded instead.

Additionally, I encode all the molecules with the expected methods here to reduce calculation times in later iterations.

In [4]:
def normalize_smiles(smi):
    """Return the SMILES string RDKit writes back for *smi*, or NaN on failure.

    Args:
        smi: SMILES string to normalise.

    Returns:
        The output of ``Chem.MolToSmiles`` for the parsed molecule, or
        ``np.nan`` when the input cannot be parsed or converted (so callers
        can drop such rows with ``dropna``).
    """
    try:
        mol = Chem.MolFromSmiles(smi)
        # MolFromSmiles signals an unparseable string by returning None.
        if mol is None:
            return np.nan
        return Chem.MolToSmiles(mol)
    except Exception:
        # Non-string input (e.g. NaN) or an RDKit conversion error.
        # `Exception` rather than a bare except so Ctrl-C still interrupts.
        return np.nan

def parent_finder(smi):
    """Find the entry of ``data['smiles']`` that is the same molecule as *smi*.

    Both sides are compared through the SMILES string RDKit writes back for
    them, so different spellings of one molecule still match.

    Args:
        smi: SMILES string to look up.

    Returns:
        The matching parent SMILES exactly as stored in ``data['smiles']``,
        or the string "No parent found" when nothing matches or *smi*
        cannot be parsed.
    """
    # Normalise the query once instead of once per candidate parent.
    try:
        query = Chem.MolToSmiles(Chem.MolFromSmiles(smi))
    except Exception:
        # An unparseable query can never equal any parent.
        return "No parent found"

    for parent in data['smiles']:
        try:
            if query == Chem.MolToSmiles(Chem.MolFromSmiles(parent)):
                return parent
        except Exception:
            # Skip parents RDKit cannot parse.
            continue
    return "No parent found"

def number_check(x):
    """Convert *x* to ``float``, or return the marker string "broken".

    Args:
        x: any value.

    Returns:
        ``float(x)`` when the conversion succeeds, otherwise the string
        ``"broken"`` so callers can locate and drop unusable values.
    """
    try:
        return float(x)
    except (TypeError, ValueError, OverflowError):
        # The errors float() raises for unsupported types, unparseable
        # strings and integers too large for a float.
        return "broken"

def get_ml_fingerprint(df, function=MACCSkeys.GenMACCSKeys):
    """Encode every SMILES in ``df['smiles']`` as a list of integer bits.

    Args:
        df: DataFrame with a ``smiles`` column. It is not modified.
        function: callable applied to the RDKit molecule; its result must be
            iterable (defaults to ``MACCSkeys.GenMACCSKeys``).

    Returns:
        A list with one list of ints per row of *df*, in row order. Bit
        positions that are missing or non-numeric for any molecule are
        removed for all molecules, so every row has the same length.
    """
    fp_lists = [list(function(Chem.MolFromSmiles(smi))) for smi in df['smiles']]

    # Rows of unequal length are padded with NaN here; dropna removes those columns.
    fp_table = pd.DataFrame(fp_lists)
    # Column-wise Series.map is the element-wise equivalent of the deprecated
    # DataFrame.applymap and works on both old and new pandas versions.
    fp_table = fp_table.apply(lambda col: col.map(number_check)).dropna(axis=1, how="any")
    fp_table = fp_table.drop(columns=fp_table.columns[(fp_table == 'broken').any()])
    return [[int(bit) for bit in row] for row in fp_table.values.tolist()]

def bag_parent(smiles,met_df,function):
    """Build the encoded bag for one parent: its metabolites plus the parent itself.

    Args:
        smiles: parent SMILES; matched against ``met_df['parent smiles']``.
        met_df: metabolite DataFrame with ``smiles`` and ``parent smiles`` columns.
        function: fingerprint callable forwarded to ``get_ml_fingerprint``.

    Returns:
        The list of fingerprints returned by ``get_ml_fingerprint`` for the
        de-duplicated family.
    """
    metabolites = met_df[met_df["parent smiles"] == smiles]
    parent_row = pd.DataFrame([{'smiles': smiles}])
    # pd.concat replaces DataFrame.append (removed in pandas 2.0). The parent is
    # added last, so if a metabolite shares its SMILES the metabolite row is kept.
    mol_family = pd.concat([metabolites, parent_row], ignore_index=True).drop_duplicates(subset=["smiles"])
    return get_ml_fingerprint(df=mol_family, function=function)

In [5]:
## The splits and the encoded molecules are precalculated here to save time when restarting calculations.
## The pickle also acts as a checkpoint for inspecting the data that goes into the models.

if os.path.isfile("train_test.pk1"):
    print("Crossvalidation indices already determined in file: train_test.pk1")
    print('To redo them please delete these files and for a different set please choose another random stat or "none" within the code block above')
else:
    ##          Step 1: defining the splits
    split_frames = []
    for itr in range(1, 11):
        # A splitter built with an integer random_state returns the same folds on
        # every split() call, so each iteration gets its own seed. Iteration 1
        # keeps the original seed (871923).
        skf = StratifiedKFold(n_splits=10, random_state=871923 + itr - 1, shuffle=True)
        for fold, (train_index, test_index) in enumerate(skf.split(data, data['Ames']), start=1):
            for group, indices in (("test", test_index), ("train", train_index)):
                split_frames.append(pd.DataFrame({"index": indices, "group": group, "fold": fold, "iteration": itr}))
    # A single concat replaces repeated DataFrame.append calls (removed in pandas 2.0).
    train_test = pd.concat(split_frames)

    ##          Step 2: Normalizing metabolite smiles and matching to parent (approx 220 secs)
    metabolite_data['smiles'] = metabolite_data['SMILES'].apply(normalize_smiles)
    # .copy() so the column added below is written to an independent frame
    metabolite_data = metabolite_data.dropna(axis=0, subset=['smiles']).copy()
    metabolite_data['parent smiles'] = metabolite_data['Precursor SMILES'].apply(parent_finder)

    ##          Step 3: Pre calculating encoding for molecules, requires evaluation of lists on loading csv (approx 110 secs)
    data["MACCS"] = get_ml_fingerprint(df=data, function=MACCSkeys.GenMACCSKeys)
    data["RDKF"] = get_ml_fingerprint(df=data, function=Chem.RDKFingerprint)
    data["MACCS_MIL"] = data["smiles"].apply(lambda smi: bag_parent(smiles=smi, met_df=metabolite_data, function=MACCSkeys.GenMACCSKeys))
    data["RDKF_MIL"] = data["smiles"].apply(lambda smi: bag_parent(smiles=smi, met_df=metabolite_data, function=Chem.RDKFingerprint))
    # NOTE(review): the split indices are row positions; this merge assumes the
    # 'idx' column of `data` holds those same positions - confirm.
    train_test = pd.merge(train_test, data[["smiles","Ames","MACCS","RDKF","MACCS_MIL","RDKF_MIL","idx"]], left_on="index", right_on="idx").drop("idx", axis=1)

    ##          Step 4: Saved to a pickle, rather than a csv this stores the lists and is much faster to load (~10x)
    train_test.to_pickle("train_test.pk1")

Crossvalidation indices already determined in file: train_test.csv
To redo them please delete these files and for a different set please choose another random stat or "none" within the code block above


## Defining functions
These functions are used to execute the described actions on the data

In [12]:
# Reload the split/encoding table from the pickle checkpoint.
train_test = pd.read_pickle("train_test.pk1")

# Training rows of fold 1 in iteration 1 (presumably a small sample for trying the functions below).
is_train = train_test['group'] == "train"
is_first_fold = train_test["fold"] == 1
is_first_iteration = train_test["iteration"] == 1
testing = train_test[is_train & is_first_fold & is_first_iteration]

In [None]:
def build_mil_model(training_data,MIL,fingerprint,name):
    """Fit a multiple-instance model on precomputed bags and pickle it.

    Nothing is fitted when ``saved_models/<name>`` already exists; a message
    is printed instead.

    Args:
        training_data: DataFrame with an ``Ames`` column and a
            ``<fingerprint>_MIL`` column holding one bag per row.
        MIL: unfitted model exposing ``fit(bags, labels)``.
        fingerprint: column prefix, e.g. ``"MACCS"``.
        name: file name of the pickled model inside ``saved_models/``.
    """
    model_path = "saved_models/"+name
    if os.path.isfile(model_path):
        print(name,"is already built")
        return

    # Create the output folder before fitting so a missing folder cannot
    # throw away a finished fit.
    os.makedirs("saved_models", exist_ok=True)
    bags = training_data[fingerprint+"_MIL"].to_list()
    # Labels are passed to the model as +1 / -1; anything other than 1 becomes -1.
    labels = [1 if label == 1 else -1 for label in training_data["Ames"]]
    MIL.fit(bags, labels)
    with open(model_path, 'wb') as handle:
        pickle.dump(MIL, handle)

def build_ml_model(training_data,fingerprint,ML,name):
    """Run the optimiser on precomputed fingerprints and pickle the fitted pipeline.

    Nothing is fitted when ``saved_models/<name>`` already exists; a message
    is printed instead.

    Args:
        training_data: DataFrame with an ``Ames`` column and a ``<fingerprint>``
            column holding one feature list per row.
        fingerprint: name of the feature column, e.g. ``"MACCS"``.
        ML: optimiser exposing ``fit(instances, labels)`` and, once fitted,
            a ``fitted_pipeline_`` attribute (e.g. a ``TPOTClassifier``).
        name: file name of the pickled pipeline inside ``saved_models/``.
    """
    model_path = "saved_models/"+name
    if os.path.isfile(model_path):
        print(name,"is already built")
        return

    # Create the output folder before the (long) optimisation, not after it.
    os.makedirs("saved_models", exist_ok=True)
    instances = training_data[fingerprint].to_list()
    labels = training_data["Ames"].to_list()
    ML.fit(instances, labels)
    # Only the best pipeline is stored, not the optimiser itself.
    with open(model_path, 'wb') as handle:
        pickle.dump(ML.fitted_pipeline_, handle)

def test_mil_model(testing_data,grouping_data,fingerprint,name):
    if not os.path.isfile("saved_tests/"+name.split(".")[0]+".csv"):
        loaded_model = pickle.load(open("saved_models/"+name, 'rb'))                    
        bags = training_data[fingerprint+"_MIL"].to_list()
        labels = training_data["Ames"].apply(lambda x: x if x==1 else -1).to_list()
        predictions = loaded_model.predict(bags)                                        
        predicted_labels = list(map(pos_or_neg,predictions))                            
        df = pd.DataFrame({
            'predicted' : predictions,
            'predicted labal' : predicted_labels,
            'true label' : true_labels
        })                                                                              
        df.to_csv("saved_tests/"+name.split(".")[0]+".csv",index=False )                
    else:
        print(name,"is already built")

def test_ml_model(testing_data,fingerprint,name):
    if not os.path.isfile("saved_tests/"+name.split(".")[0]+".csv"):
        loaded_model = pickle.load(open("saved_models/"+name, 'rb'))                    
        instances = training_data[fingerprint].to_list()
        labels = training_data["Ames"].to_list()       
        predictions = loaded_model.predict(instances)                                   
        predicted_labels = list(map(pos_or_neg,predictions))                            
        df = pd.DataFrame({
            'predicted' : predictions,
            'predicted labal' : predicted_labels,
            'true label' : true_labels
        })                                                                              
        df.to_csv("saved_tests/"+name.split(".")[0]+".csv",index=False )                
    else:
        print(name,"is already built")

In [6]:
def build_mil_model(training_data,grouping_data,fingerprint,MIL,name):
    """Fit a multiple-instance model on freshly generated bags and pickle it.

    Nothing is fitted when ``saved_models/<name>`` already exists; a message
    is printed instead.

    NOTE(review): relies on ``generate_bags``, which exists only as
    commented-out code in this notebook - restore it before using this version.

    Args:
        training_data: passed to ``generate_bags`` as its first argument.
        grouping_data: passed to ``generate_bags`` as its second argument.
        fingerprint: passed to ``generate_bags`` as its third argument.
        MIL: unfitted model exposing ``fit(bags, labels)``.
        name: file name of the pickled model inside ``saved_models/``.
    """
    model_path = "saved_models/"+name
    if os.path.isfile(model_path):
        print(name,"is already built")
        return

    # Create the output folder before fitting so a missing folder cannot
    # throw away a finished fit.
    os.makedirs("saved_models", exist_ok=True)
    bags, labels = generate_bags(training_data,grouping_data,fingerprint)
    MIL.fit(bags,labels)
    with open(model_path, 'wb') as handle:
        pickle.dump(MIL, handle)
def build_ml_model(training_data,fingerprint,ML,name):
    """Encode the training molecules, run the optimiser and pickle the fitted pipeline.

    Nothing is fitted when ``saved_models/<name>`` already exists; a message
    is printed instead.

    Args:
        training_data: DataFrame with ``smiles`` and ``Ames`` columns.
        fingerprint: fingerprint callable forwarded to ``get_ml_fingerprint``.
        ML: optimiser exposing ``fit(instances, labels)`` and, once fitted,
            a ``fitted_pipeline_`` attribute (e.g. a ``TPOTClassifier``).
        name: file name of the pickled pipeline inside ``saved_models/``.
    """
    model_path = "saved_models/"+name
    if os.path.isfile(model_path):
        print(name,"is already built")
        return

    # Create the output folder before the (long) optimisation, not after it.
    os.makedirs("saved_models", exist_ok=True)
    # get_ml_fingerprint returns only the feature rows, so the labels are
    # read from the frame directly instead of being unpacked from its result.
    instances = get_ml_fingerprint(training_data, fingerprint)
    labels = training_data["Ames"].to_list()
    ML.fit(instances,labels)
    # Only the best pipeline is stored, not the optimiser itself.
    with open(model_path, 'wb') as handle:
        pickle.dump(ML.fitted_pipeline_, handle)

def test_mil_model(testing_data,grouping_data,fingerprint,name):
    """Score a saved multiple-instance model on freshly generated bags and write a CSV.

    Nothing is computed when the result file already exists; a message is
    printed instead.

    NOTE(review): relies on ``generate_bags``, which exists only as
    commented-out code in this notebook - restore it before using this version.

    Args:
        testing_data: passed to ``generate_bags`` as its first argument.
        grouping_data: passed to ``generate_bags`` as its second argument.
        fingerprint: passed to ``generate_bags`` as its third argument.
        name: file name of the pickled model inside ``saved_models/``. The
            result goes to ``saved_tests/<text before the first dot>.csv``.
    """
    result_path = "saved_tests/"+name.split(".")[0]+".csv"
    if os.path.isfile(result_path):
        print(name,"is already built")
        return

    # pickle.load can execute arbitrary code: only load models written by this notebook.
    with open("saved_models/"+name, 'rb') as handle:
        loaded_model = pickle.load(handle)

    bags, true_labels = generate_bags(testing_data,grouping_data,fingerprint)
    predictions = loaded_model.predict(bags)
    # The sign of the raw score decides the class (+1 / -1).
    predicted_labels = [1 if score > 0 else -1 for score in predictions]

    os.makedirs("saved_tests", exist_ok=True)
    pd.DataFrame({
        'predicted' : predictions,
        'predicted label' : predicted_labels,
        'true label' : true_labels
    }).to_csv(result_path, index=False)

def test_ml_model(testing_data,fingerprint,name):
    """Encode the test molecules, score a saved pipeline on them and write a CSV.

    Nothing is computed when the result file already exists; a message is
    printed instead.

    Args:
        testing_data: DataFrame with ``smiles`` and ``Ames`` columns.
        fingerprint: fingerprint callable forwarded to ``get_ml_fingerprint``.
        name: file name of the pickled pipeline inside ``saved_models/``. The
            result goes to ``saved_tests/<text before the first dot>.csv``.
    """
    result_path = "saved_tests/"+name.split(".")[0]+".csv"
    if os.path.isfile(result_path):
        print(name,"is already built")
        return

    # pickle.load can execute arbitrary code: only load models written by this notebook.
    with open("saved_models/"+name, 'rb') as handle:
        loaded_model = pickle.load(handle)

    # get_ml_fingerprint returns only the feature rows, so the labels are
    # read from the frame directly instead of being unpacked from its result.
    instances = get_ml_fingerprint(testing_data, fingerprint)
    true_labels = testing_data["Ames"].to_list()
    predictions = loaded_model.predict(instances)
    # Predicted labels are written as +1 / -1 (anything not above zero becomes -1).
    predicted_labels = [1 if score > 0 else -1 for score in predictions]

    os.makedirs("saved_tests", exist_ok=True)
    pd.DataFrame({
        'predicted' : predictions,
        'predicted label' : predicted_labels,
        'true label' : true_labels
    }).to_csv(result_path, index=False)

# def generate_bags(df,bt_df, funct):
#         bags = []; labels = []                                                          
#         for smile in df['smiles'].to_list():
#             wk_data =   bt_df[bt_df["parent smiles"]==smile]                            
#             if not wk_data.empty:
#                 wk_data = get_mil_fingerprint(wk_data, funct)
#                 bags        += [np.array(wk_data['fp_list'].to_list())]                 
#                 labels      += [df.loc[df['smiles'] == smile, 'Ames'].item()]           
#         for i,x in enumerate(labels):
#             if x == 0:
#                 labels[i] = -1                                                          
#         bags = np.array(bags)                                                           
#         labels = np.array(labels)                                                       
#         return [bags, labels]                                                           

# def get_mil_fingerprint(df, function):
#     fn = function                                                                       
#     df1 = df.copy()                                                                     
#     df1['fp_list'] = df1['smiles'].apply(lambda x: list(fn(Chem.MolFromSmiles(x))))
#     return df1

# def pos_or_neg(x):
#     if x>0:
#         return 1
#     else:
#         return -1
    
# def number_check(x):
#     try:
#         return float(x)
#     except:
#         return "broken"

# def get_ml_fingerprint(df, function):
#     if function != "Morgan":
#         fn = function                                                                       
#         df1 = df.copy()                                                                     
#         df1['fp_list'] = df1['smiles'].apply(lambda x: list(fn(Chem.MolFromSmiles(x))))     
#         df1 = df1.dropna(axis = 1, how = 'any')                                             

#         df2 = pd.DataFrame(df1['fp_list'].to_list())                                        
#         df2 = df2.applymap(number_check).dropna(axis =1, how = "any")                       
#         df2 = df2.drop(columns=df2.columns[(df2 == 'broken').any()])                        
#         df1['fp_list'] = df2.values.tolist()                                                
#         X = np.array(df1['fp_list'].to_list())                                              
#         Y = np.array(df1['Ames'].to_list())                                                 
#         return [X,Y]
#     else:
#         radius = 3                                                                          
#         df1 = df.copy()                                                                     
#         df1['fp_list'] = df1['smiles'].apply(lambda x: list(AllChem.GetMorganFingerprint(Chem.MolFromSmiles(x),radius).ToBinary()))     
#         df2 = pd.DataFrame(df1['fp_list'].to_list())                                        
#         df2 = df2.applymap(number_check).dropna(axis =1, how = "any")                       
#         df2 = df2.drop(columns=df2.columns[(df2 == 'broken').any()])
#         df1['fp_list'] = df2.values.tolist()
#         X = np.array(df1['fp_list'].to_list())
#         Y = np.array(df1['Ames'].to_list())
#         return [X,Y]


# def clearConsole():
#     command = 'clear'
#     if os.name in ('nt', 'dos'):  # If Machine is running on Windows, use cls
#         command = 'cls'
#     os.system(command)

## Building models
Here the above functions are used to build models. This section can be altered to build additional models if desired

In [28]:
def _clear_console():
    """Clear the terminal: 'cls' on Windows, 'clear' everywhere else."""
    os.system('cls' if os.name in ('nt', 'dos') else 'clear')


def develop_models(training_data,training_data_metabolites,testing_data,testing_data_metabolites,suffix="",encoding="MACCS"):
    """Build and test every MIL model plus one TPOT model for a single split.

    Each model is saved and scored under a file name built from *encoding*,
    the model name and *suffix*.

    Args:
        training_data: molecules used for fitting.
        training_data_metabolites: metabolite table passed as ``grouping_data``
            when building the MIL models.
        testing_data: molecules used for scoring.
        testing_data_metabolites: metabolite table passed as ``grouping_data``
            when testing the MIL models.
        suffix: appended to every file name; the progress messages extract the
            fold and iteration from it (e.g. ``"_kfold1_iteration1"``).
        encoding: ``"MACCS"`` or ``"RDFP"``. Any other value prints a hint and
            returns without building anything.
    """
    fps = {"MACCS":MACCSkeys.GenMACCSKeys,"RDFP":Chem.RDKFingerprint}
    if encoding not in fps:
        print('Please use expected fingerprint: ["MACCS", "RDFP"]')
        return
    fp = fps[encoding]

    tested_mils =  [["MICA", misvm.MICA(max_iters=50,verbose=False)],
                ["MISVM", misvm.MISVM(kernel='linear', C=1.0, max_iters=50,verbose=False)],
                ['SIL', misvm.SIL(verbose=False)],
                ['NSK', misvm.NSK(verbose=False)],
                ['sMIL', misvm.sMIL(verbose=False)]]

    # Parsed once; only used in the progress messages.
    fold_label = suffix.split("_iteration")[0].split("fold")[-1]
    iteration_label = suffix.split("iteration")[-1]

    # Iterate over the used MILs
    for mil_name, mil_model in tested_mils:
        model_file = encoding+"_"+mil_name+suffix+".sav"

        _clear_console();print("     Building MIL model:",mil_name,"    fold:",fold_label,"    Iteration:",iteration_label)
        build_mil_model(training_data=training_data,grouping_data=training_data_metabolites,name=model_file,MIL=mil_model,fingerprint=fp)

        _clear_console();print("     Testing MIL model:",mil_name,"    fold:",fold_label,"    Iteration:",iteration_label)
        test_mil_model(testing_data=testing_data,grouping_data=testing_data_metabolites,name=model_file,fingerprint=fp)

    # Build and test TPOT model
    tpot_file = encoding+"_tpot"+suffix+'.sav'

    _clear_console();print("     Building TPOT model","    fold:",fold_label,"    Iteration:",iteration_label)
    build_ml_model(training_data=training_data, ML = TPOTClassifier(generations=10, population_size=100, cv=5, random_state=42, verbosity=1),fingerprint=fp,name=tpot_file)

    _clear_console();print("     Testing TPOT model","    fold:",fold_label,"    Iteration:",iteration_label)
    test_ml_model(testing_data=testing_data,name=tpot_file,fingerprint=fp)

In [29]:
## As a precaution the indices of every crossvalidation split are saved. Together with the fixed random state,
## this is also useful in case the code times out part way through.

if os.path.isfile("train_test.csv"):
    print("Crossvalidation indices already determined in file: train_test.csv")
    print('To redo them please delete these files and for a different set please choose another random stat or "none" within the code block above')
else:
    split_frames = []
    for itr in range(1, 11):
        # A splitter built with an integer random_state returns the same folds on
        # every split() call, so each iteration gets its own seed. Iteration 1
        # keeps the original seed (871923).
        skf = StratifiedKFold(n_splits=10, random_state=871923 + itr - 1, shuffle=True)
        for fold, (train_index, test_index) in enumerate(skf.split(data, data['Ames']), start=1):
            for group, indices in (("test", test_index), ("train", train_index)):
                split_frames.append(pd.DataFrame({"index": indices, "group": group, "fold": fold, "iteration": itr}))
    # A single concat replaces repeated DataFrame.append calls (removed in pandas 2.0).
    train_test = pd.concat(split_frames)
    train_test.to_csv("train_test.csv",index=False)

Crossvalidation indices already determined in file: train_test.csv
To redo them please delete these files and for a different set please choose another random stat or "none" within the code block above


In [7]:
# Replay the stored splits: one build-and-test run per (iteration, fold) pair.
train_test = pd.read_csv("train_test.csv")
for iteration in train_test["iteration"].unique():
    in_iteration = train_test["iteration"] == iteration
    for fold in train_test["fold"].unique():
        in_split = in_iteration & (train_test["fold"] == fold)
        train_index = train_test.loc[in_split & (train_test["group"] == "train"), "index"].to_list()
        test_index = train_test.loc[in_split & (train_test["group"] == "test"), "index"].to_list()
        # The stored indices are row positions in `data`.
        train = data.iloc[train_index]
        test = data.iloc[test_index]
        split_suffix = '_kfold'+str(fold)+"_iteration"+str(iteration)
        develop_models(train,metabolite_data,test,metabolite_data,suffix=split_suffix,encoding = "MACCS")
        print("Done Fold", fold)
    print("Done Iteration", iteration)

     Building MIL model: MICA     fold: 1     Iteration: 1
MACCS_MICA_kfold1_iteration1.sav is already built
     Testing MIL model: MICA     fold: 1     Iteration: 1
MACCS_MICA_kfold1_iteration1.sav is already built
     Building MIL model: MISVM     fold: 1     Iteration: 1
MACCS_MISVM_kfold1_iteration1.sav is already built
     Testing MIL model: MISVM     fold: 1     Iteration: 1
MACCS_MISVM_kfold1_iteration1.sav is already built
     Building MIL model: SIL     fold: 1     Iteration: 1
MACCS_SIL_kfold1_iteration1.sav is already built
     Testing MIL model: SIL     fold: 1     Iteration: 1
MACCS_SIL_kfold1_iteration1.sav is already built
     Building MIL model: NSK     fold: 1     Iteration: 1
MACCS_NSK_kfold1_iteration1.sav is already built
     Testing MIL model: NSK     fold: 1     Iteration: 1
MACCS_NSK_kfold1_iteration1.sav is already built
     Building MIL model: sMIL     fold: 1     Iteration: 1
MACCS_sMIL_kfold1_iteration1.sav is already built
     Testing MIL model: sMI

  bags = np.array(bags)


## Model Validation
Here the model results are assessed

In [None]:
# develop_models(data,bt_data,validation_data,validation_metabolite_data,suffix="_validation")

## Model Analysis
Here the results of each fold are calculated as well as deviation within crossvalidation

In [None]:
def confusion_matrix(df):
    """Count true/false positives and negatives in one result table.

    A label of 1 is positive; 0 and -1 both count as negative, because the
    test functions write predicted labels as +1 / -1.

    Args:
        df: DataFrame with a ``true label`` column and a ``predicted label``
            column. The misspelt ``predicted labal`` is accepted when the
            correctly spelt column is absent.

    Returns:
        ``[TP, TN, FP, FN]`` as plain ints.
    """
    predicted_column = "predicted label"
    if predicted_column not in df.columns and "predicted labal" in df.columns:
        predicted_column = "predicted labal"

    negative_values = [0, -1]
    predicted_pos = df[predicted_column] == 1
    predicted_neg = df[predicted_column].isin(negative_values)
    actual_pos = df["true label"] == 1
    actual_neg = df["true label"].isin(negative_values)

    TP = int((predicted_pos & actual_pos).sum())
    TN = int((predicted_neg & actual_neg).sum())
    FP = int((predicted_pos & actual_neg).sum())
    FN = int((predicted_neg & actual_pos).sum())
    return [TP,TN,FP,FN]

In [None]:
# Collect one confusion-matrix row per result file written by the test functions.
results_dir = "/home/samuel/honours_redo/publication_work/saved_tests"
rslt_list = []

for filename in sorted(os.listdir(results_dir)):
    stem, extension = os.path.splitext(filename)
    if extension != ".csv":
        continue
    # listdir returns bare names, so the directory has to be joined back on.
    [TP,TN,FP,FN] = confusion_matrix(pd.read_csv(os.path.join(results_dir, filename)))

    # File names look like <fingerprint>_<model>_kfold<fold>_iteration<iteration>.csv
    parts = stem.split("_")
    fingerprint = parts[0]
    model = parts[1] if len(parts) > 1 else None
    fold = iteration = None
    for part in parts[2:]:
        if part.startswith("kfold"):
            fold = part[len("kfold"):]
        elif part.startswith("iteration"):
            iteration = part[len("iteration"):]

    # append, not +=: extending a list with a dict would add only its keys.
    rslt_list.append({"fingerprint":fingerprint, "model":model, "fold":fold, "iteration":iteration, "TP":TP, "TN":TN, "FP":FP, "FN":FN})

rslt_df = pd.DataFrame(rslt_list)
rslt_df