In [None]:
import os
import pickle

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import tqdm

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Read data_clean.csv (relative to the working directory) into the main frame.
# Later cells use its 'texte' and 'identifiant' columns.
df = pd.read_csv("data_clean.csv")

In [None]:
# Discard rows that have no article text, then lower-case the remaining text.
df = (
    df.dropna(subset=['texte'])
      .assign(texte=lambda frame: frame['texte'].str.lower())
)

In [None]:
# Read the theme annotation table; it is later left-joined onto `df` on 'identifiant'.
df_themes = pd.read_csv('Annotations/theme.csv')

In [None]:
# Attach the theme annotations to the articles, build the model input text and
# keep only annotated rows. Every step of the chain returns a fresh frame, so
# the final theme remapping is never an assignment into a filtered slice
# (which is what raises SettingWithCopyWarning).
df_merged = (
    df.merge(df_themes, on="identifiant", how="left", suffixes=("", "_manual"))
    # Model input = title + " " + body; missing parts count as empty strings.
    .assign(texte_total=lambda frame: frame["titre"].fillna("") + " " + frame["texte"].fillna(""))
    # Rows without a theme cannot be used for supervised training.
    .dropna(subset=["theme"])
    # Fold the two remapped themes into their target class.
    .assign(theme=lambda frame: frame["theme"].replace({
        "tribune": "analyse",
        "société": "politique",
    }))
)

In [None]:
from transformers import CamembertTokenizer, CamembertModel
from tqdm import tqdm
tokenizer = CamembertTokenizer.from_pretrained("camembert-base")
model = CamembertModel.from_pretrained("camembert-base").to(device)
model.eval()  # evaluation mode: this cell only extracts embeddings

# Maps each input text to its per-token embeddings and its theme.
# NOTE: the dict is keyed by the text itself, so rows sharing the exact same
# text keep only the last entry.
embeddings_dict = {}

# Iterate over the two needed columns directly (no per-row Series as with
# iterrows) and give tqdm the row count so it shows real progress and an ETA.
texts_and_themes = zip(df_merged['texte_total'], df_merged['theme'])
for sentence, theme in tqdm(texts_and_themes, total=len(df_merged)):
    inputs = tokenizer(sentence, return_tensors='pt', truncation=True, padding=False)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
    # Drop the batch dimension and copy to host memory as a NumPy array.
    token_embeddings = outputs.last_hidden_state.squeeze(0).cpu().numpy()

    embeddings_dict[sentence] = {
        "embeddings": token_embeddings,
        "theme": theme
    }

In [None]:
# Persist the embeddings. The output folder is created first so that a fresh
# checkout without a Models/ directory does not fail with FileNotFoundError.
os.makedirs("Models", exist_ok=True)
with open("Models/theme_token_embeddings.pkl", "wb") as f:
    pickle.dump(embeddings_dict, f)

In [None]:
# Remove the 'Unnamed: 0' column (presumably a stray index column from an
# earlier CSV export). `errors='ignore'` keeps the cell re-runnable once the
# column is already gone.
df_merged = df_merged.drop(columns='Unnamed: 0', errors='ignore')

In [None]:
# Keep only themes that occur at least twice; the split further down is
# stratified on the theme, which needs more than one sample per class.
counts = df_merged["theme"].value_counts()
valid_themes = counts.loc[counts.ge(2)].index
is_valid_theme = df_merged["theme"].isin(valid_themes)
df_filtered = df_merged.loc[is_valid_theme].copy()

In [None]:
from sklearn.preprocessing import LabelEncoder

# Map theme names to integer class ids. The fitted encoder is reused later to
# turn predicted ids back into theme names.
label_encoder = LabelEncoder()
df_filtered = df_filtered.assign(
    theme_encoded=label_encoder.fit_transform(df_filtered["theme"])
)
print("Classes conservées :", list(label_encoder.classes_))

In [None]:
from sklearn.model_selection import train_test_split

# 80/20 split, stratified on the encoded theme, with a fixed seed so the split
# is reproducible.
split_columns = ["texte_total", "theme_encoded"]
train_df, test_df = train_test_split(
    df_filtered[split_columns],
    stratify=df_filtered["theme_encoded"],
    test_size=0.2,
    random_state=42,
)

In [None]:
# Number of target classes = number of distinct themes known to the label encoder.
n_classes = len(label_encoder.classes_)

In [None]:
from torch.utils.data import Dataset

class TextDataset(Dataset):
    """Dataset yielding tokenized texts together with their integer labels.

    Args:
        df: DataFrame with a "texte_total" column (input text) and a
            "theme_encoded" column (integer class id).
        tokenizer: Callable invoked with the text plus ``padding``,
            ``truncation``, ``max_length`` and ``return_tensors`` keywords.
        max_length: Length every sequence is padded or truncated to.
    """

    def __init__(self, df, tokenizer, max_length=256):
        # Renumber rows 0..n-1 so the integer positions handed out by a
        # sampler are valid labels for `.loc`.
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of samples."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return a dict with "input_ids", "attention_mask" and "label" for row ``idx``."""
        encoded = self.tokenizer(
            self.df.loc[idx, "texte_total"],
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )
        target = self.df.loc[idx, "theme_encoded"]

        # The tokenizer returns tensors with a leading batch axis of size 1;
        # it is removed here so the DataLoader can stack samples itself.
        sample = {name: encoded[name].squeeze(0) for name in ("input_ids", "attention_mask")}
        # Labels are stored as int64 (torch.long).
        sample["label"] = torch.tensor(target, dtype=torch.long)
        return sample

In [None]:
from torch.utils.data import DataLoader

batch_size = 64

train_dataset = TextDataset(train_df, tokenizer)
test_dataset = TextDataset(test_df, tokenizer)

# Settings shared by both loaders; only the shuffling differs.
loader_options = dict(batch_size=batch_size, num_workers=2, pin_memory=True)

# Training samples are reshuffled every epoch; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)


In [None]:
class CamembertCNNLSTMClassifier(nn.Module):
    """CamemBERT encoder followed by a Conv1d, two bidirectional LSTMs and a linear head.

    All backbone parameters are frozen on construction. A caller may switch
    ``requires_grad`` back on for some of them afterwards; ``forward`` then runs
    the backbone with autograd enabled so those parameters really get gradients.

    Args:
        conv_out_dim: Number of output channels of the 1-D convolution.
        hidden_dim: Hidden size of each LSTM direction.
        num_classes: Number of logits produced per input sequence.
    """

    def __init__(self, conv_out_dim=256, hidden_dim=128, num_classes=4):
        super().__init__()
        self.backbone = CamembertModel.from_pretrained("camembert-base")
        # Start with a fully frozen encoder.
        for param in self.backbone.parameters():
            param.requires_grad = False
        # The convolution consumes the 768-dimensional encoder outputs as channels.
        self.conv1d = nn.Conv1d(in_channels=768, out_channels=conv_out_dim, kernel_size=3, padding=1)
        self.relu_conv = nn.ReLU()
        self.lstm1 = nn.LSTM(input_size=conv_out_dim, hidden_size=hidden_dim,
                             batch_first=True, bidirectional=True)
        self.relu = nn.ReLU()
        # Bidirectional => the first LSTM emits 2 * hidden_dim features per step.
        self.lstm2 = nn.LSTM(input_size=hidden_dim * 2, hidden_size=hidden_dim,
                             batch_first=True, bidirectional=True)
        self.classifier = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, input_ids, attention_mask):
        """Compute class logits.

        Args:
            input_ids: Token id tensor passed to the backbone.
            attention_mask: Attention mask tensor passed to the backbone.

        Returns:
            Tensor of logits with one row per input sequence and
            ``num_classes`` columns.
        """
        # Skip autograd in the backbone only when none of its parameters is
        # trainable. A blanket torch.no_grad() would silently prevent any
        # unfrozen backbone layer from ever receiving gradients.
        backbone_trainable = any(p.requires_grad for p in self.backbone.parameters())
        with torch.set_grad_enabled(torch.is_grad_enabled() and backbone_trainable):
            outputs = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = outputs.last_hidden_state   # (batch, seq, features)
        x = sequence_output.permute(0, 2, 1)          # Conv1d expects (batch, channels, seq)
        x = self.conv1d(x)
        x = self.relu_conv(x)
        x = x.permute(0, 2, 1)                        # back to (batch, seq, channels) for the LSTMs
        lstm_out1, _ = self.lstm1(x)
        relu_out = self.relu(lstm_out1)
        lstm_out2, _ = self.lstm2(relu_out)
        # Classify from the output at the first sequence position.
        cls_token_out = lstm_out2[:, 0, :]
        logits = self.classifier(cls_token_out)

        return logits


In [None]:
# Inverse-frequency class weights, rescaled so that they sum to the number of
# classes, used by the loss to compensate for class imbalance.
# sort_index() orders the counts by class id.
class_counts = train_df['theme_encoded'].value_counts().sort_index().values
inverse_freq = 1.0 / torch.tensor(class_counts, dtype=torch.float)
class_weights = (inverse_freq / inverse_freq.sum() * len(class_counts)).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)

In [None]:
# Sanity check: print the number of classes and the label ids present in the test split.
n_classes = len(label_encoder.classes_)
print(f"Nombre de classes : {n_classes}")
print("Valeurs des labels dans le jeu de test :", set(test_df["theme_encoded"]))

In [None]:
# Size the output layer from the label encoder instead of relying on the
# constructor default, so the logits match the labels and the class weights
# used by the loss.
model = CamembertCNNLSTMClassifier(num_classes=n_classes).to(device)

# Freeze the whole backbone, then re-enable gradients for its last encoder layer only.
# NOTE: that layer is only updated if the model's forward pass runs the
# backbone with autograd enabled.
for param in model.backbone.parameters():
    param.requires_grad = False
for param in model.backbone.encoder.layer[-1].parameters():
    param.requires_grad = True

# Hand the optimizer only the parameters that are meant to be updated.
optimizer = torch.optim.Adam(
    (p for p in model.parameters() if p.requires_grad),
    lr=1e-4
)

In [None]:
from sklearn.metrics import accuracy_score, f1_score
import torch.nn.functional as F

num_epochs = 25

for epoch in tqdm(range(num_epochs)):
    # ---- Training pass over the whole training set ----
    model.train()
    total_loss = 0

    for batch in train_loader:
        input_ids, attention_mask, labels = (
            batch[key].to(device) for key in ('input_ids', 'attention_mask', 'label')
        )

        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # ---- Evaluation on the held-out split ----
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            # The predicted class is the index of the largest logit.
            preds = model(input_ids, attention_mask).argmax(dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='macro')

    # The reported loss is the mean over training batches for this epoch.
    print(f"Epoch {epoch+1}/{num_epochs} — Loss: {total_loss/len(train_loader):.4f} | "
          f"Accuracy: {accuracy:.4f} | F1-macro: {f1:.4f}")


In [None]:
# Save only the learned parameters (state_dict). The target folder is created
# first so the save does not fail when Models/ is missing.
os.makedirs("Models", exist_ok=True)
torch.save(model.state_dict(), "Models/camembert_cnn_lstm_weights.pth")

In [None]:
def predict_theme(text, model, tokenizer, device, label_encoder=None):
    """Predict the theme class of a single text.

    Args:
        text: Text to classify.
        model: Classifier called as ``model(input_ids, attention_mask)`` and
            returning one row of logits per input.
        tokenizer: Tokenizer whose result supports ``.to(device)`` and lookup
            of "input_ids" / "attention_mask".
        device: Device the encoded inputs are moved to.
        label_encoder: Optional object with ``inverse_transform`` used to turn
            the predicted index into a label.

    Returns:
        ``(label, index)`` when ``label_encoder`` is given, otherwise only the
        predicted class index.
    """
    model.eval()
    encoded = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding='max_length',
        max_length=256,
    ).to(device)

    with torch.no_grad():
        scores = model(encoded['input_ids'], encoded['attention_mask'])  # one row of class logits

    class_idx = scores.argmax(dim=1).item()

    if label_encoder is None:
        return class_idx
    return label_encoder.inverse_transform([class_idx])[0], class_idx


In [None]:
from sklearn.metrics import classification_report
# Pass the full list of class ids explicitly. Otherwise the label set is
# inferred from `all_labels` and `all_preds`, and a class absent from both
# makes it shorter than `target_names`, which raises a ValueError.
print(classification_report(
    all_labels,
    all_preds,
    labels=list(range(len(label_encoder.classes_))),
    target_names=label_encoder.classes_,
))

### Inference

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader


class InferenceDataset(Dataset):
    """Dataset yielding tokenized, unlabelled texts for batch prediction.

    The text given to the tokenizer follows the format the classifier is
    trained on in this notebook (title + " " + body, stored as "texte_total"):

    * if ``df`` has a "texte_total" column, it is used as is;
    * otherwise, if ``df`` has a "titre" column, the title is prepended to
      "texte" with a single space;
    * otherwise only "texte" is used.

    Missing values are replaced by empty strings.

    Args:
        df: DataFrame holding at least a "texte" or "texte_total" column.
            Rows are addressed by position, so any index is accepted.
        tokenizer: Callable invoked with the text plus ``padding``,
            ``truncation``, ``max_length`` and ``return_tensors`` keywords.
        max_length: Length every sequence is padded or truncated to.
    """

    def __init__(self, df, tokenizer, max_length=256):
        self.df = df
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of rows to predict."""
        return len(self.df)

    @staticmethod
    def _as_text(value):
        """Return ``value`` as a string, or "" when it is missing."""
        return str(value) if pd.notna(value) else ""

    def _build_text(self, row):
        """Assemble the model input string for one DataFrame row."""
        if "texte_total" in row.index:
            return self._as_text(row["texte_total"])
        body = self._as_text(row["texte"])
        if "titre" in row.index:
            # Same layout as the training text: title, one space, body.
            return self._as_text(row["titre"]) + " " + body
        return body

    def __getitem__(self, idx):
        """Return a dict with "input_ids" and "attention_mask" for the row at position ``idx``."""
        text = self._build_text(self.df.iloc[idx])
        inputs = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )
        # Remove the leading batch axis of size 1 added by the tokenizer.
        return {
            "input_ids": inputs["input_ids"].squeeze(0),
            "attention_mask": inputs["attention_mask"].squeeze(0)
        }


In [None]:
# Left-join the theme annotations onto every article.
df_all = df.merge(df_themes, how='left', on='identifiant')

# Rows that have text but no theme are the ones to classify.
needs_prediction = df_all["theme"].isna() & df_all["texte"].notna()
df_to_predict = df_all.loc[needs_prediction].copy()
# Remember where these rows live in df_all so predictions can be written back.
original_indices = df_to_predict.index

inference_dataset = InferenceDataset(df_to_predict, tokenizer)
# No shuffling: predictions must come out in the same order as original_indices.
inference_loader = DataLoader(inference_dataset, shuffle=False, batch_size=32)


In [None]:
from torch.nn.functional import softmax

model.eval()
all_preds, all_probs = [], []

with torch.no_grad():
    for batch in tqdm(inference_loader):
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)

        # Turn the logits into per-class probabilities for each row.
        probs = softmax(model(input_ids, attention_mask), dim=1)

        # Predicted class per row, and the probability assigned to it.
        pred_classes = probs.argmax(dim=1)
        max_probs = probs.max(dim=1).values

        all_preds.extend(pred_classes.cpu().numpy())
        all_probs.extend(max_probs.cpu().numpy())


In [None]:

# There must be exactly one prediction per row selected for inference,
# otherwise the write-back below would be misaligned.
assert len(all_preds) == len(original_indices), "Must have equal len keys and value when setting with an iterable"

# Write the predictions back onto the rows they were computed for.
prediction_columns = {
    "theme_pred_encoded": all_preds,
    "theme_pred": label_encoder.inverse_transform(all_preds),
    "confidence": all_probs,
}
for column, values in prediction_columns.items():
    df_all.loc[original_indices, column] = values

# An existing theme wins; the model prediction only fills the gaps.
df_all["theme_final"] = df_all["theme"].where(df_all["theme"].notna(), df_all["theme_pred"])


In [None]:
# Keep only the final theme and drop the intermediate columns.
# `errors='ignore'` makes the cell safe to re-run (the columns are already
# gone the second time) and tolerant of a missing 'Unnamed: 0' column.
df_all = df_all.drop(
    columns=['Unnamed: 0', 'theme', 'theme_pred_encoded', 'theme_pred', 'confidence'],
    errors='ignore',
)


In [None]:
# Subset of articles whose final theme (existing or predicted) is 'actualité'.
actualité = df_all[df_all['theme_final']=='actualité']

In [None]:
# Export the 'actualité' articles. The target folder is created first so the
# export does not fail when Data/ is missing.
# Note: with to_csv defaults, the DataFrame index is written as the first,
# unnamed column.
os.makedirs('Data', exist_ok=True)
actualité.to_csv('Data/articles_actualite.csv')

In [None]:
# Move the kernel's working directory one level up.
# NOTE(review): every relative path used in the cells above resolves against
# the working directory, so re-running them after this cell (without
# restarting the kernel) looks in the parent folder — confirm this is intended.
import os 
os.chdir('..')