In [13]:
#!pip uninstall -y torch-scatter torch-sparse torch-cluster torch-spline-conv torch-geometric

In [14]:
import numpy as np

In [15]:
import torch
print(torch.__version__, torch.version.cuda)

2.8.0+cu126 12.6


In [16]:
!pip install torch-scatter -f https://data.pyg.org/whl/torch-2.8.0+cu126.html
!pip install torch-sparse -f https://data.pyg.org/whl/torch-2.8.0+cu126.html
!pip install torch-cluster -f https://data.pyg.org/whl/torch-2.8.0+cu126.html
!pip install torch-spline-conv -f https://data.pyg.org/whl/torch-2.8.0+cu126.html
!pip install torch-geometric

Looking in links: https://data.pyg.org/whl/torch-2.8.0+cu126.html
Looking in links: https://data.pyg.org/whl/torch-2.8.0+cu126.html
Looking in links: https://data.pyg.org/whl/torch-2.8.0+cu126.html
Looking in links: https://data.pyg.org/whl/torch-2.8.0+cu126.html


In [17]:
!pip install torchmetrics



In [18]:
import torchmetrics
print(torchmetrics.__version__)

1.8.2


In [19]:
import torch
import torch_geometric
print(torch.__version__, torch.version.cuda)
print(torch_geometric.__version__)

from torch_geometric.datasets import TUDataset
data = TUDataset(root='./data/TUDataset', name='MUTAG')
print(data)


2.8.0+cu126 12.6
2.7.0
MUTAG(188)


In [25]:
import torch
import torch.nn.functional as F
from torch_geometric.nn.conv.gcn_conv import gcn_norm
#from torch_geometric.utils import accuracy as accuracy_1d
from sklearn.metrics import accuracy_score
from torch.nn import Dropout, SELU
from torch_geometric.nn import MessagePassing, SAGEConv, GCNConv, GATConv
from torch_sparse import matmul
from torch_geometric.transforms import ToSparseTensor


class KProp(MessagePassing):
    def __init__(self, steps, aggregator, add_self_loops, normalize, cached, transform=lambda x: x):
        super().__init__(aggr=aggregator)
        self.transform = transform
        self.K = steps
        self.add_self_loops = add_self_loops
        self.normalize = normalize
        self.cached = cached
        self._cached_x = None

    def forward(self, x, adj_t):
        if self._cached_x is None or not self.cached:
            self._cached_x = self.neighborhood_aggregation(x, adj_t)

        return self._cached_x

    def neighborhood_aggregation(self, x, adj_t):
        if self.K <= 0:
            return x

        if self.normalize:
            adj_t = gcn_norm(adj_t, add_self_loops=False)

        if self.add_self_loops:
            adj_t = adj_t.set_diag()

        for k in range(self.K):
            x = self.propagate(adj_t, x=x)

        x = self.transform(x)
        return x

    def message_and_aggregate(self, adj_t, x):  # noqa
        return matmul(adj_t, x, reduce=self.aggr)


class GNN(torch.nn.Module):
    def __init__(self, dropout):
        super().__init__()
        self.conv1 = None
        self.conv2 = None
        self.dropout = Dropout(p=dropout)
        self.activation = SELU(inplace=True)

    def forward(self, x, adj_t):
        x = self.conv1(x, adj_t)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.conv2(x, adj_t)
        return x


class GCN(GNN):
    def __init__(self, input_dim, output_dim, hidden_dim, dropout):
        super().__init__(dropout)
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, output_dim)


class GAT(GNN):
    def __init__(self, input_dim, output_dim, hidden_dim, dropout):
        super().__init__(dropout)
        heads = 4
        self.conv1 = GATConv(input_dim, hidden_dim, heads=heads, concat=True)
        self.conv2 = GATConv(heads * hidden_dim, output_dim, heads=1, concat=False)


class GraphSAGE(GNN):
    def __init__(self, input_dim, output_dim, hidden_dim, dropout):
        super().__init__(dropout)
        self.conv1 = SAGEConv(in_channels=input_dim, out_channels=hidden_dim, normalize=False, root_weight=True)
        self.conv2 = SAGEConv(in_channels=hidden_dim, out_channels=output_dim, normalize=False, root_weight=True)


class NodeClassifier(torch.nn.Module):
    def __init__(self,
                 input_dim,
                 num_classes,
                 model:                 dict(help='backbone GNN model', choices=['gcn', 'sage', 'gat']) = 'sage',
                 hidden_dim:            dict(help='dimension of the hidden layers') = 16,
                 dropout:               dict(help='dropout rate (between zero and one)') = 0.0,
                 x_steps:               dict(help='KProp step parameter for features', option='-kx') = 0,
                 y_steps:               dict(help='KProp step parameter for labels', option='-ky') = 0,
                 forward_correction:    dict(help='applies forward loss correction', option='--forward') = True,
                 ):
        super().__init__()

        self.x_prop = KProp(steps=x_steps, aggregator='add', add_self_loops=False, normalize=True, cached=True)
        self.y_prop = KProp(steps=y_steps, aggregator='add', add_self_loops=False, normalize=True, cached=False,
                            transform=torch.nn.Softmax(dim=1))

        self.gnn = {'gcn': GCN, 'sage': GraphSAGE, 'gat': GAT}[model](
            input_dim=input_dim,
            output_dim=num_classes,
            hidden_dim=hidden_dim,
            dropout=dropout
        )

        self.cached_yt = None
        self.forward_correction = forward_correction

    def forward(self, data, return_logits=False):
        x, adj_t = data.x, data.adj_t
        x = self.x_prop(x, adj_t)
        x = self.gnn(x, adj_t)

        if return_logits:
          return x #Raw logits before softmax - for LinkTeller

        p_y_x = F.softmax(x, dim=1)
        #print()                                                       # P(y|x')
        p_yp_x = torch.matmul(p_y_x, data.T) if self.forward_correction else p_y_x          # P(y'|x')
        p_yt_x = self.y_prop(p_yp_x, data.adj_t)                                            # P(y~|x')

        return p_y_x, p_yp_x, p_yt_x

    def training_step(self, data):
        p_y_x, p_yp_x, p_yt_x = self(data)

        if self.cached_yt is None:
            yp = data.y.float()
            yp[data.test_mask] = 0  # to avoid using test labels
            self.cached_yt = self.y_prop(yp, data.adj_t)  # y~

        loss = self.cross_entropy_loss(p_y=p_yt_x[data.train_mask], y=self.cached_yt[data.train_mask], weighted=False)

        metrics = {
            'train/loss': loss.item(),
            'train/acc': self.accuracy(pred=p_y_x[data.train_mask], target=data.y[data.train_mask]) * 100,
        }
        #Removed not needed for mutag 'train/maxacc': data.T[0, 0].item() * 100,

        #print(loss)
        return loss, metrics

    def validation_step(self, data):
        p_y_x, p_yp_x, p_yt_x = self(data)

        metrics = {
            'val/loss': self.cross_entropy_loss(p_yp_x[data.val_mask], data.y[data.val_mask]).item(),
            'val/acc': self.accuracy(pred=p_y_x[data.val_mask], target=data.y[data.val_mask]) * 100,
            'test/acc': self.accuracy(pred=p_y_x[data.test_mask], target=data.y[data.test_mask]) * 100,
        }

        return metrics

    @staticmethod
    def accuracy(pred, target):
        pred = pred.argmax(dim=1) if len(pred.size()) > 1 else pred
        target = target.argmax(dim=1) if len(target.size()) > 1 else target
        return accuracy_score(target,pred)

    @staticmethod
    def cross_entropy_loss(p_y, y, weighted=False):
        y_onehot = F.one_hot(y.argmax(dim=1))
        loss = -torch.log(p_y + 1e-20) * y_onehot
        loss *= y if weighted else 1
        loss = loss.sum(dim=1).mean()
        return loss

In [26]:
import sys
import torch
from torch.optim import SGD, Adam
from tqdm.auto import tqdm


class Trainer:
    def __init__(
            self,
            optimizer:      dict(help='optimization algorithm', choices=['sgd', 'adam']) = 'adam',
            max_epochs:     dict(help='maximum number of training epochs') = 500,
            learning_rate:  dict(help='learning rate') = 0.01,
            weight_decay:   dict(help='weight decay (L2 penalty)') = 0.0,
            patience:       dict(help='early-stopping patience window size') = 0,
            device='cuda',
            logger=None,
    ):
        self.optimizer_name = optimizer
        self.max_epochs = max_epochs
        self.device = device
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.patience = patience
        self.logger = logger
        self.model = None

    def configure_optimizers(self):
        if self.optimizer_name == 'sgd':
            return SGD(self.model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay)
        elif self.optimizer_name == 'adam':
            return Adam(self.model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay)

    def fit(self, model, data):
        self.model = model.to(self.device)
        data = data.to(self.device)
        optimizer = self.configure_optimizers()

        num_epochs_without_improvement = 0
        best_metrics = None

        epoch_progbar = tqdm(range(1, self.max_epochs + 1), desc='Epoch: ', leave=False, position=1, file=sys.stdout)
        for epoch in epoch_progbar:
            metrics = {'epoch': epoch}
            train_metrics = self._train(data, optimizer)
            metrics.update(train_metrics)

            val_metrics = self._validation(data)
            metrics.update(val_metrics)

            if self.logger:
                self.logger.log(metrics)

            #Removed from original best_metrics['val/acc'] < metrics['val/acc'] <= metrics['train/maxacc'] and best_metrics['train/acc'] < metrics['train/acc'] <= 1.05 * metrics['train/maxacc']
            if best_metrics is None or metrics['val/loss'] < best_metrics['val/loss'] :
                best_metrics = metrics
                num_epochs_without_improvement = 0
            else:
                num_epochs_without_improvement += 1
                if num_epochs_without_improvement >= self.patience > 0:
                    break

            # display metrics on progress bar
            epoch_progbar.set_postfix(metrics)

        if self.logger:
            self.logger.log_summary(best_metrics)

        return best_metrics

    def _train(self, data, optimizer):
        self.model.train()
        optimizer.zero_grad()
        loss, metrics = self.model.training_step(data)
        loss.backward()
        optimizer.step()
        return metrics

    @torch.no_grad()
    def _validation(self, data):
        self.model.eval()
        return self.model.validation_step(data)

In [27]:
# THIS CODE WAS PART OF AN ATTEMPT TO RUN LPGNN ON MUTAG GRAPHS THAT WERE STICHED TOGETHER
# THIS METHOD WAS NOT EFFECTIVE SINCE LINKS BETWEEN DISCONNECTED GRAPHS DO NOT EXIST AND WOULD NEED TO BE ADDED BY USER
# THIS WOULD PRODUCE INCORRECT SCORES OF ZERO FROM LINKTELLER CODE

# import os
# from functools import partial
# import pandas as pd
# import torch
# from torch_geometric.data import Data, InMemoryDataset, download_url
# from torch_geometric.datasets import Planetoid
# from torch_geometric.transforms import ToSparseTensor #, AddTrainValTestMask
# from torch_geometric.utils import to_undirected

# #from transforms import Normalize, FilterTopClass

# def preprocess (num_graphs = 5, val_ratio  = .25, test_ratio = .25):

#         #Get the dataset of interest
#         mutag = torch_geometric.datasets.TUDataset(root='./data/TUDataset', name='MUTAG')


#         nodeFeats = []
#         edges = []
#         labels = []

#         offset = 0

#         for g in mutag[:num_graphs]:
#           x = g.x
#           edge_index = g.edge_index
#           y_graph = g.y.item()

#           #Convert graph label to node labels
#           y_labels = torch.full((g.num_nodes,), y_graph, dtype = torch.long)

#           #Offset edges
#           edge_index = edge_index + offset


#           nodeFeats.append(x)
#           edges.append(edge_index)
#           labels.append(y_labels)

#           offset += g.num_nodes

#         #Combine in one graph
#         X = torch.cat(nodeFeats, dim = 0)
#         Y = torch.cat(labels, dim = 0 )
#         E = torch.cat(edges, dim = 1)
#         E = to_undirected(E)

#         Y = F.one_hot(Y, num_classes=2).float()

#         mutag_graph = Data(x=X, edge_index = E, y = Y)

#         edge_index_original = E.clone()

#         #Get the number of nodes in the graph
#         numNode = mutag_graph.num_nodes

#         #Randomize indices
#         index = torch.randperm(numNode)

#         #Masks
#         train_mask = torch.zeros(mutag_graph.num_nodes, dtype=torch.bool)
#         val_mask   = torch.zeros(mutag_graph.num_nodes, dtype=torch.bool)
#         test_mask  = torch.zeros(mutag_graph.num_nodes, dtype=torch.bool)

#         #Get the splits for training, test, and val data
#         val_data = int(numNode*val_ratio)
#         test_data = int(numNode*test_ratio)
#         train_data = numNode - val_data - test_data

#         #Get random nodes from the fractions calculated above
#         train_index = index[:train_data]
#         valid_index = index[train_data: train_data+val_data]
#         test_index = index[train_data+val_data:]

#         #Assign masks - used in original code
#         train_mask[train_index] = True
#         test_mask[test_index] = True
#         val_mask[valid_index] = True

#         #Set masks in data
#         mutag_graph.train_mask = train_mask
#         mutag_graph.test_mask = test_mask
#         mutag_graph.val_mask = val_mask

#         mutag_graph = ToSparseTensor()(mutag_graph)

#         mutag_graph.edge_index = edge_index_original

#         return mutag_graph


In [28]:
#LOOKING AT INFORMATION ABOUT THE GRAPH
# data = preprocess(num_graphs= 100,val_ratio=0.15, test_ratio=0.15)
# print(data)
# print(f"Number of nodes is: {data.num_nodes}")
# print(f"Number of edges is: {data.num_edges}")
# print(f"Node feature shape: {data.x.shape}")
# print(f"Labels: {data.y.shape}")
# print(f"Unique labels: {data.y.unique()}")

In [39]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.utils import to_networkx, dense_to_sparse
from torch_geometric.nn import GCNConv
import networkx as nx
from sklearn.cluster import KMeans
import torch, fsspec, torch_geometric
from torch_geometric.datasets import TUDataset
import numpy as np
from sklearn.model_selection import train_test_split
import math, random
from torch_geometric.data import Data

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device


#USING CODE FROM TEAMMATES IN GITHUB
dataset = TUDataset(root='data/TUD', name='MUTAG')  # 188 graphs
sizes = [data.num_nodes for data in dataset]
idx = int(np.argmax([n if n >= 25 else 0 for n in sizes]))  # pick a larger graph
data = dataset[idx]
print(f"Graph index {idx}: nodes={data.num_nodes}, edges={data.num_edges // 2} (undirected)")

#Keep edges for use in LinkTeller - WHEN CONVERTING TOSPARSETENSOR WE LOSE EDGES
edges_linkteller = data.edge_index.clone()


X = data.x.float()  # [N, D]
N, D = X.shape

#print(X)
K = min(3, len(torch.unique(X, dim=0))) if len(torch.unique(X, dim=0))>1 else 2
km = KMeans(n_clusters=K, n_init=10, random_state=0).fit(X.numpy())
y_node = torch.from_numpy(km.labels_).long()
num_classes = len(torch.unique(y_node))
data.y = F.one_hot(y_node, num_classes=num_classes).float()

# Train/val/test node splits
idx_all = np.arange(N)
idx_train, idx_tmp = train_test_split(idx_all, test_size=0.4, random_state=42, stratify=y_node.numpy())
idx_val, idx_test = train_test_split(idx_tmp, test_size=0.5, random_state=42, stratify=y_node.numpy()[idx_tmp])

data.train_mask = torch.zeros(N, dtype=torch.bool)
data.train_mask[idx_train] = True
data.val_mask   = torch.zeros(N, dtype=torch.bool)
data.val_mask[idx_val] = True
data.test_mask  = torch.zeros(N, dtype=torch.bool)
data.test_mask[idx_test] = True

print(f"Splits: train {data.train_mask.sum().item()}, val {data.val_mask.sum().item()}, test {data.test_mask.sum().item()}")

data = ToSparseTensor()(data)
data.T = torch.eye(num_classes, dtype=torch.float)  # needed for NodeClassifier
device = 'cuda' if torch.cuda.is_available() else 'cpu'
data = data.to(device)
data

# CHECK DETAILS OF THE GRAPH TO MAKE SURE EVERYTHING LOOKS FINE/MAKES SENSE
print("X shape:", data.x.shape)           # [28, 7]
print("y shape:", data.y.shape)           # [28, 3]
print("train_mask sum:", data.train_mask.sum())
print("val_mask sum:", data.val_mask.sum())
print("test_mask sum:", data.test_mask.sum())
print("adj_t:", data.adj_t)

Graph index 5: nodes=28, edges=31 (undirected)
Splits: train 16, val 6, test 6
X shape: torch.Size([28, 7])
y shape: torch.Size([28, 3])
train_mask sum: tensor(16)
val_mask sum: tensor(6)
test_mask sum: tensor(6)
adj_t: SparseTensor(row=tensor([ 0,  0,  1,  1,  2,  2,  2,  3,  3,  3,  4,  4,  5,  5,  5,  6,  6,  6,
                            7,  7,  7,  8,  8,  8,  9,  9,  9, 10, 10, 10, 11, 11, 12, 12, 12, 13,
                           13, 13, 14, 14, 15, 15, 16, 16, 16, 17, 18, 19, 19, 19, 20, 21, 22, 22,
                           22, 23, 24, 25, 25, 25, 26, 27]),
             col=tensor([ 1,  9,  0,  2,  1,  3,  7,  2,  4, 25,  3,  5,  4,  6, 22,  5,  7, 15,
                            2,  6,  8,  7,  9, 13,  0,  8, 10,  9, 11, 19, 10, 12, 11, 13, 16,  8,
                           12, 14, 13, 15,  6, 14, 12, 17, 18, 16, 16, 10, 20, 21, 19, 19,  5, 23,
                           24, 22, 22,  3, 26, 27, 25, 25]),
             size=(28, 28), nnz=62, density=7.91%)


In [40]:
edge_index = edges_linkteller

# For evaluation convenience, build a boolean adjacency (without self loops)
A = torch.zeros((N, N), dtype=torch.bool)
A[edge_index[0], edge_index[1]] = True
A[edge_index[1], edge_index[0]] = True
A.fill_diagonal_(False)
true_edges_undirected = torch.nonzero(torch.triu(A, diagonal=1), as_tuple=False)  # [M, 2]
M_true = true_edges_undirected.shape[0]
density = M_true / (N*(N-1)/2)
print(f"True undirected edges: {M_true} | density={density:.4f}")


True undirected edges: 31 | density=0.0820


In [41]:
#Run model
model = NodeClassifier(input_dim = D, num_classes = num_classes, model = "sage", hidden_dim = 16, dropout = 0.3, x_steps =  1, y_steps = 1, forward_correction = True)
model

NodeClassifier(
  (x_prop): KProp()
  (y_prop): KProp()
  (gnn): GraphSAGE(
    (dropout): Dropout(p=0.3, inplace=False)
    (activation): SELU(inplace=True)
    (conv1): SAGEConv(7, 16, aggr=mean)
    (conv2): SAGEConv(16, 3, aggr=mean)
  )
)

In [42]:
trainer = Trainer(optimizer = "adam", max_epochs = 50, learning_rate = 0.01, weight_decay = 0, patience = 5, device = 'cpu')

In [43]:
from torch_geometric.loader import DataLoader
#train_loader = DataLoader([data], batch_size = 1, shuffle=False)
#val_loader = DataLoader([data], batch_size = 1, shuffle=False)
metrics = trainer.fit(model, data)
print(metrics)

Epoch:   0%|          | 0/50 [00:00<?, ?it/s]

{'epoch': 50, 'train/loss': 0.6994549036026001, 'train/acc': 100.0, 'val/loss': 0.013582475483417511, 'val/acc': 100.0, 'test/acc': 66.66666666666666}


# LinkTeller

In [44]:
import scipy as sp
class Linkteller():
      def __init__(self, model, device, test_node_feats, test_edge_idx,
                   test_edge_attr = None):
        """
        model: Pretrained model
        device: torch.device
        test_edge_idx: adjacency matrix for the graph being evaluated
        undirected: whether the graph is undirected or not (default: True)
        """
        self.model = model
        self.device = device
        #graph dataset node features
        self.test_node_feats = test_node_feats
        self.num_nodes = test_node_feats.shape[0]
        self.test_edge_idx = test_edge_idx
        self.test_edge_attr = test_edge_attr
        #build adjcency matrix for test graph from edge indices
        self.test_adj = torch.zeros((self.num_nodes, self.num_nodes), dtype=torch.float)
        self.test_adj[test_edge_idx[0], test_edge_idx[1]] = True
        self.test_adj[test_edge_idx[1], test_edge_idx[0]] = True
        self.test_adj.fill_diagonal_(False)
        self.true_edges_undirected = torch.nonzero(torch.triu(self.test_adj,
                                                         diagonal=1),
                                              as_tuple=False)  # [M, 2]
        self.M_true = self.true_edges_undirected.shape[0]
        self.density = self.M_true / (self.num_nodes*(self.num_nodes-1)/2)

      @torch.no_grad()
      def gbb_api(self, node_ids, X_query):
          """
          node_ids: 1D LongTensor of node indices to fetch from output
          X_query: (N, D) full feature matrix Bob provides (Alice uses it with her private edge_index)
          returns: logits[node_ids] shape (len(node_ids), K)

          modified from Linkteller.ipynb
          """
          model.eval()
          #reconstruct graph using Bob's provided node features & Alice's edges
          if self.test_edge_attr is None:
            test_graph = Data(x=X_query, edge_index=self.test_edge_idx)
          else:
            test_graph = Data(x=X_query,
                              edge_index=self.test_edge_idx,
                              edge_attr=self.test_edge_attr)

          test_graph = ToSparseTensor()(test_graph)

          test_graph = test_graph.to(device)

          #out, _ , _ = model(test_graph)
          out = model(test_graph, return_logits=True) #ADDED THIS TO MAKE NODECLASSIFER RETURN LOGITS INSTEAAD OF SOFTMAX PROBABILITIES
          #print(out[:5])

          return out[node_ids.to(device)].detach().cpu()

      def influence_matrix_for_v(self,v, V_I, X_base, delta=1e-2):
          """
          v: node index (int)
          V_I: 1D LongTensor of nodes-of-interest to score against
          X_base: (N, D) baseline features
          returns: Iv (|V_I|, K) = (P' - P)/delta where rows correspond to u in V_I
          """
          #X_base = X_base.float().to(self.device)

          node_ids = V_I
          P = self.gbb_api(node_ids, X_base)

          Xp = X_base.clone()
          Xp[v] = (1.0 + delta) * Xp[v]  # upweight features of v
          Pp = self.gbb_api(node_ids, Xp)

         #print(P[:5])   # before perturbation
          #print(Pp[:5])

          Iv = (Pp - P) / delta  # finite-diff approximation
          return Iv  # (|V_I|, K)

      def linkteller_scores(self, V_C, X_base, delta=1e-2):
          """
          V_C: nodes-of-interest (attack surface) as 1D LongTensor
          returns: dict {(u,v): score} for u != v, unordered pairs
          """

          #X_base = X_base.float().to(self.device)

          V_C = V_C.cpu()
          scores = {}

          for j, v in enumerate(V_C.tolist()):
              # rows aligned with V_C
              Iv = self.influence_matrix_for_v(v,
                                               V_C,
                                               X_base,
                                               delta=delta).numpy()
              # influence value of v on each u = ||Iv[u,:]||_2
              norms = np.linalg.norm(Iv, axis=1)
              for i, u in enumerate(V_C.tolist()):
                  if u == v:
                      continue
                  key = (min(u,v), max(u,v))
                  # symmetrical score: max of v→u and u→v will be handled later; accumulate max
                  scores[key] = max(scores.get(key, 0.0), float(norms[i]))
          return scores

In [45]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#mutag = torch_geometric.datasets.TUDataset(root='./data/TUDataset', name='MUTAG')
#test_graph = preprocess(num_graphs=100) #IN STICHED GRAPH VERSION USED SAMPLE OF 100 GRAPHS

#print(data.edge_attr) #EXISTS IN DATA

linkteller_MUTAG_LPGNN = Linkteller(model = model,
                                  device=device,
                                  test_node_feats=data.x,
                                  test_edge_idx=edges_linkteller,
                                  test_edge_attr=data.edge_attr)


In [46]:
# Choose attack node set V_C (we’ll use all nodes to make life easy)
N = data.x.shape[0]
V_C = torch.arange(N, dtype=torch.long)
X = data.x
scores = linkteller_MUTAG_LPGNN.linkteller_scores(V_C, X, delta=1e-2)

# Turn scores into a sorted list
sorted_pairs = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
len(sorted_pairs), sorted_pairs[:5] #SCORES CONTINUE TO BE ZERO - UNSURE WHAT PART OF THE CODE IS LEADING TO THIS


(378,
 [((0, 1), 0.0), ((0, 2), 0.0), ((0, 3), 0.0), ((0, 4), 0.0), ((0, 5), 0.0)])

In [47]:
#scores

In [48]:
def evaluate_at_fraction(frac):
    m = int(round(frac * (N*(N-1)/2)))
    pred = set([pair for (pair, _) in sorted_pairs[:m]])
    tp = len(pred & true_edges)
    fp = len(pred - true_edges)
    fn = len(true_edges - pred)
    p = tp / (tp + fp + 1e-12)
    r = tp / (tp + fn + 1e-12)
    f1 = 2*p*r / (p + r + 1e-12)
    return p, r, f1, m
density = linkteller_MUTAG_LPGNN.density
for frac in [0.5*density, 0.8*density, density, 1.2*density, 1.5*density]:
    p, r, f1, m = evaluate_at_fraction(frac)
    print(f"k_hat={frac:.4f}  m={m:3d}  P={p:.3f} R={r:.3f} F1={f1:.3f}")

NameError: name 'true_edges' is not defined