In [None]:
import os
import string

import findspark

# Local Spark install that findspark puts on sys.path. The Kafka packages in
# PYSPARK_SUBMIT_ARGS below are pinned to Spark 2.2.0 / Scala 2.11, so this
# should be a matching install.
SPARK_HOME_DIR = "/usr/local/spark/spark-2.2.0-bin-hadoop2.7"

# findspark.init must run BEFORE the first pyspark import: it is what adds
# Spark's python/ directory (and py4j) to sys.path. Calling it after the
# imports either fails (pyspark not importable yet) or leaves whatever pyspark
# was already importable in place, which may not match SPARK_HOME_DIR.
findspark.init(SPARK_HOME_DIR)

# Extra jars for the Kafka DStream (0-8) and structured-streaming (0-10)
# connectors. PySpark reads this when it launches the JVM, i.e. when the first
# SparkSession/SparkContext is created, so it must be set before that happens.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell'

import nltk
from nltk import word_tokenize, SnowballStemmer
from nltk.corpus import stopwords

import pyspark.ml.feature as feat
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.sql import SparkSession
from pyspark.sql import types as typ
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import Config as config  # local module; provides kafka_topic_name and bootstrap_servers




In [None]:
# Local SparkSession using all cores. PYSPARK_SUBMIT_ARGS (Kafka --packages)
# must already be set when this creates the first SparkContext.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("real-time")
    .getOrCreate()
)

# Lazily filled cache of the NLTK resources used by process_text. It starts
# empty wherever the UDF is deserialized and is populated on first use, so the
# tokenizer/stemmer/stopword loads happen once per copy of the UDF instead of
# once per row.
_FRENCH_NLP_CACHE = {}


def _french_nlp_resources():
    """Return ``(sentence_tokenizer, stemmer, excluded_tokens)`` for French, loading them on first call."""
    if not _FRENCH_NLP_CACHE:
        _FRENCH_NLP_CACHE['tokenizer'] = nltk.data.load('tokenizers/punkt/french.pickle')
        _FRENCH_NLP_CACHE['stemmer'] = SnowballStemmer('french')
        # Stop words and single punctuation characters are removed by one
        # membership test; build the combined set once, not once per token.
        _FRENCH_NLP_CACHE['excluded'] = set(stopwords.words('french')).union(string.punctuation)
    return (
        _FRENCH_NLP_CACHE['tokenizer'],
        _FRENCH_NLP_CACHE['stemmer'],
        _FRENCH_NLP_CACHE['excluded'],
    )


@F.udf(typ.StringType())
def process_text(text):
    """Normalise French text into space-separated Snowball stems (Spark UDF).

    Splits ``text`` into sentences with the French punkt model, lower-cases
    and re-joins them, word-tokenizes, drops French stop words and
    single-character punctuation tokens, then stems the remaining words.

    Args:
        text: the raw string from one row of the input column.

    Returns:
        The stems joined by single spaces, or None when ``text`` is None
        (null in -> null out, instead of raising inside the Spark task).
    """
    if text is None:
        return None
    tokenizer, stemmer, excluded = _french_nlp_resources()
    lowered = " ".join(sent.lower() for sent in tokenizer.tokenize(text))
    stems = [stemmer.stem(word) for word in nltk.word_tokenize(lowered) if word not in excluded]
    return " ".join(stems)

def vectorizer(df, tfidf_model=None):
    """Turn raw ``(sentences, polarity)`` rows into TF-IDF features for the classifier.

    Args:
        df: DataFrame with at least ``sentences`` (string) and ``polarity`` columns.
        tfidf_model: optional already-fitted model exposing ``transform`` that
            maps ``words`` -> ``features`` (for example the HashingTF/IDF
            ``PipelineModel`` produced when the classifier was trained). When
            None (the original behaviour), a HashingTF -> IDF pipeline is
            fitted on ``df`` itself.

    Returns:
        DataFrame with ``sentences``, ``words`` (stemmed tokens), ``label``
        (``polarity`` cast to double) plus, with the default pipeline,
        ``rawFeatures`` and ``features``.

    Note:
        Fitting IDF here means the weights come from the rows in ``df`` (for
        streaming, a single micro-batch), not from the training corpus, and
        Spark's IDF cannot be fitted on zero rows. Passing ``tfidf_model``
        avoids both. HashingTF uses its default ``numFeatures``, which
        presumably matches training; TODO confirm.
    """
    prepared = df.select(
        "sentences",
        F.split(process_text("sentences"), ' ').alias("words"),
        F.col("polarity").cast(typ.DoubleType()).alias("label"),
    )
    if tfidf_model is None:
        tfidf_model = Pipeline(stages=[
            feat.HashingTF(inputCol="words", outputCol="rawFeatures"),
            feat.IDF(inputCol="rawFeatures", outputCol="features"),
        ]).fit(prepared)
    return tfidf_model.transform(prepared)

In [None]:
# LogisticRegressionModel loaded lazily on the driver and reused across batches.
_LOGREG_MODEL_CACHE = {}


def func(x):
    """Score one micro-batch of tab-split Kafka records and show the predictions.

    Registered with ``DStream.foreachRDD``. Keep exactly one parameter:
    foreachRDD calls one-argument functions with just the RDD but
    two-argument functions with ``(time, rdd)``.

    Args:
        x: RDD of lists of strings; each is expected to be ``[sentence, polarity]``.
    """
    # toDF infers the schema from the first element and raises "RDD is empty"
    # on a batch that received no messages; skip such batches.
    if x.isEmpty():
        return
    df = vectorizer(x.toDF(['sentences', 'polarity']))
    # Read the saved model from disk once instead of on every batch.
    if "model_nlp" not in _LOGREG_MODEL_CACHE:
        _LOGREG_MODEL_CACHE["model_nlp"] = LogisticRegressionModel.load("model_nlp")
    logregModel = _LOGREG_MODEL_CACHE["model_nlp"]
    pred = logregModel.transform(df).select("sentences", "prediction")
    pred.show()

In [None]:
ssc = StreamingContext(spark.sparkContext, 1)  # 1-second micro-batches
stream = KafkaUtils.createDirectStream(
    ssc,
    [config.kafka_topic_name],
    {'metadata.broker.list': config.bootstrap_servers},
)
# The direct stream yields (key, value) pairs; only the message value is used.
lines = stream.map(lambda x: x[1])
# Values are expected to look like "<sentence>\t<polarity>". A record that does
# not split into exactly two fields would break toDF's schema inference or row
# conversion for the whole batch, so drop it here.
records = lines.map(lambda x: x.split('\t')).filter(lambda fields: len(fields) == 2)
records.foreachRDD(func)
ssc.start()
# ssc.awaitTermination() is intentionally not called: it would block the
# notebook kernel, preventing the stop cell below from ever running.

In [11]:
# Stop streaming immediately (no graceful drain) and also stop the underlying
# SparkContext, so `spark` is unusable afterwards. These are the defaults of
# StreamingContext.stop, spelled out for clarity.
ssc.stop(stopSparkContext=True, stopGraceFully=False)