In [1]:
import findspark
findspark.init()

import numpy as np
import time
import pyspark as ps
from pyspark import SparkConf

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
conf = SparkConf()
conf.set('spark.executor.memory', '16g')
conf.set('spark.driver.memory', '8g')

<pyspark.conf.SparkConf at 0x7fc470406100>

In [3]:
#Create SparkSession
spark = SparkSession \
    .builder \
    .appName('Restaurant Customer Satisfaction Rating') \
    .config(conf=conf) \
    .getOrCreate()

2022-01-28 23:04:33,317 WARN util.Utils: Your hostname, Jin resolves to a loopback address: 127.0.1.1; using 192.168.43.24 instead (on interface wifi0)
2022-01-28 23:04:33,319 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-01-28 23:04:35,270 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#Load data
load_start = time.process_time_ns()
df = spark.read.csv('smote_data.csv', inferSchema=True, header=True)
load_end = time.process_time_ns() - load_start
df.show(3)

                                                                                

+---+-----+----+-----+-----------------+------------------+------+
|_c0| name|city|price|number of reviews|number of cuisines|rating|
+---+-----+----+-----+-----------------+------------------+------+
|  0|63428|   0|  1.0|            136.0|               3.0|   2.0|
|  1|26471|   0|  2.0|            812.0|               4.0|   2.0|
|  2|52831|   0|  2.0|            567.0|               6.0|   2.0|
+---+-----+----+-----+-----------------+------------------+------+
only showing top 3 rows



2022-01-28 23:05:07,562 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , name, city, price, number of reviews, number of cuisines, rating
 Schema: _c0, name, city, price, number of reviews, number of cuisines, rating
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/jin/smote_data.csv


In [12]:
#Load increased size data
load_start = time.process_time_ns()
df = spark.read.csv('final_increase.csv', inferSchema=True, header=True)
load_end = time.process_time_ns() - load_start
df.show(3)

                                                                                

+---+-----+----+-----+-----------------+------------------+------+
|_c0| name|city|price|number of reviews|number of cuisines|rating|
+---+-----+----+-----+-----------------+------------------+------+
|  0|63434|   0|  1.0|            136.0|               3.0|   2.0|
|  1|26477|   0|  2.0|            812.0|               4.0|   2.0|
|  2|52837|   0|  2.0|            567.0|               6.0|   2.0|
+---+-----+----+-----+-----------------+------------------+------+
only showing top 3 rows



2022-01-16 21:44:02,517 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , name, city, price, number of reviews, number of cuisines, rating
 Schema: _c0, name, city, price, number of reviews, number of cuisines, rating
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/user/jin/final_increase.csv


In [5]:
print("Time elapsed to load larger dataset: %d ns" % (load_end)) 

Time elapsed to load larger dataset: 15625000 ns


In [6]:
#View details of the dataframe
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- name: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- number of reviews: double (nullable = true)
 |-- number of cuisines: double (nullable = true)
 |-- rating: double (nullable = true)



In [7]:
#Drop index and name columns
df = df.drop('_c0')
#df = df.drop('Unnamed: 0')
#df = df.drop('Unnamed: 0.1')
df = df.drop('name')

In [8]:
#Check frequency of target classes
df.groupby('rating').count().toPandas()

                                                                                

Unnamed: 0,rating,count
0,0.0,91993
1,1.0,91993
2,2.0,91993


In [15]:
#Statistical Information of the interval variables
df.select('number of reviews', 'number of cuisines').describe().toPandas()

                                                                                

Unnamed: 0,summary,number of reviews,number of cuisines
0,count,30275979.0,30275979.0
1,mean,458.7242735857035,3.103375919620452
2,stddev,611.6318743593515,1.6788293527402762
3,min,2.0,1.0
4,max,16478.0,21.0


In [16]:
#Check for missing value
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()



+----+-----+-----------------+------------------+------+
|city|price|number of reviews|number of cuisines|rating|
+----+-----+-----------------+------------------+------+
|   0|    0|                0|                 0|     0|
+----+-----+-----------------+------------------+------+



                                                                                

In [17]:
#Check for data skewness
df.select(skewness(df['number of reviews']), skewness(df['number of cuisines'])).show()



+---------------------------+----------------------------+
|skewness(number of reviews)|skewness(number of cuisines)|
+---------------------------+----------------------------+
|         12.136914635728585|           2.876650953501786|
+---------------------------+----------------------------+



                                                                                

In [9]:
#log transformation on skewed features
df = df.withColumn('log_reviews', log(df['number of reviews']))
df = df.withColumn('log_cuisines', log(df['number of cuisines']))

In [10]:
# Assemble and scale all the features with VectorAssembler
required_features = ['city',
                    'price',
                    'log_reviews',
                    'log_cuisines'
                   ]

assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(df)

scaler = MinMaxScaler(inputCol='features', outputCol='scaledFeatures')
scalerModel = scaler.fit(transformed_data.select('features'))
scaled_data = scalerModel.transform(transformed_data)
scaled_data.show(3)

                                                                                

+----+-----+-----------------+------------------+------+-----------------+------------------+--------------------+--------------------+
|city|price|number of reviews|number of cuisines|rating|      log_reviews|      log_cuisines|            features|      scaledFeatures|
+----+-----+-----------------+------------------+------+-----------------+------------------+--------------------+--------------------+
|   0|  1.0|            136.0|               3.0|   2.0|4.912654885736052|1.0986122886681096|[0.0,1.0,4.912654...|[0.0,0.5,0.467969...|
|   0|  2.0|            812.0|               4.0|   2.0|6.699500340161678|1.3862943611198906|[0.0,2.0,6.699500...|[0.0,1.0,0.666141...|
|   0|  2.0|            567.0|               6.0|   2.0|6.340359303727752| 1.791759469228055|[0.0,2.0,6.340359...|[0.0,1.0,0.626310...|
+----+-----+-----------------+------------------+------+-----------------+------------------+--------------------+--------------------+
only showing top 3 rows



In [11]:
#Split into training data and testing data
train_data, test_data = scaled_data.randomSplit([0.7, 0.3], seed=2020) 
print ("Training data rows:", train_data.count(), "; Testing data rows:", test_data.count())



Training data rows: 193213 ; Testing data rows: 82766


                                                                                

In [12]:
#Model Training and Testing
lr = LogisticRegression(featuresCol = 'scaledFeatures', labelCol = 'rating')
model_start = time.process_time_ns()
lrModel = lr.fit(train_data)
lr_predictions = lrModel.transform(test_data)
model_end = time.process_time_ns() - model_start
print("Time elapsed to model data %d ns " % (model_end)) 

2022-01-28 23:06:49,147 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
2022-01-28 23:06:49,168 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

Time elapsed to model data 78125000 ns 


In [13]:
#Model Evaluation
accuracy = MulticlassClassificationEvaluator(labelCol = 'rating', metricName = 'accuracy')
print('Logistic Regression Model Accuracy:', np.round_(accuracy.evaluate(lr_predictions), 2))



Logistic Regression Model Accuracy: 0.41


                                                                                

In [16]:
rf = RandomForestClassifier(labelCol='rating', featuresCol='scaledFeatures')
model_start = time.process_time_ns()
model = rf.fit(train_data)
rf_predictions = model.transform(test_data)
model_end = time.process_time_ns() - model_start
print("Time elapsed to model data %d ns" % (model_end)) 

                                                                                

Time elapsed to model data 171875000 ns


In [17]:
accuracy = MulticlassClassificationEvaluator(labelCol = 'rating', metricName = 'accuracy')
print('Random Forest Model Accuracy:', np.round_(accuracy.evaluate(rf_predictions), 2))



Random Forest Model Accuracy: 0.55


                                                                                