## Twitter sentiment analysis and prediction using pyspark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=f32c70234474b4f7513ed70c794ec0fa3d9f84e93864fcf83df1a65b211d0db8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from IPython import display
import math
import pandas as pd
import numpy as np

from pyspark.sql import SQLContext
from pyspark import SparkContext

from pyspark.sql.types import *

In [None]:
# Mount Google Drive in the Colab runtime; the dataset path used below lives under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Reference tutorial: https://www.learndatasci.com/tutorials/sentiment-analysis-reddit-headlines-pythons-nltk/

In [None]:
# Reuse the active SparkContext when this cell is re-run; constructing a second
# SparkContext in the same process raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
# SQLContext is the entry point used below to read the CSV file.
sqlContext = SQLContext(sc)



In [None]:
# Both CSV columns are read as plain strings: the text and its category.
schema_fields = [
    StructField(column_name, StringType())
    for column_name in ("clean_text", "category")
]
customSchema = StructType(schema_fields)

In [None]:

# Location of the dataset CSV on the mounted Google Drive.
filename = (
    '/content/drive/MyDrive/QTDL/Report/'
    'Sentiment-Analysis-using-Pyspark-on-Multi-Social-Media-Data/'
    'redt_dataset.csv'
)

In [None]:
# Load the CSV (first row is the header) with the explicit schema, then preview it.
csv_reader = sqlContext.read.format("csv").option("header", "true").schema(customSchema)
df = csv_reader.load(filename)
df.show()

+--------------------+--------+
|          clean_text|category|
+--------------------+--------+
| family mormon ha...|       1|
|buddhism has very...|       1|
|seriously don say...|      -1|
|what you have lea...|       0|
|for your own bene...|       1|
|you should all si...|      -1|
| was teens when d...|       1|
|jesus was zen mee...|       0|
|there are two var...|      -1|
|dont worry about ...|       1|
| recently told fa...|       1|
| unto others you ...|       1|
|first understand ...|       1|
| recently heard s...|       1|
|different times d...|       1|
|does evil include...|      -1|
|our campaign has ...|       1|
|technically you c...|      -1|
|              zarus |       0|
|blood and souls f...|       0|
+--------------------+--------+
only showing top 20 rows



In [None]:
# Row count of the raw frame, before rows containing nulls are dropped.
df.count()

38305

In [None]:
# Discard every row that has a null in any column, then preview the result.
data = df.dropna(how='any')
data.show(n=5)

+--------------------+--------+
|          clean_text|category|
+--------------------+--------+
| family mormon ha...|       1|
|buddhism has very...|       1|
|seriously don say...|      -1|
|what you have lea...|       0|
|for your own bene...|       1|
+--------------------+--------+
only showing top 5 rows



In [None]:
# Row count after the rows containing nulls were removed.
data.count()

36888

In [None]:
# Print the column names and types of the cleaned frame.
data.printSchema()

root
 |-- clean_text: string (nullable = true)
 |-- category: string (nullable = true)



## Preprocessing

In [None]:
from pyspark.sql.functions import col

# Class balance: number of rows per category, largest class first.
category_counts = data.groupBy("category").count()
category_counts.orderBy(col("count").desc()).show()

+--------+-----+
|category|count|
+--------+-----+
|       1|15749|
|       0|12895|
|      -1| 8244|
+--------+-----+



## Model Pipeline
The Spark ML Pipelines API is similar to scikit-learn's. Our pipeline includes three steps:

regexTokenizer: Tokenization (with Regular Expression)

stopwordsRemover: Remove Stop Words

countVectors: Count vectors (“document-term vectors”)

In [None]:
# Spark ML building blocks: feature transformers, the classifier, and the Pipeline wrapper.
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# NLTK supplies the English stop-word list; the corpus is downloaded here before it is read.
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
# Tokenize by splitting the text on non-word characters.
regexTokenizer = RegexTokenizer(inputCol="clean_text", outputCol="words", pattern=r"\W")

# Drop NLTK's English stop words from the token list.
stop_words = list(stopwords.words('english'))
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stop_words)

# Bag-of-words stage, currently disabled:
# countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=15000, minDF=5)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler

# Map the string category to a numeric "label" column for the classifiers.
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")

# Bag-of-words features. The stage is built in this cell so the pipeline does not
# depend on a definition that is commented out elsewhere in the notebook.
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=15000, minDF=5)

# This pipeline was commented out, which left `dataset` undefined when the
# train/test split cell runs on a fresh kernel (NameError on Restart & Run All).
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline on the cleaned data and materialise the feature and label columns.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

## Partition Training & Test sets

In [None]:
# 70/30 random split; the fixed seed keeps the partition reproducible.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)
print(f"Training Dataset Count: {trainingData.count()}")
print(f"Test Dataset Count: {testData.count()}")

Training Dataset Count: 25905
Test Dataset Count: 10983


## Model Training and Evaluation
Logistic Regression using Count Vector Features

The model scores the test set; we then look at the 10 test rows predicted as class 0, ordered by predicted probability from highest to lowest.

In [None]:
# Logistic regression with a pure L2 penalty (elasticNetParam=0), fitted on the training split.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

# Ten rows predicted as class 0, ordered by the probability column (descending).
predicted_class_zero = predictions.filter(predictions['prediction'] == 0)
(
    predicted_class_zero
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
| modi’ government last four...|       1|[0.9999998432903444,1.43071...|  0.0|       0.0|
|she right she right she rig...|       1|[0.9999997579988142,3.62693...|  0.0|       0.0|
|great job mellowde writing ...|       1|[0.9999997416777889,3.39726...|  0.0|       0.0|
| very interested history an...|       1|[0.9999986290163623,6.14373...|  0.0|       0.0|
|just tried organize the pol...|      -1|[0.9999981355577869,6.46211...|  2.0|       0.0|
| chennai super kings ipl 20...|       1|[0.9999965535336731,3.68668...|  0.0|       0.0|
|this the first time was gre...|       1|[0.9999955444739944,3.64267...|  0.0|       0.0|
|there couple reasons see fo...|       1|[0.9999934766327285,7.64619...|  0.0|       0.0|
|’ gunning

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator defaults written out: the score is the F1 measure of `prediction` against `label`.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
evaluator.evaluate(predictions)

0.697852576519381

## Logistic Regression using TF-IDF Features

In [None]:
from pyspark.ml.feature import HashingTF, IDF

# TF-IDF features: hash the filtered tokens into term-frequency vectors, then rescale by IDF.
# (An earlier draft of this cell, previously kept here as comments, passed numFeatures=10000
# to HashingTF and used a LogisticRegression without an explicit family.)
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
# minDocFreq: remove sparse terms
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
tfidf_stages = [regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx]
pipeline = Pipeline(stages=tfidf_stages)

pipelineFit = pipeline.fit(data)
# Keep a single row per distinct text once the features have been computed.
dataset = pipelineFit.transform(data).dropDuplicates(['clean_text'])

trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)

# Multinomial logistic regression with a pure L2 penalty (elasticNetParam=0).
logistic_regression = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    family='multinomial',
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)

lrModel = logistic_regression.fit(trainingData)

In [None]:
# Compare the raw text with its tokenized form for the first two rows.
dataset.select("clean_text", "words").show(truncate=False, n = 2)

+----------+-----+
|clean_text|words|
+----------+-----+
|          |[]   |
|          |[]   |
+----------+-----+
only showing top 2 rows



In [None]:
# NOTE(review): every column is shown untruncated, including the sparse feature vectors,
# so the rendered output is extremely wide — consider passing n= and a numeric truncate=.
dataset.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Inspect the hashed term-frequency vectors next to their IDF-weighted counterparts (untruncated).
dataset.select("rawFeatures", "features").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Preview the test split, including the generated feature and label columns.
testData.show()

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
|          clean_text|category|               words|            filtered|         rawFeatures|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
|                 ...|       0|                  []|                  []|      (262144,[],[])|      (262144,[],[])|  1.0|
| 101 doesn matter...|       1|[101, doesn, matt...|[101, matter, mod...|(262144,[11018,40...|(262144,[11018,40...|  0.0|
| 10125 10133 pdf ...|      -1|[10125, 10133, pd...|[10125, 10133, pd...|(262144,[13828,40...|(262144,[13828,40...|  2.0|
|                1753|       0|              [1753]|              [1753]|(262144,[25246],[...|(262144,[25246],[...|  1.0|
| 183\tkazari uiha...|       1|[183, kazari, uih...|[183, kazari, uih...|(262144,[10691,18...|(262144,[10691,18...|  0.0|
| 2009 the environ...|  

In [None]:
# Score the test split with the fitted model and show its three output columns for ten rows.
predictions = lrModel.transform(testData)
model_outputs = predictions.select("rawPrediction", "probability", "prediction")
model_outputs.show(n=10, truncate=False)

+---------------------------------------------------------------+------------------------------------------------------------+----------+
|rawPrediction                                                  |probability                                                 |prediction|
+---------------------------------------------------------------+------------------------------------------------------------+----------+
|[-0.10420644789460265,0.7355223005598696,-0.6313158526652669]  |[0.25601326558728676,0.5928598380644066,0.15112689634830664]|1.0       |
|[2.7930520548264592,-1.686064350210994,-1.1069877046154661]    |[0.9693825011848106,0.010996124219506888,0.0196213745956825]|0.0       |
|[0.24766236085192495,-0.40750292923897435,0.15984056838704896] |[0.4106303495736976,0.21326342249948596,0.3761062279268165] |0.0       |
|[-0.10420644789460265,0.7355223005598696,-0.6313158526652669]  |[0.25601326558728676,0.5928598380644066,0.15112689634830664]|1.0       |
|[0.7870714872104246,-0.3475230274

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator defaults written out: the score is the F1 measure of `prediction` against `label`.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
evaluator.evaluate(predictions)

0.6800802943098709

# Cross-Validation


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Metric used both for model selection inside the cross-validator and for the final score.
# It is defined before the CrossValidator so this cell no longer relies on an
# `evaluator` left over from an earlier cell.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Search grid: 3 regularisation strengths x 3 elastic-net mixing values (0.0 = ridge).
paramGrid = (ParamGridBuilder()
             .addGrid(logistic_regression.regParam, [0.1, 0.3, 0.5])
             .addGrid(logistic_regression.elasticNetParam, [0.0, 0.1, 0.2])
             .build())

# 5-fold cross-validation; the seed fixes the fold assignment so reruns select the same model.
cv = CrossValidator(estimator=logistic_regression,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=100)

cvModel = cv.fit(trainingData)

# Score the test split with the best model found by cross-validation.
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.7145820411995929

## Naive Bayes



In [None]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes on the same train/test split, with smoothing set to 1.
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)

# Ten rows predicted as class 0, ordered by the probability column (descending).
predicted_class_zero = predictions.filter(predictions['prediction'] == 0)
(
    predicted_class_zero
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

+------------------------------+--------+------------------------------+-----+----------+
|                    clean_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
| start preparing right now ...|       1|[1.0,9.102151038908848E-17,...|  0.0|       0.0|
|mumbai get chennai style el...|       0|[1.0,5.1888555075530614E-17...|  1.0|       0.0|
| one step forward and two s...|      -1|[1.0,4.8333915946487694E-17...|  2.0|       0.0|
|read you aren working for m...|       0|[1.0,3.041527028080887E-17,...|  1.0|       0.0|
|you forgot the part where t...|       1|[1.0,2.9008791420143995E-17...|  0.0|       0.0|
|thanks dude such useful thi...|       1|[1.0,2.5769787971473954E-17...|  0.0|       0.0|
|lol everyone replying with ...|       1|[1.0,1.5324007731978984E-17...|  0.0|       0.0|
|clouds really like form the...|       1|[1.0,1.1630961431926943E-17...|  0.0|       0.0|
|     love

In [None]:
# Evaluator defaults written out: the score is the F1 measure of `prediction` against `label`.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
evaluator.evaluate(predictions)

0.5376241229669134

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create initial Naive Bayes model
nb = NaiveBayes(labelCol="label", featuresCol="features")

# Candidate smoothing values for the grid search
nbparamGrid = (ParamGridBuilder()
               .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
               .build())

# The evaluator's default metric is F1, yet the result below is printed as "Accuracy".
# Request accuracy explicitly so the printed label matches the number, and so that
# cross-validation selects on the same metric that is reported.
nbevaluator = MulticlassClassificationEvaluator(labelCol="label",
                                                predictionCol="prediction",
                                                metricName="accuracy")

# 5-fold cross-validation; the seed fixes the fold assignment so reruns are repeatable.
nbcv = CrossValidator(estimator=nb,
                      estimatorParamMaps=nbparamGrid,
                      evaluator=nbevaluator,
                      numFolds=5,
                      seed=100)

# Run cross validations
nbcvModel = nbcv.fit(trainingData)
print(nbcvModel)

# Transforming with the CrossValidatorModel applies the best model found during the search;
# the test split is used so the score reflects data not seen in training.
nbpredictions = nbcvModel.transform(testData)

print('Accuracy:', nbevaluator.evaluate(nbpredictions))

CrossValidatorModel_9e45358885b5
Accuracy: 0.599204392648857


# Random Forest


In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the same features. The seed is fixed so the randomised tree
# construction, and therefore the reported score, is repeatable across runs.
rf = RandomForestClassifier(labelCol="label",
                            featuresCol="features",
                            numTrees=100,
                            maxDepth=4,
                            maxBins=32,
                            seed=100)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)

# Ten rows predicted as class 0, ordered by the probability column (descending).
(
    predictions.filter(predictions['prediction'] == 0)
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30)
)

In [None]:
# Evaluator defaults written out: the score is the F1 measure of `prediction` against `label`.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
evaluator.evaluate(predictions)