# Sentiment Analysis with Pyspark

One of the tools I'm deeply interested in, but haven't had many chances to explore, is Apache Spark. Most of the time, Pandas and Scikit-Learn are enough to handle the size of data I'm building a model on. But that also means I haven't had a chance to deal with petabytes of data yet, and I want to be prepared for the day I face truly big data.

I have tried some basic data manipulation with Pyspark before, but only to a very basic level. I want to learn more and be more comfortable in using Pyspark. This post is my endeavour to have a better understanding of Pyspark.

Python is great for data science modelling, thanks to its numerous modules and packages that help achieve data science goals. But what if the data you are dealing with cannot fit on a single machine? Maybe you can use careful sampling to do your analysis on a single machine, but with a distributed computing framework like Pyspark, you can efficiently carry out the task on large data sets.

The Spark API is available in multiple programming languages (Scala, Java, Python and R). There are debates about how Spark performance varies depending on which language you run it from, but since the main language I have been using is Python, I will focus on Pyspark without going into too much detail about which language to choose for Apache Spark.

Spark has three different data structures available through its APIs: RDD, Dataframe (this is different from a Pandas dataframe), and Dataset (the typed Dataset API is only available in Scala and Java). For this post, I will work with Dataframe and the corresponding machine learning library, SparkML (the pyspark.ml package). I first decided on the data structure based on the advice from a post on Analytics Vidhya: "Dataframe is much faster than RDD because it has metadata (some information about data) associated with it, which allows Spark to optimize query plan." You can find a more comprehensive introduction in the original post.
https://www.analyticsvidhya.com/blog/2016/09/comprehensive-introduction-to-apache-spark-rdds-dataframes-using-pyspark/

And there's also an informative post on Databricks comparing different data structures of Apache Spark: "A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets".
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

Then I figured out that I need to use SparkML instead of Spark MLlib if I want to work with Dataframes. The older MLlib API (the pyspark.mllib package) works with RDDs, while SparkML (the pyspark.ml package) supports Dataframes.

One more thing to note is that I will work in local mode on my laptop. Local mode is often used for prototyping, development, debugging, and testing. However, as Spark's local mode is fully compatible with cluster mode, code written locally can be run on a cluster with just a few additional steps.

In order to use Pyspark in Jupyter Notebook, you should either configure the Pyspark driver or use a package called Findspark to make a SparkContext available in your Jupyter Notebook. You can easily install Findspark by running "pip install findspark" on your command line. Let's first load some of the basic dependencies we need.

In [1]:
import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

The first step in any Spark program is to create a SparkContext. A SparkContext is needed when we want to execute operations on a cluster: it tells Spark how and where to access the cluster, and it is the first step in connecting to a Spark cluster.

In [2]:
try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    sc = ps.SparkContext('local[4]')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")

Just created a SparkContext


In [3]:
sc.master

u'local[4]'

The dataset I'll use for this post is annotated Tweets from "Sentiment140". It originated from a Stanford research project, and I used this dataset for my previous series on Twitter sentiment analysis. Since I already cleaned the tweets during my previous project, I will use the pre-cleaned tweets. If you want to know more about the cleaning process I followed, you can check my previous post: "Another Twitter sentiment analysis with Python-Part 2"
https://towardsdatascience.com/another-twitter-sentiment-analysis-with-python-part-2-333514854913

In [4]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('project-capstone/Twitter_sentiment_analysis/clean_tweet.csv')

In [5]:
type(df)

pyspark.sql.dataframe.DataFrame

In [6]:
df.show(5)

+---+--------------------+------+
|_c0|                text|target|
+---+--------------------+------+
|  0|awww that bummer ...|     0|
|  1|is upset that he ...|     0|
|  2|dived many times ...|     0|
|  3|my whole body fee...|     0|
|  4|no it not behavin...|     0|
+---+--------------------+------+
only showing top 5 rows



In [7]:
df = df.dropna()

In [8]:
df.count()

1596019

After successfully loading the data as a Spark Dataframe, we can take a peek at the data by calling .show(), which is the counterpart of Pandas' .head(). After dropping NA values, we have a bit less than 1.6 million Tweets. I will split these into three parts: training, validation, and test. Since I have around 1.6 million entries, 1% each for the validation and test sets (roughly 16,000 Tweets each) will be enough to test the models.

In [9]:
(train_set, val_set, test_set) = df.randomSplit([0.98, 0.01, 0.01], seed = 2000)
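
To make the idea of a weighted, seeded split concrete, here is a minimal pure-Python sketch. It is not how Spark implements randomSplit; random_split is a hypothetical helper that simply draws one uniform number per row and uses the cumulative weights as boundaries. As with a random split in general, the resulting sizes are only approximately proportional to the weights.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to exactly one split using one uniform draw per row."""
    total = float(sum(weights))
    # Cumulative upper bounds of each split, e.g. [0.98, 0.99, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)  # fixed seed makes the split reproducible
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any floating point rounding leftovers
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, val, test = random_split(range(100000), [0.98, 0.01, 0.01], seed=2000)
print("train={0} val={1} test={2}".format(len(train), len(val), len(test)))
```

Running it twice with the same seed gives the same three lists, which is the reason for passing a seed in the cell above.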

## HashingTF + IDF + Logistic Regression

From my previous attempt at sentiment analysis with Pandas and Scikit-Learn, I learned that TF-IDF with Logistic Regression is quite a strong combination, showing robust performance as high as that of a Word2Vec + Convolutional Neural Network model. So in this post, I will try to implement a TF-IDF + Logistic Regression model with Pyspark.
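
Before building the pipeline, it may help to see the TF-IDF arithmetic on a toy corpus. The sketch below is pure Python, not Spark code. It assumes the smoothed formula that, as I understand the Spark documentation, its IDF uses: idf = log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term. The helper names (idf_weights, tfidf) and the three toy documents are made up for illustration. The min_doc_freq argument mimics the effect of minDocFreq by zeroing the weight of terms that appear in too few documents.

```python
import math
from collections import Counter

# Toy corpus, already tokenized (invented for illustration)
docs = [
    "good day good mood".split(),
    "bad day".split(),
    "good game".split(),
]


def idf_weights(docs, min_doc_freq=0):
    """Smoothed IDF per term: log((m + 1) / (df + 1)), or 0 below min_doc_freq."""
    m = len(docs)
    doc_freq = Counter()
    for doc in docs:
        doc_freq.update(set(doc))  # count each term once per document
    return {
        term: (math.log((m + 1.0) / (df + 1.0)) if df >= min_doc_freq else 0.0)
        for term, df in doc_freq.items()
    }


def tfidf(doc, idf):
    """Raw term count multiplied by the term's IDF weight."""
    counts = Counter(doc)
    return {term: counts[term] * idf.get(term, 0.0) for term in counts}


idf = idf_weights(docs)
vec = tfidf(docs[0], idf)
for term in sorted(vec):
    print("{0}: {1:.4f}".format(term, vec[term]))
```

In the first document, "mood" appears once but only in that document, so it ends up with a higher weight than "day", which appears in two of the three documents.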

In [11]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [10]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|_c0|                text|target|               words|                  tf|            features|label|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
|  0|awww that bummer ...|     0|[awww, that, bumm...|(65536,[8436,8847...|(65536,[8436,8847...|  0.0|
|  1|is upset that he ...|     0|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  0.0|
|  2|dived many times ...|     0|[dived, many, tim...|(65536,[2548,2888...|(65536,[2548,2888...|  0.0|
|  3|my whole body fee...|     0|[my, whole, body,...|(65536,[158,11650...|(65536,[158,11650...|  0.0|
|  4|no it not behavin...|     0|[no, it, not, beh...|(65536,[1968,4488...|(65536,[1968,4488...|  0.0|
+---+--------------------+------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [11]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

In [12]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8612433722998375

0.86! That looks good, maybe too good. Because I already tried the same combination of techniques with Pandas and Scikit-Learn, I know that unigram TF-IDF with Logistic Regression gives around 80% accuracy. There can be some slight differences due to the detailed model parameters, but this still looks too good.

And by looking at the Spark documentation, I realised that what BinaryClassificationEvaluator evaluates by default is areaUnderROC. https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

And BinaryClassificationEvaluator doesn't support accuracy as a metric (it offers areaUnderROC and areaUnderPR). But I can still calculate accuracy by counting the number of predictions matching the label and dividing it by the total number of entries.

In [13]:
evaluator.getMetricName()

'areaUnderROC'

In [15]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy

0.7899955788542917

Now it looks more plausible. In fact, the accuracy is slightly lower than what I have seen from Scikit-Learn.
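
To see why the two numbers can differ so much, here is a small pure-Python sketch that computes both metrics on the same toy predictions. The labels and scores are invented for illustration, and the two helper functions are my own simplified versions, not Spark's implementation. ROC-AUC is computed as the fraction of (positive, negative) pairs in which the positive example gets the higher score, while accuracy depends on a fixed 0.5 threshold.

```python
def accuracy(labels, scores, threshold=0.5):
    """Share of examples whose thresholded score matches the label."""
    preds = [1 if s >= threshold else 0 for s in scores]
    correct = sum(1 for y, p in zip(labels, preds) if y == p)
    return correct / float(len(labels))


def roc_auc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(pos) * len(neg))


# Toy data: three examples land on the wrong side of the 0.5 threshold
labels = [0, 0, 0, 0, 1, 1, 1, 1]
scores = [0.10, 0.20, 0.55, 0.60, 0.45, 0.70, 0.80, 0.90]

print("Accuracy: {0:.3f}".format(accuracy(labels, scores)))  # 0.625
print("ROC-AUC:  {0:.3f}".format(roc_auc(labels, scores)))   # 0.875
```

The ranking of the scores is mostly right, so ROC-AUC is high, but several scores sit on the wrong side of the threshold, so accuracy is noticeably lower. The same kind of gap shows up in the 0.86 versus 0.79 above.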

## CountVectorizer + IDF + Logistic Regression

There's another way to get term frequencies for the IDF (Inverse Document Frequency) calculation: CountVectorizer in SparkML. Apart from the reversibility of the features (CountVectorizer keeps a vocabulary, so a feature index can be mapped back to its term, while a hashed index cannot), there is an important difference in how each of them limits the number of features. HashingTF applies dimensionality reduction with possible collisions, while CountVectorizer discards infrequent tokens.
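
The difference can be sketched in a few lines of pure Python. This is only an illustration of the two ideas, not Spark's code: zlib.crc32 stands in for the hash function (Spark's HashingTF uses a different one, MurmurHash3 as far as I know), and hashing_tf, fit_vocab and count_vectorize are hypothetical helper names.

```python
import zlib
from collections import Counter


def hashing_tf(tokens, num_features):
    """Hashing trick: bucket index = hash(token) mod num_features."""
    vec = Counter()
    for tok in tokens:
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[idx] += 1  # different tokens may share a bucket (collision)
    return dict(vec)


def fit_vocab(docs, vocab_size):
    """Keep only the vocab_size most frequent tokens across the corpus."""
    counts = Counter(tok for doc in docs for tok in doc)
    return {tok: i for i, (tok, _) in enumerate(counts.most_common(vocab_size))}


def count_vectorize(tokens, vocab):
    """Count tokens that are in the vocabulary; drop everything else."""
    return dict(Counter(vocab[t] for t in tokens if t in vocab))


docs = [["good", "day", "good"], ["bad", "day"], ["good", "rare"]]

vocab = fit_vocab(docs, vocab_size=2)
index_to_token = {i: tok for tok, i in vocab.items()}  # reversible mapping
print(vocab)
print(count_vectorize(["good", "rare", "good"], vocab))  # "rare" is discarded

# Five distinct tokens squeezed into four buckets: a collision is unavoidable
hashed = hashing_tf(["good", "day", "bad", "rare", "mood"], num_features=4)
print(hashed)
```

With the vocabulary approach, a feature index can be looked up in index_to_token, but anything outside the top tokens is lost. With hashing, every token contributes to some bucket, but there is no way to tell from the bucket index alone which token it was.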

Let's see if performance changes if we use CountVectorizer instead of HashingTF.

In [16]:
%%time
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.7982
ROC-AUC: 0.8681
CPU times: user 45.7 ms, sys: 15.1 ms, total: 60.8 ms
Wall time: 1min 15s


It looks like using CountVectorizer has improved the performance a little bit.

## N-gram Implementation

In Scikit-Learn, n-gram implementation is fairly easy: you can define the range of n-grams when you call TfidfVectorizer. But with Spark, it is a bit more complicated. It does not automatically combine features from different n-grams, so I had to use VectorAssembler in the pipeline to combine the features I get from each n-gram size.
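
As a rough picture of what the n-gram and assembling steps do, here is a pure-Python sketch. The helpers ngrams and assemble are hypothetical stand-ins for the NGram and VectorAssembler stages, and the toy feature blocks are invented; the real stages work on Dataframe columns and sparse vectors.

```python
def ngrams(tokens, n):
    """All runs of n consecutive tokens, each joined into one string."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


def assemble(blocks):
    """Concatenate per-n-gram feature vectors into one long vector."""
    combined = []
    for block in blocks:
        combined.extend(block)
    return combined


tokens = "my whole body feels itchy".split()
for n in (1, 2, 3):
    print(n, ngrams(tokens, n))

# Toy feature blocks for unigrams, bigrams and trigrams of one tweet
features = assemble([[1.0, 0.0], [0.0, 2.0], [3.0]])
print(features)  # [1.0, 0.0, 0.0, 2.0, 3.0]
```

Each n-gram size gets its own vocabulary and its own TF-IDF vector, and the final feature vector is just those vectors laid end to end. That is why the total number of features is the sum of the per-n-gram vocabulary sizes.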

I first tried to extract around 16,000 features (2**14 = 16,384) each from unigrams, bigrams, and trigrams. This means I will get around 49,000 features in total. Then I applied Chi Squared feature selection to reduce the features back to around 16,000 in total.

In [70]:
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector

def build_trigrams(inputCol=["text","target"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=2**14,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="rawFeatures"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    selector = [ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+selector+lr)

In [71]:
%%time
trigram_pipelineFit = build_trigrams().fit(train_set)
predictions = trigram_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.8129
ROC-AUC: 0.8884
CPU times: user 2.11 s, sys: 935 ms, total: 3.04 s
Wall time: 4h 1min 9s


Accuracy has improved, but as you might have noticed, fitting the model took 4 hours! And this is mainly because of ChiSqSelector.
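
For readers unfamiliar with Chi Squared feature selection, the sketch below shows the kind of score it ranks features by. It is a simplified pure-Python version for a single binary feature (term present or absent) against a binary label, with invented toy data; chi_squared is a hypothetical helper, and this is not meant to reproduce ChiSqSelector or to explain its running time.

```python
def chi_squared(feature, labels):
    """Pearson chi-squared statistic between a categorical feature and a label."""
    n = float(len(labels))
    observed = {}
    for f, y in zip(feature, labels):
        observed[(f, y)] = observed.get((f, y), 0) + 1

    f_vals = set(feature)
    y_vals = set(labels)
    stat = 0.0
    for f in f_vals:
        row_total = sum(observed.get((f, y), 0) for y in y_vals)
        for y in y_vals:
            col_total = sum(observed.get((g, y), 0) for g in f_vals)
            # Count expected if the feature and the label were independent
            expected = row_total * col_total / n
            stat += (observed.get((f, y), 0) - expected) ** 2 / expected
    return stat


labels = [1, 1, 1, 1, 0, 0, 0, 0]
informative = [1, 1, 1, 0, 0, 0, 0, 1]  # mostly present in positive tweets
noise = [1, 0, 1, 0, 1, 0, 1, 0]        # unrelated to the label

print("informative: {0:.2f}".format(chi_squared(informative, labels)))  # 2.00
print("noise:       {0:.2f}".format(chi_squared(noise, labels)))        # 0.00
```

A selector of this kind computes a score like this for every feature and keeps the highest-scoring ones, so with roughly 49,000 candidate features there is a lot of scoring to do.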

What if I extract 5,460 features each from unigrams, bigrams, and trigrams in the first place, to have around 16,000 features in total (3 × 5,460 = 16,380), without Chi Squared feature selection?

In [18]:
from pyspark.ml.feature import NGram, VectorAssembler

def build_ngrams_wocs(inputCol=["text","target"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)

In [19]:
%%time

trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = evaluator.evaluate(predictions_wocs)

# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy_wocs))
print("ROC-AUC: {0:.4f}".format(roc_auc_wocs))

Accuracy Score: 0.8126
ROC-AUC: 0.8856
CPU times: user 201 ms, sys: 69.1 ms, total: 270 ms
Wall time: 6min 28s


This has given me almost the same result: accuracy is marginally lower, but the difference is only in the fourth decimal place (0.8126 vs. 0.8129). Considering it takes only about six and a half minutes without ChiSqSelector, I choose the model without ChiSqSelector.
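
To put the trade-off in numbers, the following snippet takes the wall times and validation accuracies printed above and works out the speed-up and the accuracy gap. parse_wall_time is a hypothetical helper written just for this comparison.

```python
import re


def parse_wall_time(text):
    """Convert a wall time such as '4h 1min 9s' into seconds."""
    unit_seconds = {"h": 3600, "min": 60, "s": 1}
    return sum(
        int(value) * unit_seconds[unit]
        for value, unit in re.findall(r"(\d+)\s*(h|min|s)", text)
    )


# Values copied from the two cell outputs above
chisq_seconds = parse_wall_time("4h 1min 9s")
plain_seconds = parse_wall_time("6min 28s")
speedup = chisq_seconds / float(plain_seconds)
accuracy_gap = 0.8129 - 0.8126

print("With ChiSqSelector:    {0} s".format(chisq_seconds))
print("Without ChiSqSelector: {0} s".format(plain_seconds))
print("Speed-up: {0:.1f}x for an accuracy gap of {1:.4f}".format(speedup, accuracy_gap))
```

On my laptop that is roughly a 37-fold reduction in fitting time for a drop of about 0.0003 in validation accuracy.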

And finally, let's try this model on the final test set.

In [20]:
test_predictions = trigramwocs_pipelineFit.transform(test_set)
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count() / float(test_set.count())
test_roc_auc = evaluator.evaluate(test_predictions)

# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(test_accuracy))
print("ROC-AUC: {0:.4f}".format(test_roc_auc))

Accuracy Score: 0.8122
ROC-AUC: 0.8862


Final test accuracy is 81.22%.