In [1]:
from torch_geometric.datasets import Planetoid
 
# Import dataset from PyTorch Geometric
dataset = Planetoid(root=".", name="Cora")
data = dataset[0]



In [21]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.utils import to_dense_adj

class CustomGATLayer(torch.nn.Module):
    def __init__(self, dim_in, dim_out, alpha=0.2):
        super().__init__()
        
        # 기본 선형 변환 (bias 없음)
        self.linear = nn.Linear(dim_in, dim_out, bias=False)  # W
        self.att_w = nn.Linear(dim_out*2, 1) # Attention score
        self.leakyrelu = nn.LeakyReLU(alpha)

    def forward(self, x, edge_index):
        # 밀집 형태의 인접 행렬로 변환 (Self-loop 추가)
        adjacency = to_dense_adj(edge_index)[0]
        adj_matrix = adjacency + torch.eye(x.size(0), device=x.device)

        # (1) 선형 변환
        h = self.linear(x)  # X @ W.T

        # 연결된 노드 쌍 추출
        src, dst = adj_matrix.nonzero(as_tuple=True)  # (source, neighbor)

        # (2) Attention score 계산
        h_cat = torch.cat([h[src], h[dst]], dim=1)  # (N_edges, 2 * dim_out)
        a = self.att_w(h_cat).squeeze() # attention score
        e = self.leakyrelu(a)  # 활성화 함수 적용
        
        # (3) Softmax 정규화
        E = torch.zeros_like(adjacency)  # (N, N) 크기의 0 행렬
        E[src, dst] = e  # 인접한 노드에만 score 적용
        T = 0.1
        w_att = torch.softmax(E/T, dim=1)  # 소프트맥스 적용 => softmax 를 적용하면 크기가 너무 작아져 학습 X , Temperature 도입
        
        # (4) Attention 가중치 적용 후 메시지 전달
        H = w_att @ h # 가중치 행렬 @ 특성 행렬
        H = adj_matrix @ H # 메세지 전달
        return H


In [35]:
len(data.x)

2708

In [24]:
import torch
torch.manual_seed(14)
import torch.nn.functional as F
from torch_geometric.nn import GATv2Conv, GCNConv
from torch.nn import Linear, Dropout
 
 
def accuracy(y_pred, y_true):
    """Calculate accuracy."""
    return torch.sum(y_pred == y_true) / len(y_true)
 
 
class GAT(torch.nn.Module):
    # multi-head GAT에서 첫번째 레이어의 head는 8개가 좋음. 이후는 달라짐
    def __init__(self, dim_in, dim_h, dim_out, heads=8):
        super().__init__()
        self.gat1 = CustomGATLayer(dim_in, dim_h)
        self.gat2 = CustomGATLayer(dim_h, dim_out)
    def forward(self, x, edge_index):
        h = F.dropout(x, p=0.6, training=self.training) # Preventing overfitting
        h=x
        h = self.gat1(h, edge_index)
        h = F.elu(h)
        h = F.dropout(h, p=0.6, training=self.training)
        h = self.gat2(h, edge_index)
        return F.log_softmax(h, dim=1)
 
    def fit(self, data, epochs):
        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.parameters(), lr=0.01, weight_decay=0.01)
 
        self.train()
        for epoch in range(epochs+1):
            optimizer.zero_grad()
            out = self(data.x, data.edge_index)
            loss = criterion(out[data.train_mask], data.y[data.train_mask])
            acc = accuracy(out[data.train_mask].argmax(dim=1), data.y[data.train_mask])
            loss.backward()
            optimizer.step()
 
            if(epoch % 20 == 0):
                val_loss = criterion(out[data.val_mask], data.y[data.val_mask])
                val_acc = accuracy(out[data.val_mask].argmax(dim=1), data.y[data.val_mask])
                print(f'Epoch {epoch:>3} | Train Loss: {loss:.3f} | Train Acc: {acc*100:>5.2f}% | Val Loss: {val_loss:.2f} | Val Acc: {val_acc*100:.2f}%')
 
    @torch.no_grad()
    def test(self, data):
        self.eval()
        out = self(data.x, data.edge_index)
        acc = accuracy(out.argmax(dim=1)[data.test_mask], data.y[data.test_mask])
        return acc
 
# Create the GAT model
gat = GAT(dataset.num_features, 32, dataset.num_classes)
print(gat)
 
# Train
gat.fit(data, epochs=200)

GAT(
  (gat1): CustomGATLayer(
    (linear): Linear(in_features=1433, out_features=32, bias=False)
    (att_w): Linear(in_features=64, out_features=1, bias=True)
    (leakyrelu): LeakyReLU(negative_slope=0.2)
  )
  (gat2): CustomGATLayer(
    (linear): Linear(in_features=32, out_features=7, bias=False)
    (att_w): Linear(in_features=14, out_features=1, bias=True)
    (leakyrelu): LeakyReLU(negative_slope=0.2)
  )
)
Epoch   0 | Train Loss: 2.413 | Train Acc: 15.71% | Val Loss: 2.19 | Val Acc: 15.80%
Epoch  20 | Train Loss: 2.277 | Train Acc:  9.29% | Val Loss: 2.10 | Val Acc: 12.80%
Epoch  40 | Train Loss: 1.976 | Train Acc: 14.29% | Val Loss: 2.04 | Val Acc: 12.20%
Epoch  60 | Train Loss: 1.934 | Train Acc: 14.29% | Val Loss: 1.95 | Val Acc: 12.20%
Epoch  80 | Train Loss: 1.950 | Train Acc: 17.86% | Val Loss: 2.04 | Val Acc: 9.40%
Epoch 100 | Train Loss: 1.579 | Train Acc: 57.14% | Val Loss: 1.71 | Val Acc: 40.80%
Epoch 120 | Train Loss: 0.464 | Train Acc: 86.43% | Val Loss: 1.39 | Va

In [25]:
# Test
acc = gat.test(data)
print(f'GAT test accuracy: {acc*100:.2f}%')

GAT test accuracy: 69.40%


multi-head attention version

In [11]:
# 멀티헤드 GAT
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.utils import to_dense_adj

class MultiHeadGATLayer(nn.Module):
    def __init__(self, dim_in, dim_out, num_heads=4, alpha=0.2, concat=True):
        super().__init__()
        self.num_heads = num_heads
        self.concat = concat  # True: Concat 방식, False: Mean 방식
        
        # Multi-head Linear Transformation
        self.linear = nn.ModuleList([
            nn.Linear(dim_in, dim_out, bias=False) for _ in range(num_heads)
        ])
        
        # 각 head마다 개별적인 attention score 계산을 위한 weight
        self.att_w = nn.ModuleList([
            nn.Linear(dim_out * 2, 1) for _ in range(num_heads)
        ])

        self.leakyrelu = nn.LeakyReLU(alpha)

    def forward(self, x, edge_index):
        adjacency = to_dense_adj(edge_index)[0]
        adj_matrix = adjacency + torch.eye(x.size(0), device=x.device)
        
        outputs = []
        for i in range(self.num_heads):
            h = self.linear[i](x)  # 각 head에 대해 다른 linear 변환 적용

            # 인접 노드 쌍 (source, target) 추출
            src, dst = adjacency.nonzero(as_tuple=True)

            # Attention Score 계산
            h_cat = torch.cat([h[src], h[dst]], dim=1)  # (N_edges, 2 * dim_out)
            a = self.att_w[i](h_cat).squeeze()
            e = self.leakyrelu(a)

            # Softmax 적용 (인접 행렬 기반으로 score 배분)
            E = torch.zeros_like(adjacency)
            E[src, dst] = e
            T = 0.1
            w_att = torch.softmax(E/T, dim=1)

            # 메시지 전달
            H = w_att @ h
            H = torch.sparse.mm(adj_matrix, H)
            outputs.append(H)

        # 여러 head의 출력을 병합
        if self.concat:
            H_final = torch.cat(outputs, dim=1)  # (N, dim_out * num_heads)
        else:
            H_final = torch.mean(torch.stack(outputs, dim=0), dim=0)  # (N, dim_out)

        return H_final


In [14]:
import torch
torch.manual_seed(42)
import torch.nn.functional as F
from torch_geometric.nn import GATv2Conv, GCNConv
from torch.nn import Linear, Dropout
 
 
def accuracy(y_pred, y_true):
    """Calculate accuracy."""
    return torch.sum(y_pred == y_true) / len(y_true)
 
 
class GAT(torch.nn.Module):
    # multi-head GAT에서 첫번째 레이어의 head는 8개가 좋음. 이후는 달라짐
    def __init__(self, dim_in, dim_h, dim_out, heads=8):
        super().__init__()
        self.gat1 = MultiHeadGATLayer(dim_in, dim_h, num_heads=heads)
        self.gat2 = MultiHeadGATLayer(dim_h*heads, dim_out, num_heads=1)
    def forward(self, x, edge_index):
        h = F.dropout(x, p=0.6, training=self.training) # Preventing overfitting
        h=x
        h = self.gat1(h, edge_index)
        h = F.elu(h)
        h = F.dropout(h, p=0.6, training=self.training)
        h = self.gat2(h, edge_index)
        return F.log_softmax(h, dim=1)
 
    def fit(self, data, epochs):
        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.parameters(), lr=0.01, weight_decay=0.01)
 
        self.train()
        for epoch in range(epochs+1):
            optimizer.zero_grad()
            out = self(data.x, data.edge_index)
            loss = criterion(out[data.train_mask], data.y[data.train_mask])
            acc = accuracy(out[data.train_mask].argmax(dim=1), data.y[data.train_mask])
            loss.backward()
            optimizer.step()
 
            if(epoch % 20 == 0):
                val_loss = criterion(out[data.val_mask], data.y[data.val_mask])
                val_acc = accuracy(out[data.val_mask].argmax(dim=1), data.y[data.val_mask])
                print(f'Epoch {epoch:>3} | Train Loss: {loss:.3f} | Train Acc: {acc*100:>5.2f}% | Val Loss: {val_loss:.2f} | Val Acc: {val_acc*100:.2f}%')
 
    @torch.no_grad()
    def test(self, data):
        self.eval()
        out = self(data.x, data.edge_index)
        acc = accuracy(out.argmax(dim=1)[data.test_mask], data.y[data.test_mask])
        return acc
 
# Create the GAT model
gat = GAT(dataset.num_features, 32, dataset.num_classes)
print(gat)
 
# Train
gat.fit(data, epochs=200)

GAT(
  (gat1): MultiHeadGATLayer(
    (linear): ModuleList(
      (0-7): 8 x Linear(in_features=1433, out_features=32, bias=False)
    )
    (att_w): ModuleList(
      (0-7): 8 x Linear(in_features=64, out_features=1, bias=True)
    )
    (leakyrelu): LeakyReLU(negative_slope=0.2)
  )
  (gat2): MultiHeadGATLayer(
    (linear): ModuleList(
      (0): Linear(in_features=256, out_features=7, bias=False)
    )
    (att_w): ModuleList(
      (0): Linear(in_features=14, out_features=1, bias=True)
    )
    (leakyrelu): LeakyReLU(negative_slope=0.2)
  )
)
Epoch   0 | Train Loss: 1.971 | Train Acc: 14.29% | Val Loss: 1.95 | Val Acc: 11.40%
Epoch  20 | Train Loss: 5.522 | Train Acc: 22.14% | Val Loss: 5.07 | Val Acc: 16.20%
Epoch  40 | Train Loss: 1.786 | Train Acc: 39.29% | Val Loss: 2.14 | Val Acc: 28.60%
Epoch  60 | Train Loss: 0.444 | Train Acc: 85.00% | Val Loss: 1.34 | Val Acc: 60.20%
Epoch  80 | Train Loss: 0.124 | Train Acc: 97.14% | Val Loss: 1.25 | Val Acc: 70.60%
Epoch 100 | Train Lo

In [15]:
# Test
acc = gat.test(data)
print(f'GAT test accuracy: {acc*100:.2f}%')

GAT test accuracy: 79.70%
