#Experiments on GNNs and Graph Embeddings
Experiments conducted to evaluate various GNNs and graph embedding methods for molecular classification tasks by leveraging well-known benchmarking datasets in SMILES format, i.e., HIV, BBBP, BACE, CLINTOX. Extensively used in the literature to benchmark models for molecular property prediction and drug discovery tasks. We systematically tested and compared various GNN architectures and traditional graph embedding techniques. For each dataset, we evaluated their performance to identify strengths and limitations in predicting molecular properties. We trained several different GNNs to assess their performance on the aforementioned datasets: Graph Convolutional Network (GCN), Graph Isomorphism Network (GIN), and Graph Attention Network (GAT). As for the graph embedding methods, also in this case we provide details about the models that achieved the best performance, i.e, a Fully Connected Neural Network using three well-known graph embedding techniques as input: Node2Vec, SDNE, and HOPE.

# Installation and Common Functions


In [None]:
import sys
if 'google.colab' in sys.modules:
  !pip install pysmiles
  !pip install git+https://github.com/VenkateshwaranB/stellargraph.git
  !pip install rdkit
  !pip install torch_geometric
  !pip install datasets
  !pip3 install mxnet-mkl==1.6.0 numpy==1.23.1

In [None]:
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import os
import networkx as nx
import numpy as np
import pandas as pd
from stellargraph import datasets
from IPython.display import display, HTML
from sklearn.model_selection import KFold
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
import numpy as np

%matplotlib inline

In [None]:
!pip install ogb

If the script is runned on colab and the dataset are saved on the drive is possible to mount the drive in order to dowload from the drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

###Load Dataset

download the HIV Dataset from OGBG

In [None]:
from ogb.graphproppred import PygGraphPropPredDataset
dataset = PygGraphPropPredDataset(name='ogbg-molhiv')

#qua sta anche clintox e altri vedere bene documntazione però il codice funziona anche con uun proprio dataset, però il clintox ha la y diversa dalla nostra quindi in molti casi conviene avere a che fare con db propri, dove settiamo noi la y per bene e usiamo lo script SMILEs-MOL-OGB
all_data = dataset[:]

Load experimental .pt dataset: Choose pre-processed *dataset.pt* (clintox.pt, bace_classification.pt, bbbp.pt) from Experiments/exp_datasets and replace the path *content/exp_dataset/dataset.pt* with the choosen dataset.

In [None]:
import pickle
import torch_geometric
# Load Data object from file
#COLAB DRIVE:/content/dive/MyDrive/exp_datasets/clintox.pt
with open('/content/exp_datasets/clintox.pt', 'rb') as f:
    all_data = pickle.load(f)

# Now loaded_data contains the Data object
print(all_data)

Create NetworkX graph

In [None]:
import torch
import networkx as nx
import stellargraph as sg
import matplotlib.pyplot as plt
from gensim.models import Word2Vec
from stellargraph.data import BiasedRandomWalk

def create_networkx_graph(edge_index, edge_attr, x, y, num_nodes):
    # Create an empty NetworkX graph
    G = nx.Graph()

    # Add nodes with features
    for i in range(num_nodes):
        node_features = {f"feat_{j}": x[i, j].item() for j in range(x.shape[1])}
        G.add_node(i, features=node_features)

    # Add edges with attributes
    for j in range(edge_index.shape[1]):
        src, dst = edge_index[:, j].tolist()
        edge_attributes = {f"attr_{k}": edge_attr[j, k].item() for k in range(edge_attr.shape[1])}
        G.add_edge(src, dst, attributes=edge_attributes)

    return G, y.item()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_pred, class_names=None):
    """
    Plots the confusion matrix.

    Parameters:
    y_true (list or array): True labels.
    y_pred (list or array): Predicted labels.
    class_names (list): List of class names. If None, integer labels are used.
    """
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

# Example usage:
# y_true = [0, 1, 1, 0, 1, 0]
# y_pred = [0, 1, 0, 0, 1, 1]
# class_names = ['Class 0', 'Class 1']
# plot_confusion_matrix(y_true, y_pred, class_names)


#Node2Vec

Construction of the dataset using Stellargraph and Node2Vec

In [None]:
from sklearn.model_selection import KFold
import copy
import random
import pandas as pd
import numpy as np  # Import numpy for numerical operations

label_0_indices = []
label_1_indices = []
dfs = []

for i in range(len(all_data)):
    if all_data[i].y == 0:
        label_0_indices.append(i)
    elif all_data[i].y == 1:
        label_1_indices.append(i)

for iter in range(1):
    print('iter:', iter)

    min_size_dataset = min(len(label_0_indices), len(label_1_indices))

    random.shuffle(label_0_indices)
    random.shuffle(label_1_indices)
    label_0_indices = label_0_indices[:min_size_dataset]
    label_1_indices = label_1_indices[:min_size_dataset]
    print('size 0 data:', len(label_0_indices))
    print('size 1 data:', len(label_1_indices))

    balanced_indices = label_0_indices + label_1_indices
    print('size total dataset:', len(balanced_indices))

    balanced_data = [all_data[i] for i in balanced_indices]
    print(len(balanced_data))

    df = pd.DataFrame(columns=['graph_emb', 'label'])

    for dtb in balanced_data:
        graph, y = create_networkx_graph(dtb.edge_index, dtb.edge_attr, dtb.x, dtb.y, dtb.num_nodes)

        # Convert node features to DataFrame
        node_features_dict = {node: graph.nodes[node]['features'] for node in graph.nodes}
        node_df = pd.DataFrame.from_dict(node_features_dict, orient='index')

        # Convert edge attributes to DataFrame
        edge_features_dict = {(src, dst): data['attributes'] for src, dst, data in graph.edges(data=True)}
        edge_df = pd.DataFrame.from_dict(edge_features_dict, orient='index')

        # Create 'source', 'target' columns in the edge DataFrame
        edge_df['source'] = [edge[0] for edge in edge_df.index]
        edge_df['target'] = [edge[1] for edge in edge_df.index]

        # Create a StellarGraph from the NetworkX graph and DataFrames
        Gs = sg.StellarGraph(nodes=node_df, edges=edge_df)

        rw = BiasedRandomWalk(Gs)

        walks = rw.run(
            nodes=list(Gs.nodes()),  # root nodes
            length=100,  # maximum length of a random walk
            n=10,  # number of random walks per root node
            p=0.5,  # Defines (unormalised) probability, 1/p, of returning to source node
            q=2.0,  # Defines (unormalised) probability, 1/q, for moving away from source node
            weighted=True,
            seed=42
        )

        str_walks = [[str(n) for n in walk] for walk in walks]
        model = Word2Vec(str_walks, window=5, min_count=0, sg=1, workers=2)

        # Retrieve node embeddings and corresponding subjects
        node_ids = model.wv.index_to_key  # list of node IDs
        node_embeddings = model.wv.vectors  # numpy.ndarray of size number of nodes times embeddings dimensionality

        # Aggregate node embeddings to form a single graph embedding
        # Example using average pooling
        graph_embedding = np.mean(node_embeddings, axis=0)  # Change this line for different pooling methods

        # Store the graph embedding and label in the DataFrame
        df.loc[len(df)] = [graph_embedding, y]

    dfs.append(df)


Below there is the creation and training of the model from the Node2Vec Embedding

The performace and the confusion matrix are showed at the end of the training and evaluation

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import ast


# Extract features and labels
def extract_features_labels(df):
    embeddings = []
    labels = df['label'].tolist()

    for emb in df['graph_emb']:
        if isinstance(emb, str):
            # Convert string representation of list back to list
            emb = ast.literal_eval(emb)
        embeddings.append(emb)

    X = np.array(embeddings)
    y = np.array(labels)
    return X, y

X, y = extract_features_labels(df)

# Check if data is loaded correctly
if X.shape[0] == 0 or X.shape[1] == 0:
    raise ValueError("No valid embeddings found. Please check the DataFrame for formatting issues.")

# Define the neural network model
def create_model(input_dim):
    model = keras.Sequential([
        layers.Dense(128, activation='relu', input_shape=(input_dim,)),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Perform 5-fold cross-validation on the entire dataset
kf = KFold(n_splits=5, shuffle=True, random_state=42)
fold_no = 1
val_accuracies = []
test_accuracies = []
roc_aucs = []
precisions = []
recalls = []
f1_scores = []
histories = []

for train_index, test_index in kf.split(X):
    print(f'Fold {fold_no}')

    X_train_val, X_test = X[train_index], X[test_index]
    y_train_val, y_test = y[train_index], y[test_index]

    # Perform an additional split to get a validation set from the training+validation set
    X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42, stratify=y_train_val)

    # Create and train the neural network
    input_dim = X_train.shape[1]
    model = create_model(input_dim)
    history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_val, y_val), verbose=1)

    # Evaluate the model on the validation set
    val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=0)
    print(f'Fold {fold_no} Validation Accuracy: {val_accuracy:.4f}')
    val_accuracies.append(val_accuracy)
    histories.append(history.history)

    # Evaluate the model on the test set
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'Fold {fold_no} Test Accuracy: {test_accuracy:.4f}')
    test_accuracies.append(test_accuracy)

    # Predict probabilities and calculate metrics for the test set
    y_test_pred_proba = model.predict(X_test).flatten()
    y_test_pred = (y_test_pred_proba > 0.5).astype("int32")

    roc_auc = roc_auc_score(y_test, y_test_pred_proba)
    precision = precision_score(y_test, y_test_pred)
    recall = recall_score(y_test, y_test_pred)
    f1 = f1_score(y_test, y_test_pred)

    print(f'Fold {fold_no} ROC-AUC: {roc_auc:.4f}')
    print(f'Fold {fold_no} Precision: {precision:.4f}')
    print(f'Fold {fold_no} Recall: {recall:.4f}')
    print(f'Fold {fold_no} F1-Score: {f1:.4f}')

    roc_aucs.append(roc_auc)
    precisions.append(precision)
    recalls.append(recall)
    f1_scores.append(f1)

    # Plot confusion matrix for each fold
    plot_confusion_matrix(y_test, y_test_pred, class_names=['Class 0', 'Class 1'])

    fold_no += 1

# Calculate and print the average validation accuracy and variance
average_val_accuracy = np.mean(val_accuracies)
variance_val_accuracy = np.var(val_accuracies)
print(f'Average Validation Accuracy: {average_val_accuracy:.4f}')
print(f'Variance of Validation Accuracy: {variance_val_accuracy:.4f}')

# Calculate and print the average test accuracy and variance
average_test_accuracy = np.mean(test_accuracies)
variance_test_accuracy = np.var(test_accuracies)
print(f'Average Test Accuracy: {average_test_accuracy:.4f}')
print(f'Variance of Test Accuracy: {variance_test_accuracy:.4f}')

# Calculate and print the average ROC-AUC, Precision, Recall, and F1-Score
average_roc_auc = np.mean(roc_aucs)
average_precision = np.mean(precisions)
average_recall = np.mean(recalls)
average_f1_score = np.mean(f1_scores)

print(f'Average ROC-AUC: {average_roc_auc:.4f}')
print(f'Average Precision: {average_precision:.4f}')
print(f'Average Recall: {average_recall:.4f}')
print(f'Average F1-Score: {average_f1_score:.4f}')

# Save the average validation accuracy, variance, test accuracy, and metrics to a file
with open("model_accuracies.txt", "w") as f:
    f.write(f'Average Validation Accuracy: {average_val_accuracy:.4f}\n')
    f.write(f'Variance of Validation Accuracy: {variance_val_accuracy:.4f}\n')
    f.write(f'Average Test Accuracy: {average_test_accuracy:.4f}\n')
    f.write(f'Variance of Test Accuracy: {variance_test_accuracy:.4f}\n')
    f.write(f'Average ROC-AUC: {average_roc_auc:.4f}\n')
    f.write(f'Average Precision: {average_precision:.4f}\n')
    f.write(f'Average Recall: {average_recall:.4f}\n')
    f.write(f'Average F1-Score: {average_f1_score:.4f}\n')

#SDNE

Dataset creation for SDNE Experiment, the dataset is well-balanced and read to train the SDNE Model and embedding extraction from the model

In [None]:
import torch
import networkx as nx
import random
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch import nn, optim

# Assuming 'all_data' is already defined and contains the graph data

def create_networkx_graph(edge_index, edge_attr, x, y, num_nodes):
    # Create an empty NetworkX graph
    G = nx.Graph()

    # Add nodes with features
    for i in range(num_nodes):
        node_features = {f"feat_{j}": x[i, j].item() for j in range(x.shape[1])}
        G.add_node(i, **node_features)

    # Add edges with attributes
    for j in range(edge_index.shape[1]):
        src, dst = edge_index[:, j].tolist()
        edge_attributes = {f"attr_{k}": edge_attr[j, k].item() for k in range(edge_attr.shape[1])}
        G.add_edge(src, dst, **edge_attributes)

    return G, y.item()

class SDNE(nn.Module):
    def __init__(self, input_dim, hidden_dims, alpha=1e-5, beta=5):
        super(SDNE, self).__init__()
        self.alpha = alpha
        self.beta = beta

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dims[0]),
            nn.ReLU(),
            nn.Linear(hidden_dims[0], hidden_dims[1])
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dims[1], hidden_dims[0]),
            nn.ReLU(),
            nn.Linear(hidden_dims[0], input_dim)
        )

    def forward(self, x, adj):
        # Encode
        y = self.encoder(x)
        # Decode
        x_hat = self.decoder(y)
        return x_hat, y

    def loss_function(self, x, x_hat, y, adj):
        # Reconstruction loss
        mse = nn.MSELoss()
        L_1st = mse(x_hat, x)

        # Laplacian regularization
        L_2nd = torch.sum(adj * torch.norm(y.unsqueeze(1) - y, dim=2))

        return L_1st + self.alpha * L_1st + self.beta * L_2nd

def train_sdne(graph, hidden_dims=[128, 64], epochs=100, lr=0.01):
    # Create adjacency matrix
    adj = nx.adjacency_matrix(graph).todense()
    adj = torch.tensor(adj, dtype=torch.float32)

    # Get node features
    node_features = np.array([list(graph.nodes[i].values()) for i in range(graph.number_of_nodes())])

    # Check if node features are empty
    if node_features.shape[1] == 0:
        raise ValueError("Node features are empty. Ensure nodes have features.")

    scaler = MinMaxScaler()
    node_features = scaler.fit_transform(node_features)
    node_features = torch.tensor(node_features, dtype=torch.float32)

    # Initialize model
    input_dim = node_features.shape[1]
    model = SDNE(input_dim, hidden_dims)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Training
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        x_hat, y = model(node_features, adj)
        loss = model.loss_function(node_features, x_hat, y, adj)
        loss.backward()
        optimizer.step()

        #if (epoch + 1) % 10 == 0:
            #print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

    return model, y

def balance_dataset(all_data, seed=42):
    if seed is not None:
        random.seed(seed)

    label_0_indices = [i for i, data in enumerate(all_data) if data.y.item() == 0]
    label_1_indices = [i for i, data in enumerate(all_data) if data.y.item() == 1]

    min_size_dataset = min(len(label_0_indices), len(label_1_indices))

    random.shuffle(label_0_indices)
    random.shuffle(label_1_indices)

    label_0_indices = label_0_indices[:min_size_dataset]
    label_1_indices = label_1_indices[:min_size_dataset]
    print(len(label_0_indices))
    print(len(label_1_indices))

    balanced_indices = label_0_indices + label_1_indices
    random.shuffle(balanced_indices)

    balanced_data = [all_data[i] for i in balanced_indices]

    return balanced_data

# Generate and save embeddings for the balanced datasets
if __name__ == "__main__":
    all_graph_embeddings = []

    for i in range(1):  # Loop to generate 5 different datasets
        print(f'Generating dataset {i + 1}')
        balanced_data = balance_dataset(all_data)

        for data in balanced_data:
            num_nodes = data.num_nodes
            edge_index = data.edge_index
            edge_attr = data.edge_attr
            x = data.x
            y = data.y

            # Create graph
            G, y_label = create_networkx_graph(edge_index, edge_attr, x, y, num_nodes)

            # Train SDNE
            model, embeddings = train_sdne(G, epochs=100)

            # Collect node embeddings and aggregate them to obtain a graph-level embedding
            embeddings_np = embeddings.detach().numpy()
            graph_embedding = np.mean(embeddings_np, axis=0)  # Averaging node embeddings
            all_graph_embeddings.append((graph_embedding.tolist(), y_label))

    # Convert graph-level embeddings to DataFrame
    df_graph_embeddings = pd.DataFrame(all_graph_embeddings, columns=["embedding", "label"])


    # Print the DataFrame
    print(df_graph_embeddings)


Training and evaluation of the Fully Connected model on the SDNE Embedding

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import ast


# Extract features and labels
def extract_features_labels(df):
    embeddings = []
    labels = df['label'].tolist()

    for emb in df['embedding']:
        if isinstance(emb, str):
            # Convert string representation of list back to list
            emb = ast.literal_eval(emb)
        embeddings.append(emb)

    X = np.array(embeddings)
    y = np.array(labels)
    return X, y

X, y = extract_features_labels(df_graph_embeddings)

# Check if data is loaded correctly
if X.shape[0] == 0 or X.shape[1] == 0:
    raise ValueError("No valid embeddings found. Please check the DataFrame for formatting issues.")

# Define the neural network model
def create_model(input_dim):
    model = keras.Sequential([
        layers.Dense(128, activation='relu', input_shape=(input_dim,)),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Perform 5-fold cross-validation on the entire dataset
kf = KFold(n_splits=5, shuffle=True, random_state=42)
fold_no = 1
val_accuracies = []
test_accuracies = []
roc_aucs = []
precisions = []
recalls = []
f1_scores = []
histories = []

for train_index, test_index in kf.split(X):
    print(f'Fold {fold_no}')

    X_train_val, X_test = X[train_index], X[test_index]
    y_train_val, y_test = y[train_index], y[test_index]

    # Perform an additional split to get a validation set from the training+validation set
    X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42, stratify=y_train_val)

    # Create and train the neural network
    input_dim = X_train.shape[1]
    model = create_model(input_dim)
    history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_val, y_val), verbose=1)

    # Evaluate the model on the validation set
    val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=0)
    print(f'Fold {fold_no} Validation Accuracy: {val_accuracy:.4f}')
    val_accuracies.append(val_accuracy)
    histories.append(history.history)

    # Evaluate the model on the test set
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'Fold {fold_no} Test Accuracy: {test_accuracy:.4f}')
    test_accuracies.append(test_accuracy)

    # Predict probabilities and calculate metrics for the test set
    y_test_pred_proba = model.predict(X_test).flatten()
    y_test_pred = (y_test_pred_proba > 0.5).astype("int32")

    roc_auc = roc_auc_score(y_test, y_test_pred_proba)
    precision = precision_score(y_test, y_test_pred)
    recall = recall_score(y_test, y_test_pred)
    f1 = f1_score(y_test, y_test_pred)

    print(f'Fold {fold_no} ROC-AUC: {roc_auc:.4f}')
    print(f'Fold {fold_no} Precision: {precision:.4f}')
    print(f'Fold {fold_no} Recall: {recall:.4f}')
    print(f'Fold {fold_no} F1-Score: {f1:.4f}')

    roc_aucs.append(roc_auc)
    precisions.append(precision)
    recalls.append(recall)
    f1_scores.append(f1)

    # Plot confusion matrix for each fold
    plot_confusion_matrix(y_test, y_test_pred, class_names=['Class 0', 'Class 1'])

    fold_no += 1

# Calculate and print the average validation accuracy and variance
average_val_accuracy = np.mean(val_accuracies)
variance_val_accuracy = np.var(val_accuracies)
print(f'Average Validation Accuracy: {average_val_accuracy:.4f}')
print(f'Variance of Validation Accuracy: {variance_val_accuracy:.4f}')

# Calculate and print the average test accuracy and variance
average_test_accuracy = np.mean(test_accuracies)
variance_test_accuracy = np.var(test_accuracies)
print(f'Average Test Accuracy: {average_test_accuracy:.4f}')
print(f'Variance of Test Accuracy: {variance_test_accuracy:.4f}')

# Calculate and print the average ROC-AUC, Precision, Recall, and F1-Score
average_roc_auc = np.mean(roc_aucs)
average_precision = np.mean(precisions)
average_recall = np.mean(recalls)
average_f1_score = np.mean(f1_scores)

print(f'Average ROC-AUC: {average_roc_auc:.4f}')
print(f'Average Precision: {average_precision:.4f}')
print(f'Average Recall: {average_recall:.4f}')
print(f'Average F1-Score: {average_f1_score:.4f}')

# Save the average validation accuracy, variance, test accuracy, and metrics to a file
with open("model_accuracies.txt", "w") as f:
    f.write(f'Average Validation Accuracy: {average_val_accuracy:.4f}\n')
    f.write(f'Variance of Validation Accuracy: {variance_val_accuracy:.4f}\n')
    f.write(f'Average Test Accuracy: {average_test_accuracy:.4f}\n')
    f.write(f'Variance of Test Accuracy: {variance_test_accuracy:.4f}\n')
    f.write(f'Average ROC-AUC: {average_roc_auc:.4f}\n')
    f.write(f'Average Precision: {average_precision:.4f}\n')
    f.write(f'Average Recall: {average_recall:.4f}\n')
    f.write(f'Average F1-Score: {average_f1_score:.4f}\n')



#HOPE

Dataset creation for HOPE Experiment, the datasets will presents the embedding of the graph using the HOPE Techniques

In [None]:
import torch
import networkx as nx
import random
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from scipy.sparse.linalg import svds

def create_networkx_graph(edge_index, edge_attr, x, y, num_nodes):
    # Create an empty NetworkX graph
    G = nx.Graph()

    # Add nodes with features
    for i in range(num_nodes):
        node_features = {f"feat_{j}": x[i, j].item() for j in range(x.shape[1])}
        G.add_node(i, **node_features)

    # Add edges with attributes
    for j in range(edge_index.shape[1]):
        src, dst = edge_index[:, j].tolist()
        edge_attributes = {f"attr_{k}": edge_attr[j, k].item() for k in range(edge_attr.shape[1])}
        G.add_edge(src, dst, **edge_attributes)

    return G, y.item()

def balance_dataset(all_data, seed=42):
    if seed is not None:
        random.seed(seed)

    label_0_indices = [i for i, data in enumerate(all_data) if data.y.item() == 0]
    label_1_indices = [i for i, data in enumerate(all_data) if data.y.item() == 1]

    min_size_dataset = min(len(label_0_indices), len(label_1_indices))

    random.shuffle(label_0_indices)
    random.shuffle(label_1_indices)

    label_0_indices = label_0_indices[:min_size_dataset]
    label_1_indices = label_1_indices[:min_size_dataset]
    print(len(label_0_indices))
    print(len(label_1_indices))

    balanced_indices = label_0_indices + label_1_indices
    random.shuffle(balanced_indices)

    balanced_data = [all_data[i] for i in balanced_indices]

    return balanced_data

def compute_hope_embedding(G, d=128, beta=0.01):
    # Create adjacency matrix
    A = nx.to_numpy_array(G)

    # Compute the Katz similarity matrix
    I = np.eye(A.shape[0])
    S = np.linalg.inv(I - beta * A) - I

    # Ensure k is valid for SVD
    k = min(d // 2, min(S.shape) - 1)

    if k <= 0:
        # If k is invalid, return trivial embeddings
        return np.zeros((S.shape[0], d))

    # Compute SVD
    U, s, Vt = svds(S, k=k)
    S_sqrt = np.diag(np.sqrt(s))
    X1 = np.dot(U, S_sqrt)
    X2 = np.dot(Vt.T, S_sqrt)
    X = np.concatenate((X1, X2), axis=1)

    # Ensure the final embedding dimension matches d
    if X.shape[1] < d:
        X = np.pad(X, ((0, 0), (0, d - X.shape[1])), 'constant')
    elif X.shape[1] > d:
        X = X[:, :d]

    return X

# Generate and save embeddings for the balanced datasets
if __name__ == "__main__":
    all_graph_embeddings = []

    for i in range(1):  # Loop to generate embeddings from 5 different balanced datasets
        print(f'Generating embeddings for dataset {i + 1}')
        balanced_data = balance_dataset(all_data)

        for data in balanced_data:
            num_nodes = data.num_nodes
            edge_index = data.edge_index
            edge_attr = data.edge_attr
            x = data.x
            y = data.y

            # Create graph
            G, y_label = create_networkx_graph(edge_index, edge_attr, x, y, num_nodes)

            # Compute HOPE embeddings
            embeddings = compute_hope_embedding(G, d=128, beta=0.01)

            # Collect node embeddings and aggregate them to obtain a graph-level embedding
            graph_embedding = np.mean(embeddings, axis=0)  # Averaging node embeddings
            all_graph_embeddings.append((graph_embedding.tolist(), y_label))

    # Convert graph-level embeddings to DataFrame
    df_graph_embeddings = pd.DataFrame(all_graph_embeddings, columns=["embedding", "label"])

    # Save the DataFrame to a CSV file (optional)
    df_graph_embeddings.to_csv("graph_embeddings.csv", index=False)

    # Print the DataFrame
    print(df_graph_embeddings)


Training and evaluation of the Fully Connected model on the HOPE Embedding

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import ast


# Extract features and labels
def extract_features_labels(df):
    embeddings = []
    labels = df['label'].tolist()

    for emb in df['embedding']:
        if isinstance(emb, str):
            # Convert string representation of list back to list
            emb = ast.literal_eval(emb)
        embeddings.append(emb)

    X = np.array(embeddings)
    y = np.array(labels)
    return X, y

# Assuming df_graph_embeddings is already defined and loaded
X, y = extract_features_labels(df_graph_embeddings)

# Check if data is loaded correctly
if X.shape[0] == 0 or X.shape[1] == 0:
    raise ValueError("No valid embeddings found. Please check the DataFrame for formatting issues.")

# Define the neural network model
def create_model(input_dim):
    model = keras.Sequential([
        layers.Dense(128, activation='relu', input_shape=(input_dim,)),
        layers.Dropout(0.3),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(32, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Perform 5-fold cross-validation on the entire dataset
kf = KFold(n_splits=5, shuffle=True, random_state=42)
fold_no = 1
val_accuracies = []
test_accuracies = []
histories = []

roc_aucs = []
precisions = []
recalls = []
f1_scores = []

for train_index, test_index in kf.split(X):
    print(f'Fold {fold_no}')

    X_train_val, X_test = X[train_index], X[test_index]
    y_train_val, y_test = y[train_index], y[test_index]

    # Perform an additional split to get a validation set from the training+validation set
    X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42, stratify=y_train_val)

    # Create and train the neural network
    input_dim = X_train.shape[1]
    model = create_model(input_dim)
    history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_val, y_val), verbose=1)

    # Evaluate the model on the validation set
    val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=0)
    print(f'Fold {fold_no} Validation Accuracy: {val_accuracy:.4f}')
    val_accuracies.append(val_accuracy)
    histories.append(history.history)

    # Evaluate the model on the test set
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f'Fold {fold_no} Test Accuracy: {test_accuracy:.4f}')
    test_accuracies.append(test_accuracy)

    # Make predictions on the test set
    y_pred_proba = model.predict(X_test).flatten()
    y_pred = (y_pred_proba > 0.5).astype(int)

    # Compute and store metrics
    roc_auc = roc_auc_score(y_test, y_pred_proba)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    roc_aucs.append(roc_auc)
    precisions.append(precision)
    recalls.append(recall)
    f1_scores.append(f1)

    # Plot confusion matrix for each fold
    plot_confusion_matrix(y_test, y_test_pred, class_names=['Class 0', 'Class 1'])

    print(f'Fold {fold_no} ROC AUC: {roc_auc:.4f}')
    print(f'Fold {fold_no} Precision: {precision:.4f}')
    print(f'Fold {fold_no} Recall: {recall:.4f}')
    print(f'Fold {fold_no} F1-Score: {f1:.4f}')

    fold_no += 1

# Calculate and print the average metrics
average_val_accuracy = np.mean(val_accuracies)
variance_val_accuracy = np.var(val_accuracies)
print(f'Average Validation Accuracy: {average_val_accuracy:.4f}')
print(f'Variance of Validation Accuracy: {variance_val_accuracy:.4f}')

average_test_accuracy = np.mean(test_accuracies)
variance_test_accuracy = np.var(test_accuracies)
print(f'Average Test Accuracy: {average_test_accuracy:.4f}')
print(f'Variance of Test Accuracy: {variance_test_accuracy:.4f}')

average_roc_auc = np.mean(roc_aucs)
average_precision = np.mean(precisions)
average_recall = np.mean(recalls)
average_f1 = np.mean(f1_scores)

print(f'Average ROC AUC: {average_roc_auc:.4f}')
print(f'Average Precision: {average_precision:.4f}')
print(f'Average Recall: {average_recall:.4f}')
print(f'Average F1-Score: {average_f1:.4f}')

# Save the average metrics to a file
with open("model_accuracies.txt", "w") as f:
    f.write(f'Average Validation Accuracy: {average_val_accuracy:.4f}\n')
    f.write(f'Variance of Validation Accuracy: {variance_val_accuracy:.4f}\n')
    f.write(f'Average Test Accuracy: {average_test_accuracy:.4f}\n')
    f.write(f'Variance of Test Accuracy: {variance_test_accuracy:.4f}\n')
    f.write(f'Average ROC AUC: {average_roc_auc:.4f}\n')
    f.write(f'Average Precision: {average_precision:.4f}\n')
    f.write(f'Average Recall: {average_recall:.4f}\n')
    f.write(f'Average F1-Score: {average_f1:.4f}\n')

# Print model summary
model.summary()

# Save training history for each fold (optional)
with open("training_histories.txt", "w") as f:
    for i, history in enumerate(histories):
        f.write(f'\nFold {i + 1} History:\n')
        f.write(str(history))


#GNN

In [None]:
!pip install datasets
!pip install torch_geometric
!pip install ogb
!pip install rdkit

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Linear, Sequential, BatchNorm1d, ReLU, Dropout
from torch_geometric.nn import GCNConv, GINConv, global_mean_pool, global_add_pool, global_max_pool
from torch_geometric.loader import DataLoader
from ogb.graphproppred import PygGraphPropPredDataset
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import roc_curve, auc
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib import cm
import numpy as np
import random
import copy
import matplotlib
from rdkit import Chem
import pickle
import torch_geometric
import io  # Import io for in-memory file operations


Models Definitions

Cells to define:


*   GCN
*   GIN
*   GAT



In [None]:
class GCN(torch.nn.Module):
    def __init__(self, input_dim, hidden_channels):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(input_dim, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.conv4 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, 2)  # Output will be 2 classes

        # Register hook on the last layer (conv3)
        self.hook_handle = self.conv4.register_forward_hook(self._store_gradients)

    def _store_gradients(self, module, input, output):
        # Store gradients during forward pass
        self.gradients = output


    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)
        x = x.relu()
        x = global_max_pool(x, batch)  # Global pooling
        x = self.lin(x)
        return x

In [None]:
import torch
from torch.nn import Linear
from torch_geometric.nn import GATConv, global_max_pool

class GAT(torch.nn.Module):
    def __init__(self, input_dim, hidden_channels):
        super(GAT, self).__init__()
        self.conv1 = GATConv(input_dim, hidden_channels, heads=1)
        self.conv2 = GATConv(hidden_channels, hidden_channels, heads=1)
        self.conv3 = GATConv(hidden_channels, hidden_channels, heads=1)
        self.conv4 = GATConv(hidden_channels, hidden_channels, heads=1)
        self.lin = Linear(hidden_channels, 2)  # Output will be 2 classes

        # Register hook on the last layer (conv3)
        self.hook_handle = self.conv4.register_forward_hook(self._store_gradients)

    def _store_gradients(self, module, input, output):
        # Store gradients during forward pass
        self.gradients = output

    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)
        x = x.relu()
        x = global_max_pool(x, batch)  # Global pooling
        x = self.lin(x)
        return x

In [None]:
import torch
from torch.nn import Linear
from torch_geometric.nn import GINConv, global_max_pool

class GIN(torch.nn.Module):
    def __init__(self, input_dim, hidden_channels):
        super(GIN, self).__init__()
        self.conv1 = GINConv(Linear(input_dim, hidden_channels))
        self.conv2 = GINConv(Linear(hidden_channels, hidden_channels))
        self.conv3 = GINConv(Linear(hidden_channels, hidden_channels))
        self.conv4 = GINConv(Linear(hidden_channels, hidden_channels))
        self.lin = Linear(hidden_channels, 2)  # Output will be 2 classes

        # Register hook on the last layer (conv4)
        self.hook_handle = self.conv4.register_forward_hook(self._store_gradients)

    def _store_gradients(self, module, input, output):
        # Store gradients during forward pass
        self.gradients = output

    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)
        x = x.relu()
        x = global_max_pool(x, batch)  # Global pooling
        x = self.lin(x)
        return x


Train,Validation and Test function definitions

In [None]:
def train(model, optimizer, train_loader):
    model.train()
    correct = 0
    total = 0
    for data in train_loader:
        optimizer.zero_grad()
        out = model(data.x.float(), data.edge_index, data.batch)
        loss = F.cross_entropy(out, data.y.view(-1).long())
        loss.backward()
        optimizer.step()

        # Calculate training accuracy
        pred = out.argmax(dim=1)
        correct += int((pred == data.y.view(-1)).sum())
        total += data.y.size(0)

    accuracy = correct / total
    return accuracy


def validate(model, loader):
    model.eval()
    correct = 0
    total = 0
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for data in loader:
            pred = model(data.x.float(), data.edge_index, data.batch)
            pred = pred.argmax(dim=1)
            correct += int((pred == data.y.view(-1)).sum())
            total += data.y.size(0)
            all_predictions.extend(pred.tolist())
            all_labels.extend(data.y.view(-1).tolist())

    accuracy = correct / total
    return accuracy, all_predictions, all_labels



def evaluate_roc_auc(model, loader):
    model.eval()
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for data in loader:
            pred = model(data.x.float(), data.edge_index, data.batch)
            all_predictions.extend(pred[:, 1].cpu().numpy())
            all_labels.extend(data.y.view(-1).cpu().numpy())

    fpr, tpr, _ = roc_curve(all_labels, all_predictions)
    roc_auc = auc(fpr, tpr)

    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")
    plt.show()

    print('AUC:', roc_auc)
    return roc_auc

Methods for train and test the models:


*   k fold balance
*   k fold no balanced
*   scatterfold balanced
*   scatterfold no balanced

The balance is based on the idea to train the model on a balanced dataset (the same number of elements for classes) or no balanced dataset




In [None]:
def k_fold_balanced(data, num_classes, hidden_channels, iteration, k_folds, model_choosed):
    all_best_train_acc = []
    all_best_val_acc = []
    all_auc = []
    label_0_indices = []
    label_1_indices = []
    all_best_models = {'model':[], 'val_ACC': [], 'AUC':[]}

    for i in range(len(data)):
      if data[i].y == 0:
        label_0_indices.append(i)
      elif data[i].y == 1:
        label_1_indices.append(i)
    print(len(label_0_indices))
    print(len(label_1_indices))

    for iter in range(iteration):
      all_data = copy.deepcopy(data)
      print('iter:', iter)
      # Define cross-validation settings
      kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

      best_train_accuracies = []
      best_val_accuracies = []
      best_models = []

      # Collect evaluation metrics for all folds
      all_accuracies = []
      all_precisions = []
      all_recalls = []
      all_f1_scores = []

      min_size_dataset = min(len(label_0_indices), len(label_1_indices))

      random.shuffle(label_0_indices)
      random.shuffle(label_1_indices)
      label_0_indices = label_0_indices[:min_size_dataset]
      label_1_indices = label_1_indices[:min_size_dataset]
      print('size 0 data:' , len(label_0_indices))
      print('size 1 data:', len(label_1_indices))

      balanced_indices = label_0_indices + label_1_indices
      print('size total dataset:' , len(balanced_indices))

      balanced_data = [all_data[i] for i in balanced_indices]
      print(len(balanced_data))

      for btd in balanced_data:
       # Apply one-hot encoding for each column
        one_hot_encoded = [F.one_hot(btd.x[:, i].long(), num_classes[i]) for i in range(btd.x.shape[1])]
        one_hot_encoded_tensor = torch.cat(one_hot_encoded, dim=-1)
        btd.x = one_hot_encoded_tensor

      # Training loop with cross-validation
      for fold, (train_idx, val_idx) in enumerate(kf.split(balanced_indices)):
          print(f"Fold [{fold+1}/{k_folds}]")
          # Subset the data for this fold
          train_data = [balanced_data[i] for i in train_idx]
          val_data = [balanced_data[i] for i in val_idx]

          train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
          val_loader = DataLoader(val_data, batch_size=32, shuffle=False)

          total_features = sum(num_classes)
          # Define the model with the correct input dimension
          if model_choosed == 1:
            model = GCN(total_features, hidden_channels=hidden_channels)
          elif model_choosed == 2:
            model = GIN(total_features, hidden_channels=hidden_channels)
          elif model_choosed == 3:
            model = GAT(total_features, hidden_channels=hidden_channels)

          optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

          # Early stopping variables
          best_val_acc = 0.0
          best_train_acc = 0.0
          patience = 25  # Stop after no improvement for 25 epochs
          counter = 0

          # Collect evaluation metrics for this fold
          fold_accuracies = []
          fold_precisions = []
          fold_recalls = []
          fold_f1_scores = []

          # Training loop with validation and early stopping
          for epoch in range(1, 200):  # Train for maximum 100 epochs
              # Train the model and get train accuracy
              train_acc = train(model, optimizer, train_loader)

              # Validate the model
              val_acc, _, _ = validate(model, val_loader)

              print(f'Fold [{fold+1}/{k_folds}] Epoch: {epoch}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}')

              # Check for improvement in validation accuracy
              if val_acc > best_val_acc:
                  best_val_acc = val_acc
                  best_train_acc = train_acc
                  counter = 0  # Reset counter
              else:
                  counter += 1  # No improvement, increase counter

              # If no improvement for "patience" number of epochs, stop training
              if counter >= patience:
                  print(f'Fold [{fold+1}/{k_folds}] Early stopping at epoch {epoch}')
                  break


          # Store the best train and val accuracies for this fold
          best_train_accuracies.append(best_train_acc)
          best_val_accuracies.append(best_val_acc)
          best_models.append(model)

          # Append metrics for this fold
          all_accuracies.append(fold_accuracies)
          all_precisions.append(fold_precisions)
          all_recalls.append(fold_recalls)
          all_f1_scores.append(fold_f1_scores)

          # Final evaluation on the test set
          test_acc, all_test_predictions, all_test_labels = validate(model, val_loader)

          print(f'Final Test Accuracy: {test_acc:.4f}')

          # Convert predictions and labels to numpy arrays
          all_test_predictions = np.array(all_test_predictions)
          all_test_labels = np.array(all_test_labels)

          # Plot confusion matrix
          plot_confusion_matrix(all_test_labels, all_test_predictions, class_names=['Class 0', 'Class 1'])


          # Calculate True Positives (TP), False Positives (FP),
          # True Negatives (TN), False Negatives (FN)
          TP = np.sum((all_test_predictions == 1) & (all_test_labels == 1))
          FP = np.sum((all_test_predictions == 1) & (all_test_labels == 0))
          TN = np.sum((all_test_predictions == 0) & (all_test_labels == 0))
          FN = np.sum((all_test_predictions == 0) & (all_test_labels == 1))

          print("True Positives (TP):", TP)
          print("False Positives (FP):", FP)
          print("True Negatives (TN):", TN)
          print("False Negatives (FN):", FN)

          # Calculate accuracy, precision, recall, and F1-score
          accuracy = (TP + TN) / (TP + FP + TN + FN)
          precision = TP / (TP + FP)
          recall = TP / (TP + FN)
          f1_score = 2 * (precision * recall) / (precision + recall)

          print("Accuracy:", accuracy)
          print("Precision:", precision)
          print("Recall:", recall)
          print("F1-Score:", f1_score)

          # Append metrics for this epoch
          fold_accuracies.append(val_acc)
          fold_precisions.append(precision)
          fold_recalls.append(recall)
          fold_f1_scores.append(f1_score)

          all_best_models['model'].append(model)
          all_best_models['val_ACC'].append(max(best_val_accuracies))
          all_best_models['AUC'].append(evaluate_roc_auc(model, val_loader))

      # Calculate median of best train and val accuracies
      median_best_train_acc = np.median(best_train_accuracies)
      median_best_val_acc = np.median(best_val_accuracies)
      all_best_train_acc.append(median_best_train_acc)
      all_best_val_acc.append(median_best_val_acc)

      print("\nMedian of Best Train Accuracies:", median_best_train_acc)
      print("Median of Best Validation Accuracies:", median_best_val_acc)

      # Compare median validation accuracies to choose the better model
      if median_best_val_acc == max(median_best_train_acc, median_best_val_acc):
          best_model_idx = best_val_accuracies.index(max(best_val_accuracies))
      else:
          best_model_idx = best_train_accuracies.index(max(best_train_accuracies))

      best_model = best_models[best_model_idx]

      # Calculate median of all metrics across all folds
      median_accuracy = np.median(np.concatenate(all_accuracies))
      median_precision = np.median(np.concatenate(all_precisions))
      median_recall = np.median(np.concatenate(all_recalls))
      median_f1_score = np.median(np.concatenate(all_f1_scores))

      print("\nMedian of All Accuracies:", median_accuracy)
      print("Median of All Precisions:", median_precision)
      print("Median of All Recalls:", median_recall)
      print("Median of All F1-Scores:", median_f1_score)
      print('\n')

    return all_best_models

In [None]:
def k_fold_no_balanced(data, num_classes, hidden_channels, iteration, k_folds, model_choosed):
    all_best_train_acc = []
    all_best_val_acc = []
    all_auc = []
    label_0_indices = []
    label_1_indices = []
    all_best_models = {'model':[], 'val_ACC': [], 'AUC':[]}

    for i in range(len(data)):
      if data[i].y == 0:
        label_0_indices.append(i)
      elif data[i].y == 1:
        label_1_indices.append(i)
    print(len(label_0_indices))
    print(len(label_1_indices))

    for iter in range(iteration):
      all_data = copy.deepcopy(data)
      print('iter:', iter)
      # Define cross-validation settings
      kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

      best_train_accuracies = []
      best_val_accuracies = []
      best_models = []

      # Collect evaluation metrics for all folds
      all_accuracies = []
      all_precisions = []
      all_recalls = []
      all_f1_scores = []

      min_size_dataset = min(len(label_0_indices), len(label_1_indices))

      random.shuffle(label_0_indices)
      random.shuffle(label_1_indices)
      label_0_indices = label_0_indices[:]
      label_1_indices = label_1_indices[:]
      print('size 0 data:' , len(label_0_indices))
      print('size 1 data:', len(label_1_indices))

      no_balanced_indices = label_0_indices + label_1_indices
      print('size total dataset:' , len(no_balanced_indices))

      no_balanced_data = [all_data[i] for i in no_balanced_indices]
      print(len(no_balanced_data))

      for btd in no_balanced_data:
       # Apply one-hot encoding for each column
        one_hot_encoded = [F.one_hot(btd.x[:, i].long(), num_classes[i]) for i in range(btd.x.shape[1])]
        one_hot_encoded_tensor = torch.cat(one_hot_encoded, dim=-1)
        btd.x = one_hot_encoded_tensor

      # Training loop with cross-validation
      for fold, (train_idx, val_idx) in enumerate(kf.split(no_balanced_indices)):
          print(f"Fold [{fold+1}/{k_folds}]")
          # Subset the data for this fold
          train_data = [no_balanced_data[i] for i in train_idx]
          val_data = [no_balanced_data[i] for i in val_idx]

          train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
          val_loader = DataLoader(val_data, batch_size=32, shuffle=False)

          total_features = sum(num_classes)
          # Define the model with the correct input dimension
          if model_choosed == 1:
            model = GCN(total_features, hidden_channels=hidden_channels)
          elif model_choosed == 2:
            model = GIN(total_features, hidden_channels=hidden_channels)
          elif model_choosed == 3:
            model = GAT(total_features, hidden_channels=hidden_channels)

          optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

          # Early stopping variables
          best_val_acc = 0.0
          best_train_acc = 0.0
          patience = 25  # Stop after no improvement for 25 epochs
          counter = 0

          # Collect evaluation metrics for this fold
          fold_accuracies = []
          fold_precisions = []
          fold_recalls = []
          fold_f1_scores = []

          # Training loop with validation and early stopping
          for epoch in range(1, 200):  # Train for maximum 100 epochs
              # Train the model and get train accuracy
              train_acc = train(model, optimizer, train_loader)

              # Validate the model
              val_acc, _, _ = validate(model, val_loader)

              print(f'Fold [{fold+1}/{k_folds}] Epoch: {epoch}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}')

              # Check for improvement in validation accuracy
              if val_acc > best_val_acc:
                  best_val_acc = val_acc
                  best_train_acc = train_acc
                  counter = 0  # Reset counter
              else:
                  counter += 1  # No improvement, increase counter

              # If no improvement for "patience" number of epochs, stop training
              if counter >= patience:
                  print(f'Fold [{fold+1}/{k_folds}] Early stopping at epoch {epoch}')
                  break


          # Store the best train and val accuracies for this fold
          best_train_accuracies.append(best_train_acc)
          best_val_accuracies.append(best_val_acc)
          best_models.append(model)

          # Append metrics for this fold
          all_accuracies.append(fold_accuracies)
          all_precisions.append(fold_precisions)
          all_recalls.append(fold_recalls)
          all_f1_scores.append(fold_f1_scores)

          # Final evaluation on the test set
          test_acc, all_test_predictions, all_test_labels = validate(model, val_loader)

          print(f'Final Test Accuracy: {test_acc:.4f}')

          # Convert predictions and labels to numpy arrays
          all_test_predictions = np.array(all_test_predictions)
          all_test_labels = np.array(all_test_labels)

          # Plot confusion matrix
          plot_confusion_matrix(all_test_labels, all_test_predictions, class_names=['Class 0', 'Class 1'])


          # Calculate True Positives (TP), False Positives (FP),
          # True Negatives (TN), False Negatives (FN)
          TP = np.sum((all_test_predictions == 1) & (all_test_labels == 1))
          FP = np.sum((all_test_predictions == 1) & (all_test_labels == 0))
          TN = np.sum((all_test_predictions == 0) & (all_test_labels == 0))
          FN = np.sum((all_test_predictions == 0) & (all_test_labels == 1))

          print("True Positives (TP):", TP)
          print("False Positives (FP):", FP)
          print("True Negatives (TN):", TN)
          print("False Negatives (FN):", FN)

          # Calculate accuracy, precision, recall, and F1-score
          accuracy = (TP + TN) / (TP + FP + TN + FN)
          precision = TP / (TP + FP)
          recall = TP / (TP + FN)
          f1_score = 2 * (precision * recall) / (precision + recall)

          print("Accuracy:", accuracy)
          print("Precision:", precision)
          print("Recall:", recall)
          print("F1-Score:", f1_score)

          # Append metrics for this epoch
          fold_accuracies.append(val_acc)
          fold_precisions.append(precision)
          fold_recalls.append(recall)
          fold_f1_scores.append(f1_score)

          all_best_models['model'].append(model)
          all_best_models['val_ACC'].append(max(best_val_accuracies))
          all_best_models['AUC'].append(evaluate_roc_auc(model, val_loader))

      # Calculate median of best train and val accuracies
      median_best_train_acc = np.median(best_train_accuracies)
      median_best_val_acc = np.median(best_val_accuracies)
      all_best_train_acc.append(median_best_train_acc)
      all_best_val_acc.append(median_best_val_acc)

      print("\nMedian of Best Train Accuracies:", median_best_train_acc)
      print("Median of Best Validation Accuracies:", median_best_val_acc)

      # Compare median validation accuracies to choose the better model
      if median_best_val_acc == max(median_best_train_acc, median_best_val_acc):
          best_model_idx = best_val_accuracies.index(max(best_val_accuracies))
      else:
          best_model_idx = best_train_accuracies.index(max(best_train_accuracies))

      best_model = best_models[best_model_idx]

      # Calculate median of all metrics across all folds
      median_accuracy = np.median(np.concatenate(all_accuracies))
      median_precision = np.median(np.concatenate(all_precisions))
      median_recall = np.median(np.concatenate(all_recalls))
      median_f1_score = np.median(np.concatenate(all_f1_scores))

      print("\nMedian of All Accuracies:", median_accuracy)
      print("Median of All Precisions:", median_precision)
      print("Median of All Recalls:", median_recall)
      print("Median of All F1-Scores:", median_f1_score)
      print('\n')

    return all_best_models

In [None]:
def no_balanced_scatterfold(data, num_classes, hidden_channels,model_choosed):
    all_best_train_acc = []
    all_best_val_acc = []
    all_auc = []
    all_best_models = {'model':[], 'val_ACC': [], 'AUC':[]}

    for iter in range(1):
        all_data = copy.deepcopy(data)
        print('iter:', iter)
        print(all_data[1].x.shape[1])
        balanced_data = [all_data[i] for i in range(len(all_data))]
        # One-hot encode the features
        for btd in balanced_data:
            one_hot_encoded = [F.one_hot(btd.x[:, i].long(), num_classes[i]) for i in range(btd.x.shape[1])]
            one_hot_encoded_tensor = torch.cat(one_hot_encoded, dim=-1)
            btd.x = one_hot_encoded_tensor

        # Split the data into training (80%), validation (20%), and test (20%) sets
        train_val_data, test_data = train_test_split(balanced_data, test_size=0.2, random_state=42)
        train_data, val_data = train_test_split(train_val_data, test_size=0.25, random_state=42) # 0.25 * 0.8 = 0.2

        train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
        test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

        total_features = sum(num_classes)
        if model_choosed == 1:
            model = GCN(total_features, hidden_channels=hidden_channels)
        elif model_choosed == 2:
            model = GIN(total_features, hidden_channels=hidden_channels)
        elif model_choosed == 3:
            model = GAT(total_features, hidden_channels=hidden_channels)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

        best_val_acc = 0.0
        best_train_acc = 0.0
        patience = 25
        counter = 0

        for epoch in range(1, 200):
            train_acc = train(model, optimizer, train_loader)
            val_acc, _, _ = validate(model, val_loader)

            print(f'Epoch: {epoch}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}')

            if val_acc > best_val_acc:
                best_val_acc = val_acc
                best_train_acc = train_acc
                counter = 0
            else:
                counter += 1

            if counter >= patience:
                print(f'Early stopping at epoch {epoch}')
                break

        test_acc, all_test_predictions, all_test_labels = validate(model, test_loader)

        print(f'Final Test Accuracy: {test_acc:.4f}')

        all_test_predictions = np.array(all_test_predictions)
        all_test_labels = np.array(all_test_labels)

        # Plot confusion matrix
        plot_confusion_matrix(all_test_labels, all_test_predictions, class_names=['Class 0', 'Class 1'])


        TP = np.sum((all_test_predictions == 1) & (all_test_labels == 1))
        FP = np.sum((all_test_predictions == 1) & (all_test_labels == 0))
        TN = np.sum((all_test_predictions == 0) & (all_test_labels == 0))
        FN = np.sum((all_test_predictions == 0) & (all_test_labels == 1))

        accuracy = (TP + TN) / (TP + FP + TN + FN)
        precision = TP / (TP + FP)
        recall = TP / (TP + FN)
        f1_score = 2 * (precision * recall) / (precision + recall)

        print("Accuracy:", accuracy)
        print("Precision:", precision)
        print("Recall:", recall)
        print("F1-Score:", f1_score)

        all_best_models['model'].append(model)
        all_best_models['val_ACC'].append(best_val_acc)
        all_best_models['AUC'].append(evaluate_roc_auc(model, val_loader))

        all_best_train_acc.append(best_train_acc)
        all_best_val_acc.append(best_val_acc)

    return all_best_models

In [None]:
def balanced_scatterfold(data, num_classes, hidden_channels,model_choosed):
    all_best_train_acc = []
    all_best_val_acc = []
    all_auc = []
    all_best_models = {'model':[], 'val_ACC': [], 'AUC':[]}
    label_0_indices = []
    label_1_indices = []
    for i in range(len(data)):
      if data[i].y == 0:
        label_0_indices.append(i)
      elif data[i].y == 1:
        label_1_indices.append(i)

    for iter in range(1):
      all_data = copy.deepcopy(data)

      print('iter:', iter)
      print(all_data[1].x.shape[1])
      min_size_dataset = min(len(label_0_indices), len(label_1_indices))

      random.shuffle(label_0_indices)
      random.shuffle(label_1_indices)
      label_0_indices = label_0_indices[:min_size_dataset]
      label_1_indices = label_1_indices[:min_size_dataset]
      print('size 0 data:' , len(label_0_indices))
      print('size 1 data:', len(label_1_indices))

      balanced_indices = label_0_indices + label_1_indices
      print('size total dataset:' , len(balanced_indices))

      balanced_data = [all_data[i] for i in balanced_indices]
      print(len(balanced_data))
        #balanced_data = [all_data[i] for i in range(len(all_data))]
        # One-hot encode the features
      for btd in balanced_data:
            one_hot_encoded = [F.one_hot(btd.x[:, i].long(), num_classes[i]) for i in range(btd.x.shape[1])]
            one_hot_encoded_tensor = torch.cat(one_hot_encoded, dim=-1)
            btd.x = one_hot_encoded_tensor

      # Split the data into training (80%), validation (20%), and test (20%) sets
      train_val_data, test_data = train_test_split(balanced_data, test_size=0.2, random_state=42)
      train_data, val_data = train_test_split(train_val_data, test_size=0.25, random_state=42) # 0.25 * 0.8 = 0.2

      train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
      val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
      test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

      total_features = sum(num_classes)
      if model_choosed == 1:
        model = GCN(total_features, hidden_channels=hidden_channels)
      elif model_choosed == 2:
        model = GIN(total_features, hidden_channels=hidden_channels)
      elif model_choosed == 3:
            model = GAT(total_features, hidden_channels=hidden_channels)
      optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

      best_val_acc = 0.0
      best_train_acc = 0.0
      patience = 25
      counter = 0

      for epoch in range(1, 200):
        train_acc = train(model, optimizer, train_loader)
        val_acc, _, _ = validate(model, val_loader)

        print(f'Epoch: {epoch}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}')

        if val_acc > best_val_acc:
          best_val_acc = val_acc
          best_train_acc = train_acc
          counter = 0
        else:
          counter += 1

        if counter >= patience:
          print(f'Early stopping at epoch {epoch}')
          break

      test_acc, all_test_predictions, all_test_labels = validate(model, test_loader)

      print(f'Final Test Accuracy: {test_acc:.4f}')

      all_test_predictions = np.array(all_test_predictions)
      all_test_labels = np.array(all_test_labels)

      # Plot confusion matrix
      plot_confusion_matrix(all_test_labels, all_test_predictions, class_names=['Class 0', 'Class 1'])


      TP = np.sum((all_test_predictions == 1) & (all_test_labels == 1))
      FP = np.sum((all_test_predictions == 1) & (all_test_labels == 0))
      TN = np.sum((all_test_predictions == 0) & (all_test_labels == 0))
      FN = np.sum((all_test_predictions == 0) & (all_test_labels == 1))

      accuracy = (TP + TN) / (TP + FP + TN + FN)
      precision = TP / (TP + FP)
      recall = TP / (TP + FN)
      f1_score = 2 * (precision * recall) / (precision + recall)

      print("Accuracy:", accuracy)
      print("Precision:", precision)
      print("Recall:", recall)
      print("F1-Score:", f1_score)

      all_best_models['model'].append(model)
      all_best_models['val_ACC'].append(best_val_acc)
      all_best_models['AUC'].append(evaluate_roc_auc(model, val_loader))

      all_best_train_acc.append(best_train_acc)
      all_best_val_acc.append(best_val_acc)

    return all_best_models

Evaluation of the models with extraction of the best model

In [None]:
def extract_metrics_and_best_model(all_best_models):
  max_auc_index = all_best_models['AUC'].index(max(all_best_models['AUC']))
  # Print the index and corresponding AUC value
  print("Index of Max AUC:", max_auc_index)
  print("Max AUC Value:", max(all_best_models['AUC']))
  mean_auc = np.mean(all_best_models['AUC'])
  std_auc = np.std(all_best_models['AUC'])

  # Print the results
  print("Mean AUC Value:", mean_auc)
  print("Standard Deviation AUC:", std_auc)

  best_model = all_best_models['model'][max_auc_index]

  return best_model

Molecular Plotting with the node importance calculated using the gradient from the model



In [None]:
import copy
import io
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
from PIL import Image
from rdkit.Chem import Draw
from rdkit import Chem
from IPython.display import display, Image as IPImage

def plot_graph_feature_importance(data, num_classes, best_model):
    tp = Chem.GetPeriodicTable()
    cp_data = copy.deepcopy(data)
    best_model.eval()
    elements = {}
    for i in range(data.num_nodes):
        first_element_int = int(data.x[i][0].item())
        elements[i] = tp.GetElementSymbol(first_element_int)

    one_hot_encoded = [F.one_hot(cp_data.x[:, j].long(), num_classes[j]) for j in range(cp_data.x.shape[1])]
    one_hot_encoded_tensor = torch.cat(one_hot_encoded, dim=-1)
    cp_data.x = one_hot_encoded_tensor

    # Forward pass to get the model output
    output = best_model(cp_data.x.float(), cp_data.edge_index, cp_data.batch)

    # Compute node importance scores
    node_importance = best_model.gradients.norm(dim=1)

    # Convert PyTorch tensor to numpy array for visualization
    node_importance_np = node_importance.detach().numpy()

    # Print node importance scores
    #print("Node importance scores (raw):", node_importance_np)

    # Normalize importance scores for coloring
    max_importance = max(node_importance_np) if len(node_importance_np) > 0 else 1
    min_importance = min(node_importance_np) if len(node_importance_np) > 0 else 0
    normalized_importance = (node_importance_np - min_importance) / (max_importance - min_importance + 1e-6)  # Add small epsilon to avoid division by zero

    # Print normalized importance scores
    #print("Node importance scores (normalized):", normalized_importance)

    # Create a NetworkX graph from edge_index
    G = nx.Graph()
    G.add_edges_from(cp_data.edge_index.T.numpy())

    # Create a mapping from node indices to their importance scores
    node_importance_dict = {i: importance for i, importance in enumerate(normalized_importance)}

    '''
    # Define colors based on node importance (red scale)
    node_colors = [node_importance_dict[i] for i in range(len(G.nodes))]
    cmap = matplotlib.colormaps['Reds']

    # Determine node size based on importance (larger for higher importance)
    node_sizes = [2000 * importance for importance in normalized_importance]

    # Draw the graph with node labels and adjusted node sizes
    plt.figure(figsize=(10, 7))
    pos = nx.spring_layout(G, seed=42)  # Positions for all nodes
    nodes = nx.draw_networkx_nodes(G, pos, node_color=node_colors, cmap=cmap, node_size=node_sizes)
    edges = nx.draw_networkx_edges(G, pos, edgelist=G.edges, alpha=0.5)
    node_labels = nx.draw_networkx_labels(G, pos, labels=elements, font_size=10, font_color='white')
    for _, text in node_labels.items():
        text.set_bbox(dict(facecolor='none', edgecolor='none'))  # No box around labels
    plt.colorbar(nodes, label='Node Importance', orientation='vertical')
    plt.title('Graph with Node Importance')
    plt.axis('off')
    plt.show()
    '''
    return elements, node_importance_dict

def visualize_molecular_graph(smiles, elements, node_importance_dict):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        raise ValueError("Invalid SMILES string")

    # Create a list of atom importance scores
    atom_importance = []
    for atom in mol.GetAtoms():
        idx = atom.GetIdx()
        atom_importance.append(node_importance_dict[idx] if idx in node_importance_dict else 0)

    # Normalize importance scores for coloring
    max_importance = max(atom_importance) if atom_importance else 1
    min_importance = min(atom_importance) if atom_importance else 0
    normalized_importance = [(score - min_importance) / (max_importance - min_importance + 1e-6) for score in atom_importance]

    '''
    # Print atom importance scores
    print("Atom importance scores (raw):", atom_importance)
    print("Atom importance scores (normalized):", normalized_importance)
    '''
    # Define the color map and normalize
    cmap = plt.get_cmap('Reds')
    norm = mcolors.Normalize(vmin=0, vmax=1)

    # Create a dictionary of highlight colors for atoms
    highlight_atom_colors = {}
    for i, atom in enumerate(mol.GetAtoms()):
        color = cmap(norm(normalized_importance[i]))
        highlight_atom_colors[atom.GetIdx()] = (float(color[0]), float(color[1]), float(color[2]))

    # Draw the molecule with highlighted atoms
    drawer = Draw.MolDraw2DCairo(300, 300)
    opts = drawer.drawOptions()
    for i, atom in enumerate(mol.GetAtoms()):
        opts.atomLabels[atom.GetIdx()] = elements[atom.GetIdx()]

    drawer.DrawMolecule(mol, highlightAtoms=[atom.GetIdx() for atom in mol.GetAtoms()], highlightAtomColors=highlight_atom_colors)
    drawer.FinishDrawing()

    # Convert to PIL image
    img_data = drawer.GetDrawingText()
    img = Image.open(io.BytesIO(img_data))

    # Display image in Colab
    display(IPImage(img_data))

    return img

plot confusion matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_pred, class_names=None):
    """
    Plots the confusion matrix.

    Parameters:
    y_true (list or array): True labels.
    y_pred (list or array): Predicted labels.
    class_names (list): List of class names. If None, integer labels are used.
    """
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

# Example usage:
# y_true = [0, 1, 1, 0, 1, 0]
# y_pred = [0, 1, 0, 0, 1, 1]
# class_names = ['Class 0', 'Class 1']
# plot_confusion_matrix(y_true, y_pred, class_names)


> **MAIN**

load HIV dataset from OGBG

In [None]:
#graphs-datasets/PROTEINS
from ogb.graphproppred import PygGraphPropPredDataset
dataset = PygGraphPropPredDataset(name='ogbg-molhiv')
#qua sta anche clintox e altri vedere bene documntazione però il codice funziona anche con uun proprio dataset, però il clintox ha la y diversa dalla nostra quindi in molti casi conviene avere a che fare con db propri, dove settiamo noi la y per bene e usiamo lo script SMILEs-MOL-OGB
data = dataset[:]

load dataset from local or drive

In [None]:
with open('/content/exp_datasets/clintox.pt', 'rb') as f:
    all_data = pickle.load(f)

Dataset in csv format can be found into the *Platform/datasets/* folder

In [None]:
import pandas as pd
df = pd.read_csv('/content/clintox_dataset.csv')
smiles_list = df['smiles'].tolist()

In [None]:
num_classes = [119, 5, 12, 12, 9, 6, 6, 2, 2]
hidden_channels = 70
#you can choose between: k_fold_balanced,k_fold_no_balance,balanced_scatterfold,no_balanced_scatterfold
all_best_models = balanced_scatterfold(data, num_classes, hidden_channels,3)
best_model = extract_metrics_and_best_model(all_best_models)

To depict the molecule with the node importance is important to extract the SMILES string in order to use the model to extract the *node_importance_dict*

For this reason the dataset in csv format that is possible to find in *Platform/datasets* can be used to extract the SMILES list

In [None]:
data_index = 16
smiles = smiles_list[data_index]
elements, node_importance_dict = plot_graph_feature_importance(data[data_index], num_classes, best_model)
img = visualize_molecular_graph(smiles, elements, node_importance_dict)