In [1]:
import moses
import matplotlib as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas as pd
from rdkit import Chem
from rdkit.Chem import AllChem
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

import csv

  _mcf.append(_pains, sort=True)['smarts'].values]


In [2]:
'''
Functions are from the RNN model we have but not entirely sure where they would fit in the VAE
Currently working on implementation, the process_smiles function may help in creating valid molecules
'''

# Function to add start and end tokens
def process_smiles(smiles_list):
    return ["^" + s + "$" for s in smiles_list]

# Create character dictionaries including special tokens
def create_vocab(smiles_list):
    all_chars = sorted(list(set(''.join(smiles_list))))
    char2idx = {ch: i + 1 for i, ch in enumerate(all_chars)}
    char2idx[''] = 0  # Padding token
    idx2char = {i: ch for ch, i in char2idx.items()}
    return char2idx, idx2char, len(char2idx)

# Enhanced tokenization
def tokenize(smiles, char2idx):
    return [char2idx.get(c, 0) for c in smiles]  # Default to 0 if unknown

def detokenize(tokens, idx2char):
    return ''.join([idx2char.get(t, '') for t in tokens if t != 0])

In [3]:
def load_smiles_from_csv(path, split_type='train'):
    '''
    Loads SMILES strings from a CSV file.

    Args:
        path (str): Path to the CSV file
        split_type (str): Split type ('train' or 'test')

    Returns:
        list: List of SMILES strings
    '''
    smiles = []
    with open(path, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            if row['SPLIT'].strip().lower() == split_type:
                smiles.append(row['SMILES'].strip())
    return smiles

def extract_unique_chars(smiles_list):
    '''
    Extracts unique characters from a list of SMILES strings.

    Args:
        smiles_list (list): List of SMILES strings

    Returns:
        list: List of unique characters
    '''
    unique_chars = set()
    for smiles in smiles_list:
        unique_chars.update(smiles.strip())
    return sorted(unique_chars)

def clean_smiles(smiles):
    '''
    Cleans a SMILES string by removing unwanted characters.

    Args:
        smiles (str): SMILES string

    Returns:
        str: Cleaned SMILES string
    '''
    # Remove unwanted metadata like ",train" or ",SPLIT"
    return smiles.split(',')[0].strip()

def verify_smiles(smiles):
  '''
  Verifies the validity of a SMILES string using RDKit.

  Args:
      smiles (str): SMILES string to verify

  Returns:
      bool: True if valid, False otherwise
  '''
  mol = Chem.MolFromSmiles(smiles)
  return mol is not None

def decode_smiles(one_hot_tensor, idx_to_char):
    '''
    Decodes a one-hot encoded tensor back to SMILES.

    Args:
        one_hot_tensor (torch.Tensor): One-hot encoded tensor
    '''
    smiles = ''
    one_hot_tensor = one_hot_tensor.view(-1, len(idx_to_char))  # unflatten
    for row in one_hot_tensor:
        idx = row.argmax().item()
        smiles += idx_to_char[idx]
    return smiles.strip()

In [None]:
# Generate a new molecule from VAE by sampling from the latent space

# added default values for what the vocab and length generally are
def generate_smiles(model, latent_dim=64, idx_to_char=None, temperature=1.0):
    '''
    Generates a new SMILES string by sampling from the VAE's latent space.

    Args:
        model (nn.Module): VAE model
        latent_dim (int): Dimension of the latent space
    '''
    z = torch.randn(1, latent_dim).to(model.fc1.weight.device)  # Ensure z is on the same device as the model
    with torch.no_grad():
        generated = model.decode(z)  # Use decode instead of decoder
    # Add postprocessing to convert to SMILES
    # generated_tokens_indices = torch.argmax(generated, dim=-1).cpu().numpy().flatten()
    #probs = F.softmax(generated / temperature, dim=-1)
    probs = F.softmax(generated.view(max_length, vocab_size) / temperature, dim=-1)

    # Sample the next character from the probability distribution
    generated_tokens_indices = torch.multinomial(probs, 1).cpu().numpy().flatten()

    # # === ✅ DEBUG BLOCK START ===
    # print("Sample vocab mapping:", list(idx_to_char.items())[:10])
    # unknowns = [i for i in generated_tokens_indices if i not in idx_to_char]
    # if unknowns:
    #     print("⚠️ Unknown token indices:", unknowns)
    # else:
    #     print("✅ All indices map to known tokens.")
    # # === ✅ DEBUG BLOCK END ===

    # print("Vocab size:", len(idx_to_char))
    # print("Max token index generated:", max(generated_tokens_indices))

    # # Print generated tokens and indices for debugging
    # print("Generated tokens indices:", generated_tokens_indices)
    # print("Generated tokens:", [idx_to_char.get(i, "<UNK>") for i in generated_tokens_indices])

    # Iterate through indices to build the SMILES string
    generated_smiles = "".join([idx_to_char.get(i, "") for i in generated_tokens_indices])

    # Verification using rdkit
    is_valid = verify_smiles(generated_smiles)
    if is_valid:
      return generated_smiles
    else:
      return "INVALID"

    # return generated_smiles

In [5]:
# Dataset class for SMILES strings
class SMILESDataset(Dataset):
    def __init__(self, smiles_list, max_length=150, char_to_idx=None):
        '''
        Initializes the SMILESDataset with a list of SMILES strings.

        Args:
            smiles_list (list): List of SMILES strings
            max_length (int): Maximum length of the SMILES strings
            char_to_idx (dict): Character-to-index mapping

        The dataset will one-hot encode each character in a SMILES string to a fixed-size tensor of shape (max_length * vocab_size).
        If a SMILES string is shorter than max_length, it will be padded with zeros. If longer, it will be truncated.
        '''
        self.smiles_list = smiles_list
        self.max_length = max_length

        if char_to_idx is None:
            raise ValueError("Please provide a fixed character-to-index mapping")
            # self.char_to_idx, self.idx_to_char = build_vocabulary(smiles_list)
        else:
            self.char_to_idx = char_to_idx
            self.idx_to_char = {v: k for k, v in char_to_idx.items()}

        self.vocab_size = len(self.char_to_idx)

        original_count = len(smiles_list)
        filtered = []
        invalid_count = 0

        for s in smiles_list:
            s = s.strip()
            if all(c in self.char_to_idx for c in s):
                filtered.append(s)
            else:
                invalid_count += 1
        print(f"Total: {original_count}, Valid: {len(filtered)}, Invalid: {invalid_count}")
        self.smiles_list = filtered

    def __len__(self):
        '''
        Returns:
            int: Number of valid SMILES strings in the dataset
        '''

        return len(self.smiles_list)

    def __getitem__(self, idx):
        '''
        Fetches the encoded version of a SMILES string at a given index.

        Args:
            idx (int): Index of the SMILES string to retrieve

        Returns:
            torch.Tensor: One-hot encoded tensor of the SMILES string of shape (max_length * vocab_size)
        '''

        smiles = self.smiles_list[idx]
        # One-hot encode the SMILES string
        encoded = torch.zeros(self.max_length, self.vocab_size)
        for i, char in enumerate(smiles[:self.max_length]):
            encoded[i, self.char_to_idx[char]] = 1.0

        return encoded.view(-1) #Flatten into 1D tensor

In [6]:
# Rework flow, completely all over the place with its math and needs to be changed with how Jacobian is used


class Flow(nn.Module):
    def __init__(self, input_dim, num_flows):
        super(Flow, self).__init__()
        self.layers = nn.ModuleList([AffineCouplingLayer(input_dim) for _ in range(num_flows)])

    def forward(self, x, reverse=False):
        log_det_jacobian = 0
        if not reverse:
            for layer in self.layers:
                x, ldj = layer(x, reverse=False)
                log_det_jacobian += ldj
        else:
            for layer in reversed(self.layers):
                x, ldj = layer(x, reverse=True)
                log_det_jacobian += ldj
        return x, log_det_jacobian


class AffineCouplingLayer(nn.Module):
    def __init__(self, input_dim):
        super(AffineCouplingLayer, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim // 2, input_dim),
            nn.ReLU(),
            nn.Linear(input_dim, input_dim // 2)
        )

    def forward(self, x, reverse=False):
        x1, x2 = x.chunk(2, dim=1)  # Split input into two halves
        if not reverse:
            log_s = self.net(x1)
            t = self.net(x1)
            x2 = x2 * torch.exp(log_s) + t
            log_det_jacobian = log_s.sum(dim=1)
        else:
            log_s = self.net(x1)
            t = self.net(x1)
            x2 = (x2 - t) * torch.exp(-log_s)
            log_det_jacobian = -log_s.sum(dim=1)
        x = torch.cat([x1, x2], dim=1)
        return x, log_det_jacobian

In [7]:
# Z is latent representation of input data
class VAE(nn.Module):
    def __init__(self, input_dim, latent_dim, vocab_size):
        super(VAE, self).__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim

        # Encoder
        self.fc1 = nn.Linear(input_dim, 256)
        self.fc_mu = nn.Linear(256, latent_dim)
        self.fc_logvar = nn.Linear(256, latent_dim)

        # Decoder
        self.fc3 = nn.Linear(latent_dim, 256)
        self.fc4 = nn.Linear(256, input_dim)

    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        return self.fc_mu(h1), self.fc_logvar(h1)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h3 = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        mu, logvar = self.encode(x.view(-1, self.input_dim))
        z = self.reparameterize(mu, logvar)

        # Pass z through flow model
        #z, log_det_jacobian = flow_model(z)

        recon_x = self.decode(z)
        return recon_x, mu, logvar, #log_det_jacobian # jacobian is added

In [8]:
def vae_loss(recon_x, x, mu, logvar):
  # log_det_jacobian is a parameter added for flow
    batch_size = x.size(0)
    vocab_size = 29
    seq_len = x.size(1) // vocab_size  # should be 150

    # Reshape both to [batch * seq_len, vocab_size]
    x = x.view(batch_size, seq_len, vocab_size).view(-1, vocab_size)
    recon_x = recon_x.view(batch_size, seq_len, vocab_size).view(-1, vocab_size)

    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    #flow_loss = torch.sum(log_det_jacobian)

    #return BCE + flow_loss
    return BCE + KLD

    # remove the kl divergence

    # look into log det jacobian, fix problem


In [9]:
# normalize the data?
# dont think we are, needed if using sigmoid for the deocder and BCE loss

# Load SMILES strings
with open('dataset/train.txt', 'r') as f:
    smiles_train = [line.strip() for line in f]

with open('dataset/test.txt', 'r') as f:
    smiles_test = [line.strip() for line in f]

# Apply cleaning to your SMILES
smiles_train = [clean_smiles(smiles) for smiles in smiles_train]
smiles_test = [clean_smiles(smiles) for smiles in smiles_test]
smiles_train = smiles_train[:10000]
smiles_test = smiles_test[:10000]

# smiles_train = load_smiles_from_csv('dataset/train.txt', split_type='train')
# smiles_test = load_smiles_from_csv('dataset/test.txt', split_type='test')  # if test rows are in same file

# print(f"Raw SMILES loaded: train={len(smiles_train)}, test={len(smiles_test)}") # output for testing purposes
all_smiles = smiles_train + smiles_test
unique_chars = extract_unique_chars(all_smiles)

print(f"Total unique characters: {len(unique_chars)}")
print("Unique characters in dataset:")
print(unique_chars)

# Use extracted unique characters to rebuild vocabulary
VALID_CHARS = unique_chars
char_to_idx = {c: i for i, c in enumerate(VALID_CHARS)}
idx_to_char = {i: c for c, i in char_to_idx.items()}

# Create datasets
train_dataset = SMILESDataset(smiles_train, max_length=150, char_to_idx=char_to_idx)
test_dataset = SMILESDataset(smiles_test, max_length=150, char_to_idx=char_to_idx)
print("Training Vocabulary Size:", train_dataset.vocab_size)
print("Test Vocabulary Size:", test_dataset.vocab_size) # Should be the same


print(f"# Train SMILES after filtering: {len(train_dataset)}")
print(f"# Test SMILES after filtering: {len(test_dataset)}")
# train_dataset = SMILESDataset(smiles_train)
# test_dataset = SMILESDataset(smiles_test, char_to_idx=train_dataset.char_to_idx)  # Share vocabulary

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)  # No need to shuffle test data

print(f"Number of batches in train_loader: {len(train_loader)}")
print(f"Number of batches in test_loader: {len(test_loader)}")

Total unique characters: 29
Unique characters in dataset:
['#', '(', ')', '-', '1', '2', '3', '4', '5', '=', 'B', 'C', 'E', 'F', 'H', 'I', 'L', 'M', 'N', 'O', 'S', '[', ']', 'c', 'l', 'n', 'o', 'r', 's']
Total: 10000, Valid: 10000, Invalid: 0
Total: 10000, Valid: 10000, Invalid: 0
Training Vocabulary Size: 29
Test Vocabulary Size: 29
# Train SMILES after filtering: 10000
# Test SMILES after filtering: 10000
Number of batches in train_loader: 1250
Number of batches in test_loader: 1250


In [10]:
# Check a batch of data
for i, data in enumerate(train_loader):
    if i == 0:  # Just visualize the first batch
        print(data)
        break

# Visualize 3 samples
print("\nSample SMILES visualizations:")
for i in range(3):
    encoded = train_dataset[i]
    original = train_dataset.smiles_list[i]
    decoded = decode_smiles(encoded, train_dataset.idx_to_char)

    print(f"\nSample {i+1}")
    print(f"Original : {original}")
    print(f"Decoded  : {decoded}")
    print(f"Shape    : {encoded.shape}")

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

Sample SMILES visualizations:

Sample 1
Original : SMILES
Decoded  : SMILES################################################################################################################################################
Shape    : torch.Size([4350])

Sample 2
Original : CCCS(=O)c1ccc2[nH]c(=NC(=O)OC)[nH]c2c1
Decoded  : CCCS(=O)c1ccc2[nH]c(=NC(=O)OC)[nH]c2c1################################################################################################################
Shape    : torch.Size([4350])

Sample 3
Original : CC(C)(C)C(=O)C(Oc1ccc(Cl)cc1)n1ccnc1
Decoded  : CC(C)(C)C(=O)C(Oc1ccc(Cl)cc1)n1ccnc1##################################################################################################################
Shape    : torch.Size([

In [12]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = torch.device("cpu")
print(f"Using device: {device}")

# Instantiate the VAE model
input_dim = train_dataset.vocab_size * train_dataset.max_length  # Flatten the input (max_length x vocab_size)
latent_dim = 64
# try lowering the latent dim as it was too high at 128
# 128

vocab_size = train_dataset.vocab_size
max_length = train_dataset.max_length

print("Vocab size:", train_dataset.vocab_size)
print("max_length:", train_dataset.max_length)
print("Input dim:", input_dim)

vae = VAE(input_dim, latent_dim, len(idx_to_char))
vae.to(device)

# Must define flow and affline coupling layer
# input_dim = vae.latent_dim
# num_flows = 4 # Number of flow layers
# flow_model = Flow(input_dim, latent_dim, num_flows)
# flow_model.to(device)

# Optimizer
# 1e-5 = 0.00001
optimizer = torch.optim.Adam(vae.parameters(), lr=0.0001)
#optimizer = torch.optim.Adam(list(vae.parameters()) + list(flow_model.parameters()), lr=0.0001)


# Training and Evaluation loop
epochs = 100
for epoch in range(epochs):
    vae.train()  # Set model to training mode
    train_loss = 0
    for data in train_loader:  # Iterate over training data
        optimizer.zero_grad()
        # Flatten the input here before passing to the model
        #data = data.view(-1, input_dim).to(device)
        data = data.to(device)
        recon_batch, mu, logvar = vae(data)
        #recon_batch, mu, logvar, log_det_jacobian = vae(data)

        #print(f"Reconstructed output: {recon_batch[:5]}") # testing
        #break

        loss = vae_loss(recon_batch, data, mu, logvar)
        #loss = vae_loss(recon_batch, data, mu, logvar, log_det_jacobian)

        #print(f"batch loss: {loss.item()}") # for testing

        loss.backward() # back prop
        train_loss += loss.item()

        optimizer.step() # optimization

    print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss/len(train_loader)}')

    # Evaluation on test set
    vae.eval()  # Set model to evaluation mode
    test_loss = 0
    with torch.no_grad():  # No need to calculate gradients during evaluation
        for data in test_loader:  # Iterate over test data
            # Flatten the input here as well
            #data = data.view(-1, input_dim).to(device)
            data = data.to(device)
            recon_batch, mu, logvar = vae(data)
            #recon_batch, mu, logvar, log_det_jacobian = vae(data)
            #loss = vae_loss(recon_batch, data, mu, logvar, log_det_jacobian)
            loss = vae_loss(recon_batch, data, mu, logvar)
            test_loss += loss.item()

    print(f'Epoch [{epoch+1}/{epochs}], Test Loss: {test_loss/len(test_loader)}')

Using device: cuda
Vocab size: 29
max_length: 150
Input dim: 4350
Epoch [1/100], Train Loss: 2969.57891015625
Epoch [1/100], Test Loss: 1133.6417221191407
Epoch [2/100], Train Loss: 1065.87243828125
Epoch [2/100], Test Loss: 1051.2090211914062
Epoch [3/100], Train Loss: 1022.4547129882812
Epoch [3/100], Test Loss: 1027.7546665527343
Epoch [4/100], Train Loss: 1001.4437251464843
Epoch [4/100], Test Loss: 1011.0516913574219
Epoch [5/100], Train Loss: 985.6617461914062
Epoch [5/100], Test Loss: 988.7768387695312
Epoch [6/100], Train Loss: 958.1156721679688
Epoch [6/100], Test Loss: 962.3720509765625
Epoch [7/100], Train Loss: 931.314619921875
Epoch [7/100], Test Loss: 935.6422159667969
Epoch [8/100], Train Loss: 901.3379482910157
Epoch [8/100], Test Loss: 908.65354296875
Epoch [9/100], Train Loss: 873.2577166503906
Epoch [9/100], Test Loss: 883.4883260742188
Epoch [10/100], Train Loss: 847.8459929199219
Epoch [10/100], Test Loss: 862.5189785644532
Epoch [11/100], Train Loss: 826.883067773

In [16]:
# Generate a new molecule from VAE by sampling from the latent space
generated_smiles = generate_smiles(vae, latent_dim, train_dataset.idx_to_char)  # pass idx_to_char

print(f"Generated SMILES: {generated_smiles}")

# # Mock setup for quick testing
# vae.eval()  # Set model to eval mode (disables dropout, etc.)

# # Generate SMILES from random latent vector
# try:
#     result = generate_smiles(vae, latent_dim, idx_to_char=train_dataset.idx_to_char, temperature=1.0)
#     print("Test SMILES output:", result) # will print invalid if not valid
# except Exception as e:
#     print("Error while generating SMILES:", e)

Generated SMILES: INVALID


[10:58:31] SMILES Parse Error: syntax error while parsing: 3n[S=EMNLH5OH2LI][2on[I4#(2rc]cl#(H[S=]53OLH))MMC34L3M[s5Sr#H)5I2F#I=lF-52(S1EBoooosI(5-3oo(]34c(s4lCo=SC2LnE-LBH1M([2l=-3)CnI=(=-)=r-Il33lL=(OH)FcLEO
[10:58:31] SMILES Parse Error: check for mistakes around position 1:
[10:58:31] 3n[S=EMNLH5OH2LI][2on[I4#(2rc]cl#(H[S=]53
[10:58:31] ^
[10:58:31] SMILES Parse Error: Failed parsing SMILES '3n[S=EMNLH5OH2LI][2on[I4#(2rc]cl#(H[S=]53OLH))MMC34L3M[s5Sr#H)5I2F#I=lF-52(S1EBoooosI(5-3oo(]34c(s4lCo=SC2LnE-LBH1M([2l=-3)CnI=(=-)=r-Il33lL=(OH)FcLEO' for input: '3n[S=EMNLH5OH2LI][2on[I4#(2rc]cl#(H[S=]53OLH))MMC34L3M[s5Sr#H)5I2F#I=lF-52(S1EBoooosI(5-3oo(]34c(s4lCo=SC2LnE-LBH1M([2l=-3)CnI=(=-)=r-Il33lL=(OH)FcLEO'
