# Implementazione GAT

## Outline

## Introduzione alle Graph Attention Networks (GAT)

Le **Graph Attention Networks (GAT)** sono un tipo di rete neurale progettata per operare su **dati strutturati come grafi**, combinando la flessibilità delle reti neurali con la potenza del **meccanismo di attenzione**.

A differenza dei modelli tradizionali come le GCN (Graph Convolutional Networks), le GAT assegnano **pesi diversi** ai nodi vicini durante l'aggregazione, apprendendo **quali vicini sono più rilevanti** per ogni nodo. Questo è possibile grazie a un meccanismo di **self-attention** che calcola coefficienti di attenzione tra i nodi connessi.

### Vantaggi delle GAT:
- **Importanza adattiva dei vicini**
- **Robustezza al rumore nei dati**
- **Applicabilità a grafi non regolari**
- **Efficienza e parallelizzabilità**

Le GAT si usano in vari ambiti, come social network, biologia computazionale, knowledge graph e sistemi di raccomandazione.


In [8]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

## Struttura

In [9]:
class GATLayer(nn.Module):
    """
    Simple PyTorch Implementation of the Graph Attention layer.
    """
    def __init__(self):
        super(GATLayer, self).__init__()
      
    def forward(self, input, adj):
        print("")

## Metodo forward

### Trasformazione lineare

$$
\bar{h'}_i = \textbf{W}\cdot \bar{h}_i
$$
with $\textbf{W}\in\mathbb R^{F'\times F}$ and $\bar{h}_i\in\mathbb R^{F}$.

$$
\bar{h'}_i \in \mathbb{R}^{F'}
$$

In [10]:
in_features = 5
out_features = 2
nb_nodes = 3

W = nn.Parameter(torch.zeros(size=(in_features, out_features))) #xavier paramiter inizializator
nn.init.xavier_uniform_(W.data, gain=1.414)

#Xavier Parameter Initialization (o Glorot initialization) è una tecnica per inizializzare i pesi di una rete neurale in modo intelligente.
#Problema che risolve:
# - Se i pesi sono troppo piccoli → il gradiente "svanisce" durante la backpropagation
# - Se i pesi sono troppi grandi → il gradiente "esplode" e il training diventa instabile
#Obiettivo:
# Mantenere la varianza dei segnali (attivazioni e gradienti) costante attraverso tutti i layer, evitando che si ampllifichino o si riducano troppo.


print("\nMatrice W dimensione:",W.shape)
print(W,"\n")

input = torch.rand(nb_nodes,in_features) 
print("\nDati di input:",input.shape)
print(input,"\n")

# linear transformation
h = torch.mm(input, W)
print("\nTrasformazione h':",h.shape)
print(h,"\n")
N = h.size()[0]



Matrice W dimensione: torch.Size([5, 2])
Parameter containing:
tensor([[ 0.2402, -1.2035],
        [ 0.2408, -0.0326],
        [-0.1906,  0.3131],
        [ 0.8759, -0.2736],
        [ 0.6560,  0.8273]], requires_grad=True) 


Dati di input: torch.Size([3, 5])
tensor([[0.0326, 0.6763, 0.8810, 0.5566, 0.4229],
        [0.3134, 0.2122, 0.7162, 0.6117, 0.2799],
        [0.3356, 0.2422, 0.9516, 0.6698, 0.4737]]) 


Trasformazione h': torch.Size([3, 2])
tensor([[ 0.7677,  0.4122],
        [ 0.7092, -0.0957],
        [ 0.8549,  0.0948]], grad_fn=<MmBackward0>) 



### Meccanismo di attenzione

![title](AttentionMechanism.png)

In [11]:
a = nn.Parameter(torch.zeros(size=(2*out_features, 1)))  # Crea un parametro trainabile inizializzato a zero
# Dimensione (2*out_features, 1): vettore colonna per il meccanismo di attenzione GAT
# Il fattore 2 deriva dalla concatenazione delle feature di due nodi (h_i || h_j)
# nn.Parameter() rende questo tensor un parametro del modello (verrà aggiornato durante il training)

nn.init.xavier_uniform_(a.data, gain=1.414)  # Applica inizializzazione Xavier uniforme
# xavier_uniform_ sostituisce i valori zero con valori casuali dalla distribuzione uniforme
# gain=1.414 ≈ sqrt(2) è il guadagno raccomandato per LeakyReLU/ELU
# Il gain compensa il fatto che LeakyReLU riduce la varianza dei valori negativi

print(a.shape)  # Stampa la forma del tensor: dovrebbe essere (2*out_features, 1)
print(a.data)   # Stampa i valori inizializzati con Xavier (numeri casuali piccoli e bilanciati)

leakyrelu = nn.LeakyReLU(0.2)  # Crea funzione di attivazione LeakyReLU con pendenza 0.2
# Per valori positivi: f(x) = x
# Per valori negativi: f(x) = 0.2 * x (invece di 0 come in ReLU normale)
# Questo evita il "dying ReLU problem" mantenendo un piccolo gradiente per valori negativi

torch.Size([4, 1])
tensor([[ 0.4327],
        [-0.7629],
        [-0.2371],
        [ 0.3441]])


In [12]:
a_input = torch.cat([h.repeat(1, N).view(N * N, -1), h.repeat(N, 1)], dim=1).view(N, -1, 2 * out_features)
# Questa riga costruisce tutte le possibili coppie di nodi per il calcolo dell'attenzione

# SCOMPOSIZIONE PASSO PER PASSO:

# 1. h.repeat(1, N) 
# Ripete ogni riga di h per N volte orizzontalmente
# Se h è (3, 2) e N=3: [[a,b], [c,d], [e,f]] → [[a,b,a,b,a,b], [c,d,c,d,c,d], [e,f,e,f,e,f]]

# 2. h.repeat(1, N).view(N * N, -1)
# Reshape per ottenere N*N righe, ogni riga rappresenta h_i replicato
# Risultato: (N*N, out_features) dove ogni blocco di N righe contiene lo stesso nodo

# 3. h.repeat(N, 1)
# Ripete l'intera matrice h per N volte verticalmente  
# Se h è (3, 2): [[a,b], [c,d], [e,f]] → [[a,b], [c,d], [e,f], [a,b], [c,d], [e,f], [a,b], [c,d], [e,f]]
# Risultato: (N*N, out_features) dove i nodi si ripetono ciclicamente

# 4. torch.cat([...], dim=1)
# Concatena orizzontalmente le due matrici precedenti
# Ogni riga contiene [h_i, h_j] per una specifica coppia (i,j)
# Risultato: (N*N, 2*out_features)

# 5. .view(N, -1, 2 * out_features)
# Reshape finale per ottenere (N, N, 2*out_features)
# a_input[i][j] contiene la concatenazione [h_i, h_j]

a_input
# Il tensor finale rappresenta TUTTE le possibili coppie di nodi del grafo
# Dimensioni: (N, N, 2*out_features)
# a_input[i][j] = concatenazione delle feature del nodo i e del nodo j
# Questo permette al meccanismo di attenzione di calcolare quanto il nodo i dovrebbe "prestare attenzione" al nodo j

# ESEMPIO PRATICO:
# Se abbiamo 3 nodi con feature [h1, h2, h3]
# a_input conterrà:
# [[h1||h1, h1||h2, h1||h3],
#  [h2||h1, h2||h2, h2||h3], 
#  [h3||h1, h3||h2, h3||h3]]
# dove || indica concatenazione

tensor([[[ 0.7677,  0.4122,  0.7677,  0.4122],
         [ 0.7677,  0.4122,  0.7092, -0.0957],
         [ 0.7677,  0.4122,  0.8549,  0.0948]],

        [[ 0.7092, -0.0957,  0.7677,  0.4122],
         [ 0.7092, -0.0957,  0.7092, -0.0957],
         [ 0.7092, -0.0957,  0.8549,  0.0948]],

        [[ 0.8549,  0.0948,  0.7677,  0.4122],
         [ 0.8549,  0.0948,  0.7092, -0.0957],
         [ 0.8549,  0.0948,  0.8549,  0.0948]]], grad_fn=<ViewBackward0>)

![title](a_input.png)

In [13]:
e = leakyrelu(torch.matmul(a_input, a).squeeze(2))
print(e)

tensor([[-0.0045, -0.0367, -0.0305],
        [ 0.3397,  0.1788,  0.2098],
        [ 0.2574,  0.0965,  0.1275]], grad_fn=<LeakyReluBackward0>)


In [14]:
print(a_input.shape,a.shape)
print("")
print(torch.matmul(a_input,a).shape)
print("")
print(torch.matmul(a_input,a).squeeze(2).shape)

torch.Size([3, 3, 4]) torch.Size([4, 1])

torch.Size([3, 3, 1])

torch.Size([3, 3])


### Mascheriamo

In [15]:
# Masked Attention
adj = torch.randint(2, (3, 3))
print(adj)

zero_vec  = -9e15*torch.ones_like(e)
print(zero_vec.shape)
print(zero_vec)

tensor([[0, 1, 0],
        [0, 0, 1],
        [1, 1, 1]])
torch.Size([3, 3])
tensor([[-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15]])


In [16]:
attention = torch.where(adj > 0, e, zero_vec)
print(adj,"\n",e,"\n",zero_vec)
attention

tensor([[0, 1, 0],
        [0, 0, 1],
        [1, 1, 1]]) 
 tensor([[-0.0045, -0.0367, -0.0305],
        [ 0.3397,  0.1788,  0.2098],
        [ 0.2574,  0.0965,  0.1275]], grad_fn=<LeakyReluBackward0>) 
 tensor([[-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15, -9.0000e+15]])


tensor([[-9.0000e+15, -3.6663e-02, -9.0000e+15],
        [-9.0000e+15, -9.0000e+15,  2.0982e-01],
        [ 2.5738e-01,  9.6488e-02,  1.2751e-01]], grad_fn=<WhereBackward0>)

In [17]:
attention = F.softmax(attention, dim=1)
h_prime   = torch.matmul(attention, h)
print(h)
print(h.shape)
print(attention.shape)

tensor([[ 0.7677,  0.4122],
        [ 0.7092, -0.0957],
        [ 0.8549,  0.0948]], grad_fn=<MmBackward0>)
torch.Size([3, 2])
torch.Size([3, 3])


In [18]:
attention

tensor([[0.0000, 1.0000, 0.0000],
        [0.0000, 0.0000, 1.0000],
        [0.3664, 0.3119, 0.3217]], grad_fn=<SoftmaxBackward0>)

In [19]:
h_prime

tensor([[ 0.7092, -0.0957],
        [ 0.8549,  0.0948],
        [ 0.7775,  0.1517]], grad_fn=<MmBackward0>)

#### h_prime vs h

In [20]:
print(h_prime,"\n",h)

tensor([[ 0.7092, -0.0957],
        [ 0.8549,  0.0948],
        [ 0.7775,  0.1517]], grad_fn=<MmBackward0>) 
 tensor([[ 0.7677,  0.4122],
        [ 0.7092, -0.0957],
        [ 0.8549,  0.0948]], grad_fn=<MmBackward0>)


# Costruiamo il layer

In [22]:
class GATLayer(nn.Module):
    def __init__(self, in_features, out_features, dropout, alpha, concat=True):
        super(GATLayer, self).__init__()
        
        '''
        TODO
        '''
        
    def forward(self, input, adj):
        # Linear Transformation
        h = torch.mm(input, self.W)  # Trasformazione lineare: moltiplica le feature di input per la matrice dei pesi W
        N = h.size()[0]  # Ottiene il numero di nodi nel grafo
    
        # Attention Mechanism
        a_input = torch.cat([h.repeat(1, N).view(N * N, -1), h.repeat(N, 1)], dim=1).view(N, -1, 2 * self.out_features)
        # Crea tutte le possibili coppie di nodi concatenando h_i e h_j per ogni coppia (i,j)
        # h.repeat(1, N) ripete ogni riga N volte, h.repeat(N, 1) ripete l'intera matrice N volte
        # Il risultato è un tensor (N, N, 2*out_features) con le feature concatenate di ogni coppia
    
        e = self.leakyrelu(torch.matmul(a_input, self.a).squeeze(2))
        # Calcola i coefficienti di attenzione grezzi applicando la trasformazione lineare 'a'
        # seguita da LeakyReLU come funzione di attivazione
    
        # Masked Attention
        zero_vec = -9e15*torch.ones_like(e)  # Crea un tensor di valori molto negativi con la stessa forma di e
        attention = torch.where(adj > 0, e, zero_vec)
        # Applica la maschera della matrice di adiacenza: mantiene i valori di e dove esiste un arco (adj > 0),
        # altrimenti imposta valori molto negativi per escludere connessioni inesistenti
    
        attention = F.softmax(attention, dim=1)  # Applica softmax per normalizzare i pesi di attenzione lungo ogni riga
        attention = F.dropout(attention, self.dropout, training=self.training)  # Applica dropout per regolarizzazione durante il training
        h_prime = torch.matmul(attention, h)  # Calcola le nuove rappresentazioni dei nodi come combinazione pesata dei vicini
    
        if self.concat:  # Se questo layer deve concatenare i risultati di più attention head
            return F.elu(h_prime)  # Applica ELU come funzione di attivazione
        else:  # Se questo è l'ultimo layer o non serve concatenazione
            return h_prime  # Ritorna direttamente le nuove rappresentazioni

In [23]:
class GATLayer(nn.Module):
    def __init__(self, in_features, out_features, dropout, alpha, concat=True):
        super(GATLayer, self).__init__()  # Chiama il costruttore della classe padre nn.Module
        self.dropout       = dropout        # Probabilità di dropout (es. 0.6) per regolarizzazione
        self.in_features   = in_features    # Numero di feature in input per ogni nodo
        self.out_features  = out_features   # Numero di feature in output per ogni nodo
        self.alpha         = alpha          # Parametro per LeakyReLU (pendenza per valori negativi, es. 0.2)
        self.concat        = concat         # True per tutti i layer tranne l'ultimo (concatena multi-head)
        
        # Xavier Initialization of Weights
        # Alternativa: usare weights_init per applicare pesi personalizzati
        self.W = nn.Parameter(torch.zeros(size=(in_features, out_features)))
        # Matrice dei pesi W per la trasformazione lineare delle feature dei nodi
        nn.init.xavier_uniform_(self.W.data, gain=1.414)
        # Inizializzazione Xavier uniforme per stabilizzare il training
        
        self.a = nn.Parameter(torch.zeros(size=(2*out_features, 1)))
        # Vettore di parametri per il meccanismo di attenzione (dimensione doppia per concatenazione)
        nn.init.xavier_uniform_(self.a.data, gain=1.414)
        # Inizializzazione Xavier per i parametri di attenzione
        
        # LeakyReLU
        self.leakyrelu = nn.LeakyReLU(self.alpha)
        # Funzione di attivazione LeakyReLU con pendenza alpha per valori negativi
    
    def forward(self, input, adj):
        # Linear Transformation
        h = torch.mm(input, self.W)  # Trasformazione lineare: moltiplica i features di input per la matrice dei pesi W
        N = h.size()[0]  # Ottiene il numero di nodi nel grafo


        # Attention Mechanism
        a_input = torch.cat([h.repeat(1, N).view(N * N, -1), h.repeat(N, 1)], dim=1).view(N, -1, 2 * self.out_features)
        # Crea tutte le possibili coppie di nodi concatenando h_i e h_j per ogni coppia (i,j)
        # h.repeat(1, N) ripete ogni riga N volte, h.repeat(N, 1) ripete l'intera matrice N volte
        # Il risultato è un tensor (N, N, 2*out_features) con le feature concatenate di ogni coppia
        
        e = self.leakyrelu(torch.matmul(a_input, self.a).squeeze(2))
        # Calcola i coefficienti di attenzione grezzi applicando la trasformazione lineare 'a'
        # seguita da LeakyReLU come funzione di attivazione, squeeze rimuove la dimensione singleton
        
        # Masked Attention
        zero_vec = -9e15*torch.ones_like(e)  # Crea un tensor di valori molto negativi con la stessa forma di e
        attention = torch.where(adj > 0, e, zero_vec)
        # Applica la maschera della matrice di adiacenza: mantiene i valori di e dove esiste un arco (adj > 0),
        # altrimenti imposta valori molto negativi per escludere connessioni inesistenti dal softmax
        
        attention = F.softmax(attention, dim=1)  # Applica softmax per normalizzare i pesi di attenzione lungo ogni riga
        attention = F.dropout(attention, self.dropout, training=self.training)  # Applica dropout per regolarizzazione durante il training
        h_prime = torch.matmul(attention, h)  # Calcola le nuove rappresentazioni dei nodi come combinazione pesata dei vicini
        
        if self.concat:  # Se questo layer deve concatenare i risultati di più head attention
            return F.elu(h_prime)  # Applica ELU come funzione di attivazione per layer intermedi
        else:  # Se questo è l'ultimo layer o non serve concatenazione
            return h_prime  # Ritorna direttamente le nuove rappresentazioni senza attivazione

## Funzione di attivazione ELU (Exponential Linear Unit)

La funzione ELU è definita come:
$$
\text{ELU}(x) = \begin{cases} x & \text{se} x > 0 \\\\ \alpha (e^x - 1) & \text{se } x \leq 0 \end{cases}
$$


Dove:
- $x$ è l'input del neurone,
- $\alpha \in \mathbb{R}^+ $ è un iperparametro (solitamente $\alpha = 1.0 $).

### Derivata della funzione ELU:

$$
\frac{d}{dx} \text{ELU}(x) =
\begin{cases}
1 & \text{se } x > 0 \\\\
\text{ELU}(x) + \alpha & \text{se } x \leq 0
\end{cases}
$$

### Proprietà:
- Continuità e derivabilità ovunque
- Riduce il rischio di neuroni morti
- Media delle attivazioni centrata intorno a zero


# Usiamolo

In [24]:
from torch_geometric.data import Data
from torch_geometric.nn import GATConv
from torch_geometric.datasets import Planetoid
import torch_geometric.transforms as T

import matplotlib.pyplot as plt

name_data = 'Cora'
dataset = Planetoid(root= '/tmp/' + name_data, name = name_data)
dataset.transform = T.NormalizeFeatures()

print(f"Number of Classes in {name_data}:", dataset.num_classes)
print(f"Number of Node Features in {name_data}:", dataset.num_node_features)

Number of Classes in Cora: 7
Number of Node Features in Cora: 1433


In [27]:
class GAT(torch.nn.Module):
    def __init__(self):
        super(GAT, self).__init__()  # Chiama il costruttore della classe padre nn.Module
        self.hid = 16  # Numero di feature nascoste nel layer intermedio
        self.in_head = 16  # Numero di head attention nel primo layer
        self.out_head = 1  # Numero di head attention nel layer di output
        
        self.conv1 = GATConv(dataset.num_features, self.hid, concat=False, heads=self.in_head, dropout=0.6)
        # Primo layer GAT: trasforma da num_features del dataset a hid feature nascoste
        # concat=False significa che le head multiple vengono mediate invece che concatenate
        # heads=in_head usa 16 head attention parallele
        # dropout=0.6 applica dropout con probabilità 60%
        
        self.conv2 = GATConv(self.hid  # *self.in_head (commentato perché concat=False)
                             , dataset.num_classes, concat=False,
                             heads=self.out_head, dropout=0.6)
        # Secondo layer GAT: trasforma da hid feature nascoste al numero di classi
        # Una sola head attention per l'output finale
        
    def forward(self, data):
        x, edge_index = data.x, data.edge_index  # Estrae feature dei nodi e indici degli archi
                
        x = F.dropout(x, p=0.6, training=self.training)  # Applica dropout alle feature di input durante il training
        x = self.conv1(x, edge_index)  # Primo layer GAT con multi-head attention
        x = F.elu(x)  # Applica funzione di attivazione ELU
        x = F.dropout(x, p=0.6, training=self.training)  # Dropout tra i layer
        x = self.conv2(x, edge_index)  # Secondo layer GAT per la classificazione finale
        
        return F.log_softmax(x, dim=1)  # Applica log-softmax per ottenere probabilità logaritmiche
    
    
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # Seleziona GPU se disponibile, altrimenti CPU
model = GAT().to(device)  # Crea il modello GAT e lo sposta sul device selezionato
data = dataset[0].to(device)  # Carica il primo grafo dal dataset e lo sposta sul device
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)  # Ottimizzatore Adam con learning rate 0.01 e weight decay per regolarizzazione
model.train()  # Imposta il modello in modalità training

for epoch in range(1000):  # Loop di training per 3000 epoche
    model.train()  # Assicura che il modello sia in modalità training
    optimizer.zero_grad()  # Azzera i gradienti accumulati dall'iterazione precedente
    out = model(data)  # Forward pass: calcola le predizioni del modello
    loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])  # Calcola la negative log likelihood loss solo sui nodi di training
    
    if epoch % 200 == 0:  # Ogni 200 epoche
        print(loss)  # Stampa il valore della loss per monitorare il progresso
    
    loss.backward()  # Backpropagation: calcola i gradienti
    optimizer.step()  # Aggiorna i parametri del modello usando i gradienti
    

tensor(1.9461, grad_fn=<NllLossBackward0>)
tensor(1.2426, grad_fn=<NllLossBackward0>)
tensor(1.0849, grad_fn=<NllLossBackward0>)
tensor(1.0031, grad_fn=<NllLossBackward0>)
tensor(1.0685, grad_fn=<NllLossBackward0>)


In [26]:
model.eval()
_, pred = model(data).max(dim=1)
correct = float(pred[data.test_mask].eq(data.y[data.test_mask]).sum().item())
acc = correct / data.test_mask.sum().item()
print('Accuracy: {:.4f}'.format(acc))

Accuracy: 0.8220
