In [24]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_methods').getOrCreate()

from pyspark.ml import Pipeline
from pyspark.ml.classification import (RandomForestClassifier, DecisionTreeClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [25]:
data = spark.read.load('Korea Income and Welfare Output.csv', format="csv", header=True, inferSchema=True)

#data.show()

data.printSchema()

root
 |-- year: integer (nullable = true)
 |-- region: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- year_born: integer (nullable = true)
 |-- education_tier: integer (nullable = true)
 |-- marriage_tier: integer (nullable = true)
 |-- income_tier: integer (nullable = true)
 |-- occupation_code: integer (nullable = true)



In [26]:
data.columns

['year',
 'region',
 'gender',
 'year_born',
 'education_tier',
 'marriage_tier',
 'income_tier',
 'occupation_code']

In [27]:
assembler = VectorAssembler(
  inputCols=['year',
             'region',
             'gender',
             'year_born',
             'education_tier',
             'marriage_tier',
             'occupation_code'],
              outputCol="features")

In [28]:
output = assembler.transform(data)

In [30]:
#from pyspark.ml.feature import StringIndexer

#indexer = StringIndexer(inputCol="income_tier", outputCol="income_tier_Index")
#output_fixed = indexer.fit(output).transform(output)

final_data = output.select("features",'income_tier')

final_data.show()

+--------------------+-----------+
|            features|income_tier|
+--------------------+-----------+
|[2005.0,1.0,1.0,1...|          2|
|[2006.0,1.0,1.0,1...|          2|
|[2007.0,1.0,1.0,1...|          2|
|[2008.0,1.0,1.0,1...|          2|
|[2009.0,1.0,1.0,1...|          2|
|[2010.0,1.0,1.0,1...|          2|
|[2011.0,1.0,1.0,1...|          2|
|[2012.0,1.0,1.0,1...|          2|
|[2013.0,1.0,1.0,1...|          2|
|[2014.0,1.0,1.0,1...|          2|
|[2007.0,1.0,1.0,1...|          2|
|[2008.0,1.0,1.0,1...|          2|
|[2009.0,1.0,1.0,1...|          2|
|[2012.0,1.0,1.0,1...|          2|
|[2014.0,1.0,1.0,1...|          2|
|[2015.0,1.0,1.0,1...|          2|
|[2016.0,1.0,1.0,1...|          2|
|[2017.0,1.0,1.0,1...|          2|
|[2005.0,1.0,1.0,1...|          2|
|[2006.0,1.0,1.0,1...|          2|
+--------------------+-----------+
only showing top 20 rows



In [33]:
(train_data,test_data) = final_data.randomSplit([0.8,0.2])

print(train_data.count())
print(test_data.count())



42805
10764


In [34]:
dtc = DecisionTreeClassifier(labelCol='income_tier',featuresCol='features')
rfc = RandomForestClassifier(labelCol='income_tier',featuresCol='features',numTrees=100)

In [35]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)

                                                                                

In [37]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)

rfc_predictions.show()

+--------------------+-----------+--------------------+--------------------+----------+
|            features|income_tier|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|[2005.0,1.0,1.0,1...|          2|[0.0,73.079330131...|[0.0,0.7307933013...|       1.0|
|[2005.0,1.0,1.0,1...|          1|[0.0,33.249209155...|[0.0,0.3324920915...|       2.0|
|[2005.0,1.0,1.0,1...|          1|[0.0,33.249209155...|[0.0,0.3324920915...|       2.0|
|[2005.0,1.0,1.0,1...|          1|[0.0,33.249209155...|[0.0,0.3324920915...|       2.0|
|[2005.0,1.0,1.0,1...|          1|[0.0,33.249209155...|[0.0,0.3324920915...|       2.0|
|[2005.0,1.0,1.0,1...|          1|[0.0,33.249209155...|[0.0,0.3324920915...|       2.0|
|[2005.0,1.0,1.0,1...|          1|[0.0,33.249209155...|[0.0,0.3324920915...|       2.0|
|[2005.0,1.0,1.0,1...|          1|[0.0,33.249209155...|[0.0,0.3324920915...|       2.0|
|[2005.0,1.0,1.0,1...|          

[Stage 124:>                                                        (0 + 1) / 1]                                                                                

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [39]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="income_tier", predictionCol="prediction", metricName="accuracy")

dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)

                                                                                

In [40]:
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 53.63%
----------------------------------------
A random forest ensemble has an accuracy of: 54.33%
