In [1]:
import pandas as pd
import numpy as np
import os

import torch
from torch_geometric.data import Data, InMemoryDataset
from torch_geometric.loader import DataLoader


from torch.nn import Linear, Parameter
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool, MessagePassing
from torch_geometric.utils import add_self_loops, degree


from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, confusion_matrix
)

In [2]:
class MyOwnDataset(InMemoryDataset):
    """In-memory PyG dataset assembled from per-graph text files.

    Raw layout read by :meth:`process` (relative to the current working
    directory, not to ``root``)::

        edkgdl_all_data/
            Graph_label.txt                  # column 1 of row i -> label of graph i
            <i>/Graph_index.txt              # one row of node features per node
            <i>/Graph_edge_index_direct.txt  # cols 0-1: edge endpoints, cols 2+: edge attributes

    Args:
        root: Directory in which the processed ``data.pt`` file is cached.
        transform: Optional per-access transform forwarded to ``InMemoryDataset``.
        pre_transform: Optional transform applied once, before caching.
    """

    def __init__(self, root, transform=None, pre_transform=None):
        super().__init__(root, transform, pre_transform)
        # weights_only=False is needed because the cache holds the pickled
        # (data, slices) tuple written by process(), not bare tensors. Only
        # load cache files produced by this notebook.
        self.data, self.slices = torch.load(self.processed_paths[0], weights_only=False)

    @property
    def raw_file_names(self):
        # No raw files are declared.
        return []

    @property
    def processed_file_names(self):
        return ['data.pt']

    def process(self):
        """Read every graph directory, build ``Data`` objects and cache them."""
        root_dir = 'edkgdl_all_data'
        node_dir = 'Graph_index.txt'
        edge_direct_dir = 'Graph_edge_index_direct.txt'
        label_dir = 'Graph_label.txt'

        # Only integer-named sub-directories are graphs. Counting *every*
        # directory (e.g. a stray ``.ipynb_checkpoints``) would make the loop
        # look for a graph folder that does not exist.
        graph_dirs = sorted(
            (
                name for name in os.listdir(root_dir)
                if name.isascii() and name.isdigit()
                and os.path.isdir(os.path.join(root_dir, name))
            ),
            key=int,
        )

        labels = pd.read_csv(os.path.join(root_dir, label_dir), header=None)

        data_list = []
        for file_dir in graph_dirs:
            # The directory name doubles as the row index into the label table.
            i = int(file_dir)
            node_path = os.path.join(root_dir, file_dir, node_dir)
            edge_path = os.path.join(root_dir, file_dir, edge_direct_dir)

            # nodes
            node_table = pd.read_csv(node_path, header=None)
            x = torch.tensor(node_table.values, dtype=torch.float)

            # edges: first two columns are the endpoints, the rest are attributes
            edge_table = pd.read_csv(edge_path, header=None)
            edge_index = torch.tensor(edge_table.iloc[:, 0:2].T.values, dtype=torch.long)
            edge_attr = torch.tensor(edge_table.iloc[:, 2:].values, dtype=torch.float)

            # label
            label = torch.tensor(labels.iloc[i, 1], dtype=torch.long).reshape(1)

            data_list.append(Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=label))

        if self.pre_filter is not None:
            data_list = [d for d in data_list if self.pre_filter(d)]
        if self.pre_transform is not None:
            data_list = [self.pre_transform(d) for d in data_list]

        data, slices = self.collate(data_list)
        torch.save((data, slices), self.processed_paths[0])

In [3]:
# Build (or load the cached) dataset, then report dataset-level facts.
dataset = MyOwnDataset(root="EDKG-DL_data")

print(
    '',
    f'Dataset: {dataset}:',
    '==========================',
    f'Number of graphs: {len(dataset)}',
    f'Number of features: {dataset.num_features}',
    f'Number of classes: {dataset.num_classes}',
    sep='\n',
)

# Inspect the first graph as a sanity check on what process() produced.
data = dataset[0]

print(
    '',
    data,
    '==========================================================',
    f'Number of nodes: {data.num_nodes}',
    f'Number of edges: {data.num_edges}',
    f'Average node degree: {data.num_edges / data.num_nodes:.2f}',
    f'Has isolated nodes: {data.has_isolated_nodes()}',
    f'Has self-loops: {data.has_self_loops()}',
    f'Is undirected: {data.is_undirected()}',
    sep='\n',
)

Processing...



Dataset: MyOwnDataset(903):
Number of graphs: 903
Number of features: 2
Number of classes: 2

Data(x=[74, 2], edge_index=[2, 657], edge_attr=[657, 2], y=[1])
Number of nodes: 74
Number of edges: 657
Average node degree: 8.88
Has isolated nodes: True
Has self-loops: False
Is undirected: False


Done!


In [4]:
class GCNConvEdge(MessagePassing):
    """GCN-style convolution whose messages also carry edge features.

    Node and edge features are each projected to ``out_channels`` columns.
    A message is the concatenation of the projected source-node features and
    the projected edge features, scaled by the symmetric degree
    normalisation. Messages are summed per target node, so the node output
    has ``2 * out_channels`` columns.
    """

    def __init__(self, in_channels, out_channels, edge_channels):
        super().__init__(aggr='add')  # sum the incoming messages of each node
        self.lin = Linear(in_channels, out_channels, bias=False)
        # The bias covers both halves of the concatenated message.
        self.bias = Parameter(torch.empty(2 * out_channels))
        self.lin_edge = Linear(edge_channels, out_channels, bias=False)

        self.reset_parameters()

    def reset_parameters(self):
        """Re-initialise both projections and zero the bias."""
        self.lin.reset_parameters()
        self.bias.data.zero_()
        self.lin_edge.reset_parameters()

    def forward(self, x, edge_index, ex):
        """Run one convolution.

        Args:
            x: Node features, one row per node.
            edge_index: Edge endpoints as a ``[2, E]`` tensor.
            ex: Edge features, one row per edge.

        Returns:
            ``(out, ex)``: the updated node features and the projected edge
            features (without the padding rows added for self-loops).
        """
        num_nodes = x.size(0)

        # One self-loop per node.
        edge_index, _ = add_self_loops(edge_index, num_nodes=num_nodes)

        # Project nodes and edges into the shared output width.
        x = self.lin(x)
        ex = self.lin_edge(ex)

        # Self-loops have no edge features: pad with one zero row per node.
        # Assumes add_self_loops places the loop edges after the real ones.
        loop_rows = torch.zeros(num_nodes, ex.size(1), device=ex.device, dtype=ex.dtype)
        padded_ex = torch.cat((ex, loop_rows), dim=0)

        # Symmetric normalisation 1 / sqrt(deg(row) * deg(col)).
        row, col = edge_index
        deg = degree(col, num_nodes, dtype=x.dtype)
        inv_sqrt_deg = deg.pow(-0.5)
        inv_sqrt_deg[inv_sqrt_deg == float('inf')] = 0
        norm = inv_sqrt_deg[row] * inv_sqrt_deg[col]

        # Build and aggregate the messages, then add the bias.
        out = self.propagate(edge_index, x=x, norm=norm, ex=padded_ex)
        out += self.bias

        return out, ex

    def message(self, x_j, norm, ex):
        """Concatenate source-node and edge features, scaled per edge by ``norm``."""
        stacked = torch.cat([x_j, ex], dim=1)
        return norm.view(-1, 1) * stacked
    
# model 
class GCN(torch.nn.Module):
    """Graph classifier: three ``GCNConvEdge`` layers, mean pooling, linear head.

    Each ``GCNConvEdge`` layer outputs ``2 * hidden`` node columns and
    ``hidden`` edge columns, which is why the following layer's input sizes
    are doubled for nodes but not for edges.

    Args:
        hidden_channels_1: Width of the first convolution.
        hidden_channels_2: Width of the second convolution.
        hidden_channels_3: Width of the third convolution.
        num_node_features: Input node feature count. Defaults to the
            notebook-global ``dataset.num_node_features``.
        num_edge_features: Input edge feature count. Defaults to the
            notebook-global ``dataset.num_edge_features``.
        num_classes: Number of output classes. Defaults to the
            notebook-global ``dataset.num_classes``.

    Note:
        Construction calls ``torch.manual_seed(12345)``, which reseeds the
        global torch RNG.
    """

    def __init__(self, hidden_channels_1, hidden_channels_2, hidden_channels_3,
                 num_node_features=None, num_edge_features=None, num_classes=None):
        super().__init__()
        torch.manual_seed(12345)

        # Fall back to the notebook-global dataset only when the caller did
        # not state the dimensions explicitly.
        if num_node_features is None:
            num_node_features = dataset.num_node_features
        if num_edge_features is None:
            num_edge_features = dataset.num_edge_features
        if num_classes is None:
            num_classes = dataset.num_classes

        self.conv1 = GCNConvEdge(num_node_features, hidden_channels_1, num_edge_features)
        self.conv2 = GCNConvEdge(2 * hidden_channels_1, hidden_channels_2, hidden_channels_1)
        self.conv3 = GCNConvEdge(2 * hidden_channels_2, hidden_channels_3, hidden_channels_2)
        self.lin = Linear(2 * hidden_channels_3, num_classes)

    def forward(self, x, edge_index, batch, edge_attr):
        """Return one row of class logits per graph in the batch."""
        # 1. obtain node embeddings and edge embeddings
        x, ex = self.conv1(x, edge_index, edge_attr)
        x = x.relu()
        x, ex = self.conv2(x, edge_index, ex)
        x = x.relu()
        x, _ = self.conv3(x, edge_index, ex)

        # 2. readout layer
        x = global_mean_pool(x, batch)  # [batch_size, 2 * hidden_channels_3]

        # 3. apply a final classifier (dropout is active only in training mode)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)

        return x

In [5]:
# Re-open the cached dataset and iterate it in a fixed order.
# NOTE(review): the loader covers every graph in the dataset; confirm this
# directory holds only held-out evaluation graphs.
dataset = MyOwnDataset(root="EDKG-DL_data")
test_loader = DataLoader(dataset, shuffle=False, batch_size=64)

In [6]:
# NOTE: batch_size is not used by test_loader, which was built with a
# literal batch_size=64.
batch_size = 50
hidden_size1 = 50
hidden_size2 = 20
hidden_size3 = 60

model = GCN(hidden_channels_1=hidden_size1, hidden_channels_2=hidden_size2, hidden_channels_3=hidden_size3)

# The model is created on the CPU and never moved, so map the checkpoint to
# the CPU as well. Mapping to "cuda:0" would make this cell fail on machines
# without a GPU, and load_state_dict copies the values into the CPU
# parameters either way.
state_dict = torch.load("model_state.pt", map_location="cpu")
model.load_state_dict(state_dict)

<All keys matched successfully>

In [7]:
def test(loader):
    model.eval()
    TP = 0
    TN = 0
    FP = 0
    FN = 0
    correct = 0
    for data in loader:
        data = data
        labels = data.y.squeeze()
        out = model(data.x, data.edge_index, data.batch, data.edge_attr)
        pred = out.argmax(dim=1)
        for i in range(len(labels)):
            if pred[i] == 1 and labels[i] == 1: # TruePositive
                TP = TP + 1
            elif pred[i] == 0 and labels[i] == 0: # TrueNegative
                TN = TN + 1
            elif pred[i] == 1 and labels[i] == 0: # FalsePositive
                FP = FP + 1
            else: # FalseNegative
                FN = FN + 1
        acc = (TP + TN) / (FN + FP + TP + TN)
    return TP, TN, FP, FN, acc

In [8]:
model.eval()
TP, TN, FP, FN, acc = test(test_loader)

# Derive the headline metrics from the raw confusion counts. Each ratio
# falls back to 0 when its denominator would be zero.
acc = (TP + TN)/(TP + TN + FP + FN)
pre = TP/ (TP + FP) if TP + FP > 0 else 0
recall = TP / (TP + FN) if TP + FN > 0 else 0
F1 = 2 * (pre * recall) / (pre + recall) if pre + recall > 0 else 0

print(
    f'Test TruePositives: {TP}',
    f'Test TrueNegatives: {TN}',
    f'Test FalsePositives: {FP}',
    f'Test FalseNegatives: {FN}',
    f'Test Accuracy: {acc}',
    f'Test Precision: {pre}',
    f'Test Recall: {recall}',
    f'Test F1-score: {F1}',
    sep='\n',
)

Test TruePositives: 77
Test TrueNegatives: 806
Test FalsePositives: 10
Test FalseNegatives: 10
Test Accuracy: 0.9778516057585825
Test Precision: 0.8850574712643678
Test Recall: 0.8850574712643678
Test F1-score: 0.8850574712643678


In [9]:
@torch.no_grad()
def evaluate_binary(model, loader, device=None, threshold=0.5, average='binary'):
    """Score a binary graph classifier over every batch in ``loader``.

    Args:
        model: Module called as ``model(x, edge_index, batch, edge_attr)``.
        loader: Iterable of batches exposing those fields plus ``y`` and ``to()``.
        device: Device to run on. When ``None``, the device of the model's
            first parameter is used (CPU if the model has no parameters).
        threshold: Cut-off applied to the sigmoid probability. It is only
            used when the model emits a single logit per graph; multi-column
            outputs are decided by ``argmax``.
        average: Forwarded to the sklearn precision/recall/F1 functions.

    Returns:
        Dict with the confusion counts (``TP``, ``TN``, ``FP``, ``FN``),
        ``accuracy``, ``precision``, ``recall``, ``f1``, ``roc_auc`` and
        ``pr_auc`` (``None`` when sklearn cannot compute them), and the raw
        ``y_true`` / ``y_prob`` / ``y_pred`` arrays.
    """
    model.eval()
    if device is None:
        first_param = next(model.parameters(), None)
        device = torch.device('cpu') if first_param is None else first_param.device

    true_chunks, prob_chunks, pred_chunks = [], [], []

    for batch in loader:
        batch = batch.to(device)
        logits = model(batch.x, batch.edge_index, batch.batch, batch.edge_attr)

        single_logit = logits.ndim == 1 or logits.shape[1] == 1
        if single_logit:
            # One logit per graph: sigmoid gives P(class 1).
            pos_prob = torch.sigmoid(logits.view(-1))
            hard_pred = (pos_prob >= threshold).long()
        else:
            # One logit per class: column 1 of the softmax is P(class 1).
            class_probs = torch.softmax(logits, dim=1)
            pos_prob = class_probs[:, 1]
            hard_pred = class_probs.argmax(dim=1)

        targets = batch.y.view(-1).to(logits.device).long()

        for chunks, tensor in (
            (true_chunks, targets),
            (prob_chunks, pos_prob),
            (pred_chunks, hard_pred),
        ):
            chunks.append(tensor.detach().cpu().numpy())

    y_true = np.concatenate(true_chunks, axis=0)
    y_prob = np.concatenate(prob_chunks, axis=0)
    y_pred = np.concatenate(pred_chunks, axis=0)

    shared_kwargs = dict(zero_division=0, average=average)
    acc = accuracy_score(y_true, y_pred)
    pre = precision_score(y_true, y_pred, **shared_kwargs)
    rec = recall_score(y_true, y_pred, **shared_kwargs)
    f1 = f1_score(y_true, y_pred, **shared_kwargs)

    # labels=[0,1] pins the matrix to 2x2 even if one class never occurs.
    TN, FP, FN, TP = confusion_matrix(y_true, y_pred, labels=[0,1]).ravel()

    def _best_effort(score_fn):
        # Ranking metrics are best-effort: any failure is reported as None.
        try:
            return score_fn(y_true, y_prob)
        except Exception:
            return None

    roc_auc = _best_effort(roc_auc_score)
    pr_auc = _best_effort(average_precision_score)

    result = {
        "TP": int(TP), "TN": int(TN), "FP": int(FP), "FN": int(FN),
        "accuracy": float(acc),
        "precision": float(pre),
        "recall": float(rec),
        "f1": float(f1),
    }
    result["roc_auc"] = float(roc_auc) if roc_auc is not None else None
    result["pr_auc"] = float(pr_auc) if pr_auc is not None else None
    result["y_true"] = y_true
    result["y_prob"] = y_prob
    result["y_pred"] = y_pred
    return result

In [10]:
model.eval()
metrics = evaluate_binary(model, test_loader)

# Same confusion counts as the manual evaluation, plus the ranking metrics.
print(
    f"TP: {metrics['TP']}",
    f"TN: {metrics['TN']}",
    f"FP: {metrics['FP']}",
    f"FN: {metrics['FN']}",
    f"Accuracy:  {metrics['accuracy']:.4f}",
    f"Precision: {metrics['precision']:.4f}",
    f"Recall:    {metrics['recall']:.4f}",
    f"F1-score:  {metrics['f1']:.4f}",
    f"ROC-AUC:   {metrics['roc_auc'] if metrics['roc_auc'] is not None else 'N/A'}",
    f"PR-AUC:    {metrics['pr_auc'] if metrics['pr_auc'] is not None else 'N/A'}",
    sep='\n',
)

TP: 77
TN: 806
FP: 10
FN: 10
Accuracy:  0.9779
Precision: 0.8851
Recall:    0.8851
F1-score:  0.8851
ROC-AUC:   0.9865055217489295
PR-AUC:    0.8643557947189637
