In [1]:
%load_ext autoreload
%autoreload 2

import typing
from typing import List, Iterable
import pickle
import numpy as np
import pandas as pd

import rdkit
from rdkit.Chem import AllChem as AllChem
from rdkit.SimDivFilters.rdSimDivPickers import MaxMinPicker

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import SGDClassifier

from utils import *

import os, glob

# Random seed, intended to be passed as `random_state` to stochastic components.
SEED = 0
# Fraction of the library drawn for the initial set
# (used as int(P_INIT * library size) when picking the initial indices).
P_INIT = 0.10
# Default fraction of the library requested from the learner in each iHTS iteration.
P_ITER = 0.05
# Default number of sampling iterations performed by iHTS.
N_ITER = 5
# NOTE(review): presumably the exploration fraction handed to Supervised_learner
# (its only visible use is in commented-out code) -- confirm against utils.
P_EXPLORE = 0.2

Loaded 97 descriptor functions


In [2]:
def iHTS(learner, y, init_idxs, n_iter=N_ITER, p_iter=P_ITER):
    """Simulate an iterative screen over a labelled library.

    The learner first observes the labels of ``init_idxs``; then, for
    ``n_iter`` rounds, it is asked to select ``int(p_iter * len(y))`` indices
    from the still-unsampled pool and is shown their labels.

    Args:
        learner: object exposing ``observe(idxs, labels)`` and
            ``select(candidate_idxs, n)`` (defined in ``utils``).
        y: array of labels for the whole library; must support indexing with
            a collection of indices.
        init_idxs: indices of the initial set (must support ``len``).
        n_iter: number of selection rounds.
        p_iter: fraction of the library requested per round.

    Returns:
        A list whose first ``n_iter + 1`` entries are the values of
        ``idxs.sampled`` after the initial observation and after each round,
        and whose last entry is ``idxs.unsampled`` at the end.
        NOTE(review): presumably ``sampled`` is the cumulative set of sampled
        indices and returns a fresh object on each access -- confirm in
        ``utils.Indice``.
    """
    n_total = len(y)
    results = []

    # initialize idxs
    idxs = Indice(n_total)

    # observe initial set
    idxs.add(init_idxs)
    learner.observe(init_idxs, y[init_idxs])
    results.append(idxs.sampled)

    # Label progress with the fraction actually supplied by the caller rather
    # than the module-level defaults, so the log stays correct when
    # `init_idxs` or `p_iter` differ from P_INIT / P_ITER.
    p_init = len(init_idxs) / n_total if n_total else 0.0
    print(f"0 | {p_init:.2f}: {recovery_rate(y, idxs.sampled):.4f}")

    # The per-round batch size does not change between rounds.
    batch_size = int(p_iter * n_total)

    # iteratively sample the library
    for it in range(n_iter):

        sample_idxs = learner.select(idxs.unsampled, batch_size)
        learner.observe(sample_idxs, y[sample_idxs])
        idxs.add(sample_idxs)

        results.append(idxs.sampled)

        print(f"{it+1} | {p_init + p_iter*(it+1):.2f}: {recovery_rate(y, idxs.sampled):.4f}")

    results.append(idxs.unsampled)
    print("\n")
    return results

In [22]:
# with open("dataset/628/data.pkl","rb") as file:
#     data = pickle.load(file)

# smiles_ls = data["smiles_ls"][:10000]
# mol_ls = data["mol_ls"][:10000]
# fp_ls = data["fp_ls"][:10000]
# y = np.array(data["activity_ls"])[:10000]

# # initial picks using RDkit
# init_idxs = random_pick(fp_ls, int(P_INIT*len(fp_ls)))

In [4]:
# """Random Forest"""
# clf = RandomForestClassifier(n_estimators=1200, class_weight="balanced",
#     max_features='log2',bootstrap=True,min_samples_split = 8,
#     min_samples_leaf = 3, n_jobs = 16,random_state=SEED)
# learner = Supervised_learner(clf, P_EXPLORE, data)

# results = iHTS(learner, y, init_idxs)

0:0.23823529411764705
1:0.3352941176470588
2:0.42058823529411765
3:0.49117647058823527


In [34]:
# pains_idx = get_pains_idx(mol_ls)
# analyze_results(results, y, learner, 'test.pkl', pains_idx=pains_idx,)

{'lib_hit_rate': 0.034,
 'sample_hit_rate': 0.04801610120759057,
 'sample_recovery_rate': 0.49117647058823527,
 'percentage_iter': [0.15, 0.2, 0.25, 0.30000000000000004],
 'hit_iter': [0.040621865596790374,
  0.04580152671755725,
  0.047906197654941376,
  0.04801610120759057],
 'recovery_iter': [0.23823529411764705,
  0.3352941176470588,
  0.42058823529411765,
  0.49117647058823527],
 'lib_pains_rate': 0.03,
 'hits_pains_rate': 0.05588235294117647,
 'sample_pains_rate': 0.04111558366877516,
 'pains_recovery_rate': 0.4766666666666667,
 'pains_recovery_iter': [0.25666666666666665,
  0.32666666666666666,
  0.3933333333333333,
  0.4766666666666667],
 'pains_recovered_hits_rate': 0.04790419161676647,
 'learner': <utils.learner.Supervised_learner at 0x2abeedb01550>,
 'sampled_ls': [[0,
   8194,
   3,
   8200,
   8202,
   16,
   17,
   23,
   8216,
   25,
   8218,
   27,
   34,
   40,
   8235,
   8236,
   43,
   8244,
   8246,
   8252,
   8258,
   8259,
   68,
   66,
   8266,
   8267,
   8269

In [3]:
from rdkit.Chem.FilterCatalog import FilterCatalog, FilterCatalogParams

def get_pains_idx(mol_ls: list) -> List[int]:
    """Return the positions in ``mol_ls`` of molecules matching the PAINS catalog.

    A molecule counts as a match when the catalog's ``GetFirstMatch`` returns
    something other than ``None``. Positions are reported in ascending order.
    """
    # Build a filter catalog that contains only the PAINS filter set.
    pains_params = FilterCatalogParams()
    pains_params.AddCatalog(FilterCatalogParams.FilterCatalogs.PAINS)
    pains_catalog = FilterCatalog(pains_params)

    return [
        position
        for position, mol in enumerate(mol_ls)
        if pains_catalog.GetFirstMatch(mol) is not None
    ]

def get_pains_ls(mol_ls: list) -> List:
    """Flag, for every molecule in ``mol_ls``, whether it matches the PAINS catalog.

    Returns a list of booleans aligned with ``mol_ls``: ``True`` where the
    catalog's ``GetFirstMatch`` returned a match, ``False`` where it returned
    ``None``.
    """
    # Build a filter catalog that contains only the PAINS filter set.
    pains_params = FilterCatalogParams()
    pains_params.AddCatalog(FilterCatalogParams.FilterCatalogs.PAINS)
    pains_catalog = FilterCatalog(pains_params)

    flags = []
    for mol in mol_ls:
        first_match = pains_catalog.GetFirstMatch(mol)
        flags.append(first_match is not None)
    return flags

In [4]:
def hit_rate(y: np.ndarray) -> float:
    """Return the fraction of hits in a 1-D label vector.

    Args:
        y: 1-D array-like of 0/1 (or boolean) labels.

    Returns:
        ``sum(y) / len(y)``, or NaN when ``y`` is empty (the rate is
        undefined), matching the NaN that ``recovery_rate`` yields for a zero
        denominator.
    """
    labels = np.asarray(y)
    n_total = len(labels)
    if n_total == 0:
        return float("nan")
    # Vectorised sum: the builtin sum() iterates element by element in Python.
    return labels.sum() / n_total
def recovery_rate(y: np.ndarray, idxs: Iterable) -> float:
    """Return the fraction of all hits in ``y`` that fall within ``idxs``.

    Args:
        y: 1-D array-like of 0/1 (or boolean) labels for the whole library.
        idxs: indices (or a boolean mask / slice) selecting the sampled
            entries. Any iterable is accepted; sets, generators and other
            non-array iterables are materialised to a list before indexing.

    Returns:
        ``sum(y[idxs]) / sum(y)``, or NaN when ``y`` contains no hits.
    """
    labels = np.asarray(y)
    # NumPy can index with arrays, lists and slices directly; everything else
    # (set, generator, tuple, ...) is turned into a list of indices first.
    if not isinstance(idxs, (np.ndarray, list, slice)):
        idxs = list(idxs)

    n_hits = labels.sum()
    if n_hits == 0:
        # No hits to recover: the rate is undefined.
        return float("nan")
    return labels[idxs].sum() / n_hits
        
def analyze_results(
    results: list, 
    y: np.array, 
    learner, 
    save_path: str, 
    pains_idx=None, 
    p_init=P_INIT, 
    p_iter=P_ITER):
    """Summarise a screening run, pickle the summary and return it.

    ``results[:-1]`` is treated as the sequence of sampled-index collections
    (one per step, the last one being the final sample) and ``results[-1]`` as
    the unsampled indices.

    The returned dict holds library/sample hit rates, the recovery rate of the
    final sample, per-step lists (``percentage_iter``, ``hit_iter``,
    ``recovery_iter``), the learner, and the raw index collections. When
    ``pains_idx`` is given, PAINS rates for the library, the hits and the
    sample are added, together with per-step PAINS recovery.

    The whole dict (including ``learner``) is written to ``save_path`` with
    pickle.
    """
    sampled_ls = results[:-1]
    final_sample = results[-2]

    summary = {
        # overall statistics
        'lib_hit_rate': hit_rate(y),
        'sample_hit_rate': hit_rate(y[final_sample]),
        'sample_recovery_rate': recovery_rate(y, final_sample),
        # per-step statistics, one entry per element of sampled_ls
        'percentage_iter': [p_init+(step*p_iter) for step in range(len(sampled_ls))],
        'hit_iter': [hit_rate(y[sampled]) for sampled in sampled_ls],
        'recovery_iter': [recovery_rate(y, sampled) for sampled in sampled_ls],
    }

    if pains_idx is not None:
        # 0/1 indicator over the whole library: 1 where the compound is PAINS
        is_pains = np.zeros(len(y))
        is_pains[pains_idx] = 1

        summary.update({
            # PAINS fraction of the library
            'lib_pains_rate': hit_rate(is_pains),
            # fraction of all hits that are PAINS
            'hits_pains_rate': recovery_rate(y, pains_idx),
            # PAINS fraction of the final sample
            'sample_pains_rate': hit_rate(is_pains[final_sample]),
            # fraction of all PAINS compounds contained in the final sample
            'pains_recovery_rate': recovery_rate(is_pains, final_sample),
            # same PAINS recovery, evaluated for every step
            'pains_recovery_iter': [recovery_rate(is_pains, sampled) for sampled in sampled_ls],
            # fraction of the sampled hits that are also PAINS
            'pains_recovered_hits_rate': (
                sum(np.logical_and(y, is_pains)[final_sample]) / sum(y[final_sample])
            ),
        })

    # learner and raw index collections
    summary['learner'] = learner
    summary['sampled_ls'] = sampled_ls
    summary['unsampled'] = results[-1]

    # persist the summary
    with open(save_path, "wb") as file:
        pickle.dump(summary, file)

    return summary

In [6]:
from sklearn.utils.random import sample_without_replacement

In [10]:
# Run the screening simulation for every dataset folder and save the results.
# Folders are visited in sorted order because glob returns them in arbitrary order.
for folder in sorted(glob.glob('dataset/*')):

    rf_path = os.path.join(folder, 'rf_results.pkl')
    ts_path = os.path.join(folder, 'ts_results.pkl')
    # if os.path.exists(rf_path) and os.path.exists(ts_path):
    #     continue

    data_path = os.path.join(folder, 'data.pkl')
    if not os.path.isfile(data_path):
        # Stray files or incomplete folders should not abort a multi-hour run.
        print(f"{folder}: no data.pkl found, skipping")
        continue

    print(folder)
    # NOTE: pickle.load can execute arbitrary code; only load trusted data files.
    with open(data_path, "rb") as file:
        data = pickle.load(file)

    smiles_ls = data["smiles_ls"]
    mol_ls = data["mol_ls"]
    fp_ls = data["fp_ls"]
    y = np.array(data["activity_ls"])
    #pains_idx = get_pains_idx(mol_ls)
    # pains_idx = data["pains_ls"].nonzero()[0]

    # get initial set; seeded so that repeated runs draw the same initial indices
    init_idxs = sample_without_replacement(
        len(fp_ls), int(P_INIT*len(fp_ls)), random_state=SEED)
    # init_idxs = random_pick(fp_ls, int(P_INIT*len(fp_ls)))
    
    # random forest
    # clf = RandomForestClassifier(n_estimators=1200, class_weight="balanced",
    #     max_features='log2',bootstrap=True,min_samples_split = 8,
    #     min_samples_leaf = 3, n_jobs = 16,random_state=SEED)
    # rf_learner = Supervised_learner(clf, P_EXPLORE, data)
    # rf_results = iHTS(rf_learner, y, init_idxs)

    # thompson sampling
    ts_learner = Thompson_learner(data)
    ts_results = iHTS(ts_learner, y, init_idxs)
    
    # analyze & save results
    #analyze_results(rf_results, y, rf_learner, rf_path, pains_idx=pains_idx)
    analyze_results(ts_results, y, ts_learner, ts_path)

# runtime ~ 2hr

dataset/596
Initialized 16585 scaffolds for 69668 compounds
0 | 0.10: 0.0971
1 | 0.15: 0.1531

2 | 0.20: 0.2042



KeyboardInterrupt: 