In [1]:
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

In [2]:
def f(x):
    """Convert one split record into a dict with 'features' and 'label' entries.

    Parameters
    ----------
    x : sequence
        Indexable record. Positions 0-3 are converted with ``float`` and
        packed into a dense vector; position 4 is converted with ``str``.

    Returns
    -------
    dict
        ``{'features': <dense vector of 4 floats>, 'label': <str>}``.
    """
    measurements = [float(x[i]) for i in range(4)]
    return {
        'features': Vectors.dense(*measurements),
        'label': str(x[4]),
    }

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Obtain (or reuse) a session running against the "local" master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("DecisionTree")
    .getOrCreate()
)

In [5]:
# Read the text file, split each line on commas, convert every record with f()
# into a Row, and turn the resulting RDD into a DataFrame.
data = (
    spark.sparkContext
    .textFile("dataset/iris.txt")
    .map(lambda raw_line: raw_line.split(','))
    .map(lambda fields: Row(**f(fields)))
    .toDF()
)

In [6]:
# Register the DataFrame as the temp view 'iris' so the next cell can query it with spark.sql.
data.createOrReplaceTempView('iris')

In [7]:
# Select every column of the 'iris' temp view; `df` is what the indexers are fitted on below.
df= spark.sql("select * from iris")

In [8]:
# Build one "<column 1>:<column 0>" string per row (label first, then features,
# as the printed output shows) and collect them all to the driver.
rel = df.rdd.map(lambda row: ":".join((str(row[1]), str(row[0])))).collect()

In [9]:
# Print every collected "<label>:<features>" string, one per line.
for item in rel:
    print(item)

Iris-setosa:[5.1,3.5,1.4,0.2]
Iris-setosa:[4.9,3.0,1.4,0.2]
Iris-setosa:[4.7,3.2,1.3,0.2]
Iris-setosa:[4.6,3.1,1.5,0.2]
Iris-setosa:[5.0,3.6,1.4,0.2]
Iris-setosa:[5.4,3.9,1.7,0.4]
Iris-setosa:[4.6,3.4,1.4,0.3]
Iris-setosa:[5.0,3.4,1.5,0.2]
Iris-setosa:[4.4,2.9,1.4,0.2]
Iris-setosa:[4.9,3.1,1.5,0.1]
Iris-setosa:[5.4,3.7,1.5,0.2]
Iris-setosa:[4.8,3.4,1.6,0.2]
Iris-setosa:[4.8,3.0,1.4,0.1]
Iris-setosa:[4.3,3.0,1.1,0.1]
Iris-setosa:[5.8,4.0,1.2,0.2]
Iris-setosa:[5.7,4.4,1.5,0.4]
Iris-setosa:[5.4,3.9,1.3,0.4]
Iris-setosa:[5.1,3.5,1.4,0.3]
Iris-setosa:[5.7,3.8,1.7,0.3]
Iris-setosa:[5.1,3.8,1.5,0.3]
Iris-setosa:[5.4,3.4,1.7,0.2]
Iris-setosa:[5.1,3.7,1.5,0.4]
Iris-setosa:[4.6,3.6,1.0,0.2]
Iris-setosa:[5.1,3.3,1.7,0.5]
Iris-setosa:[4.8,3.4,1.9,0.2]
Iris-setosa:[5.0,3.0,1.6,0.2]
Iris-setosa:[5.0,3.4,1.6,0.4]
Iris-setosa:[5.2,3.5,1.5,0.2]
Iris-setosa:[5.2,3.4,1.4,0.2]
Iris-setosa:[4.7,3.2,1.6,0.2]
Iris-setosa:[4.8,3.1,1.6,0.2]
Iris-setosa:[5.4,3.4,1.5,0.4]
Iris-setosa:[5.2,4.1,1.5,0.1]
Iris-setos

In [10]:
# Fit on the full DataFrame: string column "label" -> "indexedLabel".
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)
# Fit on the full DataFrame: vector column "features" -> "indexedFeatures",
# with maxCategories set to 4.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(df)

In [11]:
# Map the numeric "prediction" column back to the string labels learned by labelIndexer.
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
# Fix the seed so the 70/30 train/test split, and every metric computed from it
# below, is the same on each Restart & Run All; unseeded, the split changes per run.
trainingData,testData = data.randomSplit([0.7,0.3], seed=42)

In [12]:
from pyspark.ml.classification import DecisionTreeClassificationModel,DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [13]:
# Decision-tree classifier reading the indexed label / feature columns.
dtClassifier = DecisionTreeClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
)
# Pipeline order: index labels, index features, classify, convert predictions back to strings.
pipelinedClassifier = Pipeline(
    stages=[labelIndexer, featureIndexer, dtClassifier, labelConverter]
)

In [14]:
# Fit the whole classification pipeline on the training split.
modelClassifier = pipelinedClassifier.fit(trainingData)

In [15]:
# Apply the fitted pipeline to the held-out test split.
predictionsClassifier = modelClassifier.transform(testData)

In [16]:
# Show up to 20 rows: predicted label next to the true label and the raw features.
predictionsClassifier.select("predictedLabel", "label", "features").show(20)

+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.4,3.2,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.4,1.4,0.3]|
|    Iris-setosa|    Iris-setosa|[4.6,3.6,1.0,0.2]|
|    Iris-setosa|    Iris-setosa|[4.7,3.2,1.6,0.2]|
|    Iris-setosa|    Iris-setosa|[4.8,3.0,1.4,0.1]|
|    Iris-setosa|    Iris-setosa|[4.8,3.4,1.6,0.2]|
|    Iris-setosa|    Iris-setosa|[4.8,3.4,1.9,0.2]|
|Iris-versicolor|Iris-versicolor|[5.0,2.0,3.5,1.0]|
|    Iris-setosa|    Iris-setosa|[5.0,3.6,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[5.1,3.7,1.5,0.4]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.9,0.4]|
|    Iris-setosa|    Iris-setosa|[5.2,3.5,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.2,4.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[5.3,3.7,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.4,3.4,1.7,0.2]|
|Iris-versicolor|Iris-versicolor|[5.5,2.3,4.0,1.3]|
|Iris-versic

In [17]:
# Accuracy evaluator comparing "prediction" against "indexedLabel".
evaluatorClassifier = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)

In [18]:
# Compute the accuracy of the classifier's predictions on the test split.
accuracy = evaluatorClassifier.evaluate(predictionsClassifier)

In [19]:
# Report the error rate, i.e. the complement of the accuracy.
test_error = 1.0 - accuracy
print("Test Error = " + str(test_error))

Test Error = 0.0888888888888889


In [20]:
# Index 2 is the position of dtClassifier in the pipeline's stage list,
# so this picks the fitted decision-tree model out of the fitted pipeline.
treeModelClassifier = modelClassifier.stages[2]

In [21]:
# Dump the learned tree's textual description.
classification_tree_text = str(treeModelClassifier.toDebugString)
print("Learned classification tree model:\n" + classification_tree_text)

Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_329e7119b04d) of depth 5 with 13 nodes
  If (feature 2 <= 2.35)
   Predict: 0.0
  Else (feature 2 > 2.35)
   If (feature 2 <= 4.75)
    If (feature 3 <= 1.55)
     Predict: 1.0
    Else (feature 3 > 1.55)
     Predict: 2.0
   Else (feature 2 > 4.75)
    If (feature 3 <= 1.75)
     If (feature 1 <= 2.6500000000000004)
      Predict: 2.0
     Else (feature 1 > 2.6500000000000004)
      If (feature 2 <= 5.05)
       Predict: 1.0
      Else (feature 2 > 5.05)
       Predict: 2.0
    Else (feature 3 > 1.75)
     Predict: 2.0



In [22]:
from pyspark.ml.regression import DecisionTreeRegressionModel,DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [23]:
# Decision-tree regressor trained on the same indexed label / feature columns
# as the classifier above.
dtRegressor = DecisionTreeRegressor(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
)

In [24]:
# Same stage layout as the classification pipeline, with the regressor in slot 2.
# NOTE(review): the regressor can emit fractional predictions, which are then fed
# to labelConverter (IndexToString) - confirm how non-integer indices are mapped.
pipelineRegressor = Pipeline(
    stages=[labelIndexer, featureIndexer, dtRegressor, labelConverter]
)

In [25]:
# Fit the regression pipeline on the training split.
modelRegressor = pipelineRegressor.fit(trainingData)
# Apply the fitted pipeline to the held-out test split.
predictionsRegressor = modelRegressor.transform(testData)
# Show up to 20 rows: converted prediction next to the true label and the raw features.
predictionsRegressor.select("predictedLabel", "label", "features").show(20)

+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.4,3.2,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.4,1.4,0.3]|
|    Iris-setosa|    Iris-setosa|[4.6,3.6,1.0,0.2]|
|    Iris-setosa|    Iris-setosa|[4.7,3.2,1.6,0.2]|
|    Iris-setosa|    Iris-setosa|[4.8,3.0,1.4,0.1]|
|    Iris-setosa|    Iris-setosa|[4.8,3.4,1.6,0.2]|
|    Iris-setosa|    Iris-setosa|[4.8,3.4,1.9,0.2]|
|Iris-versicolor|Iris-versicolor|[5.0,2.0,3.5,1.0]|
|    Iris-setosa|    Iris-setosa|[5.0,3.6,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[5.1,3.7,1.5,0.4]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.9,0.4]|
|    Iris-setosa|    Iris-setosa|[5.2,3.5,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.2,4.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[5.3,3.7,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.4,3.4,1.7,0.2]|
|Iris-versicolor|Iris-versicolor|[5.5,2.3,4.0,1.3]|
|Iris-versic

In [26]:
# RMSE evaluator comparing the numeric "prediction" against "indexedLabel".
evaluatorRegressor = RegressionEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluatorRegressor.evaluate(predictionsRegressor)
rmse_text = str(rmse)
print("Root Mean Squared Error (RMSE) on test data = " + rmse_text)

Root Mean Squared Error (RMSE) on test data = 0.29814239699997197


In [27]:
# Index 2 is the position of dtRegressor in the pipeline's stage list,
# so this is the fitted regression tree; dump its textual description.
treeModelRegressor = modelRegressor.stages[2]
regression_tree_text = str(treeModelRegressor.toDebugString)
print("Learned regression tree model:\n" + regression_tree_text)

Learned regression tree model:
DecisionTreeRegressionModel (uid=DecisionTreeRegressor_a96af1de0628) of depth 5 with 13 nodes
  If (feature 2 <= 2.35)
   Predict: 0.0
  Else (feature 2 > 2.35)
   If (feature 2 <= 4.75)
    If (feature 3 <= 1.55)
     Predict: 1.0
    Else (feature 3 > 1.55)
     Predict: 2.0
   Else (feature 2 > 4.75)
    If (feature 3 <= 1.75)
     If (feature 1 <= 2.6500000000000004)
      Predict: 2.0
     Else (feature 1 > 2.6500000000000004)
      If (feature 2 <= 5.05)
       Predict: 1.0
      Else (feature 2 > 5.05)
       Predict: 1.6666666666666667
    Else (feature 3 > 1.75)
     Predict: 2.0

