In [1]:
# importing required libraries
import sys

import pyspark.sql.types as tp
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.sql import Row, Column
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext

In [2]:
# Score each micro-batch of incoming tweets with the trained pipeline.
def get_prediction(tweet_text):
    """Print the predicted sentiment for every non-blank tweet in one micro-batch.

    Intended as the ``DStream.foreachRDD`` callback. Keep exactly one
    positional parameter: PySpark inspects the callback's argument count and
    would pass ``(time, rdd)`` to a callback that accepts two arguments.

    Args:
        tweet_text: RDD of tweet strings received in the current batch.

    Reads the notebook-level ``spark`` session and fitted ``pipelineFit``.
    Prints ``'No data'`` when the batch contains no non-blank tweet. Any other
    error is reported with its cause (instead of being mislabelled as
    "No data") and swallowed, so the stream keeps running.
    """
    try:
        # Drop empty and whitespace-only tweets; they carry no signal.
        tweets = tweet_text.filter(lambda text: len(text.strip()) > 0)
        # createDataFrame cannot infer a schema from an empty RDD, so handle
        # empty batches explicitly instead of relying on that exception.
        if tweets.isEmpty():
            print('No data')
            return
        # One row per tweet, in the single 'tweet' column the pipeline reads.
        rows = tweets.map(lambda text: Row(tweet=text))
        tweets_df = spark.createDataFrame(rows)
        pipelineFit.transform(tweets_df).select('tweet', 'prediction').show()
    except Exception as exc:  # best-effort: keep the stream alive, show why
        print('Prediction failed for this batch:', repr(exc))

In [3]:
# Spark setup. Each streaming receiver permanently occupies one core, so a
# local master needs at least 2 threads: one for the socket receiver and one
# for processing the batches ("local" alone would never process any data).
SPARK_MASTER = "local[2]"
conf = SparkConf().setAppName("Test").setMaster(SPARK_MASTER)
# getOrCreate returns an already-running SparkContext if there is one
# (e.g. inside a pyspark shell), in which case `conf` is not applied.
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
# SparkSession used by the data-loading cell and by get_prediction; it is
# built on top of the SparkContext created above.
spark = SparkSession.builder.getOrCreate()
# The StreamingContext is created in the streaming cell, just before it starts.

In [4]:
# Explicit schema for the training CSV: integer id, integer sentiment label,
# and the raw tweet text. All three columns are declared nullable.
my_schema = (
    tp.StructType()
    .add('id', tp.IntegerType(), True)
    .add('label', tp.IntegerType(), True)
    .add('tweet', tp.StringType(), True)
)

In [5]:
# Load the labelled training tweets with `my_schema` (no schema inference);
# the CSV's first line is a header row and is skipped.
print('\n\nReading the dataset...........................\n')
my_data = (
    spark.read
    .schema(my_schema)
    .option('header', True)
    .csv('twitter_sentiments.csv')
)
# Peek at the first rows, cutting long tweets at 50 characters.
my_data.show(n=3, truncate=50)



Reading the dataset...........................

+---+-----+--------------------------------------------------+
| id|label|                                             tweet|
+---+-----+--------------------------------------------------+
|  1|    0| @user when a father is dysfunctional and is so...|
|  2|    0|@user @user thanks for #lyft credit i can't use...|
|  3|    0|                               bihday your majesty|
+---+-----+--------------------------------------------------+
only showing top 3 rows



In [6]:
# Confirm the declared schema was applied: prints each column's name, type and nullability.
my_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)



In [7]:
# Text-classification pipeline: tokenize -> remove stop words -> embed -> classify.
print('\n\nDefining the pipeline stages.................\n')
# Split each tweet into tokens on non-word characters.
stage_1 = RegexTokenizer(inputCol='tweet', outputCol='tokens', pattern='\\W')

# Remove stop words (StopWordsRemover's default list) from the token arrays.
stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')

# Explicit seed so embeddings (and hence the classifier) are reproducible
# across kernel restarts: without one, PySpark derives Word2Vec's default seed
# from hash() of the class name, and Python randomizes str hashes per process
# unless PYTHONHASHSEED is fixed.
W2V_SEED = 42
# Turn each tweet's remaining words into a 100-dimensional feature vector.
stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector',
                   vectorSize=100, seed=W2V_SEED)

# Classifier trained on the tweet vectors against the integer `label` column.
model = LogisticRegression(featuresCol='vector', labelCol='label')

print('\n\nStages Defined................................\n')
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, model])



Defining the pipeline stages.................



Stages Defined................................



In [8]:
# Train every pipeline stage on the labelled tweets; the resulting fitted
# model (`pipelineFit`) is what get_prediction uses to score streamed tweets.
print('\n\nFit the pipeline with the training data.......\n')
pipelineFit = pipeline.fit(dataset=my_data)



Fit the pipeline with the training data.......



In [11]:
# Stream tweets from a local socket and score each micro-batch with get_prediction.
STREAM_HOST = "127.0.0.1"
STREAM_PORT = 8181
BATCH_SECONDS = 3
# Marker splitting one received line into separate tweets; presumably emitted
# by the sending app between tweets — TODO confirm against the producer.
TWEET_SEPARATOR = 'TWEET_APP'

print('\n\nModel Trained....Waiting for the Data!!!!!!!!\n')
ssc = StreamingContext(sc, batchDuration=BATCH_SECONDS)
lines = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)
words = lines.flatMap(lambda line: line.split(TWEET_SEPARATOR))

words.foreachRDD(get_prediction)

ssc.start()  # start receiving and processing batches
try:
    # Blocks until the stream terminates or the cell is interrupted.
    ssc.awaitTermination()
finally:
    # Always stop the streaming context (e.g. after a KeyboardInterrupt) so the
    # cell can be re-run: only one StreamingContext may be active per JVM.
    # Keep the SparkContext alive so `spark` and `pipelineFit` remain usable.
    ssc.stop(stopSparkContext=False)




Model Trained....Waiting for the Data!!!!!!!!

No data
No data
No data
No data
No data
No data
+-----+----------+
|tweet|prediction|
+-----+----------+
| test|       0.0|
+-----+----------+

No data
No data
No data
+----------+----------+
|     tweet|prediction|
+----------+----------+
|i hate you|       1.0|
+----------+----------+

No data
No data


KeyboardInterrupt: 