**SOURCES**

https://spark.apache.org/docs/latest/api/python/index.html

https://www.kaggle.com/code/nezarabdilahprakasa/sentiment-analysis-using-pyspark-big-data



**MODEL PREPARATION**

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
import pandas as pd

In [3]:
#data check
df = pd.read_csv("spark_set.csv", encoding='latin1')

In [4]:
print(df.columns)
print(df.shape[0])

Index(['Lemmas_Text', 'vader_sentiment'], dtype='object')
2112619


In [5]:
df.head(5)

Unnamed: 0,Lemmas_Text,vader_sentiment
0,virginamerica dhepburn said,0.0
1,virginamerica plus youve added commercial expe...,0.0
2,virginamerica didnt today mean need trip,0.0
3,virginamerica really aggressive blast obnoxiou...,-0.3306
4,virginamerica really big bad thing,-0.5829


In [6]:
#session create
spark = SparkSession.builder.master('local').appName("pyspark_model_training").getOrCreate()

24/06/13 12:56:37 WARN Utils: Your hostname, halloa93 resolves to a loopback address: 127.0.1.1; using 192.168.55.114 instead (on interface wlp2s0)
24/06/13 12:56:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/13 12:56:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
#dataframe load
df = spark.read.csv("spark_set.csv", header = True, inferSchema = True)
df.show(truncate = False, n = 3)

                                                                                

+----------------------------------------------------------+---------------+
|Lemmas_Text                                               |vader_sentiment|
+----------------------------------------------------------+---------------+
|virginamerica dhepburn said                               |0.0            |
|virginamerica plus youve added commercial experience tacky|0.0            |
|virginamerica didnt today mean need trip                  |0.0            |
+----------------------------------------------------------+---------------+
only showing top 3 rows



In [8]:
#transforming vader_sentimetn form Float to Int
from pyspark.sql.functions import when, col

df = df.withColumn("vader_sentiment_transformed",
                   when(col("vader_sentiment") >= 0.5, 1).
                   when(col("vader_sentiment") <= -0.5, 0).
                   otherwise(0))


df.select("vader_sentiment", "vader_sentiment_transformed").show(truncate=False, n=50)


+---------------+---------------------------+
|vader_sentiment|vader_sentiment_transformed|
+---------------+---------------------------+
|0.0            |0                          |
|0.0            |0                          |
|0.0            |0                          |
|-0.3306        |0                          |
|-0.5829        |0                          |
|0.0963         |0                          |
|0.4019         |0                          |
|0.1458         |0                          |
|0.0            |0                          |
|0.7717         |1                          |
|-0.8555        |0                          |
|0.7269         |1                          |
|0.6249         |1                          |
|0.1531         |0                          |
|0.4404         |0                          |
|-0.296         |0                          |
|0.7579         |1                          |
|0.4019         |0                          |
|0.0            |0                

In [9]:
df_mod = df.select("Lemmas_Text", col("vader_sentiment_transformed").cast("Int").alias("label"))
df_mod.show(truncate = False, n = 5)

+--------------------------------------------------------------------------------------------+-----+
|Lemmas_Text                                                                                 |label|
+--------------------------------------------------------------------------------------------+-----+
|virginamerica dhepburn said                                                                 |0    |
|virginamerica plus youve added commercial experience tacky                                  |0    |
|virginamerica didnt today mean need trip                                                    |0    |
|virginamerica really aggressive blast obnoxious entertainment guest face amp little recourse|0    |
|virginamerica really big bad thing                                                          |0    |
+--------------------------------------------------------------------------------------------+-----+
only showing top 5 rows



In [10]:
#seting parameters for data split train/test
df_split = df_mod.randomSplit([0.9, 0.1])
train = df_split[0]

test = df_split[1].withColumnRenamed("label", "true_label")
train_rows = train.count()
test_rows = test.count()

print("Train rows = ", train_rows)
print("Test rows = ", test_rows)

[Stage 8:>                                                          (0 + 1) / 1]

Train rows =  1900950
Test rows =  211669


                                                                                

In [11]:
train.printSchema()

root
 |-- Lemmas_Text: string (nullable = true)
 |-- label: integer (nullable = false)



In [12]:
#removing NULL 
train = train.na.drop(subset = ["Lemmas_Text"])
train = train.na.drop(subset = ["label"])
test = test.na.drop(subset = ["Lemmas_Text"])
test = test.na.drop(subset = ["true_label"])

In [13]:
#data tokenization
tokenizer = Tokenizer(inputCol = "Lemmas_Text", outputCol = "Words")
tokenizedTrain = tokenizer.transform(train)
tokenizedTrain.show(truncate = True, n = 5)

[Stage 11:>                                                         (0 + 1) / 1]

+--------------------+-----+--------------------+
|         Lemmas_Text|label|               Words|
+--------------------+-----+--------------------+
|aa definitely wor...|    0|[aa, definitely, ...|
|aa manually enter...|    0|[aa, manually, en...|
|      aaa better kkk|    0|  [aaa, better, kkk]|
|aaa conecte desde...|    0|[aaa, conecte, de...|
|aaa handling dmv ...|    0|[aaa, handling, d...|
+--------------------+-----+--------------------+
only showing top 5 rows



                                                                                

In [14]:
#removing stop words
swr = StopWordsRemover(inputCol = tokenizer.getOutputCol(), outputCol = "MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate = True, n = 5)

[Stage 12:>                                                         (0 + 1) / 1]

+--------------------+-----+--------------------+--------------------+
|         Lemmas_Text|label|               Words|     MeaningfulWords|
+--------------------+-----+--------------------+--------------------+
|aa definitely wor...|    0|[aa, definitely, ...|[aa, definitely, ...|
|aa manually enter...|    0|[aa, manually, en...|[aa, manually, en...|
|      aaa better kkk|    0|  [aaa, better, kkk]|  [aaa, better, kkk]|
|aaa conecte desde...|    0|[aaa, conecte, de...|[aaa, conecte, de...|
|aaa handling dmv ...|    0|[aaa, handling, d...|[aaa, handling, d...|
+--------------------+-----+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [15]:
#hashing data
hashTF = HashingTF(inputCol = swr.getOutputCol(), outputCol = "features")
numericTrain = hashTF.transform(SwRemovedTrain).select('label', 'MeaningfulWords', 'features')
numericTrain.show(truncate = True, n = 5)

[Stage 13:>                                                         (0 + 1) / 1]

+-----+--------------------+--------------------+
|label|     MeaningfulWords|            features|
+-----+--------------------+--------------------+
|    0|[aa, definitely, ...|(262144,[6346,714...|
|    0|[aa, manually, en...|(262144,[5078,978...|
|    0|  [aaa, better, kkk]|(262144,[115611,1...|
|    0|[aaa, conecte, de...|(262144,[50572,13...|
|    0|[aaa, handling, d...|(262144,[70769,74...|
+-----+--------------------+--------------------+
only showing top 5 rows



                                                                                

**LOGISTIC REGRESSION**

In [16]:
#logistic regression model training
lr = LogisticRegression(labelCol = "label", featuresCol = "features", maxIter = 10, regParam = 0.01)
model_lr = lr.fit(numericTrain)
print("Training Done")

                                                                                

Training Done


In [17]:
#logistic regression model testing
tokenizedTest = tokenizer.transform(test)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest)
numericTest.show(truncate = True, n = 5)

[Stage 26:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+--------------------+--------------------+
|         Lemmas_Text|true_label|               Words|     MeaningfulWords|            features|
+--------------------+----------+--------------------+--------------------+--------------------+
|aaa motherfucking...|         0|[aaa, motherfucki...|[aaa, motherfucki...|(262144,[98592,17...|
|aaaa house mess n...|         0|[aaaa, house, mes...|[aaaa, house, mes...|(262144,[41660,89...|
|aaaaa omg sophiel...|         1|[aaaaa, omg, soph...|[aaaaa, omg, soph...|(262144,[3917,561...|
|   aaaaaaaaaaa mcfly|         0|[aaaaaaaaaaa, mcfly]|[aaaaaaaaaaa, mcfly]|(262144,[51537,12...|
|aaaaaaaaaaaaaa do...|         0|[aaaaaaaaaaaaaa, ...|[aaaaaaaaaaaaaa, ...|(262144,[113432,1...|
+--------------------+----------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [18]:
#transforming
raw_prediction_lr = model_lr.transform(numericTest)
raw_prediction_lr.printSchema()

root
 |-- Lemmas_Text: string (nullable = true)
 |-- true_label: integer (nullable = false)
 |-- Words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- MeaningfulWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [19]:
lr_prediction = raw_prediction_lr.select("MeaningfulWords", "prediction", "true_label")
lr_prediction.show(truncate = True, n = 5)

24/06/13 13:00:25 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
[Stage 27:>                                                         (0 + 1) / 1]

+--------------------+----------+----------+
|     MeaningfulWords|prediction|true_label|
+--------------------+----------+----------+
|[aaa, motherfucki...|       0.0|         0|
|[aaaa, house, mes...|       0.0|         0|
|[aaaaa, omg, soph...|       1.0|         1|
|[aaaaaaaaaaa, mcfly]|       0.0|         0|
|[aaaaaaaaaaaaaa, ...|       0.0|         0|
+--------------------+----------+----------+
only showing top 5 rows



                                                                                

In [20]:
total_lr_true = lr_prediction.filter(lr_prediction['prediction'] == lr_prediction['true_label']).count()
alldata = lr_prediction.count()
accuracy = total_lr_true/alldata
print("Score achived is: ", accuracy*100, '%')

24/06/13 13:00:32 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
[Stage 31:>                                                         (0 + 1) / 1]

Score achived is:  93.99581421937081 %


                                                                                

In [22]:
#model save for future automatization
model_lr.save("lr_model_reference")

24/06/13 13:01:03 WARN TaskSetManager: Stage 35 contains a task of very large size (2097 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

**NAIVE BAYES**

In [24]:
nb = NaiveBayes(labelCol = "label", featuresCol = "features", smoothing = 1.0, modelType = "multinomial")
model_nb = nb.fit(numericTrain)
print("Training Done")

[Stage 38:>                                                         (0 + 1) / 1]

Training Done


                                                                                

In [25]:
tokenizedTest = tokenizer.transform(test)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest)
numericTest.show(truncate = True, n = 5)

[Stage 41:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+--------------------+--------------------+
|         Lemmas_Text|true_label|               Words|     MeaningfulWords|            features|
+--------------------+----------+--------------------+--------------------+--------------------+
|aaa motherfucking...|         0|[aaa, motherfucki...|[aaa, motherfucki...|(262144,[98592,17...|
|aaaa house mess n...|         0|[aaaa, house, mes...|[aaaa, house, mes...|(262144,[41660,89...|
|aaaaa omg sophiel...|         1|[aaaaa, omg, soph...|[aaaaa, omg, soph...|(262144,[3917,561...|
|   aaaaaaaaaaa mcfly|         0|[aaaaaaaaaaa, mcfly]|[aaaaaaaaaaa, mcfly]|(262144,[51537,12...|
|aaaaaaaaaaaaaa do...|         0|[aaaaaaaaaaaaaa, ...|[aaaaaaaaaaaaaa, ...|(262144,[113432,1...|
+--------------------+----------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [26]:
raw_prediction_nb = model_nb.transform(numericTest)
raw_prediction_nb.printSchema()

root
 |-- Lemmas_Text: string (nullable = true)
 |-- true_label: integer (nullable = false)
 |-- Words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- MeaningfulWords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [27]:
nb_prediction = raw_prediction_nb.select("MeaningfulWords", "prediction", "true_label")
nb_prediction.show(truncate = True, n = 5)

24/06/13 13:03:18 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
[Stage 42:>                                                         (0 + 1) / 1]

+--------------------+----------+----------+
|     MeaningfulWords|prediction|true_label|
+--------------------+----------+----------+
|[aaa, motherfucki...|       0.0|         0|
|[aaaa, house, mes...|       0.0|         0|
|[aaaaa, omg, soph...|       0.0|         1|
|[aaaaaaaaaaa, mcfly]|       0.0|         0|
|[aaaaaaaaaaaaaa, ...|       1.0|         0|
+--------------------+----------+----------+
only showing top 5 rows



                                                                                

In [28]:
total_nb_true = nb_prediction.filter(nb_prediction['prediction'] == nb_prediction['true_label']).count()
alldata = nb_prediction.count()
accuracy = total_nb_true/alldata
print("Score achived is: ", accuracy*100, '%')

24/06/13 13:03:26 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
[Stage 46:>                                                         (0 + 1) / 1]

Score achived is:  89.23933121997081 %


                                                                                

In [29]:
model_nb.save("nb_model_reference")

24/06/13 13:03:41 WARN TaskSetManager: Stage 50 contains a task of very large size (4188 KiB). The maximum recommended task size is 1000 KiB.
