In [3]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_4').getOrCreate()

# Load the dataset
data_without_header = spark.read.option("inferSchema", True)\
    .option("header", False).csv("/home/lplab/Documents/220962068/BDA_Lab/covtype.data")

# Print schema of the dataset
data_without_header.printSchema()

# Renaming columns and converting the target column to DoubleType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

colnames = ["Elevation", "Aspect", "Slope", 
            "Horizontal_Distance_To_Hydrology", 
            "Vertical_Distance_To_Hydrology", 
            "Horizontal_Distance_To_Roadways", 
            "Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm", 
            "Horizontal_Distance_To_Fire_Points"] + \
            [f"Wilderness_Area_{i}" for i in range(4)] + \
            [f"Soil_Type_{i}" for i in range(40)] + \
            ["Cover_Type"]

data = data_without_header.toDF(*colnames)\
    .withColumn("Cover_Type", col("Cover_Type").cast(DoubleType()))

# Display the first few rows of the dataset
data.show(5)

# Display summary statistics for numerical columns
data.describe().show()

# Count the number of rows in the DataFrame
row_count = data.count()
print(f"Number of rows: {row_count}")


                                                                                

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: integer (nullable = true)
 |-- _c15: integer (nullable = true)
 |-- _c16: integer (nullable = true)
 |-- _c17: integer (nullable = true)
 |-- _c18: integer (nullable = true)
 |-- _c19: integer (nullable = true)
 |-- _c20: integer (nullable = true)
 |-- _c21: integer (nullable = true)
 |-- _c22: integer (nullable = true)
 |-- _c23: integer (nullable = true)
 |-- _c24: integer (nullable = true)
 |-- _c25: integer (nullable = true)
 |-- _c26: integer (nullable = true)
 |-- _

[Stage 12:>                                                       (0 + 18) / 18]

+-------+-----------------+------------------+------------------+--------------------------------+------------------------------+-------------------------------+------------------+------------------+------------------+----------------------------------+------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------

                                                                                

In [4]:
# Handle missing values (drop rows with any missing values)
data = data.na.drop()

# Categorical features encoding
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler

def unencode_one_hot(data):
    wilderness_cols = ['Wilderness_Area_' + str(i) for i in range(4)]
    soil_cols = ['Soil_Type_' + str(i) for i in range(40)]
    
    wilderness_assembler = VectorAssembler().setInputCols(wilderness_cols).setOutputCol("wilderness")
    soil_assembler = VectorAssembler().setInputCols(soil_cols).setOutputCol("soil")
    
    unhot_udf = udf(lambda v: v.toArray().tolist().index(1))
    
    with_wilderness = wilderness_assembler.transform(data).drop(*wilderness_cols)\
        .withColumn("wilderness", unhot_udf(col("wilderness")).cast(IntegerType()))
    
    with_soil = soil_assembler.transform(with_wilderness).drop(*soil_cols)\
        .withColumn("soil", unhot_udf(col("soil")).cast(IntegerType()))
    
    return with_soil

data_transformed = unencode_one_hot(data)
data_transformed.show(5)




+---------+------+-----+--------------------------------+------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+----------+----------+----+
|Elevation|Aspect|Slope|Horizontal_Distance_To_Hydrology|Vertical_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Hillshade_9am|Hillshade_Noon|Hillshade_3pm|Horizontal_Distance_To_Fire_Points|Cover_Type|wilderness|soil|
+---------+------+-----+--------------------------------+------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+----------+----------+----+
|     2596|    51|    3|                             258|                             0|                            510|          221|           232|          148|                              6279|       5.0|         0|  28|
|     2590|    56|    2|                             212|                            -6|        

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

(train_data, test_data)= data_transformed.randomSplit([0.9,0.1])
# Use the updated column names after encoding
input_cols = ["Elevation", "Aspect", "Slope", "Horizontal_Distance_To_Hydrology",
              "Vertical_Distance_To_Hydrology", "Horizontal_Distance_To_Roadways",
              "Hillshade_9am", "Hillshade_Noon", "Hillshade_3pm", 
              "Horizontal_Distance_To_Fire_Points", "wilderness", "soil"]  # Updated list

# Assemble the feature vector
vector_assembler = VectorAssembler(inputCols=input_cols, outputCol="featureVector")
assembled_train_data = vector_assembler.transform(train_data)

# Train the Decision Tree model
classifier = DecisionTreeClassifier(seed=1234, labelCol="Cover_Type", featuresCol="featureVector")
dt_model = classifier.fit(assembled_train_data)

# Print the structure of the trained Decision Tree
print(dt_model.toDebugString)


                                                                                

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_96ebc76b104f, depth=5, numNodes=43, numClasses=8, numFeatures=12
  If (feature 0 <= 3047.5)
   If (feature 0 <= 2498.5)
    If (feature 11 <= 8.5)
     If (feature 3 <= 15.0)
      If (feature 10 <= 2.5)
       Predict: 6.0
      Else (feature 10 > 2.5)
       Predict: 4.0
     Else (feature 3 > 15.0)
      Predict: 3.0
    Else (feature 11 > 8.5)
     If (feature 0 <= 2344.5)
      If (feature 3 <= 15.0)
       Predict: 4.0
      Else (feature 3 > 15.0)
       Predict: 3.0
     Else (feature 0 > 2344.5)
      If (feature 4 <= 121.5)
       Predict: 6.0
      Else (feature 4 > 121.5)
       Predict: 3.0
   Else (feature 0 > 2498.5)
    If (feature 11 <= 3.5)
     If (feature 0 <= 2711.5)
      Predict: 3.0
     Else (feature 0 > 2711.5)
      If (feature 5 <= 1148.5)
       Predict: 5.0
      Else (feature 5 > 1148.5)
       Predict: 2.0
    Else (feature 11 > 3.5)
     Predict: 2.0
  Else (feature 0 > 3047.5)
   If (feature 1

In [9]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate the model on the training set
predictions = dt_model.transform(assembled_train_data)

# Accuracy evaluation
evaluator = MulticlassClassificationEvaluator(labelCol="Cover_Type", predictionCol="prediction")
accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
print(f"Accuracy: {accuracy}")

# F1 Score evaluation
f1 = evaluator.setMetricName("f1").evaluate(predictions)
print(f"F1 Score: {f1}")

# Confusion matrix
confusion_matrix = predictions.groupBy("Cover_Type")\
    .pivot("prediction", range(1,8)).count().na.fill(0.0)\
    .orderBy("Cover_Type")
confusion_matrix.show()


                                                                                

Accuracy: 0.7093996834402551


                                                                                

F1 Score: 0.7008469254622698




+----------+------+------+-----+---+---+----+-----+
|Cover_Type|     1|     2|    3|  4|  5|   6|    7|
+----------+------+------+-----+---+---+----+-----+
|       1.0|127618| 57429|  110|  0| 20|   7| 5295|
|       2.0| 50057|199813| 2749| 23|386|1744|  544|
|       3.0|     0|  3491|25839|528|177|2263|    0|
|       4.0|     0|     4| 1468|975|  0|  24|    0|
|       5.0|     0|  7791|  268|  0|399|  54|    0|
|       6.0|     0|  4459| 7055|169|  9|3911|    0|
|       7.0|  5811|    85|    0|  0|  0|   0|12549|
+----------+------+------+-----+---+---+----+-----+



                                                                                