In [1]:
import re
import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
from torch import nn
import torch.nn.functional as F

import sys
sys.path.append('src/')

from smiles_lstm.model.smiles_vocabulary import SMILESTokenizer, Vocabulary, create_vocabulary

from tensorflow.keras.preprocessing.sequence import pad_sequences

import warnings
# ignore some deprecation warnings
warnings.filterwarnings('ignore')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Functions

In [2]:
def pad_sequence(tokenizer_array, desired_length):
    padded_sequence = pad_sequences([tokenizer_array], maxlen=desired_length, padding='post')[0]
    return padded_sequence

def preprocess_smiles_data(x):
    x = tk.tokenize(x, with_begin_and_end=False)
    x = vocabulary.encode(x )
    x  = pad_sequence(x, 200 )
    x  = torch.tensor([x ])
    
    return X

## Load main data set

In [3]:
df = pd.read_csv('retrosynthesis-all', header=None)
df['source'] = df[0].apply(lambda x: x.split('>>')[0])
df['target'] = df[0].apply(lambda x: x.split('>>')[1])
df.drop(0, axis=1, inplace=True)

# Remove spaces from all columns
for col in df.columns:
    df[col] = df[col].str.replace(' ', '')

## Define `Tokenizer` and `Vocabulary`

In [4]:
class Dataset(torch.utils.data.Dataset):
    """
    Custom PyTorch Dataset that takes a file containing \n separated SMILES.
    """

    def __init__(self, smiles_list : list, vocabulary : Vocabulary,
                 tokenizer : SMILESTokenizer) -> None:
        self._vocabulary  = vocabulary
        self._tokenizer   = tokenizer
        self._smiles_list = list(smiles_list)

    def __getitem__(self, i : int) -> torch.Tensor:
        smi     = self._smiles_list[i]
        tokens  = self._tokenizer.tokenize(smi, with_begin_and_end=False)
        encoded = self._vocabulary.encode(tokens)
        return torch.tensor(encoded.astype(int), dtype=torch.long)  # pylint: disable=E1102

    def __len__(self) -> int:
        return len(self._smiles_list)

    @staticmethod
    def collate_fn(encoded_seqs : list) -> torch.Tensor:
        """
        Converts a list of encoded sequences into a padded tensor.
        """
        max_length   = max([seq.size(0) for seq in encoded_seqs])
        collated_arr = torch.zeros(len(encoded_seqs),
                                   max_length,
                                   dtype=torch.long)  # padded with zeros
        for i, seq in enumerate(encoded_seqs):
            collated_arr[i, :seq.size(0)] = seq
        return collated_arr

In [5]:
# create a vocabulary using all SMILES in df
dataset = df['source'].unique().tolist() + df['target'].unique().tolist()
dataset = np.unique(dataset).tolist()

tokenizer = SMILESTokenizer()
vocabulary   = create_vocabulary(smiles_list=dataset, tokenizer=tokenizer, canonical=True)
print(f'There are {len(vocabulary)} unique tokens in the vocabulary.\n')

train_dataset = Dataset(smiles_list=df['source'].tolist(), vocabulary=vocabulary, tokenizer=tokenizer)
train_dataset = train_dataset.collate_fn(train_dataset)

There are 85 unique tokens in the vocabulary.



## Train / validation / test split

In [6]:
from sklearn.model_selection import train_test_split

print(df.shape)

# Splitting the data into train and combined val/test sets
train_data, val_test_data = train_test_split(df, test_size=0.2, random_state=42)

# Splitting the combined val/test set into separate val and test sets
val_data, test_data = train_test_split(val_test_data, test_size=0.2, random_state=42)

# Printing the sizes of the resulting splits
print("Train data size:", len(train_data))
print("Validation data size:", len(val_data))
print("Test data size:", len(test_data))

train     = train_data.copy()
test      = test_data.copy()
valid     = val_data.copy()

train_X = Dataset(smiles_list=train['source'].tolist(), vocabulary=vocabulary, tokenizer=tokenizer)
train_X = train_X.collate_fn(train_X)
train_y = Dataset(smiles_list=train['target'].tolist(), vocabulary=vocabulary, tokenizer=tokenizer)
train_y = train_y.collate_fn(train_y)

val_X = Dataset(smiles_list=train['source'].tolist(), vocabulary=vocabulary, tokenizer=tokenizer)
val_X = val_X.collate_fn(val_X)
val_y = Dataset(smiles_list=train['target'].tolist(), vocabulary=vocabulary, tokenizer=tokenizer)
val_y = val_y.collate_fn(val_y)

test_X = Dataset(smiles_list=train['source'].tolist(), vocabulary=vocabulary, tokenizer=tokenizer)
test_X = test_X.collate_fn(test_X)
test_y = Dataset(smiles_list=train['target'].tolist(), vocabulary=vocabulary, tokenizer=tokenizer)
test_y = test_y.collate_fn(test_y)

(45033, 2)
Train data size: 36026
Validation data size: 7205
Test data size: 1802


In [7]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define the LSTM model
class Seq2SeqLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2SeqLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input_seq):
        embedded = self.embedding(input_seq)
        output, _ = self.lstm(embedded)
        output = self.fc(output)
        return output

# Set hyperparameters
input_size = len(vocabulary)  # Replace with the actual vocabulary size of your input data
hidden_size = 5  # Adjust the hidden size according to your needs
output_size = len(vocabulary)  # Replace with the actual vocabulary size of your output data
learning_rate = 0.1
num_epochs = 3
batch_size = 2048

# Create the model
model = Seq2SeqLSTM(input_size, hidden_size, output_size)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    total_loss = 0
    num_batches = 0

    for i in tqdm(range(0, len(train_X), batch_size)):
        # Prepare batch
        batch_X = train_X[i:i+batch_size]
        batch_y = train_y[i:i+batch_size]

        # Forward pass
        optimizer.zero_grad()
        output = model(batch_X)

        # Compute loss
        loss = criterion(output.view(-1, output_size), batch_y.view(-1))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Print average loss for the epoch
    average_loss = total_loss / num_batches
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {average_loss:.4f}")

100%|██████████| 18/18 [00:04<00:00,  4.10it/s]


Epoch 1/3, Loss: 1.8415


100%|██████████| 18/18 [00:04<00:00,  3.96it/s]


Epoch 2/3, Loss: 0.8423


100%|██████████| 18/18 [00:04<00:00,  3.92it/s]

Epoch 3/3, Loss: 0.8095





In [10]:
model.eval()
y = model(test_X[11232])
y.argmax(dim=1).flatten()

tensor([81, 25, 81, 81, 81, 81, 81, 81, 25, 81, 81, 81, 81, 81, 81, 81, 81, 81,
        81, 81, 81, 81, 25, 81, 81, 81,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])

In [20]:
test_y[11232]

tensor([20, 25, 81,  8, 81, 81, 81,  3, 46,  3, 17, 25,  4, 54,  4, 81, 81,  8,
         5, 81,  8, 81, 81, 81, 81,  3, 20, 24,  4, 81,  8,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])

In [24]:
x = 'C00082'
y = 'C00811'

cpds = pd.read_csv('../data/compounds_final.csv', index_col=0)
x = cpds[cpds['Entry'] == x]['SMILES'].values[0]
y = cpds[cpds['Entry'] == y]['SMILES'].values[0]

x = tokenizer.tokenize(x, with_begin_and_end=False)
x =vocabulary.encode(x)
x = pad_sequence(x, 158)
x = torch.tensor(x.astype(int), dtype=torch.long)  

y = tokenizer.tokenize(y, with_begin_and_end=False)
y =vocabulary.encode(y)
y = pad_sequence(y, 158)
y = torch.tensor(y.astype(int), dtype=torch.long)  

y_pred = model(x).argmax(dim=1).flatten()
y_pred

tensor([25, 25, 81, 81, 81, 81, 81, 81, 81, 81, 25, 81, 81, 81, 81, 81, 81, 81,
        81, 25, 81, 25,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])

In [25]:
y

tensor([25, 17, 20,  3, 25,  4,  7, 20, 17, 20,  7, 81,  8, 81, 81, 81,  3, 25,
         4, 81, 81,  8,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
         0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define the LSTM model
class Seq2SeqLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2SeqLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.encoder = nn.LSTM(input_size, hidden_size)
        self.decoder = nn.LSTM(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input_seq):
        _, (hidden_state, _) = self.encoder(input_seq)
        output_seq, _ = self.decoder(input_seq, (hidden_state, torch.zeros_like(hidden_state)))
        output_seq = self.fc(output_seq)
        return output_seq

# Define the input and output dimensions
input_size = train_X.shape[-1]  # Size of each input sequence
hidden_size = 128  # Number of LSTM units
output_size = train_y.shape[-1]  # Size of each output sequence

# Instantiate the LSTM model
model = LSTMModel(input_size, hidden_size, output_size)

# Define batch size and create TensorDataset
batch_size = 2048
dataset = TensorDataset(train_X, train_y)

# Create DataLoader
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()

        # Forward pass
        output = model(batch_X)

        # Compute the loss
        loss = criterion(output, batch_y)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # Print the loss after each epoch
    print(f"Epoch: {epoch+1}/{num_epochs}, Loss: {loss.item():.4f}")


In [None]:
batch_X.shape