In [None]:
import os
import re
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from datasets import Dataset
from transformers import (
    AutoTokenizer,
    RobertaModel,
    RobertaPreTrainedModel,
    AutoConfig,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [None]:
# Load the cleaned sentence-level dataset and normalise polarity labels to lower case.
df = pd.read_csv("/content/data_cleaned_sent_class.csv")
df["polarity"] = df["polarity"].str.lower()
# Drop the "conflict" class. The explicit .copy() makes the filtered frame an
# independent object, so adding columns to `df` afterwards is unambiguous and
# does not raise pandas' SettingWithCopyWarning.
df = df[df["polarity"] != "conflict"].copy()
# Class distribution of the remaining labels (displayed as the cell output).
df["polarity"].value_counts()

Unnamed: 0_level_0,count
polarity,Unnamed: 1_level_1
neutral,3470
negative,3377
positive,3149


In [None]:
# Default model input: the raw sentence.
# NOTE(review): the per-split preprocessing loop further down overwrites
# `model_input` on train/valid/test with "<aspect_term> </s> <clean_sentence>",
# so this raw-sentence version is not the text that gets tokenized for training.
df["model_input"] = df["sentence"]

In [None]:
# 70 % train / 30 % held out, stratified on polarity so class ratios are preserved.
train_df, temp_df = train_test_split(
    df,
    test_size=0.30,
    stratify=df['polarity'],
    random_state=42
)
# Split the held-out 30 %: 66 % of it goes to test (≈ 20 % of the total),
# the remaining 34 % to validation (≈ 10 % of the total).
valid_df, test_df = train_test_split(
    temp_df,
    test_size=0.66,
    stratify=temp_df['polarity'],
    random_state=42
)

# The splits are derived from `df` and get new columns added in place later on.
# Working on explicit copies keeps those assignments unambiguous and avoids
# pandas' SettingWithCopyWarning.
train_df, valid_df, test_df = train_df.copy(), valid_df.copy(), test_df.copy()

print(f"Tailles : Train={len(train_df)}, Valid={len(valid_df)}, Test={len(test_df)}")

Tailles : Train=6997, Valid=1019, Test=1980


In [None]:
def clean_text(text: str) -> str:
    """Normalise a raw sentence.

    Steps, applied in order: lower-case the text, remove URLs (``http...``),
    remove ``@mentions`` and ``#hashtags``, replace every character outside the
    allowed set (ASCII letters and digits, the listed accented letters, ``€``,
    ``[``, ``]``, ``/`` and space) with a space, then collapse whitespace runs
    and strip both ends.

    Args:
        text: The raw sentence. Must be a ``str`` (``.lower()`` is called on it).

    Returns:
        The cleaned, lower-cased sentence.
    """
    # (pattern, replacement) pairs; the order matters and mirrors the steps above.
    substitutions = (
        (r'http\S+', ''),
        (r'[@#]\S+', ''),
        (r'[^a-z0-9éèàçôûîäëïöüâêîôûùœ€\[\]/ ]', ' '),
        (r'\s+', ' '),
    )
    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()


In [None]:
# Chosen RoBERTa input format: "[aspect_term] </s> [clean_sentence]"
# NOTE(review): only the sentence goes through clean_text; aspect_term is used
# as-is, so its casing/punctuation may differ from the cleaned sentence.
label_ids = {"positive": 0, "negative": 1, "neutral": 2}


def _format_model_input(row):
    """Join one row's aspect term and cleaned sentence with the </s> separator."""
    return f"{row['aspect_term']} </s> {row['clean_sentence']}"


for df_ in [train_df, valid_df, test_df]:
    df_["clean_sentence"] = df_["sentence"].map(clean_text)
    df_["model_input"] = df_.apply(_format_model_input, axis=1)
    # Convert the polarity strings into integer class ids.
    df_["label"] = df_["polarity"].map(label_ids)


In [None]:
tokenizer = AutoTokenizer.from_pretrained("roberta-large")

# Token count (special tokens included) of every training input; the
# percentiles below are used to choose the maximum sequence length.
all_lengths = (
    train_df["model_input"]
    .map(lambda text: len(tokenizer.encode(text, add_special_tokens=True)))
    .tolist()
)
percentile_90, percentile_95 = (
    int(np.percentile(all_lengths, q)) for q in (90, 95)
)
print(f"90ᵉ percentile : {percentile_90}")
print(f"95ᵉ percentile : {percentile_95}")

# max_length is fixed at 80 after looking at the distribution above.
MAX_LEN = 80


90ᵉ percentile : 41
95ᵉ percentile : 46


In [None]:
# 'aspect_term' is kept in every Dataset, next to the input text and the label,
# so that the aspect positions can be computed during preprocessing.
dataset_columns = ["model_input", "label", "aspect_term"]
train_ds, valid_ds, test_ds = (
    Dataset.from_pandas(split_df[dataset_columns])
    for split_df in (train_df, valid_df, test_df)
)


In [None]:
def get_relative_positions(text: str, aspect: str, tokenizer, max_len: int = MAX_LEN):
    """
    For each token of `text`, return its absolute distance (in tokens) to the
    first token of the first occurrence of `aspect`.

    Args:
        text: Full input text; it is tokenized with ``tokenizer.tokenize``.
        aspect: Aspect term to locate inside `text`.
        tokenizer: Object exposing ``tokenize(str) -> list`` of tokens.
        max_len: Length of the returned list; also used as the filler value.

    Returns:
        A list of exactly `max_len` ints. Positions past the end of the
        tokenized text are filled with `max_len`. If the aspect is empty,
        whitespace-only or not found, every entry is `max_len`.

    NOTE(review): indices refer to the output of ``tokenizer.tokenize(text)``;
    confirm they line up with the ``input_ids`` produced by the full tokenizer
    call (which may add special tokens) before feeding them to a model.
    """
    not_found = [max_len] * max_len

    # An empty aspect has no tokens; the original sliding-window comparison
    # would "match" it at index 0, so treat it explicitly as not found.
    if isinstance(aspect, str) and not aspect.strip():
        return not_found

    # Tokenize the full sentence (which includes the aspect in its text).
    tokens = tokenizer.tokenize(text)

    def _first_occurrence(needle):
        """Index of the first occurrence of `needle` in `tokens`, or -1."""
        width = len(needle)
        if width == 0:
            return -1
        for start in range(len(tokens) - width + 1):
            if tokens[start : start + width] == needle:
                return start
        return -1

    aspect_start = _first_occurrence(tokenizer.tokenize(aspect))
    if aspect_start == -1:
        # Space-sensitive BPE tokenizers (RoBERTa is the one loaded in this
        # notebook) encode a word differently when it follows a space, so a
        # standalone-tokenized aspect cannot match a mid-sentence occurrence.
        # Retry with a leading space. This fallback only runs when the exact
        # search failed, so previously found positions are unchanged.
        aspect_start = _first_occurrence(tokenizer.tokenize(" " + aspect))

    if aspect_start == -1:
        return not_found

    # Absolute distance of every token to the first aspect token.
    rel_positions = [abs(i - aspect_start) for i in range(len(tokens))]
    # Truncate / pad so that the result has exactly max_len entries.
    rel_positions = rel_positions[:max_len] + [max_len] * (max_len - len(rel_positions))
    return rel_positions


In [None]:
def preprocess_fn(examples):
    """Tokenize a batch of examples and attach labels and relative positions.

    Args:
        examples: Batch mapping with the keys ``"model_input"``, ``"label"``
            and ``"aspect_term"``, each holding one value per example.

    Returns:
        The tokenizer encodings (padded/truncated to ``MAX_LEN``) extended with
        ``"labels"`` and ``"relative_positions"`` (one list per example, built
        by ``get_relative_positions``).
    """
    texts = examples["model_input"]
    encodings = tokenizer(
        texts,
        padding="max_length",
        truncation=True,
        max_length=MAX_LEN,
        return_tensors=None,
    )
    encodings["labels"] = examples["label"]

    # One relative-position vector per (input text, aspect term) pair.
    encodings["relative_positions"] = [
        get_relative_positions(text, aspect, tokenizer, max_len=MAX_LEN)
        for text, aspect in zip(texts, examples["aspect_term"])
    ]
    return encodings


In [None]:
def _tokenize_split(split_ds):
    """Run preprocess_fn over a split in batches, dropping its original columns."""
    return split_ds.map(
        preprocess_fn,
        batched=True,
        remove_columns=split_ds.column_names,
    )


tokenized_train = _tokenize_split(train_ds)
tokenized_valid = _tokenize_split(valid_ds)
tokenized_test = _tokenize_split(test_ds)


Map:   0%|          | 0/6997 [00:00<?, ? examples/s]

Map:   0%|          | 0/1019 [00:00<?, ? examples/s]

Map:   0%|          | 0/1980 [00:00<?, ? examples/s]

In [None]:
# Integer labels of the training split, used to derive "balanced" class weights.
y_train = train_df["label"].to_numpy()
present_labels = np.unique(y_train)  # sorted, so weights are ordered by label id
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=present_labels,
    y=y_train,
)
# Float tensor version, passed later to the weighted-loss trainer.
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float)
print(f"Poids de classe : {class_weights}")


Poids de classe : [1.05822747 0.98660462 0.9602031 ]


In [None]:
def compute_metrics(eval_pred):
    """Compute evaluation metrics from a ``(logits, labels)`` pair.

    Args:
        eval_pred: Pair ``(logits, labels)``; predictions are the argmax of
            `logits` over the last axis.

    Returns:
        Dict with ``accuracy``, weighted ``f1`` / ``precision`` / ``recall``
        and one ``f1_<class>`` entry per class (positive, negative, neutral).
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)

    # Support-weighted averages over all classes.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average="weighted", zero_division=0
    )
    accuracy = accuracy_score(labels, preds)

    # Label ids follow the notebook's mapping: 0=positive, 1=negative, 2=neutral.
    class_names = ["positive", "negative", "neutral"]

    # precision_recall_fscore_support returns (precision, recall, fscore, support):
    # the per-class F1 is the THIRD element. Taking the first one (as the previous
    # code did) reports per-class precision under the "f1_*" keys.
    # `labels=` pins the output to one entry per known class, in id order, so the
    # indexing below stays aligned even if a class is absent from this batch.
    _, _, f1_pc, _ = precision_recall_fscore_support(
        labels,
        preds,
        labels=list(range(len(class_names))),
        average=None,
        zero_division=0,
    )

    metrics = {
        "accuracy": accuracy,
        "f1": f1,
        "precision": precision,
        "recall": recall,
    }
    for i, cn in enumerate(class_names):
        metrics[f"f1_{cn}"] = float(f1_pc[i])
    return metrics


In [None]:
# Model configuration: 3-way classification with the same label <-> id mapping
# that is used to build the integer `label` column of the splits.
model_name = "roberta-large"
config = AutoConfig.from_pretrained(
    model_name,
    num_labels=3,
    id2label={0: "positive", 1: "negative", 2: "neutral"},
    label2id={"positive": 0, "negative": 1, "neutral": 2},
)
# Re-load the tokenizer for the same checkpoint name (this rebinds the global `tokenizer`).
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Use the standard sequence-classification head
from transformers import AutoModelForSequenceClassification
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    config=config
)

# (Optional) If you still want to weight the loss:
# NOTE(review): no visible cell reads `model.class_weights`; the weighted loss
# is applied by WeightedLossTrainer through its own `class_weights` argument.
model.class_weights = class_weights_tensor


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-large and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Output locations for checkpoints and logs (created up front if missing).
output_dir = "/content/working/absa_roberta_position"
os.makedirs(output_dir, exist_ok=True)
os.makedirs(f"{output_dir}/logs", exist_ok=True)

# Training configuration: evaluation and checkpointing both happen once per
# epoch, and the best checkpoint is selected on the "f1" metric (the weighted
# F1 returned by compute_metrics), higher being better.
training_args = TrainingArguments(
    output_dir=output_dir,
    num_train_epochs=4,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    gradient_accumulation_steps=2,  # 16 per device x 2 accumulation steps = effective batch of 32
    learning_rate=3e-5,
    weight_decay=0.01,
    warmup_ratio=0.2,
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_dir=f"{output_dir}/logs",
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    save_total_limit=2,
    seed=42,
    data_seed=42,
    report_to=[],
    fp16=torch.cuda.is_available(),  # mixed precision only when a CUDA device is present
    dataloader_num_workers=2,
)

# Report which device is available and whether mixed precision ended up enabled.
print(f"Device utilisé : {'GPU' if torch.cuda.is_available() else 'CPU'}")
print(f"Mixed precision : {training_args.fp16}")


Device utilisé : GPU
Mixed precision : True


In [None]:
def data_collator(features):
    """Collate a list of tokenized examples into a batch of ``torch.long`` tensors.

    Only ``input_ids``, ``attention_mask`` and ``labels`` are kept; any other
    field carried by the examples (e.g. ``relative_positions``) is dropped.

    Args:
        features: List of mappings, each holding the three fields above.

    Returns:
        Dict mapping each kept field to a tensor stacked over the examples.
    """
    kept_fields = ("input_ids", "attention_mask", "labels")
    return {
        field: torch.tensor([example[field] for example in features], dtype=torch.long)
        for field in kept_fields
    }


In [None]:
class WeightedLossTrainer(Trainer):
    """
    Trainer whose loss is a CrossEntropy weighted by per-class weights.

    When no class weights are supplied, the loss returned by the model itself
    is used unchanged.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        """Forward all arguments to Trainer and remember the optional class weights."""
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run the forward pass and return the (optionally class-weighted) loss.

        Args:
            model: The model to call.
            inputs: Batch mapping; ``"labels"`` is popped from it, and
                ``"input_ids"`` / ``"attention_mask"`` are forwarded to the model.
            return_outputs: When true, return ``(loss, outputs)`` instead of the loss.
            num_items_in_batch: Accepted for signature compatibility; not used here.
        """
        labels = inputs.pop("labels")

        # Labels are still passed to the model, so `outputs` carries the
        # model's own (unweighted) loss as well as the logits.
        outputs = model(
            input_ids=inputs.get("input_ids"),
            attention_mask=inputs.get("attention_mask"),
            labels=labels,
        )

        if self.class_weights is None:
            loss = outputs.loss
        else:
            # Replace the model's loss with a class-weighted cross-entropy,
            # with the weights moved to the logits' device.
            logits = outputs.logits
            weights = self.class_weights.to(logits.device)
            loss = nn.functional.cross_entropy(logits, labels, weight=weights)

        if return_outputs:
            return loss, outputs
        return loss


In [None]:
# Build the trainer: weighted cross-entropy loss (class_weights), the custom
# collator that keeps only input_ids / attention_mask / labels, evaluation on
# the validation split, and an early-stopping callback with a patience of 2.
trainer = WeightedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_valid,
    compute_metrics=compute_metrics,
    class_weights=class_weights_tensor,
    data_collator=data_collator,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)



In [None]:
# Run fine-tuning; the object returned by the trainer is kept in `train_result`.
train_result = trainer.train()

Epoch,Training Loss,Validation Loss,Accuracy,F1,Precision,Recall,F1 Positive,F1 Negative,F1 Neutral
1,0.617,0.569935,0.771344,0.770885,0.78253,0.771344,0.678922,0.886435,0.77551
2,0.4129,0.438928,0.830226,0.828189,0.833202,0.830226,0.858521,0.79703,0.845395
3,0.2482,0.459869,0.871443,0.871187,0.871466,0.871443,0.838323,0.919308,0.85503
4,0.1204,0.491865,0.885182,0.885048,0.884963,0.885182,0.874608,0.91404,0.866097


In [None]:
# Evaluate the trained model on the held-out test split and print the metrics
# (the printed label reads "Final metrics on the test set").
final_metrics = trainer.evaluate(eval_dataset=tokenized_test)
print("Métriques finales sur le test set :", final_metrics)


Métriques finales sur le test set : {'eval_loss': 0.41523319482803345, 'eval_accuracy': 0.8994949494949495, 'eval_f1': 0.8991252204581427, 'eval_precision': 0.8991686315643137, 'eval_recall': 0.8994949494949495, 'eval_f1_positive': 0.9024390243902439, 'eval_f1_negative': 0.9124820659971306, 'eval_f1_neutral': 0.8832335329341318, 'eval_runtime': 5.7755, 'eval_samples_per_second': 342.828, 'eval_steps_per_second': 10.735, 'epoch': 4.0}


In [None]:
# Colab-only: mount Google Drive at /content/drive, the location the save path
# used for the model is under.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Destination folder inside Google Drive (Drive must be mounted at /content/drive).
save_path = "/content/drive/MyDrive/absa_model/sentiment_classifier/"

# Save the model
# NOTE(review): presumably `model` holds the best-checkpoint weights at this
# point, since load_best_model_at_end=True is set — confirm before relying on it.
model.save_pretrained(save_path)

# Also save the tokenizer
tokenizer.save_pretrained(save_path)

('/content/drive/MyDrive/absa_model/sentiment_classifier/tokenizer_config.json',
 '/content/drive/MyDrive/absa_model/sentiment_classifier/special_tokens_map.json',
 '/content/drive/MyDrive/absa_model/sentiment_classifier/vocab.json',
 '/content/drive/MyDrive/absa_model/sentiment_classifier/merges.txt',
 '/content/drive/MyDrive/absa_model/sentiment_classifier/added_tokens.json',
 '/content/drive/MyDrive/absa_model/sentiment_classifier/tokenizer.json')