<a href="https://colab.research.google.com/github/curcioriccardo/VQ-DRAW-MOL-DATA/blob/main/notebooks/ACS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Imports and Mount

Here you can choose where to store the model by changing path variable

In [None]:
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
import gzip
import pandas
import h5py
import numpy as np

import argparse
import os
import h5py
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn import model_selection
import shutil

from google.colab import drive
drive.mount('/content/gdrive',force_remount=True)

PATH = F"/content/gdrive/MyDrive/Smiles_models/"

Mounted at /content/gdrive


## Utility functions

These are utility functions to load dataset and decode tensors and tokenize SMILES

In [None]:
def one_hot_array(i, n):
    return map(int, [ix == i for ix in xrange(n)])

def one_hot_index(vec, charset):
    return map(charset.index, vec)

def from_one_hot_array(vec):
    oh = np.where(vec == 1)
    if oh[0].shape == (0, ):
        return None
    return int(oh[0][0])

def decode_smiles_from_indexes(vec, charset):
    smiles=""
    charset = list(map(lambda x: x.decode('utf-8'), charset))
    vec = vec.tolist()
    for row in vec:
      smiles+= str(charset[np.argmax(row)])

    return smiles.strip()

def load_dataset(filename, split = True):
    h5f = h5py.File(filename, 'r')
    if split:
        data_train = h5f['data_train'][:]
    else:
        data_train = None
    data_test = h5f['data_test'][:]
    charset =  h5f['charset'][:]
    h5f.close()
    if split:
        return (data_train, data_test, charset)
    else:
        return (data_test, charset)


## Model

This is a recreation of the VAE model architecture used in https://pubs.acs.org/doi/10.1021/acscentsci.7b00572

The encoder uses
three 1D convolutional layers followed by one fully
connected. The decoder feeds into three
layers of gated recurrent unit (GRU)
The last layer of the RNN decoder defines a probability
distribution over all possible characters at each position in the
SMILES string. This means that the writeout operation is
stochastic, and the same point in latent space may decode into
different SMILES strings, depending on the random seed used
to sample characters. #The output GRU layer had one additional
input, corresponding to the character sampled from the softmax
output of the previous time step and was trained using teacher
forcing

In [None]:
class MolecularVAE(nn.Module):
    def __init__(self):
        super(MolecularVAE, self).__init__()

        self.conv_1 = nn.Conv1d(120, 9, kernel_size=9)
        self.conv_2 = nn.Conv1d(9, 9, kernel_size=9)
        self.conv_3 = nn.Conv1d(9, 10, kernel_size=11)
        self.linear_0 = nn.Linear(70, 435)
        self.linear_1 = nn.Linear(435, 292)
        self.linear_2 = nn.Linear(435, 292)

        self.linear_3 = nn.Linear(292, 292)
        self.gru = nn.GRU(292, 501, 3, batch_first=True)
        self.linear_4 = nn.Linear(501, 33)
        
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax()

    def encode(self, x):
        x = self.relu(self.conv_1(x))
        x = self.relu(self.conv_2(x))
        x = self.relu(self.conv_3(x))
        x = x.view(x.size(0), -1)
        x = F.selu(self.linear_0(x))
        return self.linear_1(x), self.linear_2(x)

    def sampling(self, z_mean, z_logvar):
        epsilon = 1e-2 * torch.randn_like(z_logvar)
        return torch.exp(0.5 * z_logvar) * epsilon + z_mean

    def decode(self, z):
        z = F.selu(self.linear_3(z))
        z = z.view(z.size(0), 1, z.size(-1)).repeat(1, 120, 1)
        output, hn = self.gru(z)
        out_reshape = output.contiguous().view(-1, output.size(-1))
        y0 = F.softmax(self.linear_4(out_reshape), dim=1)
        y = y0.contiguous().view(output.size(0), -1, y0.size(-1))
        return y

    def forward(self, x):
        z_mean, z_logvar = self.encode(x)
        z = self.sampling(z_mean, z_logvar)
        return self.decode(z), z_mean, z_logvar




## Dataset

Here we will download the same dataset used for VQ-DRAW and MRL.
For ease of computation, the strings are encoded up to a
maximum length of 120 characters.Only canonicalized SMILES are used for training to
avoid dealing with equivalent SMILES representations. 


In [None]:
!rm -R 'molecular-vae'
!git clone https://github.com/aksub99/molecular-vae.git
import zipfile
zip_ref = zipfile.ZipFile('/content/molecular-vae/data/processed.zip', 'r')
zip_ref.extractall('molecular-vae/data/')
zip_ref.close()


rm: cannot remove 'molecular-vae': No such file or directory
Cloning into 'molecular-vae'...
remote: Enumerating objects: 188, done.[K
remote: Counting objects: 100% (3/3), done.[K
remote: Compressing objects: 100% (3/3), done.[K
remote: Total 188 (delta 0), reused 0 (delta 0), pack-reused 185[K
Receiving objects: 100% (188/188), 2.99 MiB | 14.56 MiB/s, done.
Resolving deltas: 100% (95/95), done.


## Loss

Minimizes Binary Cross Entropy and a KL term with a 0.5 weigth.

In [None]:
def vae_loss(x_decoded_mean, x, z_mean, z_logvar):
    xent_loss = F.binary_cross_entropy(x_decoded_mean, x, size_average=False)
    kl_loss = -0.5 * torch.sum(1 + z_logvar - z_mean.pow(2) - z_logvar.exp())
    return xent_loss + kl_loss



## Training

This is the training code. You can change the number of epochs.

In [None]:

data_train, data_test, charset = load_dataset('/content/molecular-vae/data/processed.h5')
data_train = torch.utils.data.TensorDataset(torch.from_numpy(data_train))
data_test = torch.utils.data.TensorDataset(torch.from_numpy(data_test))
train_loader = torch.utils.data.DataLoader(data_train, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(data_test, batch_size=64, shuffle=True)

torch.manual_seed(42)

epochs = 100
device = 'cuda' if torch.cuda.is_available() else 'cpu'

model = MolecularVAE().to(device)
optimizer = optim.Adam(model.parameters())


def train(epoch):
    model.train()
    train_loss = 0
    # Training phase
    for batch_idx, data in enumerate(train_loader):

        data = data[0].to(device)
        optimizer.zero_grad()
        output, mean, logvar = model(data)
        
        if batch_idx==0:
              inp = data.cpu().numpy()
              outp = output.cpu().detach().numpy()
              lab = data.cpu().numpy()
              print("Input:")
              print(decode_smiles_from_indexes(inp[0], charset))
              print("Label:")
              print(decode_smiles_from_indexes(lab[0], charset))
              sampled= outp[0]
              print("Output:")
              print(decode_smiles_from_indexes(sampled, charset))
        
        loss = vae_loss(output, data, mean, logvar)
        loss.backward()
        train_loss += loss
        optimizer.step()
        data.detach()
    # Validation phase
    test_loss = 0
    with torch.no_grad():
     for batch_idx, data in enumerate(test_loader):
        data = data[0].to(device)
        output, mean, logvar = model(data)
        loss = vae_loss(output, data, mean, logvar)
        test_loss += loss
        data.detach()

    

    print('train=', (train_loss / len(train_loader.dataset)), ' test=',(test_loss / len(test_loader.dataset)))
     
    return

for epoch in range(1, epochs + 1):
    train(epoch)

torch.save(model,PATH+"ACS_epoch_"+str(epochs)+".pth") 

torch.Size([64, 120, 33])
torch.float32
train= 0.0  test= tensor(535.0306, device='cuda:0')
torch.Size([64, 120, 33])
torch.float32


KeyboardInterrupt: ignored

In [None]:
# 1,33 minutes per epoch