# üè• Affinage (*fine-tuning*) de mod√®les encodeurs PARTAGES (BERT-like) pour  l'annotation (classification de tokens)

### Installation des libraries

In [None]:
# Parfois il est plus simple d'installer torch via conda (ou d'utiliser le
# paquet pytorch). S'il y a des erreurs avec numpy installez 'numpy<2'.

# On utilise `datasets<4` car la version 4 de tokenizer ne permet plus de charger
# des jeux de donn√©es √† l'aide de scripts de chargement. Certains jeux de
# donn√©es utilisent toujours des scripts de chargements (notamment DrBenchmark).

%pip install 'transformers[torch]' 'datasets<4' tensorboardX torch
%pip install seqeval evaluate

In [None]:
# Petit hack pour √©viter un avertissement sp√©cifique √† Google Colab sur le
# chargement du jeton huggingface
try:
  from huggingface_hub.utils import _auth
  _auth._get_token_from_google_colab = lambda: None
except ImportError:
  pass

In [None]:
######################################
###########    A Modifier    #########
######################################

# Si le jeu de donn√©e, mod√®le ou tokeniseur n'est pas public sur huggingface il
# faut utiliser un jeton de connexion √† cr√©er ici : https://huggingface.co/settings/tokens

# Pour l'instant les mod√®les PARTAGES sont priv√©s et accessibles avec le jeton
#  communiqu√© avec ce fichier
HF_TOKEN = ""

# Ici on utilise un des mod√®les du projet PARTAGES
# A modifier pour utiliser un autre mod√®le comme base d'affinage
BASE_MODEL_NAME = "WP4/PARTAGES-encoder-v1"

######################################
assert HF_TOKEN != ""

### Chargement du tokeniseur

In [None]:
import os
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_NAME, token=HF_TOKEN)

### Chargement et pr√©paration des donn√©es

Chargement des donn√©es au format ConLL-2003, la colonne 'tokens' est une liste qui repr√©sente chaque mot d'un document, la colonne 'ner_tags' est de la m√™me longueur que 'tokens' et indique l'√©tiquette de chaque mot.

```python
[{'id': str, 'tokens': list(str), 'ner_tags': list(int)}, ...]
```

In [None]:
def tokenize_and_align_labels(examples, tokens_column, labels_column, label_all_tokens=False, max_length=None):
    """Retokenize each word to match current tokenizer and align new sub tokens with labels.
    Adapted from from https://colab.research.google.com/github/huggingface/notebooks/blob/master/examples/token_classification.ipynb#scrollTo=n9qywopnIrJH

    [("avec", 0), ("une", 0), ("tachycardie", 1), ("√†", 0), ("115", 0)]

    With label_all_tokens = False
    [("avec", 0), ("une", 0), ("tac", 1), ("##hy", -100), ("##card", -100), ("##ie", -100), ("√†", 0), ("11", 0), ("##5", -100)]

    With label_all_tokens = True
    [("avec", 0), ("une", 0), ("tac", 1), ("##hy", 1), ("##card", 1), ("##ie", 1), ("√†", 0), ("11", 0), ("##5", 0)]
    """
    if max_length is None:
        max_length = tokenizer.model_max_length
    tokenized_inputs = tokenizer(
        list(examples[tokens_column]),
        truncation=True, max_length=max_length,
        padding='do_not_pad', is_split_into_words=True
    )
    labels = []
    for i, label in enumerate(examples[labels_column]):
        label_ids = []
        previous_word_idx = None
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
#########################################################
###########    ZONE A Modifier POUR UTILISER    #########
###########         VOS PROPRES DONNEES         #########
##          ATTENTION AUX DONNEES PRIVEES NE PAS       ##
##              EXECUTER SUR LE CLOUD                  ##
#########################################################
# En sortie de ces cellule:
#  - label_list : liste des diff√©rentes √©tiquettes
#  - train_dataset : un Dataset tokenis√© avec une colonne "label"
#  - eval_dataset : un Dataset tokenis√© avec une colonne "label"

# L'exemple est avec le corpus DEFT2021 que l'on peut obtenir via la collection
#  de DrBenchmark (cf github.com/DrBenchmark/DrBenchmark)
# https://aclanthology.org/2024.lrec-main.478.pdf
# Chaque cas clinique est annot√© en extraction d'information au niveau token.
#  Apr√®s extraction, le corps du texte est tokenis√© via la fonction
#  preprocess_function.

import datasets

dataset = datasets.load_dataset('DrBenchmark/DEFT2021', 'ner', trust_remote_code=True)
dataset

In [None]:
label_list = dataset['train'].features[f"ner_tags"].feature.names

# Si les features du dataset ne sont pas d√©finies
#label_list = list({l for d in dataset['train']['ner_tags'] for l in d})

# Si la colonne des etiquettes contient des chaines de caract√®re il faut les transformer
#  en entier
"""id2label = {i: l for i, l in enumerate(label_list)}
label2id = {l: i for i, l in enumerate(label_list)}
dataset = dataset.map(lambda d: {'ner_tags': [label2id[l] for l in d['ner_tags']]})"""

In [None]:
# Tokenize dataset
tokenized_dataset = dataset.map(
    tokenize_and_align_labels,
    fn_kwargs={
        'tokens_column': 'tokens',
        'labels_column': 'ner_tags',
        'label_all_tokens': False,
        # On met un maximum a 8192 pour √©viter les erreurs de m√©moire li√©s au
        #  traitement de long documents mais c'est a modifier selon les besoins
        #  et capacit√©s de calcul. Certains mod√®le n'ayant pas de limite.
        'max_length': min(tokenizer.model_max_length, 8192),
    },
    batched=True, batch_size=100,
)

# Split dataset into train and validation sets
train_dataset = tokenized_dataset["train"]
eval_dataset = tokenized_dataset["validation"]

#############################################

In [None]:
import random
import textwrap

import numpy as np

# Affichage d'exemples au hasard
to_show = [15, random.randint(0, len(train_dataset))]

print(f"Etiquettes : {label_list}")
idx_train_longest = int(np.argmax([len(e) for e in train_dataset['input_ids']]))
print(f"Plus long  doc train : idx {idx_train_longest:4d}, len {len(train_dataset[idx_train_longest]['input_ids']):4d}")
idx_train_shortest = int(np.argmin([len(e) for e in train_dataset['input_ids']]))
print(f"Plus court doc train : idx {idx_train_shortest:4d}, len {len(train_dataset[idx_train_shortest]['input_ids']):4d}")

idx_eval_longest = int(np.argmax([len(e) for e in eval_dataset['input_ids']]))
print(f"Plus long  doc valid : idx {idx_eval_longest:4d}, len {len(eval_dataset[idx_eval_longest]['input_ids']):4d}")
idx_eval_shortest = int(np.argmin([len(e) for e in eval_dataset['input_ids']]))
print(f"Plus court doc valid : idx {idx_eval_shortest:4d}, len {len(eval_dataset[idx_eval_shortest]['input_ids']):4d}")

for i in to_show:
    doc = train_dataset[i]
    print(f"------ Document train {i} ------")
    print(f"Nb. tokens: {len(doc['input_ids'])}")
    tokens = ' '.join(tokenizer.convert_ids_to_tokens(doc['input_ids'])).replace('\n', '\\n')
    print(f"Tokens: {'\n'.join(textwrap.wrap(tokens, width=120, break_long_words=False, break_on_hyphens=False))}")
    print(f"Etiquettes: {doc['labels']}")

### D√©finition des m√©triques a rapporter

In [None]:
import importlib

import evaluate
from seqeval.metrics import accuracy_score, classification_report

class Seqeval(evaluate.Metric):
    def _info(self):
        return evaluate.MetricInfo(
            description="", citation="",
            homepage="https://github.com/chakki-works/seqeval",
            features=datasets.Features({
                "predictions": datasets.Sequence(datasets.Value("string", id="label"), id="sequence"),
                 "references": datasets.Sequence(datasets.Value("string", id="label"), id="sequence")
            }))

    def _compute(
        self,
        predictions,
        references,
        suffix: bool = False,
        scheme: str | None = None,
        mode: str | None = None,
        sample_weight: list[int] | None = None,
        zero_division: str | int = "warn",
    ):
        if scheme is not None:
            try:
                scheme_module = importlib.import_module("seqeval.scheme")
                scheme = getattr(scheme_module, scheme)
            except AttributeError:
                raise ValueError(f"Scheme should be one of [IOB1, IOB2, IOE1, IOE2, IOBES, BILOU], got {scheme}")
        report = classification_report(
            y_true=references,
            y_pred=predictions,
            suffix=suffix,
            output_dict=True,
            scheme=scheme,
            mode=mode,
            sample_weight=sample_weight,
            zero_division=zero_division,
        )
        report.pop("macro avg")
        report.pop("weighted avg")
        overall_score = report.pop("micro avg")

        scores = {
            type_name: {
                "precision": score["precision"],
                "recall": score["recall"],
                "f1": score["f1-score"],
                "number": score["support"],
            }
            for type_name, score in report.items()
        }
        scores["overall_precision"] = overall_score["precision"]
        scores["overall_recall"] = overall_score["recall"]
        scores["overall_f1"] = overall_score["f1-score"]
        scores["overall_accuracy"] = accuracy_score(y_true=references, y_pred=predictions)
        return scores

In [None]:
import numpy as np

# On utilise la metrique SeqEval pour √©valuer l'annotation de tokens
metric = Seqeval()

def compute_metrics(p):
    global metric
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)]

    results = metric.compute(predictions=true_predictions, references=true_labels, zero_division=.0)

    macro_values = [results[r]["f1"] for r in results if "overall_" not in r]
    macro_f1 = sum(macro_values) / len(macro_values)

    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"], "macro_f1": macro_f1,
        "accuracy": results["overall_accuracy"]
    }

### Chargement du mod√®le de base


Maintenant que l'on a charg√© les donn√©es on connait le nombre d'√©tiquettes donc
on peut charger le mod√®le et initialiser sa couche de classification.

In [None]:
#############################################
############    A Modifier    ###############
#############################################

SEED = 42  # Pour faciliter la reproductibilit√© des r√©sultats

#############################################

In [None]:
from transformers import set_seed
from transformers import AutoModelForTokenClassification

# On initialise le g√©n√©rateur de nombres al√©atoires avant de charger le mod√®le
# pour s'assurer que l'initialisation des poids de la couche de classification
# soient les m√™mes √† chaque fois.
set_seed(SEED)
model = AutoModelForTokenClassification.from_pretrained(
    BASE_MODEL_NAME, token=HF_TOKEN, num_labels=len(label_list),
    # Pour avoir de belles etiquettes en sortie du mod√®le
    id2label = {i: l for i, l in enumerate(label_list)},
    label2id = {l: i for i, l in enumerate(label_list)},
)
print(model)

### Pr√©paration de l'entrainement

In [None]:
#############################################
############    A Modifier    ###############
#############################################

OUTPUT_MODEL_NAME = "PARTAGES-encoder-v1-annot"
OUTPUT_DIR = os.path.join(os.environ['HOME'], 'PARTAGES', OUTPUT_MODEL_NAME)

#############################################

Le r√©sultat de l'entrainement sera stock√© dans:
```
~/PARTAGES/PARTAGES-encoder-v1-annot/
- checkpoints/
  - checkpoint-10
  - checkpoint-20
  - checkpoint-30
  - checkpoint-...
- training_args.json
- model.pt
- tokenizer.
```

Ainsi les points de contr√¥les sont conserv√©s, et il sera possible de charger le meilleur mod√®le r√©sultant de l'entrainement en √©crivant:

```python
from transformers import AutoTokenizer, AutoModelForTokenClassification
model_path = os.path.join(os.environ['HOME'], "PARTAGES", "PARTAGES-encoder-v1-annot")
# /!\ Il est important d'utiliser AutoModelForTokenClassification et non
# AutoModel (qui ne va pas charger la couche de classification)
model = AutoModelForTokenClassification.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Pour charger un checkpoint sp√©cifique
model = AutoModelForTokenClassification.from_pretrained(model_path + "/checkpoints/checkpoint-20")
tokenizer = AutoTokenizer.from_pretrained(model_path)
```

In [None]:
from transformers import Trainer, TrainingArguments
from transformers import DataCollatorForTokenClassification


data_collator = DataCollatorForTokenClassification(tokenizer)

# Pour plus d'arguments
# https://huggingface.co/docs/transformers/main/en/main_classes/trainer#transformers.TrainingArguments

training_args = TrainingArguments(
    os.path.join(OUTPUT_DIR, 'checkpoints'),
    overwrite_output_dir=False,  # pour continuer l'entrainement
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2.0e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=15,
    weight_decay=0.01,
    metric_for_best_model="macro_f1",
    load_best_model_at_end=True,
    greater_is_better=True,
    logging_strategy="epoch",
    save_only_model=True,
    save_total_limit=5,
    seed=42,
    report_to='tensorboard',
)

# Pour augmenter le batch_size mais ne pas avoir d'erreur de m√©moire il est possible
#  d'utiliser le param√®tre `gradient_accumulation_steps` qui attend N batchs avant
#  d'effectuer la r√©tropropagation. Par exemple `batch_size=64, gradient_accumulation_steps=1`
#  est √©quivalent √† `batch_size=32, gradient_accumulation_steps=2` ou
#  `batch_size=16, gradient_accumulation_steps=4`
if os.path.exists(OUTPUT_DIR):
    print(f"""Le dossier de sortie existe d√©j√† !
L'entrainement va continuer s'il a √©t√© arr√™t√© ou ne rien faire si l'entrainement a termin√©.
Chemin : {OUTPUT_DIR}
Soyez s√ªr de ce qu'il se passe ou bien supprimez le dossier.""")

In [None]:
# Exemple de document pass√© par le DataCollator (ajout de padding pour que tout
#  les documents d'un m√™me lot (batch) aient la m√™me longueur
sampled_batch = train_dataset.select_columns(['input_ids', 'attention_mask', 'labels']).select(range(8))
collated_batch = data_collator(sampled_batch)
collated_doc = collated_batch['input_ids'][0]

print("Exemple de document pass√© par le Data Collator (ce que le mod√®le va voir):")
doc_to_print = ' '.join(tokenizer.convert_ids_to_tokens(collated_doc)).replace('\n', '\\n')
print('\n'.join(textwrap.wrap(doc_to_print, width=120, break_long_words=False, break_on_hyphens=False)))

In [None]:
# Supprimer ou mettre √† `False` pour un entrainement r√©el
debugging_mode = True
if debugging_mode:
    n_samples = min(len(train_dataset), len(eval_dataset))
    n_samples = min(n_samples, 20)
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    print("!!!            Mode de debug          !!!")
    print(f"!!!  Seuls {n_samples} examples sont utilis√©s  !!!")
    print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    train_dataset = train_dataset.select(range(n_samples))
    eval_dataset = eval_dataset.select(range(n_samples))

In [None]:
from transformers import EarlyStoppingCallback

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator,
    processing_class=tokenizer,
    # Avec EarlyStopping, l'entrainement va s'arr√™ter apr√®s 3 epoch si
    #  la macro_f1 (`metric_for_best_model`) ne s'am√©liore pas
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
    compute_metrics=compute_metrics,
)

In [None]:
trainer.train()

In [None]:
metrics = trainer.evaluate()
print(metrics)

In [None]:
import json
trainer.save_model(os.path.join(OUTPUT_DIR))
with open(os.path.join(OUTPUT_DIR, "training_args.json"), 'w') as f:
  json.dump(json.loads(training_args.to_json_string()) | {'basemodel_name_or_path': model.name_or_path}, f)

## Utilisation du mod√®le entrain√©

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
model_path = os.path.join(os.environ['HOME'], "PARTAGES", OUTPUT_MODEL_NAME)
# /!\ Il est important d'utiliser AutoModelForTokenClassification et non
# AutoModel (qui ne va pas charger la couche de classification)

In [None]:
from transformers import pipeline
print(f"Chargement du mod√®le {OUTPUT_MODEL_NAME}.")
pipe = pipeline("token-classification", model=model_path, device="cpu")

In [None]:
def preprocessing(text):
    # Si un pr√©-traitement a √©t√© effectu√© pour cr√©er le corpus d'entrainement
    #  il faut reproduire ce pr√©traitement ici.
    return text.replace("c'est", "c' est")

texts = [
    "La patiente est enrhumm√©e, et a pris du parac√©tamol.",
    "Une deuxi√®me phrase."
]
texts = [preprocessing(t) for t in texts]

# Pour le param√®tre `aggregation_strategy` voir https://huggingface.co/docs/transformers/main_classes/pipelines#transformers.TokenClassificationPipeline.aggregation_strategy
predictions = pipe(
    texts,
    batch_size=2,
    # Utilise le premier sous-token de chaque mot comme √©tiquette
    aggregation_strategy="first"
)

In [None]:
# Note: l'etiquette sosy signifie "Signes Ou SYmptomes"
for t, p in zip(texts, predictions):
    print(f"Le mod√®le a identifi√© les entit√©s suivantes:")
    print(p)
    print(f"dans le document: {t[:50]} [...]")
    print()


## Hyperparameter tuning

Suivant le temps de calcul √† votre disposition et de la taille de vos jeux de donn√©es. Il est possible de chercher de meilleurs hyperparam√®tres automatiquement √† l'aide de la biblioth√®que `optuna`.

In [None]:
%pip install optuna

On choisit ici de chercher de meilleures valeurs pour la `learning_rate`, le `weight_decay` et une meilleure `seed` qui initialise les param√®tres.

In [None]:
from optuna import Trial

def hp_space(trial: Trial) -> dict[str, float | int | str]:
    return {
        "learning_rate": trial.suggest_float("learning_rate", 2e-6, 2e-3, log=True),
        "weight_decay": trial.suggest_float("weight_decay", 0.005, 0.1, log=True),
        "seed": trial.suggest_int("seed", 1, 40),
    }

# Utilisation d'une fonction model_init pour s'assurer de la reproducibilit√©
def model_init():
    global label_list, base_model_name
    return AutoModelForTokenClassification.from_pretrained(
        base_model_name, token=HF_TOKEN, num_labels=len(label_list),
        # Pour avoir de belles etiquettes en sortie du mod√®le
        id2label = {i: l for i, l in enumerate(label_list)},
        label2id = {l: i for i, l in enumerate(label_list)},
    )

# modification du trainer d√©j√† initialis√© avant pour bien r√©-initialiser le
#  mod√®le √† chaque essai
trainer.model_init = model_init

In [None]:
best_run = trainer.hyperparameter_search(direction="maximize", hp_space=hp_space, n_trials=3)

In [None]:
best_run