<a href="https://colab.research.google.com/github/carlosmatherson/SULI-Project/blob/main/TgGCN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup


Install Spektral and RDKit.

In [7]:
#!pip install spektral
#!pip install rdkit

Import necessary packages and functions.

In [8]:
# general tools
import numpy as np
import pandas as pd

# RDkit
from rdkit import Chem
from rdkit.Chem.rdmolops import GetAdjacencyMatrix

# Spektral
from spektral.utils.sparse import reorder
from spektral.data.graph import Graph
from spektral.data import Dataset, BatchLoader
from spektral.layers import GCNConv, GlobalSumPool

# TensorFlow & Keras
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import RootMeanSquaredError

# Featurization

Onehot Encoding: Maps input elements x which are not in the permitted list to the last element of the permitted list.

In [9]:
# Onehot Encoding
def one_hot_encoding(x, permitted_list):

    if x not in permitted_list:
        x = permitted_list[-1]

    binary_encoding = [int(boolean_value) for boolean_value in list(map(lambda s: x == s, permitted_list))]

    return binary_encoding

Atom Featurization: Takes an RDKit atom object as input and gives a 1d-numpy array of atom features as output.

In [10]:
# Atom Featurization
def get_atom_features(atom, 
                      use_chirality = True, 
                      hydrogens_implicit = True):

    # define list of permitted atoms
    
    permitted_list_of_atoms =  ['C','N','O','S','F','Si','P','Cl','Br','Mg','Na','Ca','Fe','As','Al','I', 'B','V','K','Tl','Yb','Sb','Sn','Ag','Pd','Co','Se','Ti','Zn', 'Li','Ge','Cu','Au','Ni','Cd','In','Mn','Zr','Cr','Pt','Hg','Pb','Unknown']
    
    if hydrogens_implicit == False:
        permitted_list_of_atoms = ['H'] + permitted_list_of_atoms
    
    # compute atom features
    
    atom_type_enc = one_hot_encoding(str(atom.GetSymbol()), permitted_list_of_atoms)
    
    n_heavy_neighbors_enc = one_hot_encoding(int(atom.GetDegree()), [0, 1, 2, 3, 4, "MoreThanFour"])
    
    formal_charge_enc = one_hot_encoding(int(atom.GetFormalCharge()), [-3, -2, -1, 0, 1, 2, 3, "Extreme"])
    
    hybridisation_type_enc = one_hot_encoding(str(atom.GetHybridization()), ["S", "SP", "SP2", "SP3", "SP3D", "SP3D2", "OTHER"])
    
    is_in_a_ring_enc = [int(atom.IsInRing())]
    
    is_aromatic_enc = [int(atom.GetIsAromatic())]
    
    atomic_mass_scaled = [float((atom.GetMass() - 10.812)/116.092)]
    
    vdw_radius_scaled = [float((Chem.GetPeriodicTable().GetRvdw(atom.GetAtomicNum()) - 1.5)/0.6)]
    
    covalent_radius_scaled = [float((Chem.GetPeriodicTable().GetRcovalent(atom.GetAtomicNum()) - 0.64)/0.76)]

    atom_feature_vector = atom_type_enc + n_heavy_neighbors_enc + formal_charge_enc + hybridisation_type_enc + is_in_a_ring_enc + is_aromatic_enc + atomic_mass_scaled + vdw_radius_scaled + covalent_radius_scaled
                                    
    if use_chirality == True:
        chirality_type_enc = one_hot_encoding(str(atom.GetChiralTag()), ["CHI_UNSPECIFIED", "CHI_TETRAHEDRAL_CW", "CHI_TETRAHEDRAL_CCW", "CHI_OTHER"])
        atom_feature_vector += chirality_type_enc
    
    if hydrogens_implicit == True:
        n_hydrogens_enc = one_hot_encoding(int(atom.GetTotalNumHs()), [0, 1, 2, 3, 4, "MoreThanFour"])
        atom_feature_vector += n_hydrogens_enc

    return np.array(atom_feature_vector)

Bond Featurization: Takes an RDKit bond object as input and gives a 1d-numpy array of bond features as output.

In [11]:
# Bond Featurisation
def get_bond_features(bond, 
                      use_stereochemistry = True):

    permitted_list_of_bond_types = [Chem.rdchem.BondType.SINGLE, Chem.rdchem.BondType.DOUBLE, Chem.rdchem.BondType.TRIPLE, Chem.rdchem.BondType.AROMATIC]

    bond_type_enc = one_hot_encoding(bond.GetBondType(), permitted_list_of_bond_types)
    
    bond_is_conj_enc = [int(bond.GetIsConjugated())]
    
    bond_is_in_ring_enc = [int(bond.IsInRing())]
    
    bond_feature_vector = bond_type_enc + bond_is_conj_enc + bond_is_in_ring_enc
    
    if use_stereochemistry == True:
        stereo_type_enc = one_hot_encoding(str(bond.GetStereo()), ["STEREOZ", "STEREOE", "STEREOANY", "STEREONONE"])
        bond_feature_vector += stereo_type_enc

    return np.array(bond_feature_vector)

Dablander, Markus. How to Turn a SMILES String into a Molecular Graph for Pytorch Geometric | Oxford Protein Informatics Group. https://www.blopig.com/blog/2022/02/how-to-turn-a-smiles-string-into-a-molecular-graph-for-pytorch-geometric/. Accessed 11 July 2022.


# Create Dataset

Subclass the Dataset class to standardize how graph datasets are represented in Spektral. Here, you can choose the endpoint to work with.

In [12]:
class MyDataset(Dataset):
  def read(self):
    polydata = pd.read_csv("https://raw.githubusercontent.com/carlosmatherson/SULI-Project/main/SMILES_Density_Tg_Mt.csv")
    graph_list = makeGraphObjList(polydata["SMILES"], polydata["Tg"])
    return graph_list

Define `makeGraphObjList` to create Spektral graph dataset from smiles and labels. The function takes a list of SMILES strings `x_smiles = [smiles_1, smiles_2, ....]` and a list of numerial labels for the SMILES strings `y = [y_1, y_2, ...]` as inputs. The output is a list of spektral.data.graph.Graph objects that can readily be used for machine learning, `graph_list = [G_1, G_2, ...]`. 






In [13]:
def makeGraphObjList(x_smiles, y):
    
    graph_list = []

    for (smiles, y_val) in zip(x_smiles, y):
        
        # convert SMILES to RDKit mol object
        mol = Chem.MolFromSmiles(smiles)
        
        # get feature dimensions
        n_nodes = mol.GetNumAtoms()
        n_edges = 2*mol.GetNumBonds()
        unrelated_smiles = "O=O"
        unrelated_mol = Chem.MolFromSmiles(unrelated_smiles)
        n_node_features = len(get_atom_features(unrelated_mol.GetAtomWithIdx(0)))
        n_edge_features = len(get_bond_features(unrelated_mol.GetBondBetweenAtoms(0,1)))

        # construct adjacency matrix (n_nodes, n_nodes), full matrix
        A = np.array(GetAdjacencyMatrix(mol)) # full matrix

        # construct edge index array Ei of shape (n_edges, 2), sparse matrix
        Ei = np.transpose(np.nonzero(A)) # vertical format
        (rows, cols) = np.transpose(Ei)
        #Ei = np.stack([rows, cols])

        # construct node feature matrix Xn of shape (n_nodes, n_node_features)
        X = np.zeros((n_nodes, n_node_features))
        for atom in mol.GetAtoms():
            X[atom.GetIdx(), :] = get_atom_features(atom)
        
        # construct edge feature array Ef of shape (n_edges, n_edge_features)
        E = np.zeros((n_edges, n_edge_features))
        for (k, (i,j)) in enumerate(zip(rows, cols)):
            E[k] = get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j)))
        reorder(Ei, E)
        
        # construct label tensor
        Y = np.array([y_val])

        # construct Spektral graph object and append to dataset (list)
        graph_list.append(Graph(x=X, a=A, e=E, y=Y))
        
    return graph_list 

# Load Dataset

# Graph Convolutional Network

Hyperparameters

In [14]:
# train/test split
idxs = np.random.permutation(len(data))
split = int(0.7 * len(data))
idx_tr, idx_te = np.split(idxs, [split])
dataset_tr, dataset_te = data[idx_tr], data[idx_te]

NameError: ignored

In [167]:
# parameters
F = data.n_node_features  # Dimension of node features, 79
S = data.n_edge_features  # Dimension of edge features, 10
n_out = data.n_labels  # Dimension of the target

# config
learning_rate = 1e-3  # Learning rate
epochs = 50  # Number of training epochs
batch_size = 15  # Batch size

In [169]:
#model
class myGCN(Model):
    def __init__(self):
        super().__init__()
        self.conv1 = GCNConv(100, activation="relu")
        self.conv2 = GCNConv(100, activation="relu")
        self.conv3 = GCNConv(100, activation="relu")
        self.conv4 = GCNConv(100, activation="relu")
        self.conv5 = GCNConv(100, activation="relu")
        self.conv6 = GCNConv(100, activation="relu")
        self.global_pool = GlobalSumPool()
        self.dense = Dense(n_out)
        
    def call(self, inputs):
        x, a = inputs[0], inputs[1]
        x = self.conv1([x, a])
        x = self.conv2([x, a])
        output = self.global_pool(x)
        output = self.dense(output)

        return output


model = myGCN()
optimizer = Adam(learning_rate)
model.compile(optimizer=optimizer, metrics=[RootMeanSquaredError()], loss="mse")

In [170]:
loader_tr = BatchLoader(dataset_tr, batch_size=batch_size, mask=True)
model.fit(loader_tr.load(), steps_per_epoch=loader_tr.steps_per_epoch, epochs=epochs)

Epoch 1/50


  np.random.shuffle(a)
  f"The adjacency matrix of dtype {a.dtype} is incompatible with the dtype "
  return py_builtins.overload_of(f)(*args)


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


KeyboardInterrupt: ignored

In [None]:
print("Testing model")
loader_te = BatchLoader(dataset_te, batch_size=batch_size, mask=True)
loss = model.evaluate(loader_te.load(), steps=loader_te.steps_per_epoch)
print("Done. Test loss: \tMSE \t\t RMSE \n \t\t{}".format(loss))

In [166]:
#load Data
data = MyDataset()