- Apache Spark is an open-source framework for fast processing of big data on a single machine or a cluster of machines. It supports Python through the PySpark library, which enables scalable data analysis and machine learning pipelines [1].

- In this Python notebook, we use PySpark to perform sentiment analysis on a movie reviews dataset in five main steps:
1. Install and import the needed packages, then connect to the Spark cluster.
2. Load the dataset into Spark.
3. Perform basic EDA.
4. Preprocessing.
5. Build a prediction model (Logistic Regression Model) [2].

# 1. Install and import the needed packages, then connect to the Spark cluster.

- To work with PySpark, we first need Java, because Spark runs on the Java Virtual Machine. To check whether Java is installed, run 'java -version' in the command prompt; the output should look like 'java version "1.8.0_271"'.

- Next, install PySpark, either from the Apache Spark download page or with pip from within Jupyter.
- Install findspark and call its init() function to point Python to the directory where Spark is installed.

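- Then import pyspark as ps to create a SparkContext, the entry point to the Spark cluster, and an SQLContext, the entry point to Spark SQL for working with structured data. (Since Spark 3.0, SparkSession is the recommended entry point and SQLContext is deprecated, although it still works.)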

In [None]:
%pip install pyspark

In [None]:
%pip install findspark

In [1]:
# define the location of the Spark installation on this machine (adjust to your own path).

SPARK_HOME = r'C:\Users\sweet\anaconda3\Lib\site-packages\spark-3.1.1-bin-hadoop2.7'

In [2]:
# import the needed libraries.

import findspark
findspark.init(SPARK_HOME)
import pyspark as ps
from pyspark.sql import SQLContext

In [3]:
# create a SparkContext, which gives access to the Spark cluster,
# then create an SQLContext from it to initialize Spark SQL functionality.

sc = ps.SparkContext()
sqlContext = SQLContext(sc)

# 2. Load the dataset into Spark.

In [4]:
# load the training, validation, and testing datasets into Spark with the
# load() function, passing the path where each file is saved.
# 'csv' is Spark's built-in CSV source ('com.databricks.spark.csv' is a legacy alias for it).

csv_options = dict(header='true', inferSchema='true', multiLine='true', escape='"')

df = sqlContext.read.format('csv').options(**csv_options).load(r'C:\Users\sweet\Desktop\Train.csv')
valid = sqlContext.read.format('csv').options(**csv_options).load(r'C:\Users\sweet\Desktop\Valid.csv')
test = sqlContext.read.format('csv').options(**csv_options).load(r'C:\Users\sweet\Desktop\Test.csv')


# 3. Perform basic Exploratory Data Analysis.

- 1. The schema shows two columns: text (string) and label (integer).
- 2. The show() function displays the first N rows of a DataFrame; the text column contains long, multiline reviews, which show() truncates to 20 characters by default.
- 3. We then combine isnan() with count() to count NaN values in each column; the output is 0 for both columns. Note that isnan() only detects NaN, not null entries, so a check such as count(when(col(c).isNull(), c)) is also needed to confirm that no values are missing.
- 4. The training set has 40,000 reviews, the validation set 5,000, and the test set 5,000.
- 5. The describe() function computes basic statistics for a numeric column: count, mean, stddev, min, and max. Here the label column is binary, taking only the values 0 and 1.
- 6. Using groupBy() on the label column and aggregating with count(), we get the number of positive (1) and negative (0) reviews. The classes are nearly balanced, with slightly more negative reviews (20,019) than positive ones (19,981).
- 7. Using where() with a condition on the label column together with first(), we can show the first positive review (label = 1) and the first negative review (label = 0).
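To make explicit what the missing-value check and the groupBy('label') count compute, here is a plain-Python sketch over a tiny hypothetical list of rows (the rows and the helper names count_nulls and count_per_label are illustrative assumptions, not part of the dataset or of PySpark). It counts None values per column, which is the kind of missing value isnan() cannot detect, and tallies the rows per label.

```python
from collections import Counter

# Hypothetical rows shaped like the DataFrame: a 'text' string and a 0/1 'label'.
rows = [
    {"text": "Great film, loved it.", "label": 1},
    {"text": "A waste of time.", "label": 0},
    {"text": None, "label": 0},  # a null review text
]

def count_nulls(rows, columns):
    """Count None values per column (the analogue of col(c).isNull())."""
    return {c: sum(1 for row in rows if row.get(c) is None) for c in columns}

def count_per_label(rows):
    """Count rows per label (the analogue of groupBy('label').agg(count('*')))."""
    return Counter(row["label"] for row in rows)

print(count_nulls(rows, ["text", "label"]))  # {'text': 1, 'label': 0}
print(count_per_label(rows))                 # Counter({0: 2, 1: 1})
```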


In [5]:
# 1. show the dataframe schema

df.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [6]:
# 2. show the first 5 rows of the dataframe

df.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|I grew up (b. 196...|    0|
|When I put this m...|    0|
|Why do people who...|    0|
|Even though I hav...|    0|
|Im a die hard Dad...|    1|
+--------------------+-----+
only showing top 5 rows



In [7]:
valid.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|It's been about 1...|    0|
|someone needed to...|    0|
|The Guidelines st...|    0|
|This movie is a m...|    0|
|Before Stan Laure...|    0|
+--------------------+-----+
only showing top 5 rows



In [8]:
test.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|I always wrote th...|    0|
|1st watched 12/7/...|    0|
|This movie was so...|    0|
|The most interest...|    1|
|when i first read...|    0|
+--------------------+-----+
only showing top 5 rows



In [9]:
# 3. count NaN values in each column (note: isnan() does not count null entries)

from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+----+-----+
|text|label|
+----+-----+
|   0|    0|
+----+-----+



In [10]:
# 4. count the entries in each dataset

[df.count(), valid.count(), test.count()]

[40000, 5000, 5000]

In [11]:
# show the data type of each variable in the dataframe

df.dtypes

[('text', 'string'), ('label', 'int')]

In [12]:
# 5. show basic statistics for the numeric column

df.describe(['label']).show()

+-------+------------------+
|summary|             label|
+-------+------------------+
|  count|             40000|
|   mean|          0.499525|
| stddev|0.5000060244893201|
|    min|                 0|
|    max|                 1|
+-------+------------------+



In [13]:
# 6. count the number of positive (=1) and negative (=0) reviews

from pyspark.sql import functions as fn

df.groupBy('label').agg(fn.count('*')).show()


+-----+--------+
|label|count(1)|
+-----+--------+
|    1|   19981|
|    0|   20019|
+-----+--------+



In [14]:
# 7. show the first positive review

df.where(fn.col('label') == 1).first()

Row(text='Im a die hard Dads Army fan and nothing will ever change that. I got all the tapes, DVD\'s and audiobooks and every time i watch/listen to them its brand new. <br /><br />The film. The film is a re run of certain episodes, Man and the hour, Enemy within the gates, Battle School and numerous others with a different edge. Introduction of a new General instead of Captain Square was a brilliant move - especially when he wouldn\'t cash the cheque (something that is rarely done now).<br /><br />It follows through the early years of getting equipment and uniforms, starting up and training. All in all, its a great film for a boring Sunday afternoon. <br /><br />Two draw backs. One is the Germans bogus dodgy accents (come one, Germans cant pronounced the letter "W" like us) and Two The casting of Liz Frazer instead of the familiar Janet Davis. I like Liz in other films like the carry ons but she doesn\'t carry it correctly in this and Janet Davis would have been the better choice.', l

In [15]:
# show the first negative review

df.where(fn.col('label') == 0).first()

Row(text='I grew up (b. 1965) watching and loving the Thunderbirds. All my mates at school watched. We played "Thunderbirds" before school, during lunch and after school. We all wanted to be Virgil or Scott. No one wanted to be Alan. Counting down from 5 became an art form. I took my children to see the movie hoping they would get a glimpse of what I loved as a child. How bitterly disappointing. The only high point was the snappy theme tune. Not that it could compare with the original score of the Thunderbirds. Thankfully early Saturday mornings one television channel still plays reruns of the series Gerry Anderson and his wife created. Jonatha Frakes should hand in his directors chair, his version was completely hopeless. A waste of film. Utter rubbish. A CGI remake may be acceptable but replacing marionettes with Homo sapiens subsp. sapiens was a huge error of judgment.', label=0)

# 4. Preprocessing.

- Many preprocessing techniques can be used to prepare text for sentiment analysis.
- Here, we apply the following steps:

- 1. Tokenization: splitting sentences into words. We use PySpark's RegexTokenizer, which splits the reviews in the text column into individual lowercase terms stored in a words column. This step is needed because the later stages operate on lists of terms rather than on raw text.

- 2. Remove stop words: very common words such as "the", "and", and "is" carry little sentiment information, so we remove them to reduce noise and vocabulary size. Here we download a list of English stop words and use PySpark's StopWordsRemover to remove them from the words column, saving the remaining words in a filtered column.

- 3. Term frequency (TF): CountVectorizer builds a vocabulary from the filtered words and converts each review into a sparse vector of term counts. We limit the vocabulary to the 1,000 most frequent terms and ignore terms that appear in fewer than 10 reviews.

- 4. Pipelines: a pipeline chains all processing stages into one entity, in which each stage is either a transformer or an estimator. A transformer converts one DataFrame into another, typically by appending new columns; RegexTokenizer and StopWordsRemover are transformers. An estimator, such as CountVectorizer here or LogisticRegression in the next step, must first learn from the input data: its fit() method returns a model, which is itself a transformer. Calling fit() on a pipeline fits each estimator in order and returns a PipelineModel that applies all the resulting transformers [3].

- 5. Inverse document frequency (IDF): IDF measures how many reviews contain a term, giving a low weight to terms that appear in many reviews and a high weight to rare ones. Multiplying each term's frequency (TF) in a review by its IDF gives the TF-IDF score, which indicates the term's importance in that review: the higher the score, the more important the term. This converts each review into a weighted vector that the ML model can use [4].



In [19]:
# 1. tokenize the text column (convert text into words)
# first stage

from pyspark.ml.feature import RegexTokenizer

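# setGaps(False): the pattern matches the tokens themselves (runs of Unicode letters), not the separators
tokenizer = RegexTokenizer().setGaps(False).setPattern('\\p{L}+').setInputCol('text').setOutputCol('words')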


In [55]:
tokenizer.transform(df).show(5)

+--------------------+-----+--------------------+
|                text|label|               words|
+--------------------+-----+--------------------+
|I grew up (b. 196...|    0|[i, grew, up, b, ...|
|When I put this m...|    0|[when, i, put, th...|
|Why do people who...|    0|[why, do, people,...|
|Even though I hav...|    0|[even, though, i,...|
|Im a die hard Dad...|    1|[im, a, die, hard...|
+--------------------+-----+--------------------+
only showing top 5 rows
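Python's standard re module does not support the Java-style \p{L} class used above, so this plain-Python approximation of the RegexTokenizer stage uses [^\W\d_]+ (word characters minus digits and underscores, i.e. Unicode letters) and lowercases the input, as RegexTokenizer does by default. It is a sketch for checking tokenization on a single string, not a replacement for the Spark stage; the function name tokenize is hypothetical.

```python
import re

# Runs of Unicode letters; digits, punctuation, and underscores act as separators.
LETTERS = re.compile(r"[^\W\d_]+")

def tokenize(text):
    """Lowercase the text and return every run of letters as a token."""
    return LETTERS.findall(text.lower())

print(tokenize("I grew up (b. 1965) watching"))
# ['i', 'grew', 'up', 'b', 'watching']
```

The output matches the start of the words column for the first review shown above.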



In [57]:
# 2. download a list of English stop words (University of Glasgow IR group)

import requests
stopWords = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words').text.split()

In [58]:
# remove stop words from words column and store the output in the filtered column
# second stage
 
from pyspark.ml.feature import StopWordsRemover

filter = StopWordsRemover().setStopWords(stopWords).setCaseSensitive(False).setInputCol('words').setOutputCol('filtered')
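StopWordsRemover with setCaseSensitive(False) drops every token that matches the stop list regardless of case. The plain-Python sketch below shows the same filtering with a small hypothetical stop list (the helper remove_stop_words is illustrative, not a PySpark function). As an alternative to downloading a list over HTTP, PySpark also ships built-in lists, available through StopWordsRemover.loadDefaultStopWords('english').

```python
def remove_stop_words(tokens, stop_words):
    """Keep tokens that are not stop words, comparing case-insensitively."""
    stop = {word.lower() for word in stop_words}
    return [token for token in tokens if token.lower() not in stop]

# Hypothetical stop list; the notebook uses the downloaded stopWords list instead.
print(remove_stop_words(["i", "grew", "up", "b", "watching"], ["I", "up"]))
# ['grew', 'b', 'watching']
```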

In [191]:
# 3. use CountVectorizer to convert the filtered words into vectors of token counts (term frequency per document)
# keep at most the 1,000 most frequent words, and only words that appear in at least 10 documents
# third stage

from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(minTF=1., minDF=10., vocabSize=1000).setInputCol('filtered').setOutputCol('tf')
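Conceptually, fitting CountVectorizer keeps the terms that appear in at least minDF documents, takes the vocabSize most frequent of them across the corpus, and then maps each document to a sparse vector of counts indexed by vocabulary position. The sketch below mimics that logic in plain Python on a toy corpus; the helper names, the alphabetical tie-break, and the dict-based sparse vector are assumptions made for the illustration, not Spark's internals.

```python
from collections import Counter

def fit_vocabulary(docs, min_df=1, vocab_size=1000):
    """Return up to vocab_size terms that occur in at least min_df documents,
    ordered by total count across the corpus (ties broken alphabetically)."""
    term_freq = Counter()  # total occurrences of each term
    doc_freq = Counter()   # number of documents containing each term
    for doc in docs:
        term_freq.update(doc)
        doc_freq.update(set(doc))
    eligible = [term for term in term_freq if doc_freq[term] >= min_df]
    eligible.sort(key=lambda term: (-term_freq[term], term))
    return eligible[:vocab_size]

def count_vector(doc, vocabulary):
    """Map a tokenized document to a sparse {index: count} dict; unknown terms are dropped."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[term] for term in doc if term in index)
    return dict(sorted(counts.items()))

docs = [["great", "movie"], ["bad", "movie"], ["great", "great", "plot"]]
vocab = fit_vocabulary(docs, min_df=2, vocab_size=1000)
print(vocab)                                            # ['great', 'movie']
print(count_vector(["great", "great", "plot"], vocab))  # {0: 2}
```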


In [192]:
# 4. create a pipeline that groups all previous stages in a single workflow, and fit it on the training data

from pyspark.ml import Pipeline

cv_pipeline = Pipeline(stages=[tokenizer, filter, cv]).fit(df)
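The estimator/transformer contract behind Pipeline can be illustrated with a few toy classes. These are hypothetical stand-ins written for this sketch, not PySpark's classes: fitting the pipeline fits each estimator on the output of the previous stages and returns a model made only of transformers, which is why a fitted pipeline (like cv_pipeline) can itself be used as a stage in another pipeline.

```python
class Lowercase:
    """Transformer: needs no fitting, just maps each document to lowercase."""
    def transform(self, docs):
        return [doc.lower() for doc in docs]

class VocabularyModel:
    """Transformer produced by fitting VocabularyEstimator."""
    def __init__(self, vocabulary):
        self.vocabulary = set(vocabulary)
    def transform(self, docs):
        return [[w for w in doc.split() if w in self.vocabulary] for doc in docs]

class VocabularyEstimator:
    """Estimator: learns a vocabulary from data and returns a transformer."""
    def fit(self, docs):
        return VocabularyModel(w for doc in docs for w in doc.split())

class ToyPipelineModel:
    """A fitted pipeline: applies each fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages
    def transform(self, docs):
        for stage in self.stages:
            docs = stage.transform(docs)
        return docs

class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, docs):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted; transformers (including fitted models) pass through.
            model = stage.fit(docs) if hasattr(stage, "fit") else stage
            docs = model.transform(docs)  # the next stage sees this stage's output
            fitted.append(model)
        return ToyPipelineModel(fitted)

model = ToyPipeline([Lowercase(), VocabularyEstimator()]).fit(["Great Movie", "bad movie"])
print(model.transform(["GREAT plot"]))  # [['great']]
```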

In [193]:
cv_pipeline.transform(df).show(5)  

+--------------------+-----+--------------------+--------------------+--------------------+
|                text|label|               words|            filtered|                  tf|
+--------------------+-----+--------------------+--------------------+--------------------+
|I grew up (b. 196...|    0|[i, grew, up, b, ...|[grew, b, watchin...|(1000,[2,3,43,71,...|
|When I put this m...|    0|[when, i, put, th...|[movie, dvd, play...|(1000,[0,1,2,4,5,...|
|Why do people who...|    0|[why, do, people,...|[people, know, pa...|(1000,[1,2,3,4,5,...|
|Even though I hav...|    0|[even, though, i,...|[great, biblical,...|(1000,[2,8,10,11,...|
|Im a die hard Dad...|    1|[im, a, die, hard...|[im, die, hard, d...|(1000,[0,1,3,4,5,...|
+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [194]:
# 5. use Inverse Document Frequency to weight each term by how rare it is across documents
from pyspark.ml.feature import IDF
idf = IDF().setInputCol('tf').setOutputCol('tfidf')
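Spark's IDF uses a smoothed formula, IDF(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing term t; the TF-IDF value is TF(t, d) × IDF(t). A term that appears in every document therefore gets weight 0. The plain-Python sketch below computes the same quantities on tokenized toy documents; it stands in for the tf column and is not the Spark implementation (fit_idf and tfidf are hypothetical helpers).

```python
import math
from collections import Counter

def fit_idf(docs):
    """Return {term: log((m + 1) / (df + 1))} using natural logarithms."""
    m = len(docs)
    doc_freq = Counter()
    for doc in docs:
        doc_freq.update(set(doc))
    return {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}

def tfidf(doc, idf_weights):
    """Multiply each term's count in the document by its IDF weight."""
    tf = Counter(doc)
    return {term: count * idf_weights.get(term, 0.0) for term, count in tf.items()}

docs = [["great", "movie"], ["bad", "movie"], ["great", "movie", "plot"]]
weights = fit_idf(docs)
print(weights["movie"])  # in all 3 documents: log(4 / 4) = 0.0
print(tfidf(["bad", "bad", "movie"], weights))  # 'bad' gets 2 * log(4 / 2)
```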

In [195]:
# create a second pipeline that chains the fitted stages above with the IDF stage
idf_pipeline = Pipeline(stages=[cv_pipeline, idf]).fit(df)

In [196]:
# transform it to vectors representation and then show the first 5 rows

idf_pipeline.transform(df).show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|                text|label|               words|            filtered|                  tf|               tfidf|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|I grew up (b. 196...|    0|[i, grew, up, b, ...|[grew, b, watchin...|(1000,[2,3,43,71,...|(1000,[2,3,43,71,...|
|When I put this m...|    0|[when, i, put, th...|[movie, dvd, play...|(1000,[0,1,2,4,5,...|(1000,[0,1,2,4,5,...|
|Why do people who...|    0|[why, do, people,...|[people, know, pa...|(1000,[1,2,3,4,5,...|(1000,[1,2,3,4,5,...|
|Even though I hav...|    0|[even, though, i,...|[great, biblical,...|(1000,[2,8,10,11,...|(1000,[2,8,10,11,...|
|Im a die hard Dad...|    1|[im, a, die, hard...|[im, die, hard, d...|(1000,[0,1,3,4,5,...|(1000,[0,1,3,4,5,...|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+

In [197]:
# store the transformed DataFrame; show() returns None, so its result must not be assigned
tfidf_df = idf_pipeline.transform(df)

# 5. Build a prediction model (Logistic Regression Model)

- Logistic regression is a classification algorithm for categorical targets; we use it to predict whether a review is positive (1) or negative (0).
- 1. We use PySpark's LogisticRegression, setting the 'label' column as the target and the 'tfidf' column as the features.

- 2. Create a pipeline that groups the preprocessing pipeline and the classifier, and call fit() to train the model.

- 3. Use BinaryClassificationEvaluator to measure how well the model separates the two classes. By default it computes the area under the ROC curve from the rawPrediction column, which for logistic regression holds the raw scores (margins) for classes 0 and 1; a positive margin for class 1 means the model predicts a positive review.

- 4. Make predictions on the validation dataset, then evaluate them. Accuracy is the number of reviews whose prediction equals the label divided by the total number of reviews. ROC-AUC (area under the Receiver Operating Characteristic curve) measures how well the model ranks positive reviews above negative ones across all decision thresholds: 1.0 is perfect and 0.5 is no better than random guessing [7].

- 5. Make predictions on the test dataset and evaluate them in the same way. On the test set, accuracy is about 86% and ROC-AUC about 0.93, close to the validation scores (about 85% and 0.93), which is a good result. Accuracy might be improved further by increasing the vocabulary size (vocabSize) of the CountVectorizer, which makes the tfidf vectors longer.

- 6. Compare the actual label with the predicted one for each review.

- 7. Finally, pair each vocabulary word with its logistic regression coefficient and show the words with the most negative and most positive weights across all reviews.

In [198]:
# 1. build the prediction model (Logistic Regression)

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression().setLabelCol('label').setFeaturesCol('tfidf').setRegParam(0.0).\
setMaxIter(100).setElasticNetParam(0.)
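For a binary problem, a fitted logistic regression model computes a margin m = w·x + b from the features, reports rawPrediction as (-m, m), turns the margin into a probability with the sigmoid function, and predicts 1 when that probability exceeds the default threshold of 0.5. The sketch below reproduces that arithmetic in plain Python for one feature vector; the weights are made-up numbers, not the notebook's fitted coefficients, and predict is a hypothetical helper.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function (avoids overflow for large |z|)."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def predict(weights, intercept, features, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one feature vector."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    p1 = sigmoid(margin)
    raw_prediction = (-margin, margin)
    probability = (1.0 - p1, p1)
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction

# Hypothetical weights for a 2-word vocabulary, e.g. ["great", "worst"].
print(predict([0.4, -0.7], 0.0, [2.0, 0.0]))  # positive margin -> prediction 1.0
print(predict([0.4, -0.7], 0.0, [0.0, 1.5]))  # negative margin -> prediction 0.0
```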

In [199]:
# 2. create a pipeline that groups the previous stages with the classifier, and train it

lr_pipeline = Pipeline(stages=[idf_pipeline, lr]).fit(df)

In [200]:
# 3. create an evaluator for the classifier (computes the area under the ROC curve by default)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')


In [201]:
# 4. evaluate the model's predictions on the validation set

predict = lr_pipeline.transform(valid)
accuracy = predict.filter(predict.label == predict.prediction).count() / float(valid.count())
roc_auc = evaluator.evaluate(predict)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.8530
ROC-AUC: 0.9275
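Both metrics can be checked by hand on small inputs. Accuracy is the fraction of matching labels; ROC-AUC equals the probability that a randomly chosen positive review gets a higher score than a randomly chosen negative one, counting ties as one half, which is the same value as the trapezoidal area under the ROC curve. Because the sigmoid is monotonic, ranking by the margin in rawPrediction or by the probability gives the same AUC. The plain-Python sketch below uses toy labels and scores and an O(n²) pairwise loop, so it is only meant for small illustrations; accuracy and roc_auc are hypothetical helpers.

```python
def accuracy(labels, predictions):
    """Fraction of entries where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def roc_auc(labels, scores):
    """Pairwise AUC: share of (positive, negative) pairs ranked correctly, ties count 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

labels = [1, 0, 1, 0]
print(accuracy(labels, [1.0, 0.0, 0.0, 1.0]))  # 0.5
print(roc_auc(labels, [0.9, 0.1, 0.4, 0.6]))   # 0.75
```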


In [202]:
# 5. evaluate the model's predictions on the test set

test_prediction = lr_pipeline.transform(test)

accuracy = test_prediction.filter(test_prediction.label == test_prediction.prediction).count() / float(test.count())
roc_auc = evaluator.evaluate(test_prediction)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.8614
ROC-AUC: 0.9292


In [203]:
# 6. compare the first 20 actual labels with the predicted ones

test_prediction.select('label', 'prediction').show(20)

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 20 rows



In [204]:
# 7. pair each vocabulary word with its logistic regression coefficient

import pandas as pd
vocabulary = idf_pipeline.stages[0].stages[-1].vocabulary
weights = lr_pipeline.stages[-1].coefficients.toArray()
coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights})
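The cell above works because CountVectorizerModel.vocabulary[i] is the term for feature index i, IDF keeps the same indices, and so coefficient i of the logistic regression model belongs to vocabulary term i. The same pairing and ranking can be sketched without pandas; the vocabulary and weights below are hypothetical toy values, and top_words is an illustrative helper.

```python
def top_words(vocabulary, weights, k, positive=True):
    """Pair each term with its coefficient and return the k strongest in one direction."""
    pairs = list(zip(vocabulary, weights))
    pairs.sort(key=lambda pair: pair[1], reverse=positive)
    return pairs[:k]

vocabulary = ["great", "bad", "movie", "worst", "excellent"]
weights = [0.42, -0.38, 0.01, -0.70, 0.41]
print(top_words(vocabulary, weights, 2, positive=False))  # [('worst', -0.7), ('bad', -0.38)]
print(top_words(vocabulary, weights, 2))                  # [('great', 0.42), ('excellent', 0.41)]
```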

In [205]:
# top 20 negative words in all reviews

coeffs_df.sort_values('weight').head(20)

         word    weight
103     worst -0.696979
273     waste -0.663751
202     awful -0.512901
635    poorly -0.403877
11        bad -0.383148
528      dull -0.362985
210  terrible -0.352712
180    boring -0.352188
778     fails -0.338895
771    wasted -0.331485


In [206]:
# top 20 positive words in all reviews

coeffs_df.sort_values('weight', ascending=False).head(20)

          word   weight
13       great  0.42138
160  excellent  0.41353
680     superb  0.33542
303    amazing 0.334918
229  wonderful 0.313465
30        best 0.310018
575  fantastic  0.28958
392  hilarious 0.278421
321  brilliant 0.271617
696  perfectly 0.262539


- References
- [1] Chand, S. (2020). PySpark Programming – Integrating Speed With Simplicity. Retrieved from https://www.edureka.co/blog/pyspark-programming/
- [2] Spark ML. (n.d.). Introduction to Spark ML: An application to Sentiment Analysis. Retrieved from http://classes.ischool.syr.edu/ist718/content/unit09/lab-sentiment_analysis/
- [3] Kumar, S. (2018). Machine Learning pipelines with Spark ML. Retrieved from https://medium.com/@Sushil_Kumar/machine-learning-pipelines-with-spark-ml-94cd9b4c973d
- [4] How to process textual data using TF-IDF in Python. (n.d.). Retrieved from https://www.freecodecamp.org/news/how-to-process-textual-data-using-tf-idf-in-python-cd2bbc0a94a3/
- [5] Introduction to Spark ML: An application to Sentiment Analysis. (n.d.). Retrieved from http://classes.ischool.syr.edu/ist718/content/unit09/lab-sentiment_analysis/
- [6] Kim, R. (2018). Sentiment Analysis with PySpark. Retrieved from https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35
- [7] Narkhede, S. (2018). Understanding AUC - ROC Curve. Retrieved from https://towardsdatascience.com/understanding-auc-roc-curve-68b2303cc9c5