In [1]:
import pyspark
import os
import sys
from pyspark import SparkContext
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_4').getOrCreate()



In [3]:
data_without_header = spark.read.option("inferSchema", True)\
.option("header", False).csv("covtype.data")
data_without_header.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

In [4]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col
colnames = ["Elevation", "Aspect", "Slope", \
"Horizontal_Distance_To_Hydrology", \
"Vertical_Distance_To_Hydrology", "Horizontal_Distance_To_Roadways",
\
"Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm", \
"Horizontal_Distance_To_Fire_Points"] + \
[f"Wilderness_Area_{i}" for i in range(4)] + \
[f"Soil_Type_{i}" for i in range(40)] + \
["Cover_Type"]
data = data_without_header.toDF(*colnames).\
withColumn("Cover_Type",
col("Cover_Type").cast(DoubleType()))
data.head()

Row(Elevation=2596, Aspect=51, Slope=3, Horizontal_Distance_To_Hydrology=258, Vertical_Distance_To_Hydrology=0, Horizontal_Distance_To_Roadways=510, Hillshade_9am=221, Hillshade_Noon=232, Hillshade_3pm=148, Horizontal_Distance_To_Fire_Points=6279, Wilderness_Area_0=1, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=0, Soil_Type_0=0, Soil_Type_1=0, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=1, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=5.0)

In [5]:
(train_data, test_data) = data.randomSplit([0.9, 0.1])
train_data.cache()
test_data.cache()

DataFrame[Elevation: int, Aspect: int, Slope: int, Horizontal_Distance_To_Hydrology: int, Vertical_Distance_To_Hydrology: int, Horizontal_Distance_To_Roadways: int, Hillshade_9am: int, Hillshade_Noon: int, Hillshade_3pm: int, Horizontal_Distance_To_Fire_Points: int, Wilderness_Area_0: int, Wilderness_Area_1: int, Wilderness_Area_2: int, Wilderness_Area_3: int, Soil_Type_0: int, Soil_Type_1: int, Soil_Type_2: int, Soil_Type_3: int, Soil_Type_4: int, Soil_Type_5: int, Soil_Type_6: int, Soil_Type_7: int, Soil_Type_8: int, Soil_Type_9: int, Soil_Type_10: int, Soil_Type_11: int, Soil_Type_12: int, Soil_Type_13: int, Soil_Type_14: int, Soil_Type_15: int, Soil_Type_16: int, Soil_Type_17: int, Soil_Type_18: int, Soil_Type_19: int, Soil_Type_20: int, Soil_Type_21: int, Soil_Type_22: int, Soil_Type_23: int, Soil_Type_24: int, Soil_Type_25: int, Soil_Type_26: int, Soil_Type_27: int, Soil_Type_28: int, Soil_Type_29: int, Soil_Type_30: int, Soil_Type_31: int, Soil_Type_32: int, Soil_Type_33: int, S

In [6]:
from pyspark.ml.feature import VectorAssembler
input_cols = colnames[:-1]
vector_assembler = VectorAssembler(inputCols=input_cols,
outputCol="featureVector")
assembled_train_data = vector_assembler.transform(train_data)
assembled_train_data.select("featureVector").show(truncate = False)

+-----------------------------------------------------------------------------------------------------+
|featureVector                                                                                        |
+-----------------------------------------------------------------------------------------------------+
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1863.0,37.0,17.0,120.0,18.0,90.0,217.0,202.0,115.0,769.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1879.0,28.0,19.0,30.0,12.0,95.0,209.0,196.0,117.0,778.0,1.0,1.0])   |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1888.0,33.0,22.0,150.0,46.0,108.0,209.0,185.0,103.0,735.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,14],[1889.0,28.0,22.0,150.0,23.0,120.0,205.0,185.0,108.0,759.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1889.0,353.0,30.0,95.0,39.0,67.0,153.0,172.0,146.0,600.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1896.0,337.0,12.0,30.0,6.0,175.0,195.0,224.0,168.0,732.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1898.0,34.0,23.0,175.0,56.0,13

In [7]:
from pyspark.ml.classification import DecisionTreeClassifier
classifier = DecisionTreeClassifier(seed = 1234, labelCol="Cover_Type",
featuresCol="featureVector",
predictionCol="prediction")
model = classifier.fit(assembled_train_data)
print(model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_8070b0674135, depth=5, numNodes=39, numClasses=8, numFeatures=54
  If (feature 0 <= 3047.5)
   If (feature 0 <= 2500.5)
    If (feature 3 <= 15.0)
     If (feature 12 <= 0.5)
      If (feature 23 <= 0.5)
       Predict: 4.0
      Else (feature 23 > 0.5)
       Predict: 3.0
     Else (feature 12 > 0.5)
      Predict: 6.0
    Else (feature 3 > 15.0)
     Predict: 3.0
   Else (feature 0 > 2500.5)
    If (feature 17 <= 0.5)
     If (feature 0 <= 2952.5)
      If (feature 15 <= 0.5)
       Predict: 2.0
      Else (feature 15 > 0.5)
       Predict: 3.0
     Else (feature 0 > 2952.5)
      Predict: 2.0
    Else (feature 17 > 0.5)
     If (feature 0 <= 2712.5)
      Predict: 3.0
     Else (feature 0 > 2712.5)
      If (feature 5 <= 1219.5)
       Predict: 5.0
      Else (feature 5 > 1219.5)
       Predict: 2.0
  Else (feature 0 > 3047.5)
   If (feature 0 <= 3312.5)
    If (feature 7 <= 240.5)
     Predict: 1.0
    Else (feature 7 > 24

In [8]:
import pandas as pd
pd.DataFrame(model.featureImportances.toArray(),
index=input_cols, columns=['importance']).\
sort_values(by="importance", ascending=False)

Unnamed: 0,importance
Elevation,0.843157
Soil_Type_3,0.035638
Soil_Type_1,0.031186
Hillshade_Noon,0.026278
Horizontal_Distance_To_Hydrology,0.022278
Soil_Type_31,0.018201
Wilderness_Area_2,0.015279
Horizontal_Distance_To_Roadways,0.00446
Hillshade_9am,0.002588
Soil_Type_9,0.000935


In [9]:
predictions = model.transform(assembled_train_data)
predictions.select("Cover_Type", "prediction", "probability").\
show(10, truncate = False)

+----------+----------+------------------------------------------------------------------------------------------------------------------------------------+
|Cover_Type|prediction|probability                                                                                                                         |
+----------+----------+------------------------------------------------------------------------------------------------------------------------------------+
|6.0       |3.0       |[0.0,2.9053720328888115E-5,0.06391818472355384,0.6089078706528371,0.04206978703622999,0.0015689008977599582,0.28350620296929024,0.0]|
|6.0       |3.0       |[0.0,2.9053720328888115E-5,0.06391818472355384,0.6089078706528371,0.04206978703622999,0.0015689008977599582,0.28350620296929024,0.0]|
|6.0       |3.0       |[0.0,2.9053720328888115E-5,0.06391818472355384,0.6089078706528371,0.04206978703622999,0.0015689008977599582,0.28350620296929024,0.0]|
|6.0       |3.0       |[0.0,2.9053720328888115E-5,0.063918

In [10]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="Cover_Type",
predictionCol="prediction")
evaluator.setMetricName("accuracy").evaluate(predictions)
evaluator.setMetricName("f1").evaluate(predictions)

0.6870746239989998

In [11]:
confusion_matrix = predictions.groupBy("Cover_Type").\
pivot("prediction", range(1,8)).count().\
na.fill(0.0).\
orderBy("Cover_Type")
confusion_matrix.show()

+----------+------+------+-----+---+---+---+-----+
|Cover_Type|     1|     2|    3|  4|  5|  6|    7|
+----------+------+------+-----+---+---+---+-----+
|       1.0|129625| 55605|  107|  0| 21|  6| 5240|
|       2.0| 52323|197490| 3703| 57|339| 54|  770|
|       3.0|     0|  3882|27873|345| 20|123|    0|
|       4.0|     0|     4| 1567|882|  0|  0|    0|
|       5.0|     0|  7840|  325| 12|419|  0|    0|
|       6.0|     0|  4424|10558|136|  7|515|    0|
|       7.0|  8070|    76|    0|  0|  0|  0|10286|
+----------+------+------+-----+---+---+---+-----+



In [12]:
from pyspark.sql import DataFrame
def class_probabilities(data):
    total = data.count()
    return data.groupBy("Cover_Type").count().\
    orderBy("Cover_Type").\
    select(col("count").cast(DoubleType())).\
    withColumn("count_proportion", col("count")/total).\
    select("count_proportion").collect()
train_prior_probabilities = class_probabilities(train_data)
test_prior_probabilities = class_probabilities(test_data)
train_prior_probabilities

[Row(count_proportion=0.3646499739814503),
 Row(count_proportion=0.48734274082463497),
 Row(count_proportion=0.06168500719336374),
 Row(count_proportion=0.004692904588447764),
 Row(count_proportion=0.01644525391043497),
 Row(count_proportion=0.029921332149744406),
 Row(count_proportion=0.03526278735192384)]

In [13]:
train_prior_probabilities = [p[0] for p in train_prior_probabilities]
test_prior_probabilities = [p[0] for p in test_prior_probabilities]
sum([train_p * cv_p for train_p, cv_p in zip(train_prior_probabilities,
test_prior_probabilities)])

0.3776893328330288

In [14]:
from pyspark.ml import Pipeline
assembler = VectorAssembler(inputCols=input_cols, outputCol="featureVector")
classifier = DecisionTreeClassifier(seed=1234, labelCol="Cover_Type",
featuresCol="featureVector",
predictionCol="prediction")
pipeline = Pipeline(stages=[assembler, classifier])