# Getting started with MLlib - binary classification example

This tutorial is designed to get you started with Apache Spark MLlib. It investigates a binary classification problem - can you predict if an individual's income is greater than $50,000 based on demographic data? The dataset is from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult) and is provided with Databricks Runtime. This notebook demonstrates some of the capabilities available in MLlib, including tools for data preprocessing, machine learning pipelines, and several different machine learning algorithms.

This notebook includes the following steps:

1. Load the dataset
2. Feature preprocessing
3. Define the model
4. Build the pipeline
5. Evaluate the model
6. Hyperparameter tuning
7. Make predictions and evaluate model performance

## Requirements
Databricks Runtime 7.0 or above or Databricks Runtime 7.0 ML or above. If you are running Databricks Runtime 6.x or Databricks Runtime 6.x ML, see ([AWS](https://docs.databricks.com/getting-started/spark/machine-learning.html)|[Azure](https://docs.microsoft.com/azure/databricks/getting-started/spark/machine-learning/)) for the correct notebook.

## Step 1. Load the dataset

Use Databricks utilities to view the first few rows of the data.

In [0]:
%fs head --maxBytes=1024 databricks-datasets/adult/adult.data

Because the dataset does not include column names, create a schema to assign column names and datatypes.

In [0]:
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""

dataset = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)

Randomly split data into training and test sets, and set seed for reproducibility.

It's best to split the data before doing any preprocessing. This allows the test dataset to more closely simulate new data when we evaluate the model.

In [0]:
### START CODE ###
# Use dataset.randomSplit() to split the data into 80% train and 20% test sets, with seed 42.
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
### END CODE ###
print(trainDF.cache().count()) # Cache because the training data is accessed multiple times
print(testDF.count())
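
For intuition, a seeded random split can be sketched in plain Python. This is only an illustration of the idea and not how Spark implements `randomSplit`; the helper `random_split` and the toy rows are hypothetical. Each row is assigned independently, so the resulting sizes are only approximately 80/20, which is also why the counts printed above are not an exact 80/20 division of the dataset.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test independently, using a seeded RNG."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # hypothetical stand-in for dataset rows
train_a, test_a = random_split(rows, 0.8, seed=42)
train_b, test_b = random_split(rows, 0.8, seed=42)

print(len(train_a), len(test_a))  # sizes are close to 800 / 200, not exact
print(train_a == train_b)         # the same seed reproduces the same split
```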

Let's review the data.

In [0]:
display(trainDF)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
17.0,?,34019.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,20.0,United-States,<=50K
17.0,?,34088.0,12th,8.0,Never-married,?,Own-child,White,Female,0.0,0.0,25.0,United-States,<=50K
17.0,?,47407.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,10.0,United-States,<=50K
17.0,?,48703.0,11th,7.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K
17.0,?,48751.0,11th,7.0,Never-married,?,Own-child,Black,Female,0.0,0.0,40.0,United-States,<=50K
17.0,?,67808.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K
17.0,?,86786.0,10th,6.0,Never-married,?,Own-child,White,Female,0.0,0.0,40.0,United-States,<=50K
17.0,?,89870.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K
17.0,?,94366.0,10th,6.0,Never-married,?,Other-relative,White,Male,0.0,0.0,6.0,United-States,<=50K
17.0,?,103810.0,12th,8.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K


What's the distribution of the number of `hours_per_week`?

In [0]:
display(trainDF.select("hours_per_week").summary())

summary,hours_per_week
count,26076.0
mean,40.4284782942169
stddev,12.404569739132008
min,1.0
25%,40.0
50%,40.0
75%,45.0
max,99.0


How about `education` status?

In [0]:
### START CODE ###
# Group by education, count the rows in each group, and sort by count in descending order.
display(trainDF.groupBy("education").count().sort("count", ascending=False))
### END CODE ###

education,count
HS-grad,8408
Some-college,5860
Bachelors,4255
Masters,1388
Assoc-voc,1102
11th,958
Assoc-acdm,845
10th,748
7th-8th,510
Prof-school,465


## Background: Transformers, estimators, and pipelines

Three important concepts in MLlib machine learning that are illustrated in this notebook are **Transformers**, **Estimators**, and **Pipelines**. 

- **Transformer**: Takes a DataFrame as input, and returns a new DataFrame. Transformers do not learn any parameters from the data and simply apply rule-based transformations to either prepare data for model training or generate predictions using a trained MLlib model. You call a transformer with a `.transform()` method.

- **Estimator**: Learns (or "fits") parameters from your DataFrame via a `.fit()` method and returns a Model, which is a transformer.

- **Pipeline**: Combines multiple steps into a single workflow that can be easily run. Creating a machine learning model typically involves setting up many different steps and iterating over them. Pipelines help you automate this process.

For more information:
[ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html#ml-pipelines)
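
The estimator/transformer relationship can be sketched without Spark. The classes below are hypothetical toy stand-ins, not MLlib classes: `fit()` learns a parameter (here, a mean) from training data and returns a model object, and that model's `transform()` applies the learned parameter to any data.

```python
class MeanCenterer:
    """Toy estimator: learns the mean of the values in fit()."""
    def fit(self, values):
        mean = sum(values) / len(values)
        return MeanCentererModel(mean)

class MeanCentererModel:
    """Toy transformer returned by fit(): subtracts the learned mean."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]

train = [1.0, 2.0, 3.0]
test = [4.0, 5.0]

model = MeanCenterer().fit(train)  # the parameter is learned from training data only
print(model.transform(train))      # [-1.0, 0.0, 1.0]
print(model.transform(test))       # [2.0, 3.0]
```

Note that the test values are shifted by the mean learned from the training values, which mirrors how a fitted MLlib model is applied to new data.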

## Step 2. Feature preprocessing 

The goal of this notebook is to build a model that predicts the `income` level from the features included in the dataset (education level, marital status, occupation, and so on). The first step is to manipulate, or preprocess, the features so they are in the format MLlib requires.

### Convert categorical variables to numeric

Some machine learning algorithms, such as linear and logistic regression, require numeric features. The Adult dataset includes categorical features such as education, occupation, and marital status. 

The following code block illustrates how to use `StringIndexer` and `OneHotEncoder` to convert categorical variables into a set of numeric variables that only take on values 0 and 1. 

- `StringIndexer` converts a column of string values to a column of label indexes. For example, it might convert the values "red", "blue", and "green" to 0, 1, and 2. 
- `OneHotEncoder` maps a column of category indices to a column of binary vectors, with at most one "1" in each row that indicates the category index for that row.

One-hot encoding in Spark is a two-step process: you first apply the `StringIndexer`, and then the `OneHotEncoder`. The following code block defines the `StringIndexer` and `OneHotEncoder` but does not apply them to any data yet.

For more information:   
[StringIndexer](http://spark.apache.org/docs/latest/ml-features.html#stringindexer)   
[OneHotEncoder](https://spark.apache.org/docs/latest/ml-features.html#onehotencoder)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]

# The following two lines define estimators. Fitting them returns models (transformers) that we later apply to the dataset.
# Use categoricalCols as the inputCols for StringIndexer
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols])
# Use stringIndexer.getOutputCols() as the inputCols for OneHotEncoder
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])

# The label column ("income") is also a string value - it has two possible values, "<=50K" and ">50K". 
# Convert it to a numeric value using StringIndexer.
labelToIndex = StringIndexer(inputCol="income", outputCol="label")
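
To make the two encoding steps concrete, here is a plain-Python sketch of the idea. It is not MLlib code: the helpers `fit_string_index` and `one_hot` are hypothetical. It assumes the default behavior of the Spark classes, namely that `StringIndexer` assigns index 0 to the most frequent value and that `OneHotEncoder` drops the last category, which is why a row can contain no "1" at all.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

colors = ["red", "blue", "red", "green", "red", "blue"]
mapping = fit_string_index(colors)
print(mapping)  # {'red': 0.0, 'blue': 1.0, 'green': 2.0}
for color in ["red", "blue", "green"]:
    print(color, one_hot(mapping[color], len(mapping)))
```

With three categories and the last one dropped, "red" becomes `[1.0, 0.0]`, "blue" becomes `[0.0, 1.0]`, and "green" becomes `[0.0, 0.0]`.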

In this notebook, we'll build a pipeline combining all of our feature engineering and modeling steps. But let's take a minute to look more closely at how estimators and transformers work by applying the `stringIndexer` estimator that we created in the previous code block.

You can call the `.fit()` method to return a `StringIndexerModel`, which you can then use to transform the dataset. 

The `.transform()` method of `StringIndexerModel` returns a new DataFrame with the new columns appended. Scroll right to see the new columns if necessary. 

For more information: [StringIndexerModel](https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/feature/StringIndexerModel.html)

In [0]:
### START CODE ###
# Fit the string indexer on trainDF
stringIndexerModel = stringIndexer.fit(trainDF)
### END CODE ###
display(stringIndexerModel.transform(trainDF))

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income,educationIndex,raceIndex,occupationIndex,relationshipIndex,workclassIndex,marital_statusIndex,sexIndex
17.0,?,34019.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,20.0,United-States,<=50K,7.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,34088.0,12th,8.0,Never-married,?,Own-child,White,Female,0.0,0.0,25.0,United-States,<=50K,11.0,0.0,7.0,2.0,3.0,1.0,1.0
17.0,?,47407.0,11th,7.0,Never-married,?,Own-child,White,Male,0.0,0.0,10.0,United-States,<=50K,5.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,48703.0,11th,7.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K,5.0,0.0,7.0,2.0,3.0,1.0,1.0
17.0,?,48751.0,11th,7.0,Never-married,?,Own-child,Black,Female,0.0,0.0,40.0,United-States,<=50K,5.0,1.0,7.0,2.0,3.0,1.0,1.0
17.0,?,67808.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,7.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,86786.0,10th,6.0,Never-married,?,Own-child,White,Female,0.0,0.0,40.0,United-States,<=50K,7.0,0.0,7.0,2.0,3.0,1.0,1.0
17.0,?,89870.0,10th,6.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,7.0,0.0,7.0,2.0,3.0,1.0,0.0
17.0,?,94366.0,10th,6.0,Never-married,?,Other-relative,White,Male,0.0,0.0,6.0,United-States,<=50K,7.0,0.0,7.0,5.0,3.0,1.0,0.0
17.0,?,103810.0,12th,8.0,Never-married,?,Own-child,White,Male,0.0,0.0,40.0,United-States,<=50K,11.0,0.0,7.0,2.0,3.0,1.0,0.0


### Combine all feature columns into a single feature vector

Most MLlib algorithms require a single features column as input. Each row in this column contains a vector of data points corresponding to the set of features used for prediction. 

MLlib provides the `VectorAssembler` transformer to create a single vector column from a list of columns.

The following code block illustrates how to use VectorAssembler.

For more information: [VectorAssembler](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler)

In [0]:
from pyspark.ml.feature import VectorAssembler

# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
### START CODE ###
# Use assemblerInputs as inputCols and "features" as outputCol
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
### END CODE ###
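
Conceptually, assembling a feature vector means concatenating the values of the input columns, in order, into one flat vector per row. The sketch below shows that idea in plain Python; the helper `assemble` and the example row are hypothetical, and this is not how `VectorAssembler` is implemented.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Hypothetical row with two one-hot encoded columns and two numeric columns.
row = {"sexOHE": [1.0], "raceOHE": [0.0, 1.0, 0.0, 0.0], "age": 17.0, "hours_per_week": 20.0}
features = assemble(row, ["sexOHE", "raceOHE", "age", "hours_per_week"])
print(features)  # [1.0, 0.0, 1.0, 0.0, 0.0, 17.0, 20.0]
```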

## Step 3. Define the model

This notebook uses a [logistic regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression) model.

In [0]:
from pyspark.ml.classification import LogisticRegression
### START CODE ###
# Define a LogisticRegression model and specify featuresCol as "features", labelCol as "label" and regParam as 1.0
lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)
### END CODE ###
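
As a reminder of what a fitted logistic regression model computes, the sketch below turns a weighted sum of features into a probability with the logistic (sigmoid) function. The coefficients, intercept, and the helper `predict_probability` are hypothetical values chosen for illustration; `regParam` affects how the coefficients are learned (larger values shrink them more) and does not appear in the prediction formula itself.

```python
import math

def predict_probability(weights, bias, features):
    """Return P(label = 1) for one feature vector under a logistic model."""
    z = bias + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))

weights = [0.8, -0.5]  # hypothetical coefficients
bias = -0.2            # hypothetical intercept
p = predict_probability(weights, bias, [1.0, 2.0])

print(round(p, 4))
print(1.0 if p > 0.5 else 0.0)  # this sketch assumes a decision threshold of 0.5
```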

## Step 4. Build the pipeline

A `Pipeline` is an ordered list of transformers and estimators. You can define a pipeline to automate and ensure repeatability of the transformations to be applied to a dataset. In this step, we define the pipeline, fit it on the training dataset, and then apply the fitted pipeline to the test dataset.

Similar to what we saw with `StringIndexer`, a `Pipeline` is an estimator. The `pipeline.fit()` method returns a `PipelineModel`, which is a transformer.

For more information:   
[Pipeline](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline)  
[PipelineModel](https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/PipelineModel.html)

In [0]:
from pyspark.ml import Pipeline

### START CODE ###
# Define the pipeline based on the stages created in previous steps.
# Hint: Use stringIndexer, encoder, labelToIndex, vecAssembler, lr
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])

# Define the pipeline model and fit on trainDF.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset using transform.
predDF = pipelineModel.transform(testDF)
### END CODE ###

Display the predictions from the model. The `features` column is a [sparse vector](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.linalg.SparseVector), which is often the case after one-hot encoding, because there are so many 0 values.
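
A sparse vector stores only its length, the positions of the nonzero entries, and their values. The sketch below expands one such triple into a dense list. The helper `to_dense` is hypothetical, and the numbers are copied from the first row of the prediction output shown in this step.

```python
def to_dense(length, indices, values):
    """Expand a sparse (length, indices, values) triple into a full list."""
    dense = [0.0] * length
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Length, indices, and values from the first row of the prediction output.
indices = [3, 13, 24, 36, 45, 48, 53, 54, 55, 58]
values = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 41643.0, 7.0, 15.0]
dense = to_dense(59, indices, values)

print(len(dense), sum(1 for v in dense if v != 0.0))  # 59 10
print(dense[53], dense[58])                           # 17.0 15.0
```

Only 10 of the 59 entries are stored. In this row, positions 53 to 58 appear to hold the six numeric columns in the order of `numericCols`; the two zero values (`capital_gain` and `capital_loss`) are simply omitted from the sparse form.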

In [0]:
display(predDF.select("features", "label", "prediction", "probability"))

features,label,prediction,probability
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 41643.0, 7.0, 15.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9062474976435643, 0.09375250235643576))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 64785.0, 6.0, 30.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8927691288853388, 0.10723087111466127))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 80077.0, 7.0, 20.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9041097206748728, 0.09589027932512711))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 104025.0, 7.0, 18.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8952738661074835, 0.10472613389251656))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 15, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 139183.0, 6.0, 15.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9087696046250343, 0.09123039537496568))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 8, 24, 36, 45, 49, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 148769.0, 9.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8861377407390781, 0.11386225926092175))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 170320.0, 7.0, 8.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9089516898879646, 0.09104831011203536))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 48, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 198797.0, 7.0, 20.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8942281267283682, 0.10577187327163177))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 19, 24, 36, 45, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 241021.0, 8.0, 40.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.8945663579567216, 0.10543364204327836))"
"Map(vectorType -> sparse, length -> 59, indices -> List(3, 13, 24, 36, 45, 49, 52, 53, 54, 55, 58), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 17.0, 250541.0, 7.0, 8.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9099365057082272, 0.09006349429177292))"


## Step 5. Evaluate the model

The `display` command has a built-in ROC curve option.

In [0]:
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.5139985144933676
0.0,0.0384615384615384,0.5139985144933676
0.0,0.0769230769230769,0.509717350710216
0.0,0.1153846153846153,0.5001046759516454
0.0,0.1538461538461538,0.4996941428384745
0.0105263157894736,0.1538461538461538,0.4974508751483025
0.0105263157894736,0.1923076923076923,0.4908963582806486
0.0105263157894736,0.2307692307692307,0.462090468283529
0.0105263157894736,0.2692307692307692,0.4355582066136955
0.0105263157894736,0.3076923076923077,0.4317969906768185


To evaluate the model, we use the `BinaryClassificationEvaluator` to evaluate the area under the ROC curve and the `MulticlassClassificationEvaluator` to evaluate the accuracy.

For more information:  
[BinaryClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.BinaryClassificationEvaluator)  
[MulticlassClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Use areaUnderROC as the metricName for BinaryClassificationEvaluator
bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")

# Use accuracy as the metricName for MulticlassClassificationEvaluator (its default metric is f1)
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")
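
The two metrics can be illustrated on a handful of rows in plain Python. This is a sketch of the definitions, not of how MLlib computes them: `accuracy` and `area_under_roc` are hypothetical helpers, the scores are made up, and the area under the ROC curve is computed here through its pairwise interpretation (the probability that a randomly chosen positive row is scored higher than a randomly chosen negative row).

```python
def accuracy(labels, predictions):
    """Fraction of rows where the predicted class equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def area_under_roc(labels, scores):
    """Probability that a random positive outscores a random negative (ties count half)."""
    positives = [s for y, s in zip(labels, scores) if y == 1.0]
    negatives = [s for y, s in zip(labels, scores) if y == 0.0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [0.0, 0.0, 1.0, 1.0]
scores = [0.1, 0.4, 0.35, 0.8]  # hypothetical P(label = 1)
predictions = [1.0 if s > 0.5 else 0.0 for s in scores]

print(area_under_roc(labels, scores))  # 0.75
print(accuracy(labels, predictions))   # 0.75
```

Accuracy depends on the chosen decision threshold, while the area under the ROC curve summarizes ranking quality across all thresholds.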

## Step 6. Hyperparameter tuning

MLlib provides methods to facilitate hyperparameter tuning and cross validation. 
- For hyperparameter tuning, `ParamGridBuilder` lets you define a grid search over a set of model hyperparameters.
- For cross validation, `CrossValidator` lets you specify an estimator (the pipeline to apply to the input dataset), an evaluator, a grid space of hyperparameters, and the number of folds to use for cross validation.
  
For more information:   
[Model selection using cross-validation](https://spark.apache.org/docs/latest/ml-tuning.html)  
[ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.tuning)  
[CrossValidator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator)

Use `ParamGridBuilder` and `CrossValidator` to tune the model. This example uses three values for `regParam` and three for `elasticNetParam`, for a total of 3 x 3 = 9 hyperparameter combinations for `CrossValidator` to examine.

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

### START CODE ###
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])         # tune lr.regParam
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])   # tune lr.elasticNetParam
             .build())
### END CODE ###
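
The size of the grid can be checked with a short plain-Python sketch that enumerates every pairing of the two value lists. The variable names are hypothetical, and the dictionaries only mimic the idea of a parameter map; they are not the objects that `ParamGridBuilder` returns.

```python
from itertools import product

reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]

# Every pairing of the two lists is one candidate setting.
grid = [{"regParam": r, "elasticNetParam": e}
        for r, e in product(reg_params, elastic_net_params)]

num_folds = 3
print(len(grid))              # 9 combinations
print(len(grid) * num_folds)  # 27 model fits during 3-fold cross validation
```

Because each combination is fitted once per fold, adding values to either list increases training time multiplicatively.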

Whenever you call `CrossValidator` in MLlib, Databricks automatically tracks all of the runs using [MLflow](https://mlflow.org/). You can use the MLflow UI ([AWS](https://docs.databricks.com/applications/mlflow/index.html)|[Azure](https://docs.microsoft.com/azure/databricks/applications/mlflow/)) to compare how each model performed.

In this example we use the pipeline we created as the estimator.

In [0]:
### START CODE ###
# Create a 3-fold CrossValidator
# Use pipeline as the estimator, paramGrid for estimatorParamMaps, bcEvaluator for evaluator, with 3 folds and use 4 for parallelism
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism=4)

# Run cross validations. This step takes a few minutes and returns the best model found from the cross validation.
cvModel = cv.fit(trainDF)
### END CODE ###
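
The fold mechanics can be sketched in plain Python. The helper `k_fold_indices` is hypothetical and assigns rows to folds by position for readability; this deterministic assignment is an assumption of the sketch, whereas `CrossValidator` assigns rows to folds randomly. Each fold serves once as the validation set while the remaining folds are used for training.

```python
def k_fold_indices(num_rows, num_folds):
    """Yield (train_indices, validation_indices) for each fold."""
    for fold in range(num_folds):
        validation = [i for i in range(num_rows) if i % num_folds == fold]
        train = [i for i in range(num_rows) if i % num_folds != fold]
        yield train, validation

folds = list(k_fold_indices(num_rows=6, num_folds=3))
for train, validation in folds:
    print(train, validation)
# [1, 2, 4, 5] [0, 3]
# [0, 2, 3, 5] [1, 4]
# [0, 1, 3, 4] [2, 5]
```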

## Step 7. Make predictions and evaluate model performance
Use the best model identified by the cross-validation to make predictions on the test dataset, and then evaluate the model's performance using the area under the ROC curve and accuracy.

In [0]:
### START CODE ###
# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)

# Evaluate the model's performance based on area under the ROC curve and accuracy
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")
### END CODE ###

Using SQL commands, you can also display predictions grouped by occupation or by age. This requires creating a temporary view of the predictions dataset.

In [0]:
cvPredDF.createOrReplaceTempView("finalPredictions")

In [0]:
%sql
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation

occupation,prediction,count
?,0.0,349
?,1.0,14
Adm-clerical,1.0,54
Adm-clerical,0.0,731
Armed-Forces,0.0,3
Craft-repair,1.0,72
Craft-repair,0.0,771
Exec-managerial,0.0,381
Exec-managerial,1.0,415
Farming-fishing,1.0,8


In [0]:
%sql
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age

age,prediction,count
17.0,0.0,57
18.0,1.0,1
18.0,0.0,95
19.0,0.0,134
20.0,0.0,139
21.0,0.0,147
21.0,1.0,1
22.0,0.0,148
23.0,1.0,1
23.0,0.0,191
