# TIF360 Project

Main source: https://www.kaggle.com/code/rmonge/predicting-molecule-properties-based-on-its-smiles/notebook

### Import packages

In [1]:
import os
import rdkit
from rdkit import Chem  # To extract information of the molecules
from rdkit.Chem import Draw  # To draw the molecules
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random

import torch
import torch_geometric
from torch_geometric.loader import DataLoader
import torch_geometric.utils as utils
import networkx as nx
from torch.nn import Linear
from torch_geometric.nn import global_mean_pool, GraphConv, GATConv, GCNConv
import torch.nn.functional as F

from sklearn.metrics import r2_score

  from .autonotebook import tqdm as notebook_tqdm


### Load data

In [2]:
df = pd.read_csv("../data/smiles_and_targets.csv")

### Load descriptors

In [3]:
#mol_descriptor = np.load("../data/mol_descriptors.npy")

# Investigate Neural Networks

## Graph Neural Networks

#### Convert data to graphs

In [4]:
# import packages
from rdkit.Chem import GetAdjacencyMatrix
import torch
from torch_geometric.data import Data
from torch.utils.data import DataLoader
from rdkit.Chem import Descriptors
from rdkit.ML.Descriptors import MoleculeDescriptors

In [5]:
def one_hot_encoding(x, permitted_list):
    if x not in permitted_list:
        x = permitted_list[-1]  # If the atom is not in the list, get "Unknown"
        
    binary_encoding = [int(boolean) for boolean in list(map(lambda s: x==s, permitted_list))]
    
    return binary_encoding    

Atom featurisation\
Currently generates ca. 80 node features

In [6]:
def get_atom_features(atom, use_chirality = True, hydrogens_implicit = True):
    # list of permitted atoms
    permitted_atom_list = ['C','N','O','S','F','Si','P','Cl','Br','Mg','Na','Ca',
                           'Fe','As','Al','I', 'B','V','K','Tl','Yb','Sb','Sn','Ag','Pd','Co',
                           'Se','Ti','Zn', 'Li','Ge','Cu','Au','Ni','Cd','In','Mn','Zr','Cr','Pt',
                           'Hg','Pb','Unknown']
    
    atom_type_enc = one_hot_encoding(str(atom.GetSymbol()), permitted_atom_list)
    
    n_heavy_neighbors = one_hot_encoding(int(atom.GetDegree()), [0,1,2,3,4,"MoreThanFour"])
    
    formal_charge_enc = one_hot_encoding(int(atom.GetFormalCharge()), [-3, -2, -1, 0, 1, 2, 3, 'Extreme'])
    
    hybridisation_type_enc = one_hot_encoding(str(atom.GetHybridization()), ["S", "SP", "SP2", "SP3", "SP3D", "SP3D2", "OTHER"])
    
    is_in_ring_enc = one_hot_encoding(int(atom.IsInRing()), [0, 1])
    
    is_aromatic_enc = one_hot_encoding(int(atom.GetIsAromatic()), [0, 1])
    
    atomic_mass_scaled = [float(atom.GetMass() - 10.812)/116.092] # (?) replace 10.812 with mean the and 116.092 with std
    
    vdw_radius_scaled = [float((Chem.GetPeriodicTable().GetRvdw(atom.GetAtomicNum()) - 1.5)/0.6)] # (?) replace 1.5 with mean the and 0.6 with std
    
    covalent_radius_scaled = [float((Chem.GetPeriodicTable().GetRcovalent(atom.GetAtomicNum()) - 0.64)/0.76)] # (?) replace 0.64 with mean the and 0.76 with std
                              
    atom_feature_vector = atom_type_enc + n_heavy_neighbors + formal_charge_enc + hybridisation_type_enc + is_in_ring_enc + is_aromatic_enc + atomic_mass_scaled + vdw_radius_scaled + covalent_radius_scaled
    
    if use_chirality:
        chirality_type_enc = one_hot_encoding(str(atom.GetChiralTag()), ["CHI_UNSPECIFIED", "CHI_TETRAHEDRAL_CW", "CHI_TETRAHEDRAL_CCW", "CHI_OTHER"])
        atom_feature_vector += chirality_type_enc
        
    if hydrogens_implicit:
        n_hydrogens_enc = one_hot_encoding(int(atom.GetTotalNumHs()), [0, 1, 2, 3, 4, "MoreThanFour"])
        atom_feature_vector += n_hydrogens_enc
        
    return np.array(atom_feature_vector) 

Bond Featurisation\
Currently generates ca. 10 edge features

In [7]:
def get_bond_features(bond, use_stereochemistry=True):
    permitted_bond_types = [Chem.rdchem.BondType.SINGLE, Chem.rdchem.BondType.DOUBLE, 
                            Chem.rdchem.BondType.TRIPLE, Chem.rdchem.BondType.AROMATIC]
    
    bond_type_enc = one_hot_encoding(str(bond.GetBondType()), permitted_bond_types)
    
    bond_is_conjugated_enc = [int(bond.GetIsConjugated())]
    
    bond_is_in_ring_enc = [int(bond.IsInRing())]
    
    bond_feature_vector = bond_type_enc + bond_is_conjugated_enc + bond_is_in_ring_enc
    
    if use_stereochemistry:
        stereo_type_enc = one_hot_encoding(str(bond.GetStereo()), ["STEREOZ", "STEREOE", "STEREOANY", "STEREONONE"])
        bond_feature_vector += stereo_type_enc
        
    return np.array(bond_feature_vector)

Define function to generate dataset of labeled Pytorch Geometric Graphs

In [8]:
def create_graph_dataset_from_smiles(x_smiles, y):
    ## Inputs:
    # x_smiles = [smiles_1, smiles_2, ...], smiles representation of molecules
    # y = [y_1, y_2, ...] list of numerical labels for each smiles string, here chemical properties
    
    # Outputs:
    # dataset = [data_1, data_2, ...] list of torch_geometric.data.Data objects representing molecular graphs
    
    dataset = []
    
    for (smiles, y_val) in zip(x_smiles, y):
        # convert smiles to molecular object
        mol = Chem.MolFromSmiles(smiles)
        
        # get feature dimensions
        n_nodes = mol.GetNumAtoms()
        n_edges = 2*mol.GetNumBonds() # each bond is represented twice in the adjacency matrix
        n_node_features = len(get_atom_features(mol.GetAtomWithIdx(0)))
        if n_nodes > 1:
            n_edge_features = len(get_bond_features(mol.GetBondBetweenAtoms(0,1)))
        else:
            n_edge_features = 0  # for single atom molecules -> no edges
        
        # construct node feature matrix X 
        X = np.zeros((n_nodes, n_node_features))
        
        for atom in mol.GetAtoms():
            X[atom.GetIdx(), :] = get_atom_features(atom)
        
        X = torch.tensor(X, dtype=torch.float)
        
        # construct edge index array E, shape = (2, n_edges)
        (rows, cols) = np.nonzero(GetAdjacencyMatrix(mol))
        torch_rows = torch.tensor(rows.astype(np.int64)).to(torch.long)
        torch_cols = torch.tensor(cols.astype(np.int64)).to(torch.long)
        E = torch.stack([torch_rows, torch_cols], dim=0)
        
        # construct edge feature matrix EF
        EF = np.zeros((n_edges, n_edge_features))       # Note: generates zero matrix if n_edges = n_edge_features = 0
        for (k, (i,j)) in enumerate(zip(rows, cols)):
            EF[k] = get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j)))
        EF = torch.tensor(EF, dtype=torch.float)
        
        # construct label/y tensor
        y_tensor = torch.tensor(np.array([y_val]), dtype=torch.float)
        
        # construct torch_geometric.data.Data object and append to dataset
        dataset.append(Data(x=X, edge_index=E, edge_attr=EF, y=y_tensor))
        
    return dataset
        

We use the above functions to create a dataset of molecular graphs from the smiles and labels corresponding to chemical properties

In [9]:
properties_names = ['A', 'B', 'C', 'mu', 'alfa', 'homo', 'lumo', 'gap', 'R²', 'zpve', 'U0', 'U', 'H', 'G', 'Cv']

x_smiles = df.smiles.values
y = df.loc[:, properties_names].values  # shape = (n_samples, n_properties)

dataset = create_graph_dataset_from_smiles(x_smiles, y[0:len(x_smiles), :])

In [10]:
print(len(dataset))

# Example entries
print(df.smiles.values[0])
print(dataset[0])
print(df.smiles.values[2])
print(dataset[2])
print(df.smiles.values[50])
print(dataset[50])

print(dataset[50].edge_attr)
print(dataset[50].x)


133796
C
Data(x=[1, 81], edge_index=[2, 0], edge_attr=[0, 0], y=[1, 15])
O
Data(x=[1, 81], edge_index=[2, 0], edge_attr=[0, 0], y=[1, 15])
N1C=CN=C1
Data(x=[5, 81], edge_index=[2, 10], edge_attr=[10, 10], y=[1, 15])
tensor([[0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.],
        [0., 0., 0., 1., 1., 1., 0., 0., 0., 1.]])
tensor([[0.0000, 1.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000,

Information of the graph dataset

In [11]:
print(f'Number of graphs (molecules): {len(dataset)}')
graph = dataset[50]
print('=================================================================================')
print(f'Properties of graph {50}, molecule smiles: {df.smiles.values[50]}')
print(f'Number of nodes: {dataset[50].x.shape[0]}')
print(f'Number of edges: {dataset[50].edge_index.shape[1]}')
print(f'Number of node features: {dataset[50].x.shape[1]}')
print(f'Number of edge features: {dataset[50].edge_attr.shape[1]}')
print(f'Number of properties: {dataset[50].y.shape[1]}')

Number of graphs (molecules): 133796
Properties of graph 50, molecule smiles: N1C=CN=C1
Number of nodes: 5
Number of edges: 10
Number of node features: 81
Number of edge features: 10
Number of properties: 15


Filterout data with no edge features defined (Like ex: CH4) (These causes problems down the line)

In [12]:


indexes_to_delete = []
for item in range(0,len(dataset)):
    if dataset[item].edge_attr.shape[1] == 0:
        indexes_to_delete.append(item)
    else:
        pass

indexes_to_delete.sort()

print("Number of none edge feature molecules: ", len(indexes_to_delete))

print("Before: ", len(dataset))

for item in range(0,len(indexes_to_delete)):
    print("Molecule to delete: ", df.smiles.values[indexes_to_delete[item]])
    #del dataset[indexes_to_delete[item] - item] 
    dataset.pop((indexes_to_delete[item] - item)) # -item since all future data points will have its index reduced by 1 for each deleted previous data point
print("After: ", len(dataset))

Number of none edge feature molecules:  3
Before:  133796
Molecule to delete:  C
Molecule to delete:  N
Molecule to delete:  O
After:  133793


Split data into train and test set

In [13]:
from torch_geometric.loader import DataLoader

# split the dataset into test and validation:
num_samples = len(dataset)

# Want to divide data randomly
random_indexes = np.array(random.sample(range(num_samples), num_samples)) # random.sample ensures no duplicates

train_data = [dataset[index] for index in random_indexes[int(.2 * num_samples ) :]] # 80%
test_data = [dataset[index] for index in random_indexes[: int(.2 * num_samples )]] # 20%

print(num_samples)
print(len(train_data))
print(len(test_data))

train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=True)

133793
107035
26758


### Main GNN

#### Model for all targets at once

GNN function

In [14]:
data_labels = dataset[50].y.shape[1]
data_features = dataset[50].x.shape[1]

class GNN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super().__init__()
        torch.manual_seed(12345)
        self.conv1 = GATConv(data_features, hidden_channels)
        self.conv2 = GATConv(hidden_channels, hidden_channels)
        self.conv3 = GATConv(hidden_channels, hidden_channels)
        self.conv4 = GATConv(hidden_channels, hidden_channels)
        self.conv5 = GATConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, hidden_channels)
        self.lin2 = Linear(hidden_channels, hidden_channels)
        self.lin3 = Linear(hidden_channels, data_labels)

    def forward(self, x, edge_index, edge_attr, batch): 
        x = self.conv1(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv2(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv3(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv4(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv5(x, edge_index, edge_attr)

        #Returns batch-wise graph-level-outputs by averaging node features across the node dimension, so that for a single graph G
        #its output is computed by
        x = global_mean_pool(x, batch) 
        x = F.dropout(x, p=0.2, training=self.training)
        x = self.lin(x)
        x = F.dropout(x, p=0.2, training=self.training)
        x = F.relu(x)
        x = self.lin2(x)
        x = F.dropout(x, p=0.2, training=self.training)
        x = F.relu(x)
        x = self.lin3(x)
        
 
        return x
    

Train GNN

In [15]:
model = GNN(hidden_channels=256) 
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
criterion = torch.nn.MSELoss()

def train(data_in):
      model.train()
      optimizer.zero_grad()  # Clear gradients.
      out = model(data_in.x, data_in.edge_index, data_in.edge_attr, data_in.batch)  # Perform a single forward pass.

      #Alt 1
      loss = criterion(out, data_in.y) 

      #Alt 2
      #loss = 0
      #for item in range(0,len(data_in.y[0,:])):
      #      loss += criterion(out[:,item], data_in.y[:,item]) 

      loss.backward()  # Derive gradients.
      optimizer.step()  # Update parameters based on gradients.
      return loss

def test(data):
      all_test_r2 = []
      counter = -1    
      for data_in in data:
            counter += 1
            model.eval()
            out = model(data_in.x, data_in.edge_index, data_in.edge_attr, data_in.batch)

            # Caculate R2
            r2_score_var = []
            for item in range(0,data_in.y.shape[1]):
                  if item == 0:
                        r2_score_var = r2_score(data_in.y[:,item].detach().numpy(), out[:,item].detach().numpy())
                  else:
                        r2_score_var = np.vstack((r2_score_var,(r2_score(data_in.y[:,item].detach().numpy(), out[:,item].detach().numpy()))))

            if counter == 0:
                  all_test_r2 = r2_score_var
            else:
                  all_test_r2 = np.hstack((all_test_r2,r2_score_var))

      average_test_r2 = np.sum(all_test_r2,axis=1) / all_test_r2.shape[1]

      return average_test_r2

# Vectors to append accuracy to:
Train_r2 = []
Test_r2 = []

# Calculate accuracy before training 
Train_r2.append(test(train_loader))
Test_r2.append(test(test_loader))
print("Initial training R2: ", Train_r2[0])
print("Initial test R2: ", Test_r2[0])

print_r2_option = True
for epoch in range(1, 101):
      average_loss = []
      for data in train_loader:
            loss = train(data)
            average_loss.append(loss)
      print(f'Epoch: {epoch:03d}, Loss: {(sum(average_loss)/len(average_loss)):.4f}')

      if print_r2_option:

            temp_train_r2 = test(train_loader)
            Train_r2.append(temp_train_r2)

            temp_test_r2 = test(test_loader)
            Test_r2.append(temp_test_r2)

            print(f'Average Train R2: {temp_train_r2}')
            print(f'Average Test R2: {temp_test_r2:}')

KeyboardInterrupt: 

#### Model for just one target

GNN function

In [None]:
data_labels = 1
data_features = dataset[50].x.shape[1]

class GNN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super().__init__()
        torch.manual_seed(12345)
        self.conv1 = GATConv(data_features, hidden_channels)
        self.conv2 = GATConv(hidden_channels, hidden_channels)
        self.conv3 = GATConv(hidden_channels, hidden_channels)
        self.conv4 = GATConv(hidden_channels, hidden_channels)
        self.conv5 = GATConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, hidden_channels)
        self.lin2 = Linear(hidden_channels, hidden_channels)
        self.lin3 = Linear(hidden_channels, data_labels)

    def forward(self, x, edge_index, edge_attr, batch): 
        x = self.conv1(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv2(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv3(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv4(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.conv5(x, edge_index, edge_attr)

        #Returns batch-wise graph-level-outputs by averaging node features across the node dimension, so that for a single graph G
        #its output is computed by
        x = global_mean_pool(x, batch) 
        x = F.dropout(x, p=0.2, training=self.training)
        x = self.lin(x)
        x = F.dropout(x, p=0.2, training=self.training)
        x = F.relu(x)
        x = self.lin2(x)
        x = F.dropout(x, p=0.2, training=self.training)
        x = F.relu(x)
        x = self.lin3(x)
        
 
        return x
    

Train GNN

In [None]:
model = GNN(hidden_channels=64) 
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
criterion = torch.nn.MSELoss()

def train(data_in):
      target = 0 # target index of interest
      model.train()
      optimizer.zero_grad()  # Clear gradients.
      out = model(data_in.x, data_in.edge_index, data_in.edge_attr, data_in.batch)  # Perform a single forward pass.

      loss = criterion(out, data_in.y[:,target])  

      loss.backward()  # Derive gradients.
      optimizer.step()  # Update parameters based on gradients.
      return loss

def test(data):
      target = 0 # target index of interest
      all_test_r2 = []  
      for data_in in data:
            model.eval()
            out = model(data_in.x, data_in.edge_index, data_in.edge_attr, data_in.batch)

            # Caculate R2
            r2_score_var = r2_score(data_in.y[:,target].detach().numpy(), out.detach().numpy())

            all_test_r2 .append(r2_score_var)

      average_test_r2 = np.sum(all_test_r2) / len(all_test_r2)

      return average_test_r2

# Vectors to append accuracy to:
Train_r2 = []
Test_r2 = []

# Calculate accuracy before training 
Train_r2.append(test(train_loader))
Test_r2.append(test(test_loader))
print("Initial training R2: ", Train_r2[0])
print("Initial test R2: ", Test_r2[0])

print_r2_option = True
for epoch in range(1, 101):
      average_loss = []
      for data in train_loader:
            loss = train(data)
            average_loss.append(loss)
      print(f'Epoch: {epoch:03d}, Loss: {(sum(average_loss)/len(average_loss)):.4f}')

      if print_r2_option:

            temp_train_r2 = test(train_loader)
            Train_r2.append(temp_train_r2)

            temp_test_r2 = test(test_loader)
            Test_r2.append(temp_test_r2)

            print(f'Average Train R2: {temp_train_r2}')
            print(f'Average Test R2: {temp_test_r2:}')

Initial training R2:  -7.49786089020006
Initial test R2:  -7.329587586502999


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch: 001, Loss: 5.9924
Average Train R2: 0.0028777230272099418
Average Test R2: 0.002441659361192309


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch: 002, Loss: 5.1882
Average Train R2: -0.0045602759162772614
Average Test R2: -0.006537056023025651


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch: 003, Loss: 5.1688
Average Train R2: -0.007242981300759531
Average Test R2: -0.009304122220448203


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch: 004, Loss: 5.1586
Average Train R2: -0.009144920442126211
Average Test R2: -0.008319962919350802


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch: 005, Loss: 5.1475
Average Train R2: -0.010778084401283363
Average Test R2: -0.010981890703993701


  return F.mse_loss(input, target, reduction=self.reduction)


KeyboardInterrupt: 

## Transformer