In [1]:

# incase jupyter and global python version are different 
# import os
# os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.5"

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my-spark-app").config('spark.executor.cores', '1').getOrCreate()

In [2]:
import pyspark.sql.functions as f


df = spark.read.csv("./resources/Hotels_data_Changed.csv",inferSchema=True,header=True)

def drop_outliers(df, field_name):
    per75 = df.approxQuantile(field_name, [0.75], 0.00001)[0]
    per25 = df.approxQuantile(field_name, [0.25], 0.00001)[0]
    distance = 1.5*(per75-per25)
    print(per75, per25, distance)

    return df.filter(f.col(field_name)<=(distance+per75)).filter(f.col(field_name)>=(per25-distance))
    df = df.filter('1!=1')

len_before = df.count()
field = 'DiscountPerc'

df = drop_outliers(df, field)
print("Number of rows after removing outliers: {}, down by {}%".format(df.count(),
                                                                       round((len_before - df.count()) / len_before * 100),
                                                                       2))


187848
10.933940774487471 4.844961240310078 9.13346930126609
Number of rows after removing outliers: 177121, down by 6%


In [3]:
from pyspark.mllib.regression import LabeledPoint

used_features = [
    "WeekDay",
    "Snapshot Date",
    "Checkin Date",
    "DayDiff",
    "Hotel Name"
]


len_before = df.count()

max_discount = df.groupBy([df[column] for column in used_features]).agg(f.max("DiscountPerc").alias("mx")).alias("maxs").select(
    [f.col("mx")] + [f.col(column).alias("max_" + column) for column in used_features])

df = df.join(max_discount,(
    (f.col("DiscountPerc")==f.col("mx")) & 
    (f.col("WeekDay")==f.col("max_WeekDay")) &
    (f.col("Snapshot Date")==f.col("max_Snapshot Date")) &
    (f.col("Checkin Date")==f.col("max_Checkin Date")) &
    (f.col("DayDiff")==f.col("max_DayDiff")) &
    (f.col("Hotel Name")==f.col("max_Hotel Name"))
),"inner").distinct().select(df.columns)

print("Number of rows after GroupBy the data to take maximum discount price: {}, down by {}%".format(df.count(),
                                                                                                     round((
                                                                                                                       len_before - df.count()
                                                                                                                   ) / len_before * 100),
                                                                                                     2))


Number of rows after GroupBy the data to take maximum discount price: 113144, down by 36%


In [4]:
from pyspark.sql.functions import udf, lit
my_date_format = udf(lambda d, fmt: d.strftime(fmt))

df = df.select(df.columns + [
        my_date_format(f.col("Snapshot Date"), lit("%d-%m-%Y")).alias("Snapshot Date_str"),
        my_date_format(f.col("Checkin Date"), lit("%d-%m-%Y")).alias("Checkin Date_str")
    ]
)

In [5]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


indexers = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(df) for column, type in df.dtypes
            if type in ('string')]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

for column in df.dtypes:
    print (column)


('_c0', 'int')
('Snapshot ID', 'int')
('Snapshot Date', 'timestamp')
('Checkin Date', 'timestamp')
('Days', 'int')
('Original Price', 'int')
('Discount Price', 'int')
('Discount Code', 'int')
('Available Rooms', 'int')
('Hotel Name', 'string')
('Hotel Stars', 'int')
('DayDiff', 'int')
('WeekDay', 'string')
('DiscountDiff', 'int')
('DiscountPerc', 'double')
('Snapshot Date_str', 'string')
('Checkin Date_str', 'string')
('Hotel Name_index', 'double')
('WeekDay_index', 'double')
('Snapshot Date_str_index', 'double')
('Checkin Date_str_index', 'double')


In [6]:
used_features = [
    "WeekDay_index",
    "Snapshot Date_str_index",
    "Checkin Date_str_index",
    "DayDiff",
    "Hotel Name_index"
]

def parseLine(line):
    label = int(line["Discount Code"])
    features = [line[column] for column in used_features]
    return LabeledPoint(label, features)

# MAP to LabeledPoint used by mllib
df2= df.rdd.map(parseLine)

In [26]:
train, test = df2.randomSplit([0.7, 0.3])

In [18]:
from pyspark.mllib.classification import NaiveBayes
model = NaiveBayes.train(train, 1.0)

In [20]:
# Make prediction and test accuracy.
predictionAndLabel = test.map(lambda p : (float(model.predict(p.features)), p.label))


In [22]:
from pyspark.mllib.evaluation import MulticlassMetrics
# Instantiate metrics object
metrics = MulticlassMetrics(predictionAndLabel)

# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Statistics by class
labels = df2.map(lambda lp: lp.label).distinct().collect()
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

Summary Stats
Precision = 0.277771229889799
Recall = 0.277771229889799
F1 Score = 0.277771229889799
Class 1.0 precision = 0.257393850658858
Class 1.0 recall = 0.3325346784363178
Class 1.0 F1 Measure = 0.2901788170563962
Class 2.0 precision = 0.3240058910162003
Class 2.0 recall = 0.06148686416992733
Class 2.0 F1 Measure = 0.10335917312661498
Class 3.0 precision = 0.311346752935538
Class 3.0 recall = 0.5255865695792881
Class 3.0 F1 Measure = 0.39104589917231003
Class 4.0 precision = 0.1879532634971797
Class 4.0 recall = 0.17322688451541032
Class 4.0 F1 Measure = 0.18028985507246378


In [62]:
df.agg(f.max('Checkin Date_str_index').alias("max")).first().max

201.0

In [68]:
df[used_features].show()

+-------------+-----------------------+----------------------+-------+----------------+
|WeekDay_index|Snapshot Date_str_index|Checkin Date_str_index|DayDiff|Hotel Name_index|
+-------------+-----------------------+----------------------+-------+----------------+
|          4.0|                   59.0|                 148.0|      3|            29.0|
|          3.0|                   52.0|                  12.0|     14|            31.0|
|          2.0|                   56.0|                 102.0|      9|           216.0|
|          5.0|                    6.0|                  65.0|     14|            71.0|
|          5.0|                    3.0|                  45.0|      6|            71.0|
|          1.0|                   82.0|                   9.0|     28|             9.0|
|          4.0|                   54.0|                  70.0|     29|           133.0|
|          6.0|                  125.0|                 136.0|     16|            67.0|
|          6.0|                 

In [19]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel


train, test = df2.randomSplit([0.7, 0.3])

# extract categories max value to give categoricalFeaturesInfo range for each feature
model = DecisionTree.trainClassifier(train, numClasses=5, categoricalFeaturesInfo={
    i : int(df.agg(f.max(column).alias("max")).first().max + 1)
        for i,column in enumerate(used_features) if i in [0]
},
                                     impurity='gini', maxDepth=20, maxBins=128)

In [20]:
predictions = model.predict(test.map(lambda x: x.features))
predictionAndLabel = test.map(lambda lp: lp.label).zip(predictions)
testErr = predictionAndLabel.filter(
    lambda lp: lp[0] != lp[1]).count() / float(test.count())

print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

Test Error = 0.36112173733925784
Learned classification tree model:
DecisionTreeModel classifier of depth 20 with 31783 nodes
  If (feature 0 in {1.0,2.0,3.0,5.0})
   If (feature 0 in {3.0,2.0,1.0})
    If (feature 0 in {2.0,1.0})
     If (feature 4 <= 45.5)
      If (feature 4 <= 10.5)
       If (feature 4 <= 5.5)
        If (feature 4 <= 2.5)
         If (feature 4 <= 1.5)
          If (feature 2 <= 106.5)
           If (feature 0 in {1.0})
            If (feature 4 <= 0.5)
             If (feature 2 <= 23.5)
              If (feature 3 <= 33.5)
               Predict: 1.0
              Else (feature 3 > 33.5)
               If (feature 1 <= 31.5)
                Predict: 2.0
               Else (feature 1 > 31.5)
                Predict: 1.0
             Else (feature 2 > 23.5)
              If (feature 3 <= 16.5)
               If (feature 2 <= 84.5)
                If (feature 2 <= 26.5)
                 Predict: 3.0
                Else (feature 2 > 26.5)
                 If (fea

In [22]:
from pyspark.mllib.evaluation import MulticlassMetrics

metrics = MulticlassMetrics(predictionAndLabel)

# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Statistics by class
labels = df2.map(lambda lp: lp.label).distinct().collect()
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

Summary Stats
Precision = 0.6388782626607421
Recall = 0.6388782626607421
F1 Score = 0.6388782626607421
Class 1.0 precision = 0.643773010029199
Class 1.0 recall = 0.6041219918989754
Class 1.0 F1 Measure = 0.6233175588470284
Class 2.0 precision = 0.6609347845970914
Class 2.0 recall = 0.6529321405981747
Class 2.0 F1 Measure = 0.6569090909090909
Class 3.0 precision = 0.6476510067114094
Class 3.0 recall = 0.6599316133043208
Class 3.0 F1 Measure = 0.6537336412625095
Class 4.0 precision = 0.5703315227570707
Class 4.0 recall = 0.6251283104085403
Class 4.0 F1 Measure = 0.5964740450538688
