# ML4DD Summer School Hackathon

The final days of the Machine Learning for Drug Discovery summer school end with a hackathon. We will use Polaris as a tool to get the associated benchmarks and datasets. First things first, we will install Polaris from PyPI.

We next need to authenticate ourselves to Polaris. If you haven't done so yet, you can create an account at https://polarishub.io. Afterwards, you can simply run the command below.

In [None]:
# Owner name for Polaris uploads: this is the value that would be passed to
# `results.upload_to_hub(owner=owner)` (that call is commented out at the end
# of the notebook).
owner = "team13"

# Echo the setting so the chosen owner is visible in the cell output
print(f'You have set "{owner}" as the owner')

In [None]:
import polaris as po
import datamol as dm
import numpy as np
import pandas as pd

# Kinase Selectivity

The second benchmark we will use is `polaris/pkis1-kit-wt-mut-c-1`. Using this benchmark is very similar to before, except for one difference: This is a multi-task benchmark.

In [None]:
# Load the benchmark by its identifier, then ask it for its train/test split.
# Later cells read `train.X`, `train.y`, `test.X` and `benchmark.target_cols`.
benchmark = po.load_benchmark("polaris/pkis1-kit-wt-mut-c-1")
train, test = benchmark.get_train_test_split()

As we can see, the targets are now returned to us as a dictionary. Let's train a multi-task model on this data! We first preprocess the data to be in a format we can use with scikit-learn.

In [None]:
# `train.y` is indexed by target name; gather one array per target, in
# `benchmark.target_cols` order, and place them side by side as columns.
labels_by_target = train.y
per_target_labels = [labels_by_target[name] for name in benchmark.target_cols]
ys = np.stack(per_target_labels, axis=1)
ys.shape

Now that we're working with a multi-task dataset, it's also possible for these arrays to be sparse. Let's filter out any data points that don't have readouts for _all_ targets.

In [None]:
# A row is kept only when every one of its target values is a real number
# (i.e. none of them is NaN).
has_value = ~np.isnan(ys)
mask = np.all(has_value, axis=1)
mask.sum()

In [None]:
# Training frame: one "smiles" column plus one column per target, restricted
# to the rows selected by `mask`.
# NOTE(review): assumes `train.X[mask]` is array-like (not a DataFrame), so
# passing `columns=` names the single column rather than selecting one.
df_train = pd.DataFrame(train.X[mask], columns=["smiles"])
complete_labels = ys[mask]
df_train[benchmark.target_cols] = complete_labels

In [None]:
# Preview the first rows: the "smiles" column and the target columns
df_train.head()

## Add physical features

In [None]:
# Names of the descriptor columns that are added to the train and test frames
# and later used as model inputs.
# NOTE(review): presumably the count and order must match what
# `featurize_smiles` returns — confirm against `src.utils`.
features = [
    "MolecularWeight",
    "LogP",
    "MaxAbsPartialCharge",
    "MinAbsPartialCharge",
]

In [None]:
from src.utils import featurize_smiles

# Featurize every training SMILES; wrapping each result in a Series makes
# `apply` return a frame whose columns are assigned to the `features` columns.
train_descriptors = df_train["smiles"].apply(
    lambda smi: pd.Series(featurize_smiles(smi))
)
df_train[features] = train_descriptors

In [None]:
# Preview again, now including the newly added descriptor columns
df_train.head()

In [None]:
# Model inputs (descriptor columns) and labels (one column per target) as
# plain NumPy arrays.
X = df_train[features].to_numpy()
y = df_train[benchmark.target_cols].to_numpy()

print("X shape:", X.shape)
print("y shape:", y.shape)

Do the same featurization on the test set.

In [None]:
# Test frame with a single "smiles" column, featurized the same way as the
# training frame.
# NOTE(review): assumes `test.X` is array-like (not a DataFrame).
df_test = pd.DataFrame(test.X, columns=["smiles"])
test_descriptors = df_test["smiles"].apply(
    lambda smi: pd.Series(featurize_smiles(smi))
)
df_test[features] = test_descriptors

X_test = df_test[features].values
print("X_test shape:", X_test.shape)

# Baseline with Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
# Build one independent random forest *classifier* per target, with tree
# depth capped at 5.
models = {}
for target in benchmark.target_cols:
    models[target] = RandomForestClassifier(max_depth=5)

# Fit each classifier on its own label column; column i of `y` belongs to
# benchmark.target_cols[i].
for column, target in enumerate(benchmark.target_cols):
    models[target].fit(X, y[:, column])

## Predictions

In [None]:
# Score the test set with every per-target forest: keep the second
# `predict_proba` column as the score and `predict` as the hard label.
# NOTE(review): column 1 is presumably the positive class — this requires
# both classes to be present in the training labels.
y_prob_rf = {}
y_pred_rf = {}
for target, forest in models.items():
    y_prob_rf[target] = forest.predict_proba(X_test)[:, 1]
for target, forest in models.items():
    y_pred_rf[target] = forest.predict(X_test)

In [None]:
# Evaluate the baseline forests; both arguments are dicts keyed by target name.
results = benchmark.evaluate(y_pred=y_pred_rf, y_prob=y_prob_rf)
# Bare expression so the notebook displays the results object
results

# Multioutput

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.multioutput import MultiOutputClassifier

In [None]:
# Wrap a default random forest in MultiOutputClassifier so that a single
# `fit` call handles every column of `y`.
base_model = RandomForestClassifier()
model = MultiOutputClassifier(base_model)

In [None]:
# Train on all targets at once; `y` has one column per target
model.fit(X, y)

## Predictions

In [None]:
# Predict the test set.
# The next cell indexes `y_pred_mrf` as a 2-D array (`[:, i]`) and
# `y_prob_mrf` as a sequence of per-target arrays (`[i][:, 1]`).
y_pred_mrf = model.predict(X_test)
y_prob_mrf = model.predict_proba(X_test)

In [None]:
# Re-key the multi-output results by target name.
# Both names are rebound from array/sequence to dict here, so this cell cannot
# be re-run without first re-running the prediction cell above it.

# Row i of the transposed prediction matrix is the prediction column of target i
y_pred_mrf = dict(zip(benchmark.target_cols, y_pred_mrf.T))

# Keep the second probability column of each per-target array
y_prob_mrf = {
    target: target_proba[:, 1]
    for target, target_proba in zip(benchmark.target_cols, y_prob_mrf)
}

In [None]:
# Show the target column names (the keys of the prediction dicts)
print(benchmark.target_cols)

In [None]:
# Report, for every target, the sum of the multi-output predictions.
# NOTE(review): this equals the number of predicted binders only if labels
# are encoded as 0/1 — confirm.
# Looping over `benchmark.target_cols` avoids hard-coding exactly three
# targets; targets are numbered from 1 in benchmark order.
for position, target in enumerate(benchmark.target_cols, start=1):
    print(f"Predicted as binders for target {position}:", y_pred_mrf[target].sum())

In [None]:
# Evaluate the multi-output forest; both arguments are dicts keyed by target name.
results = benchmark.evaluate(y_pred=y_pred_mrf, y_prob=y_prob_mrf)
# Bare expression so the notebook displays the results object
results

# Resampling

In [None]:
from imblearn.over_sampling import SMOTE

# A single SMOTE instance with default settings is reused for all targets
smote = SMOTE()

# target name -> (resampled features, resampled labels)
resampled_datasets = {}

# Resample separately per target: each target has its own label column.
# Column i of `y` belongs to benchmark.target_cols[i].
for column, target in enumerate(benchmark.target_cols):
    target_labels = y[:, column]
    X_resampled, y_resampled = smote.fit_resample(X, target_labels)
    resampled_datasets[target] = (X_resampled, y_resampled)

In [None]:
# Fresh default random forests, one per target. This rebinds `models`, so the
# depth-limited baseline forests are no longer reachable through that name.
models = {target: RandomForestClassifier() for target in benchmark.target_cols}

In [None]:
# Fit each forest on the resampled (features, labels) pair stored for its target
for target in benchmark.target_cols:
    X_balanced, y_balanced = resampled_datasets[target]
    models[target].fit(X_balanced, y_balanced)

## Predictions

In [None]:
# Score the untouched test set with the forests trained on resampled data:
# hard labels first, then the second `predict_proba` column as the score.
y_pred_rf_resampled = {}
for target, forest in models.items():
    y_pred_rf_resampled[target] = forest.predict(X_test)

y_prob_rf_resampled = {}
for target, forest in models.items():
    y_prob_rf_resampled[target] = forest.predict_proba(X_test)[:, 1]

In [None]:
# Predict for class 1
print(
    "Predicted as binders for target 1:",
    y_pred_rf_resampled[benchmark.target_cols[0]].sum(),
)
print(
    "Predicted as binders for target 2:",
    y_pred_rf_resampled[benchmark.target_cols[1]].sum(),
)
print(
    "Predicted as binders for target 3:",
    y_pred_rf_resampled[benchmark.target_cols[2]].sum(),
)

In [None]:
# Evaluate the forests trained on resampled data; both arguments are dicts
# keyed by target name.
results = benchmark.evaluate(y_pred=y_pred_rf_resampled, y_prob=y_prob_rf_resampled)
# Bare expression so the notebook displays the results object
results

# MLP

In [None]:
# Define an MLP model, then train it with a loss which is proportional to the class imbalance

import torch
import torch.nn as nn


class MLP(nn.Module):
    """Fully connected network with ReLU hidden layers and a sigmoid output.

    Args:
        input_dim: Number of input features.
        hidden_layers: Sequence of hidden-layer widths, in order. May be
            empty, in which case the network is a single linear layer
            followed by a sigmoid.
        output_dim: Number of output units.
    """

    def __init__(self, input_dim, hidden_layers, output_dim):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_layers = hidden_layers
        self.output_dim = output_dim

        # Consecutive pairs of widths define the linear layers. Building them
        # from one list also covers an empty `hidden_layers`, which would
        # otherwise fail on `hidden_layers[0]`.
        widths = [input_dim, *hidden_layers, output_dim]
        self.layers = nn.ModuleList(
            [nn.Linear(n_in, n_out) for n_in, n_out in zip(widths, widths[1:])]
        )

    def forward(self, x):
        """Return sigmoid activations for `x`.

        ReLU is applied after every layer except the last; the last layer's
        output goes through a sigmoid, so every returned value lies in (0, 1).
        """
        *hidden, head = self.layers
        for layer in hidden:
            x = torch.relu(layer(x))
        return torch.sigmoid(head(x))


# Define the loss function
class WeightedBCELoss(nn.Module):
    """Binary cross-entropy in which positive-label samples are up-weighted.

    Each sample's BCE term is multiplied by `pos_weight` when its label is 1
    and by 1 when its label is 0; the weighted terms are then averaged.

    Args:
        pos_weight: Multiplier applied to the loss of positive samples
            (e.g. the negative/positive count ratio).
    """

    def __init__(self, pos_weight):
        super().__init__()
        self.pos_weight = pos_weight
        # Keep per-element losses so each one can be weighted by its own label
        self.bce = nn.BCELoss(reduction="none")

    def forward(self, y_pred, y_true):
        """Return the scalar weighted loss.

        Args:
            y_pred: Predicted probabilities in [0, 1].
            y_true: Labels with the same shape as `y_pred`; assumed to be
                0 or 1.
        """
        per_sample = self.bce(y_pred, y_true)
        # The weight is keyed on the *label*, not the prediction, and is
        # computed element-wise. Branching on `y_pred > 0.5` with a Python
        # `if` is ambiguous (raises) for any batch larger than one element.
        sample_weight = y_true * (self.pos_weight - 1.0) + 1.0
        return (per_sample * sample_weight).mean()

In [None]:
# Per-target positive-class weight: ratio of negative to positive training
# labels (summing a label column counts its positives, given 0/1 labels).
weights = {}

for column, target in enumerate(benchmark.target_cols):
    target_labels = y[:, column]
    num_positive = target_labels.sum()
    num_negative = y.shape[0] - num_positive
    pos_weight = num_negative / num_positive
    weights[target] = pos_weight

In [None]:
n_epochs = 100
hidden_layers = [64]

# One independent single-output network per target
models = {
    target: MLP(input_dim=X.shape[1], hidden_layers=hidden_layers, output_dim=1)
    for target in benchmark.target_cols
}

# The feature matrix is shared by every target, so convert it to a tensor once
X_t = torch.tensor(X, dtype=torch.float32)

# Column i of `y` belongs to benchmark.target_cols[i]
for column, target in enumerate(benchmark.target_cols):
    model = models[target]
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = WeightedBCELoss(pos_weight=weights[target])

    y_t = torch.tensor(y[:, column], dtype=torch.float32)

    # Full-batch training: every step uses the whole training set
    for epoch in range(n_epochs):
        optimizer.zero_grad()
        # squeeze(-1) drops only the output axis, so the result stays 1-D
        # (matching `y_t`) even when there is a single training row
        y_pred = model(X_t).squeeze(-1)
        loss = criterion(y_pred, y_t)
        loss.backward()
        optimizer.step()

    # Leave every trained network in evaluation mode for the prediction cell
    model.eval()

## Predict

In [None]:
# Predict the test set.
# The test features are the same for every model, so convert them once.
X_test_t = torch.tensor(X_test, dtype=torch.float32)

# Inference only: skip building the autograd graph. squeeze(-1) drops just the
# output axis, so each value stays a 1-D array even for a single test row.
with torch.no_grad():
    y_prob_mlp = {
        target: model(X_test_t).squeeze(-1).numpy() for target, model in models.items()
    }

# Hard labels: probability above 0.5 -> class 1
y_pred_mlp = {
    target: (y_prob > 0.5).astype(int) for target, y_prob in y_prob_mlp.items()
}

In [None]:
# Report how many test compounds each MLP labels as class 1 (the hard labels
# are 0/1 integers, so the sum is a count).
# Looping over `benchmark.target_cols` avoids hard-coding exactly three
# targets; targets are numbered from 1 in benchmark order.
for position, target in enumerate(benchmark.target_cols, start=1):
    print(f"Predicted as binders for target {position}:", y_pred_mlp[target].sum())

In [None]:
# Evaluate the MLPs; both arguments are dicts keyed by target name.
# This is the last assignment to `results` before the upload section.
results = benchmark.evaluate(y_pred=y_pred_mlp, y_prob=y_prob_mlp)
# Bare expression so the notebook displays the results object
results

## Upload results

In [None]:
# Metadata attached to the results object before a possible upload.
results.name = "my-second-result"
# `results` was last assigned from the MLP evaluation on the four descriptor
# columns, so the description names that model rather than an ECFP + random
# forest pipeline that this notebook never builds.
results.description = (
    "Physicochemical descriptors (MolecularWeight, LogP, partial charges) "
    "with a class-weighted MLP"
)

In [None]:
# results.upload_to_hub(owner=owner)

The End.