# Twitter Sentiment Analysis and Prediction with PySpark

In [None]:
from IPython import display
import math
import pandas as pd
import numpy as np

from pyspark.sql import SQLContext
from pyspark import SparkContext

from pyspark.sql.types import *

In [None]:

# Reuse the active SparkContext if there is one: calling SparkContext() again
# (e.g. when this cell is re-run in a live kernel) raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
# SQLContext wraps the SparkContext and provides the DataFrame reader used below.
sqlContext = SQLContext(sc)

In [None]:
# Explicit schema for the tweet CSV: the cleaned tweet text and its category
# label, both read as nullable strings.
customSchema = StructType([
    StructField("clean_text", StringType(), nullable=True),
    StructField("category", StringType(), nullable=True),
])

In [None]:
# Path to the tweet dataset: a CSV with a header row and the columns
# clean_text and category (see customSchema above).
filename = '/content/drive/MyDrive/twtr_dataset.csv'

In [None]:
# Load the CSV with the first row as the header and the explicit schema
# (no schema inference pass).
df = sqlContext.read.csv(filename, schema=customSchema, header=True)

In [None]:

# Discard every row that has a null in any column, then preview the result.
data = df.dropna(how="any")
data.show(n=5)

+--------------------+--------+
|          clean_text|category|
+--------------------+--------+
|when modi promise...|      -1|
|talk all the nons...|       0|
|what did just say...|       1|
|asking his suppor...|       1|
|answer who among ...|       1|
+--------------------+--------+
only showing top 5 rows



In [None]:
# Print the column names, types and nullability of the cleaned DataFrame.
data.printSchema()

root
 |-- clean_text: string (nullable = true)
 |-- category: string (nullable = true)



# Preprocessing

In [None]:
from pyspark.sql.functions import col

# Class balance: number of tweets per category, most frequent first.
category_counts = data.groupBy("category").count()
category_counts.sort(col("count").desc()).show()

+--------+-----+
|category|count|
+--------+-----+
|       1|70475|
|       0|53551|
|      -1|34664|
+--------+-----+




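# Model Pipeline
The Spark ML Pipelines API is similar to scikit-learn's. Our text-processing pipeline has three stages (a fourth stage, StringIndexer, is added later to turn the category column into numeric labels):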

regexTokenizer: Tokenization (with Regular Expression)

stopwordsRemover: Remove Stop Words

countVectors: Count vectors (“document-term vectors”)

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Stage 1: tokenize each tweet by splitting on non-word characters (\W).
regexTokenizer = RegexTokenizer(
    inputCol="clean_text",
    outputCol="words",
    pattern=r"\W",
)

# Stage 2: remove the tokens in this custom list. Passing stopWords REPLACES
# Spark's built-in stop-word list instead of extending it, so common words
# such as "when" or "what" are kept; only the entries below are removed.
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

# Stage 3: bag-of-words counts over a vocabulary of at most 20000 terms,
# keeping only terms that occur in at least 5 documents.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=20000,
    minDF=5,
)

In [None]:

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Encode the string "category" column as a numeric "label" column. By default
# StringIndexer orders categories by descending frequency, so the most common
# category becomes 0.0.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

# Text-feature stages followed by label encoding, fitted as one unit.
pipeline = Pipeline(stages=[
    regexTokenizer,
    stopwordsRemover,
    countVectors,
    label_stringIdx,
])

# Fit every stage on the full dataset, append the derived columns and preview.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|          clean_text|category|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
|when modi promise...|      -1|[when, modi, prom...|[when, modi, prom...|(19479,[0,1,4,30,...|  2.0|
|talk all the nons...|       0|[talk, all, the, ...|[talk, all, nonse...|(19479,[0,1,2,5,1...|  1.0|
|what did just say...|       1|[what, did, just,...|[what, did, just,...|(19479,[0,2,3,19,...|  0.0|
|asking his suppor...|       1|[asking, his, sup...|[asking, his, sup...|(19479,[0,4,5,9,1...|  0.0|
|answer who among ...|       1|[answer, who, amo...|[answer, who, amo...|(19479,[0,20,78,1...|  0.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



# Partition Training & Test sets

In [None]:

# Split the RAW tweets first (fixed seed for reproducibility), then fit the
# feature pipeline on the training split only. This way the CountVectorizer
# vocabulary (minDF filtering) and the label index are learned without seeing
# the test tweets, so the test split stays truly held out.
(train_raw, test_raw) = data.randomSplit([0.7, 0.3], seed=100)
pipelineFit = pipeline.fit(train_raw)
trainingData = pipelineFit.transform(train_raw)
testData = pipelineFit.transform(test_raw)
print(f"Training Dataset Count: {trainingData.count()}")
print(f"Test Dataset Count: {testData.count()}")

Training Dataset Count: 111062
Test Dataset Count: 47628


# Model Training and Evaluation
## Logistic Regression using Count Vector Features

The model makes predictions on the test set, which are then scored. We also look at the 10 test tweets predicted as label 0.0 with the highest predicted probability.

In [None]:
# Baseline: logistic regression with L2 regularisation only (elasticNetParam=0)
# on the count-vector features.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

# Show 10 test tweets predicted as label 0.0, sorted by the probability
# vector in descending order.
label0_predictions = predictions.filter(predictions['prediction'] == 0)
(
    label0_predictions
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|agree take crisis 2008 tank...|       1|[0.9991513253573526,1.65713...|  0.0|       0.0|
|much love too the beautiful...|       1|[0.9990368857446261,2.91859...|  0.0|       0.0|
|chacha modi new vision have...|       1|[0.9990354345977572,1.00719...|  0.0|       0.0|
|appeasement hai mumkeen hai...|       1|[0.9988034011551032,3.62290...|  0.0|       0.0|
|very true our country needs...|       1|[0.9984626558069003,1.77380...|  0.0|       0.0|
|modi will wish you many man...|       1|[0.9983693606015881,7.48362...|  0.0|       0.0|
|india becoming superhero na...|       1|[0.9979699958557681,5.29687...|  0.0|       0.0|
|have chapter this great per...|       1|[0.9979261076257133,3.33235...|  0.0|       0.0|
|the very 

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the test predictions. metricName="f1" (weighted F1) is the evaluator's
# default; it is spelled out so the reported number is not mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.7885893153659636


# Logistic Regression using TF-IDF Features

In [None]:
from pyspark.ml.feature import HashingTF, IDF

# TF-IDF features: term frequencies are hashed into a fixed 20000-dimensional
# space (no vocabulary is learned), then re-weighted by inverse document frequency.
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20000)
# minDocFreq=5: terms that appear in fewer than 5 (training) documents get an
# IDF weight of 0, which removes very sparse terms.
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

# Split the raw tweets BEFORE fitting. The IDF weights (and the label index) must
# be learned from the training split only; otherwise document frequencies
# computed over the test tweets leak into the features the model is trained on.
(train_raw, test_raw) = data.randomSplit([0.7, 0.3], seed=100)
pipelineFit = pipeline.fit(train_raw)
trainingData = pipelineFit.transform(train_raw)
testData = pipelineFit.transform(test_raw)
# Transformed view of the whole corpus, kept so `dataset` still holds this
# pipeline's output (evaluated lazily; not used for training or scoring here).
dataset = pipelineFit.transform(data)

# Same logistic-regression settings as the count-vector baseline.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

# 10 test tweets predicted as label 0.0, sorted by probability vector (descending).
predictions.filter(predictions['prediction'] == 0) \
    .select("clean_text", "category", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|agree take crisis 2008 tank...|       1|[0.9991752951148799,4.16452...|  0.0|       0.0|
|appeasement hai mumkeen hai...|       1|[0.9988554034429974,3.64709...|  0.0|       0.0|
|chacha modi new vision have...|       1|[0.9980556769534737,2.50636...|  0.0|       0.0|
|vote for pappu straight mig...|       1|[0.9980066892297601,2.91804...|  0.0|       0.0|
|can modi win can modi win k...|       1|[0.9977329090810103,0.00119...|  0.0|       0.0|
|much love too the beautiful...|       1|[0.9975824836038859,6.52193...|  0.0|       0.0|
|arnab sir nation wants know...|       1|[0.9974563163425116,1.90441...|  0.0|       0.0|
|very true our country needs...|       1|[0.9972855347002976,2.36914...|  0.0|       0.0|
|much more

In [None]:

# Weighted F1 (the evaluator's default metric, made explicit) for the TF-IDF model.
evaluator = MulticlassClassificationEvaluator(metricName="f1", predictionCol="prediction")
evaluator.evaluate(predictions)

0.752506587829302


# Cross-Validation
Let’s now use cross-validation to tune the hyperparameters. We tune only the count-vector Logistic Regression model.

In [None]:
# Rebuild the count-vector pipeline for hyperparameter tuning.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Split the raw tweets before fitting, so the vocabulary and the label index are
# learned from the training split only and the test split stays unseen until
# the final evaluation.
(train_raw, test_raw) = data.randomSplit([0.7, 0.3], seed=100)
pipelineFit = pipeline.fit(train_raw)
trainingData = pipelineFit.transform(train_raw)
testData = pipelineFit.transform(test_raw)
# Transformed view of the whole corpus, kept so `dataset` still holds this
# pipeline's output (evaluated lazily; not used for training here).
dataset = pipelineFit.transform(data)

# Base estimator; regParam / elasticNetParam are overridden by any ParamGrid
# used to tune it.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [None]:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator used both to rank parameter candidates during cross-validation and
# to score the final model. It is defined BEFORE the CrossValidator so this cell
# does not silently depend on an evaluator left over from an earlier cell.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# 3 x 3 grid over regularisation strength and the L1/L2 mix
# (elasticNetParam 0.0 = pure L2 / ridge).
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .build())

# 5-fold cross-validation: 9 parameter combinations x 5 folds = 45 fits; the
# best combination is then refit on all of trainingData.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)

cvModel = cv.fit(trainingData)

# Evaluate the best model on the held-out test split.
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.836646950243763