In [6]:
!pip install pyspark



In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#import numpy
# Load training data
from pyspark.ml.linalg import SparseVector
# from pyspark.python.pyspark.shell import spark
from pyspark.sql import SparkSession
import os

In [8]:
os.environ["HADOOP_HOME"] = "C:/winutils"

In [9]:
# Creating spark session
spark = SparkSession.builder.appName("ICP7").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [10]:
data = spark.read.load("/content/drive/MyDrive/adult.csv", format="csv", header=True, delimiter=",")
data.show(5)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|fnlwgt| education|education-num|     marital-status|        occupation|  relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|           0|           0|            13| United-States| <=50K|
| 38|          Private|215646|   HS-grad|            9|     

In [11]:
data = data.select("age","capital-gain","education-num","hours-per-week","income")
data.show()


+---+------------+-------------+--------------+------+
|age|capital-gain|education-num|hours-per-week|income|
+---+------------+-------------+--------------+------+
| 39|        2174|           13|            40| <=50K|
| 50|           0|           13|            13| <=50K|
| 38|           0|            9|            40| <=50K|
| 53|           0|            7|            40| <=50K|
| 28|           0|           13|            40| <=50K|
| 37|           0|           14|            40| <=50K|
| 49|           0|            5|            16| <=50K|
| 52|           0|            9|            45|  >50K|
| 31|       14084|           14|            50|  >50K|
| 42|        5178|           13|            40|  >50K|
| 37|           0|           10|            80|  >50K|
| 30|           0|           13|            40|  >50K|
| 23|           0|           13|            30| <=50K|
| 32|           0|           12|            50| <=50K|
| 40|           0|           11|            40|  >50K|
| 34|     

In [12]:
print("The number of rows: ")
data.count()

The number of rows: 


32561

In [13]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="income", outputCol="label") 
data_converted = indexer.fit(data).transform(data)

In [14]:
data_converted.show()

+---+------------+-------------+--------------+------+-----+
|age|capital-gain|education-num|hours-per-week|income|label|
+---+------------+-------------+--------------+------+-----+
| 39|        2174|           13|            40| <=50K|  0.0|
| 50|           0|           13|            13| <=50K|  0.0|
| 38|           0|            9|            40| <=50K|  0.0|
| 53|           0|            7|            40| <=50K|  0.0|
| 28|           0|           13|            40| <=50K|  0.0|
| 37|           0|           14|            40| <=50K|  0.0|
| 49|           0|            5|            16| <=50K|  0.0|
| 52|           0|            9|            45|  >50K|  1.0|
| 31|       14084|           14|            50|  >50K|  1.0|
| 42|        5178|           13|            40|  >50K|  1.0|
| 37|           0|           10|            80|  >50K|  1.0|
| 30|           0|           13|            40|  >50K|  1.0|
| 23|           0|           13|            30| <=50K|  0.0|
| 32|           0|      

In [15]:
from pyspark.sql.types import *

# Change column type
data_converted = data_converted.withColumn("age", data_converted["age"].cast(IntegerType()))
data_converted = data_converted.withColumn("capital-gain", data_converted["capital-gain"].cast(IntegerType()))
data_converted = data_converted.withColumn("education-num", data_converted["education-num"].cast(IntegerType()))
data_converted = data_converted.withColumn("hours-per-week", data_converted["hours-per-week"].cast(IntegerType()))


In [17]:

# Create vector assembler for feature columns
VAssembler = VectorAssembler(inputCols=data_converted.columns[:4], outputCol="features")
data_converted = VAssembler.transform(data_converted)

In [18]:
data_converted.show(10)

+---+------------+-------------+--------------+------+-----+--------------------+
|age|capital-gain|education-num|hours-per-week|income|label|            features|
+---+------------+-------------+--------------+------+-----+--------------------+
| 39|        2174|           13|            40| <=50K|  0.0|[39.0,2174.0,13.0...|
| 50|           0|           13|            13| <=50K|  0.0|[50.0,0.0,13.0,13.0]|
| 38|           0|            9|            40| <=50K|  0.0| [38.0,0.0,9.0,40.0]|
| 53|           0|            7|            40| <=50K|  0.0| [53.0,0.0,7.0,40.0]|
| 28|           0|           13|            40| <=50K|  0.0|[28.0,0.0,13.0,40.0]|
| 37|           0|           14|            40| <=50K|  0.0|[37.0,0.0,14.0,40.0]|
| 49|           0|            5|            16| <=50K|  0.0| [49.0,0.0,5.0,16.0]|
| 52|           0|            9|            45|  >50K|  1.0| [52.0,0.0,9.0,45.0]|
| 31|       14084|           14|            50|  >50K|  1.0|[31.0,14084.0,14....|
| 42|        517

In [19]:
# Split the dataset into training and testing data sets
x = data_converted.select("label", "features")
# Split the dataset into training and testing data sets
X_train, x_test = x.randomSplit([0.7, 0.3])

# Naive Bayes

In [20]:

from pyspark.ml.classification import NaiveBayes
nb1 = NaiveBayes()

# train the model
model1 = nb1.fit(X_train)

In [21]:
# select example rows to display.
predictions = model1.transform(x_test)
predictions.show(5)

+-----+-------------------+--------------------+--------------------+----------+
|label|           features|       rawPrediction|         probability|prediction|
+-----+-------------------+--------------------+--------------------+----------+
|  0.0|[17.0,0.0,4.0,45.0]|[-125.35616342715...|[1.0,9.9608115215...|       0.0|
|  0.0|[17.0,0.0,5.0,15.0]|[-74.640681407274...|[1.0,1.5075237380...|       0.0|
|  0.0|[17.0,0.0,5.0,25.0]|[-92.611207198269...|[1.0,2.4944842373...|       0.0|
|  0.0|[17.0,0.0,5.0,35.0]|[-110.58173298926...|[1.0,4.1275977641...|       0.0|
|  0.0|[17.0,0.0,5.0,40.0]|[-119.56699588476...|[1.0,5.3095250096...|       0.0|
+-----+-------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.7765089722675367


# Decision Tree

In [23]:
from pyspark.ml.classification import DecisionTreeClassifier
DecisionTreeModel = DecisionTreeClassifier()
model2 = DecisionTreeModel.fit(x_test)

In [24]:
# Generate prediction from test dataset
dt = model2.transform(x_test)

In [25]:
# Evuluate the accuracy of the model
evaluator = MulticlassClassificationEvaluator()
accuracy = evaluator.evaluate(dt)

# Show model accuracy
print("Accuracy:", accuracy)

Accuracy: 0.8051962592361115


# Random Forest


In [26]:
from pyspark.ml.classification import RandomForestClassifier

# Using the training set for the model traning
RandomForestModel = RandomForestClassifier()
model3 = RandomForestModel.fit(x_test)

In [28]:
# Generate prediction from test dataset
RF = model3.transform(x_test)

In [29]:
# Evuluate the accuracy of the model
evaluator = MulticlassClassificationEvaluator()
accuracy = evaluator.evaluate(RF)

# Show model accuracy
print("Accuracy:", accuracy)

Accuracy: 0.8069629875824761
