## Sentiment analysis and prediction on Reddit comments using PySpark

In [1]:
from IPython import display
import math
import pandas as pd
import numpy as np

from pyspark.sql import SQLContext
from pyspark import SparkContext

from pyspark.sql.types import *

Reference: [Sentiment analysis of Reddit headlines with Python's NLTK](https://www.learndatasci.com/tutorials/sentiment-analysis-reddit-headlines-pythons-nltk/)

In [2]:
# Reuse the active SparkContext if there is one. Calling SparkContext() a
# second time (e.g. when this cell is re-run) raises a ValueError
# ("Cannot run multiple SparkContexts at once").
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [3]:
# Explicit schema for the CSV: the comment text plus its sentiment label.
# Both columns are read as strings; the label is indexed to a number later.
customSchema = (StructType()
                .add("clean_text", StringType())
                .add("category", StringType()))

In [4]:
# Input CSV with a header row and two columns: clean_text and category
# (sentiment label; the values seen below are -1, 0 and 1).
# NOTE(review): presumably Reddit comments, judging by the file name.
filename = 'redt_dataset.csv'

In [20]:
# Load the CSV (first line is the header), applying customSchema instead of
# inferring column types.
df = sqlContext.read.csv(filename, schema=customSchema, header=True)
df.show()

In [6]:
# Keep only rows where neither clean_text nor category is null.
data = df.dropna(how="any")
data.show(n=5)

+--------------------+--------+
|          clean_text|category|
+--------------------+--------+
|though most the u...|       1|
| parents live nea...|       1|
|jets flying reall...|       1|
| feel for the bra...|       1|
| switched republi...|      -1|
+--------------------+--------+
only showing top 5 rows



In [7]:
# Print the column names and types (both strings, as declared in customSchema).
data.printSchema()

root
 |-- clean_text: string (nullable = true)
 |-- category: string (nullable = true)



## Preprocessing

In [8]:
from pyspark.sql.functions import col

# Class balance: number of rows per sentiment label, most frequent first.
label_counts = data.groupBy("category").count()
label_counts.sort(col("count").desc()).show()

+--------+-----+
|category|count|
+--------+-----+
|       1|15749|
|       0|12895|
|      -1| 8244|
+--------+-----+



## Model Pipeline
Spark's ML Pipelines API is similar to scikit-learn's. The text-feature part of our pipeline has three steps:

- `regexTokenizer`: tokenization with a regular expression
- `stopwordsRemover`: stop-word removal
- `countVectors`: count vectors ("document-term vectors")

In [11]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# 1) Split each comment on non-word characters (regex \W) into tokens.
regexTokenizer = RegexTokenizer(inputCol="clean_text", outputCol="words", pattern=r"\W")

# 2) Drop a small custom list of tokens. Supplying stopWords REPLACES Spark's
#    default English stop-word list instead of extending it, so common words
#    such as "for" and "most" stay in the "filtered" column.
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered",
                                    stopWords=add_stopwords)

# 3) Bag of words: at most 15000 terms, each present in at least 5 documents.
countVectors = CountVectorizer(inputCol="filtered", outputCol="features",
                               vocabSize=15000, minDF=5)

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Turn the string sentiment label into a numeric "label" column. With the
# default frequency ordering, the most common category ("1") becomes 0.0.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

# Text-feature stages first, then the label indexer.
text_stages = [regexTokenizer, stopwordsRemover, countVectors]
pipeline = Pipeline(stages=text_stages + [label_stringIdx])

# NOTE(review): the pipeline is fit on every row before the train/test split
# below, so the CountVectorizer vocabulary and label index also see test rows.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(n=5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|          clean_text|category|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
|though most the u...|       1|[though, most, th...|[though, most, us...|(12890,[0,1,2,3,4...|  0.0|
| parents live nea...|       1|[parents, live, n...|[parents, live, n...|(12890,[6,37,88,9...|  0.0|
|jets flying reall...|       1|[jets, flying, re...|[jets, flying, re...|(12890,[36,68,73,...|  0.0|
| feel for the bra...|       1|[feel, for, the, ...|[feel, for, brave...|(12890,[3,14,19,3...|  0.0|
| switched republi...|      -1|[switched, republ...|[switched, republ...|(12890,[2,3,4,6,1...|  2.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



## Partition training & test sets

In [13]:
# Hold out 30% of the rows for testing; the fixed seed keeps the split reproducible.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)
for split_name, split_df in (("Training", trainingData), ("Test", testData)):
    print(f"{split_name} Dataset Count: {split_df.count()}")

Training Dataset Count: 25905
Test Dataset Count: 10983


## Model Training and Evaluation
### Logistic regression using count-vector features

The model makes predictions on the test set. We then show the 10 test rows predicted as label 0.0 that have the highest predicted probabilities.

In [14]:
# Baseline: L2-regularized logistic regression (elasticNetParam=0) on count vectors.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Test rows predicted as label 0.0, highest probability vector first.
predicted_label0 = predictions.where(predictions["prediction"] == 0)
(predicted_label0
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30))

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
| modi’ government last four...|       1|[0.9999998432903444,1.43071...|  0.0|       0.0|
|she right she right she rig...|       1|[0.9999997579988142,3.62693...|  0.0|       0.0|
|great job mellowde writing ...|       1|[0.9999997416777889,3.39726...|  0.0|       0.0|
| very interested history an...|       1|[0.9999986290163623,6.14373...|  0.0|       0.0|
|just tried organize the pol...|      -1|[0.9999981355577869,6.46211...|  2.0|       0.0|
| chennai super kings ipl 20...|       1|[0.9999965535336731,3.68668...|  0.0|       0.0|
|this the first time was gre...|       1|[0.9999955444739944,3.64267...|  0.0|       0.0|
|there couple reasons see fo...|       1|[0.9999934766327285,7.64619...|  0.0|       0.0|
|’ gunning

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the test predictions. The arguments spell out the evaluator's defaults,
# so the number reported is the F1 score, not accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="f1")
evaluator.evaluate(predictions)

0.697852576519381

## Logistic Regression using TF-IDF Features

In [16]:
from pyspark.ml.feature import HashingTF, IDF

# TF-IDF variant: hash the filtered tokens into 10000 buckets, then re-weight
# by inverse document frequency. Hashed terms seen in fewer than 5 documents
# get an IDF weight of 0 (minDocFreq).
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

tfidf_stages = [regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx]
pipeline = Pipeline(stages=tfidf_stages)
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Same split ratio/seed and classifier settings as the count-vector model,
# so the two feature sets are compared like for like.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Test rows predicted as label 0.0, highest probability vector first.
predicted_label0 = predictions.where(predictions["prediction"] == 0)
(predicted_label0
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30))

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|just tried organize the pol...|      -1|[0.9999999998459883,5.31840...|  2.0|       0.0|
|she right she right she rig...|       1|[0.9999997245487119,2.28599...|  0.0|       0.0|
| take rather separated from...|       1|[0.9999979078617137,2.33964...|  0.0|       0.0|
|translations armor names hu...|      -1|[0.9999965982307848,3.39439...|  2.0|       0.0|
|great job mellowde writing ...|       1|[0.999996386371032,2.457027...|  0.0|       0.0|
| chennai super kings ipl 20...|       1|[0.9999925927459271,9.31544...|  0.0|       0.0|
|typical elitist indian mind...|       1|[0.9999788261906685,1.33554...|  0.0|       0.0|
| which team are you gunning...|       1|[0.9999775608662237,3.86977...|  0.0|       0.0|
|great pos

In [17]:
# Same metric as for the count-vector model (the evaluator's default, F1).
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="f1")
evaluator.evaluate(predictions)

0.6736147017995611

## Cross-Validation
Let’s now use cross-validation to tune our hyperparameters. We only tune the logistic regression trained on count-vector features.

In [18]:
# Rebuild the count-vector features (the TF-IDF cell overwrote pipeline,
# dataset, trainingData and testData) and re-create the 70/30 split.
count_stages = [regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]
pipeline = Pipeline(stages=count_stages)
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)

# Base estimator for cross-validation; regParam and elasticNetParam are
# overridden by the parameter grid.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Metric used both to pick the best grid point and to score the test set.
# Defined here so this cell does not depend on an evaluator left over from an
# earlier cell. metricName is left at its default ("f1").
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Grid over regularization strength and the L1/L2 mix
# (elasticNetParam=0.0 is pure L2 / ridge).
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .build())

# 5-fold cross-validation on the training split. An explicit seed makes fold
# assignment reproducible across kernel restarts; PySpark's default seed is
# derived from a Python string hash, which is randomized per process.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=100)

cvModel = cv.fit(trainingData)

# Evaluate the best model (refit on the whole training split) on the held-out test set.
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.7449757330752173