## Importing the necessary libraries

In [1]:
import os
import pandas as pd
import findspark
import re

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, udf
from pyspark.sql.types import StringType

In [3]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, DecisionTreeClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator

In [4]:

from sklearn.metrics import confusion_matrix, classification_report

## Verifying the directories of Java and Spark

In [5]:
# Confirm the environment variables Spark relies on are visible to this kernel.
for env_var in ("JAVA_HOME", "SPARK_HOME"):
    print(f"{env_var}:", os.environ.get(env_var))


JAVA_HOME: C:\Java
SPARK_HOME: C:\Spark\spark-3.4.1-bin-hadoop3


## Initializing the Spark Session

In [6]:
# NOTE(review): findspark.init() is meant to run *before* pyspark is imported (it puts
# SPARK_HOME's Python libraries on sys.path). Here the pyspark imports in an earlier
# cell already succeeded, so its sys.path change cannot affect which pyspark module is
# in use. Move it above the pyspark imports if pyspark is not otherwise importable.
findspark.init()

# Standalone master to connect to; override with SPARK_MASTER_URL to run elsewhere.
# The default is the cluster address this notebook was originally executed against.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://192.168.0.103:7077")
spark = SparkSession.builder.master(SPARK_MASTER_URL).getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # Property used to format output tables better
spark

## Loading the Data

In [7]:
# Location of the JSON review dump; override with REVIEWS_DATA_PATH on another machine.
# The default is a raw string: in a normal literal the Windows separators form invalid
# escape sequences (\M, \s, \c, \C), which Python 3.12+ reports as SyntaxWarning.
DATA_PATH = os.environ.get(
    "REVIEWS_DATA_PATH",
    r"E:\MSc_DS_4\spark_project\code\Clothing_Shoes_and_Jewelry.json",
)
data = spark.read.json(DATA_PATH)

## Description of the Data

In [8]:
# Column names, read from the inferred JSON schema.
data.schema.fieldNames()

['_id',
 'asin',
 'category',
 'class',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime']

In [9]:
# First five rows with full (untruncated) cell contents.
data.show(5, truncate=False)

+--------------------------+----------+--------------------------+-----+-------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------+---------------------------------+-----------------------------+--------------+
|_id                       |asin      |category                  |class|helpful|overall|reviewText                                  

In [10]:
# Number of reviews per label value (shows the class balance).
data.groupBy("class").count()

class,count
0.0,1169449
1.0,4334882


In [8]:
# Keep only the review text and its label for modelling.
final_data = data.select(col("reviewText"), col("class"))

In [9]:

# Character length of each review, shown alongside its class label.
len_text = final_data.withColumn("length", length(col("reviewText")))
len_text.show()

+--------------------+-----+------+
|          reviewText|class|length|
+--------------------+-----+------+
|My 3-yr-old daugh...|  0.0|   672|
|This was a really...|  1.0|   136|
|Perfect red tutu ...|  1.0|   113|
|Bought it for my ...|  1.0|   143|
|This is a great t...|  1.0|   172|
|Got this for our ...|  1.0|   109|
|the tutu color wa...|  0.0|   236|
|Just as described...|  1.0|   236|
|I bought this for...|  1.0|   306|
|This really is a ...|  1.0|   166|
|I ordered this fo...|  1.0|   234|
|Vey cute and perf...|  1.0|   101|
|Loved it and so d...|  1.0|   139|
|Purchased it for ...|  1.0|   135|
|Very cute, shorte...|  1.0|   462|
|Our 3-year-old pe...|  1.0|   134|
|The waistband was...|  0.0|   123|
|The tutu's was fo...|  1.0|   187|
|My 5 year old dau...|  1.0|   110|
|I just got this t...|  1.0|   701|
+--------------------+-----+------+
only showing top 20 rows



## Data Pre-processing

In [10]:
def remove_special_chars_and_links(text):
    """Strip links and non-alphanumeric characters from a review text.

    This function is applied row by row through a Spark UDF. Spark passes SQL
    NULLs to Python UDFs as ``None``, and ``re.sub`` on ``None`` raises
    ``TypeError``, which would fail the whole job. ``None`` is therefore
    handled explicitly.

    Parameters
    ----------
    text : str or None
        Raw review text.

    Returns
    -------
    str
        ``text`` with every token starting with ``http`` (up to the next
        whitespace) removed, and every character other than ASCII letters,
        digits and whitespace deleted. ``None`` becomes ``""``, so downstream
        tokenization always receives a string.
    """
    if text is None:
        return ""
    # Remove links: "http" followed by any run of non-whitespace characters.
    text = re.sub(r"http\S+", "", text)
    # Keep only ASCII letters, digits and whitespace. This also drops
    # apostrophes, so "don't" becomes "dont".
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
    return text

In [11]:
# Wrap the cleaning function as a Spark UDF that produces string values.
remove_special_chars_and_links_udf = udf(remove_special_chars_and_links, returnType=StringType())

In [12]:
# Cleaned review text; withColumn replaces "processed_text" if it already exists.
final_data = final_data.withColumn("processed_text", remove_special_chars_and_links_udf("reviewText"))

In [13]:
# Compare raw and cleaned text for the first five rows, untruncated.
final_data.show(5, False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
# 80/20 train/test split with a fixed seed for reproducibility.
train_data, test_data = final_data.randomSplit(weights=[0.8, 0.2], seed=42)

In [15]:
# Shared text-to-features stages, reused by every classifier pipeline below:
# lowercase word tokens -> stop-word removal -> hashed term frequencies
# (default 262144 buckets, as seen in the prediction output) -> TF-IDF weighting.
tokenizer = Tokenizer(outputCol="tokenized_text", inputCol="processed_text")
stopwords_remover = StopWordsRemover(outputCol="filtered_words", inputCol="tokenized_text")
hashingTF = HashingTF(outputCol="raw_features", inputCol="filtered_words")
idf = IDF(outputCol="features", inputCol="raw_features")

## Model Building

### Naive Bayes Classifier

In [16]:
# Naive Bayes classifier on top of the shared TF-IDF feature stages.
nb = NaiveBayes(featuresCol="features", labelCol="class")
pipeline_nb = Pipeline().setStages([tokenizer, stopwords_remover, hashingTF, idf, nb])
# Fitting on the full training split is slow (see the timing notes at the end).
nb_model = pipeline_nb.fit(train_data)

In [22]:
# Score the held-out split and preview the intermediate and prediction columns.
nb_predictions = nb_model.transform(test_data)
nb_predictions.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|          reviewText|class|      processed_text|      tokenized_text|      filtered_words|        raw_features|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|!!!For as much as...|  0.0|For as much as th...|[for, as, much, a...|[much, boots, cos...|(262144,[5381,655...|(262144,[5381,655...|[-529.81268026826...|[0.99999951334894...|       0.0|
|""""YOU GET WHAT ...|  0.0|YOU GET WHAT YOU ...|[you, get, what, ...|[get, pay, forthi...|(262144,[1546,331...|(262144,[1546,331...|[-1277.1973837243...|[1.0,1.8299602968...|       0.0|
|"Awesome"  Earrin...|  1.0|Awesome  Earrings...|[awesome, , earr

In [21]:
# Collect only the label and prediction columns to the driver (~1.1M rows for the
# full test split). The bare last expression renders a truncated head/tail view.
nb_predictions_pd = nb_predictions.select("class", "prediction").toPandas()
nb_predictions_pd

         class  prediction
0          0.0         0.0
1          0.0         0.0
2          1.0         1.0
3          1.0         1.0
4          1.0         1.0
...        ...         ...
1100982    1.0         1.0
1100983    0.0         0.0
1100984    0.0         0.0
1100985    0.0         0.0
1100986    0.0         0.0

[1100987 rows x 2 columns]


In [24]:
# Naive Bayes evaluation. This cell reuses `nb_predictions` from the prediction cell
# above instead of re-fitting an identical pipeline on the same `train_data`; fitting
# is the slowest step of the notebook (see the timing notes at the end).
nb_predictions_pd = nb_predictions.select("class", "prediction").toPandas()
y_true = nb_predictions_pd["class"]
y_pred = nb_predictions_pd["prediction"]
conf_matrix = confusion_matrix(y_true, y_pred)
class_report = classification_report(y_true, y_pred)
print("Naive Bayes Classifier")
print("\n\nConfusion Matrix:")
print(conf_matrix)
print("\nClassification Report:")
print(class_report)

### Logistic Regression

In [20]:
# Logistic regression on the same shared TF-IDF feature stages.
lr = LogisticRegression(featuresCol="features", labelCol="class")
pipeline_lr = Pipeline().setStages([tokenizer, stopwords_remover, hashingTF, idf, lr])

# Fit on the training split (about an hour on the full data, per the timing notes)
# and score the held-out split.
lr_model = pipeline_lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

# Bring only label/prediction to the driver and compute scikit-learn metrics.
lr_predictions_pd = lr_predictions.select("class", "prediction").toPandas()
y_true, y_pred = lr_predictions_pd["class"], lr_predictions_pd["prediction"]
conf_matrix = confusion_matrix(y_true, y_pred)
class_report = classification_report(y_true, y_pred)

print("Logistic Regression")
print("\n\nConfusion Matrix:")
print(conf_matrix)
print("\nClassification Report:")
print(class_report)

Logistic Regression


Confusion Matrix:
[[148207  86070]
 [ 50825 815885]]

Classification Report:
              precision    recall  f1-score   support

         0.0       0.74      0.63      0.68    234277
         1.0       0.90      0.94      0.92    866710

    accuracy                           0.88   1100987
   macro avg       0.82      0.79      0.80   1100987
weighted avg       0.87      0.88      0.87   1100987



## Execution Times

* Spark session start-up: 26.7 s
* Data loading: 1 min 27.7 s
* Naive Bayes: 31 min 48 s
* Logistic Regression: 62 min 57.4 s