# Reconnaissance du locuteur : Chirac - Mitterrand

## Paramètres du notebook

In [1]:
corpus_language = "french"
folder_path = "./datasets/AFDpresidentutf8/"
corpus_basename = "corpus.tache1"
corpus_base_path = folder_path + corpus_basename

file_name_train = corpus_base_path + ".learn.utf8"
file_name_test = corpus_base_path + ".test.utf8"
file_name_output = corpus_base_path + ".test.pred.utf8"

results_save_path = "../Reports/report_project/results/locuteur/"

In [2]:
training = {"small": 30, "medium": 200, "large": 2000}
study_config = "medium"
final_config = "large"

# minimum number of uppercase characters in a word to be considered as uppercase
# (e.g. "upper" + "HELLO" = "upperhello") if uppercase_markers is True
# during preprocessing
min_upper_char = 3

In [3]:
sklearn_working_memory = 24 * 1024

## Imports

In [4]:
# Imports bibliothèques Python
import codecs
import re
import string

In [5]:
# Imports bibliothèques externes
import numpy as np
import nltk
import sklearn

sklearn.set_config(working_memory=sklearn_working_memory)

In [6]:
# Téléchargement des ressources nécessaires à NLTK
nltk.download("punkt")
nltk.download("stopwords")

[nltk_data] Downloading package punkt to /Users/mathis/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/mathis/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [7]:
# Paramètres matplotlib
import matplotlib

matplotlib.use("pgf")
import matplotlib.pyplot as plt

matplotlib.rcParams.update(
    {
        "pgf.texsystem": "pdflatex",
        "font.family": "serif",
        "text.usetex": True,
        "pgf.rcfonts": False,
    }
)

# 1) Chargement des données

In [8]:
def load_pres(path):
    texts_list, labels_list = [], []
    regex_pattern = r"<[0-9]*:[0-9]*(:(?P<locuteur>C|M))?> (?P<paroles>.*)"
    compiled_regex_pattern = re.compile(pattern=regex_pattern)
    s = codecs.open(path, "r", "utf-8")
    for text in s.readlines():
        regex_search = re.search(pattern=compiled_regex_pattern, string=text)
        if regex_search.group("locuteur") is not None:
            if regex_search.group("locuteur") == "C":
                labels_list.append("Chirac")
            elif regex_search.group("locuteur") == "M":
                labels_list.append("Mitterrand")
            else:
                raise ValueError("Locuteur inconnu")
        if regex_search.group("paroles") is not None:
            texts_list.append(regex_search.group("paroles"))
    return texts_list, labels_list

In [9]:
texts, labels = load_pres(file_name_train)

## Données non équilibrées

In [10]:
n = len(labels)
print(n, "exemples")
for president in ["Mitterrand", "Chirac"]:
    count = labels.count(president)
    percentage = count / n * 100
    print(f"Exemples {president} : {count} ({percentage:.2f}%)")

57413 exemples
Exemples Mitterrand : 7523 (13.10%)
Exemples Chirac : 49890 (86.90%)


# 2) Traitement des données

## a) Prétraitement

In [11]:
# A pattern to match continuous uppercase substrings,
# including accented uppercase characters
# Only matches substrings of min_upper_char or more characters
upper = r"\b(?:[A-ZÀ-ÖØ-öø-ÿĀ-ſƀ-ɏḀ-ỿꜢ-ꞇﬀ-ﬆＡ-Ｚ]{{{},}})\b"
upper_case_pattern = upper.format(min_upper_char)
upper_case_pattern_compiled = re.compile(pattern=upper_case_pattern)
upper_replacement = lambda m: "upper" + m.group(0).lower()


def custom_preprocessor(
    text, remove_punctuation=True, remove_numbers=True, uppercase_markers=False
):
    if uppercase_markers:
        text = re.sub(
            pattern=upper_case_pattern_compiled, repl=upper_replacement, string=text
        )

    if remove_punctuation:
        text = text.translate(str.maketrans("", "", string.punctuation))

    if remove_numbers:
        text = text.translate(str.maketrans("", "", string.digits))

    return text

## b) Tokenization

In [12]:
def custom_tokenizer(text, stemmer=None, lemmatizer=None):
    from nltk.tokenize import word_tokenize

    tokens = word_tokenize(text, language=corpus_language)

    assert (
        stemmer is None or lemmatizer is None
    ), "Only one of stemmer or lemmatizer can be used."

    if stemmer is not None:
        tokens = [stemmer.stem(token) for token in tokens]

    if lemmatizer is not None:
        tokens = [lemmatizer.lemmatize(token) for token in tokens]

    return tokens

## c) Transformation occurrences -> TF-IDF

Sera systématiquement appliqué si un modèle utilisant LSA comme réduction de dimension est utilisé

In [13]:
from sklearn.feature_extraction.text import TfidfTransformer

param_tfidftransformer = {
    "transformer": [TfidfTransformer(smooth_idf=True, sublinear_tf=True, use_idf=True)],
    "transformer__norm": ["l1", "l2", None],
}

param_grid_tfidf_transformer = [
    {
        "transformer": [None],
    },
    param_tfidftransformer,
]

## d) Réduction de la dimension

Ne sera pas employé sur le classifieur Naive Bayes

In [14]:
from scipy.stats import uniform, norm, randint

In [15]:
from sklearn.decomposition import TruncatedSVD

param_grid_dim_reduction = [
    {
        "dim_reduction": [None],
    },
    {
        "dim_reduction": [TruncatedSVD()],
        "dim_reduction__n_components": randint(10, 30),
        "dim_reduction__algorithm": ["randomized", "arpack"],
    },
]

# 3) Modèles étudiés

## a) Classifieur Naive Bayes

In [16]:
from sklearn.naive_bayes import MultinomialNB

# fit_prior=True car les données sont stratifiées
# ne sera pas forcément le cas en production, cf. 9)
multinomial_classifier = MultinomialNB(fit_prior=True)
nb_regul_distrib = uniform(loc=0.8, scale=0.2)
param_grid_nb = {
    "classifier": [multinomial_classifier],
    "classifier__alpha": nb_regul_distrib,
}

## b) Classifieur SVM

In [17]:
from sklearn.svm import LinearSVC

linear_svc_classifier = LinearSVC(class_weight="balanced", dual=False)
svm_regul_distrib = norm(loc=1.0, scale=0.2)
param_grid_svm = {
    "classifier": [linear_svc_classifier],
    "classifier__C": svm_regul_distrib,
    "classifier__penalty": ["l1", "l2"],
}

## c) Classifieur régression logistique

In [18]:
from sklearn.linear_model import LogisticRegression

log_reg_classifier = LogisticRegression(class_weight="balanced", solver="liblinear")
log_reg_regul_distrib = norm(loc=1.0, scale=0.2)
param_grid_log_reg = {
    "classifier": [log_reg_classifier],
    "classifier__C": log_reg_regul_distrib,
    "classifier__penalty": ["l1", "l2"],
}

## Grille de paramètres des classifieurs

In [19]:
param_grid_classifier = [
    param_grid_nb,
    param_grid_svm,
    param_grid_log_reg,
]

# 4) Metrique de selection de modèles

In [20]:
from sklearn.metrics import make_scorer
from sklearn.metrics import fbeta_score

# from sklearn.metrics import roc_auc_score
# from sklearn.metrics import average_precision_score, balanced_accuracy_score

# average precision score
# scorer = make_scorer(average_precision_score)
# average recall score
# scorer = make_scorer(balanced_accuracy_score)

# area under the ROC curve
# scorer = make_scorer(score_func=roc_auc_score, needs_proba=True)

On choisit beta=1.5 car on veut être sûr d'avoir un recall plus élevé pour Mitterrand

In [21]:
scorer = make_scorer(fbeta_score, beta=1.5, pos_label="Mitterrand")

# 5) Pipeline

## a) Assemblage du pipeline

In [22]:
from sklearn.feature_extraction.text import CountVectorizer

In [23]:
from sklearn.pipeline import Pipeline

pipeline = Pipeline(
    steps=
    [
        ("vectorizer", CountVectorizer(token_pattern=None)),
        ("transformer", None),
        ("dim_reduction", None),
        ("scaler", None),
        ("classifier", None),
    ],
    verbose=True,
)

## b) Grille de recherche

In [24]:
from sklearn.preprocessing import StandardScaler

In [25]:
from nltk.stem import SnowballStemmer

In [26]:
param_grid = [
    {
        "vectorizer__binary": [True, False],
        "vectorizer__lowercase": [True, False],
        "vectorizer__max_features": [None, 2000, 5000, 10000],
        "vectorizer__strip_accents": ["ascii", "unicode", None],
        "vectorizer__preprocessor": [
            lambda text: custom_preprocessor(
                text,
                remove_punctuation=remove_punct,
                remove_numbers=remove_nums,
                uppercase_markers=uppercase_marks,
            )
            for remove_punct in [True, False]
            for remove_nums in [True, False]
            for uppercase_marks in [True, False]
        ],
        "vectorizer__tokenizer": [
            lambda text: custom_tokenizer(text, stemmer=stem, lemmatizer=None)
            for stem in [None, SnowballStemmer(language=corpus_language)]
        ],
        # min_df and max_df instead of stopwords list
        "vectorizer__min_df": uniform(loc=0.0, scale=0.02),
        "vectorizer__max_df": uniform(loc=0.5, scale=0.45),
        "vectorizer__ngram_range": [(1, 1), (1, 2), (2, 2)],
        **dim_reduction_params,
        **(
            # tfidf transformer is recommended when dimensionality reduction is used
            transformer_params
            if dim_reduction_params["dim_reduction"] is None
            else param_tfidftransformer
        ),
        # scaler useless with multinomial naive bayes classifier
        # mean = False to avoid transforming the sparse matrix into a dense one
        "scaler": [
            None
            if isinstance(classifier_params["classifier"][0], MultinomialNB)
            else StandardScaler(with_mean=False)
        ],
        **classifier_params,
    }
    for classifier_params in param_grid_classifier
    for transformer_params in param_grid_tfidf_transformer
    for dim_reduction_params in (
        [param_grid_dim_reduction[0]]
        if isinstance(classifier_params["classifier"][0], MultinomialNB)
        else param_grid_dim_reduction[1:]
    )
]

# 6) Recherche du meilleur modèle

## Split train/test

In [27]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    texts, labels, test_size=0.2, stratify=labels, random_state=42, shuffle=True
)

## Stratified K-Fold

In [28]:
from sklearn.model_selection import StratifiedKFold

cv = StratifiedKFold(n_splits=4, shuffle=True, random_state=42)

## Recherche : Halving Random Search

In [None]:
# noinspection PyUnresolvedReferences
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import HalvingRandomSearchCV

search = HalvingRandomSearchCV(
    estimator=pipeline,
    param_distributions=param_grid,
    scoring=scorer,
    cv=cv,
    factor=2,
    n_candidates=training[study_config],
    min_resources=4000,
    max_resources="auto",
    aggressive_elimination=True,
    n_jobs=-1,
    refit=True,
    verbose=1,
)

fitted_search = search.fit(X_train, y_train)

# 7) Comparaison des modèles

## Sauvegarde des résultats

In [30]:
import pandas as pd


def save_results(model_search, name: str):
    results = pd.concat(
        [
            pd.DataFrame(model_search.cv_results_["params"]),
            pd.DataFrame(
                model_search.cv_results_["mean_test_score"], columns=["score"]
            ),
            pd.DataFrame(
                model_search.cv_results_["std_test_score"], columns=["std_score"]
            ),
            pd.DataFrame(model_search.cv_results_["rank_test_score"], columns=["rank"]),
        ],
        axis=1,
    )
    results.sort_values("rank", axis=0, ascending=True, inplace=True, kind="quicksort")
    results.to_csv(name + ".csv")
    return results

In [31]:
study_results_file_name = "Study_HalvingRandomSearchCV_results"
study_results_save_path = results_save_path + study_results_file_name
study_results = save_results(fitted_search, study_results_save_path)

## Affichage des résultats de la cross-validation

In [32]:
# Get the cv results
cv_results = fitted_search.cv_results_

# Extract mean and std of test scores
mean_scores = fitted_search.cv_results_["mean_test_score"]
std_scores = fitted_search.cv_results_["std_test_score"]

top_indices = np.argsort(mean_scores)[::-1][: training[study_config]]
top_mean_scores = mean_scores[top_indices]
top_std_scores = std_scores[top_indices]

In [33]:
plt.figure()
x = np.arange(training[study_config]) + 1
plt.plot(x, top_mean_scores, label="Mean score")
plt.fill_between(
    x,
    top_mean_scores - top_std_scores,
    top_mean_scores + top_std_scores,
    alpha=0.2,
    label="±1 std",
)

plt.xlabel("models")
plt.ylabel("Validation score")
plt.title(
    "Mean validation scores and standard deviations of top {} models".format(
        training[study_config]
    )
)
plt.legend(loc="upper right")
model_rank_global_file_name = "model_rank_global.pgf"
model_rank_save_path = results_save_path + model_rank_global_file_name
plt.savefig(model_rank_save_path)
plt.close()

In [34]:
# Number of top models to plot
n_top_models = min(80, training[study_config] // 3)

# Classifier colors
classifier_colors = {
    "MultinomialNB": "blue",
    "LinearSVC": "green",
    "LogisticRegression": "red",
}

# Get the indices of the top n models sorted by mean_test_score
top_n_indices = np.argsort(cv_results["mean_test_score"])[-n_top_models:][::-1]

# Plot the mean validated score and standard deviation for the top n models
fig, ax = plt.subplots()

for i, index in enumerate(top_n_indices):
    mean_score = cv_results["mean_test_score"][index]
    std_score = cv_results["std_test_score"][index]
    classifier_type = cv_results["param_classifier"][index].__class__.__name__
    color = classifier_colors.get(classifier_type, "black")
    ax.errorbar(
        i + 1,
        mean_score,
        yerr=std_score,
        fmt="",
        color=color,
        label=f"{classifier_type}",
    )

handles, ax_labels = ax.get_legend_handles_labels()
by_label = dict(zip(ax_labels, handles))
ax.legend(by_label.values(), by_label.keys(), loc="upper right")

ax.set_xlabel("models")
ax.set_ylabel("Validation score")
ax.set_title(
    "Mean validation scores and standard deviations for the top {} models".format(
        n_top_models
    )
)

model_rank_classifier_file_name = "model_rank_classifier.pgf"
model_rank_classifier_save_path = results_save_path + model_rank_classifier_file_name
plt.savefig(model_rank_classifier_save_path)

plt.close()

# 8) Meilleur pipeline

## Meilleur score

In [35]:
best_pipeline = fitted_search.best_estimator_
best_classifier = best_pipeline.named_steps["classifier"]

print("Best pipeline:", best_pipeline)
print("Best classifier:", best_classifier)

Best pipeline: Pipeline(steps=[('vectorizer',
                 CountVectorizer(max_df=0.7056999405180515,
                                 min_df=0.0023172701772555125,
                                 ngram_range=(1, 2),
                                 preprocessor=<function <listcomp>.<listcomp>.<lambda> at 0x136b59e40>,
                                 token_pattern=None,
                                 tokenizer=<function <listcomp>.<listcomp>.<lambda> at 0x136b5a2a0>)),
                ('transformer', TfidfTransformer(norm=None, sublinear_tf=True)),
                ('dim_reduction', None), ('scaler', None),
                ('classifier', MultinomialNB(alpha=0.833728345921864))],
         verbose=True)
Best classifier: MultinomialNB(alpha=0.833728345921864)


## Evaluation finale du meilleur modèle

In [36]:
y_pred = best_pipeline.predict(X_test)

In [37]:
from sklearn.metrics import classification_report

print("Classification report:")
print(classification_report(y_test, y_pred, digits=3))

Classification report:
              precision    recall  f1-score   support

      Chirac      0.947     0.792     0.862      9978
  Mitterrand      0.339     0.708     0.458      1505

    accuracy                          0.781     11483
   macro avg      0.643     0.750     0.660     11483
weighted avg      0.867     0.781     0.809     11483



## Matrice de confusion

In [38]:
from sklearn.metrics import ConfusionMatrixDisplay

ConfusionMatrixDisplay.from_predictions(
    y_test,
    y_pred,
    normalize="all",
    cmap="winter",
    values_format=".3f",
    text_kw={"fontsize": 18, "color": "red", "fontweight": "bold"},
)

<sklearn.metrics._plot.confusion_matrix.ConfusionMatrixDisplay at 0x163339a10>

## Courbe ROC / Precision-Recall / DET

In [39]:
from sklearn.metrics import RocCurveDisplay

RocCurveDisplay.from_estimator(
    best_pipeline, X_test, y_test, name="ROC curve", drop_intermediate=True
)

plt.savefig(results_save_path + "roc_curve.pgf")
plt.close()

In [40]:
from sklearn.metrics import PrecisionRecallDisplay

PrecisionRecallDisplay.from_estimator(
    best_pipeline, X_test, y_test, name="Precision-Recall curve"
)

plt.savefig(results_save_path + "precision_recall_curve.pgf")
plt.close()

In [41]:
from sklearn.metrics import DetCurveDisplay

DetCurveDisplay.from_estimator(
    best_pipeline, X_test, y_test, name="DET curve", pos_label="Mitterrand"
)

plt.savefig(results_save_path + "det_curve.pgf")
plt.close()

# 9) Application sur les données sans labels

## Nouvelle recherche avec cross-validation sur l'ensemble des données
Le modèle final ne sera pas évalué sur des données de test.
L'évaluation se fera lors de la soumission des prédictions sur le serveur de test

In [42]:
final_search = fitted_search.set_params(
    **{"n_candidates": training[final_config],
       "factor": 1.5,
       "refit": True
       }
)

In [None]:
final_fitted_search = final_search.fit(texts, labels)

In [44]:
final_results_file_name = "Final_HalvingRandomSearchCV_results"
final_results_save_path = results_save_path + study_results_file_name
final_results = save_results(final_fitted_search, final_results_save_path)

In [45]:
final_model = final_fitted_search.best_estimator_

In [46]:
# we may have more Mitterrand than Chirac to detect when the model is in production
# we don't know if the data are identically distributed in the test set
# Therefore fit_prior=False for Naive Bayes would be better in this case, cf. 3)a) above
if isinstance(final_model.named_steps["classifier"], MultinomialNB):
    final_model.named_steps["classifier"].fit_prior = False

## Prédiction

In [47]:
texts, _ = load_pres(file_name_test)
y_pred_test = final_model.predict(texts)

# Sauvegarde des prédictions
with open(file_name_output, "w") as f:
    for president in y_pred_test:
        f.write(president[0] + "\n")
