In [None]:
!pip install torch-geometric -f https://data.pyg.org/whl/torch-1.11.0+cu115.html

In [None]:
from typing import Callable, List, Optional, Tuple
import numpy as np
import torch
import torch.nn. functional as F
import torch_geometric.transforms as T
from torch import Tensor
from torch.optim import Optimizer
from torch_geometric.data import Data
from torch_geometric.datasets import Planetoid, WebKB
from typing_extensions import Literal, TypedDict
from collections import defaultdict
from typing import List, Optional, Tuple, Union
import scipy.sparse
from torch import Tensor
from scipy.sparse import coo_matrix, eye, diags, csr_matrix
import torch.nn as nn
from torch.nn import init
from torch.nn import Parameter
import torch.nn.functional as F
from functools import cached_property
import scipy.sparse as sp
from torch_geometric.transforms import RandomNodeSplit
from torch_geometric.datasets.wikipedia_network import WikipediaNetwork
from torch_geometric.utils import dropout_adj

In [None]:
def get_dataset(path, name, split, transform):
    """Load a Planetoid citation dataset and print a short summary of it.

    Args:
        path: Directory handed to ``Planetoid`` as its root folder.
        name: Dataset name; only ``'Cora'`` and ``'CiteSeer'`` are accepted.
        split: Split name forwarded to ``Planetoid``.
        transform: Transform forwarded to ``Planetoid``.

    Returns:
        The constructed ``Planetoid`` dataset object.

    Raises:
        ValueError: If ``name`` is not one of the accepted dataset names.
    """
    supported_names = ('Cora', 'CiteSeer')
    if name not in supported_names:
        raise ValueError(f"Unknown dataset name: {name}")

    dataset = Planetoid(path, name=name, split=split, transform=transform)

    node_count = dataset.data.num_nodes
    # The stored edge count is halved for reporting -- presumably because every
    # undirected edge is stored once per direction (TODO confirm).
    edge_count = dataset.data.num_edges // 2

    print(f"Dataset: {dataset.name}")
    print(f"Num. nodes: {node_count}")
    print(f"Num. edges: {edge_count}")
    print(f"Num. node features: {dataset.num_node_features}")
    print(f"Num. classes: {dataset.num_classes}")
    print(f"Dataset len.: {dataset.len()}")

    return dataset


In [None]:

def split_dataset(dataset, train_ratio=0.6, val_ratio=0.2, test_ratio=0.2):
    """Randomly split the nodes of ``dataset[0]`` into train/val/test masks.

    The per-class training budget is derived from the dataset's own number of
    classes (``dataset.num_classes``) rather than a hard-coded class count, so
    the requested ``train_ratio`` is honoured for any dataset.

    Args:
        dataset: Object exposing ``dataset[0]`` (a graph with ``num_nodes``)
            and ``num_classes``.
        train_ratio: Fraction of nodes targeted for training, divided evenly
            across classes.
        val_ratio: Fraction of nodes used for validation.
        test_ratio: Fraction of nodes used for testing.

    Returns:
        The graph returned by ``RandomNodeSplit`` with ``train_mask``,
        ``val_mask`` and ``test_mask`` set.
    """
    graph = dataset[0]
    num_nodes = graph.num_nodes
    num_classes = dataset.num_classes

    split = RandomNodeSplit(
        split="random",
        num_splits=1,
        num_train_per_class=int(num_nodes / num_classes * train_ratio),
        num_val=int(num_nodes * val_ratio),
        num_test=int(num_nodes * test_ratio),
    )
    splitted_dataset = split(graph)

    train_len = int(splitted_dataset.train_mask.sum())
    val_len = int(splitted_dataset.val_mask.sum())
    test_len = int(splitted_dataset.test_mask.sum())
    # Nodes left in none of the three masks (rounding, or classes smaller than
    # the per-class training budget).
    other_len = num_nodes - train_len - val_len - test_len
    print(f"Num. train={train_len}, val={val_len}, test={test_len}, other={other_len}")
    return splitted_dataset

In [None]:


def adjacency_matrix(edge_index, edge_attr=None, num_nodes=None):
    """Build a dense, symmetrised adjacency matrix from an edge list.

    Args:
        edge_index: Integer tensor of shape ``(2, num_edges)`` holding the
            row/column index of every edge.
        edge_attr: Optional per-edge weights; defaults to a weight of 1 for
            every edge.
        num_nodes: Size of the (square) matrix. When omitted it is inferred as
            ``edge_index.max() + 1``, or 0 if there are no edges.

    Returns:
        Dense ``(num_nodes, num_nodes)`` tensor equal to ``(A + A.T) / 2``,
        where ``A`` is the matrix described by the edge list.
    """
    if num_nodes is None:
        # ``max()`` of an empty tensor raises, so an edge-less graph with an
        # unknown node count is treated as having zero nodes.
        num_nodes = int(edge_index.max()) + 1 if edge_index.numel() > 0 else 0
    if edge_attr is None:
        # Allocate the default weights next to the indices so the sparse
        # constructor does not see tensors on different devices.
        edge_attr = torch.ones(
            edge_index.shape[1], dtype=torch.float, device=edge_index.device
        )
    adj_matrix_sparse = torch.sparse_coo_tensor(edge_index, edge_attr, (num_nodes, num_nodes))
    adj_matrix = adj_matrix_sparse.to_dense()
    # Averaging with the transpose makes the result symmetric.
    return (adj_matrix + adj_matrix.T) / 2


In [None]:


def normalize_adjacency_matrix(A):
    """Add self-loops to ``A`` and apply symmetric degree normalisation.

    Computes ``D^{-1/2} (A + I) D^{-1/2}``, where ``D`` is the diagonal matrix
    of row sums of ``A + I``.

    Args:
        A: Square adjacency matrix; anything ``csr_matrix`` can convert is
            accepted, an existing ``csr_matrix`` is used as is.

    Returns:
        The normalised matrix as a SciPy sparse matrix.
    """
    adj = A if isinstance(A, csr_matrix) else csr_matrix(A)
    adj_with_loops = adj + sp.eye(adj.shape[0])

    row_sums = np.asarray(adj_with_loops.sum(axis=1)).ravel()
    # Rows summing to zero get a unit degree so the inverse square root below
    # never divides by zero.
    safe_degrees = np.where(row_sums == 0, 1, row_sums)

    inv_sqrt_degree = diags(1.0 / np.sqrt(safe_degrees))
    return inv_sqrt_degree @ adj_with_loops @ inv_sqrt_degree


def sparse_matrix_to_torch_sparse_tensor(sparse_matrix):
    """Convert a SciPy sparse matrix into a float32 torch sparse COO tensor.

    Uses ``torch.sparse_coo_tensor`` instead of the deprecated legacy
    ``torch.sparse.FloatTensor`` / ``torch.LongTensor`` / ``torch.FloatTensor``
    constructors.

    Args:
        sparse_matrix: Any SciPy sparse matrix (it is converted with
            ``tocoo()``).

    Returns:
        A sparse COO tensor of dtype ``torch.float32`` with the same shape and
        non-zero entries as ``sparse_matrix``.
    """
    coo = sparse_matrix.tocoo()
    # ``astype`` yields fresh arrays of the exact dtypes torch expects
    # (int64 indices, float32 values), so ``from_numpy`` can wrap them.
    indices = torch.from_numpy(np.vstack((coo.row, coo.col)).astype(np.int64))
    values = torch.from_numpy(coo.data.astype(np.float32))
    return torch.sparse_coo_tensor(indices, values, torch.Size(coo.shape))

In [None]:
def accuracy(pred: Tensor, target: Tensor) -> float:
    r"""Fraction of entries of :obj:`pred` that equal :obj:`target`.

    Args:
        pred (Tensor): Predicted labels.
        target (Tensor): Ground-truth labels.

    :rtype: float
    """
    hits = (pred == target).sum()
    total = target.numel()
    return int(hits) / total

In [None]:
def drop_edges(data, drop_prob):
    """Return a copy of ``data`` with edges randomly removed.

    The edge attributes (if any) are passed through ``dropout_adj`` together
    with the edge index, so the attributes of the returned graph stay aligned
    with its remaining edges. ``data`` itself is not modified.

    Args:
        data: Graph object exposing ``edge_index``, optionally ``edge_attr``,
            and ``clone()``.
        drop_prob: Probability passed to ``dropout_adj`` as ``p``.

    Returns:
        A clone of ``data`` whose ``edge_index`` (and ``edge_attr``, when
        present) contain only the edges kept by ``dropout_adj``.
    """
    kept_edge_index, kept_edge_attr = dropout_adj(
        data.edge_index,
        edge_attr=getattr(data, "edge_attr", None),
        p=drop_prob,
        force_undirected=False,
        training=True,
    )

    new_data = data.clone()
    new_data.edge_index = kept_edge_index

    # Only overwrite the attribute when the graph actually carries edge
    # attributes; otherwise the clone keeps whatever it already had.
    if kept_edge_attr is not None:
        new_data.edge_attr = kept_edge_attr

    return new_data

In [None]:

class GCNLayer(nn.Module):
    """Graph convolution with an initial-residual term and identity mapping.

    For propagated features ``P = adj @ input`` the layer computes::

        F   = (1 - alpha) * P + alpha * h_0
        out = F @ ((1 - beta) * I + beta * W),   beta = log(lamda / l + 1)

    The product is evaluated as ``(1 - beta) * F + beta * (F @ W)``, which is
    algebraically the same but never materialises the identity matrix, so no
    tensor is created on a device/dtype different from the layer's weight.

    Because of the identity term the layer requires
    ``in_features == out_features``.
    """

    def __init__(self, in_features, out_features):
        """Create the ``(in_features, out_features)`` weight matrix.

        The weight is initialised with ``kaiming_uniform_`` (``fan_in`` mode,
        ReLU gain).
        """
        super(GCNLayer, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = torch.nn.Parameter(
            init.kaiming_uniform_(
                torch.empty(self.in_features, self.out_features),
                mode='fan_in',
                nonlinearity='relu',
            )
        )

    def forward(self, input, adj, h_0, lamda, alpha, l):
        """Apply the layer.

        Args:
            input: Node features entering this layer.
            adj: Sparse propagation matrix multiplied with ``input``.
            h_0: Initial node representation mixed in with weight ``alpha``.
            lamda: Numerator of the ratio used to compute ``beta``.
            alpha: Mixing weight of ``h_0``.
            l: Layer index; must be non-zero because ``lamda`` is divided by it.

        Returns:
            The transformed node features (no activation applied).
        """
        h_l = torch.spmm(adj, input)
        features = (1 - alpha) * h_l + alpha * h_0
        # Plain Python float keeps the arithmetic in the tensors' own dtype.
        beta = float(np.log((lamda / l) + 1))
        return (1 - beta) * features + beta * torch.mm(features, self.weight)

class GCNII(nn.Module):
    """Deep graph network: linear encoder, a stack of ``GCNLayer``s, linear head.

    Every graph layer receives the encoder output ``h_0`` as its initial
    residual input and its own 1-based position as the layer index.
    """

    def __init__(self, nfeat, nlayers, nhidden, nclass, dropout, lamda, alpha):
        """Build the network.

        Args:
            nfeat: Number of input features per node.
            nlayers: Number of ``GCNLayer`` blocks.
            nhidden: Hidden width used by the encoder and every graph layer.
            nclass: Number of output classes.
            dropout: Dropout probability applied before each layer.
            lamda: Forwarded to every ``GCNLayer`` call.
            alpha: Forwarded to every ``GCNLayer`` call.
        """
        super().__init__()
        # Graph layers are created before the linear layers, keeping the order
        # in which parameters are initialised and registered.
        self.graph_convs = nn.ModuleList(
            [GCNLayer(nhidden, nhidden) for _ in range(nlayers)]
        )

        self.pre_fc = nn.Linear(nfeat, nhidden)
        self.post_fc = nn.Linear(nhidden, nclass)

        self.relu = nn.ReLU()
        self.dropout = dropout
        self.lamda = lamda
        self.alpha = alpha

    def forward(self, x, edge_index, edge_attr):
        """Return per-node log-probabilities over the classes.

        The normalised propagation matrix is rebuilt from ``edge_index`` and
        ``edge_attr`` on every call.
        """
        dense_adj = adjacency_matrix(edge_index, edge_attr, x.shape[0])
        propagation = sparse_matrix_to_torch_sparse_tensor(
            normalize_adjacency_matrix(dense_adj)
        )

        encoded = self.pre_fc(F.dropout(x, self.dropout, training=self.training))
        initial = self.relu(encoded)

        hidden = initial
        for depth, layer in enumerate(self.graph_convs, start=1):
            hidden = F.dropout(hidden, self.dropout, training=self.training)
            hidden = self.relu(
                layer(hidden, propagation, initial, self.lamda, self.alpha, depth)
            )

        hidden = F.dropout(hidden, self.dropout, training=self.training)
        return F.log_softmax(self.post_fc(hidden), dim=1)



# Entry-point guard left as a no-op placeholder: nothing is executed here.
if __name__ == '__main__':
    pass




In [None]:
# A loss is any callable taking two tensors and returning a tensor; the
# train/eval steps below call it as ``loss_function(logits, labels)``.
LossFunction = Callable[[Tensor, Tensor], Tensor]
# Name of the node mask to evaluate on; ``evaluate_step`` looks up the
# attribute ``f"{stage}_mask"`` on the data object.
Stage = Literal["train", "val", "test"]

def train_step(
    model: torch.nn.Module, data: Data, optimizer: torch.optim.Optimizer, loss_function: LossFunction
) -> Tuple[float, float]:
    """Run one optimisation step on the training nodes.

    Args:
        model: Network called as ``model(x, edge_index, edge_attr)``.
        data: Graph providing ``x``, ``edge_index``, ``edge_attr``, ``y`` and
            ``train_mask``.
        optimizer: Optimizer whose gradients are reset and then stepped.
        loss_function: Callable applied to the masked outputs and labels.

    Returns:
        ``(loss, accuracy)`` measured on the training nodes for the forward
        pass made before the parameter update.
    """
    model.train()
    optimizer.zero_grad()

    mask = data.train_mask
    node_outputs = model(data.x, data.edge_index, data.edge_attr)
    masked_logits = node_outputs[mask]
    targets = data.y[mask]

    loss = loss_function(masked_logits, targets)
    step_acc = accuracy(masked_logits.argmax(dim=1), targets)

    loss.backward()
    optimizer.step()
    return loss.item(), step_acc

@torch.no_grad()
def evaluate_step(
    model: torch.nn.Module, data: Data, loss_function: LossFunction, stage: Stage
) -> Tuple[float, float]:
    """Evaluate the model on the nodes selected by ``data.<stage>_mask``.

    Runs with gradients disabled and the model switched to eval mode.

    Args:
        model: Network called as ``model(x, edge_index, edge_attr)``.
        data: Graph providing ``x``, ``edge_index``, ``edge_attr``, ``y`` and
            the mask named ``f"{stage}_mask"``.
        loss_function: Callable applied to the masked outputs and labels.
        stage: Which mask to use (``"train"``, ``"val"`` or ``"test"``).

    Returns:
        ``(loss, accuracy)`` on the selected nodes.
    """
    model.eval()

    mask = getattr(data, f"{stage}_mask")
    node_outputs = model(data.x, data.edge_index, data.edge_attr)
    masked_logits = node_outputs[mask]
    targets = data.y[mask]

    stage_loss = loss_function(masked_logits, targets)
    stage_acc = accuracy(masked_logits.argmax(dim=1), targets)
    return stage_loss.item(), stage_acc


In [None]:
class HistoryDict(TypedDict):
    """Per-epoch training curves; each list holds one entry per epoch."""

    # Metrics measured on the training nodes.
    loss: List[float]
    acc: List[float]
    # Metrics measured on the validation nodes.
    val_loss: List[float]
    val_acc: List[float]

def train(
    model: torch.nn.Module,
    data: Data,
    optimizer: torch.optim.Optimizer,
    loss_function: LossFunction = torch.nn.CrossEntropyLoss(),
    max_epochs: int = 1500,
    early_stopping: int = 100,
    print_interval: int = 100,
    drop_prob: float = 0,
    verbose: bool = True,
) -> Tuple[HistoryDict, float, float]:
    """Train ``model`` on ``data`` with early stopping, then evaluate on test.

    Each epoch trains on a copy of the graph with edges randomly dropped
    (probability ``drop_prob``) and validates on the original, unperturbed
    graph, so the early-stopping signal is not affected by the random edge
    removal.

    Args:
        model: Network to optimise.
        data: Graph with ``train_mask``, ``val_mask`` and ``test_mask``.
        optimizer: Optimizer bound to ``model``'s parameters.
        loss_function: Loss applied to the model outputs and labels. The
            default instance is created once, when the function is defined.
        max_epochs: Upper bound on the number of epochs.
        early_stopping: Window size; after more than this many epochs,
            training stops as soon as the current validation loss exceeds the
            mean of the previous ``early_stopping`` validation losses.
        print_interval: Progress is printed every this many epochs.
        drop_prob: Edge-drop probability used for the training graph.
        verbose: Whether to print progress and the early-stopping notice.

    Returns:
        ``(history, test_loss, test_acc)`` where ``history`` holds the
        per-epoch train/validation loss and accuracy.
    """
    history: HistoryDict = {"loss": [], "val_loss": [], "acc": [], "val_acc": []}

    for epoch in range(max_epochs):

        # Edge dropping is a training-time perturbation only.
        train_data = drop_edges(data, drop_prob)
        loss, acc = train_step(model, train_data, optimizer, loss_function)

        # Validate on the full graph, not on the edge-dropped copy.
        val_loss, val_acc = evaluate_step(model, data, loss_function, "val")

        history["loss"].append(loss)
        history["acc"].append(acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)

        # The slice covers the ``early_stopping`` epochs preceding the current
        # one (the just-appended value is excluded by the ``-1`` bound).
        if epoch > early_stopping and val_loss > np.mean(history["val_loss"][-(early_stopping + 1):-1]):
            if verbose:
                print("\nEarly stopping...")
            break

        if verbose and epoch % print_interval == 0:
            print(f"\nEpoch: {epoch}\n------")
            print(f"Train loss: {loss:.4f} | Train acc: {acc:.4f}")
            print(f"Val loss: {val_loss:.4f} | Val acc: {val_acc:.4f}")

    test_loss, test_acc = evaluate_step(model, data, loss_function, "test")

    return history, test_loss, test_acc

In [None]:

# Cora experiment: 10 runs, each on a fresh random node split.
dataset = get_dataset(path = "/tmp/Cora", name="Cora",split = "full", transform=T.NormalizeFeatures())
data = dataset[0]

# Hyperparameters for this experiment.
SEED = 42
NLAYERS = 128
ALPHA = 0.2
LEARNING_RATE = 0.01
NHIDDEN = 64
LAMBDA = 0.5
DROPOUT = 0.5
MAX_EPOCHS = 1500
WEIGHT_DECAY = 0.0001
EARLY_STOPPING = 100
PRINT_INTERVAL = 25
DROP_PROB = 0

torch.manual_seed(SEED)

test_accuracies = []

for i in range(10):

    # Fresh random train/val/test masks for this run.
    split_data = split_dataset(dataset)

    model = GCNII(nfeat=split_data.num_node_features,
                  nlayers=NLAYERS,
                  nhidden=NHIDDEN,
                  nclass=dataset.num_classes,
                  dropout=DROPOUT,
                  lamda=LAMBDA,
                  alpha=ALPHA)

    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

    # Train and test on the freshly split graph. Passing ``data`` here would
    # silently ignore the random split and reuse the dataset's built-in masks.
    history, test_loss, test_acc  = train(model, split_data, optimizer, max_epochs=MAX_EPOCHS, early_stopping=EARLY_STOPPING, print_interval=PRINT_INTERVAL, drop_prob=DROP_PROB)

    test_accuracies.append(test_acc)

mean_accuracy = np.mean(test_accuracies)
std_accuracy = np.std(test_accuracies)

print(f'Mean Test Accuracy: {mean_accuracy:.4f}')
print(f'Standard Deviation of Test Accuracies: {std_accuracy:.4f}')


Dataset: Cora
Num. nodes: 2708
Num. edges: 5278
Num. node features: 1433
Num. classes: 7
Dataset len.: 1
Num. train=1557, val=541, test=541, other=69

Epoch: 0
------
Train loss: 1.9492 | Train acc: 0.1316
Val loss: 1.9355 | Val acc: 0.3160

Epoch: 25
------
Train loss: 1.5264 | Train acc: 0.4371
Val loss: 1.4899 | Val acc: 0.4640


KeyboardInterrupt: ignored

In [None]:

# CiteSeer experiment: 10 runs, each on a fresh random node split.
dataset = get_dataset(path = "/tmp/CiteSeer", name="CiteSeer",split = "full", transform=T.NormalizeFeatures())
data = dataset[0]

# Hyperparameters for this experiment.
SEED = 42
NLAYERS = 128
ALPHA = 0.5
LEARNING_RATE = 0.01
NHIDDEN = 64
LAMBDA = 0.5
DROPOUT = 0.5
MAX_EPOCHS = 1500
WEIGHT_DECAY = 5e-6
EARLY_STOPPING = 100
# Previously assigned twice (50, then 25); only the effective value is kept.
PRINT_INTERVAL = 25
DROP_PROB = 0


torch.manual_seed(SEED)

test_accuracies = []

for i in range(10):

    # Fresh random train/val/test masks for this run.
    split_data = split_dataset(dataset)

    model = GCNII(nfeat=split_data.num_node_features,
                  nlayers=NLAYERS,
                  nhidden=NHIDDEN,
                  nclass=dataset.num_classes,
                  dropout=DROPOUT,
                  lamda=LAMBDA,
                  alpha=ALPHA)

    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

    # Train and test on the freshly split graph. Passing ``data`` here would
    # silently ignore the random split and reuse the dataset's built-in masks.
    history, test_loss, test_acc  = train(model, split_data, optimizer, max_epochs=MAX_EPOCHS, early_stopping=EARLY_STOPPING, print_interval=PRINT_INTERVAL, drop_prob=DROP_PROB)

    test_accuracies.append(test_acc)

mean_accuracy = np.mean(test_accuracies)
std_accuracy = np.std(test_accuracies)

print(f'Mean Test Accuracy: {mean_accuracy:.4f}')
print(f'Standard Deviation of Test Accuracies: {std_accuracy:.4f}')