In [107]:
import numpy as np
import pandas as pd
import os
from pyspark.sql import SparkSession

In [108]:
# spark_home = "D:\Recovered Folders\CUFE\Spring 2024\Big Data\Spark"
spark_home = "D:/Programs/Spark/spark_unzipped"
os.environ["SPARK_HOME"] = spark_home

# Add Spark bin and executors to PATH
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "bin")
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "sbin")

# Add Spark Python libraries to PYTHONPATH
os.environ["PYTHONPATH"] = os.path.join(spark_home, "python") + os.pathsep + os.environ.get("PYTHONPATH", "")
os.environ["PYTHONPATH"] += os.pathsep + os.path.join(spark_home, "python", "lib")

# Add PySpark to the system path
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "python", "lib", "pyspark.zip")
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "python", "lib", "py4j-0.10.9-src.zip")
os.environ['PYSPARK_PYTHON'] = 'python'

In [109]:
spark = SparkSession.builder.appName("HotelBooking").getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [110]:
# Read the data from the CSV file
df = spark.read.option("header", True).option("inferSchema", True).csv('hotel_bookings_train.csv')
df_test = spark.read.option("header", True).option("inferSchema", True).csv('hotel_bookings_test.csv')

In [111]:
y_train = df.select("is_canceled")
y_train

DataFrame[is_canceled: double]

In [112]:
x_data = df.drop('is_canceled')
x_data

DataFrame[deposit_type: double, lead_time: double, country: double, total_of_special_requests: double, booking_changes: double, assigned_room_type: double, distribution_channel: double, hotel: double, adults: double, customer_type: double, reserved_room_type: double, market_segment: double, adr: double, stays_in_week_nights: double]

## Naive Bayes Classifier

In [113]:
label_column = "is_canceled"
features_columns = df.columns
print(features_columns)
print(len(features_columns))

['deposit_type', 'lead_time', 'country', 'total_of_special_requests', 'booking_changes', 'assigned_room_type', 'distribution_channel', 'hotel', 'adults', 'customer_type', 'reserved_room_type', 'market_segment', 'adr', 'stays_in_week_nights', 'is_canceled']
15


#### Prior Probabilities

In [114]:
# Output: [(c0, 1), (c1, 1), (c0, 1), (c0, 1), (c1, 1)]
def mapper_prior(rows):
    result = []
    for row in rows:
        result.append((row[label_column], 1))
    return result

# Output: [(c0, 3), (c1, 2)]
def reducer_prior(row):
    key, values = row
    return [(key, sum(values))]

In [115]:
# Select the first 100 rows
# df_selected = df.take(100)
# df_selected = sc.parallelize(df_selected)

In [116]:
prior = df.rdd.mapPartitions(mapper_prior).groupByKey().flatMap(reducer_prior)
# prior = df_selected.mapPartitions(mapper_prior).groupByKey().flatMap(reducer_prior)

In [117]:
# Calculate the total count per class
total_count = prior.map(lambda x: x[1]).sum()

In [118]:
# Calculate prior probabilities
prior_probabilities = prior.map(lambda x: (x[0], x[1]/total_count)).collectAsMap()

print("Prior probability:", prior_probabilities)

Prior probability: {0.0: 0.629236195188714, 1.0: 0.370763804811286}


#### Likelihood Probabilities

In [119]:
# dict = { 
#     c0: { 
#             f1: { v1: 0, v2: 0}, 
#             f2: { v1: 0, v2: 0}
#         },
#     c1: { 
#             f1: { v1: 0, v2: 0}, 
#             f2: { v1: 0, v2: 0}
#         }
# }

def mapper_likelihood(split):
    mapper_result = []
    mapper_dict = {}

    for row in split:
        label = row[label_column]
        if label not in mapper_dict:
            mapper_dict[label] = {}

        for feature in features_columns:
            value = row[feature]
            if feature not in mapper_dict[label]:
                mapper_dict[label][feature] = {}

            if value not in mapper_dict[label][feature]:
                mapper_dict[label][feature][value] = 0

            mapper_dict[label][feature][value] += 1

    for label in mapper_dict:
        for feature in mapper_dict[label]:
            for value in mapper_dict[label][feature]:
                mapper_result.append((label, (feature, value, mapper_dict[label][feature][value])))
    return mapper_result

# Output:
# [<c0, (f1, v1, 5)>, <c0, (f1, v2, 10)>]
# [<c1, (f1, v1, 2)>, <c1, (f1, v2, 3)>]
def reducer1(row):
    key, tups = row
    reducer1_dict = {}
    reducer1_result = []

    for tup in tups:
        feature, value, count = tup
        if feature not in reducer1_dict:
            reducer1_dict[feature] = {}

        if value not in reducer1_dict[feature]:
            reducer1_dict[feature][value] = 0

        reducer1_dict[feature][value] += count

    for feature in reducer1_dict:
        for value in reducer1_dict[feature]:
            reducer1_result.append((key, (feature, value, reducer1_dict[feature][value])))

    return reducer1_result

# Output:
# [<c0, (f1, v1, 10, 15)>, <c0, (f1, v2, 5, 15)>]
# [<c1, (f1, v1, 2, 5)>, <c1, (f1, v2, 3, 5)>]
def reducer2(row):
    label, tups = row
    reducer2_dict = {}
    reducer2_result = []

    for tup in tups:
        feature, value, count = tup
        if feature not in reducer2_dict:
            reducer2_dict[feature] = 0

        reducer2_dict[feature] += count

    for tup in tups:
        feature, value, count = tup
        reducer2_result.append((label, (feature, value, count, reducer2_dict[feature])))

    return reducer2_result

# Output: 
# [<c0, (f1, v1, 10/15)>, <c0, (f1, v2, 5/15)>]
# [<c1, (f1, v1, 2/5)>, <c1, (f1, v2, 3/5)>]
def reducer3(row):
    label, tups = row
    reducer3_result = []

    for tup in tups:
        feature, value, count, total = tup
        reducer3_result.append((label, (feature, value, count / total)))

    return reducer3_result

In [120]:
# likelihood = df_selected.mapPartitions(mapper_likelihood).groupByKey().flatMap(reducer1).groupByKey().flatMap(reducer2).groupByKey().flatMap(reducer3).collect()
likelihood = df.rdd.mapPartitions(mapper_likelihood).groupByKey().flatMap(reducer1).groupByKey().flatMap(reducer2).groupByKey().flatMap(reducer3).collect()
print( "Posterior probability:", likelihood)

Posterior probability: [(0.0, ('deposit_type', 0.0, 0.9970720117788783)), (0.0, ('deposit_type', 1.0, 0.001288314817293535)), (0.0, ('deposit_type', 2.0, 0.0016396734038281356)), (0.0, ('lead_time', 8.0, 0.0832050595636461)), (0.0, ('lead_time', 0.0, 0.14256792932673001)), (0.0, ('lead_time', 6.0, 0.08999799223664838)), (0.0, ('lead_time', 2.0, 0.11322112167045911)), (0.0, ('lead_time', 1.0, 0.13535671262213894)), (0.0, ('lead_time', 4.0, 0.09966871904698166)), (0.0, ('lead_time', 3.0, 0.10264690135189398)), (0.0, ('lead_time', 5.0, 0.0935785035470486)), (0.0, ('lead_time', 7.0, 0.0863338241199304)), (0.0, ('lead_time', 9.0, 0.05342323651452282)), (0.0, ('country', 59.0, 0.13025364743675547)), (0.0, ('country', 81.0, 0.032509034935082316)), (0.0, ('country', 169.0, 0.02131575424976576)), (0.0, ('country', 29.0, 0.017082719850087004)), (0.0, ('country', 135.0, 0.28165573551064116)), (0.0, ('country', 25.0, 0.019123945924240396)), (0.0, ('country', 56.0, 0.1135055548119395)), (0.0, ('cou

In [121]:
# Define the predict function for naive bayes
def predict(row):
    
    dict = {} # Label and probability
    for label in prior_probabilities:
        prob = prior_probabilities[label]
        for feature in features_columns:
            value = row[feature]
            for tup in likelihood:
                if tup[0] == label and tup[1][0] == feature and tup[1][1] == value:
                    prob *= tup[1][2]
                    break
        dict[label] = prob
    return max(dict, key=dict.get)
        


In [122]:
# Training Accuracy
def mapper (rows) :
    result = []
    for row in rows:
        if predict(row) == row[label_column]:
            result.append((0,1))
        else:
            result.append((0,0))
    return result

def reducer (row) :
    count = 0
    for val in row[1]:
        count += val
    return [(0, count)]

correct = df.rdd.mapPartitions(mapper).groupByKey().flatMap(reducer).collect()
total = df.count()
accuracy = correct[0][1] / total
print("Training Accuracy: ", accuracy)



Training Accuracy:  0.7757330104753382


In [123]:
correct = df_test.rdd.mapPartitions(mapper).groupByKey().flatMap(reducer).collect()
total = df_test.count()
accuracy = correct[0][1] / total
print("Testing Accuracy: ", accuracy)

Testing Accuracy:  0.7701604413189035
