In [1]:
# Importing the libraries

import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time

import warnings
warnings.filterwarnings('ignore')

In [2]:
import pyspark 
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [3]:
# Create spark session

spark = SparkSession \
    .builder \
    .appName("Classification") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

22/04/12 02:17:43 WARN Utils: Your hostname, Tanishqs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.29.164 instead (on interface en0)
22/04/12 02:17:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/12 02:17:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/04/12 02:17:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Importing the data

In [4]:
df = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "False") \
  .option("sep", ",") \
  .load("census.csv") \
  .toDF("age", "workClass", "fnlwgt", "education", "education-num","marital-status", "occupation", "relationship",
        "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "income")

display(df)

                                                                                

DataFrame[age: int, workClass: string, fnlwgt: int, education: string, education-num: int, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, income: string]

In [5]:
df.show(2)

+---+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+--------------+--------------+------+
|age|       workClass|fnlwgt|education|education-num|    marital-status|     occupation| relationship| race| sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|            13| United-States| <=50K|
+---+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+--

In [6]:
# Data specs

print('Data is having', df.count(), "rows and", len(df.columns), 'columns' )

Data is having 48842 rows and 15 columns


# Data Preprocessing

In [7]:
from pyspark.sql import functions as F

# Create add new column to the dataset
df = df.withColumn('>50K', F.when(df.income == '<=50K', 0).otherwise(1))

# Drop the Income label
df = df.drop('income')

In [8]:
df.columns

['age',
 'workClass',
 'fnlwgt',
 'education',
 'education-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 '>50K']

In [9]:
# Selecting categorical features

categorical_columns = [
 'workClass',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'hours-per-week',
 'native-country',
 ]

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import (DecisionTreeClassifier, GBTClassifier, RandomForestClassifier)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns]

# The encode of indexed values multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers]

In [11]:
# Vectorizing encoded values

categorical_encoded = [encoder.getOutputCol() for encoder in encoders]
numerical_columns = ['age', 'education-num', 'capital-gain', 'capital-loss']
inputcols = categorical_encoded + numerical_columns
assembler = VectorAssembler(inputCols=inputcols, outputCol="features")

In [12]:
# Setting up a pipeline to automize the stages

pipeline = Pipeline(stages=indexers + encoders+[assembler])
model = pipeline.fit(df)

                                                                                

In [13]:
# Transform data

transformed = model.transform(df)
display(transformed)

DataFrame[age: int, workClass: string, fnlwgt: int, education: string, education-num: int, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, >50K: int, workClass_indexed: double, education_indexed: double, marital-status_indexed: double, occupation_indexed: double, relationship_indexed: double, race_indexed: double, sex_indexed: double, hours-per-week_indexed: double, native-country_indexed: double, workClass_indexed_encoded: vector, education_indexed_encoded: vector, marital-status_indexed_encoded: vector, occupation_indexed_encoded: vector, relationship_indexed_encoded: vector, race_indexed_encoded: vector, sex_indexed_encoded: vector, hours-per-week_indexed_encoded: vector, native-country_indexed_encoded: vector, features: vector]

In [14]:
# Setting up the final dataset

final_data = transformed.select('features', '>50K')

# Model

In [15]:
dtc = DecisionTreeClassifier(labelCol='>50K', featuresCol='features')

rfc = RandomForestClassifier(numTrees=200, labelCol='>50K', featuresCol='features')

gbt = GBTClassifier(labelCol='>50K', featuresCol='features', maxIter=10)

In [21]:
# Splitting the data (80/20)

train_data, test_data = final_data.randomSplit([0.8,0.2])
print(train_data.count())
print(test_data.count())

39005
9837


In [22]:
# Model Training

dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

22/04/12 02:18:50 WARN DAGScheduler: Broadcasting large task binary with size 1169.7 KiB
                                                                                

In [23]:
# Model Evaluation

dtc_preds = dtc_model.transform(test_data)
rfc_preds = rfc_model.transform(test_data)
gbt_preds = gbt_model.transform(test_data)

In [24]:
# Model Performance using ROC

evaluator = BinaryClassificationEvaluator(labelCol='>50K')

In [25]:
print('DTC =>', evaluator.evaluate(dtc_preds))
print('RFC =>', evaluator.evaluate(rfc_preds))
print('GBT =>', evaluator.evaluate(gbt_preds))



DTC => 0.5863205435814941


22/04/12 02:19:00 WARN DAGScheduler: Broadcasting large task binary with size 1120.1 KiB
                                                                                

RFC => 0.8995973523009749
GBT => 0.9118919812413948
