In [1]:
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression, RidgeClassifier, SGDClassifier, Perceptron
from sklearn.naive_bayes import MultinomialNB, ComplementNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import SGDClassifier, Perceptron, LogisticRegression, RidgeClassifier
from sklearn.svm import LinearSVC, SVC
from lightgbm import LGBMClassifier
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import accuracy_score, classification_report

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



# Y A UNE CELLULE EN BAS QUI IMPORTE LES DONNÉES DES CSV C'EST PLUS SIMPLE DE LANCER À PARTIR DE ÇA (et la cellule des import évidemment)

# Extraction des données

In [2]:
def get_xml_content(file):

    with open(file, "r") as f:
        soup = BeautifulSoup(f, "xml")

    return soup

In [3]:
def extract_train_info(doc):

    doc_dico = {}

    # Extraire l'id
    doc_id = doc.get("id")
    doc_dico["id"] = doc_id

   # Extraire l'évaluation
    evaluation = doc.find("EVALUATION")
    evaluation_parti = evaluation.find("EVAL_PARTI")
    doc_dico["nombre"] = evaluation_parti.get("nombre")

    parti = evaluation_parti.find("PARTI")
    doc_dico["valeur"] = parti.get("valeur")
    doc_dico["confiance"] = parti.get("confiance")

    # Extraire le texte
    texte = doc.find("p")
    doc_dico["texte"] = texte.text

    return doc_dico

In [4]:
def get_test_labels(file):

    with open(file, "r") as f:
        lines = f.readlines()

    lines = [line.split("\t") for line in lines]
    labels = {line[0]: line[1].strip() for line in lines}

    return labels

J'ai remarqué plus tard que deux textes de l'ensemble de test n'avaient pas de labels

In [8]:
labels = get_test_labels("data/ref/deft09_parlement_ref_fr.txt")
for key, value in labels.items():
    if value == "":
        print(key)

1175
4574


Je fais donc en sorte lorsque j'obtiens la df que si la valeur est égale à "" alors je ne prends pas le texte en compte.

In [9]:
def extract_test_info(doc):

    doc_dico = {}

    doc_id = doc.get("id")
    doc_dico["id"] = doc_id

    texte = doc.find("p")
    doc_dico["texte"] = texte.text

    return doc_dico

In [10]:
def get_train_and_test_df(train_file, test_file, ref_file):

    soup_train = get_xml_content(train_file)
    soup_test = get_xml_content(test_file)

    test_labels = get_test_labels(ref_file)

    all_docs = []
    for doc in soup_train.find_all("doc"):
        doc_dico = extract_train_info(doc)
        all_docs.append(doc_dico)

    df_train = pd.DataFrame(all_docs)

    all_docs = []
    for doc in soup_test.find_all("doc"):
        doc_dico = extract_test_info(doc)
        if test_labels[doc_dico["id"]] == "": # pour les deux textes sans label dans le fichier de référence
            continue
        doc_dico["valeur"] = test_labels[doc_dico["id"]]
        all_docs.append(doc_dico)

    df_test = pd.DataFrame(all_docs)

    return df_train, df_test

In [11]:
df_train, df_test = get_train_and_test_df("data/train/deft09_parlement_appr_fr.xml", "data/test/deft09_parlement_test_fr.xml", "data/ref/deft09_parlement_ref_fr.txt")

In [None]:
df_train

In [None]:
df_test

# Présentation des données

## - Doublons

### - Combien de doublons ?

In [None]:
train_texts = df_train["texte"].to_list()
test_texts = df_test["texte"].to_list()

In [None]:
def check_copy(train_texts, test_texts):

    num_copy = 0

    for text in train_texts:
        if text in test_texts:
            num_copy += 1

    return num_copy

In [None]:
num_copy = check_copy(train_texts, test_texts)
print(f"Il y a {num_copy} textes en commun entre les données d'entraînement et de test.")

### - On veut atteindre une distribution

Répartition des sets = 60 et 40

In [None]:
def get_distribution(df_train, df_test):

    total_texts = len(df_train) + len(df_test)
    percentage_train = len(df_train) / total_texts * 100
    percentage_test = len(df_test) / total_texts * 100

    return percentage_train, percentage_test

In [None]:
percentage_train, percentage_test = get_distribution(df_train, df_test)
print(f"Pourcentage de textes d'entraînement : {percentage_train:.2f}%")
print(f"Pourcentage de textes de test : {percentage_test:.2f}%")

On veut enlever les doublons tout en essayant de conserver cette distribution.

In [None]:
def balance_distribution(df_train, df_test):

    train_texts = df_train["texte"].to_list()
    test_texts = df_test["texte"].to_list()

    for text in train_texts:
        if text in test_texts:
            percentage_train, percentage_test = get_distribution(df_train, df_test)
            difference_train = abs(percentage_train - 60)
            difference_test = abs(percentage_test - 40)

            if difference_train > difference_test:
                df_test = df_test[df_test["texte"] != text]
            else:
                df_train = df_train[df_train["texte"] != text]

    return df_train, df_test

In [None]:
df_train, df_test = balance_distribution(df_train, df_test)
percentage_train, percentage_test = get_distribution(df_train, df_test)
print(f"Pourcentage de textes d'entraînement : {percentage_train:.2f}%")
print(f"Pourcentage de textes de test : {percentage_test:.2f}%")

In [None]:
train_texts = df_train["texte"].to_list()
test_texts = df_test["texte"].to_list()
num_copy = check_copy(train_texts, test_texts)
print(f"Il y a {num_copy} textes en commun entre les données d'entraînement et de test.")

### - Retirer jusqu'à 60

Nous n'avons plus de doublons mais nous restons quelque peu éloignés de la distribution 60/40. On va donc retirer des textes de l'ensemble de test pour arriver à la distribution souhaitée.

In [None]:
def reach_desired_distribution(df_train, df_test):

    while True:
        percentage_train, percentage_test = get_distribution(df_train, df_test)

        if percentage_train >= 60:
            break
        else:
            df_test = df_test.drop(df_test.index[0])


    return df_train, df_test

In [None]:
df_train, df_test = reach_desired_distribution(df_train, df_test)
percentage_train, percentage_test = get_distribution(df_train, df_test)
print(f"Pourcentage de textes d'entraînement : {percentage_train:.2f}%")
print(f"Pourcentage de textes de test : {percentage_test:.2f}%")
num_copy = check_copy(train_texts, test_texts)
print(f"Il y a {num_copy} textes en commun entre les données d'entraînement et de test.")

In [None]:
df_train.to_csv("data/train.csv", index=False)
df_test.to_csv("data/test.csv", index=False)

## Visualisation des données

Est-ce que les classes sont représentées de manière égale entre les deux ensembles ?

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def compare_class_distribution(df_train, df_test):

    total_train = len(df_train)
    total_test = len(df_test)

    train_distribution = {source: count / total_train * 100 for source, count in df_train.groupby("valeur").size().to_dict().items()}
    test_distribution = {source: count / total_test * 100 for source, count in df_test.groupby("valeur").size().to_dict().items()}

    x = np.arange(len(train_distribution))
    width = 0.4
    train_bars = plt.bar(x - 0.2, train_distribution.values(), width, label="Ensemble de train")
    test_bars = plt.bar(x + 0.2, test_distribution.values(), width, label="Ensemble de test")

    for bar in train_bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width() / 2, height, f'{round(height, 2)}', ha='center', va='bottom')

    for bar in test_bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width() / 2, height, f'{round(height, 2)}', ha='center', va='bottom')

    plt.xticks(x, train_distribution.keys(), rotation=90)
    plt.xlabel("Parti politique")
    plt.ylabel("Pourcentage de documents")
    plt.legend()
    plt.show()

compare_class_distribution(df_train, df_test)


Les répartitions sont assez égalitaires !


||   ELDR    | GUE/NGL | PPE-DE     | PSE | Verts/ALE
|:---| :---        |    :----   |          :--- | :--- | :--- |
|Nombre de documents| 3 346 | 4 482   |  11 429 | 9 066 | 3 961 |
|Pourcentage de documents|10.33%|13.84%|35.29%|27.99%|12.23%|

On a donc un corpus sans doublons mais avec une répartition et une distribution des classes et des documents fidèle au corpus originel !

# Pé-traitement des données

## - Normalisation

In [None]:
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import spacy
nlp = spacy.load("en_core_web_sm")

In [None]:
def lemmatisation(text):
    doc = nlp(text)
    return " ".join([token.lemma_ for token in doc])

In [None]:
def i_love_lowercase(text):

    return text.lower()

In [None]:
def remove_stop_words(text):

    stop_words = set(stopwords.words('english'))
    word_tokens = word_tokenize(text)
    filtered_text = [word for word in word_tokens if word not in stop_words]

    return " ".join(filtered_text)

In [None]:
def remove_punctuation(text):

    text = re.sub(r"[^\w\s]", "", text)

    return text

In [None]:
def get_beautiful_clean_text(text):

    text = i_love_lowercase(text)
    text = remove_punctuation(text)
    text = remove_stop_words(text)
    text = lemmatisation(text)

    return text

df_train["texte_nettoyé"] = df_train["texte"].map(get_beautiful_clean_text)
df_test["texte_nettoyé"] = df_test["texte"].map(get_beautiful_clean_text)

In [None]:
df_train.to_csv("data/train.csv", index=False)
df_test.to_csv("data/test.csv", index=False)


In [None]:

#### VOUS POUVEZ LANCER À PARTIR D'ICI EN IMPORTANT LES CSV ####
df_train = pd.read_csv("data/train.csv")
df_test = pd.read_csv("data/test.csv")

In [None]:
df_train.head()

## - Vectorisation

Dans l'article, on voit que les paramètres de la vectorisation jouent un rôle assez important.
Nous allons donc essayer de tester cela avec le tfidf et le count vectorizer sur le KNN, l'algorithme utilisé dans l'article.
Nous allons vectorisser de manière différentes nos données et stocker le tout dans une df assez grande. NOus allons ensuite tester le tout sur le KNN pour voir comment cela impacte nos données et jusqu'à ou on peut aller !

## - Organisation

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer, HashingVectorizer

In [None]:
vectorizers = {
    "TfidfVectorizer": {
        "model": TfidfVectorizer(),
        "hyperparameters": {
            "max_df": [0.5, 0.9, 1], # ça c'est pour ignorer les mots qui appraîssent dans 0.5 ou 0.9 des documents #defaut = 1
            "max_features": [1000, 5000, 10000, 15000], # ça c'est pour limiter le nombre de mots à 1000 ou 2000 #defaut = None
            "ngram_range": [(1, 1), (1, 2)], # ça c'est pour prendre en compte les unigrammes ou les bigrammes #defaut = (1, 1)
        }
    },
    "CountVectorizer": {
        "model": CountVectorizer(),
        "hyperparameters": {
            "max_df": [0.8, 0.9],
            "max_features": [1000, 5000, 10000, 15000],
            "ngram_range": [(1, 1), (1, 2)],
        }
    }

}

In [None]:
from itertools import product

def hyperparameter_vectorizer(vectorizers):

    param_grid = vectorizers["hyperparameters"]
    list_vectorizers = []


    if len(param_grid) == 1:
        param_name, param_values = list(param_grid.items())[0]
        for i, value in enumerate(param_values, start=1):
            params = {param_name: value}
            vectorizer = vectorizers["model"].__class__(**params)
            list_vectorizers.append(vectorizer)

    else:
        param_grid_combinations = list(product(*param_grid.values()))
        for i, combination in enumerate(param_grid_combinations, start=1):
            params = {param_name: value for param_name, value in zip(param_grid.keys(), combination)}
            vectorizer = vectorizers["model"].__class__(**params)
            list_vectorizers.append(vectorizer)

    return list_vectorizers

In [None]:
list_vectorizers = hyperparameter_vectorizer(vectorizers["TfidfVectorizer"])
list_vectorizers

## - Vectorisation des données

In [None]:
def store_vectorisations(list_vectorizers, x_train, x_test):

    all_vectorisations = []

    for i, vectorizer in enumerate(list_vectorizers):
        x_train_vectorized = vectorizer.fit_transform(x_train)
        x_test_vectorized = vectorizer.transform(x_test)
        all_vectorisations.append({"vectorizer_id": vectorizer.__class__.__name__ + str(i),
                                   "vectorizer": vectorizer,
                                   "x_train": x_train_vectorized,
                                   "x_test": x_test_vectorized})

    df_vectorisations = pd.DataFrame(all_vectorisations)

    return df_vectorisations

In [None]:
df_vectorisations = store_vectorisations(list_vectorizers, df_train["texte_nettoyé"], df_test["texte_nettoyé"])

## - Test avec KNN

In [None]:
def im_counting_on_you_knn(df_vectorisations, y_train, y_test):

    model = KNeighborsClassifier(n_neighbors=1)
    scores = []

    for i, row in df_vectorisations.iterrows():
        vectorizer = row["vectorizer"]
        x_train = row["x_train"]
        x_test = row["x_test"]
        model.fit(x_train, y_train)
        y_pred = model.predict(x_test)
        report = classification_report(y_test, y_pred, output_dict=True)
        scores.append({"vectorizer": vectorizer, "scores": report})

    return scores


In [None]:
scores = im_counting_on_you_knn(df_vectorisations, df_train["valeur"], df_test["valeur"])

In [None]:
sorted_scores = sorted(scores, key=lambda x: x["scores"]["accuracy"], reverse=True)

In [None]:
for score in sorted_scores:
    print(f"Vectorizer: {score['vectorizer'].__class__.__name__}")
    for key, value in score["scores"].items():
        print(f"\t{key}: {value}")

Résultats médiocres et le model avec la meilleure accuracy a des fscores désastreuses. PLutôt utiliser ces dernières pour l'évaluation.

# Comparaison

In [None]:
import time
def time_function(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        return result, end - start
    return wrapper

In [None]:
class Model:

    # Contient tous les modèles entrainés !
    all_models = []

    def __init__(self, name: str, model=None, vectorisation=None, scores={}, running_time=None, performance=None):
        self.name = name
        self.model = model
        self.vectorisation = vectorisation
        self.scores = scores
        self.running_time = running_time
        self.performance = performance

        Model.all_models.append(self)
        # Pour chaque modèle entraîné, on garde son nom (algo + num), le modèle et ses
        # hyperparamètres pour cet entraînement, les métriques obtenues, et son temps d'execution !

    # Pour vider la liste si nécessaire !
    @classmethod
    def reset(cls):
        cls.all_models = []

    # Entraîne le modèle
    @time_function
    def fit_model(self, x_train, y_train):
        self.model.fit(x_train, y_train)

    # Prédictions et scores
    @time_function
    def predict(self, x_test, y_test):
        y_pred = self.model.predict(x_test)
        report = classification_report(y_test, y_pred, output_dict=True)
        accuracy = accuracy_score(y_test, y_pred)
        accuracy_dict =  {"accuracy": accuracy, **report}
        self.scores = accuracy_dict
        return y_pred

      # Lance les fonctions d'entraînement et de prédiction
    # Stock les temps d'execution
    def test_model(self, x_train, y_train, x_test, y_test):
        _, fit_execution_time = self.fit_model(x_train, y_train)
        y_pred, predict_execution_time = self.predict(x_test, y_test)
        self.running_time = fit_execution_time + predict_execution_time

In [None]:
Model.reset()

In [None]:
models_dict = {
    "LogisticRegression": {"model": LogisticRegression(), "hyperparameters": {"C": [0.5, 1.0]}},
    # "SGDClassifier": {"model": SGDClassifier(), "hyperparameters": {"alpha": [0.0001, 0.001, 0.01], "loss": ["hinge", "squared_hinge"]}},
    "LinearSVC": {"model": LinearSVC(), "hyperparameters": {"C": [0.5, 1.0], "dual":[True, False]}},
    # "SVC": {"model": SVC(), "hyperparameters": {"kernel": ["poly", "sigmoid"]}},
    "RidgeClassifier": {"model": RidgeClassifier(), "hyperparameters": { "alpha": [1.0, 1.5, 2]}},
    "LGBMClassifier": {"model": LGBMClassifier(), "hyperparameters": {"max_depth": [1000], "n_estimators": [1000]}},
    "RandomForestClassifier": {"model": RandomForestClassifier(), "hyperparameters": {"max_depth": [100]}}
 }

In [None]:
from itertools import product

def hyperparameters_training(model_name, model_dict, x_train, y_train, x_test, y_test):
    param_grid = model_dict["hyperparameters"]

    if len(param_grid) == 1:
        param_name, param_values = list(param_grid.items())[0]
        for i, value in enumerate(param_values, start=1):
            params = {param_name: value}
            model = model_dict["model"].__class__(**params)
            model_obj = Model(name=f"{model_name}_{i}", model=model)
            model_obj.test_model(x_train, y_train, x_test, y_test)
    else:
        param_grid_combinations = list(product(*param_grid.values()))
        for i, combination in enumerate(param_grid_combinations, start=1):
            params = {param_name: value for param_name, value in zip(param_grid.keys(), combination)}
            model = model_dict["model"].__class__(**params)
            model_obj = Model(name=f"{model_name}_{i}", model=model)
            model_obj.test_model(x_train, y_train, x_test, y_test)

In [None]:
@time_function
def test_models(models, x_train, y_train, x_test, y_test):

    for model_name, model_dict in models.items():
        print(f"On teste le modèle {model_name} !")
        hyperparameters_training(model_name, model_dict, x_train, y_train, x_test, y_test)

In [None]:
def test_models_and_vectorisations(models, vectorisations, df_train, df_test):

    for i, row in vectorisations.iterrows():
        print(f"Vectorisation {row['vectorizer'].__class__.__name__} en cours !")
        x_train = row["x_train"]
        x_test = row["x_test"]
        test_models(models, x_train, df_train["valeur"], x_test, df_test["valeur"])

In [None]:
test_models_and_vectorisations(models_dict, df_vectorisations, df_train, df_test)

In [None]:
##### POUR VISUALISER TOUS LES RÉSULTADOS ! #####
from prettytable import PrettyTable
def table_results(models):

    bests_of_the_bests = PrettyTable(["Model", "hyperparametres", "Accuracy", "Precision", "Recall", "F1-Score", "Running Time"])


    for model_obj in models:
        if "accuracy" in model_obj.scores.keys():
            bests_of_the_bests.add_row([model_obj.name, model_obj.model, model_obj.scores["accuracy"], model_obj.scores["macro avg"]["precision"], model_obj.scores["macro avg"]["recall"], model_obj.scores["macro avg"]["f1-score"], model_obj.running_time])

    return bests_of_the_bests

table_results(Model.all_models)