In [1]:
import torch
from torch_geometric.nn import GCNConv
from torch.nn import Linear
from torch_geometric.nn import global_mean_pool
import torch.nn.functional as F
from torch_geometric.datasets import Planetoid
from pool_class import *
from torch_geometric.transforms import NormalizeFeatures
from torch_geometric.transforms.largest_connected_components import LargestConnectedComponents

In [2]:
# Load Cora with the NormalizeFeatures transform applied, then restrict the
# graph to its single largest connected component.
dataset = Planetoid(
    root='data/Planetoid',
    name='Cora',
    transform=NormalizeFeatures(),
)
data = LargestConnectedComponents(1)(dataset[0])

In [3]:
# Smoke test: can messages flow through pool -> unpool, i.e. are gradients
# propagated automatically through the custom pooling layer?
class dispooling_GCN(torch.nn.Module):
    """GCN node classifier with one pool/unpool stage around its middle layer.

    Pipeline: GCNConv -> pooling -> GCNConv -> unpool -> GCNConv -> Linear.
    The ``pooling`` layer comes from the local ``pool_class`` module; its
    internals are not visible here, so it is treated as a black box.
    """

    def __init__(self, in_channels, hidden_channels, out_channels):
        """Create the layers.

        Args:
            in_channels: Size of each input node feature vector.
            hidden_channels: Width of every hidden representation.
            out_channels: Size of the per-node output produced by the final
                linear layer.
        """
        super().__init__()
        # Seed before any layer is created so initial weights are repeatable.
        # The creation order below is kept fixed for the same reason.
        torch.manual_seed(1234)
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.pool1 = pooling(
            hidden_channels,
            score_method=2,
            p1=0.3,
            p2=0.95,
            aggregate_score_method='avg',
        )
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        """Return the output of the final linear layer for the given graph.

        Args:
            x: Node feature matrix passed to the first convolution.
            edge_index: Graph connectivity passed to the first convolution.
        """
        hidden = self.conv1(x, edge_index).relu()

        # pool1 yields five values; the second one is not needed here. The
        # last two are handed back to ``unpool`` below.
        (pooled_edge_index, _, hidden,
         assignment_matrix, saved_edge_index) = self.pool1(edge_index, hidden)
        hidden = self.conv2(hidden, pooled_edge_index).relu()

        hidden, restored_edge_index = self.pool1.unpool(
            hidden, assignment_matrix, saved_edge_index
        )
        hidden = self.conv3(hidden, restored_edge_index).relu()

        return self.lin(hidden)

In [5]:
# Instantiate the pooling GCN together with its optimizer and loss.
model = dispooling_GCN(
    in_channels=dataset.num_node_features,
    hidden_channels=20,
    out_channels=dataset.num_classes,
)
optimizer = torch.optim.Adam(model.parameters(), lr=5e-3)
criterion = torch.nn.CrossEntropyLoss()

def train():
    """Run one full-graph optimization step and return the training loss.

    Operates on the module-level ``model``, ``optimizer``, ``criterion`` and
    ``data``. The loss is computed only on the nodes selected by
    ``data.train_mask`` and is returned as a tensor.
    """
    model.train()
    optimizer.zero_grad()

    logits = model(data.x, data.edge_index)
    train_mask = data.train_mask
    loss = criterion(logits[train_mask], data.y[train_mask])

    loss.backward()
    optimizer.step()
    return loss


def test():
    """Return the accuracy of the module-level ``model`` on the test nodes.

    The model is switched to eval mode, run once on the full graph held in
    ``data``, and its argmax predictions are compared with ``data.y`` on the
    nodes selected by ``data.test_mask``.

    Returns:
        float: Fraction of test nodes predicted correctly.
    """
    model.eval()
    # Evaluation never calls backward(), so skip building the autograd graph.
    with torch.no_grad():
        pred = model(data.x, data.edge_index).argmax(dim=1)
    mask = data.test_mask
    correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
    return int(correct.sum()) / int(mask.sum())  # Ratio of correct predictions.


def val():
    """Return the accuracy of the module-level ``model`` on the validation nodes.

    The model is switched to eval mode, run once on the full graph held in
    ``data``, and its argmax predictions are compared with ``data.y`` on the
    nodes selected by ``data.val_mask``.

    Returns:
        float: Fraction of validation nodes predicted correctly.
    """
    model.eval()
    # Evaluation never calls backward(), so skip building the autograd graph.
    with torch.no_grad():
        pred = model(data.x, data.edge_index).argmax(dim=1)
    mask = data.val_mask
    val_correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
    return int(val_correct.sum()) / int(mask.sum())  # Ratio of correct predictions.



# Train for 170 epochs. The reported test accuracy is the one measured at the
# epoch with the best validation accuracy seen so far (model selection on the
# validation split).
best_val_acc = 0
# Bind test_acc up front: it is otherwise only assigned once validation
# accuracy first exceeds 0, and the epoch-1 print below would raise NameError
# (or show a stale value left in the kernel by another cell).
test_acc = float('nan')
for epoch in range(1, 171):
    train_loss = train()
    val_acc = val()
    if val_acc > best_val_acc:
        test_acc = test()
        best_val_acc = val_acc
    # Log on epochs 1, 11, 21, ...
    if epoch % 10 == 1:
        print(f'Epoch: {epoch:03d}, Train Loss: {train_loss:.4f}, '
            f'Val Acc: {val_acc:.4f}, Test Acc: {test_acc:.4f}')

Epoch: 001, Train Loss: 91.6526, Val Acc: 0.2963, Test Acc: 0.2863


KeyboardInterrupt: 

In [None]:
# Test torch_geometric's EdgePooling as the pooling layer in the same GCN architecture
from torch_geometric.nn import EdgePooling
class edgepooling_GCN(torch.nn.Module):
    """GCN node classifier with one EdgePooling pool/unpool stage.

    Pipeline: GCNConv -> EdgePooling -> GCNConv -> unpool -> GCNConv -> Linear.
    Uses torch_geometric's ``EdgePooling`` as the pooling layer.
    """

    def __init__(self, in_channels, hidden_channels, out_channels):
        """Create the layers.

        Args:
            in_channels: Size of each input node feature vector.
            hidden_channels: Width of every hidden representation.
            out_channels: Size of the per-node output produced by the final
                linear layer.
        """
        super().__init__()
        # Seed before any layer is created so initial weights are repeatable.
        torch.manual_seed(1234)
        self.conv1 = GCNConv(in_channels, hidden_channels)
        # Stored as ``pool1`` because forward() looks it up under that name;
        # the previous attribute name ``poo1`` made forward() raise
        # AttributeError.
        self.pool1 = EdgePooling(hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        """Return the output of the final linear layer for a single graph.

        Args:
            x: Node feature matrix passed to the first convolution.
            edge_index: Graph connectivity passed to the first convolution.
        """
        h = self.conv1(x, edge_index).relu()

        # EdgePooling.forward takes (x, edge_index, batch). The input is one
        # graph, so every node is assigned to batch 0.
        batch = torch.zeros(h.size(0), dtype=torch.long, device=h.device)
        h, pooled_edge_index, _, unpool_info = self.pool1(h, edge_index, batch)
        h = self.conv2(h, pooled_edge_index).relu()

        # unpool returns (x, edge_index, batch); the batch vector is not
        # needed afterwards.
        h, edge_index, _ = self.pool1.unpool(h, unpool_info)
        h = self.conv3(h, edge_index).relu()

        return self.lin(h)

# Instantiate the EdgePooling GCN together with its optimizer and loss.
model = edgepooling_GCN(
    in_channels=dataset.num_node_features,
    hidden_channels=20,
    out_channels=dataset.num_classes,
)
optimizer = torch.optim.Adam(model.parameters(), lr=5e-3)
criterion = torch.nn.CrossEntropyLoss()

def train():
    """Run one full-graph optimization step and return the training loss.

    Operates on the module-level ``model``, ``optimizer``, ``criterion`` and
    ``data``. The loss is computed only on the nodes selected by
    ``data.train_mask`` and is returned as a tensor.
    """
    model.train()
    optimizer.zero_grad()

    logits = model(data.x, data.edge_index)
    train_mask = data.train_mask
    loss = criterion(logits[train_mask], data.y[train_mask])

    loss.backward()
    optimizer.step()
    return loss


def test():
    """Return the accuracy of the module-level ``model`` on the test nodes.

    The model is switched to eval mode, run once on the full graph held in
    ``data``, and its argmax predictions are compared with ``data.y`` on the
    nodes selected by ``data.test_mask``.

    Returns:
        float: Fraction of test nodes predicted correctly.
    """
    model.eval()
    # Evaluation never calls backward(), so skip building the autograd graph.
    with torch.no_grad():
        pred = model(data.x, data.edge_index).argmax(dim=1)
    mask = data.test_mask
    correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
    return int(correct.sum()) / int(mask.sum())  # Ratio of correct predictions.


def val():
    """Return the accuracy of the module-level ``model`` on the validation nodes.

    The model is switched to eval mode, run once on the full graph held in
    ``data``, and its argmax predictions are compared with ``data.y`` on the
    nodes selected by ``data.val_mask``.

    Returns:
        float: Fraction of validation nodes predicted correctly.
    """
    model.eval()
    # Evaluation never calls backward(), so skip building the autograd graph.
    with torch.no_grad():
        pred = model(data.x, data.edge_index).argmax(dim=1)
    mask = data.val_mask
    val_correct = pred[mask] == data.y[mask]  # Check against ground-truth labels.
    return int(val_correct.sum()) / int(mask.sum())  # Ratio of correct predictions.



# Train for 170 epochs. The reported test accuracy is the one measured at the
# epoch with the best validation accuracy seen so far (model selection on the
# validation split).
best_val_acc = 0
# Bind test_acc up front: it is otherwise only assigned once validation
# accuracy first exceeds 0, so the epoch-1 print below could raise NameError
# on a fresh kernel or report a stale value produced by a different model in
# an earlier cell.
test_acc = float('nan')
for epoch in range(1, 171):
    train_loss = train()
    val_acc = val()
    if val_acc > best_val_acc:
        test_acc = test()
        best_val_acc = val_acc
    # Log on epochs 1, 11, 21, ...
    if epoch % 10 == 1:
        print(f'Epoch: {epoch:03d}, Train Loss: {train_loss:.4f}, '
            f'Val Acc: {val_acc:.4f}, Test Acc: {test_acc:.4f}')