# Binary Classification Example

The Pipelines API provides a higher-level API built on top of DataFrames for constructing ML pipelines.
You can read more about the Pipelines API in the [programming guide](https://spark.apache.org/docs/latest/ml-guide.html).

**Binary Classification** is the task of predicting a binary label.
E.g., is an email spam or not spam? Should I show this ad to this user or not? Will it rain tomorrow or not?
This section demonstrates algorithms for making these types of predictions.

## Dataset Review

The Adult dataset we are going to use is publicly available at the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult).
The data is derived from census data and contains information about 48842 individuals and their annual income.
We will use this information to predict whether an individual earns >50K or <=50K a year.
The dataset is rather clean, and consists of both numeric and categorical variables.

Attribute Information:

- age: continuous
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt: continuous
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
- education-num: continuous
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex: Female, Male
- capital-gain: continuous
- capital-loss: continuous
- hours-per-week: continuous
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...

Target/Label: <=50K, >50K
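Each line of `adult.data` lists these attributes in the order above, followed by the label, with a space after every comma and no header row. The plain-Python sketch below parses one line in that format; the sample row is illustrative, and the snake_case column names are an assumption chosen to match the table created in the next section. Spark's CSV reader keeps the leading space in string values unless its `ignoreLeadingWhiteSpace` option is enabled, so in the Spark table the labels may appear as ` <=50K` and ` >50K`.

```python
import csv
import io

# Hypothetical snake_case names, in file order (the label "income" comes last)
COLUMNS = ["age", "workclass", "fnlwgt", "education", "education_num",
           "marital_status", "occupation", "relationship", "race", "sex",
           "capital_gain", "capital_loss", "hours_per_week", "native_country",
           "income"]
NUMERIC_COLUMNS = {"age", "fnlwgt", "education_num", "capital_gain",
                   "capital_loss", "hours_per_week"}


def parse_adult_line(line):
    """Parse one comma-separated adult.data line into a dict."""
    # skipinitialspace=True drops the space that follows each comma
    fields = next(csv.reader(io.StringIO(line), skipinitialspace=True))
    if len(fields) != len(COLUMNS):
        raise ValueError("expected %d fields, got %d" % (len(COLUMNS), len(fields)))
    record = dict(zip(COLUMNS, fields))
    for name in NUMERIC_COLUMNS:
        record[name] = float(record[name])
    return record


sample = ("39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, "
          "Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K")
row = parse_adult_line(sample)
print(row["age"], row["workclass"], row["income"])
```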

## Load Data

In this example, we will read in the Adult dataset from databricks-datasets.
We'll read in the data in SQL using the CSV data source for Spark and rename the columns appropriately.

```
%fs ls databricks-datasets/adult/adult.data
```

```sql
%sql DROP TABLE IF EXISTS adult
```

```sql
%sql
-- adult.data has no header row, so header must be "false" or the first record is dropped
CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)
USING CSV
OPTIONS (path "/databricks-datasets/adult/adult.data", header "false")
```


```python
dataset = spark.table("adult")
cols = dataset.columns
```

```python
display(dataset)
```

## Preprocess Data

Since we are going to try algorithms like Logistic Regression, we will have to convert the categorical variables in the dataset into numeric variables.
There are two ways we can do this.

* Category Indexing

  This assigns a numeric value to each category from {0, 1, 2, ..., numCategories-1}.
  This introduces an implicit ordering among your categories, so it is more suitable for ordinal variables (e.g., Poor: 0, Average: 1, Good: 2).

* One-Hot Encoding

  This converts categories into binary vectors with at most one nonzero value (e.g., (Blue: [1, 0]), (Green: [0, 1]), (Red: [0, 0])).
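Both encodings can be sketched in plain Python. The indexer below mimics the default ordering of Spark's `StringIndexer` (the most frequent category gets index 0); breaking ties alphabetically is an assumption of this sketch. The one-hot step drops the last category, like the `dropLast=True` default of Spark's one-hot encoder, which is why `Red` maps to an all-zero vector in the example above.

```python
from collections import Counter


def fit_category_index(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a binary list; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0] * size
    if index < size:
        vector[index] = 1
    return vector


colors = ["Blue", "Blue", "Blue", "Green", "Green", "Red"]
color_index = fit_category_index(colors)
encoded = {c: one_hot(i, len(color_index)) for c, i in color_index.items()}
print(color_index)
print(encoded)
```

By the same frequency ordering, indexing the `income` label gives the majority class `<=50K` index 0 and `>50K` index 1.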

In this dataset, we have ordinal variables like education (Preschool - Doctorate) and nominal variables like relationship (Wife, Husband, Own-child, etc.).
For simplicity's sake, we will use One-Hot Encoding to convert all categorical variables into binary vectors.
It may be possible to improve prediction accuracy by encoding each categorical column with a method suited to its type.

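Here, we will use a combination of [StringIndexer] and [OneHotEncoderEstimator] to convert the categorical variables.
The `OneHotEncoderEstimator` outputs each encoded column as a [SparseVector].
In Spark 3.0 and later, `OneHotEncoderEstimator` has been renamed to `OneHotEncoder`.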

Since we will have more than one stage of feature transformations, we use a [Pipeline] to tie the stages together.
This simplifies our code.
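The `fit()`/`transform()` contract of a pipeline can be illustrated with a deliberately simplified plain-Python model: each estimator is fit on the output of the stages before it, and the resulting models are replayed in order by `transform()`. All class names here are hypothetical, and the toy indexer orders values alphabetically for brevity; Spark's `Pipeline` operates on DataFrames rather than lists of dicts.

```python
class ToyIndexer:
    """Hypothetical estimator: learns an alphabetical value -> index mapping for one column."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        values = sorted({row[self.input_col] for row in rows})
        return ToyIndexerModel(self.input_col, self.output_col,
                               {v: i for i, v in enumerate(values)})


class ToyIndexerModel:
    """Hypothetical transformer returned by ToyIndexer.fit()."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = mapping

    def transform(self, rows):
        # Return new rows with the output column added; the input rows are left unchanged
        return [dict(row, **{self.output_col: self.mapping[row[self.input_col]]})
                for row in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators (anything with fit) are fit here; plain transformers pass through
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < len(self.stages) - 1:
                # Later stages are fit on the output of the earlier ones
                rows = model.transform(rows)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


data = [{"sex": "Male", "race": "White"}, {"sex": "Female", "race": "Black"}]
model = ToyPipeline([ToyIndexer("sex", "sexIndex"), ToyIndexer("race", "raceIndex")]).fit(data)
print(model.transform(data))
```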

[StringIndexer]: http://spark.apache.org/docs/latest/ml-features.html#stringindexer
[OneHotEncoderEstimator]: https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator
[SparseVector]: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.linalg.SparseVector
[Pipeline]: http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
stages = []  # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoderEstimator to convert the indexed categories into binary SparseVectors
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages. These are not run here, but will all run at once later on.
    stages += [stringIndexer, encoder]
```

The above code indexes each categorical column using the `StringIndexer`,
and then converts the indexed categories into one-hot encoded variables.
When the pipeline runs, each encoder's binary vector is added to every row as a new column (for example, `workclassclassVec`).

We use the `StringIndexer` again to encode our labels to label indices.

```python
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]
```

Next, we will use the `VectorAssembler` to combine all the feature columns into a single vector column.
This will include both the numeric columns and the one-hot encoded binary vector columns in our dataset.
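Conceptually, `VectorAssembler` concatenates its input columns, in the order given by `inputCols`, into one flat vector, so here the one-hot columns come first and the numeric columns last. The plain-Python sketch below shows that concatenation for a hypothetical row; real one-hot columns are sparse vectors, and Spark may store the assembled result in either sparse or dense form.

```python
def assemble(row, input_cols):
    """Concatenate vector-valued (list) and scalar columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # one-hot vector: copy every slot
        else:
            features.append(float(value))  # numeric column: one slot
    return features


# Hypothetical row: race has 5 categories (4 slots after dropping the last), sex has 2 (1 slot)
row = {"raceclassVec": [1, 0, 0, 0], "sexclassVec": [0], "age": 39.0, "hours_per_week": 40.0}
features = assemble(row, ["raceclassVec", "sexclassVec", "age", "hours_per_week"])
print(features)
```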

```python
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
```

We finally run our stages as a Pipeline.
This puts the data through all of the feature transformations we described in a single call.

```python
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
display(dataset)
```

```python
# Randomly split data into training and test sets; set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())
```
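`randomSplit` assigns rows at random, so the two counts printed above will be close to, but generally not exactly, 70% and 30% of the rows. The plain-Python sketch below illustrates the idea by drawing one uniform number per row and comparing it with the cumulative normalized weights. Spark applies a similar scheme per partition with its own random number generator, so this sketch will not reproduce Spark's actual split.

```python
import random


def random_split(rows, weights, seed):
    """Send each row to one split by comparing a uniform draw with cumulative weights."""
    total = float(sum(weights))
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding leaving the last bound just below 1.0
            splits[-1].append(row)
    return splits


train, test = random_split(range(10000), [0.7, 0.3], seed=100)
print(len(train), len(test))
```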

## Fit and Evaluate Models

We are now ready to try out some of the Binary Classification algorithms available in the Pipelines API.

Of these algorithms, the following also support multiclass classification in the Python API:
- Decision Tree Classifier
- Random Forest Classifier

These are the general steps we will take to build our models:
- Create initial model using the training set
- Tune parameters with a parameter grid (`ParamGridBuilder`) and 5-fold cross-validation
- Evaluate the best model obtained from the Cross Validation using the test set
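The tuning step can be sketched in plain Python. A parameter grid is the Cartesian product of the candidate values, and k-fold cross-validation fits one model per (parameter combination, fold) pair, averages the metric across folds, and keeps the best combination. The `fit_and_score` callback and the toy scoring rule below are hypothetical stand-ins for fitting and evaluating a model; the round-robin fold assignment is a simplification, since Spark's `CrossValidator` assigns folds randomly and then refits the best combination on the full training set.

```python
from itertools import product


def build_param_grid(**candidates):
    """Cartesian product of candidate values, one dict per combination."""
    names = sorted(candidates)
    return [dict(zip(names, combo)) for combo in product(*(candidates[n] for n in names))]


def k_fold(n_rows, k):
    """Round-robin fold assignment (a simplification; Spark assigns folds randomly)."""
    folds = [[] for _ in range(k)]
    for i in range(n_rows):
        folds[i % k].append(i)
    return folds


def cross_validate(n_rows, grid, k, fit_and_score):
    """Return the best parameter dict (higher score is better) and per-combination averages."""
    folds = k_fold(n_rows, k)
    avg_metrics = []
    for params in grid:
        scores = []
        for held_out in range(k):
            train_idx = [i for f, fold in enumerate(folds) if f != held_out for i in fold]
            scores.append(fit_and_score(params, train_idx, folds[held_out]))
        avg_metrics.append(sum(scores) / k)
    best = max(range(len(grid)), key=lambda j: avg_metrics[j])
    return grid[best], avg_metrics


calls = []


def toy_fit_and_score(params, train_idx, val_idx):
    # Hypothetical scorer: pretends regParam=0.5 is best and ignores the data entirely
    calls.append(params)
    return -abs(params["regParam"] - 0.5)


grid = build_param_grid(regParam=[0.01, 0.5, 2.0], elasticNetParam=[0.0, 0.5, 1.0])
best, avg_metrics = cross_validate(100, grid, 5, toy_fit_and_score)
print(len(grid), len(calls), best)
```

In PySpark, the same roles are played by `ParamGridBuilder().addGrid(...).build()`, `CrossValidator(estimator=..., estimatorParamMaps=..., evaluator=..., numFolds=5)`, and the fitted model's `avgMetrics` and `bestModel` attributes.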

We use the `BinaryClassificationEvaluator` to evaluate our models, which uses [areaUnderROC] as the default metric.

[areaUnderROC]: https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve
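The area under the ROC curve equals the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one, with ties counting as one half. The plain-Python sketch below computes it that way from hypothetical scores and labels. It compares every pair, so it is only meant for small inputs; Spark's evaluator instead integrates the ROC curve over score thresholds, which should agree up to Spark's binning of thresholds.

```python
def area_under_roc(scores, labels):
    """Pairwise (Mann-Whitney) AUC: share of positive/negative pairs ranked correctly."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # a tie counts as half a correct ranking
    return wins / (len(positives) * len(negatives))


print(area_under_roc([0.9, 0.8, 0.4, 0.3], [1, 0, 1, 0]))
```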

## Logistic Regression

You can read more about [Logistic Regression] in the [classification and regression] section of the MLlib Programming Guide.
In the Pipelines API, we can perform Elastic-Net Regularization with Logistic Regression, as well as with other linear methods.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Logistic Regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
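In Spark ML's linear methods, `regParam` (λ) sets the overall regularization strength, and `elasticNetParam` (α, between 0 and 1) mixes the L1 and L2 penalties. The penalty added to the loss is λ(α‖w‖₁ + (1 - α)/2 ‖w‖₂²), so α = 1 gives a pure L1 (lasso) penalty and α = 0 a pure L2 (ridge) penalty. The plain-Python function below evaluates that term for hypothetical coefficients; the intercept is not penalized, so it is left out of `w`.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """lambda * (alpha * L1 norm + (1 - alpha) / 2 * squared L2 norm) of the coefficients."""
    if not 0.0 <= elastic_net_param <= 1.0:
        raise ValueError("elastic_net_param must be in [0, 1]")
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared)


w = [1.0, -2.0]  # hypothetical coefficients (intercept excluded)
for alpha in (0.0, 0.5, 1.0):
    print(alpha, elastic_net_penalty(w, reg_param=0.1, elastic_net_param=alpha))
```

In PySpark, these two knobs are set as `LogisticRegression(regParam=..., elasticNetParam=...)`.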

```python
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)
```

```python
sparkTransformed = lrModel.transform(trainingData)
display(sparkTransformed)
```

```python
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)
```

```python
predictions.printSchema()
```
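For binary logistic regression, the `rawPrediction`, `probability`, and `prediction` columns in this schema are all derived from the linear margin `w·x + b`. The plain-Python sketch below reproduces that relationship for hypothetical coefficients; the 0.5 cutoff corresponds to the default of `LogisticRegression`'s `threshold` param.

```python
import math


def binary_lr_outputs(coefficients, intercept, features, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one row of a binary LR model."""
    margin = intercept + sum(c * x for c, x in zip(coefficients, features))
    p_positive = 1.0 / (1.0 + math.exp(-margin))  # sigmoid of the margin
    raw_prediction = [-margin, margin]
    probability = [1.0 - p_positive, p_positive]
    prediction = 1.0 if p_positive > threshold else 0.0
    return raw_prediction, probability, prediction


raw, prob, pred = binary_lr_outputs([2.0, -1.0], 0.5, [1.0, 3.0])
print(raw, prob, pred)
```

Because the sigmoid is monotonic, ranking rows by the margin or by the positive-class probability gives the same order, so an area-under-ROC evaluation gives the same value whichever of the two it scores.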

We can use `BinaryClassificationEvaluator` to evaluate our model. We can set the required column names with the `rawPredictionCol` and `labelCol` params, and the metric with the `metricName` param.

```python
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)
```

Note that the default metric for the `BinaryClassificationEvaluator` is `areaUnderROC`.

```python
evaluator.getMetricName()
```

```python
# View the model's predictions and the probability of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)
```

MLeap supports serializing the model to a single zip file. To serialize to a zip file, make sure the URI begins with `jar:file:` and ends with `.zip`.
The transformers that MLeap supports are listed in the MLeap documentation.
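The URI rule can be captured in a small helper. `mleap_bundle_uri` is a hypothetical name, not part of MLeap; it only builds and checks the string and does not touch the file system.

```python
import os


def mleap_bundle_uri(local_path):
    """Turn a local .zip path into the jar:file: URI form used for MLeap zip bundles."""
    if not local_path.endswith(".zip"):
        raise ValueError("an MLeap zip bundle path must end with .zip")
    # abspath makes the path absolute so the URI does not depend on the working directory
    return "jar:file:" + os.path.abspath(local_path)


print(mleap_bundle_uri("/tmp/mleap_python_model_export/binaryclassifier_pipeline-json.zip"))
```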

```sh
%sh
rm -rf /tmp/mleap_python_model_export
mkdir /tmp/mleap_python_model_export
```

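```python
# Importing spark_support adds serializeToBundle()/deserializeFromBundle() to Spark ML transformers
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Serialize only the fitted LogisticRegressionModel (not the feature pipeline),
# passing a DataFrame that the model has transformed
lrModel.serializeToBundle("jar:file:/tmp/mleap_python_model_export/binaryclassifier_pipeline-json.zip", sparkTransformed)
```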

In this example, we download the model file through the browser. In general, you may want to move the model programmatically to a persistent storage layer.

```sh
%sh
cp /tmp/mleap_python_model_export/binaryclassifier_pipeline-json.zip /dbfs/FileStore/binaryclassifier_pipeline-json.zip
```

You can get a link to the downloadable zip via `https://[MY_DATABRICKS_URL]/files/[FILE_NAME].zip`. For example, if you access Databricks at `https://mycompany.databricks.com`, your link would be `https://mycompany.databricks.com/files/binaryclassifier_pipeline-json.zip`.
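Because the link follows a fixed pattern, it can be built in code. `filestore_download_url` is a hypothetical helper that assumes, as in the example above, that a file stored under `/FileStore/` is served from the workspace's `/files/` path.

```python
def filestore_download_url(workspace_url, dbfs_path):
    """Map a DBFS path under /FileStore/ to its https://<workspace>/files/... download link."""
    prefix = "/FileStore/"
    if not dbfs_path.startswith(prefix):
        raise ValueError("only files under /FileStore/ are served from /files/")
    return workspace_url.rstrip("/") + "/files/" + dbfs_path[len(prefix):]


print(filestore_download_url("https://mycompany.databricks.com",
                             "/FileStore/binaryclassifier_pipeline-json.zip"))
```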

You can also use MLeap to load a trained model for use in your application: to make predictions on new data with an existing model or pipeline, deserialize it from the file you saved.

## Import Model to PySpark

This section shows how to load an MLeap bundle and make predictions on a Spark DataFrame. This can be useful if you want to use the same persistence format (bundle) for loading into Spark and non-Spark applications. If your goal is to make predictions only in Spark, then we recommend using MLlib's native ML persistence.

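```python
from pyspark.ml import PipelineModel

# The bundle holds only the logistic regression model, so inputs must already have a "features" column
deserializedPipeline = PipelineModel.deserializeFromBundle("jar:file:/tmp/mleap_python_model_export/binaryclassifier_pipeline-json.zip")
```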

```python
# Now you can use the loaded model to make predictions. First, inspect the test data:
display(testData)
```

```python
exampleResults = deserializedPipeline.transform(testData)
display(exampleResults)
```

```python
# Check that the bundle is in the local /tmp/ folder
display(dbutils.fs.ls("file:/tmp/mleap_python_model_export/"))
```

```python
# Copy the bundle from the driver's local disk to DBFS
dbutils.fs.mkdirs("/FileStore/model")
dbutils.fs.cp("file:/tmp/mleap_python_model_export/binaryclassifier_pipeline-json.zip", "/FileStore/model/binaryclassifier_pipeline-json.zip")
```

```python
display(dbutils.fs.ls("/FileStore/model"))
```