In [1]:
import sys
import os

!pip install rdkit-pypi

Collecting rdkit-pypi
  Downloading rdkit_pypi-2022.9.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.9 kB)
Downloading rdkit_pypi-2022.9.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (29.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m29.4/29.4 MB[0m [31m66.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rdkit-pypi
Successfully installed rdkit-pypi-2022.9.5


In [None]:
!git clone https://github.com/Romain-MIPI/Reinforcement-Learning-for-De-Novo-Drug-Design.git

In [None]:
%cd Reinforcement-Learning-for-De-Novo-Drug-Design

In [None]:
from rdkit import Chem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import IPythonConsole

In [None]:
from sklearn.metrics import balanced_accuracy_score, f1_score, recall_score, precision_score
import pandas as pd
import copy
import pickle
import numpy as np
import matplotlib.pyplot as plt

In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import ExponentialLR
import torch.nn.functional as F

In [None]:
import numpy as np
from tqdm import tqdm, trange
import pickle
from rdkit import Chem, DataStructs
from models.rnn_generative import RNNGenerative
from models.generator_data import GeneratorData
from models.utils import canonical_smiles
import matplotlib.pyplot as plt
%matplotlib inline

import seaborn as sns

In [None]:
from rdkit.Chem import QED

gen_data_path = './data/clean_020724_all_with_update.csv'
data = pd.read_csv(gen_data_path)
smiles = data['SMILES'].values

In [None]:
from models.utils import get_tokens
my_tokens, _, _ = get_tokens(smiles)
l_tokens = list(my_tokens) + ['<', '>']

In [None]:
gen_data_path = './data/clean_020724_all_with_update.csv'
gen_data = GeneratorData(training_data_path=gen_data_path, delimiter=',',
                         cols_to_read=[12], keep_header=False, tokens=l_tokens)

In [None]:
def plot_bar(prediction, n_to_generate):
    value, count = np.unique(prediction, return_counts=True)
    print("Percentage predict class 0 (inactive) :", count[np.where(value==0)]/len(prediction)
    print("Percentage predict class 1 (weakly actif) :", count[np.where(value==1)]/len(prediction))
    print("Percentage predict class 2 (strongly actif) :", count[np.where(value==2)]/len(prediction))
    print("Proportion of valid SMILES:", len(prediction)/n_to_generate)
    ax = plt.bar(value, count)
    ax.set(xlabel='Predicted class',
           y_label='Number of molecules',
           title='Distribution of predicted class for generated molecules')
    plt.show()

In [None]:
def estimate_and_update(generator, predictor, n_to_generate):
    generated = []
    pbar = tqdm(range(n_to_generate))
    for i in pbar:
        pbar.set_description("Generating molecules...")
        generated.append(generator.evaluate(gen_data, predict_len=120)[1:-1])

    sanitized = canonical_smiles(generated, sanitize=False, throw_warning=False)[:-1]
    unique_smiles = list(np.unique(sanitized))[1:]
    smiles, prediction, nan_smiles = predictor.predict(unique_smiles, use_tqdm=True)

    plot_bar(prediction, n_to_generate)

    return smiles, prediction

In [None]:
hidden_size = 500
stack_width = 500
stack_depth = 200
layer_type = 'GRU'
lr = 0.001
optimizer_instance = torch.optim.Adadelta

my_generator = RNNGenerative(input_size=gen_data.n_characters, hidden_size=hidden_size,
                             output_size=gen_data.n_characters, layer_type=layer_type,
                             n_layers=1, is_bidirectional=False, has_stack=True,
                             stack_width=stack_width, stack_depth=stack_depth,
                             use_cuda=use_cuda,
                             optimizer_instance=optimizer_instance, lr=lr)

In [None]:
model_path = './data/checkpoints/generator/checkpoint_stack_rnn'

In [None]:
# training generative model

losses = my_generator.fit(gen_data, 50000)
plt.plot(losses)
my_generator.evaluate(gen_data)
my_generator.save_model(model_path)

In [None]:
my_generator.load_model(model_path)

In [None]:
from models.rnn_predictor import RNNPredictor

path_to_params = './ReLeaSE/checkpoints/classification/model_parameters.pkl'
path_to_checkpoint = './ReLeaSE/checkpoints/classification/fold_'

my_predictor = RNNPredictor(path_to_params, path_to_checkpoint, tokens)

In [None]:
smiles_unbiased, prediction_unbiased = estimate_and_update(my_generator,
                                                           my_predictor,
                                                           n_to_generate=10000)

In [None]:
my_generator_2 = RNNGenerator(input_size=gen_data.n_characters,
                                hidden_size=hidden_size,
                                output_size=gen_data.n_characters,
                                layer_type=layer_type,
                                n_layers=1, is_bidirectional=False, has_stack=True,
                                stack_width=stack_width, stack_depth=stack_depth,
                                use_cuda=use_cuda,
                                optimizer_instance=optimizer_instance, lr=lr)

my_generator_2.load_model(model_path)

In [None]:
def simple_moving_average(previous_values, new_value, ma_window_size=10):
    value_ma = np.sum(previous_values[-(ma_window_size-1):]) + new_value
    value_ma = value_ma/(len(previous_values[-(ma_window_size-1):]) + 1)
    return value_ma

In [None]:
def get_reward_activity(smile, predictor, invalid_reward=0.0):
    _, prediction, invalid_smiles = predictor.predict([smile])
    if len(nan_smiles) == 1:
        return invalid_reward
    if (prediction[0] == 2):
        return 11.0
    elif (prediction[0] == 1):
        return 6.0
    else:
        return 1.0

In [None]:
# Setting up some parameters for the experiment
n_to_generate = 200
n_policy_replay = 10
n_policy = 15
n_iterations = 60

RL_activity = Reinforcement(my_generator_2, my_predictor, get_reward_activity)

rewards = []
rl_losses = []

for i in range(n_iterations):
    for j in trange(n_policy, desc='Policy gradient...'):
        cur_reward, cur_loss = RL_activity.policy_gradient(gen_data)
        rewards.append(simple_moving_average(rewards, cur_reward))
        rl_losses.append(simple_moving_average(rl_losses, cur_loss))

    plt.plot(rewards)
    plt.xlabel('Training iteration')
    plt.ylabel('Average reward')
    plt.show()
    plt.plot(rl_losses)
    plt.xlabel('Training iteration')
    plt.ylabel('Loss')
    plt.show()

    smiles_cur, prediction_cur = estimate_and_update(RL_activity.generator,
                                                     my_predictor,
                                                     n_to_generate)

In [None]:
smiles_biased, prediction_biased = estimate_and_update(RL_activity.generator,
                                                       my_predictor,
                                                       n_to_generate=10000)

In [None]:
class_val = [0, 1, 2]

value1, count1 = np.unique(prediction_biased, return_counts=True)
value2, count2 = np.unique(prediction_biased, return_counts=True)

if len(value1) != len(class_val):
    tmp = []
    for c in class_val:
        if c not in value1:
            tmp.append(count1[np.where(value1==c)])
        else:
            tmp.append(0)
    count1 = tmp
if len(value2) != len(class_val):
    tmp = []
    for c in class_val:
        if c not in value2:
            tmp.append(count2[np.where(value2==c)])
        else:
            tmp.append(0)
    count2 = tmp

w, x = 0.4, np.arange(len(labels))

fig, ax = plt.subplots()
ax.bar(x - w/2, count1, width=w, label='unbiased')
ax.bar(x + w/2, count2, width=w, label='biased')

ax.set_xticks(x)
ax.set_xticklabels(['class 0', 'class 1', 'class 2'])
ax.set_ylabel('Number of molecules')
ax.set_title('distribution of generated molecules')
ax.legend()

plt.show()