### Library imports

In [1]:
import re

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.sql import SQLContext, Row, Column
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext

### Define the PySpark and Spark Streaming configuration

In [2]:
# Spark / Spark Streaming configuration.
# The socket input DStream used later is receiver-based, and a receiver
# permanently occupies one task thread. In local mode the master therefore
# needs at least two threads: with "local" (a single thread) the receiver
# would leave no thread to process the received batches.
APP_NAME = "Test"
SPARK_MASTER = "local[2]"
BATCH_INTERVAL_SECONDS = 1

conf = SparkConf().setAppName(APP_NAME).setMaster(SPARK_MASTER)
# getOrCreate returns an already-running SparkContext if there is one (e.g. a
# kernel launched through the pyspark shell); in that case `conf` is ignored.
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")

# SQL entry points used by later cells: `spark` builds DataFrames from the
# streamed RDDs and `sqlContext` reads the training CSV. Previously these
# only existed when the kernel was pre-seeded by the pyspark shell.
spark = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(sc, spark)

# A new micro-batch is formed every BATCH_INTERVAL_SECONDS seconds.
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

### Define the function that predicts the sentiment of the received data

In [3]:
def get_prediction(tweet_text):
    """Print the predicted sentiment of each non-blank tweet in one micro-batch.

    Intended as the ``foreachRDD`` callback of the tweet DStream. Relies on the
    notebook globals ``spark`` (to build the DataFrame), ``pipelineFit`` (the
    fitted feature pipeline) and ``model`` (the trained Naive Bayes model).

    Parameters
    ----------
    tweet_text : pyspark.RDD
        RDD of text lines received during the current batch interval.

    Returns
    -------
    None
        Predictions are printed with ``DataFrame.show``. Empty batches print
        nothing, and errors are reported on stderr without stopping the stream.
    """
    import traceback

    try:
        # Drop blank and whitespace-only lines; they carry nothing to classify.
        tweets = tweet_text.filter(lambda x: len(x.strip()) > 0)

        # Many batch intervals receive no data, and createDataFrame cannot
        # infer a schema from an empty RDD. Skip those batches explicitly
        # instead of relying on the exception handler below.
        if tweets.isEmpty():
            return

        # One row per tweet, in a single "tweet" column (the pipeline's input).
        row_rdd = tweets.map(lambda w: Row(tweet=w))
        tweets_df = spark.createDataFrame(row_rdd)

        # NOTE(review): the training tweets go through the cleaning UDF
        # (URL / punctuation / "rt" removal) but streamed tweets do not, so
        # training and inference text differ. Consider applying the same
        # cleaning here.
        test_df = pipelineFit.transform(tweets_df)

        # Show the prediction result.
        model.transform(test_df).select('tweet', 'prediction').show(truncate=100)
    except Exception:
        # Best effort: a failing batch must not stop the stream, but the error
        # is reported instead of being silently discarded.
        traceback.print_exc()

### Read the preprocessed data

In [4]:
# reading the data set from HDFS 
# Location of the preprocessed training set.
# For a local run without HDFS, point this at 'datasetReviewed.csv' instead.
DATASET_PATH = 'hdfs://localhost:9000/user/input/datasetReviewed.csv'

print('\n\nReading the dataset from HDFS.................\n')
# The header row supplies the column names; inferSchema types the columns
# from their values.
data = (
    sqlContext.read.format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load(DATASET_PATH)
)
print('\n\nDataset show..................................\n')
# Drop every ROW that has a null in any column (dropna works row-wise; no
# columns are removed).
data = data.dropna()
data.show(3, truncate=15)



Reading the dataset from HDFS.................



Dataset show..................................

+---------------+---------------+-------------+---------------+--------------+-------------+-------------+---------------+----------+
|       tweet_id|        company|company_count|     created_at|favorite_count|retweet_count|  screen_name|          tweet|polaridade|
+---------------+---------------+-------------+---------------+--------------+-------------+-------------+---------------+----------+
|112898458133...|BANCO DO BRASIL|            1|2019-05-16 1...|             0|            3|AgenciaEstado|RT @colunado...|         0|
|112824522429...|            IRB|            1|2019-05-14 1...|             0|            1|AgenciaEstado|RT @colunado...|         1|
|112754225315...|           ITAÚ|            1|2019-05-12 1...|             0|            4|AgenciaEstado|RT @colunado...|         1|
+---------------+---------------+-------------+---------------+--------------+-------------+----

### Function to clean the tweets

In [5]:
def clean_tweet_text(text):
    """Normalise a tweet's text before tokenisation.

    Steps:
      1. Remove URLs ("http" up to the next whitespace).
      2. Lower-case the text.
      3. Delete the characters  . ; - : ) "
      4. Remove the standalone retweet marker "rt".

    ``None`` is returned unchanged, so the UDF is safe on null cells.
    """
    if text is None:
        return None
    text = re.sub(r"http\S+", "", text).lower()
    text = re.sub(r'[.;\-:)"]', "", text)
    # Remove only the whole word "rt". The previous str.replace('rt', '')
    # also mangled ordinary words, e.g. "parte" -> "pae", "porta" -> "poa".
    return re.sub(r"\brt\b", "", text)


udf = UserDefinedFunction(clean_tweet_text, StringType())

# Clean only the text column the pipeline reads. Running this string UDF over
# every column would also feed it columns that inferSchema may have typed as
# numbers or timestamps (ids, counts, dates), which re.sub cannot handle.
data = data.withColumn('tweet', udf('tweet'))

### Build the processing pipeline

In [6]:
print('\n\nDefining the pipeline stages.................\n')

# Feature-extraction settings.
NUM_HASH_FEATURES = 2 ** 16  # length of the hashed term-frequency vector
MIN_DOC_FREQ = 3             # terms found in fewer documents get an IDF of 0

# Lower-cases the tweet and splits it on whitespace.
tokenizer = Tokenizer(inputCol="tweet", outputCol="words")

# The tweets are Portuguese, but StopWordsRemover defaults to an English
# stop-word list. Load Spark's bundled Portuguese list instead.
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=StopWordsRemover.loadDefaultStopWords("portuguese"),
)

hashtf = HashingTF(numFeatures=NUM_HASH_FEATURES, inputCol="filtered", outputCol='tf')

idf = IDF(inputCol='tf', outputCol="features", minDocFreq=MIN_DOC_FREQ)

# Maps the polarity values to a numeric "label" column for the classifier.
label_stringIdx = StringIndexer(inputCol = "polaridade", outputCol = "label")

print('\n\nStages Defined................................\n')
pipeline = Pipeline(stages=[tokenizer, remover, hashtf, idf, label_stringIdx])



Defining the pipeline stages.................



Stages Defined................................



In [7]:
# Fit the estimator stages (IDF weights, StringIndexer label mapping) on the
# training data, then featurise that same data for the classifier.
print('\n\nFit the pipeline with the training data.......\n')
pipelineFit = pipeline.fit(dataset=data)
train_df = pipelineFit.transform(dataset=data)



Fit the pipeline with the training data.......



### Naive Bayes classification 

In [8]:
# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

print('\n\nModel Trained....Waiting for the Data!!!!!!!!\n')
# train the model
model = nb.fit(train_df)



Model Trained....Waiting for the Data!!!!!!!!



### Receive and display the data

In [9]:
# receving the data from socket 
tweet_stream = ssc.socketTextStream("127.0.0.1", 8181)

# get the predicition
tweet_stream.foreachRDD(get_prediction)
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

+----------------------------------------------------------------------------------------------------+----------+
|                                                                                               tweet|prediction|
+----------------------------------------------------------------------------------------------------+----------+
|RT @leiamoneytimes: Fiocruz avalia condições de trabalho na saúde durante a pandemia   #Brasil #C...|       1.0|
+----------------------------------------------------------------------------------------------------+----------+

+---------------------------------------------------------------------------------------+----------+
|                                                                                  tweet|prediction|
+---------------------------------------------------------------------------------------+----------+
|acionista de BBDC4Trinta startups vão para a fase final de edital da Petrobras e Sebrae|       1.0|
+------------------------

+----------------------------------------------------------------------------------------------------+----------+
|                                                                                               tweet|prediction|
+----------------------------------------------------------------------------------------------------+----------+
|RT @leiamoneytimes: Após extraviar mala de cliente na ida e na volta, Azul deverá pagar indenizaç...|       1.0|
|                                                                                   Será q é verdade?|       0.0|
+----------------------------------------------------------------------------------------------------+----------+

+----------------------------------------------------------------------------------------------------+----------+
|                                                                                               tweet|prediction|
+--------------------------------------------------------------------------------------

+----------------------------------------------------------------------------------------------------+----------+
|                                                                                               tweet|prediction|
+----------------------------------------------------------------------------------------------------+----------+
|RT @leiamoneytimes: Renner é a rede de vestuário mais bem posicionada na crise, aponta Inter Rese...|       1.0|
+----------------------------------------------------------------------------------------------------+----------+

+----------------------------------------------------------------------------------------------------+----------+
|                                                                                               tweet|prediction|
+----------------------------------------------------------------------------------------------------+----------+
|Bradesco, Itaú e Santander lançam plano para promover desenvolvimento sustentável na A

+----------------------------------------------------------------------------------------------------+----------+
|                                                                                               tweet|prediction|
+----------------------------------------------------------------------------------------------------+----------+
|#OIBR3 #VIVT4 #TIMP…RT @leiamoneytimes: Oi: Highline do Brasil bate Vivo, Tim e Claro e dá a melh...|       1.0|
+----------------------------------------------------------------------------------------------------+----------+

+----------------------------------------------------------------------------------------------------+----------+
|                                                                                               tweet|prediction|
+----------------------------------------------------------------------------------------------------+----------+
|#OIBR3 #VIVT4 #TIMP…@leiamoneytimes @sergiosanita Amanhã abre com gap em 2 reais.RT @I

KeyboardInterrupt: 