#### <p style="text-align:right";> *Elements Logiciels Pour Le Traitement De Données Massives - ENSAE ParisTech - Janvier 2018*</p>  <p style="text-align:right";> Gilles Cornec - Samuel Ritchie </p>

# <p style="text-align:center";><span style="color: #fb4141">Sentiment analysis - Classifying tweets using PySpark MapReduce Approach</span></p>

_**Abstract:**_ _Ce notebook présente une approache PySpark afin de répondre à un problème de sentiment analysis : la classification de tweets. Les données sont issues de http://thinknook.com/twitter-sentiment-analysis-training-corpus-dataset-2012-09-22/, qui regroupe un DataSet d'entraînement composées de 1,578,627 tweets. Chaque ligne est labélisée par un 1 pour un sentiment 'positif' et par un 0 pour un sentiment 'négatif'.
Dans ce notebook, nous présentons dans un premier temps la création de la base par approche MapReduce et dans un second temps divers algorithmes de classification binaire (Régression logistique, Naive Bayes) parallélisés par nos soins, que nous comparons aux algorithmes de la librairie MLLib de Apache Spark.

In [1]:
import pandas as pd
import numpy as np
import re
from time import time
import matplotlib.pyplot as plt
import seaborn as sns
%pylab inline
from jyquickhelper import add_notebook_menu
add_notebook_menu()

Populating the interactive namespace from numpy and matplotlib


## 0. Création d'un environnement PySpark - Importation des données

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession \
    .builder \
    .appName("ELTDM_CORNEC_RITCHIE") \
    .getOrCreate()
sc = spark.sparkContext
  
spark

In [3]:
t0 = time()
bdd = sc.textFile("Sentiment_Analysis_Dataset.csv").map(lambda line: line.split(","))
t1 = time() - t0
print("Réalisé en {} secondes".format(round(t1,3)))

Réalisé en 0.608 secondes


In [4]:
t0 = time()
print('La base de donées contient ' + str(bdd.count()) + ' tweets.')
t1 = time() - t0
print("Réalisé en {} secondes".format(round(t1,3)))

La base de donées contient 1578628 tweets.
Réalisé en 5.248 secondes


In [5]:
#Afficher une ligne de la base
print(bdd.take(2))

[['ItemID', 'Sentiment', 'SentimentSource', 'SentimentText'], ['1', '0', 'Sentiment140', '                     is so sad for my APL friend.............']]


## 1. Travail préliminaire sur la base de données par MapReduce

Dans un premier temps, on sépare le texte du tweet simplement en séparant grâce à la fonction strip et on transforme tous les mots en minuscule. Ceci est effectué en mappant sur toutes les lignes la fonction split en environnement Spark. On obtient de cette manière une liste de mots pour chaque tweet. On récupère également son label 0-1 qui nous indique s'il s'agit d'un tweet positif ou négatif.

In [6]:
def split(tweet):
    tweet_list = []
    word_list = re.split('\W+',tweet.strip())
    for w in word_list: 
        if w != '':
            tweet_list.append(w.lower()) # append words in lowercase to their corresponding file
    return tweet_list

In [7]:
bdd_test = bdd.map(lambda x: (x[0],x[1], split(x[3])))
header = bdd_test.first()
bdd_test = bdd_test.filter(lambda line: line != header)
bdd_test = sc.parallelize(bdd_test.take(10000)) #On teste sur un sous-échantillon des données

On représente les données sous la forme d'un tableau (environnement DataFrame de PySpark, différent de l'environnement RDD)

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import udf
rdd_row = bdd_test.map(lambda line: Row(ID = line[0],label = line[1],tweet = line[2]))
df = spark.createDataFrame(rdd_row)
df.show()

+---+-----+--------------------+
| ID|label|               tweet|
+---+-----+--------------------+
|  1|    0|[is, so, sad, for...|
|  2|    0|[i, missed, the, ...|
|  3|    1|[omg, its, alread...|
|  4|    0|[omgaga, im, sooo...|
|  5|    0|[i, think, mi, bf...|
|  6|    0|[or, i, just, wor...|
|  7|    1|[juuuuuuuuuuuuuuu...|
|  8|    0|[sunny, again, wo...|
|  9|    1|[handed, in, my, ...|
| 10|    1|[hmmmm, i, wonder...|
| 11|    0|[i, must, think, ...|
| 12|    1|[thanks, to, all,...|
| 13|    0|[this, weekend, h...|
| 14|    0|[jb, isnt, showin...|
| 15|    0|[ok, thats, it, y...|
| 16|    0|[lt, this, is, th...|
| 17|    0|[awhhe, man, i, m...|
| 18|    1|[feeling, strange...|
| 19|    0|[huge, roll, of, ...|
| 20|    0|[i, just, cut, my...|
+---+-----+--------------------+
only showing top 20 rows



On sépare les données en train et test

In [9]:
df_train, df_test = df.randomSplit([0.6, 0.4], seed=12345)

### 1.1. Hashing du texte à la main

Dans cette partie, on retravaille la base de données pour pouvoir l'exploiter dans un second temps dans des algorithmes de classifications. L'idée est de créer un certain nombre de colonnes traduisant l'importance des mots au sein de chaque tweet / dans l'ensemble des tweets. Dans la mesure où le projet se concentre sur l'utilisation et la découverte de PySpark, nous n'avons pas poussé très loin ce pré-traitement. Dans un premier temps, on définit manuellement les colonnes en passant par des fonctions de hashage, puis dans un second temps nous utiliserons l'approche TF-IDF en se basant sur la librairie sparkMLLib.

On définit tout d'abord les occurences des mots pour chaque tweet en appliquant un map sur l'ensemble des lignes des colonnes, et ce pour train et test.

In [10]:
from collections import Counter
train = df_train.rdd.map(lambda x: (x[1],list(Counter(x[2]).items())))
test = df_test.rdd.map(lambda x: (x[1],list(Counter(x[2]).items())))
#train.take(2)
train = train.map(lambda vec:(vec[0],[(w,c/sum([c for (w, c) in vec[1]]))for (w,c) in vec[1]])) # normalise
test = test.map(lambda vec:(vec[0],[(w,c/sum([c for (w, c) in vec[1]]))for (w,c) in vec[1]])) # normalise
train.take(2)

[('0',
  [('is', 0.14285714285714285),
   ('so', 0.14285714285714285),
   ('sad', 0.14285714285714285),
   ('for', 0.14285714285714285),
   ('my', 0.14285714285714285),
   ('apl', 0.14285714285714285),
   ('friend', 0.14285714285714285)]),
 ('1',
  [('hmmmm', 0.14285714285714285),
   ('i', 0.14285714285714285),
   ('wonder', 0.14285714285714285),
   ('how', 0.14285714285714285),
   ('she', 0.14285714285714285),
   ('my', 0.14285714285714285),
   ('number', 0.14285714285714285)])]

On passe ensuite ces listes de mots par une fonction de hashage, en fixant nous même le nombre de colonnes utilisées. Plus ce nombre est élevé, plus la précision de la régression menée en aval sera grande mais plus le temps d'exécution de l'alogorithme sera long 

In [11]:
def hashing(liste_mots, N): 
    v = [0] * N  
    for mot_count in liste_mots: 
        mot,count = mot_count
        h = hash(mot)
        v[h % N] = v[h % N] + count 
    return v

In [12]:
def hashing_liste(df, N): # function (g) applies the hashing vectoriser
    liste = df.map(lambda x: (x[0],hashing(x[1],N))) 
    return liste

N=100
train = hashing_liste(train,N)
test = hashing_liste(test,N)
print(train.take(4))

[('0', [0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.2857142857142857, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), ('1', [0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0.14285714285714285, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), ('0', [0.076923076923076927, 0, 0, 0, 0, 0, 0.038461538461538464, 0, 0.038461538461538464, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.038461538461538464, 0.038461

### 1.2. TF-IDF approach

TF-IDF est un acronyme pour Term Frequency - Inverse Document Frequency. Son but est de juger de l'importance d'un mot dans un corpus. Cette valeur correspond à un équilibre entre l'occurence du mot dans le tweet et son occurence dans le corpus. Ainsi l'occurence d'un mot dans un tweet est pondéré par sa fréquence dans un corpus donné. 
En terme mathématique, le Term Frequency TF() d'un mot m dans un tweet t correspond au nombre de fois ou le mot apparait dans le document.
$$ TF(m; t) = f_{m;t} $$
L'Inverse Document Frequency est une mesure de l'information que la présence du mot donne, c'est à dire à quel point il est commun ou rare dans le corpus entier. Il est donné par la formule suivante:

$$IDF(m;C) = log(\frac{card(C)}{\{t \in C, m \in t \}})$$

Enfin, le TF-IDF est calculé comme la multiplication de ces deux derniers termes.
Ainsi, une valeur importante montre soit que le mot apparait dans beaucoup de tweets soit qu'il apparait peu de fois sur l'ensemble des tweets.

On utilise ici les fonctions pré-implémentées par la libraire pysparkML

In [13]:
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
htf = MLHashingTF(inputCol="tweet", outputCol="tf")
tf_train = htf.transform(df_train)
tf_test = htf.transform(df_test)
tf_train.show(truncate=False)

+----+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID  |label|tweet                                                                                                                                                       |tf                                                                                                                                                                                                                                                           |
+----+-----+--------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
idf = MLIDF(inputCol="tf", outputCol="idf")
tfidf_train = idf.fit(tf_train).transform(tf_train)
tfidf_test = idf.fit(tf_test).transform(tf_test)
tfidf_train.show(truncate=False)

+----+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
res_train = tfidf_train.rdd.map(lambda x : (x.label,x.tweet,x.tf,x.idf,(None if x.idf is None else x.idf.values.sum())))
res_test = tfidf_test.rdd.map(lambda x : (x.label,x.tweet,x.tf,x.idf,(None if x.idf is None else x.idf.values.sum())))

In [16]:
from pyspark.ml import linalg as ml_linalg
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors

def as_mllib(v):
    if isinstance(v, ml_linalg.SparseVector):
        return MLLibVectors.sparse(v.size, v.indices, v.values)
    elif isinstance(v, ml_linalg.DenseVector):
        return MLLibVectors.dense(v.toArray())
    else:
        raise TypeError("Unsupported type: {0}".format(type(v)))

### 1.3. Création d'objets RDD 'LabeledPoint' pour la classification

L'environnement PySpark possède un objet 'LabeledPoint' très utile pour mener dans un second temps des algorithmes de classification. On définit ici de nouveaux RDD contenant ces objets.

In [17]:
from pyspark.mllib.regression import LabeledPoint

In [18]:
def make_label_point_RDD(df): # function (f) creates labelled points where 1=spam; 0=non-spam and works when the argument inp is either a path (when trg =='path') or an RDD (when trg==''). The latter becomes useful at Task e  
    label_point_RDD = df.map(lambda x: LabeledPoint(0 if (x[0]=='0') else 1,x[1])) # assign labelled points
    return label_point_RDD

In [19]:
train_label = make_label_point_RDD(train)
test_label = make_label_point_RDD(test)
print(train.take(4)) # test

[('0', [0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.2857142857142857, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), ('1', [0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0.14285714285714285, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.14285714285714285, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), ('0', [0.076923076923076927, 0, 0, 0, 0, 0, 0.038461538461538464, 0, 0.038461538461538464, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.038461538461538464, 0.038461

In [20]:
def make_label_point_RDD_2(df): 
    label_point_RDD = df.map(lambda x: LabeledPoint(0 if (x[0]=='0') else 1,as_mllib(x[3]))) # assign labelled points
    return label_point_RDD

In [21]:
train_label_2 = make_label_point_RDD_2(res_train) # uses function (f) to build labelled points when argument inp is a path (i.e. trg=='path')
test_label_2 = make_label_point_RDD_2(res_test)
train_label_2.take(2)

[LabeledPoint(0.0, (262144,[15889,16332,37852,87401,158870,188424,258668],[2.0454397444,2.34612590128,1.82384778378,8.00386443743,5.1416635565,2.66392539619,3.79917181804])),
 LabeledPoint(1.0, (262144,[24417,37852,58370,114980,172634,229103,246349],[1.11071531084,1.82384778378,6.49978704066,8.00386443743,3.96962379928,5.47813579312,3.77703069216]))]

## 2. Algorithmes de classification

### 2.1. Avec la librairie MLLib (basé sur objets RDD)

In [22]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, NaiveBayes, SVMWithSGD
from pyspark.mllib.util import MLUtils

In [23]:
def train_and_test_model(train,test): # function(g) trains a logistic regression model to detect spam vs. non-spam files
    logReg_model = LogisticRegressionWithLBFGS.train(train) # train the algorithm
    #logReg_model = NaiveBayes.train(train)
    #logReg_model = SVMWithSGD.train(train)
    correct_train = train.map(lambda lp: 1 if logReg_model.predict(lp.features) == lp.label else 0).sum() # calculate correctly classified data points
    correct_test = test.map(lambda lp: 1 if logReg_model.predict(lp.features) == lp.label else 0).sum() # calculate correctly classified data points
    count_train = train.count() # counts the size of training set
    count_test = test.count() # counts the size of test set
    #print('training Logistic Regression with Limited-memory the Broyden–Fletcher–Goldfarb–Shanno (BFGS)')
    print('Training data items: {}, Correct: {}'.format(count_train, correct_train))
    print('Test data items: {}, Correct: {}'.format(count_test, correct_test))
    print('Train accuracy {:.1%}'.format(correct_train/count_train)) 
    print('Test accuracy {:.1%}'.format(correct_test/count_test)) 
    return logReg_model

In [24]:
logReg_model = train_and_test_model(train_label,test_label)

Training data items: 5984, Correct: 3968
Test data items: 4016, Correct: 2616
Train accuracy 66.3%
Test accuracy 65.1%


In [25]:
logReg_model = train_and_test_model(train_label_2,test_label_2)

Training data items: 5984, Correct: 5973
Test data items: 4016, Correct: 2821
Train accuracy 99.8%
Test accuracy 70.2%


### 2.2. En parallélisant manuellement la descente de gradient

On définit dans un premier temps les fonctions utiles pour effectuer la descente de gradient, que l'on utilisera lors de l'étape map_reduce dans un second temps. A ce titre, on définit la fonction logistique ainsi que le gradient associé à cette fonction objectif (calculé à la main). On définit également la fonction score qui permet d'indiquer si la classification est réussie ou non (en fixant un seuil à 0.5 puisque la régression logistique prédit une probabilité), et enfin une fonction add utile pour l'étape "reduce".

In [26]:
def logistic(x, w):
    return  1.0 / (1.0 + np.exp(-(np.asarray(x).dot(w))))

def logistic_gradient(x, y, w): 
    return (x * ((1.0 / (1.0 + np.exp(-np.asarray(x*w)))) - y))

def gradient(matrix, w):
    Y = matrix[0]
    X = matrix[1:]
    return logistic_gradient(X, Y, w)

def score(matrix, w):
    Y = matrix[0]
    X = matrix[1:]
    pred = logistic(X, w)
    return accuracy_score(np.array([Y > 0.5]) , np.array([pred > 0.5]), normalize=False)

def add(x, y):
    x += y
    return x

w = 2 * np.random.ranf(N) - 1
wb = sc.broadcast(w)

On transforme les données de train et de test au bon format pour pouvoir appliquer la descente de gradient

In [27]:
train_main = train.map(lambda x: [int(x[0])] + x[1])
test_main = test.map(lambda x: [int(x[0])] + x[1])

On fixe ici un nombre d'itérations de la descente de gradient à la main. Il aurait également été envisageable de définir un seuil de variations entre deux valeurs consécutives pour arrêter l'algorithme.
On retrouve ensuite le pattern typique MapReduce : on applique le gradient à chaque ligne qui contient le label et les features, puis on somme ce gradient, et ce pour le nombre d'itérations choisi.

In [28]:
from sklearn.metrics import accuracy_score

N_iter=10
for i in range(N_iter):
    # nous retrouvons ici le pattern typique MapReduce
    # map: calcul des gradients pour chaque exemple
    # reduce: aggrégation des gradients par somme
    
    #t0 = time()
    grad_sum = train_main.map(lambda x: gradient(x, wb.value)).reduce(lambda x,y:x+y) 
    #print(grad_sum.take(2)[0].shape)
    #t1 = time() - t0
    #print(t1)
    #t1 = time()
    w = np.array(w)
    #print(grad_sum.shape)
    w = w - 0.5*grad_sum
    #t2 = time() - t1
    #print(t2)
    #t2 = time()
    #print(w.shape,grad_sum.shape)
    wb = sc.broadcast(w)
    train_nsc = train_main.map(lambda x: score(x, wb.value)).reduce(add)
    #t3 = time() - t2
    #print(t3)
    test_nsc = test_main.map(lambda m: score(m, wb.value)).reduce(add)
    print("Itération", i, ": Train Score", (train_nsc / train_main.count()).round(3) , " ; Test Score", (test_nsc / test_main.count()).round(3))

Itération 0 : Train Score 0.59  ; Test Score 0.59
Itération 1 : Train Score 0.522  ; Test Score 0.528
Itération 2 : Train Score 0.599  ; Test Score 0.596
Itération 3 : Train Score 0.594  ; Test Score 0.593
Itération 4 : Train Score 0.605  ; Test Score 0.6
Itération 5 : Train Score 0.595  ; Test Score 0.591
Itération 6 : Train Score 0.59  ; Test Score 0.58
Itération 7 : Train Score 0.595  ; Test Score 0.589
Itération 8 : Train Score 0.601  ; Test Score 0.598
Itération 9 : Train Score 0.598  ; Test Score 0.594


## 3. Comparaison des différentes approches - Conclusion

Sans grande surprise, les fonctions implémentées au sein de la librairie MLLib sont plus rapides et donnent de meilleurs résultats que notre approche de gradient 'classique'. Cependant la régression logistique utilisée dans MLLIb est optimisée tandis que la notre ne l'est pas.
Par ailleurs, la régression logistique sur-performe les approches SVM / NaiveBayes que nous avons également essayé via la librairie SparkMLLib.
Les résultats sont en outre meilleurs lorsque l'on effectue un pré-traitement des données à l'aide de la méthode TF-IDF.

Il est surtout très surprenant que dans les deux cas, sur le jeu de données complet, les algorithmes mettent beaucoup de temps à converger / itérer. Peut être que l'approche MapReduce est plus adaptée sur de plus gros jeux de données et sur des ordinateurs avec d'avantages de coeurs que sur nos MacBook Air.

Ce travail préliminaire de découverte de PySpark gagnerait à être essayé sur de plus gros jeux de données et sur des ordinateurs plus puissants pour pouvoir constater le gain de temps comparé à une approche classique.