Random Forest and Decision Tree Classifiers

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=9f04030f92b8ec8f598872062cb29cdf0106dafd1bdae586e46a29d7d257e329
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
#creating spark session

In [3]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-income').getOrCreate()
spark

In [None]:
#importing required libraries

In [4]:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from google.colab import files
uploaded = files.upload()

Saving income.csv to income.csv


In [None]:
# loading income dataset and droping rows with missing values

In [5]:
df = spark.read.csv('income.csv', header=True, nullValue='?', ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, inferSchema=True)

In [6]:
df = df.na.drop()

In [7]:
df.printSchema()
df.show(40)

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- weight: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_years: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- citizenship: string (nullable = true)
 |-- income_class: string (nullable = true)

+---+----------------+------+------------+---------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+-------------+------------+
|age|       workclass|weight|   education|education_years|      marital_status|       occupation| relationship|              race|   sex|capital_gain|capita

In [8]:
print(df.dtypes)

[('age', 'int'), ('workclass', 'string'), ('weight', 'int'), ('education', 'string'), ('education_years', 'int'), ('marital_status', 'string'), ('occupation', 'string'), ('relationship', 'string'), ('race', 'string'), ('sex', 'string'), ('capital_gain', 'int'), ('capital_loss', 'int'), ('hours_per_week', 'int'), ('citizenship', 'string'), ('income_class', 'string')]


In [None]:
# converting categorical feature fields into indexes

In [9]:
categorical_columns = ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'citizenship']
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in categorical_columns]

pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)
df.show(10)

+---+----------------+------+---------+---------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+-------------+------------+---------------+---------------+--------------------+----------------+------------------+----------+---------+-----------------+
|age|       workclass|weight|education|education_years|      marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|  citizenship|income_class|workclass_index|education_index|marital_status_index|occupation_index|relationship_index|race_index|sex_index|citizenship_index|
+---+----------------+------+---------+---------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+-------------+------------+---------------+---------------+--------------------+----------------+------------------+----------+---------+-----------------+
| 39|       State-gov| 77516|Bachelors|       

In [None]:
# setting all numerical fields into one feature column

In [10]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']

In [11]:
numeric_columns = ['age', 'workclass_index', 'weight', 'education_index', 'education_years', 'marital_status_index', 'occupation_index', 'relationship_index', 'race_index', 'sex_index', 'capital_gain', 'capital_loss', 'hours_per_week', 'citizenship_index']
assembler = VectorAssembler(inputCols = numeric_columns, outputCol = "features")
df = assembler.transform(df)
df.show(10)

+---+----------------+------+---------+---------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+-------------+------------+---------------+---------------+--------------------+----------------+------------------+----------+---------+-----------------+--------------------+
|age|       workclass|weight|education|education_years|      marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|  citizenship|income_class|workclass_index|education_index|marital_status_index|occupation_index|relationship_index|race_index|sex_index|citizenship_index|            features|
+---+----------------+------+---------+---------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+-------------+------------+---------------+---------------+--------------------+----------------+------------------+----------+---------+-----------------+-----

In [None]:
# converting target column into indexes

In [12]:
labels = StringIndexer(inputCol='income_class', outputCol ='income_class_index')
df = labels.fit(df).transform(df)
df.show(10)

+---+----------------+------+---------+---------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+-------------+------------+---------------+---------------+--------------------+----------------+------------------+----------+---------+-----------------+--------------------+------------------+
|age|       workclass|weight|education|education_years|      marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|  citizenship|income_class|workclass_index|education_index|marital_status_index|occupation_index|relationship_index|race_index|sex_index|citizenship_index|            features|income_class_index|
+---+----------------+------+---------+---------------+--------------------+-----------------+-------------+-----+------+------------+------------+--------------+-------------+------------+---------------+---------------+--------------------+----------------+------------------+------

In [13]:
print(pd.DataFrame(df.take(100), columns=df.columns))

    age         workclass  weight     education  education_years  \
0    39         State-gov   77516     Bachelors               13   
1    50  Self-emp-not-inc   83311     Bachelors               13   
2    38           Private  215646       HS-grad                9   
3    53           Private  234721          11th                7   
4    28           Private  338409     Bachelors               13   
..  ...               ...     ...           ...              ...   
95   20           Private  188300  Some-college               10   
96   29           Private  103432       HS-grad                9   
97   32      Self-emp-inc  317660       HS-grad                9   
98   30           Private  194901          11th                7   
99   31         Local-gov  189265       HS-grad                9   

        marital_status         occupation   relationship   race     sex  ...  \
0        Never-married       Adm-clerical  Not-in-family  White    Male  ...   
1   Married-civ-spouse 

In [None]:
# splitting dataset into training and testing set

In [14]:
train, test = df.randomSplit([0.7, 0.3])
print('Train Size:' +str(train.count()))
print('Test Size:' +str(test.count()))

Train Size:20938
Test Size:9224


In [15]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# define Random Forest Classifier, fit model on train set and predict

In [16]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'income_class_index', maxBins=41)
rfModel = rf.fit(train)

In [17]:
predictions = rfModel.transform(test)
predictions.select('age', 'workclass_index', 'weight', 'education_index', 'education_years', 'marital_status_index', 'occupation_index', 'relationship_index', 'race_index', 'sex_index', 'capital_gain', 'capital_loss', 'hours_per_week', 'citizenship_index', 'income_class_index', 'prediction', 'rawPrediction', 'probability').show(25)

+---+---------------+------+---------------+---------------+--------------------+----------------+------------------+----------+---------+------------+------------+--------------+-----------------+------------------+----------+--------------------+--------------------+
|age|workclass_index|weight|education_index|education_years|marital_status_index|occupation_index|relationship_index|race_index|sex_index|capital_gain|capital_loss|hours_per_week|citizenship_index|income_class_index|prediction|       rawPrediction|         probability|
+---+---------------+------+---------------+---------------+--------------------+----------------+------------------+----------+---------+------------+------------+--------------+-----------------+------------------+----------+--------------------+--------------------+
| 17|            2.0|148194|            5.0|              7|                 1.0|             3.0|               2.0|       0.0|      1.0|           0|           0|            12|           

In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
# define Decision Tree Classifier, fit model on train set and predict

In [22]:
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'income_class_index', maxBins=41)
dtModel = dt.fit(train)

In [23]:
dtPredictions = dtModel.transform(test)
dtPredictions.select('age', 'workclass_index', 'weight', 'education_index', 'education_years', 'marital_status_index', 'occupation_index', 'relationship_index', 'race_index', 'sex_index', 'capital_gain', 'capital_loss', 'hours_per_week', 'citizenship_index', 'income_class_index', 'Prediction', 'rawPrediction', 'Probability').show(25)

+---+---------------+------+---------------+---------------+--------------------+----------------+------------------+----------+---------+------------+------------+--------------+-----------------+------------------+----------+---------------+--------------------+
|age|workclass_index|weight|education_index|education_years|marital_status_index|occupation_index|relationship_index|race_index|sex_index|capital_gain|capital_loss|hours_per_week|citizenship_index|income_class_index|Prediction|  rawPrediction|         Probability|
+---+---------------+------+---------------+---------------+--------------------+----------------+------------------+----------+---------+------------+------------+--------------+-----------------+------------------+----------+---------------+--------------------+
| 17|            2.0|148194|            5.0|              7|                 1.0|             3.0|               2.0|       0.0|      1.0|           0|           0|            12|              0.0|        

In [None]:
# define evaluator and calculate accuracies for RT and DT Classifier

In [19]:
evaluator = MulticlassClassificationEvaluator(labelCol = 'income_class_index', predictionCol = 'prediction')

In [20]:
accuracy_rt = evaluator.evaluate(predictions)

In [24]:
accuracy_dt = evaluator.evaluate(dtPredictions)

In [25]:
print('Random Forest Accuracy: %s' % (accuracy_rt))
print('Random Forest Error: %s' % (1.0 - accuracy_rt))

print('Decision Tree Accuracy: %s' % (accuracy_dt))
print('Decision Tree Error: %s' % (1.0 - accuracy_dt))

Random Forest Accuracy: 0.8181978017736984
Random Forest Error: 0.18180219822630161
Decision Tree Accuracy: 0.8152679967638465
Decision Tree Error: 0.18473200323615346


In [26]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

In [None]:
# define confusion matrix

In [27]:
preds = predictions.select(['prediction', 'income_class_index']).withColumn('income_class_index', F.col('income_class_index').cast(FloatType()))
preds = preds.select(['prediction', 'income_class_index'])
metrics = MulticlassMetrics(preds.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())



[[6581.  237.]
 [1283. 1123.]]
