### Installing dependencies and gathering datasets

In [1]:
# Check PyTorch version installed on this system
!python -c "import torch; print(torch.__version__)"

2.1.0+cu121


In [2]:
%%capture
# Download the corresponding PyTorch Geometric module
"""
Assign to TORCH with what you get from the cell above. E.g., export TORCH=1.12.1+cu113
"""
%env TORCH=2.1.0+cu121
!pip install torch-scatter -f https://data.pyg.org/whl/torch-${TORCH}.html
!pip install torch-sparse -f https://data.pyg.org/whl/torch-${TORCH}.html
!pip install torch-geometric

#### Importing modules, finding the GPU and importing datasets

In [4]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torch_geometric.datasets import Planetoid, TUDataset
from torch_geometric.loader import DataLoader
from torch_geometric.nn import global_mean_pool, global_max_pool
import torch_geometric.utils as U

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("gpu")

# Cora dataset
dsCora = Planetoid("/tmp/Cora", name="Cora")
num_featuresCora = dsCora.num_node_features
num_classesCora = dsCora.num_classes

dataCora = dsCora[0].to(device)
adj_matrixCora = U.to_dense_adj(dataCora.edge_index).squeeze(0)

# NCI1 dataset
dsNCI1 = TUDataset("/tmp/NCI1", name="NCI1")
num_featuresNCI1 = dsNCI1.num_node_features
num_classesNCI1 = dsNCI1.num_classes

# 90% training data, 10% test for NCI1
TRAIN_RATIO = 0.9
BATCH_SIZE = 64

NCI1_train, NCI1_test = torch.utils.data.random_split(dsNCI1, [0.9,0.1])
NCI1_train_loader = DataLoader(NCI1_train, batch_size=BATCH_SIZE, shuffle=True)
NCI1_test_loader = DataLoader(NCI1_test, batch_size=BATCH_SIZE, shuffle=True)

Definition of basic MLP to be used at the end of every GNN layer, as defined for the $\mathcal S,\mathcal G$ architectures

In [5]:
class MLPModule(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3, act_fn=nn.ReLU):
      super().__init__()
      self.act_fn = act_fn()
      # First layer transforms from input dim to hiddem dim
      self.layers = nn.ModuleList([nn.Linear(in_features=input_dim, out_features=hidden_dim)])
      # All hidden layers stay in hidden_dim size for in/out
      for _ in range(num_layers-2):
        self.layers.append(nn.Linear(in_features=hidden_dim, out_features=hidden_dim))
      # Last layer transforms from hidden dim to output dim
      self.layers.append(nn.Linear(in_features=hidden_dim, out_features=output_dim))

    def forward(self, x):
      for layer in self.layers:
        x = self.act_fn(layer(x))
      return x

Definition of basic architecture $\mathcal S,\mathcal G$ layers before the MLP, i.e. with and without self-loops

In [6]:
class GNNLayer(nn.Module):
    def __init__(self, input_dim, output_dim):
      super().__init__()
      self.self_message = nn.Linear(in_features=input_dim, out_features=output_dim)
      self.neighbors_message = nn.Linear(in_features=input_dim, out_features=output_dim)

    def forward(self, node_feats, adj_matrix):
      self_message = self.self_message(node_feats)
      neighbors_message = self.neighbors_message(torch.matmul(adj_matrix, node_feats))
      return self_message + neighbors_message

class GNNLayerSelfLoop(nn.Module):
    def __init__(self, input_dim, output_dim):
      super().__init__()
      self.message = nn.Linear(in_features=input_dim, out_features=output_dim)

    def forward(self, node_feats, adj_matrix):
      message = self.message(torch.matmul(adj_matrix, node_feats))
      return message

The `GNNModule` collects a bunch of GNN layers (with or without self-loops) and plugs MLPs between them.

In [7]:
class GNNModule(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, mlp_depth=3, self_loops=False):
      super().__init__()
      self.layer_type = GNNLayer
      if self_loops:
        self.layer_type = GNNLayerSelfLoop

      self.num_layers = num_layers
      # First layer transforms from input dim to hiddem dim
      self.layers = nn.ModuleList([self.layer_type(input_dim, hidden_dim)])
      # All hidden layers stay in hidden_dim size for in/out
      for _ in range(num_layers-1):
        self.layers.append(self.layer_type(hidden_dim, hidden_dim))

      self.mlp_layers = nn.ModuleList()
      for _ in range(num_layers-1):
        self.mlp_layers.append(MLPModule(hidden_dim, hidden_dim, hidden_dim, mlp_depth))
      # Last MLP transforms from hidden dim to output dim
      self.mlp_layers.append(MLPModule(hidden_dim, hidden_dim, output_dim, mlp_depth))

    def forward(self, x, adj_matrix):
      for l in range(self.num_layers):
        x = self.mlp_layers[l](self.layers[l](x, adj_matrix))
      return x

Define the models for Cora node classification and NCI1 graph classification, including evaluation and training functions

In [8]:
loss_fn = torch.nn.CrossEntropyLoss()

class CoraNodeClassification(nn.Module):
    def __init__(self, num_features: int, hidden_features: int, num_classes: int, num_gnn_layers: int, num_mlp_layers: int, self_loops: bool):
      super().__init__()
      self.self_loops = self_loops
      self.gnn = GNNModule(num_features, hidden_features, num_classes, num_gnn_layers, num_mlp_layers, self_loops)
      pass

    def forward(self, data, adj_matrix):
      return self.gnn(data, adj_matrix)

    def test_on(self, data, adj_matrix):
        self.eval()
        res = self(data.x, adj_matrix)
        res_evaluated = torch.argmax(res, dim=1)
        masked_truth = data.y[data.test_mask]
        masked_res = res_evaluated[data.test_mask]
        return (masked_truth==masked_res).sum().item()/masked_truth.shape[0]

    def train_on(self, data, adj_matrix, learning_rate: float, num_epochs: int):
      optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
      for i in range(num_epochs):
        self.train()
        optimizer.zero_grad()
        y = self(data.x, adj_matrix)
        loss = loss_fn(y[data.train_mask], data.y[data.train_mask])
        loss.backward()
        optimizer.step()

In [9]:
class NCI1GraphClassification(nn.Module):
    def __init__(self, num_features: int, hidden_features: int, num_classes: int, num_gnn_layers: int, num_mlp_layers: int, self_loops: bool, pool_max: bool = True):
      super().__init__()
      self.self_loops = self_loops
      self.pool_max = pool_max
      self.gnn = GNNModule(num_features, hidden_features, num_classes, num_gnn_layers, num_mlp_layers, self_loops)
      pass

    def forward(self, data, adj_matrix):
      return self.gnn(data, adj_matrix)

    def _evaluate_one_batch(self, data):
        adj_matrix = U.to_dense_adj(data.edge_index).squeeze(0)
        batch_res = self(data.x, adj_matrix)
        if self.pool_max:
          preds = global_max_pool(batch_res, data.batch)
        else:
          preds = global_mean_pool(batch_res, data.batch)
        return preds

    def test_on(self, loader):
        self.eval()
        hits = 0
        for data in loader:
            data = data.to(device)
            preds = self._evaluate_one_batch(data)
            preds = preds.argmax(dim=1)
            hits += preds.eq(data.y).sum().item()
        return hits/len(loader.dataset)

    def train_on(self, loader, learning_rate: float, num_epochs: int):
        optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
        for i in range(num_epochs):
          self.train()
          for data in loader:
            optimizer.zero_grad()
            data = data.to(device)
            preds = self._evaluate_one_batch(data)
            loss = loss_fn(preds, data.y)
            loss.backward()
            optimizer.step()


Define the basic benchmark functions, base parameters and run all test cases for parameter combinations.

In [None]:
def average_model(params, modelType: nn.Module, data, adj_matrix, num_runs=10):
  accs = []
  for i in range(num_runs):
    if modelType is CoraNodeClassification:
      model = modelType(params["num_features"],
                  params["hidden_features"],
                  params["num_classes"],
                  params["num_gnn_layers"],
                  params["num_mlp_layers"],
                  params["self_loops"],
      ).to(device)
      model.train_on(dataCora, adj_matrixCora, params["learning_rate"], params["num_epochs"])
      accs.append(model.test_on(dataCora, adj_matrixCora))
    elif modelType is NCI1GraphClassification:
      model = modelType(params["num_features"],
                        params["hidden_features"],
                        params["num_classes"],
                        params["num_gnn_layers"],
                        params["num_mlp_layers"],
                        params["self_loops"],
                        params["pool_max"],
      ).to(device)
      model.train_on(NCI1_train_loader, params["learning_rate"], params["num_epochs"])
      accs.append(model.test_on(NCI1_test_loader))
  accuracy = sum(accs)/num_runs
  return accuracy

def benchmark(param_key: str, values: list):
  base_paramsCora = {
    "learning_rate": 1e-4,
    "num_epochs": 256,
    "num_features": num_featuresCora,
    "num_classes": num_classesCora,
    "self_loops": False,
    "hidden_features": 64,
    "num_gnn_layers": 3,
    "num_mlp_layers": 3,
  }
  base_paramsNCI1 = {
    "learning_rate": 1e-4,
    "num_epochs": 32,
    "num_features": num_featuresNCI1,
    "num_classes": num_classesNCI1,
    "self_loops": False,
    "hidden_features": 64,
    "num_gnn_layers": 3,
    "num_mlp_layers": 3,
    "pool_max": False,
  }
  paramsCora = dict(base_paramsCora)
  paramsNCI1 = dict(base_paramsNCI1)

  accsCora = ([],[])
  accsNCI1 = ([],[])
  for self_loops in [False, True]:
    paramsCora["self_loops"] = self_loops
    paramsNCI1["self_loops"] = self_loops
    for val in values:
      paramsCora[param_key] = val
      paramsNCI1[param_key] = val
      if "param_key" != "pool_max":
        accCora = average_model(paramsCora, CoraNodeClassification, dataCora, adj_matrixCora, 10)
        accsCora[self_loops].append(accCora)
      accNCI1 = average_model(paramsNCI1, NCI1GraphClassification, dataCora, adj_matrixCora, 10)
      accsNCI1[self_loops].append(accNCI1)
  print(f"Accuracy for {param_key} in {values}")
  if "param_key" != "pool_max":
    print("Cora without self-loops: ", accsCora[False])
    print("Cora with self-loops: ", accsCora[True])
  print("NCI1 without self-loops: ", accsNCI1[False])
  print("NCI1 with self-loops: ", accsNCI1[True])

benchmark("num_gnn_layers", [1,2,3,4,5])
benchmark("num_mlp_layers", [1,2,3,4,5])
benchmark("hidden_features", [16,32,64,128,256])
benchmark("pool_max", [False,True])
