In [1]:
import random
import numpy as np
import torch
import json
import os
from tqdm import tqdm
from pathlib import Path
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import copy

# Seed every random number generator in use so that runs are reproducible.
# The value lives in one constant so it can be changed in a single place.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")

# Dataset layout: all inputs live under ROOT.
ROOT = Path("Amazon_products")
TRAIN_DIR = ROOT / "train"
TEST_DIR = ROOT / "test"

TEST_CORPUS_PATH = TEST_DIR / "test_corpus.txt"
TRAIN_CORPUS_PATH = TRAIN_DIR / "train_corpus.txt"

CLASS_HIERARCHY_PATH = ROOT / "class_hierarchy.txt"
CLASS_RELATED_PATH = ROOT / "class_related_keywords.txt"
CLASS_PATH = ROOT / "classes.txt"

# Kept as a plain string (not a Path), exactly as before.
SUBMISSION_PATH = "Submission/submission.csv"

# Task constants.
NUM_CLASSES = 531
MIN_LABELS = 2
MAX_LABELS = 3

def load_corpus(path):
    """Read a tab-separated corpus file into a ``{id: text}`` dictionary.

    Each line is stripped and split on the first tab only, so the text part
    may itself contain tabs. Lines that do not yield exactly two fields are
    ignored. Ids are kept as strings; a repeated id keeps its last text.
    """
    with open(path, "r", encoding="utf-8") as handle:
        split_lines = (raw.strip().split("\t", 1) for raw in handle)
        return {fields[0]: fields[1] for fields in split_lines if len(fields) == 2}

def load_multilabel(path):
    """Read ``<int>\\t<int>`` pairs into a ``{id: [labels]}`` dictionary.

    Every line with exactly two tab-separated fields contributes one label to
    the list of its id, in file order. Other lines are skipped. Both fields
    are converted with ``int`` and a non-numeric field raises ``ValueError``.
    """
    grouped = {}
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            fields = raw.strip().split("\t")
            if len(fields) != 2:
                continue
            key, value = (int(field) for field in fields)
            grouped.setdefault(key, []).append(value)
    return grouped

def load_class_keywords(path):
    """Read ``name:kw1,kw2,...`` lines into ``{class_name: [keywords]}``.

    Lines without a colon are skipped. The line is stripped and split on the
    first colon; the left part is the key as-is, the right part is split on
    commas, and each keyword is stripped with empty entries dropped.
    """
    mapping = {}
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            name, colon, rest = raw.strip().partition(":")
            if not colon:
                continue
            trimmed = (piece.strip() for piece in rest.split(","))
            mapping[name] = [keyword for keyword in trimmed if keyword]
    return mapping

# Raw corpora, keyed by the string id found in each file.
id2text_test = load_corpus(TEST_CORPUS_PATH)
id2text_train = load_corpus(TRAIN_CORPUS_PATH)

# Classes
id2class = load_corpus(CLASS_PATH)
class2hierarchy = load_multilabel(CLASS_HIERARCHY_PATH)
class2related = load_class_keywords(CLASS_RELATED_PATH)

# Silver labels (RoBERTa - the best ones)
with open("Silver/silver_train_roberta.json", "r", encoding="utf-8") as f:
    pid2labelids_silver = json.load(f)

with open("Silver/silver_test_roberta.json", "r", encoding="utf-8") as f:
    pid2labelids_test = json.load(f)

print(f"Train: {len(id2text_train)} samples")
print(f"Test: {len(id2text_test)} samples")
print(f"Classes: {len(id2class)}")

# map_location places the tensors on `device` while deserialising, so files
# saved from a GPU session still load on a CPU-only machine.
X_train = torch.load("Embeddings/X_train.pt", map_location=device)
y_train = torch.load("Embeddings/y_train.pt", map_location=device)
X_test = torch.load("Embeddings/X_test.pt", map_location=device)
label_emb = torch.load("Embeddings/label_emb.pt", map_location=device)
test_ids = list(id2text_test.keys())
train_ids = list(id2text_train.keys())

print(f"Train embeddings: {X_train.shape}")
print(f"Test embeddings: {X_test.shape}")
print(f"Train labels: {y_train.shape}")
print(f"Label embeddings: {label_emb.shape}")

# Fail loudly if the cached tensors and the corpora have different sizes,
# since the position-based index mapping below would then be wrong.
assert X_train.size(0) == y_train.size(0) == len(train_ids), \
    "X_train / y_train / train corpus row counts differ"
assert X_test.size(0) == len(test_ids), "X_test / test corpus row counts differ"
assert label_emb.size(0) == y_train.size(1), \
    "label embeddings and label matrix disagree on the number of classes"

# Index mapping
# NOTE(review): assumes row i of X_train / y_train belongs to train_ids[i]
# (corpus file order) - confirm against the embedding export.
pid2idx = {pid: i for i, pid in enumerate(train_ids)}

input_dim = X_train.size(1)
num_classes = y_train.size(1)

print(f"Input dimension: {input_dim}")
print(f"Num classes: {num_classes}")


Device: cuda
Train: 29487 samples
Test: 19658 samples
Classes: 531
Train embeddings: torch.Size([29487, 768])
Test embeddings: torch.Size([19658, 768])
Train labels: torch.Size([29487, 531])
Label embeddings: torch.Size([531, 768])
Input dimension: 768
Num classes: 531


In [2]:
# Build A and D

# Use the number of classes inferred from y_train for consistency
num_classes = y_train.size(1)

# Adjacency matrix (num_classes x num_classes)
A = torch.zeros((num_classes, num_classes), dtype=torch.float32, device=device)

# class2hierarchy is a dict {class_id: [related_class_ids]}.
# Collect the valid edges first and write them in one indexed assignment
# instead of one element at a time. Ids outside [0, num_classes) are dropped;
# this includes negative ids, which would otherwise wrap around as indices.
edges = [
    (cls, other)
    for cls, related_list in class2hierarchy.items()
    if 0 <= cls < num_classes
    for other in related_list
    if 0 <= other < num_classes
]
if edges:
    src, dst = torch.tensor(edges, dtype=torch.long, device=device).t()
    A[src, dst] = 1.0
    A[dst, src] = 1.0  # make the graph undirected

# Add self-loops (each class connected to itself). Setting the diagonal
# directly keeps the weight at exactly 1 even if the hierarchy lists a class
# as related to itself.
A.fill_diagonal_(1.0)

# Degree matrix D (diagonal matrix of node degrees)
deg = A.sum(dim=1)  # (num_classes,)
D = torch.diag(deg)

print("A shape:", A.shape)
print("D shape:", D.shape)

# Normalized adjacency matrix for GCN: A_hat = D^{-1/2} A D^{-1/2}
deg_inv_sqrt = deg.pow(-0.5)
deg_inv_sqrt[torch.isinf(deg_inv_sqrt)] = 0.0
D_inv_sqrt = torch.diag(deg_inv_sqrt)  # kept for any later cell that uses it

# Scaling rows and columns by broadcasting gives the same result as
# multiplying by the two diagonal matrices, without the dense matmuls.
A_hat = deg_inv_sqrt.unsqueeze(1) * A * deg_inv_sqrt.unsqueeze(0)
print("A_hat shape:", A_hat.shape)


A shape: torch.Size([531, 531])
D shape: torch.Size([531, 531])
A_hat shape: torch.Size([531, 531])


In [None]:
# ==========================================================
# Your Task: Implement Label GCN and GCN-Enhanced Classifier
# ==========================================================

class LabelGCN(nn.Module):
    """
    Stack of graph-convolution layers that refines label embeddings.

    Layer ``k`` computes ``H <- (A_hat @ H) @ W_k``. Every layer except the
    last is followed by ReLU and dropout. The layers have no bias term and
    keep the embedding dimension unchanged.

    Args:
        emb_dim (int): Dimension of label embeddings.
        num_layers (int): Number of GCN layers.
        dropout (float): Dropout probability.
    """
    def __init__(self, emb_dim, num_layers=1, dropout=0.5):
        super().__init__()
        self.emb_dim = emb_dim
        self.num_layers = num_layers
        self.dropout = dropout

        # One (emb_dim x emb_dim) weight per layer, created in layer order.
        self.W_list = nn.ParameterList(
            [self._make_weight() for _ in range(self.num_layers)]
        )

    def _make_weight(self):
        """Return a new Xavier-uniform initialised square weight matrix."""
        weight = nn.Parameter(torch.empty(self.emb_dim, self.emb_dim))
        nn.init.xavier_uniform_(weight)
        return weight

    def forward(self, H, A_hat):
        """
        Args:
            H (torch.Tensor): Initial label embeddings, shape (num_labels, emb_dim).
            A_hat (torch.Tensor): Normalized adjacency matrix, shape (num_labels, num_labels).

        Returns:
            torch.Tensor: Updated label embeddings, shape (num_labels, emb_dim).
        """
        final_idx = self.num_layers - 1
        for depth, weight in enumerate(self.W_list):
            # Aggregate over neighbours, then apply the layer's linear map.
            H = A_hat @ H @ weight
            if depth >= final_idx:
                # The output layer stays linear: no activation, no dropout.
                continue
            H = F.dropout(F.relu(H), p=self.dropout, training=self.training)
        return H


class GCNEnhancedClassifier(nn.Module):
    """
    Scores documents against GCN-refined label embeddings.

    A linear layer maps each document embedding into the label-embedding
    space. The trainable label embeddings are passed through a ``LabelGCN``
    over the label graph, and the logit for a (document, label) pair is the
    inner product of the two vectors.

    Args:
        input_dim (int): Dimension of input document embeddings.
        label_init_emb (torch.Tensor): Initial label embeddings, shape (num_labels, emb_dim).
        A_hat (torch.Tensor): Normalized adjacency matrix of labels, shape (num_labels, num_labels).
        num_layers (int): Number of GCN layers.
        dropout (float): Dropout probability.
    """
    def __init__(self, input_dim, label_init_emb, A_hat, num_layers=1, dropout=0.5):
        super().__init__()
        label_dim = label_init_emb.size(1)

        self.num_layers = num_layers
        self.dropout = dropout

        # Document projection (input_dim -> emb_dim), then the label encoder.
        # The projection is built first so seeded initialisation is unchanged.
        self.proj = nn.Linear(input_dim, label_dim)
        self.encoder = LabelGCN(label_dim, num_layers, dropout)

        # Trainable copy of the initial label embeddings; the tensor passed in
        # is not modified.
        self.label_emb = nn.Parameter(label_init_emb.clone())

        # The adjacency is a buffer: it follows .to(device) and is saved in
        # the state dict, but is not a parameter.
        self.register_buffer("A_hat", A_hat)

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input embeddings for documents, shape (batch_size, input_dim).

        Returns:
            torch.Tensor: Logits for classification, shape (batch_size, num_labels).
        """
        # The label side runs first so that dropout draws happen in the same
        # order as before (encoder dropout, then document dropout).
        refined_labels = self.encoder(self.label_emb, self.A_hat)

        doc_repr = F.dropout(self.proj(x), p=self.dropout, training=self.training)

        # Inner product between every document and every refined label.
        return doc_repr @ refined_labels.T

In [None]:
# Disabled training-loop template: the whole loop below is wrapped in a
# triple-quoted string, so none of it executes.
# It refers to names that are not defined in the cells visible here
# (`embeddings`, `train_loader`, `val_loader`, `test_loader`, `evaluate`,
# `print_eval_result`); these must exist before the loop is re-enabled.
# NOTE(review): the loop uses F.cross_entropy, while y_train above is a
# (num_samples, 531) matrix - confirm the loss matches the target format.
# NOTE(review): the test split is evaluated every epoch alongside validation;
# model selection itself only uses the validation accuracy.
"""model = GCNEnhancedClassifier(embeddings.size(1), label_emb, A_hat.to(device), num_layers=2).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4)

best_val_acc = -1
best_model_state = None
patience = 5
patience_counter = 0

val_acc_list = []
test_acc_list = []

EPOCHS = 500

for epoch in range(1, EPOCHS + 1):
    model.train()
    total_loss = 0

    for batch in tqdm(train_loader, desc=f"Epoch {epoch}"):
        X = batch["X"].to(device)
        y = batch["y"].to(device)
        logits = model(X)
        loss = F.cross_entropy(logits, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"[Epoch {epoch}] Train Loss: {avg_loss:.4f}")

    # === Validation ===
    val_result = evaluate(model, val_loader, device=device)
    val_acc = val_result["accuracy"]
    val_acc_list.append(val_acc)

    is_improved = val_acc > best_val_acc
    print_eval_result(val_result, stage="val", is_improved=is_improved)

    # === Test ===
    test_result = evaluate(model, test_loader, device=device)
    test_acc = test_result["accuracy"]
    test_acc_list.append(test_acc)
    print_eval_result(test_result, stage="test")

    # === Update best model ===
    if is_improved:
        best_val_acc = val_acc
        best_model_state = copy.deepcopy(model.state_dict())
        patience_counter = 0
    else:
        patience_counter += 1

    # === Early stopping ===
    if patience_counter >= patience:
        print(f"[Early Stopping] No improvement for {patience} consecutive epochs.")
        break"""