In [35]:
#from fcd import get_fcd, load_ref_model, canonical_smiles, get_predictions, calculate_frechet_distance
import warnings
import os
import pandas as pd
import numpy as np
import numpy
import pickle
import rdkit
from tqdm import tqdm
import math

import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau


# Ignore some warnings from RDKIT and keras
from rdkit import RDLogger, Chem
from rdkit.Chem import rdMolDescriptors
RDLogger.DisableLog('rdApp.*')

warnings.filterwarnings("ignore")

# Load methods from the FCD library

np.random.seed(1234)

print("RDKit: ", rdkit.__version__)


RDKit:  2022.09.5


In [2]:
data = pd.read_csv("smiles_train.txt", header=None)[0]


In [3]:
def to_canonical(smiles):
    mol = Chem.MolFromSmiles(smiles)
    if mol is not None:
        return Chem.MolToSmiles(mol, canonical=True)
    else:
        return None


In [4]:
data_canonical = data.apply(to_canonical).dropna().reset_index(drop=True)


KeyboardInterrupt: 

In [22]:
data_canonical.to_csv("canonical_smiles_train.txt", index=False, header=False)


In [6]:
data_canonical = pd.read_csv(
    "canonical_smiles_train.txt", header=None, squeeze=True)


In [None]:
data_canonical

In [40]:
#train_data, val_data = train_test_split(data, test_size=1/6, random_state=42)
train_data, val_data = train_test_split(
    data_canonical, test_size=0.2, random_state=42)
train_data = train_data.reset_index(drop=True)
val_data = val_data.reset_index(drop=True)


In [41]:
charset = sorted(list(set(''.join(data_canonical))))
max_length = max([len(smile) for smile in data_canonical]) + 1


In [42]:
def vectorize_smiles(smiles, charset, max_length):
    indices = [charset.index(char) for char in smiles]
    padded_indices = indices + [0] * (max_length - len(indices))
    data = torch.tensor(padded_indices[:-1], dtype=torch.long)
    target = torch.tensor(padded_indices[1:], dtype=torch.long)
    return data, target


In [43]:
class SMILESDataset(Dataset):
    def __init__(self, smiles_data, charset, max_length):
        self.smiles_data = smiles_data
        self.charset = charset
        self.max_length = max_length

    def __len__(self):
        return len(self.smiles_data)

    def __getitem__(self, index):
        input_smiles = self.smiles_data[index % len(self.smiles_data)]
        data, target = vectorize_smiles(
            input_smiles, self.charset, self.max_length)
        return data, target


In [52]:
train_dataset = SMILESDataset(train_data, charset, max_length)
val_dataset = SMILESDataset(val_data, charset, max_length)
train_loader = DataLoader(train_dataset, batch_size=384, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=384, shuffle=False)


In [59]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

input_size = len(charset)
hidden_size = 1024
output_size = len(charset)
num_layers = 1
num_epochs = 50
learning_rate = 0.01


cuda


In [53]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.2, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(
            0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


In [57]:
class SMILESTransformer(nn.Module):
    def __init__(self, input_size=input_size, d_model=512, nhead=8, num_layers=4, output_size=output_size, dropout=0.1):
        super(SMILESTransformer, self).__init__()

        self.embedding = nn.Embedding(input_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.transformer_encoder = TransformerEncoder(
            TransformerEncoderLayer(d_model, nhead, dropout=dropout), num_layers)
        self.fc1 = nn.Linear(d_model, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(d_model, output_size)

    def forward(self, x):
        x = self.embedding(x) * math.sqrt(x.size(-1))
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x


In [60]:
model = SMILESTransformer(input_size, hidden_size,
                          num_layers=num_layers).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model.parameters(), lr=learning_rate, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(
    optimizer, mode='min', factor=0.1, patience=5, verbose=True)


In [61]:
for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0
    train_count = 0
    for one_hot_data, target in tqdm(train_loader, desc=f"Training Epoch {epoch + 1}/{num_epochs}", leave=False):
        one_hot_data, target = one_hot_data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(one_hot_data)

        loss = criterion(output.transpose(1, 2), target)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        train_count += 1

    avg_train_loss = train_loss / train_count

    # Validation
    model.eval()
    val_loss = 0
    val_count = 0
    with torch.no_grad():
        for one_hot_data, target in tqdm(val_loader, desc=f"Validating Epoch {epoch + 1}/{num_epochs}", leave=False):
            one_hot_data, target = one_hot_data.to(device), target.to(device)
            output = model(one_hot_data)

            loss = criterion(output.transpose(1, 2), target)

            val_loss += loss.item()
            val_count += 1

    avg_val_loss = val_loss / val_count

    print(
        f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

    # Step the scheduler
    scheduler.step(avg_val_loss)


                                                                          

Epoch [1/50], Train Loss: 1.0694, Validation Loss: 1.1715


                                                                         

Epoch [2/50], Train Loss: 1.1186, Validation Loss: 0.8314


                                                                          

Epoch [3/50], Train Loss: 0.8617, Validation Loss: 0.7934


                                                                        

Epoch [4/50], Train Loss: 0.8288, Validation Loss: 1.1407


                                                                          

Epoch [5/50], Train Loss: 0.9637, Validation Loss: 0.8036


                                                                        

Epoch [6/50], Train Loss: 0.8707, Validation Loss: 0.8457


                                                                         

Epoch [7/50], Train Loss: 0.8570, Validation Loss: 0.8136


                                                                        

Epoch [8/50], Train Loss: 0.9598, Validation Loss: 0.8982


                                                                         

Epoch [9/50], Train Loss: 0.9026, Validation Loss: 0.8978
Epoch 00009: reducing learning rate of group 0 to 1.0000e-03.


                                                                       

KeyboardInterrupt: 

In [62]:
torch.save(model.state_dict(), "trained_model_transformer.pth")


In [63]:
def unvectorize_smiles(vector, charset):
    smiles = ""
    for index in vector:
        if index == len(charset) - 1:  # End of sequence token
            break
        smiles += charset[index]
    return smiles


In [79]:
def predict(model, input_smiles, charset, max_length):
    model.eval()
    with torch.no_grad():
        data = vectorize_smiles(input_smiles, charset, max_length)
        # Convert the data to Long data type
        data_long = data[0].to(device).long()
        # Pass the 2D tensor to the model with an extra batch dimension
        output = model(data_long.unsqueeze(0))
        pred_indices = output.argmax(dim=2).squeeze(0).cpu().tolist()
        predicted_smiles = unvectorize_smiles(pred_indices, charset)

        # Remove "#" characters
        cleaned_smiles = predicted_smiles.replace("#", "")

        # Check for "nan" or "inf" in the SMILES string
        if "nan" not in cleaned_smiles.lower() and "inf" not in cleaned_smiles.lower():
            return cleaned_smiles
        else:
            return None


In [73]:
def is_valid_smiles(smiles):
    mol = Chem.MolFromSmiles(smiles)
    return mol is not None


In [80]:
with open("predictions_transformer.txt", "w") as f:
    for input_smiles in val_data:
        predicted_smiles = predict(model, input_smiles, charset, max_length)
        if predicted_smiles is not None and is_valid_smiles(predicted_smiles):
            f.write(predicted_smiles + "\n")
