<div style="display: flex; align-items: center;">
    <span style="font-size: 24px; color: #003366; font-weight: 500;">Predicting Molecule Binding using Random Forest</span>
    <img src="../logo.svg" style="height: 50px; width: auto; margin-left: auto;"/>
</div>

In [None]:
import os
import sys
import torch
import psutil
import warnings
import subprocess
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem, Descriptors
from rdkit.Chem.rdFingerprintGenerator import GetMorganGenerator

from sklearn.manifold import TSNE
from sklearn.utils import resample
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold, cross_val_predict, train_test_split

from joblib import dump, load
from standardiser import break_bonds, neutralise, unsalt, standardise

# Suppress every warning for the rest of the session.
# NOTE(review): this also hides deprecation and data-quality warnings raised by later cells.
warnings.filterwarnings("ignore")
# Remove pandas' display limits so displayed frames are never truncated by row or column count.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 1: Check system availability </h2>
</div>

In [None]:
def check_availability():
    """Print how much GPU and CPU capacity is currently free and choose the torch device.

    Sets ``CUDA_VISIBLE_DEVICES`` to ``"0"`` when the variable is not already
    defined, prints one line for the GPU and one for the CPU, and returns the
    selected device.

    Returns
    -------
    torch.device
        ``cuda`` when ``torch.cuda.is_available()`` is true, otherwise ``cpu``.
    """
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")

    if torch.cuda.is_available():
        device = torch.device("cuda")
        try:
            query = subprocess.run(
                ["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv,noheader,nounits"],
                capture_output=True,
                text=True,
                check=True,
                timeout=10,
            )
            # nvidia-smi prints one line per GPU; only the first reported GPU is used.
            gpu_available = 100 - int(query.stdout.splitlines()[0].strip())
            gpu_result = f"\033[1m\033[34mGPU availability: \033[91m{gpu_available:.2f}%\033[0m"
        except (OSError, subprocess.SubprocessError, ValueError, IndexError):
            # Missing binary, non-zero exit, timeout, empty output or a non-numeric
            # value such as "[N/A]": keep the CUDA device but do not crash the cell.
            gpu_result = 'GPU is available, but its utilisation could not be read from nvidia-smi'
    else:
        device = torch.device("cpu")
        gpu_result = 'GPU is not available, using CPU instead'

    # Without an interval the first cpu_percent() call returns a meaningless 0.0,
    # which would always be reported as 100% available; sample over a short window instead.
    cpu_percentage = psutil.cpu_percent(interval=0.1)
    cpu_available = 100 - cpu_percentage
    cpu_result = f"\033[1m\033[34mCPU availability: \033[91m{cpu_available:.2f}%\033[0m"

    print(gpu_result)
    print(cpu_result)
    return device

device = check_availability()

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 2: Load data </h2>
</div>

In [None]:
# Read the CSV into a DataFrame, then preview its first rows and its (rows, columns) shape.
raw_csv_path = '../data/leash_bio_brd4.csv'
df = pd.read_csv(raw_csv_path)
display(df.head())
print(df.shape)

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 3: Remove salts and standardise smiles </h2>
</div>

In [None]:
def remove_salts(df):
    """Desalt and standardise the ``SMILES`` column, dropping rows that cannot be processed.

    The input frame is left untouched; a new frame is returned in which the
    original ``SMILES`` column is replaced by the cleaned SMILES (placed as the
    last column).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``SMILES`` column.

    Returns
    -------
    tuple
        ``(df_unsalt, initial_count, final_count)``: the cleaned frame, the
        number of rows before cleaning and the number of rows kept.
    """
    def remove_salt(smiles):
        """Return the cleaned SMILES for one molecule, or None when it cannot be processed."""
        # Missing values (e.g. NaN) are not valid input for the SMILES parser.
        if not isinstance(smiles, str):
            return None

        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            # None (rather than '') so the row is removed by dropna() below.
            return None

        mol = break_bonds.run(mol)
        mol = neutralise.run(mol)

        # Keep only fragments that are neither non-organic nor recognised salts.
        non_salt_smiles = '.'.join(
            Chem.MolToSmiles(frag)
            for frag in Chem.GetMolFrags(mol, asMols=True)
            if not unsalt.is_nonorganic(frag) and not unsalt.is_salt(frag)
        )
        if not non_salt_smiles:
            # Every fragment was filtered out: nothing left to standardise.
            return None

        mol = Chem.MolFromSmiles(non_salt_smiles)
        if mol is None:
            return None

        try:
            return Chem.MolToSmiles(standardise.run(mol))
        except standardise.StandardiseException:
            return None

    initial_count = len(df)
    # assign() builds a new frame, so the caller's DataFrame does not gain a column.
    df_unsalt = (
        df.assign(SMILES_unsalt=df['SMILES'].apply(remove_salt))
        .dropna(subset=['SMILES_unsalt'])
        .drop(columns=['SMILES'])
        .rename(columns={'SMILES_unsalt': 'SMILES'})
    )
    final_count = len(df_unsalt)
    print(f"\033[1m\033[34mNumber of datapoints removed: \033[91m{initial_count - final_count}\033[0m")
    print(f"\033[1m\033[34mNumber of datapoints remaining: \033[91m{final_count}\033[0m")
    return df_unsalt, initial_count, final_count

df_remove_salts, initial_count, after_salts_count = remove_salts(df)

In [None]:
# Continue with an independent copy restricted to the id, cleaned SMILES and label columns.
df = df_remove_salts[['id', 'SMILES', 'Target']].copy()

display(df.head())
print(df.shape)

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 4: Balance dataset </h2>
</div>

In [None]:
# Count how many rows carry each 'Target' value before any balancing is applied.
df['Target'].value_counts()

In [None]:
# Split the rows by label, then work out which class is actually the larger one
# instead of assuming label 0 is the majority (resample without replacement
# raises if asked for more rows than the frame holds).
class_0_rows = df[df['Target'] == 0]
class_1_rows = df[df['Target'] == 1]
if len(class_0_rows) >= len(class_1_rows):
    df_majority, df_minority = class_0_rows, class_1_rows
else:
    df_majority, df_minority = class_1_rows, class_0_rows

# Randomly downsample the larger class to the size of the smaller one.
df_majority_downsampled = resample(df_majority, replace=False, n_samples=df_minority.shape[0], random_state=42)

# Recombine, shuffle the row order reproducibly and show the resulting class counts.
df = pd.concat([df_majority_downsampled, df_minority])
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
df['Target'].value_counts()

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 5: Train-Test split </h2>
</div>

In [None]:
# Hold out 10% of the rows for testing, stratified on 'Target' with a fixed seed.
train_df, test_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df['Target'])

# Preview each split: title, first rows, shape; a dashed rule separates the two previews.
split_previews = [("Train Data", train_df), ("Test Data", test_df)]
for position, (title, frame) in enumerate(split_previews):
    if position > 0:
        print("-" * 80)
    print(title)
    display(frame.head())
    print(frame.shape)

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 6: Visualise train-test data </h2>
</div>

In [None]:
def generate_ecfp(smiles_list, radius=2, n_bits=2048):
    """Convert SMILES strings into Morgan fingerprint vectors.

    Parameters
    ----------
    smiles_list : iterable of str
        SMILES strings to featurise.
    radius : int
        Radius passed to the Morgan fingerprint generator.
    n_bits : int
        Fingerprint size; also the length of the zero vector used for
        SMILES that RDKit cannot parse.

    Returns
    -------
    numpy.ndarray
        One entry per input SMILES, in input order.
    """
    fp_generator = GetMorganGenerator(radius=radius, fpSize=n_bits)

    def _vector_for(smiles):
        parsed = Chem.MolFromSmiles(smiles)
        if not parsed:
            # Unparseable SMILES are represented by an all-zero vector.
            return np.zeros(n_bits)
        return np.array(fp_generator.GetFingerprint(parsed))

    return np.array([_vector_for(smiles) for smiles in smiles_list])

# Fingerprint both splits; labels are taken straight from the split frames.
X_train, X_test = (generate_ecfp(split['SMILES']) for split in (train_df, test_df))
y_train, y_test = train_df['Target'], test_df['Target']

# Embed train and test rows together in one 2-component t-SNE fit, then slice
# the result back into its train and test parts by row position.
n_train_rows = len(X_train)
tsne = TSNE(n_components=2, random_state=42)
tsne_results = tsne.fit_transform(np.vstack((X_train, X_test)))
tsne_train, tsne_test = tsne_results[:n_train_rows], tsne_results[n_train_rows:]

fig, ax = plt.subplots(figsize=(6, 6))
ax.scatter(tsne_train[:, 0], tsne_train[:, 1], c='#7b1fa2', label=f'Train Data (n={len(X_train)})', s=10, alpha=0.7)
ax.scatter(tsne_test[:, 0], tsne_test[:, 1], c='#ff6f00', label=f'Test Data (n={len(X_test)})', s=10, alpha=1)
ax.set_title('t-SNE plot of Train and Test Data')
ax.set_xlabel('t-SNE Component 1')
ax.set_ylabel('t-SNE Component 2')
ax.legend()

# The output directory is created here; later cells write their artefacts into it.
os.makedirs('model_files/rf_classifier', exist_ok=True)
fig.savefig('model_files/rf_classifier/tsne_train_vs_test_data.png', bbox_inches='tight')
plt.show()

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 7: Get the Descriptors </h2>
</div>

In [None]:
def calculate_normalized_descriptors(df, smiles_column='SMILES', missing_val=None, scaler=None):
    """Compute every descriptor in ``Descriptors._descList`` per molecule and min-max scale them.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the SMILES strings.
    smiles_column : str
        Column of ``df`` to read the SMILES from.
    missing_val : object
        Value stored when a descriptor function raises for a molecule.
    scaler : scikit-learn transformer, optional
        Scaler to use instead of a brand-new ``MinMaxScaler``. An unfitted
        scaler is fitted on this call's descriptors first; an already-fitted
        scaler is only applied. Passing the same instance for the training and
        the test frame scales both with the training range. When omitted, a new
        ``MinMaxScaler`` is fitted on the data of this call.

    Returns
    -------
    pandas.DataFrame
        One row per row of ``df`` (with a fresh default index) and one column
        per descriptor, holding the scaled values.
    """
    # Local imports: only needed for the optional ``scaler`` path.
    from sklearn.exceptions import NotFittedError
    from sklearn.utils.validation import check_is_fitted

    def get_mol_descriptors(mol):
        """Evaluate each descriptor function on ``mol``; failures yield ``missing_val``."""
        res = {}
        for nm, fn in Descriptors._descList:
            try:
                res[nm] = fn(mol)
            except Exception:
                # Includes molecules that failed to parse (mol is None).
                res[nm] = missing_val
        return res

    mols = [Chem.MolFromSmiles(x) for x in df[smiles_column].values.tolist()]
    df_descriptor = pd.DataFrame([get_mol_descriptors(m) for m in mols])

    if scaler is None:
        scaled_values = MinMaxScaler().fit_transform(df_descriptor)
    else:
        try:
            check_is_fitted(scaler)
        except NotFittedError:
            scaler.fit(df_descriptor)
        scaled_values = scaler.transform(df_descriptor)

    df_descriptor_normalized = pd.DataFrame(scaled_values, columns=df_descriptor.columns)
    return df_descriptor_normalized

df_train = calculate_normalized_descriptors(train_df)
print(df_train.shape)

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 8: Remove Redundant and Highly Correlated Columns </h2>
</div>

In [None]:
# Columns with exactly one distinct value carry no information for the model.
redundant_columns = df_train.columns[df_train.nunique() == 1]

# Absolute pairwise correlation between all descriptor columns.
correlation_matrix = df_train.astype(float).corr().abs()

# Look only below the diagonal, so each column is compared with the columns that
# precede it; a column is flagged when any such correlation exceeds 0.6.
n_descriptor_cols = len(correlation_matrix.columns)
below_diagonal = np.tril(np.ones((n_descriptor_cols, n_descriptor_cols), dtype=bool), k=-1)
above_threshold = (correlation_matrix.to_numpy() > 0.6) & below_diagonal
correlated_columns = set(correlation_matrix.columns[above_threshold.any(axis=1)])

df_train = df_train.drop(columns=redundant_columns).drop(columns=correlated_columns)
training_columns = df_train.columns.tolist()

# Persist the surviving column names so the prediction step can select the same features.
pd.Series(training_columns).to_csv('model_files/rf_classifier/training_columns.csv', index=False)

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 10: Model Training </h2>
</div>

In [None]:
# Features are the filtered training descriptors; labels come from the training split.
X_train, y_train = df_train, train_df['Target']

# Fixed hyper-parameters for the forest.
# NOTE(review): presumably the outcome of an earlier search that is not part of this notebook.
best_params = dict(
    bootstrap=False,
    criterion='gini',
    max_depth=20,
    max_features='sqrt',
    max_leaf_nodes=None,
    min_impurity_decrease=0.0,
    min_samples_leaf=1,
    min_samples_split=2,
    min_weight_fraction_leaf=0.0,
    n_estimators=300,
    n_jobs=-1,
    oob_score=False,
    random_state=42,
    verbose=0,
)

# Fit the classifier and persist it next to the other model artefacts.
rf_classifier = RandomForestClassifier(**best_params).fit(X_train, y_train)
dump(rf_classifier, 'model_files/rf_classifier/rf_model.joblib')

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 11: Make predictions on test data </h2>
</div>

In [None]:
# Reload the persisted model and the list of descriptor columns saved alongside it.
rf_model = load('model_files/rf_classifier/rf_model.joblib')

training_columns_path = 'model_files/rf_classifier/training_columns.csv'
# squeeze("columns") always yields a Series, even when the file lists a single
# column name (a plain squeeze() would collapse that case to a scalar).
training_columns = pd.read_csv(training_columns_path).squeeze("columns").tolist()

# NOTE(review): this call min-max scales the test descriptors on the test set's own
# range, not on the range seen when the training descriptors were scaled.
df_test = calculate_normalized_descriptors(test_df)
# Align to the training feature set; descriptor columns absent from the test frame are filled with 0.
df_test = df_test.reindex(columns=training_columns, fill_value=0)
predictions = rf_model.predict(df_test)

# assign() returns a new frame instead of writing into the object produced by the split.
test_df = test_df.assign(Target_pred=predictions)
display(test_df.head())
print(test_df.shape)

<div style="background-color:#4B6587; color:#F0E5CF; padding: 1px; border-radius: 10px;">
    <h2 style="font-size: 16px; margin-left: 10px;"> Step 12: Model Evaluation </h2>
</div>

In [None]:
# Make the parent directory importable so the local my_cm module can be found;
# the guard keeps sys.path free of duplicates when the cell is re-run.
parent_dir = os.path.abspath("..")
if parent_dir not in sys.path:
    sys.path.append(parent_dir)
# NOTE(review): wildcard import kept because my_cm's exports are not visible here;
# only PrettyConfusionMatrix is used below - prefer importing it explicitly.
from my_cm import *

true_labels = test_df['Target']
predicted_labels = test_df['Target_pred']
# Pin the label order so the matrix is always 2x2 and lines up with the ('0', '1')
# axis labels, even if one class is missing from the labels or predictions.
cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])
PrettyConfusionMatrix(cm, labels=('0', '1'), save_path='model_files/rf_classifier/my_confusion_matrix.png')