# Naive Bayes Income Classifier
We are going to use US Census data to predict whether a person's annual income is above 50K (USD).

In [1]:
# Start the Spark session.
# The parent directory is added to the import path first, because that is where
# the `init_spark` module is expected to be found.
import os
import sys

top_dir = os.path.abspath(os.path.join(os.getcwd(), "../"))
already_on_path = top_dir in sys.path
if not already_on_path:
    sys.path.append(top_dir)

from init_spark import init_spark

spark = init_spark()
spark  # last expression -> rendered as the cell output

Initializing Spark...
Spark found in :  /Users/sujee/spark
Spark config:
	 spark.app.name=TestApp
	spark.master=local[*]
	executor.memory=2g
	spark.sql.warehouse.dir=/var/folders/lp/qm_skljd2hl4xtps5vw0tdgm0000gn/T/tmpd2q7pfca
	some_property=some_value
Spark UI running on port 4040


## Step 1 : Load Data

In [2]:
%%time

dataset = spark.read.format("csv").\
          option('header','true').\
          option('inferSchema', 'true').\
          load("/data/census-income/income-cleaned.csv")

CPU times: user 1.72 ms, sys: 1.41 ms, total: 3.12 ms
Wall time: 3.4 s


In [3]:
# Row count (an action, so it runs a Spark job), then the inferred schema
record_count = dataset.count()
print("record count : {:,}".format(record_count))

dataset.printSchema()

record count : 32,561
root
 |-- age: integer (nullable = true)
 |-- employment: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income-over-50k: integer (nullable = true)



In [4]:
# Peek at the first few rows; converting this small slice to pandas renders an HTML table.
preview = dataset.limit(5)
preview.toPandas()

Unnamed: 0,age,employment,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,income-over-50k
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,0
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,0
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,0
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,0
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,0


## Step 2 : Basic Analytics
- `0` : income <= 50k
- `1` : income > 50k

In [5]:
# Class balance of the target column (0 = income <= 50k, 1 = income > 50k)
label_counts = dataset.groupBy('income-over-50k').count()
label_counts.show()

+---------------+-----+
|income-over-50k|count|
+---------------+-----+
|              1| 7841|
|              0|24720|
+---------------+-----+



## Step 3 : Create Feature Vector

### 3.1 Index all categorical columns

In [6]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
import time

## TODO : add numerical columns.
##   examine the data and add numerical columns
##   Hint : start with 'age', 'education-num', 'hours-per-week'
numeric_columns = ['age', 'education-num', 'hours-per-week' ]

categorical_columns = ['employment', 'education', 'marital-status', 'occupation', 'race', 'sex', 'native-country']
# Derived from categorical_columns (each indexer below writes to column + "_index"),
# so the two lists can never drift apart.
categorical_index = [column + "_index" for column in categorical_columns]
input_cols = numeric_columns + categorical_index

# Build *unfitted* StringIndexers and let Pipeline.fit() fit them.
# Calling .fit(dataset) here as well would fit every indexer twice, and the
# timer below would then only measure the lazy transform.
# handleInvalid="keep": labels not seen while fitting get an extra index instead of raising.
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
            for column in categorical_columns]
pipeline = Pipeline(stages=indexers)

# NOTE(review): the indexers are fit on the full dataset, before the train/test split
# in Step 4, so test rows also contribute to the label -> index mapping.
t1 = time.perf_counter()
indexer_model = pipeline.fit(dataset)               # fitting runs Spark jobs over the data
dataset_indexed = indexer_model.transform(dataset)  # lazy: rows are processed only when an action runs
t2 = time.perf_counter()

print("indexed {:,} records in {:,.2f} ms".format(dataset.count(), (t2-t1)*1000))

## Save as CSV for easy viewing in Excel
dataset_indexed.write.\
        option('header', 'true').\
        mode('overwrite').\
        csv('out-indexed')
print("Saved indexed vector to 'out-indexed' directory")

dataset_indexed.limit(5).toPandas()

indexed 32,561 records in 142.44 ms
Saved indexed vector to 'out-indexed' directory


Unnamed: 0,age,employment,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,...,hours-per-week,native-country,income-over-50k,employment_index,education_index,marital-status_index,occupation_index,race_index,sex_index,native-country_index
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,...,40,United-States,0,4.0,2.0,1.0,3.0,0.0,0.0,0.0
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,...,13,United-States,0,1.0,2.0,0.0,2.0,0.0,0.0,0.0
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,...,40,United-States,0,0.0,0.0,2.0,9.0,0.0,0.0,0.0
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,...,40,United-States,0,0.0,5.0,0.0,9.0,1.0,0.0,0.0
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,...,40,Cuba,0,0.0,2.0,0.0,0.0,1.0,1.0,9.0


### 3.2 - Create feature Vector

In [8]:
from pyspark.ml.feature import VectorAssembler

# Pack the numeric and indexed categorical columns into a single "features" vector,
# and copy the target column into "label" (the column name Spark ML estimators read by default).
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
assembled = assembler.transform(dataset_indexed)
featureVector = assembled.withColumn("label", assembled["income-over-50k"])
featureVector.select('features', 'label').show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[39.0,13.0,40.0,4...|    0|
|[50.0,13.0,13.0,1...|    0|
|(10,[0,1,2,5,6],[...|    0|
|[53.0,7.0,40.0,0....|    0|
|[28.0,13.0,40.0,0...|    0|
|[37.0,14.0,40.0,0...|    0|
|[49.0,5.0,16.0,0....|    0|
|(10,[0,1,2,3,6],[...|    1|
|[31.0,14.0,50.0,0...|    1|
|(10,[0,1,2,4,6],[...|    1|
|[37.0,10.0,80.0,0...|    1|
|[30.0,13.0,40.0,4...|    1|
|[23.0,13.0,30.0,0...|    0|
|[32.0,12.0,50.0,0...|    0|
|[40.0,11.0,40.0,0...|    1|
|[34.0,4.0,45.0,0....|    0|
|[25.0,9.0,35.0,1....|    0|
|(10,[0,1,2,5,6],[...|    0|
|(10,[0,1,2,4,6],[...|    0|
|[43.0,14.0,45.0,1...|    1|
+--------------------+-----+
only showing top 20 rows



## Step 4 : Train / Test set

In [9]:
# Hold out 20% of the rows for testing. The fixed seed keeps the split
# reproducible, given the same input data and partitioning.
split_weights = [0.8, 0.2]
train, test = featureVector.randomSplit(split_weights, seed=1234)

print("training set count : ", train.count())
print("testing set count : ", test.count())

training set count :  25962
testing set count :  6599


## Step 5 : Create Naive Bayes Model

In [10]:
from pyspark.ml.classification import NaiveBayes

# Multinomial Naive Bayes with additive smoothing of 1.0.
# Multinomial NB requires non-negative feature values. Index columns are >= 0 by
# construction; the numeric columns are presumably non-negative too (age, years of
# education, hours per week) -- re-check this when adding new input columns.
nb = NaiveBayes(
    smoothing=1.0,
    modelType='multinomial',
)



In [11]:
%%time

# train the model
print("training starting...")
model = nb.fit(train)
print("training done.")

training starting...
training done.
CPU times: user 7.63 ms, sys: 2.82 ms, total: 10.5 ms
Wall time: 1.25 s


## Step 6: Run Test Data
Let's call `.transform` on our model to make predictions on our test data. The predicted class ends up in the "prediction" column, while the correct label stays in the "label" column.

We can then evaluate the model by comparing these two columns.

In [12]:
# Score the test split. Cached because `predictions` is reused by several actions:
# the counts below, and the accuracy and confusion matrix in Step 7.
predictions = model.transform(test).cache()

# Show a random sample of (label, prediction) pairs.
# - The keys are floats to match the double-typed "prediction" column (values 0.0 / 1.0).
# - A fixed seed makes the displayed sample the same on every Restart & Run All.
predictions.select(['label', 'prediction']).\
            sampleBy(col='prediction', fractions={0.0: 0.5, 1.0: 0.5}, seed=1234).\
            show()

# Number of test rows predicted as each class
predictions.groupBy('prediction').count().show()


+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 4691|
|       1.0| 1908|
+----------+-----+



## Step 7 : Evaluate Model

### 7.1 Test Accuracy

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy = fraction of test rows whose predicted class equals the true label
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
accuracy_text = "Test set accuracy = " + str(accuracy)
print(accuracy_text)

Test set accuracy = 0.7708743749052887


### 7.2 Confusion Matrix

In [14]:
# Confusion matrix: one row per true label, one column per predicted class (0 / 1).
# na.fill(0) turns label/prediction combinations that never occur into explicit zero counts.
confusion_matrix = (predictions
                    .groupBy('label')
                    .pivot('prediction', [0, 1])
                    .count()
                    .na.fill(0)
                    .orderBy('label'))
confusion_matrix.show()

+-----+----+----+
|label|   0|   1|
+-----+----+----+
|    0|4077| 898|
|    1| 614|1010|
+-----+----+----+



## Step 8 : Discuss Model Accuracy
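How could we improve the model's accuracy? Here are some points to consider.

- Can you add more input variables (e.g. `relationship`, `capital-gain`, `capital-loss`)?
- Why is the model bad at predicting the >50k income category? Check the original data for class skew (Step 2 shows 24,720 rows with `0` vs 7,841 with `1`). In the run above, a model that always predicted `0` would already score about 75% on the test set (4,975 of 6,599 rows), so compare the accuracy against that baseline.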