In [1]:
############################################
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')

############################################
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import random
import spacy
import keras
import os
import wordcloud
from PIL import Image

import sklearn
from sklearn.feature_extraction.text import TfidfVectorizer , CountVectorizer
from sklearn import preprocessing, manifold, metrics
from sklearn.base import BaseEstimator, TransformerMixin

import nltk

############################################
import seaborn as sns
#%matplotlib inline
#%matplotlib notebook
sns.set()

###########################################
#!pip3 install --upgrade tensorflow-gpu
# Install TF-Hub.
#!pip3 install tensorflow-hub
import tensorflow as tf
import tensorflow_hub as hub

In [2]:
# One-time setup (kept commented out): download the spaCy model that is loaded below.
#!python -m spacy download en_core_web_sm
# Module-level spaCy pipeline shared by the spaCy-based transformers defined in this cell.
nlp = spacy.load("en_core_web_sm")
class ToLowerCase(BaseEstimator, TransformerMixin):
    """Stateless transformer that lower-cases every document.

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` with ``.lower()`` applied to each element."""
        def _lower(tweet):
            return tweet.lower()

        return X.apply(_lower)

class RemoveUserName(BaseEstimator, TransformerMixin):
    """Stateless transformer that removes user mentions, i.e. every token whose text starts with "@".

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` with mention tokens dropped and the remaining token texts joined by single spaces."""
        def _strip_mentions(tweet):
            # Only token texts are read, so tokenising is enough: nlp.make_doc builds the
            # Doc with the tokenizer alone instead of running every pipeline component
            # on each tweet, which yields the same tokens at a fraction of the cost.
            return " ".join(token.text for token in nlp.make_doc(tweet) if not token.text.startswith("@"))

        return X.apply(_strip_mentions)

class Lemmatization(BaseEstimator, TransformerMixin):
    """Stateless transformer that replaces every token by its spaCy lemma.

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` where each document is rebuilt from the space-joined lemmas of its tokens."""
        def _lemmatize(tweet):
            lemmas = [token.lemma_ for token in nlp(tweet)]
            return " ".join(lemmas)

        return X.apply(_lemmatize)

class Stemmatization(BaseEstimator, TransformerMixin):
    """Stateless transformer that replaces every token by its Porter stem.

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` where each document is rebuilt from the space-joined stems of its tokens."""
        stemmer = nltk.PorterStemmer()

        def _stem(tweet):
            # The stemmer only needs token texts, so the tokenizer-only nlp.make_doc is
            # used instead of running the complete spaCy pipeline on every tweet.
            return " ".join(stemmer.stem(token.text) for token in nlp.make_doc(tweet))

        return X.apply(_stem)

class StopWord(BaseEstimator, TransformerMixin):
    """Stateless transformer that removes spaCy stop words and a few extra noise tokens.

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    # Extra tokens to discard (emoticon fragments and mis-encoded character sequences).
    # Stored once as a frozenset: constant-time membership tests, no per-call rebuild.
    _EXTRA_STOP_TOKENS = frozenset([
        ":|", ":", "|", "ã", "½", "¿", "", "iãâ¯ãâ¿ãâ½in", "¯", "canãâ¯ãâ¿ãâ½t", "=/", ":-p", ":p", "-p", "/", "=",
        "ãâ¯ãâ¿ãâ½i", "thãâ¯ãâ¿ãâ½n", "khãâ¯ãâ¿ãâ½m", "ãâ¯ãâ¿ãâ½", "â",
    ])

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` with stop words and extra noise tokens dropped, remaining tokens joined by spaces."""
        extra = self._EXTRA_STOP_TOKENS

        def _filter(tweet):
            # is_stop is a lexical (context-independent) flag, so the tokenizer-only
            # nlp.make_doc is sufficient; the full pipeline is not needed here.
            return " ".join(
                token.text
                for token in nlp.make_doc(tweet)
                if not token.is_stop and token.text not in extra
            )

        return X.apply(_filter)
    
class Ponctuation(BaseEstimator, TransformerMixin):
    """Stateless transformer that removes punctuation tokens.

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` with every token flagged ``is_punct`` dropped, remaining tokens joined by spaces."""
        def _drop_punctuation(tweet):
            # is_punct is a lexical flag available right after tokenisation, so the
            # tokenizer-only nlp.make_doc replaces the costly full-pipeline call.
            return " ".join(token.text for token in nlp.make_doc(tweet) if not token.is_punct)

        return X.apply(_drop_punctuation)
    
class RemoveSpace(BaseEstimator, TransformerMixin):
    """Stateless transformer that removes whitespace tokens (e.g. runs of extra spaces).

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` with every token flagged ``is_space`` dropped, remaining tokens joined by spaces."""
        def _drop_whitespace(tweet):
            # is_space is a lexical flag available right after tokenisation, so the
            # tokenizer-only nlp.make_doc replaces the costly full-pipeline call.
            return " ".join(token.text for token in nlp.make_doc(tweet) if not token.is_space)

        return X.apply(_drop_whitespace)
    
class RemoveURL(BaseEstimator, TransformerMixin):
    """Stateless transformer that removes tokens that look like URLs.

    The input must expose ``.apply`` over strings (e.g. a pandas Series of tweets).
    """

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` with every token flagged ``like_url`` dropped, remaining tokens joined by spaces."""
        def _drop_urls(tweet):
            # like_url is a lexical flag available right after tokenisation, so the
            # tokenizer-only nlp.make_doc replaces the costly full-pipeline call.
            return " ".join(token.text for token in nlp.make_doc(tweet) if not token.like_url)

        return X.apply(_drop_urls)

def document_encoding_algo ( datas, col , model_encoding = "USE",n_gram=(1,1), min_df = 0.001 , max_df = 1. , vocabulaire=None, get_vocabulary=False) :
    """Encode the text column ``col`` of ``datas`` as numeric features.

    Parameters
    ----------
    datas : pandas.DataFrame
        Frame holding the documents; it is only read, never modified.
    col : str
        Name of the column containing the raw text.
    model_encoding : {"USE", "TFIDF", "countvectorizer"}
        "USE" loads the Universal Sentence Encoder from TF-Hub and embeds the
        documents one by one; the two other values fit the corresponding
        scikit-learn vectorizer on the column.
    n_gram, min_df, max_df, vocabulaire :
        Forwarded to the scikit-learn vectorizer as ``ngram_range``, ``min_df``,
        ``max_df`` and ``vocabulary`` (unused for "USE").
    get_vocabulary : bool
        For the two vectorizer encodings, return the fitted ``vocabulary_``
        mapping instead of the feature matrix.

    Returns
    -------
    pandas.DataFrame or dict
        * "USE": one row per document, 512 columns, indexed by the *stringified*
          index labels of ``datas``.
        * "TFIDF" / "countvectorizer": dense feature matrix with one column per
          vocabulary term and a fresh positional index, or the vocabulary dict
          when ``get_vocabulary`` is true.

    Raises
    ------
    ValueError
        If ``model_encoding`` is not one of the supported names.
    """
    if model_encoding not in ("USE", "TFIDF", "countvectorizer") :
        raise ValueError(f"Le modèle d'encodage \"{model_encoding}\" n'est pas disponible")

    # The column is only read, so the former full copy of the frame is not needed.
    documents = datas[col]

    if model_encoding == "USE" :
        module_url = "https://tfhub.dev/google/universal-sentence-encoder/4"
        model_use = hub.load(module_url)

        # Iterate over (label, text) pairs instead of chained data[col][k] look-ups.
        return pd.DataFrame.from_dict(
            { f"{k}": np.array(model_use( [text] )).reshape(512,) for k , text in documents.items() } ,
            orient='index' )

    if model_encoding == "TFIDF" :
        vectorizer = TfidfVectorizer( ngram_range=n_gram, min_df = min_df , max_df = max_df, vocabulary = vocabulaire )
    else :
        vectorizer = CountVectorizer( ngram_range=n_gram, min_df = min_df , max_df = max_df, vocabulary = vocabulaire )

    sparse_matrix = vectorizer.fit_transform(documents)

    if get_vocabulary :
        # No need to densify the matrix when only the vocabulary is requested.
        return vectorizer.vocabulary_

    # get_feature_names() was removed from recent scikit-learn releases in favour of
    # get_feature_names_out(); fall back to the old name on older installations.
    feature_names = getattr(vectorizer, "get_feature_names_out", None)
    if feature_names is None :
        feature_names = vectorizer.get_feature_names

    return pd.DataFrame( sparse_matrix.toarray() , columns = feature_names() )

def TSNE_plot (data , clustering_model , method='exact' , transp = 0.4, perplx = 10) :
    """Project ``data`` to 3 dimensions with t-SNE and draw the XY, XZ and YZ views coloured by cluster.

    Parameters
    ----------
    data : array-like
        Feature matrix handed to ``sklearn.manifold.TSNE.fit``.
    clustering_model : object
        Fitted clustering model exposing ``labels_`` (one label per row of ``data``).
    method : str
        Gradient computation method forwarded to ``TSNE``.
    transp : float
        Marker transparency (``alpha``).
    perplx : float
        t-SNE perplexity.

    Notes
    -----
    One colour is drawn at random (without replacement) for each distinct label
    from a palette of 11 colours, so at most 11 clusters are supported.
    """
    ########## COLOURS
    palette = [ "red", "orange", "green", "blue", "blueviolet","black","brown" ,"navy" ,"purple", "magenta", "gold"]

    # Map every label to its rank among the distinct labels so that noise labels (-1)
    # or non-contiguous label values still pick a valid, distinct colour.
    labl_hc = np.asarray(clustering_model.labels_)
    distinct_labels , label_codes = np.unique( labl_hc , return_inverse = True )
    couleurs = np.random.choice( palette , distinct_labels.size , replace = False )
    couleur_hc = pd.Series( couleurs[ label_codes.reshape(-1) ] )

    tsne_hc = sklearn.manifold.TSNE(n_components = 3, perplexity = perplx, n_iter = 1500, n_iter_without_progress = 200, init = 'pca',
                                    n_jobs= os.cpu_count() , method= method)
    tsne_hc.fit( data )
    embedding = tsne_hc.embedding_

    fig , axes = plt.subplots( 1, 3 ,figsize = (22, 7) )
    views = [ ("XY", 0, 1), ("XZ", 0, 2), ("YZ", 1, 2) ]
    for ax , (view_name , first , second) in zip(axes , views) :
        # x/y are passed by keyword: recent seaborn releases reject positional data vectors.
        sns.scatterplot(x = embedding[:,first], y = embedding[:,second],  c = couleur_hc, alpha=transp, s=120, ax=ax )
        # Title the panel itself; plt.title would only affect the last created axes.
        ax.set_title(view_name , size = 15)

def plot_confusion_matrix(Y_true , Y_predict, title="Matrice de confusion", cmap="hot_r", figsize = (12,8), tic_rot= (0,0)):
    """Draw the confusion matrix of ``Y_predict`` against ``Y_true`` as an annotated heat map.

    Parameters
    ----------
    Y_true, Y_predict : array-like
        True and predicted labels, one entry per observation.
    title : str
        Figure title.
    cmap : str
        Colour map passed to ``seaborn.heatmap``.
    figsize : tuple
        Figure size; font sizes are derived from its first component.
    tic_rot : tuple
        Rotation (in degrees) of the x and y tick labels, in that order.
    """
    with warnings.catch_warnings():
        # A single "ignore" filter is enough to silence warnings inside this block.
        warnings.simplefilter('ignore')
        Y_true , Y_predict = np.asarray(Y_true), np.asarray(Y_predict)
        df_confusion = pd.crosstab(Y_true, Y_predict, rownames=['True Labels\n'], colnames=['\nPredicted Labels'])
        plt.figure(figsize=figsize)
        # fmt="d": the cells are integer counts; the default format would print
        # values of 100 and above in scientific notation (e.g. 1.5e+02).
        sns.heatmap(df_confusion, cmap=cmap, annot =True, fmt="d")
        plt.title(title,size =3*figsize[0])
        # Heat-map cells are centred on half-integers, hence the 0.5 offset.
        tick_marks = np.arange(len(df_confusion.columns)) +0.5
        plt.xticks(tick_marks, df_confusion.columns, rotation=tic_rot[0], size=1.5*figsize[0])
        plt.yticks(tick_marks, df_confusion.index, rotation=tic_rot[1], size = 1.5*figsize[0])
        plt.ylabel(df_confusion.index.name, size = 2*figsize[0] )
        plt.xlabel(df_confusion.columns.name, size = 2*figsize[0])
        plt.show()

def correspondance_entre_clusters ( True_labels , predict_labels ) :
    """Name every cluster after the most frequent true label found inside it.

    Parameters
    ----------
    True_labels : array-like
        Ground-truth label of each observation.
    predict_labels : array-like
        Cluster id assigned to each observation (same length as ``True_labels``).

    Returns
    -------
    numpy.ndarray of str
        One entry per observation, formatted as ``"<majority label> => Cluster <id>"``.
        When several true labels are tied, the first one in sorted order is used.
    """
    true_arr = np.asarray(True_labels)
    pred_arr = np.asarray(predict_labels)

    # Object dtype while filling: a fixed-width unicode array would silently truncate
    # any name longer than its initial width.
    clustering_label = np.empty(pred_arr.shape, dtype=object)
    for clus_class in np.unique( pred_arr ) :
        members = pred_arr == clus_class
        lbl , count = np.unique( true_arr[members] , return_counts = True )
        clustering_label[members] = f"{lbl[np.argmax(count)]} => Cluster {clus_class}"

    # Convert back to a unicode array sized for the longest generated name.
    return clustering_label.astype(str)

def get_train_test_index( data , label = "label", nombre=(200,200,200), seed = None ) :
    """Draw disjoint, class-balanced index samples (e.g. train / validation / test).

    For each entry ``n`` of ``nombre``, ``n`` index labels are sampled without
    replacement from every category of ``data[label]``; rows already drawn are
    excluded from the following samples.

    Parameters
    ----------
    data : pandas.DataFrame
        Source frame; it is not modified.
    label : str
        Name of the column holding the category of each row.
    nombre : sequence of int
        Number of rows to draw per category for each successive sample.
    seed : int or None
        When given, a dedicated ``random.Random(seed)`` generator is used so the
        split is reproducible; when ``None`` the global ``random`` state is used.

    Returns
    -------
    list of list
        One sorted list of index labels per entry of ``nombre``.

    Raises
    ------
    ValueError
        Raised by ``sample`` when a category has fewer remaining rows than requested.
    """
    rng = random if seed is None else random.Random(seed)

    # Only the label column is needed to pick indexes; work on it instead of copying the frame.
    remaining = data[label]

    idx = []
    for size in nombre :
        drawn = []
        for cat in remaining.unique() :
            candidates = sorted( remaining.index[ remaining == cat ].values )
            drawn.extend( rng.sample( candidates , size ) )
        # Exclude the rows just drawn so the next sample cannot reuse them.
        remaining = remaining.drop(index=drawn)
        idx.append( sorted(drawn) )

    return idx

def prediction_function_threshold ( model=None , X=None, Y_proba=None , seuil = 0.5 ) :
    """
    Turn positive-class probabilities into binary predictions using a threshold.

    Parameters :
    ------------
        model : machine-learning model exposing ``predict_proba``, optional.
        X : pandas.core.frame.DataFrame, optional
            Data given to ``model.predict_proba``. As soon as ``model`` or ``X`` is
            supplied, the probabilities are computed from the model and ``Y_proba``
            is ignored.
        Y_proba : array-like, optional
            Probability of the positive label for each observation. Used only when
            neither ``model`` nor ``X`` is supplied.
        seuil : float
            Probability threshold; an observation is predicted positive when its
            probability is strictly greater than ``seuil``. Default 0.5.

    Return : numpy.ndarray of int (0/1)
        The predictions, or ``None`` (after printing a message) when ``model`` has
        no ``predict_proba`` method.

    Raises :
    ------------
        TypeError when no model, no data and no probabilities are supplied.
    """
    if model is None and X is None :
        if Y_proba is None :
            raise TypeError("prediction_function_threshold requires either (model, X) or Y_proba")
        # np.asarray lets plain lists be compared with the threshold element-wise.
        return ( np.asarray(Y_proba) > seuil ).astype(int)

    # Only the "method is missing" case is reported with the message below; any
    # other failure of predict_proba propagates instead of being hidden behind it.
    predict_proba = getattr(model, "predict_proba", None)
    if predict_proba is None :
        print("Le modèle que vous avez fourni ne possède pas de méthode 'predict_proba()'")
        return None

    return ( np.asarray(predict_proba(X))[:,1] > seuil ).astype(int)
        
#  Definition of the best metric (business cost)
def my_cost( y , y_pred , poids = 4  , seuil = np.linspace(0.008,0.999,100 ) , scorer = False ) :
    """
    Evaluate the business cost of a binary classifier.

    For a threshold ``s`` the cost is
    ``rate(false positives) + poids * rate(false negatives)``, where an observation
    is predicted positive when its probability is strictly greater than ``s`` and
    both rates are taken over all observations.

    Parameters :
    ------------
        y : array-like (list, ndarray, Series or single-column DataFrame)
            True 0/1 label of each observation.
        y_pred : array-like
            Positive-class probability (or hard prediction) of each observation.
            Values are matched with ``y`` by position, never by index label.
        poids : float
            Weight of a false negative relative to a false positive.
        seuil : scalar or array-like
            One threshold, or the thresholds at which the cost is evaluated.
        scorer : bool
            With several thresholds, return only the minimal cost.

    Return :
    ------------
        * scalar ``seuil`` : the cost at that threshold (float).
        * several thresholds and ``scorer`` true : the minimal cost (float).
        * otherwise a dict
          { "cout" : list of costs, one per threshold , "cout_min" : minimal cost ,
            "seuil_min" : array of the threshold(s) reaching the minimal cost }

    Raises :
    ------------
        ValueError when ``y`` and ``y_pred`` do not hold the same number of values.
    """
    # Flatten to plain arrays: this handles DataFrame/Series/list inputs uniformly and
    # avoids pandas aligning two Series on their index labels.
    y_true = np.asarray(y).reshape(-1)
    y_prob = np.asarray(y_pred).reshape(-1)
    if y_true.shape != y_prob.shape :
        raise ValueError("y and y_pred must contain the same number of observations")

    is_negative = y_true == 0
    is_positive = y_true == 1

    def _cost_at( threshold ) :
        """Business cost for one threshold."""
        predicted_positive = y_prob > threshold
        false_positive_rate = np.mean( is_negative & predicted_positive )
        false_negative_rate = np.mean( is_positive & ~predicted_positive )
        return false_positive_rate + poids*false_negative_rate

    # Any scalar (Python or numpy, int or float) is treated as a single threshold.
    if np.ndim(seuil) == 0 :
        return _cost_at(seuil)

    thresholds = np.asarray(seuil)
    costs = np.array([ _cost_at(s) for s in thresholds ])
    if scorer :
        return costs.min()

    cout_min = costs.min()
    return { "cout" : list(costs) , "cout_min" : cout_min , "seuil_min" : thresholds[costs == cout_min] }
        
def print_scores(model = None , X_test=None , Y_true=None , Y_proba = None  , line_width = 6 , seuil = np.linspace( 0 , 0.9 , 90 ) ,
                 plot_kind = "apr", give_results = False , show_graph = True, poids = None, fig_sz = (12,8)) :
    """Compute, and optionally plot, classification scores as a function of the decision threshold.

    Parameters
    ----------
    model, X_test : optional
        When both are given and ``Y_proba`` is not, the positive-class probabilities
        are computed with ``model.predict_proba(X_test)[:, 1]``.
    Y_true : array-like
        True binary labels.
    Y_proba : array-like, optional
        Positive-class probabilities.
    line_width : float
        Width of the plotted curves.
    seuil : array-like
        Thresholds at which every score is evaluated.
    plot_kind : str
        Any combination of the letters "b" (F-beta for beta = 0.7 and 2.0),
        "a" (accuracy), "r" (recall), "p" (precision), "h" (Hamming loss) and
        "c" (business cost from ``my_cost``).
    give_results : bool
        Return the scores as a DataFrame indexed by threshold.
    show_graph : bool
        Draw the curves.
    poids : float, optional
        False-negative weight forwarded to ``my_cost``; when ``None`` the default
        weight of ``my_cost`` is used.
    fig_sz : tuple
        Figure size; font sizes are derived from its first component.

    Returns
    -------
    pandas.DataFrame or None
        The scores indexed by "seuil" when ``give_results`` is true, else ``None``.
        ``None`` is also returned, after printing a message, when ``model`` has no
        ``predict_proba`` method.
    """
    scores = {}
    beta = np.linspace( 0.7 , 2. , 2 )
    if ( model is not None ) and ( X_test is not None ) and ( Y_proba is None ) :
        # Only the "method is missing" case is reported with the message below; any
        # other failure of predict_proba propagates instead of being hidden behind it.
        predict_proba = getattr(model, "predict_proba", None)
        if predict_proba is None :
            print("Le modèle que vous avez fourni ne possède pas de méthode 'predict_proba'")
            return
        Y_proba = predict_proba(X_test)[:,1]

    def _curve( metric , **metric_kwargs ) :
        """Values of ``metric`` for the predictions obtained at every threshold."""
        return [ metric(Y_true , prediction_function_threshold(Y_proba = Y_proba ,seuil = s) , **metric_kwargs ) for s in seuil ]

    if "b" in plot_kind :
        # One F-beta curve per beta coefficient.
        for b in beta :
            scores[f"beta = {b}"] = _curve( metrics.fbeta_score , beta = b )
    if "a" in plot_kind :
        scores[f"Accuracy"] = _curve( metrics.accuracy_score )        #  accuracy vs threshold
    if "r" in plot_kind :
        scores[f"Recall"] = _curve( metrics.recall_score )            #  recall vs threshold
    if "p" in plot_kind :
        scores[f"Precision"] = _curve( metrics.precision_score )      #  precision vs threshold
    if "h" in plot_kind :
        scores[f"Hamming Loss"] = _curve( metrics.hamming_loss )      #  Hamming loss vs threshold
    if "c" in plot_kind :
        # Forward poids only when it was supplied: passing None would break the cost formula.
        cost_kwargs = {} if poids is None else {"poids" : poids}
        scores["Fonction Coût"] = my_cost(Y_true , Y_proba , seuil = seuil ,scorer=False, **cost_kwargs)["cout"]     #  business cost vs threshold

    if show_graph :
        #  Draw the figure
        plt.figure(figsize = fig_sz)
        plt.title("\nF_beta-accuracy-Précision-Recall VS seuil" , size=2*fig_sz[0])
        plt.xlabel(" Seuil de probabilité" , size= 1.5*fig_sz[0])
        plt.ylabel("SCORE" , size = 1.5*fig_sz[0] )

        for label , y  in scores.items()  :
            plt.plot(seuil , y , lw = line_width ,ls = np.random.choice(["dashed","dotted", "dashdot", "solid"]), label = f"{label}")

        plt.legend(loc="best" , fontsize="xx-large")
        plt.show()

    if give_results :
        # The threshold column is added here, not inside the plotting branch, so the
        # results are also available when show_graph is False.
        scores["seuil"] = seuil
        return pd.DataFrame(scores ).set_index("seuil")

def World_cloud_show( data , fig_size = (8,5), max_wd = 150, do_mask = True, horizontal= .85, min_font = 5, font_step= 2, mask_path = "cloud.jpg" ) :
    """Display a word cloud built from the documents of ``data``.

    Parameters
    ----------
    data : iterable of str
        Documents; they are joined with spaces before the cloud is generated.
    fig_size : tuple
        Figure size in inches; the cloud canvas is 100 pixels per unit.
    max_wd : int
        Maximum number of words drawn.
    do_mask : bool
        Shape the cloud with the image stored at ``mask_path``.
    horizontal : float
        Forwarded to ``WordCloud`` as ``prefer_horizontal``.
    min_font, font_step : int
        Forwarded to ``WordCloud`` as ``min_font_size`` and ``font_step``.
    mask_path : str
        Path of the mask image, read only when ``do_mask`` is true.
    """
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')

        mask = None
        if do_mask :
            # Context manager so the image file handle is released once the pixels are copied.
            with Image.open(mask_path) as mask_image :
                mask = np.array(mask_image)
            # Every pixel value above 1 is set to 255 before the array is used as mask.
            mask[mask > 1] = 255

        # Single construction for both cases: mask=None is the "no mask" setting.
        x = wordcloud.WordCloud(width = fig_size[0]*100, height = fig_size[1]*100, background_color ='white', colormap="plasma", max_words=max_wd,
                                repeat = False, min_font_size=min_font, font_step=font_step, prefer_horizontal = horizontal, mask = mask,
                                relative_scaling =0, collocations =False).generate(" ".join( data))

        # plot the WordCloud image
        plt.figure(figsize = fig_size, facecolor = None)
        plt.imshow(x)
        plt.axis("off")
        plt.tight_layout(pad = 0)
        plt.show()