In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

# Build feature
from pyspark.ml.feature import Tokenizer, RegexTokenizer,StringIndexer,OneHotEncoder
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF, Tokenizer
from pyspark.ml.feature import NGram
from pyspark.ml.feature import VectorAssembler

# ML algrothism
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, NaiveBayesModel, GBTClassifier, MultilayerPerceptronClassifier, LinearSVC
from pyspark.ml import Pipeline

# other
import time


In [50]:
def calculate_confusion_matrix(predicted_df, num_class=3, label='label'):
    #calculate confusion matrix
    import numpy as np
    _df = predicted_df.groupBy(label, 'prediction').count()
    confusion_matrix = np.array([])
    for i in range(3):
        for j in range(num_class):
            try: 
                x = _df.select('count').where((_df[label]==i) & (_df['prediction']==j)).collect()[0][0]
                confusion_matrix = np.append(confusion_matrix, x)
            except IndexError as e:
                confusion_matrix = np.append(confusion_matrix, 0)
    return confusion_matrix.reshape(num_class,num_class).astype(int)

In [4]:
def classification_report(confusion_matrix, num_class=2):
    import numpy as np
    #def for calculate accuracy, recall, precision from confusion matrix
    cnf_sum = np.sum(confusion_matrix)
    sum_ver = np.sum(confusion_matrix, axis=0)
    sum_hor = np.sum(confusion_matrix, axis=1)
    recall = np.array([])
    preci = np.array([])
    acc = (confusion_matrix[0,0]+confusion_matrix[1,1])/cnf_sum
    for i in range(num_class):
        #recall
        _recall = np.array(confusion_matrix[i,i]/sum_hor[i])
        recall = np.append(recall, _recall)
        #precision
        _preci = np.array(confusion_matrix[i,i]/sum_ver[i])
        preci = np.append(preci, _preci)
        #f1-score
        f1 = 2/((1/recall)+(1/preci))
    return {'accuracy': acc, 'recall':recall,'precision': preci, 'f1':f1}

In [5]:
def plot_confusion_matrix(confusion_matrix, normalize=False):
    # Visualization confusion matrix
    import matplotlib.pyplot as plt
    import seaborn as sns
    plt.figure(figsize=(6,6))
    plt.subplot(1,2,1)
    
    sns.heatmap(confusion_matrix, square=True ,annot=True, fmt='.0f', cbar=False,
                xticklabels=[0, 1], yticklabels=[0, 1], cmap=plt.cm.GnBu)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('Actual Label')
    
    if normalize:
        plt.subplot(1,2,2)
        confusion_matrix = confusion_matrix.astype('float') / confusion_matrix.sum(axis=1, keepdims = True)
        sns.heatmap(confusion_matrix, square=True ,annot=True, fmt='.4f', cbar=False,
                xticklabels=[0, 1], yticklabels=[0, 1], cmap=plt.cm.GnBu)
        plt.title('Confusion Matrix with normalize')
        plt.xlabel('Predicted Label')
        plt.ylabel('Actual Label')
    plt.tight_layout()
    plt.show()

In [6]:
SparkContext.setSystemProperty('spark.executor.memory', '16g')

In [7]:
SparkContext.setSystemProperty(key='spark.hadoop.dfs.client.use.datanode.hostname',value='true')

In [8]:
sc =SparkContext()

In [9]:
sc.setLogLevel("ERROR")

In [10]:
spark = SparkSession(sc)

# About Dataset
## Acknowledgements
Acccording to data about review, build a model to determine this review is positive or negative or netral

In [12]:
restaurant_data = spark.read.option("multiline", "true").csv("../Clean_data/clean_restaurant.csv",
                                                             inferSchema=False,
                                                             header=True)

restaurant_data.show()

+---+--------------------+--------------------+--------+---------+---------+----------+--------+
| ID|          Restaurant|             Address|district|price_max|price_min|start_time|end_time|
+---+--------------------+--------------------+--------+---------+---------+----------+--------+
|  1|Gà Rán & Burger M...|2 - 6 Bis Điện Bi...|       1|  50000.0| 200000.0|     00:00|   23:59|
|  2|Cháo Trắng - Cháo...|112B Phạm Viết Ch...|       1|   5000.0|  40000.0|     00:00|   23:59|
|  3|Texas Chicken - N...|115 Nguyễn Thái H...|       1|  30000.0| 300000.0|      null|    null|
|  4|        Bếp Chay 365|Tầng 1, 35 Nguyễn...|       1|  15000.0|  50000.0|      null|    null|
|  5|  Bánh Canh Cua Linh|80A Điện Biên Phủ...|       1|  25000.0|  35000.0|     06:20|   22:00|
|  6|Bún Đậu Mạc Văn K...|90 Trần Quang Khả...|       1|  45000.0| 140000.0|      null|    null|
|  7|Bún Riêu Cua Ốc P...|66 Nguyễn Thái Bì...|       1|  35000.0|  55000.0|     06:30|   21:30|
|  8|Há Cảo Đặc Biệt N...|86 N

In [13]:
review_data = spark.read.option("multiline", "true").csv("../Clean_data/clean_review_data.csv",
                      inferSchema=True,header=True)
review_data.show()

+------------+-------------------+-------+------------------+--------------------+----------------+-------------+-----+
|IDRestaurant|          date_time|user_id|     rating_scaler|        clean_review|sentiment_encode|rating_encode|label|
+------------+-------------------+-------+------------------+--------------------+----------------+-------------+-----+
|           1|2023-12-20 21:13:00|   8670| 5.680000000000001|gà chiên còn sống...|             0.0|          2.0|  2.0|
|           1|2023-09-25 15:43:00|  11063|               5.5|     đã ăn ăn đồng_ý|             0.0|          2.0|  2.0|
|           1|2023-06-24 11:11:00|   9541|               5.5|     đã thư rất ngon|             1.0|          2.0|  2.0|
|           1|2022-12-22 14:58:00|   9112|               1.9|về gói thêm khách...|             0.0|          0.0|  0.0|
|           1|2022-09-23 22:40:00|   9651|              4.78|nhỏ kêu đói hồi b...|             0.0|          2.0|  2.0|
|           1|2022-09-15 11:32:00|  1134

# Overview

In [14]:
# create label with fake news = 0
review_data.show(10)
review_data.printSchema()
print(f'There are {review_data.count()} rows and {len(review_data.columns)} columns in dataframe')

+------------+-------------------+-------+-----------------+--------------------+----------------+-------------+-----+
|IDRestaurant|          date_time|user_id|    rating_scaler|        clean_review|sentiment_encode|rating_encode|label|
+------------+-------------------+-------+-----------------+--------------------+----------------+-------------+-----+
|           1|2023-12-20 21:13:00|   8670|5.680000000000001|gà chiên còn sống...|             0.0|          2.0|  2.0|
|           1|2023-09-25 15:43:00|  11063|              5.5|     đã ăn ăn đồng_ý|             0.0|          2.0|  2.0|
|           1|2023-06-24 11:11:00|   9541|              5.5|     đã thư rất ngon|             1.0|          2.0|  2.0|
|           1|2022-12-22 14:58:00|   9112|              1.9|về gói thêm khách...|             0.0|          0.0|  0.0|
|           1|2022-09-23 22:40:00|   9651|             4.78|nhỏ kêu đói hồi b...|             0.0|          2.0|  2.0|
|           1|2022-09-15 11:32:00|  11343|      

- Combine 2 data frame 

In [15]:
review_data.groupby('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 2204|
|  1.0|16717|
|  2.0|11037|
+-----+-----+



In [16]:
review_data.show()

+------------+-------------------+-------+------------------+--------------------+----------------+-------------+-----+
|IDRestaurant|          date_time|user_id|     rating_scaler|        clean_review|sentiment_encode|rating_encode|label|
+------------+-------------------+-------+------------------+--------------------+----------------+-------------+-----+
|           1|2023-12-20 21:13:00|   8670| 5.680000000000001|gà chiên còn sống...|             0.0|          2.0|  2.0|
|           1|2023-09-25 15:43:00|  11063|               5.5|     đã ăn ăn đồng_ý|             0.0|          2.0|  2.0|
|           1|2023-06-24 11:11:00|   9541|               5.5|     đã thư rất ngon|             1.0|          2.0|  2.0|
|           1|2022-12-22 14:58:00|   9112|               1.9|về gói thêm khách...|             0.0|          0.0|  0.0|
|           1|2022-09-23 22:40:00|   9651|              4.78|nhỏ kêu đói hồi b...|             0.0|          2.0|  2.0|
|           1|2022-09-15 11:32:00|  1134

## check null

In [17]:
review_data.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in review_data.columns if c not in ['date_time']]).show()

+------------+-------+-------------+------------+----------------+-------------+-----+
|IDRestaurant|user_id|rating_scaler|clean_review|sentiment_encode|rating_encode|label|
+------------+-------+-------------+------------+----------------+-------------+-----+
|           0|      0|            0|           0|               0|            0|    0|
+------------+-------+-------------+------------+----------------+-------------+-----+



In [18]:
review_data.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in review_data.columns if c not in ['date_time']]).show()

+------------+-------+-------------+------------+----------------+-------------+-----+
|IDRestaurant|user_id|rating_scaler|clean_review|sentiment_encode|rating_encode|label|
+------------+-------+-------------+------------+----------------+-------------+-----+
|           0|      0|            0|         110|               0|            0|    0|
+------------+-------+-------------+------------+----------------+-------------+-----+



In [19]:
# df contains null values => drop it
review_data.filter(f.isnull(f.col('clean_review'))).show()

+------------+-------------------+-------+-----------------+------------+----------------+-------------+-----+
|IDRestaurant|          date_time|user_id|    rating_scaler|clean_review|sentiment_encode|rating_encode|label|
+------------+-------------------+-------+-----------------+------------+----------------+-------------+-----+
|           1|2021-01-02 22:42:00|  12116|              7.3|        null|             2.0|          1.0|  1.0|
|          19|2023-11-22 13:07:00|   8334|              5.5|        null|             2.0|          2.0|  2.0|
|          19|2023-11-22 10:44:00|   9208|             10.0|        null|             2.0|          1.0|  1.0|
|          20|2024-01-25 10:55:00|   1438|             10.0|        null|             2.0|          1.0|  1.0|
|          38|2023-10-10 13:06:00|  10272|              5.5|        null|             2.0|          2.0|  2.0|
|          38|2023-10-09 13:02:00|  11209|             10.0|        null|             2.0|          1.0|  1.0|
|

In [20]:
clean_review_data = review_data.dropna(subset=['clean_review']).drop('date_time')
clean_review_data.show(10)
clean_review_data.printSchema()
print(f'There are {clean_review_data.count()} rows and {len(clean_review_data.columns)} columns in dataframe')

+------------+-------+-----------------+--------------------+----------------+-------------+-----+
|IDRestaurant|user_id|    rating_scaler|        clean_review|sentiment_encode|rating_encode|label|
+------------+-------+-----------------+--------------------+----------------+-------------+-----+
|           1|   8670|5.680000000000001|gà chiên còn sống...|             0.0|          2.0|  2.0|
|           1|  11063|              5.5|     đã ăn ăn đồng_ý|             0.0|          2.0|  2.0|
|           1|   9541|              5.5|     đã thư rất ngon|             1.0|          2.0|  2.0|
|           1|   9112|              1.9|về gói thêm khách...|             0.0|          0.0|  0.0|
|           1|   9651|             4.78|nhỏ kêu đói hồi b...|             0.0|          2.0|  2.0|
|           1|  11343|             7.66|có khuyến_mại tặn...|             1.0|          1.0|  1.0|
|           1|    376|              1.9|thề lâu lắm mới t...|             0.0|          0.0|  0.0|
|         

## Descirble data

In [21]:
cat_col = ['subject']
cat_col

['subject']

## Category data

### Check distinct values

In [22]:
(clean_review_data
 .groupby('IDRestaurant','label')
 .agg(f.count('rating_encode').alias('Count label'))
 .orderBy(f.col('Count label').desc())
 .show())

+------------+-----+-----------+
|IDRestaurant|label|Count label|
+------------+-----+-----------+
|        1589|  1.0|         96|
|         475|  1.0|         94|
|         917|  2.0|         94|
|        1180|  1.0|         92|
|        1164|  1.0|         90|
|        1521|  1.0|         90|
|         161|  1.0|         89|
|        1277|  1.0|         86|
|        1169|  1.0|         86|
|         523|  1.0|         85|
|         772|  1.0|         83|
|          20|  1.0|         81|
|         543|  1.0|         81|
|         600|  1.0|         81|
|         169|  1.0|         79|
|         371|  1.0|         78|
|          51|  1.0|         78|
|         198|  1.0|         78|
|         603|  1.0|         77|
|        1074|  1.0|         77|
+------------+-----+-----------+
only showing top 20 rows



### Check value count of column label in df 

In [23]:

(clean_review_data.groupby('label').count()
 .withColumn('Normalize',(f.col('count')/clean_review_data.count()))
 .orderBy(f.col('count').desc())
 .show(20,vertical=False,truncate=100))

+-----+-----+-------------------+
|label|count|          Normalize|
+-----+-----+-------------------+
|  1.0|16648| 0.5577593138568748|
|  2.0|11010| 0.3688689359421067|
|  0.0| 2190|0.07337175020101849|
+-----+-----+-------------------+



transform tf-idf of feature `clean_review`

In [24]:
df_final = clean_review_data.select('clean_review','label')
df_final.show(5)

+--------------------+-----+
|        clean_review|label|
+--------------------+-----+
|gà chiên còn sống...|  2.0|
|     đã ăn ăn đồng_ý|  2.0|
|     đã thư rất ngon|  2.0|
|về gói thêm khách...|  0.0|
|nhỏ kêu đói hồi b...|  2.0|
+--------------------+-----+
only showing top 5 rows



# Feature Transform

In [25]:
#some parameter for countvectorizer
#define vocabsize for maximum number of words list, the higher values is higher accuracy but more expensive computing.
#define mindf for the minimum of the word frequency that appear in all dataset,
#the word that appear less frequently than this threshold will be ignored
_vocabsize = 1000
_mindf = int((0.4/100)*(df_final.count()))
_mindf

119

In [26]:
tokenizer = Tokenizer(inputCol='clean_review',outputCol='token_text')
remover = StopWordsRemover(inputCol='token_text', outputCol='stop_token')
count_vec = CountVectorizer(inputCol='stop_token',outputCol='c_vec',vocabSize=_vocabsize, minDF=_mindf)
idf = IDF(inputCol='c_vec', outputCol='tf_idf',minDocFreq=2)

In [27]:
clean_up = VectorAssembler(inputCols=['tf_idf'],
                           handleInvalid='keep',
                           outputCol='features') # 6

# Pipeline 

In [28]:
nlp_pre_pipeline = Pipeline(stages=[tokenizer,
                                   remover, 
                                   count_vec, 
                                   idf,
                                   clean_up]).fit(df_final)

In [29]:
trans_data = nlp_pre_pipeline.transform(df_final)

In [30]:
trans_clean_data = trans_data.select('features','label')

## Suffle data

In [31]:
trans_clean_data = trans_clean_data.orderBy(f.rand(42))
trans_clean_data.show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(978,[0,1,2,3,4,5...|  1.0|
|(978,[0,1,4,7,12,...|  1.0|
|(978,[2,6,8,17,21...|  1.0|
|(978,[0,1,2,4,5,6...|  1.0|
|(978,[0,9,13,15,3...|  1.0|
|(978,[14,18,55,11...|  2.0|
|(978,[0,1,3,4,5,7...|  1.0|
|(978,[4,23,36,59,...|  1.0|
|(978,[0,4,7,21,39...|  1.0|
|(978,[1,2,3,4,5,7...|  1.0|
+--------------------+-----+
only showing top 10 rows



In [32]:
(train, test) = trans_clean_data.randomSplit([0.8,0.2],seed=42)

In [33]:
test.count()

5842

# Model

## Naive Bayes 

In [77]:
start = time.time()
nb_model = NaiveBayes(featuresCol='features',
                    labelCol='label',
                    predictionCol='prediction').fit(train)
period = (time.time() - start)/60
print(f'Total time to train model: {period} mins')

Total time to train model: 0.1582290291786194 mins


In [80]:
model = Pipeline(stages=[nlp_pre_pipeline,nb_model]).fit(train)

## Random Forest

In [35]:
start = time.time()
rf = RandomForestClassifier(featuresCol='features',
                            labelCol='label',
                            predictionCol='prediction').fit(train)
period = (time.time() - start)/60
print(f'Total time to train model: {period} mins')

Total time to train model: 0.3419677257537842 mins


## Multilayer perceptron classifier

In [36]:
start = time.time()
layers = [5, 5, 4, 3]
pecep = MultilayerPerceptronClassifier(maxIter=100, 
                                       layers=layers, 
                                       blockSize=128, 
                                       seed=1234)
pecep_model = pecep.fit(train)
period = (time.time() - start)/60
print(f'Total time to train model: {period} mins')

Total time to train model: 0.4014607350031535 mins


**As can be seen, the fastest model is Naive Baiyes, Next model is Random Forest  and the last one is  Multilayer perceptron classifier** 

# Evaluation

In [37]:
nb_pred = nb_model.transform(test)
nb_pred.groupby('label','prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|  375|
|  1.0|       1.0| 2716|
|  0.0|       1.0|   14|
|  2.0|       2.0|  980|
|  1.0|       0.0|   79|
|  2.0|       1.0|  800|
|  1.0|       2.0|  458|
|  0.0|       0.0|  313|
|  0.0|       2.0|  107|
+-----+----------+-----+



In [42]:
rf_pred = rf.transform(test)
rf_pred.groupby('label','prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0| 3246|
|  0.0|       1.0|  329|
|  2.0|       2.0|   81|
|  2.0|       1.0| 2074|
|  0.0|       2.0|  104|
|  1.0|       2.0|    7|
|  0.0|       0.0|    1|
+-----+----------+-----+



## Confusion matrix of 2 models

In [52]:
# Confusion matrix for Naive Bayes
cm_nb = calculate_confusion_matrix(nb_pred)

In [62]:
pd.Dacm_nb

array([[ 313,   14,  107],
       [  79, 2716,  458],
       [ 375,  800,  980]])

In [51]:
# Confusion matrix for Random Forest
cm_rf = calculate_confusion_matrix(rf_pred)

In [54]:
cm_rf

array([[   1,  329,  104],
       [   0, 3246,    7],
       [   0, 2074,   81]])

In [68]:
classification_report(cm_nb,3)

{'accuracy': 0.5184868195823348,
 'recall': array([0.72119816, 0.83492161, 0.45475638]),
 'precision': array([0.40808344, 0.7694051 , 0.63430421]),
 'f1': array([0.52123231, 0.80082559, 0.52972973])}

In [69]:
classification_report(cm_rf,3)

{'accuracy': 0.5558028072577884,
 'recall': array([0.00230415, 0.99784814, 0.03758701]),
 'precision': array([1.        , 0.57461498, 0.421875  ]),
 'f1': array([0.0045977 , 0.72927432, 0.06902429])}

- Random Forest give the worst result than Naive Bayes

In [70]:
multi_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='label')

# Naive Baiyes
acc_nb = multi_evaluator.evaluate(nb_pred,{multi_evaluator.metricName:'accuracy'})
precision_nb = multi_evaluator.evaluate(nb_pred,{multi_evaluator.metricName:'weightedPrecision'})
recall_nb = multi_evaluator.evaluate(nb_pred,{multi_evaluator.metricName:'weightedRecall'})
f1_nb = multi_evaluator.evaluate(nb_pred,{multi_evaluator.metricName:'f1'})

# Random Forest 
acc_rf = multi_evaluator.evaluate(rf_pred,{multi_evaluator.metricName:'accuracy'})
precision_rf = multi_evaluator.evaluate(rf_pred,{multi_evaluator.metricName:'weightedPrecision'})
recall_rf = multi_evaluator.evaluate(rf_pred,{multi_evaluator.metricName:'weightedRecall'})
f1_rf = multi_evaluator.evaluate(rf_pred,{multi_evaluator.metricName:'f1'})


print("Accuracy score of Random Forest :", acc_rf)
print("Accuracy score of Naive Bayes:", acc_nb)
print('\n')
print("Precision score of Random Forest :", precision_rf)
print("Precision score of Naive Bayes:", precision_nb)
print('\n')
print("Recall score of Random Forest :", recall_rf)
print("Recall score of Naive Bayes:", recall_nb)
print('\n')
print("F1 score of Random Forest :", f1_rf)
print("F1 score of Naive Bayes:", f1_nb)
print('\n')

Accuracy score of Random Forest : 0.5696679219445395
Accuracy score of Naive Bayes: 0.6862375898664841


Precision score of Random Forest : 0.5498738689249729
Precision score of Naive Bayes: 0.6927265607152568


Recall score of Random Forest : 0.5696679219445396
Recall score of Naive Bayes: 0.6862375898664841


F1 score of Random Forest : 0.43188498861907987
F1 score of Naive Bayes: 0.6800527291791921




**From the above results, we can see Naive Baiyes better than Random Forest in both performance and training time**

`=> Choose Naive Bayes`


# Save model 

In [76]:
nb_model.save()

Py4JJavaError: An error occurred while calling o1849.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1599)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1599)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1585)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1585)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.classification.NaiveBayesModel$NaiveBayesModelWriter.saveImpl(NaiveBayes.scala:571)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
	at org.apache.hadoop.mapred.OutputCommitter.commitJob(OutputCommitter.java:291)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$3(SparkHadoopWriter.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
	... 51 more
