## Jean-Eudes Rouffiac  

&nbsp;

&nbsp;

# <center>  TP calcul distribué : classification de sentiments</center>

&nbsp;

<p style="text-align:justify;"> L'objectif de ce projet est d'entraîner un algorithme de classification de sentiments à l'aide d'un Framework distribué. Nous utiliserons pour ce projet la librairie PySpark. 
Nous utiliserons pour ce faire les fichiers suivants : </p>

* train.json contenant le dataset d'entraînement
* test.json contenant le dataset de test
* noclass.json sur lequel il faudra effectuer les prédictions

&nbsp;

<p style="text-align:justify;">Dans un premier temps, on importe les packages et fonctions qui vont être utiles pour réaliser la classification. </p>

&nbsp;

In [1]:
import findspark


from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, StopWordsRemover
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as fn



from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

&nbsp;

<p style="text-align:justify;">On instancie alors un objet SparkContext qui permet de gérer les propriétés globales de l'application, tel que le niveau de parallélisation par défaut. Le SparkContext permet également de charger les données en provenance de diverses sources de données.</p>

&nbsp;

In [2]:
sc =SparkContext()
sqlContext = SQLContext(sc)

&nbsp;

<p style="text-align:justify;">Nous avons ici des fichiers .json, on les charge alors dans deux variables df_train et df_test.</p>

&nbsp;

In [3]:
spark = SparkSession.builder.appName('ops').getOrCreate()
df_train = spark.read.json("train.json")
df_test = spark.read.json("test.json")

&nbsp;

<p style="text-align:justify;">df_train et df_test sont deux dataframes. Ces dataframes de sqlContext sont l'essence même des distributions dans spark. Toutes les opérations (transformations etc) sur ces dataframes seront distribuées.
On peut alors afficher les 5 premières lignes du dataset d'entraînement afin de voir à quoi ressemblent les données.</p>

&nbsp;

In [57]:
df_train.show(5)

+--------------------+--------+
|             message|polarity|
+--------------------+--------+
|! Comment était l...|       0|
|! d'accord! Va-t-...|       0|
|!!! Taihen desu n...|       0|
|!!!! Auto-dj .. c...|       0|
|!!!! Ce n'est que...|       0|
+--------------------+--------+
only showing top 5 rows



&nbsp;

<p style="text-align:justify;">On peut voir que les datasets sont composées de deux colonnes : une colonne contenant le message, et une colonne indiquant si le message est négatif (0) ou positif (4).</p>

&nbsp;

<p style="text-align:justify;">Affichons maintenant le type des deux colonnes.</p>

&nbsp;

In [30]:
df_train.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)



&nbsp;

<p style="text-align:justify;">On remarque que la colonne "polarity" est de type string alors qu'elle prend pour valeur soit 0 soit 5. On doit donc la convertir en entier, pour les deux datasets.</p>

&nbsp;

In [31]:
df_train = df_train.withColumn("polarity", df_train["polarity"].cast('int'))
df_test = df_test.withColumn("polarity", df_test["polarity"].cast('int'))
df_train.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: integer (nullable = true)



&nbsp;

<p style="text-align:justify;">Affichons maintenant le nombre de messages positifs et le nombre de messages négatifs dans le dataset d'entraînement.</p>

&nbsp;

In [32]:
df_train.groupBy("polarity") \
    .count() \
    .orderBy(fn.col("count").desc()) \
    .show()

+--------+-----+
|polarity|count|
+--------+-----+
|       4|66926|
|       0|61475|
+--------+-----+



<p style="text-align:justify;">On remarque tout d'abord que le dataset est composé d'un très grand nombre de données et qu'en plus les deux classes sont présentes en grande quantité (+60k pour les deux).</p>

&nbsp;

## Pipeline

&nbsp;
<p style="text-align:justify;">Nous allons maintenant passer à la partie classification. Nous utiliserons un outil très pratique qui est les pipeline. Cela permet de moduler facilement les différentes étapes du code. On défini plusieurs étapes en indiquant la colonne d'entrée et la colonne de sortie, puis on définit la pipeline en indiquant les différentes étapes. Il suffira alors de "fit" la pipeline puis de transformer les jeux de données pour obtenir les prédictions.</p>

&nbsp;

<p style="text-align:justify;">Dans la partie de code suivante, nous importerons les fonctions nécessaires pour traiter le problème et nous définierons les étapes qui serviront à notre pipeline. Nous utiliserons les étapes suivantes (nous ferons plusieurs pipelines avec des algorithmes et des words embedding différents afin de sélectionner le word embedding et la méthode qui permet d'obtenir les meilleurs résultats) : </p>

* Pour la partie "nettoyage" des données :
    * regexTokenizer : permet de spliter le texte de la colonne 'message'.
    * StopWordsRemover : permet de supprimer les stopwords. Malheureusement cela n'existe que pour les mots anglais... Nous ajoutons alors à la liste des stop words des mots ou caractères repérés dans la colonne message qui apporte du bruit.
* Pour la partie word embedding : 
    * CountVectors : transforme la colonne obtenue après avoir enlever les stop words en une nouvelle colonne ou apparaîtra pour chaque message la taille du vocabulaire, ainsi que pour chaque mot du vocabulaire total (on prend tous les messages du dataset), le nombre de fois que le mot est employé. Cela permet de transformer le message en vecteur qui sera interprétable par un algorithme. Par exemple si le message contient plusieurs fois les mots "ne" et "pas", alors il aura plus de chance d'être interprêté comme un message négatif.
    * HashingTF and IDF qui permettent de faire un TF-IDF.
    * Word2Vec qui permet de convertir chaque mot en un vecteur de taille n (à définir). Ainsi il y a maintenant des notions de distance entre les mots.
* Pour la partie modèle :
    * logisticRegression
    * random forest
    * naive bayes
    

<p style="text-align:justify;">Nous allons alors enchaîner les pipelines afin de garder celui qui a le meilleur résultat en classification sur les données test.</p>

&nbsp;

In [4]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

regexTokenizer = RegexTokenizer(inputCol="message", outputCol="words", pattern="\\W")
add_stopwords = ["http","https","amp","rt","t","c","the","null","&","Gt","Lt","gt","lt","Quot", "quot","-","*",".","!","?"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
w2v = Word2Vec(inputCol= 'filtered', outputCol= 'features', vectorSize= 100)
label_stringIdx = StringIndexer(inputCol = "polarity", outputCol = "label")
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

&nbsp;

## Classification

&nbsp;

<p style="text-align:justify;">Dans un premier temps, nous allons utiliser comme vectorisation du texte, la méthode CountVectorizer. Nous regarderons alors les résultats pour les différents algorithmes de classification.</p>

&nbsp;

In [18]:
predictions = {}
models = [lr, rf, nb]
name_models = ['logistic_regression', 'random_forest', 'naive_bayes']
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
for i, model in enumerate(models):
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx, model])
    mod = pipeline.fit(df_train)
    prediction_train = mod.transform(df_train)
    prediction_test = mod.transform(df_test)
    predictions['prediction_train_%s_%s'%(name_models[i],'CountVectorizer')] = prediction_train
    predictions['prediction_test_%s_%s'%(name_models[i],'CountVectorizer')] = prediction_test
    print("\nNom du modèle utilisé : %s" %(name_models[i]))
    print("Score sur les données d'entraînement : %.3f" %(evaluator.evaluate(prediction_train)))
    print("Score sur les données test : %.3f \n" %(evaluator.evaluate(prediction_test)))



Nom du modèle utilisé : logistic_regression
Score sur les données d'entraînement : 0.802
Score sur les données test : 0.769 


Nom du modèle utilisé : random_forest
Score sur les données d'entraînement : 0.528
Score sur les données test : 0.523 


Nom du modèle utilisé : naive_bayes
Score sur les données d'entraînement : 0.790
Score sur les données test : 0.768 



&nbsp;

<p style="text-align:justify;">Faisons la même chose avec comme embedding TF-IDF.</p>

&nbsp;

In [11]:
models = [lr, rf, nb]
name_models = ['logistic_regression', 'random_forest', 'naive_bayes']
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
for i, model in enumerate(models):
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx, model])
    mod = pipeline.fit(df_train)
    prediction_train = mod.transform(df_train)
    prediction_test = mod.transform(df_test)
    predictions['prediction_train_%s_%s'%(name_models[i],'tf_idf')] = prediction_train
    predictions['prediction_test_%s_%s'%(name_models[i],'tf_idf')] = prediction_test
    print("\nNom du modèle utilisé : %s" %(name_models[i]))
    print("Score sur les données d'entraînement : %.3f" %(evaluator.evaluate(prediction_train)))
    print("Score sur les données test : %.3f \n" %(evaluator.evaluate(prediction_test)))


Nom du modèle utilisé : logistic_regression
Score sur les données d'entraînement : 0.785
Score sur les données test : 0.751 


Nom du modèle utilisé : random_forest
Score sur les données d'entraînement : 0.556
Score sur les données test : 0.551 


Nom du modèle utilisé : naive_bayes
Score sur les données d'entraînement : 0.772
Score sur les données test : 0.736 



&nbsp;

<p style="text-align:justify;">Et enfin avec Word2vec. Nous ne pouvons pas utiliser naive bayes car les words embedding obtenus avec word2vec peuvent avoir des valeurs négatives.</p>

&nbsp;

In [13]:
models = [lr, rf]
name_models = ['logistic_regression', 'random_forest']
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
for i, model in enumerate(models):
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, w2v, label_stringIdx, model])
    mod = pipeline.fit(df_train)
    prediction_train = mod.transform(df_train)
    prediction_test = mod.transform(df_test)
    predictions['prediction_train_%s_%s'%(name_models[i],'w2v')] = prediction_train
    predictions['prediction_test_%s_%s'%(name_models[i],'w2v')] = prediction_test
    print("\nNom du modèle utilisé : %s" %(name_models[i]))
    print("Score sur les données d'entraînement : %.3f" %(evaluator.evaluate(prediction_train)))
    print("Score sur les données test : %.3f \n" %(evaluator.evaluate(prediction_test)))


Nom du modèle utilisé : logistic_regression
Score sur les données d'entraînement : 0.687
Score sur les données test : 0.684 


Nom du modèle utilisé : random_forest
Score sur les données d'entraînement : 0.663
Score sur les données test : 0.655 



&nbsp;

<p style="text-align:justify;">Les meilleurs scores possibles sont obtenus avec la vectorisation CountVectorizer pour les méthodes regression logistique et naive bayes (respectivement 0.769 et 0.768). On peut alors afficher les messages du jeu de données test avec le vrai label ainsi que la prédiction.</p>

&nbsp;

In [19]:
predictions['prediction_test_logistic_regression_CountVectorizer"'] \
    .select("message","label","prediction") \
    .show(n = 10, truncate = 30)

+------------------------------+-----+----------+
|                       message|label|prediction|
+------------------------------+-----+----------+
|! Et si affamé mais pas de ...|  1.0|       1.0|
|! Identica présente actuell...|  1.0|       0.0|
|! Je vais enfin m'endormir ...|  1.0|       1.0|
|!?!? C'est un jour que j'ai...|  1.0|       1.0|
|"Easy" qu Le plancher en bo...|  1.0|       0.0|
|"Empire du soleil" L'auteur...|  1.0|       1.0|
|"Heart" quot; N'est pas un ...|  1.0|       1.0|
|"I need" & quot; -? V1-1333...|  1.0|       1.0|
|"Ils ont trouvé sonny? & Qu...|  1.0|       0.0|
|"Je vérifie mon twitter cha...|  1.0|       1.0|
+------------------------------+-----+----------+
only showing top 10 rows



&nbsp;

## Cross validation

&nbsp;

<p style="text-align:justify;">Afin d'améliorer encore le score en prédiction, nous pouvons utiliser la cross validation. Pour ce faire, nous utiliserons les fonctions ParamGridBuilder et CrossValidator de pyspark. Nous ferons une grid search seulement pour la regression logistique et la classification naïve bayésienne, pour un embedding via count vectorizer. </p>

&nbsp;

In [50]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [88]:
import numpy as np

lr = LogisticRegression()
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx, lr])
vectC = np.logspace(-3, -2, 20)
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, vectC) 
             .addGrid(lr.elasticNetParam, [0.1, 0.15]) 
             .build())

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=10)
cvModel_lr = cv.fit(df_train)

predictions_test = cvModel_lr.transform(df_test)
predictions_train = cvModel_lr.transform(df_train)
print("\nNom du modèle utilisé : logistic regression")
print("Score sur les données d'entraînement : %.3f" %(evaluator.evaluate(predictions_train)))
print("Score sur les données test : %.3f \n" %(evaluator.evaluate(predictions_test)))


Nom du modèle utilisé : logistic regression
Score sur les données d'entraînement : 0.809
Score sur les données test : 0.778 



In [51]:
import numpy as np

nb = NaiveBayes(modelType="multinomial")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx, nb])
paramGrid = (ParamGridBuilder()
             .addGrid(nb.smoothing, [0,1,2,5,10]) 
             .build())

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=10)
cvModel_nb = cv.fit(df_train)

predictions_test = cvModel_nb.transform(df_test)
predictions_train = cvModel_nb.transform(df_train)
print("\nNom du modèle utilisé : naive bayes")
print("Score sur les données d'entraînement : %.3f" %(evaluator.evaluate(predictions_train)))
print("Score sur les données test : %.3f" %(evaluator.evaluate(predictions_test)))


Nom du modèle utilisé : naive bayes
Score sur les données d'entraînement : 0.788
Score sur les données test : 0.769


&nbsp;

<p style="text-align:justify;">On a alors pu améliorer le score sur les données test en passant de 0.769 à 0.778 avec la regression logistiques. On peut alors afficher les paramètres optimaux. </p>

&nbsp;

In [89]:
bestModel = cvModel_lr.bestModel
param_dict = bestModel.stages[-1].extractParamMap()

sane_dict = {}
for k, v in param_dict.items():
    sane_dict[k.name] = v

print('Best regParam : %.3f' %sane_dict["regParam"])
print('\nBest elastic_net: %.3f' %sane_dict["elasticNetParam"])


Best regParam : 0.009

Best elastic_net: 0.100


&nbsp;

## Classification du fichier noclass.json

&nbsp;

&nbsp;

<p style="text-align:justify;">On peut alors prédire le sentiment d'un nouveau message. On prédit donc le sentiment des messages du fichier noclass.json. </p>

&nbsp;

In [31]:
df_noclass = spark.read.json("noclass.json")
predictions_noclass = cvModel_lr.transform(df_test).select("message","prediction")
predictions_noclass = predictions_noclass.withColumn("prediction", predictions_noclass["prediction"].cast('string'))
predictions_noclass.show()

+--------------------+----------+
|             message|prediction|
+--------------------+----------+
|! Et si affamé ma...|       1.0|
|! Identica présen...|       0.0|
|! Je vais enfin m...|       1.0|
|!?!? C'est un jou...|       1.0|
|"Easy" qu Le plan...|       0.0|
|"Empire du soleil...|       1.0|
|"Heart" quot; N'e...|       1.0|
|"I need" & quot; ...|       1.0|
|"Ils ont trouvé s...|       0.0|
|"Je vérifie mon t...|       0.0|
|"Recevoir un préa...|       0.0|
|"Statut sélectif ...|       1.0|
|"Sur la musique p...|       1.0|
|"The dating exper...|       0.0|
|& Amp; Tout le re...|       1.0|
|& Gt; Le frère es...|       1.0|
|& Gt; Que cela im...|       0.0|
|& Gt; __ & lt; Je...|       0.0|
|& Lt; & lt; -----...|       1.0|
|& Lt; --- ne peut...|       0.0|
+--------------------+----------+
only showing top 20 rows



&nbsp;

<p style="text-align:justify;">Puis on sauvegarde dans un fichier les résultats. </p>

&nbsp;

In [32]:
predictions_noclass.coalesce(1).write.format('json').save('noclass_pred.json')

&nbsp;

## Distribution des calculs

&nbsp;

<p style="text-align:justify;">Afin de suivre la progression des calculs de l'application, on peut utiliser une interface graphique "Spark Web UI" disponible à l'adresse http://localhost:4041. On peut alors voir tous les jobs exécutés et en cours d'exécution. En cliquant sur les jobs on a des indications sur la manière doit les jobs sont exécutés et on peut voir le temps d'exécution, le nombre de tasks par job etc. Voici un aperçu. </p>

&nbsp;

<p style="text-align:justify;">Pendant l'exécution de la grid search pour la regression logistique, voici quelques jobs affichés :</p>

&nbsp;
<img src="https://drive.google.com/uc?id=1d_cOfMyK2pOzxXYWKc_sSfAZaafxqp4y">

&nbsp;

<p style="text-align:justify;">On peut alors retrouver les différentes étapes de notre pipeline avec Count Vectorizer, logistic regression, label_stringIdx etc. On peut alors voir que les calculs sont bien distribués pour toutes les étapes de la pipeline, soit dès qu'un dataframe est modifié. Affichons les étapes qui demandent le plus de temps.</p>

&nbsp;
<img src="https://drive.google.com/uc?id=1viF1j2BaA2sgZx3mHInaSsDHH5UPGK72">

&nbsp;

<p style="text-align:justify;">En plus de la regression logistique, il y a l'étape CountVectorizer qui demande pas mal de temps (1s). On pouvait s'y attendre car ces étapes demandent clairement plus de temps de calcul que les autres. On peut afficher le détail d'un job, prenons celui de la regression logistique.</p>

&nbsp;

<img src="https://drive.google.com/uc?id=1ZOnLu1jyzZq4UyYaXeXHCZvok-4jlzGQ">

&nbsp;

<p style="text-align:justify;">On peut voir que c'est une tâche ou la très grande partie du temps est consacrée uniquement aux calculs.</p>

<p style="text-align:justify;">On peut aussi afficher un graphique montrant les différents opérations (map, filter etc) sur les RDD lors de l'exécution d'une tâche.</p>
&nbsp;

<img src="https://drive.google.com/uc?id=1p7Pt2EwAYUJ3NrxUP7VlgH6oSjSAnLLZ">
