# 10 minutes to MLflow
## MLflow + SparkML Notebook using the Iris Dataset

This notebook demonstrates [MLflow](https://mlflow.org/) experiment tracking. It creates an experiment and executes several model runs with different input parameters.

The notebook uses Fisher's [Iris flower data](https://en.wikipedia.org/wiki/Iris_flower_data_set) from `Rdatasets` and predicts the flower species based on petal and sepal lengths and widths. It uses SparkML's `DecisionTreeClassifier` algorithm and varies the `maxBins` and `maxDepth` parameters for tuning.

**Before running the notebook, install the `mlflow` library onto the cluster from PyPI.**

References:
* [Data Science Process](http://www.ebc.cat/2017/01/23/predicting-with-labeled-data/)
* [Decision Tree Algorithm](https://en.wikipedia.org/wiki/Decision_tree_learning)

<img src="https://upload.wikimedia.org/wikipedia/commons/4/41/Iris_versicolor_3.jpg" alt="iris" width="600"/>

#### Load the Iris Dataset from Disk into a DataFrame

In [3]:
irisDF = spark.read \
            .format("csv") \
            .option("header","true") \
            .load("databricks-datasets/Rdatasets/data-001/csv/datasets/iris.csv")

# Drop the first column (IDs) and change column names to not contain a period
irisDF = irisDF.drop("_c0") \
            .withColumnRenamed("Sepal.Length", "Sepal_Length") \
            .withColumnRenamed("Sepal.Width", "Sepal_Width")   \
            .withColumnRenamed("Petal.Length", "Petal_Length") \
            .withColumnRenamed("Petal.Width", "Petal_Width")

# Get feature column names (all columns except the last one, Species)
features = irisDF.columns[:-1]

# cast features to float
for col_name in features:
    irisDF = irisDF.withColumn(col_name, irisDF[col_name].cast('float'))

display(irisDF.limit(10))

Sepal_Length,Sepal_Width,Petal_Length,Petal_Width,Species
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
5.4,3.9,1.7,0.4,setosa
4.6,3.4,1.4,0.3,setosa
5.0,3.4,1.5,0.2,setosa
4.4,2.9,1.4,0.2,setosa
4.9,3.1,1.5,0.1,setosa


#### Prepare Dataset for Machine Learning (SparkML)

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='Species', outputCol='label')

# Assemble features into vectors
assembler = VectorAssembler(inputCols=features, outputCol='features')

# Chain indexer and feature assembler, to prepare dataset for training and testing
pipeline = Pipeline(stages=[labelIndexer, assembler])
PipelineModel = pipeline.fit(irisDF)
irisDF_vectorized = PipelineModel.transform(irisDF)

display(irisDF_vectorized.limit(10))

Sepal_Length,Sepal_Width,Petal_Length,Petal_Width,Species,label,features
5.1,3.5,1.4,0.2,setosa,2.0,"List(1, 4, List(), List(5.099999904632568, 3.5, 1.399999976158142, 0.20000000298023224))"
4.9,3.0,1.4,0.2,setosa,2.0,"List(1, 4, List(), List(4.900000095367432, 3.0, 1.399999976158142, 0.20000000298023224))"
4.7,3.2,1.3,0.2,setosa,2.0,"List(1, 4, List(), List(4.699999809265137, 3.200000047683716, 1.2999999523162842, 0.20000000298023224))"
4.6,3.1,1.5,0.2,setosa,2.0,"List(1, 4, List(), List(4.599999904632568, 3.0999999046325684, 1.5, 0.20000000298023224))"
5.0,3.6,1.4,0.2,setosa,2.0,"List(1, 4, List(), List(5.0, 3.5999999046325684, 1.399999976158142, 0.20000000298023224))"
5.4,3.9,1.7,0.4,setosa,2.0,"List(1, 4, List(), List(5.400000095367432, 3.9000000953674316, 1.7000000476837158, 0.4000000059604645))"
4.6,3.4,1.4,0.3,setosa,2.0,"List(1, 4, List(), List(4.599999904632568, 3.4000000953674316, 1.399999976158142, 0.30000001192092896))"
5.0,3.4,1.5,0.2,setosa,2.0,"List(1, 4, List(), List(5.0, 3.4000000953674316, 1.5, 0.20000000298023224))"
4.4,2.9,1.4,0.2,setosa,2.0,"List(1, 4, List(), List(4.400000095367432, 2.9000000953674316, 1.399999976158142, 0.20000000298023224))"
4.9,3.1,1.5,0.1,setosa,2.0,"List(1, 4, List(), List(4.900000095367432, 3.0999999046325684, 1.5, 0.10000000149011612))"


#### Data Science Process - Splitting Data into Train and Test sets
<img src="http://www.ebc.cat/wp-content/uploads/2017/01/train_test_detail.png" alt="train-test" width="900"/>

In [7]:
# Randomly split data into training and test sets, and cache them
trainDF, testDF = irisDF_vectorized.randomSplit([0.67, 0.33], seed=10)

trainDF.cache()
testDF.cache()
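
`randomSplit` normalizes the weights if they do not sum to 1 and assigns each row to a split independently, so the resulting sizes only approximate the 67/33 ratio. The pure-Python sketch below illustrates that idea under a fixed seed. It is not Spark's implementation and will not reproduce the same rows as the cell above; the `random_split` helper is a hypothetical name used only for illustration.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.67, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound

    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits

# 150 row ids stand in for the 150 iris rows
train, test = random_split(list(range(150)), [0.67, 0.33], seed=10)
print(len(train), len(test))  # the sizes are approximate, not an exact 67/33 split
```

Calling the function again with the same seed returns the same split, which is why the notebook passes `seed=10`.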

#### Define Function to Train and Test Model, while Tracking Attributes with MLflow

In [9]:
import mlflow
import mlflow.spark
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def train_and_evaluate(maxBins, maxDepth):
  
  print("maxBins =", maxBins, ", maxDepth =", maxDepth)
  with mlflow.start_run():
    # Initialize DecisionTree algorithm
    dtc = DecisionTreeClassifier() \
                .setLabelCol("label") \
                .setFeaturesCol("features") \
                .setMaxBins(maxBins) \
                .setMaxDepth(maxDepth)

    # Fit decision tree to training dataset
    pipeline = Pipeline(stages=[dtc])
    pipelineModel = pipeline.fit(trainDF)
    dtc_model = pipelineModel.stages[-1]
    # Print depth and number of nodes of decision tree
    print(dtc_model)

    # Make predictions on the test dataset
    predictions = dtc_model.transform(testDF)
    
    # Evaluate model using the accuracy and F1 metrics (labelCol defaults to "label")
    accuracy_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
    f1_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
    accuracy = accuracy_evaluator.evaluate(predictions)
    f1 = f1_evaluator.evaluate(predictions)
    print("Accuracy =", accuracy)
    print("F1 score =", f1, "\n")

    # Log model parameters in MLflow
    mlflow.log_param("maxBins", maxBins)
    mlflow.log_param("maxDepth", maxDepth)
    
    # Log model performance metrics in MLflow
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1 score", f1)
    
    # Save and log model in MLflow
    mlflow.spark.log_model(pipelineModel, "decision-tree-model")
    
    return dtc_model
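
To make the two logged metrics concrete, here is a minimal pure-Python sketch of what they measure. It assumes that the evaluator's `f1` metric is the support-weighted average of the per-class F1 scores; the label and prediction lists are made-up values for illustration, not results from this notebook.

```python
from collections import Counter

def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def weighted_f1(labels, predictions):
    """Per-class F1 averaged with weights equal to each class's share of the true labels."""
    label_counts = Counter(labels)
    total = len(labels)
    score = 0.0
    for cls, support in label_counts.items():
        true_pos = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        predicted = sum(1 for p in predictions if p == cls)
        precision = true_pos / predicted if predicted else 0.0
        recall = true_pos / support
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        score += f1 * support / total
    return score

# Hypothetical labels and predictions, for illustration only
labels      = [0.0, 0.0, 1.0, 1.0, 2.0, 2.0]
predictions = [0.0, 0.0, 1.0, 2.0, 2.0, 2.0]
print("Accuracy =", accuracy(labels, predictions))
print("F1 score =", weighted_f1(labels, predictions))
```

On a balanced dataset such as Iris the two numbers tend to be close; they diverge more when some classes are much rarer than others.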

#### Use Various Parameters to Experiment with Models
Re-run the following cell with different parameter values to generate new models. MLflow tracks the parameters and performance metrics of each run.

In [11]:
maxBins  = 15
maxDepth = 2
dtc_model = train_and_evaluate(maxBins, maxDepth)
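
Instead of editing and re-running the cell by hand, the combinations can be generated in a loop. The sketch below is one possible way to do that: `sweep` and `fake_train` are hypothetical names, and the value lists are arbitrary examples. `fake_train` only exists so the sketch runs outside the notebook; in the notebook, `train_and_evaluate` would be passed instead, and because it opens its own `mlflow.start_run()`, each combination would be recorded as a separate run.

```python
from itertools import product

def sweep(train_fn, max_bins_values, max_depth_values):
    """Call train_fn once per (maxBins, maxDepth) combination and collect the results."""
    results = {}
    for max_bins, max_depth in product(max_bins_values, max_depth_values):
        results[(max_bins, max_depth)] = train_fn(max_bins, max_depth)
    return results

# Stand-in for train_and_evaluate so the sketch is runnable on its own
def fake_train(max_bins, max_depth):
    return f"model(maxBins={max_bins}, maxDepth={max_depth})"

models = sweep(fake_train, [10, 15, 20], [2, 3, 4])
print(len(models))  # 9 combinations (3 x 3)
```

Keeping the results in a dictionary keyed by the parameter pair makes it easy to pick out a specific model afterwards, for example `models[(15, 2)]`.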

#### Visualize Decision Tree Model

In [13]:
display(dtc_model)

treeNode
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":0.75,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":2.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":5.049999952316284,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
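
The node list above can be read as a small set of nested rules. The sketch below hand-translates it into plain Python, assuming that the nodes are listed parent first, then left subtree, then right subtree, and that a row goes left when its feature value is less than or equal to the threshold. Feature indices follow the order of the `features` list (index 2 is `Petal_Length`, index 3 is `Petal_Width`); `predict_label` is a hypothetical helper, not part of SparkML.

```python
def predict_label(features):
    """features = [Sepal_Length, Sepal_Width, Petal_Length, Petal_Width]"""
    petal_length, petal_width = features[2], features[3]
    if petal_width <= 0.75:                # node 1: split on feature 3
        return 2.0                         # node 0: leaf
    if petal_length <= 5.049999952316284:  # node 3: split on feature 2
        return 0.0                         # node 2: leaf
    return 1.0                             # node 4: leaf

# First row of the dataset shown earlier (a setosa, indexed as label 2.0)
print(predict_label([5.1, 3.5, 1.4, 0.2]))  # 2.0
```

The tree never tests the sepal measurements, so with these parameters only the two petal features influence the prediction.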


#### Evaluate Feature Importance

In [15]:
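# The VectorAssembler is the last stage of the data preparation pipeline
va = PipelineModel.stages[-1]
# Pair each input column name with its importance in the fitted decision tree
list(zip(va.getInputCols(), dtc_model.featureImportances))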