<a target="_blank" href="https://colab.research.google.com/github/RodrigoAVargasHdz/CHEM-4PB3/blob/w2024/Course_Notes/Week%206/TanimotoKernel.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [None]:
!pip install gpytorch
!pip install rdkit

In [None]:
import numpy as np
import pandas as pd
import torch
import gpytorch
from gpytorch.kernels import Kernel
from rdkit.Chem import AllChem, Descriptors, MolFromSmiles, MolToSmiles
from rdkit.Chem.Draw import IPythonConsole
from rdkit.Chem import Draw

from gpytorch.constraints import Positive
from torch import Tensor
import torch
from gpytorch.kernels import Kernel
from gpytorch.constraints import Positive

To open on Google Colab [link](https://colab.research.google.com/github/RodrigoAVargasHdz/CHEM-4PB3/blob/main/Course_Notes/Week6/TanimotoKernel.ipynb)

# Kernel for strings #

The kernel between two strings $\mathbf{x}$ and $\mathbf{x}'$ can be defined as,
$$
k(\mathbf{x},\mathbf{x}') = \sum_{s\in{\cal A}^*}\omega_s \phi_s(\mathbf{x})\phi_s(\mathbf{x}'),
$$
where 
*  $\phi_s(\mathbf{x})$ denotes the number of times that substring $s$ appears in string $\mathbf{x}$,
* ${\cal A}$ is the alphabet of characters, and ${\cal A}^*$ is the set of all finite strings over it,
* $\omega_s$ is a non-negative weight for substring $s$.
</br>
</br>

(**Example from Ref. [2](https://papers.nips.cc/paper/2000/file/68c694de94e6c110f42e587e8e48d852-Paper.pdf)**)\
![Arrays](https://raw.github.com/RodrigoAVargasHdz/CHEM-4PB3/master/Course_Notes/Figures/StringKernel.png)


</br>
</br>

**References**
1. [GP Book, Chapter 4, Section 4.4](https://gaussianprocess.org/gpml/chapters/) 
2. [(paper) Text classification with string kernels](https://papers.nips.cc/paper/2000/file/68c694de94e6c110f42e587e8e48d852-Paper.pdf)
3. [(paper) GAUCHE: A Library for Gaussian Processes in Chemistry](https://ml4physicalsciences.github.io/2022/files/NeurIPS_ML4PS_2022_75.pdf)
4. [(Wiki) String kernels](https://en.wikipedia.org/wiki/String_kernel)

# Kernels for molecules #
**Reference**: [paper](https://papers.nips.cc/paper/2000/file/68c694de94e6c110f42e587e8e48d852-Paper.pdf)

## Scalar product kernel ##
$k_{\text{Scalar Product}}(\mathbf{x},\mathbf{x}') = \ell \cdot \langle \mathbf{x},\mathbf{x}' \rangle$, \
where
* $\langle \mathbf{x},\mathbf{x}' \rangle$ is the  Euclidean inner product; $\langle \mathbf{x},\mathbf{x}' \rangle = \sum_i x_i x'_i$.
* $\ell$ is a scalar signal variance hyperparameter.
</br>
</br>


## Tanimoto kernel ##
General similarity metric for **binary attributes**, used in [Ref.](https://doi.org/10.1016/j.neunet.2005.07.009) for cheminformatics.

$k_{\text{Tanimoto}}(\mathbf{x},\mathbf{x}') = \ell \cdot \frac{\langle \mathbf{x},\mathbf{x}' \rangle}{\|\mathbf{x} \|^2 + \|\mathbf{x}'\|^2 - \langle \mathbf{x},\mathbf{x}' \rangle}$, \
where
* $\| \;\cdot\; \|$ is the Euclidean norm
* $\mathbf{x}$ is a binary vector, $x_i \in \{0,1\}$
</br>
</br>


## Graph kernel ##
$k_{\text{Graph}}(g,g') = \ell \cdot \langle \phi(g),\phi(g') \rangle_{{\cal H}}$,
where,
* $\langle \phi(g),\phi(g') \rangle_{{\cal H}}$ measures the similarity between two molecular graphs. Related to graph isomorphism.
* $\ell$ is a scalar signal variance hyperparameter.

The graph kernel will be used this week [link](https://colab.research.google.com/github/RodrigoAVargasHdz/CHEM-4PB3/blob/main/Course_Notes/Week5/gpytorch_molecules.ipynb)



# Code

## Tanimoto kernel in Torch
Code is based on this [online tutorial](https://towardsdatascience.com/gaussian-process-regression-on-molecules-in-gpflow-ee6fedab2130), where $k_{\text{Tanimoto}}(\cdot,\cdot)$ was coded in TensorFlow. \
To define your own kernel in GPyTorch, see this [tutorial](https://docs.gpytorch.ai/en/stable/examples/00_Basic_Usage/Implementing_a_custom_Kernel.html).



In [None]:

# from ChatGPT 2023
# '''Write me a code compatible with GPyTorch for the Tanimoto kernel function'''
# '''remove the lengthscale parameter'''
class TanimotoKernel(gpytorch.kernels.Kernel):
    """Tanimoto kernel for binary vectors, scaled by a learnable variance.

    For binary inputs the kernel is

        k(x, x') = variance * <x, x'> / sum_i (x_i + x'_i - x_i * x'_i)

    where the denominator counts the features present in either vector.

    Args:
        active_dims: Forwarded to ``gpytorch.kernels.Kernel``.
    """

    def __init__(self, active_dims=None):
        super().__init__(active_dims=active_dims)
        self.register_parameter(
            name="raw_variance", parameter=torch.nn.Parameter(torch.zeros(1))
        )
        self.register_constraint("raw_variance", gpytorch.constraints.Positive())
        # Start at unit variance so that k(x, x) == 1 for a fresh kernel; the
        # raw parameter holds the inverse-transformed value, not 1 itself.
        self.variance = 1.0

    @property
    def variance(self):
        """Positive signal variance (the constrained view of ``raw_variance``)."""
        return self.raw_variance_constraint.transform(self.raw_variance)

    @variance.setter
    def variance(self, value):
        # A registered nn.Parameter cannot be overwritten by a plain tensor, so
        # write the raw value through ``initialize`` instead of assigning it.
        if not torch.is_tensor(value):
            value = torch.as_tensor(value)
        value = value.to(self.raw_variance)
        self.initialize(
            raw_variance=self.raw_variance_constraint.inverse_transform(value)
        )

    def forward(self, x1, x2, **params):
        """Return the scaled Tanimoto similarity between rows of x1 and x2.

        Args:
            x1: Tensor of shape (..., n, d).
            x2: Tensor of shape (..., m, d).
            **params: Ignored; accepted for compatibility with the Kernel API.

        Returns:
            Tensor of shape (..., n, m).
        """
        # Broadcast to (..., n, m, d) so every pair of rows is compared.
        x1_ = x1.unsqueeze(-2)
        x2_ = x2.unsqueeze(-3)
        numerator = torch.sum(x1_ * x2_, dim=-1)
        denominator = torch.sum(x1_ + x2_ - x1_ * x2_, dim=-1)
        # Two all-zero vectors give 0/0. They are identical, so their similarity
        # is defined as 1, which also keeps k(x, x) == variance for every x.
        empty = denominator == 0
        similarity = numerator / denominator.masked_fill(empty, 1)
        similarity = similarity.masked_fill(empty, 1.0)
        return self.variance * similarity

    

In [None]:
# Fix the seed so the random example is identical on every run of the notebook.
torch.manual_seed(0)
# n random binary vectors of dimension d
n,d = 2, 5
x1 = torch.randint(0, 2, (n,d))
print('Random vectors')
print(x1)

In [None]:
# Show how unsqueeze prepares the inputs for broadcasting:
# unsqueeze(-2) inserts an axis before the last one, giving (n, 1, d);
# unsqueeze(-3) inserts a leading axis, giving (1, n, d).
print(x1.unsqueeze(-2).shape)
print(x1.unsqueeze(-2))
print(x1.unsqueeze(-3).shape)
print(x1.unsqueeze(-3))
print('*')

# Two ways to get every pairwise inner product <x_i, x_j>: broadcast-multiply
# and sum over the last axis, or contract the last axes with tensordot.
print(torch.sum(x1.unsqueeze(-2) * x1.unsqueeze(-3), dim=-1))
print(torch.tensordot(x1,x1, dims=([-1],[-1])))
print('*')

# Evaluate the kernel on the random vectors by calling forward() directly.
kernel = TanimotoKernel()
K = kernel.forward(x1,x1) ## is this correct?
print('Kernel matrix')
print(K)
print('Kernel parameter')
# NOTE(review): raw_variance is the unconstrained parameter; the value that
# actually scales K is the transformed one, kernel.variance.
print(kernel.raw_variance)

### GPT4 2024 ###

In [None]:
class TanimotoKernel(Kernel):
    r"""Tanimoto kernel scaled by a learnable, positive output scale.

    k(x, x') = outputscale * <x, x'> / (||x||^2 + ||x'||^2 - <x, x'>)

    Args:
        variance (float or Tensor, optional): Initial value of the output scale
            (signal variance). Must be strictly positive. Defaults to 1.
        **kwargs: Forwarded to ``gpytorch.kernels.Kernel``.
    """

    has_lengthscale = False

    def __init__(self, variance=None, **kwargs):
        super().__init__(**kwargs)

        self.register_parameter(
            name="raw_outputscale",
            parameter=torch.nn.Parameter(torch.zeros(())),
        )
        self.register_constraint("raw_outputscale", Positive())

        # Go through the setter so the *constrained* value equals the request;
        # the raw parameter stores the inverse-transformed number.
        self.outputscale = 1.0 if variance is None else variance

    @property
    def outputscale(self):
        """Positive output scale (the constrained view of ``raw_outputscale``)."""
        return self.raw_outputscale_constraint.transform(self.raw_outputscale)

    @outputscale.setter
    def outputscale(self, value):
        self._set_outputscale(value)

    def _set_outputscale(self, value):
        """Store ``value`` as the output scale by writing its raw counterpart."""
        if not torch.is_tensor(value):
            value = torch.as_tensor(value)
        value = value.to(self.raw_outputscale)
        if not bool((value > 0).all()):
            raise ValueError("outputscale must be strictly positive")
        self.initialize(
            raw_outputscale=self.raw_outputscale_constraint.inverse_transform(value)
        )

    @property
    def variance(self):
        """Alias of ``outputscale`` for code that reads ``kernel.variance``."""
        return self.outputscale

    @variance.setter
    def variance(self, value):
        self._set_outputscale(value)

    def forward(self, x1, x2, diag=False, **params):
        """
        Compute the scaled Tanimoto kernel between inputs x1 and x2.

        Args:
            x1 (Tensor): The first input tensor with shape (..., n, d) where n is the number of points
                         and d is the dimensionality of each point.
            x2 (Tensor): The second input tensor with shape (..., m, d) where m is the number of points
                         and d is the dimensionality of each point.
            diag (bool, optional): Whether to return the diagonal of the kernel matrix rather than the full
                                   kernel matrix. Defaults to False.

        Returns:
            Tensor: The kernel matrix of shape (..., n, m) scaled by the output scale, or its
                    diagonal over the last two dimensions when ``diag`` is True.
        """
        # Fingerprints often arrive as integer bit vectors. Do the arithmetic in
        # floating point: integer matmul is not available on every device and
        # integer division would fall back to the default dtype.
        dtype = torch.promote_types(x1.dtype, x2.dtype)
        if not dtype.is_floating_point:
            dtype = self.raw_outputscale.dtype
        x1 = x1.to(dtype)
        x2 = x2.to(dtype)

        prod = x1.matmul(x2.transpose(-2, -1))
        x1_sq = x1.pow(2).sum(dim=-1, keepdim=True)
        x2_sq = x2.pow(2).sum(dim=-1, keepdim=True).transpose(-2, -1)
        base = x1_sq + x2_sq - prod

        # Two all-zero vectors give 0/0. They are identical, so their similarity
        # is defined as 1, which keeps k(x, x) == outputscale for every x.
        empty = base == 0
        similarity = (prod / base.masked_fill(empty, 1.0)).masked_fill(empty, 1.0)

        # Scale the kernel matrix by the variance parameter (out of place, so
        # autograd never sees an in-place edit of an intermediate result).
        kernel_matrix = self.outputscale * similarity

        if diag:
            # diagonal() works for batched inputs as well as plain matrices.
            return kernel_matrix.diagonal(dim1=-2, dim2=-1)
        return kernel_matrix

# Example Caffeine

In [None]:
def get_fingerprints(m_smiles, radius=2, n_bits=2048):
  """Return the Morgan fingerprint of a molecule as a NumPy array.

  Args:
    m_smiles: SMILES string of the molecule.
    radius: Radius passed to the Morgan fingerprint.
    n_bits: Length of the fingerprint bit vector.

  Returns:
    NumPy array built from the fingerprint bit vector.

  Raises:
    ValueError: If RDKit cannot parse ``m_smiles``.
  """
  m = MolFromSmiles(m_smiles)
  # MolFromSmiles returns None for an unparsable string; fail with a clear
  # message here instead of an opaque error inside the fingerprint call.
  if m is None:
    raise ValueError(f"Could not parse SMILES string: {m_smiles!r}")
  m_fingerprints = AllChem.GetMorganFingerprintAsBitVect(m, radius=radius, nBits=n_bits)
  return np.asarray(m_fingerprints)

In [None]:
# SMILES string of caffeine, its RDKit molecule and its Morgan fingerprint.
caff_sm = 'CN1C=NC2=C1C(=O)N(C(=O)N2C)C'
m = MolFromSmiles(caff_sm)
x_caff_fp = get_fingerprints(caff_sm)
print(x_caff_fp)
# Bare last expression: the notebook displays the molecule.
m

Molecules similar to caffeine [link](https://www.acs.org/education/resources/highschool/chemmatters/past-issues/archive-2013-2014/caffeine.html)\
[Paraxanthine](https://en.wikipedia.org/wiki/Paraxanthine)

In [None]:
# Paraxanthine: SMILES string, RDKit molecule and Morgan fingerprint.
parax_sm = 'O=C2Nc1ncn(c1C(=O)N2C)C'
m1 = MolFromSmiles(parax_sm)
x_parax_fp = get_fingerprints(parax_sm)
# Bare last expression: the notebook displays the molecule.
m1

Evaluate the Tanimoto kernel.

In [None]:
# Wrap each fingerprint as a single-row batch of shape (1, d) so it can be
# passed to the kernel as a set containing one point.
x_caff = torch.from_numpy(x_caff_fp).unsqueeze(0)
x_parax = torch.from_numpy(x_parax_fp).unsqueeze(0)
# NOTE(review): `kernel` is not created in this cell; it is whichever instance
# an earlier cell bound to that name, so run the cells in order.
print('Tanimoto kernel for, Caffeine-Caffeine: ',kernel.forward(x_caff,x_caff))
print('Tanimoto kernel for, Caffeine-Paraxanthine: ',kernel.forward(x_caff,x_parax))
print('Tanimoto kernel for, Paraxanthine-Paraxanthine: ',kernel.forward(x_parax,x_parax))

# More data!!

In [None]:
# Read the solubility CSV from the course repository into a DataFrame.
data_url = "https://github.com/RodrigoAVargasHdz/CHEM-4PB3/raw/main/Course_Notes/data/solubility.csv"
data = pd.read_csv(data_url)
# List the available columns.
print(data.columns)

In [None]:
# Show the first 10 entries of the SMILES column.
print(data['SMILES'][:10])

In [None]:
# Compute the fingerprint of every molecule in the dataset.
# A comprehension keeps the loop variable local, so it no longer overwrites the
# notebook-level name `m` (the caffeine molecule defined in an earlier cell).
molecules = data['SMILES']#[:1000]
X_fp = [get_fingerprints(smiles) for smiles in molecules]

In [None]:
import matplotlib
import matplotlib.pyplot as plt


# Compute K(caffeine, molecule) for every molecule in the dataset.
# The list of fingerprints is first stacked into one array and turned into a tensor.
X_fp = torch.from_numpy(np.asarray(X_fp))
# NOTE(review): `kernel` is the instance bound by an earlier cell.
k_t = kernel.forward(x_caff,X_fp)
# print(k_t[:1000])

# from torch to numpy (don't forget!!): detach from the autograd graph first,
# then flatten to a 1-D array with one kernel value per molecule.
k_values = k_t.detach().numpy()
k_values = k_values.flatten()

# Histogram of the kernel values between caffeine and the dataset.
plt.figure(figsize=(10,8))
plt.hist(k_values, bins=100)
plt.xticks(fontsize=15)
plt.xlabel('Tanimoto Kernel for Caffeine',fontsize=15)
plt.ylabel('Counts')

In [None]:
# Molecules with the largest and smallest kernel value with respect to caffeine.
# argmax/argmin return positions in k_values, so look the rows up by position
# (.iloc) rather than by index label.
i_max = np.argmax(k_values)
smiles_max = data['SMILES'].iloc[i_max]
print(smiles_max)
i_min = np.argmin(k_values)
smiles_min = data['SMILES'].iloc[i_min]
print(smiles_min)

molecule1 = MolFromSmiles(smiles_max)
molecule2 = MolFromSmiles(smiles_min)

# Draw both molecules side by side.
mols = [molecule1, molecule2]
img = Draw.MolsToGridImage(mols, molsPerRow=2, subImgSize=(
    200, 200), legends=['Molecule MAX', 'Molecule MIN'])

img

In [None]:
def are_smiles_equal(smiles1, smiles2):
    """Tell whether two SMILES strings have the same canonical SMILES.

    Args:
        smiles1: First SMILES string.
        smiles2: Second SMILES string.

    Returns:
        True if both strings produce the same canonical SMILES, else False.

    Raises:
        ValueError: If RDKit cannot parse one of the strings.
    """
    canonical = []
    for smiles in (smiles1, smiles2):
        # Convert the SMILES string to an RDKit molecule object
        mol = MolFromSmiles(smiles)
        # MolFromSmiles returns None for an unparsable string; report which one.
        if mol is None:
            raise ValueError(f"Could not parse SMILES string: {smiles!r}")
        # Generate the canonical SMILES for the molecule
        canonical.append(MolToSmiles(mol))

    # Compare the canonical SMILES strings
    return canonical[0] == canonical[1]

# Is the molecule with the largest kernel value caffeine itself?
are_equal = are_smiles_equal(caff_sm, smiles_max)
print(f"Are the two SMILES equal? {are_equal}")

# Same check for the molecule with the smallest kernel value.
are_equal = are_smiles_equal(caff_sm, smiles_min)
print(f"Are the two SMILES equal? {are_equal}")

## Train a GP (GPyTorch)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# data: fingerprints as inputs, solubility as target
# Xtot_np = k_values[:,np.newaxis]
Xtot_np = np.asarray(X_fp)
ytot_np = data['Solubility'].to_numpy()

# Standardize the targets (zero mean, unit variance).
# NOTE(review): the scaler is fitted on the whole dataset, test points included.
scaler = StandardScaler()
scaler.fit(ytot_np[:,np.newaxis])
ytot_np = scaler.transform(ytot_np[:,np.newaxis])
ytot_np = ytot_np.ravel()

print(Xtot_np.shape,ytot_np.shape)

N = 800 #2500   number of training points
Nval = 5000   # maximum number of test points kept (0 keeps all of them)
X_train, X_test, y_train, y_test = train_test_split(Xtot_np, ytot_np,
                                                    test_size=ytot_np.shape[0] - N, random_state=1)
if Nval > 0:
    X_test, y_test = X_test[:Nval], y_test[:Nval]
# Assigned outside the `if` so the names exist for every value of Nval.
Xtr, Xtst, ytr, ytst = X_train, X_test, y_train, y_test

# Inputs go to double precision, matching Xtst below, so the kernel never has
# to do linear algebra on integer tensors.
Xtr = torch.from_numpy(Xtr).double()
ytr = torch.from_numpy(ytr).float()
if torch.cuda.is_available():
    Xtr = Xtr.cuda()
    ytr = ytr.cuda()

# Test tensors stay on the CPU.
Xtst = torch.from_numpy(Xtst).double()
ytst = torch.from_numpy(ytst).float()

Xtot = torch.from_numpy(Xtot_np)
ytot = torch.from_numpy(ytot_np)

In [None]:
class ExactGPModel(gpytorch.models.ExactGP):
    """Exact GP with a constant mean and a Tanimoto covariance."""

    def __init__(self, train_x, train_y, likelihood):
        super().__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = TanimotoKernel()
        # Alternative covariance (disabled): a scaled RBF kernel with one
        # lengthscale per input dimension.
        # self.covar_module = gpytorch.kernels.ScaleKernel(
        #     gpytorch.kernels.RBFKernel(ard_num_dims=train_x.shape[1]))

    def forward(self, x):
        """Return the multivariate normal given by the mean and covariance at x."""
        prior_mean = self.mean_module(x)
        prior_covar = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(prior_mean, prior_covar)

In [None]:
# initialize likelihood and model

# Disabled alternative: a likelihood whose noise is set once and excluded from training.
# likelihood = gpytorch.likelihoods.GaussianLikelihood().double()
# likelihood.noise = 1e-5  # Some small value, but don't make it too small or numerical performance will suffer. I recommend 1e-4.
# likelihood.noise_covar.raw_noise.requires_grad_(False)

# Gaussian likelihood in double precision; the constraint bounds the noise below by 1e-6.
likelihood = gpytorch.likelihoods.GaussianLikelihood(noise_constraint=gpytorch.constraints.GreaterThan(1e-6)).double()
# Starting value of the noise.
likelihood.noise = 1e-5 

# GP model on the training data, converted to double precision.
model = ExactGPModel(Xtr, ytr, likelihood).double()

# Move likelihood and model to the GPU when one is available.
if torch.cuda.is_available():
    likelihood.cuda()
    model.cuda()

In [None]:
# Sanity check: report the training points whose kernel row contains NaN.
# (The previous condition was negated, so it printed every healthy row.)
kernel = TanimotoKernel()
K = kernel.forward(Xtr,Xtr)
for i, k in enumerate(K):
  if torch.any(torch.isnan(k)):
    print(i)
    print(Xtr[i])

In [None]:
# Find optimal model hyperparameters using ADAM

# Use the adam optimizer
# Includes GaussianLikelihood parameters
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

# "Loss" for GPs - the marginal log likelihood
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)

training_iter = 250
mll_trajectory = []      # negative marginal log likelihood at every iteration
mll_trajectory_tst = []
for i in range(training_iter+1):
    model.train()
    likelihood.train()
    # Zero gradients from previous iteration
    optimizer.zero_grad()
    # Output from model
    output = model(Xtr)
    # Calc loss and backprop gradients
    loss = -mll(output, ytr)
    loss.backward()
    mll_trajectory.append(loss.item())

    # Report progress every 25 iterations.
    if i % 25 == 0:
      print('Iter %d/%d - Loss: %.5f  noise: %.6f' % (
          i, training_iter, loss.item(),
          model.likelihood.noise.item()
      ))
      # The kernel used by the model exposes its scale as `outputscale`.
      print('sigma: ', model.covar_module.outputscale.item())
    optimizer.step()

## Prediction

In [None]:
# Prediction with GPyTorch
from torch.utils.data import TensorDataset, DataLoader

# Mini-batch sizes are conventionally powers of two (64, 128, 256, 512, 1024).
# Only the inputs are needed, so the dataset wraps Xtst alone instead of
# pairing it with a dummy label tensor as large as Xtst.
test_dataset = TensorDataset(Xtst)
test_loader = DataLoader(test_dataset, batch_size=512, shuffle=False)

# `means` starts with one placeholder entry so torch.cat has something to
# extend; whoever reads it has to skip that first element.
means = torch.tensor([0.])
stds = torch.tensor([[0.,0]])

model.eval()
likelihood.eval()
with torch.no_grad():
    for (x_batch,) in test_loader:
      # Move each batch to the GPU when one is available.
      if torch.cuda.is_available():
          xb = x_batch.cuda()
      else:
          xb = x_batch
      preds = likelihood(model(xb))
      mean = preds.mean.cpu()
      means = torch.cat([means, mean])

In [None]:
from sklearn.metrics import r2_score
import matplotlib
import matplotlib.pylab as plt

# Drop the placeholder first entry of `means`, then go from torch to numpy.
ytot_gp = means[1:].cpu().numpy() # torch to numpy
ytst_np = ytst.cpu().numpy()
# r2_score expects (y_true, y_pred): reference data first, GP prediction second.
r2 = r2_score(ytst_np, ytot_gp)

plt.figure(figsize=(10,8))
plt.scatter(ytot_gp,ytst_np,s=5)

# Diagonal y = x across the joint range of predictions and data.
low = np.min(np.stack((ytot_gp,ytst_np)).flatten())
high = np.max(np.stack((ytot_gp,ytst_np)).flatten())
plt.plot([low, high], [low, high], ls="--", c="red",lw=3.)

plt.title('Solubility (N=%s), $R^{2}$ = %.3f'%(N,r2),fontsize=18)
# plt.text(0.02,0.08,r'$R^{2}$ = %.3f'%r2,fontsize=18)
plt.xlabel('GP prediction',fontsize=18)
plt.ylabel('Data',fontsize=18)
# plt.show()
