# Graph Attention Network (GAT) Layer:

A GAT layer uses **self-attention on graphs** to weigh the importance of neighboring nodes. Here is the full pipeline:

---

### **1. Linear Transformation**

Apply a learnable linear transformation to each input node feature:

$$
h_i' = \mathbf{W} h_i
$$

Where:

- $\mathbf{W} \in \mathbb{R}^{F' \times F} $ is a learnable weight matrix.
- $ h_i \in \mathbb{R}^{F} $: original feature vector of node $i$.
- $ h_i' \in \mathbb{R}^{F'} $: transformed feature vector.

---

### **2. Attention Coefficients**

Compute the unnormalized attention score between node $i$ and its neighbor $j$:

$$
e_{ij} = \text{LeakyReLU}\left( \vec{a}^{\top} \left[ \mathbf{W} h_i \, \| \, \mathbf{W} h_j \right] \right)
$$

Where:
- $ \vec{a} \in \mathbb{R}^{2F'} $ is a learnable attention vector.
- $ \| $ denotes concatenation.
- $ e_{ij} $ is the raw (unnormalized) attention coefficient.

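$$
\text{LeakyReLU}(x) =
\begin{cases}
x & \text{if } x > 0 \\
\alpha x & \text{otherwise}
\end{cases}
$$

Here $\alpha$ is the negative slope: PyTorch's `nn.LeakyReLU` defaults to $\alpha = 0.01$, while the code in this notebook uses $\alpha = 0.2$.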

LeakyReLU adds non-linearity, which allows the model to learn complex patterns, and it prevents dead neurons (unlike regular ReLU).

---

### **3. Softmax Normalization**

Normalize the attention scores across the neighbors of node $i$:

$$
\alpha_{ij} = \frac{ \exp(e_{ij}) }{ \sum_{k \in \mathcal{N}(i)} \exp(e_{ik}) }
$$

Where:
- $\mathcal{N}(i)$ denotes the set of neighbors of node $i$.
- $\alpha_{ij} \in [0,1]$ represents the normalized attention coefficient from node $i$ to node $j$.

---

### **4. Weighted Sum of Neighbor Features**

Aggregate the transformed neighbor features weighted by attention:

$$
h_i^{\text{out}} = \sigma\left( \sum_{j \in \mathcal{N}(i)} \alpha_{ij} \cdot \mathbf{W} h_j \right)
$$

Where:
- $ \sigma $ is a non-linear activation function (e.g., ELU or LeakyReLU).
- $ h_i^{\text{out}} $ is the output embedding for node $i$ after one GAT layer.

---

*This process allows each node to attend differently to each of its neighbors, enabling adaptive feature aggregation.*


In [None]:
!pip install -q torch_geometric

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch_geometric

In [None]:
class GATLayer(nn.Module):
  """Placeholder skeleton of the GAT layer; it holds no parameters yet."""

  def __init__(self):
    super().__init__()

  def forward(self, input, adj):
    """Ignore both arguments, print an empty line and return None."""
    print("")

In [None]:
"""We will take an input of size 3×5, which is multiplied by W (a learned parameter) of size 5×2.
This transforms each node's 5 features into 2, so we get a matrix of size 3×2."""

in_features = 5
out_features = 2
nb_nodes = 3

# Xavier (Glorot) parameter initialization.
# If W is too large or too small, the activations may explode or vanish after
# the activation function is applied. Xavier-uniform initialization avoids this
# by keeping the variance of the activations stable throughout the network.
W = nn.Parameter(torch.zeros(size=(in_features, out_features)))
nn.init.xavier_uniform_(W.data, gain=1.414)
input = torch.randn(nb_nodes, in_features)

# Linear transformation: (nb_nodes x in_features) @ (in_features x out_features)
# gives one out_features-dimensional row per node.
h = torch.mm(input, W)
N = h.size()[0]  # number of nodes
print(h.shape)

torch.Size([3, 2])


In [None]:
# Learnable attention vector: one weight per entry of the concatenated
# pair [h_i || h_j], stored as a column so it can be used in a matmul.
a = nn.Parameter(torch.zeros(size=(2 * out_features, 1)))
nn.init.xavier_uniform_(a.data, gain=1.414)
print(a.shape)

# LeakyReLU with negative slope 0.2, applied to the raw pair scores.
leakyrelu = nn.LeakyReLU(negative_slope=0.2)

torch.Size([4, 1])


In [None]:
# Build every ordered pair [h_i || h_j] in a single tensor of shape
# (N, N, 2 * out_features); entry [i, j] holds h_i followed by h_j.
a_input = torch.cat(
    [
        h.repeat(1, N).view(N * N, -1),  # each row of h repeated N times in a row
        h.repeat(N, 1),                  # the whole of h tiled N times
    ],
    dim=1,
).view(N, -1, 2 * out_features)
a_input

tensor([[[ 0.0605, -0.3668,  0.0605, -0.3668],
         [ 0.0605, -0.3668,  1.3190, -0.5537],
         [ 0.0605, -0.3668, -1.0470, -0.0665]],

        [[ 1.3190, -0.5537,  0.0605, -0.3668],
         [ 1.3190, -0.5537,  1.3190, -0.5537],
         [ 1.3190, -0.5537, -1.0470, -0.0665]],

        [[-1.0470, -0.0665,  0.0605, -0.3668],
         [-1.0470, -0.0665,  1.3190, -0.5537],
         [-1.0470, -0.0665, -1.0470, -0.0665]]], grad_fn=<ViewBackward0>)

In [None]:
# Project every pair onto the attention vector, drop the trailing size-1
# dimension to get an (N, N) score matrix, then apply LeakyReLU.
e = leakyrelu(
    torch.matmul(a_input, a).squeeze(2)
)
e

tensor([[-0.0372, -0.1255,  0.0574],
        [-0.2840, -0.3723, -0.2353],
        [ 1.0844,  0.6429,  1.3281]], grad_fn=<LeakyReluBackward0>)

In [None]:
# Raw pair scores before the squeeze and before LeakyReLU: one score per
# (i, j) pair, with a trailing dimension of size 1.
torch.matmul(a_input, a)

tensor([[[-0.1862],
         [-0.6277],
         [ 0.0574]],

        [[-1.4199],
         [-1.8614],
         [-1.1763]],

        [[ 1.0844],
         [ 0.6429],
         [ 1.3281]]], grad_fn=<UnsafeViewBackward0>)

In [None]:
# Same scores with the trailing size-1 dimension removed, still before LeakyReLU.
torch.matmul(a_input, a).squeeze(2)

tensor([[-0.1862, -0.6277,  0.0574],
        [-1.4199, -1.8614, -1.1763],
        [ 1.0844,  0.6429,  1.3281]], grad_fn=<SqueezeBackward1>)

### Masked Attention

In [None]:
# Random 0/1 adjacency matrix for the 3-node toy graph.
adj = torch.randint(0, 2, (3, 3))
# Very large negative filler with the same shape as e; it is used to overwrite
# the scores of non-adjacent pairs before the softmax.
zero_vec = torch.ones_like(e) * -9e15
zero_vec

tensor([[-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15]])

In [None]:
# Keep the score where an edge exists (adj > 0); otherwise take the large
# negative filler from zero_vec.
attention = torch.where(
    adj > 0,
    e,
    zero_vec,
)

# Show the inputs first, then the masked result.
print(adj, "\n")
print(e, "\n")
print(zero_vec)

print("\n", attention)

tensor([[1, 1, 1],
        [1, 0, 1],
        [1, 0, 1]]) 

tensor([[-0.0372, -0.1255,  0.0574],
        [-0.2840, -0.3723, -0.2353],
        [ 1.0844,  0.6429,  1.3281]], grad_fn=<LeakyReluBackward0>) 

tensor([[-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15]])

 tensor([[-3.7241e-02, -1.2554e-01,  5.7426e-02],
        [-2.8398e-01, -9.0000e+15, -2.3526e-01],
        [ 1.0844e+00, -9.0000e+15,  1.3281e+00]], grad_fn=<WhereBackward0>)


In [None]:
# Normalise each row of masked scores into attention weights.
attention = F.softmax(attention, dim=1)
# Aggregate: each output row is the attention-weighted sum of the rows of h.
h_prime = attention @ h

In [None]:
# Attention weights after the row-wise softmax.
attention

tensor([[0.3317, 0.3037, 0.3646],
        [0.4878, 0.0000, 0.5122],
        [0.4394, 0.0000, 0.5606]], grad_fn=<SoftmaxBackward0>)

In [None]:
# Aggregated node features: attention @ h.
h_prime

tensor([[ 0.0388, -0.3141],
        [-0.5067, -0.2130],
        [-0.5604, -0.1984]], grad_fn=<MmBackward0>)

In [None]:
# Transformed node features before aggregation, for comparison with h_prime.
h

tensor([[ 0.0605, -0.3668],
        [ 1.3190, -0.5537],
        [-1.0470, -0.0665]], grad_fn=<MmBackward0>)

In [None]:
class GATLayer(nn.Module):
  """Single-head graph attention layer working on a dense adjacency matrix.

  The layer computes ``h = input @ W``, scores every ordered node pair with
  ``LeakyReLU(a^T [h_i || h_j])``, replaces the scores of pairs with
  ``adj <= 0`` by a very large negative number, softmax-normalises each row
  and returns the attention-weighted sum of the rows of ``h``.

  Args:
    in_features: size of each input node feature vector.
    out_features: size of each output node feature vector.
    dropout: dropout probability applied to the attention weights
      (only active in training mode).
    alpha: negative slope of the LeakyReLU applied to the raw scores.
    concat: if True the aggregated features go through ELU before being
      returned; if False they are returned as they are.
  """

  def __init__(self, in_features, out_features, dropout, alpha, concat=True):
    super(GATLayer, self).__init__()
    self.dropout = dropout        # e.g. drop prob = 0.6
    self.in_features = in_features
    self.out_features = out_features
    self.alpha = alpha            # LeakyReLU negative input slope, e.g. alpha = 0.2
    self.concat = concat          # intended: True for all layers except the output layer

    # Xavier initialization of the weights.
    self.W = nn.Parameter(torch.zeros(size=(in_features, out_features)))
    nn.init.xavier_uniform_(self.W.data, gain=1.414)

    # Attention vector: the first out_features entries weight h_i, the
    # remaining out_features entries weight h_j.
    self.a = nn.Parameter(torch.zeros(size=(2*out_features, 1)))
    nn.init.xavier_uniform_(self.a.data, gain=1.414)

    # LeakyReLU
    self.leakyrelu = nn.LeakyReLU(self.alpha)

  def forward(self, input, adj):
    """Run one attention-based aggregation step.

    Args:
      input: node feature matrix of shape (N, in_features).
      adj: tensor broadcastable to (N, N); entry [i, j] > 0 means node i
        attends to node j.

    Returns:
      Tensor of shape (N, out_features); ELU-activated when ``self.concat``
      is True.
    """
    # Linear transformation
    h = torch.mm(input, self.W)
    N = h.size()[0]

    # Attention mechanism.
    # a^T [h_i || h_j] splits into a_src^T h_i + a_dst^T h_j, so two (N, 1)
    # projections broadcast to the full (N, N) score matrix without building
    # the (N, N, 2 * out_features) tensor of concatenated pairs.
    src_score = torch.matmul(h, self.a[:self.out_features])
    dst_score = torch.matmul(h, self.a[self.out_features:])
    e = self.leakyrelu(src_score + dst_score.view(1, N))

    # Masked attention: non-edges get a huge negative score so the softmax
    # assigns them (numerically) zero weight.
    zero_vec = -9e15*torch.ones_like(e)
    attention = torch.where(adj > 0, e, zero_vec)

    attention = F.softmax(attention, dim=1)
    attention = F.dropout(attention, self.dropout, training=self.training)
    h_prime = torch.matmul(attention, h)

    if self.concat:
      return F.elu(h_prime)
    else:
      return h_prime

## Using the Cora dataset

In [None]:
from torch_geometric.data import Data
from torch_geometric.nn import GATConv
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T

"""When you pass the root='/tmp/Cora' and name='Cora', it looks inside '/tmp/Cora' for the dataset files.
If the Cora dataset isn’t already downloaded, it automatically downloads it from the PyTorch Geometric repo or related source."""
name_data = 'Cora'
# NormalizeFeatures row-normalizes each node's feature vector so that its
# values sum to 1. It is passed to the constructor instead of being assigned
# to dataset.transform afterwards, so the dataset is fully configured as soon
# as it exists.
dataset = Planetoid(
    root='/tmp/' + name_data,
    name=name_data,
    transform=T.NormalizeFeatures(),
)

print(f"Number of Classes in {name_data}:", dataset.num_classes)
print(f"Number of Node Features in {name_data}:", dataset.num_node_features)

Number of Classes in Cora: 7
Number of Node Features in Cora: 1433


In [None]:
class GAT(torch.nn.Module):
  """Two-layer GAT node classifier built from torch_geometric's GATConv.

  Layer 1 uses 8 attention heads of 8 features each, concatenated to 64
  features. Layer 2 uses a single head without concatenation and outputs one
  score per class.

  Dropout is a regularization technique used during training to prevent
  overfitting: it randomly sets a fraction of the input values to zero on each
  forward pass.

  Args:
    in_channels: number of input node features. Defaults to
      ``dataset.num_features`` of the notebook-level ``dataset``.
    num_classes: number of output classes. Defaults to
      ``dataset.num_classes`` of the notebook-level ``dataset``.
  """

  def __init__(self, in_channels=None, num_classes=None):
    super(GAT, self).__init__()
    # Fall back to the notebook-level dataset so that GAT() keeps working.
    if in_channels is None:
      in_channels = dataset.num_features
    if num_classes is None:
      num_classes = dataset.num_classes

    self.hid = 8       # each head in layer 1 outputs an 8-dim vector
    self.in_head = 8   # number of attention heads in the first layer
    self.out_head = 1
    # in_channels -> (8 * 8) = 64 dims using 8 parallel attention heads;
    # dropout=0.6 is handed to GATConv.
    self.conv1 = GATConv(in_channels, self.hid, heads=self.in_head, dropout=0.6)
    # Maps those 64 dims down to num_classes (single head, no concatenation),
    # again with dropout.
    self.conv2 = GATConv(self.hid*self.in_head, num_classes, concat=False, heads=self.out_head, dropout=0.6)

  def forward(self, data):
    """Return per-node log-probabilities over the classes.

    Args:
      data: object exposing ``x`` (node features) and ``edge_index``.
    """
    x, edge_index = data.x, data.edge_index
    x = F.dropout(x, p=0.6, training=self.training)
    x = self.conv1(x, edge_index)  # attention-based neighborhood aggregation
    x = F.elu(x)                   # non-linearity
    x = F.dropout(x, p=0.6, training=self.training)
    x = self.conv2(x, edge_index)  # final transform to per-class logits
    return F.log_softmax(x, dim=1)

In [None]:
# Use the GPU when one is available and fall back to the CPU otherwise.
# The previous unconditional `device = "cpu"` override discarded this
# detection, so it has been removed.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GAT().to(device)
model

GAT(
  (conv1): GATConv(1433, 8, heads=8)
  (conv2): GATConv(64, 7, heads=1)
)

In [None]:
# Take the first graph object of the dataset and move it to the model's device.
data = dataset[0].to(device)
# Adam with L2 weight decay over all model parameters.
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.005,
    weight_decay=5e-4,
)

In [None]:
# Full-batch training. Nothing inside the loop leaves training mode, so
# switching to it once before the loop is enough.
model.train()
for epoch in range(1000):
  optimizer.zero_grad()
  out = model(data)
  # Only the nodes selected by train_mask contribute to the loss.
  loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
  if epoch % 200 == 0:
    print(loss)
  loss.backward()
  optimizer.step()

tensor(1.9453, grad_fn=<NllLossBackward0>)
tensor(0.8359, grad_fn=<NllLossBackward0>)
tensor(0.5636, grad_fn=<NllLossBackward0>)
tensor(0.4769, grad_fn=<NllLossBackward0>)
tensor(0.4388, grad_fn=<NllLossBackward0>)


In [None]:
# Switch to evaluation mode before predicting.
model.eval()
# max over the class dimension returns (values, indices); the indices are the
# predicted classes.
_, pred = model(data).max(dim=1)
# Number of test nodes whose prediction matches the label.
correct = float((pred[data.test_mask] == data.y[data.test_mask]).sum().item())
acc = correct / data.test_mask.sum().item()
print('Accuracy: {:.4f}'.format(acc))

Accuracy: 0.8240


In [None]:
from torch_geometric.transforms import NormalizeFeatures
from torch_geometric.nn import TransformerConv
from torch_geometric.data import DataLoader

# Load the Cora dataset with row-normalized node features and keep its
# first graph object.
dataset = Planetoid(
    root='data/Cora',
    name='Cora',
    transform=NormalizeFeatures(),
)
data = dataset[0]

In [None]:
# A simple 2-layer Transformer-style GNN
class GraphTransformerNet(torch.nn.Module):
  """Two stacked TransformerConv layers followed by a log-softmax.

  Args:
    in_channels: number of input node features.
    hidden_channels: per-head output size of the first layer.
    num_classes: output size of the second layer.
    heads: number of attention heads in the first layer.
  """

  def __init__(self, in_channels, hidden_channels, num_classes, heads=4):
    super().__init__()
    # Layer 1 concatenates its heads, so it emits hidden_channels * heads features.
    self.conv1 = TransformerConv(
      in_channels, hidden_channels, heads=heads, concat=True, dropout=0.1
    )
    # Layer 2: single head, no concatenation, one output per class.
    self.conv2 = TransformerConv(
      hidden_channels * heads, num_classes, heads=1, concat=False, dropout=0.1
    )

  def forward(self, x, edge_index):
    """Return per-node log-probabilities over the classes."""
    hidden = F.elu(self.conv1(x, edge_index))
    hidden = F.dropout(hidden, p=0.5, training=self.training)
    logits = self.conv2(hidden, edge_index)
    return F.log_softmax(logits, dim=-1)

In [None]:
# Use the GPU when one is available; otherwise stay on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GraphTransformerNet(
    dataset.num_features,
    hidden_channels=8,
    num_classes=dataset.num_classes,
)
model = model.to(device)
model

GraphTransformerNet(
  (conv1): TransformerConv(1433, 8, heads=4)
  (conv2): TransformerConv(32, 7, heads=1)
)

In [None]:
# Put the graph on the same device as the model.
data = data.to(device)
# Adam with L2 weight decay over all model parameters.
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.005,
    weight_decay=5e-4,
)

In [None]:
# Full-batch training for 200 epochs, reporting the loss every 50th epoch.
model.train()
for epoch in range(1, 201):
  optimizer.zero_grad()
  out = model(data.x, data.edge_index)
  # Only the nodes selected by train_mask contribute to the loss.
  loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
  loss.backward()
  optimizer.step()
  if not epoch % 50:
    print(f'Epoch {epoch:03d}, Loss: {loss:.4f}')

Epoch 050, Loss: 0.5930
Epoch 100, Loss: 0.1913
Epoch 150, Loss: 0.1368
Epoch 200, Loss: 0.0958


In [None]:
# Switch to evaluation mode before predicting.
model.eval()
preds = model(data.x, data.edge_index).argmax(dim=-1)
# Accuracy on the train, validation and test masks, in that order.
accs = []
for mask in (data.train_mask, data.val_mask, data.test_mask):
  accs.append(preds[mask].eq(data.y[mask]).sum().item() / mask.sum().item())
print(f'Train Acc: {accs[0]:.4f}, Val Acc: {accs[1]:.4f}, Test Acc: {accs[2]:.4f}')

Train Acc: 1.0000, Val Acc: 0.7700, Test Acc: 0.7880
