# MLflow

In this notebook, you'll learn how to use MLflow to track machine learning experiments and manage models. The goal is to train a classification model that can predict the species of a penguin based on its location and body measurements.

> **Citation**: The penguins dataset used in this exercise is a subset of data collected and made available by [Dr. Kristen
Gorman](https://www.uaf.edu/cfos/people/faculty/detail/kristen-gorman.php)
and the [Palmer Station, Antarctica LTER](https://pal.lternet.edu/), a
member of the [Long Term Ecological Research
Network](https://lternet.edu/).

## Ingest data

Run the following cell to ingest the data file you will use in this exercise. The data file will be saved in the DBFS storage for your Azure Databricks cluster.

In [0]:
%sh
rm -rf /dbfs/data
mkdir -p /dbfs/data
wget -O /dbfs/data/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/dp-090-databricks-ml/master/data/penguins.csv
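
The shell command writes to `/dbfs/data/penguins.csv`, while the Spark code later in this notebook loads `/data/penguins.csv`. Both refer to the same file: on an Azure Databricks cluster, `/dbfs` is the driver-local mount point for the DBFS root, and Spark resolves unqualified paths against DBFS. The minimal sketch below makes that mapping explicit. Note that `to_spark_path` is a hypothetical helper written for illustration, not a Databricks API.

```python
def to_spark_path(local_path: str) -> str:
    """Translate a driver-local /dbfs/... path into the dbfs:/ URI form used by Spark."""
    prefix = "/dbfs/"
    if not local_path.startswith(prefix):
        raise ValueError(f"not a /dbfs path: {local_path}")
    # Drop the mount prefix and add the DBFS scheme instead.
    return "dbfs:/" + local_path[len(prefix):]


print(to_spark_path("/dbfs/data/penguins.csv"))  # prints dbfs:/data/penguins.csv
```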

## Prepare the data
  
Now let's prepare the data for machine learning. Run the following cell to:

1. Remove any incomplete rows
2. Apply appropriate data types
3. View a random sample of the data
4. Split the data into two datasets: one for training, and another for testing

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

data = spark.read.format("csv").option("header", "true").load("/data/penguins.csv")
data = data.dropna().select(col("Island").astype("string"),
                          col("CulmenLength").astype("float"),
                          col("CulmenDepth").astype("float"),
                          col("FlipperLength").astype("float"),
                          col("BodyMass").astype("float"),
                          col("Species").astype("int")
                          )
display(data.sample(0.2))

splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
print ("Training Rows:", train.count(), " Testing Rows:", test.count())
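
Because `randomSplit` assigns each row at random, the training and testing row counts are only approximately 70% and 30%, and they change from run to run unless you pass a seed (`randomSplit` accepts an optional `seed` argument). The plain-Python sketch below illustrates the idea on a list of row numbers. The function name `random_split` and the seed value are assumptions made for this example, and the sketch does not reproduce Spark's exact sampling.

```python
import random


def random_split(rows, train_fraction=0.7, seed=42):
    """Assign each row independently to train or test, reproducibly for a given seed."""
    rng = random.Random(seed)
    train_rows, test_rows = [], []
    for row in rows:
        # Each row lands in the training set with probability train_fraction.
        if rng.random() < train_fraction:
            train_rows.append(row)
        else:
            test_rows.append(row)
    return train_rows, test_rows


rows = list(range(100))  # stand-in for the rows of a DataFrame
train_rows, test_rows = random_split(rows)
print("Training Rows:", len(train_rows), " Testing Rows:", len(test_rows))
```

With a fixed seed the same rows go to the same split every time, which makes metric comparisons between runs more meaningful.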

## Run an MLflow experiment

MLflow enables you to run experiments that track the model training process and log evaluation metrics. This ability to record details of model training runs can be extremely useful in the iterative process of creating an effective machine learning model.

You can use the same libraries and techniques you normally use to train and evaluate a model (in this case, we'll use the Spark MLlib library), but do so within the context of an MLflow experiment that includes additional commands to log important metrics and information during the process.
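
The training code in this notebook wraps its work in `with mlflow.start_run():`. The toy tracker below is a plain-Python sketch of that pattern, not MLflow's implementation. It shows how a context manager can record parameters and metrics for a run and still close the run when the code inside it fails. The names `start_toy_run` and `runs`, and the metric value, are placeholders invented for this illustration.

```python
from contextlib import contextmanager

runs = []  # toy in-memory store standing in for a tracking server


@contextmanager
def start_toy_run():
    """Open a run, hand it to the caller, and always record how it ended."""
    run = {"params": {}, "metrics": {}, "status": "RUNNING"}
    runs.append(run)
    try:
        yield run
        run["status"] = "FINISHED"
    except Exception:
        run["status"] = "FAILED"
        raise


# A run that completes normally.
with start_toy_run() as run:
    run["params"]["maxIter"] = 5
    run["metrics"]["accuracy"] = 0.9  # placeholder value, not a real result

# A run whose body raises: the run is still closed, with a failed status.
try:
    with start_toy_run() as run:
        run["params"]["maxIter"] = 10
        raise ValueError("training crashed")
except ValueError:
    pass

print([r["status"] for r in runs])  # prints ['FINISHED', 'FAILED']
```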

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

# Start an MLflow run; the "with" keyword ensures we'll close the run even if this cell crashes
with mlflow.start_run():
    
    catFeature = "Island"
    numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
    
    # parameters
    maxIterations = 5
    regularization = 0.5

    # Define the feature engineering and model steps
    catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
    numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
    numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
    featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
    algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=maxIterations, regParam=regularization)

    # Chain the steps as stages in a pipeline
    pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

    # Log training parameter values
    print ("Training Logistic Regression model...")
    mlflow.log_param('maxIter', algo.getMaxIter())
    mlflow.log_param('regParam', algo.getRegParam())
    model = pipeline.fit(train)
    
    # Evaluate the model and log metrics
    prediction = model.transform(test)
    metrics = ["accuracy", "weightedRecall", "weightedPrecision"]
    for metric in metrics:
        evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction", metricName=metric)
        metricValue = evaluator.evaluate(prediction)
        print("%s: %s" % (metric, metricValue))
        mlflow.log_metric(metric, metricValue)

        
    # Log the model itself
    unique_model_name = "classifier-" + str(time.time())
    mlflow.spark.log_model(model, unique_model_name, mlflow.spark.get_default_conda_env())
    modelpath = "/model/%s" % (unique_model_name)
    mlflow.spark.save_model(model, modelpath)
    
    print("Experiment run complete.")

When the experiment run has finished, under the code cell, if necessary use the **&#9656;** toggle to expand the **MLflow run** details. Then use the **experiment** hyperlink that is displayed there to open the MLflow page that lists your experiment runs. Each run is assigned a unique name.

Select the most recent run and view its details. Note that you can expand sections to see the **Parameters** and **Metrics** that were logged, and you can see details of the model that was trained and saved.
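
To help interpret the logged metrics, the sketch below computes accuracy, weighted recall, and weighted precision in plain Python, weighting each class by its share of the true labels. It is an illustration of the definitions rather than Spark's own code, and the label and prediction lists are made-up values, not output from the penguin model.

```python
from collections import Counter


def weighted_metrics(labels, predictions):
    """Compute accuracy plus recall and precision weighted by true-label frequency."""
    n = len(labels)
    support = Counter(labels)         # number of true rows per class
    predicted = Counter(predictions)  # number of predicted rows per class
    true_pos = Counter(l for l, p in zip(labels, predictions) if l == p)

    accuracy = sum(true_pos.values()) / n
    weighted_recall = sum(
        (support[c] / n) * (true_pos[c] / support[c]) for c in support
    )
    weighted_precision = sum(
        (support[c] / n) * (true_pos[c] / predicted[c] if predicted[c] else 0.0)
        for c in support
    )
    return {
        "accuracy": accuracy,
        "weightedRecall": weighted_recall,
        "weightedPrecision": weighted_precision,
    }


labels = [0, 0, 0, 1, 1, 2]       # made-up true species codes
predictions = [0, 0, 1, 1, 1, 2]  # made-up model output
for name, value in weighted_metrics(labels, predictions).items():
    print("%s: %s" % (name, value))
```

With this weighting, weighted recall always equals accuracy, so matching values for those two metrics in a run are expected rather than a logging error.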

> **Tip**: You can also use the **MLflow experiments** icon in the sidebar menu on the right of this notebook to view details of experiment runs.

In machine learning projects, data scientists often try training models with different parameters, logging the results each time. To accomplish that, it's common to create a function that encapsulates the training process and call it with the parameters you want to try.

Run the following cell to create a function based on the training code you used previously.

In [0]:
def train_penguin_model(training_data, test_data, maxIterations, regularization):
    import mlflow
    import mlflow.spark
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    import time

    # Start an MLflow run; the "with" keyword ensures we'll close the run even if this cell crashes
    with mlflow.start_run():

        catFeature = "Island"
        numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]

        # Define the feature engineering and model steps
        catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
        numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
        numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
        featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
        algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=maxIterations, regParam=regularization)

        # Chain the steps as stages in a pipeline
        pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])

        # Log training parameter values
        print ("Training Logistic Regression model...")
        mlflow.log_param('maxIter', algo.getMaxIter())
        mlflow.log_param('regParam', algo.getRegParam())
        model = pipeline.fit(training_data)

        # Evaluate the model and log metrics
        prediction = model.transform(test_data)
        metrics = ["accuracy", "weightedRecall", "weightedPrecision"]
        for metric in metrics:
            evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction", metricName=metric)
            metricValue = evaluator.evaluate(prediction)
            print("%s: %s" % (metric, metricValue))
            mlflow.log_metric(metric, metricValue)


        # Log the model itself
        unique_model_name = "classifier-" + str(time.time())
        mlflow.spark.log_model(model, unique_model_name, mlflow.spark.get_default_conda_env())
        modelpath = "/model/%s" % (unique_model_name)
        mlflow.spark.save_model(model, modelpath)

        print("Experiment run complete.")

Now you can call your function to try another training run with different parameter values.

In [0]:
train_penguin_model(train, test, 10, 0.2)

Once again, you can use the hyperlink in the **MLflow run** output information or the sidebar on the right to view details of the MLflow experiment run.
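
To try several parameter combinations rather than one, you could loop over a small grid and call the training function once per combination. The sketch below shows that loop in a form that runs on its own: `train_stub` is a hypothetical stand-in for `train_penguin_model`, and the grid values are illustrative choices, not recommendations.

```python
from itertools import product


def run_sweep(train_fn, max_iterations_values, regularization_values):
    """Call train_fn once for every combination of the two parameter lists."""
    tried = []
    for max_iterations, regularization in product(max_iterations_values, regularization_values):
        train_fn(max_iterations, regularization)
        tried.append((max_iterations, regularization))
    return tried


def train_stub(max_iterations, regularization):
    # Hypothetical placeholder: a real sweep would train and log a model here.
    print("Would train with maxIter=%s, regParam=%s" % (max_iterations, regularization))


tried = run_sweep(train_stub, [5, 10], [0.2, 0.5])
print(len(tried), "combinations tried")
```

In the notebook itself, you could pass `lambda i, r: train_penguin_model(train, test, i, r)` as `train_fn`. Because the function opens its own MLflow run, each combination would then appear as a separate run in the experiment.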

## Register and deploy a model with MLflow

In addition to tracking details of training experiment runs, you can use MLflow to manage the machine learning models you've trained. You've already logged the model trained by each experiment run. You can also *register* models and deploy them so they can be served to client applications.

> **Note**: Model serving is only supported in Azure Databricks *Premium* workspaces, and is restricted to [certain regions](https://learn.microsoft.com/azure/databricks/resources/supported-regions).

Follow these steps to register, deploy, and test the model trained in your most recent experiment run:

1. View the details page for the most recent experiment run.
2. Use the **Register Model** button to register the model that was logged in that experiment and, when prompted, create a new model named **Penguin Predictor**.
3. When the model has been registered, view the **Models** page (in the navigation bar on the left) and select the **Penguin Predictor** model.
4. In the page for the **Penguin Predictor** model, select the **Serving** tab, and then use the **Create serving endpoint** button to create a new endpoint named **predict-penguin**. Use version **1** of the model and select the **small** compute size.

    The serving endpoint is hosted in a new cluster, which may take several minutes to create.
  
5. When the endpoint has been created, use the **Query endpoint** button at the top right to open an interface from which you can test the endpoint. Then in the test interface, on the **Browser** tab, enter the following JSON request and use the **Send Request** button to call the endpoint and generate a prediction.

    ```json
    {
      "dataframe_records": [
      {
         "Island": "Biscoe",
         "CulmenLength": 48.7,
         "CulmenDepth": 14.1,
         "FlipperLength": 210,
         "BodyMass": 4450
      }
      ]
    }
    ```

6. Experiment with a few different values for the penguin features and observe the results that are returned. Then, close the test interface.
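
A client application would send the same JSON over HTTP instead of using the test interface. The sketch below builds such a request with the Python standard library, but does not send it. The workspace URL and token are placeholders, and the `/serving-endpoints/<name>/invocations` path with a bearer token is an assumption about how your endpoint is exposed, so check the endpoint page for the exact URL before relying on it.

```python
import json
import urllib.request


def build_scoring_request(workspace_url, token, records, endpoint_name="predict-penguin"):
    """Build (but do not send) a POST request carrying the records as dataframe_records."""
    payload = json.dumps({"dataframe_records": records}).encode("utf-8")
    return urllib.request.Request(
        url=f"{workspace_url}/serving-endpoints/{endpoint_name}/invocations",
        data=payload,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


penguin = {
    "Island": "Biscoe",
    "CulmenLength": 48.7,
    "CulmenDepth": 14.1,
    "FlipperLength": 210,
    "BodyMass": 4450,
}

# Placeholder workspace URL and token: replace both with your own values.
request = build_scoring_request(
    "https://example.azuredatabricks.net", "<personal-access-token>", [penguin]
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To call the endpoint for real: urllib.request.urlopen(request)
```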

## Delete the endpoint

When the endpoint is no longer required, you should delete it to avoid unnecessary costs.

In the **predict-penguin** endpoint page, in the **&#8285;** menu, select **Delete**.

In this notebook, you've used MLflow to track experiment runs that train machine learning models with MLlib in Apache Spark, and to register and deploy a trained model.

For more information, see the [Spark MLlib documentation](https://spark.apache.org/docs/latest/ml-guide.html).