In [3]:
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import VectorAssembler
from wordcloud import WordCloud
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import ntile
from pyspark.sql.window import Window

In [4]:
# map function to convert the data into tuples

class NB: 
    def __init__(self):
        self.frequencyTable = None

    def mapFunction(self, row):

        # Extract the features and target from the row 
        features = row[:-1]
        target = row[-1]

        # Create a list of dictionary to save the feature index, value, target, and add 1 to each value to use  it in reduce step
        map = []
        print (features , target , "\n")

        # Loop through each feature index and value
        for i, x in enumerate(features):
            # Append a tuple of feature index, value, target, and count to the list
            map.append(((i, x, target), 1))

        # Return the list of dictionary
        return map

    def reduceFunction(self, x, y):
        # Return the sum of x and y
        return x + y
    



    def predict(self, row, frequencyTablebroadCast, test=False):
        freqTable = frequencyTablebroadCast.value
        # Extract the features and target from the line
        features = row[:-1]
        if test:
            target = int(row[-1])

        # Posterior probabilities for the target classes
        posteriors = []

        # Loop through each class label that is 0 or 1 in our dataset
        for label in [0, 1]:
            # Initialize the posterior probability as the prior probability of the class label with Laplace smoothing
            posterior = (freqTable[('total', label)] + 1) / (freqTable[('total', 'train')] + 2)

            # Loop through each feature index and value
            for i, feat in enumerate(features):
                feat = float(feat)
                try:
                    # Update the posterior probability with the likelihood of the feature value given the class label with Laplace smoothing
                    posterior *= (freqTable[(i, feat, label)] + 1) / (freqTable[('total', label)] + len(freqTable)) # likelihood * prior
                except:
                    # If the feature value given the class label is not found in the frequency table, set the likelihood to 0
                    posterior *= (0 + 1) / (freqTable[('total', label)] + len(freqTable)) # likelihood * prior 

            # Save the posterior probability for the class label to the list
            posteriors.append(posterior)

        # Get the predicted class label by finding the class label with the largest posterior probability
        prediction = np.argmax(posteriors)

        if test:
            # finally return the tuple of target and prediction
            return (float(target), float(prediction))
        else:
            # finally return the prediction
            return (float(prediction))
    
    

In [5]:
model = NB()

In [6]:


df = pd.read_csv('../data/zomato.csv',low_memory=False)
df.rename({'approx_cost(for two people)': 'approx_cost_2_people',
           'listed_in(type)':'listed_in_type',
           'listed_in(city)':'listed_in_city'
          }, axis=1, inplace=True)

df = df.loc[df.rate !='NEW']
df = df.loc[df.rate !='-'].reset_index(drop=True)

new_format = lambda x: x.replace('/5', '') if type(x) == np.str else x
df.rate = df.rate.apply(new_format).str.strip().astype('float')
def label_encode(df):
    for col in df.columns[~df.columns.isin(['rate', 'approx_cost_2_people', 'votes'])]:
        df[col] = df[col].factorize()[0]
    return df

df_encoded = label_encode(df.copy())

df_encoded = df_encoded.dropna()


def set_rating_flag(row):
    if row['rate'] > 3.9:
        return 1
    else:
        return 0

df_encoded_out = df_encoded.copy()
# Add a new column based on the function
df_encoded_out['output'] = df_encoded.apply(lambda row: set_rating_flag(row), axis=1)

spark = SparkSession.builder \
    .appName("NB") \
    .config("spark.driver.memory", "15g") \
    .config("spark.executor.memory", "15g") \
    .getOrCreate()
sc=spark.sparkContext

spark_df = spark.createDataFrame(df_encoded_out)

dff_train , dff_test=spark_df.randomSplit([0.7, 0.3], seed=12345)

# Calculate the quartiles for the "votes" column
votes_window = Window.orderBy("votes")
dff_train = dff_train.withColumn("votes_quartile", ntile(3).over(votes_window))
dff_test = dff_test.withColumn("votes_quartile", ntile(3).over(votes_window))

# Calculate the quartiles for the "approx_cost_2_people" column
cost_window = Window.orderBy("approx_cost_2_people")
dff_train = dff_train.withColumn("cost_quartile", ntile(3).over(cost_window))
dff_test = dff_test.withColumn("cost_quartile", ntile(3).over(cost_window))

# Show the resulting DataFrame with the quartile columns
dff_train.drop("votes", "approx_cost_2_people")
dff_test.drop("votes", "approx_cost_2_people")

# drop votes and approx_cost_2_people columns from dff_train and dff_test
dff_train = dff_train.drop('votes', 'approx_cost_2_people')
dff_test = dff_test.drop('votes', 'approx_cost_2_people')


dff_train = dff_train.rdd
dff_test = dff_test.rdd
# Apply the map function to the data RDD
mappedRdd = dff_train.flatMap(model.mapFunction)

# Sum the counts for each tuple
reducedRdd = mappedRdd.reduceByKey(model.reduceFunction) 

# Collect the frequency table as a dictionary
model.frequencyTable = reducedRdd.collectAsMap()
# Naive Bayes with laplace smothing

# Add a ('total , 'train) key-value pair to save the total n_records to the frequency_table dictionary
model.frequencyTable[('total', 'train')] = dff_train.count()

# Add another  ('total , 0) and ('total , 1) key-value pair to save the total n_records with class label 0 and 1 to the frequency_table dictionary
model.frequencyTable[('total', 0)] = dff_train.filter(lambda row: row.output == 0).count() # this is the number of records with class output 0
model.frequencyTable[('total', 1)] = dff_train.filter(lambda row: row.output == 1).count() # this is the number of records with class output 1

# Broadcast the frequency_table dictionary to the workers as a read-only variable (clusters of data)
bcast = spark.sparkContext.broadcast(model.frequencyTable) 

# By using test data, predict the class label for each record in the test RDD
predictions = dff_test.map(lambda row: model.predict(row, bcast , True))

# MulticlassClassificationEvaluator expects the true and predicted labels in a DataFrame to 
evaluator = MulticlassClassificationEvaluator(labelCol="trueLabel", predictionCol="predictedLabel", metricName="accuracy")

# Define the schema of the predictions DataFrame
schema = StructType([
  StructField("trueLabel", DoubleType(), True),
  StructField("predictedLabel", DoubleType(), True)
])
# Create a DataFrame from the predictions RDD
predictions = predictions.toDF(schema=schema)
accuracy = evaluator.evaluate(predictions)
print(f"Model with map-reduce accuracy = {accuracy}")

23/05/16 01:25:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/16 01:25:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/16 01:25:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/16 01:25:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/16 01:25:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/16 01:25:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/16 0

Model with map-reduce accuracy = 0.3333333333333333
