<a href="https://colab.research.google.com/github/SamuelRuby/DeepLearningProjects/blob/main/VAE_for_Molecular_Representation_AND_Generation_ZINC_Dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --upgrade numpy
!pip install --upgrade scipy
!pip install --upgrade scikit-learn

In [None]:
!pip install rdkit-pypi

In [None]:
# ------------------                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          # --------------------
# ---LIBRARIES------
# ------------------

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
# from sklearn.datasets import fetch_openaml

In [None]:
from rdkit import Chem, DataStructs
from rdkit.Chem import Draw, AllChem

# ** PHASE 1: Setup & Data Exploration **

## Step 1.1: Understand the ZINC dataset structure

In [None]:
# Find and load a subset of the ZINC dataset (SMILES format is standard).

# Understand what SMILES strings are (how they represent molecules).

# Visualize a few molecules from their SMILES strings.

# Optional: convert SMILES to molecular fingerprints (to understand structure).

In [None]:
!wget https://raw.githubusercontent.com/schwallergroup/ai4chem_course/generative_models/notebooks/05%20-%20Generative%20Models/data/zinc.smi -O zinc.smi


In [None]:
# Load SMILES strings
zinc_data = pd.read_csv('zinc.smi', header=None, skiprows=1)
zinc_data.columns = ['SMILES']
print(zinc_data.head())

In [None]:
# VISUALIZE
# Convert SMILES to RDKit molecule objects
molecules = [Chem.MolFromSmiles(smiles) for smiles in zinc_data['SMILES'][:5]]

# Filter out None values from the molecules list
molecules = [mol for mol in molecules if mol is not None]

# Visualize molecules
Draw.MolsToGridImage(molecules, molsPerRow=3)


In [None]:
# #visulaize
# #zinc_data['SMILES']
# smiles_list = zinc_data['SMILES'].tolist()[:5] # Get the first 5 SMILES strings
# mols = [Chem.MolFromSmiles(smiles) for smiles in smiles_list] # Convert SMILES to RDKit Mol objects
# Draw.MolsToGridImage(mols, molsPerRow=5, subImgSize=(200, 200), legends=smiles_list) # Display molecules in a grid

In [None]:
#converting smiles to molecular fingerprints

# Generate fingerprints, but only if molecules is not empty
if molecules:
    fingerprints = [AllChem.GetMorganFingerprintAsBitVect(mol, radius=2, nBits=1024) for mol in molecules]

    # Example: Print the fingerprint of the first molecule
    print(fingerprints[0])
else:
    print("No valid molecules found. Skipping fingerprint generation.")


In [None]:
#print(fingerprints[0].ToBitString())

fingerprint_array = np.array(fingerprints[0])
plt.imshow(fingerprint_array.reshape(32, 32), cmap='viridis')  # Adjust shape as needed
plt.colorbar()
plt.show()

In [None]:
from rdkit import DataStructs

similarity = DataStructs.FingerprintSimilarity(fingerprints[0], fingerprints[1])  # Compare the first two fingerprints
print(similarity)

## Step 1.2: Preprocess the Data for the VAE

Tasks:

Tokenize SMILES strings (each character or token becomes part of your vocabulary).

Convert SMILES to sequences of tokens (integer encoding).

Pad sequences to the same length.

Split into train/validation/test sets.

Goal: Prepare data for feeding into your VAE. Understand that each molecule becomes a sequence of tokens.



In [None]:
# tokenize
# def tokenize_smiles(smiles):
#     tokens = list(smiles)  # Each character becomes a token
#     return tokens

# tokenized_smiles = [tokenize_smiles(smiles) for smiles in zinc_data['SMILES']]
# print(tokenized_smiles)

# Tokens represent atoms, bonds, and structural features, enabling computational analysis
# Tokenized SMILES strings can be fed into NLP models for tasks like molecular property prediction or generative modeling.

In [None]:
# Try this more sophisticated tokenization approach
def tokenize_smiles(smiles):
    """Tokenize SMILES with awareness of chemical structures"""
    import re
    pattern = r'(\[[^\]]+]|Br?|Cl?|N|O|S|P|F|I|b|c|n|o|s|p|\(|\)|\.|=|#|-|\+|\\|\/|:|~|@|\?|>|\*|\$|\%[0-9]{2}|[0-9])'
    regex = re.compile(pattern)
    tokens = [token for token in regex.findall(smiles)]
    return tokens
tokenized_smiles = [tokenize_smiles(smiles) for smiles in zinc_data['SMILES']]
print(tokenized_smiles)

In [None]:
# Vocab : Maps each unique token to an integer, creating a consistent encoding scheme.
def build_vocabulary(tokenizedd_smiles):
    all_tokens = set()
    for smiles in tokenizedd_smiles:
        all_tokens.update(smiles)
    vocab = {token: idx for idx, token in enumerate(sorted(all_tokens))}
    return vocab

vocab = build_vocabulary(tokenized_smiles)
print(vocab)  # Displays the token-to-integer mapping


In [None]:
# integer encoding SMILES Strings: Transforms SMILES strings into sequences of integers, used as input for the machine learning model
def encode_smiles(smiles, vocab):
    tokens = tokenize_smiles(smiles)
    encoded = [vocab[token] for token in tokens]
    return encoded

encoded_smiles = [encode_smiles(smiles, vocab) for smiles in zinc_data['SMILES']]
print(encoded_smiles[:5])  # Displays the integer-encoded sequences for the first 5 SMILES strings


In [None]:
#padding sequences to same length
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Set maximum sequence length
max_len = max(len(seq) for seq in encoded_smiles)

# Pad sequences with zeros
padded_smiles = pad_sequences(encoded_smiles, maxlen=max_len, padding='post', value=0)
print(padded_smiles[:5])  # Displays the first 5 padded sequences


In [None]:
#split
from sklearn.model_selection import train_test_split

# Split into training and test sets (80% train, 20% test)
train_smiles, test_smiles = train_test_split(padded_smiles, test_size=0.2, random_state=42)

# Further split training set into training and validation sets (80% train, 20% validation)
train_smiles, val_smiles = train_test_split(train_smiles, test_size=0.2, random_state=42)


#added here
# Reduce each set by half
train_smiles = train_smiles[:len(train_smiles) // 2]
val_smiles = val_smiles[:len(val_smiles) // 2]
test_smiles = test_smiles[:len(test_smiles) // 2]


print(f"Training set size: {len(train_smiles)}")
print(f"Validation set size: {len(val_smiles)}")
print(f"Test set size: {len(test_smiles)}")


# ** PHASE 2: Build the VAE Architecture **

## Step 2.1: Implement the Encoder


Tasks:

Define an embedding layer (to map tokens to dense vectors).

Use an LSTM or GRU to encode the sequence.

Extract the mean and log variance (for the latent space).

Goal: Understand how the model compresses a molecule into a latent vector.

In [None]:
# LIBRARIES
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import LSTM, GRU

In [None]:
# define the embedding layer: that maps tokens (integer-encoded representations of SMILES strings) into dense vectors. therefore the model learn meaningful representations of individual tokens.

vocab_size = len(vocab)  # Total number of unique tokens #size of the vocabulary
embedding_dim = 128  # Dimension of the embedding space

embedding_layer = Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=max_len)

#  output_dim=embedding_dim -->defines the dimensionality of the dense embedding vectors. Higher dimensions capture more information but require more computational power.

#input_length=max_len: The fixed sequence length after padding.

In [None]:
#LSTM OR GRU to Encode the Sequence and extract patterns from the SMILES string

# Both are capable of capturing sequential relationships in the data:
# LSTM (Long Short-Term Memory): Ideal for handling long-term dependencies.
# GRU (Gated Recurrent Unit): Similar to LSTM but computationally more efficient.


# Define the encoder (LSTM or GRU) --> START with GRU
latent_dim = 64  # Dimensionality of the latent vector #was 64 increased to 128

# encoder_lstm = LSTM(latent_dim, return_sequences=False, return_state=True)
# OR
encoder_gru = GRU(latent_dim, return_sequences=False, return_state=True)

# Pass padded integer-encoded sequences through encoder embedding layer
inputs = np.array(padded_smiles) # Padded integer-encoded sequences
embedded_sequences = embedding_layer(inputs) #padded tensor should be what is passed into embedding_layer function
_, state_h = encoder_gru(embedded_sequences)

# state_h is latent space
# return_sequences=False: Only the final output of the sequence is returned (compressed information).
# return_state=True: Captures the hidden state as well, which is critical for the latent representation.

In [None]:
# Extract the mean and log variance (for the latent space). a probabilistic latent space

z_mean = Dense(latent_dim)(state_h) #mu
z_log_var = Dense(latent_dim)(state_h)

## Step 2.2: Reparametization Technique --> Latent sampling

In [None]:
# Sampling: Generates the latent vector using the reparameterization trick.

def sampling(z_mean, z_log_var):
    epsilon = tf.random.normal(shape=tf.shape(z_mean)) #ε is random noise sampled from a standard normal distribution.
    z = z_mean + tf.exp(0.5 * z_log_var) * epsilon
    return z

# Sample latent vector
z = sampling(z_mean, z_log_var)

#this trick in VAE involves sampling from a normal distribution (one that has mean as 0 and std as 1), and then scaling/shifting our distribution using the latent variables(mean and std)
# this allows gradients to flow(backpropagate) through the sampling operation

In [None]:
z


## Step 2.3: Implement the Decoder

In [None]:
# Tasks:

# Take latent vectors and decode them into sequences.

# Use a recurrent decoder (LSTM/GRU) + a dense layer with softmax.

# Train the model to reconstruct the original SMILES from latent space.

# Goal: Learn how the VAE "imagines" or regenerates molecules.


In [None]:
# LIBARIES
# from tensorflow.keras.layers import Input, RepeatVector
# from tensorflow.keras.models import Model
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Model, Input
from tensorflow.keras.layers import Embedding, GRU, Dense, RepeatVector
import numpy as np
from tensorflow.keras.losses import sparse_categorical_crossentropy


In [None]:
# BUILD DECODER
# Decoder Input: Latent Vector decoding into sequences
decoder_input = Input(shape=(latent_dim,))  # The latent vector size is `latent_dim`

# Expand Latent Vector for Sequence Generation
repeat_vector = RepeatVector(max_len)(decoder_input)  # Repeat to match sequence length

# recurrent Decoder GRU/LSTM
decoder_gru = GRU(256, return_sequences=True)(repeat_vector)  # 256 units, recurrent processing
# OR
# decoder_lstm = LSTM(256, return_sequences=True)(repeat_vector)

# Dense Layer with Softmax
decoder_dense = Dense(vocab_size, activation='softmax')(decoder_gru)

# Define the decoder model
decoder = Model(decoder_input, decoder_dense)

#define loss function
decoder.compile(optimizer='adam', loss='categorical_crossentropy')


# ** PHASE 3: TRAINING **


## Step 3.1:  PUTTING EVERYTHING TOGETHER INTO A CLASS

In [None]:
# Encoder layers
encoder_gru = GRU(64, return_sequences=False, return_state=True)
z_mean_dense = Dense(64)
z_log_var_dense = Dense(64)

# Sampling layer
def sampling(z_mean, z_log_var):
    epsilon = tf.random.normal(shape=tf.shape(z_mean))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon


In [None]:
#OLDDD
# Decoder (already defined)
# decoder_input = Input(shape=(latent_dim,))
# repeat_vector = RepeatVector(max_len)(decoder_input)
# decoder_gru = GRU(256, return_sequences=True)(repeat_vector)
# decoder_dense = Dense(vocab_size, activation='softmax')(decoder_gru)
# decoder = Model(decoder_input, decoder_dense)

# 🧠 VAE Class
# class VAE(Model):
#     def __init__(self, embedding_layer, encoder_gru, z_mean_dense, z_log_var_dense, decoder, **kwargs):
#         super(VAE, self).__init__(**kwargs)
#         self.embedding_layer = embedding_layer
#         self.encoder_gru = encoder_gru
#         self.z_mean_dense = z_mean_dense
#         self.z_log_var_dense = z_log_var_dense
#         self.decoder = decoder

#     def train_step(self, data):
#         x = data  # data = padded SMILES input (batch)
#         with tf.GradientTape() as tape:
#             # 1. Embed
#             embedded = self.embedding_layer(x)

#             # 2. Encode
#             _, state_h = self.encoder_gru(embedded)
#             z_mean = self.z_mean_dense(state_h)
#             z_log_var = self.z_log_var_dense(state_h)
#             z = sampling(z_mean, z_log_var)

#             # 3. Decode
#             x_reconstructed = self.decoder(z)

#             # 4. Losses
#             reconstruction_loss = tf.reduce_mean(
#                 tf.keras.losses.sparse_categorical_crossentropy(x, x_reconstructed)
#             )
#             kl_loss = -0.5 * tf.reduce_mean(
#                 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
#             )
#             total_loss = reconstruction_loss + kl_loss

#         # Backprop
#         grads = tape.gradient(total_loss, self.trainable_weights)
#         self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

#         return {
#             "loss": total_loss,
#             "reconstruction_loss": reconstruction_loss,
#             "kl_loss": kl_loss
#         }


In [None]:
#changed code for implementing a call(method)
class VAE(Model):
    def __init__(self, embedding_layer, encoder_gru, z_mean_dense, z_log_var_dense, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.embedding_layer = embedding_layer
        self.encoder_gru = encoder_gru
        self.z_mean_dense = z_mean_dense
        self.z_log_var_dense = z_log_var_dense
        self.decoder = decoder

        # Define metrics
        self.total_loss_tracker = keras.metrics.Mean(name="loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs):
        # Forward pass for inference
        embedded = self.embedding_layer(inputs)
        _, state_h = self.encoder_gru(embedded)
        z_mean = self.z_mean_dense(state_h)
        z_log_var = self.z_log_var_dense(state_h)
        z = sampling(z_mean, z_log_var)
        return self.decoder(z)

    def train_step(self, data):
        x = data  # data = padded SMILES input (batch)

        with tf.GradientTape() as tape:
            # 1. Embed
            embedded = self.embedding_layer(x)

            # 2. Encode
            _, state_h = self.encoder_gru(embedded)
            z_mean = self.z_mean_dense(state_h)
            z_log_var = self.z_log_var_dense(state_h)
            z = sampling(z_mean, z_log_var)

            # 3. Decode
            x_reconstructed = self.decoder(z)

            # 4. Losses
            reconstruction_loss = tf.reduce_mean(
                tf.keras.losses.sparse_categorical_crossentropy(x, x_reconstructed)
            )
            kl_loss = -0.5 * tf.reduce_mean(
                1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
            )
            total_loss = reconstruction_loss + kl_loss

        # Backprop
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Update metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result()
        }

    def test_step(self, data):
        # This is needed for validation to work
        x = data

        # Forward pass
        embedded = self.embedding_layer(x)
        _, state_h = self.encoder_gru(embedded)
        z_mean = self.z_mean_dense(state_h)
        z_log_var = self.z_log_var_dense(state_h)
        z = sampling(z_mean, z_log_var)
        x_reconstructed = self.decoder(z)

        # Compute losses
        reconstruction_loss = tf.reduce_mean(
            tf.keras.losses.sparse_categorical_crossentropy(x, x_reconstructed)
        )
        kl_loss = -0.5 * tf.reduce_mean(
            1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
        )
        total_loss = reconstruction_loss + kl_loss

        # Update metrics
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result()
        }
#added get_config and from_config below
    def get_config(self):
      config = super(VAE, self).get_config()
      config.update({
          'embedding_layer': self.embedding_layer,
          'encoder_gru': self.encoder_gru,
          'z_mean_dense': self.z_mean_dense,
          'z_log_var_dense': self.z_log_var_dense,
          'decoder': self.decoder
      })
      return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [None]:
#CODE RUNS FOR ALMOST 2 HOURS ON GOOGLE COLAB FREE SERVER

vae = VAE(
    embedding_layer=embedding_layer,
    encoder_gru=encoder_gru,
    z_mean_dense=z_mean_dense,
    z_log_var_dense=z_log_var_dense,
    decoder=decoder
)
vae.compile(optimizer='adam')  # No need to specify loss here
vae.fit(train_smiles, epochs=50, batch_size=64, validation_data=(val_smiles,))


## Step 3.2: Visualize the Latent Space

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans

import seaborn as sns

In [None]:
from rdkit.Chem import Descriptors

In [None]:
# Tasks:

# Use PCA or t-SNE on the latent vectors.

# Color-code by molecule properties (e.g., molecular weight or logP if available).

# Try clustering.

# Goal: Build intuition for how molecules are distributed in latent space.

In [None]:
# generating latent vectors for the molecules

def generate_latent_vectors(model, data):
    """Extract latent vectors (z_mean) from the trained VAE model."""
    # Get the encoder part to generate latent vectors
    embedded = model.embedding_layer(data)
    _, state_h = model.encoder_gru(embedded)
    z_mean = model.z_mean_dense(state_h)
    return z_mean.numpy()

# Generate latent vectors for all molecules
latent_vectors = generate_latent_vectors(vae, padded_smiles)
latent_vectors

In [None]:
# Calculate molecular properties for color-coding
def calculate_mol_properties(smiles_list):
    """Calculate molecular weight and logP for each SMILES string."""
    properties = []
    for smiles in smiles_list:
        mol = Chem.MolFromSmiles(smiles)
        if mol:
            mw = Descriptors.MolWt(mol)
            logp = Descriptors.MolLogP(mol)
            properties.append({'MW': mw, 'LogP': logp})
        else:
            properties.append({'MW': np.nan, 'LogP': np.nan})
    return pd.DataFrame(properties)


# Extract original SMILES strings
original_smiles = zinc_data['SMILES'].tolist()
mol_properties = calculate_mol_properties(original_smiles)


In [None]:
mol_properties

In [None]:
# Apply PCA
pca = PCA(n_components=2)
pca_result = pca.fit_transform(latent_vectors)
print(f"PCA explained variance ratio: {pca.explained_variance_ratio_}")


In [None]:
# Investigate Latent Space:
pca = PCA(n_components=2)
pca_result = pca.fit_transform(latent_vectors)

# Access the components (loadings)
principal_components = pca.components_

# The first row of principal_components corresponds to PC1
pc1_loadings = principal_components[0]

# Now, you can analyze pc1_loadings:
# - Print the values to see the contribution of each latent variable to PC1
print(pc1_loadings)

# - Visualize the loadings using a bar chart or heatmap
import matplotlib.pyplot as plt

plt.bar(range(len(pc1_loadings)), pc1_loadings)
plt.xlabel("Latent Variable Index")
plt.ylabel("Loading on PC1")
plt.title("Contribution of Latent Variables to PC1")
plt.show()

# Larger (absolute) values ​​in pc1_loadingsindicate that the corresponding latent variables contribute more strongly to PC1.
# Positive values ​​indicate a positive correlation between the latent variable and PC1.
# Negative values ​​indicate a negative correlation.

In [None]:
# Apply t-SNE
tsne = TSNE(n_components=2, random_state=42, perplexity=30, n_iter=1000)
tsne_result = tsne.fit_transform(latent_vectors)
tsne_result

#takes 15 mins to run

In [None]:
plt.scatter(tsne_result[:, 0], tsne_result[:, 1])
plt.xlabel("t-SNE Dimension 1")
plt.ylabel("t-SNE Dimension 2")
plt.title("t-SNE Visualization of Molecules")
plt.show()

In [None]:
# K-Means clustering
kmeans = KMeans(n_clusters=5, random_state=42)
cluster_labels = kmeans.fit_predict(latent_vectors)
cluster_labels
#kmeans

In [None]:
# Visualization with PCA
plt.figure(figsize=(12, 10))

# Plot 1: PCA colored by molecular weight
plt.subplot(2, 2, 1)
scatter = plt.scatter(pca_result[:, 0], pca_result[:, 1], c=mol_properties['MW'],
                      cmap='viridis', alpha=0.6, s=10)
plt.colorbar(scatter, label='Molecular Weight')
plt.title('PCA of Latent Space (colored by MW)')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')

# Plot 2: PCA colored by LogP
plt.subplot(2, 2, 2)
scatter = plt.scatter(pca_result[:, 0], pca_result[:, 1], c=mol_properties['LogP'],
                      cmap='plasma', alpha=0.6, s=10)
plt.colorbar(scatter, label='LogP')
plt.title('PCA of Latent Space (colored by LogP)')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')

# Plot 3: PCA with K-means clusters
plt.subplot(2, 2, 3)
scatter = plt.scatter(pca_result[:, 0], pca_result[:, 1], c=cluster_labels,
                      cmap='tab10', alpha=0.6, s=10)
plt.colorbar(scatter, label='Cluster')
plt.title('PCA of Latent Space with K-means Clusters')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')

In [None]:
# 8. Analyze clusters
cluster_properties = pd.DataFrame({
    'Cluster': cluster_labels,
    'MW': mol_properties['MW'],
    'LogP': mol_properties['LogP']
})

# Calculate mean properties for each cluster
cluster_stats = cluster_properties.groupby('Cluster').agg(['mean', 'std', 'count'])
print("Cluster statistics:")
print(cluster_stats)

# 9. Additional visualization: Distribution of properties within clusters
plt.figure(figsize=(14, 6))

# Molecular Weight distribution by cluster
plt.subplot(1, 2, 1)
sns.boxplot(x='Cluster', y='MW', data=cluster_properties)
plt.title('Molecular Weight Distribution by Cluster')
plt.xlabel('Cluster')
plt.ylabel('Molecular Weight')

# LogP distribution by cluster
plt.subplot(1, 2, 2)
sns.boxplot(x='Cluster', y='LogP', data=cluster_properties)
plt.title('LogP Distribution by Cluster')
plt.xlabel('Cluster')
plt.ylabel('LogP')

plt.tight_layout()
plt.show()

In [None]:
# 10. Bonus: Find representative molecules from each cluster
def find_representative_molecules(cluster_properties, original_smiles, n=3):
    """Find n molecules closest to each cluster centroid."""
    representatives = {}
    for cluster_id in np.unique(cluster_properties['Cluster']):
        # Get indices of molecules in this cluster
        cluster_indices = np.where(cluster_properties['Cluster'] == cluster_id)[0]
        # Get subset of latent vectors for this cluster
        cluster_vectors = latent_vectors[cluster_indices]
        # Get centroid
        centroid = np.mean(cluster_vectors, axis=0)
        # Calculate distances to centroid
        distances = np.linalg.norm(cluster_vectors - centroid, axis=1)
        # Get indices of n closest molecules
        closest_indices = cluster_indices[np.argsort(distances)[:n]]
        # Get SMILES of representative molecules
        rep_smiles = [original_smiles[i] for i in closest_indices]
        representatives[cluster_id] = rep_smiles
    return representatives

representative_mols = find_representative_molecules(cluster_properties, original_smiles)
print("\nRepresentative molecules from each cluster:")
for cluster_id, smiles_list in representative_mols.items():
    print(f"\nCluster {cluster_id}:")
    for smiles in smiles_list:
        print(f"  {smiles}")


## Step 3.3: Generate New Molecules

In [None]:
Tasks:

Sample new latent vectors.

Decode them using your decoder.

Convert token sequences back to SMILES.

Visualize or validate if they are valid molecules.

Goal: Understand generative capability — how plausible are the outputs?

In [None]:
# 1. Function to sample from latent space
def sample_latent_space(n_samples, latent_dim=64):
    """Sample random points from the latent space"""
    # Standard normal distribution sampling
    z_sample = np.random.normal(0, 1, size=(n_samples, latent_dim))
    return z_sample

# Generate samples from latent space -> latent vectors
n_samples = 100
latent_samples = sample_latent_space(n_samples, latent_dim=64)
latent_samples

#latent_dim = 64
#
# latent_samples = sample_latent_space(n_samples, latent_dim)


In [None]:
# Sample from latent space
n_samples = 100
latent_dim = 64  # Adjust if your latent dimension is different
max_length = 100

# Improved decoder function for your specific architecture
def decode_latent_vectors(vae_model, z_vectors, max_length):
    """Decode latent vectors to token sequences"""
    # Direct decoding using the decoder component
    # This assumes your decoder can take the latent vectors directly
    output_tokens = vae_model.decoder(z_vectors)

    # Get the most likely token for each position
    # output_tokens shape should be (batch_size, seq_length, vocab_size)
    token_indices = np.argmax(output_tokens, axis=-1)

    # Truncate to max_length if needed
    if token_indices.shape[1] > max_length:
        token_indices = token_indices[:, :max_length]

    return token_indices

# Run the decoding
decoded_sequences = decode_latent_vectors(vae, latent_samples, max_length)
decoded_sequences

In [None]:
# # 2. Function to decode latent vectors back to token sequences
# def decode_latent_vectors(decoder_model, z_vectors):
#     """Decode latent vectors to token sequences"""
#     # Use the decoder to generate token probabilities
#     token_probs = decoder_model.predict(z_vectors)

#     # Get the most likely token for each position
#     token_indices = np.argmax(token_probs, axis=-1)
#     return token_indices

# # Decode the latent vectors using your decoder
# # Since we have the full VAE model, we need to use just the decoder part
# decoded_sequences = decode_latent_vectors(vae.decoder, latent_samples)
# decoded_sequences


In [None]:
# 3. Function to convert token indices back to SMILES strings
# def indices_to_smiles(token_indices, idx_to_token):
#     """Convert token indices back to SMILES strings"""
#     smiles_list = []

#     for seq in token_indices:
#         # Convert indices to tokens
#         tokens = [idx_to_token.get(idx, '') for idx in seq if idx != 0]  # Skip padding tokens

#         # Join tokens to form SMILES string
#         smiles = ''.join(tokens)

#         # Find where the SMILES string might end (if there's a termination token)
#         # Adjust this depending on your tokenization scheme
#         if '\n' in smiles:
#             smiles = smiles.split('\n')[0]

#         smiles_list.append(smiles)

#     return smiles_list


# 3. Improved function to convert token indices to SMILES
def indices_to_smiles(token_indices, idx_to_token, pad_token=0):
    """Convert token indices back to SMILES strings with better handling"""
    smiles_list = []

    for seq in token_indices:
        # Filter out padding and end tokens
        valid_indices = [idx for idx in seq if idx != pad_token]

        # Convert indices to tokens
        tokens = [idx_to_token.get(idx, '') for idx in valid_indices]

        # Join tokens - handling depends on your tokenization approach
        if isinstance(tokens[0], str) and len(tokens[0]) == 1:
            # Character-level tokenization
            smiles = ''.join(tokens)
        else:
            # Token-level tokenization (with spaces between tokens)
            smiles = ''.join(tokens)

        # Check for termination token and truncate
        if '\n' in smiles:
            smiles = smiles.split('\n')[0]

        smiles_list.append(smiles)

    return smiles_list


# def indices_to_smiles(token_indices, idx_to_token):
#     """Convert token indices back to SMILES strings"""
#     smiles_list = []

#     for seq in token_indices:
#         smiles = ""
#         for idx in seq:
#             if idx != 0:  # Skip padding tokens
#                 token = idx_to_token.get(idx, '')
#                 # Add space if it's in the vocabulary and the previous token was not a space
#                 if token == ' ' and smiles and smiles[-1] != ' ':
#                     smiles += token
#                 elif token != ' ':
#                     smiles += token

#         # Find where the SMILES string might end (if there's a termination token)
#         if '\n' in smiles:
#             smiles = smiles.split('\n')[0]

#         smiles_list.append(smiles)

#     return smiles_list

# First, create the inverse vocabulary (index to token mapping)
idx_to_token = {idx: token for token, idx in vocab.items()}

# Convert token indices back to SMILES strings
generated_smiles = indices_to_smiles(decoded_sequences, idx_to_token)
generated_smiles[:9]


In [None]:
# # To generate new SMILES

# def generate_smiles(model, latent_dim, char_to_index, index_to_char, max_length):
#     # Sample a random point in the latent space
#     z = tf.random.normal(shape=(1, latent_dim))

#     # Initial input token (typically start token or padding)
#     current_token = np.zeros((1, 1))  # Use your start token index here

#     generated_tokens = []

#     # Generate tokens sequentially
#     for i in range(max_length):
#         # Predict the next token probabilities
#         predictions = model.decoder([z, current_token])

#         # Sample from the predictions
#         sampled_token_index = np.argmax(predictions[0, i, :])

#         # Break if end token is generated
#         if sampled_token_index == char_to_index.get('\n', 0):  # Or whatever your end token is
#             break

#         # Add the token to our generated sequence
#         generated_tokens.append(index_to_char[sampled_token_index])

#         # Update current token for next prediction if needed
#         if model.decoder.stateful:
#             current_token = np.array([[sampled_token_index]])

#     # Join tokens into a SMILES string
#     generated_smiles = ''.join(generated_tokens)
#     return generated_smiles

# # Convert token indices back to SMILES strings
# generated_smiles = indices_to_smiles(decoded_sequences, idx_to_token)
# generated_smiles[:9]


In [None]:


# # 3. Improved function to convert token indices to SMILES
# def indices_to_smiles(token_indices, idx_to_token, pad_token=0):
#     """Convert token indices back to SMILES strings with better handling"""
#     smiles_list = []

#     for seq in token_indices:
#         # Filter out padding and end tokens
#         valid_indices = [idx for idx in seq if idx != pad_token]

#         # Convert indices to tokens
#         tokens = [idx_to_token.get(idx, '') for idx in valid_indices]

#         # Join tokens - handling depends on your tokenization approach
#         if isinstance(tokens[0], str) and len(tokens[0]) == 1:
#             # Character-level tokenization
#             smiles = ''.join(tokens)
#         else:
#             # Token-level tokenization (with spaces between tokens)
#             smiles = ''.join(tokens)

#         # Check for termination token and truncate
#         if '\n' in smiles:
#             smiles = smiles.split('\n')[0]

#         smiles_list.append(smiles)

#     return smiles_list

In [None]:
# 4. Function to check validity of generated SMILES
def check_validity(smiles_list):
    """Check if generated SMILES strings are valid molcules"""
    valid_mols = []
    valid_smiles = []

    for smiles in smiles_list:
        mol = Chem.MolFromSmiles(smiles)
        if mol is not None:
            valid_mols.append(mol)
            valid_smiles.append(smiles)

    print(f"Generated {len(smiles_list)} SMILES strings")
    print(f"Valid molecules: {len(valid_mols)} ({len(valid_mols)/len(smiles_list)*100:.2f}%)")

    return valid_mols, valid_smiles

# Check validity and get valid molecules
valid_mols, valid_smiles = check_validity(generated_smiles)

In [None]:
# 5. Visualize some of the valid molecules
def visualize_molecules(mols, n_per_row=5, n_rows=4):
    """Display a grid of molecules"""
    # Take up to n_per_row * n_rows molecules
    mols_to_display = mols[:n_per_row * n_rows]

    if len(mols_to_display) == 0:
        print("No valid molecules to display!")
        return

    # Create a molecule grid
    img = Draw.MolsToGridImage(
        mols_to_display,
        molsPerRow=n_per_row,
        subImgSize=(200, 200),
        legends=[f"Mol {i+1}" for i in range(len(mols_to_display))]
    )

    # Display the image
    plt.figure(figsize=(15, 12))
    plt.imshow(img)
    plt.axis('off')
    plt.title("Generated Valid Molecules")
    plt.show()

# Visualize the valid molecules
visualize_molecules(valid_mols)


In [None]:

# 6. Calculate basic properties of valid molecules
def calculate_properties(mols):
    """Calculate some basic molecular properties"""
    from rdkit.Chem import Descriptors

    properties = {
        'MW': [],       # Molecular Weight
        'LogP': [],     # Partition coefficient
        'HBA': [],      # Hydrogen Bond Acceptors
        'HBD': [],      # Hydrogen Bond Donors
        'RotBonds': [], # Rotatable Bonds
        'Rings': []     # Ring Count
    }

    for mol in mols:
        properties['MW'].append(Descriptors.MolWt(mol))
        properties['LogP'].append(Descriptors.MolLogP(mol))
        properties['HBA'].append(Descriptors.NumHAcceptors(mol))
        properties['HBD'].append(Descriptors.NumHDonors(mol))
        properties['RotBonds'].append(Descriptors.NumRotatableBonds(mol))
        properties['Rings'].append(Descriptors.RingCount(mol))

    return properties

# Calculate properties of the generated molecules
generated_properties = calculate_properties(valid_mols)


In [None]:

# 7. Compare to training data properties
# Assuming you have access to the original zinc_data
original_mols = [Chem.MolFromSmiles(smiles) for smiles in original_smiles]
original_valid_mols = [mol for mol in original_mols if mol is not None]
original_properties = calculate_properties(original_valid_mols)

# 8. Plot distribution comparisons
def plot_property_comparison(original_props, generated_props):
    """Plot histograms comparing original and generated molecule properties"""
    properties = ['MW', 'LogP', 'HBA', 'HBD', 'RotBonds', 'Rings']
    fig, axes = plt.subplots(3, 2, figsize=(15, 12))
    axes = axes.flatten()

    for i, prop in enumerate(properties):
        ax = axes[i]
        ax.hist(original_props[prop], alpha=0.5, bins=20, label='Training Data')
        ax.hist(generated_props[prop], alpha=0.5, bins=20, label='Generated')
        ax.set_title(f'{prop} Distribution')
        ax.set_xlabel(prop)
        ax.set_ylabel('Count')
        ax.legend()

    plt.tight_layout()
    plt.show()

# Plot the property comparisons
plot_property_comparison(original_properties, generated_properties)

# 9. Advanced Analysis: Chemical Similarity
def analyze_similarity(original_mols, generated_mols, sample_size=100):
    """Analyze chemical similarity between generated and original molecules"""
    from rdkit import DataStructs
    from rdkit.Chem import AllChem
    import random

    # If we have too many molecules, sample them
    if len(original_mols) > sample_size:
        original_sample = random.sample(original_mols, sample_size)
    else:
        original_sample = original_mols

    if len(generated_mols) > sample_size:
        generated_sample = random.sample(generated_mols, sample_size)
    else:
        generated_sample = generated_mols

    # Calculate Morgan fingerprints for all molecules
    original_fps = [AllChem.GetMorganFingerprintAsBitVect(mol, 2) for mol in original_sample]
    generated_fps = [AllChem.GetMorganFingerprintAsBitVect(mol, 2) for mol in generated_sample]

    # Calculate all pairwise similarities between generated and original
    similarities = []
    for gen_fp in generated_fps:
        for orig_fp in original_fps:
            sim = DataStructs.TanimotoSimilarity(gen_fp, orig_fp)
            similarities.append(sim)

    # Calculate internal similarities within generated set
    internal_similarities = []
    for i in range(len(generated_fps)):
        for j in range(i+1, len(generated_fps)):
            sim = DataStructs.TanimotoSimilarity(generated_fps[i], generated_fps[j])
            internal_similarities.append(sim)

    # Plot similarity distributions
    plt.figure(figsize=(10, 6))
    plt.hist(similarities, alpha=0.5, bins=20, label='Gen-to-Original')
    plt.hist(internal_similarities, alpha=0.5, bins=20, label='Gen-to-Gen')
    plt.title('Chemical Similarity Distributions (Tanimoto)')
    plt.xlabel('Tanimoto Similarity')
    plt.ylabel('Count')
    plt.legend()
    plt.show()

    return np.mean(similarities), np.mean(internal_similarities)

# Calculate and plot similarity metrics
avg_cross_sim, avg_internal_sim = analyze_similarity(original_valid_mols, valid_mols)
print(f"Average similarity to training data: {avg_cross_sim:.3f}")
print(f"Average internal similarity: {avg_internal_sim:.3f}")

# # 10. Latent Space Interpolation between molecules
# def interpolate_molecules(vae_model, start_smiles, end_smiles, n_steps=10):
#     """Generate molecules by interpolating in latent space between two SMILES strings"""
#     # Tokenize both SMILES
#     start_tokens = tokenize_smiles(start_smiles)
#     end_tokens = tokenize_smiles(end_smiles)

#     # Encode to token indices
#     start_indices = [vocab[token] for token in start_tokens]
#     end_indices = [vocab[token] for token in end_tokens]

#     # Pad sequences
#     start_padded = keras.preprocessing.sequence.pad_sequences([start_indices], maxlen=max_len, padding='post', value=0)
#     end_padded = keras.preprocessing.sequence.pad_sequences([end_indices], maxlen=max_len, padding='post', value=0)

#     # Get latent vectors
#     start_latent = generate_latent_vectors(vae, start_padded)
#     end_latent = generate_latent_vectors(vae, end_padded)

#     # Create interpolations
#     alphas = np.linspace(0, 1, n_steps)
#     interp_latent = np.array([(1-alpha)*start_latent[0] + alpha*end_latent[0] for alpha in alphas])

#     # Decode interpolated vectors
#     interp_sequences = decode_latent_vectors(vae_model, interp_latent, max_length)
#     interp_smiles = indices_to_smiles(interp_sequences, idx_to_token)

#     # Convert to molecules
#     interp_mols = []
#     valid_interp_smiles = []
#     for smiles in interp_smiles:
#         mol = Chem.MolFromSmiles(smiles)
#         if mol is not None:
#             interp_mols.append(mol)
#             valid_interp_smiles.append(smiles)

#     return interp_mols, valid_interp_smiles

# # Select two valid SMILES from the training data to interpolate between
# if len(original_smiles) >= 2:
#     start_smiles = original_smiles[0]
#     end_smiles = original_smiles[-1]

#     interp_mols, interp_smiles = interpolate_molecules(vae, start_smiles, end_smiles, n_steps=8)

#     # Visualize the interpolation pathway
#     if len(interp_mols) > 0:
#         img = Draw.MolsToGridImage(
#             interp_mols,
#             molsPerRow=4,
#             subImgSize=(200, 200),
#             legends=[f"Step {i}" for i in range(len(interp_mols))]
#         )

#         plt.figure(figsize=(15, 8))
#         plt.imshow(img)
#         plt.axis('off')
#         plt.title("Molecule Interpolation in Latent Space")
#         plt.show()

#         # Print the SMILES
#         print("Interpolation SMILES:")
#         for i, smiles in enumerate(interp_smiles):
#             print(f"Step {i}: {smiles}")
#     else:
#         print("No valid molecules found in the interpolation path.")

# ** HELPFUL LINKS **

In [None]:
1. https://github.com/pyg-team/pytorch_geometric/discussions/3451

2. https://github.com/wengong-jin/icml18-jtnn/tree/master/data/zinc

3. https://github.com/wengong-jin/icml18-jtnn/blob/28ed03fcb3f0a79f44f73eefc0bcad613ea39167/jtnn/mpn.py#L26-L31

4. https://github.com/wengong-jin/icml18-jtnn/blob/master/README.md

5. https://docs.nvidia.com/bionemo-framework/1.10/notebooks/ZINC15-data-preprocessing.html

In [None]:
print('a')