Winnie VORIHILALA <br>
MS ESD 2019-2020 <br>
INSA ROUEN <br>

# Projet 

Ce TP consiste à entraîner un algorithme de classification de sentiments à l'aide d'un Framework de calcul distribué de notre choix, en utilisant les jeux de données suivants :
- train.json : contenant le dataset d'entraînement de l'algorithme
- test.json : contenant le dataset de test de l'algorithme
- noclass.json : sur lequel il faudra effectuer des prédictions.

<strong> SOMMAIRE <br>
    
1- Contexte <br>
<br>
1.1 - Big Data <br>
1.2 - Calculs parallèles vs calculs distribués : différence <br>
1.3 - Calculs parallèles vs calculs distribués : avantages <br>
1.4 - Hadoop MapReduce vs Apache Spark <br>
<br>

2- Code <br>
<br>
2.1 - Chargement des librairies <br>
<br>
2.2 - Configuration de l'enviromment pySpark <br>
<br>
2.3 - Chargement des données et visualisation <br>
2.3.1 - Chargement des données <br>
2.3.2 - Visualisation des données <br>
2.3.2.1 - Visualisation du jeu de données train <br>
2.3.2.2 - Visualisation du jeu de données test <br>
2.3.2.3 - Visualisation du jeu de données noclass <br>
2.3.2.4 - Analyse des jeux de données <br>

2.4 - Pré-traitement des données <br>
2.4.1 - Fonction de pre-processing <br>
2.4.2 - 1ère méthode de pre processing : nltk <br>
2.4.3 - 2ème méthode de pre processing : regexTokenizer et stopwordsRemover <br>

2.5 - Extraction de features avec CountVector, TF IDF, word2vec et StringIndexer <br>

2.6 - Classification de sentiments <br>
2.6.1 -  Clasification avec NaivesBayes et Logistic Regression <br>
2.6.2 - Prediction sur les données de test <br>
2.6.2.1 - 1ère méthode : avec CountVectors <br>
2.6.2.2 - 2ème méthode : avec TF-IDF <br>
2.6.3 -  Cross validation <br>

2.7 - Prediction sur le jeu de données noclass.json et sauveragarde du fichier <br>
<br>
3- Récapituatif des méthodes utilisées <br>
<br>
4- Distribution des calculs <br>
4.1 - Map Reduce <br>
4.2 - Cluster Spark <br>
4.3 - RDD <br>
4.4 - Calculs distribués sous forme de graphe avec les DAG <br>
4.5 - Fonctionnement de spark : cycle de vie d'une application <br>
4.6 - Distribution des calculs dans notre application Spark

Sources
<br>
</strong>

# 1- CONTEXTE

## 1.1 - Big Data

On parle de Big Data de manière générale lorsque :
- les données sont trop grosses pour être stockées en RAM
- la quantité de données excède la faculté d'une machine à les stocker et les analyser en un temps acceptable.
<br>
La solution à ce problème est de paralléliser les calculs sur plusieurs machines différentes. D'où les notions de calculs parallèles et de calculs distribués.

## 1.2 - Calculs parallèles vs calculs distribués : différence 

- Calculs parallèles : lors d'un calcul réalisé en parallèle, différents threads d'exécutions sont exécutés en même temps et partagent une mémoire commune qui leur permettent de se synchroniser entre eux. <br>
- Calculs distribués : Dans le calcul distribué, les nœuds sur lequels les calculs sont exécutés sont distants, autonomes et ne partagent pas de ressources ; la communication entre les nœuds s'effectue grâce à l'envoi de messages, au sein d'un cluster. <br>

## 1.3 - Calculs parallèles vs calculs distribués : avantages 

- Calculs distribués : <br>
=> ce modèle de calcul résout un certain nombre de problèmes : par exemple, le passage à l'échelle s'effectue de manière <font color="red" > horizontale </font> c'est à dire qu'il suffit d'ajouter des nœuds au cluster pour augmenter sa capacité de calcul. <br>
=> permet une plus grande tolérance aux pannes (lorsqu'un nœud du cluster subit une panne, il suffit d'affecter la tâche qu'il était en train de traiter à un autre nœud, alors que dans le modèle parallèle la machine sur laquelle le calcul est exécuté constitue un point unique de défaillance. Cependant, cette stratégie de tolérance aux pannes nécessite de pouvoir recréer l'état du nœud en échec et cela est assez complexe) 
<br>
<br>
- Calculs parallèles : Dans le modèle parallèle, on passe à l'échelle de manière <font color="red" > verticale</font>, en augmentant la puissance des processeurs. Mais avec le ralentissement de la loi de Moore, ce dernier modèle est remis en question.
<br>

## 1.4 - Hadoop MapReduce vs Apache Spark

MapReduce et Hadoop permettent d'effectuer des calculs distribués. <br>
Cependant, à l'usage, Hadoop MapReduce présente deux inconvénients majeurs : <br>
- Après une opération map ou reduce, le résultat doit être écrit sur disque. Ce sont ces données écrites sur disque qui permettent aux mappers et aux reducers de communiquer entre eux. C'est également l'écriture sur disque qui permet une certaine tolérance aux pannes : si une opération map ou reduce échoue, il suffit de lire les données à partir du disque pour reprendre là où on en était. Cependant, ces écritures et lectures sont coûteuses en temps. <br>
- Le jeu d'expressions composé exclusivement d'opérations map et reduce est très limité et peu expressif. En d'autres termes, il est difficile d'exprimer des opérations complexes en n'utilisant que cet ensemble de deux opérations. <br>
<br>
Apache Spark est une alternative à Hadoop MapReduce pour le calcul distribué qui vise à résoudre ces deux problèmes. C'est par conséquent ce framework de calcul distribué que nous allons utiliser dans ce TP. <br>


La différence fondamentale entre Hadoop MapReduce et Spark est que Spark écrit les données en RAM, et non sur disque. Ceci a plusieurs conséquences importantes sur la rapidité de traitement des calculs ainsi que sur l'architecture globale de Spark.

# 2- CODE

## 2.1 - Chargement des librairies

In [1]:
import findspark

from pyspark import SparkContext

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql import functions as fn

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## 2.2 - Configuration de l'enviromment Spark

In [2]:
findspark.init('/Users/winnievorihilala/Documents/Spark/spark/spark-2.4.5-bin-hadoop2.7')

In [3]:
sc = SparkContext()
spark = SparkSession.builder.getOrCreate()

In [4]:
sqlContext = SQLContext(sc)

In [5]:
sqlContext

<pyspark.sql.context.SQLContext at 0x111f882b0>

In [6]:
sc

In [7]:
spark

SparkContext est le point d'entrée principal pour la fonctionnalité Spark. Un SparkContext représente la connexion à un cluster Spark et peut être utilisé pour créer des RDD, des accumulateurs et des variables de diffusion sur ce cluster.
Un seul SparkContext peut être actif par JVM. On doit stopper sc.stop() le SparkContext actif avant d'en créer un nouveau.

La fonction getOrCreate() peut être utilisée pour obtenir ou instancier un SparkContext et l'enregistrer en tant qu'objet singleton. Étant donné que nous ne pouvons avoir qu'un seul SparkContext actif par machine virtuelle Java, cela est utile lorsque les applications peuvent souhaiter partager un SparkContext.

La connexion a un cluster Spark ici prend par défaut comme master local et comme nom d'application pyspark-shell. Ces attributs sont modifiables. Le local[*] indique que j'utilise un cluster local, qui est une autre façon de dire que je travaille en mode mono-ordinateur. Le * indique à Spark de créer autant de threads de travail que de coeurs logiques sur mon ordinateur.

Voici ci-après un schéma pour illustrer SparkContext dans pySpark: 

<p>
    <center> <strong> SparkContext </strong> </center> <br />
    <img src="SparkContext.png" alt="SparkContext" />
</p>

## 2.3 - Chargement des données et visualisation

## 2.3.1 - Chargement des données 

In [8]:
df_train = spark.read.json("train.json")
df_test = spark.read.json("test.json")
df_noclass = spark.read.json('noclass.json')

In [70]:
print("Le dataset df_train contient", df_train.count(),"lignes")
print("Le dataset df_test contient", df_test.count(),"lignes")
print("Le dataset df_noclass contient", df_noclass.count(),"lignes")

Le dataset df_train contient 128401 lignes
Le dataset df_test contient 76759 lignes
Le dataset df_noclass contient 51681 lignes


## 2.3.2 - Visualisation des données

### 2.3.2.1 - Visualisation du jeu de données train

In [23]:
df_train.describe().show()

+-------+--------------------+------------------+
|summary|             message|          polarity|
+-------+--------------------+------------------+
|  count|              128401|            128401|
|   mean|              2336.5|2.0849058807953207|
| stddev|   637.1032098490793| 1.998204716216392|
|    min|! A perdu 2 jeux ...|                 0|
|    max|ߧ  ǿ     ж  ؜    ...|                 4|
+-------+--------------------+------------------+



In [20]:
df_train.show(10)

+--------------------+--------+
|             message|polarity|
+--------------------+--------+
|! Comment était l...|       0|
|! d'accord! Va-t-...|       0|
|!!! Taihen desu n...|       0|
|!!!! Auto-dj .. c...|       0|
|!!!! Ce n'est que...|       0|
|"Aimant" Le jour ...|       0|
|"Attrape-moi si t...|       0|
|"Beverley Knight"...|       0|
|"Crack ... break ...|       0|
|"Désolé" une cond...|       0|
+--------------------+--------+
only showing top 10 rows



In [47]:
freq_train = df_train.stat.freqItems(["polarity"], 0.5)

In [48]:
freq_train.collect()[0]

Row(polarity_freqItems=['4', '0'])

In [65]:
df_train.printSchema() 

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)



In [71]:
df_train.groupBy("polarity") \
    .count() \
    .orderBy(fn.col("count").desc()) \
    .show()

+--------+-----+
|polarity|count|
+--------+-----+
|       4|66926|
|       0|61475|
+--------+-----+



### 2.3.2.2 - Visualisation du jeu de données test

In [24]:
df_test.describe().show()

+-------+--------------------+------------------+
|summary|             message|          polarity|
+-------+--------------------+------------------+
|  count|               76759|             76759|
|   mean|                null|  2.07761956252687|
| stddev|                null|1.9985062513923468|
|    min|! C'est tellement...|                 0|
|    max|être un begfriend...|                 4|
+-------+--------------------+------------------+



In [49]:
df_test.show(10)

+--------------------+--------+
|             message|polarity|
+--------------------+--------+
|! Et si affamé ma...|       0|
|! Identica présen...|       0|
|! Je vais enfin m...|       0|
|!?!? C'est un jou...|       0|
|"Easy" qu Le plan...|       0|
|"Empire du soleil...|       0|
|"Heart" quot; N'e...|       0|
|"I need" & quot; ...|       0|
|"Ils ont trouvé s...|       0|
|"Je vérifie mon t...|       0|
+--------------------+--------+
only showing top 10 rows



In [54]:
freq_test = df_test.stat.freqItems(["polarity"], 0.5)

In [55]:
freq_test.collect()[0]

Row(polarity_freqItems=['4', '0'])

In [64]:
df_test.printSchema() 

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)



In [72]:
df_test.groupBy("polarity") \
    .count() \
    .orderBy(fn.col("count").desc()) \
    .show()

+--------+-----+
|polarity|count|
+--------+-----+
|       4|39869|
|       0|36890|
+--------+-----+



### 2.3.2.3 - Visualisation du jeu de données noclass

In [25]:
df_noclass.describe().show()

+-------+--------------------+
|summary|             message|
+-------+--------------------+
|  count|               51681|
|   mean|                null|
| stddev|                null|
|    min|! @ # $ Lundi de ...|
|    max|œil rose? Ce n'es...|
+-------+--------------------+



In [22]:
df_noclass.show(10)

+--------------------+
|             message|
+--------------------+
|"Dans ganga" Ne v...|
|"Dieu elton" vous...|
|"I was up up the ...|
|"Quasi" toute la ...|
|# $ & Amp; * # vi...|
|& Amp; Je parle d...|
|& Amp; nd je ne p...|
|& Gt; Pas bon à p...|
|& Lt; 3 il semble...|
|& Lt; 3 shopping ...|
+--------------------+
only showing top 10 rows



In [59]:
freq_noclass = df_noclass.stat.freqItems(["message"], 0.9)

In [61]:
freq_noclass.collect()[0]

Row(message_freqItems=["Pardonnez-moi à mon frère de ne pas payer la facture du câble mais puis-je surveiller le jeu des cavs dans le lit de quelqu'un? Ou des ailes"])

In [66]:
df_noclass.printSchema() 

root
 |-- message: string (nullable = true)



In [73]:
df_noclass.groupBy("message") \
    .count() \
    .orderBy(fn.col("count").desc()) \
    .show()

+--------------------+-----+
|             message|count|
+--------------------+-----+
|           Mercinull|   58|
|       Nettoyez-moi!|   58|
|    Je vous remercie|   33|
|       Merci pour le|   20|
|Je ne peux pas me...|   19|
|          Merci!null|   18|
|   Je vous remercie!|   17|
|          bonne nuit|   16|
|           moi aussi|   15|
|         Bonjournull|   15|
|Si vous aimez rir...|   14|
|je suis perdu. Ai...|   13|
|    Merci pour le ff|    8|
|        Bonjour!null|    8|
|            il pleut|    7|
|Déclaration de mi...|    7|
| Merci pour le suivi|    7|
|           je t'aime|    7|
|             de rien|    7|
|          je connais|    7|
+--------------------+-----+
only showing top 20 rows



Visualisation : pour aller plus loin : https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html

### 2.3.2.4 - Analyse du jeu de données 

On peut constater à travers à la visualisation des données que : 
- les données (train, test et no class) contiennent des ponctions, ainsi que des majuscules. Pour obtenir de meilleures performances, une normalisation des données sera par conséquent nécessaire.
- les données de train et de test sont composées de 2 colonnes : message et polarity. La colonne message contient des chaînes de caractères et la colonne polarity contient les chiffres 0 et 4. Chaque chaîne de caractère est associée à un chiffre 0 ou 4. On peut considérer que nos jeux de données sont donc constitués d'un corpus de texte français, labellisé en 2 classes : 0 et 4.
- les chaines de caractères composants notre corpus dans train et test sont quasiment réparties équitablement entre les 2 classes 0 et 4.
- les données noclass sont constituées d'une unique colonne message. C'est sur ce jeu de données que nous devons effectuer notre prédiction et déterminer ainsi pour chaque chaîne de caractère de noclass, si elle appartient à la classe 0 ou à la classe 4.

## 2.4 - Pré-traitement des données

In [76]:
def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])

In [77]:
data = spark.sparkContext.textFile('train.json')

In [78]:
words = data.flatMap(word_tokenize)
words.saveAsTextFile('train_tokens')

pos_word = words.map(pos_tag)
pos_word.saveAsTextFile('train_token_pos')

In [79]:
words

PythonRDD[224] at RDD at PythonRDD.scala:53

In [80]:
pos_word

PythonRDD[225] at RDD at PythonRDD.scala:53

### 2.4.1 - Fonction de pre-processing

In [82]:
!pip install langid

Collecting langid
[?25l  Downloading https://files.pythonhosted.org/packages/ea/4c/0fb7d900d3b0b9c8703be316fbddffecdab23c64e1b46c7a83561d78bd43/langid-1.1.6.tar.gz (1.9MB)
[K    100% |████████████████████████████████| 1.9MB 1.4MB/s ta 0:00:01
Building wheels for collected packages: langid
  Building wheel for langid (setup.py) ... [?25ldone
[?25h  Stored in directory: /Users/winnievorihilala/Library/Caches/pip/wheels/29/bc/61/50a93be85d1afe9436c3dc61f38da8ad7b637a38af4824e86e
Successfully built langid
Installing collected packages: langid
Successfully installed langid-1.1.6


In [90]:
import preproc #file : preproc.py

In [91]:
dir(preproc) 

['PerceptronTagger',
 'WordNetLemmatizer',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__spec__',
 'check_blanks',
 'check_lang',
 'langid',
 'lemmatize',
 're',
 'remove_features',
 'remove_stops',
 'stopwords',
 'string',
 'tag_and_remove',
 'tagger']

### 2.4.2 - 1ère méthode de pre processing : nltk

In [92]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp 
# Register all the functions in Preproc with Spark Context
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())

In [115]:
# predict language and filter out those with less than 90% chance of being French
lang_df_train = df_train.withColumn("lang", check_lang_udf(df_train["message"]))
fr_df_train = lang_df_train.filter(lang_df_train["lang"] == "fr")

In [112]:
lang_df_train.show(5)

+--------------------+--------+----+
|             message|polarity|lang|
+--------------------+--------+----+
|! Comment était l...|       0|  NA|
|! d'accord! Va-t-...|       0|  NA|
|!!! Taihen desu n...|       0|  NA|
|!!!! Auto-dj .. c...|       0|  NA|
|!!!! Ce n'est que...|       0|  NA|
+--------------------+--------+----+
only showing top 5 rows



In [113]:
fr_df_train.show(5)

+--------------------+--------+----+
|             message|polarity|lang|
+--------------------+--------+----+
|        Cours nooooo|       0|  fr|
|Horrible n'est-ce...|       0|  fr|
|           Le hoquet|       0|  fr|
|          Maths suce|       0|  fr|
|             Mec fyl|       0|  fr|
+--------------------+--------+----+
only showing top 5 rows



In [116]:
# remove stop words to reduce dimensionality
rm_stops_df_train = fr_df_train.withColumn("stop_message", remove_stops_udf(fr_df_train["message"]))


In [118]:
rm_stops_df_train.show(5)

+--------------------+--------+----+--------------------+
|             message|polarity|lang|        stop_message|
+--------------------+--------+----+--------------------+
|        Cours nooooo|       0|  fr|        Cours nooooo|
|Horrible n'est-ce...|       0|  fr|Horrible n'est-ce...|
|           Le hoquet|       0|  fr|           Le hoquet|
|          Maths suce|       0|  fr|          Maths suce|
|             Mec fyl|       0|  fr|             Mec fyl|
+--------------------+--------+----+--------------------+
only showing top 5 rows



In [130]:
# remove other non essential words, think of it as my personal stop word list
rm_features_df_train = rm_stops_df_train.withColumn("feat_message", remove_features_udf(rm_stops_df_train["stop_message"]))


In [131]:
rm_features_df_train.show(5)

+--------------------+--------+----+--------------------+------------------+
|             message|polarity|lang|        stop_message|      feat_message|
+--------------------+--------+----+--------------------+------------------+
|        Cours nooooo|       0|  fr|        Cours nooooo|      cours nooooo|
|Horrible n'est-ce...|       0|  fr|Horrible n'est-ce...|horrible  est  pas|
|           Le hoquet|       0|  fr|           Le hoquet|            hoquet|
|          Maths suce|       0|  fr|          Maths suce|        maths suce|
|             Mec fyl|       0|  fr|             Mec fyl|           mec fyl|
+--------------------+--------+----+--------------------+------------------+
only showing top 5 rows



In [145]:
rm_features_df_train.printSchema() 

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_message: string (nullable = true)
 |-- feat_message: string (nullable = true)



In [132]:
# tag the words remaining and keep only Nouns, Verbs and Adjectives
tagged_df_train = rm_features_df_train.withColumn("tagged_message", tag_and_remove_udf(rm_features_df_train["feat_message"]))


In [133]:
tagged_df_train.show(5)

+--------------------+--------+----+--------------------+------------------+------------------+
|             message|polarity|lang|        stop_message|      feat_message|    tagged_message|
+--------------------+--------+----+--------------------+------------------+------------------+
|        Cours nooooo|       0|  fr|        Cours nooooo|      cours nooooo|     cours nooooo |
|Horrible n'est-ce...|       0|  fr|Horrible n'est-ce...|horrible  est  pas| horrible est pas |
|           Le hoquet|       0|  fr|           Le hoquet|            hoquet|           hoquet |
|          Maths suce|       0|  fr|          Maths suce|        maths suce|       maths suce |
|             Mec fyl|       0|  fr|             Mec fyl|           mec fyl|          mec fyl |
+--------------------+--------+----+--------------------+------------------+------------------+
only showing top 5 rows



In [146]:
tagged_df_train.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_message: string (nullable = true)
 |-- feat_message: string (nullable = true)
 |-- tagged_message: string (nullable = true)



In [134]:
# lemmatization of remaining words to reduce dimensionality & boost measures
lemm_df_train = tagged_df_train.withColumn("lemm_message", lemmatize_udf(tagged_df_train["tagged_message"]))

In [135]:
lemm_df_train.show(5)

+--------------------+--------+----+--------------------+------------------+------------------+---------------+
|             message|polarity|lang|        stop_message|      feat_message|    tagged_message|   lemm_message|
+--------------------+--------+----+--------------------+------------------+------------------+---------------+
|        Cours nooooo|       0|  fr|        Cours nooooo|      cours nooooo|     cours nooooo |   cours nooooo|
|Horrible n'est-ce...|       0|  fr|Horrible n'est-ce...|horrible  est  pas| horrible est pas |horrible est pa|
|           Le hoquet|       0|  fr|           Le hoquet|            hoquet|           hoquet |         hoquet|
|          Maths suce|       0|  fr|          Maths suce|        maths suce|       maths suce |      math suce|
|             Mec fyl|       0|  fr|             Mec fyl|           mec fyl|          mec fyl |        mec fyl|
+--------------------+--------+----+--------------------+------------------+------------------+---------

In [147]:
lemm_df_train.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_message: string (nullable = true)
 |-- feat_message: string (nullable = true)
 |-- tagged_message: string (nullable = true)
 |-- lemm_message: string (nullable = true)



In [166]:
# remove all rows containing only blank spaces
check_blanks_df_train = lemm_df_train.withColumn("is_blank", check_blanks_udf(lemm_df_train["lemm_message"]))
no_blanks_df_train = check_blanks_df_train.filter(check_blanks_df_train["is_blank"] == "False")

In [137]:
check_blanks_df_train.show(5)

+--------------------+--------+----+--------------------+------------------+------------------+---------------+--------+
|             message|polarity|lang|        stop_message|      feat_message|    tagged_message|   lemm_message|is_blank|
+--------------------+--------+----+--------------------+------------------+------------------+---------------+--------+
|        Cours nooooo|       0|  fr|        Cours nooooo|      cours nooooo|     cours nooooo |   cours nooooo|   False|
|Horrible n'est-ce...|       0|  fr|Horrible n'est-ce...|horrible  est  pas| horrible est pas |horrible est pa|   False|
|           Le hoquet|       0|  fr|           Le hoquet|            hoquet|           hoquet |         hoquet|   False|
|          Maths suce|       0|  fr|          Maths suce|        maths suce|       maths suce |      math suce|   False|
|             Mec fyl|       0|  fr|             Mec fyl|           mec fyl|          mec fyl |        mec fyl|   False|
+--------------------+--------+-

In [148]:
check_blanks_df_train.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_message: string (nullable = true)
 |-- feat_message: string (nullable = true)
 |-- tagged_message: string (nullable = true)
 |-- lemm_message: string (nullable = true)
 |-- is_blank: string (nullable = true)



In [138]:
no_blanks_df_train.show(5)

+--------------------+--------+----+--------------------+------------------+------------------+---------------+--------+
|             message|polarity|lang|        stop_message|      feat_message|    tagged_message|   lemm_message|is_blank|
+--------------------+--------+----+--------------------+------------------+------------------+---------------+--------+
|        Cours nooooo|       0|  fr|        Cours nooooo|      cours nooooo|     cours nooooo |   cours nooooo|   False|
|Horrible n'est-ce...|       0|  fr|Horrible n'est-ce...|horrible  est  pas| horrible est pas |horrible est pa|   False|
|           Le hoquet|       0|  fr|           Le hoquet|            hoquet|           hoquet |         hoquet|   False|
|          Maths suce|       0|  fr|          Maths suce|        maths suce|       maths suce |      math suce|   False|
|             Mec fyl|       0|  fr|             Mec fyl|           mec fyl|          mec fyl |        mec fyl|   False|
+--------------------+--------+-

In [169]:
no_blanks_df_train.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- stop_message: string (nullable = true)
 |-- feat_message: string (nullable = true)
 |-- tagged_message: string (nullable = true)
 |-- lemm_message: string (nullable = true)
 |-- is_blank: string (nullable = true)



In [172]:
# select only the columns we care about
train_data_set = no_blanks_df_train.select(no_blanks_df_train['lemm_message'], no_blanks_df_train['polarity'])

In [173]:
train_data_set.printSchema() 

root
 |-- lemm_message: string (nullable = true)
 |-- polarity: string (nullable = true)



In [174]:
train_data_set = train_data_set.withColumnRenamed("lemm_message", "message")

In [175]:
train_data_set.printSchema() 

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)



In [176]:
train_data_set.show()

+--------------------+--------+
|             message|polarity|
+--------------------+--------+
|        cours nooooo|       0|
|     horrible est pa|       0|
|              hoquet|       0|
|           math suce|       0|
|             mec fyl|       0|
|                  pa|       0|
|     toooooday pleut|       0|
|        urgent lundi|       0|
|         fonctionner|       0|
|veut son argent rudd|       0|
|              bureau|       0|
|             le sims|       0|
|              pa bon|       0|
|              pa bon|       0|
|            toutnull|       0|
|     vraiment triste|       0|
|            matthieu|       0|
|                 plu|       0|
|              raison|       0|
|   suppose est parti|       0|
+--------------------+--------+
only showing top 20 rows



In [177]:
train_data_set.count()

119

In [178]:
train_data_set.groupBy("polarity") \
    .count() \
    .orderBy(fn.col("count").desc()) \
    .show()

+--------+-----+
|polarity|count|
+--------+-----+
|       4|   73|
|       0|   46|
+--------+-----+



In [180]:
train_data_set.describe().show()

+-------+-----------+------------------+
|summary|    message|          polarity|
+-------+-----------+------------------+
|  count|        119|               119|
|   mean|       null| 2.453781512605042|
| stddev|       null|1.9560765779941627|
|    min|           |                 0|
|    max|yeay accord|                 4|
+-------+-----------+------------------+



In [181]:
train_data_set.show(30)

+--------------------+--------+
|             message|polarity|
+--------------------+--------+
|        cours nooooo|       0|
|     horrible est pa|       0|
|              hoquet|       0|
|           math suce|       0|
|             mec fyl|       0|
|                  pa|       0|
|     toooooday pleut|       0|
|        urgent lundi|       0|
|         fonctionner|       0|
|veut son argent rudd|       0|
|              bureau|       0|
|             le sims|       0|
|              pa bon|       0|
|              pa bon|       0|
|            toutnull|       0|
|     vraiment triste|       0|
|            matthieu|       0|
|                 plu|       0|
|              raison|       0|
|   suppose est parti|       0|
|              hoquet|       0|
|            med suck|       0|
|      mon ventre tue|       0|
|         est dommage|       0|
|         est dommage|       0|
|                 bbm|       0|
|           peut suce|       0|
|              est pa|       0|
|       

In [9]:
#conversion du type de la colonne polarity en int
#train_data_set = train_data_set.withColumn("polarity", train_data_set["polarity"].cast('int'))
df_train = df_train.withColumn("polarity", df_train["polarity"].cast('int'))
df_test = df_test.withColumn("polarity", df_test["polarity"].cast('int'))
df_train.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: integer (nullable = true)



In [182]:
# split training & validation sets with 80% to training and use a seed value of 1987
splits = train_data_set.randomSplit([0.8, 0.2])
training_df = splits[0]
test_df = splits[1]

Avec cette première méthode de pre processing, on constate que les données sont bien normalisées ( plus de ponctuations, ni de stopwords français (préalablement définis dans la librairie ntlk et composés d'un ensemble de mots courants tels que de, je, ...),  plus de caractères spéciaux et les majuscules ont été transformés en minuscule). Cependant, on passe d'un jeu de donnée non pré-traité faisant 128401 lignes à un jeu de donnée pré-traité faisant 119 lignes, soit une perte d'informations de 99% ce qui n'est absolument pas normal. Par conséquent, les mots stopwords à soustraire à un jeu de données dépendent de notre jeu de données. Dans notre cas, le données ne présentant pas une quantité de bruit énorme, il est préférable de spécifier à la main les stopwords à enlever.

### 2.4.3 - 2ème méthode de pre processing : regexTokenizer et stopwordsRemover

In [26]:
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer

add_stopwords = ["!","!!!","...","..","-","*",".","!","?","Gt","Lt","gt","lt","http","https","amp","rt","t","c","the","null","&","Quot", "quot"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
regexTokenizer = RegexTokenizer(inputCol="message", outputCol="words", pattern="\\W")

## 2.5 - Extraction de features avec CountVector, TF IDF, word2vec et StringIndexer


In [27]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, StringIndexer

countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=4)
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=4)
w2v = Word2Vec(inputCol= 'filtered', outputCol= 'features', vectorSize= 100)
label_stringIdx = StringIndexer(inputCol = "polarity", outputCol = "label")

## 2.6 - Classification de sentiments

### 2.6.1 -  Clasification avec NaivesBayes et Logistic Regression

In [28]:
from pyspark.ml.classification import NaiveBayes, LogisticRegression

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

### 2.6.2 - Prediction sur les données de test

### 2.6.2.1 - 1ère méthode : avec CountVectors

In [30]:
%%time

from pyspark.ml import Pipeline

predictions = {}
models = [nb, lr]
name_models = ['naive_bayes','logistic_regression']
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

for i, model in enumerate(models):
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx, model])
    mod = pipeline.fit(df_train)
    prediction_train = mod.transform(df_train)
    prediction_test = mod.transform(df_test)
    
    predictions['prediction_train_%s_%s'%(name_models[i],'CountVectorizer')] = prediction_train
    predictions['prediction_test_%s_%s'%(name_models[i],'CountVectorizer')] = prediction_test
    
    print("\nLe nom du modèle utilisé est : %s" %(name_models[i]))
    print("Le score obtenu sur les données d'entraînement est : %.3f" %(evaluator.evaluate(prediction_train)))
    print("Le Score sur les données de test est: %.3f \n" %(evaluator.evaluate(prediction_test)))


Le nom du modèle utilisé est : naive_bayes
Le score obtenu sur les données d'entraînement est : 0.790
Le Score sur les données de test est: 0.768 


Le nom du modèle utilisé est : logistic_regression
Le score obtenu sur les données d'entraînement est : 0.802
Le Score sur les données de test est: 0.769 

CPU times: user 266 ms, sys: 66.4 ms, total: 333 ms
Wall time: 27.3 s


### 2.6.2.2 - 2ème méthode : avec TF-IDF

In [33]:
%%time

from pyspark.ml import Pipeline

models = [nb, lr]
name_models = ['naive_bayes','logistic_regression']
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

for i, model in enumerate(models):
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx, model])
    mod = pipeline.fit(df_train)
    prediction_train = mod.transform(df_train)
    prediction_test = mod.transform(df_test)
    
    predictions['prediction_train_%s_%s'%(name_models[i],'tf_idf')] = prediction_train
    predictions['prediction_test_%s_%s'%(name_models[i],'tf_idf')] = prediction_test
    
    print("\nLe nom du modèle utilisé est : %s" %(name_models[i]))
    print("Le score obtenu sur les données d'entraînement est : %.3f" %(evaluator.evaluate(prediction_train)))
    print("Le Score sur les données de test est: %.3f \n" %(evaluator.evaluate(prediction_test)))


Le nom du modèle utilisé est : naive_bayes
Le score obtenu sur les données d'entraînement est : 0.773
Le Score sur les données de test est: 0.736 


Le nom du modèle utilisé est : logistic_regression
Le score obtenu sur les données d'entraînement est : 0.786
Le Score sur les données de test est: 0.751 

CPU times: user 320 ms, sys: 79.3 ms, total: 399 ms
Wall time: 33.8 s


Pour un temps d'execution pratiquement identique, nous obtenons de meilleures performances avec CountVectorizer plutôt qu'avec TF-IDF.

In [35]:
predictions['prediction_test_logistic_regression_CountVectorizer'] \
    .select("message","label","prediction") \
    .show(n = 20, truncate = 30)

+------------------------------+-----+----------+
|                       message|label|prediction|
+------------------------------+-----+----------+
|! Et si affamé mais pas de ...|  1.0|       1.0|
|! Identica présente actuell...|  1.0|       0.0|
|! Je vais enfin m'endormir ...|  1.0|       1.0|
|!?!? C'est un jour que j'ai...|  1.0|       1.0|
|"Easy" qu Le plancher en bo...|  1.0|       0.0|
|"Empire du soleil" L'auteur...|  1.0|       1.0|
|"Heart" quot; N'est pas un ...|  1.0|       1.0|
|"I need" & quot; -? V1-1333...|  1.0|       1.0|
|"Ils ont trouvé sonny? & Qu...|  1.0|       0.0|
|"Je vérifie mon twitter cha...|  1.0|       1.0|
|"Recevoir un préavis avec t...|  1.0|       0.0|
|"Statut sélectif de twitter...|  1.0|       1.0|
|"Sur la musique populaire &...|  1.0|       1.0|
|"The dating expert" Me suit...|  1.0|       0.0|
|& Amp; Tout le reste je n'a...|  1.0|       1.0|
|& Gt; Le frère est sur son ...|  1.0|       1.0|
|& Gt; Que cela impliquera l...|  1.0|       0.0|


### 2.6.3 -  Cross validation 

Pyspark offre la possibilité d'utilsier des fonctions telles que CrossValidator et ParamGridBuilder pour effectuer la cross validation et optimiser ainsi les performances des modèles.


In [19]:
#temps d'execution, environ 45 min

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import numpy as np

lr = LogisticRegression()
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx, lr])
vectC = np.logspace(-3, -2, 20)
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, vectC) 
             .addGrid(lr.elasticNetParam, [0.1, 0.15]) 
             .build())

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=10)
cvModel_lr = cv.fit(df_train)

predictions_test = cvModel_lr.transform(df_test)
predictions_train = cvModel_lr.transform(df_train)
print("\nNom du modèle utilisé : logistic regression")
print("Score sur les données d'entraînement : %.3f" %(evaluator.evaluate(predictions_train)))
print("Score sur les données test : %.3f \n" %(evaluator.evaluate(predictions_test)))


Nom du modèle utilisé : logistic regression
Score sur les données d'entraînement : 0.809
Score sur les données test : 0.778 



In [36]:
%%time

import numpy as np

nb = NaiveBayes(modelType="multinomial")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx, nb])
paramGrid = (ParamGridBuilder()
             .addGrid(nb.smoothing, [0,1,2,5,10]) 
             .build())

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=10)
cvModel_nb = cv.fit(df_train)

predictions_test = cvModel_nb.transform(df_test)
predictions_train = cvModel_nb.transform(df_train)

print("\nLe nom du modèle utilisé est : naive bayes")
print("Le score obtenu sur les données d'entraînement est : %.3f" %(evaluator.evaluate(predictions_train)))
print("Le score obtenu sur les données de test est : %.3f" %(evaluator.evaluate(predictions_test)))


Le nom du modèle utilisé est : naive bayes
Le score obtenu sur les données d'entraînement est : 0.789
Le score obtenu sur les données de test est : 0.768
CPU times: user 8.08 s, sys: 2.04 s, total: 10.1 s
Wall time: 3min 45s


In [39]:
%%time

bestModel = cvModel_lr.bestModel
param_dict = bestModel.stages[-1].extractParamMap()

sane_dict = {}
for k, v in param_dict.items():
    sane_dict[k.name] = v

print('La meilleure valeur à attribuer au paramètre regParam de Logistic Regression est : %.3f' %sane_dict["regParam"])
print('\nBest elastic_net à attribuer au paramètre regParam de Logistic Regression est : %.3f' %sane_dict["elasticNetParam"])

print( " ")

La meilleure valeur à attribuer au paramètre regParam de Logistic Regression est : 0.009

Best elastic_net à attribuer au paramètre regParam de Logistic Regression est : 0.100
 
CPU times: user 473 µs, sys: 256 µs, total: 729 µs
Wall time: 532 µs


## 2.7 - Prediction sur le jeu de données noclass.json et sauveragarde du fichier

In [40]:
predictions_noclass = cvModel_lr.transform(df_test).select("message","prediction")
predictions_noclass = predictions_noclass.withColumn("prediction", predictions_noclass["prediction"].cast('string'))
predictions_noclass.show()

+--------------------+----------+
|             message|prediction|
+--------------------+----------+
|! Et si affamé ma...|       1.0|
|! Identica présen...|       0.0|
|! Je vais enfin m...|       1.0|
|!?!? C'est un jou...|       1.0|
|"Easy" qu Le plan...|       0.0|
|"Empire du soleil...|       1.0|
|"Heart" quot; N'e...|       1.0|
|"I need" & quot; ...|       1.0|
|"Ils ont trouvé s...|       0.0|
|"Je vérifie mon t...|       0.0|
|"Recevoir un préa...|       0.0|
|"Statut sélectif ...|       1.0|
|"Sur la musique p...|       1.0|
|"The dating exper...|       0.0|
|& Amp; Tout le re...|       1.0|
|& Gt; Le frère es...|       1.0|
|& Gt; Que cela im...|       0.0|
|& Gt; __ & lt; Je...|       0.0|
|& Lt; & lt; -----...|       1.0|
|& Lt; --- ne peut...|       0.0|
+--------------------+----------+
only showing top 20 rows



#### Sauvegarde du fichier :

In [41]:
predictions_noclass.coalesce(1).write.format('json').save('noclass_predit.json')

# 3 - RECAPITULATIF DES METHODES UTILISEES

Pour pré-processer nos données, nous avons utilisé 2 méthodes : <br>

- <p style="text-align:justify;"><strong> ntlk : </strong> NLTK signifie Natural Language Toolkit. Cette boîte à outils est l'une des bibliothèques NLP les plus puissantes qui contient des packages permettant aux machines de comprendre le langage humain et d'y répondre avec une réponse appropriée. Tokenization, Stemming, Lemmatization, Punctuation, Character count, word count sont quelques-uns de ces packages.<br> </p>
<br>
- <p style="text-align:justify;"><strong> regexTokenizer et stopWordsRemover </strong> : La tokenisation est le processus consistant à prendre du texte (comme une phrase) et à le diviser en termes individuels (généralement des mots). Une simple classe Tokenizer fournit cette fonctionnalité. L'exemple ci-dessous montre comment diviser des phrases en séquences de mots. <br>
RegexTokenizer permet une tokenisation plus avancée basée sur la correspondance d'expressions régulières (regex). Par défaut, le paramètre «pattern» (regex, default:) "\\s+"est utilisé comme délimiteur pour diviser le texte d'entrée. Alternativement, les utilisateurs peuvent définir le paramètre «lacunes» sur faux indiquant que le «modèle» regex désigne les «jetons» plutôt que de fractionner les lacunes, et trouver toutes les occurrences correspondantes comme résultat de la tokenisation. <br> </p>
<br>

Pour l'extraction de features, nous avons utilisé : <br>
- <p style="text-align:justify;"><strong> CountVectors : </strong> CountVectorizer et CountVectorizerModel visent à aider à convertir une collection de documents texte en vecteurs de décompte de jetons. Lorsqu'un dictionnaire a priori n'est pas disponible, CountVectorizerpeut être utilisé comme un Estimator pour extraire le vocabulaire, et génère un CountVectorizerModel. Le modèle produit des représentations clairsemées pour les documents sur le vocabulaire, qui peuvent ensuite être transmises à d'autres algorithmes comme LDA.
<br>
Au cours du processus d'ajustement, CountVectorizersélectionnera les meilleurs vocabSizemots classés par fréquence de terme dans le corpus. Un paramètre facultatif minDFaffecte également le processus d'ajustement en spécifiant le nombre minimum (ou la fraction si inférieur à 1,0) de documents dans lesquels un terme doit apparaître pour être inclus dans le vocabulaire. Un autre paramètre de basculement binaire facultatif contrôle le vecteur de sortie. Si la valeur est vraie, tous les nombres non nuls sont définis sur 1. Ceci est particulièrement utile pour les modèles probabilistes discrets qui modélisent les nombres binaires, plutôt qu'entiers. <br> </p>
<br>
- <p style="text-align:justify;"> <strong> TDF IDF </strong> : TF-IDF (Term Frequency-inverse Document Frequency) est une méthode de vectorisation de features largement utilisée dans l'exploration de texte ou text mining pour refléter l'importance d'un terme pour un document dans le corpus. Notons un terme par t, un document par d et le corpus par D. <br>
La fréquence du terme TF(t,d) est le nombre de fois où ce terme t apparaît dans le document d, tandis que la fréquence du document DF(t,D) est le nombre de documents qui contient le terme t. Si nous utilisons uniquement la fréquence des termes pour mesurer l'importance, il est très facile de surestimer les termes qui apparaissent très souvent mais qui contiennent peu d'informations sur le document, par exemple «a», «le» et «de». Si un terme apparaît très souvent dans le corpus, cela signifie qu'il ne contient pas d'informations spéciales sur un document particulier. La fréquence inverse des documents est une mesure numérique de la quantité d'informations fournies par un terme : <br> </p>
<p>
    <img src="idf.png" alt="Formule de calcul de l''IDF" />
</p>
<br>
<p style="text-align:justify;">où |D| est le nombre total de documents dans le corpus. Puisque le logarithme est utilisé, si un terme apparaît dans tous les documents, sa valeur IDF devient 0. Notons qu'un terme de lissage est appliqué pour éviter de diviser par zéro pour les termes en dehors du corpus. La mesure TF-IDF est simplement le produit de TF et IDF: <br> </p>
<p>
    <img src="tfidf.png" alt="Formule de calcul du TFIDF" />
</p>
<br>
<p style="text-align:justify;">Il existe plusieurs variantes sur la définition de la fréquence des termes et de la fréquence des documents. Dans MLlib, nous séparons TF et IDF pour les rendre flexibles. <br> </p>
<br>
<p style="text-align:justify;"><strong> TF : </strong> Les deux HashingTFet CountVectorizerpeuvent être utilisés pour générer le terme vecteurs de fréquence. HashingTF est un transformateur qui prend des ensembles de termes et convertit ces ensembles en vecteurs d'entités de longueur fixe. Dans le traitement de texte, un «ensemble de termes» peut être un sac de mots. HashingTF utilise l'astuce de hachage. Une fonction brute est mappée dans un index (terme) en appliquant une fonction de hachage. La fonction de hachage utilisée ici est MurmurHash 3. Ensuite, les fréquences des termes sont calculées sur la base des indices cartographiés. Cette approche évite d'avoir à calculer une carte terme-index globale, ce qui peut être coûteux pour un grand corpus, mais elle souffre de collisions de hachage potentielles, où différentes caractéristiques brutes peuvent devenir le même terme après le hachage. Pour réduire les risques de collision, nous pouvons augmenter la dimension de l'entité cible, c'est-à-dire le nombre de compartiments de la table de hachage. Puisqu'un simple modulo est utilisé pour transformer la fonction de hachage en un index de colonne, il est conseillé d'utiliser une puissance de deux comme dimension d'entité, sinon les entités ne seront pas mappées uniformément aux colonnes. La dimension d'entité par défaut est : 2^18 = 262 144. Un paramètre à bascule binaire facultatif contrôle le nombre de fréquences des termes. Lorsqu'il est défini sur true, tous les nombres de fréquences non nuls sont définis sur 1. Ceci est particulièrement utile pour les modèles probabilistes discrets qui modélisent les nombres binaires plutôt qu'entiers. <br> </p>
<br>
<p style="text-align:justify;"><strong> IDF : </strong> IDFest un Estimator qui est adapté à un ensemble de données et produit un IDFModel. La IDFModel prend des vecteurs de caractéristiques (généralement créés à partir de HashingTFou CountVectorizer) et met à l'échelle chaque colonne. Intuitivement, il pondère les colonnes qui apparaissent fréquemment dans un corpus. <br> </p>
<br>

<p style="text-align:justify;">=>Voici ce qui se fait dans la pratique : supposons que nous disposons d'un ensemble de phrases. Nous allons diviser chaque phrase en mots en utilisant Tokenizer. Pour chaque phrase (sac de mots), nous utilisons HashingTF pour hacher la phrase en un vecteur de feature. Puis nous utilisons IDF pour redimensionner les vecteurs de feature; cela améliore généralement les performances lors de l'utilisation de texte comme features. Nos vecteurs de caractéristiques pourraient ensuite être passés à un algorithme d'apprentissage. </p>
<br>

- <p style="text-align:justify;"><strong> Word2Vec : </strong> Word2Vec est un Estimator qui prend des séquences de mots représentant des documents et forme un Word2VecModel. Le modèle associe chaque mot à un vecteur unique de taille fixe. Le Word2VecModel transforme chaque document en vecteur en utilisant la moyenne de tous les mots du document; ce vecteur peut ensuite être utilisé comme fonctionnalités pour la prédiction. <br> </p>
- <p style="text-align:justify;"><strong> StringIndexer : </strong> StringIndexer code une colonne de chaînes d'étiquettes en une colonne d'index d'étiquettes. Les indices sont entrés (0, numLabels), classés par fréquences d'étiquettes, de sorte que l'étiquette la plus fréquente est indexée 0. Si la colonne d'entrée est numérique, nous la convertissons en chaîne et indexons les valeurs de chaîne. <br>
Supposons que nous ayons le DataFrame suivant avec des colonnes id et category: <br> </p>
<p>
    <img src="exemple1.png" alt="StringIndexer_exemple1" />
</p>
<br>
<p style="text-align:justify;">category est une colonne de chaîne de caractères avec trois étiquettes: «a», «b» et «c». En appliquant StringIndexer avec category comme colonne d'entrée et categoryIndex comme colonne de sortie, nous devrions obtenir ce qui suit: </p>
<p>
    <img src="exemple2.png" alt="StringIndexer_exemple2" />
</p>
<br>
«a» obtient l'index 0 car c'est le plus fréquent, suivi de «c» avec index 1 et de «b» avec index 2.
<br>
<br>
<br>
Pour la classification de sentiments, nous avons utilisé : <br>
<br>
- <p style="text-align:justify;"><strong> Naives Bayes :</strong> Naive Bayes est un algorithme de classification multiclasse simple avec l'hypothèse d'indépendance entre chaque paire de fonctionnalités. Naive Bayes peut être formé très efficacement. En un seul passage aux données d'apprentissage, il calcule la distribution de probabilité conditionnelle de chaque entité donnée étiquettée, puis il applique le théorème de Bayes pour calculer la distribution de probabilité conditionnelle de l'étiquette à partir d'une observation, et il va l'utiliser pour la prédiction. <br>
spark.mllib prend en charge les Bayes naïfs multinomiaux et les Bayes naïfs de Bernoulli . Ces modèles sont généralement utilisés pour la classification des documents . Dans ce contexte, chaque observation est un document et chaque entité représente un terme dont la valeur est la fréquence du terme (dans les Bayes naïfs multinomiaux) ou un zéro ou un indiquant si le terme a été trouvé dans le document (dans les Bayes naïfs de Bernoulli). Les valeurs des entités doivent être <font color="red">non négatives</font>. Le type de modèle est sélectionné avec un paramètre facultatif «multinomial» ou «bernoulli» avec «multinomial» par défaut. Le lissage additif peut être utilisé en définissant le paramètre λ (par défaut à 1.0). Pour la classification des documents, les vecteurs d'entités en entrée sont généralement clairsemés et les vecteurs clairsemés doivent être fournis en entrée pour tirer parti de la rareté. Étant donné que les données d'apprentissage ne sont utilisées qu'une seule fois, il n'est pas nécessaire de les mettre en cache. <br></p>
<br>
- <p style="text-align:justify;"><strong> Regression logistique :</strong> la régression logistique est une méthode populaire pour prédire une réponse catégorique. Il s'agit d'un cas particulier des modèles linéaires généralisés qui prédit la probabilité des résultats. Dans spark.ml, on peut utiliser la régression logistique pour prédire un résultat binaire en utilisant la régression logistique binomiale. On peut également utiliser la régression logistique pour prédire un résultat multiclasse en utilisant la régression logistique multinomiale. Pour cela, il faut utiliser le paramètre family pour selectionner entre ces deux algorithmes. On peut également laisser ce paramètre non-défini, et dans ce cas, c'est Spark qui déduira la bonne variante. </p>
<br>
<br>
<p style="text-align:justify;"> Spark ML fournit d'autres méthodes de classification telles que le decision tree classifier, le random forest classifier, le gradient-boosted tree classifier, le multilayer perceptron classifier, le One-vs-Rest classifier (a.k.a.One-vs-All). Dans ce TP, nous n'avons utilisé que Naive Bayes et Logistic Regression, mais il peut être intéressant d'utiliser toutes ces méthodes et de déterminer laquelle donne la meilleure performance su notre jeu de donnée. </p>

# 4 - DISTRIBUTION DES CALCULS

## 4.1 - Map Reduce


<strong> QUOI : </strong>  modèle de programmation qui fournit un cadre pour automatiser le calcul parallèle sur des données massives. <br>
<br>
MapReduce a été conçu chez Google car c'est une opération nécessaire au calcul du fameux PageRank, utilisé pour ordonnancer les résultats d'une recherche Web. <br>
Ce modèle a été proposé dans les années 2000, par deux ingénieurs de chez Google, qui ont observé qu'un grand nombre des traitements massivement parallèles, mis en place pour les besoins de leur moteur de recherche, suivaient une stratégie de parallélisation identique. De ces observations est né le modèle de programmation MapReduce, décrit pour la première fois en 2004 dans un article de recherche. (link : https://static.googleusercontent.com/media/research.google.com/fr//archive/mapreduce-osdi04.pdf) 
<br>

<p style="text-align:justify;"><strong> COMMENT : </strong> MapReduce c'est "divisez pour distribuer pour régner" en ce sens que la stratégie mise en place pour exécuter un calcul sur des données massives consiste à découper les données en sous-ensembles de plus petite taille, qui sont appelés des lots ou des fragments, et à affecter chaque lot à une machine du cluster permettant ainsi leur traitement en parallèle. Il suffira ensuite d'agréger l'ensemble des résultats intermédiaires obtenus pour chaque lot pour construire le résultat final. <br>
* MAP consiste à appliquer une même fonction à tous les éléments de la liste; <br>
* REDUCE applique une fonction récursivement à une liste et retourne un seul résultat; <br>
Map et Reduce sont des opérateurs génériques et leur combinaison permet donc de modéliser énormément de problèmes. <br> </p>
L'ensemble des données est représentée sous la forme de paires(clé, valeur), à l'instar des tables d'association. <br>

Pour résumer :  <br>

1-L'ensemble des données à traiter est découpé en plusieurs lots ou sous-ensembles.<br>

2-Dans une première étape, l'étape MAP, l'opération map, spécifiée pour notre problème, est appliquée à chaque lot. Cette opération transforme la paire(clé, valeur)représentant le lot en une liste de nouvelles paires(clé, valeur)constituant ainsi des résultats intermédiaires du traitement à effectuer sur les données complètes.<br>

3-Avant d'être envoyés à l'étape REDUCE, les résultats intermédiaires sont regroupés et triés par clé. C'est l'étape deSHUFFLE and SORT.<br>

4-Enfin, l'étape REDUCE consiste à appliquer l'opération reduce, spécifiée pour notre problème, à chaque clé. Elle agrège tous les résultats intermédiaires associés à une même clé et renvoie donc pour chaque clé une valeur unique.<br>


<p>
    <center> Architecture Map Reduce : </center> <br />
    <img src="mapreduce.png" alt="Architecture Map Reduce" />
</p>

<p style="text-align:justify;">Spark élargit le cadre map/reduce en proposant à l'utilisateur des opérations supplémentaires pouvant être réalisées de manière distribuée. Spark emploie diverses techniques d'optimisation, mais au final tout calcul distribué est réalisé sous la forme d'opérations map/reduce. D'une certaine manière, MapReduce est le langage assembleur du calcul distribué : les outils permettant de réaliser des calculs distribués, tel Spark, permettent à l'utilisateur de s'abstraire de MapReduce ; tout comme les langages de programmation de haut niveau peuvent être compilés en assembleur mais permettent de ne pas avoir à écrire soi-même des programmes en assembleur.</p

## 4.2 - Cluster Spark

Un cluster Spark est composé de :

- un ou plusieurs workers : chaque worker instancie un executor chargé d'exécuter les différentes tâches de calculs.
- un driver : chargé de répartir les tâches sur les différents executors. C'est le driver qui exécute la méthodemainde nos applications.
- un cluster manager : chargé d'instancier les différents workers.

La différence entre worker et executor n'est pas évidente au premier coup d'œil. En pratique, on peut voir un worker comme une machine physique, et un executor comme une application qui tourne sur cette machine. Cette distinction permet d'exécuter plusieurs applications Spark sur une même machine en même temps. Chaque worker a alors plusieurs executors. 

<p>
    <center> Workers et Executors : </center> <br />
    <img src="workers_&_executors.png" alt="Workers et Executors" />
</p>

L'option --master permet de préciser à quel type de cluster manager l'application Spark peut être envoyée. Spark peut fonctionner en se connectant à des cluster managers de types différents :<br>

--master spark://HOTE:PORT: utilise le cluster manager autonome de Spark. <br>
--master mesos://HOTE:PORT: se connecte à un cluster manager Mesos. <br>
--master yarn: se connecte à un cluster manager Yarn.<br>
--master local: pas de cluster manager, Spark fonctionne en mode local. Il est possible de spécifier le nombre d'executors dans le cluster en passant une valeur entre crochets :local[1]oulocal[4], par exemple. <br>

Par défaut, si l'option --master n'est pas spécifiée, Spark fonctionne en mode local avec un nombre d'executors égal au nombre de cœurs physique de la machine. Alors que si l'on spécifie--master local[1]un seul des quatre cœurs de la machine sera utilisé. <br>
Le cluster manager est à ne pas confondre avec le driver. Le cluster manager est responsable de l'allocation des ressources, notamment lorsque plusieurs applications concurrentes sont exécutées sur le cluster Spark. Ce rôle d'allocation des ressources ne peut pas être confié au driver parce que le driver n'est responsable que de sa propre application.

## 4.3 - RDD

un RDD (Resilient Distributed Dataset) est un framework Spark permettant de faire du calcul distribué.

Les RDD possèdent deux types de méthodes :

- Les transformations : qui donnent en sortie un autre RDD.
- Les actions qui donnent en sortie... autre chose qu'un RDD.

Pourquoi est-ce que cette distinction est importante ? Parce que les transformations ne sont pas évaluées immédiatement. On dit qu'elles sont évaluées de manière paresseuse ("lazy evaluation"), c'est à dire uniquement lorsqu'on en a besoin. Et on n'a besoin du résultat d'une transformation que lorsqu'on effectue une action. C'est pour cela que les appels à flatMap,map et reduceByKey se font de manière quasi instantanée. C'est parce que ces appels, en soit, ne font pas grand-chose. Les fonctions passées en argument de ces transformations ne sont appelées qu'au moment de l'appel àcollect

Les principales transformations et actions disponibles dans Spark sont les suivantes :

<p>
    <center> Principales transformations et actions : </center> <br />
    <img src="RDD.png" alt="RDD" />
</p>

## 4.4 - Calculs distribués sous forme de graphe avec les DAG

Si les RDD sont aussi importants dans Spark, c'est parce qu'ils dictent la manière dont les calculs vont être distribués sur les différentes machines. Ce sont aussi les RDD qui permettent une tolérance aux pannes, sujet épineux comme on a pu le voir en introduction de ce chapitre.

Dans une application Spark, les transformations et les actions réalisées sur les RDD permettent de construire un graphe acyclique orienté (DAG : "directed acyclic graph").

<p>
    <center> Calculs distribués sous forme de graphes avec les DAG : </center> <br />
    <img src="dag.png" alt="DAG" />
</p>

Cette architecture DAG peut être visualisé lors de l'execution d'une application Spark en tapant localhost:4040/jobs/ dans un navigateur. Voici ci-dessous un exemple de visualisation de l'architecture DAG d'un job exécuté ci-dessus.

<p>
    <center> Spark et DAG Visualisation : </center> <br />
    <img src="nb.png" alt="nb" />
</p>

## 4.5 - Fonctionnement de spark : cycle de vie d'une application

Comment Spark distribue les calculs sur les différents executors ? En particulier, comment les données sont-elles réparties entre les executors ?

Les données sont découpées en partitions. Chaque partition est assignée à un des executors. Le traitement d'une partition représente une tâche : c'est la plus petite unité de traitement de données. Un cluster Spark ne peut traiter qu'une tâche à la fois par executor, et en général il y a un executor par cœur de processeur. Par ailleurs, la taille d'une partition doit rester inférieure à la mémoire disponible pour son executor. Le choix du nombre de partitions est donc crucial, puisqu'il détermine le nombre de tâches qui seront réalisées de manière concurrente sur le cluster. 

Un ensemble de tâches réalisées en parallèle, une par partition d'un RDD, constitue une étape (stage). Toutes les tâches d'une étape doivent être terminées avant que l'on puisse passer à l'étape suivante. Un job Spark est composé d'une succession d'étapes ; la progression d'un job peut donc être mesurée au nombre d'étapes qui ont été réalisées. Un job Spark est créé pour chaque action qu'on réalise sur un RDD.

<p>
    Job Spark : <br />
    <img src="job_spark.png" alt="Job Spark" />
</p>

Quelles tâches regroupent-on dans une même étape ? On passe d'une étape à une autre dès lors qu'on doit redistribuer les données entre les nœuds. On dit alors qu'il y a un shuffle. Les shuffle peuvent se produire pour des raisons différentes. Par exemple, lors d'unreduceByKeytoutes les données correspondant à une même clé sont regroupées sur la même partition. Il est important de comprendre quelles actions nécessitent un shuffle car le transfert de données entre différentes machines (par le réseau) est coûteux en temps.

<p>
    Job Spark :<br />
    <img src="job_spark_2.png" alt="Job Spark" />
</p>

Récapitulatif :
- Un job Spark correspond à une action sur un RDD et est composé de plusieurs étapes séparées par des shuffles.
- Chaque étape est composée de tâches.
- Chaque tâche s'exécute sur une partition différente des données.
- Les partitions sont réparties sur les différents executors.
- Les partitions sont créées par les Resilient Distributed Datasets (RDD).

Pour résumer, la distribution des calculs sur Spark intervient lorsque ce dernier décompose le traitement des opérations RDD en tâches, chacune étant exécutée par un exécuteur.Spark permet de partager les données en mémoire entre les graphes, de façon à ce que plusieurs jobs puissent travailler sur le même jeu de données.

# 4.6 - Distribution des calculs dans notre application Spark

Chaque SparkContext lance une interface utilisateur Web, par défaut sur le port 4040, qui affiche des informations utiles et des metrics relatifs à l'application. Ceci comprend:

- Une liste des étapes et des tâches du planificateur
- Un résumé des tailles RDD et de l'utilisation de la mémoire
- Informations environnementales.
- Informations sur les exécuteurs exécutants

<p>
    <center> Spark UI : </center><br />
    <img src="spark.png" alt="Spark UI" />
</p>
<br>
Elle permet également de visualiser l'achitecture en graphe des job (DAG Visualistion).
<p>
    <img src="job1.png" alt="Spark UI" />
</p>
<br>
Lors de la création de sparkContext, je n'ai pas configuré le master, il a pris par défaut la valeur local (etoile), ce qui indique à Spark de créer autant de threads de travail que de coeurs logiques sur mon ordinateur. Nous pouvons ainsi constater grâce à l'interface web de Spark UI que les différents calculs sont distribués par Spark sur les 4 coeurs logiquesdont dispose mon ordinateur. <br>
<p>
    <img src="logic_2.png" alt="Coeur logic Mac" />
</p>
<br>
Lorsqu'on clique sur un stage, on accède à différentes informations sur les métriques relatifs à ce stage. <br>
<p>
    <img src="metrics.png" alt="Details metrics stages" />
</p>
<br>
En suivant le tutoriel suivant https://blog.ippon.fr/2014/11/20/utiliser-apache-spark-en-cluster/, j'ai tenté de visualiser l'interface web des clusters mais cela a échoué, pourtant ./start-master.sh et tail -f spark-winnievorihilala-org.apache.spark.deploy.master.Master-1-Air-de-Winnie.out créent bien un fichier de log et affiche dans le terminal les urls pour accéder à l'interface : 

20/03/16 11:18:40 INFO Utils: Successfully started service 'sparkMaster' on port 7077.

20/03/16 11:18:40 INFO Master: Starting Spark master at spark://Air-de-Winnie:7077

20/03/16 11:18:41 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://air-de-winnie:8080

<p>
    <img src="cluster.png" alt="Job Spark" />
</p>

#### Sources :  
- https://meritis.fr/challenge/bigdata/larchitecture-framework-spark/ <br>
- https://spark.apache.org/docs/latest/api/python/index.html <br>
- https://spark.apache.org/docs/latest/rdd-programming-guide.html <br>
- https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives <br>
- https://blog.jetoile.fr/2014/05/rdd-quest-ce-que-cest.html <br>
- http://b3d.bdpedia.fr/spark-batch.html <br>
- https://blog.zenika.com/2015/02/02/introduction-a-spark/ <br>
- https://blog.ippon.fr/2014/11/20/utiliser-apache-spark-en-cluster/
