## Importing necessary libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col,isnan, when, count

In [2]:
spark = SparkSession.builder.getOrCreate()

## Reading the dataframe

In [3]:
df = spark.read.options(header = "true", inferSchema = "true", nullValue = "NA").csv("bookings.csv")

In [4]:
df.printSchema()

root
 |-- Booking_ID: string (nullable = true)
 |-- no_of_adults: integer (nullable = true)
 |-- no_of_children: integer (nullable = true)
 |-- no_of_weekend_nights: integer (nullable = true)
 |-- no_of_week_nights: integer (nullable = true)
 |-- type_of_meal_plan: string (nullable = true)
 |-- required_car_parking_space: integer (nullable = true)
 |-- room_type_reserved: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_date: integer (nullable = true)
 |-- market_segment_type: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- no_of_previous_cancellations: integer (nullable = true)
 |-- no_of_previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_price_per_room: double (nullable = true)
 |-- no_of_special_requests: integer (nullable = true)
 |-- booking_status: string (nullable = true)



## Dropping columns that are not required

In [5]:
df = df.drop('Booking_ID','arrival_year')

In [6]:
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()
#There are no null values in this dataset to drop

+------------+--------------+--------------------+-----------------+-----------------+--------------------------+------------------+---------+-------------+------------+-------------------+--------------+----------------------------+------------------------------------+------------------+----------------------+--------------+
|no_of_adults|no_of_children|no_of_weekend_nights|no_of_week_nights|type_of_meal_plan|required_car_parking_space|room_type_reserved|lead_time|arrival_month|arrival_date|market_segment_type|repeated_guest|no_of_previous_cancellations|no_of_previous_bookings_not_canceled|avg_price_per_room|no_of_special_requests|booking_status|
+------------+--------------+--------------------+-----------------+-----------------+--------------------------+------------------+---------+-------------+------------+-------------------+--------------+----------------------------+------------------------------------+------------------+----------------------+--------------+
|           0|  

In [7]:
#Sperating columns into numerical and categorical columns for 
num_cols = [colname for colname, dtype in df.dtypes if dtype in ('int', 'double')]
categorical_cols = [colname for colname, dtype in df.dtypes if dtype == 'string']
target_col = ['booking_status']

## Splitting the dataset

In [8]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=12345)

## Preprocessing the categorical columns

In [9]:
indexOutput = [x + '_idx' for x in categorical_cols]
stringIndexer = StringIndexer(inputCols=categorical_cols, outputCols=indexOutput)

oheOutput = [x + '_ohe' for x in categorical_cols]
ohe = OneHotEncoder(inputCols=indexOutput, outputCols=oheOutput)

targetIndexer = StringIndexer(inputCol='booking_status', outputCol='label')

## Building ML model - Decision tree classifier

In [10]:
#vector assembler
assemblerInput = oheOutput + num_cols
vecAssembler = VectorAssembler(inputCols=assemblerInput, outputCol='features')

In [11]:
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [12]:
#Creating pipeline
pipeline = Pipeline(stages=[stringIndexer, ohe, targetIndexer, vecAssembler, dtc])

In [13]:
binaryEval = BinaryClassificationEvaluator()

In [14]:
#Assigning parameter for hypterparameter tuning
paramGrid = (ParamGridBuilder()
             .addGrid(dtc.impurity, ['gini', 'entropy'])
             .addGrid(dtc.maxBins, [5, 10, 15])
             .addGrid(dtc.minInfoGain, [0.0, 0.2, 0.4])
             .addGrid(dtc.maxDepth, [3, 5, 7])
             .build()
            )

In [15]:
# cross validate with 3 folds
cv = CrossValidator(estimator=pipeline, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=binaryEval, 
                    numFolds=4, 
                    parallelism=4)
# fit the model
cvModel = cv.fit(train_df)
# get best model 
bestModel = cvModel.bestModel

In [16]:
bestModel

PipelineModel_77be8c6863c7

In [17]:
# getting best model parameters
bestImpurity = bestModel.stages[-1]._java_obj.getImpurity()
bestMaxBins = bestModel.stages[-1]._java_obj.getMaxBins()
bestMinInfoGain = bestModel.stages[-1]._java_obj.getMinInfoGain()
bestMaxDepth = bestModel.stages[-1]._java_obj.getMaxDepth()
# print best parameters
print(f'Best parameters:')
print(f'—— Impurity: \t{bestImpurity}')
print(f'—— MaxBins: \t{bestMaxBins}')
print(f'—— MinInfoGain: {bestMinInfoGain}')
print(f'—— MaxDepth: \t{bestMaxDepth}')

Best parameters:
—— Impurity: 	gini
—— MaxBins: 	5
—— MinInfoGain: 0.0
—— MaxDepth: 	3


In [18]:
# make predictions with CV model - TEST data
pred_cancellations_test = cvModel.transform(test_df)

# predictions - TRAIN data
pred_cancellations_train = cvModel.transform(train_df)

In [19]:
binaryEval.getMetricName()

'areaUnderROC'

## Model Evaluation

In [20]:
binaryEval.setRawPredictionCol("rawPrediction")
trainAUROC = binaryEval.evaluate(pred_cancellations_train)
testAUROC = binaryEval.evaluate(pred_cancellations_test)
print(f'Training auROC: {trainAUROC}\nTest auROC: {testAUROC}')

Training auROC: 1.0
Test auROC: 1.0


In [21]:
binaryEval.setMetricName('areaUnderPR')

BinaryClassificationEvaluator_68c854277bc4

In [22]:
trainAUPR = binaryEval.evaluate(pred_cancellations_train)
testAUPR = binaryEval.evaluate(pred_cancellations_test)
print(f'Training auPR: {trainAUPR}\nTest auPR: {testAUPR}')

Training auPR: 1.0
Test auPR: 1.0


References: https://github.com/enkeboll/data603-sp22/blob/main/homework/hw07-sparkml.ipynb

#Was searching for references for pyspark ml online, but never found any that I could understand, ended up using the HW07 answer as a reference