In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:

# Load the IPython extension shipped with mlflow.pipelines.
# NOTE(review): presumably this is what registers the %%mlp_code cell magic used
# by later cells -- confirm against the extension's source.
%load_ext mlflow.pipelines

# MLflow Regression Pipeline Notebook

This notebook runs the MLflow Regression Pipeline on Databricks and inspects its results. For more information about the MLflow Regression Pipeline, including usage examples, see the [Regression Pipeline overview documentation](https://mlflow.org/docs/latest/pipelines.html#regression-pipeline) and the [Regression Pipeline API documentation](https://mlflow.org/docs/latest/python_api/mlflow.pipelines.html#module-mlflow.pipelines.regression.v1.pipeline).

## Setup

Initialize and inspect the pipeline. Configure MLflow logging.

In [3]:
from mlflow.pipelines import Pipeline

# Pipeline object on which every later step in this notebook is invoked
# (ingest, split, transform, train, evaluate, register).
p = Pipeline()

2022/09/11 20:46:23 INFO mlflow.pipelines.pipeline: Creating MLflow Pipeline 'sklearn_regression'


In [4]:
# Inspect the freshly created pipeline before running any of its steps.
p.inspect()

In [5]:
import os
import mlflow

# Track runs in a local SQLite database. The relative path is resolved against the
# current working directory by os.path.abspath, so the result depends on where the
# notebook kernel was started.
mlflow.set_tracking_uri(f"sqlite:///{os.path.abspath('../metadata/mlflow/mlruns3.db')}")
# Select the experiment that subsequent runs are logged to; artifacts are directed
# to a local directory resolved the same way as the tracking database path.
# NOTE(review): ``artifact_location`` is passed as a keyword to set_experiment --
# confirm that the installed MLflow build accepts this argument.
mlflow.set_experiment(
    "sklearn_regression_experiment",
    artifact_location=os.path.abspath('../metadata/mlflow/mlartifacts')
)

<Experiment: artifact_location='/Users/corey.zumar/mlflow/examples/pipelines/sklearn_regression/metadata/mlflow/mlartifacts', experiment_id='1', lifecycle_stage='active', name='sklearn_regression_experiment', tags={'mlflow.experiment.primaryMetric.greaterIsBetter': 'False',
 'mlflow.experiment.primaryMetric.name': 'root_mean_squared_error_on_data_test'}>

## Ingest

Ingest data for model development

In [6]:
# Run the ingest step on the sample Parquet file (path is relative to the current
# working directory).
# NOTE(review): use_cached=True presumably allows a previously cached ingest result
# to be reused -- the exact semantics are defined by Pipeline.ingest; confirm.
ingested_df = p.ingest(
    location="./data/sample.parquet",
    format="parquet",
    use_cached=True,
)
# Name of the label column; passed as ``target_col`` to the split, transform,
# train and evaluate steps below.
target_col = "fare_amount"

name,type
tpep_pickup_datetime,datetime
tpep_dropoff_datetime,datetime
trip_distance,number
fare_amount,number
pickup_zip,integer
dropoff_zip,integer


## Split

Split data into train, validation, and test datasets

In [7]:
%%mlp_code steps/split.py

"""
This module defines the following routines used by the 'split' step of the regression pipeline:

- ``process_splits``: Defines customizable logic for processing & cleaning the training, validation,
  and test datasets produced by the data splitting procedure.
"""

from pandas import DataFrame

from mlflow.pipelines.decorators import mlp


@mlp
def process_splits(
    train_df: DataFrame, validation_df: DataFrame, test_df: DataFrame
) -> (DataFrame, DataFrame, DataFrame):
    """
    Clean each split dataset and add pickup-time and trip-duration columns.

    The same processing is applied independently to every split; the input
    DataFrames are not modified.

    :param train_df: The training dataset produced by the data splitting procedure.
    :param validation_df: The validation dataset produced by the data splitting procedure.
    :param test_df: The test dataset produced by the data splitting procedure.
    :return: A tuple containing, in order: the processed training dataset, the processed
             validation dataset, and the processed test dataset.
    """

    def process(df: DataFrame) -> DataFrame:
        # Drop rows that contain any missing value.
        without_missing = df.dropna()

        # Keep only rows whose fare amount and trip distance fall strictly inside
        # the accepted ranges: 0 < fare_amount < 1000 and 0 < trip_distance < 400.
        is_plausible = (
            (without_missing["fare_amount"] > 0)
            & (without_missing["trip_distance"] < 400)
            & (without_missing["trip_distance"] > 0)
            & (without_missing["fare_amount"] < 1000)
        )
        cleaned = without_missing[is_plausible]

        pickup = cleaned["tpep_pickup_datetime"]
        elapsed = cleaned["tpep_dropoff_datetime"] - pickup

        # ``assign`` returns a new DataFrame, so the derived columns are never written
        # onto a filtered slice (which would trigger pandas' SettingWithCopyWarning).
        return cleaned.assign(
            pickup_dow=pickup.dt.dayofweek,
            pickup_hour=pickup.dt.hour,
            # Trip duration in minutes, computed with the vectorised accessor.
            trip_duration=elapsed.dt.total_seconds() / 60,
        )

    return process(train_df), process(validation_df), process(test_df)


In [8]:
# Run the split step and post-process each resulting dataset with process_splits.
# NOTE(review): the ratios are presumably ordered train/validation/test, which puts
# only 10% of the rows in training and 80% in test (the recorded outputs show about
# 1k training examples versus about 7.9k test examples) -- confirm this is intended.
train_df, val_df, test_df = p.split(
    split_ratios=[0.1, 0.1, 0.8],
    target_col=target_col,
    post_split_method=process_splits
)

## Transform

Fit a transformer on the training dataset and apply the fitted transformer to the training and validation datasets

In [9]:
%%mlp_code steps/transform.py

"""
This module defines the following routines used by the 'transform' step of the regression pipeline:

- ``transformer_fn``: Defines customizable logic for transforming input data before it is passed
  to the estimator during model inference.
"""

from pandas import DataFrame
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline as SkPipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, FunctionTransformer

from mlflow.pipelines.decorators import mlp

def calculate_features(df: DataFrame) -> DataFrame:
    """
    Return a copy of the input dataframe extended with pickup day of week, pickup hour,
    and trip duration (in minutes), without the pickup and dropoff datetime columns.

    The input dataframe is left untouched, so the function can safely be applied to the
    same dataframe more than once (for example on repeated ``transform()`` calls).

    :param df: DataFrame containing datetime-typed ``tpep_pickup_datetime`` and
               ``tpep_dropoff_datetime`` columns.
    :return: A new DataFrame with ``pickup_dow``, ``pickup_hour`` and ``trip_duration``
             columns set and the two datetime columns removed.
    """
    pickup = df["tpep_pickup_datetime"]
    elapsed = df["tpep_dropoff_datetime"] - pickup

    # ``assign`` and ``drop`` both return new frames, so the caller's data is not mutated.
    with_features = df.assign(
        pickup_dow=pickup.dt.dayofweek,
        pickup_hour=pickup.dt.hour,
        # Trip duration in minutes, computed with the vectorised accessor.
        trip_duration=elapsed.dt.total_seconds() / 60,
    )
    return with_features.drop(columns=["tpep_pickup_datetime", "tpep_dropoff_datetime"])


@mlp
def transformer_fn():
    """
    Returns an *unfitted* transformer that defines ``fit()`` and ``transform()`` methods.
    The transformer's input and output signatures should be compatible with scikit-learn
    transformers.

    The returned scikit-learn pipeline has two stages:

    1. ``calculate_features`` derives the pickup day of week, pickup hour and trip duration
       columns.
    2. A ``ColumnTransformer`` one-hot encodes the pickup hour and day of week and
       standardizes trip distance and trip duration. Columns not listed in the
       ``ColumnTransformer`` are not passed through to its output.

    The one-hot encoders use ``handle_unknown="ignore"`` so that an hour or weekday that
    did not occur in the data used for fitting is encoded as all zeros at transform time
    instead of raising an error.
    """

    def make_one_hot_encoder():
        # Dense output; categories are learned from the data during ``fit()``.
        # NOTE(review): ``sparse`` was renamed ``sparse_output`` in newer scikit-learn
        # releases -- confirm against the pinned scikit-learn version before upgrading.
        return OneHotEncoder(categories="auto", sparse=False, handle_unknown="ignore")

    feature_step = (
        "calculate_time_and_duration_features",
        FunctionTransformer(calculate_features, feature_names_out="one-to-one"),
    )
    encoding_step = (
        "encoder",
        ColumnTransformer(
            transformers=[
                ("hour_encoder", make_one_hot_encoder(), ["pickup_hour"]),
                ("day_encoder", make_one_hot_encoder(), ["pickup_dow"]),
                ("std_scaler", StandardScaler(), ["trip_distance", "trip_duration"]),
            ]
        ),
    )
    return SkPipeline(steps=[feature_step, encoding_step])

In [10]:
# Run the transform step with the transformer returned by transformer_fn; the call
# yields the transformed training and validation datasets.
transformed_train_df, transformed_val_df = p.transform(transformer_method=transformer_fn, target_col=target_col)

Name,Type
tpep_pickup_datetime,datetime64[ns]
tpep_dropoff_datetime,datetime64[ns]
trip_distance,float64
fare_amount,float64
pickup_zip,int32
dropoff_zip,int32
pickup_dow,int64
pickup_hour,int64
trip_duration,float64

Name,Type
hour_encoder__pickup_hour_0,float64
hour_encoder__pickup_hour_1,float64
hour_encoder__pickup_hour_2,float64
hour_encoder__pickup_hour_3,float64
hour_encoder__pickup_hour_4,float64
hour_encoder__pickup_hour_5,float64
hour_encoder__pickup_hour_6,float64
hour_encoder__pickup_hour_7,float64
hour_encoder__pickup_hour_8,float64
hour_encoder__pickup_hour_9,float64


## Define metrics

Define metrics that will be used across the **Train** and **Evaluate** stages of the pipeline

In [11]:
%%mlp_code steps/metrics.py

"""
This module defines custom metric functions that are invoked during the 'train' and 'evaluate'
steps to provide model performance insights. Custom metric functions defined in this module are
referenced in the ``metrics`` section of ``pipeline.yaml``, for example:

.. code-block:: yaml
    :caption: Example custom metrics definition in ``pipeline.yaml``

    metrics:
      custom:
        - name: weighted_mean_squared_error
          function: weighted_mean_squared_error
          greater_is_better: False
"""

from typing import Dict

from pandas import DataFrame
from sklearn.metrics import mean_squared_error

from mlflow.pipelines.decorators import mlp


@mlp
def weighted_mean_squared_error(
    eval_df: DataFrame,
    builtin_metrics: Dict[str, float],  # pylint: disable=unused-argument
) -> Dict[str, float]:
    """
    Computes the weighted mean squared error (MSE) metric, weighting each example by the
    reciprocal of its prediction.

    NOTE(review): because the weights are ``1 / prediction``, a prediction of exactly zero
    produces an infinite weight and a negative prediction produces a negative weight --
    confirm that predictions are always strictly positive for this use case.

    :param eval_df: A Pandas DataFrame containing the following columns:

                    - ``"prediction"``: Predictions produced by submitting input data to the model.
                    - ``"target"``: Ground truth values corresponding to the input data.

    :param builtin_metrics: A dictionary containing the built-in metrics that are calculated
                            automatically during model evaluation. The keys are the names of the
                            metrics and the values are the scalar values of the metrics. For more
                            information, see
                            https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate.
                            Not used by this metric.
    :return: A single-entry dictionary containing the MSE metric. The key is the metric name and
             the value is the scalar metric value. Note that custom metric functions can return
             dictionaries with multiple metric entries as well.
    """
    targets = eval_df["target"]
    predictions = eval_df["prediction"]
    # Pass the arguments in scikit-learn's documented (y_true, y_pred) order. The squared
    # error is symmetric, so the value is the same as with the arguments swapped.
    weighted_mse = mean_squared_error(
        targets,
        predictions,
        sample_weight=1 / predictions.values,
    )
    return {"weighted_mean_squared_error": weighted_mse}


In [12]:
# Metric configuration passed to both the train and evaluate steps below.
metrics = {
    # Custom metrics: each entry gives the metric name, the function that computes it,
    # and whether larger values are better.
    "custom": [
        {
            "name": "weighted_mean_squared_error",
            "function": weighted_mean_squared_error,  
            "greater_is_better": False
        },
    ],
    # Use the built-in "root_mean_squared_error" metric for ranking models during development
    "primary": "root_mean_squared_error",
}

## Train

Fit an estimator on the training data, save a pipeline model consisting of the fitted transformer and the fitted estimator, and evaluate the pipeline model on the training and validation datasets

In [13]:
%%mlp_code steps/train.py

"""
This module defines the following routines used by the 'train' step of the regression pipeline:

- ``estimator_fn``: Defines the customizable estimator type and parameters that are used
  during training to produce a model pipeline.
"""

from mlflow.pipelines.decorators import mlp


@mlp
def estimator_fn():
    """
    Build the estimator used by the 'train' step.

    The returned object is an *unfitted* estimator exposing ``fit()`` and ``predict()``
    methods whose input and output signatures are compatible with scikit-learn
    estimators.
    """
    from sklearn.linear_model import SGDRegressor

    # A fixed random_state is supplied so the estimator is seeded identically on every run.
    regressor = SGDRegressor(random_state=42)
    return regressor

In [14]:
# Run the train step with the estimator returned by estimator_fn and the metric
# configuration defined above; the result is kept in ``model``.
model = p.train(estimator_method=estimator_fn, metrics=metrics, target_col=target_col)

Metric,training,validation
root_mean_squared_error,1.52682,12.1932
example_count,1037.0,1021.0
max_error,24.1959,387.524
mean_absolute_error,0.604771,0.967447
mean_absolute_percentage_error,0.0487821,0.0886578
mean_on_label,12.4219,12.4701
mean_squared_error,2.33119,148.674
r2_score,0.976947,-0.469174
score,0.976947,-0.469174
sum_on_label,12881.5,12732.0

Name,Type
tpep_pickup_datetime,datetime
tpep_dropoff_datetime,datetime
trip_distance,double
pickup_zip,integer
dropoff_zip,integer
pickup_dow,long
pickup_hour,long
trip_duration,double

Name,Type
-,"Tensor('float64', (-1,))"

absolute_error,prediction,fare_amount,tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,pickup_zip,dropoff_zip,pickup_dow,pickup_hour,trip_duration
24.19589,15.80411,40.0,2016-02-21 22:00:15,2016-02-21 22:00:55,6.3,7030,7030,6,22,0.666667
18.635776,43.864224,62.5,2016-02-01 10:30:45,2016-02-01 10:56:12,16.1,10019,7114,0,10,25.45
18.34306,60.15694,78.5,2016-01-22 10:09:42,2016-01-22 11:04:16,19.8,10028,7114,4,10,54.566667
13.150804,65.150804,52.0,2016-02-18 16:44:37,2016-02-18 17:53:50,20.8,10005,11422,3,16,69.216667
9.72791,61.72791,52.0,2016-01-30 14:38:28,2016-01-30 15:27:48,21.9,10103,11422,5,14,49.333333
9.300988,61.300988,52.0,2016-01-07 17:57:51,2016-01-07 18:52:35,21.0,11422,10023,3,17,54.733333
8.02927,43.97073,52.0,2016-02-20 05:54:48,2016-02-20 06:20:50,16.65,10019,11422,5,5,26.033333
7.814331,59.814331,52.0,2016-02-11 15:44:42,2016-02-11 16:48:25,18.8,11422,10014,3,15,63.716667
6.605409,58.605409,52.0,2016-01-15 18:33:21,2016-01-15 19:38:00,18.02,11422,10018,4,18,64.65
6.322139,39.677861,46.0,2016-01-25 17:30:08,2016-01-25 18:37:06,8.41,11370,10018,0,17,66.966667

Unnamed: 0,Latest,Best,2nd Best
Model Rank,1,1,2
root_mean_squared_error,12.1932,12.1932,14.7792
weighted_mean_squared_error,3.75588,3.75588,4.8956
max_error,387.524,387.524,468.065
mean_absolute_error,0.967447,0.967447,1.13139
mean_absolute_percentage_error,0.0886578,0.0886578,2.07974e+14
mean_squared_error,148.674,148.674,218.424
Run Time,2022-09-11 20:34:08,2022-09-11 20:34:08,2022-09-11 20:16:15
Run ID,2cdee10cdf324879adbaa8a4bce1b3bd,2cdee10cdf324879adbaa8a4bce1b3bd,f9716742d401405087da5758086fcdb5


## Evaluate

Evaluate the pipeline model on the test dataset and compare resulting performance metrics against specified validation criteria

In [15]:
# Run the evaluate step and check the listed metrics against their thresholds.
# NOTE(review): the thresholds are far above the metric values recorded in this
# notebook's output (e.g. mean_absolute_error of about 1.33 against a threshold of
# 600), so these criteria pass trivially -- confirm the thresholds are intended.
p.evaluate(
    metrics=metrics,
    validation_criteria=[
        {
            "metric": "root_mean_squared_error",
            "threshold": 100
        },
        {
            "metric": "mean_absolute_error",
            "threshold": 600
        },
        {
            "metric": "weighted_mean_squared_error",
            "threshold": 200
        },
    ],
    target_col=target_col,
)

Metric,validation,test
root_mean_squared_error,12.1932,16.691811
example_count,1021.0,7903.0
max_error,387.524,430.703478
mean_absolute_error,0.967447,1.325932
mean_absolute_percentage_error,0.0886578,0.150586
mean_on_label,12.4701,12.397762
mean_squared_error,148.674,278.616553
r2_score,-0.469174,-1.563801
score,-0.469174,-1.563801
sum_on_label,12732.0,97979.51

metric,greater_is_better,value,threshold,validated
root_mean_squared_error,0,16.6918,100,✅
mean_absolute_error,0,1.32593,600,✅
weighted_mean_squared_error,0,8.91785,200,✅


## Register

Register the trained pipeline model with the MLflow Model Registry

In [16]:
# Run the register step under the name "mymodel" and keep the returned value
# in ``model_version``.
model_version = p.register(model_name="mymodel")