In this notebook we build a classification model on Twitter sentiment-analysis data and then use it to return predictions on a stream of incoming tweets.

Before creating a new Spark context, call "sc.stop()". The system automatically initializes a SparkContext (available as the object `sc`) when we type "pyspark" in the terminal, so we stop it with sc.stop() before creating a new one; otherwise Spark raises an error along the lines of "cannot run multiple SparkContexts at once".

In [1]:
# Stop the SparkContext that the pyspark shell created automatically (bound to
# `sc`) so that a new context can be created in the following cells.
sc.stop()

In [2]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

In [3]:
# Start a SparkContext for this application (named "Sent_prec"), then build
# the SparkSession on top of it; `spark` is the entry point used below to
# read the CSV and to create DataFrames.
sc = SparkContext(appName="Sent_prec")
spark = SparkSession(sparkContext=sc)

In [4]:
# Explicit schema for the training CSV: `id` and `label` are integers and
# `tweet` is a string. Every column is declared nullable.
from pyspark.sql import types as tp

my_schema = tp.StructType([
    tp.StructField(name=column, dataType=dtype, nullable=True)
    for column, dtype in (
        ("id", tp.IntegerType()),
        ("label", tp.IntegerType()),
        ("tweet", tp.StringType()),
    )
])

In [5]:
# Load the training set. The file's first line is treated as a header row and
# the columns are typed by the explicit schema instead of being inferred.
data = spark.read.csv("train.csv", schema=my_schema, header=True)

In [6]:
# Print the schema to confirm the column types are the ones we set explicitly
# (integer, integer and string).
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)



In [7]:
# Preview the loaded rows (the output below shows the top 20; long tweets are
# displayed truncated).
data.show()

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
|  6|    0|[2/2] huge fan fa...|
|  7|    0| @user camping to...|
|  8|    0|the next school y...|
|  9|    0|we won!!! love th...|
| 10|    0| @user @user welc...|
| 11|    0| â #ireland con...|
| 12|    0|we are so selfish...|
| 13|    0|i get to see my d...|
| 14|    1|@user #cnn calls ...|
| 15|    1|no comment!  in #...|
| 16|    0|ouch...junior is ...|
| 17|    0|i am thankful for...|
| 18|    1|retweet if you ag...|
| 19|    0|its #friday! ð...|
| 20|    0|as we all know, e...|
+---+-----+--------------------+
only showing top 20 rows



Now we need to preprocess and clean the tweet column, as it contains punctuation, digits, stopwords, etc.

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.sql.functions import udf, col, lower, regexp_replace

# Clean the text: delete every character that is not an ASCII letter or
# whitespace (the replacement is the empty string, so characters are removed,
# not turned into spaces), then lower-case the result.
# `label` is carried along so the cleaned frame can still be used for training;
# the column order matches the original frame (id, label, tweet).
data_clean = data.select(
    'id',
    'label',
    lower(regexp_replace('tweet', "[^a-zA-Z\\s]", "")).alias('tweet'),
)

In [9]:
# Feature-extraction stages of the model pipeline, applied in this order:
#   stage1: split the raw tweet into tokens, using the regex "\W" as pattern
#   stage2: drop stop words from the token list
#   stage3: embed the remaining words as a vector of size 1000
stage1 = RegexTokenizer(
    inputCol="tweet",
    outputCol="tokens",
    pattern="\\W",
)
stage2 = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered_words",
)
stage3 = Word2Vec(
    inputCol="filtered_words",
    outputCol="vector",
    vectorSize=1000,
)

The code below can also be used for text preprocessing and cleaning.

In [None]:
# Alternative, step-by-step preprocessing of the cleaned tweets:
# tokenize -> remove stop words -> stem -> drop very short tokens.
from nltk.stem.snowball import SnowballStemmer
# Tokenizer, ArrayType and StringType are not imported by the earlier cells,
# so bring them into scope here to keep this cell runnable.
from pyspark.ml.feature import Tokenizer
from pyspark.sql.types import ArrayType, StringType

#Tokenizing the text
tokenizer = Tokenizer(inputCol="tweet", outputCol="words_token")
df_words_token = tokenizer.transform(data_clean).select('id', 'words_token')

#Removing stopwords
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_words_no_stopw = remover.transform(df_words_token).select('id', 'words_clean')

#Stemming the text(can also use WordNetLemmatizer or Porter Stemmer)
#Taking english words only
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean")).select('id', 'words_stemmed')

#Keep only words of at least 3 characters(Not much important but again depends)
filter_length_udf = udf(lambda row: [x for x in row if len(x) >= 3], ArrayType(StringType()))
df_final_words = df_stemmed.withColumn('words', filter_length_udf(col('words_stemmed')))

In [10]:
# Classifier stage: a gradient-boosted tree classifier with maxIter=10 that
# reads its features from the "vector" column and its target from "label".
from pyspark.ml.classification import GBTClassifier

model = GBTClassifier(
    featuresCol="vector",
    labelCol="label",
    maxIter=10,
)

In [11]:
# Chain the three feature stages and the classifier into one pipeline, then
# fit it on the training DataFrame; `pip_fit` is the fitted pipeline model.
from pyspark.ml.pipeline import Pipeline

pipeline = Pipeline(
    stages=[
        stage1,
        stage2,
        stage3,
        model,
    ]
)
pip_fit = pipeline.fit(data)

Now we will receive the data as a stream and return predictions for it.

In [None]:
import sys
def rx_predictions(tweets):
    """Score one batch of streamed tweets and print the predictions.

    Args:
        tweets: RDD of tweet strings for the current batch.

    Prints a table with the ``tweet`` and ``prediction`` columns. Prints
    "No data" when the batch holds no non-empty tweets, and an error message
    (without raising) when scoring fails, so one bad batch does not stop the
    stream.
    """
    # Local import: `Row` is not imported by the cells above, and without it
    # every batch failed with a NameError that the old bare `except` hid.
    from pyspark.sql import Row

    try:
        #Filtering the tweets and taking ones len(x)>0
        tweets = tweets.filter(lambda x: len(x) > 0)
        # An empty RDD cannot be turned into a DataFrame, so report it
        # explicitly instead of relying on an exception.
        if tweets.isEmpty():
            print("No data")
            return
        #Creating a data frame containing tweet column
        data_tweet = tweets.map(lambda w: Row(tweet=w))
        #Spark data frame
        data_tweet_frame = spark.createDataFrame(data_tweet)
        #Transforming the data using pipeline
        pip_fit.transform(data_tweet_frame).select("tweet", "prediction").show()
    except Exception as err:
        # Best effort: keep the stream alive, but surface the real failure
        # instead of masking it as "No data".
        print("Prediction failed: {}".format(err))
from pyspark.streaming import StreamingContext
# Streaming context on top of the existing SparkContext, with batchDuration=3
ssc = StreamingContext(sc, batchDuration=3)
#Creating a Dstream that will connect the hostname,port
# (hostname and port are taken from the first two command-line arguments)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
#Now splitting the tweets so that we can identify which set of words are from which tweet
# ("PICO" is the delimiter between tweets in the incoming text)
words = lines.flatMap(lambda line: line.split("PICO"))
#Get the predicted sentiments from tweet received
# (the callback is `rx_predictions`; the previous `rx_prediction` was undefined)
words.foreachRDD(rx_predictions)
#Starting the computation
ssc.start()
#Waiting for the computation to terminate
ssc.awaitTermination()