In [20]:
from rdkit import Chem
from rdkit.Chem.rdmolops import GetAdjacencyMatrix
from typing import Union, List, Type, Optional
import torch
import numpy as np
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from torch_geometric.nn import GCNConv, global_mean_pool, GATConv
from torch import nn, sigmoid
import torch.nn.functional as F
from torch.nn.functional import relu, dropout
from torch.optim import Adam, RMSprop
import matplotlib.pyplot as plt

In [3]:
lst_smiles=['C1=CC=C(C=C1)O','c1ccccc1']
m=Chem.MolFromSmiles(lst_smiles[0])
for atom in m.GetAtoms():
    print([atom.GetSymbol(),atom.GetDegree()])

['C', 2]
['C', 2]
['C', 2]
['C', 3]
['C', 2]
['C', 2]
['O', 1]


In [4]:
def one_hot_encoding(elem : str, allowable_elem : Union[List[str],List[int]])->List[bool]:
    
    if elem not in allowable_elem : elem = allowable_elem[-1]
    
    # [True if k==elem else False for k in allowable_elem]
    # note map is faster
    onehotvec=list(map(lambda dummy : dummy==elem,allowable_elem))
    
    return onehotvec

def get_atom_features(atom : Type[Chem.rdchem.Mol],
                  allowable_elem : List[str] =['C','N','O','S','F','Si','P','Cl','Br','Mg','Na',
                                               'Ca','Fe','As','Al','I', 'B','V','K','Tl','Yb',
                                               'Sb','Sn','Ag','Pd','Co','Se','Ti','Zn', 'Li','Ge',
                                               'Cu','Au','Ni','Cd','In','Mn','Zr','Cr','Pt','Hg','Pb','Unknown'], 
                  use_H_explicit : bool =False,
                  use_chirality : bool =False)->Optional[torch.Tensor]:
    '''
    function calcualtes node features
    args: 
        - atom : Mol object
        - allowable_elem : list of allowable elements
    return :
        - feats : node features
    '''
    # use H
    if use_H_explicit : allowable_elem = ['H'] + allowable_elem
    
    # map elements 
    elem_list_enc=one_hot_encoding(atom.GetSymbol(),allowable_elem)
    
    # get atom degree
    n_neighbors_enc = one_hot_encoding(atom.GetDegree(), [0, 1, 2, 3, 4])
    
    # formal charges
    formal_charge_enc = one_hot_encoding(atom.GetFormalCharge(), [-3, -2, -1, 0, 1, 2, 3])
    
    # hybridization
    hybridisation_type_enc = one_hot_encoding(str(atom.GetHybridization()), ["S", "SP", "SP2", "SP3", "SP3D", "SP3D2", "OTHER"])
    
    # atom belongs to a ring
    is_in_a_ring_enc = [int(atom.IsInRing())]
    
    # presence of aromatic ring
    is_aromatic_enc = [int(atom.GetIsAromatic())]
    
    # chirality and explicit H's
    
    # final encoding
    f_enc=elem_list_enc + formal_charge_enc + hybridisation_type_enc + is_in_a_ring_enc + is_aromatic_enc
    
    return np.array(f_enc).astype('float')

def get_bond_features(bond : Type[Chem.rdchem.Bond], use_stereochemistry : bool = True)->Optional[torch.Tensor]:
    
    # bond type only used common ones for organic molecules
    bond_type_enc=one_hot_encoding(str(bond.GetBondType()),['SINGLE', 'DOUBLE', 'TRIPLE','AROMATIC'])
    
    # conjugation
    bond_is_conj_enc = [int(bond.GetIsConjugated())]
    
    # is ring
    bond_is_in_ring_enc = [int(bond.IsInRing())]
    
    bond_feature_enc = bond_type_enc + bond_is_conj_enc + bond_is_in_ring_enc
    
    if use_stereochemistry == True: 
        bond_feature_enc.extend(one_hot_encoding(str(bond.GetStereo()), ["STEREOZ", "STEREOE", "STEREOANY", "STEREONONE"]))
    
    return np.array(bond_feature_enc).astype('float')
    
def gen_graph_dset(smiles,labels):
    # check num of nodses and edges
    ethane='CC'
    ethane=Chem.MolFromSmiles(ethane)
    num_node_features=len(get_atom_features(ethane.GetAtoms()[0]))
    num_edge_features=len(get_bond_features(ethane.GetBonds()[0]))

    data_list=[]
    print(len(labels))
    for smile, label in zip(smiles,labels):
        #print(smile)
        #print(label)
        mol=Chem.MolFromSmiles(smile)

        num_atoms=mol.GetNumAtoms()
        atom_features=np.zeros((num_atoms,num_node_features))

        num_bonds=2*mol.GetNumBonds()
        bond_features=np.zeros((num_bonds,num_edge_features))

        # node features
        for idx,atom in enumerate(mol.GetAtoms()):
            atom_features[idx,:]=get_atom_features(atom)

        # adjacency
        adj=np.array([*np.nonzero(GetAdjacencyMatrix(mol))])

        # bond features
        bond_features=np.array([get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j))) for (i,j) in zip(adj[0,:],adj[1,:])])
        #for idx,(i,j) in enumerate(zip(adj[0,:],adj[1,:])):
        #    bond_features[idx,:]=get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j)))

        # data
        data_list.append(Data(x=torch.tensor(atom_features,dtype=torch.float),
            edge_index=torch.tensor(adj,dtype=torch.long),
            edge_attr=torch.tensor(bond_features,dtype=torch.float),
            y=torch.tensor([label],dtype=torch.long)))
        
    return data_list

In [5]:
path=r'C:\Users\bdutta\work\pys\AI_algos\graphs\Biodegradability_Prediction_QSAR_GCN-main\data'
df = pd.read_csv(os.path.join(path,'All-Public_dataset_Mordred.csv'), low_memory=False)
df=df[['SMILES','Class']]
df.head()

Unnamed: 0,SMILES,Class
0,C=C(Br)c1ccccc1,0
1,BrC(Br)Br,0
2,BrC(Br)C(Br)Br,0
3,BrC(Br)c1ccccc1OCC1CO1,0
4,BrC1CCC(Br)C(Br)CCC(Br)C(Br)CCC1Br,0


In [6]:
# train & test loader
X_train, X_test, y_train, y_test = train_test_split(df['SMILES'].to_numpy(),df['Class'].to_numpy(), test_size=.2, shuffle=True, stratify=df['Class'], random_state=100)
print(X_train.shape)
print(X_test.shape)

dset_train=gen_graph_dset(smiles=X_train,labels=y_train)
dset_test=gen_graph_dset(smiles=X_test,labels=y_test)



(2264,)
(566,)
2264
566


In [7]:
# create dataloader for training
train_loader = DataLoader(dataset = dset_train, batch_size = X_train.shape[0])
test_loader = DataLoader(dataset = dset_test, batch_size = X_test.shape[0])

In [8]:
count=0
for data in train_loader:
    print(data)
    if count==2 : break
    count+=1
d1=data

DataBatch(x=[35102, 59], edge_index=[2, 70968], edge_attr=[70968, 10], y=[2264], batch=[35102], ptr=[2265])


In [9]:
d1=next(iter(train_loader))
d1.y

tensor([0, 1, 0,  ..., 0, 0, 1])

In [33]:
class GCN(nn.Module):
    def __init__(self,n_f=[59,24,16,8],num_hops=3,num_classes=2):
        
        super(GCN,self).__init__()
        lst=nn.ModuleList()
        for k in range(num_hops):
            lst.append(GCNConv(in_channels=n_f[k],out_channels=n_f[k+1]))
            #lst.append(nn.ReLU())
            
        self.lin1=nn.Linear(n_f[-1],num_classes) 
        self.conv=lst
    
    def forward(self, x, edge_idx, batch):
        # conv layers
        for count, layer in enumerate(self.conv):
            x = layer(x,edge_idx)
            x = F.relu(x)
            #if count == len(self.conv)-1 : x = F.relu(x)
            
        #readout
        x = global_mean_pool(x,batch=batch)
        
        #linear layers
        x = self.lin1(x)
        
        return x
'''    
def train(model,data_loader,criterion=nn.BCEWithLogitsLoss(),lr=1e-3):
    model.train()
    loss_lst=[]
    optimizer=Adam(model.parameters(),lr=lr)
    for epoch in range(epochs):
        for data in data_loader:
            # zeroing grads
            optimizer.zero_grad()
            # model out
            out = model(data.x, data.edge_index, data.batch)
            loss = criterion(out,data.y.reshape(-1,1))
            loss.backward()
            optimizer.step()
        print('epoch: {}, loss : {}'.format(epoch,loss.item()))
        loss_lst.append(loss.item())
    
    return loss_lst
'''

def train(model,data_loader,optimizer,criterion=nn.BCEWithLogitsLoss(),lr=1e-3):
    model.train()
    
    for data in data_loader:
        # zeroing grads
        optimizer.zero_grad()
        # model out
        out = model(data.x, data.edge_index, data.batch)
        #loss = criterion(out,data.y.reshape(-1,1))
        loss = criterion(out,data.y)
        loss.backward()
        optimizer.step()

    return loss.item()


def test(model,loader):
    model.eval()
    correct = 0
    for data in loader:  # Iterate in batches over the training/test dataset.
        out = model(data.x, data.edge_index, data.batch)  
        pred = out.argmax(dim=1)  # Use the class with highest probability.
        correct += int((pred == data.y).sum())  # Check against ground-truth labels.
    return correct / len(loader.dataset)  # Derive ratio of correct predictions.


class GAT(nn.Module):
    def __init__(self,n_f=[59,16,8],heads=4,num_classes=2):
        
        super(GAT,self).__init__()
        lst=nn.ModuleList()
        num_hops=len(n_f)
        for k in range(num_hops-1):
            print(k)
            if k ==0:
                lst.append(GATConv(in_channels=n_f[0],out_channels=n_f[1],heads=heads))
            else:
                lst.append(GATConv(in_channels=n_f[k]*heads,out_channels=n_f[k+1],heads=heads))
                
            #lst.append(nn.ReLU())
            
        self.lin1=nn.Linear(n_f[-1]*heads,num_classes) 
        self.conv=lst
    
    def forward(self, x, edge_idx, edge_attr, batch):
        # conv layers
        for count, layer in enumerate(self.conv):
            x = layer(x,edge_idx, edge_attr)
            x = F.relu(x)
            #if count == len(self.conv)-1 : x = F.relu(x)
            
        #readout
        x = global_mean_pool(x,batch=batch)
        
        #linear layers
        x = self.lin1(x)
        
        return x
    

In [35]:
gat=GAT()
gat

0
1


GAT(
  (lin1): Linear(in_features=32, out_features=2, bias=True)
  (conv): ModuleList(
    (0): GATConv(59, 16, heads=4)
    (1): GATConv(64, 8, heads=4)
  )
)

In [40]:
from torch_sparse import SparseTensor

In [42]:
isinstance(d1.edge_index,torch.LongTensor)

True

In [24]:

for count, layer in enumerate(self.conv):
    x = layer(d1.x,edge_idx, edge_attr)
    x = F.relu(x)

In [36]:
d2=gat(d1.x,d1.edge_index,d1.edge_attr)
d2=F.relu(d2)
#d2=gat(d1.x,d1.edge_index,d1.edge_attr)
d2.shape

TypeError: forward() missing 1 required positional argument: 'batch'

In [23]:
d2

tensor([[0.5577, 0.1485, 0.0000, 0.0000],
        [0.5577, 0.1485, 0.0000, 0.0000],
        [0.5577, 0.1485, 0.0000, 0.0000],
        ...,
        [0.6183, 0.0000, 0.0504, 0.0716],
        [0.6247, 0.0253, 0.0173, 0.0000],
        [0.6183, 0.0000, 0.0504, 0.0716]], grad_fn=<ReluBackward0>)

In [12]:
gcn=GCN(n_f=[59,24,16,8],num_hops=3)
out=gcn(d1.x,d1.edge_index,d1.batch)
print(out)
d1.y.reshape(-1,1)


tensor([[ 0.3120, -0.2698],
        [ 0.3220, -0.2585],
        [ 0.3188, -0.2686],
        ...,
        [ 0.3259, -0.2605],
        [ 0.3313, -0.2727],
        [ 0.3289, -0.2701]], grad_fn=<AddmmBackward0>)


tensor([[0],
        [1],
        [0],
        ...,
        [0],
        [0],
        [1]])

In [50]:
test(model=gcn,loader=test_loader)

0.6130742049469965

In [80]:
optimizer=Adam(gcn.parameters(),lr=1e-2)
epochs=250
loss_list=[]
for epoch in range(epochs):

    loss = train(model=gcn,
                 data_loader=train_loader,
                 optimizer=optimizer,
                 criterion=nn.CrossEntropyLoss())
    
    train_acc = test(model=gcn,loader=train_loader)
    test_acc = test(model=gcn,loader=test_loader)
    
    loss_list.append(loss)
    
    print(f'Epoch: {epoch:03d}, loss: {loss : .4f}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

Epoch: 000, loss:  0.6690, Train Acc: 0.6122, Test Acc: 0.6131
Epoch: 001, loss:  0.6599, Train Acc: 0.6122, Test Acc: 0.6131
Epoch: 002, loss:  0.6518, Train Acc: 0.6122, Test Acc: 0.6131
Epoch: 003, loss:  0.6427, Train Acc: 0.6122, Test Acc: 0.6131
Epoch: 004, loss:  0.6329, Train Acc: 0.6122, Test Acc: 0.6131
Epoch: 005, loss:  0.6220, Train Acc: 0.6458, Test Acc: 0.6555
Epoch: 006, loss:  0.6092, Train Acc: 0.6860, Test Acc: 0.7014
Epoch: 007, loss:  0.5959, Train Acc: 0.7297, Test Acc: 0.7261
Epoch: 008, loss:  0.5830, Train Acc: 0.7306, Test Acc: 0.7367
Epoch: 009, loss:  0.5714, Train Acc: 0.7292, Test Acc: 0.7314
Epoch: 010, loss:  0.5622, Train Acc: 0.7310, Test Acc: 0.7385
Epoch: 011, loss:  0.5550, Train Acc: 0.7372, Test Acc: 0.7438
Epoch: 012, loss:  0.5498, Train Acc: 0.7443, Test Acc: 0.7491
Epoch: 013, loss:  0.5447, Train Acc: 0.7504, Test Acc: 0.7509
Epoch: 014, loss:  0.5378, Train Acc: 0.7549, Test Acc: 0.7544
Epoch: 015, loss:  0.5294, Train Acc: 0.7588, Test Acc:

Epoch: 131, loss:  0.4278, Train Acc: 0.7951, Test Acc: 0.7792
Epoch: 132, loss:  0.4295, Train Acc: 0.8057, Test Acc: 0.7756
Epoch: 133, loss:  0.4333, Train Acc: 0.7959, Test Acc: 0.7686
Epoch: 134, loss:  0.4379, Train Acc: 0.8034, Test Acc: 0.7827
Epoch: 135, loss:  0.4360, Train Acc: 0.7977, Test Acc: 0.7792
Epoch: 136, loss:  0.4273, Train Acc: 0.7999, Test Acc: 0.7792
Epoch: 137, loss:  0.4250, Train Acc: 0.8083, Test Acc: 0.7809
Epoch: 138, loss:  0.4307, Train Acc: 0.7964, Test Acc: 0.7792
Epoch: 139, loss:  0.4306, Train Acc: 0.8052, Test Acc: 0.7915
Epoch: 140, loss:  0.4241, Train Acc: 0.8043, Test Acc: 0.7862
Epoch: 141, loss:  0.4254, Train Acc: 0.7968, Test Acc: 0.7809
Epoch: 142, loss:  0.4285, Train Acc: 0.8052, Test Acc: 0.7898
Epoch: 143, loss:  0.4242, Train Acc: 0.8061, Test Acc: 0.7862
Epoch: 144, loss:  0.4228, Train Acc: 0.7995, Test Acc: 0.7774
Epoch: 145, loss:  0.4258, Train Acc: 0.8057, Test Acc: 0.7862
Epoch: 146, loss:  0.4239, Train Acc: 0.8026, Test Acc:

In [96]:
d1.edge_index

tensor([[    0,     1,     1,  ..., 35100, 35100, 35101],
        [    1,     0,     2,  ..., 35098, 35101, 35100]])

In [103]:
x=d1.x
for layer in gcn.conv:
    x=layer(x,d1.edge_index)
y=global_mean_pool(x,d1.batch)    
print(x.shape)
print(y.shape)

torch.Size([35102, 8])
torch.Size([2264, 8])


tensor([[ 0.2677, -0.4080,  0.0694,  ..., -0.5944,  0.1971, -0.2537],
        [ 0.0874, -0.3078,  0.0867,  ..., -0.6759,  0.3521, -0.2275],
        [ 0.2329, -0.4535,  0.0303,  ..., -0.4110,  0.1651, -0.2481],
        ...,
        [ 0.0410, -0.3021,  0.0523,  ..., -0.6461,  0.3569, -0.2481],
        [ 0.1258, -0.2006,  0.1695,  ..., -0.7600,  0.3040, -0.0679],
        [ 0.0774, -0.2730,  0.1426,  ..., -0.5744,  0.1906, -0.2397]],
       grad_fn=<DivBackward0>)

# Jibbersih/

In [135]:
F.sigmoid(torch.tensor(1))

tensor(0.7311)

In [98]:
d1=dset_train[1]
d1

Data(x=[21, 59], edge_index=[2, 44], edge_attr=[44, 10], y=[1])

In [74]:
gcn1=GCNConv(in_channels=59,out_channels=10)
gcn2=GCNConv(in_channels=10,out_channels=5)

In [104]:
count=0
for data in train_loader:
    print(data)
    if count==5 : break
    count+=1
d1=data

DataBatch(x=[30, 59], edge_index=[2, 60], edge_attr=[60, 10], y=[2], batch=[30], ptr=[3])
DataBatch(x=[29, 59], edge_index=[2, 60], edge_attr=[60, 10], y=[2], batch=[29], ptr=[3])
DataBatch(x=[47, 59], edge_index=[2, 100], edge_attr=[100, 10], y=[2], batch=[47], ptr=[3])
DataBatch(x=[20, 59], edge_index=[2, 42], edge_attr=[42, 10], y=[2], batch=[20], ptr=[3])
DataBatch(x=[28, 59], edge_index=[2, 56], edge_attr=[56, 10], y=[2], batch=[28], ptr=[3])
DataBatch(x=[23, 59], edge_index=[2, 48], edge_attr=[48, 10], y=[2], batch=[23], ptr=[3])


In [106]:
gcn1=nn.Sequential(GCNConv(in_channels=59,out_channels=10),GCNConv(in_channels=10,out_channels=5))
gcn1

Sequential(
  (0): GCNConv(59, 10)
  (1): GCNConv(10, 5)
)

In [107]:
gcn1(x=d1.x,edge_index=d1.edge_index)

TypeError: forward() got an unexpected keyword argument 'x'

In [78]:
x=gcn1(x=d1.x,edge_index=d1.edge_index)
print(x.shape)
x=gcn2(x=x,edge_index=d1.edge_index)
print(x.shape)

y=global_mean_pool(x,batch=d1.batch)
print(y)

torch.Size([29, 10])
torch.Size([29, 5])
tensor([[-0.3221,  0.2483, -0.1072, -0.3957,  0.1450],
        [-1.0307,  0.2611, -0.4610, -0.5807, -0.4320]], grad_fn=<DivBackward0>)


In [51]:
x.sum()

tensor(6.4278, grad_fn=<SumBackward0>)

In [88]:
lst_smiles=['C=C(Br)c1ccccc1']#,'c1ccccc1']
lst_labels=[1,1]
# check num of nodses and edges
ethane='CC'
ethane=Chem.MolFromSmiles(ethane)
num_node_features=len(get_atom_features(ethane.GetAtoms()[0]))
num_edge_features=len(get_bond_features(ethane.GetBonds()[0]))

data_list=[]
for smile, label in zip(lst_smiles,lst_labels):
    #print(smile)
    #print(label)
    mol=Chem.MolFromSmiles(smile)

    num_atoms=mol.GetNumAtoms()
    atom_features=np.zeros((num_atoms,num_node_features))

    num_bonds=2*mol.GetNumBonds()
    bond_features=np.zeros((num_bonds,num_edge_features))

    # node features
    for atom in mol.GetAtoms():
        atom_features[atom.GetIdx(),:]=get_atom_features(atom)

    # adjacency
    adj=np.array([*np.nonzero(GetAdjacencyMatrix(mol))])

    # bond features
    for idx,(i,j) in enumerate(zip(adj[0,:],adj[1,:])):
        bond_features[idx,:]=get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j)))
        
    bond_features=np.array([get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j))) for (i,j) in zip(adj[0,:],adj[1,:])])

    # data
    data_list+=[Data(x=torch.tensor(atom_features,dtype=torch.float),
        edge_index=torch.tensor(adj,dtype=torch.long),
        edge_attr=torch.tensor(bond_features,dtype=torch.float),y=torch.LongTensor([label]))]



In [75]:
data_list = []
lst_smiles=['C1=CC=C(C=C1)O']
lst_labels=[1]

for (smiles, y_val) in zip(lst_smiles, lst_labels):

    # convert SMILES to RDKit mol object
    mol = Chem.MolFromSmiles(smiles)
    # get feature dimensions
    n_nodes = mol.GetNumAtoms()
    n_edges = 2*mol.GetNumBonds()
    unrelated_smiles = "O=O"
    unrelated_mol = Chem.MolFromSmiles(unrelated_smiles)
    n_node_features = len(get_atom_features(unrelated_mol.GetAtomWithIdx(0)))
    n_edge_features = len(get_bond_features_u(unrelated_mol.GetBondBetweenAtoms(0,1)))
    # construct node feature matrix X of shape (n_nodes, n_node_features)
    X = np.zeros((n_nodes, n_node_features))
    for atom in mol.GetAtoms():
        X[atom.GetIdx(), :] = get_atom_features(atom)

    X = torch.tensor(X, dtype = torch.float)

    # construct edge index array E of shape (2, n_edges)
    (rows, cols) = np.nonzero(GetAdjacencyMatrix(mol))
    torch_rows = torch.from_numpy(rows.astype(np.int64)).to(torch.long)
    torch_cols = torch.from_numpy(cols.astype(np.int64)).to(torch.long)
    E = torch.stack([torch_rows, torch_cols], dim = 0)

    # construct edge feature array EF of shape (n_edges, n_edge_features)
    EF = np.zeros((n_edges, n_edge_features))

    for (k, (i,j)) in enumerate(zip(rows, cols)):

        EF[k] = get_bond_features(mol.GetBondBetweenAtoms(int(i),int(j)))

    EF = torch.tensor(EF, dtype = torch.float)

    # construct label tensor
    y_tensor = torch.tensor(np.array([y_val]), dtype = torch.float)

    # construct Pytorch Geometric data object and append to data list
    #data_list.append(Data(x = X, edge_index = E, edge_attr = EF, y = y_tensor))