# Verification des versions

In [1]:
!python --version

Python 3.6.5 :: Anaconda, Inc.


In [2]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
                        
Using Scala version 2.11.12, Java HotSpot(TM) Client VM, 1.8.0_251
Branch 
Compiled by user  on 2019-08-27T21:21:38Z
Revision 
Url 
Type --help for more information.


# Lancement de la session spark 

SparkSession dans Spark 2.0 fournit une prise en charge intégrée des fonctionnalités Hive, notamment la possibilité d'écrire des requêtes à l'aide de HiveQL, l'accès aux UDF Hive et la possibilité de lire les données des tables Hive. Pour utiliser ces fonctionnalités, vous n'avez pas besoin d'avoir une configuration Hive existante. 

J'utilise spark SQL. Ceci nous permet d'introduire un objet : le DataFrame en spark. Il s'agit d'un objet proche du RDD, mais qui permet de stocker de manière distribuée des données strucutrées, là ou les RDD nous permettent de stocker des données non strucutrées. Il se rapproche très fortement du DataFrame de Pandas

In [3]:
#On importe findspark
import findspark
#On initialise findspark pour identifier nos chemins Spark
findspark.init()
#On importe spark session
from pyspark.sql import SparkSession
#On crée une session Spark
spark = SparkSession.builder \
    .appName ("Projet calcul distribué") \
    .getOrCreate()

# Importation de données

J'importe mon fichier d'entrainement $\textbf{train.json}$ qui est en format json

In [4]:
train_data = spark.read.json("C:/Users/na_to/OneDrive/Bureau/Insa/Mapromo/Calcul_distribue/Projet/train.json")
#4 partitions sur l'importation de données : voir http://localhost:4040 : données distribué RDD 

train_data.printSchema()

root
 |-- message: string (nullable = true)
 |-- polarity: string (nullable = true)



On a deux variables présentes dans notre base de données : message et polarity. Polarity et message sont des variables "string" . On doit les convertir en format numérique pour appliquer un modèle de classification.

In [5]:
train_data.groupby("polarity")\
     .count()\
     .show()

+--------+-----+
|polarity|count|
+--------+-----+
|       0|61475|
|       4|66926|
+--------+-----+



On a deux valeurs à prédire 4 et 0. On remarque que les données sont équilibrées.

In [6]:
from pyspark.sql.types import * 
from pyspark.sql.functions import * 
train_data = train_data.select("message", col("polarity").cast("Int").alias("label"))

On a convertit notre variable cible en entier et nommé la variable cible "Polarity" par "label" 

In [7]:
train_data.show() 

+--------------------+-----+
|             message|label|
+--------------------+-----+
|! Comment était l...|    0|
|! d'accord! Va-t-...|    0|
|!!! Taihen desu n...|    0|
|!!!! Auto-dj .. c...|    0|
|!!!! Ce n'est que...|    0|
|"Aimant" Le jour ...|    0|
|"Attrape-moi si t...|    0|
|"Beverley Knight"...|    0|
|"Crack ... break ...|    0|
|"Désolé" une cond...|    0|
|"Effrayant" Pense...|    0|
|"En espérant qu'i...|    0|
|"Feelin on my a" ...|    0|
|"Graisse réduite"...|    0|
|"Je ne sais pas s...|    0|
|"Je regarde la té...|    0|
|"Je suis fatigué ...|    0|
|"Les deux adultes...|    0|
|"Les étoiles dans...|    0|
|"Nous avons vraim...|    0|
+--------------------+-----+
only showing top 20 rows



In [8]:
train_data.take(5)

[Row(message='! Comment était la terre des écossais?', label=0),
 Row(message="! d'accord! Va-t-il apporter des armes à feu et des voitures? Nous n'avons pas ici ici des axes et des cavaliers ... o_o", label=0),
 Row(message='!!! Taihen desu ne. Une idée de quoi ou qui a causé cela?', label=0),
 Row(message="!!!! Auto-dj .. ce serait un peu dope lol ... sauf que cela me met hors d'un travail", label=0),
 Row(message="!!!! Ce n'est que hier que j'ai vu que les flambeaux continuaient à avoir des pains croisés chauds ne doivent pas être partout", label=0)]

Les tweet contienent beaucoup d’aléatoire qui nuit à l’estimation des modèles : les minuscules, les majuscules, les signes de ponctuation… On peut les garder mais plus de variabilité implique plus de données pour les apprendre. On préfère alors le nettoyer avant de le découper en mot (ou caractères ou syllabe)

# Suppression des ponctuations

on remarque qu'on a beaucoup de ponctuation dans les tweet, il serait intéressant de les supprimer pour réduire nos variables dans l'apprentissage

In [9]:
from pyspark.sql.functions import regexp_replace, trim, col, lower
def removePunctuation(column):
    """Supprime la ponctuation, passe en minuscules et supprime les espaces de début et de fin..

    Note:
        Seuls les espaces, les lettres et les chiffres doivent être conservés.  

    Args:
        colonne (Column): Une colonne contenant une phrase.

    Returns:
        Colonne : une colonne nommée "message" avec des opérations de nettoyage appliquées.
    """
    return lower(trim(regexp_replace(column,'\\p{Punct}',''))).alias('message')

lower : Pour mettre les tweets en minuscules

\\\p{Punct} : la classe de caractères pour la ponctuation 

In [10]:
remove_train = train_data.select(removePunctuation(col('message')),"label")

In [11]:
remove_train.show()

+--------------------+-----+
|             message|label|
+--------------------+-----+
|comment était la ...|    0|
|daccord vatil app...|    0|
|taihen desu ne un...|    0|
|autodj  ce serait...|    0|
|ce nest que hier ...|    0|
|aimant le jour de...|    0|
|attrapemoi si tu ...|    0|
|beverley knight p...|    0|
|crack  break  sha...|    0|
|désolé une condit...|    0|
|effrayant penser ...|    0|
|en espérant quil ...|    0|
|feelin on my a qu...|    0|
|graisse réduite b...|    0|
|je ne sais pas si...|    0|
|je regarde la tél...|    0|
|je suis fatigué d...|    0|
|les deux adultes ...|    0|
|les étoiles dans ...|    0|
|nous avons vraime...|    0|
+--------------------+-----+
only showing top 20 rows



In [12]:
remove_train.take(5)

[Row(message='comment était la terre des écossais', label=0),
 Row(message='daccord vatil apporter des armes à feu et des voitures nous navons pas ici ici des axes et des cavaliers  oo', label=0),
 Row(message='taihen desu ne une idée de quoi ou qui a causé cela', label=0),
 Row(message='autodj  ce serait un peu dope lol  sauf que cela me met hors dun travail', label=0),
 Row(message='ce nest que hier que jai vu que les flambeaux continuaient à avoir des pains croisés chauds ne doivent pas être partout', label=0)]

Dans cette partie, on peut remarquer que les ponctuations ont bien été supprimées et que les caractères sont tous en minuscules. 

# Ajout de variable

On ajoute une nouvelle variable $\textbf{lenght_of_message}$ qui consiste à compter le nombre de caractère par tweet. Cette variable on l'ajoutera lors de la modélisation.

In [13]:
import pyspark.sql.functions as F
train_data_bis = remove_train.withColumn("length_of_message", F.length("message"))
train_data_bis.show(truncate=True)

+--------------------+-----+-----------------+
|             message|label|length_of_message|
+--------------------+-----+-----------------+
|comment était la ...|    0|               35|
|daccord vatil app...|    0|              108|
|taihen desu ne un...|    0|               51|
|autodj  ce serait...|    0|               72|
|ce nest que hier ...|    0|              119|
|aimant le jour de...|    0|               28|
|attrapemoi si tu ...|    0|              111|
|beverley knight p...|    0|              145|
|crack  break  sha...|    0|               39|
|désolé une condit...|    0|              102|
|effrayant penser ...|    0|               50|
|en espérant quil ...|    0|              138|
|feelin on my a qu...|    0|               60|
|graisse réduite b...|    0|              130|
|je ne sais pas si...|    0|              120|
|je regarde la tél...|    0|               48|
|je suis fatigué d...|    0|              133|
|les deux adultes ...|    0|              111|
|les étoiles 

# Mots les plus fréquents

Il serait intéressant de regarder les mots les plus fréquents et d'observer s'ils ont réellement une importance sur la classification des sentiments. S'il s'agit des pronoms indéfinis, définis, compléments... ils peuvent avoir très peu d'impact.

In [14]:
from pyspark.sql.functions import split, explode
train_word =  (train_data_bis
                    .select(explode(split(train_data_bis.message,'[\s]+')) 
                    .alias('word'))
                    .where("word!=''"))  
train_word.show(15)

+--------+
|    word|
+--------+
| comment|
|   était|
|      la|
|   terre|
|     des|
|écossais|
| daccord|
|   vatil|
|apporter|
|     des|
|   armes|
|       à|
|     feu|
|      et|
|     des|
+--------+
only showing top 15 rows



Chaque ligne représente un mot. Il est utile de définir cette variable $\textbf{train_word}$ pour qu'on puisse regarder la fréquence des mots par la suite.

In [15]:
  def wordCount(wordListDF):
        """Crée un DataFrame avec le nombre de mots..

        Args:
           wordListDF (str): Un DataFrame composé d'une colonne de chaîne appelée 'word'.

        Returns:
            DataFrame of (str, int) : un DataFrame contenant les colonnes 'word' et 'count'.
        """
        return wordListDF.groupBy('word').count()

In [16]:
topwords_train = wordCount(train_word).orderBy(['count'],ascending=False) 
topwords_train.show(50)

+----------+-----+
|      word|count|
+----------+-----+
|        de|63026|
|        je|60897|
|        le|34041|
|        la|32126|
|         à|31028|
|       pas|28539|
|       que|27591|
|        et|26238|
|        un|21668|
|      vous|20095|
|        ne|19732|
|      pour|19489|
|       les|18223|
|       est|14878|
|        en|14072|
|       jai|13849|
|       une|13594|
|      suis|13133|
|       mon|13051|
|        ce|12264|
|        me|11398|
|       des|11008|
|      mais|10627|
|         a|10558|
|       sur| 9967|
|      cest| 9588|
|      avec| 9165|
|      dans| 9084|
|        il| 9059|
|        au| 8920|
|      plus| 7662|
|        ma| 7585|
|        du| 7394|
|        si| 6852|
|      bien| 6545|
|     faire| 6450|
|maintenant| 6447|
|      nous| 5995|
|aujourdhui| 5516|
|      tout| 5451|
|        se| 5217|
|      fait| 5078|
|        ça| 5038|
|     merci| 4889|
|       mes| 4784|
|       qui| 4758|
|      être| 4652|
|       moi| 4588|
| tellement| 4496|
|       lol|

On observe que parmis les tweets les mots les plus fréquents sont : de, je, le, la, à ...

In [17]:
uniqueWordsCount = topwords_train.distinct().groupBy().count().head()[0]
print(uniqueWordsCount) 

68107


On a plus de 68107 mots distincts dans notre base de données.

In [None]:
averageCount = topwords_train.groupBy().mean('count').head()[0]
print(averageCount)

Il s'agit du nombre moyen d'occurrences de mots dans topwords_train.

# Traitement des tweets

$\underline{Tokenisation}$

On applique la tokenisation qui consiste à prendre du texte (comme une phrase) et à le décomposer en termes individuels
(généralement des mots).

In [21]:
from pyspark.ml.feature import HashingTF, StopWordsRemover, Tokenizer

tokenizer = Tokenizer(inputCol = "message", outputCol= "words")
tokenized = tokenizer.transform(train_data_bis)
tokenized.show(truncate=True, n=5)

+--------------------+-----+-----------------+--------------------+
|             message|label|length_of_message|               words|
+--------------------+-----+-----------------+--------------------+
|comment était la ...|    0|               35|[comment, était, ...|
|daccord vatil app...|    0|              108|[daccord, vatil, ...|
|taihen desu ne un...|    0|               51|[taihen, desu, ne...|
|autodj  ce serait...|    0|               72|[autodj, , ce, se...|
|ce nest que hier ...|    0|              119|[ce, nest, que, h...|
+--------------------+-----+-----------------+--------------------+
only showing top 5 rows



La nouvelle colonne "words" contient les mots.

$\underline{StopWordsRemover}$

On enlève ensuite les mots vides avec StopWordsRemover. Ce sont des mots qui doivent être exclus de l’entrée, gnéralement parce que les mots apparaissent fréquemment et n’ont pas autant de sens. La majorité des tweets sont en français, on va supprimer les mots vides en précisant la langue french

In [22]:
import nltk
stopwordList = nltk.corpus.stopwords.words('french')

In [23]:

stop_words_remove = StopWordsRemover(inputCol= tokenizer.getOutputCol(),
                                    outputCol = "clean_word", stopWords=stopwordList)
stop_words_remove_bis = stop_words_remove.transform(tokenized)
stop_words_remove_bis.show(truncate=True, n=5)

+--------------------+-----+-----------------+--------------------+--------------------+
|             message|label|length_of_message|               words|          clean_word|
+--------------------+-----+-----------------+--------------------+--------------------+
|comment était la ...|    0|               35|[comment, était, ...|[comment, terre, ...|
|daccord vatil app...|    0|              108|[daccord, vatil, ...|[daccord, vatil, ...|
|taihen desu ne un...|    0|               51|[taihen, desu, ne...|[taihen, desu, id...|
|autodj  ce serait...|    0|               72|[autodj, , ce, se...|[autodj, , peu, d...|
|ce nest que hier ...|    0|              119|[ce, nest, que, h...|[nest, hier, jai,...|
+--------------------+-----+-----------------+--------------------+--------------------+
only showing top 5 rows



In [25]:
stop_words_remove_bis.take(1)

[Row(message='comment était la terre des écossais', label=0, length_of_message=35, words=['comment', 'était', 'la', 'terre', 'des', 'écossais'], clean_word=['comment', 'terre', 'écossais'])]

Comme on peut le voir, lorsqu'on affiche le premier tweet, les mots "était", "la", "des" ont été supprimés.

$\underline{HashingTF}$

On convertit ensuite les vecteurs de mots sous forme de valeur numérique pour la modélisation. On utilise HashingTF.

HashingTF est un transformateur qui prend des ensembles de termes et convertit ces ensembles en vecteurs de caractéristiques de longueur fixe. Dans le traitement de texte, un "ensemble de termes" peut être un sac de mots. HashingTF utilise l’astuce de hachage. Une caractéristique brute est mappée dans un index (terme) en appliquant une fonction de hachage.

In [26]:
hashTF = HashingTF(inputCol=stop_words_remove.getOutputCol(), outputCol="transform_num")
numericDataTrain = hashTF.transform(stop_words_remove_bis).select(
"message","length_of_message" ,"clean_word", "transform_num", "label" )
numericDataTrain.show(truncate=True, n=5)

+--------------------+-----------------+--------------------+--------------------+-----+
|             message|length_of_message|          clean_word|       transform_num|label|
+--------------------+-----------------+--------------------+--------------------+-----+
|comment était la ...|               35|[comment, terre, ...|(262144,[3086,178...|    0|
|daccord vatil app...|              108|[daccord, vatil, ...|(262144,[60211,10...|    0|
|taihen desu ne un...|               51|[taihen, desu, id...|(262144,[151360,1...|    0|
|autodj  ce serait...|               72|[autodj, , peu, d...|(262144,[31950,47...|    0|
|ce nest que hier ...|              119|[nest, hier, jai,...|(262144,[2967,440...|    0|
+--------------------+-----------------+--------------------+--------------------+-----+
only showing top 5 rows



# Données Test 

On fait le même procédé avec les données test : $\textbf{test.json}$

In [29]:
test_data = spark.read.json("C:/Users/na_to/OneDrive/Bureau/Insa/Mapromo/Calcul_distribue/Projet/test.json")
#4 partitions sur l'importation de données : voir http://localhost:4040 donc données déjà distribué RDD 

test_data = test_data.select("message", col("polarity").cast("Int").alias("label"))

test_data = test_data.select(removePunctuation(col('message')),"label")

#ajout variable length_of_message 
test_data_bis = test_data.withColumn("length_of_message", F.length("message"))

#Liste de mots
tokenizedTest = tokenizer.transform(test_data_bis)

#enleve mots vide
stop_words_remove_test = stop_words_remove.transform(tokenizedTest)

numericTestData = hashTF.transform(stop_words_remove_test).select(
"message","length_of_message" ,"clean_word", "transform_num", "label" )
numericTestData.show(truncate=True, n=3)

+--------------------+-----------------+--------------------+--------------------+-----+
|             message|length_of_message|          clean_word|       transform_num|label|
+--------------------+-----------------+--------------------+--------------------+-----+
|et si affamé mais...|               67|[si, affamé, souc...|(262144,[16672,74...|    0|
|identica présente...|              140|[identica, présen...|(262144,[7054,328...|    0|
|je vais enfin men...|               80|[vais, enfin, men...|(262144,[16620,35...|    0|
+--------------------+-----------------+--------------------+--------------------+-----+
only showing top 3 rows



# Préparation features

In [30]:
#On construit ensuite un vecteur rassemblant toutes les variables explicatives
from pyspark.ml.feature import VectorAssembler

#On rassemble la liste des colonnes numériques que l'on va utiliser
numericCols = ['length_of_message','transform_num']

In [31]:
#On crée un objet qui rassemble toutes ces colonnes dans une colonne nommée features
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")

# Modèle regression logistique

J'utilise la bibliothèque spark.ml qui permet de faire du machine learning sur les DataFrame. La bibliotéque spark.ml
ressemble beaucoup par ses principes à scikit-learn. On crée des objets en utilisant des classes spécifiques et on applique des méthodes .fit()

In [32]:
from pyspark.ml.classification import LogisticRegression
#On crée notre modèle 
model_reg = LogisticRegression(labelCol="label", featuresCol="features")
from pyspark.ml import Pipeline 
#On construit le pipeline qui est composé des 2 étapes développées auparavant 
pipeline = Pipeline(stages=[assembler, model_reg])

In [33]:
#ajustement du model
model_reg = pipeline.fit(numericDataTrain)

In [34]:
#prévision sur les données de validation
predictions_reg = model_reg.transform(numericTestData)

Par défaut spark va créer de nouvelles colonnes dans nos données avec les prédictions (colonne prediction) et les probabilitées de prédiction (colonne rawPrediction)

In [35]:
#Sauvegarde du modèle 
model_reg.save("C:/Users/na_to/OneDrive/Bureau/Insa/Mapromo/Calcul_distribue/Projet/logistic_model")

Le modèle est sauvegardé pour éviter de refaire le fit. 

$\underline{Evaluation}$

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
#Cette classe calcule l'AUC de notre modèle
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="label")
#On applique les données prédites à notre objet d'évaluation
evaluator.evaluate(predictions_reg)
#L'AUC est affiché 

0.5423502204199602

In [37]:
predictionsfinal = predictions_reg.select(
"clean_word", "label", "prediction")
predictionsfinal.show(n=10)

+--------------------+-----+----------+
|          clean_word|label|prediction|
+--------------------+-----+----------+
|[si, affamé, souc...|    0|       0.0|
|[identica, présen...|    0|       4.0|
|[vais, enfin, men...|    0|       0.0|
|[cest, jour, jai,...|    0|       0.0|
|[easy, plancher, ...|    0|       0.0|
|[empire, soleil, ...|    0|       0.0|
|[heart, quot, nes...|    0|       0.0|
|[i, need, , quot,...|    0|       4.0|
|[trouvé, sonny, ,...|    0|       4.0|
|[vérifie, twitter...|    0|       4.0|
+--------------------+-----+----------+
only showing top 10 rows



In [38]:
correctpredictions = predictionsfinal.filter(predictionsfinal["prediction"] == predictionsfinal["label"]).count()
totalData = predictionsfinal.count()
print("correct prediction:", correctpredictions, ", total data:", totalData, ", accuracy:" ,correctpredictions/totalData)

correct prediction: 54259 , total data: 76759 , accuracy: 0.7068747638713375


On obtient un accuracy d'environ 70%, presque 3 quart des tweet ont été bien classé sur les données test

# Data no class

In [39]:
noclass_data = spark.read.json("C:/Users/na_to/OneDrive/Bureau/Insa/Mapromo/Calcul_distribue/Projet/noclass.json")

In [40]:
noclass_data.show() 

+--------------------+
|             message|
+--------------------+
|"Dans ganga" Ne v...|
|"Dieu elton" vous...|
|"I was up up the ...|
|"Quasi" toute la ...|
|# $ & Amp; * # vi...|
|& Amp; Je parle d...|
|& Amp; nd je ne p...|
|& Gt; Pas bon à p...|
|& Lt; 3 il semble...|
|& Lt; 3 shopping ...|
|& Quot si la lice...|
|& Quot; can not s...|
|& Quot; je vais g...|
|& Quot; joellen a...|
|& Quot; page not ...|
|& Quot; si vous ê...|
|&quot;une facture...|
|'S foot is endorm...|
|() 'Jizzed dans m...|
|() J'ai l'impress...|
+--------------------+
only showing top 20 rows



In [42]:
#remove punctuation  
noclass_data_bis = noclass_data.select(removePunctuation(col('message')))

#ajout variable length_of_message 
noclass_data_bis = noclass_data_bis.withColumn("length_of_message", F.length("message"))

#Liste de mots
tokenizedNoclass = tokenizer.transform(noclass_data_bis)

#enleve mots vide
stop_words_remove_noclass = stop_words_remove.transform(tokenizedNoclass)

In [43]:
numericNoclassData = hashTF.transform(stop_words_remove_noclass).select(
"message","length_of_message" ,"clean_word", "transform_num" )
numericNoclassData.show(truncate=True, n=5)

+--------------------+-----------------+--------------------+--------------------+
|             message|length_of_message|          clean_word|       transform_num|
+--------------------+-----------------+--------------------+--------------------+
|dans ganga ne veu...|              101|[ganga, veuxtu, d...|(262144,[13779,42...|
|dieu elton vous n...|              159|[dieu, elton, pou...|(262144,[10504,76...|
|i was up up the h...|              118|[i, was, up, up, ...|(262144,[17252,24...|
|quasi toute la nu...|               76|[quasi, toute, nu...|(262144,[65739,76...|
|amp   vient dache...|              156|[amp, , , vient, ...|(262144,[21832,37...|
+--------------------+-----------------+--------------------+--------------------+
only showing top 5 rows



In [44]:
predictions_noclass = model_reg.transform(numericNoclassData)

In [45]:
predictions_noclass.show() 

+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|             message|length_of_message|          clean_word|       transform_num|            features|       rawPrediction|         probability|prediction|
+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|dans ganga ne veu...|              101|[ganga, veuxtu, d...|(262144,[13779,42...|(262145,[0,13780,...|[11.4306507822779...|[0.99987920178591...|       0.0|
|dieu elton vous n...|              159|[dieu, elton, pou...|(262144,[10504,76...|(262145,[0,10505,...|[26.5138823979166...|[0.99999999999991...|       0.0|
|i was up up the h...|              118|[i, was, up, up, ...|(262144,[17252,24...|(262145,[0,17253,...|[3.33330455891403...|[6.54357162137833...|       4.0|
|quasi toute la nu...|               76|[quasi, toute, nu.

In [46]:
predictions_noclass.coalesce(1).write.format('json').save("C:/Users/na_to/OneDrive/Bureau/Insa/Mapromo/Calcul_distribue/Projet/noclass_bis.json")

On sauvegarde en format json la base de données predictions_noclass en le nommant noclass_bis.json