In [None]:
# ================================
# Environment Setup (Colab)
# ================================
"""
Install PyTorch and PyTorch Geometric for graph neural networks
Follow the paper's requirement for PyTorch and torch-geometric
"""
!pip -q install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu121
!pip -q install torch-geometric torch-scatter torch-sparse torch-cluster -f https://data.pyg.org/whl/torch-2.3.0+cu121.html
print("✅ PyTorch & PyG installation completed")

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.9/10.9 MB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.1/5.1 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m113.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m58.7 MB/s[0m eta [36m0:00:00[0m
[?25h✅ PyTorch & PyG installation completed


In [None]:
# ================================
#  Imports & Core Utilities
# ================================
"""
Import all necessary libraries and set up reproducible random seeds
Following the paper's experimental setup for consistent results
"""
import os, random, copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

from torch_geometric.datasets import TUDataset
from torch_geometric.nn import GCNConv, SAGEConv, global_mean_pool, global_add_pool
from torch_geometric.loader import DataLoader
from torch_geometric.utils import to_undirected, subgraph

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')

def set_seed(seed: int = 42):
    """Set random seeds for reproducibility across all libraries"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)



Device: cuda


In [None]:
# ================================
#  Dataset Loading & Splitting
# ================================
"""
Load PROTEINS dataset and create 7/1/2 train/val/test split
as specified in the paper (Section 4.1.1)
"""
# Load PROTEINS dataset from TUDataset
dataset = TUDataset(root='/content/data/PROTEINS', name='PROTEINS')
print(f"Dataset: {dataset}")
print(f"Number of graphs: {len(dataset)}")
print(f"Number of features: {dataset.num_features}")
print(f"Number of classes: {dataset.num_classes}")

def make_graph_split(dataset, train_ratio=0.7, val_ratio=0.1, seed=1):
    """
    Split graph dataset into train/validation/test sets
    Paper uses 7/1/2 split as mentioned in Section 4.1.1
    """
    g = torch.Generator().manual_seed(seed)
    idx = torch.randperm(len(dataset), generator=g)
    n = len(dataset)
    n_tr = int(n * train_ratio)
    n_va = int(n * val_ratio)

    train_idx = idx[:n_tr]
    val_idx = idx[n_tr:n_tr + n_va]
    test_idx = idx[n_tr + n_va:]

    train_dataset = [dataset[i] for i in train_idx]
    val_dataset = [dataset[i] for i in val_idx]
    test_dataset = [dataset[i] for i in test_idx]

    return train_dataset, val_dataset, test_dataset

# Create dataset splits
train_dataset, val_dataset, test_dataset = make_graph_split(dataset, 0.7, 0.1, seed=1)

# Create data loaders with appropriate batch size
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

num_feats = dataset.num_features
num_classes = dataset.num_classes
print(f'Features: {num_feats} | Classes: {num_classes}')
print(f'Train: {len(train_dataset)} | Val: {len(val_dataset)} | Test: {len(test_dataset)}')


Downloading https://www.chrsmrrs.com/graphkerneldatasets/PROTEINS.zip
Processing...
Done!


Dataset: PROTEINS(1113)
Number of graphs: 1113
Number of features: 3
Number of classes: 2
Features: 3 | Classes: 2
Train: 779 | Val: 111 | Test: 223


In [None]:
# ================================
# Configuration Parameters
# ================================
"""
Configuration following the paper's experimental setup
All hyperparameters are based on Section 4.1 and appendix
"""
CFG = dict(
    # Model pool sizes (Section 4.1.2)
    POS_TRAIN=50,        # Positive models for training
    POS_TEST=50,         # Positive models for testing
    NEG_TRAIN=50,        # Negative models for training
    NEG_TEST=50,         # Negative models for testing

    # Obfuscation techniques (Section 2.2.2)
    USE_FT_LAST=True,    # Fine-tune last layer only
    USE_FT_ALL=True,     # Fine-tune all layers
    USE_PR_LAST=True,    # Partial retrain last layer
    USE_PR_ALL=True,     # Partial retrain all layers
    USE_DISTILL=True,    # Knowledge distillation
    DISTILL_STEPS=250,   # Distillation training steps

    # Graph fingerprint parameters (Section 3.3)
    FP_P=64,             # Number of fingerprint graphs (P)
    FP_NODES=32,         # Nodes per fingerprint graph (n)
    FP_EDGE_INIT_P=0.05, # Initial edge probability (r)
    FP_EDGE_TOPK=96,     # Top-k edges to flip (K)
    EDGE_LOGIT_STEP=2.5, # Edge logit update step size

    # Joint learning parameters (Section 3.4)
    OUTER_ITERS=20,      # Joint learning iterations
    FP_STEPS=5,          # Feature update steps per iteration
    V_STEPS=10,          # Verifier update steps per iteration

    # Learning rates (Section 4.1.5)
    LR_TARGET=0.005,     # Target model learning rate
    WD_TARGET=5e-4,      # Target model weight decay
    LR_V=1e-3,           # Verifier learning rate
    LR_X=1e-3,           # Feature learning rate

    SEED=1,              # Global random seed
)
print("Configuration:", CFG)

Configuration: {'POS_TRAIN': 50, 'POS_TEST': 50, 'NEG_TRAIN': 50, 'NEG_TEST': 50, 'USE_FT_LAST': True, 'USE_FT_ALL': True, 'USE_PR_LAST': True, 'USE_PR_ALL': True, 'USE_DISTILL': True, 'DISTILL_STEPS': 250, 'FP_P': 64, 'FP_NODES': 32, 'FP_EDGE_INIT_P': 0.05, 'FP_EDGE_TOPK': 96, 'EDGE_LOGIT_STEP': 2.5, 'OUTER_ITERS': 20, 'FP_STEPS': 5, 'V_STEPS': 10, 'LR_TARGET': 0.005, 'WD_TARGET': 0.0005, 'LR_V': 0.001, 'LR_X': 0.001, 'SEED': 1}


In [None]:
# ================================
#  GNN Model Architectures
# ================================
"""
Define 3-layer GNN models for graph classification
Following the paper's architecture specification (Section 4.1.5)
"""
class GCN_GraphCls(nn.Module):
    """
    3-layer Graph Convolutional Network for graph classification
    Architecture: GCN -> ReLU -> Dropout -> GCN -> ReLU -> Dropout -> GCN -> Global Pool -> Linear
    """
    def __init__(self, in_channels, hidden, out_channels, dropout=0.5):
        super().__init__()
        # 3-layer GCN as specified in the paper
        self.conv1 = GCNConv(in_channels, hidden)
        self.conv2 = GCNConv(hidden, hidden)
        self.conv3 = GCNConv(hidden, hidden)
        self.classifier = nn.Linear(hidden, out_channels)
        self.dropout = dropout

    def forward(self, x, edge_index, batch):
        # First GCN layer
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)

        # Second GCN layer
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)

        # Third GCN layer (no dropout after final conv)
        x = self.conv3(x, edge_index)

        # Graph-level pooling (mean pooling as commonly used)
        x = global_mean_pool(x, batch)

        # Classification layer
        x = self.classifier(x)
        return x

class GraphSAGE_GraphCls(nn.Module):
    """
    3-layer GraphSAGE for graph classification
    Used as student model in distillation experiments
    """
    def __init__(self, in_channels, hidden, out_channels, dropout=0.5):
        super().__init__()
        self.conv1 = SAGEConv(in_channels, hidden)
        self.conv2 = SAGEConv(hidden, hidden)
        self.conv3 = SAGEConv(hidden, hidden)
        self.classifier = nn.Linear(hidden, out_channels)
        self.dropout = dropout

    def forward(self, x, edge_index, batch):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)

        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)

        x = self.conv3(x, edge_index)

        # Graph-level pooling
        x = global_mean_pool(x, batch)

        # Classification
        x = self.classifier(x)
        return x

In [None]:
# ================================
#  Training Utilities
# ================================
"""
Training and evaluation functions for graph classification models
Following standard practices with early stopping on validation
"""
@torch.no_grad()
def evaluate_graph_cls(model, loader):
    """Evaluate graph classification model accuracy"""
    model.eval()
    total_correct = 0
    total_samples = 0

    for batch in loader:
        batch = batch.to(device)
        out = model(batch.x, batch.edge_index, batch.batch)
        pred = out.argmax(dim=1)
        total_correct += (pred == batch.y).sum().item()
        total_samples += batch.y.size(0)

    return total_correct / total_samples

def train_graph_classifier(model, train_loader, val_loader, test_loader,
                          epochs=200, lr=0.005, wd=5e-4, verbose=True):
    """
    Train graph classification model with early stopping
    Returns the best model based on validation accuracy
    """
    model = model.to(device)
    opt = Adam(model.parameters(), lr=lr, weight_decay=wd)

    best = {'val': 0.0, 'state': None}

    for ep in range(epochs):
        model.train()
        total_loss = 0

        # Training loop
        for batch in train_loader:
            batch = batch.to(device)
            opt.zero_grad()
            out = model(batch.x, batch.edge_index, batch.batch)
            loss = F.cross_entropy(out, batch.y)
            loss.backward()
            opt.step()
            total_loss += loss.item()

        # Evaluation
        train_acc = evaluate_graph_cls(model, train_loader)
        val_acc = evaluate_graph_cls(model, val_loader)
        test_acc = evaluate_graph_cls(model, test_loader)

        # Save best model based on validation accuracy
        if val_acc > best['val']:
            best['val'] = val_acc
            best['state'] = copy.deepcopy(model.state_dict())

        if verbose and ep % 20 == 0:
            avg_loss = total_loss / len(train_loader)
            print(f"Epoch {ep:03d} | loss {avg_loss:.4f} | train {train_acc:.3f} | val {val_acc:.3f} | test {test_acc:.3f}")

    # Load best model
    if best['state'] is not None:
        model.load_state_dict(best['state'])

    # Final evaluation
    train_acc = evaluate_graph_cls(model, train_loader)
    val_acc = evaluate_graph_cls(model, val_loader)
    test_acc = evaluate_graph_cls(model, test_loader)

    if verbose:
        print(f"✅ Final (best-val) | train {train_acc:.3f} | val {val_acc:.3f} | test {test_acc:.3f}")

    return model

In [None]:
# ================================
#  Target Model Training
# ================================
"""
Train the target model (F) that we want to protect
This is the main GNN whose ownership we want to verify
"""
print("Training target model F (GCN for graph classification)...")
set_seed(CFG["SEED"])
model_f = GCN_GraphCls(num_feats, hidden=16, out_channels=num_classes, dropout=0.5)
model_f = train_graph_classifier(
    model_f, train_loader, val_loader, test_loader,
    epochs=200, lr=CFG["LR_TARGET"], wd=CFG["WD_TARGET"]
)
print("Target model training completed.\n")


Training target model F (GCN for graph classification)...
Epoch 000 | loss 0.6716 | train 0.596 | val 0.631 | test 0.578
Epoch 020 | loss 0.6204 | train 0.716 | val 0.658 | test 0.668
Epoch 040 | loss 0.6035 | train 0.723 | val 0.667 | test 0.677
Epoch 060 | loss 0.6100 | train 0.736 | val 0.685 | test 0.673
Epoch 080 | loss 0.5943 | train 0.724 | val 0.703 | test 0.682
Epoch 100 | loss 0.5952 | train 0.727 | val 0.676 | test 0.673
Epoch 120 | loss 0.5978 | train 0.741 | val 0.703 | test 0.682
Epoch 140 | loss 0.6002 | train 0.707 | val 0.730 | test 0.673
Epoch 160 | loss 0.5917 | train 0.719 | val 0.748 | test 0.686
Epoch 180 | loss 0.5883 | train 0.742 | val 0.703 | test 0.677
✅ Final (best-val) | train 0.728 | val 0.766 | test 0.700
Target model training completed.



In [None]:
# ================================
#  Suspect Model Generation - Utilities
# ================================
"""
Functions to create positive and negative suspect models
Positive models: Derived from target model via obfuscation techniques
Negative models: Independently trained models
"""
@torch.no_grad()
def reset_module(m):
    """Reset module parameters to random initialization"""
    for layer in m.modules():
        if hasattr(layer, 'reset_parameters'):
            layer.reset_parameters()

def ft_graph_model(base_model, train_loader, last_only=True, epochs=10, lr=0.005, seed=123):
    """
    Fine-tuning obfuscation technique (Section 2.2.2)
    last_only: True = fine-tune only classifier, False = fine-tune all layers
    """
    set_seed(seed)
    m = copy.deepcopy(base_model).to(device)

    # Freeze parameters if last_only
    for p in m.parameters():
        p.requires_grad_(not last_only)

    # Always fine-tune classifier (last layer)
    for p in m.classifier.parameters():
        p.requires_grad_(True)

    opt = Adam(filter(lambda p: p.requires_grad, m.parameters()), lr=lr)

    for _ in range(epochs):
        m.train()
        for batch in train_loader:
            batch = batch.to(device)
            opt.zero_grad()
            out = m(batch.x, batch.edge_index, batch.batch)
            loss = F.cross_entropy(out, batch.y)
            loss.backward()
            opt.step()

    return m.eval()

def pr_graph_model(base_model, train_loader, last_only=True, epochs=10, lr=0.005, seed=456):
    """
    Partial retraining obfuscation technique (Section 2.2.2)
    Reset parameters and retrain selected layers
    """
    set_seed(seed)
    m = copy.deepcopy(base_model).to(device)

    # Reset parameters based on strategy
    if last_only:
        reset_module(m.classifier)
    else:
        reset_module(m)

    opt = Adam(m.parameters(), lr=lr)

    for _ in range(epochs):
        m.train()
        for batch in train_loader:
            batch = batch.to(device)
            opt.zero_grad()
            out = m(batch.x, batch.edge_index, batch.batch)
            loss = F.cross_entropy(out, batch.y)
            loss.backward()
            opt.step()

    return m.eval()

def make_graph_student(arch='GCN', hidden=16):
    """Create student model for knowledge distillation"""
    if arch == 'GCN':
        return GCN_GraphCls(num_feats, hidden, num_classes, dropout=0.5).to(device)
    else:  # arch == 'SAGE'
        return GraphSAGE_GraphCls(num_feats, hidden, num_classes, dropout=0.5).to(device)

def distill_from_graph_teacher(teacher, train_loader, arch='GCN', T=2.0, steps=250, lr=0.01, seed=777):
    """
    Knowledge distillation obfuscation technique (Section 2.2.2)
    Train student model to mimic teacher model outputs
    """
    set_seed(seed)
    student = make_graph_student(arch, hidden=16)
    opt = Adam(student.parameters(), lr=lr)
    kl = nn.KLDivLoss(reduction='batchmean')

    teacher.eval()
    data_iter = iter(train_loader)

    for step in range(steps):
        try:
            batch = next(data_iter)
        except StopIteration:
            data_iter = iter(train_loader)
            batch = next(data_iter)

        batch = batch.to(device)

        # Teacher predictions (soft targets)
        with torch.no_grad():
            teacher_logits = teacher(batch.x, batch.edge_index, batch.batch)
            p_t = F.softmax(teacher_logits / T, dim=-1)

        # Student predictions
        student.train()
        opt.zero_grad()
        student_logits = student(batch.x, batch.edge_index, batch.batch)
        logit_s = student_logits / T
        loss = kl(F.log_softmax(logit_s, dim=-1), p_t) * (T * T)
        loss.backward()
        opt.step()

        if step % 50 == 0:
            print(f"  Distillation step {step}/{steps}, loss: {loss.item():.4f}")

    return student.eval()

In [None]:
# ================================
# Positive Model Generation (F+)
# ================================
"""
Create positive suspect models using various obfuscation techniques
These models should be recognized as pirated versions of the target model
"""
def _distribute_budget(total, keys):
    """Distribute total budget evenly across keys with remainder handling"""
    if not keys:
        return {}
    base = total // len(keys)
    rem = total - base * len(keys)
    out = {k: base for k in keys}
    for k in keys[:rem]:
        out[k] += 1
    return out

print("Creating positive suspect models (F+)...")

F_pos_all = []
pos_total = CFG["POS_TRAIN"] + CFG["POS_TEST"]

# Determine which obfuscation techniques to use
pos_keys = []
if CFG["USE_FT_LAST"]: pos_keys.append("FT_LAST")
if CFG["USE_FT_ALL"]:  pos_keys.append("FT_ALL")
if CFG["USE_PR_LAST"]: pos_keys.append("PR_LAST")
if CFG["USE_PR_ALL"]:  pos_keys.append("PR_ALL")
if CFG["USE_DISTILL"]: pos_keys.append("DISTILL")

pos_budget = _distribute_budget(pos_total, pos_keys)
print(f"Positive budget distribution: {pos_budget}")

seed_base = 10
for key in pos_keys:
    cnt = pos_budget[key]
    print(f"Creating {cnt} {key} models...")

    if key == "FT_LAST":
        # Fine-tune last layer only
        for s in range(seed_base, seed_base + cnt):
            model = ft_graph_model(model_f, train_loader, last_only=True, epochs=10, seed=s)
            F_pos_all.append(model)
        seed_base += cnt

    elif key == "FT_ALL":
        # Fine-tune all layers
        for s in range(seed_base, seed_base + cnt):
            model = ft_graph_model(model_f, train_loader, last_only=False, epochs=10, seed=s)
            F_pos_all.append(model)
        seed_base += cnt

    elif key == "PR_LAST":
        # Partial retrain last layer only
        for s in range(seed_base, seed_base + cnt):
            model = pr_graph_model(model_f, train_loader, last_only=True, epochs=10, seed=s)
            F_pos_all.append(model)
        seed_base += cnt

    elif key == "PR_ALL":
        # Partial retrain all layers
        for s in range(seed_base, seed_base + cnt):
            model = pr_graph_model(model_f, train_loader, last_only=False, epochs=10, seed=s)
            F_pos_all.append(model)
        seed_base += cnt

    elif key == "DISTILL":
        # Knowledge distillation with different architectures
        arches = (['GCN'] * (cnt//2) + ['SAGE'] * (cnt - cnt//2))
        for i, arch in enumerate(arches):
            print(f"  Distilling to {arch} architecture...")
            model = distill_from_graph_teacher(
                model_f, train_loader, arch=arch,
                T=2.0, steps=CFG["DISTILL_STEPS"], seed=1000+i
            )
            F_pos_all.append(model)

assert len(F_pos_all) == pos_total, f"Expected {pos_total} positive models, got {len(F_pos_all)}"
print(f"Created {len(F_pos_all)} positive models\n")

Creating positive suspect models (F+)...
Positive budget distribution: {'FT_LAST': 20, 'FT_ALL': 20, 'PR_LAST': 20, 'PR_ALL': 20, 'DISTILL': 20}
Creating 20 FT_LAST models...
Creating 20 FT_ALL models...
Creating 20 PR_LAST models...
Creating 20 PR_ALL models...
Creating 20 DISTILL models...
  Distilling to GCN architecture...
  Distillation step 0/250, loss: 0.0916
  Distillation step 50/250, loss: 0.0206
  Distillation step 100/250, loss: 0.0122
  Distillation step 150/250, loss: 0.0145
  Distillation step 200/250, loss: 0.0141
  Distilling to GCN architecture...
  Distillation step 0/250, loss: 0.0732
  Distillation step 50/250, loss: 0.0159
  Distillation step 100/250, loss: 0.0162
  Distillation step 150/250, loss: 0.0080
  Distillation step 200/250, loss: 0.0099
  Distilling to GCN architecture...
  Distillation step 0/250, loss: 0.1428
  Distillation step 50/250, loss: 0.0363
  Distillation step 100/250, loss: 0.0216
  Distillation step 150/250, loss: 0.0230
  Distillation step 

In [None]:
# ================================
#  Negative Model Generation (F-)
# ================================
"""
Create negative suspect models (independently trained)
These models should NOT be recognized as pirated versions
"""
print("Creating negative suspect models (F-)...")

F_neg_all = []
neg_total = CFG["NEG_TRAIN"] + CFG["NEG_TEST"]
neg_keys = ["GCN", "SAGE"]
neg_budget = _distribute_budget(neg_total, neg_keys)
print(f"Negative budget distribution: {neg_budget}")

seed_base = 500

# Create independent GCN models
print(f"Creating {neg_budget['GCN']} independent GCN models...")
for s in range(seed_base, seed_base + neg_budget["GCN"]):
    set_seed(s)
    m = GCN_GraphCls(num_feats, 16, num_classes, dropout=0.5)
    m = train_graph_classifier(
        m, train_loader, val_loader, test_loader,
        epochs=120, lr=CFG["LR_TARGET"], wd=CFG["WD_TARGET"], verbose=False
    )
    F_neg_all.append(m.eval())

seed_base += neg_budget["GCN"]

# Create independent GraphSAGE models
print(f"Creating {neg_budget['SAGE']} independent SAGE models...")
for s in range(seed_base, seed_base + neg_budget["SAGE"]):
    set_seed(s)
    m = GraphSAGE_GraphCls(num_feats, 32, num_classes, dropout=0.5)
    m = train_graph_classifier(
        m, train_loader, val_loader, test_loader,
        epochs=120, lr=CFG["LR_TARGET"], wd=CFG["WD_TARGET"], verbose=False
    )
    F_neg_all.append(m.eval())

assert len(F_neg_all) == neg_total, f"Expected {neg_total} negative models, got {len(F_neg_all)}"
print(f"Created {len(F_neg_all)} negative models\n")


Creating negative suspect models (F-)...
Negative budget distribution: {'GCN': 50, 'SAGE': 50}
Creating 50 independent GCN models...
Creating 50 independent SAGE models...
Created 100 negative models



In [None]:
# ================================
#  Model Pool Splitting
# ================================
"""
Split positive and negative models into training and testing pools
Training pools: Used for fingerprint optimization and verifier training
Testing pools: Used for final evaluation (held-out testing)
"""
def split_pool(pool, n_train, n_test, seed=999):
    """Split model pool into train/test sets"""
    set_seed(seed)
    idx = torch.randperm(len(pool)).tolist()
    train = [pool[i] for i in idx[:n_train]]
    test  = [pool[i] for i in idx[n_train:n_train + n_test]]
    return train, test

# Split positive and negative pools
F_pos_tr, F_pos_te = split_pool(F_pos_all, CFG["POS_TRAIN"], CFG["POS_TEST"])
F_neg_tr, F_neg_te = split_pool(F_neg_all, CFG["NEG_TRAIN"], CFG["NEG_TEST"])

print(f"Model pool split completed:")
print(f"F+ train/test: {len(F_pos_tr)}/{len(F_pos_te)}")
print(f"F- train/test: {len(F_neg_tr)}/{len(F_neg_te)}\n")

Model pool split completed:
F+ train/test: 50/50
F- train/test: 50/50



In [None]:
# ================================
# Graph Fingerprint Implementation
# ================================
"""
Graph fingerprint construction for graph-level tasks (Section 3.3)
Each fingerprint is a small graph with learnable features and adjacency matrix
"""
class GraphFingerprint(nn.Module):
    """
    Individual graph fingerprint for graph-level tasks
    Maintains differentiable adjacency matrix via logits and learnable node features
    """
    def __init__(self, n_nodes, feat_dim, edge_init_p=0.05):
        super().__init__()
        self.n = n_nodes
        self.d = feat_dim

        # Initialize node features uniformly
        X = torch.empty(self.n, self.d).uniform_(-0.5, 0.5)
        self.X = nn.Parameter(X.to(device))

        # Initialize adjacency matrix as logits for differentiability
        # Start with low edge probability as specified in paper
        A0 = (torch.rand(self.n, self.n, device=device) < edge_init_p).float()
        A0.fill_diagonal_(0.0)  # No self-loops
        A0 = torch.maximum(A0, A0.T)  # Make symmetric

        # Convert to logits (avoiding numerical issues)
        self.A_logits = nn.Parameter(torch.logit(torch.clamp(A0, 1e-4, 1-1e-4)))

    @torch.no_grad()
    def edge_index(self):
        """
        Convert adjacency logits to edge_index format
        Used for GNN forward passes
        """
        A_prob = torch.sigmoid(self.A_logits)
        A_bin = (A_prob > 0.5).float()
        A_bin.fill_diagonal_(0.0)
        A_bin = torch.maximum(A_bin, A_bin.T)  # Ensure symmetry

        # Convert to edge_index format
        idx = A_bin.nonzero(as_tuple=False)
        if idx.numel() == 0:
            return torch.empty(2, 0, dtype=torch.long, device=device)
        return idx.t().contiguous()

    @torch.no_grad()
    def flip_topk_by_grad(self, gradA, topk=64, step=2.5):
        """
        Flip top-k edges based on gradient magnitude (Section 3.4.2)
        Following paper's discrete optimization strategy
        """
        g = gradA.abs()
        # Only consider upper triangular part to avoid double-counting
        triu = torch.triu(torch.ones_like(g), diagonal=1)
        scores = (g * triu).flatten()
        k = min(topk, scores.numel())
        if k == 0:
            return

        # Find top-k edges by gradient magnitude
        _, idxs = torch.topk(scores, k=k)
        r = self.n
        pairs = torch.stack((idxs // r, idxs % r), dim=1)

        A_prob = torch.sigmoid(self.A_logits).detach()

        # Apply edge flipping rules from paper
        for (u, v) in pairs.tolist():
            guv = gradA[u, v].item()
            exist = A_prob[u, v] > 0.5

            if exist and guv <= 0:  # Remove existing edge
                self.A_logits.data[u, v] -= step
                self.A_logits.data[v, u] -= step
            elif (not exist) and guv >= 0:  # Add new edge
                self.A_logits.data[u, v] += step
                self.A_logits.data[v, u] += step

        # Ensure no self-loops
        self.A_logits.data.fill_diagonal_(-10.0)

class GraphFingerprintSet(nn.Module):
    """
    Set of P graph fingerprints for graph-level tasks (Section 3.3)
    Manages multiple fingerprint graphs and their optimization
    """
    def __init__(self, P, n_nodes, feat_dim, edge_init_p=0.05, topk_edges=64, edge_step=2.5):
        super().__init__()
        self.P = P
        self.fps = nn.ModuleList([
            GraphFingerprint(n_nodes, feat_dim, edge_init_p)
            for _ in range(P)
        ]).to(device)
        self.topk_edges = topk_edges
        self.edge_step = edge_step

    def concat_outputs(self, model, *, require_grad: bool = False):
        """
        Get concatenated outputs from all fingerprint graphs
        This creates the input vector for the Univerifier
        """
        outs = []
        model.eval()
        ctx = torch.enable_grad() if require_grad else torch.no_grad()

        with ctx:
            for fp in self.fps:
                ei = fp.edge_index()

                # Create batch tensor for single graph
                batch = torch.zeros(fp.n, dtype=torch.long, device=device)

                # Get model output (graph-level prediction)
                logits = model(fp.X, ei, batch)
                probs = F.softmax(logits, dim=-1).flatten()
                outs.append(probs)

        return torch.cat(outs, dim=0)

    def flip_adj_by_grad(self, surrogate_grad_list):
        """
        Apply gradient-based edge flipping to all fingerprints
        """
        for fp, g in zip(self.fps, surrogate_grad_list):
            fp.flip_topk_by_grad(g, topk=self.topk_edges, step=self.edge_step)



In [None]:
# ================================
# Fingerprint Set Initialization
# ================================
"""
Initialize the set of graph fingerprints with specified parameters
"""
print("Initializing graph fingerprint set...")

fp_set = GraphFingerprintSet(
    P=CFG["FP_P"],                    # Number of fingerprint graphs
    n_nodes=CFG["FP_NODES"],          # Nodes per graph
    feat_dim=num_feats,               # Feature dimension
    edge_init_p=CFG["FP_EDGE_INIT_P"], # Initial edge probability
    topk_edges=CFG["FP_EDGE_TOPK"],   # Top-k edges to flip
    edge_step=CFG["EDGE_LOGIT_STEP"],  # Edge update step size
)

# Calculate Univerifier input dimension
# Each fingerprint produces a probability vector of size num_classes
INPUT_DIM = CFG["FP_P"] * num_classes
print(f"Fingerprint set initialized:")
print(f"- {CFG['FP_P']} fingerprint graphs")
print(f"- {CFG['FP_NODES']} nodes per graph")
print(f"- Univerifier input dimension: {INPUT_DIM}\n")

Initializing graph fingerprint set...
Fingerprint set initialized:
- 64 fingerprint graphs
- 32 nodes per graph
- Univerifier input dimension: 128



In [None]:
# ================================
# Univerifier Implementation
# ================================
"""
Univerifier: Binary classifier for ownership verification (Section 3.4.1)
Takes concatenated outputs from fingerprints and predicts pirated vs. irrelevant
"""
class Univerifier(nn.Module):
    """
    Binary classifier following paper's architecture
    3-layer MLP with LeakyReLU activations as specified in Section 4.1.5
    """
    def __init__(self, input_dim: int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.LeakyReLU(0.01),
            nn.Linear(128, 64),
            nn.LeakyReLU(0.01),
            nn.Linear(64, 32),
            nn.LeakyReLU(0.01),
            nn.Linear(32, 2),  # Binary classification: pirated vs irrelevant
        )

    def forward(self, x):
        return self.net(x)

# Initialize Univerifier
print("Initializing Univerifier...")
V = Univerifier(INPUT_DIM).to(device)
opt_V = Adam(V.parameters(), lr=CFG["LR_V"])
print(f"Univerifier initialized with input dimension {INPUT_DIM}\n")



Initializing Univerifier...
Univerifier initialized with input dimension 128



In [None]:
# ================================
# Joint Learning Setup
# ================================
"""
Prepare model pools for joint learning (Section 3.4.2)
Training pools include target model + positive/negative models
"""
# Combine target model with positive training models
models_pos_tr = [model_f.to(device)] + [m.to(device) for m in F_pos_tr]
models_neg_tr = [m.to(device) for m in F_neg_tr]

print(f"Joint learning setup:")
print(f"- Positive training models: {len(models_pos_tr)} (including target)")
print(f"- Negative training models: {len(models_neg_tr)}")
print(f"- Total training models: {len(models_pos_tr) + len(models_neg_tr)}\n")


Joint learning setup:
- Positive training models: 51 (including target)
- Negative training models: 50
- Total training models: 101



In [None]:
# ================================
# Joint Learning Helper Functions
# ================================
"""
Core functions for joint optimization of fingerprints and verifier
"""
def batch_from_pool_graph(fp_set, pos_models, neg_models, *, require_grad: bool):
    """
    Create training batch from model pools
    Returns: (X, y) where X are concatenated outputs, y are labels
    """
    X = []
    y = []

    # Positive models (should be classified as pirated)
    for m in pos_models:
        X.append(fp_set.concat_outputs(m, require_grad=require_grad))
        y.append(1)

    # Negative models (should be classified as irrelevant)
    for m in neg_models:
        X.append(fp_set.concat_outputs(m, require_grad=require_grad))
        y.append(0)

    return torch.stack(X, dim=0), torch.tensor(y, device=device)

def surrogate_grad_A_for_graph_fp(fp, model):
    """
    Compute surrogate gradient for adjacency matrix (Section 3.4.2)
    Uses node similarity as proxy for actual gradient computation
    """
    with torch.no_grad():
        ei = fp.edge_index()
        batch = torch.zeros(fp.n, dtype=torch.long, device=device)

        # Get intermediate representation from first conv layer
        h = model.conv1(fp.X, ei)
        h = F.relu(h)

        # Compute pairwise node similarity
        hn = F.normalize(h, dim=-1)
        sim = hn @ hn.t()

        # Surrogate gradient: similarity above threshold suggests edge should exist
        gradA = sim - 0.5
        return gradA.detach().cpu()

def update_features_graph(fp_set, V, pos_models, neg_models, steps, lr_x):
    """
    Update fingerprint node features (Section 3.4.2)
    Alternates between feature updates and edge updates
    """
    # Freeze all model parameters during feature optimization
    for m in pos_models + neg_models:
        for p in m.parameters():
            p.requires_grad_(False)

    # Enable gradients for fingerprint features only
    for fp in fp_set.fps:
        fp.X.requires_grad_(True)

    for _ in range(steps):
        # Get training batch with gradients enabled
        Xb, yb = batch_from_pool_graph(fp_set, pos_models, neg_models, require_grad=True)

        # Freeze verifier during feature update
        V.eval()
        for p in V.parameters():
            p.requires_grad_(False)

        # Compute loss for feature optimization
        logits = V(Xb.to(device))
        loss = F.cross_entropy(logits, yb)

        # Zero existing gradients
        for fp in fp_set.fps:
            if fp.X.grad is not None:
                fp.X.grad.zero_()

        # Backpropagate to get feature gradients
        loss.backward()

        # Update features using computed gradients
        with torch.no_grad():
            for fp in fp_set.fps:
                if fp.X.grad is not None:
                    fp.X.add_(lr_x * fp.X.grad)
                    fp.X.grad.zero_()

        # Re-enable verifier gradients
        for p in V.parameters():
            p.requires_grad_(True)

    # Compute surrogate gradients for adjacency matrices
    grads = [surrogate_grad_A_for_graph_fp(fp, pos_models[0]) for fp in fp_set.fps]
    fp_set.flip_adj_by_grad(grads)

def update_verifier_graph(fp_set, V, pos_models, neg_models, steps):
    """
    Update verifier parameters (Section 3.4.2)
    Standard supervised learning on model outputs
    """
    for _ in range(steps):
        V.train()

        # Get training batch (no gradients needed for fingerprints)
        Xb, yb = batch_from_pool_graph(fp_set, pos_models, neg_models, require_grad=False)

        # Forward pass and loss computation
        logits = V(Xb.to(device))
        loss = F.cross_entropy(logits, yb)

        # Update verifier parameters
        opt_V.zero_grad()
        loss.backward()
        opt_V.step()

In [None]:
# ================================
# Joint Learning Loop
# ================================
"""
Main joint optimization loop (Section 3.4.2)
Alternates between fingerprint updates and verifier updates
"""
print("Starting joint learning...")
print("Iter | Overall Acc | Positive Acc | Negative Acc")
print("-" * 50)

for it in range(1, CFG["OUTER_ITERS"] + 1):
    # Update fingerprint features and adjacency matrices
    update_features_graph(
        fp_set, V, models_pos_tr, models_neg_tr,
        steps=CFG["FP_STEPS"], lr_x=CFG["LR_X"]
    )

    # Update verifier parameters
    update_verifier_graph(
        fp_set, V, models_pos_tr, models_neg_tr,
        steps=CFG["V_STEPS"]
    )

    # Evaluate training progress
    V.eval()
    with torch.no_grad():
        Xb, yb = batch_from_pool_graph(fp_set, models_pos_tr, models_neg_tr, require_grad=False)
        pred = V(Xb).argmax(dim=1)

        # Overall accuracy
        acc = (pred.cpu() == yb.cpu()).float().mean().item()

        # Positive accuracy (robustness)
        pos_acc = (pred[:len(models_pos_tr)].cpu() == 1).float().mean().item()

        # Negative accuracy (uniqueness)
        neg_acc = (pred[len(models_pos_tr):].cpu() == 0).float().mean().item()

    print(f"{it:4d} | {acc:11.3f} | {pos_acc:12.3f} | {neg_acc:12.3f}")

print("\nJoint learning completed!\n")

Starting joint learning...
Iter | Overall Acc | Positive Acc | Negative Acc
--------------------------------------------------
   1 |       0.485 |        0.412 |        0.560
   2 |       0.475 |        0.353 |        0.600
   3 |       0.554 |        0.529 |        0.580
   4 |       0.683 |        0.647 |        0.720
   5 |       0.752 |        0.745 |        0.760
   6 |       0.733 |        0.647 |        0.820
   7 |       0.842 |        0.922 |        0.760
   8 |       0.832 |        0.882 |        0.780
   9 |       0.842 |        0.922 |        0.760
  10 |       0.842 |        0.922 |        0.760
  11 |       0.832 |        0.882 |        0.780
  12 |       0.832 |        0.902 |        0.760
  13 |       0.832 |        0.922 |        0.740
  14 |       0.822 |        0.902 |        0.740
  15 |       0.851 |        0.902 |        0.800
  16 |       0.851 |        0.902 |        0.800
  17 |       0.851 |        0.922 |        0.780
  18 |       0.842 |        0.922 |     

In [None]:
# ================================
#  Test Set Evaluation
# ================================
"""
Evaluate on held-out test sets to measure final performance
Following paper's evaluation methodology (Section 4.1.4)
"""
print("Evaluating on test sets...")

# Prepare test model pools
models_pos_te = [model_f.to(device)] + [m.to(device) for m in F_pos_te]
models_neg_te = [m.to(device) for m in F_neg_te]

@torch.no_grad()
def verify_scores_graph(V, fp_set, models):
    """
    Get verification scores (probability of being pirated) for models
    """
    V.eval()
    Xs = [fp_set.concat_outputs(m, require_grad=False) for m in models]
    X_batch = torch.stack(Xs, dim=0).to(device)
    logits = V(X_batch)
    probs = F.softmax(logits, dim=-1)[:, 1]  # Probability of positive class
    return probs.detach().cpu().numpy()

# Get verification scores for test models
p_pos = verify_scores_graph(V, fp_set, models_pos_te)  # Positive model scores
p_neg = verify_scores_graph(V, fp_set, models_neg_te)  # Negative model scores

print(f"Test set sizes: {len(models_pos_te)} positive, {len(models_neg_te)} negative")
print(f"Sample positive scores: {p_pos[:5]}")
print(f"Sample negative scores: {p_neg[:5]}\n")


Evaluating on test sets...
Test set sizes: 51 positive, 50 negative
Sample positive scores: [0.7363378  0.3280269  0.93478674 0.8450202  0.30074674]
Sample negative scores: [0.03424155 0.06295612 0.0069764  0.01874819 0.0108477 ]



In [None]:
# ================================
#  Threshold Sweep & Metrics
# ================================
"""
Compute robustness, uniqueness, and ARUC across threshold values
Following paper's evaluation metrics (Section 4.1.4)
"""
def sweep_threshold_graph(p_pos, p_neg, num=301):
    """
    Sweep threshold values to compute robustness and uniqueness curves
    """
    ths = np.linspace(0.0, 1.0, num=num)
    R = []  # Robustness (True Positive Rate)
    U = []  # Uniqueness (True Negative Rate)
    A = []  # Balanced Accuracy

    for t in ths:
        tp = (p_pos >= t).mean()    # Robustness: correctly identify positive
        tn = (p_neg <  t).mean()    # Uniqueness: correctly identify negative
        R.append(tp)
        U.append(tn)
        A.append((tp + tn) / 2.0)   # Balanced accuracy

    return ths, np.array(R), np.array(U), np.array(A)

# Perform threshold sweep
ths, R, U, A = sweep_threshold_graph(p_pos, p_neg, num=301)

# Find best threshold and metrics
best_idx = A.argmax()
mean_acc = A.mean()

# Calculate ARUC (Area under Robustness-Uniqueness Curve)
try:
    # For numpy >= 2.0
    ARUC = np.trapezoid(np.minimum(R, U), ths)
except AttributeError:
    # For numpy < 2.0
    ARUC = np.trapz(np.minimum(R, U), ths)

# ================================
"""
Display final results and compare with paper benchmarks
"""
print("=" * 60)
print("FINAL RESULTS - PROTEINS Graph Classification")
print("=" * 60)
print(f"Best threshold λ = {ths[best_idx]:.3f}")
print(f"Robustness (True Positive Rate) = {R[best_idx]:.3f}")
print(f"Uniqueness (True Negative Rate) = {U[best_idx]:.3f}")
print(f"Mean Test Accuracy = {A[best_idx]:.3f}")
print(f"Average Test Accuracy (over all λ) = {mean_acc:.3f}")
print(f"ARUC (Area Under RU Curve) = {ARUC:.3f}")
print("=" * 60)

# Compare with paper results
print("\nComparison with Paper Results (Table 1 - PROTEINS):")
print("Paper Benchmarks:")
print("- GCNMean:      0.967")
print("- GCNDiff:      0.961")
print("- GraphsageMean: 0.989")
print("- GraphsageDiff: 0.984")
print(f"\nYour Implementation:")
print(f"- Mean Accuracy: {A[best_idx]:.3f}")
print(f"- ARUC Score:    {ARUC:.3f}")

# Performance assessment
if A[best_idx] >= 0.96:
    print("\n✅ EXCELLENT! Results match/exceed paper performance!")
elif A[best_idx] >= 0.90:
    print("\n✅ VERY GOOD! Results are close to paper performance!")
elif A[best_idx] >= 0.80:
    print("\n⚠️  DECENT! Results are reasonable but could be improved.")
    print("   Consider: increasing joint learning iterations, tuning hyperparameters")
else:
    print("\n❌ NEEDS IMPROVEMENT! Check implementation details.")
    print("   Focus on: gradient computation, edge flipping logic, feature updates")

print("\n" + "=" * 60)
print("GNNFingers Implementation Complete!")
print("=" * 60)

FINAL RESULTS - PROTEINS Graph Classification
Best threshold λ = 0.793
Robustness (True Positive Rate) = 0.765
Uniqueness (True Negative Rate) = 0.780
Mean Test Accuracy = 0.772
Average Test Accuracy (over all λ) = 0.713
ARUC (Area Under RU Curve) = 0.547

Comparison with Paper Results (Table 1 - PROTEINS):
Paper Benchmarks:
- GCNMean:      0.967
- GCNDiff:      0.961
- GraphsageMean: 0.989
- GraphsageDiff: 0.984

Your Implementation:
- Mean Accuracy: 0.772
- ARUC Score:    0.547

❌ NEEDS IMPROVEMENT! Check implementation details.
   Focus on: gradient computation, edge flipping logic, feature updates

GNNFingers Implementation Complete!
