In [17]:
#To import all required modules
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import os
import pandas as pd
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import *
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier


In [2]:
spark = SparkSession.builder.appName("Classifcation").getOrCreate()

In [4]:
# Loading the dataset
adult_dataFrame = spark.read.load("dataset/adult.csv", format="csv", header=True, inferSchema=True, delimiter=",")
# Check the type of adult_dataFrame
print("The type of people_df is", type(adult_dataFrame))
#To show the first five rows
pd.DataFrame(adult_dataFrame.take(5), columns=adult_dataFrame.columns).transpose()

The type of people_df is <class 'pyspark.sql.dataframe.DataFrame'>


Unnamed: 0,0,1,2,3,4
age,39,50,38,53,28
workclass,State-gov,Self-emp-not-inc,Private,Private,Private
fnlwgt,77516,83311,215646,234721,338409
education,Bachelors,Bachelors,HS-grad,11th,Bachelors
education-num,13,13,9,7,13
marital-status,Never-married,Married-civ-spouse,Divorced,Married-civ-spouse,Married-civ-spouse
occupation,Adm-clerical,Exec-managerial,Handlers-cleaners,Handlers-cleaners,Prof-specialty
relationship,Not-in-family,Husband,Not-in-family,Husband,Wife
race,White,White,White,Black,Black
sex,Male,Male,Male,Male,Female


In [8]:
#To get numeric features only
adult_numeric_dataFrame = adult_dataFrame.select("age","fnlwgt", "education-num", "capital-gain","capital-loss","hours-per-week","income")
adult_numeric_dataFrame.show(5)

+---+------+-------------+------------+------------+--------------+------+
|age|fnlwgt|education-num|capital-gain|capital-loss|hours-per-week|income|
+---+------+-------------+------------+------------+--------------+------+
| 39| 77516|           13|        2174|           0|            40| <=50K|
| 50| 83311|           13|           0|           0|            13| <=50K|
| 38|215646|            9|           0|           0|            40| <=50K|
| 53|234721|            7|           0|           0|            40| <=50K|
| 28|338409|           13|           0|           0|            40| <=50K|
+---+------+-------------+------------+------------+--------------+------+
only showing top 5 rows



In [9]:
#to create a new column called label from the income attribute
indexer = StringIndexer(inputCol="income", outputCol="label") 
adult_numeric_dataFrame = indexer.fit(adult_numeric_dataFrame).transform(adult_numeric_dataFrame)
adult_numeric_dataFrame.show(10)

+---+------+-------------+------------+------------+--------------+------+-----+
|age|fnlwgt|education-num|capital-gain|capital-loss|hours-per-week|income|label|
+---+------+-------------+------------+------------+--------------+------+-----+
| 39| 77516|           13|        2174|           0|            40| <=50K|  0.0|
| 50| 83311|           13|           0|           0|            13| <=50K|  0.0|
| 38|215646|            9|           0|           0|            40| <=50K|  0.0|
| 53|234721|            7|           0|           0|            40| <=50K|  0.0|
| 28|338409|           13|           0|           0|            40| <=50K|  0.0|
| 37|284582|           14|           0|           0|            40| <=50K|  0.0|
| 49|160187|            5|           0|           0|            16| <=50K|  0.0|
| 52|209642|            9|           0|           0|            45|  >50K|  1.0|
| 31| 45781|           14|       14084|           0|            50|  >50K|  1.0|
| 42|159449|           13|  

In [23]:
#To Change numeric features data type into integer
adult_numeric_dataFrame = adult_numeric_dataFrame.withColumn("age", adult_numeric_dataFrame["age"].cast(IntegerType()))

adult_numeric_dataFrame = adult_numeric_dataFrame.withColumn("fnlwgt", adult_numeric_dataFrame["fnlwgt"].cast(IntegerType()))

adult_numeric_dataFrame = adult_numeric_dataFrame.withColumn("capital-gain", adult_numeric_dataFrame["capital-gain"].cast(IntegerType()))

adult_numeric_dataFrame = adult_numeric_dataFrame.withColumn("capital-loss", adult_numeric_dataFrame["capital-loss"].cast(IntegerType()))

adult_numeric_dataFrame = adult_numeric_dataFrame.withColumn("education-num", adult_numeric_dataFrame["education-num"].cast(IntegerType()))

adult_numeric_dataFrame = adult_numeric_dataFrame.withColumn("hours-per-week", adult_numeric_dataFrame["hours-per-week"].cast(IntegerType()))


#T generate the vector assembler for numeric features, I choose the first five columns
adult_dataFrame_vector = VectorAssembler(inputCols=adult_numeric_dataFrame.columns[:5], outputCol="features")
adult_numeric_dataFrame = adult_dataFrame_vector.transform(adult_numeric_dataFrame)
adult_numeric_dataFrame.show(10)

+---+------+-------------+------------+------------+--------------+------+-----+--------------------+--------------------+
|age|fnlwgt|education-num|capital-gain|capital-loss|hours-per-week|income|label|     features-vector|            features|
+---+------+-------------+------------+------------+--------------+------+-----+--------------------+--------------------+
| 39| 77516|           13|        2174|           0|            40| <=50K|  0.0|[39.0,77516.0,13....|[39.0,77516.0,13....|
| 50| 83311|           13|           0|           0|            13| <=50K|  0.0|[50.0,83311.0,13....|[50.0,83311.0,13....|
| 38|215646|            9|           0|           0|            40| <=50K|  0.0|[38.0,215646.0,9....|[38.0,215646.0,9....|
| 53|234721|            7|           0|           0|            40| <=50K|  0.0|[53.0,234721.0,7....|[53.0,234721.0,7....|
| 28|338409|           13|           0|           0|            40| <=50K|  0.0|[28.0,338409.0,13...|[28.0,338409.0,13...|
| 37|284582|    

In [24]:
# To get the label and features vector
model_data = adult_numeric_dataFrame.select("label", "features")
#To split the model dataset into training and testing datasets, 80% for taining and 20% for testing
training, test = model_data.randomSplit([0.8, 0.2])
training.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[17.0,19752.0,7.0...|
|  0.0|[17.0,24090.0,9.0...|
|  0.0|[17.0,25051.0,6.0...|
|  0.0|[17.0,25690.0,6.0...|
|  0.0|[17.0,27032.0,6.0...|
+-----+--------------------+
only showing top 5 rows



In [25]:
#To train the model on the training dataset
#Naïve Bayes model
model = NaiveBayes()
model = model.fit(training)

In [26]:
#To perform testing on the test dataset
predictions = model.transform(test)
predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[17.0,32607.0,6.0...|[-246.42553839792...|[1.0,1.8225571908...|       0.0|
|  0.0|[17.0,38611.0,7.0...|[-263.93867673266...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,47425.0,7.0...|[-275.12051390357...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,51939.0,7.0...|[-280.84717750573...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,52486.0,7.0...|[-281.54112632948...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,56986.0,8.0...|[-297.14622256018...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,65368.0,7.0...|[-297.88381142542...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,73145.0,5.0...|[-287.95767639724...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,80077.0,7.0...|[-316.54431096557...|           [1.0,0.0]|       0.0|
|  0.0|[17.0,820

In [27]:
# To evaluate model and compute accuracy on the test set
evaluate_model = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluate_model.evaluate(predictions)
print("Model Accuracy = " + str(accuracy))

Model Accuracy = 0.7861857252494244


In [28]:
#To train Decision Tree model:
DT_Model = DecisionTreeClassifier()
DT_Model = DT_Model.fit(training)

In [29]:
#To perform testing on the test dataset using Decision Tree Model
DT_Model_predictions = DT_Model.transform(test)
DT_Model_predictions.show()

+-----+--------------------+--------------+--------------------+----------+
|label|            features| rawPrediction|         probability|prediction|
+-----+--------------------+--------------+--------------------+----------+
|  0.0|[17.0,32607.0,6.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,38611.0,7.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,47425.0,7.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,51939.0,7.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,52486.0,7.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,56986.0,8.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,65368.0,7.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,73145.0,5.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,80077.0,7.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0,82041.0,7.0...|[7964.0,423.0]|[0.9495648026708,...|       0.0|
|  0.0|[17.0

In [30]:
# To evaluate model and compute accuracy on the test set
evaluate_DT_Model = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluate_DT_Model.evaluate(DT_Model_predictions)
print("Model Accuracy = " + str(accuracy))

Model Accuracy = 0.8237912509593246


In [31]:
#To train Random Forest model:

RF_Model = RandomForestClassifier()
RF_Model = RF_Model.fit(training)


In [32]:
#To perform testing on the test dataset using Random Forest Model
RF_Model_predictions = RF_Model.transform(test)
RF_Model_predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[17.0,32607.0,6.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,38611.0,7.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,47425.0,7.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,51939.0,7.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,52486.0,7.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,56986.0,8.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,65368.0,7.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,73145.0,5.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,80077.0,7.0...|[18.6949949765006...|[0.93474974882503...|       0.0|
|  0.0|[17.0,820

In [33]:
# To evaluate model and compute accuracy on the test set
evaluate_RF_Model = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluate_RF_Model.evaluate(RF_Model_predictions)
print("Model Accuracy = " + str(accuracy))

Model Accuracy = 0.8234842670759785
