## Load the csv file

In [None]:
# Finds the spark path 
import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Build (or reuse) a local Spark session named "hotels".
spark = (
    SparkSession.builder
    .master("local")
    .appName("hotels")
    .getOrCreate()
)

# Load the hotels CSV, taking the column names from its first row.
# No schema / inferSchema is passed here, so Spark's default column typing applies.
hotels_df = spark.read.csv("../input/Hotels_data_Changed.csv", header=True)

### Get the highest discount code for features

In [None]:
from pyspark.sql import Row

def rowToKeyValue(row):
    """Turn one hotels row into a ``(key, value)`` pair.

    The key is the tuple (WeekDay, Snapshot Date, Checkin Date, DayDiff,
    Hotel Name).  The value is ``([Discount Code], DiscountPerc)``; the code
    is wrapped in a one-element list so that code lists can be concatenated.
    """
    key_fields = ('WeekDay', "Snapshot Date", "Checkin Date", "DayDiff", "Hotel Name")
    key = tuple(row[field] for field in key_fields)
    codes = [row["Discount Code"]]
    return (key, (codes, row['DiscountPerc']))

def reduceToMaxDiscountPerKey(val1, val2):
    """Combine two ``(codes, discount)`` values, keeping the larger discount.

    Args:
        val1: ``(list_of_codes, discount)`` pair.
        val2: ``(list_of_codes, discount)`` pair.

    Returns:
        The pair with the numerically larger discount.  When both discounts
        are equal, a new pair whose code list is ``codes1 + codes2`` and whose
        discount is the first pair's discount.

    Raises:
        ValueError, TypeError: if a discount cannot be converted to ``float``.
    """
    codes1, discount1 = val1
    codes2, discount2 = val2
    # The CSV is loaded without a schema, so discounts may arrive as strings.
    # Comparing strings is lexicographic ("9" > "10"), which would select the
    # wrong maximum -- compare the numeric values instead.
    amount1 = float(discount1)
    amount2 = float(discount2)
    if amount1 > amount2:
        return val1
    if amount2 > amount1:
        return val2
    # Equal discounts: keep the discount codes of both values in one list.
    return (codes1 + codes2, discount1)

def flatMapDiscountCodes(row):
    """Expand a ``(key, (codes, ...))`` pair into one ``(key, code)`` pair per code."""
    key, value = row
    pairs = []
    # The first element of the value is the list of discount codes.
    for code in value[0]:
        pairs.append((key, code))
    return pairs

def rddToRow(rddRow):
    """Convert a ``(key, code)`` pair into a ``Row`` with named fields.

    The first five key elements become WeekDay, SnapshotDate, CheckinDate,
    DayDiff and HotelName; the second element of the pair becomes DiscountCode.
    """
    key = rddRow[0]
    discount_code = rddRow[1]
    return Row(
        WeekDay=key[0],
        SnapshotDate=key[1],
        CheckinDate=key[2],
        DayDiff=key[3],
        HotelName=key[4],
        DiscountCode=discount_code,
    )

# Key every row, reduce each key to its best-discount code(s), emit one
# (key, code) pair per code, and turn the pairs back into a DataFrame.
hotelsBestDiscountCode_df = (
    hotels_df.rdd
    .map(rowToKeyValue)
    .reduceByKey(reduceToMaxDiscountPerKey)
    .flatMap(flatMapDiscountCodes)
    .map(rddToRow)
    .toDF()
)
hotelsBestDiscountCode_df.show()

## Normalize the data

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
import pandas as pd

# Map the string columns WeekDay and HotelName to numeric index columns
weekDayIndexer = StringIndexer(inputCol="WeekDay", outputCol="WeekDayIndex")
hotelNameIndexer = StringIndexer(inputCol="HotelName", outputCol="HotelNameIndex")
indexers = [weekDayIndexer, hotelNameIndexer]
pipeline = Pipeline(stages=indexers)
# Fit the indexers on the data, then apply them to the same data
indexerModel = pipeline.fit(hotelsBestDiscountCode_df)
hotelsWithIndexedStrings_df = indexerModel.transform(hotelsBestDiscountCode_df)

# Extract date values
def _datePartUdf(part):
    """Build an integer UDF that parses its input with pandas and returns attribute ``part``."""
    return udf(lambda value: getattr(pd.to_datetime(value), part), IntegerType())


dateYearValue = _datePartUdf('year')
dateDayValue = _datePartUdf('day')
dateMonthValue = _datePartUdf('month')

# Add year / month / day columns for both the snapshot and the check-in date
hotelsWithDateIndexed_df = (
    hotelsWithIndexedStrings_df
    .withColumn('SnapshotDateYear', dateYearValue(col('SnapshotDate')))
    .withColumn('SnapshotDateMonth', dateMonthValue(col('SnapshotDate')))
    .withColumn('SnapshotDateDay', dateDayValue(col('SnapshotDate')))
    .withColumn('CheckinDateYear', dateYearValue(col('CheckinDate')))
    .withColumn('CheckinDateMonth', dateMonthValue(col('CheckinDate')))
    .withColumn('CheckinDateDay', dateDayValue(col('CheckinDate')))
)
# Cast the DayDiff and DiscountCode columns to integers
hotelsWithIntCoulmn_df = (
    hotelsWithDateIndexed_df
    .withColumn("DayDiff", col("DayDiff").cast("integer"))
    .withColumn('DiscountCode', col('DiscountCode').cast('integer'))
)

# Drop the original date and string columns now that derived columns exist
hotelsWithoutColumns_df = hotelsWithIntCoulmn_df.drop(
    'SnapshotDate', 'CheckinDate', 'HotelName', 'WeekDay')
hotelsWithoutColumns_df.show()

### Distinct values

In [None]:
from pyspark.sql.functions import col, countDistinct

def transposeDF(df):
    """Return a new Spark DataFrame holding the transpose of ``df``.

    The data is collected to pandas, transposed there, and the resulting
    index is moved into a regular column before converting back to Spark.
    """
    collected = df.toPandas()
    flipped = collected.transpose()
    return spark.createDataFrame(flipped.reset_index())
    
# Count the distinct values of every column, then show one line per column
distinctCounts = [countDistinct(col(c)).alias(c) for c in hotelsWithoutColumns_df.columns]
distinctValuesDF = hotelsWithoutColumns_df.agg(*distinctCounts)
transposeDF(distinctValuesDF).show()

## Column statistics

In [None]:
# Data exploration: describe() statistics for every column, shown transposed
describe_df = hotelsWithoutColumns_df.describe(*hotelsWithoutColumns_df.columns)
transposeDF(describe_df).show()

### Column correlations

In [None]:
# Correlation of DiscountCode with every column (DiscountCode itself included)
for column in hotelsWithoutColumns_df.columns:
    corr = hotelsWithoutColumns_df.stat.corr('DiscountCode', column)
    print("Column %s correlation: %s" % (column, corr))


## Run decision tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler

# Assemble every column except the label into a single 'features' vector
featureColumns = [name for name in hotelsWithoutColumns_df.columns if name != 'DiscountCode']
assembler = VectorAssembler(inputCols=featureColumns, outputCol='features')

# Random 70% / 30% split: train on the first part, hold the second out for testing
trainingData, testData = hotelsWithoutColumns_df.randomSplit([0.7, 0.3])

# Decision tree predicting DiscountCode from the assembled features
dt = DecisionTreeClassifier(
    labelCol='DiscountCode',
    featuresCol='features',
    impurity='entropy',
    maxDepth=20,
    maxBins=554,
)

# Chain the feature assembler and the tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

# Fit the whole pipeline (assembler + tree) on the training data
tree_model = pipeline.fit(trainingData)

## Evaluate the model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out rows with the fitted decision-tree pipeline
predictions = tree_model.transform(testData)

# Display a few example rows
predictions.select("prediction", "DiscountCode", "features").show(5)

# Compare predictions with the true labels and report the accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="DiscountCode",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Test Accuaracy = %g " %  accuracy)

## Print decision tree auc

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as F

# One-vs-rest area under ROC for each discount code (1..4)

# The evaluator configuration does not depend on the class, so build it once.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="binaryPrediction", labelCol="binaryDiscountCode")

for label in range(1, 5):
    # Binarise prediction and label against the current class over ALL rows.
    # Rows are deliberately not filtered to "prediction == label OR
    # DiscountCode == label": that filter discards every true negative, so
    # each remaining negative is a false positive and the area under ROC
    # degenerates to recall / 2 (it can never exceed 0.5).
    binaryPredictions = predictions\
                .withColumn('binaryPrediction', F.when(F.col('prediction') == label, 1.0).otherwise(0.0))\
                .withColumn('binaryDiscountCode', F.when(F.col('DiscountCode') == label, 1.0).otherwise(0.0))

    binaryPredictions.select('prediction', 'DiscountCode', 'binaryPrediction', 'binaryDiscountCode').show(5)

    auc = evaluator.evaluate(binaryPredictions)

    print("Class %s area under roc = %s" % (label, auc))


## Run naive bayes - old

In [None]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel

# Legacy Naive Bayes run using the RDD-based pyspark.mllib API.
# NOTE(review): training_data, test_data and MulticlassMetrics are not defined
# or imported anywhere in the visible notebook cells, so this cell raises
# NameError unless they are provided elsewhere -- confirm before running.

# Train on training_data; 1.0 is the second positional argument of
# NaiveBayes.train (presumably the smoothing parameter -- confirm).
model = NaiveBayes.train(training_data, 1.0)

# Pair each test point's predicted class (as float) with its true label.
NaiveBayes_predictionAndLabel = test_data.map(lambda p: (float(model.predict(p.features)), p.label))

# Wrap the (prediction, label) pairs in a metrics object.
naive_metrics = MulticlassMetrics(NaiveBayes_predictionAndLabel)

# Report accuracy, weighted false positive rate and the confusion matrix.
print('Accuracy {}'.format(naive_metrics.accuracy))
print('False positive rate {}'.format(naive_metrics.weightedFalsePositiveRate))
print(naive_metrics.confusionMatrix())

## Run naive bayes - new

In [None]:
from pyspark.ml.classification import NaiveBayes

# Re-split the data at random: 90% for training, 10% held out for testing
trainingData, testData = hotelsWithoutColumns_df.randomSplit([0.9, 0.1])

# Naive Bayes estimator predicting DiscountCode from the assembled features
dt = NaiveBayes(featuresCol='features', labelCol='DiscountCode')

# Chain the feature assembler and the Naive Bayes estimator in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

# Fit the whole pipeline (assembler + Naive Bayes) on the training data
naive_model = pipeline.fit(trainingData)

## Naive bayes model evaluation

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions with the Naive Bayes pipeline.
# (Previously this scored testData with tree_model, so the numbers reported in
# the Naive Bayes section were actually the decision tree's.)
predictions = naive_model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "DiscountCode", "features").show(5)

# Compare predictions with the true labels and report the accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="DiscountCode", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuaracy = %g " %  accuracy)

## Print naive bayes auc

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as F

# One-vs-rest area under ROC for each discount code (1..4)

# The evaluator configuration does not depend on the class, so build it once.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="binaryPrediction", labelCol="binaryDiscountCode")

for label in range(1, 5):
    # Binarise prediction and label against the current class over ALL rows.
    # Rows are deliberately not filtered to "prediction == label OR
    # DiscountCode == label": that filter discards every true negative, so
    # each remaining negative is a false positive and the area under ROC
    # degenerates to recall / 2 (it can never exceed 0.5).
    binaryPredictions = predictions\
                .withColumn('binaryPrediction', F.when(F.col('prediction') == label, 1.0).otherwise(0.0))\
                .withColumn('binaryDiscountCode', F.when(F.col('DiscountCode') == label, 1.0).otherwise(0.0))

    binaryPredictions.select('prediction', 'DiscountCode', 'binaryPrediction', 'binaryDiscountCode').show(5)

    auc = evaluator.evaluate(binaryPredictions)

    print("Class %s area under roc = %s" % (label, auc))
