
# RNN-based molecule generation

Laurent Cetinsoy

In this hands-on session we want to generate molecule formulas for de novo drug discovery.

For that we need generative models. Generative models go beyond classification or simple regression: they can generate new data that resemble a previously seen dataset.

Many families of generative models exist:

- Bayesian models like graphical models
- Recurrent models (for sequence generation, such as text)
- Variational auto encoders
- Generative adversarial models
- Flow and diffusion models


In this hands-on we will start by training a character-based RNN to generate SMILES strings.


We want to feed the SMILES representations of molecules to an RNN.
The basic idea is to train it to predict the next SMILES token of a molecule given the previous ones.

For instance, from the molecule "CC(=O)NC1=CC=C(O)C=C1" we may give the model

X = "CC(=O)N"
y = C

and ask the RNN to learn to predict y given X

Like a standard language model !
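The X / y pair above can be produced with a fixed-size sliding window. The sketch below is only an illustration: the helper name `next_token_pairs` and the window length of 7 are assumptions chosen to reproduce the example, not part of the exercise.

```python
def next_token_pairs(smiles: str, seq_length: int):
    """Yield (context, next_char) pairs using a fixed-size sliding window."""
    for start in range(len(smiles) - seq_length):
        context = smiles[start : start + seq_length]
        target = smiles[start + seq_length]
        yield context, target


pairs = list(next_token_pairs("CC(=O)NC1=CC=C(O)C=C1", seq_length=7))
for context, target in pairs[:3]:
    print(f"X = {context!r}  ->  y = {target!r}")
```

A string of length n yields n - seq_length pairs, so short molecules contribute few (or no) training examples.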


## RNN Language model


A language model is a model which predicts the next token of a sequence given the previous ones:

$P(X_t \mid X_{t-1}, X_{t-2}, \dots, X_{t-p})$


This model can be learned with a recurrent neural network:

$y = P(X_t \mid X_{t-1}, X_{t-2}, \dots, X_{t-p}) = \mathrm{RNN}_{\theta}(X_{t-1}, X_{t-2}, \dots, X_{t-p})$
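The notebook does not spell out the training objective. A common choice, assumed here, is to minimise the negative log-likelihood of each observed token. For one-hot targets this is what a categorical cross-entropy loss computes. With $T$ denoting the length of a training sequence (a symbol introduced only for this illustration):

$$
\mathcal{L}(\theta) = - \sum_{t=p+1}^{T} \log P(X_t \mid X_{t-1}, X_{t-2}, \dots, X_{t-p})
$$

where the probability is the one produced by $\mathrm{RNN}_{\theta}$.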


To train such a model you need a corpus of data.



There are two main approaches: word-level models and character-level models.

For character level models, an interesting resource is : http://karpathy.github.io/2015/05/21/rnn-effectiveness/



Briefly explain the difference between a word-based language model and a character-based language model.

Character-based models predict the most likely next character given a sequence. Word-based models, in contrast, predict the most likely next word.
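To make the difference concrete, here is a small sketch on an arbitrary English sentence (the sentence is an illustrative assumption, not taken from the dataset). It compares the tokens and vocabulary obtained at each level.

```python
text = "the cat sat on the mat"

# Word-level: each token is a whitespace-separated word
word_tokens = text.split()
word_vocab = sorted(set(word_tokens))

# Character-level: each token is a single character (spaces included)
char_tokens = list(text)
char_vocab = sorted(set(char_tokens))

print(len(word_tokens), "word tokens, vocabulary size", len(word_vocab))
print(len(char_tokens), "char tokens, vocabulary size", len(char_vocab))
```

Character-level sequences are longer, but the vocabulary stays small and fixed, which suits SMILES strings where there is no natural notion of a word.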

## Imports and dependencies

In [None]:
!pip install rdkit-pypi

In [None]:
import yaml
import pandas as pd
import numpy as np
import random
from rdkit.Chem import QED,MolFromSmiles
import os
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from rdkit.Chem import MolToSmiles

## Saving config
To keep track of the best configuration for our RNN, we will store it in a `config.yml` file.

In [None]:
def save_config(dictionnaire, chemin_fichier="config.yml"):
    with open(chemin_fichier, "w") as fichier:
        yaml.dump(dictionnaire, fichier)
    print("YAML file saved successfully.")


def read_yaml(chemin_fichier):
    if os.path.exists(chemin_fichier):
        with open(chemin_fichier, "r") as fichier:
            contenu = yaml.safe_load(fichier)
    else:
        contenu = {}
    return contenu

In [None]:
config = {}

## Loading the data

Download the following dataset : https://github.com/joeymach/Leveraging-VAE-to-generate-molecules

In [None]:
! [ -e 250k_smiles.csv ] || wget https://raw.githubusercontent.com/joeymach/Leveraging-VAE-to-generate-molecules/master/250k_smiles.csv

Import pandas and load the first 1000 lines

In [None]:
df = pd.read_csv(filepath_or_buffer="250k_smiles.csv", nrows=1000)

Display the first rows of the dataframe

In [None]:
df.head()

## Processing the data

We need to do the following things :

- convert SMILES tokens to numbers
- build pairs of SMILES token sequences and their corresponding labels

Compute the length of the longest SMILES string

In [None]:
len(max(df["smiles"], key=lambda s: len(s)))

Code a function unic_characters(string) which returns the unique characters in a string

In [None]:
def unic_characters(string):
    return np.unique(list(string))

In [None]:
unic_characters("AAAABAAACCCDDDEE")

Concatenate all SMILES strings of the pandas dataframe and use **unic_characters** to get the unique characters

In [None]:
unic_chars = unic_characters(df["smiles"].sum())

Code a function **map_char_to_int(unic_chars)** which returns a dictionary where each char is assigned an int value.
Add a character to mark the end of the molecule (like "\n")


In [None]:
def map_char_to_int(unic_chars):
    dictionnary = {}
    for i, char in enumerate(unic_chars):
        dictionnary[char] = i
    return dictionnary

In [None]:
map_char_to_int(unic_chars)
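The function above maps only the characters it is given. One possible way to guarantee that an end-of-molecule marker is part of the vocabulary is sketched below. The helper `build_vocab`, the constant `END_TOKEN` and the two toy SMILES strings are assumptions made for this illustration.

```python
END_TOKEN = "\n"  # assumed end-of-molecule marker, as suggested in the exercise


def build_vocab(smiles_list, end_token=END_TOKEN):
    """Return (char_to_int, int_to_char) with the end token always included."""
    chars = sorted(set("".join(smiles_list)) | {end_token})
    char_to_int = {char: i for i, char in enumerate(chars)}
    int_to_char = {i: char for char, i in char_to_int.items()}
    return char_to_int, int_to_char


char_to_int, int_to_char = build_vocab(["CCO", "c1ccccc1"])
print(char_to_int)
```

Building both dictionaries from the same sorted list keeps the two mappings consistent with each other.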

Code a function map_int_to_char(unic_chars) which returns the reverse mapping.

If you want you can merge both functions in a class

In [None]:
def map_int_to_char(unic_chars):
    dictionnary = {}
    for i, char in enumerate(unic_chars):
        dictionnary[i] = char
    return dictionnary

In [None]:
class MolEncoder:
    def __init__(self, unic_characters: np.ndarray):
        # Build both mappings from the argument, not from the global `unic_chars`
        self.unic_chars = unic_characters
        self.char_to_int = map_char_to_int(self.unic_chars)
        self.int_to_char = map_int_to_char(self.unic_chars)
        self.voc_len = len(self.unic_chars)

    def get_char(self, int_val):
        return self.int_to_char[int_val]

    def get_int(self, char):
        return self.char_to_int[char]

    def encode_mol(self, smiles):
        return np.array([self.char_to_int[char] for char in smiles])

    def get_one_hot(self, int_value: int = None, char: str = None):
        # Accept either an integer index or a character
        if int_value is None:
            if char is None:
                raise ValueError("Provide either int_value or char")
            int_value = self.get_int(char)
        one_hot = np.zeros(self.voc_len)
        one_hot[int_value] = 1
        return one_hot

    def decode_mol(self, encoded_mol):
        return "".join([self.int_to_char[i] for i in encoded_mol])

In [None]:
mol_encoder = MolEncoder(unic_chars)
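# Look up the index of "C" instead of hard-coding it: it depends on the vocabulary
c_index = mol_encoder.get_int("C")
print(c_index)
print(mol_encoder.get_char(c_index))
print(mol_encoder.get_one_hot(c_index))
assert np.all(mol_encoder.get_one_hot(c_index) == mol_encoder.get_one_hot(char="C"))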

In [None]:
# randrange excludes the upper bound, so the index is always valid
mol_smile = df["smiles"].iloc[random.randrange(len(df))]
encoded = mol_encoder.encode_mol(mol_smile)
assert mol_encoder.decode_mol(encoded) == mol_smile

For each SMILES molecule, add the ending token to it

The implemented algorithm already encodes the ending token.

## Building the dataset

Now we will create the dataset so that it has the right shape for our Keras LSTM model

Remember that Keras recurrent models expect a 3D array of shape (n_examples, seq_len, n_features)



What will `n_features` be in our case?

`n_features` is the dimension of each character vector. In our case it is 1, since each character is represented by an integer. With one-hot encoded vectors, `n_features` would equal the vocabulary size.
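The two options can be compared on a toy example. The six-character vocabulary and the two windows below are assumptions for illustration only. They are not the dataset's vocabulary.

```python
import numpy as np

vocab = ["C", "O", "N", "(", ")", "="]  # toy vocabulary, not the dataset's
char_to_int = {char: i for i, char in enumerate(vocab)}
windows = ["CC(=O", "C(=O)"]  # two windows of length 5

# Integer encoding: one number per character, so n_features = 1
int_encoded = np.array([[char_to_int[char] for char in w] for w in windows])
X_int = int_encoded[..., np.newaxis]

# One-hot encoding: one vector per character, so n_features = len(vocab)
X_one_hot = np.eye(len(vocab))[int_encoded]

print(X_int.shape)      # (n_examples, seq_len, 1)
print(X_one_hot.shape)  # (n_examples, seq_len, vocabulary size)
```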

Code a function **build_X_and_y(string, i_char, seq_length)** which takes a string, a position **i_char** and a **seq_length** number.


It should create X by taking all characters between i_char and i_char + seq_length,
create y by taking the character that follows the X sequence,
and return X and y.

In [None]:
def build_X_and_y(
    string: str,
    i_char: int,
    seq_length: int,
    mol_encoder: MolEncoder = None,
    one_hot: bool = False,
):
    encode_method = (
        mol_encoder.get_int
        if not one_hot
        else lambda x: mol_encoder.get_one_hot(char=x)
    )
    X = [encode_method(char) for char in string[i_char : i_char + seq_length]]
    y = encode_method(string[i_char + seq_length])
    return X, y

Test your function on the following string "OCC(C)(C)c1ccc" with seq_length = 4 and i = [1, 2, 3]

In [None]:
tested_string = "OCC(C)(C)c1ccc"
seq_len = 4
for i_char in range(1, 4):
    X, y = build_X_and_y(
        tested_string, i_char=i_char, seq_length=seq_len, mol_encoder=mol_encoder
    )
    print(
        f"the encoding of {tested_string[i_char:i_char+seq_len]} is {X} and the next character is {y}"
    )

Using build_X_and_y and map_char_to_int, build a list named X_train and a list named y_train

In [None]:
def generate_ds(seq_len = 10, mol_encoder = None):
    X_train, y_train = [], []
    for mol in df["smiles"]:
        for i in range(len(mol) - seq_len):
            X, y = build_X_and_y(mol, i_char=i, seq_length=seq_len, mol_encoder=mol_encoder)
            X_train.append(X)
            y_train.append(y)
    return np.array(X_train),np.array(y_train)

Create numpy arrays from the lists

In [None]:
X_train ,y_train = generate_ds(10,mol_encoder)

In [None]:
X_train.shape, y_train.shape

Reshape the X numpy array to (n_examples, seq_length, 1)

In [None]:
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)

In [None]:
X_train.shape, y_train.shape

Normalize X by dividing each value by the total number of unique characters

In [None]:
X_train = X_train / mol_encoder.voc_len

In [None]:
assert np.all(X_train < 1)

Import Keras and build an LSTM network with (at least) two layers of 128 neurons each.

You can also add Dropout layers.

Do you think you should use return_sequences = True? If yes, when?


Add a Dense layer on top with the appropriate activation function and number of neurons.
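On the `return_sequences` question: a Keras recurrent layer returns only its last hidden state by default (a 2D array), and returns the hidden state at every timestep (a 3D array) when `return_sequences=True`. Since a recurrent layer needs 3D input, every stacked recurrent layer except the last one needs it. The toy NumPy layer below is not Keras code. It is a hypothetical `toy_rnn` written only to show the two output shapes.

```python
import numpy as np


def toy_rnn(x, units, return_sequences=False, seed=0):
    """Tiny tanh RNN that mimics only the output shape convention of a recurrent layer."""
    rng = np.random.default_rng(seed)
    batch, seq_len, n_features = x.shape
    W_x = rng.normal(scale=0.1, size=(n_features, units))
    W_h = rng.normal(scale=0.1, size=(units, units))
    h = np.zeros((batch, units))
    states = []
    for t in range(seq_len):
        h = np.tanh(x[:, t, :] @ W_x + h @ W_h)
        states.append(h)
    # All timesteps (3D) or only the last hidden state (2D)
    return np.stack(states, axis=1) if return_sequences else h


x = np.zeros((4, 10, 6))  # (n_examples, seq_len, n_features)
first = toy_rnn(x, units=8, return_sequences=True)  # feeds another recurrent layer
second = toy_rnn(first, units=8)  # last recurrent layer, feeds a Dense layer
print(first.shape, second.shape)
```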


In [None]:
model = Sequential()
model.add(LSTM(128, input_shape=(X_train.shape[1:])))
model.add(Dropout(0.2))
model.add(Dense(units=1, activation="sigmoid"))

Compile the model with the appropriate loss function and the adam optimizer

In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)
model.summary()
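When choosing the loss, keep in mind that Keras' `categorical_crossentropy` expects one-hot targets, while `sparse_categorical_crossentropy` expects integer class indices. Both are meant to be paired with an output layer that has one unit per class and a softmax activation. The NumPy sketch below uses made-up probabilities and labels to show that the two formulations compute the same quantity.

```python
import numpy as np

# Made-up softmax outputs for 2 examples over a 3-character vocabulary
probs = np.array([[0.7, 0.2, 0.1],
                  [0.1, 0.8, 0.1]])
y_int = np.array([0, 1])      # integer labels
y_one_hot = np.eye(3)[y_int]  # the same labels, one-hot encoded

# Categorical cross-entropy: uses one-hot targets
categorical = -np.sum(y_one_hot * np.log(probs), axis=1).mean()

# Sparse categorical cross-entropy: indexes the true class directly
sparse = -np.log(probs[np.arange(len(y_int)), y_int]).mean()

print(categorical, sparse)
```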

Train the model for 20 epochs on 10 examples (yes, you read that correctly) and check that the model overfits!

In [None]:
# Convert X_train to tensor
n_samples = 10
X_train_tensor = tf.convert_to_tensor(X_train[:n_samples])
y_train_tensor = tf.convert_to_tensor(y_train[:n_samples])

In [None]:
n_epochs = 20
config['epochs'] = n_epochs 
history = model.fit(X_train_tensor, y_train_tensor, epochs=n_epochs)

If it does not overfit, try to fix the data preparation and the model architecture so that it does.

For data preparation, we will apply one-hot encoding to the characters. This gives an array of shape (`n_examples`, `seq_length`, `n_features`) with `n_features` = number of unique characters. We will then normalize this array by dividing each value by the number of unique characters. Finally, we will use the return_sequences = True option so that the model returns an output sequence for each input. This will let us predict the next character for each character of the input sequence.

In [None]:

seq_len = 20
config["seq_len"] = seq_len
def generate_one_hot_ds(seq_len,mol_encoder):
    X_train, y_train = [],[]
    for mol in df["smiles"]:
        for i in range(len(mol) - seq_len):
            X, y = build_X_and_y(
                mol, i_char=i, seq_length=seq_len, mol_encoder=mol_encoder, one_hot=True
            )
            X_train.append(X)
            y_train.append(y)
    X_train = np.array(X_train)
    y_train = np.array(y_train)
    assert X_train.shape[1:] == (seq_len, mol_encoder.voc_len)
    assert y_train.shape[1] == (mol_encoder.voc_len)
    return X_train,y_train


In [None]:
from tensorflow.keras.layers import Input, Dense, LSTM, Bidirectional


def build_one_hot_model():
    model = Sequential(
        [
            Input(shape=(seq_len, mol_encoder.voc_len)),
            LSTM(128),
            Dropout(0.2),
            Dense(mol_encoder.voc_len, activation="softmax"),
        ]
    )
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.01)
    model.compile(
        loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]
    )
    config["model_config"] = model.get_config()
    return model

Let us now generate the datasets to train our new model.

In [None]:
X_train, y_train = generate_one_hot_ds(seq_len,mol_encoder)
X_train_tensor = tf.convert_to_tensor(X_train)
y_train_tensor = tf.convert_to_tensor(y_train)

Let us now build this model.

In [None]:
model = build_one_hot_model()
model.summary()

To be able to plot how training evolves, we will save the model's fitting history.

In [None]:
def save_history(model, history, hist_file_name=None):
    # Fall back to a name derived from the model when no file name is given
    if hist_file_name is None:
        hist_file_name = f"{model.name}_history.csv"
    history_df = pd.DataFrame(history.history)
    history_df.to_csv(hist_file_name)

Create a function **make_prediction(seed_start)** which takes a starting string sequence and uses it to generate a molecule


In [None]:
def make_prediction(seed_start: str,model,mol_encoder,seq_len):
    seed = seed_start
    for i in range(50):
        X = np.zeros((1, seq_len, mol_encoder.voc_len))
        for j, char in enumerate(seed[-seq_len:]):
            X[0, j, mol_encoder.get_int(char)] = 1
        y = model.predict(X,verbose=0)
        next_char = mol_encoder.get_char(np.argmax(y))
        seed += next_char
        if next_char == "\n":
            break
    return seed

def make_prediction_alternate(seed_start: str,model=None,mol_encoder=None,seq_len=0):
    seed = ""
    for i in range(50):
        X = np.zeros((1, seq_len, mol_encoder.voc_len))
        for j, char in enumerate(seed_start):
            X[0, j, mol_encoder.get_int(char)] = 1
        y = model.predict(X,verbose=0)
        next_char = mol_encoder.get_char(np.argmax(y))
        seed_start = seed_start[1:] + next_char
        seed += next_char
        if next_char == "\n":
            break
    return seed
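Both functions above pick the most probable character with `np.argmax`, so a given seed always produces the same molecule. A common alternative, which this notebook does not use, is to sample the next character from the predicted distribution, optionally rescaled by a temperature. The helper `sample_with_temperature` and the probabilities below are assumptions for illustration.

```python
import numpy as np


def sample_with_temperature(probs, temperature=1.0, rng=None):
    """Sample an index from `probs` after rescaling by `temperature`."""
    rng = np.random.default_rng() if rng is None else rng
    probs = np.asarray(probs, dtype=np.float64)
    # Low temperature sharpens the distribution, high temperature flattens it
    logits = np.log(probs + 1e-12) / temperature
    scaled = np.exp(logits - logits.max())
    scaled /= scaled.sum()
    return int(rng.choice(len(scaled), p=scaled))


probs = [0.1, 0.6, 0.3]  # made-up model output over 3 characters
rng = np.random.default_rng(0)
cold = [sample_with_temperature(probs, 0.01, rng) for _ in range(20)]
warm = [sample_with_temperature(probs, 1.0, rng) for _ in range(20)]
print(cold)
print(warm)
```

In `make_prediction`, such a helper could replace `np.argmax(y)` with a call on `y[0]`, which would give more varied molecules at the cost of more invalid ones.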

Let us now train the model so that it overfits.

In [None]:
model.fit(X_train_tensor, y_train_tensor, epochs=20,validation_split=0.2)

Generate a molecule with your overfitted model

In [None]:
seed_start = random.choice(df['smiles'])[:seq_len]
make_prediction(seed_start,model,mol_encoder,seq_len)

Make a model checkpoint so that the model is saved after each epoch:
if you train on a platform and it stops, you do not lose your training.

We will create a callback suited to our situation by defining a class that inherits from `tf.keras.callbacks.Callback`

In [None]:
class MyCustomCallback(tf.keras.callbacks.Callback):
    def __init__(self, patience:int=0, model_file:str="model.h5"):
        super().__init__()
        self.patience = patience  # stored but not used by this callback
        self.counter = 0
        self.min_val_loss = float("inf")
        self.model_file_path = model_file

    def on_epoch_end(self, epoch, logs=None):
        # Save the model only when the validation loss improves
        val_loss = (logs or {}).get("val_loss")
        if val_loss is not None and val_loss < self.min_val_loss:
            self.min_val_loss = val_loss
            print(f"\nNew minimum for validation loss: {self.min_val_loss}")
            print(f"Saving {self.model_file_path}, {epoch = }")
            self.model.save(self.model_file_path)
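Keras also ships built-in callbacks that cover the same need. The sketch below shows one possible setup, assuming the same file name and patience value used elsewhere in this notebook. It is an alternative to the custom class, not the approach used here.

```python
import tensorflow as tf

# Save the model whenever the validation loss improves
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath="one_hot_model.h5",
    monitor="val_loss",
    save_best_only=True,
)

# Stop training after 2 epochs without improvement of the validation loss
early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=2)

callbacks = [checkpoint, early_stopping]
```

The list can then be passed to `model.fit(..., callbacks=callbacks)`, provided a validation set is available, for example through `validation_split`.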

Now go to your favorite platform (Colab or something else) and train the model on the whole dataset for 10 epochs with a batch size of 256.

It should take a long time, so either follow the class or go take a nap.

In [None]:
config["callback_param"] = dict(patience=2,model_file="one_hot_model.h5")

callbacks = [MyCustomCallback(**config["callback_param"])]
config["batch_size"] =256
model.fit(X_train_tensor, y_train_tensor, epochs=10,validation_split=0.2,callbacks=callbacks,batch_size=config["batch_size"])

Generate between 100 and 1000 molecules.

In [None]:
from tqdm import tqdm 
n_molecules = 500
molecules_list = []
for i in tqdm(range(n_molecules)):
    seed_start = random.choice(df['smiles'])[:seq_len]
    molecules_list.append(make_prediction(seed_start,model,mol_encoder,seq_len))

generated_df = pd.DataFrame({"gen_mol":molecules_list})
config["generated_molecules"] = 'generated_molecules.csv'
generated_df.to_csv(config["generated_molecules"])

In [None]:
generated_df.head()

With rdkit, compute the Quantitative Estimate of Drug-likeness (QED) of each molecule in this subset

Let us now implement a function that builds molecules from the SMILES strings generated by our model.

In [None]:
from rdkit import rdBase
def get_valid_mols_list(gen_smiles_list):
    # Silence RDKit parse errors while filtering out invalid SMILES
    rdBase.DisableLog('rdApp.error')
    gen_mol_list = []
    for gen_smiles in gen_smiles_list:
        current_mol = MolFromSmiles(gen_smiles)
        if current_mol is not None:
            gen_mol_list.append(current_mol)
    rdBase.EnableLog('rdApp.error')
    return gen_mol_list

In [None]:
gen_mol_list = get_valid_mols_list(generated_df["gen_mol"])
len(gen_mol_list)

Create a list of the molecules that have between 10 and 50 atoms

In [None]:
between_10_and_50 = lambda mol: ((mol.GetNumAtoms() >= 10) and (mol.GetNumAtoms() <= 50))

In [None]:
gen_mol_list_10_50 = list(filter(between_10_and_50,gen_mol_list ))
len(gen_mol_list_10_50)

In [None]:
qed = lambda x : QED.qed(x)
qed_list = list(map(qed,gen_mol_list_10_50))
qed_df = pd.DataFrame({
    "gen_mol_list_10_50":list(map(MolToSmiles,gen_mol_list_10_50)),
    "qed": qed_list
})
qed_df.head()

Bonus 1 : Using rdkit, compute the quantitative estimation of drug-likeness (QED) of your generated molecules.

In [None]:
qed_list = list(map(qed,gen_mol_list))
qed_df = pd.DataFrame({
    "gen_mol":list(map(MolToSmiles,gen_mol_list)),
    "qed": qed_list
})
qed_df.head()

Bonus 2 : try to adapt a transformer model training from Hugging Face to see if it performs better

## Saving the configuration

In [None]:
save_config(config, "config.yml")