In [36]:
import pyspark
from pyspark.sql import SparkSession

In [37]:
# Obtain the notebook's SparkSession via the builder's getOrCreate().
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [38]:
# Load the raw CSV. The file is read without a header row (header=False),
# and column types are inferred from the values (inferSchema=True).
data = (
    spark.read
    .options(inferSchema=True, header=False)
    .csv('covtype.data')
)

In [39]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col
# Column names to apply to the loaded data, in order: ten named measurement
# columns, four WildernessArea columns, forty SoilType columns, and finally
# the CoverType column.
colNames = [
    "Elevation",
    "Aspect",
    "Slope",
    "HorizontalDistanceToHydrology",
    "VerticalDistanceToHydrology",
    "HorizontalDistanceToRoadways",
    "Hillshade9am",
    "HillShadeNoon",
    "HillShade3pm",
    "HorizontalDistanceToFirePoints",
    *(f"WildernessArea{idx}" for idx in range(4)),
    *(f"SoilType{idx}" for idx in range(40)),
    "CoverType",
]

In [40]:
# Apply the column names, then cast the CoverType column to double
# (it is used as the label column further down).
data = (
    data
    .toDF(*colNames)
    .withColumn("CoverType", col("CoverType").cast(DoubleType()))
)

In [41]:
# Hold out roughly 10% of the rows for evaluation. The split is random, so
# pass an explicit seed: without one, every run produces a different
# train/test partition and the metrics below cannot be reproduced.
trainData, testData = data.randomSplit([0.9, 0.1], seed=1)
# Mark both splits for caching so later cells can reuse them.
trainData.cache()
testData.cache()

DataFrame[Elevation: int, Aspect: int, Slope: int, HorizontalDistanceToHydrology: int, VerticalDistanceToHydrology: int, HorizontalDistanceToRoadways: int, Hillshade9am: int, HillShadeNoon: int, HillShade3pm: int, HorizontalDistanceToFirePoints: int, WildernessArea0: int, WildernessArea1: int, WildernessArea2: int, WildernessArea3: int, SoilType0: int, SoilType1: int, SoilType2: int, SoilType3: int, SoilType4: int, SoilType5: int, SoilType6: int, SoilType7: int, SoilType8: int, SoilType9: int, SoilType10: int, SoilType11: int, SoilType12: int, SoilType13: int, SoilType14: int, SoilType15: int, SoilType16: int, SoilType17: int, SoilType18: int, SoilType19: int, SoilType20: int, SoilType21: int, SoilType22: int, SoilType23: int, SoilType24: int, SoilType25: int, SoilType26: int, SoilType27: int, SoilType28: int, SoilType29: int, SoilType30: int, SoilType31: int, SoilType32: int, SoilType33: int, SoilType34: int, SoilType35: int, SoilType36: int, SoilType37: int, SoilType38: int, SoilType

In [42]:
from pyspark.ml.feature import VectorAssembler
# Every column except the last one (CoverType) is an input feature.
inputCols = colNames[:-1]
# Combine the feature columns into a single "featureVector" column.
vectorAssembler = VectorAssembler(
    inputCols=inputCols,
    outputCol="featureVector",
)
# Apply the same assembler to both splits.
assembledTrainData, assembledTestData = [
    vectorAssembler.transform(split) for split in (trainData, testData)
]

In [43]:
from pyspark.ml.classification import DecisionTreeClassifier
# Configure a decision tree that reads the assembled "featureVector" column,
# learns the "CoverType" label, and writes its output to "prediction".
classifier = DecisionTreeClassifier(
    featuresCol="featureVector",
    labelCol="CoverType",
    predictionCol="prediction",
    seed=1,
)
# Fit the tree on the assembled training split.
model = classifier.fit(assembledTrainData)

In [46]:
# Score the assembled test split with the fitted model.
predictions = model.transform(assembledTestData)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol ="CoverType", predictionCol="prediction")
# A notebook cell only displays its last expression, so a bare accuracy
# evaluation in the middle of the cell is computed and then thrown away.
# Capture both metrics and print accuracy explicitly; F1 stays the cell's
# displayed value.
accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
f1 = evaluator.setMetricName("f1").evaluate(predictions)
print(f"accuracy: {accuracy}")
f1

0.6828906602311283

In [47]:
# Confusion matrix: one row per CoverType label, one column per prediction
# value 1..7, each cell holding the count of test rows for that pair.
confusionMatrix = (
    predictions
    .groupBy("CoverType")
    .pivot("prediction", range(1,8))
    .count()
    .na.fill(0.0)  # replace nulls with 0
    .orderBy("CoverType")
)
confusionMatrix.show()

+---------+-----+-----+----+---+---+---+---+
|CoverType|    1|    2|   3|  4|  5|  6|  7|
+---------+-----+-----+----+---+---+---+---+
|      1.0|14734| 5892|  27|  1|  0|  0|360|
|      2.0| 5729|21874| 539| 16|  8|  0| 53|
|      3.0|    0|  566|3041| 79|  0|  0|  0|
|      4.0|    0|    0| 172|100|  0|  0|  0|
|      5.0|    3|  858|  67|  0| 12|  0|  0|
|      6.0|    0|  593|1107| 70|  0|  0|  0|
|      7.0| 1183|   18|   7|  0|  0|  0|888|
+---------+-----+-----+----+---+---+---+---+

