# AdventureWorks Product selection model training

## Author: Bryan Cafferky - Copyright 08/26/2020

#### Some code copied from https://docs.databricks.com/applications/machine-learning/mllib/binary-classification-mllib-pipelines.html
#### and from https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#multinomial-logistic-regression
#### and modified to fit the AdventureWorks data.

In [0]:
%sql 

SELECT Category, Subcategory, Model, count(*) 
FROM aw.t_salesinfo 
WHERE Category = 'Bikes' 
GROUP BY Category, Subcategory, Model
ORDER BY Category, Subcategory, Model

In [0]:
%sql 

SELECT Category, Subcategory, count(*) 
FROM aw.t_salesinfo 
WHERE Category = 'Bikes' 
GROUP BY Category, Subcategory
ORDER BY Category, Subcategory


## Goal:  Train a model to predict the bike subcategory (Mountain, Touring, Road) a customer is likely to buy.

### Features for the model:
#### - CommuteDistance
#### - AgeBand
#### - HasChildren
#### - Education
#### - Salary

## Dataset Review

In [0]:
%sql DESCRIBE TABLE aw.t_salesinfo

The Pipelines API provides a higher-level API built on top of DataFrames for constructing ML pipelines.
You can read more about the Pipelines API in the [programming guide](https://spark.apache.org/docs/latest/ml-guide.html).

**Multiclass Classification** is the task of predicting a label from more than two possible classes.
For example: which bike subcategory (Mountain, Road, Touring) will a customer buy?
This section demonstrates algorithms for making these types of predictions.


The input table you created, `aw.t_salesinfo`, supplies the following attributes:

- CommuteDistance
- AgeBand
- HasChildren
- Education
- Salary

Target/Label: Mountain, Road, Touring


## Preprocess Data

We have to convert the categorical variables in the dataset into numeric variables.

There are 2 ways we can do this.

* Category Indexing

  This assigns a numeric index to each category from {0, 1, 2, ..., numCategories-1}.
  This introduces an implicit ordering among the categories, so it is better suited to ordinal variables (e.g., Poor: 0, Average: 1, Good: 2).

* One-Hot Encoding

  This converts categories into binary vectors with at most one nonzero value (e.g., Blue: [1, 0], Green: [0, 1], Red: [0, 0]). By default Spark drops the last category, so that category is represented by the all-zero vector.
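The following plain-Python sketch makes the two encodings concrete. It is an illustration only and does not reproduce Spark's implementation. The hypothetical helper `string_index` mimics `StringIndexer`'s default `frequencyDesc` ordering: the most frequent value gets index 0, and ties are broken alphabetically, as in Spark 3.x. The helper `one_hot` mimics `OneHotEncoder`'s default `dropLast=True`, which is why the last category becomes an all-zero vector. The color values are toy data.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a dense 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0] * size
    if index < size:
        vec[index] = 1
    return vec

# Toy data: Blue appears most often, so it gets index 0
colors = ["Blue", "Blue", "Blue", "Green", "Green", "Red"]
mapping = string_index(colors)
for c in ["Blue", "Green", "Red"]:
    print(c, mapping[c], one_hot(mapping[c], len(mapping)))
# Blue 0 [1, 0]
# Green 1 [0, 1]
# Red 2 [0, 0]
```

Spark stores the one-hot output as a `SparseVector` rather than a dense list. The positions of the 1s are the same.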

In this dataset, we have ordinal variables like Education, AgeBand, and CommuteDistance, and also a binary (nominal) variable, HasChildren.
For simplicity's sake, we will use One-Hot Encoding to convert all categorical variables into binary vectors.
Prediction accuracy might be improved by encoding each categorical column with a method suited to its type.

Here, we will use a combination of [StringIndexer] and [OneHotEncoderEstimator] to convert the categorical variables.
The `OneHotEncoderEstimator` returns a [SparseVector]. Note: [OneHotEncoderEstimator] was [renamed as OneHotEncoder] in Spark 3.0.

Since we will have more than one stage of feature transformations, we use a [Pipeline] to tie the stages together, which simplifies our code.
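The idea behind a `Pipeline` can be sketched in plain Python. The sketch assumes each stage is a hypothetical "estimator" function that learns from the rows and returns a transform. Stages are fitted in order, and each fitted stage transforms the rows before the next stage sees them. A later stage can therefore use columns created by an earlier one. This only illustrates the concept; the real Spark `Pipeline` works on DataFrames through `fit()` and `transform()`. The `HasChildren` values below are hypothetical.

```python
def index_stage(col):
    """Hypothetical estimator: learns a value -> index mapping (alphabetical, for simplicity)."""
    def fit(rows):
        mapping = {v: i for i, v in enumerate(sorted({r[col] for r in rows}))}
        # The returned transform adds a new "<col>Index" column to a copy of each row
        return lambda r: {**r, col + "Index": mapping[r[col]]}
    return fit

def assemble_stage(cols, out="features"):
    """Hypothetical estimator with nothing to learn: concatenates columns into a list."""
    def fit(rows):
        return lambda r: {**r, out: [r[c] for c in cols]}
    return fit

def fit_pipeline(stages, rows):
    """Fit stages in order; each fitted stage transforms the rows seen by the next stage."""
    transforms = []
    for fit in stages:
        transform = fit(rows)
        rows = [transform(r) for r in rows]
        transforms.append(transform)
    return transforms, rows

rows = [{"HasChildren": "Y", "Salary": 50000},
        {"HasChildren": "N", "Salary": 30000}]
# The assembler depends on "HasChildrenIndex", which only exists after the indexer runs
stages = [index_stage("HasChildren"),
          assemble_stage(["HasChildrenIndex", "Salary"])]
transforms, prepped = fit_pipeline(stages, rows)
print(prepped[0]["features"])  # [1, 50000]
```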

[StringIndexer]: http://spark.apache.org/docs/latest/ml-features.html#stringindexer
[OneHotEncoderEstimator]: https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator
[SparseVector]: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.linalg.SparseVector
[Pipeline]: http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline
[renamed as OneHotEncoder]: https://issues.apache.org/jira/browse/SPARK-26133

In [0]:
%sql 

SELECT FiscalYear, count(*)
FROM aw.t_salesinfo 
WHERE Category = 'Bikes' 
GROUP BY FiscalYear

In [0]:
%python

spdf_salesinfo = spark.sql('''
select  split(Subcategory, ' ')[0] as Subcategory, AgeBand, CommuteDistance, HasChildren, Education, Salary
FROM aw.t_salesinfo 
WHERE Category = 'Bikes' and FiscalYear in (2012, 2013) 
''')

spdf_salesinfo.show(2)

In [0]:
cols = spdf_salesinfo.columns
cols

In [0]:
import pyspark
from distutils.version import LooseVersion

LooseVersion(pyspark.__version__) 

In [0]:
import pyspark
from pyspark.ml.feature import StringIndexer, VectorAssembler

from distutils.version import LooseVersion

categoricalColumns = ["AgeBand", "CommuteDistance", "HasChildren", "Education"]

stages = [] # stages in our Pipeline

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    if LooseVersion(pyspark.__version__) < LooseVersion("3.0"):
        from pyspark.ml.feature import OneHotEncoderEstimator
        encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], 
                                         outputCols=[categoricalCol + "classVec"])
    else:
        from pyspark.ml.feature import OneHotEncoder
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], 
                                outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

The code above indexes each categorical column with `StringIndexer` and then converts the indexed categories into one-hot encoded vectors.
When the pipeline runs, each encoder appends its binary vector as a new column (`<column>classVec`) at the end of each row.

We use `StringIndexer` again to encode the Subcategory labels as numeric label indices.
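Because the label is now a number, the models will also return predictions as label indices. In PySpark, the fitted `StringIndexerModel` exposes the ordered label list through its `labels` attribute, and `IndexToString` can convert predictions back to names. The plain-Python sketch below only illustrates that round trip. The label values are hypothetical toy data, not the AdventureWorks distribution.

```python
from collections import Counter

def fit_label_indexer(labels):
    """Order distinct labels most frequent first (ties alphabetical), like StringIndexer's default."""
    counts = Counter(labels)
    return sorted(counts, key=lambda v: (-counts[v], v))

# Hypothetical training labels (toy data)
train_labels = ["Road", "Mountain", "Road", "Touring", "Road", "Mountain"]
ordered = fit_label_indexer(train_labels)
label_of = {name: float(i) for i, name in enumerate(ordered)}   # name -> label index

predictions = [0.0, 2.0, 1.0]                     # model output: label indices
names = [ordered[int(p)] for p in predictions]    # index -> name, like IndexToString
print(ordered)  # ['Road', 'Mountain', 'Touring']
print(names)    # ['Road', 'Touring', 'Mountain']
```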

In [0]:
stages

In [0]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="Subcategory", outputCol="label")
stages += [label_stringIdx]

In [0]:
label_stringIdx

In [0]:
stages

Use a `VectorAssembler` to combine all the feature columns into a single vector column.
This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset. Salary is also included as a feature. Because it is already numeric, it is not encoded and is passed through as is.
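Conceptually, `VectorAssembler` concatenates its input columns in the order given in `inputCols`. Each one-hot vector contributes all of its slots, and a numeric column such as Salary contributes one value. The sketch below uses plain lists instead of Spark vectors. It assumes hypothetical category counts: with `dropLast=True`, a column with k categories yields k - 1 slots.

```python
def assemble(row, input_cols):
    """Concatenate list-valued and scalar columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, list):
            features.extend(value)          # one-hot vector: append all of its slots
        else:
            features.append(float(value))   # numeric column: passed through as is
    return features

# Hypothetical encoded row: AgeBand with 4 categories -> 3 slots, HasChildren with 2 -> 1 slot
row = {"AgeBandclassVec": [0, 1, 0], "HasChildrenclassVec": [1], "Salary": 60000}
features = assemble(row, ["AgeBandclassVec", "HasChildrenclassVec", "Salary"])
print(features)  # [0, 1, 0, 1, 60000.0]
```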

In [0]:
# Transform all features into a vector using VectorAssembler
numericCols = ["Salary"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [0]:
stages

Run the stages as a Pipeline. This puts the data through all of the feature transformations we described in a single call.

In [0]:
spdf_salesinfo

In [0]:
from pyspark.ml import Pipeline
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(spdf_salesinfo)
preppedDataDF = pipelineModel.transform(spdf_salesinfo)

In [0]:
display(preppedDataDF)

In [0]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

In [0]:
# Randomly split data into training and test sets; set a seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

## Fit and Evaluate Models

We are now ready to try out some of the classification algorithms available in the Pipelines API.

The following algorithms support multiclass classification in the Python API:
- Decision Tree Classifier
- Random Forest Classifier

These are the general steps we will take to build our models:
- Create initial model using the training set
- Tune parameters with a `ParamGridBuilder` grid and 5-fold cross-validation (done here for the Random Forest)
- Evaluate the best model obtained from the cross-validation using the test set
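The evaluation step uses `MulticlassClassificationEvaluator` with `metricName="accuracy"`, and the notebook reports test error as `1 - accuracy`. The plain-Python sketch below shows that arithmetic. The prediction and label pairs are made-up toy values.

```python
def accuracy(predictions, labels):
    """Fraction of rows where the predicted index equals the true label index."""
    if len(predictions) != len(labels) or not labels:
        raise ValueError("need two equal-length, non-empty sequences")
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)

preds = [0.0, 1.0, 2.0, 1.0, 0.0]
labels = [0.0, 1.0, 1.0, 1.0, 2.0]
acc = accuracy(preds, labels)   # 3 of 5 rows match
print("Accuracy = %g, Test Error = %g" % (acc, 1.0 - acc))  # Accuracy = 0.6, Test Error = 0.4
```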

## Decision Trees

You can read more about [Decision Trees](http://spark.apache.org/docs/latest/mllib-decision-tree.html) in the Spark MLlib Programming Guide.
The Decision Trees algorithm is popular because it handles categorical
data and works out of the box with multiclass classification tasks.
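A decision tree chooses splits that reduce impurity. Spark's `DecisionTreeClassifier` uses Gini impurity by default (`impurity="gini"`), with `"entropy"` as the alternative. The plain-Python sketch below computes Gini impurity and the size-weighted impurity of a candidate split. The label lists are toy data.

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 - sum_k p_k^2 over the class proportions p_k."""
    n = len(labels)
    counts = Counter(labels)
    return 1.0 - sum((c / n) ** 2 for c in counts.values())

def split_impurity(left, right):
    """Impurity of a split: child impurities weighted by child size."""
    n = len(left) + len(right)
    return len(left) / n * gini(left) + len(right) / n * gini(right)

parent = ["Mountain", "Mountain", "Road", "Road"]
print(gini(parent))  # 0.5
print(split_impurity(["Mountain", "Mountain"], ["Road", "Road"]))  # 0.0 (a perfect split)
```

The tree prefers the split with the largest drop from the parent's impurity. The drop here is 0.5 - 0.0 = 0.5.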

In [0]:
display(trainingData)

### Example model training initially copied from the link below and modified.
https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#multinomial-logistic-regression

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Train the model. The indexers and encoders already ran in the feature pipeline,
# so this step fits only the tree on the prepared "features" and "label" columns.
model = dt.fit(trainingData)

# Make predictions with the test data.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(3)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

print("numNodes = ", model.numNodes)
print("depth = ", model.depth)

In [0]:
display(model)

In [0]:
display(predictions.select('label','prediction','features','rawPrediction','probability'))

In [0]:
predictions.createOrReplaceTempView("dtpredictions")

In [0]:
%sql 

SELECT prediction, count(*) FROM dtpredictions GROUP BY prediction

In [0]:
%sql 

SELECT label, count(*) FROM dtpredictions GROUP BY label

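We extracted the number of nodes in the decision tree and its depth earlier with `model.numNodes` and `model.depth`.
The schema below shows the columns the model added to the test data: `rawPrediction`, `probability`, and `prediction`.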
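`model.numNodes` and `model.depth` are related. Spark counts depth 0 as a single leaf node, so a binary tree of depth d has at most 2^(d+1) - 1 nodes. With `DecisionTreeClassifier`'s default `maxDepth=5`, that is at most 63 nodes. The small plain-Python helper below computes the bound.

```python
def max_nodes(depth):
    """Upper bound on the node count of a binary tree (depth 0 = one leaf node)."""
    return 2 ** (depth + 1) - 1

for d in (0, 1, 5):
    print(d, max_nodes(d))
# 0 1
# 1 3
# 5 63
```

In the notebook, `model.numNodes <= max_nodes(model.depth)` should always hold.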

In [0]:
predictions.printSchema()

## Random Forest

Random Forest uses an ensemble of trees to improve model accuracy.
You can read more about [Random Forest] in the [classification and regression] section of the MLlib Programming Guide.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Random Forest]: https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forests
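The plain-Python sketch below illustrates how an ensemble combines its trees. It sums the per-tree class-probability vectors and picks the class with the largest total, which is a "soft" vote. As far as we know, this matches how Spark ML's `RandomForestClassificationModel` builds its `rawPrediction` column. The RDD-based MLlib guide describes a simpler majority vote. Treat the code as a conceptual sketch. The probabilities for the three trees are hypothetical.

```python
def soft_vote(tree_probabilities):
    """Sum per-tree class-probability vectors; return (argmax class index, summed votes)."""
    num_classes = len(tree_probabilities[0])
    totals = [0.0] * num_classes
    for probs in tree_probabilities:
        for k, p in enumerate(probs):
            totals[k] += p
    best = max(range(num_classes), key=lambda k: totals[k])
    return best, totals

# Hypothetical class probabilities (label indices 0..2) from a 3-tree forest for one customer
trees = [[0.6, 0.3, 0.1],
         [0.2, 0.5, 0.3],
         [0.5, 0.4, 0.1]]
pred, raw = soft_vote(trees)
print(pred)  # 0
```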

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# Train the model.  
rf_model = rf.fit(trainingData)

# Make predictions.
predictions = rf_model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

print(rf_model)  

Now we will try tuning the model with the ``ParamGridBuilder`` and the ``CrossValidator``.

As we specify 3 values for maxDepth, 2 values for maxBins, and 2 values for numTrees,
this grid will have 3 x 2 x 2 = 12 parameter settings for ``CrossValidator`` to choose from.
We will create a 5-fold ``CrossValidator``.
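The grid size and the cross-validation loop can be sketched in plain Python. `itertools.product` enumerates the same 12 combinations that the `ParamGridBuilder` cell builds. The hypothetical `k_fold_splits` assigns row i to fold i % k for simplicity, whereas Spark's `CrossValidator` assigns folds randomly. `toy_score` is a made-up stand-in for "fit a model and evaluate it on the validation fold".

```python
from itertools import product

# Same values as the ParamGridBuilder cell: 3 x 2 x 2 = 12 combinations
grid = {"maxDepth": [2, 4, 6], "maxBins": [20, 60], "numTrees": [5, 20]}
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

def k_fold_splits(rows, k):
    """Yield (train, validation) pairs; row i is held out in fold i % k."""
    for f in range(k):
        validation = [r for i, r in enumerate(rows) if i % k == f]
        train = [r for i, r in enumerate(rows) if i % k != f]
        yield train, validation

def cross_validate(param_maps, rows, k, score_fn):
    """Return the param map with the best mean validation score (larger is better)."""
    best_params, best_score = None, float("-inf")
    for params in param_maps:
        scores = [score_fn(params, train, val) for train, val in k_fold_splits(rows, k)]
        mean = sum(scores) / len(scores)
        if mean > best_score:
            best_params, best_score = params, mean
    return best_params, best_score

def toy_score(params, train, val):
    # Hypothetical scoring rule standing in for "fit + evaluate accuracy"
    return params["maxDepth"] / 10 - params["numTrees"] / 100

best, score = cross_validate(param_maps, list(range(100)), 5, toy_score)
print(len(param_maps), len(param_maps) * 5)  # 12 60
print(best)  # {'maxDepth': 6, 'maxBins': 20, 'numTrees': 5}
```

With 5 folds, the 12 settings mean 60 model fits. `CrossValidator` then refits the winning settings on the whole training set.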

In [0]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [0]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

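# Run cross-validation. This can take about 6 minutes: 12 parameter settings x 5 folds = 60 random forests
# (up to 20 trees each) are trained, plus a final refit of the best settings on all of the training data.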
cvModel = cv.fit(trainingData)

In [0]:
# Use test set here so we can measure the accuracy of our model on new data
predictions2 = cvModel.transform(testData)

In [0]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions2)

In [0]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions2.select("label", "prediction", "probability", "AgeBand", "Education")
display(selected)

## Make Predictions
As the tuned Random Forest gives us the best accuracy, we will use the bestModel obtained from its cross-validation for deployment,
and use it to generate predictions on new data.
In this example, we will simulate this by generating predictions on the entire dataset.

In [0]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

In [0]:
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)

In [0]:
# Evaluate best model (the full dataset includes the training rows, so this accuracy is optimistic)
evaluator.evaluate(finalPredictions)