In [None]:
# ============================================================
# Instalação de pacotes no Google Colab
# ============================================================
!pip install torch torchvision torchaudio torch-geometric
!pip install networkx matplotlib

In [None]:
# ============================================================
# Importações
# ============================================================
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import networkx as nx
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import numpy as np
import copy
from torch_geometric.datasets import Planetoid
from torch_geometric.utils import to_dense_adj
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Semente para reprodutibilidade (gera os mesmos resultados em diferentes execuções)
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

In [None]:
# -----------------------------------------------
# Carregar o dataset Cora
# -----------------------------------------------
# Dataset de citações: cada nó é um artigo e cada aresta representa uma citação entre artigos.
# A tarefa é classificar os artigos de acordo com o tema (7 classes).

dataset = Planetoid(root='data/Cora', name='Cora')
data = dataset[0]

print(f"Dataset: {dataset.name}")
print(f"Nós: {data.num_nodes}, Arestas: {data.num_edges // 2}, Features: {data.num_features}, Classes: {dataset.num_classes}")

In [None]:
# -----------------------------------------------
# Converter dados para formato compatível
# -----------------------------------------------
# Obtemos matriz de adjacência e matriz de features
A = to_dense_adj(data.edge_index)[0]         # matriz de adjacência densa
X = data.x                                   # features dos nós (dim: N x F)
Y = data.y                                   # rótulos reais dos nós

# Adiciona self-loops (identidade)
I = torch.eye(A.size(0))
A_hat = A + I

# Normalização simétrica: D^-1/2 * A_hat * D^-1/2
D_hat = torch.diag(torch.sum(A_hat, dim=1))
D_hat_inv_sqrt = torch.linalg.inv(torch.sqrt(D_hat))
A_norm = D_hat_inv_sqrt @ A_hat @ D_hat_inv_sqrt

In [None]:
# -----------------------------------------------
# Definir as classes da GCN
# -----------------------------------------------
class GCNLayer(nn.Module):
  """Camada de convolução em grafo: propaga informações entre vizinhos."""
  def __init__(self, in_features, out_features):
    super(GCNLayer, self).__init__()
    self.linear = nn.Linear(in_features, out_features)
  def forward(self, X, A_norm):
    # Multiplicação pela matriz normalizada mistura informações dos vizinhos
    return self.linear(A_norm @ X)

class GCN(nn.Module):
  """Rede de duas camadas GCN para classificação de nós."""
  def __init__(self, in_features, hidden_size, num_classes, dropout=0.5):
    super(GCN, self).__init__()
    self.gcn1 = GCNLayer(in_features, hidden_size)
    self.gcn2 = GCNLayer(hidden_size, num_classes)
    self.dropout = nn.Dropout(dropout)
  def forward(self, X, A_norm):
    h = F.relu(self.gcn1(X, A_norm))   # primeira camada com ReLU
    h = self.dropout(h)                # regularização
    out = self.gcn2(h, A_norm)         # logits finais
    return out

In [None]:
# -----------------------------------------------
# Divisão treino / validação / teste
# -----------------------------------------------
num_nodes = X.shape[0]
idx_all = np.arange(num_nodes)
np.random.shuffle(idx_all)

train_end = int(0.6 * num_nodes)
val_end = int(0.8 * num_nodes)
train_idx = idx_all[:train_end]
val_idx = idx_all[train_end:val_end]
test_idx = idx_all[val_end:]

In [None]:
# -----------------------------------------------
# Configurações de treino
# -----------------------------------------------
model = GCN(in_features=X.shape[1], hidden_size=32, num_classes=dataset.num_classes)
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = nn.CrossEntropyLoss()

max_epochs = 500
patience = 20
monitor = 'val_loss'   # pode alternar para 'val_acc'

best_state = None
best_metric = float('inf') if monitor == 'val_loss' else -float('inf')
epochs_no_improve = 0

train_losses, val_losses = [], []
train_accs, val_accs = [], []

# Função de avaliação
def evaluate_model(model, X, A_norm, idx):
  model.eval()
  with torch.no_grad():
    out = model(X, A_norm)
    loss = criterion(out[idx], Y[idx]).item()
    preds = out[idx].argmax(dim=1)
    acc = (preds == Y[idx]).float().mean().item()
  return loss, acc, preds

In [None]:
# -----------------------------------------------
# Loop de treino com Early Stopping
# -----------------------------------------------
for epoch in range(1, max_epochs + 1):
  model.train()
  optimizer.zero_grad()

  out = model(X, A_norm)
  loss = criterion(out[train_idx], Y[train_idx])
  loss.backward()
  optimizer.step()

  train_loss, train_acc, _ = evaluate_model(model, X, A_norm, train_idx)
  val_loss, val_acc, _ = evaluate_model(model, X, A_norm, val_idx)

  train_losses.append(train_loss)
  val_losses.append(val_loss)
  train_accs.append(train_acc)
  val_accs.append(val_acc)

  # Verifica melhora
  current_metric = val_loss if monitor == 'val_loss' else val_acc
  improved = (current_metric < best_metric) if monitor == 'val_loss' else (current_metric > best_metric)

  if improved:
    best_metric = current_metric
    best_state = copy.deepcopy(model.state_dict())
    epochs_no_improve = 0
  else:
    epochs_no_improve += 1

  if epoch % 10 == 0 or epoch == 1:
    print(f"Epoch {epoch:03d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | "
          f"Train Acc: {train_acc:.4f} | Val Acc: {val_acc:.4f} | Patience: {epochs_no_improve}")

  if epochs_no_improve >= patience:
    print(f"Early stopping na epoch {epoch}. Melhor {monitor}: {best_metric:.4f}")
    break

# Restaura melhor modelo
if best_state is not None:
  model.load_state_dict(best_state)

In [None]:
# -----------------------------------------------
# Avaliação final no conjunto de teste
# -----------------------------------------------
test_loss, test_acc, test_preds = evaluate_model(model, X, A_norm, test_idx)
print(f"\n✅ Teste | Loss: {test_loss:.4f} | Acc: {test_acc:.4f}")

In [None]:
# -----------------------------------------------
# Visualização dos resultados
# -----------------------------------------------
plt.figure(figsize=(12,4))

plt.subplot(1,2,1)
plt.plot(train_losses, label='Treino')
plt.plot(val_losses, label='Validação')
plt.title("Loss (Treino vs Validação)")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()

plt.subplot(1,2,2)
plt.plot(train_accs, label='Treino')
plt.plot(val_accs, label='Validação')
plt.title("Acurácia (Treino vs Validação)")
plt.xlabel("Epoch")
plt.ylabel("Acurácia")
plt.legend()

plt.show()

In [None]:
# -----------------------------------------------
# Comparar grafo real vs grafo predito
# -----------------------------------------------
G = nx.Graph()
edges = data.edge_index.t().tolist()
G.add_edges_from(edges)

# Layout fixo para comparação
pos = nx.spring_layout(G, seed=42)

# Classes reais e preditas
true_labels = Y.cpu().numpy()
pred_labels = test_preds.cpu().numpy()

# -----------------------------
# Grafo 1: Classes reais
# -----------------------------
plt.figure(figsize=(14, 6))

plt.subplot(1, 2, 1)
nx.draw(
    G,
    pos,
    node_color=true_labels,
    cmap=plt.cm.tab10,
    node_size=50,
    linewidths=0.2
)
plt.title("Classes Reais (rótulos verdadeiros)")

# -----------------------------
# Grafo 2: Classes preditas
# -----------------------------
node_colors = np.array(["#cccccc"] * len(true_labels))

cmap = plt.cm.tab10
for i, idx in enumerate(test_idx):
    color = cmap(pred_labels[i] / (max(pred_labels) if max(pred_labels) > 0 else 1))
    node_colors[idx] = mcolors.rgb2hex(color[:3])  # conversão correta

plt.subplot(1, 2, 2)
nx.draw(
    G,
    pos,
    node_color=node_colors,
    node_size=50,
    linewidths=0.2
)
plt.title("Classes Preditas (modelo GCN)")

plt.show()

In [None]:
cm = confusion_matrix(Y[test_idx], test_preds)
disp = ConfusionMatrixDisplay(cm)
disp.plot(cmap='Blues')
plt.title("Matriz de Confusão - Teste")
plt.show()