## Import modules and create spark session

In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=4e5b967bc996b866728024d60c47d1c395d6ec6c7055052fdd599e39cbdd4deb
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [5]:
#import modules
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

#create Spark session
# getOrCreate() reuses an already-running session if one exists, so re-running
# this cell is safe. The quick-start placeholder setting
# ("spark.some.config.option" = "some-value") was removed: nothing in this
# notebook reads it.
appName = "Sentiment Analysis in Spark"
spark = (
    SparkSession.builder
    .appName(appName)
    .getOrCreate()
)

## Read data file into Spark dataFrame

In [6]:
# Colab-only step: attach the user's Google Drive under /content/drive.
from google.colab import drive

drive.mount(mountpoint="/content/drive")

Mounted at /content/drive


In [7]:
# Load the tweets CSV into a DataFrame. The first line of the file provides the
# column names and Spark infers each column's type from the data.
tweets_csv = spark.read.csv(
    '/content/tweets_dataset.csv',
    header=True,
    inferSchema=True,
)
# Preview the first three rows without truncating long cell values.
tweets_csv.show(n=3, truncate=False)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only showing top 3 rows



In [8]:
# Pull the first five rows back to the driver as a list of Row objects.
tweets_csv.take(5)

[Row(ItemID=1038, Sentiment=1, SentimentSource='Sentiment140', SentimentText='that film is fantastic #brilliant'),
 Row(ItemID=1804, Sentiment=1, SentimentSource='Sentiment140', SentimentText='this music is really bad #myband'),
 Row(ItemID=1693, Sentiment=0, SentimentSource='Sentiment140', SentimentText='winter is terrible #thumbs-down'),
 Row(ItemID=1477, Sentiment=0, SentimentSource='Sentiment140', SentimentText='this game is awful #nightmare'),
 Row(ItemID=45, Sentiment=1, SentimentSource='Sentiment140', SentimentText='I love jam #loveit')]

## Select the related data

In [9]:
# Keep just the tweet text and its sentiment; the sentiment is cast to an
# integer and exposed under the name "label".
data = tweets_csv.select(
    "SentimentText",
    col("Sentiment").cast("Int").alias("label"),
)
data.show(n=5, truncate=False)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



## Divide data into training and testing data

In [10]:
#divide data, 70% for training, 30% for testing
# randomSplit is random: without a seed every run yields different partitions,
# so row counts and the final accuracy change between runs. A fixed seed makes
# the split repeatable after a kernel restart.
SPLIT_SEED = 42
dividedData = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
trainingData, testingData = dividedData  # index 0 = training, index 1 = testing
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1360 ; Testing data rows: 572


## Prepare training data

Separate "SentimentText" into individual words using tokenizer

In [11]:
# Turn each tweet into a list of lower-cased words ("SentimentWords").
tokenizer = Tokenizer(
    inputCol="SentimentText",
    outputCol="SentimentWords",
)
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(n=5, truncate=False)

+----------------------------------+-----+----------------------------------------+
|SentimentText                     |label|SentimentWords                          |
+----------------------------------+-----+----------------------------------------+
|I adore cheese #brilliant         |1    |[i, adore, cheese, #brilliant]          |
|I adore cheese #toptastic         |1    |[i, adore, cheese, #toptastic]          |
|I adore classical music #bestever |1    |[i, adore, classical, music, #bestever] |
|I adore classical music #favorite |1    |[i, adore, classical, music, #favorite] |
|I adore classical music #thumbs-up|1    |[i, adore, classical, music, #thumbs-up]|
+----------------------------------+-----+----------------------------------------+
only showing top 5 rows



Remove stop words (common words that carry little meaning and are unimportant as features)

In [12]:
# Drop stop words from the tokenizer's output column; the surviving words are
# written to "MeaningfulWords".
swr = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="MeaningfulWords",
)
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(n=5, truncate=False)

+----------------------------------+-----+----------------------------------------+-------------------------------------+
|SentimentText                     |label|SentimentWords                          |MeaningfulWords                      |
+----------------------------------+-----+----------------------------------------+-------------------------------------+
|I adore cheese #brilliant         |1    |[i, adore, cheese, #brilliant]          |[adore, cheese, #brilliant]          |
|I adore cheese #toptastic         |1    |[i, adore, cheese, #toptastic]          |[adore, cheese, #toptastic]          |
|I adore classical music #bestever |1    |[i, adore, classical, music, #bestever] |[adore, classical, music, #bestever] |
|I adore classical music #favorite |1    |[i, adore, classical, music, #favorite] |[adore, classical, music, #favorite] |
|I adore classical music #thumbs-up|1    |[i, adore, classical, music, #thumbs-up]|[adore, classical, music, #thumbs-up]|
+-----------------------

Convert the word features into numerical features. In Spark 2.2.1, this is implemented by the HashingTF function, which uses Austin Appleby's MurmurHash 3 algorithm.

In [22]:
# CountVectorizer is not part of the notebook's top-level imports, so it has to
# be imported here; with this line commented out the cell raises NameError when
# the notebook is run top to bottom on a fresh kernel.
from pyspark.ml.feature import CountVectorizer

# Learn a vocabulary from the training words, then encode each tweet as a
# sparse vector of per-word counts.
countVectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="features")
countVectorizerModel = countVectorizer.fit(SwRemovedTrain)
numericTrainData = countVectorizerModel.transform(SwRemovedTrain).select('label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)


+-----+------------------------------------+-----------------------------------+
|label|MeaningfulWords                     |features                           |
+-----+------------------------------------+-----------------------------------+
|1    |[adore, cheese, #brilliant]         |(48,[14,23,39],[1.0,1.0,1.0])      |
|1    |[adore, cheese, #toptastic]         |(48,[13,23,39],[1.0,1.0,1.0])      |
|1    |[adore, classical, music, #bestever]|(48,[0,17,23,43],[1.0,1.0,1.0,1.0])|
+-----+------------------------------------+-----------------------------------+
only showing top 3 rows



CountVectorizer is a feature extraction technique commonly used in natural language processing (NLP) and machine learning for converting a collection of text documents into a numerical feature vector. The basic idea behind CountVectorizer is to represent each document as a vector of term frequencies, indicating how often each term (word) appears in the document.



In [23]:
from pyspark.ml.feature import HashingTF, IDF

# Step 1 - term frequencies: hash every remaining word into a 10-slot vector.
hashingTF = HashingTF(
    inputCol=swr.getOutputCol(),
    outputCol="rawFeatures",
    numFeatures=10,
)
featurizedData = hashingTF.transform(SwRemovedTrain)

# Step 2 - fit inverse-document-frequency weights on those term frequencies
# and apply them to produce the final "features" column.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
numericTrainData = (
    idfModel.transform(featurizedData)
    .select('label', 'MeaningfulWords', 'features')
)
numericTrainData.show(n=3, truncate=False)


+-----+------------------------------------+--------------------------------------------------------------------------------------------+
|label|MeaningfulWords                     |features                                                                                    |
+-----+------------------------------------+--------------------------------------------------------------------------------------------+
|1    |[adore, cheese, #brilliant]         |(10,[1,3,5],[1.6138561817717652,0.8041567349415689,1.2837298152034553])                     |
|1    |[adore, cheese, #toptastic]         |(10,[3,4,5],[0.8041567349415689,1.3988638426882618,1.2837298152034553])                     |
|1    |[adore, classical, music, #bestever]|(10,[0,1,5,7],[0.6493025728482252,1.6138561817717652,1.2837298152034553,1.2523956590330196])|
+-----+------------------------------------+--------------------------------------------------------------------------------------------+
only showing top 3 rows



TF-IDF (Term Frequency-Inverse Document Frequency) is a vectorization that shows how relevant a word is to a document within a collection of documents. It is often used in information retrieval and text mining. To compute it, we first need the term frequencies produced by HashingTF.


In [25]:
from pyspark.ml.feature import CountVectorizer, PCA
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import col

# Two-stage pipeline: word counts first, then projection onto 5 principal
# components.
countVectorizer = CountVectorizer(inputCol=swr.getOutputCol(), outputCol="features")
pca = PCA(k=5, inputCol="features", outputCol="pca_features")

pipeline = Pipeline(stages=[countVectorizer, pca])
# The fitted pipeline gets its own name. It used to be stored in `model`, the
# same variable the prediction cell uses for the trained classifier, so running
# this cell after training silently replaced the classifier.
pcaPipelineModel = pipeline.fit(SwRemovedTrain)
# NOTE: this frame carries "pca_features" rather than "features".
numericTrainData = pcaPipelineModel.transform(SwRemovedTrain).select('label', 'MeaningfulWords', 'pca_features')

numericTrainData.show(truncate=False, n=3)



+-----+------------------------------------+-------------------------------------------------------------------------------------------------------+
|label|MeaningfulWords                     |pca_features                                                                                           |
+-----+------------------------------------+-------------------------------------------------------------------------------------------------------+
|1    |[adore, cheese, #brilliant]         |[-0.3067392965026029,0.11863761241711032,0.23348234054443562,-0.08554811256824954,-0.2331803288141199] |
|1    |[adore, cheese, #toptastic]         |[-0.3198324346870657,0.06925904412739661,0.24410500995423473,-0.0648631679881858,-0.010678835876804271]|
|1    |[adore, classical, music, #bestever]|[-0.5178150571374159,-1.0272791377888453,0.20433782921131646,-0.1340323524389118,0.09752765410588701]  |
+-----+------------------------------------+--------------------------------------------------------------

PCA (Principal Component Analysis):
PCA is a dimensionality reduction vectorization technique that can be applied to reduce the number of features in your dataset.



In [17]:
# Hash each word list into a sparse term-frequency vector, leaving HashingTF's
# feature dimension at its default. The same `hashTF` transformer is reused for
# the testing data further down.
hashTF = HashingTF(
    inputCol=swr.getOutputCol(),
    outputCol="features",
)
numericTrainData = (
    hashTF.transform(SwRemovedTrain)
    .select('label', 'MeaningfulWords', 'features')
)
numericTrainData.show(n=3, truncate=False)

+-----+------------------------------------+-------------------------------------------------------+
|label|MeaningfulWords                     |features                                               |
+-----+------------------------------------+-------------------------------------------------------+
|1    |[adore, cheese, #brilliant]         |(262144,[1689,45361,100089],[1.0,1.0,1.0])             |
|1    |[adore, cheese, #toptastic]         |(262144,[1689,42010,100089],[1.0,1.0,1.0])             |
|1    |[adore, classical, music, #bestever]|(262144,[91011,100089,102383,131250],[1.0,1.0,1.0,1.0])|
+-----+------------------------------------+-------------------------------------------------------+
only showing top 3 rows



## Train our classifier model using training data

In [14]:
from pyspark.ml.regression import LinearRegression

# Experimental baseline only. The regressor and its fitted model get their own
# names so they do not overwrite `lr` / `model`, which hold the logistic
# regression classifier used for prediction. A linear regressor emits
# continuous scores, so the accuracy cell's exact prediction == label check
# would not be meaningful for it.
linReg = LinearRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.01)
linRegModel = linReg.fit(numericTrainData)
print("Training is done!")


Training is done!


In [11]:
# Fit an L2-regularised logistic regression classifier on the training vectors.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.01,
    maxIter=10,
)
model = lr.fit(numericTrainData)
print("Training is done!")

Training is done!


## Prepare testing data

In [15]:
# Run the testing data through the same tokenizer, stop-word remover and
# HashingTF transformer that were applied to the training data.
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
# The column is named 'label' (lower case), as in the training frame. The old
# 'Label' spelling only resolved because Spark's column lookup is
# case-insensitive by default.
numericTest = hashTF.transform(SwRemovedTest).select('label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)


+-----+---------------------------+------------------------------------------+
|Label|MeaningfulWords            |features                                  |
+-----+---------------------------+------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0])|
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0])|
+-----+---------------------------+------------------------------------------+
only showing top 2 rows



## Predict on the testing data and calculate the model's accuracy

In [13]:
# Score the testing data with the trained classifier and keep only the columns
# needed to compare predictions against the true labels.
prediction = model.transform(numericTest)
predictionFinal = prediction.select("MeaningfulWords", "prediction", "label")
predictionFinal.show(n=4, truncate = False)
# Accuracy = share of rows whose predicted class equals the true label.
correctPrediction = predictionFinal.filter(predictionFinal['prediction'] == predictionFinal['label']).count()
totalData = predictionFinal.count()
# An empty test split (possible on very small inputs) would otherwise raise
# ZeroDivisionError; report NaN instead.
accuracy = correctPrediction / totalData if totalData else float("nan")
print("correct prediction:", correctPrediction, ", total data:", totalData,", accuracy:", accuracy)

+-------------------------------------+----------+-----+
|MeaningfulWords                      |prediction|Label|
+-------------------------------------+----------+-----+
|[adore, cheese, #bestever]           |1.0       |1    |
|[adore, cheese, #brilliant]          |1.0       |1    |
|[adore, classical, music, #bestever] |1.0       |1    |
|[adore, classical, music, #brilliant]|1.0       |1    |
+-------------------------------------+----------+-----+
only showing top 4 rows

correct prediction: 562 , total data: 571 , accuracy: 0.9842381786339754
