In [36]:
import re
import numpy as np
import random
import torch
import pandas as pd
from collections import Counter, defaultdict
import xml.etree.ElementTree as ET
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score

# WSD

Ce notebook permet d'effectuer des tests de comparaison des performances d'apprentissage supervisée et semi-supervisée pour la tâche de Word Sense Disambiguation. Nous développerons deux méthodes de classification : un MLP pour la classification supervisée et un constrained K-means pour la classification semi-supervisée. Nous effectuerons plusieurs tests en considérant plusieurs mots à désambiguiser pour lesquels nous évaluerons les performances de ces deux méthodes.

In [37]:
from collections import Counter, defaultdict
import re
import xml.etree.ElementTree as ET
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
import numpy as np
from sklearn.metrics import accuracy_score
import random
import pandas as pd

## Classification supervisé

On construit une classe Classifieur qui va nous permettre d'accéder aux différents classifieurs (un classifieur pour un mot ambigü).

Cette classe possède les méthodes suivantes :  
        1 : extract_examples() : pour extraire les données d'entraînement et de test   
        2 : extract_embeddings() : pour extraire les embeddings à partir d'un fichier crée en amont, qui ne regroupe que les embeddings nécéssaires à notre jeu de données  
        3 : look_up() : pour effectuer l'étape de look-up avant la classifcation en elle-même  
        4 : select_examples() :  pour sélectionner des données représentatives lorsqu'on ne considère pas toutes les données annotées i.e. chaque étiquette présente dans les données est présente au moins une fois dans le set d'entraînement 
        5 : classify() : pour afficher et évaluer la classification  
        6 : get_mean_accuracy() : qui renvoie une accuracy moyenne pour plusieurs tests de classification  
        7 : test_classifications() : pour comparer différents classifieurs en prenant en considération de moins en moins de données annotées

In [140]:
class Classifieurs :
    
    def __init__(self,data_path,gold_path,embeddings_path,context_size):
        
        #récupération des données XML
        tree = ET.parse(data_path)
        data_file = tree.getroot()[0]

        #récupération des données .txt
        gold_file = open(gold_path, "r",encoding="utf-8")
        
        self.w2examples, self.w2senses = self.extract_examples_and_senses(data_file,gold_file,context_size)
        self.w2emb = self.extract_embeddings(embeddings_path)
    
    def extract_examples_and_senses(self,data_file, gold_file, context_size):
        """Extract the data from the files.

        Args:
            data_file (Element): Sentences
            gold_file (TextIOWrapper): Golds keys

        Returns:
            dictionary: associates the list of context vectors corresponding to the instance
            dictionary : associates to the word each senses
        """
    
        w2examples={}
        w2senses = defaultdict(set)
        
        for (sentence,gold_line) in zip(data_file,gold_file.readlines()) :
            
            #pour chaque phrase, on initialise deux listes qui permettront de respecter les tailles des contextes (+10,-10)
            context_before = []
            context_after = []
            context = []
            
            #on boucle sur les mots de la phrase pour construire les listes
            #on cherche l'instance et on repart en arrière pour constuire le contexte avant
            i_instance = 0
            while sentence[i_instance].tag != "instance" : 
                i_instance+=1
            
            instance = sentence[i_instance].attrib["lemma"].lower()
            
            if instance not in w2examples : 
                w2examples[instance] = []
            
            #on vérifie la longueur des phrases pour ne pas soulever d'erreur
            
            #context_before 
            
            #si le contexte avant l'instance est supérieur ou égale à la taille du contexte choisie
            #on ajoute à la liste chaque mot aux index from i-instance-1 to i_instance-5
            if (len(sentence[:i_instance])>=context_size) :
                    for i in range(1,context_size+1) :
                        context_before.append(sentence[i_instance-i].text.lower())
            
            #sinon, on ajoute à la liste tous les mots et on ajoutera des balises pour compléter
            else :
                for i in range(1,len(sentence[:i_instance])+1) :
                    context_before.append(sentence[i_instance-i].text.lower())

            #context_after
            
            #si le contexte après l'instance est supérieur ou égale à la taille du contexte choisie
            #on ajoute à la liste chaque mot aux index from i-instance+1 to i_instance+11
            if(len(sentence[i_instance+1:])>= context_size) :
                for i in range(i_instance+1,i_instance+(context_size+1)):
                    context_after.append(sentence[i].text.lower())
            
            #sinon, on ajoute à la liste tous les mots et on ajoutera des balises pour compléter
            else :
                for i in range(i_instance+1,len(sentence)):
                    context_after.append(sentence[i].text.lower())
            
            #une fois les listes constituées, on ajoute les balises de début et de fin de phrase si nécessaire
            for i in range(context_size-len(context_before)) :
                context_before.append("<d>")
                
            for i in range(context_size-len(context_after)) :
                context_after.append("<f>")
                
            #le vecteur sera une concaténation des contextes d'avant et d'après
            context = context_before
            context.append(instance)
            context.extend(context_after)
                
            #on récupère ensuite le nombre associé au sens pour constuire l'exemple + ajouter au dictionnaire w2sense
            gold = int((re.findall("ws_[0-9]",gold_line)[0]).replace("ws_",""))
            
            w2senses[instance].add(gold)
            w2examples[instance].append((context,gold))
            
        return w2examples,w2senses
    
    def extract_embeddings(self,path_embeddings) :
        '''
        Récupère les embeddings dans le fichier générée.

        Args:
            path_embeddings (string)

        Returns:
            dictionnary: Associe à chaque mot son embedding
        '''

        f = open(path_embeddings , "r", encoding="UTF-8")

        #On récupère dans le fichier crée les embeddings pour créer un dictionnaire
        w2emb = {}
        for line in f.readlines():
            splitted_line = line.split(" ")
            word = splitted_line[0]
            embedding = list(map(float,splitted_line[1:]))
            w2emb[word] = embedding
        return w2emb

    def look_up(self,context, w2emb) :
        '''
        Remplace dans le vecteur de contexte les mots par leur embedding.

        Args:
            context (list): liste de taille (size_window*2)+1
            w2emb (dictionnary): Associe à chaque mot son embedding

        Returns:
            list : liste de taille size_embedding : BOW
        '''

        emb_size = len(list(w2emb.values())[0]) #on récupère la taille d'un embedding : 300
        context_emb = np.zeros(emb_size)
        for word in context :
            if word in w2emb :
                context_emb = np.add(context_emb, np.array(w2emb[word]))             
        return context_emb
    
    def select_examples(self,examples,senses,size):
        '''
        Choisit des examples d'entraînement représentatifs du corpus.

        Args:
            examples (list)
            n_senses (int): nombre de senses associés à l'instance
            size (float): quantité des données d'entraînement considérés

        Returns:
            list: examples qui contiennent au moins un example de chaque sense
        '''

        selected_examples = []
        
        #Pour chaque sens, on ajoute un example associé à ce sens ,au hasard
        for sense in senses :
            selected_examples.append(random.choice(list(filter((lambda example:example[1]==sense),examples))))
        
        #On calcule ensuite le nombre d'examples qu'il reste à ajouter pour atteindre la quantité de données souhaitée
        size_to_add = round(size*(len(examples)))-len(selected_examples)
        
        #On ajoute ce nombre de données (non-présentes déjà dans la liste) selectionnées au hasard
        selected_examples.extend(random.choices(list(filter((lambda example : example not in selected_examples),examples)),k=size_to_add))
        
        return selected_examples
    
    def get_sets(self, instance,data_size):
        selected_examples = self.select_examples(self.w2examples[instance],self.w2senses[instance],data_size)
        X = [self.look_up(context,self.w2emb)for context,gold in selected_examples]
        y = [gold for context,gold in selected_examples]
            
        X_train, X_test, y_train, y_test = train_test_split(X,y,train_size=0.8)
        
        self.X_train = X_train
        self.X_test = X_test
        self.y_train = y_train
        self.y_test = y_test 
        return X_train, X_test, y_train, y_test
    
    def get_classifier(self,instance,data_size) :
        """_summary_

        Args:
            instance (_type_): _description_
            data_size (_type_): _description_

        Returns:
            _type_: _description_
        """
        clf = MLPClassifier(random_state=1,hidden_layer_sizes=(100,)) 
        self.clf = clf
        X_train, X_test, y_train, y_test = self.get_sets(instance, data_size)
        clf.fit(X_train, y_train)
        return clf
        

    def classify(self,instance,data_size,affichage=True) :
        """Permet d'afficher les données de classification et de prédire.

        Args:
            instance (string): mot anbigü à désambiguïser
            data_size (float): quantité de données à considérer
            affichage (bool, optional): affichage ou non des données de classification. Defaults to True.

        Returns:
            _type_: _description_
        """
        clf = self.get_classifier(instance,data_size)
        y_pred = clf.predict(self.X_test)
        
        if affichage :
            print("instance :",instance)
            print(f'{data_size*100}% des données annotées considérées')
            print("nombre de données d'entraînement : ", len(self.X_train))
            print("étiquettes possibles pour cette instance : ", self.w2senses[instance])
            print("étiquettes présentes dans les données d'entraînement :",Counter(self.y_train))
            print("prédiction :", y_pred)
            print("gold :",self.y_test)
            print("accuracy score : ", accuracy_score(y_pred,self.y_test),"\n")
        
        return y_pred
    
    def get_mean_accuracy(self,instance,data_size,n_repeat,affichage = False):
        """Permet d'effectuer différentes classifications et de rendre une moyenne d'accuracies.

        Args:
            instance (string): mot à désambiguiser
            data_size (float): quantité de données à considérer
            n_repeat (int): nombre de classifications tests à effectuer

        Returns:
            int: moyenne des accuracies obtenues
        """
        accuracies = []
        for i in range(n_repeat) :
            y_pred = self.classify(instance,data_size,False)
            accuracies.append(accuracy_score(y_pred,self.y_test))
            
        if affichage :
            print(instance)
            print(accuracies)
        return sum(accuracies)/len(accuracies)

    def test_classifications(self,instances,step,n_repeat,affichage=True):
        """Permet d'obtenir une accuracy moyenne pour une certaine quantité de données considérée.

        Args:
            instances (string): mot à désambiguiser
            step (float): pas de descente dans la quantité de données à considérer
            n_repeat (int): nombre de classifications tests à effectuer

        Returns:
            dictionnary: associe à chaque instance sa liste d'accuracies moyenne 
        """
        
        instance2acc = {instance : [] for instance in instances}
        data_sizes =[]
        for i in range(round(1.0/step)):
            
            data_size = 1.0 - (step*float(i))
            data_sizes.append(data_size)
            
            if affichage :
                print(data_size)
            for instance in instances :
                
                instance2acc[instance].append(self.get_mean_accuracy(instance,data_size,n_repeat,True))
                print()
        
        if affichage :

            tab = instance2acc
            tab["data_sizes"] = data_sizes 
            df = pd.DataFrame(tab)
            df.set_index("data_sizes")
            print(f"Accuracies moyennes obtenues avec {n_repeat} classifications")
            print(df)
            
        return instance2acc     


Pour créer le Classifieur, il faut définir les chemins des données, le chemin du fichier stockant les embeddings et la taille du contexte.

In [141]:
#A définir
# chemin pour récupérer les données annotées
data_path = "../donnees/FSE-1.1-191210/FSE-1.1.data.xml"
# chemin pour récupérer les gold class
gold_path = "../donnees/FSE-1.1-191210/FSE-1.1.gold.key.txt"
# choix de la fenêtre du contexte
context_size = 4
# chemin pour pouvoir faire l'opération look-up. Les embeddings sont extraits de fasttext
embeddings_path = "embeddings.txt"

In [142]:
Clf = Classifieurs(data_path,gold_path,embeddings_path,context_size)

On effectue un premier test sur le premier mot ambigü des données d'entraînement : "aboutir". On choisit de considérer 100% des données.

In [143]:
Clf.classify("aboutir",1)

instance : aboutir
100% des données annotées considérées
nombre de données d'entraînement :  20
étiquettes possibles pour cette instance :  {1, 2, 3, 4}
étiquettes présentes dans les données d'entraînement : Counter({3: 19, 1: 1})
prédiction : [3 3 3 3 3]
gold : [3, 4, 3, 2, 3]
accuracy score :  0.6 



array([3, 3, 3, 3, 3])

Dans un second temps, on effectue nos tests sur plusieurs classifieurs en choisissant un pas de descente dans la quantité des données annotées considérées. Pour chaque classifieur et chaque quantitée de données considérées, on effectue n_repeat classifications pour obtenir une accuracy moyenne représentative du classifieur. Par conséquent, pour n_repeat=5 et step=0.25, nous obtiendrons pour chaque classifieur une liste d'accuracies correspondante à la moyenne des accuracies de 5 prédictions pour 100%, 75%, 50% et 25% des données.

In [144]:
#A définir
#Nombre de classifieurs choisis au hasard à tester
n_rand_instances = 3
#Pas de descente dans la quantité de données considérées
step = 0.25
#Nombre de classifications pour un classifieur pour obtenir une accuracy moyenne
n_repeat = 10

In [145]:
instances = random.choices(list(Clf.w2examples.keys()),k=n_rand_instances)
Clf.test_classifications(instances,step,n_repeat,affichage=True)


1.0
agir
[0.8, 0.6, 0.6, 0.2, 0.8, 0.6, 1.0, 0.8, 0.6, 0.8]

prononcer
[0.8, 0.2, 0.8, 1.0, 0.6, 1.0, 0.6, 0.4, 0.8, 0.8]





changer
[0.8, 0.8, 0.8, 0.8, 0.8, 0.6, 0.8, 0.8, 0.0, 0.8]

0.75
agir
[0.5, 0.5, 0.25, 0.25, 0.5, 0.5, 0.75, 0.75, 0.5, 0.75]

prononcer
[0.5, 0.5, 0.5, 0.75, 0.5, 0.75, 0.25, 0.75, 0.25, 0.25]

changer
[0.25, 0.5, 0.5, 0.5, 0.25, 0.25, 0.25, 0.5, 0.75, 0.5]

0.5
agir
[0.6666666666666666, 0.6666666666666666, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.6666666666666666, 0.6666666666666666, 0.6666666666666666, 0.3333333333333333]

prononcer
[0.3333333333333333, 0.0, 0.6666666666666666, 0.0, 0.3333333333333333, 0.3333333333333333, 0.6666666666666666, 0.0, 0.3333333333333333, 0.3333333333333333]

changer
[0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.0, 0.3333333333333333, 0.6666666666666666, 0.3333333333333333]

0.25
agir
[0.0, 0.5, 0.5, 0.5, 0.0, 0.5, 0.0, 0.0, 0.0, 0.5]

prononcer
[0.0, 0.5, 0.0, 0.0, 0.0, 0.0, 0.5, 0.0, 0.0, 0.0]

changer
[0.0, 0.5, 0.0, 0.0, 0.0, 0.0, 0.0,

{'agir': [0.6799999999999999, 0.525, 0.4999999999999999, 0.25],
 'prononcer': [0.7, 0.5, 0.3, 0.1],
 'changer': [0.7, 0.425, 0.3333333333333333, 0.05],
 'data_sizes': [1.0, 0.75, 0.5, 0.25]}

## Classification semi-supervisée

In [146]:
class K_Means():
    ''' 
    classifieur K-means pour un mot particulier
    '''

    def __init__(self, examples):
        '''
        Instancie les différentes variables utiles pour l'algorithme du K-means

        examples : liste d'examples dont le mot à désambiguiser est le même pour 
                   chaque example
        example : couple d'un mot avec son contexte de fenêtre 4 (sous forme 
                  d'embedding) et du numéro de sens attendu du mot à désambiguiser 
                  (gold class sous forme d'integer)
                    si example = ([1.9, 2.3, 0.6], 1),
                    - le contexte avec le mot à désambiguiser et son lemme est 
                      l'embedding [1.9, 2.3, 0.6]
                    - le numéro de sens est 1
        '''

        # transforme l'ensemble des examples en une liste pour pouvoir garder le 
        # même indice pour chaque example par la suite
        self.examples = list(examples)
        # transforme les embeddings en tensors
        self.tensors_examples = [example[0] for example in self.examples]
        # détermine le nombre de sens possibles k (donc le nombre de clusters) 
        # à l'aide des données
        self.k = self.nb_senses()
        # initialisation de centroids : pour chaque sens, un example est pris au hasard
        self.tensors_centroids = [random.choice(example) 
                                  for example in self.examples_of_same_sense().values()]
        # initialisation de clusters : tous les examples sont associés au cluster 0
        self.clusters = np.zeros(len(examples))

    def nb_senses(self):
        '''
        Renvoie le nombre de sens existants dans un ensemble d'examples
        '''

        known_senses = []
        # pour chaque exemple
        for example in self.examples:
            # si le sens attendu (gold class) n'a pas encore été rencontré
            if example[1] not in known_senses:
                # l'ajoute à la liste des sens possibles
                known_senses.append(example[1])
        # renvoie le nombre de sens
        return len(known_senses)
    
    def examples_of_same_sense(self):
        '''
        Regroupe les contextes des examples dans un dictionnaire triés selon le 
        sens du mot à désambiguiser
        '''

        # clé : numéro du sens
        # valeur : liste de contextes avec ce sens en gold class
        sense2examples = {}
        # pour chaque example
        for example in self.examples:
            # si sa gold class n'a pas été déjà rencontrée
            if example[1] not in sense2examples:
                # ajoute une clé pour cette gold class
                sense2examples[example[1]] = []
            # ajoute le contexte au dictionnaire correspondant au sens utilisé
            sense2examples[example[1]].append(example[0])

        return sense2examples
    
    def learn_clusters(self):
        '''
        Algorithme de K-Means
        Retourne les coordonnées de chaque centroide ainsi que le cluster auquel 
        appartient chaque example
        '''

        # différence initialisée à Vrai
        diff = True
        
        # tant qu'il y a une différence entre l'ancienne liste et la nouvelle 
        # liste de centroides
        while diff:

            # CALCUL DES DISTANCES ENTRE CHAQUE EXAMPLE ET CHAQUE CENTROIDE

            # pour chaque couple (indice, coordonnées) dans les examples
            for i, tensor_example in enumerate(self.tensors_examples):
                # initialisation de la distance minimum à l'infini
                min_dist = float('inf')
                # pour chaque couple (indice, coordonnées) dans les centroides
                for j, tensor_centroid in enumerate(self.tensors_centroids):
                    # calcul de la distance entre cet example et ce centroide
                    d = 0
                    for k in range(len(tensor_example)):
                        d += (tensor_centroid[k].item() - tensor_example[k].item())**2
                    d = np.sqrt(d)
                    # si une distance plus faible est trouvée
                    if min_dist > d:
                        # la distance ainsi que le centroide sont stockés
                        min_dist = d
                        self.clusters[i] = j
            
            # CALCUL DES NOUVEAUX CENTROIDES

            # calcul des nouveaux centroides en utilisant le point au milieu de tous les
            # autres points du même cluster
            new_centroids = pd.DataFrame(self.tensors_examples).groupby(by = self.clusters).mean()
            # transforme ces nouveaux centroides en tensors
            tensors_new_centroids = []
            for i in range(len(new_centroids.index)):
                colums = []
                for j in range(len(new_centroids.columns)):
                    colums.append(int(new_centroids.iat[i,j]))
                tensors_new_centroids.append(torch.tensor(colums))

            # MISE A JOUR DES CENTROIDES

            count_diff = 0
            # pour chaque centroide
            for i in range(len(self.tensors_centroids)):
                # si l'ancien centroide et le nouveau ne sont pas les mêmes
                if not(torch.equal(self.tensors_centroids[i], tensors_new_centroids[i])):
                    count_diff += 1
                    # met à jour le centroide
                    self.tensors_centroids = tensors_new_centroids
            # s'il n'y a eu aucune différence entre les anciens et les nouveaux centroides, 
            # la boucle while se termine
            if count_diff == 0:
                diff = False
            
    

### on écrit encore des trucs