<a href="https://colab.research.google.com/github/Dcodinginsane/Pandas-for-Data-Analysis/blob/main/Sentiment_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Large-Scale Sentiment Analysis with PySpark
Comparative study of classification algorithms and feature extraction functions implemented in PySpark on 1,600,000 Tweets.

In [55]:
pip install pyspark



In [56]:
# findspark must be imported before it is used; this cell runs before the main import cell,
# so without the import it only worked thanks to leftover kernel state.
import findspark

# Show where the Spark installation was located.
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

In [57]:
# Necessary imports
import findspark
findspark.init("/usr/local/lib/python3.10/dist-packages/pyspark")
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [58]:
# Create the Spark session for this notebook, or reuse one that already exists.
# Master is "local[*]" and the application is registered under the name "Twitter".
spark1 = (
    SparkSession.builder
    .master("local[*]")
    .appName("Twitter")
    .getOrCreate()
)

In [59]:
path = "/content/training.1600000.processed.noemoticon.csv"

# Data schema: the CSV has no header row, so every column is named and typed here.
schema = StructType([
    StructField("target", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("query", StringType(), True),
    StructField("author", StringType(), True),
    StructField("tweet", StringType(), True)])

# An explicit schema is supplied, so schema inference is not requested.
df_raw = spark1.read.csv(path,
                         header=False,
                         schema=schema)

# Spark DataFrames are immutable: dropna() returns a NEW frame, so its result must be
# assigned. Calling it without assignment left the rows with nulls in `df`.
df = df_raw.dropna()  # Drop rows containing null values for simplicity

df

DataFrame[target: int, id: string, date: string, query: string, author: string, tweet: string]

As you can see in the schema we’ve defined, our dataset contains 6 fields:

* target: the polarity of the tweet (0 = negative, 2 = neutral, 4 = positive)
* id: the id of the tweet (e.g. 1235)
* date: the date of the tweet (e.g. Sat May 16 23:58:44 UTC 2009)
* query: the API query used to get the data
* author: the user that tweeted
* tweet: the text of the tweet (what we are most interested in)

In [60]:
import re


def pre_process(text):
    """Normalize the raw text of a tweet.

    The following steps are applied in order:

    1. remove links (any run of non-whitespace characters starting with ``http``);
    2. convert the HTML references ``&amp;``, ``&lt;`` and ``&gt;`` to ``and``,
       ``<`` and ``>`` (the trailing ``;`` is optional);
    3. replace new line characters with a space;
    4. remove ``@mentions`` and ``#hashtags``;
    5. collapse runs of whitespace into one space and trim both ends;
    6. convert to lowercase.

    Args:
        text: The tweet text, or ``None``.

    Returns:
        The cleaned, lower-cased text, or ``None`` when ``text`` is ``None``.
    """
    # Null-safe: a missing tweet stays missing instead of raising TypeError in re.sub.
    if text is None:
        return None

    # Remove links. This single pattern also covers "http://..." and "https://...".
    text = re.sub(r'http\S+', '', text)

    # Convert HTML references. The ";" is part of the entity, so it has to be consumed
    # as well; otherwise "&amp;" turns into "and;" instead of "and".
    text = re.sub(r'&amp;?', 'and', text)
    text = re.sub(r'&lt;?', '<', text)
    text = re.sub(r'&gt;?', '>', text)

    # Remove new line characters
    text = re.sub(r'[\r\n]+', ' ', text)

    # Remove mentions
    text = re.sub(r'@\w+', '', text)

    # Remove hashtags
    text = re.sub(r'#\w+', '', text)

    # Remove multiple space characters, and the leading/trailing space that is left
    # behind when a mention, hashtag or link sat at the start or end of the tweet.
    text = re.sub(r'\s+', ' ', text).strip()

    # Convert to lowercase
    return text.lower()

In [61]:
import pyspark.sql.functions as F

# Flag every tweet whose text contains the character "3".
# The previous predicate, like("3"), had no SQL wildcards and therefore only matched
# tweets whose whole text was exactly "3".
# NOTE(review): "contains" is the assumed intent -- confirm.
df = df.withColumn(
    "AddCol",
    F.when(F.col("tweet").contains("3"), "three").otherwise("notthree"),
)

In [62]:
#

In [63]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, VectorAssembler, ChiSqSelector

# Tokenizer stage: reads the "tweet" column and writes its tokens to "words".
tokenizer = Tokenizer(
    inputCol="tweet",
    outputCol="words",
)

In [64]:
# Term-frequency stage: hashes the tokens in "words" into the "tf" vector column.
hashtf = HashingTF(
    inputCol="words",
    outputCol='tf',
)

In [65]:
# IDF stage: reads the "tf" column and writes the re-weighted vectors to "features".
idf = IDF(
    inputCol='tf',
    outputCol="features",
)

In [66]:
# Label stage: indexes the "target" column into the "label" column.
label_stringIdx = StringIndexer(
    inputCol="target",
    outputCol="label",
)

In [67]:
from pyspark.ml.classification import LogisticRegression

# Classifier created with its default parameters.
lr = LogisticRegression()

# Baseline pipeline; the stages run in the order listed.
pipeline = Pipeline(
    stages=[
        tokenizer,
        hashtf,
        idf,
        label_stringIdx,
        lr,
    ]
)

In [68]:
def build_trigrams(inputCol=("tweet", "target"), n=3):
    """Build an (unfitted) n-gram TF-IDF + logistic-regression pipeline.

    For every order ``i`` in ``1..n`` the pipeline creates ``i``-grams from the
    tokenized text, counts them (vocabulary capped at 2**14 per order) and
    applies IDF with ``minDocFreq=5``. The per-order TF-IDF vectors are
    assembled into ``rawFeatures``; a chi-squared selector with
    ``numTopFeatures=2**14`` writes ``features``, which feeds a
    ``LogisticRegression`` created with its default parameters.

    Args:
        inputCol: Two column names, ``(text_column, label_column)``. Any
            two-element sequence is accepted. Previously this argument was
            ignored and the columns were hard-coded to ``"tweet"`` and
            ``"target"``; those remain the defaults.
        n: Highest n-gram order to include (must be >= 1).

    Returns:
        A ``Pipeline`` with the stages described above.

    Raises:
        ValueError: If ``inputCol`` does not hold exactly two names or ``n < 1``.
    """
    # Validate first so a bad call fails here rather than deep inside a Spark job.
    if len(inputCol) != 2:
        raise ValueError(
            "inputCol must contain exactly two column names: (text, label)"
        )
    if n < 1:
        raise ValueError("n must be >= 1, got {0}".format(n))

    text_col, label_col = inputCol
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="words")]

    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]

    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]

    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in orders
    ]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="rawFeatures"
    )]

    # The label must be indexed before the selector, which reads the "label" column.
    label_stringIdx = [StringIndexer(inputCol=label_col, outputCol="label")]

    selector = [ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")]

    lr = [LogisticRegression()]

    return Pipeline(stages=tokenizer + ngrams + cv + idf + assembler + label_stringIdx + selector + lr)

In [93]:
%time time.sleep(5)
from datetime import datetime

datetime.utcnow()





st = datetime.utcnow()
pipelineFit = pipeline.fit(train_set)
print('Training time:', datetime.utcnow() - st)

predictions = pipelineFit.transform(test_set)

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Print the results
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

CPU times: user 26.5 ms, sys: 341 µs, total: 26.8 ms
Wall time: 5 s


NameError: ignored

In [88]:
%%time

st = datetime.utcnow()
pipelineFit = pipeline.fit(train_set)
print('Training time:', datetime.utcnow() - st)

predictions = pipelineFit.transform(test_set)

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Print the results
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

AttributeError: ignored

In [78]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.mllib.util import MLUtils
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

In [79]:
# Feature stages: tokens -> term counts (vocabulary capped at 2**16) -> TF-IDF "features".
tokenizer = Tokenizer(inputCol="tweet", outputCol="words")
# Named `count_vectorizer` so that `cv` refers to one thing only (the CrossValidator below);
# previously `cv` was bound to the CountVectorizer and then rebound in the same cell.
count_vectorizer = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")

lr = LogisticRegression()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

pipeline = Pipeline(stages=[tokenizer, count_vectorizer, idf, label_stringIdx, lr])

# Hyper-parameter grid: 4 x 5 x 3 = 60 candidate settings for the logistic regression.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0, 10.0]) \
    .addGrid(lr.maxIter, [20, 50, 100, 500, 1000]) \
    .addGrid(lr.elasticNetParam, [0, 0.5, 1.0]) \
    .build()

# A fixed seed makes the fold assignment reproducible across runs.
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator,
                    numFolds=10, seed=42)

In [80]:
%%time
pipelineFit = cv.fit(train_set)

predictions = pipelineFit.transform(test_set)

accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Print the results
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

bestModel = pipelineFit.bestModel
pipelineFit.getEstimatorParamMaps()[np.argmax(pipelineFit.avgMetrics)]

NameError: ignored

In [None]:
import matplotlib.pyplot as plt

# NOTE(review): pipelineFit1 ... pipelineFit5 are not defined in any cell above.
# Presumably each is a fitted pipeline whose last stage exposes a training `summary`
# with `roc` and `areaUnderROC` -- confirm, and define them before running this cell.
fitted_pipelines = [pipelineFit1, pipelineFit2, pipelineFit3, pipelineFit4, pipelineFit5]

fig, ax = plt.subplots(figsize=(5, 5))

# One ROC curve per fitted pipeline. The five copy-pasted plot calls are replaced by a loop.
for model_no, fitted in enumerate(fitted_pipelines, start=1):
    summary = fitted.stages[-1].summary
    # Collect the ROC table once and read both columns from it, instead of collecting
    # the FPR and TPR columns separately.
    roc_rows = summary.roc.collect()
    ax.plot(
        [row["FPR"] for row in roc_rows],
        [row["TPR"] for row in roc_rows],
        # The model number keeps the legend entries distinguishable; before, every curve
        # was labelled only "AUC=<value>".
        label="Model {0} (AUC={1})".format(model_no, round(summary.areaUnderROC, 3)),
    )

# Diagonal reference line.
ax.plot([0, 1], [0, 1], "r--", label="Guess")

ax.set_xlabel('FPR')
ax.set_ylabel('TPR')
ax.set_title('ROC curves')
ax.legend()
plt.show()