In [None]:

"""
GNNFingers Structured Implementation for Google Colab
======================================================


STRUCTURE:
  gnnfingers/
    ├── data/              (Cora, Citeseer datasets)
    ├── models/
    │   ├── target/        (target model checkpoint)
    │   ├── positive/      (positive models)
    │   └── negative/      (negative models)
    ├── fingerprints/      (saved fingerprints)
    ├── verifier/          (trained verifier)
    ├── results/           (TP/TN/accuracy logs)
    └── manifest.csv       (tracking file)

RUN IN COLAB: Copy each cell below into separate Colab cells in order.
"""

# ============================================================================
# CELL 1: Setup and Install Dependencies
# ============================================================================
# Announce the step, then install the PyTorch / PyTorch Geometric stack with
# pip. Using sys.executable runs pip under the same interpreter that executes
# this notebook; "-q" keeps pip's output quiet.
# NOTE(review): no versions are pinned, so a re-run may pull different releases.
print("CELL 1: Installing dependencies...")
import subprocess
import sys

# check_call raises CalledProcessError if pip exits with a non-zero status.
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
                       "torch", "torch_geometric", "torch_scatter", "torch_sparse"])

import os
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
from sklearn.metrics import accuracy_score

# Root of the on-disk workspace; every artifact written by later cells
# (datasets, checkpoints, fingerprints, verifier) lives below this directory.
base_dir = Path("/content/gnnfingers")

# Create the root and every sub-directory in one pass. parents=True lets this
# cell succeed even when the parent of base_dir does not exist yet, and
# exist_ok=True makes the cell safe to re-run.
base_dir.mkdir(parents=True, exist_ok=True)
for workspace_subdir in (
    "data",
    "models/target",
    "models/positive",
    "models/negative",
    "fingerprints",
    "verifier",
    "results",
):
    (base_dir / workspace_subdir).mkdir(parents=True, exist_ok=True)

print(f"✓ Directory structure created at {base_dir}\n")

# Seed both global RNGs used by the notebook: torch (weight init, torch.randn)
# and NumPy's legacy global generator (np.random.randint / np.random.choice).
torch.manual_seed(42)
np.random.seed(42)

print("✓ Dependencies installed and directories ready\n")

CELL 1: Installing dependencies...
✓ Directory structure created at /content/gnnfingers

✓ Dependencies installed and directories ready



In [None]:
# ============================================================================
# CELL 2: Define Models and Dataset Utilities
# ============================================================================
print("CELL 2: Define Models and Utilities...")

# Graph layers (GCNConv, SAGEConv) and the Planetoid dataset loader come from
# torch_geometric, which the pip call in the setup cell installs.
from torch_geometric.nn import GCNConv, SAGEConv
from torch_geometric.datasets import Planetoid

class GCNModel(nn.Module):
    """Two-layer graph convolutional network producing per-node logits.

    Args:
        in_channels: Size of each input node feature vector.
        hidden_channels: Width of the hidden layer (default 64).
        out_channels: Number of output values per node (default 7).
    """

    def __init__(self, in_channels, hidden_channels=64, out_channels=7):
        super().__init__()
        # The attribute names conv1 / conv2 become state_dict keys, so saved
        # checkpoints depend on them.
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        """Apply conv1 -> ReLU -> conv2 and return the raw second-layer output."""
        hidden = F.relu(self.conv1(x, edge_index))
        return self.conv2(hidden, edge_index)

class GraphSAGEModel(nn.Module):
    """Two-layer GraphSAGE network producing per-node logits.

    Args:
        in_channels: Size of each input node feature vector.
        hidden_channels: Width of the hidden layer (default 64).
        out_channels: Number of output values per node (default 7).
    """

    def __init__(self, in_channels, hidden_channels=64, out_channels=7):
        super().__init__()
        # The attribute names sage1 / sage2 become state_dict keys, so saved
        # checkpoints depend on them.
        self.sage1 = SAGEConv(in_channels, hidden_channels)
        self.sage2 = SAGEConv(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        """Apply sage1 -> ReLU -> sage2 and return the raw second-layer output."""
        hidden = F.relu(self.sage1(x, edge_index))
        return self.sage2(hidden, edge_index)

class Verifier(nn.Module):
    """Small MLP binary classifier: input_dim -> hidden_dim -> 16 -> 1.

    Args:
        input_dim: Length of each input vector.
        hidden_dim: Width of the first hidden layer (default 32).
    """

    def __init__(self, input_dim, hidden_dim=32):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 16)
        self.fc3 = nn.Linear(16, 1)

    def forward(self, x):
        """Return sigmoid probabilities in [0, 1].

        Args:
            x: Tensor of shape (N, input_dim) or (input_dim,).

        Returns:
            Tensor of shape (N,) for batched input, or a 0-d tensor for a
            single un-batched vector.
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        # Drop only the trailing size-1 feature axis. A bare squeeze() would
        # also remove the batch axis when N == 1, turning the output into a
        # scalar that no longer lines up with a (1,)-shaped label tensor.
        return x.squeeze(-1)

def load_dataset(dataset_name="Cora"):
    """Load a Planetoid dataset into the workspace's data directory.

    Args:
        dataset_name: Name forwarded to ``Planetoid`` (default "Cora").

    Returns:
        A ``(data, dataset)`` tuple: the first graph of the dataset and the
        dataset object itself.
    """
    data_root = str(base_dir / "data")
    planetoid = Planetoid(root=data_root, name=dataset_name)
    return planetoid[0], planetoid

def train_model(model, data, epochs=50, lr=0.001, verbose=True):
    """Train ``model`` full-batch with Adam and cross-entropy on the train mask.

    Args:
        model: Module called as ``model(data.x, data.edge_index)`` that
            returns one row of class logits per node.
        data: Object exposing ``x``, ``edge_index``, ``y``, ``train_mask``
            and ``test_mask``.
        epochs: Number of optimisation steps (one full-batch step per epoch).
        lr: Adam learning rate.
        verbose: When true, print test-mask accuracy every 10th epoch.

    Returns:
        The same ``model`` instance, trained in place.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        out = model(data.x, data.edge_index)
        loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
        loss.backward()
        optimizer.step()

        if verbose and (epoch + 1) % 10 == 0:
            model.eval()
            with torch.no_grad():
                pred = model(data.x, data.edge_index).argmax(dim=1)
                # Compute accuracy with tensor ops instead of converting to
                # NumPy: .numpy() raises for tensors that live on a GPU.
                test_pred = pred[data.test_mask]
                test_true = data.y[data.test_mask]
                num_test = test_pred.numel()
                num_correct = int((test_pred == test_true).sum().item())
                # Guard against an empty test mask rather than dividing by 0.
                test_acc = num_correct / num_test if num_test else 0.0
            print(f"    Epoch {epoch+1}/{epochs} | Test Acc: {test_acc:.3f}")

    return model

print("✓ Models and utilities defined\n")

CELL 2: Define Models and Utilities...
✓ Models and utilities defined



In [None]:
# ============================================================================
# CELL 3: Load Dataset and Train Target Model
# ============================================================================
print("CELL 3: Load Dataset and Train Target Model...")
print("=" * 70)

# Dataset name handed to load_dataset (whose own default is "Cora").
dataset_name = "Citeseer"
# `data` is the first graph of the dataset; `dataset` is kept because later
# cells read dataset.num_classes from it.
data, dataset = load_dataset(dataset_name)

print(f"Dataset: {dataset_name}")
print(f"  Nodes: {data.num_nodes}")
print(f"  Edges: {data.num_edges}")
print(f"  Features: {data.num_features}")
print(f"  Classes: {dataset.num_classes}\n")

# Train target model
# Later cells clone this model to build the positive (label=1) examples and
# compare independently trained models against it.
print("Training TARGET model (GCN)...")
target_model = GCNModel(data.num_features, hidden_channels=64, out_channels=dataset.num_classes)
target_model = train_model(target_model, data, epochs=50)

# Save target model
# Only the state_dict (weights) is written, not the module object itself.
target_path = base_dir / "models" / "target" / "gcn_target.pt"
torch.save(target_model.state_dict(), target_path)
print(f"✓ Target model saved to {target_path}\n")

CELL 3: Load Dataset and Train Target Model...


Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.x
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.tx
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.allx
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.y
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.ty
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.ally
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.graph
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.test.index
Processing...
Done!


Dataset: Citeseer
  Nodes: 3327
  Edges: 9104
  Features: 3703
  Classes: 6

Training TARGET model (GCN)...
    Epoch 10/50 | Test Acc: 0.553
    Epoch 20/50 | Test Acc: 0.636
    Epoch 30/50 | Test Acc: 0.649
    Epoch 40/50 | Test Acc: 0.652
    Epoch 50/50 | Test Acc: 0.657
✓ Target model saved to /content/gnnfingers/models/target/gcn_target.pt



In [None]:
# ============================================================================
# CELL 4: Generate 2 Positive Models (Fine-tuned)
# ============================================================================
print("CELL 4: Generate Positive Models (Fine-tuned Clones)...")
print("=" * 70)

def clone_and_finetune(model, data, seed, finetune_epochs=10, lr=0.0001):
    """Return an independent copy of ``model`` fine-tuned on ``data``.

    The copy is made with ``copy.deepcopy`` so it works for any architecture
    and layer width, and does not depend on notebook globals. The source
    ``model`` is left untouched.

    Args:
        model: Trained module called as ``model(data.x, data.edge_index)``.
        data: Object exposing ``x``, ``edge_index``, ``y`` and ``train_mask``.
        seed: Value passed to ``torch.manual_seed`` before fine-tuning.
        finetune_epochs: Number of full-batch Adam steps to apply to the copy.
        lr: Adam learning rate used for fine-tuning.

    Returns:
        The fine-tuned copy (a new module instance of the same class).
    """
    import copy  # local import keeps this cell runnable on its own

    torch.manual_seed(seed)
    # deepcopy duplicates both the architecture and the weights, so the clone
    # always matches the source model exactly before fine-tuning starts.
    cloned = copy.deepcopy(model)

    optimizer = torch.optim.Adam(cloned.parameters(), lr=lr)
    for _ in range(finetune_epochs):
        cloned.train()
        optimizer.zero_grad()
        out = cloned(data.x, data.edge_index)
        loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
        loss.backward()
        optimizer.step()

    return cloned

# Fine-tuned clones of the target model, and the checkpoint path of each one
# (the two lists are index-aligned).
positive_models = []
positive_paths = []

# Two clones are produced; each gets its own seed (100, 101).
for i in range(2):
    print(f"Creating POSITIVE model {i+1}...")
    pos_model = clone_and_finetune(target_model, data, seed=100+i, finetune_epochs=10)
    positive_models.append(pos_model)

    # Save
    # File names are zero-based (gcn_pos_0.pt, ...) while the progress message
    # above is one-based.
    pos_path = base_dir / "models" / "positive" / f"gcn_pos_{i}.pt"
    torch.save(pos_model.state_dict(), pos_path)
    positive_paths.append(pos_path)
    print(f"  ✓ Saved to {pos_path}")

print()

CELL 4: Generate Positive Models (Fine-tuned Clones)...
Creating POSITIVE model 1...
  ✓ Saved to /content/gnnfingers/models/positive/gcn_pos_0.pt
Creating POSITIVE model 2...
  ✓ Saved to /content/gnnfingers/models/positive/gcn_pos_1.pt



In [None]:
# ============================================================================
# CELL 5: Generate 2 Negative Models (Independent)
# ============================================================================
print("CELL 5: Generate Negative Models (Independent Training)...")
print("=" * 70)

# Independently trained models and the checkpoint path of each one
# (the two lists are index-aligned).
negative_models = []
negative_paths = []

# One row per negative model: (torch seed, model class, label shown in the
# progress message, checkpoint file name). Add a row to train another one.
negative_specs = [
    (200, GCNModel, "fresh GCN", "gcn_neg_0.pt"),
    (201, GraphSAGEModel, "GraphSAGE", "sage_neg_1.pt"),
]

for neg_number, (neg_seed, neg_cls, neg_label, neg_file) in enumerate(negative_specs, start=1):
    print(f"Creating NEGATIVE model {neg_number} ({neg_label})...")
    # Seed immediately before construction so the weight initialisation of
    # each negative model is reproducible on its own.
    torch.manual_seed(neg_seed)
    neg_model = neg_cls(data.num_features, hidden_channels=64, out_channels=dataset.num_classes)
    neg_model = train_model(neg_model, data, epochs=50, verbose=False)
    negative_models.append(neg_model)

    neg_path = base_dir / "models" / "negative" / neg_file
    torch.save(neg_model.state_dict(), neg_path)
    negative_paths.append(neg_path)
    print(f"  ✓ Saved to {neg_path}\n")

# Keep the individual names the original cell exposed as notebook globals.
neg_model_1, neg_model_2 = negative_models
neg_path_1, neg_path_2 = negative_paths

CELL 5: Generate Negative Models (Independent Training)...
Creating NEGATIVE model 1 (fresh GCN)...
  ✓ Saved to /content/gnnfingers/models/negative/gcn_neg_0.pt

Creating NEGATIVE model 2 (GraphSAGE)...
  ✓ Saved to /content/gnnfingers/models/negative/sage_neg_1.pt



In [None]:
# ============================================================================
# CELL 6: Create Synthetic Fingerprints
# ============================================================================
print("CELL 6: Create Synthetic Fingerprints...")
print("=" * 70)

# Number of synthetic graphs to generate and the node count of each one.
num_fingerprints = 5
nodes_per_fp = 16

def create_random_fingerprint(num_nodes, num_features, sparsity=0.3):
    """Create a random synthetic graph with an exact number of edges.

    Node features are drawn from a standard normal distribution with
    ``torch.randn``. Edges are distinct directed ``(u, v)`` pairs with
    ``u != v``, drawn with NumPy's global random generator.

    Args:
        num_nodes: Number of nodes in the graph.
        num_features: Length of each node feature vector.
        sparsity: Fraction of the ``num_nodes * (num_nodes - 1) / 2`` node
            pairs to turn into edges. At least one edge is requested when
            the graph can hold one.

    Returns:
        ``(x, edge_index)`` where ``x`` has shape ``(num_nodes, num_features)``
        and ``edge_index`` is a ``torch.long`` tensor of shape ``(2, E)``.
    """
    x = torch.randn(num_nodes, num_features)

    num_possible_edges = num_nodes * (num_nodes - 1) / 2
    # Never ask for more distinct directed pairs than exist; this also makes
    # graphs with 0 or 1 nodes return an empty edge list instead of failing.
    max_directed_edges = max(0, num_nodes * (num_nodes - 1))
    num_edges = min(max(1, int(num_possible_edges * sparsity)), max_directed_edges)

    # Keep drawing until the requested count is reached: rejected draws
    # (self-loops, duplicates) are retried rather than silently dropped, so
    # the graph really has `num_edges` edges. The set gives O(1) duplicate
    # checks; the list preserves insertion order for the output tensor.
    seen_pairs = set()
    edge_pairs = []
    while len(edge_pairs) < num_edges:
        u = int(np.random.randint(0, num_nodes))
        v = int(np.random.randint(0, num_nodes))
        if u == v or (u, v) in seen_pairs:
            continue
        seen_pairs.add((u, v))
        edge_pairs.append([u, v])

    if edge_pairs:
        edge_index = torch.tensor(edge_pairs, dtype=torch.long).t().contiguous()
    else:
        edge_index = torch.zeros((2, 0), dtype=torch.long)

    return x, edge_index

# List of (x, edge_index) tuples, one per synthetic fingerprint graph.
fingerprints = []
print(f"Creating {num_fingerprints} fingerprints...")

for i in range(num_fingerprints):
    # Feature width matches the dataset so the trained models can consume the
    # synthetic graphs directly. sparsity=0.2 overrides the function default.
    x, edge_index = create_random_fingerprint(nodes_per_fp, data.num_features, sparsity=0.2)
    fingerprints.append((x, edge_index))
    print(f"  ✓ FP {i+1}: nodes={x.shape[0]}, edges={edge_index.shape[1]}")

# Save fingerprints
# The whole Python list of tensor tuples is serialised into a single file.
fp_path = base_dir / "fingerprints" / "fingerprints.pt"
torch.save(fingerprints, fp_path)
print(f"✓ Fingerprints saved to {fp_path}\n")

CELL 6: Create Synthetic Fingerprints...
Creating 5 fingerprints...
  ✓ FP 1: nodes=16, edges=22
  ✓ FP 2: nodes=16, edges=22
  ✓ FP 3: nodes=16, edges=23
  ✓ FP 4: nodes=16, edges=22
  ✓ FP 5: nodes=16, edges=19
✓ Fingerprints saved to /content/gnnfingers/fingerprints/fingerprints.pt



In [None]:
# ============================================================================
# CELL 7: Collect Model Response Vectors
# ============================================================================
print("CELL 7: Collect Model Response Vectors...")
print("=" * 70)

def get_response_vector(model, fingerprints, num_samples=5, seed=0):
    """Query ``model`` on every fingerprint and concatenate sampled outputs.

    For each fingerprint graph the model output is read at ``num_samples``
    node positions (fewer if the graph is smaller) and flattened. The node
    positions come from a generator seeded with ``seed`` on every call, so
    all models queried with the same fingerprints and seed are read out at
    exactly the same nodes; position ``k`` of the returned vector therefore
    means the same thing for every model.

    Args:
        model: Module called as ``model(x, edge_index)`` returning one output
            row per node.
        fingerprints: Iterable of ``(x, edge_index)`` tuples.
        num_samples: Maximum number of nodes read per fingerprint.
        seed: Seed of the node-selection generator (default 0).

    Returns:
        1-D tensor holding the concatenated sampled outputs; an empty tensor
        when ``fingerprints`` is empty.
    """
    model.eval()
    # A fresh, locally seeded generator instead of NumPy's global RNG: the
    # global state advances between calls, which would select different nodes
    # for each model.
    rng = np.random.default_rng(seed)
    responses = []

    with torch.no_grad():
        for fp_x, fp_edge in fingerprints:
            out = model(fp_x, fp_edge)

            num_to_sample = min(num_samples, out.shape[0])
            sampled_indices = rng.choice(out.shape[0], num_to_sample, replace=False)
            # Index with a tensor on the output's own device.
            index = torch.as_tensor(sampled_indices, dtype=torch.long, device=out.device)
            responses.append(out[index].flatten())

    if not responses:
        # torch.cat raises on an empty list; return an empty vector instead.
        return torch.empty(0)

    return torch.cat(responses)

# Collect all responses
# Maps a model tag ('target', 'pos_<i>', 'neg_<i>') to that model's response
# vector over the shared fingerprint list.
all_responses = {}

print("Collecting responses from TARGET model...")
all_responses['target'] = get_response_vector(target_model, fingerprints)

for i, pos_model in enumerate(positive_models):
    print(f"Collecting responses from POSITIVE model {i}...")
    all_responses[f'pos_{i}'] = get_response_vector(pos_model, fingerprints)

for i, neg_model in enumerate(negative_models):
    print(f"Collecting responses from NEGATIVE model {i}...")
    all_responses[f'neg_{i}'] = get_response_vector(neg_model, fingerprints)

# The length of the target vector is reported; the next cell concatenates all
# vectors row-wise, which requires every vector to have this same length.
print(f"\nResponse vector dimension: {all_responses['target'].shape[0]}\n")

CELL 7: Collect Model Response Vectors...
Collecting responses from TARGET model...
Collecting responses from POSITIVE model 0...
Collecting responses from POSITIVE model 1...
Collecting responses from NEGATIVE model 0...
Collecting responses from NEGATIVE model 1...

Response vector dimension: 150



In [None]:
# ============================================================================
# CELL 8: Build Training Data and Train Verifier
# ============================================================================
print("CELL 8: Build Training Data and Train Verifier...")
print("=" * 70)

# Build training dataset
# X_train / y_train start as Python lists of per-model rows and labels and are
# converted to tensors once all rows are collected (see torch.cat below).
X_train = []
y_train = []

# Positive samples (label = 1)
# The target model itself counts as a positive example.
X_train.append(all_responses['target'].unsqueeze(0))
y_train.append(1)
print("✓ Target model (label=1)")

for i in range(len(positive_models)):
    X_train.append(all_responses[f'pos_{i}'].unsqueeze(0))
    y_train.append(1)
    print(f"✓ Positive model {i} (label=1)")

# Negative samples (label = 0)
for i in range(len(negative_models)):
    X_train.append(all_responses[f'neg_{i}'].unsqueeze(0))
    y_train.append(0)
    print(f"✓ Negative model {i} (label=0)")

# Stack the (1, D) rows into an (N, D) matrix; labels become float32 because
# nn.BCELoss (used below) takes floating-point targets.
X_train = torch.cat(X_train, dim=0)
y_train = torch.tensor(y_train, dtype=torch.float32)

print(f"\nTraining data shape: X={X_train.shape}, y={y_train.shape}")
print(f"  Class 1 (positive): {(y_train == 1).sum()} samples")
print(f"  Class 0 (negative): {(y_train == 0).sum()} samples\n")

# Train verifier
# The verifier's input width is the response-vector length. Verifier.forward
# already applies a sigmoid, so plain BCELoss (which expects probabilities)
# is the matching loss.
print("Training VERIFIER...")
verifier = Verifier(input_dim=X_train.shape[1], hidden_dim=32)
# NOTE: this rebinds the notebook-global name `optimizer`.
optimizer = torch.optim.Adam(verifier.parameters(), lr=0.01)
loss_fn = nn.BCELoss()

num_epochs = 200
for epoch in range(num_epochs):
    verifier.train()
    optimizer.zero_grad()

    # Full-batch step: every row of X_train is used in each update.
    y_pred = verifier(X_train)
    loss = loss_fn(y_pred, y_train)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 50 == 0:
        print(f"  Epoch {epoch+1}/{num_epochs} | Loss: {loss.item():.4f}")

# Save verifier
# Only the state_dict (weights) is written, not the module object itself.
verifier_path = base_dir / "verifier" / "verifier.pt"
torch.save(verifier.state_dict(), verifier_path)
print(f"✓ Verifier saved to {verifier_path}\n")

CELL 8: Build Training Data and Train Verifier...
✓ Target model (label=1)
✓ Positive model 0 (label=1)
✓ Positive model 1 (label=1)
✓ Negative model 0 (label=0)
✓ Negative model 1 (label=0)

Training data shape: X=torch.Size([5, 150]), y=torch.Size([5])
  Class 1 (positive): 3 samples
  Class 0 (negative): 2 samples

Training VERIFIER...
  Epoch 50/200 | Loss: 0.0000
  Epoch 100/200 | Loss: 0.0000
  Epoch 150/200 | Loss: 0.0000
  Epoch 200/200 | Loss: 0.0000
✓ Verifier saved to /content/gnnfingers/verifier/verifier.pt



In [None]:
# ============================================================================
# CELL 9: Evaluate Verifier and Calculate Metrics
# ============================================================================
print("CELL 9: Evaluate Verifier - Calculate TP/TN/Accuracy...")
print("=" * 70)

# NOTE(review): X_train / y_train are the same tensors the verifier was
# optimised on in the previous cell, so the numbers below measure fit to the
# training set, not performance on models the verifier has never seen.
verifier.eval()
with torch.no_grad():
    y_pred_probs = verifier(X_train)
    # Threshold the sigmoid output at 0.5 to get hard 0/1 predictions.
    y_pred = (y_pred_probs >= 0.5).long()
    y_true = y_train.long()

# Calculate confusion matrix
# Each count is an element-wise AND of prediction and label masks.
TP = ((y_pred == 1) & (y_true == 1)).sum().item()
TN = ((y_pred == 0) & (y_true == 0)).sum().item()
FP = ((y_pred == 1) & (y_true == 0)).sum().item()
FN = ((y_pred == 0) & (y_true == 1)).sum().item()

total = len(y_true)
accuracy = (TP + TN) / total
# Precision and recall fall back to 0 when their denominator is empty.
precision = TP / (TP + FP) if (TP + FP) > 0 else 0
recall = TP / (TP + FN) if (TP + FN) > 0 else 0

print("CONFUSION MATRIX:")
print(f"  TP (True Positive):   {TP}")
print(f"  TN (True Negative):   {TN}")
print(f"  FP (False Positive):  {FP}")
print(f"  FN (False Negative):  {FN}")
print()
print("METRICS:")
print(f"  Accuracy:   {accuracy:.3f}  (TP+TN)/Total = ({TP}+{TN})/{total}")
print(f"  Precision:  {precision:.3f}  TP/(TP+FP) = {TP}/({TP}+{FP})")
print(f"  Recall:     {recall:.3f}   TP/(TP+FN) = {TP}/({TP}+{FN})")
print()


CELL 9: Evaluate Verifier - Calculate TP/TN/Accuracy...
CONFUSION MATRIX:
  TP (True Positive):   3
  TN (True Negative):   2
  FP (False Positive):  0
  FN (False Negative):  0

METRICS:
  Accuracy:   1.000  (TP+TN)/Total = (3+2)/5
  Precision:  1.000  TP/(TP+FP) = 3/(3+0)
  Recall:     1.000   TP/(TP+FN) = 3/(3+0)

