## Question 1: Classification - Women's E-Commerce Clothing Reviews
- Use the womens-ecommerce-clothing-reviews dataset (file Womens_Clothing_E_Commerce_Reviews.xlsx, sheetName: Reviews) to build a model that predicts product ratings from Review Text (and, optionally, other features).
- Predict ratings for the products in Womens_Clothing_E_Commerce_Reviews.xlsx, sheetName: new_reviews.
- Read more information here: https://www.kaggle.com/nicapotato/womens-ecommerce-clothing-reviews

# 0. Working environment preparation & package import

In [1]:
# !apt-get update

In [2]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
# !tar -xvf spark-2.4.0-bin-hadoop2.7.tgz
# !pip install -q findspark
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

In [3]:
# from google.colab import drive
# drive.mount('/content/gdrive', force_remount=True)

In [4]:
# %cd "/content/gdrive/MyDrive/LDS9_K272_ONLINE_DamThiMinhPhuong/LDS9_K272_DamThiMinhPhuong_Cuoi_ky"

In [5]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [6]:
import pandas as pd
from time import time
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, Tokenizer, StopWordsRemover
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from builtins import round  # restore Python's round(), shadowed by the wildcard import of pyspark.sql.functions

# 1. Data set understanding

In [7]:
spark = SparkSession.builder.appName('Q1_classification').getOrCreate()

22/01/24 11:18:29 WARN Utils: Your hostname, Ellies-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
22/01/24 11:18:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/24 11:18:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/24 11:18:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [8]:
# Read excel file using pandas
df = pd.read_excel('data/womens-ecommerce-clothing-reviews/Womens_Clothing_E_Commerce_Reviews.xlsx', sheet_name='Reviews', index_col=0)

In [9]:
df.head(5)

| | Clothing ID | Age | Title | Review Text | Rating | Recommended IND | Positive Feedback Count | Division Name | Department Name | Class Name |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 767 | 33 | | Absolutely wonderful - silky and sexy and comf... | 4 | 1 | 0 | Initmates | Intimate | Intimates |
| 1 | 1080 | 34 | | Love this dress! it's sooo pretty. i happene... | 5 | 1 | 4 | General | Dresses | Dresses |
| 2 | 1077 | 60 | Some major design flaws | I had such high hopes for this dress and reall... | 3 | 0 | 0 | General | Dresses | Dresses |
| 3 | 1049 | 50 | My favorite buy! | I love, love, love this jumpsuit. it's fun, fl... | 5 | 1 | 0 | General Petite | Bottoms | Pants |
| 4 | 847 | 47 | Flattering shirt | This shirt is very flattering to all due to th... | 5 | 1 | 6 | General | Tops | Blouses |


In [10]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 23481 entries, 0 to 23480
Data columns (total 10 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   Clothing ID              23481 non-null  int64 
 1   Age                      23481 non-null  int64 
 2   Title                    19671 non-null  object
 3   Review Text              22636 non-null  object
 4   Rating                   23481 non-null  int64 
 5   Recommended IND          23481 non-null  int64 
 6   Positive Feedback Count  23481 non-null  int64 
 7   Division Name            23467 non-null  object
 8   Department Name          23467 non-null  object
 9   Class Name               23467 non-null  object
dtypes: int64(5), object(5)
memory usage: 2.0+ MB


In [11]:
df_sub = df[['Review Text', 'Rating']]

In [12]:
# convert pandas dataframe to pyspark dataframe
df_schema = StructType([StructField("Review Text", StringType(), True)\
                       ,StructField("Rating", IntegerType(), True)])
data = spark.createDataFrame(df_sub, schema=df_schema)

In [13]:
data.count()

23481

In [14]:
data.printSchema()

root
 |-- Review Text: string (nullable = true)
 |-- Rating: integer (nullable = true)



In [15]:
data.show(5)

+--------------------+------+
|         Review Text|Rating|
+--------------------+------+
|Absolutely wonder...|     4|
|Love this dress! ...|     5|
|I had such high h...|     3|
|I love, love, lov...|     5|
|This shirt is ver...|     5|
+--------------------+------+
only showing top 5 rows



# 2. Data preprocessing

In [16]:
# Check null values
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas()

| | Review Text | Rating |
|---|---|---|
| 0 | 0 | 0 |


In [17]:
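# Check NaN values (the missing reviews arrive from pandas as NaN, not as Spark nulls)
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).toPandas()

| | Review Text | Rating |
|---|---|---|
| 0 | 845 | 0 |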


In [18]:
def to_null(c):
    # keep the value only if it is not null, NaN or blank; otherwise return null
    return when(~(col(c).isNull() | isnan(col(c)) | (trim(col(c)) == "")), col(c))


data = data.select([to_null(c).alias(c) for c in data.columns]).na.drop()
data.count()

22636
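The `to_null` rule above treats a value as missing when it is null, NaN, or blank. The pure-Python function below is a sketch of that same predicate, not Spark code. It assumes, as the two counts above suggest, that the missing reviews reached Spark as NaN, and that `isnan` on a string column effectively casts the text to a double first (so the text "NaN" matches too). The name `is_missing` is hypothetical.

```python
import math


def is_missing(value):
    """Sketch of the to_null() rule: null, NaN or blank text counts as missing."""
    if value is None:
        return True
    if isinstance(value, float):
        return math.isnan(value)
    if isinstance(value, str):
        if value.strip() == "":
            return True
        # Assumption: isnan() on a string column casts it to double first,
        # so text that parses as NaN (e.g. "NaN") is treated as missing too
        try:
            return math.isnan(float(value))
        except ValueError:
            return False
    return False


rows = ["Love this dress!", None, float("nan"), "   ", "NaN", "5"]
kept = [r for r in rows if not is_missing(r)]
print(kept)  # ['Love this dress!', '5']
```

One side effect: a review whose entire text is "nan" would be dropped as well. Calling `dropna()` on the pandas frame before `createDataFrame` is a simpler alternative for this dataset.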

In [19]:
# Check duplicates
dup_rows = data.count() - data.distinct().count()
dup_rows

7

In [20]:
# Drop duplicates
data = data.drop_duplicates()
data.count()

22629

In [21]:
# Create column 'length'
data = data.withColumn('length', length(data['Review Text']))
data.show(5)

+--------------------+------+------+
|         Review Text|Rating|length|
+--------------------+------+------+
|Omg - wish it cam...|     5|    35|
|This is a beautif...|     3|   203|
|This skirt fits m...|     5|   276|
|I bought three of...|     3|   500|
|I can't figure ou...|     3|   254|
+--------------------+------+------+
only showing top 5 rows



In [22]:
data.groupBy('Rating').mean().show()

+------+-----------+------------------+
|Rating|avg(Rating)|       avg(length)|
+------+-----------+------------------+
|     1|        1.0| 305.0365853658537|
|     3|        3.0| 328.4156626506024|
|     5|        5.0|298.94318997845687|
|     4|        4.0|323.43386998165886|
|     2|        2.0| 319.2857142857143|
+------+-----------+------------------+



#### Observation: there is no clear difference in 'Review Text' length across ratings (the average length only ranges from about 299 characters for Rating 5 to about 328 for Rating 3)

In [23]:
data.groupby('Rating').count().orderBy('Rating', ascending=False).show()

+------+-----+
|Rating|count|
+------+-----+
|     5|12533|
|     4| 4907|
|     3| 2822|
|     2| 1547|
|     1|  820|
+------+-----+



#### The data is clearly imbalanced (Rating 5 accounts for 12,533 of the 22,629 reviews, over half) => consider resampling
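Two numbers make the imbalance concrete. Always predicting Rating 5 would already be right for about 55% of reviews, which is the baseline any model has to beat. As an alternative to resampling (not used in this notebook), each row could instead get an inverse-frequency weight w_c = N / (K * n_c); in Spark such weights would go into a weight column via `weightCol`, which classifiers such as `LogisticRegression` and `NaiveBayes` accept. The sketch below only computes these numbers in plain Python from the counts above; `balanced_weights` is a hypothetical helper.

```python
# Rating counts after cleaning, taken from the groupby output above
rating_counts = {5: 12533, 4: 4907, 3: 2822, 2: 1547, 1: 820}


def balanced_weights(counts):
    """Inverse-frequency weights w_c = N / (K * n_c): every class gets the same total weight N / K."""
    total = sum(counts.values())
    k = len(counts)
    return {rating: total / (k * n) for rating, n in counts.items()}


total = sum(rating_counts.values())
majority_baseline = rating_counts[5] / total  # accuracy of always predicting Rating 5
weights = balanced_weights(rating_counts)

print(f"majority baseline: {majority_baseline:.4f}")
for rating in sorted(weights):
    print(rating, round(weights[rating], 3))
```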

### --- Using NLP tools

In [24]:
rating_to_num = StringIndexer(inputCol='Rating', outputCol='label')

tokenizer = Tokenizer(inputCol='Review Text', outputCol='token_text')
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_token')
count_vec = CountVectorizer(inputCol='stop_token', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')


In [25]:
# With a single input column, VectorAssembler essentially copies 'tf_idf' into 'features'
clean_up = VectorAssembler(inputCols=['tf_idf'],
                           outputCol='features')

In [26]:
data_prep_pipe = Pipeline(stages=[rating_to_num,
                                  tokenizer,
                                  stopremove,
                                  count_vec,
                                  idf,
                                  clean_up
                                 ])
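The pipeline above turns each review into a TF-IDF vector. Below is a rough plain-Python sketch of what the stages compute, not Spark's implementation. `Tokenizer` lowercases and splits on whitespace. `StopWordsRemover` drops stop words (a tiny hypothetical list here; Spark's default English list is much longer). `CountVectorizer` builds a vocabulary ordered by corpus-wide term frequency (the alphabetical tie-break below is an assumption). `IDF` uses Spark's smoothed formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing t.

```python
import math
from collections import Counter

# Hypothetical stop-word list for illustration; Spark's default English list is much longer
STOP_WORDS = {"a", "and", "is", "it", "so", "the", "this"}


def tokenize(text):
    # Like Tokenizer: lowercase, then split on whitespace (punctuation stays attached)
    return text.lower().split()


def tf_idf(docs):
    """Return (vocabulary, sparse vectors) where each vector maps term index -> tf * idf."""
    tokens = [[t for t in tokenize(d) if t not in STOP_WORDS] for d in docs]

    # Vocabulary ordered by corpus-wide term frequency (ties broken alphabetically here)
    corpus_tf = Counter(t for doc in tokens for t in doc)
    vocab = sorted(corpus_tf, key=lambda t: (-corpus_tf[t], t))
    index = {t: i for i, t in enumerate(vocab)}

    # Smoothed IDF: log((m + 1) / (df + 1)), m = number of documents
    m = len(tokens)
    doc_freq = Counter(t for doc in tokens for t in set(doc))
    idf = [math.log((m + 1) / (doc_freq[t] + 1)) for t in vocab]

    vectors = []
    for doc in tokens:
        counts = Counter(doc)
        vectors.append({index[t]: n * idf[index[t]] for t, n in counts.items()})
    return vocab, vectors


docs = ["Love this dress", "This dress is so pretty", "Love love love it"]
vocab, vectors = tf_idf(docs)
print(vocab)       # ['love', 'dress', 'pretty']
print(vectors[2])  # one entry: index 0 ('love') with weight 3 * log(4/3), about 0.863
```

A term that appears in every document gets weight log(1) = 0, which is how IDF down-weights words shared by reviews of all ratings.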

In [27]:
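# Note: fitting on all of `data` lets CountVectorizer and IDF see the test reviews;
# fitting the pipeline on the training split only would avoid this leakage
cleaner = data_prep_pipe.fit(data)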

In [28]:
clean_data = cleaner.transform(data)

In [29]:
clean_data = clean_data.select('label', 'features')
clean_data.show(10)


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(38354,[22,119,23...|
|  2.0|(38354,[9,13,17,3...|
|  0.0|(38354,[20,23,38,...|
|  2.0|(38354,[13,18,24,...|
|  2.0|(38354,[8,28,36,3...|
|  4.0|(38354,[29,30,35,...|
|  0.0|(38354,[0,32,33,3...|
|  0.0|(38354,[0,2,4,5,7...|
|  0.0|(38354,[0,26,65,8...|
|  0.0|(38354,[0,1,4,15,...|
+-----+--------------------+
only showing top 10 rows
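Note that `label` is not the rating itself. `StringIndexer` (default `stringOrderType='frequencyDesc'`) assigns index 0.0 to the most frequent value, so with the counts above label 0.0 is Rating 5, 1.0 is Rating 4, and so on down to 4.0 for Rating 1. Predictions for the new_reviews sheet therefore need to be mapped back, for example with `pyspark.ml.feature.IndexToString` and the fitted indexer's `labels` (`cleaner.stages[0].labels`). The plain-Python sketch below reproduces that ordering from the counts; `fit_frequency_index` and `index_to_rating` are hypothetical names.

```python
from collections import Counter


def fit_frequency_index(values):
    """Order distinct values by descending frequency (ties by string value, an assumption), like StringIndexer's default."""
    freq = Counter(str(v) for v in values)
    return sorted(freq, key=lambda s: (-freq[s], s))


# Expand the cleaned rating counts reported earlier into one value per review
rating_counts = {5: 12533, 4: 4907, 3: 2822, 2: 1547, 1: 820}
ratings = [r for r, n in rating_counts.items() for _ in range(n)]

labels = fit_frequency_index(ratings)  # labels[i] is the Rating string behind label i


def index_to_rating(prediction):
    # Like IndexToString: turn a predicted label index (a float) back into a Rating
    return int(labels[int(prediction)])


print(labels)                                         # ['5', '4', '3', '2', '1']
print([index_to_rating(p) for p in (0.0, 2.0, 4.0)])  # [5, 3, 1]
```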



In [30]:
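# randomSplit is only reproducible if clean_data is recomputed identically on every action;
# caching (and materializing) clean_data before splitting helps keep training/testing stable
training, testing = clean_data.randomSplit([0.8, 0.2], seed=42)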

# 3. Build Model - Original data
- Model 1 - NaiveBayes
- Model 2 - Logistic Regression
- Model 3 - Random Forest
- Model 4 - Decision Tree

## --- NaiveBayes

In [31]:
nb = NaiveBayes()

In [32]:
t0 = time()
predictor_1 = nb.fit(training)
t1 = round(time() -t0,4)


In [33]:
test_result_1 = predictor_1.transform(testing)
test_result_1.show(3)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(38354,[0,1,2,3,4...|[-1492.8908018867...|[0.99999999999999...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[-1191.3073531971...|[0.99999995131989...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[-1824.8218492129...|[1.42706522115141...|       1.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [34]:
# Confusion matrix
test_result_1.groupBy('label', 'prediction').count().show(10)


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|   51|
|  1.0|       1.0|  357|
|  3.0|       2.0|  113|
|  4.0|       2.0|   67|
|  0.0|       1.0|  524|
|  0.0|       4.0|   73|
|  1.0|       0.0|  291|
|  2.0|       2.0|  211|
|  3.0|       1.0|   51|
|  2.0|       3.0|  104|
+-----+----------+-----+
only showing top 10 rows




In [35]:
# Evaluation
multi_eval  = MulticlassClassificationEvaluator()
acc_1       = round(multi_eval.evaluate(test_result_1, {multi_eval.metricName:'accuracy'}),4)
precision_1 = round(multi_eval.evaluate(test_result_1, {multi_eval.metricName:'weightedPrecision'}),4)
recall_1    = round(multi_eval.evaluate(test_result_1, {multi_eval.metricName:'weightedRecall'}),4)
f1_1        = round(multi_eval.evaluate(test_result_1, {multi_eval.metricName:'f1'}),4)


In [36]:
result_columns = ['Model','Accuracy','Precision','Recall','F1 Score','Duration']
result = spark.createDataFrame([('Naive Bayes',acc_1, precision_1, recall_1, f1_1 , t1)],
                               result_columns)

In [37]:
result.show()

+-----------+--------+---------+------+--------+--------+
|      Model|Accuracy|Precision|Recall|F1 Score|Duration|
+-----------+--------+---------+------+--------+--------+
|Naive Bayes|  0.5245|   0.5806|0.5245|  0.5464|  3.9727|
+-----------+--------+---------+------+--------+--------+



## --- Logistic Regression

In [38]:
lg = LogisticRegression(maxIter=10, regParam=0, elasticNetParam=0)

In [39]:
t0 = time()
predictor_2 = lg.fit(training)
t1 = round(time() -t0,4)


In [40]:
test_result_2 = predictor_2.transform(testing)
test_result_2.show(3)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(38354,[0,1,2,3,4...|[5.70533564429449...|[0.92167053550081...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[8.10740101150328...|[0.99903787756441...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[-4.2442765183582...|[7.27163479146906...|       1.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [41]:
# Confusion matrix
test_result_2.groupBy('label', 'prediction').count().show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|   82|
|  1.0|       1.0|  301|
|  3.0|       2.0|  110|
|  4.0|       2.0|   70|
|  0.0|       1.0|  486|
|  0.0|       4.0|   31|
|  1.0|       0.0|  367|
|  2.0|       2.0|  194|
|  3.0|       1.0|   61|
|  2.0|       3.0|   95|
|  1.0|       4.0|   16|
|  4.0|       4.0|   29|
|  2.0|       4.0|   30|
|  3.0|       4.0|   37|
|  2.0|       1.0|  136|
|  1.0|       2.0|  194|
|  0.0|       0.0| 1729|
|  1.0|       3.0|   75|
|  4.0|       3.0|   38|
|  0.0|       2.0|  112|
+-----+----------+-----+
only showing top 20 rows




In [42]:
# Evaluation
acc_2       = round(multi_eval.evaluate(test_result_2, {multi_eval.metricName:'accuracy'}),4)
precision_2 = round(multi_eval.evaluate(test_result_2, {multi_eval.metricName:'weightedPrecision'}),4)
recall_2    = round(multi_eval.evaluate(test_result_2, {multi_eval.metricName:'weightedRecall'}),4)
f1_2        = round(multi_eval.evaluate(test_result_2, {multi_eval.metricName:'f1'}),4)


In [43]:
result_2 = spark.createDataFrame([('Logistic Regression',acc_2, precision_2, recall_2, f1_2 , t1)],
                                 result_columns)
result = result.union(result_2)
result.show()

+-------------------+--------+---------+------+--------+--------+
|              Model|Accuracy|Precision|Recall|F1 Score|Duration|
+-------------------+--------+---------+------+--------+--------+
|        Naive Bayes|  0.5245|   0.5806|0.5245|  0.5464|  3.9727|
|Logistic Regression|  0.5275|    0.549|0.5275|  0.5368|   9.427|
+-------------------+--------+---------+------+--------+--------+



## --- Random Forest

In [44]:
rf = RandomForestClassifier(labelCol='label',
                           featuresCol='features',
                           numTrees=500,
                           maxDepth=5,
                           maxBins=64)

In [45]:
t0 = time()
predictor_3 = rf.fit(training)
t1 = round(time() - t0,4)


In [46]:
test_result_3 = predictor_3.transform(testing)
test_result_3.show(3)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(38354,[0,1,2,3,4...|[283.719431530566...|[0.56743886306113...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[280.461505210456...|[0.56092301042091...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[273.762981175837...|[0.54752596235167...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [47]:
# Confusion matrix
test_result_3.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|  537|
|  1.0|       0.0|  953|
|  0.0|       0.0| 2423|
|  4.0|       0.0|  178|
|  3.0|       0.0|  315|
+-----+----------+-----+




In [48]:
# Evaluation
acc_3       = round(multi_eval.evaluate(test_result_3, {multi_eval.metricName:'accuracy'}),4)
precision_3 = round(multi_eval.evaluate(test_result_3, {multi_eval.metricName:'weightedPrecision'}),4)
recall_3    = round(multi_eval.evaluate(test_result_3, {multi_eval.metricName:'weightedRecall'}),4)
f1_3        = round(multi_eval.evaluate(test_result_3, {multi_eval.metricName:'f1'}),4)


In [49]:
result_3 = spark.createDataFrame([('Random Forest',acc_3, precision_3, recall_3, f1_3 , t1)],
                                 result_columns)
result = result.union(result_3)
result.show()

+-------------------+--------+---------+------+--------+--------+
|              Model|Accuracy|Precision|Recall|F1 Score|Duration|
+-------------------+--------+---------+------+--------+--------+
|        Naive Bayes|  0.5245|   0.5806|0.5245|  0.5464|  3.9727|
|Logistic Regression|  0.5275|    0.549|0.5275|  0.5368|   9.427|
|      Random Forest|  0.5499|   0.3024|0.5499|  0.3902| 71.8243|
+-------------------+--------+---------+------+--------+--------+
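The Random Forest confusion matrix shows every test review predicted as label 0.0, so its accuracy (0.5499) is simply the share of label-0.0 reviews in the test set. The plain-Python sketch below recomputes the four reported metrics from that matrix. It assumes Spark's conventions that weighted metrics average per-class scores with each class's support as the weight, and that a class which is never predicted gets precision 0 (and F1 0). `weighted_metrics` is a hypothetical helper.

```python
def weighted_metrics(confusion):
    """Accuracy and support-weighted precision, recall and F1 from {(label, prediction): count}."""
    classes = sorted({label for label, _ in confusion} | {pred for _, pred in confusion})
    total = sum(confusion.values())
    correct = sum(n for (label, pred), n in confusion.items() if label == pred)
    w_prec = w_rec = w_f1 = 0.0
    for c in classes:
        tp = confusion.get((c, c), 0)
        support = sum(n for (label, _), n in confusion.items() if label == c)
        predicted = sum(n for (_, pred), n in confusion.items() if pred == c)
        prec = tp / predicted if predicted else 0.0  # never predicted -> precision 0
        rec = tp / support if support else 0.0
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        weight = support / total
        w_prec += weight * prec
        w_rec += weight * rec
        w_f1 += weight * f1
    return correct / total, w_prec, w_rec, w_f1


# Random Forest confusion matrix from the output above: (label, prediction) -> count
rf_confusion = {(0.0, 0.0): 2423, (1.0, 0.0): 953, (2.0, 0.0): 537, (3.0, 0.0): 315, (4.0, 0.0): 178}
print([round(m, 4) for m in weighted_metrics(rf_confusion)])  # [0.5499, 0.3024, 0.5499, 0.3902]
```

Because weighted scores can hide a model that ignores the minority ratings, per-class scores are more informative here, for example the evaluator's `recallByLabel` metric together with `metricLabel` (available from Spark 3.0).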



## --- Decision Tree

In [50]:
tree = DecisionTreeClassifier(featuresCol='features',     # inputs
                              labelCol='label',           # output
                              predictionCol='prediction'  # prediction
                             )

In [51]:
# Train the model
t0 = time()
predictor_4 = tree.fit(training)
t1 = round(time() - t0,4)


In [52]:
# Predict on the test set
test_result_4 = predictor_4.transform(testing)
test_result_4.show(3)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(38354,[0,1,2,3,4...|[3698.0,879.0,297...|[0.73343911146370...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[3698.0,879.0,297...|[0.73343911146370...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[3698.0,879.0,297...|[0.73343911146370...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [53]:
# Confusion matrix
test_result_4.groupBy('label', 'prediction').count().show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|  451|
|  1.0|       1.0|    4|
|  3.0|       2.0|   60|
|  4.0|       2.0|   26|
|  1.0|       0.0|  903|
|  2.0|       2.0|   79|
|  2.0|       3.0|    4|
|  1.0|       2.0|   45|
|  0.0|       0.0| 2369|
|  0.0|       2.0|   51|
|  4.0|       0.0|  150|
|  3.0|       0.0|  250|
|  4.0|       1.0|    1|
|  0.0|       1.0|    3|
|  3.0|       1.0|    3|
|  2.0|       1.0|    3|
|  3.0|       3.0|    2|
|  1.0|       3.0|    1|
|  4.0|       3.0|    1|
+-----+----------+-----+




In [54]:
# Evaluation
acc_4       = round(multi_eval.evaluate(test_result_4, {multi_eval.metricName:'accuracy'}),4)
precision_4 = round(multi_eval.evaluate(test_result_4, {multi_eval.metricName:'weightedPrecision'}),4)
recall_4    = round(multi_eval.evaluate(test_result_4, {multi_eval.metricName:'weightedRecall'}),4)
f1_4        = round(multi_eval.evaluate(test_result_4, {multi_eval.metricName:'f1'}),4)


In [55]:
# Show result
result_4 = spark.createDataFrame([('Decision Tree',acc_4, precision_4, recall_4, f1_4 , t1)],
                                 result_columns)
result = result.union(result_4)
result.show()

+-------------------+--------+---------+------+--------+--------+
|              Model|Accuracy|Precision|Recall|F1 Score|Duration|
+-------------------+--------+---------+------+--------+--------+
|        Naive Bayes|  0.5245|   0.5806|0.5245|  0.5464|  3.9727|
|Logistic Regression|  0.5275|    0.549|0.5275|  0.5368|   9.427|
|      Random Forest|  0.5499|   0.3024|0.5499|  0.3902| 71.8243|
|      Decision Tree|   0.557|   0.4325| 0.557|  0.4248| 28.5125|
+-------------------+--------+---------+------+--------+--------+



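### Observations:
- All models perform poorly (accuracy 52-55%). Accuracy also flatters the tree models: Random Forest predicts label 0.0 (Rating 5, the majority class) for every test review and Decision Tree does so for most of them, so their ~55% accuracy is about what always predicting Rating 5 would score; their much lower weighted precision and F1 reflect this.
- As noted above, the dataset is imbalanced => resample the training data and build the models on the resampled data

# 4. Build Model - Resampled data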
- Model 5 - NaiveBayes
- Model 6 - Logistic Regression

## --- Oversampling

In [56]:
training.groupby('label').count().show()


+-----+-----+
|label|count|
+-----+-----+
|  0.0|10110|
|  1.0| 3954|
|  4.0|  642|
|  3.0| 1232|
|  2.0| 2285|
+-----+-----+




In [57]:
label0 = training.filter(col('label')==0)
label1 = training.filter(col('label')==1)
label2 = training.filter(col('label')==2)
label3 = training.filter(col('label')==3)
label4 = training.filter(col('label')==4)

ratio_01 = int(label0.count()/label1.count())
ratio_02 = int(label0.count()/label2.count())
ratio_03 = int(label0.count()/label3.count())
ratio_04 = int(label0.count()/label4.count())

print('ratio label0/label1: {}'.format(ratio_01))
print('ratio label0/label2: {}'.format(ratio_02))
print('ratio label0/label3: {}'.format(ratio_03))
print('ratio label0/label4: {}'.format(ratio_04))


ratio label0/label1: 2
ratio label0/label2: 4
ratio label0/label3: 8
ratio label0/label4: 15
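The `explode(array(lit(...)))` trick in the following cells repeats every minority-class row `ratio` times. Because the ratios are floored with `int()`, the classes end up only roughly balanced. A plain-Python sketch of the expected class sizes, using the training counts printed above (`oversample_plan` is a hypothetical helper):

```python
# Training-split class counts from training.groupby('label').count() above
train_counts = {0: 10110, 1: 3954, 2: 2285, 3: 1232, 4: 642}


def oversample_plan(counts, majority=0):
    """Return {label: (replication factor, rows after replication)}, flooring the ratio like int() above."""
    plan = {}
    for label, n in counts.items():
        factor = 1 if label == majority else int(counts[majority] / n)
        plan[label] = (factor, factor * n)
    return plan


for label, (factor, rows) in sorted(oversample_plan(train_counts).items()):
    print(f"label {label}: x{factor} -> {rows} rows")
```

Exact replication also fills the resampled set with identical rows, which makes it easier to overfit to those particular reviews; `DataFrame.sample(withReplacement=True, fraction=...)` is one alternative for drawing a non-integer multiple of a class.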


In [58]:
# resample label==1
a1 = range(ratio_01)
oversampled_label1 = label1.withColumn('dummy',explode(array([lit(x) for x in a1]))).drop('dummy')

In [59]:
# resample label==2
a2 = range(ratio_02)
oversampled_label2 = label2.withColumn('dummy',explode(array([lit(x) for x in a2]))).drop('dummy')

In [60]:
# resample label==3
a3 = range(ratio_03)
oversampled_label3 = label3.withColumn('dummy',explode(array([lit(x) for x in a3]))).drop('dummy')

In [61]:
# resample label==4
a4 = range(ratio_04)
oversampled_label4 = label4.withColumn('dummy',explode(array([lit(x) for x in a4]))).drop('dummy')

In [62]:
combined_df = label0.unionAll(oversampled_label1).unionAll(oversampled_label2).\
                     unionAll(oversampled_label3).unionAll(oversampled_label4)
combined_df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|10106|
|  1.0| 7898|
|  2.0| 9092|
|  3.0| 9984|
|  4.0|10005|
+-----+-----+




## --- NaiveBayes & Resampled data

In [63]:
# Train the model
t0 = time()
predictor_5 = nb.fit(combined_df)
t1 = round(time() - t0,4)


In [64]:
# Predict on the test set
test_result_5 = predictor_5.transform(testing)
test_result_5.show(3)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(38354,[0,1,2,3,4...|[-1468.1016283134...|[1.0,4.8476976375...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[-1192.9534841717...|[0.99979332103364...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[-1749.5744833291...|[0.99999999986717...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [65]:
# Confusion matrix
test_result_5.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|   25|
|  1.0|       1.0|  733|
|  3.0|       2.0|   24|
|  4.0|       2.0|    5|
|  0.0|       1.0|  186|
|  1.0|       0.0|  140|
|  2.0|       2.0|  458|
|  3.0|       1.0|   18|
|  2.0|       3.0|   20|
|  1.0|       4.0|    8|
|  4.0|       4.0|  156|
|  2.0|       4.0|    9|
|  2.0|       1.0|   25|
|  1.0|       2.0|   55|
|  0.0|       0.0| 2148|
|  1.0|       3.0|   17|
|  4.0|       3.0|    8|
|  0.0|       2.0|   53|
|  3.0|       3.0|  249|
|  0.0|       3.0|   25|
+-----+----------+-----+
only showing top 20 rows
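`groupBy('label', 'prediction').count()` returns the cells in no particular order, and `show()` prints only 20 rows by default, while five classes give up to 25 cells, so part of this confusion matrix is hidden. `show(25)`, an `orderBy('label', 'prediction')`, or `groupBy('label').pivot('prediction').count()` would display all of it. After collecting the rows, for example with `[(r['label'], r['prediction'], r['count']) for r in df.collect()]`, a plain-Python sketch (the helper name is hypothetical) can lay them out as a square matrix. Cells absent from the input stay 0, so only the rows listed here are filled in:

```python
def to_confusion_matrix(rows, n_classes):
    """Arrange (label, prediction, count) rows into an n x n matrix:
    row index = true label, column index = predicted label."""
    matrix = [[0] * n_classes for _ in range(n_classes)]
    for label, prediction, count in rows:
        matrix[int(label)][int(prediction)] += count
    return matrix


# Some of the (label, prediction, count) rows printed above
rows = [(0.0, 0.0, 2148), (0.0, 1.0, 186), (0.0, 2.0, 53), (0.0, 3.0, 25),
        (1.0, 0.0, 140), (1.0, 1.0, 733), (4.0, 3.0, 8), (4.0, 4.0, 156)]
cm = to_confusion_matrix(rows, n_classes=5)
for true_label, row in enumerate(cm):
    print(true_label, row)
```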




In [66]:
# Evaluation
acc_5       = round(multi_eval.evaluate(test_result_5, {multi_eval.metricName:'accuracy'}),4)
precision_5 = round(multi_eval.evaluate(test_result_5, {multi_eval.metricName:'weightedPrecision'}),4)
recall_5    = round(multi_eval.evaluate(test_result_5, {multi_eval.metricName:'weightedRecall'}),4)
f1_5        = round(multi_eval.evaluate(test_result_5, {multi_eval.metricName:'f1'}),4)


In [67]:
# Show result
result_columns = ['Model','Accuracy','Precision','Recall','F1 Score','Duration']
result_5 = spark.createDataFrame([('NB resample',acc_5, precision_5, recall_5, f1_5 , t1)],
                                 result_columns)
result = result.union(result_5)
result.show()

+-------------------+--------+---------+------+--------+--------+
|              Model|Accuracy|Precision|Recall|F1 Score|Duration|
+-------------------+--------+---------+------+--------+--------+
|        Naive Bayes|  0.5245|   0.5806|0.5245|  0.5464|  3.9727|
|Logistic Regression|  0.5275|    0.549|0.5275|  0.5368|   9.427|
|      Random Forest|  0.5499|   0.3024|0.5499|  0.3902| 71.8243|
|      Decision Tree|   0.557|   0.4325| 0.557|  0.4248| 28.5125|
|        NB resample|  0.8498|   0.8529|0.8498|  0.8508|  8.3558|
+-------------------+--------+---------+------+--------+--------+
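
In the result table, Accuracy and Recall are identical for every model. That is expected: `weightedRecall` averages per-class recall weighted by each class's share of the true labels, and the sum over classes c of (n_c / N) * (TP_c / n_c) equals (sum of TP_c) / N, which is accuracy. The Random Forest row also stands out: its precision (0.3024) and F1 (0.3902) match, to four decimals, what a model that predicts class 0 for every review would score if class 0 made up 54.99% of the test set, namely p² and p · 2p/(1 + p) with p = 0.5499. The sketch below recomputes these metrics from a confusion matrix, weighting precision, recall and F1 by true-class support as, to my understanding, `MulticlassClassificationEvaluator` does for `weightedPrecision`, `weightedRecall` and `f1`. The helper name is hypothetical and both matrices are illustrative:

```python
def weighted_metrics(cm):
    """Accuracy and support-weighted precision, recall and F1 from a square
    confusion matrix (rows = true label, columns = prediction)."""
    n = len(cm)
    total = sum(sum(row) for row in cm)
    support = [sum(cm[c]) for c in range(n)]                          # true rows per class
    predicted = [sum(cm[r][c] for r in range(n)) for c in range(n)]   # predicted rows per class
    accuracy = sum(cm[c][c] for c in range(n)) / total

    w_precision = w_recall = w_f1 = 0.0
    for c in range(n):
        tp = cm[c][c]
        precision = tp / predicted[c] if predicted[c] else 0.0
        recall = tp / support[c] if support[c] else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = support[c] / total
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1
    return accuracy, w_precision, w_recall, w_f1


# 1) Any confusion matrix: weighted recall always equals accuracy
acc, prec, rec, f1 = weighted_metrics([[8, 2, 0], [1, 6, 3], [0, 1, 9]])

# 2) A model that predicts class 0 for everything, with class 0 making up 54.99%
#    of the rows (the split of the other 45.01% across classes 1-4 is hypothetical)
always_0 = [[5499, 0, 0, 0, 0], [2000, 0, 0, 0, 0], [1500, 0, 0, 0, 0],
            [700, 0, 0, 0, 0], [301, 0, 0, 0, 0]]
rf_like = [round(m, 4) for m in weighted_metrics(always_0)]
print(rf_like)  # [0.5499, 0.3024, 0.5499, 0.3902]
```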



## --- Logistic Regression & Resample data

In [68]:
# Train data
t0 = time()
predictor_6 = lg.fit(combined_df)
t1 = round(time() - t0,4)


In [69]:
# Predict test 
test_result_6 = predictor_6.transform(testing)
test_result_6.show(3)



+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(38354,[0,1,2,3,4...|[10.9474641219806...|[0.99963677759057...|       0.0|
|  0.0|(38354,[0,1,2,4,5...|[7.59509168354888...|[0.40733282120097...|       1.0|
|  0.0|(38354,[0,1,2,4,5...|[12.1003545826426...|[0.99996459177030...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows
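In the second row the model gives class 0 a probability of only 0.407 and predicts 1.0, a misclassification since the true label is 0.0. With the default (unset) `thresholds`, Spark's probabilistic classifiers report the index of the largest probability as `prediction`, so class 1 must have received more than 0.407 in that row. A minimal sketch; the helper name is hypothetical and, apart from the first entry, the probability vector is made up:

```python
def predicted_index(probability):
    """Index of the largest class probability (ties go to the lowest index)."""
    return max(range(len(probability)), key=lambda i: probability[i])


# First entry taken from the second row above; the other four are hypothetical
probability = [0.4073, 0.5102, 0.0601, 0.0150, 0.0074]
print(predicted_index(probability))  # 1
```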



In [70]:
# Confusion matrix
test_result_6.groupBy('label', 'prediction').count().show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|   25|
|  1.0|       1.0|  827|
|  3.0|       2.0|   17|
|  0.0|       1.0|   99|
|  1.0|       0.0|   76|
|  2.0|       2.0|  471|
|  2.0|       3.0|   15|
|  4.0|       4.0|  163|
|  2.0|       4.0|    5|
|  2.0|       1.0|   21|
|  1.0|       2.0|   38|
|  0.0|       0.0| 2291|
|  1.0|       3.0|   10|
|  4.0|       3.0|    7|
|  0.0|       2.0|   21|
|  3.0|       3.0|  269|
|  0.0|       3.0|    6|
|  3.0|       0.0|   12|
|  0.0|       4.0|    6|
|  3.0|       1.0|   11|
+-----+----------+-----+
only showing top 20 rows




In [71]:
# Evaluation
acc_6       = round(multi_eval.evaluate(test_result_6, {multi_eval.metricName:'accuracy'}),4)
precision_6 = round(multi_eval.evaluate(test_result_6, {multi_eval.metricName:'weightedPrecision'}),4)
recall_6    = round(multi_eval.evaluate(test_result_6, {multi_eval.metricName:'weightedRecall'}),4)
f1_6        = round(multi_eval.evaluate(test_result_6, {multi_eval.metricName:'f1'}),4)



In [72]:
# Show result
result_6 = spark.createDataFrame([('Logit resample',acc_6, precision_6, recall_6, f1_6 , t1)],
                                 result_columns)
result = result.union(result_6)
result.show()

+-------------------+--------+---------+------+--------+--------+
|              Model|Accuracy|Precision|Recall|F1 Score|Duration|
+-------------------+--------+---------+------+--------+--------+
|        Naive Bayes|  0.5245|   0.5806|0.5245|  0.5464|  3.9727|
|Logistic Regression|  0.5275|    0.549|0.5275|  0.5368|   9.427|
|      Random Forest|  0.5499|   0.3024|0.5499|  0.3902| 71.8243|
|      Decision Tree|   0.557|   0.4325| 0.557|  0.4248| 28.5125|
|        NB resample|  0.8498|   0.8529|0.8498|  0.8508|  8.3558|
|     Logit resample|  0.9126|    0.913|0.9126|  0.9128| 37.0791|
+-------------------+--------+---------+------+--------+--------+



### Observations:
- The models trained on the resampled data perform far better than those trained on the original data.
- Of these, the Logistic Regression model performs best, with every metric above 91%, making it the best fit for this dataset.
- => Conclusion: choose the Logit resample model.
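
The choice of Logit resample can be reproduced from the final result table by ranking on weighted F1, and the Duration column shows its cost: about 4.4 times the training time of NB resample. The numbers below are copied from that table; ranking by highest F1 is one reasonable rule, not necessarily the author's exact criterion:

```python
# (model, accuracy, precision, recall, f1, duration in seconds), copied from the table above
results = [
    ("Naive Bayes",         0.5245, 0.5806, 0.5245, 0.5464, 3.9727),
    ("Logistic Regression", 0.5275, 0.5490, 0.5275, 0.5368, 9.4270),
    ("Random Forest",       0.5499, 0.3024, 0.5499, 0.3902, 71.8243),
    ("Decision Tree",       0.5570, 0.4325, 0.5570, 0.4248, 28.5125),
    ("NB resample",         0.8498, 0.8529, 0.8498, 0.8508, 8.3558),
    ("Logit resample",      0.9126, 0.9130, 0.9126, 0.9128, 37.0791),
]

best = max(results, key=lambda r: r[4])                       # highest weighted F1
nb_resample = next(r for r in results if r[0] == "NB resample")
slowdown = best[5] / nb_resample[5]                           # training-time ratio
print(best[0], round(slowdown, 1))  # Logit resample 4.4
```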

# 5. New Prediction
Please predict ratings for products in Womens_Clothing_E_Commerce_Reviews.xlsx, sheetName: new_reviews.

In [73]:
# Read excel file using pandas
df_new = pd.read_excel('data/womens-ecommerce-clothing-reviews/Womens_Clothing_E_Commerce_Reviews.xlsx', sheet_name='new_reviews', index_col=0)

In [74]:
df_new.head(2)

|   | Clothing ID | Age | Title | Review Text | Recommended IND | Positive Feedback Count | Division Name | Department Name | Class Name |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1077 | 53 | Dress looks like it's made of cheap material | Dress runs small esp where the zipper area run... | 0 | 14 | General | Dresses | Dresses |
| 1 | 862 | 66 | Cute top | Nice top. armholes are a bit oversized but as ... | 1 | 2 | General | Tops | Knits |


In [75]:
df_new.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 9 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   Clothing ID              5 non-null      int64 
 1   Age                      5 non-null      int64 
 2   Title                    5 non-null      object
 3   Review Text              5 non-null      object
 4   Recommended IND          5 non-null      int64 
 5   Positive Feedback Count  5 non-null      int64 
 6   Division Name            5 non-null      object
 7   Department Name          5 non-null      object
 8   Class Name               5 non-null      object
dtypes: int64(4), object(5)
memory usage: 400.0+ bytes


In [76]:
# convert pandas dataframe to pyspark dataframe
df_new_sub = df_new[['Review Text']]
df_schema = StructType([StructField("Review Text", StringType(), True)])
data_new = spark.createDataFrame(df_new_sub, schema=df_schema)

In [77]:
data_new.show()

+--------------------+
|         Review Text|
+--------------------+
|Dress runs small ...|
|Nice top. armhole...|
|Was really excite...|
|If you are going ...|
|I saw this online...|
+--------------------+



In [78]:
clean_data_new = cleaner.transform(data_new)

22/01/24 11:23:09 WARN StringIndexerModel: Input column Rating does not exist during transformation. Skip StringIndexerModel for this column.
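
`cleaner` is the preprocessing pipeline fitted earlier in the notebook. The warning above is expected: `new_reviews` has no `Rating` column, so the `StringIndexer` stage for the label is skipped while the text stages still run. Going by the imports, those text stages are probably `Tokenizer`, `StopWordsRemover`, `CountVectorizer` and `IDF` (an assumption, since the pipeline definition is not part of this section). The plain-Python sketch below mirrors what that chain computes, using the smoothed IDF formula from the Spark MLlib documentation, idf(t) = log((|D| + 1) / (DF(t, D) + 1)). The helper names, stop-word list and reviews are hypothetical. The key point for new data is that the vocabulary and IDF weights come from the training fit, so words never seen in training simply drop out:

```python
import math
from collections import Counter

# Tiny illustrative stop-word list (Spark's StopWordsRemover ships a much longer English one)
STOP_WORDS = {"a", "and", "is", "it", "the", "this"}


def tokenize(text):
    """Lowercase, split on whitespace (as Spark's Tokenizer does), drop stop words."""
    return [tok for tok in text.lower().split() if tok not in STOP_WORDS]


def fit_idf(docs):
    """Smoothed IDF per term: log((number of docs + 1) / (docs containing term + 1))."""
    n_docs = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    return {term: math.log((n_docs + 1) / (df + 1)) for term, df in doc_freq.items()}


def tf_idf(doc, idf):
    """Term count times IDF; terms outside the fitted vocabulary are dropped."""
    counts = Counter(doc)
    return {term: n * idf[term] for term, n in counts.items() if term in idf}


# Hypothetical training reviews; the vocabulary and IDF are fitted on these only
train_docs = [tokenize(t) for t in ["This dress is lovely",
                                    "The dress runs small",
                                    "Lovely soft top"]]
idf = fit_idf(train_docs)

# A new review reuses the fitted vocabulary: "zipper" and "sticks" are unseen
new_vector = tf_idf(tokenize("Dress runs small and the zipper sticks"), idf)
```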


In [79]:
result_new = predictor_6.transform(clean_data_new)

In [80]:
result_new.select('Review Text','prediction').show()



+--------------------+----------+
|         Review Text|prediction|
+--------------------+----------+
|Dress runs small ...|       3.0|
|Nice top. armhole...|       1.0|
|Was really excite...|       3.0|
|If you are going ...|       4.0|
|I saw this online...|       0.0|
+--------------------+----------+



In [81]:
# convert 'prediction' to 'Rating'
result_new = result_new.withColumn('Rating', when(result_new.prediction==0,5)
                                            .when(result_new.prediction==1,4)
                                            .when(result_new.prediction==2,3)
                                            .when(result_new.prediction==3,2)
                                            .when(result_new.prediction==4,1)
                                  )
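
The `when` chain above hard-codes the index-to-rating mapping. It relies on the `StringIndexer` having used its default `frequencyDesc` order, under which the most frequent rating gets index 0; the chain assumes that order is 5, 4, 3, 2, 1. If the class frequencies were ordered differently, it would silently mislabel predictions. `pyspark.ml.feature.IndexToString(inputCol='prediction', outputCol='Rating', labels=...)`, given the fitted `StringIndexerModel`'s `labels` (`labelsArray[0]` in newer Spark releases), derives the mapping instead. The plain-Python sketch below shows how `frequencyDesc` orders labels, with ties broken alphabetically as I believe Spark 3.x does; `frequency_desc_labels` is an illustrative helper, not a Spark function, and the rating counts are hypothetical:

```python
from collections import Counter


def frequency_desc_labels(values):
    """Distinct values as strings, most frequent first, ties broken alphabetically,
    mimicking StringIndexer's default 'frequencyDesc' ordering."""
    counts = Counter(str(v) for v in values)
    return sorted(counts, key=lambda label: (-counts[label], label))


# Hypothetical rating column in which 5 is the most common rating and 1 the rarest
ratings = [5] * 50 + [4] * 20 + [3] * 12 + [2] * 7 + [1] * 4
labels = frequency_desc_labels(ratings)          # position = index, like the model's labels

predictions = [3.0, 1.0, 3.0, 4.0, 0.0]          # prediction column shown above
ratings_out = [int(labels[int(p)]) for p in predictions]
print(labels, ratings_out)  # ['5', '4', '3', '2', '1'] [2, 4, 2, 1, 5]
```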

In [82]:
result_new.select('Review Text','Rating').show()

+--------------------+------+
|         Review Text|Rating|
+--------------------+------+
|Dress runs small ...|     2|
|Nice top. armhole...|     4|
|Was really excite...|     2|
|If you are going ...|     1|
|I saw this online...|     5|
+--------------------+------+



