# AutoComplete Algorithm


|<h2>Nltk Packages Utiles dans le traitement de corpus  </h2> |
| :------------------: |

In [11]:
import math
import nltk
import random
import pickle
import numpy as np
import pandas as pd
import heapq
import string
import re
from sklearn.model_selection import train_test_split
from typing import List, Tuple
from nltk.tokenize import TweetTokenizer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Téléchargement des ressources NLTK si nécessaire
nltk.download('stopwords')
nltk.download('wordnet')


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\hp\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\hp\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

|<h2>Fonctions Utiles pour le traitement du Corpus </h2> |
| :------------------: |

In [12]:
def preprocess_tweet(tweet: str) -> str:
    """
    Prétraitement d'un tweet en supprimant les emojis, les mentions d'utilisateur,
    les hashtags, les liens URL, les chiffres et la ponctuation.
    """
    # Supprimer les emojis
    tweet = re.sub(r'[^\w\s,]', '', tweet)
    
    # Supprimer les mentions d'utilisateur (@username)
    tweet = re.sub(r'@[A-Za-z0-9_]+', '', tweet)
    
    # Supprimer les hashtags (#hashtag)
    tweet = re.sub(r'#([^\s]+)', '', tweet)
    
    # Supprimer les liens URL (http://...)
    tweet = re.sub(r'http\S+', '', tweet)
    
    # Supprimer les chiffres
    tweet = re.sub(r'\d+', '', tweet)
    
    # Supprimer la ponctuation
    tweet = tweet.translate(str.maketrans('', '', string.punctuation))
    
    return tweet

def tokenize_tweet(tweet: str) -> list:
    """
    Tokenisation d'un tweet en utilisant TweetTokenizer de NLTK.
    """
    tokenizer = TweetTokenizer()
    tokens = tokenizer.tokenize(tweet)
    return tokens

def remove_stopwords(tokens: list) -> list:
    """
    Suppression des mots vides de la liste de tokens.
    """
    stop_words = set(stopwords.words('english'))
    filtered_tokens = [token for token in tokens if token.lower() not in stop_words]
    return filtered_tokens

def lemmatize_tokens(tokens: list) -> list:
    """
    Lemmatisation des tokens en utilisant WordNetLemmatizer de NLTK.
    """
    lemmatizer = WordNetLemmatizer()
    lemmatized_tokens = [lemmatizer.lemmatize(token) for token in tokens]
    return lemmatized_tokens

def preprocess_tweets_from_file(file_path: str) -> list:
    """
    Prétraitement des tweets à partir d'un fichier.
    """
    preprocessed_tweets = []
    
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            tweet = line.strip()
            preprocessed_tweet = preprocess_tweet(tweet)
            tokens = tokenize_tweet(preprocessed_tweet)
            filtered_tokens = remove_stopwords(tokens)
            lemmatized_tokens = lemmatize_tokens(filtered_tokens)
            preprocessed_tweets.append(lemmatized_tokens)
    
    return preprocessed_tweets


In [13]:
def preprocess_data(data: str) -> Tuple[List[str], List[List[str]]]:
    """
    Tokenize les données en phrases et en mots, en utilisant le prétraitement des tweets.

    Paramètres :
        data (str) : Les données d'entrée sous forme de chaîne de caractères.

    Retourne :
        Tuple[List[str], List[List[str]]] : Un tuple contenant la liste des phrases et la liste des phrases tokenisées.
    """
    def tokenizer_phrase(phrase: str) -> List[str]:
        return preprocess_tweet(phrase).split()

    phrases = data.split('\n')
    phrases = [phrase.strip() for phrase in phrases if phrase.strip()]
    phrases_tokenisees = [tokenizer_phrase(p) for p in phrases]
    # Liste pour stocker toutes les phrases tokenisées
    tokenized_sentences = []
    tokenized_sentences.extend(phrases_tokenisees)
    
    return phrases, tokenized_sentences


file_path = "C:\\Users\\hp\\Downloads\\TP2_NLP\\PA_files\\big_data_chunk_1.txt"

# Lecture du contenu du fichier
with open(file_path, 'r', encoding='utf-8') as file:
    data = file.read()

# Traitement des données en utilisant la fonction process_data
_, tokenized_sentences = preprocess_data(data)


# Affichage de la grande liste contenant toutes les phrases tokenisées
print("Toutes les phrases tokenisées dans une grande liste:")
#for i in tokenized_sentences: 
    #print(i)

Toutes les phrases tokenisées dans une grande liste:


|<h2>Partitionnement des données tokénisées en train et test </h2> |
| :------------------: |

In [14]:
train, test = train_test_split(tokenized_sentences, test_size=0.1, random_state=42)

In [15]:
def count_the_words(sentences) -> 'dict':
    # Création d'un dictionnaire de comptage des mots
    word_counts = {}

    # Itération sur les phrases
    for sentence in sentences:
        # Itération sur les jetons
        for token in sentence:
            # Ajout d'un compteur pour un nouveau mot
            if token not in word_counts.keys():
                word_counts[token] = 1
            # Incrémentation du compteur
            else:
                word_counts[token] += 1

    return word_counts


def handling_oov(tokenized_sentences, count_threshold) -> 'list':
    # Liste vide pour le vocabulaire fermé
    closed_vocabulary = []

    # Obtention du dictionnaire de fréquences à l'aide de la fonction définie précédemment
    words_count = count_the_words(tokenized_sentences)

    # Itération sur les mots et leurs comptes
    for word, count in words_count.items():
        # Ajout si le mot est plus (ou égal) au seuil
        if count >= count_threshold:
            closed_vocabulary.append(word)

    return closed_vocabulary



In [16]:
def unk_tokenize(tokenized_sentences, vocabulary, unknown_token="<unk>") -> 'list':
    # Convertir le vocabulaire en un ensemble
    vocabulary = set(vocabulary)

    # Créer une liste vide pour les phrases
    new_tokenized_sentences = []

    # Itération sur les phrases
    for sentence in tokenized_sentences:
        # Itération sur la phrase et ajout de <unk> si le jeton est absent du vocabulaire
        new_sentence = [token if token in vocabulary else unknown_token for token in sentence]
        # Ajout de la phrase à la nouvelle liste
        new_tokenized_sentences.append(new_sentence)

    return new_tokenized_sentences


|<h2>Nettoyage des données</h2> |
| :------------------: |
| Cette fonction cleansing effectue le nettoyage des données en suivant les étapes suivantes :
Obtenir le Vocabulaire Restreint : Utilise la fonction handling_oov(train_data, count_threshold) pour obtenir le vocabulaire restreint à partir des données d'entraînement. Les mots qui apparaissent moins de count_threshold fois sont considérés comme hors vocabulaire (OOV).
Mise à Jour des Données d'Entraînement : Utilise la fonction unk_tokenize(train_data, vocabulary) pour remplacer les mots hors vocabulaire dans les données d'entraînement par le token "<unk>".
Mise à Jour des Données de Test : Utilise la même fonction pour remplacer les mots hors vocabulaire dans les données de test par le token "<unk>".
Retourner les Nouvelles Données et le Vocabulaire : Retourne les données d'entraînement et de test mises à jour, ainsi que le vocabulaire restreint.|

In [17]:
def cleansing(train_data, test_data, count_threshold):
    
  # Get closed Vocabulary
  vocabulary = handling_oov(train_data, count_threshold)
    
  # Updated Training Dataset
  new_train_data = unk_tokenize(train_data, vocabulary)
    
  # Updated Test Dataset
  new_test_data = unk_tokenize(test_data, vocabulary)

  return new_train_data, new_test_data, vocabulary

min_freq = 3
final_train, final_test, vocabulary = cleansing(train, test, min_freq)


|<h2>Comptage des n-grammes(1,2,3)</h2> |
| :------------------: |
| Cette fonction count_n_grams itère sur chaque phrase dans les données, ajoute les tokens de début et de fin appropriés, puis compte chaque n-gramme en itérant sur la phrase et en l'ajoutant au dictionnaire des n-grammes avec son compte. Enfin, elle retourne ce dictionnaire selon le nombre de grammes specifié 'n'.|

In [20]:

def count_n_grams(data, n, start_token="<s>", end_token="</s>") -> 'dict':
    # Dictionnaire vide pour les n-grammes
    n_grams = {}

    # Itération sur toutes les phrases dans les données
    for sentence in data:
        # Ajout de n jetons de départ et d'un jeton de fin à la phrase
        sentence = [start_token] * (n-1) + sentence + [end_token]
        # Conversion de la phrase en un tuple
        sentence = tuple(sentence)

        # Variable temporaire pour stocker la longueur de début de l'gramme n à la fin
        m = len(sentence) if n == 1 else len(sentence) - 1

        # Itération sur cette longueur
        for i in range(m):
            # Obtention de l'gramme n
            n_gram = sentence[i:i + n]
            # Ajout du compte de l'gramme n à notre dictionnaire
            # Si l'gramme n est déjà présent
            if n_gram in n_grams.keys():
                n_grams[n_gram] += 1
            # Ajout du compte de l'gramme n
            else:
                n_grams[n_gram] = 1

    return n_grams

# Comptage des unigrammes
unigram_counts = count_n_grams(final_train, n=1)
# Comptage des bigrammes
bigram_counts = count_n_grams(final_train, n=2)
# Comptage des trigrammes
trigram_counts = count_n_grams(final_train, n=3)

#print("Unigram Counts:", unigram_counts)
print("Bigram Counts:", bigram_counts)
#print("Trigram Counts:", trigram_counts)


Bigram Counts: {('<s>', 'make'): 3, ('make', 'it'): 26, ('it', 'crack'): 1, ('crack', 'then'): 1, ('then', 'nigga'): 1, ('nigga', 'at'): 1, ('at', 'least'): 19, ('least', 'tweet'): 1, ('tweet', 'sum'): 1, ('sum', 'pg'): 1, ('pg', 'dead'): 1, ('dead', '<unk>'): 3, ('<unk>', 'a'): 120, ('a', '<unk>'): 524, ('<unk>', 'lol'): 29, ('lol', '</s>'): 85, ('<s>', 'trying'): 3, ('trying', 'to'): 33, ('to', 'finish'): 6, ('finish', 'this'): 2, ('this', '<unk>'): 53, ('<unk>', 'so'): 45, ('so', 'i'): 4, ('i', 'can'): 9, ('can', 'call'): 1, ('call', '<unk>'): 5, ('<unk>', 'but'): 71, ('but', 'thats'): 2, ('thats', 'not'): 2, ('not', 'happening'): 1, ('happening', '</s>'): 3, ('<s>', 'happy'): 9, ('happy', 'birthday'): 9, ('birthday', 'We'): 1, ('We', 'met'): 2, ('met', 'at'): 1, ('at', 'Ace'): 1, ('Ace', 'hotel'): 1, ('hotel', '<unk>'): 3, ('<unk>', '</s>'): 2046, ('<s>', 'Looking'): 20, ('Looking', 'back'): 1, ('back', 'i'): 1, ('i', 'really'): 6, ('really', 'didnt'): 1, ('didnt', 'even'): 1, ('ev

|<h2>Probabilités des n-grammes</h2> |
| :------------------: |
|
<h3> prob_for_single_word() :</h3> Cette fonction calcule la probabilité d'un mot donné sachant un n-gramme précédent. Elle prend en entrée le mot à évaluer, le n-gramme précédent, les comptes des n-grammes et des n+1-grammes, la taille du vocabulaire, et un paramètre de lissage add-k optionnel. Elle retourne la probabilité du mot donné sachant le n-gramme précédent.<h3> 
prob() : </h3> Cette fonction calcule les probabilités de chaque mot dans le vocabulaire, sachant un n-gramme précédent. Elle prend en entrée le n-gramme précédent, les comptes des n-grammes et des n+1-grammes, le vocabulaire, et un paramètre de lissage add-k optionne). Elle retourne un dictionnaire où chaque mot du vocabulaire est associé à sa probabilité sachant le n-gramme précéden. .|

In [21]:
def prob_for_single_word(word, previous_n_gram, n_gram_counts, nplus1_gram_counts, vocabulary_size, k=0.01) -> 'float':
    # Convertir previous_n_gram en un tuple
    previous_n_gram = tuple(previous_n_gram)

    # Calcul du nombre de previous_n_gram, s'il existe dans notre dictionnaire de fréquences, sinon zéro
    previous_n_gram_count = n_gram_counts[previous_n_gram] if previous_n_gram in n_gram_counts else 0

    # Le dénominateur
    denom = previous_n_gram_count + k * vocabulary_size

    # previous n-gram plus le mot actuel comme un tuple
    nplus1_gram = previous_n_gram + (word,)

    # Calcul du nombre de nplus1_gram, s'il existe dans notre dictionnaire de fréquences, sinon zéro
    nplus1_gram_count = nplus1_gram_counts[nplus1_gram] if nplus1_gram in nplus1_gram_counts else 0

    # Numérateur
    num = nplus1_gram_count + k

    # Fraction finale
    prob = num / denom
    return prob


def probs(previous_n_gram, n_gram_counts, nplus1_gram_counts, vocabulary, k=0.1) -> 'dict':
    # Convertir en tuple
    previous_n_gram = tuple(previous_n_gram)

    # Ajouter les jetons de fin et inconnus au vocabulaire
    vocabulary = vocabulary + ["</s>", "<unk>"]

    # Calcul de la taille du vocabulaire
    vocabulary_size = len(vocabulary)

    # Dictionnaire vide pour les probabilités
    probabilities = {}

    # Itération sur les mots
    for word in vocabulary:
        # Calcul de la probabilité
        probability = prob_for_single_word(word, previous_n_gram,
                                           n_gram_counts, nplus1_gram_counts,
                                           vocabulary_size, k=k)
        # Créer la correspondance : mot -> probabilité
        probabilities[word] = probability

    return probabilities



|<h2>AutoComplétement et Suggestions</h2> |
| :------------------: |
|
<h3> auto_complete() :</h3> Cette fonction génère des suggestions de mots en fonction des tokens précédents, des comptes des n-grammes et des n+1-grammes, du vocabulaire, d'un paramètre de lissage add-k optionnel, du nombre de suggestions souhaitées, et d'un préfixe éventuel pour restreindre les suggestions à celles commençant par ce préfixe. Elle utilise la fonction probs pour calculer les probabilités des mots sachant le n-gramme précédent, puis sélectionne les meilleures suggestions en fonction de ces probabilités.
<h3> get_suggestions() : </h3>Cette fonction génère des suggestions en utilisant plusieurs modèles de langage n-gramme. Elle prend en entrée les tokens précédents, une liste de comptes des n-grammes (probablement de différentes tailles de n), le vocabulaire, les paramètres de lissage add-k et le nombre de suggestions souhaitées, ainsi que le préfixe éventuel. Elle itère sur chaque paire de modèles n-gramme consécutifs pour obtenir des suggestions à partir de chaque modèle, puis retourne une liste de listes de suggestions pour chaque modèle. |

In [22]:
def auto_complete(previous_tokens, n_gram_counts, nplus1_gram_counts, vocabulary, k=0.01, num_suggestions=3, start_with=None):
    # Length of previous words
    n = len(list(n_gram_counts.keys())[0]) 
    
    # Most recent 'n' words
    previous_n_gram = previous_tokens[-n:]
    
    # Calculate probabilty for all words
    probabilities = probs(previous_n_gram, n_gram_counts, nplus1_gram_counts, vocabulary, k=k)

    # Initialize a suggestion heap
    suggestion_heap = []

    # Iterate over all words and probabilities, adding to heap
    # We also add a check if the start_with parameter is provided
    for word, prob in probabilities.items():
        if start_with is not None and not word.startswith(start_with):
            continue
        heapq.heappush(suggestion_heap, (-prob, word))

    # Retrieve the top k suggestions
    suggestions = []
    for _ in range(num_suggestions):
        if suggestion_heap:
            prob, word = heapq.heappop(suggestion_heap)
            suggestions.append((word, -prob))

    return suggestions

def get_suggestions(previous_tokens, n_gram_counts_list, vocabulary, k=0.01, num_suggestions=3, start_with=None):
    # See how many models we have
    count = len(n_gram_counts_list)
    
    # Empty list for suggestions
    suggestions = []
    
    # Loop over counts
    for i in range(count-1):
        # Get n and nplus1 counts
        n_gram_counts = n_gram_counts_list[i]
        nplus1_gram_counts = n_gram_counts_list[i+1]
        
        # Get suggestions 
        suggestion = auto_complete(previous_tokens, n_gram_counts,
                                    nplus1_gram_counts, vocabulary,
                                    k=k, num_suggestions=num_suggestions, start_with=start_with)
        # Append to list
        suggestions.append(suggestion)
        
    return suggestions

# Liste des comptages de n-grammes
n_gram_counts_list = [bigram_counts, trigram_counts]

# Test de génération de suggestions
previous_tokens = ["hey", "how","are", "u"]
suggestions = get_suggestions(previous_tokens, n_gram_counts_list, vocabulary, k=0.01, start_with=None)

print("Suggestions:", suggestions)


Suggestions: [[('<unk>', 0.005104521147301895), ('fucking', 0.00267379679144385), ('tonight', 0.00267379679144385)]]


|<h2>Evaluation du modèle par Perplexité</h2> |
| :------------------: |

In [38]:
def calculate_perplexity(test_data, n_gram_counts_list, vocabulary, k=0.01):
    total_log_prob = 0
    total_words = 0
    for sentence_tokens in test_data:
        previous_tokens = []  # Initialiser previous_tokens à une liste vide
        for i in range(len(sentence_tokens)):
            if i == 0:
                pass
            elif i == 1:
                previous_tokens = [sentence_tokens[i - 1]]
            else:
                previous_tokens = [sentence_tokens[i - 2], sentence_tokens[i - 1]]
            current_token = sentence_tokens[i]
            if current_token in vocabulary:
                prob = prob_for_single_word(current_token, previous_tokens, n_gram_counts_list[-2], n_gram_counts_list[-1], len(vocabulary), k)
                total_log_prob += math.log10(prob)
                total_words += 1
    perplexity = 2 ** (-total_log_prob / total_words)
    return perplexity

perplexity = calculate_perplexity(final_test, n_gram_counts_list, vocabulary, k=0.1)
print("Perplexity:", perplexity)


Perplexity: 10.682989134737873
