# ============================
# Imports et préparations
# ============================

In [55]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import TensorDataset, DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
import re

# ============================
# 1. Chargement et nettoyage
# ============================

In [27]:
df = pd.read_csv('../data/articles3_cleaned.csv')

In [28]:
df.head()

Unnamed: 0,section,title,text,text_cleaned
0,Business,Bank of England expected to leave interest rat...,The Bank of England’s policymakers may not be ...,bank england policymakers may united today pro...
1,Business,"Federal Reserve holds interest rates, defying ...",The US Federal Reserve kept interest rates on ...,federal reserve kept interest rate hold signal...
2,Business,John Lewis tells some head office staff to wor...,John Lewis is asking some head office staff to...,john lewis asking head office staff spend leas...
3,Business,What could Albanese do to improve productivity...,In his address last week at the National Press...,address last week national press club prime mi...
4,Business,EU accuses China’s AliExpress of ‘systemic fai...,The European Commission has accused the online...,european commission accused online retailer al...


# ============================
# 2. Vectorisation TF-IDF
# ============================

In [29]:
texts_cleaned = df['text_cleaned'].astype(str).tolist()

# Transformation des textes en vecteurs via TF-IDF
# Chaque texte devient un vecteur de dimension 7500
# Chaque dimension correspond à un mot important (en anglais ici)

vectorizer = TfidfVectorizer(max_features=7500)
X_tfidf = vectorizer.fit_transform(texts_cleaned).toarray()
print(f"[INFO] TF-IDF shape : {X_tfidf.shape}")  # ex : (n_documents, 7500)

[INFO] TF-IDF shape : (60000, 7500)


In [30]:
from sklearn.preprocessing import normalize

# Normalisation L1 des vecteurs TF-IDF
X_tfidf = normalize(X_tfidf, norm='l2')

# Conversion vers un tenseur PyTorch (utile pour l'entraînement)
X_tensor = torch.tensor(X_tfidf, dtype=torch.float32)

# ============================
# 3. Recurrent Neural Networks en PyTorch
# ============================

In [79]:
from collections import Counter
from torchtext.data.utils import get_tokenizer
import torch

# 1. Tokeniser les textes
tokenizer = get_tokenizer("basic_english")
texts_cleaned = df["text_cleaned"].astype(str).tolist()
tokenized = [tokenizer(text) for text in texts_cleaned]

# 2. Construire le Counter
counter = Counter()
for tokens in tokenized:
    counter.update(tokens)

# 3. Ajouter les tokens spéciaux à la main
vocab_tokens = ["<pad>", "<unk>"] + [token for token, freq in counter.items()]
vocab = {token: idx for idx, token in enumerate(vocab_tokens)}

# 4. Fonction pour indexer les tokens
def encode(tokens, vocab, unk_token="<unk>"):
    return [vocab.get(token, vocab[unk_token]) for token in tokens]

# 5. Conversion des textes en séquences d'indices
indexed = [encode(tokens, vocab) for tokens in tokenized]

# 6. Padding
from torch.nn.utils.rnn import pad_sequence

tensor_sequences = [torch.tensor(seq, dtype=torch.long) for seq in indexed]
padded_sequences = pad_sequence(tensor_sequences, batch_first=True, padding_value=vocab["<pad>"])


In [80]:
from sklearn.preprocessing import LabelEncoder

label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(df["section"])
labels_tensor = torch.tensor(labels_encoded, dtype=torch.long)


In [86]:
from torch.utils.data import random_split, DataLoader, TensorDataset

# Vérification
assert padded_sequences.size(0) == labels_tensor.size(0), "Mismatch entre données et labels"

# Création du dataset complet
full_dataset = TensorDataset(padded_sequences, labels_tensor)

# Taille du split
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size

# Split aléatoire
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)


In [90]:
import torch
import torch.nn as nn

class CustomRNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_size, output_size, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        
        self.hidden_size = hidden_size
        self.W_xh = nn.Parameter(torch.randn(embed_dim, hidden_size))
        self.W_hh = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_h = nn.Parameter(torch.zeros(hidden_size))
        
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # x: (batch_size, seq_len) avec indices de tokens
        embedded = self.embedding(x)  # (batch_size, seq_len, embed_dim)
        batch_size, seq_len, _ = embedded.size()
        h_t = torch.zeros(batch_size, self.hidden_size, device=x.device)
        
        for t in range(seq_len):
            x_t = embedded[:, t, :]  # (batch_size, embed_dim)
            h_t = torch.tanh(x_t @ self.W_xh + h_t @ self.W_hh + self.b_h)  # (batch_size, hidden_size)
        
        output = self.fc(h_t)  # (batch_size, output_size)
        return output


In [None]:
vocab_size = len(vocab)
embed_dim = 100
hidden_dim = 64
output_dim = len(label_encoder.classes_)
pad_idx = vocab["<pad>"]

model = CustomRNN(vocab_size, embed_dim, hidden_dim, output_dim, pad_idx)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)


for epoch in range(10):
    model.train()
    total_loss = 0
    
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.long()  # indices tokens
        batch_y = batch_y.long()  # classes
        
        optimizer.zero_grad()
        outputs = model(batch_X)  # (batch_size, output_dim)
        
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    print(f"[Epoch {epoch+1}] Loss: {avg_loss:.4f}")


In [None]:
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.long()
        batch_y = batch_y.long()
        
        outputs = model(batch_X)
        preds = outputs.argmax(dim=1)  # classes prédites
        
        correct += (preds == batch_y).sum().item()
        total += batch_y.size(0)

print(f"Test accuracy: {correct/total:.4f}")


In [62]:
input_size = 10
hidden_size = 20
output_size = 1

model = CustomRNN(input_size, hidden_size, output_size)

X = torch.randn(5, 7, input_size)  # (batch_size=5, seq_len=7, input_size=10)
Y = torch.randn(5, output_size)

optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.MSELoss()

for epoch in range(100):
    optimizer.zero_grad()
    output = model(X)
    loss = criterion(output, Y)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch} — Loss: {loss.item():.4f}")


Epoch 0 — Loss: 0.9152
Epoch 10 — Loss: 0.0865
Epoch 20 — Loss: 0.0098
Epoch 30 — Loss: 0.0015
Epoch 40 — Loss: 0.0004
Epoch 50 — Loss: 0.0001
Epoch 60 — Loss: 0.0000
Epoch 70 — Loss: 0.0000
Epoch 80 — Loss: 0.0000
Epoch 90 — Loss: 0.0000
