# Example MOBO

This notebook gives a minimal example of a multi-objective Bayesian optimization (MOBO) algorithm on molecules.
See `README.md` for installation instructions.

In [None]:
import os
print("PYTHONPATH:", os.environ.get('PYTHONPATH', 'Not Set'))

In [None]:
from pprint import pprint

from tdc_oracles_modified import Oracle
import numpy as np
from scipy import stats
from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
from gp_mobo import kern_gp

## Objective

MOBO needs a set of objective functions to optimize.
We will try to [simultaneously] maximize the objectives:

\begin{align*}
f_1(m) &= -\textrm{DockingScore}(\textrm{PPARD}, m) \\
f_2(m) &= \textrm{QED}(m) \\
f_3(m) &= \textrm{sim}(m, \textrm{celecoxib})
\end{align*}

- $f_1$ is a negative docking score: higher values of $f_1$ indicate stronger binding to the PPARD target.
- $f_2$ is the quantatitive estimate of druglikeness. Higher values indicate superfical similarity to previously discovered drugs (on the basis of molecular weight, avoiding a small number of toxic substructures, etc).
- $f_3$ is similarity to the known drug molecule celecoxib. This is one of the objectives in the PMO benchmark.

Together, these objectives specify selecting molecules which bind to PPARD, are drug-like, and are structurally similar to celecoxib. Note this is just a demo objective; it probably does not correspond to a realistic drug discovery task.

In [None]:
# Load dockstring dataset
from dockstring.dataset import load_dataset
DOCKSTRING_DATASET = load_dataset()

In [None]:
# Create "oracles" for f2 and f3
QED_ORACLE = Oracle("qed")
CELECOXIB_ORACLE = Oracle("celecoxib-rediscovery")

In [None]:
def evaluate_objectives(smiles_list: list[str]) -> np.ndarray:
    """
    Given a list of N smiles, return an NxK array A such that
    A_{ij} is the jth objective function on the ith SMILES.

    NOTE: you might replace this implementation with an alternative
    implementation for your objective of interest.

    Our specific implementation uses the objective above.
    Because it uses the dockstring dataset to look up PPARD values,
    it is only defined on SMILES in the dockstring dataset.

    Also, be careful of NaN values! Some docking scores might be NaN.
    These will need to be dealt with somehow. 
    """
    f1 = [- DOCKSTRING_DATASET["PPARD"][s] for s in smiles_list]
    f2 = QED_ORACLE(smiles_list)
    f3 = CELECOXIB_ORACLE(smiles_list)
    out = np.stack([f1, f2, f3])  # 3xN
    return out.T  # tranpose, Nx3

# Test the function to ensure it outputs the right thing
test_inputs = [
    'C1=C(C2=C(C=C1O)OC(C(C2=O)=O)C3=CC=C(C(=C3)O)O)O',
    'O=S(=O)(N1CCNCCC1)C2=CC=CC=3C2=CC=NC3',
    'C=1(N=C(C=2C=NC=CC2)C=CN1)NC=3C=C(NC(C4=CC=C(CN5CCN(CC5)C)C=C4)=O)C=CC3C',
    'C1=CC=2C(=CNC2C=C1)C=3C=CN=CC3',
]
test_outputs = np.asarray(
    [
        [8.2, .463, 0.139],
        [7.1, 0.903, 0.173,], 
        [10.8, 0.389, 0.201,],
        [7.7, 0.633, 0.196,],
    ]
)
actual_outputs = evaluate_objectives(test_inputs)
print(actual_outputs)
assert np.allclose(
    actual_outputs,
    test_outputs,
    atol=1e-3
)
del actual_outputs   

## Data

At each step of optimization, assume we have a list of molecules for which all objective values have been observed (possibly with noise).
We will represent molecules using SMILES strings, and keep a list of such SMILES strings in the variable `known_smiles`.
We will store the objective evaluations in an array `known_Y`,
where `known_Y[i,j]` is a noisy observation of $f_j$ on the molecule `known_smiles[i]`.

In [None]:
ALL_SMILES = list(DOCKSTRING_DATASET["PPARD"].keys())[:10_000]  # 10k SMILES from dataset

In [None]:
known_smiles = ALL_SMILES[:10]  # start with just 10 SMILES known
print("Known SMILES:")
pprint(known_smiles)

known_Y = evaluate_objectives(known_smiles)
print(f"Known Y shape: {known_Y.shape}")
known_Y

## Model

We will write a model to predict $[f_1,f_2,f_3]$ for arbitrary other molecules $m$,
given previous objective function evaluations.
We will model each objective independently, and assume Gaussian noise.
This means our model's predictive distribution will be a multi-variate Gaussian
distribution with no correlations (aka a diagonal covariance matrix):

$$
\begin{bmatrix}f_1(m) \\ f_2(m) \\ f_3(m) \end{bmatrix}
\sim
\mathcal N \left(
\begin{bmatrix}\mu_1(m) \\ \mu_2(m) \\ \mu_3(m) \end{bmatrix},
\begin{bmatrix}\sigma^2_1(m) & 0 & 0 \\ 0 & \sigma^2_2(m)& 0  \\0 & 0 & \sigma^2_3(m) \end{bmatrix}
\right)
$$



Our independent model for $f_i$ will be a Tanimoto-kernel GP with a constant mean $\mu_i$,
a kernel amplitude $a_i$, and a noise variance $s_i$.
This is equivalent to modelling the _residual_ $(y-\mu_i)$ with a _zero-mean_ GP
with kernel $$k(x,x')=a_i T(x,x')$$ (where $T$ denotes Tanimoto similarity between fingerprints).
The parameters $\vec{a}, \vec{\mu}, \vec{\s}$ are the model hyperparameters.
These can be tuned later.

Because the covariance matrix is diagonal, we will make our model return predictions as a tuple of vectors
\begin{align*}
    \vec{\mu}(m) &= \begin{bmatrix} \mu_1(m) & \mu_2(m) & \mu_3(m) \end{bmatrix} \\
    \vec{\sigma^2}(m) &= \begin{bmatrix} \sigma^2_1(m) & \sigma^2_2(m) & \sigma^2_3(m) \end{bmatrix}\ .
\end{align*}
This is implemented in the function below.

In [None]:
def get_fingerprint(smiles: str):
    mol = Chem.MolFromSmiles(smiles)
    assert mol is not None
    return AllChem.GetMorganFingerprint(mol, radius=3, useCounts=True)

def independent_tanimoto_gp_predict(
    *,  # require inputting arguments by name
    query_smiles: list[str],  # len M
    known_smiles: list[str],  # len N
    known_Y: np.ndarray,  # NxK
    gp_means: np.ndarray,  # shape K
    gp_amplitudes: np.ndarray,  # shape K
    gp_noises: np.ndarray,  # shape K
) -> tuple[np.ndarray, np.ndarray]:
    """
    Make *independent* predictions on a set of query smiles with the
    independent Tanimoto GP model.
    
    Return two arrays A and B with shape (M,K)
    such that A[i,j] is the predicted mean for query_smiles[i]
    on objective j
    and B[i,j] is the predicted variance for query_smiles[i]
    on objective j.

    gp_means, gp_amplitudes, and gp_noises
    are the model hyperparameters.

    NOTE: this method can likely be made much more efficient if covariance matrices are cached
    (i.e. calculated once and then passed in). If you change this in the future, this method
    could be a helpful reference.
    """

    # Check that dimension of hyperparameters is correct
    for hparam_arr in (gp_means, gp_amplitudes, gp_noises):
        assert hparam_arr.shape == (known_Y.shape[1], )

    # Create kernel matrices of Tanimoto similarities.
    # These are shared between all 3 models
    # NOTE: if you are calling this function many times on the same query/known smiles
    # you could potentially cache this computation, but for now we won't worry about this.
    known_fp = [get_fingerprint(s) for s in known_smiles]
    query_fp = [get_fingerprint(s) for s in query_smiles]
    K_known_known = np.asarray([DataStructs.BulkTanimotoSimilarity(fp, known_fp) for fp in known_fp])  # shape (N,N)
    K_query_known = np.asarray([DataStructs.BulkTanimotoSimilarity(fp, known_fp) for fp in query_fp])  # shape (M,N)

    # Compute DIAGNONAL of query-query covariance matrix. Don't need the full matrix since we are not
    # making correlated predictions.
    K_query_query_diagonal = np.asarray([DataStructs.TanimotoSimilarity(fp, fp) for fp in query_fp])

    # Make separate predictions for each model.
    # NOTE: this will invert the covariance matrix K times, and will therefore be slower than it could be.
    # This is a design limitation of kern_GP and could be improved in the future.
    means_out = []
    vars_out = []
    for j in range(known_Y.shape[1]):  # iterate over all objectives
        residual_j = known_Y[:, j] - gp_means[j]
        mu_j, var_j = kern_gp.noiseless_predict(
            a=gp_amplitudes[j],
            s=gp_noises[j],
            k_train_train=K_known_known,
            k_test_train=K_query_known,
            k_test_test=K_query_query_diagonal,
            y_train=residual_j,
            full_covar=False
        )
        means_out.append(mu_j + gp_means[j])
        vars_out.append(var_j)

    # Return joint predictions
    return (
        np.asarray(means_out).T,
        np.asarray(vars_out).T,
    )

Test Case

In [None]:
# Test this model on two molecules with Tanimoto similarity = 0.5
m1, m2 = "CCCC", "CCC"
print(f"Tanimoto Similarity: {DataStructs.TanimotoSimilarity(get_fingerprint(m1), get_fingerprint(m2)):}")
mu_pred, var_pred = independent_tanimoto_gp_predict(
    query_smiles=[m2],
    known_smiles=[m1],
    known_Y=np.asarray([[1.0, 0.0, -1.0]]),
    gp_means=np.asarray([0.0, 0.0, 1.0]),
    gp_amplitudes=np.asarray([1.0, 0.5, 1.0]),
    gp_noises=np.asarray([1.0, 1e-4, 1e-1]),
)

# Check that it is close to some manually-calculated values
assert np.allclose(mu_pred, np.asarray([[1 / 4, 0, 1 - 1 / 1.1]]))
assert np.allclose(var_pred, np.asarray([[1 - 0.5**3, 0.5 * (1 - 0.5**2), 1 - (0.5**2) / 1.1]]), atol=1e-3)
print(mu_pred)
print(var_pred)
print("Mean and variance are as expected")
del mu_pred, var_pred

## Acquisition function

We have written a function which returns the predicted mean and variance for all objectives.
We will now write a separate function which computes an acquisition function from
a set of predicted means and variances.
Designing the acquisition function will likely be the most difficult part of the project.

As a demonstration, we will use a simple acquisition function which I have made-up/reinvented: the probability of _any_ improvement (PAI).
This is the probability that an observed value will improve on the best known value for at least 1 objective:

$$\mathrm{PAI}(x) = \mathbb{P}_{\vec{y}\sim \mathrm{model}(x)}\left[i : y_i \geq y_{best,i} \right]$$

This function can be calculated in two ways:
1. Analytically: because the predictions are independent, PAI(x) = 1 - (probability that no $y_i$ improves)
2. Monte Carlo: sample a bunch of $y$ values and check how many times improvement is observed

We will code both below.

In [None]:
def PAI_analytic(
    *,
    mu_pred: np.ndarray,  # shape (K,)
    var_pred: np.ndarray,  # shape (K,)
    y_best: np.ndarray,  # shape (K, )
) -> float:

    assert mu_pred.shape == var_pred.shape == y_best.shape

    # The probability of a Gaussian variable being <= a certain value is given by its CDF.
    # This is exactly the probability of not improving
    prob_no_improve = stats.norm.cdf(y_best, loc=mu_pred, scale=np.sqrt(var_pred))

    return float(1 - np.prod(prob_no_improve))

def PAI_mc(
    *,
    mu_pred: np.ndarray,  # shape (K,)
    var_pred: np.ndarray,  # shape (K,)
    y_best: np.ndarray,  # shape (K, )
    num_mc_samples: int,
) -> float:

    samples = stats.norm(loc=mu_pred, scale=np.sqrt(var_pred)).rvs(size=(num_mc_samples, len(y_best)))
    return np.mean(np.any(samples > y_best, axis=1).astype(float))

In [None]:
# Test the acquisition function.
# Values should be reasonably close
mu_pred = np.asarray([1.0, 2.0])
var_pred = np.asarray([0.5, 2.0])**2
y_best = np.asarray([1.5, 4.0])
print(f"Analytic: {PAI_analytic(mu_pred=mu_pred, var_pred=var_pred, y_best=y_best)}")
print(f"MC: {PAI_mc(mu_pred=mu_pred, var_pred=var_pred, y_best=y_best, num_mc_samples=1000)}")

## BO loop

In each iteration, a BO loop will:

1. Fit the model
2. Evaluate the acquisition function over all candidate SMILES
3. Pick the SMILES with the highest acquisition function value
4. Evaluate that SMILES and add it to the dataset

Here we will write a very simple BO loop that does this.

Note that writing this loop requires setting model hyperparameters.
A good setting for the $\mu$ hyperparameter is the dataset mean,
while a good setting for the $a$ is the dataset variance (or potentially a higher value to encourage exploration).
These are _not_ known in real life (and should be tuned),
but for the purposes of this example we will calculate them on the real dataset
and set them accordingly.
Since these objectives are all noiseless, we will set the noise to a small value.

Running the BO loop below should yield a small amount of improvement to the objectives.
Most likely better tuning of hyperparameters will yield better results.

In [None]:
# Hyperparameters
all_Y_values = evaluate_objectives(ALL_SMILES)  # NOTE: is cheating, if we can just evaluate everything we wouldn't do BO

# NOTE: in evaluation, disregard NaN values
print(f"Mean: {np.nanmean(all_Y_values, axis=0)}")
print(f"Var: {np.nanvar(all_Y_values, axis=0)}")

In [None]:
# BO loop
BO_known_smiles = list(known_smiles)
BO_known_Y = known_Y.copy()
for bo_iter in range(10):
    y_best = np.max(BO_known_Y, axis=0)  # best eval so far
    print(f"Start BO iter {bo_iter}. Dataset size={BO_known_Y.shape}. Y_best={y_best}")

    # Make predictions
    mu_pred, var_pred = independent_tanimoto_gp_predict(
        query_smiles=ALL_SMILES,
        known_smiles=BO_known_smiles,
        known_Y=BO_known_Y,
        gp_means=np.asarray([9., 0.6, 0.2]),  # Chosen from above
        gp_amplitudes=np.asarray([2.0, 0.25, 0.25]),  # Chosen higher than actual means/vars above
        gp_noises=np.asarray([1e-1, 1e-2, 1e-2]),  # small values
    )

    # Evaluate acquisition function (use analytic version)
    # NOTE: can replace with other acquisition function later
    acq_fn_values = [
        PAI_analytic(mu_pred=m, var_pred=v, y_best=y_best)
        for m, v in zip(mu_pred, var_pred)
    ]
    print(f"\tMax acquisition value: {max(acq_fn_values):.3g}")

    # Which SMILES maximizes the acquisition function value?
    # be sure to choose a SMILES which was not chosen before!
    for chosen_i in np.argsort(-np.asarray(acq_fn_values)):
        if ALL_SMILES[chosen_i] in BO_known_smiles:
            print(f"\tSMILES {chosen_i} with acq fn = {acq_fn_values[chosen_i]:.3g} is already known, skipping")
        else:
            break
    print(f"\tChose SMILES {chosen_i} with acq fn = {acq_fn_values[chosen_i]:.3g}")

    # Evaluate SMILES
    chosen_smiles = ALL_SMILES[chosen_i]
    new_y = evaluate_objectives([chosen_smiles])
    assert not np.any(np.isnan(new_y)), "NaN value detected in objective. Need to handle this case separately"
    print(f"\tValue of chosen SMILES: {new_y}")

    # Add to dataset
    BO_known_smiles = BO_known_smiles + [chosen_smiles]
    BO_known_Y = np.concatenate([BO_known_Y, new_y], axis=0)

In [None]:
#apepended list of new generated SMILEs 

In [None]:
print("BO_known_smiles_list:")
print(BO_known_smiles)
print("BO_known_Y_list:")
print(BO_known_Y)