In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Decision_Trees').getOrCreate()



1. Demonstrate how to load a dataset suitable for prediction into a PySpark DataFrame and
Display basic statistics and information about the dataset.

In [3]:
path= "covtype.data"
data_without_header = spark.read.csv(path,header=False, inferSchema=True)
data_without_header.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

In [4]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col
colnames = ["Elevation", "Aspect", "Slope", \
"Horizontal_Distance_To_Hydrology", \
"Vertical_Distance_To_Hydrology", "Horizontal_Distance_To_Roadways",
\
"Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm", \
"Horizontal_Distance_To_Fire_Points"] + \
[f"Wilderness_Area_{i}" for i in range(4)] + \
[f"Soil_Type_{i}" for i in range(40)] + \
["Cover_Type"]

data = data_without_header.toDF(*colnames).withColumn("Cover_Type",col("Cover_Type").cast(DoubleType()))
data.head()

Row(Elevation=2596, Aspect=51, Slope=3, Horizontal_Distance_To_Hydrology=258, Vertical_Distance_To_Hydrology=0, Horizontal_Distance_To_Roadways=510, Hillshade_9am=221, Hillshade_Noon=232, Hillshade_3pm=148, Horizontal_Distance_To_Fire_Points=6279, Wilderness_Area_0=1, Wilderness_Area_1=0, Wilderness_Area_2=0, Wilderness_Area_3=0, Soil_Type_0=0, Soil_Type_1=0, Soil_Type_2=0, Soil_Type_3=0, Soil_Type_4=0, Soil_Type_5=0, Soil_Type_6=0, Soil_Type_7=0, Soil_Type_8=0, Soil_Type_9=0, Soil_Type_10=0, Soil_Type_11=0, Soil_Type_12=0, Soil_Type_13=0, Soil_Type_14=0, Soil_Type_15=0, Soil_Type_16=0, Soil_Type_17=0, Soil_Type_18=0, Soil_Type_19=0, Soil_Type_20=0, Soil_Type_21=0, Soil_Type_22=0, Soil_Type_23=0, Soil_Type_24=0, Soil_Type_25=0, Soil_Type_26=0, Soil_Type_27=0, Soil_Type_28=1, Soil_Type_29=0, Soil_Type_30=0, Soil_Type_31=0, Soil_Type_32=0, Soil_Type_33=0, Soil_Type_34=0, Soil_Type_35=0, Soil_Type_36=0, Soil_Type_37=0, Soil_Type_38=0, Soil_Type_39=0, Cover_Type=5.0)

In [5]:
print("Schema:")
data.printSchema()

Schema:
root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_0: integer (nullable = true)
 |-- Wilderness_Area_1: integer (nullable = true)
 |-- Wilderness_Area_2: integer (nullable = true)
 |-- Wilderness_Area_3: integer (nullable = true)
 |-- Soil_Type_0: integer (nullable = true)
 |-- Soil_Type_1: integer (nullable = true)
 |-- Soil_Type_2: integer (nullable = true)
 |-- Soil_Type_3: integer (nullable = true)
 |-- Soil_Type_4: integer (nullable = true)
 |-- Soil_Type_5: integer (nullable = true)
 |-- S

In [6]:
print("No: of records")
data.count()

No: of records


581012

In [14]:
print("Data summary and statistics",data.describe().show())


+-------+-----------------+------------------+------------------+--------------------------------+------------------------------+-------------------------------+------------------+------------------+------------------+----------------------------------+------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------

2. Implement a PySpark script to handle missing values and categorical features in the
dataset.

In [11]:
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler

In [15]:
imputer = Imputer(strategy="mean", missingValue=float('nan'), inputCols=data.columns, outputCols=[col+"_imputed" for col in data.columns])
data_imputed = imputer.fit(data).transform(data)

# Handle categorical features by encoding them
categorical_columns = [col for (col, dtype) in data_imputed.dtypes if dtype == "string"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(data_imputed) for col in categorical_columns]
data_indexed = data_imputed
for indexer in indexers:
    data_indexed = indexer.transform(data_indexed)

# Basic statistics and information about the cleaned dataset
print("Number of rows: ", data_indexed.count())
print("Number of columns: ", len(data_indexed.columns))
print("Schema:")
data_indexed.printSchema()
print("First few rows:")
data_indexed.show(5)


Number of rows:  581012
Number of columns:  110
Schema:
root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area_0: integer (nullable = true)
 |-- Wilderness_Area_1: integer (nullable = true)
 |-- Wilderness_Area_2: integer (nullable = true)
 |-- Wilderness_Area_3: integer (nullable = true)
 |-- Soil_Type_0: integer (nullable = true)
 |-- Soil_Type_1: integer (nullable = true)
 |-- Soil_Type_2: integer (nullable = true)
 |-- Soil_Type_3: integer (nullable = true)
 |-- Soil_Type_4: integer (nullable = true)
 |

3. Develop a PySpark script that trains a decision tree model on the training dataset.

In [16]:
(train_data, test_data) = data.randomSplit([0.9, 0.1])
train_data.cache()
test_data.cache()

DataFrame[Elevation: int, Aspect: int, Slope: int, Horizontal_Distance_To_Hydrology: int, Vertical_Distance_To_Hydrology: int, Horizontal_Distance_To_Roadways: int, Hillshade_9am: int, Hillshade_Noon: int, Hillshade_3pm: int, Horizontal_Distance_To_Fire_Points: int, Wilderness_Area_0: int, Wilderness_Area_1: int, Wilderness_Area_2: int, Wilderness_Area_3: int, Soil_Type_0: int, Soil_Type_1: int, Soil_Type_2: int, Soil_Type_3: int, Soil_Type_4: int, Soil_Type_5: int, Soil_Type_6: int, Soil_Type_7: int, Soil_Type_8: int, Soil_Type_9: int, Soil_Type_10: int, Soil_Type_11: int, Soil_Type_12: int, Soil_Type_13: int, Soil_Type_14: int, Soil_Type_15: int, Soil_Type_16: int, Soil_Type_17: int, Soil_Type_18: int, Soil_Type_19: int, Soil_Type_20: int, Soil_Type_21: int, Soil_Type_22: int, Soil_Type_23: int, Soil_Type_24: int, Soil_Type_25: int, Soil_Type_26: int, Soil_Type_27: int, Soil_Type_28: int, Soil_Type_29: int, Soil_Type_30: int, Soil_Type_31: int, Soil_Type_32: int, Soil_Type_33: int, S

In [17]:
from pyspark.ml.feature import VectorAssembler

In [18]:
input_cols = colnames[:-1]
vector_assembler = VectorAssembler(inputCols=input_cols,outputCol="featureVector")
assembled_train_data = vector_assembler.transform(train_data)
assembled_train_data.select("featureVector").show(truncate = False)

+-----------------------------------------------------------------------------------------------------+
|featureVector                                                                                        |
+-----------------------------------------------------------------------------------------------------+
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1863.0,37.0,17.0,120.0,18.0,90.0,217.0,202.0,115.0,769.0,1.0,1.0])  |
|(54,[0,1,2,5,6,7,8,9,13,18],[1874.0,18.0,14.0,90.0,208.0,209.0,135.0,793.0,1.0,1.0])                 |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1888.0,33.0,22.0,150.0,46.0,108.0,209.0,185.0,103.0,735.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,14],[1889.0,28.0,22.0,150.0,23.0,120.0,205.0,185.0,108.0,759.0,1.0,1.0]) |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1889.0,353.0,30.0,95.0,39.0,67.0,153.0,172.0,146.0,600.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,18],[1896.0,337.0,12.0,30.0,6.0,175.0,195.0,224.0,168.0,732.0,1.0,1.0])  |
|(54,[0,1,2,3,4,5,6,7,8,9,13,15],[1898.0,34.0,23.0,175.0,56.0,13

In [19]:
from pyspark.ml.classification import DecisionTreeClassifier

In [20]:
classifier = DecisionTreeClassifier(seed = 1234, labelCol="Cover_Type",
featuresCol="featureVector",
predictionCol="prediction")
model = classifier.fit(assembled_train_data)
print(model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_d7abf6c0dc21, depth=5, numNodes=45, numClasses=8, numFeatures=54
  If (feature 0 <= 3048.5)
   If (feature 0 <= 2500.5)
    If (feature 3 <= 15.0)
     If (feature 12 <= 0.5)
      If (feature 23 <= 0.5)
       Predict: 4.0
      Else (feature 23 > 0.5)
       Predict: 3.0
     Else (feature 12 > 0.5)
      Predict: 6.0
    Else (feature 3 > 15.0)
     If (feature 16 <= 0.5)
      Predict: 3.0
     Else (feature 16 > 0.5)
      If (feature 9 <= 1305.5)
       Predict: 3.0
      Else (feature 9 > 1305.5)
       Predict: 4.0
   Else (feature 0 > 2500.5)
    If (feature 17 <= 0.5)
     If (feature 0 <= 2952.5)
      If (feature 15 <= 0.5)
       Predict: 2.0
      Else (feature 15 > 0.5)
       Predict: 3.0
     Else (feature 0 > 2952.5)
      Predict: 2.0
    Else (feature 17 > 0.5)
     If (feature 0 <= 2711.5)
      Predict: 3.0
     Else (feature 0 > 2711.5)
      If (feature 5 <= 1259.5)
       Predict: 5.0
      Else (featu

4. Implement code to evaluate the decision tree model using metrics such as accuracy,
precision, and recall.

In [21]:
predictions = model.transform(assembled_train_data)
predictions.select("Cover_Type", "prediction", "probability").show(10, truncate = False)

+----------+----------+----------------------------------------------------------------------------------------------------------------------------------+
|Cover_Type|prediction|probability                                                                                                                       |
+----------+----------+----------------------------------------------------------------------------------------------------------------------------------+
|6.0       |3.0       |[0.0,3.15000315000315E-5,0.0676935676935677,0.6082971082971083,0.020317520317520318,0.0016695016695016695,0.30199080199080197,0.0]|
|6.0       |4.0       |[0.0,0.0,0.041287613715885234,0.24212736179146255,0.616515045486354,0.009097270818754374,0.09097270818754374,0.0]                 |
|6.0       |3.0       |[0.0,3.15000315000315E-5,0.0676935676935677,0.6082971082971083,0.020317520317520318,0.0016695016695016695,0.30199080199080197,0.0]|
|6.0       |3.0       |[0.0,3.15000315000315E-5,0.0676935676935677,0.6

In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol="Cover_Type", predictionCol="prediction")
print("Accuracy:",evaluator.setMetricName("accuracy").evaluate(predictions))

print("F1 Score: ",evaluator.setMetricName("f1").evaluate(predictions))

print("Precision: ",evaluator.setMetricName("weightedPrecision").evaluate(predictions))

print("Recall: ",evaluator.setMetricName("weightedRecall").evaluate(predictions))

Accuracy: 0.7031614816684546
F1 Score:  0.6868680702002361
Precision:  0.7012120552703207
Recall:  0.7031614816684545


Confusion Matrix

In [24]:
confusion_matrix = predictions.groupBy("Cover_Type").pivot("prediction", range(1,8)).\
count().na.fill(0.0).orderBy("Cover_Type")
confusion_matrix.show()

+----------+------+------+-----+----+---+---+-----+
|Cover_Type|     1|     2|    3|   4|  5|  6|    7|
+----------+------+------+-----+----+---+---+-----+
|       1.0|122850| 62496|  103|   0| 21|  8| 5151|
|       2.0| 45445|204709| 3683|  59|377| 56|  765|
|       3.0|     0|  3858|27785| 359| 20|119|    0|
|       4.0|     0|     4| 1321|1141|  0|  0|    0|
|       5.0|     0|  7735|  332|  13|428|  0|    0|
|       6.0|     0|  4446|10543| 130|  8|520|    0|
|       7.0|  8021|   159|    0|   0|  0|  0|10286|
+----------+------+------+-----+----+---+---+-----+

