# *Notebook* à utiliser pour faire le travail pratique # 3 sur l'analyse d'incidents.





## Imports

In [1]:
# Importation des bibliothèques nécessaires
from transformers import T5ForConditionalGeneration, T5Tokenizer
import torch
import json
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torch.optim import AdamW
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from collections import Counter
import string
from nltk.translate.bleu_score import sentence_bleu
from rouge import Rouge
from bleurt import score




## Chargements Modèles & Tokenizers & Données

In [2]:
# Initialisation du modèle et du tokenizer pour t5-base
model_name = "t5-Large"
model = T5ForConditionalGeneration.from_pretrained(model_name)
tokenizer = T5Tokenizer.from_pretrained(model_name)

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thouroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [3]:
# Vérification de la disponibilité du GPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

T5ForConditionalGeneration(
  (shared): Embedding(32128, 1024)
  (encoder): T5Stack(
    (embed_tokens): Embedding(32128, 1024)
    (block): ModuleList(
      (0): T5Block(
        (layer): ModuleList(
          (0): T5LayerSelfAttention(
            (SelfAttention): T5Attention(
              (q): Linear(in_features=1024, out_features=1024, bias=False)
              (k): Linear(in_features=1024, out_features=1024, bias=False)
              (v): Linear(in_features=1024, out_features=1024, bias=False)
              (o): Linear(in_features=1024, out_features=1024, bias=False)
              (relative_attention_bias): Embedding(32, 16)
            )
            (layer_norm): T5LayerNorm()
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (1): T5LayerFF(
            (DenseReluDense): T5DenseActDense(
              (wi): Linear(in_features=1024, out_features=4096, bias=False)
              (wo): Linear(in_features=4096, out_features=1024, bias=False)
              (d

# Chargement des données

In [4]:
file_path = 'data/dev_examples.json'
with open(file_path, 'r') as file:
    data = json.load(file)

In [5]:
print("Nombre d'incidents:", len(data))
print("\nUn exemple:\n", data.iloc[0])

#Remplacement des NaN par des string vides pour faciliter la comparaison
dev_df = dev_df.fillna("")

Nombre d'incidents: 100


AttributeError: 'list' object has no attribute 'iloc'

# Formattage Données

In [None]:
def format_data_for_t5(data):
    formatted_data = []

    for item in data:
        text = item['text']
        arguments = item['arguments']
        
        for key, values in arguments.items():
            for value in values:
                # Création de la question
                question = f"What is the {key} in the incident?"
                # Formatage de la paire question-réponse pour T5
                input_text = f"question: {question} context: {text}"
                target_text = value

                formatted_data.append((input_text, target_text))
    
    return formatted_data

# Formatage des données
formatted_data = format_data_for_t5(data)

# Création DataSet

In [None]:
batch_size = 1
max_token_length = 512
learning_rate = 5e-5
epochs = 15
test_size = 0.2

In [None]:
class IncidentDataset(Dataset):
    def __init__(self, tokenizer, formatted_data, max_token_length=512):
        self.tokenizer = tokenizer
        self.data = formatted_data
        self.max_token_length = max_token_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_text, target_text = self.data[idx]

        input_encoding = self.tokenizer.encode_plus(
            input_text,
            max_length=self.max_token_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        target_encoding = self.tokenizer.encode_plus(
            target_text,
            max_length=self.max_token_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
            'input_ids': input_encoding['input_ids'].squeeze(0),
            'attention_mask': input_encoding['attention_mask'].squeeze(0),
            'labels': target_encoding['input_ids'].squeeze(0)
        }

# Division des données en ensembles d'entraînement et de validation
train_data, val_data = train_test_split(formatted_data, test_size=test_size, random_state=42)

In [None]:
train_dataset = IncidentDataset(tokenizer, train_data, max_token_length=max_token_length)
val_dataset = IncidentDataset(tokenizer, val_data, max_token_length=max_token_length)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
# Configuration de l'optimiseur
optimizer = AdamW(model.parameters(), lr=learning_rate)

# Fonction de perte (la perte de cross-entropy est généralement utilisée pour T5)
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# Fonctions Annexes

In [None]:
def normalize_answer(s):
    """Mettre en minuscule et retirer la ponctuation, des déterminants and les espaces."""
    def remove_articles(text):
        return re.sub(r'\b(a|an|the)\b', ' ', text)

    def white_space_fix(text):
        return ' '.join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return ''.join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))

In [None]:
def f1_score(prediction, ground_truth):
    """Normalise les 2 textes, trouve ce qu'il y a en comment et estime précision, rappel et F1."""
    prediction_tokens = normalize_answer(prediction).split()
    ground_truth_tokens = normalize_answer(ground_truth).split()
    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    if len(ground_truth_tokens) == 0 or len(prediction_tokens) == 0:
        return int(ground_truth_tokens == prediction_tokens)
    if num_same == 0:
        return 0
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1

In [None]:
def exact_match_score(prediction, ground_truth): 
    """Vérifie si les 2 textes sont quasi-identiques."""
    return (normalize_answer(prediction) == normalize_answer(ground_truth))

In [None]:
def metric_max_over_ground_truths(metric_fn, prediction, ground_truths):
    """La fonction princiaple. Important de noter que ground_truths est une liste 
       parce qu'il peut y avoir plusieurs réponses possibles."""
    scores_for_ground_truths = []
    for ground_truth in ground_truths:
        score = metric_fn(prediction, ground_truth)
        scores_for_ground_truths.append(score)
    return max(scores_for_ground_truths)

In [None]:
def metric_max_over_ground_truths(metric_fn, prediction, ground_truths):
    """La fonction princiaple. Important de noter que ground_truths est une liste 
       parce qu'il peut y avoir plusieurs réponses possibles."""
    scores_for_ground_truths = []
    for ground_truth in ground_truths:
        score = metric_fn(prediction, ground_truth)
        scores_for_ground_truths.append(score)
    return max(scores_for_ground_truths)

In [None]:
def evaluate_model(model, val_loader, device):
    model.eval()
    total_f1, total_exact_match, total_count = 0, 0, 0

    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # Création des decoder_input_ids pour T5
            decoder_input_ids = torch.full_like(labels, tokenizer.pad_token_id)
            decoder_input_ids[:, 0] = tokenizer.eos_token_id

            # Appel du modèle avec decoder_input_ids
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, decoder_input_ids=decoder_input_ids)
            logits = outputs.logits
            preds = torch.argmax(logits, dim=-1)

            for pred, label in zip(preds, labels):
                pred_text = tokenizer.decode(pred, skip_special_tokens=True)
                label_text = tokenizer.decode(label, skip_special_tokens=True)
                total_f1 += metric_max_over_ground_truths(f1_score, pred_text, [label_text])
                total_exact_match += metric_max_over_ground_truths(exact_match_score, pred_text, [label_text])
                total_count += 1

    return total_f1 / total_count, total_exact_match / total_count

# Entrainement Models

In [None]:
# Initialisation de la meilleure précision à 0 pour commencer
best_val_accuracy = 0

for epoch in range(epochs):
    model.train()
    total_loss = 0

    for batch in tqdm(train_loader):
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()

        loss.backward()
        optimizer.step()
        # Évaluation du modèle
        val_accuracy = evaluate_model(model, val_loader, device)
        print(f"Validation Accuracy for Epoch {epoch + 1}: {val_accuracy}")

        # Sauvegarde du modèle si la précision est améliorée
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), 'best_model.pth')
            print(f"New best model saved with accuracy: {val_accuracy}")
    val_f1, val_exact_match = evaluate_model(model, val_loader, device)
    print(f"Validation F1 Score for Epoch {epoch + 1}: {val_f1}")
    print(f"Validation Exact Match Score for Epoch {epoch + 1}: {val_exact_match}")
    print(f"Epoch {epoch + 1} completed. Average loss: {total_loss / len(train_loader)}")

# Prompts Potentiels Exemple

In [None]:
# Exemple de prompts
prompts = [
    "Est-ce que la personne est morte ou non? {text}",
    "Quelle était l'activité principale? {text}"
]

for prompt in prompts:
    formatted_data = [prompt.format(text=item["text"]) for item in data]

In [None]:
# Exemple de questions variées
questions = [
    "Quelle est la cause de l'incident?",
    "Qui a été blessé dans l'incident?"
]

for question in questions:
    input_texts = [f"question: {question} context: {item['text']}" for item in data]

# Évaluation

In [None]:
# Exemple d'évaluation
references = ["réponse de référence"]
candidates = ["réponse du modèle"]

# BLEU
bleu_score = sentence_bleu([references], candidates)

# ROUGE
rouge = Rouge()
rouge_score = rouge.get_scores(candidates, references, avg=True)

# BLEURT
bleurt_scorer = score.BleurtScorer("chemin_vers_bleurt_checkpoint")
bleurt_score = bleurt_scorer.score(references=references, candidates=candidates)

print("BLEU Score:", bleu_score)
print("ROUGE Score:", rouge_score)
print("BLEURT Score:", bleurt_score)