# CNAM UASB03 - CERTIFICATION ANALYSE DE DONNEES MASSIVES
## Projet d'analyse de sentiment sur les commentaires Airbnb en français

***
Notebook Python réception et traitements des commentaires du site reçu en streaming simple.
Le traitement consiste à :
* recevoir chaque commentaire via SparkStreaming
* détecté la langue pour ne conserver que les commentaire reconnu comme français avec une probabilité supérieure à 80%
* effectuer les mêmes transformations textuelles que pour l'analyse du modèle : mise en forme, lemmatisation et suppression des StopWords
* vectorisation avec la technique optimale trouvée pour le modèle = CountVectorizer
* application du modèle optimal Spark identifié lors de la modélisation = SVM
* enregistrement du commentaire avec l'étiquetage associé dans une base MongoDB pour pouvoir réutiliser les résultats en visualisation et pour la mise en place d'un mécanisme d'amélioration continue


-  ##  Import des librairies, paramétrage et définition de la fonction de lémmatisation

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.functions import from_json
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF,Tokenizer,StopWordsRemover
from pyspark.ml.feature import CountVectorizer,CountVectorizerModel
from pyspark.sql.functions import *
from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.util import MLUtils
from  pyspark.mllib.classification import SVMModel, SVMWithSGD
from pymongo import MongoClient
import pprint as p
import pandas, json, sys
import treetaggerwrapper
tagger = treetaggerwrapper.TreeTagger(TAGLANG='fr')

from whatthelang import WhatTheLang
wtl = WhatTheLang()
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 pyspark-shell'

# Création d'un spark context, d'un sql context et d'un streaming context
sc = SparkContext("local[2]","Commentaires AirBnB")
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, '127.0.0.1:2181', 'spark-streaming', {'AirBnb_income':1})
#socket_stream = ssc.socketTextStream("localhost",9999)

stopwords = sc.textFile("Data/French_stop_words").collect()

def comment_to_lemme(comment):
    t=treetaggerwrapper.make_tags(tagger.tag_text(comment))
    lemme=''
    for i in t:
        if type(i)==treetaggerwrapper.Tag:
            if i.pos[:3] in ('ADJ', 'ADV', 'INT','KON','NOM','VER'): 
                if i.lemma !='dns-remplacé':
                    if len(i.lemma)>1 :
                        lemme =lemme+' '+i.lemma.split('|')[0].lower()
    return lemme

- ## Réception des données, transformation, prédiction et enregistrement sur MongoDB


In [None]:
### # Récupération sur une fenêtre d'1s
flux = kafkaStream.window( 1 )

## Chargement des modèles calculés
SVMModel = SVMModel.load(sc, "modele/SVM_CV/")
CountVector = CountVectorizerModel.load("modele/CountVectorizer/")

# Traitement sur chaque ligne du commentaire

def TraitementCommentaire(x):
 # Récupération des RDD dans une liste
    x = x.map(lambda y: y[1]).collect()

 # Gestion de la fin du fichier
    if (x == 'Fini'):
        # arrêt des context
        global ssc
        global sqlContext
        global sc
        ssc.stop()
        sqlContext.stop()
        sc.stop()
        print("Le traitement du flux est terminé")
        return 1
            
    # Connection à la base MongoDB (qui tourne sur Docker)
    client = MongoClient('localhost:27017')
    db=client.test.Comments 
    

    # Gestion des différents item de la liste
    for item in x:
        #comment_texte = item[0]
        #print(item)
        
        #transformation en dataframe pandas
        #comment = pandas.read_json(comment_texte,typ='series').to_frame().transpose()
        comment = pandas.read_json(item)
        comment['comments']=comment['comments'].replace("\r\n", "")
        comment['comments']=comment['comments'].replace("\n", "")
        comment['comments']=comment['comments'].astype(str)
        # Détection de la langue et ajout au dataframe
        comment['langage']=comment['comments'].apply(wtl.pred_prob)
        comment['langue']=comment['langage'].str[0].str[0].str[0]
        comment['lg_proba']=comment['langage'].str[0].str[0].str[1]
        #print(comment)
        comment=comment.drop(['langage'],axis=1)
        comment = comment[(comment['lg_proba']>0.8) & (comment['langue']=='fr')]
        comment['comments'] = comment['comments'].apply(lambda x: x.replace('.',' ').replace('@',' ').replace('+',' ').
            replace(',',' ').replace(';',' ').replace('!',' ').replace('  ',' '))

        comment['comment_lemm'] = comment['comments'].apply(comment_to_lemme)
        #print(comment)
        #print(comment.shape[0])
        test_langue = comment.shape[0]
        if (test_langue > 0) :
            print("test OK")
            #transformation du dataframe Pandas en RDD
            CommentDF = sqlContext.createDataFrame(comment[['id','comment_lemm']])
            CommentDF.registerTempTable("CommentDF")
            
            tokenizer = Tokenizer(inputCol="comment_lemm", outputCol="words")
            wordsData = tokenizer.transform(CommentDF)
            remover = StopWordsRemover(stopWords=stopwords, inputCol="words",outputCol="Liste_mots")
            wordsData_SW = remover.transform(wordsData)
            #hashingTF = HashingTF(inputCol="removed", outputCol="features", numFeatures=12000)
            #hashingTF_model = hashingTF.transform(wordsData_SW)
            #hashingTF_transfo = MLUtils.convertVectorColumnsFromML(hashingTF_model, "features")
            CV_transfo = CountVector.transform(wordsData_SW)
            CV_transfo  = CV_transfo.select(col('id'),col('comment_lemm'),col('words'),
                  col('Liste_mots'),col('vecteurs').alias('features'))
            CV_transfo.take(1)
            CV_data = MLUtils.convertVectorColumnsFromML(CV_transfo, "features")
            CV_data.take(1)
            #Vecteur_Apredire=hashingTF_transfo.select("features").take(1)
            #Vecteur_Apredire.show(n=1)
            #P=Vecteur_Apredire[0].features
            
            #P=SparseVector(20000, {4543: 1.0, 5284: 1.0, 8353: 1.0, 8809: 1.0, 10935: 1.0, 10983: 1.0})

            #prediction_GBT = GBTModel.predict(hashingTF_transfo.rdd.map(lambda x: x.features))
            ## Chargement du modèle calculé

            prediction_SVM = SVMModel.predict(CV_data.rdd.map(lambda x: x.features))
            prediction=prediction_SVM.take(1)

            comment['Prediction']=prediction[0]
        
            print("Insertion")
            
            # insertion dans MongoDB si Français ou Anglais avec une probabilité à plus de 80%
            db.insert_many(comment[(comment['lg_proba']>0.8) & (comment['langue']=='fr')].to_dict('records'))
    
            return 0
    # Fermeture du client MongoDB
    client.close()

    return 0
# Traitement de tous les RDD reçus dans le streaming en appliquant la fonction précédente
flux.foreachRDD(TraitementCommentaire)


# démarrage du flux
ssc.start()

- ## Arrêt du streaming avant la fin du traitement du flux

In [4]:
ssc.stop()

In [5]:
sc.stop()

- ## Test des données présentes en base


In [3]:
# connection à la base MongoDB
client = MongoClient('localhost:27017')
db=client.test.Comments 


# récupération des données de la collection
data = db.find({"Prediction":1})
nb = data.count()
# display the data
print(nb)
for i in data:
    p.pprint(i)

1
{'Prediction': 1,
 '_id': ObjectId('5b797c20b726e840e3ed2c3b'),
 'comment_lemm': ' tout être bien dérouler merci bien',
 'comments': "Tout s'est bien déroulé Merci bien PG",
 'date': datetime.datetime(2017, 10, 28, 0, 0),
 'id': 207127433,
 'langue': 'fr',
 'lg_proba': 0.9611431033472262,
 'listing_id': 3109,
 'reviewer_id': 51636494,
 'reviewer_name': 'Patricia'}


Adapté des scripts du projet GitHub suivant : Stream-Processing-using-PySpark-and-MongoDB--master

https://github.com/vipulkrishnanmd/Stream-Processing-using-PySpark-and-MongoDB-