In [3]:
import pandas as pd
import numpy as np
import json

import networkx as nx

from tqdm import tqdm

from compound import Compound
from reaction import Reaction
from graph import Graph
from data import Data

# Read the input tables from CSV.
# NOTE: the compounds table is read from a sibling project directory ('../GNN_toxic'),
# unlike the other three tables, which live under this project's 'data/' folder.
cpds = pd.read_csv('../GNN_toxic/data/raw/compounds_final.csv', index_col=0) # containing toxicity
rxns = pd.read_csv('data/reactions_final.csv', index_col=0)
pairs = pd.read_csv('data/pairs_final.csv', index_col=0)
# Cofactor table; its 'Entry' column is used below to flag compounds as cofactors.
cofactors = pd.read_csv('data/original/cofactors_KEGG.csv')

# Create the project-level containers: `data` collects Compound/Reaction objects,
# `graph` is constructed from the pairs table.
# NOTE(review): later cells rebind both `data` and `Data` to torch_geometric objects,
# so this cell must be re-run before re-running any cell that expects these instances.
data = Data()
graph = Graph(pairs=pairs)

In [4]:
# Build the cofactor lookup once. Testing membership against a set avoids
# re-scanning the whole cofactor column for every compound row.
# Missing values are dropped so that a missing 'Entry' is never reported as a cofactor.
cofactor_entries = set(cofactors['Entry'].dropna())

# Create a Compound object for each row in the DataFrame and add it to the data
for index, row in cpds.iterrows():
    entry = row['Entry']
    name = row['Names']
    formula = row['Formula']
    mw = row['mol_weight']
    smiles = row['SMILES']
    is_cofactor = entry in cofactor_entries
    is_toxic = row['toxic']

    compound = Compound(entry, name, formula, mw, smiles, is_cofactor, is_toxic)
    data.add_element('compound', compound)

# Create a Reaction object for each row in the DataFrame and add it to the data
for index, row in rxns.iterrows():
    entry = row['Entry']
    name = row['Names']
    compounds = row['Compound']
    enzyme = row['EC Number']

    reaction = Reaction(entry, name, compounds, enzyme)
    data.add_element('reaction', reaction)

In [6]:
# Number of times a metabolite appears in the pairs dataset
graph.get_number_of_occurences(pairs)
# Build the graph from the collected data and the pairs table. The recorded output
# below reports node/edge counts before and after a "Removing self-loops..." step.
# As the last expression of the cell, the call's return value is displayed.
graph.create_graph(data=data, pairs=pairs)

# nodes: 8591 
# edges: 30081

Removing self-loops...
# nodes: 8591 
# edges: 30026


In [13]:
# (A deliberate `1/0` used to sit here to abort "Run All" with a ZeroDivisionError.
# It was removed so the notebook can execute from top to bottom on a fresh kernel.)

<networkx.classes.graph.Graph at 0x7f589beb6b30>

In [9]:




# Create Graph
graph.create_graph(data=data, pairs=pairs)

# *******************************************
# Validate the methods on validation datasets
# *******************************************

######### VALIDATION SET FROM nicepath ###########
test_cases = pd.read_csv('data/original/test_cases.csv')

# Each 'Pathway ' cell (note the trailing space in the column name) is a
# comma-separated string; split it once and derive every column from that result.
pathway_steps = test_cases['Pathway '].apply(lambda pathway: pathway.split(','))
test_cases['source'] = pathway_steps.apply(lambda steps: steps[0])    # first element
test_cases['target'] = pathway_steps.apply(lambda steps: steps[-1])   # last element
test_cases['paths_list'] = pathway_steps

paths = graph.simple_weighted_shortest_path(test_cases=test_cases, data=data, method='mol_weight')

# ######### NEW VALIDATION SET ###########
# pyminer_test = pd.read_csv('data/original/pyminer_validation_set.csv', delimiter=';', header=None, names=['Pathway'])
# pyminer_test['source'] = pyminer_test['Pathway'].apply(lambda x: x.split(',')[0])
# pyminer_test['target'] = pyminer_test['Pathway'].apply(lambda x: x.split(',')[len(x.split(','))-1])
# pyminer_test['paths_list'] = pyminer_test['Pathway'].apply(lambda x: x.split(','))

# print('Simple weighted shortes paths:')
# paths = graph.simple_weighted_shortest_path(test_cases=pyminer_test, data=data, method='mol_weight')

# nodes: 8591 
# edges: 30081

Removing self-loops...
# nodes: 8591 
# edges: 30026


100%|██████████| 30026/30026 [00:00<00:00, 600875.86it/s]


Correct pathway predictions: 14
Correct pathway predictions (%): 28.0


### Save predicted paths

In [10]:
import os

directory = 'data/results'

# Create the directory if it doesn't exist. exist_ok=True makes this a no-op when
# it is already there and avoids the check-then-create race of a separate exists() test.
os.makedirs(directory, exist_ok=True)

# Derive the output path from `directory` so the two cannot drift apart.
paths.to_csv(os.path.join(directory, 'predicted_paths.csv'))

# Graph learning experiments (work in progress)

In [11]:
# Look up compound 'C00082' and show its summary followed by its SMILES string.
cpd = data.get_compound_by_id('C00082')
smile = cpd.smiles
print(cpd, smile, sep='\n')

# Take the second row of the predicted paths and view the matching induced
# subgraph of the full graph.
# NOTE(review): the variable names assume this row is a correctly predicted pathway — confirm.
correct_pathway_example = paths['Pathway'].iloc[1]
correct_subgraph = graph.G.subgraph(correct_pathway_example)
print(correct_pathway_example)
print(correct_subgraph.nodes())

ID: C00082
Name: ["L-Tyrosine", "(S)-3-(p-Hydroxyphenyl)alanine", "(S)-2-Amino-3-(p-hydroxyphenyl)propionic acid", "Tyrosine"]
Formula: C9H11NO3

N[C@@H](Cc1ccc(O)cc1)C(=O)O
['C00223', 'C12096', 'C00029', 'C00761']
['C00029', 'C00761', 'C00223', 'C12096']


## GNN! (Maybe good)

In [12]:
# Short alias for the graph object stored on the project `graph` instance.
# Taken here because later cells reuse the name `graph` as a loop variable.
gg = graph.G

In [13]:
import torch.nn as nn
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
import torch_geometric
import torch

class GCN(nn.Module):
    """Graph-level classifier built from GCN layers.

    Node features go through `conv1`, then four further passes through `conv2`
    (each followed by ReLU), are mean-pooled per graph and finally mapped to
    `output_dim` scores by a linear layer.

    NOTE(review): all four later passes reuse the *same* `conv2` layer, i.e. they
    share weights. If independent layers were intended, this needs separate
    GCNConv instances — confirm.
    """

    # Number of times `conv2` is applied after `conv1`.
    _SHARED_PASSES = 4

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, data):
        """Return one row of class scores per graph in `data`.

        `data` must expose `x`, `edge_index` and `batch` attributes.
        """
        node_feats, edges, graph_ids = data.x, data.edge_index, data.batch

        hidden = F.relu(self.conv1(node_feats, edges))
        for _ in range(self._SHARED_PASSES):
            hidden = F.relu(self.conv2(hidden, edges))

        # Global mean pooling to obtain graph-level representation
        pooled = torch_geometric.nn.global_mean_pool(hidden, graph_ids)

        # apply a final classifier
        return self.fc(pooled)


KeyboardInterrupt



In [None]:
# Work on a copy of `gg`; the per-pathway subgraphs in the next cell are taken from this copy.
master_G = gg.copy()
print(master_G)

In [None]:
# One induced subgraph of `master_G` per predicted pathway, labelled 1 when the
# row's 'Correct' value is truthy and 0 otherwise.
graphs = []
labels = []

for pathway_nodes, is_correct in zip(paths['Pathway'], paths['Correct']):
    graphs.append(master_G.subgraph(pathway_nodes))
    labels.append(1 if is_correct else 0)

print("Graphs:", graphs)
print("Labels:", labels)

In [None]:
# Convert NetworkX graphs to PyG data
data_list = []
# The loop variable is intentionally NOT called `graph`: that name belongs to the
# project Graph instance created at the top of the notebook.
for subgraph, label in zip(graphs, labels):
    # Rename nodes to consecutive integers so node i corresponds to row i of `x`.
    mapping = {node: idx for idx, node in enumerate(subgraph.nodes())}
    relabeled = nx.relabel_nodes(subgraph, mapping)

    # Node features: the 'mw' node attribute, one column.
    x = torch.tensor(
        [relabeled.nodes[idx]['mw'] for idx in range(relabeled.number_of_nodes())],
        dtype=torch.float,
    ).view(-1, 1)

    # Edge list as a (2, num_edges) long tensor. The explicit dtype and reshape keep
    # the shape valid even when a subgraph has no edges, giving (2, 0) instead of (0,).
    edges = list(relabeled.edges)
    edge_index = torch.tensor(edges, dtype=torch.long).reshape(-1, 2).t().contiguous()
    # Edge features: the 'mol_weight' edge attribute.
    edge_attr = torch.tensor([relabeled.edges[edge]['mol_weight'] for edge in edges], dtype=torch.float)

    # NetworkX lists each undirected edge once, while PyG expects both directions
    # in edge_index. Mirror every non-self-loop edge (self-loops must not be duplicated).
    if not relabeled.is_directed():
        not_loop = edge_index[0] != edge_index[1]
        edge_index = torch.cat([edge_index, edge_index[:, not_loop].flip(0)], dim=1).contiguous()
        edge_attr = torch.cat([edge_attr, edge_attr[not_loop]])

    y = torch.tensor([label])  # Graph-level label

    # Use the fully qualified class so this cell does not depend on whether the bare
    # name `Data` currently refers to the project class or to torch_geometric's.
    sample = torch_geometric.data.Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y)
    data_list.append(sample)

# Concatenate all data samples into a single PyG data object
# NOTE: this rebinds `data` (previously the project Data instance); the cells below
# rely on `data` being this batch.
data = torch_geometric.data.Batch.from_data_list(data_list)
print(data)

In [None]:
from sklearn.model_selection import train_test_split

# Split the per-graph samples into training and testing sets.
# The list of individual graphs is split rather than the batched object: how a PyG
# Batch responds to len() and indexing varies between versions, whereas a plain list
# is always split one graph at a time. random_state makes the split reproducible.
train_data, test_data = train_test_split(data_list, test_size=.1, random_state=42)

In [None]:
import torch.optim as optim

# Set random seed for reproducibility
torch.manual_seed(42)

# Initialize GNN model (input width is read from the batched node-feature matrix)
model = GCN(input_dim=data.x.shape[1], hidden_dim=128, output_dim=2)
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Train the model
model.train()
for epoch in range(100):
    # Gradients are cleared once per epoch and accumulated over every training
    # graph, so each optimizer step uses the summed loss of the whole training set.
    optimizer.zero_grad()

    total_loss = 0
    # The loop variable is not called `data`, so the batched dataset bound to `data`
    # survives this cell and the cell can be re-run safely.
    for sample in train_data:
        out = model(sample)
        loss = criterion(out, sample.y.view(-1))
        loss.backward()
        total_loss += loss.item()

    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {total_loss}")

print("Training completed.")

In [None]:
# Evaluation mode
model.eval()
with torch.no_grad():
    correct = 0
    total = 0

    # The loop variable is not called `data`, so the batched dataset bound to `data`
    # is not overwritten by the last test sample.
    for sample in test_data:
        out = model(sample)
        predicted_labels = out.argmax(dim=1)
        true_labels = sample.y.view(-1)
        correct += (predicted_labels == true_labels).sum().item()
        total += len(true_labels)
        print(predicted_labels, sample.y)

    # Guard against an empty test set instead of raising ZeroDivisionError.
    accuracy = correct / total if total else float('nan')
    print(f"Accuracy: {accuracy}")

# Example: graph-level classification with a GNN on random graphs

In [None]:
import networkx as nx
import random

# Generate random graphs with labels
num_graphs = 5
graphs = []
labels = []

# Dedicated, seeded generator so the example data is identical on every run.
rng = random.Random(42)

# The loop variable is intentionally NOT called `graph`: that name belongs to the
# project Graph instance created at the top of the notebook.
for _ in range(num_graphs):
    # Generate a random graph (10 nodes, edge probability 0.3)
    random_graph = nx.fast_gnp_random_graph(10, 0.3, seed=rng)
    graphs.append(random_graph)

    # Assign a random label (Type 0 or Type 1)
    label = rng.choice([0, 1])
    labels.append(label)

print("Graphs:", graphs)
print("Labels:", labels)

In [None]:
from torch_geometric.data import Data
import torch_geometric
import torch

# Convert NetworkX graphs to PyG data
data_list = []
# The loop variable is intentionally NOT called `graph`, so the project Graph
# instance bound to that name is left alone.
for random_graph, label in zip(graphs, labels):
    # Edge list as a (2, num_edges) long tensor. The explicit dtype and reshape keep
    # the shape valid even when a graph has no edges, giving (2, 0) instead of (0,).
    edge_index = torch.tensor(list(random_graph.edges), dtype=torch.long).reshape(-1, 2).t().contiguous()

    # NetworkX lists each undirected edge once, while PyG expects both directions
    # in edge_index. Mirror every non-self-loop edge.
    if not random_graph.is_directed():
        not_loop = edge_index[0] != edge_index[1]
        edge_index = torch.cat([edge_index, edge_index[:, not_loop].flip(0)], dim=1).contiguous()

    x = torch.randn(random_graph.number_of_nodes(), 16)  # Random node features (16 dimensions)
    y = torch.tensor([label])  # Graph-level label

    sample = Data(x=x, edge_index=edge_index, y=y)
    data_list.append(sample)


# Concatenate all data samples into a single PyG data object
data = torch_geometric.data.Batch.from_data_list(data_list)
data

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv

class GCN(nn.Module):
    """Two-layer GCN that returns one mean-pooled vector of size `output_dim` per graph.

    NOTE(review): this redefines the `GCN` class declared earlier in the notebook.
    NOTE(review): ReLU is also applied after the last layer, so the pooled scores
    are never negative — confirm this is intended when they feed a cross-entropy loss.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, output_dim)

    def forward(self, data):
        """Run both GCN layers (each followed by ReLU) and mean-pool per graph.

        `data` must expose `x`, `edge_index` and `batch` attributes.
        """
        node_feats, edges, graph_ids = data.x, data.edge_index, data.batch

        hidden = F.relu(self.conv1(node_feats, edges))
        scores = F.relu(self.conv2(hidden, edges))

        # Global mean pooling to obtain graph-level representation
        return torch_geometric.nn.global_mean_pool(scores, graph_ids)


In [None]:
from sklearn.model_selection import train_test_split

# Split the per-graph samples into training and testing sets.
# The list of individual graphs is split rather than the batched object: how a PyG
# Batch responds to len() and indexing varies between versions, whereas a plain list
# is always split one graph at a time. random_state makes the split reproducible.
train_data, test_data = train_test_split(data_list, test_size=0.2, random_state=42)

In [None]:
import torch.optim as optim

# Set random seed for reproducibility
torch.manual_seed(42)

# Initialize GNN model (16 input features, matching the random node features)
model = GCN(input_dim=16, hidden_dim=32, output_dim=2)
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Train the model
model.train()
for epoch in range(100):
    # Gradients are cleared once per epoch and accumulated over every training
    # graph, so each optimizer step uses the summed loss of the whole training set.
    optimizer.zero_grad()

    total_loss = 0
    # The loop variable is not called `data`, so the batched dataset bound to `data`
    # survives this cell and the cell can be re-run safely.
    for sample in train_data:
        out = model(sample)
        loss = criterion(out, sample.y.view(-1))
        loss.backward()
        total_loss += loss.item()

    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {total_loss}")

print("Training completed.")

In [None]:
# Evaluation mode
model.eval()
with torch.no_grad():
    correct = 0
    total = 0

    # The loop variable is not called `data`, so the batched dataset bound to `data`
    # is not overwritten by the last test sample.
    for sample in test_data:
        out = model(sample)
        predicted_labels = out.argmax(dim=1)
        true_labels = sample.y.view(-1)
        correct += (predicted_labels == true_labels).sum().item()
        total += len(true_labels)

    # Guard against an empty test set instead of raising ZeroDivisionError.
    accuracy = correct / total if total else float('nan')
    print(f"Accuracy: {accuracy}")