### Author: Jennifer Gao
### April 2025
The purpose of this notebook is to create a decoder/diffusion architecture that will be used in our final project.

In [2]:
! pip install phate



In [35]:
import torch
hidden_values_i = torch.load("insecticides_graphs.pt", weights_only=False)

In [1]:
# Install PyTorch Geometric and its dependencies
! pip install torch-scatter torch-sparse torch-cluster torch-spline-conv torch-geometric -f https://data.pyg.org/whl/torch-$(python -c "import torch; print(torch.__version__)")+$(python -c "import platform; print(platform.system().lower())")

# Install other required packages from your imports
! pip install numpy matplotlib pandas torch scikit-learn tqdm networkx phate

Looking in links: https://data.pyg.org/whl/torch-2.6.0+cu124+linux
Collecting torch-scatter
  Downloading torch_scatter-2.1.2.tar.gz (108 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m108.0/108.0 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting torch-sparse
  Downloading torch_sparse-0.6.18.tar.gz (209 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m210.0/210.0 kB[0m [31m13.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting torch-cluster
  Downloading torch_cluster-1.6.3.tar.gz (54 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.5/54.5 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting torch-spline-conv
  Downloading torch_spline_conv-1.2.2.tar.gz (25 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting torch-geometric
  Downloa

In [42]:
!pip install --upgrade numpy



In [48]:
!pip install pandas==2.2.2



In [34]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn import Linear
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
import tqdm

# NetworkX is a Python package used to create, manipulate, and mine graphs
import networkx as nx

# further libraries for working with graphs
import torch_geometric
from torch_geometric.nn import GCNConv, pool
from torch_geometric.utils import to_networkx
from torch_geometric.data import Data, Dataset
from torch_geometric.loader import DataLoader

# For visualization
import phate

## Hyperparameters

In [21]:
epochs = 200
learning_rate = 1e-4
batch_size = 32
p = 0.2     # probability for dropout
smoothness_lambda = 1e-2 # for calculating composite loss

**This is where we will give embeddings to Jennifer.**

Expected output: torch tensor of dimension $[(\textit{number of molecules}), (\textit{hidden dimension}=16)]$

In [22]:
hidden_values = torch.load("hidden_values_from_insecticides.pt", weights_only=False)

In [23]:
hidden_values_i = torch.load("insecticides_graphs.pt", weights_only=False)

In [24]:
hidden_values_p = torch.load("pesticides_graphs.pt", weights_only=False)

In [25]:
print(hidden_values)
print(hidden_values_i)
print(hidden_values_p)

tensor([[ 1.0212, -3.7411,  0.8089,  ...,  6.4987,  0.5759, -2.0453],
        [ 1.1803, -3.8429,  1.3700,  ...,  3.5151, -0.0610, -2.4764],
        [ 1.0348, -4.0212,  1.2255,  ...,  4.9978, -0.7030, -2.7249],
        ...,
        [ 1.5806, -6.6030,  1.0950,  ...,  7.9210, -0.6606, -4.5219],
        [ 1.3128, -5.6144,  1.1045,  ...,  9.0061, -0.0495, -3.4316],
        [ 1.2134, -4.3813,  0.9890,  ...,  6.1571, -0.4070, -3.0381]])
[Data(x=[10, 79], edge_index=[2, 20], edge_attr=[20, 10], y=[1]), Data(x=[10, 79], edge_index=[2, 20], edge_attr=[20, 10], y=[1]), Data(x=[10, 79], edge_index=[2, 20], edge_attr=[20, 10], y=[1]), Data(x=[5, 79], edge_index=[2, 8], edge_attr=[8, 10], y=[1]), Data(x=[13, 79], edge_index=[2, 24], edge_attr=[24, 10], y=[1]), Data(x=[18, 79], edge_index=[2, 38], edge_attr=[38, 10], y=[1]), Data(x=[15, 79], edge_index=[2, 30], edge_attr=[30, 10], y=[1]), Data(x=[14, 79], edge_index=[2, 30], edge_attr=[30, 10], y=[1]), Data(x=[16, 79], edge_index=[2, 34], edge_attr=[

## Generation
### 1. Molecule Encoder (Graph → Latent Space)

In [26]:
import torch
import torch.nn as nn
from torch_geometric.nn import GCNConv

In [27]:
# Define constants
input_features = 79  # From your data structure
atom_types = 10      # Example value, adjust based on your chemical elements
bond_types = 4       # Example value (single, double, triple, aromatic)

# Implement the missing GeometricScatteringTransform class
class GeometricScatteringTransform(nn.Module):
    def __init__(self):
        super(GeometricScatteringTransform, self).__init__()
        # A simple implementation - in practice this would be more complex
        self.conv = nn.Conv1d(32, 32, kernel_size=3, padding=1)

    def forward(self, x):
        # This is a placeholder implementation
        # In reality, this would implement a graph scattering transform
        batch_size = x.size(0)
        return x.view(batch_size, -1)  # Flatten for simplicity

In [28]:
class MoleculeEncoder(nn.Module):
    def __init__(self, hidden_dim=16):
        super(MoleculeEncoder, self).__init__()
        # Graph convolutional layers to transform molecular graph representation into features
        self.gcn1 = GCNConv(input_features, 64)
        self.gcn2 = GCNConv(64, 32)

        # Geometric scattering transform layer
        self.scattering = GeometricScatteringTransform()

        # MLP encoder
        self.mlp = nn.Sequential(
            nn.Linear(32, 24),
            nn.ReLU(),
            nn.Linear(24, hidden_dim)
        )

    def forward(self, molecule_graph):
        x = F.relu(self.gcn1(molecule_graph))
        x = F.relu(self.gcn2(x))
        x = self.scattering(x)
        latent = self.mlp(x)
        return latent

### 2. Diffusion Model (Latent Space → Noise → Latent Space)

In [29]:
class DiffusionModel(nn.Module):
    def __init__(self, hidden_dim=16, time_steps=1000):
        super(DiffusionModel, self).__init__()
        self.time_steps = time_steps
        self.hidden_dim = hidden_dim

        # Time embedding
        self.time_embed = nn.Sequential(
            nn.Linear(1, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, hidden_dim)
        )

        # U-Net architecture for noise prediction
        self.net = nn.Sequential(
            nn.Linear(hidden_dim*2, hidden_dim*4),
            nn.ReLU(),
            nn.Linear(hidden_dim*4, hidden_dim*2),
            nn.ReLU(),
            nn.Linear(hidden_dim*2, hidden_dim)
        )

        # Define noise scheduler
        self.beta_schedule = torch.linspace(0.0001, 0.02, time_steps)

    def forward(self, x, t):
        # Time embedding
        t_emb = self.time_embed(t.unsqueeze(-1).float())

        # Combine time information with molecular latent representation
        x_input = torch.cat([x, t_emb], dim=1)

        # Predict noise
        return self.net(x_input)

    def sample(self, n_samples, property_condition=None):
        """Generate molecular latent representations from noise"""
        device = next(self.parameters()).device

        # Sample from standard normal distribution
        x = torch.randn(n_samples, self.hidden_dim).to(device)
        beta_schedule = self.beta_schedule.to(device)

        # Gradual denoising process
        for i in range(self.time_steps-1, 0, -1):
            t = torch.ones(n_samples, dtype=torch.long).to(device) * i

            # Predict noise
            predicted_noise = self.forward(x, t)

            # Update samples
            alpha = 1 - beta_schedule[i]
            alpha_hat = torch.prod(torch.tensor([1 - beta_schedule[j] for j in range(i+1)]))
            beta = beta_schedule[i]

            if i > 1:
                noise = torch.randn_like(x)
            else:
                noise = torch.zeros_like(x)

            x = (1 / torch.sqrt(alpha)) * (x - ((beta / torch.sqrt(1 - alpha_hat)) * predicted_noise)) + torch.sqrt(beta) * noise

        return x

### 3. Molecule Decoder (Latent Space → Graph → SMILES)

In [30]:
class MoleculeDecoder(nn.Module):
    def __init__(self, hidden_dim=16, max_atoms=50):
        super(MoleculeDecoder, self).__init__()
        self.hidden_dim = hidden_dim
        self.max_atoms = max_atoms
        # MLP decoder
        self.mlp = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 64)
        )
        # Graph generation layers
        self.node_predictor = nn.Linear(64, atom_types)  # Predict atom types
        # Modified line - ensure input dimensions are correct
        # We need concatenation of two node features, so input is 64*2
        self.edge_predictor = nn.Linear(64*2, bond_types)  # Predict bond types

    def forward(self, latent):
        features = self.mlp(latent)
        # Generate molecular graph
        atom_logits = self.node_predictor(features)
        # Build adjacency matrix (simplified version)
        batch_size = latent.size(0)
        # Create expanded version of features so each sample has max_atoms nodes
        adj_features = features.unsqueeze(1).expand(-1, self.max_atoms, -1)  # [batch, max_atoms, 64]
        # Create transposed version
        adj_features_t = adj_features.clone()  # Use clone to avoid dimension issues

        # Now, for each pair of atoms, we need to concatenate their features
        # To do this, we'll process the upper and lower triangular parts of the adjacency matrix separately
        # Create an empty edge features tensor
        edge_features = torch.zeros(batch_size, self.max_atoms, self.max_atoms, 64*2).to(latent.device)

        # Fill in edge features (simplified version, should use more efficient method in practice)
        for i in range(self.max_atoms):
            for j in range(self.max_atoms):
                # Concatenate features of two nodes
                edge_features[:, i, j] = torch.cat([adj_features[:, i], adj_features[:, j]], dim=1)

        # Reshape to fit linear layer
        edge_features = edge_features.view(batch_size, self.max_atoms * self.max_atoms, 64*2)
        edge_logits = self.edge_predictor(edge_features)
        # Reshape back to original shape
        edge_logits = edge_logits.view(batch_size, self.max_atoms, self.max_atoms, bond_types)

        return atom_logits, edge_logits

    def decode_to_smiles(self, atom_logits, edge_logits):
        # Convert logits to atom and bond predictions
        atom_preds = torch.argmax(atom_logits, dim=-1)
        edge_preds = torch.argmax(edge_logits, dim=-1)

        # This is just a placeholder - in practice, you would use RDKit
        # to convert the predicted atom types and bond matrix to SMILES strings
        batch_size = atom_logits.size(0)
        dummy_smiles = ["C" * (i + 3) for i in range(batch_size)]  # Dummy SMILES

        return dummy_smiles

### 4. Property Predictor (Latent Space → Properties)

In [44]:
class PropertyPredictor(nn.Module):
    def __init__(self, hidden_dim=16):
        super(PropertyPredictor, self).__init__()
        self.insecticide_predictor = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

        self.bioaccumulation_predictor = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            # nn.Tanh()
        )

    def forward(self, latent):
        insecticide_score = self.insecticide_predictor(latent)
        # logp_prediction = self.bioaccumulation_predictor(latent) * 10
        logp_prediction = self.bioaccumulation_predictor(latent)
        return insecticide_score, logp_prediction

### 5. Complete Model Integration

In [45]:
class MoleculeGenerationModel(nn.Module):
    def __init__(self, molecule_num, hidden_dim=16):
        super(MoleculeGenerationModel, self).__init__()
        self.encoder = MoleculeEncoder(hidden_dim)
        self.diffusion = DiffusionModel(hidden_dim)
        self.decoder = MoleculeDecoder(hidden_dim)
        self.property_predictor = PropertyPredictor(hidden_dim)

        # Set up bottleneck for latent space regularization
        self.bottleneck = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh()
        )

    def encode(self, molecules):
        return self.encoder(molecules)

    def regularize_latent(self, latent, properties):
        """Regularize latent space based on molecular properties"""
        regularized = self.bottleneck(latent)
        return regularized

    def generate(self, n_samples, target_properties=None):
        """Generate new molecules"""
        # Sample from diffusion model
        latent_samples = self.diffusion.sample(n_samples, target_properties)

        # Decode to molecular graph
        atom_logits, edge_logits = self.decoder(latent_samples)

        # Convert to SMILES
        molecules = self.decoder.decode_to_smiles(atom_logits, edge_logits)

        # Predict properties
        insecticide_scores, logp_values = self.property_predictor(latent_samples)

        return molecules, insecticide_scores, logp_values

    def train_step(self, molecules, properties):
        """Training step"""
        # Encode molecules to latent space
        latent = self.encode(molecules)

        # Regularize latent space
        regularized_latent = self.regularize_latent(latent, properties)

        # Diffusion model training
        # noise = torch.randn_like(regularized_latent)
        # noisy_latent = regularized_latent + noise
        # predicted_noise = self.diffusion(noisy_latent)
        # loss = F.mse_loss(predicted_noise, noise)

        # Property prediction training
        pred_insecticide, pred_logp = self.property_predictor(regularized_latent)

        # Reconstruction training
        atom_logits, edge_logits = self.decoder(regularized_latent)

        # Various loss functions...

        return loss

In [46]:
# Process dataset
def prepare_data(graphs_data):
   # Convert PyG Data objects to tensors suitable for the model
   # This is a simplified example
   processed_data = []
   for graph in graphs_data:
       # Process each graph
       processed_data.append({
           'x': graph.x,
           'edge_index': graph.edge_index,
           'edge_attr': graph.edge_attr,
           'y': graph.y
       })
   return processed_data

# Prepare datasets
insecticide_data = prepare_data(hidden_values_i)
pesticide_data = prepare_data(hidden_values_p)

# Initialize model
model = MoleculeGenerationModel(
   molecule_num=len(insecticide_data),
   hidden_dim=16
)

# Example: Generate molecules
n_samples = 5
molecules, insecticide_scores, logp_values = model.generate(n_samples)
print(f"Generated {n_samples} molecules")
print(f"Insecticide scores: {insecticide_scores}")
print(f"LogP values: {logp_values}")

Generated 5 molecules
Insecticide scores: tensor([[1.],
        [1.],
        [1.],
        [1.],
        [1.]], grad_fn=<SigmoidBackward0>)
LogP values: tensor([[ -84.1885],
        [-123.6669],
        [ -88.7653],
        [ -97.6494],
        [ -98.1394]], grad_fn=<AddmmBackward0>)
