#### Import libraries and set up the MLflow experiment

In [1]:
# Set up MLflow tracking: the experiment groups the training runs below, and the
# registered model name is derived from the experiment name plus the algorithm name.
import mlflow

mlexperiment_name = "nyc_yellowtaxi_predict_tripduration"
mlalgorithm_name = "lightgbm"
mlmodel_name = "_".join((mlexperiment_name, mlalgorithm_name))

# Creates the experiment if it does not exist yet, then makes it the active experiment.
mlflow.set_experiment(mlexperiment_name)

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 3, Finished, Available)

2023/09/13 23:48:47 INFO mlflow.tracking.fluent: Experiment with name 'nyc_yellowtaxi_predict_tripduration' does not exist. Creating a new experiment.


<Experiment: artifact_location='', creation_time=1694648929013, experiment_id='7cde3447-5067-495c-84a4-0256ea1cc9bf', last_update_time=None, lifecycle_stage='active', name='nyc_yellowtaxi_predict_tripduration', tags={}>

#### Create a DataFrame from a random sample of the cleansed data

In [2]:
# Seed shared by the sampling below and the train/test split, so both are reproducible.
SEED = 1234

# Train on a random sample of roughly 50% of the cleansed table to shorten overall execution time
# (Spark's sample() fraction is approximate, not an exact row count).
nyc_yellowtaxi_clean_sampled_df = spark.read.table("nyc_yellowtaxi_clean").sample(
    fraction=0.5,
    seed=SEED,
)
nyc_yellowtaxi_clean_sampled_df.count()

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 4, Finished, Available)

34397605

#### Randomly split the sampled data into train (75% of the sample) and test (25% of the sample) datasets and cache them

In [3]:
# Reproducible 75/25 train/test split of the sample. DataFrame.cache() returns the
# DataFrame itself, so each split is marked for caching as it is unpacked; caching
# speeds up the repeated reads during training and evaluation.
nyc_yellowtaxi_predict_tripduration_train_df, nyc_yellowtaxi_predict_tripduration_test_df = (
    split_df.cache()
    for split_df in nyc_yellowtaxi_clean_sampled_df.randomSplit([0.75, 0.25], seed=SEED)
)

print("train set count:" + str(nyc_yellowtaxi_predict_tripduration_train_df.count()))
print("test set count:" + str(nyc_yellowtaxi_predict_tripduration_test_df.count()))

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 5, Finished, Available)

train set count:25799958
test set count:8597647


#### Define categorical and numerical features

In [4]:
# Columns that go through string indexing and one-hot encoding in the pipeline.
categorical_features = [
    "storeAndFwdFlag",
    "timeBins",
    "vendorID",
    "weekDayName",
    "pickupHour",
    "rateCodeId",
    "paymentType",
]
# Columns that are passed into the feature vector unchanged.
numerical_features = ["passengerCount", "tripDistance"]

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 6, Finished, Available)

In [5]:
from pyspark.ml import Pipeline
from synapse.ml.core.platform import *
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from synapse.ml.lightgbm import LightGBMRegressor

def create_lgbmr_pipeline(categorical_features, numerical_features, hyperparameters, label_col="tripDuration"):
    """Build an unfitted Spark ML pipeline that trains a LightGBM regressor.

    Stages, in order:
      1. StringIndexer     - indexes each categorical column into ``<col>StrIdx``; with
                             handleInvalid="keep", labels unseen at fit time go to an extra
                             index instead of raising.
      2. OneHotEncoder     - encodes each indexed column into ``<col>OHEnc``.
      3. VectorAssembler   - concatenates the one-hot columns and the numerical columns
                             into a single ``features`` vector column.
      4. LightGBMRegressor - the estimator, trained against ``label_col``.

    Args:
        categorical_features: names of the categorical input columns.
        numerical_features: names of the numeric input columns (list or tuple), used as-is.
        hyperparameters: dict with keys "objective", "alpha", "learning_rate",
            "num_leaves" and "iterations"; a missing key raises KeyError.
        label_col: name of the regression target column. Defaults to "tripDuration",
            the column this function previously hard-coded.

    Returns:
        pyspark.ml.Pipeline: the unfitted pipeline; call ``.fit(df)`` to train it.
    """
    # Index each categorical column; "keep" puts unseen labels in an extra bucket instead of failing.
    string_indexer = StringIndexer(
        inputCols=categorical_features,
        outputCols=[f"{feat}StrIdx" for feat in categorical_features],
    ).setHandleInvalid("keep")

    # One-hot encode the indexed columns.
    one_hot_encoder = OneHotEncoder(
        inputCols=string_indexer.getOutputCols(),
        outputCols=[f"{feat}OHEnc" for feat in categorical_features],
    )

    # Assemble encoded categorical columns and raw numerical columns into one feature vector.
    # list() lets callers pass a tuple of numerical columns (list + tuple would raise TypeError).
    featurizer_vector = VectorAssembler(
        inputCols=one_hot_encoder.getOutputCols() + list(numerical_features),
        outputCol="features",
    )

    # The LightGBM estimator configured from the hyperparameter dict.
    # NOTE(review): per the LightGBM docs, `alpha` only affects the huber and quantile
    # objectives; with objective="regression" it presumably has no effect — confirm.
    lgbm_regressor = LightGBMRegressor(
        objective=hyperparameters["objective"],
        alpha=hyperparameters["alpha"],
        learningRate=hyperparameters["learning_rate"],
        numLeaves=hyperparameters["num_leaves"],
        labelCol=label_col,
        numIterations=hyperparameters["iterations"],
    )

    # Stage order matters: each stage consumes the columns produced by the previous one.
    return Pipeline(stages=[string_indexer, one_hot_encoder, featurizer_vector, lgbm_regressor])

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 7, Finished, Available)

In [6]:
from mlflow.models.signature import ModelSignature 
from mlflow.types.utils import _infer_schema 

def register_spark_ml_model(mlflow_active_run, mlmodel, mlmodel_name, mlmodel_signature, mlmodel_metrics, mlmodel_hyperparameters):
    """Log a fitted Spark ML model to the active MLflow run and register it.

    Params and metrics are logged before the model. Model logging and registration
    are the slow, failure-prone steps (registration may wait for the model version
    to be created). If they fail, the run still records how the model was trained
    and how it scored.

    Args:
        mlflow_active_run: the active MLflow run; only ``info.run_id`` is read here.
        mlmodel: fitted Spark ML model (e.g. a PipelineModel) to log.
        mlmodel_name: used both as the artifact path inside the run and as the
            registered model name; each call adds a new version under that name.
        mlmodel_signature: MLflow ModelSignature stored with the model.
        mlmodel_metrics: dict of metric name -> value.
            NOTE(review): open-source MLflow restricts metric names to alphanumerics,
            "_", "-", ".", " " and "/"; keys such as "R^2" are accepted by this
            workspace's tracking server but may be rejected by other backends.
        mlmodel_hyperparameters: dict of parameter name -> value.

    Returns:
        str: the ``runs:/<run_id>/<mlmodel_name>`` URI of the logged model.
    """
    run_id = mlflow_active_run.info.run_id

    # Cheap, informative records first so they survive a failed model upload/registration.
    mlflow.log_params(mlmodel_hyperparameters)
    mlflow.log_metrics(mlmodel_metrics)

    # dfs_tmpdir is the temporary location used while serializing the Spark model —
    # presumably a lakehouse-relative path in this workspace; confirm.
    mlflow.spark.log_model(
        mlmodel,
        artifact_path=mlmodel_name,
        signature=mlmodel_signature,
        registered_model_name=mlmodel_name,
        dfs_tmpdir="Files/mlflow/tmp/",
    )

    mlmodel_uri = f"runs:/{run_id}/{mlmodel_name}"
    print(f"Model saved in run {run_id}")
    print(f"Model URI: {mlmodel_uri}")
    return mlmodel_uri

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 8, Finished, Available)

#### Model Training #1 (baseline hyperparameters, all features)

In [7]:
# Baseline hyperparameters for the first LightGBM regressor run.
# NOTE(review): despite the earlier "default" wording, these are hand-picked values,
# not LightGBM's documented defaults (learning_rate=0.1, num_leaves=31,
# num_iterations=100, alpha=0.9) — confirm the intended description.
lgbmr_hyperparameters = dict(
    objective="regression",
    alpha=0.09,
    learning_rate=0.01,
    num_leaves=92,
    iterations=200,
)

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 9, Finished, Available)

In [8]:
# Open a new MLflow run unless one is already active, then keep a handle to it.
if mlflow.active_run() is None:
    mlflow.start_run()
mlflow_active_run = mlflow.active_run()
print(f"Active experiment run_id: {mlflow_active_run.info.run_id}")

# Build the pipeline with the baseline hyperparameters and fit it on the training split.
lgbmr_pipeline = create_lgbmr_pipeline(categorical_features, numerical_features, lgbmr_hyperparameters)
lgbmr_model = lgbmr_pipeline.fit(nyc_yellowtaxi_predict_tripduration_train_df)

# Score the test split. cache() returns the same DataFrame, so the predictions materialized
# by count() below are reused by the evaluation cell rather than recomputed.
lgbmr_predictions = lgbmr_model.transform(nyc_yellowtaxi_predict_tripduration_test_df).cache()
print(f"Prediction run for {lgbmr_predictions.count()} samples")

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 10, Finished, Available)

Active experiment run_id: 91917f3e-0847-430f-9c91-b6da3d17b324
Prediction run for 8597647 samples


In [9]:
import json
from synapse.ml.train import ComputeModelStatistics

# Regression statistics (MSE, RMSE, R^2, MAE) of the predictions against the true trip duration.
# The statistics come back as a DataFrame; its first row is converted to a plain dict via JSON.
lgbmr_metrics = ComputeModelStatistics(
    evaluationMetric="regression",
    labelCol="tripDuration",
    scoresCol="prediction",
).transform(lgbmr_predictions)

lgbmr_metrics_dict = json.loads(lgbmr_metrics.toJSON().first())
lgbmr_metrics_dict

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 11, Finished, Available)

{'mean_squared_error': 31.32707418869297,
 'root_mean_squared_error': 5.597059423366253,
 'R^2': 0.7369768686952491,
 'mean_absolute_error': 3.6737069472412704}

In [10]:
# Model signature: the input schema is the raw feature columns and the output schema is the
# label column, both read from the training DataFrame.
# NOTE(review): `_infer_schema` is a private MLflow helper; the public
# `mlflow.models.infer_signature` is the documented alternative.
mlmodel_signature = ModelSignature(
    inputs=_infer_schema(nyc_yellowtaxi_predict_tripduration_train_df.select(categorical_features + numerical_features)),
    outputs=_infer_schema(nyc_yellowtaxi_predict_tripduration_train_df.select("tripDuration")),
)

# Log and register the model, then close the run. If registration fails (or the cell is
# interrupted), the run is marked FAILED and closed anyway. Otherwise it would stay active,
# and the next training cell, which only starts a run when none is active, would log the
# next model into this stale run.
try:
    model_uri = register_spark_ml_model(
        mlflow_active_run=mlflow_active_run,
        mlmodel=lgbmr_model,
        mlmodel_name=mlmodel_name,
        mlmodel_signature=mlmodel_signature,
        mlmodel_metrics=lgbmr_metrics_dict,
        mlmodel_hyperparameters=lgbmr_hyperparameters,
    )
except BaseException:
    mlflow.end_run(status="FAILED")
    raise
mlflow.end_run()

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 12, Finished, Available)

  mlmodel_signature = ModelSignature(inputs=_infer_schema(nyc_yellowtaxi_predict_tripduration_train_df.select(categorical_features + numerical_features)),
Successfully registered model 'nyc_yellowtaxi_predict_tripduration_lightgbm'.
2023/09/13 23:58:13 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: nyc_yellowtaxi_predict_tripduration_lightgbm, version 1
Created version '1' of model 'nyc_yellowtaxi_predict_tripduration_lightgbm'.


Model saved in run91917f3e-0847-430f-9c91-b6da3d17b324
Model URI: runs:/91917f3e-0847-430f-9c91-b6da3d17b324/nyc_yellowtaxi_predict_tripduration_lightgbm


#### Model Training #2 (second hyperparameter set, without paymentType)

In [11]:
# Second hyperparameter set for the LightGBM regressor.
# NOTE(review): these values coincide with LightGBM's documented defaults
# (learning_rate=0.1, num_leaves=31, num_iterations=100, alpha=0.9) rather than
# coming from a visible tuning search — confirm the intended "tuned" wording.
lgbmr_tuned_hyperparameters = {
    "objective": "regression",
    "alpha": 0.9,
    "learning_rate": 0.1,
    "num_leaves": 31,
    "iterations": 100,
}

# Drop paymentType from the categorical features for this run. Rebinding to a filtered copy
# keeps the cell re-runnable, whereas list.remove() raises ValueError on a second run because
# the value is already gone, and it also mutates the original list object in place.
categorical_features = [feature for feature in categorical_features if feature != "paymentType"]

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 13, Finished, Available)

In [12]:
# Open a new MLflow run unless one is already active, then keep a handle to it.
if mlflow.active_run() is None:
    mlflow.start_run()
mlflow_active_run = mlflow.active_run()
print(f"Active experiment run_id: {mlflow_active_run.info.run_id}")

# Build the pipeline with the second hyperparameter set and fit it on the training split.
lgbmr_tuned_pipeline = create_lgbmr_pipeline(categorical_features, numerical_features, lgbmr_tuned_hyperparameters)
lgbmr_tuned_model = lgbmr_tuned_pipeline.fit(nyc_yellowtaxi_predict_tripduration_train_df)

# Score the test split. cache() returns the same DataFrame, so the predictions materialized
# by count() below are reused by the evaluation cell rather than recomputed.
lgbmr_tuned_predictions = lgbmr_tuned_model.transform(nyc_yellowtaxi_predict_tripduration_test_df).cache()
print(f"Prediction run for {lgbmr_tuned_predictions.count()} samples")

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 14, Finished, Available)

Active experiment run_id: 5c4296f8-d24e-4749-9db3-29d5f1315b71
Prediction run for 8597647 samples


In [13]:
import json
from synapse.ml.train import ComputeModelStatistics

# Regression statistics (MSE, RMSE, R^2, MAE) of the second model's predictions against the
# true trip duration; the first row of the statistics DataFrame is converted to a dict via JSON.
lgbmr_tuned_metrics = ComputeModelStatistics(
    evaluationMetric="regression",
    labelCol="tripDuration",
    scoresCol="prediction",
).transform(lgbmr_tuned_predictions)

lgbmr_tuned_metrics_dict = json.loads(lgbmr_tuned_metrics.toJSON().first())
lgbmr_tuned_metrics_dict

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 15, Finished, Available)

{'mean_squared_error': 28.54970506348561,
 'root_mean_squared_error': 5.34319240374943,
 'R^2': 0.7602957499830791,
 'mean_absolute_error': 3.425641443239282}

In [14]:
# Model signature for the second model: input schema from the (reduced) feature columns,
# output schema from the label column, both read from the training DataFrame.
# NOTE(review): `_infer_schema` is a private MLflow helper; the public
# `mlflow.models.infer_signature` is the documented alternative.
mlmodel_signature = ModelSignature(
    inputs=_infer_schema(nyc_yellowtaxi_predict_tripduration_train_df.select(categorical_features + numerical_features)),
    outputs=_infer_schema(nyc_yellowtaxi_predict_tripduration_train_df.select("tripDuration")),
)

# Log and register the model as a new version, then close the run. On failure (or interrupt)
# the run is marked FAILED and closed, so it cannot linger as the active run and absorb
# logging from a later training cell.
try:
    model_uri = register_spark_ml_model(
        mlflow_active_run=mlflow_active_run,
        mlmodel=lgbmr_tuned_model,
        mlmodel_name=mlmodel_name,
        mlmodel_signature=mlmodel_signature,
        mlmodel_metrics=lgbmr_tuned_metrics_dict,
        mlmodel_hyperparameters=lgbmr_tuned_hyperparameters,
    )
except BaseException:
    mlflow.end_run(status="FAILED")
    raise
mlflow.end_run()

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 16, Finished, Available)

  mlmodel_signature = ModelSignature(inputs=_infer_schema(nyc_yellowtaxi_predict_tripduration_train_df.select(categorical_features + numerical_features)),
Successfully registered model 'nyc_yellowtaxi_predict_tripduration_lightgbm'.
2023/09/14 00:02:24 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: nyc_yellowtaxi_predict_tripduration_lightgbm, version 2
Created version '2' of model 'nyc_yellowtaxi_predict_tripduration_lightgbm'.


Model saved in run5c4296f8-d24e-4749-9db3-29d5f1315b71
Model URI: runs:/5c4296f8-d24e-4749-9db3-29d5f1315b71/nyc_yellowtaxi_predict_tripduration_lightgbm


In [15]:
import mlflow

# Print the name of every experiment visible from this workspace.
experiments = mlflow.search_experiments()
for exp in experiments:
    print(exp.name)

# Ten most recent runs of this notebook's experiment, newest first (displayed as a table).
exp = mlflow.get_experiment_by_name(mlexperiment_name)
mlflow.search_runs(
    experiment_ids=[exp.experiment_id],
    order_by=["start_time DESC"],
    max_results=10,
)

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 17, Finished, Available)

nyc_yellowtaxi_predict_tripduration


Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.root_mean_squared_error,metrics.mean_absolute_error,metrics.mean_squared_error,metrics.R^2,...,params.learning_rate,params.num_leaves,params.objective,params.iterations,tags.synapseml.notebook.artifactId,tags.synapseml.user.id,tags.mlflow.user,tags.synapseml.user.name,tags.mlflow.rootRunId,tags.mlflow.runName
0,5c4296f8-d24e-4749-9db3-29d5f1315b71,7cde3447-5067-495c-84a4-0256ea1cc9bf,FINISHED,sds://lake.trident.com/04fb8c5b-81dd-4501-847b...,2023-09-13 23:58:18.379000+00:00,2023-09-14 00:02:25.582000+00:00,,,,,...,0.1,31,regression,100,00f9055c-5948-4b24-99cb-46837c8f2d91,dd9ce7f2-d245-40b0-9e5a-3458955b5205,4b3a56ea-6f42-450e-b7c3-fb2932c7ac32,Arshad Ali,5c4296f8-d24e-4749-9db3-29d5f1315b71,calm_rose_nj91bj0m
1,91917f3e-0847-430f-9c91-b6da3d17b324,7cde3447-5067-495c-84a4-0256ea1cc9bf,FINISHED,sds://lake.trident.com/04fb8c5b-81dd-4501-847b...,2023-09-13 23:52:08.897000+00:00,2023-09-13 23:58:15.010000+00:00,5.597059,3.673707,31.327074,0.736977,...,0.01,92,regression,200,00f9055c-5948-4b24-99cb-46837c8f2d91,dd9ce7f2-d245-40b0-9e5a-3458955b5205,4b3a56ea-6f42-450e-b7c3-fb2932c7ac32,Arshad Ali,91917f3e-0847-430f-9c91-b6da3d17b324,goofy_angle_2jms4ymf


In [16]:
# Re-display the ten most recent runs of this notebook's experiment, newest first.
exp = mlflow.get_experiment_by_name(mlexperiment_name)
mlflow.search_runs(
    experiment_ids=[exp.experiment_id],
    order_by=["start_time DESC"],
    max_results=10,
)

StatementMeta(, 03bca968-138f-43f4-8bfc-ac2f016ba407, 18, Finished, Available)

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.root_mean_squared_error,metrics.mean_absolute_error,metrics.mean_squared_error,metrics.R^2,...,params.learning_rate,params.num_leaves,params.objective,params.iterations,tags.synapseml.notebook.artifactId,tags.synapseml.user.id,tags.mlflow.user,tags.synapseml.user.name,tags.mlflow.rootRunId,tags.mlflow.runName
0,5c4296f8-d24e-4749-9db3-29d5f1315b71,7cde3447-5067-495c-84a4-0256ea1cc9bf,FINISHED,sds://lake.trident.com/04fb8c5b-81dd-4501-847b...,2023-09-13 23:58:18.379000+00:00,2023-09-14 00:02:25.582000+00:00,,,,,...,0.1,31,regression,100,00f9055c-5948-4b24-99cb-46837c8f2d91,dd9ce7f2-d245-40b0-9e5a-3458955b5205,4b3a56ea-6f42-450e-b7c3-fb2932c7ac32,Arshad Ali,5c4296f8-d24e-4749-9db3-29d5f1315b71,calm_rose_nj91bj0m
1,91917f3e-0847-430f-9c91-b6da3d17b324,7cde3447-5067-495c-84a4-0256ea1cc9bf,FINISHED,sds://lake.trident.com/04fb8c5b-81dd-4501-847b...,2023-09-13 23:52:08.897000+00:00,2023-09-13 23:58:15.010000+00:00,5.597059,3.673707,31.327074,0.736977,...,0.01,92,regression,200,00f9055c-5948-4b24-99cb-46837c8f2d91,dd9ce7f2-d245-40b0-9e5a-3458955b5205,4b3a56ea-6f42-450e-b7c3-fb2932c7ac32,Arshad Ali,91917f3e-0847-430f-9c91-b6da3d17b324,goofy_angle_2jms4ymf
