In [11]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [12]:
import pandas as pd
import numpy as np
import torch
from torch import nn, optim
from rdkit import Chem
from rdkit.Chem import AllChem
from tdc import Oracle

In [13]:
# Load data and Oracle
oracle = Oracle(name='DRD2')
data = pd.read_csv("small_drd2_data.csv")
smiles_list = data['smiles'].tolist()
proba_molecules = [oracle(smiles) for smiles in smiles_list]
print(proba_molecules)

Found local copy...


[0.0005901676463998309, 0.001288978921112421, 0.00023099921452992108, 0.0014480634352253118, 0.00023229149413919086, 0.007226910417238079, 0.03177009486102104, 0.00018465196104625156, 0.0006181948403864203, 0.017767959705749174, 0.0001353658415040822, 7.584214925439865e-05, 0.00011932658307753439, 0.00019596215439119096, 5.0310012366721855e-05, 0.00153997550154151, 0.004863899555067754, 0.0013627517075909072, 0.00022636359589811632, 0.9999907104594737, 0.002364173240172854, 0.9999987724917501, 0.0004128064910287372, 4.515114620151249e-05, 0.9999899941967955, 3.439700377056164e-05, 0.005426522090583173, 0.00015732934690030053, 0.0003269993975438858, 0.0003363411034820678, 0.0055611952891646, 0.9890921212669289, 0.9999988552735308, 0.9905578204031097, 0.00045109684719108454, 0.0012732716327158269, 0.0005288131087850552, 0.0021161386976471714, 0.014029352175312171, 0.013244877011903917, 0.9999973055784004, 0.9999998974481366, 0.0032787785378776348, 0.0002981816185162872, 0.000420367794340

### Computing morgan fingerprints

In [14]:
def compute_fingerprints(smiles):
    mol = Chem.MolFromSmiles(smiles)
    if mol is not None:
        return AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=2048)
    else:
        return np.zeros((2048,), dtype=int)

features = torch.tensor([compute_fingerprints(smiles) for smiles in smiles_list])
print(features.shape)

torch.Size([240, 2048])


### Generate pairs and labels

In [15]:
def generate_pairs_label(features, proba_molecules, smiles_list, num_sets=1000):
    n = len(features)

    smiles_1 = []
    smiles_2 = []
    features_1 = []
    features_2 = []
    label_1_proba = []
    label_1_binary = []
    label_2_proba = []
    label_2_binary = []
    compare_proba = []
    compare_binary = []

    # set seed
    np.random.seed(42) 
    
    # sample unrepeated num_sets number of pairs
    for i in range(n):
        for j in range(n):
            if i != j:
                features_1.append(features[i])
                features_2.append(features[j])
                smiles_1.append(smiles_list[i])
                smiles_2.append(smiles_list[j])
                label_1_proba.append(proba_molecules[i])
                label_1_binary.append(1 if proba_molecules[i] > proba_molecules[j] else 0)
                label_2_proba.append(proba_molecules[j])
                label_2_binary.append(1 if proba_molecules[j] > proba_molecules[i] else 0)
                # convert to float
                compare_proba_value = torch.tensor(proba_molecules[i] - proba_molecules[j], dtype=torch.float32)
                compare_proba_value = torch.sigmoid(compare_proba_value)
                compare_proba.append(compare_proba_value)
                compare_binary.append(1 if compare_proba_value > 0.5 else 0)

    # Choose randomly num_sets number of pairs
    idx = np.random.choice(len(features_1), num_sets, replace=False)

    # Convert to numpy
    label_1_proba = np.array(label_1_proba)[idx]
    label_1_binary = np.array(label_1_binary)[idx]
    label_2_proba = np.array(label_2_proba)[idx]
    label_2_binary = np.array(label_2_binary)[idx]
    compare_proba = np.array(compare_proba)[idx]
    compare_binary = np.array(compare_binary)[idx]
    features_1 = torch.stack(features_1)[idx]
    features_2 = torch.stack(features_2)[idx]
    smiles_1 = np.array(smiles_1)[idx]
    smiles_2 = np.array(smiles_2)[idx]

    return features_1, features_2, label_1_proba, label_1_binary, label_2_proba, label_2_binary,\
            compare_proba, compare_binary, smiles_1, smiles_2

features_1, features_2, label_1_proba, label_1_binary, label_2_proba, label_2_binary,\
compare_proba, compare_binary, smiles_1, smiles_2 = generate_pairs_label(features, proba_molecules, smiles_list, num_sets=len(smiles_list))

In [16]:
print(features_1.shape)
print(features_2.shape)

torch.Size([240, 2048])
torch.Size([240, 2048])


In [17]:
from sklearn.model_selection import train_test_split

# Why is test_size very high here?
# This is because we are using active learning later
# Where the Human in the loop process would generate more better training data for the model
# So the amount of training data to train the initial model is not important, as the model should not be better than random guess


features_1_train, features_1_test,\
    features_2_train, features_2_test,\
    label_1_proba_train, label_1_proba_test,\
    label_1_binary_train, label_1_binary_test,\
    label_2_proba_train, label_2_proba_test,\
    label_2_binary_train, label_2_binary_test,\
    compare_proba_train, compare_proba_test,\
    compare_binary_train, compare_binary_test,\
    smiles_1_train, smiles_1_test,\
    smiles_2_train, smiles_2_test = train_test_split(features_1, features_2,\
                                                     label_1_proba, label_1_binary, 
                                                     label_2_proba, label_2_binary,\
                                                     compare_proba, compare_binary,\
                                                     smiles_1, smiles_2, test_size=0.9, random_state=42)

# Now we need to save them 

small_drd2_training_data = pd.DataFrame()
small_drd2_training_data['smiles_1'] = smiles_1_train
small_drd2_training_data['smiles_2'] = smiles_2_train
small_drd2_training_data['label_1_proba'] = label_1_proba_train
small_drd2_training_data['label_2_proba'] = label_2_proba_train
small_drd2_training_data['label_1_binary'] = label_1_binary_train
small_drd2_training_data['label_2_binary'] = label_2_binary_train
small_drd2_training_data['compare_proba'] = compare_proba_train
small_drd2_training_data['compare_binary'] = compare_binary_train

small_drd2_training_data.to_csv("small_drd2_training_data.csv", index=False)

small_drd2_testing_data = pd.DataFrame()
small_drd2_testing_data['smiles_1'] = smiles_1_test
small_drd2_testing_data['smiles_2'] = smiles_2_test
small_drd2_testing_data['label_1_proba'] = label_1_proba_test
small_drd2_testing_data['label_2_proba'] = label_2_proba_test
small_drd2_testing_data['label_1_binary'] = label_1_binary_test
small_drd2_testing_data['label_2_binary'] = label_2_binary_test
small_drd2_testing_data['compare_proba'] = compare_proba_test
small_drd2_testing_data['compare_binary'] = compare_binary_test

small_drd2_testing_data.to_csv("small_drd2_testing_data.csv", index=False)

In [18]:
from bradley_terry import BradleyTerryModel
    
# Training the model
model = BradleyTerryModel(feature_dim=2048)

# When using Binary Cross-Entropy Loss (BCELoss) in neural networks, the input expected by the 
# loss function is a list of probabilities, not binary values (0 or 1)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.00001)

# Create label data loader

from torch.utils.data import TensorDataset, DataLoader

# Assuming X_train and y_train are numpy arrays, convert them to Pylabel tensors
features_1_train_tensor = torch.tensor(features_1_train).float()  
features_2_train_tensor = torch.tensor(features_2_train).float()
compare_binary_train_tensor = torch.tensor(compare_binary_train).float()

features_1_test_tensor = torch.tensor(features_1_test).float()
features_2_test_tensor = torch.tensor(features_2_test).float()
compare_binary_test_tensor = torch.tensor(compare_binary_test).float()

# Create a TensorDataset
train_dataset = TensorDataset(features_1_train_tensor, features_2_train_tensor, 
                              compare_binary_train_tensor)
test_dataset = TensorDataset(features_1_test_tensor, features_2_test_tensor, 
                              compare_binary_test_tensor)

# Create a DataLoader
batch_size = 64  # You can adjust the batch size as needed
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

def train(model, train_loader, epochs=1):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for features_1, features_2, compare_binary in train_loader:
            optimizer.zero_grad()
            output = model(features_1, features_2)
            loss = criterion(output, compare_binary.unsqueeze(-1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch {epoch+1}, Loss: {total_loss / len(features)}')

train(model, train_loader)

# save state dict
torch.save(model.state_dict(), "bradley_terry_model.pth")

print("Model trained and saved")

Epoch 1, Loss: 0.002886946250995
Model trained and saved


### Perform prediction on the testing dataset

In [19]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, matthews_corrcoef

def compute_metrics(label, predictions):
    """
    Compute classification metrics: accuracy, precision, recall, F1 score, and MCC.
    
    Args:
    label (list[int]): True binary label.
    predictions (list[int]): Predicted binary label.

    Returns:
    dict: A dictionary containing the computed metrics.
    """
    accuracy = accuracy_score(label, predictions)
    precision = precision_score(label, predictions)
    recall = recall_score(label, predictions)
    f1 = f1_score(label, predictions)
    mcc = matthews_corrcoef(label, predictions)

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'F1': f1,
        'MCC': mcc
    }

In [20]:
# Initialize the model
model = BradleyTerryModel(feature_dim=2048)

# Load the state dict
model.load_state_dict(torch.load("bradley_terry_model.pth"))

# Finally we perform prediction
model.eval()

prediction_binary_test = []

with torch.no_grad():
    for features_1, features_2, compare_binary in test_loader:
        outputs = model(features_1, features_2)
        prediction_binary_test.extend(outputs.squeeze().tolist())

prediction_binary_test = (torch.tensor(prediction_binary_test) > 0.5).int().tolist()

metrics = compute_metrics(compare_binary_test, prediction_binary_test)

print("Accuracy:", metrics['accuracy'])
print("Precision:", metrics['precision'])
print("Recall:", metrics['recall'])
print("F1:", metrics['F1'])
print("MCC:", metrics['MCC'])

Accuracy: 0.5879629629629629
Precision: 0.6213592233009708
Recall: 0.5614035087719298
F1: 0.5898617511520737
MCC: 0.17896593739442457


#### The model performs very weakly, which means that we can now use this model as a human component for HITL. Ideally, the initial model should be no better than random guess