In [2]:
!pip install torch-geometric


Collecting torch-geometric
  Downloading torch_geometric-2.7.0-py3-none-any.whl.metadata (63 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/63.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.7/63.7 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
Downloading torch_geometric-2.7.0-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m38.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch-geometric
Successfully installed torch-geometric-2.7.0


In [3]:
import os
import time
import csv
import random
import numpy as np

import torch
import torch.nn.functional as F
from torch import nn
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
from torch_geometric.nn import GATConv

from sklearn.model_selection import StratifiedShuffleSplit


In [4]:
def set_seed(seed: int):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global RNG and PyTorch (the
    default generator plus all CUDA devices), then puts cuDNN into
    deterministic mode with benchmarking turned off.

    Args:
        seed: Integer seed handed unchanged to each library.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


In [None]:
def load_cora():
    """Return the first graph of the Planetoid "Cora" dataset.

    The dataset is rooted at ``./data`` and constructed with the
    ``NormalizeFeatures`` transform.
    """
    cora = Planetoid(root="./data", name="Cora", transform=NormalizeFeatures())
    graph = cora[0]
    return graph


In [5]:
def create_label_splits(data, label_rate, seed):
    """Build boolean train/val/test node masks via two stratified splits.

    A stratified 20% of all nodes is held out as the test set. From the
    remaining pool, a stratified ``label_rate`` share becomes the labelled
    training set, and every other pool node becomes validation. Note that
    ``label_rate`` is applied to the non-test pool, not to the whole graph.

    Args:
        data: Graph object exposing ``y`` (node labels) and ``num_nodes``.
        label_rate: ``train_size`` passed to the second stratified split.
        seed: Seed for the global RNGs and for both splitters.

    Returns:
        Dict with keys ``"train"``, ``"val"`` and ``"test"``, each a bool
        tensor of length ``data.num_nodes``.
    """
    set_seed(seed)

    labels = data.y.cpu().numpy()
    n_nodes = data.num_nodes
    all_nodes = np.arange(n_nodes)

    # Stage 1: carve off the test set (20% of all nodes, stratified by label).
    holdout_splitter = StratifiedShuffleSplit(
        n_splits=1, test_size=0.2, random_state=seed
    )
    pool_idx, test_idx = next(holdout_splitter.split(all_nodes, labels))

    # Stage 2: pick the labelled nodes from the non-test pool. The splitter
    # yields positions into ``pool_idx``, so map them back to node ids.
    label_splitter = StratifiedShuffleSplit(
        n_splits=1, train_size=label_rate, random_state=seed
    )
    picked_positions, _ = next(label_splitter.split(pool_idx, labels[pool_idx]))
    train_idx = pool_idx[picked_positions]

    # Everything left in the pool is used for validation.
    val_idx = np.setdiff1d(pool_idx, train_idx)

    def _as_mask(node_ids):
        flags = torch.zeros(n_nodes, dtype=torch.bool)
        flags[node_ids] = True
        return flags

    return {
        "train": _as_mask(train_idx),
        "val": _as_mask(val_idx),
        "test": _as_mask(test_idx),
    }


In [6]:
def apply_feature_ablation(x, ablation, seed, noise_level=None):
    """Return the node-feature matrix after the requested ablation.

    The global RNGs are re-seeded first, so the random ablations are
    reproducible for a given ``seed``.

    Args:
        x: Node feature tensor; its first dimension is used as the row count.
        ablation: One of ``"Vanilla"`` (return ``x`` itself), ``"Identity"``
            (square identity matrix with ``x.size(0)`` rows on ``x``'s
            device), ``"Shuffled"`` (rows of ``x`` randomly permuted) or
            ``"Gaussian"`` (``x`` plus standard normal noise scaled by
            ``noise_level``).
        seed: Seed forwarded to ``set_seed``.
        noise_level: Noise scale; must not be ``None`` for ``"Gaussian"``.

    Raises:
        AssertionError: ``"Gaussian"`` was requested without ``noise_level``.
        ValueError: ``ablation`` is not one of the names above.
    """
    set_seed(seed)

    if ablation == "Gaussian":
        assert noise_level is not None
        return x + torch.randn_like(x) * noise_level

    if ablation == "Shuffled":
        row_order = torch.randperm(x.size(0))
        return x[row_order]

    if ablation == "Identity":
        return torch.eye(x.size(0), device=x.device)

    if ablation == "Vanilla":
        return x

    raise ValueError(f"Unknown feature ablation: {ablation}")


In [7]:
def apply_structure_ablation(edge_index, ablation, seed):
    """Optionally keep a random 80% of the columns of ``edge_index``.

    The global RNGs are re-seeded first. Only ``ablation == "EdgeDrop"``
    changes anything; any other value (including ``None``) returns
    ``edge_index`` untouched. Columns are sampled individually, so the two
    directions of an undirected edge are kept or dropped independently.

    Args:
        edge_index: Tensor whose second dimension indexes edges.
        ablation: Name of the structural ablation, or ``None``.
        seed: Seed forwarded to ``set_seed``.

    Returns:
        ``edge_index`` itself, or a tensor holding ``int(0.8 * num_edges)``
        of its columns in random order.
    """
    set_seed(seed)

    if ablation == "EdgeDrop":
        total = edge_index.size(1)
        n_keep = int(0.8 * total)
        kept_cols = torch.randperm(total)[:n_keep]
        return edge_index[:, kept_cols]

    return edge_index


In [8]:
class GAT(nn.Module):
    """Two-layer graph attention network built from ``GATConv`` layers.

    The first layer uses ``heads`` attention heads of width ``hidden_dim``;
    the second layer takes ``hidden_dim * heads`` inputs and produces
    ``out_dim`` outputs with a single head. The same dropout probability is
    passed to both convolutions and applied to the inputs of each layer.
    """

    def __init__(self, in_dim, hidden_dim, out_dim, heads=8, dropout=0.6):
        """Create both attention layers and remember the dropout rate."""
        super().__init__()
        self.conv1 = GATConv(in_dim, hidden_dim, heads=heads, dropout=dropout)
        self.conv2 = GATConv(hidden_dim * heads, out_dim, heads=1, dropout=dropout)
        self.dropout = dropout

    def forward(self, x, edge_index, return_attn=False):
        """Run the network on node features ``x`` and graph ``edge_index``.

        Args:
            x: Node feature tensor.
            edge_index: Edge list passed to both convolutions.
            return_attn: When true, both layers are called with
                ``return_attention_weights=True`` and their attention
                tensors are returned alongside the output.

        Returns:
            The second layer's output, or ``(output, [attn1, attn2])`` when
            ``return_attn`` is true.
        """
        h = F.dropout(x, p=self.dropout, training=self.training)

        if not return_attn:
            h = F.elu(self.conv1(h, edge_index))
            h = F.dropout(h, p=self.dropout, training=self.training)
            return self.conv2(h, edge_index)

        # Each layer hands back (output, (edge_index, attention)); only the
        # attention tensors are kept.
        h, (_, alpha_first) = self.conv1(
            h, edge_index, return_attention_weights=True
        )
        h = F.dropout(F.elu(h), p=self.dropout, training=self.training)
        logits, (_, alpha_second) = self.conv2(
            h, edge_index, return_attention_weights=True
        )
        return logits, [alpha_first, alpha_second]


In [9]:
def representation_variance(embeddings):
    """Average per-dimension variance of a batch of embeddings.

    Args:
        embeddings: Tensor laid out as [num_nodes, dim].

    Returns:
        Python float: the variance along dimension 0, averaged over the
        remaining dimension.
    """
    per_dim_variance = torch.var(embeddings, dim=0)
    mean_variance = per_dim_variance.mean()
    return mean_variance.item()

In [10]:
def attention_entropy(attn_weights, eps=1e-9):
    """Mean over rows of ``-sum(a * log(a))`` taken along dimension 1.

    Args:
        attn_weights: Tensor laid out as [num_edges, num_heads].
        eps: Added to every weight before the logarithm to avoid ``log(0)``.

    Returns:
        Python float: the per-row sums averaged over all rows.

    NOTE(review): the sum runs across heads for a single edge. If the
    weights are normalised over each node's incoming edges rather than over
    heads, this is not the entropy of a node's attention distribution --
    confirm this is the intended metric.
    """
    shifted = attn_weights + eps
    per_edge = torch.sum(shifted * torch.log(shifted), dim=1).neg()
    return per_edge.mean().item()


def train_and_eval(
    data,
    masks,
    feature_ablation,
    structure_ablation,
    noise_level,
    seed,
    device
):
    """Train a GAT under the given ablations and report test diagnostics.

    The model has ``hidden_dim=8`` and is trained with Adam (lr 0.005,
    weight decay 5e-4) for at most 1000 epochs, stopping early once
    validation accuracy has not improved for 100 consecutive epochs.

    Validation accuracy is computed from a separate forward pass in eval
    mode, and the weights from the best validation epoch are restored before
    the final evaluation, so the reported metrics correspond to
    ``best_epoch``.

    Args:
        data: Graph object with ``x``, ``edge_index`` and ``y``.
        masks: Dict with ``"train"``, ``"val"`` and ``"test"`` node masks.
        feature_ablation: Name passed to ``apply_feature_ablation``.
        structure_ablation: Name (or ``None``) passed to
            ``apply_structure_ablation``.
        noise_level: Noise scale for the Gaussian feature ablation.
        seed: Seed for initialisation, ablations and training.
        device: Torch device the graph and model are moved to.

    Returns:
        Tuple ``(test_acc, best_epoch, train_time, attn_entropy_l1,
        attn_entropy_l2, rep_var)``. ``train_time`` is the wall-clock
        duration of the training loop in seconds, including the per-epoch
        validation pass.
    """
    set_seed(seed)
    data = data.to(device)

    x = apply_feature_ablation(
        data.x, feature_ablation, seed, noise_level
    )
    edge_index = apply_structure_ablation(
        data.edge_index, structure_ablation, seed
    )

    model = GAT(
        in_dim=x.size(1),
        hidden_dim=8,
        out_dim=int(data.y.max().item()) + 1
    ).to(device)

    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=0.005,
        weight_decay=5e-4
    )

    best_val = 0.0
    best_epoch = 0
    best_state = None
    patience = 100
    wait = 0

    start = time.time()

    # ---- training loop ----
    for epoch in range(1, 1001):
        model.train()
        optimizer.zero_grad()

        out = model(x, edge_index)
        loss = F.cross_entropy(
            out[masks["train"]],
            data.y[masks["train"]]
        )
        loss.backward()
        optimizer.step()

        # Validate with a fresh forward pass: the training-mode ``out`` above
        # was produced with dropout active and with the pre-step weights.
        model.eval()
        with torch.no_grad():
            val_out = model(x, edge_index)
            val_pred = val_out[masks["val"]].argmax(dim=1)
            val_acc = (
                val_pred == data.y[masks["val"]]
            ).float().mean().item()

        if val_acc > best_val:
            best_val = val_acc
            best_epoch = epoch
            # Snapshot the weights so the final evaluation can use them.
            best_state = {
                key: value.detach().clone()
                for key, value in model.state_dict().items()
            }
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                break

    train_time = time.time() - start

    # Roll back to the best validation epoch. ``best_state`` stays None only
    # when validation accuracy never rose above zero.
    if best_state is not None:
        model.load_state_dict(best_state)

    # ---- final evaluation + attention diagnostics ----
    model.eval()
    with torch.no_grad():
        # forward with attention weights
        out, attn = model(
            x, edge_index, return_attn=True
        )
        rep_var = representation_variance(out)

        test_pred = out[masks["test"]].argmax(dim=1)
        test_acc = (
            test_pred == data.y[masks["test"]]
        ).float().mean().item()

        # attention entropy per layer
        attn_entropy_l1 = attention_entropy(attn[0])
        attn_entropy_l2 = attention_entropy(attn[1])

    return (
        test_acc,
        best_epoch,
        train_time,
        attn_entropy_l1,
        attn_entropy_l2,
        rep_var
    )


In [None]:
def run_cora_gat_experiments(output_csv):
    """Run the full GAT ablation sweep on Cora and log one CSV row per run.

    For every seed, label rate and ablation, a model is trained with
    ``train_and_eval`` and its metrics are appended to ``output_csv``. The
    file is opened in write mode, so an existing file is overwritten.

    Args:
        output_csv: Path of the CSV file to create.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    data = load_cora()

    seeds = [0, 1, 2, 3, 4]
    label_rates = [0.01, 0.03, 0.05, 0.10]

    # (ablation name, Gaussian noise level or None)
    ablations = [
        ("Vanilla", None),
        ("Identity", None),
        ("Shuffled", None),
        ("Gaussian", 0.1),
        ("Gaussian", 0.3),
        ("Gaussian", 0.5),
        ("EdgeDrop", None)
    ]

    header = [
        "dataset", "model", "seed", "label_rate", "ablation",
        "noise_level", "accuracy", "best_epoch", "train_time",
        "attn_entropy_l1", "attn_entropy_l2", "rep_variance"
    ]

    with open(output_csv, "w", newline="") as handle:
        rows = csv.writer(handle)
        rows.writerow(header)

        for seed in seeds:
            for label_rate in label_rates:
                masks = create_label_splits(data, label_rate, seed)

                for ablation, noise in ablations:
                    # "EdgeDrop" is the only structural ablation; it runs on
                    # unmodified ("Vanilla") features.
                    if ablation == "EdgeDrop":
                        feat_ab, struct_ab = "Vanilla", ablation
                    else:
                        feat_ab, struct_ab = ablation, None

                    (
                        acc,
                        best_epoch,
                        elapsed,
                        ent_l1,
                        ent_l2,
                        rep_var,
                    ) = train_and_eval(
                        data=data,
                        masks=masks,
                        feature_ablation=feat_ab,
                        structure_ablation=struct_ab,
                        noise_level=noise,
                        seed=seed,
                        device=device
                    )

                    noise_cell = "NA" if noise is None else noise
                    rows.writerow([
                        "Cora", "GAT", seed, label_rate, ablation,
                        noise_cell, acc, best_epoch, elapsed,
                        ent_l1, ent_l2, rep_var
                    ])


In [None]:
# Run the Cora sweep; the results are written to gat_cora_results.csv.
run_cora_gat_experiments("gat_cora_results.csv")


In [11]:
def load_citeseer():
    """Return the first graph of the Planetoid "CiteSeer" dataset.

    The dataset is rooted at ``./data`` and constructed with the
    ``NormalizeFeatures`` transform.
    """
    citeseer = Planetoid(
        root="./data", name="CiteSeer", transform=NormalizeFeatures()
    )
    graph = citeseer[0]
    return graph


In [14]:
def run_citeseer_gat_experiments(output_csv):
    """Run the full GAT ablation sweep on CiteSeer and log one CSV row per run.

    For every seed, label rate and ablation, a model is trained with
    ``train_and_eval`` and its metrics are appended to ``output_csv``. The
    file is opened in write mode, so an existing file is overwritten.

    Args:
        output_csv: Path of the CSV file to create.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    data = load_citeseer()

    seeds = [0, 1, 2, 3, 4]
    label_rates = [0.01, 0.03, 0.05, 0.10]

    # (ablation name, Gaussian noise level or None)
    ablations = [
        ("Vanilla", None),
        ("Identity", None),
        ("Shuffled", None),
        ("Gaussian", 0.1),
        ("Gaussian", 0.3),
        ("Gaussian", 0.5),
        ("EdgeDrop", None)
    ]

    header = [
        "dataset", "model", "seed", "label_rate", "ablation",
        "noise_level", "accuracy", "best_epoch", "train_time",
        "attn_entropy_l1", "attn_entropy_l2", "rep_variance"
    ]

    with open(output_csv, "w", newline="") as handle:
        rows = csv.writer(handle)
        rows.writerow(header)

        for seed in seeds:
            for label_rate in label_rates:
                masks = create_label_splits(data, label_rate, seed)

                for ablation, noise in ablations:
                    # "EdgeDrop" is the only structural ablation; it runs on
                    # unmodified ("Vanilla") features.
                    if ablation == "EdgeDrop":
                        feat_ab, struct_ab = "Vanilla", ablation
                    else:
                        feat_ab, struct_ab = ablation, None

                    (
                        acc,
                        best_epoch,
                        elapsed,
                        ent_l1,
                        ent_l2,
                        rep_var,
                    ) = train_and_eval(
                        data=data,
                        masks=masks,
                        feature_ablation=feat_ab,
                        structure_ablation=struct_ab,
                        noise_level=noise,
                        seed=seed,
                        device=device
                    )

                    noise_cell = "NA" if noise is None else noise
                    rows.writerow([
                        "CiteSeer", "GAT", seed, label_rate, ablation,
                        noise_cell, acc, best_epoch, elapsed,
                        ent_l1, ent_l2, rep_var
                    ])

In [16]:
# Run the CiteSeer sweep; the results are written to gat_citeseer_model_agnostic.csv.
run_citeseer_gat_experiments("gat_citeseer_model_agnostic.csv")


In [None]:
from torch_geometric.data import Data

def generate_synthetic_heterophilous_graph(
    num_nodes=2000,
    num_classes=5,
    feature_dim=128,
    p_in=0.01,
    p_out=0.05,
    seed=0
):
    """Sample a random undirected graph with class-dependent edge rates.

    Every unordered node pair is connected independently, with probability
    ``p_in`` when both nodes share a class and ``p_out`` otherwise. With the
    defaults (``p_out > p_in``) most edges join nodes of different classes.
    Each sampled edge is stored in both directions.

    Edge sampling is vectorised: all pair decisions come from one
    ``torch.rand`` call instead of one call per pair. Labels and features
    for a given seed are the same as with the per-pair loop, but the sampled
    edge set for a given seed is not. Memory grows with the number of node
    pairs, i.e. quadratically in ``num_nodes``.

    Args:
        num_nodes: Number of nodes.
        num_classes: Number of classes; labels are drawn uniformly.
        feature_dim: Width of the node feature vectors.
        p_in: Edge probability for same-class pairs.
        p_out: Edge probability for different-class pairs.
        seed: Seed forwarded to ``set_seed``.

    Returns:
        ``Data`` object with ``x`` of shape [num_nodes, feature_dim],
        ``edge_index`` of shape [2, num_directed_edges] (also when no edge
        was sampled) and ``y`` of shape [num_nodes].
    """
    set_seed(seed)

    # ---- labels ----
    y = torch.randint(0, num_classes, (num_nodes,))

    # ---- features (weakly correlated with labels) ----
    # This draw is never used; it is kept only so that ``x`` for a given
    # seed stays the same as before the edge sampling was vectorised.
    torch.randn(num_classes, feature_dim)
    x = torch.randn(num_nodes, feature_dim)
    # Only classes 0 and 1 receive a label-dependent mean shift.
    x[y == 0] += 0.2
    x[y == 1] -= 0.2

    # ---- edges (one Bernoulli draw per unordered pair i < j) ----
    src, dst = torch.triu_indices(num_nodes, num_nodes, offset=1)
    same_class = y[src] == y[dst]

    edge_prob = torch.full((src.numel(),), float(p_out))
    edge_prob[same_class] = float(p_in)

    keep = torch.rand(src.numel()) < edge_prob
    src, dst = src[keep], dst[keep]

    # Interleave both directions: (i, j), (j, i), (i', j'), (j', i'), ...
    # The explicit column count keeps the shape [2, 0] when nothing was kept.
    forward = torch.stack([src, dst])
    backward = torch.stack([dst, src])
    edge_index = (
        torch.stack([forward, backward], dim=2)
        .reshape(2, 2 * src.numel())
        .contiguous()
    )

    data = Data(
        x=x,
        edge_index=edge_index,
        y=y
    )

    return data


In [None]:
def load_synthetic(seed):
    """Generate the synthetic graph for ``seed``, all other settings default."""
    graph = generate_synthetic_heterophilous_graph(seed=seed)
    return graph


In [None]:
def run_synthetic_gat_experiments(output_csv):
    """Run the GAT ablation sweep on synthetic graphs and log results to CSV.

    A fresh synthetic graph is generated per seed. For every label rate and
    ablation, a model is trained with ``train_and_eval`` and one row is
    written to ``output_csv`` with the same twelve columns as the Cora and
    CiteSeer runners. The file is opened in write mode, so an existing file
    is overwritten.

    Args:
        output_csv: Path of the CSV file to create.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    label_rates = [0.01, 0.03, 0.05, 0.10]
    seeds = [0, 1, 2, 3, 4]

    # (ablation name, Gaussian noise level or None)
    ablations = [
        ("Vanilla", None),
        ("Identity", None),
        ("Shuffled", None),
        ("Gaussian", 0.1),
        ("Gaussian", 0.3),
        ("Gaussian", 0.5),
        ("EdgeDrop", None)
    ]

    with open(output_csv, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([
            "dataset", "model", "seed", "label_rate", "ablation",
            "noise_level", "accuracy", "best_epoch", "train_time",
            "attn_entropy_l1", "attn_entropy_l2", "rep_variance"
        ])

        for seed in seeds:
            data = load_synthetic(seed)

            for label_rate in label_rates:
                masks = create_label_splits(data, label_rate, seed)

                for ablation, noise in ablations:
                    # "EdgeDrop" is the only structural ablation; it runs on
                    # unmodified ("Vanilla") features.
                    feat_ab = ablation if ablation != "EdgeDrop" else "Vanilla"
                    struct_ab = ablation if ablation == "EdgeDrop" else None

                    # train_and_eval returns six values; unpacking only
                    # three raises ValueError.
                    acc, epoch, t, ent1, ent2, rep_var = train_and_eval(
                        data=data,
                        masks=masks,
                        feature_ablation=feat_ab,
                        structure_ablation=struct_ab,
                        noise_level=noise,
                        seed=seed,
                        device=device
                    )

                    writer.writerow([
                        "Synthetic-Heterophilous",
                        "GAT",
                        seed,
                        label_rate,
                        ablation,
                        noise if noise is not None else "NA",
                        acc,
                        epoch,
                        t,
                        ent1,
                        ent2,
                        rep_var
                    ])


In [None]:
# Run the synthetic-graph sweep; the results are written to gat_synthetic_results.csv.
run_synthetic_gat_experiments("gat_synthetic_results.csv")
