- MLflow is an open source platform for end-to-end machine learning operations. 
- Using MLflow, data scientists can track model training experiments; logging parameters, metrics, and other assets. 
- Machine learning engineers can use MLflow to deploy and manage models, enabling applications to consume the models and use them to inference predictions for new data.

## Register and serve models with MLflow

Model registration allows MLflow and Azure Databricks to keep track of models; which is important for two reasons:

- Registering a model allows you to serve the model for real-time, streaming, or batch inferencing. Registration makes the process of using a trained model easy, as now data scientists don't need to develop application code; the serving process builds that wrapper and exposes a REST API or method for batch scoring automatically.

- Registering a model allows you to create new versions of that model over time; giving you the opportunity to track model changes and even perform comparisons between different historical versions of models.

You can then select the option to register the model using the user interface in the experiment viewer.

Alternatively, if you want to register the model without reviewing the metrics in the run, you can include the registered_model_name parameter in the log_model method; in which case the model is automatically registered during the experiment run.

In [0]:
with mlflow.start_run():
    # code to train model goes here

    # log the model itself (and the environment it needs to be used)
    unique_model_name = "my_model-" + str(time.time())
    mlflow.spark.log_model(spark_model=model,
                           artifact_path=unique_model_name
                           conda_env=mlflow.spark.get_default_conda_env(),
                           registered_model_name="my_model")

## Using a model for inferencing

### The process of using a model to predict labels from new feature data is known as inferencing. 

You can use MLflow in Azure Databricks to make models available for inferencing in the following ways:

- Host the model as a real-time service with an HTTP endpoint to which client applications can make REST requests.
- Use the model to perform perpetual streaming inferencing of labels based on a delta table of features, writing the results to an output table.
- Use the model for batch inferencing based on a delta table, writing the results of each batch operation to a specific folder.

You can deploy a model for inferencing from its page in the Models section of the Azure Databricks portal as shown here:

# Exercise

In [0]:
 %sh
 rm -r /Workspace/MicrosoftLearnings/dbfs/mlflow_lab
 mkdir /Workspace/MicrosoftLearnings/dbfs/mlflow_lab
 pwd
 cd /Workspace/MicrosoftLearnings/dbfs/mlflow_lab
 pwd
 cd /dbfs/mlflow_lab
 pwd

/Workspace/MicrosoftLearnings
/Workspace/MicrosoftLearnings/dbfs/mlflow_lab
/dbfs/mlflow_lab


In [0]:
 %sh
 rm -r /Workspace/MicrosoftLearnings/dbfs/mlflow_lab
 mkdir /Workspace/MicrosoftLearnings/dbfs/mlflow_lab
 wget -O /Workspace/MicrosoftLearnings/dbfs/mlflow_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv

--2025-09-22 08:02:28--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9533 (9.3K) [text/plain]
Saving to: ‘/Workspace/MicrosoftLearnings/dbfs/mlflow_lab/penguins.csv’

     0K .........                                             100% 2.31M=0.004s

2025-09-22 08:02:28 (2.31 MB/s) - ‘/Workspace/MicrosoftLearnings/dbfs/mlflow_lab/penguins.csv’ saved [9533/9533]



## Now prepare the data for machine learning.

- Remove any incomplete rows
- Apply appropriate data types
- View a random sample of the data
- Split the data into two datasets: one for training, and another for testing.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
   
data = spark.read.format("csv").option("header", "true").load("file:/Workspace/MicrosoftLearnings/dbfs/mlflow_lab/penguins.csv")
data = data.dropna().select(col("Island").astype("string"),
                            col("CulmenLength").astype("float"),
                            col("CulmenDepth").astype("float"),
                            col("FlipperLength").astype("float"),
                            col("BodyMass").astype("float"),
                            col("Species").astype("int")
                          )
display(data.sample(0.2).limit(10))

splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

Island,CulmenLength,CulmenDepth,FlipperLength,BodyMass,Species
Torgersen,39.3,20.6,190.0,3650.0,0
Torgersen,39.2,19.6,195.0,4675.0,0
Torgersen,42.5,20.7,197.0,4500.0,0
Biscoe,37.7,18.7,180.0,3600.0,0
Biscoe,35.9,19.2,189.0,3800.0,0
Biscoe,37.9,18.6,172.0,3150.0,0
Biscoe,40.5,18.9,180.0,3950.0,0
Dream,37.2,18.1,178.0,3900.0,0
Dream,39.2,21.1,196.0,4150.0,0
Dream,39.8,19.1,184.0,4650.0,0


Training Rows: 240  Testing Rows: 102


## Run an MLflow experiment

You can use the same libraries and techniques you normally use to train and evaluate a model (in this case, we’ll use the Spark MLLib library), but do so within the context of an MLflow experiment that includes additional commands to log important metrics and information during the process.

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time
   
# Start an MLflow run
with mlflow.start_run():
    catFeature = "Island"
    numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
     
    # parameters
    maxIterations = 5
    regularization = 0.5
   
    # Define the feature engineering and model steps
    catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
    numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
    numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
    featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
    algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=maxIterations, regParam=regularization)
   
    # Chain the steps as stages in a pipeline
    pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])
   
    # Log training parameter values
    print ("Training Logistic Regression model...")
    mlflow.log_param('maxIter', algo.getMaxIter())
    mlflow.log_param('regParam', algo.getRegParam())
    model = pipeline.fit(train)
      
    # Evaluate the model and log metrics
    prediction = model.transform(test)
    metrics = ["accuracy", "weightedRecall", "weightedPrecision"]
    for metric in metrics:
        evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction", metricName=metric)
        metricValue = evaluator.evaluate(prediction)
        print("%s: %s" % (metric, metricValue))
        mlflow.log_metric(metric, metricValue)
   
           
    # Log the model itself
    unique_model_name = "classifier-" + str(time.time())
    mlflow.spark.log_model(model, unique_model_name, mlflow.spark.get_default_conda_env())
    modelpath = "/Workspace/MicrosoftLearnings/model/%s" % (unique_model_name)
    mlflow.spark.savehttps://adb-385780289707858.18.azuredatabricks.net/editor/notebooks/3224834034575109?o=385780289707858$0_model(model, modelpath)
       
    print("Experiment run complete. And Model has been saved")

Training Logistic Regression model...
accuracy: 0.9313725490196079
weightedRecall: 0.9313725490196079
weightedPrecision: 0.9406108597285068


Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Experiment run complete.


## Create a function

In machine learning projects, data scientists often try training models with different parameters, logging the results each time. To accomplish that, it’s common to create a function that encapsulates the training process and call it with the parameters you want to try.

In [0]:
def train_penguin_model(training_data, test_data, maxIterations, regularization):
    import mlflow
    import mlflow.spark
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    import time

    # End any active MLflow run
    mlflow.end_run()
    
    # Start an MLflow run
    with mlflow.start_run():
   
        catFeature = "Island"
        numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
   
        # Define the feature engineering and model steps
        catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
        numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
        numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
        featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
        algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=maxIterations, regParam=regularization)
   
        # Chain the steps as stages in a pipeline
        pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])
   
        # Log training parameter values
        print ("Training Logistic Regression model...")
        mlflow.log_param('maxIter', algo.getMaxIter())
        mlflow.log_param('regParam', algo.getRegParam())
        model = pipeline.fit(training_data)
   
        # Evaluate the model and log metrics
        prediction = model.transform(test_data)
        metrics = ["accuracy", "weightedRecall", "weightedPrecision"]
        for metric in metrics:
            evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction", metricName=metric)
            metricValue = evaluator.evaluate(prediction)
            print("%s: %s" % (metric, metricValue))
            mlflow.log_metric(metric, metricValue)
   
   
        # Log the model itself
        unique_model_name = "classifier-" + str(time.time())
        mlflow.spark.log_model(model, unique_model_name, mlflow.spark.get_default_conda_env())
        modelpath = "/model/%s" % (unique_model_name)
        mlflow.spark.save_model(model, modelpath)
   
        print("Experiment run complete.")

In [0]:
# End any active MLflow run
mlflow.end_run()

In [0]:
train_penguin_model(train, test, 10, 0.2)

Training Logistic Regression model...
accuracy: 0.9411764705882353
weightedRecall: 0.9411764705882353
weightedPrecision: 0.9480968858131488


Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Experiment run complete.


## Register and deploy a model with MLflow

### You’ve already logged the model trained by each experiment run. 
### You can also register models and deploy them so they can be served to client applications.

In [0]:
def train_penguin_model(training_data, test_data, maxIterations, regularization):
    import mlflow
    import mlflow.spark
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from mlflow.models.signature import infer_signature
    import time

    # End any active MLflow run
    mlflow.end_run()
    
    # Start an MLflow run
    with mlflow.start_run():
   
        catFeature = "Island"
        numFeatures = ["CulmenLength", "CulmenDepth", "FlipperLength", "BodyMass"]
   
        # Define the feature engineering and model steps
        catIndexer = StringIndexer(inputCol=catFeature, outputCol=catFeature + "Idx")
        numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
        numScaler = MinMaxScaler(inputCol = numVector.getOutputCol(), outputCol="normalizedFeatures")
        featureVector = VectorAssembler(inputCols=["IslandIdx", "normalizedFeatures"], outputCol="Features")
        algo = LogisticRegression(labelCol="Species", featuresCol="Features", maxIter=maxIterations, regParam=regularization)
   
        # Chain the steps as stages in a pipeline
        pipeline = Pipeline(stages=[catIndexer, numVector, numScaler, featureVector, algo])
   
        # Log training parameter values
        print ("Training Logistic Regression model...")
        mlflow.log_param('maxIter', algo.getMaxIter())
        mlflow.log_param('regParam', algo.getRegParam())
        model = pipeline.fit(training_data)
   
        # Evaluate the model and log metrics
        prediction = model.transform(test_data)
        metrics = ["accuracy", "weightedRecall", "weightedPrecision"]
        for metric in metrics:
            evaluator = MulticlassClassificationEvaluator(labelCol="Species", predictionCol="prediction", metricName=metric)
            metricValue = evaluator.evaluate(prediction)
            print("%s: %s" % (metric, metricValue))
            mlflow.log_metric(metric, metricValue)
   
        # Infer model signature
        input_example = training_data.limit(5).toPandas()
        output_example = model.transform(training_data.limit(5)).toPandas()
        signature = infer_signature(input_example, output_example)
   
        # Log the model itself with signature
        unique_model_name = "main.default.classifier-" + str(time.time())
        mlflow.spark.log_model(
            model, 
            unique_model_name, 
            conda_env=mlflow.spark.get_default_conda_env(),
            signature=signature,
            input_example=input_example
        )
        modelpath = "/model/%s" % (unique_model_name)
        mlflow.spark.save_model(model, modelpath)
   
        print("Experiment run complete.")

In [0]:
train_penguin_model(train, test, 15, 0.2)

Training Logistic Regression model...
accuracy: 0.9411764705882353
weightedRecall: 0.9411764705882353
weightedPrecision: 0.9480968858131488




Uploading artifacts:   0%|          | 0/6 [00:00<?, ?it/s]

Experiment run complete.


## Register and deploy a model with MLflow

## Summary of steps for registry of model with endpoint
- Register Model - You will register the best model available
- Use Model for Inference - You model will be made available with an endpoint having cluster/server/VM and you can send the in parameters in the json format to get the output in json format

### Steps to follow
- View the details page for the most recent experiment run.

- Use the Register Model button to register the model that was logged in that experiment and when prompted, create a new model named Penguin Predictor.
    - While Registering model use "Workspace Model Registry" and not "Unity Catalog", since it was throwing error
    - Bad model name: please specify all three levels of the model in the form `catalog_name.schema_name.model_name`

- When the model has been registered, view the Models page (in the navigation bar on the left) and submenu "Workspace Model Registry" and select the Penguin Predictor model.

- In the page for the Penguin Predictor model, use the Use model for inference button to create a new real-time endpoint with the following settings:
    - Model: Penguin Predictor
    - Model version: 1
    - Endpoint: predict-penguin
    - Compute size: Small
    
The serving endpoint is hosted in a new cluster, which it may take several minutes to create.




### Clicks on screen
Click on Serving > predict-penguin > Use > Browse > Provide input on LHS in JSON format > Click Send request > view output on RHS 

One of way sending the input
Multiple data jsons can be created under data for each data row prediction in this one input alone 

**Input**

{
  "dataframe_split": {
    "columns": [
      "Island",
      "CulmenLength",
      "CulmenDepth",
      "FlipperLength",
      "BodyMass",
      "Species"
    ],
    "data": [
      [
        "Biscoe",
        48.7,
        14.1,
        210,
        4450,
        0
      ]
    ]
  }
}

**Output**

{
  "predictions": [
    1
  ]
}


Another way of sending the same input

**input**

 {
   "dataframe_records": [
   {
      "Island": "Biscoe",
      "CulmenLength": 48.7,
      "CulmenDepth": 14.1,
      "FlipperLength": 210,
      "BodyMass": 4450,
      "Species": 4450
   }
   ]
 }

 **output**

 {
  "predictions": [
    1
  ]
}

In [0]:
print('hi')

hi
