
***
# Classification - ML Section

***


## Importing Packages 

In [11]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 53 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 70.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=587ebf0d1d43b6dfb4ae3d14f3dc1a056c8bc316dcdb6f07cf9a204d748530f6
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [23]:
#import libary
import warnings
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

import pyspark
from pyspark.sql import types
from pyspark.sql.functions import col, isnan, when, count, explode, array, lit


from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

warnings.filterwarnings('ignore')

In [33]:
# Read the restaurant CSV into pandas; the frame is converted to a Spark DataFrame further down.
csv_path = 'cleaning_data 2.csv'
pandadf = pd.read_csv(csv_path)

## Connect to Spark

In [24]:
from pyspark.sql import SparkSession


In [28]:
# Create (or reuse) the Spark session, requesting 16g for both the executors and the driver.
session_builder = (
    SparkSession.builder
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
)
spark = session_builder.getOrCreate()

In [34]:
# Turn on Arrow for pandas <-> Spark conversions (used by the createDataFrame call below).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


In [38]:
# Convert the pandas frame into a Spark DataFrame; the schema is inferred from pandas.
df = spark.createDataFrame(data=pandadf)


## Loading Dataset 

In [39]:
# Display the DataFrame's repr: column names and types only, no rows.
df

DataFrame[restaurant_name: string, country: string, region: string, city: string, latitude: double, longitude: double, claimed: string, popularity_detailed: string, popularity_generic: string, cuisines: string, vegetarian_friendly: bigint, vegan_options: bigint, gluten_free: bigint, open_days_per_week: bigint, open_hours_per_week: double, working_shifts_per_week: bigint, avg_rating: double, total_reviews_count: bigint, default_language: string, reviews_count_in_default_language: bigint, excellent: double, very_good: double, average: double, poor: double, terrible: double, food: double, service: double, value: double, atmosphere: double, price_range: string]

In [42]:
# The row count requires a Spark job; the column count is read from the schema.
n_rows = df.count()
n_cols = len(df.columns)
print('Shape of the dataset: ', (n_rows, n_cols))

Shape of the dataset:  (749544, 30)


## Drop Null Values

In [43]:
# Keep only the rows that have no missing value in any column.
data = df.dropna(how='any')


***
# ML Classification Section 

***

## Feature Engineering and Feature Selection

In [46]:
# Print the column names and Spark types of the null-free DataFrame.
data.printSchema()

root
 |-- restaurant_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- claimed: string (nullable = true)
 |-- popularity_detailed: string (nullable = true)
 |-- popularity_generic: string (nullable = true)
 |-- cuisines: string (nullable = true)
 |-- vegetarian_friendly: long (nullable = true)
 |-- vegan_options: long (nullable = true)
 |-- gluten_free: long (nullable = true)
 |-- open_days_per_week: long (nullable = true)
 |-- open_hours_per_week: double (nullable = true)
 |-- working_shifts_per_week: long (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_reviews_count: long (nullable = true)
 |-- default_language: string (nullable = true)
 |-- reviews_count_in_default_language: long (nullable = true)
 |-- excellent: double (nullable = true)
 |-- very_good: double (nullable = true)
 |-- 

In [47]:
from pyspark.sql.types import FloatType


# Bucket the lowest ratings (1.0 to 2.5) into class 0.
# NOTE(review): this rebinds `spark`, which held the SparkSession until now, to a
# DataFrame; the session is no longer reachable under that name after this cell.
low_ratings = [1.0, 1.5, 2.0, 2.5]
spark = data.replace(low_ratings, 0, subset=["avg_rating"])




In [48]:
# Bucket the middle ratings (3, 3.5 and 4.0) into class 1.
mid_ratings = [3, 3.5, 4.0]
spark = spark.replace(mid_ratings, 1, subset=["avg_rating"])


In [49]:
# Bucket the highest ratings (4.5 and 5.0) into class 2.
high_ratings = [4.5, 5.0]
spark = spark.replace(high_ratings, 2, subset=["avg_rating"])


In [50]:
from pyspark.sql.types import IntegerType #to change type to integer


In [51]:
# Store vegetarian_friendly as a 32-bit integer instead of a long.
spark = spark.withColumn(
    "vegetarian_friendly",
    col("vegetarian_friendly").cast(IntegerType()),
)


In [52]:
# The bucketed rating is still a double column; cast it to integer class ids.
spark = spark.withColumn(
    "avg_rating",
    col("avg_rating").cast(IntegerType()),
)


In [53]:
# Store vegan_options as a 32-bit integer instead of a long.
spark = spark.withColumn(
    "vegan_options",
    col("vegan_options").cast(IntegerType()),
)


In [54]:
# Store reviews_count_in_default_language as a 32-bit integer instead of a long.
spark = spark.withColumn(
    "reviews_count_in_default_language",
    col("reviews_count_in_default_language").cast(IntegerType()),
)


In [55]:
# Sanity check: number of rows in each rating class after bucketing.
class_counts = spark.groupBy("avg_rating").count()
class_counts.show()


+----------+------+
|avg_rating| count|
+----------+------+
|         1|408493|
|         2|313881|
|         0| 27150|
+----------+------+



Convert the imbalanced data to balanced data by oversampling the minority class.

In [56]:
# Separate the rows by rating class. Class 0 (minor_df) is the under-represented class;
# class 2 (major_df) is the reference size and class 1 (major2_df) is left as it is.
major_df = spark.where(col("avg_rating") == 2)
minor_df = spark.where(col("avg_rating") == 0)
major2_df = spark.where(col("avg_rating") == 1)

# How many times larger class 2 is than class 0, truncated to a whole number.
n_major = major_df.count()
n_minor = minor_df.count()
ratio = int(n_major / n_minor)
print("ratio: {}".format(ratio))

ratio: 11


In [57]:
# One literal per requested copy: 0, 1, ..., ratio - 1.
a = range(ratio)
copy_ids = array([lit(i) for i in a])

# Exploding the array emits every class-0 row once per literal, i.e. `ratio` copies of
# each row; the helper column is dropped again straight away.
# NOTE(review): these exact duplicates are created before the train/test split, so copies
# of the same row can end up in both sets - confirm this is intended.
oversampled_df = minor_df.withColumn("dummy", explode(copy_ids)).drop("dummy")

In [58]:
# Row count of the oversampled class-0 subset (runs a Spark job).
oversampled_df.count()

298650

In [59]:
# Stack the three class subsets back into a single DataFrame. `union` is used instead of
# the deprecated `unionAll`; rows are appended by column position and duplicates are kept.
combined = major_df.union(oversampled_df)

combined_df = combined.union(major2_df)


In [60]:
# Print the schema of the recombined DataFrame to confirm the integer casts are in place.
combined_df.printSchema()

root
 |-- restaurant_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- claimed: string (nullable = true)
 |-- popularity_detailed: string (nullable = true)
 |-- popularity_generic: string (nullable = true)
 |-- cuisines: string (nullable = true)
 |-- vegetarian_friendly: integer (nullable = true)
 |-- vegan_options: integer (nullable = true)
 |-- gluten_free: long (nullable = true)
 |-- open_days_per_week: long (nullable = true)
 |-- open_hours_per_week: double (nullable = true)
 |-- working_shifts_per_week: long (nullable = true)
 |-- avg_rating: integer (nullable = true)
 |-- total_reviews_count: long (nullable = true)
 |-- default_language: string (nullable = true)
 |-- reviews_count_in_default_language: integer (nullable = true)
 |-- excellent: double (nullable = true)
 |-- very_good: double (nullable = t

In [61]:
# Sanity check: number of rows in each rating class after oversampling class 0.
balanced_counts = combined_df.groupBy("avg_rating").count()
balanced_counts.show()


+----------+------+
|avg_rating| count|
+----------+------+
|         2|313881|
|         0|298650|
|         1|408493|
+----------+------+



In [62]:
# Label-encode the categorical columns: every column `c` gets a numeric twin named
# `c + '_idx'` (StringIndexer plays the role of scikit-learn's LabelEncoder).
cat_cols = ['price_range','cuisines','claimed']
idx_cols = [c + '_idx' for c in cat_cols]

# A single multi-column indexer fits all three columns in one pass. The result is stored
# under a new name so that `combined_df` is left untouched and this cell can be re-run:
# indexing a frame that already contains the *_idx columns raises an error.
indexer = StringIndexer(inputCols=cat_cols, outputCols=idx_cols)
indexed_df = indexer.fit(combined_df).transform(combined_df)

# The original string columns are no longer needed once their indexed versions exist.
final_df2 = indexed_df.drop(*cat_cols)

## Split Data

In [63]:
# Columns that are not used as model inputs.
unused_cols = [
    'service', 'value', 'average', 'reviews_count_in_default_language', 'price_range',
    'atmosphere', 'default_language', 'working_shifts_per_week', 'open_hours_per_week',
    'open_days_per_week', 'popularity_generic', 'popularity_detailed', 'longitude',
    'latitude', 'region', 'city', 'country', 'restaurant_name',
]
final_df = final_df2.drop(*unused_cols)

# Every remaining column except avg_rating (the label) is a model input.
cols = list(final_df.columns)
cols.remove('avg_rating')

# VectorAssembler packs the input columns into one vector column called 'features';
# transform() appends that column to the DataFrame.
assembler = VectorAssembler(inputCols=cols, outputCol='features')
final_df = assembler.transform(final_df)

In [64]:
# Display the DataFrame repr; the assembled `features` vector column is now part of the schema.
final_df

DataFrame[vegetarian_friendly: int, vegan_options: int, gluten_free: bigint, avg_rating: int, total_reviews_count: bigint, excellent: double, very_good: double, poor: double, terrible: double, food: double, price_range_idx: double, cuisines_idx: double, claimed_idx: double, features: vector]


Let's split the data:

80% goes to the training set and 20% to the testing set.

In [66]:
# Keep only what the models need: the assembled feature vector and the target, with
# avg_rating renamed to `label`.
df_data = final_df.select(col('features'), col('avg_rating').alias('label'))

# 80/20 train/test split. The seed is fixed so that the split, and therefore every metric
# reported below, is the same on each run of the notebook.
df_train, df_test = df_data.randomSplit([0.8, 0.2], seed=42)

## Train Models



In [67]:
# Import some classifiers and multiclass evaluator
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### 1st Classification Model 

In [68]:
# Decision tree classifier trained on the training split.
# NOTE(review): maxBins is set very high - presumably to accommodate the high-cardinality
# indexed categorical features; confirm.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxBins=800000,
)
model_dt = dt.fit(df_train)

### 2nd Classification Model  

In [78]:
# Random forest classifier: 8 trees, each at most 5 levels deep, trained on the training split.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=8,
    maxDepth=5,
    maxBins=800000,
)
model_rf = rf.fit(df_train)

### 3rd Classification Model  

In [76]:
# Elastic-net regularised logistic regression, limited to 10 iterations.
# NOTE(review): the recorded evaluation of this model predicts class 1 for almost every
# row; the regularisation settings may be too strong - confirm.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
model_lr = lr.fit(df_train)

### 4th Classification Model

In [77]:
# Gaussian naive Bayes classifier trained on the training split.
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(
    featuresCol='features',
    labelCol='label',
    modelType="gaussian",
    smoothing=1.0,
)
model_niv = nb.fit(df_train)

## Model Evaluation



### 1st Model  Evaluation

In [72]:
# Spark models predict through transform(): it returns the test rows with the prediction added.
pred_dt = model_dt.transform(dataset=df_test)


## Report and Confusion Matrix


In [73]:
import sklearn 
from pyspark.ml.classification import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix

# Bring label and prediction to the driver in ONE action. Collecting the two columns
# separately evaluates the whole plan twice and depends on both runs returning the rows
# in the same order; selecting them together keeps every pair on the same row.
dt_eval = pred_dt.select('label', 'prediction').toPandas()
y_true = dt_eval['label'].tolist()
y_pred = dt_eval['prediction'].tolist()

# Per-class precision / recall / F1 for the decision tree.
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.80      0.90      0.85     59778
           1       0.81      0.65      0.72     81848
           2       0.76      0.86      0.81     62913

    accuracy                           0.79    204539
   macro avg       0.79      0.80      0.79    204539
weighted avg       0.79      0.79      0.78    204539



In [74]:
# Confusion matrix for the decision tree: rows are true classes, columns are predicted classes.
from sklearn import metrics

matrix = metrics.confusion_matrix(y_true=y_true, y_pred=y_pred)
print(matrix)

[[53864  5000   914]
 [12334 53164 16350]
 [ 1204  7394 54315]]


### 2nd Model  Evaluation

In [79]:
# Score the test split with the random forest; transform() appends the prediction columns.
pred_rf = model_rf.transform(dataset=df_test)


## Report and Confusion Matrix


In [80]:

from sklearn.metrics import classification_report, confusion_matrix

# Fetch label and prediction together in a single action so that each pair comes from the
# same row and the plan is evaluated only once (two separate collects give neither guarantee).
rf_eval = pred_rf.select('label', 'prediction').toPandas()
y_true3 = rf_eval['label'].tolist()
y_pred3 = rf_eval['prediction'].tolist()

# Per-class precision / recall / F1 for the random forest.
print(classification_report(y_true3, y_pred3))

              precision    recall  f1-score   support

           0       0.86      0.87      0.86     59778
           1       0.79      0.71      0.75     81848
           2       0.77      0.86      0.81     62913

    accuracy                           0.80    204539
   macro avg       0.80      0.81      0.81    204539
weighted avg       0.80      0.80      0.80    204539



In [81]:
# Confusion matrix for the random forest: rows are true classes, columns are predicted classes.
from sklearn import metrics

matrix = metrics.confusion_matrix(y_true=y_true3, y_pred=y_pred3)
print(matrix)

[[51831  7188   759]
 [ 8587 58076 15185]
 [  198  8586 54129]]


### 3rd Model  Evaluation

In [82]:
# Score the test split with logistic regression; transform() appends the prediction columns.
pred_lr = model_lr.transform(dataset=df_test)


## Report and Confusion Matrix


In [83]:
from sklearn.metrics import classification_report, confusion_matrix

# Fetch label and prediction together in a single action so that each pair comes from the
# same row and the plan is evaluated only once (two separate collects give neither guarantee).
lr_eval = pred_lr.select('label', 'prediction').toPandas()
y_true4 = lr_eval['label'].tolist()
y_pred4 = lr_eval['prediction'].tolist()

# Per-class precision / recall / F1 for logistic regression.
print(classification_report(y_true4, y_pred4))

              precision    recall  f1-score   support

           0       0.98      0.00      0.01     59778
           1       0.40      1.00      0.57     81848
           2       0.00      0.00      0.00     62913

    accuracy                           0.40    204539
   macro avg       0.46      0.33      0.19    204539
weighted avg       0.45      0.40      0.23    204539



In [84]:
# Confusion matrix for logistic regression: rows are true classes, columns are predicted classes.
from sklearn import metrics

matrix = metrics.confusion_matrix(y_true=y_true4, y_pred=y_pred4)
print(matrix)

[[  162 59616     0]
 [    3 81845     0]
 [    0 62913     0]]


### 4th Model Evaluation (Naive Bayes)

In [85]:
# Score the test split with naive Bayes; transform() appends the prediction columns.
pred_nb = model_niv.transform(dataset=df_test)

## Report and Confusion Matrix


In [86]:
from sklearn.metrics import classification_report, confusion_matrix

# Fetch label and prediction together in a single action so that each pair comes from the
# same row and the plan is evaluated only once (two separate collects give neither guarantee).
nb_eval = pred_nb.select('label', 'prediction').toPandas()
y_true6 = nb_eval['label'].tolist()
y_pred6 = nb_eval['prediction'].tolist()

# Per-class precision / recall / F1 for naive Bayes.
print(classification_report(y_true6, y_pred6))

              precision    recall  f1-score   support

           0       0.40      0.96      0.56     59778
           1       0.82      0.15      0.26     81848
           2       0.64      0.45      0.53     62913

    accuracy                           0.48    204539
   macro avg       0.62      0.52      0.45    204539
weighted avg       0.64      0.48      0.43    204539



In [87]:
# Confusion matrix for naive Bayes: rows are true classes, columns are predicted classes.
from sklearn import metrics

matrix = metrics.confusion_matrix(y_true=y_true6, y_pred=y_pred6)
print(matrix)

[[57468  1547   763]
 [54253 12473 15122]
 [33656  1179 28078]]


## Show ML Evaluation as Dataframe


In [88]:
# One evaluator per metric; they all read the same label/prediction columns and differ
# only in metricName.
shared_cols = dict(labelCol="label", predictionCol="prediction")
evaluator_A = MulticlassClassificationEvaluator(metricName="accuracy", **shared_cols)
evaluator_F = MulticlassClassificationEvaluator(metricName="f1", **shared_cols)
evaluator_P = MulticlassClassificationEvaluator(metricName="weightedPrecision", **shared_cols)
evaluator_R = MulticlassClassificationEvaluator(metricName="weightedRecall", **shared_cols)

# Despite its name, `models` holds the test-set prediction DataFrames of the four fitted
# models, in the order: decision tree, random forest, logistic regression, naive Bayes.
models = [pred_dt, pred_rf, pred_lr, pred_nb]

# One score per prediction DataFrame for each metric, in the same order as `models`.
accuracy = [evaluator_A.evaluate(predictions) for predictions in models]
F1 = [evaluator_F.evaluate(predictions) for predictions in models]
precision = [evaluator_P.evaluate(predictions) for predictions in models]
recall = [evaluator_R.evaluate(predictions) for predictions in models]

In [89]:
# Gather the four score lists into one table: one row per model, one column per metric.
model_names = ['Decision Tree', 'Random Forest', 'Logistic Regression','nive']
df_ev = pd.DataFrame(
    {
        'Accuracy': accuracy,
        'F1-Score': F1,
        'Weighted Precision': precision,
        'Weighted Recall': recall,
    },
    index=model_names,
)

In [90]:
# Display the metric comparison table (one row per model).
df_ev

Unnamed: 0,Accuracy,F1-Score,Weighted Precision,Weighted Recall
Decision Tree,0.788813,0.78464,0.791462,0.788813
Random Forest,0.801979,0.800552,0.802186,0.801979
Logistic Regression,0.400936,0.23043,0.447194,0.400936
nive,0.479219,0.42821,0.640365,0.479219


The best model is Random Forest with an accuracy of 0.80, and the worst model is Logistic Regression with an accuracy of 0.40.


### select model

Random Forest is the best model, with an accuracy of 0.80.


# ML Pipeline for Best Model

In [91]:
from pyspark.ml import Pipeline


In [92]:
# Single-stage pipeline wrapping the random forest estimator defined earlier.
# NOTE(review): the indexing and vector-assembly steps are not stages here, so the
# pipeline only accepts data that already has the `features` column.
stages = [rf]
pipeline = Pipeline(stages=stages)


In [93]:

# Fit the pipeline on the training split; the result is a fitted PipelineModel.
model = pipeline.fit(dataset=df_train)

In [94]:
# Accuracy of the fitted pipeline on the held-out test split.
pipeline_predictions = model.transform(df_test)
evaluator_A.evaluate(pipeline_predictions)

0.8019790846733386