In [1]:
# !apt update
# !apt-get install openjdk-11-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
# !tar -xvf spark-3.3.0-bin-hadoop3.tgz
# !pip install -q findspark
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [2]:
# from google.colab import drive
# drive.mount('/content/gdrive', force_remount=True)
# %cd '/content/gdrive/MyDrive/DATA SCIENCE - ĐH KHTN/LDS9 - Big Data in Machine Learning/'

In [3]:
import findspark
findspark.init()

In [4]:
import pyspark
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegressionModel

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from datetime import datetime

In [6]:
spark = SparkSession.builder.appName('Project3-Sendo-Sentiment').getOrCreate()

In [7]:
data = spark.read.csv("Sendo_reviews_cleaned.csv", inferSchema=True,header=True)
data.count()

5268

In [8]:
data.show(5)

+----------+--------------+----------------+------------------+------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------+--------+----------+---------+-----------+--------------------+
|product_id|   customer_id|       full_name|      created_time|rating|             content|         content_raw|positive_words_count|negative_words_count|positive_emojis_count|negative_emojis_count|positive|negative|rating_new|sentiment|length_text|           temp_list|
+----------+--------------+----------------+------------------+------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------+--------+----------+---------+-----------+--------------------+
|  10119100|NguyenCatTuong|Nguyễn Cát Tường|20:22 | 21/12/2018|     4|    shop phục_vụ tốt|Shop phục vụ khá ...|                   1|                   0|                    0|           

In [9]:
data.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- created_time: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- content: string (nullable = true)
 |-- content_raw: string (nullable = true)
 |-- positive_words_count: string (nullable = true)
 |-- negative_words_count: integer (nullable = true)
 |-- positive_emojis_count: integer (nullable = true)
 |-- negative_emojis_count: integer (nullable = true)
 |-- positive: integer (nullable = true)
 |-- negative: integer (nullable = true)
 |-- rating_new: integer (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- length_text: string (nullable = true)
 |-- temp_list: string (nullable = true)



In [10]:
data = data.filter(data['sentiment']!='6')

In [11]:
print('Before: ', data.count())

cols = ['rating','content','sentiment']
for col in cols:
    data = data.filter(data[col].isNotNull())
print('After: ', data.count())

Before:  5267
After:  5127


In [12]:
data.groupBy('sentiment').count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
| positive| 4619|
|  neutral|  152|
| negative|  356|
+---------+-----+



In [13]:
# chọn những cột cần thiết
data_new = data.select(['content', 'sentiment'])
data_new.show(5)

+--------------------+---------+
|             content|sentiment|
+--------------------+---------+
|    shop phục_vụ tốt| positive|
|      sản_phẩm mô_tả| positive|
|                hàng| positive|
|    sản_phẩm dịch_vụ| positive|
|sản_phẩm dịch_vụ ...| positive|
+--------------------+---------+
only showing top 5 rows



#### Xử lý dữ liệu văn bản

In [14]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [15]:
data_new = data_new.withColumn('length', length(data_new['content']))
data_new.show(10)

+--------------------+---------+------+
|             content|sentiment|length|
+--------------------+---------+------+
|    shop phục_vụ tốt| positive|    16|
|      sản_phẩm mô_tả| positive|    14|
|                hàng| positive|     4|
|    sản_phẩm dịch_vụ| positive|    16|
|sản_phẩm dịch_vụ ...| positive|   103|
|shop tư_vấn hướng...| positive|    28|
|    sản_phẩm tốt đợi| positive|    16|
|      sản_phẩm mô_tả|  neutral|    14|
|sản_phẩm hàng đón...| positive|    40|
|chuyên_nghiệp đón...| positive|    46|
+--------------------+---------+------+
only showing top 10 rows



In [16]:
# Calculate class weights based on class distribution
class_counts = data_new.groupBy("sentiment").count()
total_count = data_new.count()
class_weights = class_counts.withColumn("weight", total_count/ class_counts["count"] )

In [17]:
class_weights.show()

+---------+-----+------------------+
|sentiment|count|            weight|
+---------+-----+------------------+
| positive| 4619| 1.109980515263044|
|  neutral|  152| 33.73026315789474|
| negative|  356|14.401685393258427|
+---------+-----+------------------+



In [18]:
# Join the calculated class weights with your original DataFrame
data_new = data_new.join(class_weights, on="sentiment")

In [19]:
data_new.show(10)

+---------+--------------------+------+-----+-----------------+
|sentiment|             content|length|count|           weight|
+---------+--------------------+------+-----+-----------------+
| positive|    shop phục_vụ tốt|    16| 4619|1.109980515263044|
| positive|      sản_phẩm mô_tả|    14| 4619|1.109980515263044|
| positive|                hàng|     4| 4619|1.109980515263044|
| positive|    sản_phẩm dịch_vụ|    16| 4619|1.109980515263044|
| positive|sản_phẩm dịch_vụ ...|   103| 4619|1.109980515263044|
| positive|shop tư_vấn hướng...|    28| 4619|1.109980515263044|
| positive|    sản_phẩm tốt đợi|    16| 4619|1.109980515263044|
|  neutral|      sản_phẩm mô_tả|    14|  152|33.73026315789474|
| positive|sản_phẩm hàng đón...|    40| 4619|1.109980515263044|
| positive|chuyên_nghiệp đón...|    46| 4619|1.109980515263044|
+---------+--------------------+------+-----+-----------------+
only showing top 10 rows



In [20]:
tokenizer = Tokenizer(inputCol='content', outputCol='token_text')
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')
idf = IDF(inputCol='c_vec', outputCol='tf_idf')
class_to_num = StringIndexer(inputCol='sentiment', outputCol='label')

In [21]:
clean_up = VectorAssembler(inputCols=['tf_idf', 'length','weight'], outputCol='features')

### Pipeline

In [22]:
from pyspark.ml import Pipeline

In [23]:
data_prep_pipe = Pipeline(stages=[class_to_num, tokenizer,
                                  stopremove, count_vec,
                                  idf, 
                                  clean_up])

In [24]:
cleaner = data_prep_pipe.fit(data_new)

In [25]:
clean_data = cleaner.transform(data_new)

### Training và Evaluation

In [26]:
clean_data = clean_data.select(['label', 'features'])

In [27]:
clean_data.show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(1625,[0,7,23,162...|
|  0.0|(1625,[1,2,1623,1...|
|  0.0|(1625,[6,1623,162...|
|  0.0|(1625,[1,3,1623,1...|
|  0.0|(1625,[0,1,2,3,4,...|
|  0.0|(1625,[7,13,14,15...|
|  0.0|(1625,[0,1,29,162...|
|  2.0|(1625,[1,2,1623,1...|
|  0.0|(1625,[0,1,6,9,18...|
|  0.0|(1625,[0,4,6,8,9,...|
+-----+--------------------+
only showing top 10 rows



In [28]:
clean_data.groupBy('label').count().show() # 0: positive, 1: negative, 2: neutral

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 4619|
|  1.0|  356|
|  2.0|  152|
+-----+-----+



In [29]:
(training, testing) = clean_data.randomSplit([0.8, 0.2])

## Select model

* Trong bài toán này, áp dụng các model sau đây, so sánh kết quả và chọn model tốt nhất
    - NaiveBayes
    - LogisticRegression
    - DecisionTreeClassifier
    - RandomForestClassifier
    - GBTClassifier

In [30]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes

In [31]:
class_counts = clean_data.groupBy("label").count()
class_counts.show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 4619|
|  1.0|  356|
|  2.0|  152|
+-----+-----+



In [32]:
##NaiveBayes
t0= datetime.now()
# Use defaults
nb = NaiveBayes()
nb_model = nb.fit(training)
nb_predictions = nb_model.transform(testing)
time_nb = datetime.now() -t0

In [33]:
##LogisticRegression
t0= datetime.now()
logistic = LogisticRegression(featuresCol='features', labelCol= 'label', predictionCol='prediction')
logistic_model = logistic.fit(training)
logistic_predictions = logistic_model.transform(testing)
time_logistic = datetime.now() -t0

In [34]:
##DecisionTreeClassifier
t0= datetime.now()
# Create a classifier object and fit to the traning data
dtc = DecisionTreeClassifier(featuresCol='features', labelCol= 'label', predictionCol='prediction')
dtc_model = dtc.fit(training)
dtc_predictions = dtc_model.transform(testing)
time_dt = datetime.now() -t0

In [35]:
## RandomForestClassifier
t0= datetime.now()
rfc = RandomForestClassifier(featuresCol='features', labelCol= 'label', predictionCol='prediction')
rfc_model = rfc.fit(training)
rfc_predictions = rfc_model.transform(testing)
time_rf = datetime.now() -t0

In [36]:
# ## GBTClassifier
# t0= datetime.now()
# gbt = GBTClassifier(featuresCol='features', labelCol= 'label', predictionCol='prediction')
# gbt_model = gbt.fit(training)
# gbt_predictions = gbt_model.transform(testing)
# time_gbt = datetime.now() -t0

In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
# select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                                  predictionCol='prediction',
                                                  metricName='accuracy')
f1_evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                                  predictionCol='prediction',
                                                  metricName='f1')
precision_evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                                  predictionCol='prediction',
                                                  metricName='precisionByLabel')
recall_evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                                  predictionCol='prediction',
                                                  metricName='recallByLabel')

In [38]:
## Accuracy
nb_acc = acc_evaluator.evaluate(nb_predictions)
log_acc = acc_evaluator.evaluate(logistic_predictions)
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
# gbt_acc = acc_evaluator.evaluate(gbt_predictions)

## f1
nb_f1 = f1_evaluator.evaluate(nb_predictions)
log_f1 = f1_evaluator.evaluate(logistic_predictions)
dtc_f1 = f1_evaluator.evaluate(dtc_predictions)
rfc_f1 = f1_evaluator.evaluate(rfc_predictions)
# gbt_f1 = f1_evaluator.evaluate(gbt_predictions)

## precision
nb_precision = precision_evaluator.evaluate(nb_predictions)
log_precision = precision_evaluator.evaluate(logistic_predictions)
dtc_precision = precision_evaluator.evaluate(dtc_predictions)
rfc_precision= precision_evaluator.evaluate(rfc_predictions)
# gbt_precision = precision_evaluator.evaluate(gbt_predictions)

## recall
nb_recall = recall_evaluator.evaluate(nb_predictions)
log_recall = recall_evaluator.evaluate(logistic_predictions)
dtc_recall = recall_evaluator.evaluate(dtc_predictions)
rfc_recall = recall_evaluator.evaluate(rfc_predictions)
# gbt_recall = recall_evaluator.evaluate(gbt_predictions)

In [39]:
print("Results:")
print('-'*100)
print('Naive Bayes')
print(f'+ A single Naive Bayes has an accuracy of {nb_acc*100:.2f}%')
print(f'+ A single Naive Bayes has an f1 score of {nb_f1*100:.2f}%')
print(f'+ A single Naive Bayes has an precision score of {nb_precision*100:.2f}%')
print(f'+ A single Naive Bayes has an recall score of {nb_recall*100:.2f}%')
print('Total time Naive Bayes:', time_nb)
print('Confusion Matrix: \n',MulticlassMetrics(nb_predictions.select(['prediction','label']).rdd.map(tuple)).confusionMatrix().toArray())
nb_predictions.groupBy('label', 'prediction').count().show()

print('-'*100)
print('Logistic Regression')
print(f'+ A ansemble using Logistic has an accuracy of {log_acc*100:.2f}%')
print(f'+ A single Logistic has an f1 score of {log_f1*100:.2f}%')
print(f'+ A single Logistic has an precision score of {log_precision*100:.2f}%')
print(f'+ A single Logistic has an recall score of {log_recall*100:.2f}%')
print('Total time Logistic:', time_logistic)
print('Confusion Matrix:\n',MulticlassMetrics(logistic_predictions.select(['prediction','label']).rdd.map(tuple)).confusionMatrix().toArray())
logistic_predictions.groupBy('label', 'prediction').count().show()

print('-'*100)
print('Decision Tree')
print(f'+ A single Decision Tree has an accuracy of {dtc_acc*100:.2f}%')
print(f'+ A single Decision Tree has an f1 score of {dtc_f1*100:.2f}%')
print(f'+ A single Decision Tree has an precision score of {dtc_precision*100:.2f}%')
print(f'+ A single Decision Tree has an recall score of {dtc_recall*100:.2f}%')
print('Total time Decision Tree:', time_dt)
print('Confusion Matrix:\n',MulticlassMetrics(dtc_predictions.select(['prediction','label']).rdd.map(tuple)).confusionMatrix().toArray())
dtc_predictions.groupBy('label', 'prediction').count().show()

print('-'*100)
print('Random Forest')
print(f'+ A Random Forest ensemble has an accuracy of {rfc_acc*100:.2f}%')
print(f'+ A Random Forest ensemble has an f1 score of {rfc_f1*100:.2f}%')
print(f'+ A Random Forest ensemble has an precision score of {rfc_precision*100:.2f}%')
print(f'+ A Random Forest ensemble has an recall score of {rfc_recall*100:.2f}%')
print('Total time Random Forest ensemble:', time_rf)
print('Confusion Matrix:\n',MulticlassMetrics(rfc_predictions.select(['prediction','label']).rdd.map(tuple)).confusionMatrix().toArray())
rfc_predictions.groupBy('label', 'prediction').count().show()

# print('-'*100)
# print(f'+ A ansemble using GBT has an accuracy of\t\t{gbt_acc*100:.2f}%')
# print(f'+ A single Naive Bayes has an f1 score of\t\t\t{gbt_f1*100:.2f}%')
# print(f'+ A single Naive Bayes has an precision score of\t\t\t{gbt_precision*100:.2f}%')
# print(f'+ A single Naive Bayes has an recall score of\t\t\t{gbt_recall*100:.2f}%')
# print('Confusion Matrix:'MulticlassMetrics(gbt_predictions.select(['prediction','label']).rdd.map(tuple)).confusionMatrix().toArray())
# gbt_predictions.groupBy('label', 'prediction').count().show()
print('-'*42*2)

Results:
----------------------------------------------------------------------------------------------------
Naive Bayes
+ A single Naive Bayes has an accuracy of 90.79%
+ A single Naive Bayes has an f1 score of 92.20%
+ A single Naive Bayes has an precision score of 100.00%
+ A single Naive Bayes has an recall score of 93.04%
Total time Naive Bayes: 0:00:01.647500




Confusion Matrix: 
 [[869.  52.  13.]
 [  0.  45.  24.]
 [  0.   6.  22.]]
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   45|
|  0.0|       1.0|   52|
|  2.0|       2.0|   22|
|  2.0|       1.0|    6|
|  1.0|       2.0|   24|
|  0.0|       0.0|  869|
|  0.0|       2.0|   13|
+-----+----------+-----+

----------------------------------------------------------------------------------------------------
Logistic Regression
+ A ansemble using Logistic has an accuracy of 96.51%
+ A single Logistic has an f1 score of 96.77%
+ A single Logistic has an precision score of 100.00%
+ A single Logistic has an recall score of 98.29%
Total time Logistic: 0:00:03.453211
Confusion Matrix:
 [[918.  11.   5.]
 [  0.  53.  16.]
 [  0.   4.  24.]]
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   53|
|  0.0|       1.0|   11|
|  2.0|       2.0|   24|
|  2.0|       1.0|    4|
|  1.0|       2.0|   16|
|  0.0|       

**Nhận xét:**
* Từ kết quả trên có thể thấy thời gian chạy giữa các mô hình không có quá nhiều chênh lệch
* Decision Tree tuy có các chỉ số rất cao, tuy nhiên vì tất cả đều là 100% nên có sẽ xảy ra hiện tượng bias khi dự đoán dữ liệu mới
*Thời gian chạy giữa các mô hình không chênh lệch quá nhiều
* Vì vậy, chọn mô hình có chỉ số cao thứ 2 là Logistic Regression cho dữ liệu này

#### Cross-Validation

In [40]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [41]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(training)

predictions = cvModel.transform(testing)
# Evaluate best model
evaluator.evaluate(predictions)

0.9260346879943813

* Kết quả không cải thiện so với default

## Áp dụng model

In [42]:
logistic = LogisticRegression(featuresCol='features', labelCol= 'label', predictionCol='prediction')
logistic_model = logistic.fit(training)
logistic_predictions = logistic_model.transform(testing)

In [43]:
## Accuracy
log_acc = acc_evaluator.evaluate(logistic_predictions)

## f1
log_f1 = f1_evaluator.evaluate(logistic_predictions)

## precision
log_precision = precision_evaluator.evaluate(logistic_predictions)

## recall
log_recall = recall_evaluator.evaluate(logistic_predictions)

In [44]:
print('Logistic Regression')
print(f'+ A ansemble using Logistic has an accuracy of {log_acc*100:.2f}%')
print(f'+ A single Logistic has an f1 score of {log_f1*100:.2f}%')
print(f'+ A single Logistic has an precision score of {log_precision*100:.2f}%')
print(f'+ A single Logistic has an recall score of {log_recall*100:.2f}%')
logistic_predictions.groupBy('label', 'prediction').count().show()

Logistic Regression
+ A ansemble using Logistic has an accuracy of 96.51%
+ A single Logistic has an f1 score of 96.77%
+ A single Logistic has an precision score of 100.00%
+ A single Logistic has an recall score of 98.29%
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   53|
|  0.0|       1.0|   11|
|  2.0|       2.0|   24|
|  2.0|       1.0|    4|
|  1.0|       2.0|   16|
|  0.0|       0.0|  918|
|  0.0|       2.0|    5|
+-----+----------+-----+

