##  **Predict whether income exceeds $50K/yr based on census data. Also known as "Census Income" dataset.**


## STEPS:
* Step 1) Basic operation with PySpark
* Step 2) Data preprocessing
* Step 3) Build a data processing pipeline
* Step 4) Build the classifier
* Step 5) Train and evaluate the model
* Step 6) Tune the hyperparameter

## **Initialize the SPARKContext and  SQLContext**
SQLContext allows connecting the engine with different data sources. It is used to initiate the functionalities of Spark SQL.

In [1]:
# initialize spark context
import pyspark
from pyspark import SparkContext
sc =SparkContext()


In [2]:
# initialize sql context
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

## **Read CSV dataset**

In [16]:
#Todo: using SQLContext to read csv and assign to dataframe
df = sqlContext.read.csv("adult.csv", header=True, inferSchema= True)	

In [17]:
#Todo:printSchema
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [18]:
# Run the cell to rename the columns properly:
cols = ['age','workclass','fnlwgt','education','education_num','marital','occupation','relationship','race','sex','capital_gain','capital_loss','hours_week','native_country','label']

#note income -renamed-> as label
df=df.toDF(*cols)

         

## Convert the continuous variable in the right format

In [19]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df = convertColumn(df, CONTI_FEATURES, FloatType())
# Check the dataset
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [20]:
df.show(5, truncate = False)


+----+---------+--------+------------+-------------+------------------+-----------------+------------+-----+------+------------+------------+----------+--------------+-----+
|age |workclass|fnlwgt  |education   |education_num|marital           |occupation       |relationship|race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+----+---------+--------+------------+-------------+------------------+-----------------+------------+-----+------+------------+------------+----------+--------------+-----+
|25.0|Private  |226802.0|11th        |7.0          |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0.0         |0.0         |40.0      |United-States |<=50K|
|38.0|Private  |89814.0 |HS-grad     |9.0          |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0.0         |0.0         |50.0      |United-States |<=50K|
|28.0|Local-gov|336951.0|Assoc-acdm  |12.0         |Married-civ-spouse|Protective-serv  |Husband     |White|Male  |0.0         |0.

# Describe Column by count,by group

# To get a summary statistics, of the data, you can use describe(). It will compute the :

* count
* mean
* standarddeviation
* min
* max

In [21]:
#todo: describe data,column by count,by group
df.describe().show()

+-------+------------------+-----------+------------------+------------+------------------+--------+----------------+------------+------------------+------+------------------+------------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|     education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|      capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+------------------+--------+----------------+------------+------------------+------+------------------+------------------+------------------+--------------+-----+
|  count|             48842|      48842|             48842|       48842|             48842|   48842|           48842|       48842|             48842| 48842|             48842|             48842|             48842|         48842|48842|
|   mean| 38.64358543876172|       null|189664.13459727284| 

In [22]:
# the summary statistic of only one column, add the name of the column inside describe()

df.describe('capital_gain').show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



In [42]:
#Descriptive statistics by group
df.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [43]:
#example
df.groupby('marital').agg({'capital_gain': 'mean'}).show()		


+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



# Crosstab computation
In some occasion, it can be interesting to see the descriptive statistics between two pairwise columns. For instance, you can count the number of people with income below or above 50k by education level. This operation is called a crosstab.

In [23]:
#todo crosstab computation
df.crosstab('age', 'label').sort("age_label").show()

+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|     17.0|  595|   0|
|     18.0|  862|   0|
|     19.0| 1050|   3|
|     20.0| 1112|   1|
|     21.0| 1090|   6|
|     22.0| 1161|  17|
|     23.0| 1307|  22|
|     24.0| 1162|  44|
|     25.0| 1119|  76|
|     26.0| 1068|  85|
|     27.0| 1117| 115|
|     28.0| 1101| 179|
|     29.0| 1025| 198|
|     30.0| 1031| 247|
|     31.0| 1050| 275|
|     32.0|  957| 296|
|     33.0| 1045| 290|
|     34.0|  949| 354|
|     35.0|  997| 340|
|     36.0|  948| 400|
+---------+-----+----+
only showing top 20 rows



# Drop column
There are two intuitive API to drop columns:

* drop(): Drop a column
* dropna(): Drop NA's

In [44]:
#Drop null vals
df  = df.dropna()

## Data preprocessing

Data processing is a critical step in machine learning. After you remove garbage data, you get some important insights.


**For instance, you know that age is not a linear function with the income. When people are young, their income is usually lower than mid-age. After retirement, a household uses their saving, meaning a decrease in income. To capture this pattern, you can add a square to the age feature**

**Add age square**

To add a new feature, you need to:

* 1. Select the column
* 2. Apply the transformation and add it to the DataFrame

In [45]:
#TODO:  # Add age square
        
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()        

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



**Exclude Holand-Netherlands**

* When a group within a feature has only one observation, it brings no information to the model. 
* On the contrary, it can lead to an error during the cross-validation.

In [46]:
#TODO : follow the above instruction
df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|Outlying-US(Guam-...|                   23|
|          Yugoslavia|                   23|
|                Laos|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

   **The feature native_country has only one household coming from Netherland.** 
   -> we should remove it
   -> follow the same logic for nay other column

In [47]:
df_remove = df.filter(df.native_country!='Holand-Netherlands')

## Build a data processing pipeline
You will build a pipeline to convert all the precise features and add them to the final dataset. The pipeline will have four operations, but feel free to add as many operations as you want.

* Encode the categorical data
* Index the label feature
* Add continuous variable
* Assemble the steps.

In [50]:
#import libraries for pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [51]:
# 1. Encode the categorical data
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']

# stages in our Pipeline
stages = [] 


for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    
    stages += [stringIndexer, encoder]

In [52]:
# 2. Index the label feature
# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

In [53]:
# 3. Add continuous variable
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [55]:
# 4. Assemble the steps
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [58]:
stages

[StringIndexer_ebc07d1eb434,
 OneHotEncoderEstimator_4ed2f0346219,
 StringIndexer_817f9e246110,
 OneHotEncoderEstimator_9c9381de7575,
 StringIndexer_b7f3513b30eb,
 OneHotEncoderEstimator_f83a04fe496c,
 StringIndexer_c3029f6bd1b0,
 OneHotEncoderEstimator_37b8d38e4c4d,
 StringIndexer_2dbc48d794bb,
 OneHotEncoderEstimator_9a8100d19089,
 StringIndexer_83ba26e2d2bd,
 OneHotEncoderEstimator_59db38f2c633,
 StringIndexer_411249b200e4,
 OneHotEncoderEstimator_f573dd2b9bd4,
 StringIndexer_1f4c64eed576,
 OneHotEncoderEstimator_cff5b91a79ac,
 StringIndexer_ca12b56f23c6,
 VectorAssembler_20897a7265e3]

In [56]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [57]:
model

DataFrame[age: float, workclass: string, fnlwgt: float, education: string, education_num: float, marital: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: float, capital_loss: float, hours_week: float, native_country: string, label: string, age_square: double, workclassIndex: double, workclassclassVec: vector, educationIndex: double, educationclassVec: vector, maritalIndex: double, maritalclassVec: vector, occupationIndex: double, occupationclassVec: vector, relationshipIndex: double, relationshipclassVec: vector, raceIndex: double, raceclassVec: vector, sexIndex: double, sexclassVec: vector, native_countryIndex: double, native_countryclassVec: vector, newlabel: double, features: vector]

## Build the classifier: logistic

In [59]:
# To make the computation faster, you convert model to a DataFrame.
#You need to select newlabel and features from model using map.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

In [60]:
# import 
# from pyspark.ml.linalg import DenseVector
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

In [61]:
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



## Create a train/test set

You split the dataset 80/20 with randomSplit.

In [62]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [63]:
# Let's count how many people with income below/above 50k in both training and test set
train_data.groupby('label').agg({'label': 'count'}).show()	

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       29671|
|  1.0|        9315|
+-----+------------+



In [64]:
test_data.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7483|
|  1.0|        2372|
+-----+------------+



## Pyspark has an API called LogisticRegression to perform logistic regression.

In [65]:
#You initialize lr by indicating the label column and feature columns. 
# Import `LogisticRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [66]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.06629458791643376,-0.15218373640701383,-0.053913615606758065,-0.16967312430449774,-0.12115314684082322,0.1325974961176333,0.1943887659566995,-0.6386553259560794,-0.20168892525823268,-0.06643613691435478,0.22587144074752571,0.3784635752251114,-0.0044245321889490345,-0.2958940967195082,-0.011315453541718349,-0.3286032800269526,-0.4220383458346703,0.5748881894870274,-0.40562097016057325,-0.2321967047902892,0.6030218476150906,-0.35292400176648736,-0.42080161545502637,0.32504577335055407,-0.3496608365399488,-0.20341828378529217,-0.21097882986337838,-0.16094910982880872,-0.10229984961820654,0.19354140559551283,-0.05741072218845116,0.27840022983220586,-0.10767152632715087,0.04564764837286497,-0.29067926822772944,-0.22290332501866694,-0.1707732587338623,-0.1265004208587058,-0.3046016757809549,-0.32401360966564163,0.11297915494496764,0.12425063820676888,-0.2716362961533152,0.26838077428027984,-0.1980362522010291,-0.29122621595643805,-0.24230441670165628,0.4124628047635877,-0.0

## Train and evaluate the model

In [67]:
#To generate prediction for your test set, you can use linearModel with transform() on test_data
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [69]:
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [70]:
selected = predictions.select("label", "prediction", "probability")
selected.show(20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91956529713708...|
|  0.0|       0.0|[0.93120436510470...|
|  0.0|       0.0|[0.92517755280318...|
|  0.0|       0.0|[0.96557850534464...|
|  0.0|       0.0|[0.91327749860183...|
|  0.0|       0.0|[0.89372112927668...|
|  0.0|       0.0|[0.91170827845144...|
|  0.0|       0.0|[0.79855876724043...|
|  0.0|       1.0|[0.33707064577134...|
|  0.0|       0.0|[0.93124041985776...|
|  0.0|       0.0|[0.65819871502704...|
|  0.0|       0.0|[0.68540875943836...|
|  0.0|       0.0|[0.87432677309934...|
|  0.0|       0.0|[0.81416508490888...|
|  0.0|       1.0|[0.41896286779831...|
|  0.0|       0.0|[0.55828647666212...|
|  0.0|       0.0|[0.57431564825875...|
|  0.0|       0.0|[0.53485051662803...|
|  0.0|       0.0|[0.80318175590664...|
|  0.0|       0.0|[0.87915048489301...|
+-----+----------+--------------------+
only showing top 20 rows



## Evaluate the model

In [71]:
#We need to look at the accuracy metric to see how well (or bad) the model performs.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 

accuracy_m(model = linearModel)

Model accuracy: 81.887%


# ROC metrics

In [72]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.894260182552082
areaUnderROC


## many new metrics libraries are now introduced in pyspark .
check https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html

# Tune the hyperparameter

In [74]:
#you can tune the hyperparameters. 
#Similar to scikit learn you create a parameter grid, and you add the parameters you want to tune. 
#To reduce the time of the computation, you only tune the regularization parameter with only two values.
#use 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

In [75]:
from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 1079.451 seconds


In [76]:
#accuracy of cv selected model
accuracy_m(model = cvModel)

Model accuracy: 84.820%


In [77]:
#We can exctract the recommended parameter by chaining cvModel.bestModel with extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_c188ddedd32b', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_c188ddedd32b', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_c188ddedd32b', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_c188ddedd32b', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_c188ddedd32b', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_c188ddedd32b', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_c188ddedd32b', name='maxIter', doc='maximum number of iterations (>= 0)'): 

# Conclusion