<h1> Big Data

<h2> Group 5

<h4> Create a spark session and load the Incident Management Data set

In [0]:
pip install graphviz

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName('Desicion-Tree').getOrCreate()

In [0]:
# File Upload for all the models

file_location = "/FileStore/tables/airbnb.csv"
file_type = "csv"
 
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
 
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

<h3> Desition Tree Clasifier

<h4> Data pre-processing

In [0]:
# Selecting the dependent and the independent variables that are identified as most useful attributes to make predictions
 
data=df

In [0]:
data=data.dropna()

In [0]:
# Create a 70-30 train test split
 
train_data,test_data=data.randomSplit([0.7,0.3])

In [0]:
# Import the required libraries
 
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassificationModel
from graphviz import Digraph



In [0]:
# Use StringIndexer to convert the categorical columns to hold numerical data
 
host_is_superhost_indexer = StringIndexer(inputCol='host_is_superhost',outputCol='host_is_superhost_index',handleInvalid='keep')
host_identity_verified_indexer = StringIndexer(inputCol='host_identity_verified',outputCol='host_identity_verified_index',handleInvalid='keep')
neighbourhood_cleansed_indexer = StringIndexer(inputCol='neighbourhood_cleansed',outputCol='neighbourhood_cleansed_index',handleInvalid='keep')
property_type_indexer = StringIndexer(inputCol='property_type',outputCol='property_type_index',handleInvalid='keep')
room_type_indexer = StringIndexer(inputCol='room_type',outputCol='room_type_index',handleInvalid='keep')
bed_type_indexer = StringIndexer(inputCol='bed_type',outputCol='bed_type_index',handleInvalid='keep')
cancellation_policy_group_indexer = StringIndexer(inputCol='cancellation_policy',outputCol='cancellation_policy_index',handleInvalid='keep')
price_category_indexer = StringIndexer(inputCol='price_category',outputCol='price_category_index',handleInvalid='keep')

In [0]:
# Vector assembler is used to create a vector of input features
 
assembler = VectorAssembler(inputCols=['host_is_superhost_index','host_identity_verified_index','neighbourhood_cleansed_index',
                                       'property_type_index','room_type_index','bed_type_index','cancellation_policy_index',
                                       'price_category_index','latitude','longitude','accommodates','bathrooms',
                                       'bedrooms','beds','Number_of_amenities','guests_included','price_per_extra_person',
                                       'minimum_nights','number_of_reviews','number_days_btw_first_last_review',
                                       'review_scores_rating','price','price_gte_150'],
                            outputCol="features")

<h4> Building the Decision Tree Classifier

In [0]:
# Create an object for the Logistic Regression model
# Use the parameter maxBins and assign a value that is equal to or more than the number of categories in any sigle feature
 
dt_model = DecisionTreeClassifier(labelCol='price_category_index',maxBins=100)


In [0]:
# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data
 
pipe = Pipeline(stages=[host_is_superhost_indexer,host_identity_verified_indexer,neighbourhood_cleansed_indexer,property_type_indexer,room_type_indexer,bed_type_indexer,cancellation_policy_group_indexer,price_category_indexer,assembler,dt_model])

In [0]:
# It took 8 minutes for this step to execute
 
fit_model=pipe.fit(train_data)

In [0]:
# Store the results in a dataframe
 
results = fit_model.transform(test_data)

In [0]:
results.select(['price_category_index','prediction']).show()

+--------------------+----------+
|price_category_index|prediction|
+--------------------+----------+
|                 0.0|       0.0|
|                 3.0|       3.0|
|                 0.0|       0.0|
|                 3.0|       3.0|
|                 0.0|       0.0|
|                 0.0|       0.0|
|                 3.0|       3.0|
|                 3.0|       3.0|
|                 0.0|       0.0|
|                 3.0|       3.0|
|                 3.0|       3.0|
|                 0.0|       0.0|
|                 0.0|       0.0|
|                 0.0|       0.0|
|                 3.0|       3.0|
|                 0.0|       0.0|
|                 2.0|       2.0|
|                 2.0|       2.0|
|                 0.0|       0.0|
|                 2.0|       2.0|
+--------------------+----------+
only showing top 20 rows



<h4> Evaluating the model

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
ACC_evaluator = MulticlassClassificationEvaluator(
    labelCol="price_category_index", predictionCol="prediction", metricName="accuracy")

In [0]:
accuracy = ACC_evaluator.evaluate(results)

In [0]:
print("The accuracy of the decision tree classifier is {}".format(accuracy))

The accuracy of the decision tree classifier is 1.0


<h4> Plot

Alt 1

In [0]:
from pyspark.ml.classification import DecisionTreeClassificationModel
from graphviz import Digraph

def plot_tree(pipeline_model, dt_index):
    dt_model = pipeline_model.stages[dt_index]
    dot = Digraph()

    def build_subtree(node, parent=None):
        if node.numNodes == 1: # leaf node
            label = f"class={node.predictions.argmax()}"
        else: # split node
            feature = assembler.getInputCols()[node.split.featureIndex]
           # threshold = node.split.threshold
            threshold = node.points.getNumPartitions.threshold
            label = f"{feature} <= {threshold:.2f}"
            
            build_subtree(node.leftChild, parent=node.id)
            build_subtree(node.rightChild, parent=node.id)
        
        dot.node(str(node.id), label=label)
        if parent is not None:
            dot.edge(str(parent), str(node.id))

    build_subtree(dt_model._call_java("rootNode"))
    return dot






plot_tree(fit_model, 2)




[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
[0;32m<command-2248947621643571>[0m in [0;36m<cell line: 32>[0;34m()[0m
[1;32m     30[0m [0;34m[0m[0m
[1;32m     31[0m [0;34m[0m[0m
[0;32m---> 32[0;31m [0mplot_tree[0m[0;34m([0m[0mfit_model[0m[0;34m,[0m [0;36m2[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     33[0m [0;34m[0m[0m
[1;32m     34[0m [0;34m[0m[0m

[0;32m<command-2248947621643571>[0m in [0;36mplot_tree[0;34m(pipeline_model, dt_index)[0m
[1;32m     22[0m             [0mdot[0m[0;34m.[0m[0medge[0m[0;34m([0m[0mstr[0m[0;34m([0m[0mparent[0m[0;34m)[0m[0;34m,[0m [0mstr[0m[0;34m([0m[0mnode[0m[0;34m.[0m[0mid[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     23[0m [0;34m[0m[0m
[0;32m---> 24[0;31m     [0mbuild_subtree[0m[0;34m([0m[0mdt_model[0m[0;34m.[0m[0m_ca

Alt 2

In [0]:
# https://github.com/parrt/dtreeviz/blob/master/notebooks/dtreeviz_spark_visualisations.ipynb


