### Binary classification with MLlib on the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult) Adult dataset: predicting income > $50,000
###[Banafsheh Hassani](https://www.linkedin.com/in/banafsheh-hassani-7b063a129/)

###[More Projects](https://github.com/BanafshehHassani)

#•	•  • 	•	Problem statement •	•  • 	•

This notebook is a worked example of Apache Spark MLlib. It addresses a binary classification problem: predicting whether an individual's income exceeds $50,000 from the attributes in the dataset.
The dataset comes from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult).
The project demonstrates some of the capabilities available in MLlib, including tools for data preprocessing, machine learning pipelines, and machine learning algorithms.

[reference](https://docs.databricks.com/_static/notebooks/getting-started/get-started-with-mllib-dbr7.html)

#The notebook follows these steps:

0. Load the dataset
0. Feature preprocessing
0. Define the model
0. Build the pipeline
0. Evaluate the model
0. Hyperparameter tuning
0. Make predictions and evaluate model performance

#•	•  • 	•	Load data •	•  • 	•

* Note: Use Databricks utilities to view the first few rows of the data.

In [0]:
%fs head --maxBytes=1024 databricks-datasets/adult/adult.data

* Note: The dataset has no header row, so create a schema to assign column names and data types.

In [0]:
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""

dataset = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)

#Randomly split the data into training and test sets, setting a seed for reproducibility.

It is better to split the data before any preprocessing, so that the test set more closely simulates new data when the model is evaluated.

In [0]:
trainDF, testDF = dataset.randomSplit([0.7, 0.3], seed=45)
print(trainDF.cache().count()) # Cache because the training data is accessed multiple times
print(testDF.count())
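
As a rough mental model of a seeded random split, the plain-Python sketch below assigns each row to the training set with probability 0.7 using a seeded generator. It is not Spark's implementation: the helper `random_split` and the toy rows are hypothetical. It illustrates two properties that `randomSplit` shares: the split sizes are only approximately 70/30, and the same seed reproduces the same split (in Spark, this assumes the input data and its partitioning are unchanged).

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row independently to train or test (hypothetical helper)."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # One uniform draw per row decides which side the row lands on.
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for dataset rows
train_a, test_a = random_split(rows, 0.7, seed=45)
train_b, test_b = random_split(rows, 0.7, seed=45)

print(len(train_a), len(test_a))  # close to 700 and 300, but not exactly
print(train_a == train_b)         # the same seed gives the same split
```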

#Review the data.

In [0]:
display(trainDF)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
17.0,?,34088.0,12th,8.0,Never-married,?,Own-child,White,Female,0.0,0.0,25.0,United-States,<=50K
17.0,?,47407.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,10.0,United-States,<=50K
17.0,?,48703.0,11th,7.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K
17.0,?,48751.0,11th,7.0,Never-married,?,Own-child,Black,Female,0.0,0.0,40.0,United-States,<=50K
17.0,?,67808.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K
17.0,?,86786.0,10th,6.0,Never-married,?,Own-child,White,Female,0.0,0.0,40.0,United-States,<=50K
17.0,?,89870.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K
17.0,?,94366.0,10th,6.0,Never-married,?,Other-relative,White,Male,0.0,0.0,6.0,United-States,<=50K
17.0,?,103810.0,12th,8.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K
17.0,?,104025.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,18.0,United-States,<=50K


#Summary statistics for `hours_per_week`

In [0]:
display(trainDF.select("hours_per_week").summary())

summary,hours_per_week
count,22753.0
mean,40.4126928317145
stddev,12.450031656667852
min,1.0
25%,40.0
50%,40.0
75%,45.0
max,99.0


#Count the rows for each `education` level, sorted from highest to lowest.

In [0]:
display(trainDF
        .groupBy("education")
        .count()
        .sort("count", ascending=False))

education,count
HS-grad,7379
Some-college,5129
Bachelors,3768
Masters,1182
Assoc-voc,953
11th,818
Assoc-acdm,735
10th,666
7th-8th,415
Prof-school,388


#Background: Transformers, estimators, and pipelines

#•	•  • 	•	Feature preprocessing  •	•  • 	•

* Note: Convert categorical variables to numeric

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]

# Estimators: these are fit on the data later and then used to transform the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]) 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 

# The label column "income" takes two string values, "<=50K" and ">50K".
# Convert it to a numeric value using StringIndexer.
labelToIndex = StringIndexer(inputCol="income", outputCol="label")

#Fit the StringIndexer on the training set and view the transformed output

In [0]:
stringIndexerModel = stringIndexer.fit(trainDF)
display(stringIndexerModel.transform(trainDF))

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income,educationIndex,raceIndex,occupationIndex,relationshipIndex,workclassIndex,marital_statusIndex,sexIndex
17.0,?,34088.0,12th,8.0,Never-married,?,Own-child,White,Female,0.0,0.0,25.0,United-States,<=50K,11.0,0.0,7.0,2.0,3.0,1.0,1.0
17.0,?,47407.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,10.0,United-States,<=50K,5.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,48703.0,11th,7.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K,5.0,0.0,7.0,2.0,3.0,1.0,1.0
17.0,?,48751.0,11th,7.0,Never-married,?,Own-child,Black,Female,0.0,0.0,40.0,United-States,<=50K,5.0,1.0,7.0,2.0,3.0,1.0,1.0
17.0,?,67808.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,7.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,86786.0,10th,6.0,Never-married,?,Own-child,White,Female,0.0,0.0,40.0,United-States,<=50K,7.0,0.0,7.0,2.0,3.0,1.0,1.0
17.0,?,89870.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,7.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,94366.0,10th,6.0,Never-married,?,Other-relative,White,Male,0.0,0.0,6.0,United-States,<=50K,7.0,0.0,7.0,5.0,3.0,1.0,0.0
17.0,?,103810.0,12th,8.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,11.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,104025.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,18.0,United-States,<=50K,5.0,0.0,7.0,2.0,3.0,1.0,0.0
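
The index values in this output follow from how `StringIndexer` orders labels by default: by descending frequency, so the most common label gets index 0. The plain-Python sketch below reproduces that ordering from the `education` counts shown earlier. It is only an illustration of the idea, not MLlib's code, and the helper `frequency_index` is hypothetical. Only the ten counts displayed earlier are used, so less frequent levels such as `12th` are missing from the mapping.

```python
# Category counts copied from the `education` count output shown earlier (top 10 rows).
education_counts = {
    "HS-grad": 7379, "Some-college": 5129, "Bachelors": 3768,
    "Masters": 1182, "Assoc-voc": 953, "11th": 818,
    "Assoc-acdm": 735, "10th": 666, "7th-8th": 415, "Prof-school": 388,
}

def frequency_index(counts):
    """Map each label to its rank by descending count (ties broken alphabetically)."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(rank) for rank, label in enumerate(ordered)}

education_index = frequency_index(education_counts)
print(education_index["HS-grad"], education_index["11th"], education_index["10th"])
# prints: 0.0 5.0 7.0
```

The values for `11th` and `10th` agree with the `educationIndex` column in the output above (5.0 and 7.0).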


#Combine all feature columns into a feature vector

In [0]:
from pyspark.ml.feature import VectorAssembler

# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
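
To make the shape of the assembled vector concrete, here is a minimal plain-Python sketch of one-hot encoding followed by concatenation with the numeric columns. It assumes the `OneHotEncoder` default `dropLast=True`, under which the last category is represented by an all-zero block. The helpers `one_hot` and `assemble` and the toy row are hypothetical, and the result is written in the sparse form (length, indices, values).

```python
def one_hot(index, num_categories):
    """One-hot encode with the last category dropped.

    Returns (block_size, active_position), where active_position is None
    for the dropped last category.
    """
    size = num_categories - 1
    return size, (index if index < size else None)

def assemble(encoded, numeric_values):
    """Concatenate one-hot blocks and numeric columns into (length, indices, values)."""
    indices, values, offset = [], [], 0
    for size, position in encoded:
        if position is not None:
            indices.append(offset + position)
            values.append(1.0)
        offset += size
    for value in numeric_values:
        if value != 0.0:  # sparse storage keeps only the non-zero entries
            indices.append(offset)
            values.append(value)
        offset += 1
    return offset, indices, values

# Toy row: two categorical columns (3 and 2 categories) plus three numeric columns.
encoded = [one_hot(1, 3), one_hot(1, 2)]
length, indices, values = assemble(encoded, [17.0, 0.0, 40.0])
print(length, indices, values)  # prints: 6 [1, 3, 5] [1.0, 17.0, 40.0]
```

The numeric columns occupy the last positions of the vector because `numericCols` is appended after the one-hot columns in `assemblerInputs`.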

#•	•  • 	•	Define the model •	•  • 	•

This example uses a logistic regression model.

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)
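
For readers new to logistic regression, the sketch below shows how a fitted binary model turns a feature vector into a probability and then into a prediction: a linear score is passed through the logistic function and compared with a threshold (0.5 here). The coefficients and feature values are made up for illustration, and the helpers `predict_proba` and `predict` are hypothetical. `regParam` affects training only, by penalizing large coefficients, so it does not appear in this prediction-only sketch.

```python
import math

def predict_proba(weights, intercept, features):
    """Return P(label = 1): a linear score passed through the logistic function."""
    score = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-score))

def predict(probability, threshold=0.5):
    """Turn a probability into a 0.0/1.0 prediction."""
    return 1.0 if probability > threshold else 0.0

# Hypothetical coefficients and feature values, for illustration only.
p = predict_proba(weights=[0.8, -0.5], intercept=-1.0, features=[1.0, 2.0])
print(round(p, 4), predict(p))
```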

#•	•  • 	•	Build the pipeline •	•  • 	•

In [0]:
from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])

# Fit the pipeline on the training data to produce the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)

Display the predictions from the model. The features column is a sparse vector, which is often the case after one-hot encoding because the resulting vectors contain many 0 values.

In [0]:
display(predDF.select("features", "label", "prediction", "probability"))

features,label,prediction,probability
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 34019.0, 6.0, 20.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.897778574960737, 0.10222142503926299))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 41643.0, 7.0, 15.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.907197687206375, 0.09280231279362501))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 64785.0, 6.0, 30.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8931879252454108, 0.10681207475458922))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 80077.0, 7.0, 20.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9050882371591391, 0.09491176284086093))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 145886.0, 7.0, 30.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9007520823543821, 0.09924791764561801))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 8, 24, 36, 45, 49, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 148769.0, 9.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8864690459660329, 0.1135309540339671))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 171461.0, 6.0, 20.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9072110185165235, 0.09278898148347643))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 49, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 172145.0, 6.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9088021347635962, 0.09119786523640387))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 179715.0, 6.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.888316815046023, 0.11168318495397694))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 202521.0, 7.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8856778422733979, 0.11432215772660216))"


#•	•   • 	• Evaluate the model•	•   • 	•

The `display` command has a built-in ROC curve option.

In [0]:
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.4864822371393636
0.0,0.0434782608695652,0.4864822371393636
0.0,0.0869565217391304,0.4698836902242394
0.0,0.1304347826086956,0.466624270781905
0.0,0.1739130434782608,0.4459338434406595
0.010752688172043,0.1739130434782608,0.4459203456308542
0.010752688172043,0.217391304347826,0.428387228346741
0.021505376344086,0.217391304347826,0.4079294679350002
0.032258064516129,0.217391304347826,0.4065779589743691
0.032258064516129,0.2608695652173913,0.3730570126787619


To evaluate the model, use `BinaryClassificationEvaluator` to compute the area under the ROC curve and `MulticlassClassificationEvaluator` to compute the accuracy.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")
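
The two metrics can be illustrated on a toy example in plain Python. The sketch below computes the area under the ROC curve as the fraction of (positive, negative) pairs in which the positive example receives the higher score, which is an equivalent way to define that area, and accuracy as the fraction of correct predictions. This is not how MLlib computes the metrics internally. The helpers `area_under_roc` and `accuracy`, the toy labels and scores, and the 0.5 threshold are assumptions made for illustration.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0
               for p in positives for n in negatives)
    return wins / (len(positives) * len(negatives))

def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    correct = sum(1 for y, y_hat in zip(labels, predictions) if y == y_hat)
    return correct / len(labels)

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]                       # toy model scores
predictions = [1 if s > 0.5 else 0 for s in scores]  # threshold at 0.5

print(area_under_roc(labels, scores))  # prints: 0.75
print(accuracy(labels, predictions))   # prints: 0.75
```

The area under the ROC curve does not depend on the threshold, whereas accuracy does.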

#•	•  • 	•	Hyperparameter tuning •	•  • 	•

MLlib provides methods to facilitate hyperparameter tuning and cross validation. 
- For hyperparameter tuning, `ParamGridBuilder` lets you define a grid search over a set of model hyperparameters.
- For cross validation, `CrossValidator` lets you specify an estimator (the pipeline to apply to the input dataset), an evaluator, a grid space of hyperparameters, and the number of folds to use for cross validation.

Use `ParamGridBuilder` and `CrossValidator` to tune the model. The grid uses three values for `regParam` and three for `elasticNetParam`, for a total of 3 x 3 = 9 hyperparameter combinations for `CrossValidator` to examine.

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())
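
The arithmetic behind the grid can be checked with a short plain-Python sketch that enumerates the same combinations as a Cartesian product. It does not use MLlib, and the dictionary layout is only an illustration. With the 3 folds used in this notebook, each combination is fit once per fold, and `CrossValidator` then refits the best combination on the full training set.

```python
from itertools import product

reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 3

# Every pairing of the two lists is one candidate setting.
grid = [{"regParam": r, "elasticNetParam": e}
        for r, e in product(reg_params, elastic_net_params)]

print(len(grid))              # prints: 9 (hyperparameter combinations)
print(len(grid) * num_folds)  # prints: 27 (model fits during cross-validation)
```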

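* Note: The pipeline is passed to `CrossValidator` as the estimator, so every stage, including the preprocessing stages, is refit on each fold.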

In [0]:
# Create a 3-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism=4)

# Run cross validations. This step takes a few minutes and returns the best model found from the cross validation.
cvModel = cv.fit(trainDF)

#•	•  • 	•	Make predictions and evaluate model performance •	•  • 	•

* Note: Use the best model identified by the cross-validation to make predictions on the test dataset, and then evaluate the model's performance using the area under the ROC curve.

In [0]:
# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)

# Evaluate the model's performance based on area under the ROC curve and accuracy 
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")

* Note: Create a temporary view of the predictions dataset, then use SQL commands to display the predictions grouped by occupation and by age.

In [0]:
cvPredDF.createOrReplaceTempView("finalPredictions")

In [0]:
%sql
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation

occupation,prediction,count
?,0.0,513
?,1.0,26
Adm-clerical,1.0,84
Adm-clerical,0.0,978
Armed-Forces,1.0,1
Armed-Forces,0.0,2
Craft-repair,0.0,1153
Craft-repair,1.0,122
Exec-managerial,0.0,599
Exec-managerial,1.0,649


In [0]:
%sql
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age

age,prediction,count
17.0,0.0,114
18.0,0.0,145
19.0,0.0,190
20.0,0.0,209
21.0,0.0,223
21.0,1.0,2
22.0,1.0,2
22.0,0.0,211
23.0,1.0,3
23.0,0.0,259
