In [1]:
#!pip install rdkit
#pip install torch-geometric

import os

import networkx as nx
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from rdkit import Chem
from rdkit import RDLogger
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv

# Silence RDKit's parse-error log lines.
RDLogger.DisableLog('rdApp.error')

In [2]:
# Path to the tab-separated DrugBank DDI table. Set the DDI_PATH environment
# variable to point at the file on another machine; when it is unset the
# original local path is used.
ddi_fp = os.environ.get("DDI_PATH", r"C:\Users\sreej\Desktop\drugbank.tab")
ddi = pd.read_csv(ddi_fp, sep='\t')
ddi.head()

Unnamed: 0,ID1,ID2,Y,Map,X1,X2
0,DB04571,DB00460,1,#Drug1 may increase the photosensitizing activ...,CC1=CC2=CC3=C(OC(=O)C=C3C)C(C)=C2O1,COC(=O)CCC1=C2NC(\C=C3/N=C(/C=C4\N\C(=C/C5=N/C...
1,DB00855,DB00460,1,#Drug1 may increase the photosensitizing activ...,NCC(=O)CCC(O)=O,COC(=O)CCC1=C2NC(\C=C3/N=C(/C=C4\N\C(=C/C5=N/C...
2,DB09536,DB00460,1,#Drug1 may increase the photosensitizing activ...,O=[Ti]=O,COC(=O)CCC1=C2NC(\C=C3/N=C(/C=C4\N\C(=C/C5=N/C...
3,DB01600,DB00460,1,#Drug1 may increase the photosensitizing activ...,CC(C(O)=O)C1=CC=C(S1)C(=O)C1=CC=CC=C1,COC(=O)CCC1=C2NC(\C=C3/N=C(/C=C4\N\C(=C/C5=N/C...
4,DB09000,DB00460,1,#Drug1 may increase the photosensitizing activ...,CC(CN(C)C)CN1C2=CC=CC=C2SC2=C1C=C(C=C2)C#N,COC(=O)CCC1=C2NC(\C=C3/N=C(/C=C4\N\C(=C/C5=N/C...


In [3]:
# Drop rows in which either SMILES string (X1 or X2) cannot be parsed by RDKit

def valid_smiles(smiles):
    """Return True when ``smiles`` is a string that RDKit parses into a molecule.

    Any non-string value (for example a float NaN) is rejected without being
    handed to RDKit.
    """
    if isinstance(smiles, str):
        return Chem.MolFromSmiles(smiles) is not None
    return False

# A pair is kept only when both of its SMILES strings parse.
x1_ok = ddi['X1'].apply(valid_smiles)
x2_ok = ddi['X2'].apply(valid_smiles)
both_ok = x1_ok & x2_ok

invalid_rows = ddi[~both_ok]
ddi_cleaned = ddi.drop(index=invalid_rows.index).reset_index(drop=True)

print(f"ddi size: {ddi.shape[0]}")
print(f"ddi_cleaned size: {ddi_cleaned.shape[0]}")
print(f"Rows removed: {len(ddi) - len(ddi_cleaned)}")

ddi size: 191808
ddi_cleaned size: 191798
Rows removed: 10


In [4]:
# Keep only the 20 most frequent interaction types and relabel them 0..19
# (0 = most frequent) so they can be used directly as class indices.
top20_labels = ddi_cleaned['Y'].value_counts().nlargest(20).index
ddi_top20 = ddi_cleaned[ddi_cleaned['Y'].isin(top20_labels)].reset_index(drop = True)
label_mapping = {label: idx for idx, label in enumerate(top20_labels)}
print("Label Mapping:", label_mapping)
ddi_top20['Y'] = ddi_top20['Y'].map(label_mapping)
print("Unique mapped labels:", ddi_top20['Y'].unique())

# Train on a small subset to keep run time manageable. The subset is a seeded
# random sample rather than the last rows of the table: a tail slice depends on
# the file's row order and need not be representative of the 20 classes.
SUBSET_SIZE = 2000
SUBSET_SEED = 42
ddi_filt = (
    ddi_top20
    .sample(n=min(SUBSET_SIZE, len(ddi_top20)), random_state=SUBSET_SEED)
    .reset_index(drop=True)
)


Label Mapping: {49: 0, 47: 1, 73: 2, 75: 3, 60: 4, 70: 5, 20: 6, 16: 7, 4: 8, 6: 9, 37: 10, 9: 11, 72: 12, 54: 13, 83: 14, 58: 15, 32: 16, 27: 17, 67: 18, 64: 19}
Unique mapped labels: [ 8  9 11  7  6 17 16 10  1  0 13 15  4 19 18  5 12  2  3 14]


In [5]:
# convert smiles string to graph
def smiles_to_graph(smiles):
    """Build a torch_geometric ``Data`` graph from a SMILES string.

    Every atom becomes a node whose single feature is its atomic number
    (stored as a float column vector). Every bond contributes two directed
    edges, one per direction.

    Raises:
        ValueError: if RDKit cannot parse ``smiles``.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        raise ValueError(f"invalid SMILES string {smiles}")

    atomic_numbers = [atom.GetAtomicNum() for atom in mol.GetAtoms()]

    directed_edges = []
    for bond in mol.GetBonds():
        begin, end = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
        directed_edges.extend([(begin, end), (end, begin)])

    if directed_edges:
        # (num_edges, 2) list of pairs -> (2, num_edges) index tensor
        edge_index = torch.tensor(directed_edges, dtype=torch.long).t().contiguous()
    else:
        # Molecule without bonds: keep the (2, 0) layout so the shape stays consistent.
        edge_index = torch.empty((2, 0), dtype=torch.long)

    x = torch.tensor(atomic_numbers, dtype=torch.float).view(-1, 1)
    return Data(x=x, edge_index=edge_index)

def convert_to_graphs(ddi_filt):
    """Turn every row of ``ddi_filt`` into a ``(graph_X1, graph_X2, label)`` tuple.

    The 'X1' and 'X2' columns are converted with ``smiles_to_graph``; the 'Y'
    column is carried over unchanged as the label. Rows are processed in
    frame order and returned as a list.
    """
    return [
        (smiles_to_graph(first_smiles), smiles_to_graph(second_smiles), label)
        for first_smiles, second_smiles, label in zip(
            ddi_filt['X1'], ddi_filt['X2'], ddi_filt['Y']
        )
    ]

# Convert the SMILES pairs in ddi_filt into (graph_X1, graph_X2, label) tuples
graph_data = convert_to_graphs(ddi_filt)

In [6]:
from torch.utils.data import Dataset

class GraphDataset(Dataset):
    """Map-style dataset over a sequence of ``(graph_X1, graph_X2, label)`` triples."""

    def __init__(self, graph_data):
        # The sequence is stored as given; it is not copied.
        self.graph_data = graph_data

    def __len__(self):
        """Number of triples held by the dataset."""
        return len(self.graph_data)

    def __getitem__(self, idx):
        """Return the triple stored at position ``idx`` as a tuple."""
        first_graph, second_graph, target = self.graph_data[idx]
        return (first_graph, second_graph, target)

# graph_dataset = GraphDataset(graph_data)

In [7]:
# create GNN 


class GNNModel(nn.Module):
    """Pair classifier: a two-layer GCN encoder shared by both input graphs.

    Each graph is passed through the same two ``GCNConv`` layers, its node
    embeddings are averaged into one vector, and the two vectors are
    concatenated and fed to a linear layer that emits ``out_channels`` scores.
    """

    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        # The classifier sees both graph embeddings side by side, hence 2x width.
        self.fc = nn.Linear(2 * hidden_channels, out_channels)

    def _embed(self, graph):
        """Apply both GCN layers to ``graph`` and average over its nodes."""
        hidden = F.relu(self.conv1(graph.x, graph.edge_index))
        hidden = self.conv2(hidden, graph.edge_index)
        return hidden.mean(dim=0)

    def forward(self, data1, data2):
        """Return the class scores for the graph pair ``(data1, data2)``."""
        pair_embedding = torch.cat([self._embed(data1), self._embed(data2)], dim=-1)
        return self.fc(pair_embedding)
    

In [8]:
from torch.optim import Adam
def gnn_function(epochs, hidden_channels, seed=42, return_metrics=False):
    """Train a ``GNNModel`` on the module-level ``graph_data`` and score it.

    The data is split 80/20 (stratified by label), the model is trained one
    sample at a time with Adam and cross-entropy loss, and then evaluated on
    the held-out 20%.

    Args:
        epochs: number of passes over the training split.
        hidden_channels: width of the GCN layers.
        seed: seed for the train/test split and for torch's global RNG, so
            that runs with different hyperparameters are scored on the same
            split and start from repeatable initial weights. Pass ``None``
            for an unseeded run.
        return_metrics: when False (default) return only the accuracy; when
            True return a dict with 'accuracy', 'precision' and 'recall'.

    Returns:
        Test accuracy in percent, or the metrics dict described above
        (precision and recall are macro-averaged).
    """
    if seed is not None:
        torch.manual_seed(seed)

    graph_dataset = GraphDataset(graph_data)
    labels = [label for _, _, label in graph_dataset.graph_data]

    train_data, test_data = train_test_split(
        graph_dataset.graph_data, test_size=0.2, stratify=labels, random_state=seed
    )
    train_dataset = GraphDataset(train_data)
    test_dataset = GraphDataset(test_data)

    # One atomic-number feature per node in; one score per class out. The 20
    # matches the number of interaction types kept when the labels were built.
    num_classes = 20
    model = GNNModel(in_channels=1, hidden_channels=hidden_channels, out_channels=num_classes)
    optimizer = Adam(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    # Training: one optimizer step per graph pair.
    for _ in range(epochs):
        model.train()
        for data1, data2, label in train_dataset:
            target = torch.tensor(label, dtype=torch.long)

            optimizer.zero_grad()
            loss = criterion(model(data1, data2), target)
            loss.backward()
            optimizer.step()

    # Evaluation: collect plain Python ints so that the accuracy computation
    # and the sklearn metrics all see the same, homogeneous label type.
    model.eval()
    y_true = []
    y_pred = []
    with torch.no_grad():
        for data1, data2, label in test_dataset:
            output = model(data1, data2)
            y_pred.append(int(output.argmax(dim=-1)))
            y_true.append(int(label))

    correct = sum(pred == true for pred, true in zip(y_pred, y_true))
    accuracy = correct / len(y_true) * 100

    if not return_metrics:
        return accuracy

    # zero_division=0 on both scores: classes that are never predicted (or
    # absent from the test split) count as 0 instead of raising warnings.
    return {
        'accuracy': accuracy,
        'precision': precision_score(y_true, y_pred, average='macro', zero_division=0),
        'recall': recall_score(y_true, y_pred, average='macro', zero_division=0),
    }

# print(f'Accuracy: {accuracy} %')
# print(f'Precision Score: {precision}')
# print(f'Recall Score: {recall}')


    # Print the loss for the current epoch
    # print(f'Epoch {epoch+1}, Loss: {total_loss/len(train_dataset)}')


In [None]:
def opt_hyps(epochs, hidden_channels, train_fn=None):
    """Grid-search the number of epochs and the hidden width by test accuracy.

    Every combination of ``epochs`` x ``hidden_channels`` is trained exactly
    once, and the combination with the highest accuracy is printed. On ties
    the combination evaluated first is kept.

    Args:
        epochs: sequence of epoch counts to try.
        hidden_channels: sequence of hidden-layer widths to try.
        train_fn: callable ``(epochs, hidden_channels) -> accuracy``. Defaults
            to the module-level ``gnn_function``.

    Returns:
        DataFrame of accuracies with one row per hidden width and one column
        per epoch count.

    Raises:
        ValueError: if either sequence is empty.
    """
    if len(epochs) == 0 or len(hidden_channels) == 0:
        raise ValueError("epochs and hidden_channels must each contain at least one value")
    if train_fn is None:
        train_fn = gnn_function

    grid = pd.DataFrame(index=hidden_channels, columns=epochs, dtype=float)

    # Start below any reachable accuracy so the first run always becomes the
    # incumbent; the incumbent is then replaced only by a strictly better run.
    best_acc = float("-inf")
    best_epochs = None
    best_hidden_channels = None

    for col, n_epochs in enumerate(epochs):
        for row, n_hidden in enumerate(hidden_channels):
            print(f'Epochs: {n_epochs}, Hidden Channels : {n_hidden}')
            acc = train_fn(n_epochs, n_hidden)

            # Positional indexing keeps this correct even with repeated values.
            grid.iloc[row, col] = acc
            if acc > best_acc:
                best_acc = acc
                best_epochs = n_epochs
                best_hidden_channels = n_hidden

    print(f"Optimal hyperparameters: {best_epochs} epochs, {best_hidden_channels} hidden channels")
    return grid

# Candidate values for the grid search: every epoch count is combined with
# every hidden-layer width.
epochs = [5, 7, 10]
hidden_channels = [8, 16, 32]

# Run the search; the returned accuracy grid is the cell's displayed output.
opt_hyps(epochs, hidden_channels)

Epochs: 5, Hidden Channels : 8
Epochs: 5, Hidden Channels : 16


In [None]:

# # add stratify parameter
# labels = []
# for i in graph_dataset: 
#     labels.append(i[2])
    
# train_data, test_data = train_test_split(graph_dataset.graph_data, test_size = 0.2, stratify = labels)

# train_dataset = GraphDataset(train_data)
# test_dataset = GraphDataset(test_data)

In [None]:
# from sklearn.metrics import precision_score
# import numpy as np
# from sklearn.metrics import recall_score
# from sklearn.metrics import roc_auc_score
# import torch.nn.functional as F

# # Evaluation loop
# model.eval()
# correct = 0
# total = 0
# y_pred = []
# y_true = []

# with torch.no_grad():
    
#     for data1, data2, label in test_dataset:
        
#         output = model(data1, data2)
        
#         predicted = output.argmax(dim = -1)
#         correct += (predicted == label).sum().item()
#         y_pred.append(predicted)
#         y_true.append(label)
        
#         if isinstance(label, torch.Tensor): 
#             total += label.size(0)
#         else: 
#             total += 1

# accuracy = correct/total * 100
# precision = precision_score(y_true, y_pred, average='macro', zero_division = 0)
# recall = recall_score(y_true, y_pred, average='macro')

# print(f'Accuracy: {accuracy} %')
# print(f'Precision Score: {precision}')
# print(f'Recall Score: {recall}')


