In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import os

In [2]:
os.environ["SPARK_HOME"] = "C:/spark-2.4.4-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "C:/winutils"

In [3]:
# Creating spark session
spark = SparkSession.builder.appName("ICP7").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [4]:
# Loading the dataset
ICP = spark.read.load("D:/Datasets/Classification/adult.csv", format="csv", header=True, delimiter=",")

In [5]:
import pandas as pd
pd.DataFrame(ICP.take(5), columns=ICP.columns).transpose()

Unnamed: 0,0,1,2,3,4
age,39,50,38,53,28
workclass,State-gov,Self-emp-not-inc,Private,Private,Private
fnlwgt,77516,83311,215646,234721,338409
education,Bachelors,Bachelors,HS-grad,11th,Bachelors
education-num,13,13,9,7,13
marital-status,Never-married,Married-civ-spouse,Divorced,Married-civ-spouse,Married-civ-spouse
occupation,Adm-clerical,Exec-managerial,Handlers-cleaners,Handlers-cleaners,Prof-specialty
relationship,Not-in-family,Husband,Not-in-family,Husband,Wife
race,White,White,White,Black,Black
sex,Male,Male,Male,Male,Female


In [6]:
ICP = ICP.select("age","fnlwgt","capital-gain","capital-loss","education-num","hours-per-week","income")
ICP.show()

+---+------+------------+------------+-------------+--------------+------+
|age|fnlwgt|capital-gain|capital-loss|education-num|hours-per-week|income|
+---+------+------------+------------+-------------+--------------+------+
| 39| 77516|        2174|           0|           13|            40| <=50K|
| 50| 83311|           0|           0|           13|            13| <=50K|
| 38|215646|           0|           0|            9|            40| <=50K|
| 53|234721|           0|           0|            7|            40| <=50K|
| 28|338409|           0|           0|           13|            40| <=50K|
| 37|284582|           0|           0|           14|            40| <=50K|
| 49|160187|           0|           0|            5|            16| <=50K|
| 52|209642|           0|           0|            9|            45|  >50K|
| 31| 45781|       14084|           0|           14|            50|  >50K|
| 42|159449|        5178|           0|           13|            40|  >50K|
| 37|280464|           0|

In [7]:
print("The number of rows: ")
ICP.count()

The number of rows: 


32561

In [8]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="income", outputCol="label") 
ICP7 = indexer.fit(ICP).transform(ICP) 

In [9]:
ICP7.show()

+---+------+------------+------------+-------------+--------------+------+-----+
|age|fnlwgt|capital-gain|capital-loss|education-num|hours-per-week|income|label|
+---+------+------------+------------+-------------+--------------+------+-----+
| 39| 77516|        2174|           0|           13|            40| <=50K|  0.0|
| 50| 83311|           0|           0|           13|            13| <=50K|  0.0|
| 38|215646|           0|           0|            9|            40| <=50K|  0.0|
| 53|234721|           0|           0|            7|            40| <=50K|  0.0|
| 28|338409|           0|           0|           13|            40| <=50K|  0.0|
| 37|284582|           0|           0|           14|            40| <=50K|  0.0|
| 49|160187|           0|           0|            5|            16| <=50K|  0.0|
| 52|209642|           0|           0|            9|            45|  >50K|  1.0|
| 31| 45781|       14084|           0|           14|            50|  >50K|  1.0|
| 42|159449|        5178|   

In [10]:
from pyspark.sql.types import *

# Change column type
ICP7 = ICP7.withColumn("age", ICP7["age"].cast(IntegerType()))
ICP7 = ICP7.withColumn("fnlwgt", ICP7["fnlwgt"].cast(IntegerType()))
ICP7 = ICP7.withColumn("capital-gain", ICP7["capital-gain"].cast(IntegerType()))
ICP7 = ICP7.withColumn("capital-loss", ICP7["capital-loss"].cast(IntegerType()))
ICP7 = ICP7.withColumn("education-num", ICP7["education-num"].cast(IntegerType()))
ICP7 = ICP7.withColumn("hours-per-week", ICP7["hours-per-week"].cast(IntegerType()))

In [11]:
# Create vector assembler for feature columns
VAssembler = VectorAssembler(inputCols=ICP7.columns[:5], outputCol="features")
ICP7 = VAssembler.transform(ICP7)

In [12]:
ICP7.show()

+---+------+------------+------------+-------------+--------------+------+-----+--------------------+
|age|fnlwgt|capital-gain|capital-loss|education-num|hours-per-week|income|label|            features|
+---+------+------------+------------+-------------+--------------+------+-----+--------------------+
| 39| 77516|        2174|           0|           13|            40| <=50K|  0.0|[39.0,77516.0,217...|
| 50| 83311|           0|           0|           13|            13| <=50K|  0.0|[50.0,83311.0,0.0...|
| 38|215646|           0|           0|            9|            40| <=50K|  0.0|[38.0,215646.0,0....|
| 53|234721|           0|           0|            7|            40| <=50K|  0.0|[53.0,234721.0,0....|
| 28|338409|           0|           0|           13|            40| <=50K|  0.0|[28.0,338409.0,0....|
| 37|284582|           0|           0|           14|            40| <=50K|  0.0|[37.0,284582.0,0....|
| 49|160187|           0|           0|            5|            16| <=50K|  0.0|[4

In [13]:
# Split the crime dataset into training and testing data sets
ICPx = ICP7.select("label", "features")
# Split the crime dataset into training and testing data sets
X_train, x_test = ICPx.randomSplit([0.7, 0.3])

# Naïve Bayes

In [14]:
from pyspark.ml.classification import NaiveBayes
nb1 = NaiveBayes()

# train the model
model1 = nb1.fit(X_train)


In [15]:
# select example rows to display.
predictions = model1.transform(x_test)
predictions.show(5)


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[17.0,25690.0,0.0...|[-237.88849492312...|[1.0,2.8645132617...|       0.0|
|  0.0|[17.0,27415.0,0.0...|[-249.98868674479...|[1.0,5.0306391904...|       0.0|
|  0.0|[17.0,33138.0,0.0...|[-267.20281998699...|[1.0,1.7633499941...|       0.0|
|  0.0|[17.0,34019.0,0.0...|[-248.54232636174...|[1.0,1.0012565829...|       0.0|
|  0.0|[17.0,34019.0,0.0...|[-258.43602770056...|[1.0,1.2018448469...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.779803359278984


# Decision Tree

In [17]:
from pyspark.ml.classification import DecisionTreeClassifier
DecisionTreeModel = DecisionTreeClassifier()
model = DecisionTreeModel.fit(x_test)

In [18]:
# Generate prediction from test dataset
ICP7DT = model.transform(x_test)

In [19]:
# Evuluate the accuracy of the model
evaluator = MulticlassClassificationEvaluator()
accuracy = evaluator.evaluate(ICP7DT)

# Show model accuracy
print("Accuracy:", accuracy)

Accuracy: 0.7786364317424104


# Random Forest

In [20]:
from pyspark.ml.classification import RandomForestClassifier

# Using the training set for the model traning
RandomForestModel = RandomForestClassifier()
model = RandomForestModel.fit(x_test)


In [21]:
# Generate prediction from test dataset
ICPRF = model.transform(x_test)

In [22]:
# Evuluate the accuracy of the model
evaluator = MulticlassClassificationEvaluator()
accuracy = evaluator.evaluate(ICPRF)

# Show model accuracy
print("Accuracy:", accuracy)

Accuracy: 0.7791593625700464
