# Lab 04 - Text Classification: Fake News Detection

**Module**: Advanced Neural Networks  
**Duration**: 2 hours  
**Objective**: Train a Transformer to detect fake news

---

## Learning objectives

By the end of this lab, you will be able to:
1. Prepare a text dataset for a Transformer
2. Train your Transformer on a classification task
3. Evaluate and interpret the results
4. Use a translation pipeline to run inference on French text

## 0. Installation and imports

In [None]:
!pip install torch transformers datasets matplotlib seaborn numpy scikit-learn tqdm sentencepiece -q

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
import numpy as np
import math
from tqdm.auto import tqdm
from collections import Counter
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns

# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

torch.manual_seed(42)

## 1. Our Transformer (from previous labs)

In [None]:
# === Transformer components ===

class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.d_k = embed_dim // num_heads
        
        self.W_q = nn.Linear(embed_dim, embed_dim)
        self.W_k = nn.Linear(embed_dim, embed_dim)
        self.W_v = nn.Linear(embed_dim, embed_dim)
        self.W_o = nn.Linear(embed_dim, embed_dim)
    
    def forward(self, x, mask=None):
        B, S, _ = x.shape
        Q = self.W_q(x).view(B, S, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(x).view(B, S, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(x).view(B, S, self.num_heads, self.d_k).transpose(1, 2)
        
        scores = Q @ K.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask, float('-inf'))
        attn = F.softmax(scores, dim=-1)
        out = (attn @ V).transpose(1, 2).contiguous().view(B, S, self.embed_dim)
        return self.W_o(out), attn


class FeedForward(nn.Module):
    def __init__(self, embed_dim, ff_dim=None, dropout=0.1):
        super().__init__()
        ff_dim = ff_dim or 4 * embed_dim
        self.net = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, embed_dim),
            nn.Dropout(dropout)
        )
    
    def forward(self, x):
        return self.net(x)


class PositionalEncoding(nn.Module):
    def __init__(self, embed_dim, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))
    
    def forward(self, x):
        return self.dropout(x + self.pe[:, :x.size(1)])


class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, ff_dim=None, dropout=0.1):
        super().__init__()
        self.attn = MultiHeadAttention(embed_dim, num_heads)
        self.ff = FeedForward(embed_dim, ff_dim, dropout)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, mask=None):
        attn_out, _ = self.attn(self.norm1(x), mask)
        x = x + self.dropout(attn_out)
        x = x + self.dropout(self.ff(self.norm2(x)))
        return x


class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, num_classes,
                 max_len=512, dropout=0.1, pad_idx=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.pos_encoding = PositionalEncoding(embed_dim, max_len, dropout)
        self.layers = nn.ModuleList([TransformerBlock(embed_dim, num_heads, dropout=dropout) 
                                     for _ in range(num_layers)])
        self.norm = nn.LayerNorm(embed_dim)
        self.classifier = nn.Linear(embed_dim, num_classes)
        self.embed_dim = embed_dim
    
    def forward(self, x, mask=None):
        x = self.embedding(x) * math.sqrt(self.embed_dim)
        x = self.pos_encoding(x)
        for layer in self.layers:
            x = layer(x, mask)
        x = self.norm(x)
        # Mean over all token positions, padding included (alternative to a [CLS] token)
        x = x.mean(dim=1)
        return self.classifier(x)

print("Transformer loaded!")
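
The classifier above averages over every position, including `<PAD>` positions, and the rest of this lab calls `model(input_ids)` without a mask. The sketch below shows one way a padding mask and a padding-aware mean could be built, assuming pad index 0 as in this lab. `make_padding_mask` and `masked_mean` are hypothetical helpers that are not part of the original code, and a sequence made only of padding would still give NaN attention rows with this mask.

```python
import torch

def make_padding_mask(input_ids, pad_idx=0):
    """Boolean mask of shape (B, 1, 1, S), True where the key position is padding.

    It broadcasts against attention scores of shape (B, heads, S, S), the
    layout that masked_fill receives in MultiHeadAttention.
    """
    return (input_ids == pad_idx)[:, None, None, :]

def masked_mean(hidden, input_ids, pad_idx=0):
    """Average hidden states (B, S, D) over non-padding positions only."""
    keep = (input_ids != pad_idx).unsqueeze(-1).to(hidden.dtype)  # (B, S, 1)
    summed = (hidden * keep).sum(dim=1)                           # (B, D)
    counts = keep.sum(dim=1).clamp(min=1.0)                       # avoid 0/0
    return summed / counts

input_ids = torch.tensor([[5, 7, 0, 0],
                          [3, 4, 9, 2]])
hidden = torch.ones(2, 4, 3)
hidden[0, 2:] = 100.0  # values at padded positions must not affect the result

mask = make_padding_mask(input_ids)
pooled = masked_mean(hidden, input_ids)
print(mask.shape)   # torch.Size([2, 1, 1, 4])
print(pooled[0])    # tensor([1., 1., 1.])
```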

---

## 2. Loading the dataset

We use an English-language fake news dataset.

In [None]:
from datasets import load_dataset

# Load a fake news dataset
# The full dataset is loaded (no subsampling is applied below)
print("Loading dataset...")

# LIAR dataset (fact-checking)
# trust_remote_code=True allows the dataset's loading script to run;
# recent `datasets` releases may no longer support script-based datasets
dataset = load_dataset("liar", trust_remote_code=True)

print("\nDataset structure:")
print(dataset)

print("\nExample:")
print(dataset['train'][0])

In [None]:
# The LIAR dataset has 6 labels; we collapse them into 2 (fake vs real)
# Fake: pants-fire, false, barely-true
# Real: half-true, mostly-true, true
# The integer ids do not necessarily follow this order, so we map by label name.

label_names = dataset['train'].features['label'].names
print(f"Label ids: {dict(enumerate(label_names))}")
FAKE_NAMES = {'pants-fire', 'false', 'barely-true'}

def simplify_label(example):
    # 0 = Fake, 1 = Real
    name = label_names[example['label']]
    example['binary_label'] = 0 if name in FAKE_NAMES else 1
    return example

dataset = dataset.map(simplify_label)

# Check the class distribution
train_labels = [ex['binary_label'] for ex in dataset['train']]
print(f"Train distribution: Fake={train_labels.count(0)}, Real={train_labels.count(1)}")
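
The class distribution also gives a reference point for later results: a classifier that always predicts the majority class already reaches an accuracy equal to that class's share. A minimal sketch of this baseline follows. `majority_baseline` is a hypothetical helper and the labels are toy values, not the LIAR counts.

```python
from collections import Counter

def majority_baseline(labels):
    """Return (majority_label, accuracy of always predicting it)."""
    counts = Counter(labels)
    label, count = counts.most_common(1)[0]
    return label, count / len(labels)

# Toy labels for illustration only; these are not the LIAR counts.
toy_labels = [1, 1, 1, 0, 0, 1, 0, 1]
label, acc = majority_baseline(toy_labels)
print(f"Majority class: {label}, baseline accuracy: {acc:.3f}")
```

In the lab, calling `majority_baseline(train_labels)` would give the accuracy that the trained model should beat.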

---

## 3. Tokenization

We build a simple word-level tokenizer.

In [None]:
# ============================================
# EXERCISE 1: Build a vocabulary
# ============================================

class SimpleTokenizer:
    def __init__(self, texts, max_vocab_size=10000, min_freq=2):
        """
        Simple word-level tokenizer.
        """
        self.pad_token = '<PAD>'
        self.unk_token = '<UNK>'
        
        # Count words
        word_counts = Counter()
        for text in texts:
            words = text.lower().split()
            word_counts.update(words)
        
        # Build the vocabulary
        # TODO: Create word2idx and idx2word
        # 1. Start with the special tokens
        # 2. Add the most frequent words (>= min_freq)
        # 3. Cap the size at max_vocab_size
        
        self.word2idx = {self.pad_token: 0, self.unk_token: 1}
        
        # Add the frequent words
        for word, count in word_counts.most_common(max_vocab_size - 2):
            if count >= min_freq:
                self.word2idx[word] = len(self.word2idx)
        
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.vocab_size = len(self.word2idx)
        
        print(f"Vocabulary built: {self.vocab_size} words")
    
    def encode(self, text, max_len=128):
        """Convert a text into a fixed-length list of token indices."""
        words = text.lower().split()[:max_len]
        indices = [self.word2idx.get(w, self.word2idx[self.unk_token]) for w in words]
        
        # Pad up to max_len
        if len(indices) < max_len:
            indices += [self.word2idx[self.pad_token]] * (max_len - len(indices))
        
        return indices
    
    def decode(self, indices):
        """Convert indices back into text, dropping padding."""
        words = [self.idx2word.get(idx, self.unk_token) for idx in indices]
        return ' '.join(w for w in words if w != self.pad_token)

In [None]:
# Build the tokenizer
train_texts = [ex['statement'] for ex in dataset['train']]
tokenizer = SimpleTokenizer(train_texts, max_vocab_size=8000, min_freq=2)

# Test
test_text = "The president said the economy is doing great."
encoded = tokenizer.encode(test_text, max_len=20)
decoded = tokenizer.decode(encoded)

print(f"Original: {test_text}")
print(f"Encoded: {encoded}")
print(f"Decoded: {decoded}")
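
Because `SimpleTokenizer` splits on whitespace only, punctuation stays attached to words, so `great.` and `great` are distinct vocabulary entries. The sketch below contrasts that rule with a regex-based split. `regex_tokenize` is a hypothetical alternative, not part of the lab's tokenizer.

```python
import re

def whitespace_tokenize(text):
    """Same splitting rule as SimpleTokenizer: lowercase, split on whitespace."""
    return text.lower().split()

def regex_tokenize(text):
    """Hypothetical alternative: words and punctuation marks become separate tokens."""
    return re.findall(r"\w+|[^\w\s]", text.lower())

sentence = "The president said the economy is doing great."
print(whitespace_tokenize(sentence)[-1])  # great.
print(regex_tokenize(sentence)[-2:])      # ['great', '.']
```

Separating punctuation tends to reduce the number of words that fall back to `<UNK>`, at the cost of slightly longer sequences.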

---

## 4. PyTorch Dataset

In [None]:
class FakeNewsDataset(Dataset):
    def __init__(self, data, tokenizer, max_len=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        text = item['statement']
        label = item['binary_label']
        
        input_ids = self.tokenizer.encode(text, self.max_len)
        
        return {
            'input_ids': torch.tensor(input_ids, dtype=torch.long),
            'label': torch.tensor(label, dtype=torch.long)
        }

In [None]:
# Build the datasets
MAX_LEN = 64
BATCH_SIZE = 32

train_dataset = FakeNewsDataset(dataset['train'], tokenizer, MAX_LEN)
val_dataset = FakeNewsDataset(dataset['validation'], tokenizer, MAX_LEN)
test_dataset = FakeNewsDataset(dataset['test'], tokenizer, MAX_LEN)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

print(f"Train: {len(train_dataset)} samples")
print(f"Val: {len(val_dataset)} samples")
print(f"Test: {len(test_dataset)} samples")
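
`MAX_LEN = 64` is a fixed choice here. One way to check that it is reasonable is to measure how many statements get truncated at that length. The sketch below uses a hypothetical helper, `truncation_rate`, with the same whitespace splitting as the tokenizer and toy texts.

```python
def truncation_rate(texts, max_len):
    """Share of texts whose whitespace-token count exceeds max_len."""
    lengths = [len(t.split()) for t in texts]
    return sum(n > max_len for n in lengths) / len(lengths)

# Toy texts with 3, 6, 1 and 4 tokens
toy_texts = ["a b c", "a b c d e f", "a", "a b c d"]
print(truncation_rate(toy_texts, max_len=4))  # 0.25
```

On the lab data, `truncation_rate(train_texts, MAX_LEN)` would give the share of training statements that lose tokens.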

---

## 5. Training

In [None]:
# Create the model
model = TransformerClassifier(
    vocab_size=tokenizer.vocab_size,
    embed_dim=128,
    num_heads=4,
    num_layers=2,
    num_classes=2,
    max_len=MAX_LEN,
    dropout=0.1,
    pad_idx=0
).to(device)

# Count the trainable parameters
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Parameters: {num_params:,}")

In [None]:
# ============================================
# EXERCISE 2: Training loop
# ============================================

def train_epoch(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for batch in tqdm(loader, desc="Training", leave=False):
        input_ids = batch['input_ids'].to(device)
        labels = batch['label'].to(device)
        
        # TODO: Implement
        # 1. Zero grad
        optimizer.zero_grad()
        
        # 2. Forward
        outputs = model(input_ids)
        
        # 3. Loss
        loss = criterion(outputs, labels)
        
        # 4. Backward
        loss.backward()
        
        # 5. Optimizer step
        optimizer.step()
        
        # Stats
        total_loss += loss.item()
        preds = outputs.argmax(dim=-1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
    
    return total_loss / len(loader), correct / total


def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch in loader:
            input_ids = batch['input_ids'].to(device)
            labels = batch['label'].to(device)
            
            outputs = model(input_ids)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item()
            preds = outputs.argmax(dim=-1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    
    return total_loss / len(loader), correct / total

In [None]:
# Configuration
EPOCHS = 5
LR = 1e-4

optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss()

# History
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

print("Starting training...\n")

for epoch in range(EPOCHS):
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    
    print(f"Epoch {epoch+1}/{EPOCHS}")
    print(f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"  Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")
    print()
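
The loop above always runs for `EPOCHS` epochs and keeps the weights from the last one. A common variation is to track the best validation loss and stop once it no longer improves. The sketch below shows that logic on its own. `EarlyStopping` is a hypothetical helper and the loss values are made up for illustration.

```python
class EarlyStopping:
    """Track the best validation loss and signal when to stop."""

    def __init__(self, patience=2, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float("inf")
        self.best_epoch = -1
        self.bad_epochs = 0

    def step(self, val_loss, epoch):
        """Return True if training should stop after this epoch."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.best_epoch = epoch
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience

# Made-up validation losses, for illustration only
stopper = EarlyStopping(patience=2)
stopped_at = None
for epoch, loss in enumerate([0.69, 0.66, 0.67, 0.68, 0.65]):
    if stopper.step(loss, epoch):
        stopped_at = epoch
        break
print(stopper.best_epoch, stopped_at)  # 1 3
```

In the training loop, `stopper.step(val_loss, epoch)` could be called after each `evaluate`, saving `model.state_dict()` whenever `stopper.best_epoch == epoch`.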

In [None]:
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

axes[0].plot(history['train_loss'], label='Train')
axes[0].plot(history['val_loss'], label='Val')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Loss')
axes[0].legend()

axes[1].plot(history['train_acc'], label='Train')
axes[1].plot(history['val_acc'], label='Val')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Accuracy')
axes[1].set_title('Accuracy')
axes[1].legend()

plt.tight_layout()
plt.show()

---

## 6. Evaluation on the test set

In [None]:
# Final evaluation
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")

In [None]:
# Detailed report
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        labels = batch['label']
        
        outputs = model(input_ids)
        preds = outputs.argmax(dim=-1).cpu()
        
        all_preds.extend(preds.tolist())
        all_labels.extend(labels.tolist())

print("Classification Report:")
print(classification_report(all_labels, all_preds, target_names=['Fake', 'Real']))

In [None]:
# Confusion matrix
cm = confusion_matrix(all_labels, all_preds)

plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Fake', 'Real'],
            yticklabels=['Fake', 'Real'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()
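
To read the matrix, it helps to see how precision, recall and F1 follow from its cells. The sketch below recomputes them for one class from a 2x2 matrix in the row = true, column = predicted layout that `confusion_matrix` returns. `binary_metrics` is a hypothetical helper and the counts are made up.

```python
def binary_metrics(cm, positive=0):
    """Precision, recall and F1 for one class of a 2x2 confusion matrix.

    cm[i][j] counts samples of true class i predicted as class j.
    """
    tp = cm[positive][positive]
    fn = sum(cm[positive]) - tp                      # true class missed
    fp = sum(row[positive] for row in cm) - tp       # wrongly predicted as class
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Made-up counts for illustration: rows = true (Fake, Real), columns = predicted
toy_cm = [[30, 20],
          [10, 40]]
p, r, f1 = binary_metrics(toy_cm, positive=0)
print(f"Fake: precision={p:.2f} recall={r:.2f} f1={f1:.3f}")
```

These should match the corresponding row of `classification_report` when applied to the real matrix, for example `binary_metrics(cm.tolist(), positive=0)`.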

---

## 7. Pipeline with translation (FR → EN)

The model was trained on English text, so to use it on French text we add a translation step before classification.

In [None]:
from transformers import pipeline

# Load the FR -> EN translator
print("Loading translator...")
translator = pipeline("translation_fr_to_en", model="Helsinki-NLP/opus-mt-fr-en")
print("Translator loaded!")

In [None]:
# ============================================
# EXERCISE 3: Full pipeline FR -> classification
# ============================================

class FakeNewsDetectorFR:
    def __init__(self, model, tokenizer, translator, device, max_len=64):
        self.model = model
        self.tokenizer = tokenizer
        self.translator = translator
        self.device = device
        self.max_len = max_len
        self.labels = ['Fake', 'Real']
    
    def predict(self, text_fr):
        """
        Predict whether a French text is fake news.
        
        Args:
            text_fr: Text in French
        
        Returns:
            dict with 'label', 'confidence', 'translation', 'probabilities'
        """
        # TODO: Implement
        
        # 1. Translate FR -> EN
        translation = self.translator(text_fr, max_length=200)[0]['translation_text']
        
        # 2. Tokenize
        input_ids = self.tokenizer.encode(translation, self.max_len)
        input_ids = torch.tensor([input_ids], dtype=torch.long).to(self.device)
        
        # 3. Predict
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(input_ids)
            probs = F.softmax(outputs, dim=-1)
            pred = outputs.argmax(dim=-1).item()
            confidence = probs[0, pred].item()
        
        return {
            'label': self.labels[pred],
            'confidence': confidence,
            'translation': translation,
            'probabilities': {'Fake': probs[0, 0].item(), 'Real': probs[0, 1].item()}
        }

In [None]:
# Create the detector
detector = FakeNewsDetectorFR(model, tokenizer, translator, device)

# Test sentences in French
textes_test = [
    "Le président a annoncé une baisse des impôts pour les classes moyennes.",
    "Des extraterrestres ont été découverts dans une base secrète du gouvernement.",
    "L'économie a connu une croissance de 2% au dernier trimestre.",
    "Un homme a survécu 3 mois sans manger grâce à la méditation.",
]

print("=" * 60)
for texte in textes_test:
    result = detector.predict(texte)
    print(f"\nFrench text: {texte}")
    print(f"Translation: {result['translation']}")
    print(f"Prediction: {result['label']} (confidence: {result['confidence']:.2%})")
    print("-" * 60)

---

## 8. Summary

### What we did

1. **Data preparation**: tokenization, Dataset, DataLoader
2. **Training**: training loop with validation
3. **Evaluation**: metrics, confusion matrix
4. **FR→EN pipeline**: translation + classification

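### Limitations

- Simple word-level tokenizer → rare and unseen words all collapse to `<UNK>`
- Small model → limited capacity
- Padding positions are neither masked in attention nor excluded from the mean pooling
- English dataset → possible cultural bias, plus translation errors when classifying French text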

### Possible improvements

- Use a subword tokenizer (BPE as in GPT-2 and RoBERTa, or WordPiece as in BERT)
- Increase the model size
- Fine-tune a pre-trained model (BERT, RoBERTa)
- Use a multilingual dataset
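
To give an idea of what a subword tokenizer does differently from the word-level one used in this lab, here is a toy sketch of a single BPE training step: count adjacent symbol pairs across the corpus, then merge the most frequent pair into one symbol. The function names and the three-word corpus are hypothetical, and real tokenizers add details (word-boundary markers, byte-level handling) that are left out here.

```python
from collections import Counter

def most_frequent_pair(words):
    """words maps a tuple of symbols to its corpus frequency."""
    pairs = Counter()
    for symbols, freq in words.items():
        for a, b in zip(symbols, symbols[1:]):
            pairs[(a, b)] += freq
    return pairs.most_common(1)[0][0]

def merge_pair(words, pair):
    """Replace every occurrence of `pair` by the concatenated symbol."""
    merged = {}
    for symbols, freq in words.items():
        out, i = [], 0
        while i < len(symbols):
            if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
                out.append(symbols[i] + symbols[i + 1])
                i += 2
            else:
                out.append(symbols[i])
                i += 1
        merged[tuple(out)] = merged.get(tuple(out), 0) + freq
    return merged

# Toy corpus: word -> frequency, each word split into characters
words = {tuple("lower"): 2, tuple("lowest"): 1, tuple("newer"): 3}
pair = most_frequent_pair(words)
words = merge_pair(words, pair)
print(pair)          # ('w', 'e')
print(sorted(words))
```

Repeating this step builds a vocabulary of frequent fragments, so an unseen word can still be encoded as a sequence of known pieces instead of `<UNK>`.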

### Next session

**Final project**: you will choose between:
- Extending this fake news detector
- Exploring CLIP for text-based image search