PyTorch implementation of SMILES2VEC

Read in the data of SMILES strings with their properties

Data Loading & Preprocessing 

In [None]:
import pandas as pd
import numpy as np
import torch

np.random.seed(42)
# NumPy's seed does not reach PyTorch's generator; seed it as well so that
# weight initialisation and dropout are repeatable between runs.
torch.manual_seed(42)

'''
This script processes molecular data in the form of SMILES strings and prepares it for machine learning models.
The data is loaded from a CSV file and split into training and testing sets based on a "split" column.
The SMILES strings are one-hot encoded using a custom function that includes special start ('!') and end ('E')
characters, ensuring all sequences have a uniform length. The encoded SMILES strings are converted into PyTorch
tensors, making them suitable for training and testing a deep learning model.

The script also handles the assay activity values, which are reshaped and converted into PyTorch tensors as well.
Additionally, after vectorization, the SMILES strings are decoded back into their original form for verification.

Key steps:
1. Load the SMILES and activity data from a CSV file and split into training and testing sets.
2. Create a sorted character set from all unique characters found in SMILES and map characters to integers and
   vice versa.
3. Vectorize (one-hot encode) the SMILES strings with start, end, and padding characters.
4. Convert the one-hot encoded data and activity values into PyTorch tensors for use in a deep learning model.
5. Decode the one-hot encoded SMILES back to their original form for verification.
'''


# Load the data from a CSV file containing SMILES strings and assay activity
DATA = pd.read_csv('data/IGC50.csv')

# Row masks for the two partitions; the "split" column holds 1 for training rows and 0 for testing rows
is_train = DATA["split"] == 1
is_test = DATA["split"] == 0

# Split the SMILES data into training and testing sets
X_train_smiles = np.array(DATA["smiles"][is_train].tolist())  # Training SMILES strings
X_test_smiles = np.array(DATA["smiles"][is_test].tolist())    # Testing SMILES strings

# Extract the assay values (e.g., activity) for training and testing sets as column vectors
assay = "Activity"  # Column name for the assay values
Y_train = DATA[assay][is_train].values.reshape(-1, 1)  # Training labels (assay values)
Y_test = DATA[assay][is_test].values.reshape(-1, 1)    # Testing labels (assay values)

# Print the shapes of the training and testing data
print(X_train_smiles.shape, Y_train.shape)
print(X_test_smiles.shape, Y_test.shape)

# Character vocabulary: every character seen in the SMILES strings plus the special start ('!') and end ('E')
# markers. It is sorted because iterating a raw set of strings can yield a different order in every Python
# process (string hashing is randomised), which would silently change every character index between runs and
# make saved weights or hard-coded indices meaningless.
charset = sorted(set("".join(DATA.smiles) + "!E"))

# Create mappings between characters and integers (for encoding and decoding)
char_to_int = {c: i for i, c in enumerate(charset)}  # Map characters to integers
int_to_char = dict(enumerate(charset))               # Map integers to characters

# Fixed sequence length: the longest SMILES string plus 50 extra positions (room for the start marker and
# trailing end markers)
embed = max(len(smile) for smile in DATA.smiles) + 50

# Print the charset and embedding size (vocabulary size and maximum sequence length)
print(str(charset))
print(len(charset), embed)

# Show the character-to-integer mapping (displayed only when this is the last expression of a notebook cell)
char_to_int

# Define a function to vectorize the SMILES strings into one-hot encoded arrays
def vectorize(smiles, *, max_len=None, char_map=None):
    """One-hot encode SMILES strings framed by start ('!') and end ('E') markers.

    Each string is laid out as ``'!' + smile + 'E' * k`` where the run of end
    markers fills the row up to ``max_len`` positions, so every row has the
    same length.

    Args:
        smiles: Sequence (list or NumPy array) of SMILES strings.
        max_len: Number of positions per encoded row. Defaults to the
            module-level ``embed``.
        char_map: Mapping from character to column index; must contain '!'
            and 'E' and every character that occurs in ``smiles``. Defaults
            to the module-level ``char_to_int``.

    Returns:
        A pair ``(inputs, targets)`` of int8 arrays, both of shape
        ``(len(smiles), max_len - 1, len(char_map))``. ``inputs`` drops the
        last position and ``targets`` drops the first, i.e. ``targets`` is
        ``inputs`` shifted one step ahead.

    Raises:
        KeyError: If a character is missing from ``char_map``.
        IndexError: If a string is too long to fit in ``max_len`` positions.
    """
    # Fall back to the notebook-level globals so existing one-argument calls keep working.
    if max_len is None:
        max_len = embed
    if char_map is None:
        char_map = char_to_int

    start_col = char_map["!"]
    end_col = char_map["E"]

    one_hot = np.zeros((len(smiles), max_len, len(char_map)), dtype=np.int8)

    for row, smile in enumerate(smiles):
        # Position 0 always holds the start marker.
        one_hot[row, 0, start_col] = 1

        # Characters occupy positions 1..len(smile) (shifted by the start marker).
        for pos, char in enumerate(smile, start=1):
            one_hot[row, pos, char_map[char]] = 1

        # Everything after the last character is filled with the end marker,
        # which therefore doubles as padding.
        one_hot[row, len(smile) + 1:, end_col] = 1

    return one_hot[:, :-1, :], one_hot[:, 1:, :]

# Vectorize the SMILES strings for training and testing sets.
# Only the first array of each returned pair (the encoding without its final position) is used here.
X_train, _ = vectorize(X_train_smiles)
X_test, _ = vectorize(X_test_smiles)

# Print the shape of the vectorized training set (for confirmation)
print(X_train.shape)

# Convert the NumPy arrays to PyTorch tensors for training and evaluation
X_train_tensor = torch.from_numpy(X_train).long()  # SMILES strings (one-hot encoded)
Y_train_tensor = torch.from_numpy(Y_train).float()  # Assay activity values (float-based)

# Print the shape of the assay activity values
print(Y_train.shape)

# Convert the test data to PyTorch tensors
X_test_tensor = torch.from_numpy(X_test).long()  # SMILES strings (one-hot encoded)
Y_test_tensor = torch.from_numpy(Y_test).float()  # Assay activity values (float-based)

# Decode the one-hot encoded SMILES back into strings for verification.
# argmax over the vocabulary axis recovers the character index at every position; iterating over the rows
# of the result decodes however many molecules the split contains instead of a hard-coded count.
mol_str_train = [
    "".join(int_to_char[idx] for idx in row)
    for row in np.argmax(X_train, axis=2)
]
mol_str_test = [
    "".join(int_to_char[idx] for idx in row)
    for row in np.argmax(X_test, axis=2)
]

# Get the size of the vocabulary (number of unique characters)
vocab_size = len(charset)

# Print the size of the vocabulary
print(vocab_size)


Defining the PyTorch CNN model

In [6]:
import torch.nn as nn

'''
This code defines a PyTorch neural network architecture called `CNNonSMILES`, designed for processing SMILES (Simplified Molecular 
Input Line Entry System) strings and predicting molecular properties. The architecture consists of an embedding layer for 
vectorizing SMILES characters, convolutional layers for feature extraction, and fully connected layers for final prediction. 
A GRU layer for capturing sequential relationships is also instantiated, but the forward pass does not currently apply it.

The model architecture includes the following key components:
1. **Embedding layer**: Converts SMILES characters into dense vectors.
2. **GRU (Gated Recurrent Unit)**: A recurrent layer to handle the sequential nature of SMILES data. It is created in 
   `__init__` but is not called in `forward`, so it does not affect the prediction.
3. **Convolutional layers**: Three 1D convolutional layers (with BatchNorm applied after the first one) to extract spatial 
   features from the sequences.
4. **Fully connected layers**: Dense layers to process the flattened features extracted by the convolutional layers and 
   output the final prediction (e.g., molecular property).
5. **Dropout**: A dropout layer to prevent overfitting by randomly zeroing some of the layer’s outputs during training.

The forward method returns the model’s output prediction as well as intermediate outputs from various layers (embedding, 
first convolutional layer, final convolutional layer).

This architecture is suitable for tasks such as molecular property prediction from SMILES strings.

'''

# Define the CNNonSMILES neural network architecture
class CNNonSMILES(nn.Module):
    """Embedding + three 1D convolutions + two dense layers for SMILES regression.

    Args:
        vocab_size: Number of distinct character indices the embedding accepts.
        embed: Length of the full one-hot encoded sequence. The model expects
            integer inputs of shape ``(batch, embed - 1)``, which is what
            ``vectorize`` produces after dropping the last position.

    Raises:
        ValueError: If ``embed`` is too small for the three unpadded
            convolutions to produce any output position.
    """

    # The convolutions use no padding, so each shortens the sequence by
    # kernel_size - 1: (10 - 1) + (5 - 1) + (3 - 1) = 15 positions in total.
    _CONV_CHANNELS = 192
    _CONV_SHRINK = 15

    def __init__(self, vocab_size, embed):
        super().__init__()

        # Length of the sequence that reaches the flatten layer.
        conv_out_len = (embed - 1) - self._CONV_SHRINK
        if conv_out_len <= 0:
            raise ValueError(
                f"embed={embed} is too short: the convolution stack needs "
                f"an input of at least {self._CONV_SHRINK + 1} positions"
            )

        # Embedding layer: converts character indices into dense 50-d vectors.
        # NOTE(review): padding_idx=0 pins the vector of whichever character
        # maps to index 0 at zero and excludes it from training - confirm that
        # index 0 is meant to be a padding/marker symbol.
        self.embedding = nn.Embedding(vocab_size, 50, padding_idx=0)

        # GRU layer: instantiated (and therefore part of the state dict) but
        # never called in forward(). Kept so previously saved weights still load.
        self.gru = nn.GRU(input_size=50, hidden_size=50, num_layers=1, batch_first=True)

        # Convolutional layers (1D); batch normalisation follows the first one only.
        self.conv1 = nn.Conv1d(50, self._CONV_CHANNELS, kernel_size=10)
        self.bn1 = nn.BatchNorm1d(self._CONV_CHANNELS)
        self.conv2 = nn.Conv1d(self._CONV_CHANNELS, self._CONV_CHANNELS, kernel_size=5)
        self.conv3 = nn.Conv1d(self._CONV_CHANNELS, self._CONV_CHANNELS, kernel_size=3)

        # Flatten (batch, channels, length) into (batch, channels * length).
        self.flatten = nn.Flatten()

        # Dense head. The input width is derived from `embed` rather than
        # hard-coded; for embed == 102 it equals the former constant 16512.
        self.fc1 = nn.Linear(self._CONV_CHANNELS * conv_out_len, 100)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.4)  # active only in training mode

        # Single regression output.
        self.fc2 = nn.Linear(100, 1)

    def forward(self, x):
        """Run the network on a batch of character-index sequences.

        Args:
            x: Integer tensor of shape ``(batch, embed - 1)``.

        Returns:
            A 4-tuple ``(prediction, x_emb, x_conv1, x_final)``:
            the ``(batch, 1)`` prediction, the embedding output
            ``(batch, length, 50)``, the raw first-convolution output (before
            batch norm and ReLU) and the output of the third convolution after
            its ReLU.
        """
        x = self.embedding(x)
        x_emb = x  # keep the embedding output for later inspection

        # Conv1d expects (batch, channels, sequence_length).
        x = x.permute(0, 2, 1)

        x = self.conv1(x)
        x_conv1 = x  # captured before batch norm / activation
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.relu(x)
        x_final = x  # captured after the last convolution's ReLU

        x = self.flatten(x)

        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc2(x)

        return x, x_emb, x_conv1, x_final


Training the CNN model 

In [7]:
'''
This script defines a training and validation pipeline for the `CNNonSMILES` PyTorch model.
It includes:
1. **Model Definition**: The model is defined with parameters like `vocab_size` and `embed`.
2. **Optimizer**: The Adam optimizer is used to update the model weights.
3. **Learning Rate Scheduler**: A scheduler (`ReduceLROnPlateau`) is used to reduce the learning rate when validation loss 
   plateaus.
4. **Loss Function**: Mean Squared Error (MSE) loss is used to compute the difference between the predicted and actual values.
5. **DataLoader**: DataLoaders for training and validation data are created; each split is served as one full batch.
6. **Training Loop**: A loop runs for `num_epochs` iterations, where the model trains on the data and evaluates performance 
   on a validation set. Training and validation losses are tracked.
7. **Checkpointing**: The model's best weights (based on validation loss) are saved.
8. **Learning Rate Adjustment**: The scheduler adjusts the learning rate based on validation loss improvements.

This pipeline handles SMILES data and is structured to train the model, evaluate it, and dynamically adjust learning 
parameters during training.
'''

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Build the model from the vocabulary size and sequence length computed during preprocessing
model = CNNonSMILES(vocab_size, embed)

# Define the optimizer (Adam optimizer)
optimizer = Adam(model.parameters(), lr=0.01)

# Halve the learning rate after 5 epochs without validation improvement.
# The scheduler's `verbose` flag is deprecated in recent PyTorch releases, so learning-rate
# changes are reported explicitly inside the loop instead.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-15)

# Define the loss function (Mean Squared Error for regression tasks)
loss_function = nn.MSELoss()

# Create DataLoaders for training and validation data.
# torch.argmax collapses the one-hot axis so the model receives integer character indices.
# Each split is loaded as ONE batch sized from the dataset itself: the intermediate activations
# captured below (x_conv1_train, x_conv1_val, ...) then cover every molecule of the split, in order.
train_dataset = TensorDataset(torch.argmax(X_train_tensor, dim=2), Y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), shuffle=False)

val_dataset = TensorDataset(torch.argmax(X_test_tensor, dim=2), Y_test_tensor)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset))

# Initialize lists to log the loss during training and validation
trainloss_profile = []
valloss_profile = []
num_epochs = 150  # Set the number of training epochs

# Best validation loss seen so far; starts at infinity so the first epoch always checkpoints
best_val_loss = float('inf')

# Training loop: Loop over the dataset multiple times
for epoch in range(num_epochs):

    model.train()  # Training mode: dropout active, batch norm uses batch statistics
    train_loss = 0.0  # Reset training loss for the epoch

    # Loop over the training data batches
    for inputs, targets in train_loader:
        optimizer.zero_grad()  # Zero the parameter gradients
        # The intermediate outputs are kept as globals for the later embedding analysis
        outputs, x_emb_train, x_conv1_train, x_final_train = model(inputs)
        loss = loss_function(outputs, targets)  # Compute loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update the weights
        train_loss += loss.item() * inputs.size(0)  # Weight the batch mean by its size

    train_loss /= len(train_loader.dataset)  # Per-sample average over the training set

    # Validation phase (without gradient updates)
    model.eval()  # Evaluation mode: dropout off, batch norm uses running statistics
    val_loss = 0.0  # Reset validation loss for the epoch

    with torch.no_grad():  # Disable gradient calculations during validation
        for inputs, targets in val_loader:
            outputs, x_emb_val, x_conv1_val, x_final_val = model(inputs)
            loss = loss_function(outputs, targets)  # Compute loss
            val_loss += loss.item() * inputs.size(0)  # Weight the batch mean by its size

    val_loss /= len(val_loader.dataset)  # Per-sample average over the validation set

    # Print the current epoch number and the corresponding losses
    print('epoch', epoch)
    print('train_loss', train_loss)
    print('val_loss', val_loss)

    # Step the learning rate scheduler based on validation loss and report any reduction
    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(val_loss)
    lr_after = optimizer.param_groups[0]['lr']
    if lr_after != lr_before:
        print('learning rate reduced to', lr_after)

    # Log the losses for this epoch
    trainloss_profile.append([epoch, train_loss])
    valloss_profile.append([epoch, val_loss])

    # Checkpointing: Save the model if validation loss improves
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'weights_best.pth')  # Save the model's state


Extract the latent vector space representation (embeddings) of the data for further analysis

In [10]:
'''
This part of the code extracts specific latent vectors or embeddings (convolutional outputs) of the model. The goal is to:
1. **Combine Training and Validation Embeddings**: Stack the convolutional outputs from both the training and validation 
   data into one array (`x_conv1_all`) for analysis.
2. **Process Inputs**: The input SMILES data, reduced from one-hot encoding to integer indices, is also combined for 
   training and validation sets (`X_inp_all`).
3. **Target Vocabulary Extraction**: The code finds every occurrence of a specific character (vocabulary element) in the 
   SMILES strings and retrieves the first convolutional layer's output at the same position index (latent vectors).
4. **Save Embeddings**: For each instance where the target vocabulary element is found, its latent vector is concatenated 
   with the molecule index and saved to a CSV file (`conv1embs.csv`).

The embeddings or latent vectors represent how specific characters in the SMILES strings are transformed during the 
convolutional operations.
'''

import numpy as np
import torch

# Stack (combine) the first-convolution outputs captured during the last training and validation passes.
# NOTE(review): these hold the activations of the final batch only, so they cover every molecule
# only if each split was processed as a single batch - the row-count check below guards this.
x_conv1_all = np.vstack((x_conv1_train.detach().numpy(), x_conv1_val.detach().numpy()))

# Stack (combine) the input SMILES data (after torch.argmax reduces one-hot encoding to integer indices)
X_inp_all = np.vstack((
    torch.argmax(X_train_tensor, dim=2).numpy(),
    torch.argmax(X_test_tensor, dim=2).numpy(),
))

# Print the shapes of the input SMILES data and the convolutional layer outputs
print(X_inp_all.shape)  # (number of molecules, input sequence length)
print(x_conv1_all.shape)  # (number of molecules, channels, conv output length)

# Every input row needs a matching activation row, otherwise molecule indices would be misaligned
if X_inp_all.shape[0] != x_conv1_all.shape[0]:
    raise ValueError(
        f"{X_inp_all.shape[0]} input molecules but {x_conv1_all.shape[0]} conv1 activation rows; "
        "the activations must come from a single batch covering each split"
    )

# Initialize a list to store the embeddings to be saved later
save_emb = []

# Target vocabulary element: the index of "O", looked up by character so it stays correct
# regardless of how the vocabulary happens to be ordered
target_vocab = char_to_int["O"]

# The first convolution has no padding, so its output is shorter than the input sequence.
# Restrict the search to positions that exist in both arrays to avoid indexing past the end.
n_positions = min(X_inp_all.shape[1], x_conv1_all.shape[2])

# (molecule, position) pairs where the target character occurs, in row-major order
# (all hits of molecule 0 first, then molecule 1, ...)
matches = np.argwhere(X_inp_all[:, :n_positions] == target_vocab)

for each_molecule, each_character in matches:
    # Output of the first convolutional layer for this molecule at this position (one value per channel)
    final = x_conv1_all[each_molecule, :, each_character]

    # Append the molecule index to the embedding for identification purposes
    finalindexed = np.concatenate((final, [each_molecule]))

    # Save the indexed embedding for later use
    save_emb.append(finalindexed)

# One summary line instead of printing the vector shape for every single match
print(len(save_emb), 'latent vectors of length', x_conv1_all.shape[1])

# Convert the list of embeddings to a NumPy array and save it to a CSV file
np.savetxt('conv1embs.csv', save_emb, delimiter=',')
