# Batch Deployment

Batch inference is the most common way of deploying machine learning models. This lesson introduces strategies for deploying models in batch, including with Spark. We will also show how to enable optimizations for Delta tables.

## Learning Objectives

**By the end of this demo, you will be able to:**

- Load a logged Model Registry model using `pyfunc`.
- Compute predictions using `pyfunc` APIs.
- Perform batch inference using Feature Engineering's `score_batch` method.
- Materialize predictions into inference tables (Delta Lake).
- Apply common write optimizations, such as liquid clustering and predictive optimization, to maximize data skipping on inference tables.

For this demonstration, we will use a fictional dataset from a telecom company that contains customer information. The dataset covers **customer demographics**, such as gender, as well as internet subscription details such as subscription plans and payment methods.

After loading the dataset, we will perform simple **data cleaning and feature selection**.

In the final step, we will split the dataset into **features** and **response** sets.

In [0]:
print("Hello")

In [0]:
from pyspark.sql.functions import col

# dataset path (Delta table)
dataset_p_telco = "sdd_dev.sohag_test.telco_customer_churn"

# features to use
primary_key = "customerID"
response = "Churn"
features = ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]  # Keeping numerical only for simplicity and demo purposes

# Read dataset, cast numeric columns to double, and drop rows with nulls
telco_df = spark.read.table(dataset_p_telco) \
    .withColumn("TotalCharges", col("TotalCharges").cast("double")) \
    .withColumn("SeniorCitizen", col("SeniorCitizen").cast("double")) \
    .withColumn("tenure", col("tenure").cast("double")) \
    .na.drop(how="any")

# Split with 80 percent of the data in train_df and 20 percent of the data in test_df
train_df, test_df = telco_df.randomSplit([0.8, 0.2], seed=42)

# Separate features and ground-truth
features_df = train_df.select(primary_key, *features)
response_df = train_df.select(primary_key, response)

# review the features dataset
display(features_df)

# Batch Deployment - Without Feature Store

This demo will cover two main batch deployment methods. The first method is deploying models without a feature table. For the second method, we will use a feature table to train the model and later use the feature table for inference.

## Setup Model Registry with UC

Before we start model deployment, we need to fit and register a model. In this demo, **we will log models to Unity Catalog**, which means we first need to set up the **MLflow Model Registry URI**.

In [0]:
import mlflow

# Point to UC model registry
mlflow.set_registry_uri("databricks-uc")
client = mlflow.MlflowClient()

# Helper function that we will use for getting the latest version of a model
def get_latest_model_version(model_name):
    """Helper function to get the latest model version as an integer."""
    model_version_infos = client.search_model_versions("name = '%s'" % model_name)
    # Cast to int so versions compare numerically (e.g. 10 > 9), not as strings
    return max(int(model_version_info.version) for model_version_info in model_version_infos)
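
When picking the latest version, it matters whether versions are compared as strings or as integers. The short sketch below uses made-up version values (they are not read from any registry) to show how the two comparisons can disagree once a model reaches version 10.

```python
# Hypothetical version values, shaped like strings a registry client might return
versions = ["1", "2", "9", "10"]

# Lexicographic comparison: "9" sorts after "10" because '9' > '1'
latest_as_string = max(versions)

# Numeric comparison: cast each version to int before taking the maximum
latest_as_int = max(int(v) for v in versions)

print(latest_as_string)
print(latest_as_int)
```

Casting to `int` is harmless when the values are already integers, so it is a safe default for a helper like this.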

## Fit and Register a Model with UC

In [0]:
# Train a sklearn Decision Tree Classification model
from sklearn.tree import DecisionTreeClassifier
from mlflow.models import infer_signature

# Convert data to pandas dataframes
X_train_pdf = features_df.drop(primary_key).toPandas()
Y_train_pdf = response_df.drop(primary_key).toPandas()
clf = DecisionTreeClassifier(max_depth=3, random_state=42)

# Use 3-level namespace for model name
model_name = "sdd_dev.sohag_test.churn_ml_model"

with mlflow.start_run(run_name="Model-Batch-Deployment-Demo") as mlflow_run:
    # Enable autologging of input examples, parameters, and metrics; the model itself is logged explicitly below (log_models=False)
    mlflow.sklearn.autolog(
        log_input_examples=True,
        log_models=False,
        log_post_training_metrics=True,
        silent=True
    )

    clf.fit(X_train_pdf, Y_train_pdf)

    # Log model and push to registry
    signature = infer_signature(X_train_pdf, Y_train_pdf)
    mlflow.sklearn.log_model(
        clf,
        artifact_path="decision_tree",
        signature=signature,
        registered_model_name=model_name
    )

# Set model alias (i.e. Baseline)
client.set_registered_model_alias(model_name, "Baseline", get_latest_model_version(model_name))

## Use the Model for Inference
Now that our model is registered, we can use it for inference. In this section we apply the model directly to a Spark DataFrame, which is called **batch inference**.


### Load the Model
Loading a model from the UC-based model registry is done by referencing it by **alias** or by **version**.

After loading the model, we will create a `spark_udf` from the model.

In [0]:
latest_model_version = client.get_model_version_by_alias(name=model_name, alias="baseline").version
model_uri = f"models:/{model_name}/{latest_model_version}"  # Should be version 1
# model_uri = f"models:/{model_name}@baseline"  # uri can also point to @alias
predict_func = mlflow.pyfunc.spark_udf(
    spark,
    model_uri
)
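
The two URI forms used in the cell above (by version and by alias) can be captured in a small helper. This is only a sketch: `build_model_uri` is a hypothetical function, not part of MLflow, and the model name is a placeholder rather than the model registered in this demo.

```python
from typing import Optional, Union


def build_model_uri(model_name: str,
                    version: Optional[Union[int, str]] = None,
                    alias: Optional[str] = None) -> str:
    """Hypothetical helper: build a 'models:/' URI from a version or an alias."""
    # Exactly one of version / alias must be given
    if (version is None) == (alias is None):
        raise ValueError("Provide exactly one of version or alias")
    if version is not None:
        return f"models:/{model_name}/{version}"
    return f"models:/{model_name}@{alias}"


# Placeholder three-level name (catalog.schema.model)
uri_by_version = build_model_uri("my_catalog.my_schema.my_model", version=1)
uri_by_alias = build_model_uri("my_catalog.my_schema.my_model", alias="baseline")

print(uri_by_version)
print(uri_by_alias)
```

Pointing at an alias keeps downstream jobs unchanged when the alias is moved to a newer version, while a version URI pins the job to one specific model.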

### Inference

In [0]:
# prepare test dataset
test_features_df = test_df.select(primary_key, *features)

# make prediction
prediction_df = test_features_df.withColumn(
    "prediction",
    predict_func(*test_features_df.drop(primary_key).columns)
)

display(prediction_df)

In [0]:
test_features_df.drop(primary_key).columns

# Batch Deployment - With Feature Store
In the previous section we trained and registered a model using data from a Spark DataFrame. In some cases, you will need to use features from a feature store for training and inference.

In this section we will demonstrate how to train and deploy a model using Feature Store.


## Create Feature Table
Let's create a feature table based on the `features_df` that we created before. Please note that we will be using **Feature Store with Unity Catalog**, which means we need to use `FeatureEngineeringClient`.

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient

# prepare feature set
features_df_all = telco_df.select(primary_key, *features)

# feature table definition
fe = FeatureEngineeringClient()
feature_table_name = "sdd_dev.sohag_test.telco_customer_churn_features"

# Drop the table if it exists; ignore the error raised when it does not
try:
    fe.drop_table(name=feature_table_name)
except Exception:
    pass

# Create feature table
fe.create_table(
    name=feature_table_name,
    df=features_df_all,
    primary_keys=[primary_key],
    description="Example feature table"
)

## Setup Feature Lookups

To create a training set from the feature table, we need to define a **feature lookup**, which names the feature table and the key used to fetch features from it.

Note that the `lookup_key` is used for matching records in the feature table.

In [0]:
# Create training set based on feature lookup
from databricks.feature_engineering import FeatureLookup

fl_handle = FeatureLookup(
    table_name=feature_table_name,
    lookup_key=[primary_key]
)

training_set_spec = fe.create_training_set(
    df=response_df,
    label=response,
    feature_lookups=[fl_handle],
    exclude_columns=[primary_key]
)

# Load training dataframe based on defined feature-lookup specification
training_df = training_set_spec.load_df()

In [0]:
display(training_df)
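
Conceptually, the feature lookup behaves like a key-based join between the label rows and the feature table. The sketch below illustrates that idea in plain Python. It is not how the Feature Engineering client is implemented, and the table contents and the `build_training_rows` helper are made up for illustration.

```python
# Hypothetical feature table keyed by customerID (values are made up)
feature_table = {
    "C001": {"tenure": 12.0, "MonthlyCharges": 70.5},
    "C002": {"tenure": 3.0, "MonthlyCharges": 29.9},
}

# Label rows carry only the lookup key and the response
label_rows = [
    {"customerID": "C001", "Churn": "No"},
    {"customerID": "C002", "Churn": "Yes"},
]


def build_training_rows(labels, features, lookup_key, exclude_columns=()):
    """Join each label row to its feature row by key, then drop excluded columns."""
    rows = []
    for label in labels:
        joined = {**label, **features[label[lookup_key]]}
        rows.append({k: v for k, v in joined.items() if k not in exclude_columns})
    return rows


training_rows = build_training_rows(
    label_rows, feature_table, lookup_key="customerID", exclude_columns=["customerID"]
)
print(training_rows[0])
```

The key is needed to find the features but is excluded from the result, mirroring the `exclude_columns=[primary_key]` argument used in the cell above.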

## Fit and Register a Model with UC using Feature Table

In [0]:
# Train a sklearn Decision Tree Classification model
import warnings
from mlflow.types.utils import _infer_schema

# Convert data to pandas dataframes
X_train_pdf2 = training_df.drop(primary_key, response).toPandas()
Y_train_pdf2 = training_df.select(response).toPandas()
clf2 = DecisionTreeClassifier(max_depth=3, random_state=42)

with mlflow.start_run(run_name="Model-Batch-Deployment-Demo-With-FS") as mlflow_run:
    # Enable autologging of input examples, parameters, and metrics; the model itself is logged explicitly below (log_models=False)
    mlflow.sklearn.autolog(
        log_input_examples=True,
        log_models=False,
        log_post_training_metrics=True,
        silent=True
    )

    clf2.fit(X_train_pdf2, Y_train_pdf2)

    # Infer output schema
    try:
        output_schema = _infer_schema(Y_train_pdf2)
    except Exception as e:
        warnings.warn(f"Could not infer model output schema: {e}")
        output_schema = None

    # Log using feature engineering client and push to registry
    fe.log_model(
        model=clf2,
        artifact_path="decision_tree",
        flavor=mlflow.sklearn,
        training_set=training_set_spec,
        output_schema=output_schema,
        registered_model_name=model_name
    )

# Set model alias (i.e. Champion)
client.set_registered_model_alias(model_name, "Champion", get_latest_model_version(model_name))

In [0]:
output_schema

## Use the Model for Inference

Inference for models that are registered with a Feature Store table differs from inference on a plain Spark DataFrame. Here we use the feature engineering client's `.score_batch()` method, which takes a model URI and a DataFrame containing the primary key values.

**So how does the function know which feature table to use?**  
If you visit the Artifacts section of the registered model, you will see that a `data` folder is logged with the model. The model file also includes a `data: data/feature_store` entry that points to the feature data.

In [0]:
champion_model_uri = f"models:/{model_name}@champion"

# prepare lookup dataset
lookup_df = test_df.select("customerID")

# predict in batch using lookup df
prediction_fe_df = fe.score_batch(
    model_uri=champion_model_uri,
    df=lookup_df,
    result_type='string'
)

In [0]:
display(prediction_fe_df)
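
The idea that feature metadata travels with the model can be sketched in plain Python. Everything below is hypothetical: the `model_bundle` dictionary, the `score_rows` function, and the toy rule standing in for a classifier are illustrations only and do not reflect the actual artifact format or the `score_batch` implementation.

```python
# Hypothetical feature table keyed by customerID (values are made up)
feature_table = {
    "C001": {"tenure": 2.0, "MonthlyCharges": 95.0},
    "C002": {"tenure": 48.0, "MonthlyCharges": 40.0},
}

model_bundle = {
    # Metadata stored next to the model: which key and which features to look up
    "feature_spec": {"lookup_key": "customerID", "features": ["tenure", "MonthlyCharges"]},
    # Toy rule standing in for a trained classifier
    "predict": lambda tenure, monthly: "Yes" if tenure < 6 and monthly > 80 else "No",
}


def score_rows(bundle, lookup_rows, features):
    """Fetch features for each key using the bundled spec, then apply the model."""
    spec = bundle["feature_spec"]
    results = []
    for row in lookup_rows:
        feature_row = features[row[spec["lookup_key"]]]
        inputs = [feature_row[name] for name in spec["features"]]
        results.append({**row, **feature_row, "prediction": bundle["predict"](*inputs)})
    return results


# The caller supplies only primary keys, as with lookup_df above
scored = score_rows(model_bundle, [{"customerID": "C001"}, {"customerID": "C002"}], feature_table)
print(scored)
```

Because the lookup specification is stored with the model, the caller never has to list feature columns at scoring time.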

## Performance Considerations

Delta Lake offers several write optimizations, such as:
- **Partitioning:** stores data associated with different categorical values in different directories.
- **Z-Ordering:** colocates related information in the same set of files.
- **Liquid Clustering:** replaces both above-mentioned methods to simplify data layout decisions and optimize query performance.
- **Predictive Optimization:** removes the need to manually manage maintenance operations for Delta tables on Databricks.

In this demo, we will show the last two options: liquid clustering and predictive optimization.
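
To see why data layout affects data skipping, consider the simplified model below. It assumes each file stores a min/max statistic for one column and that a query can skip any file whose range does not overlap the filter. The file sizes, the synthetic `tenure` values, and the helper names are all made up, and real clustering works differently from a plain sort.

```python
def file_stats(files):
    """Return (min, max) of the column for each file."""
    return [(min(f), max(f)) for f in files]


def files_to_read(stats, low, high):
    """Count files whose [min, max] range overlaps the filter [low, high]."""
    return sum(1 for lo, hi in stats if hi >= low and lo <= high)


def chunk(values, size):
    """Split values into consecutive 'files' of the given size."""
    return [values[i:i + size] for i in range(0, len(values), size)]


# 1000 synthetic tenure values in 0..72, deterministically scattered
tenures = [(i * 37) % 73 for i in range(1000)]

# Unclustered: rows land in files in arrival order
unclustered_files = chunk(tenures, 100)
# Clustered (approximated here by sorting): similar values share a file
clustered_files = chunk(sorted(tenures), 100)

# Filter: tenure BETWEEN 10 AND 12
unclustered_reads = files_to_read(file_stats(unclustered_files), 10, 12)
clustered_reads = files_to_read(file_stats(clustered_files), 10, 12)

print(unclustered_reads, clustered_reads)
```

In this toy setup the unclustered layout forces all 10 files to be read, while the sorted layout narrows the same filter to a single file.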

In [0]:
catalog_name = "sdd_dev"
schema_name = "sohag_test"
table_name = "telco_customer_churn"

# Set catalog and schema
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {schema_name}")

Enable predictive optimization at the schema level (this can also be done at the catalog level).

In [0]:
spark.sql(f"ALTER SCHEMA {catalog_name}.{schema_name} ENABLE PREDICTIVE OPTIMIZATION;")

Create the inference table (where batch scoring jobs would materialize their results) and enable liquid clustering using `CLUSTER BY`.

In [0]:
%sql
-- Create or replace the batch_inference table, clustered by customerID and tenure
CREATE OR REPLACE TABLE batch_inference(
    customerID STRING,
    Churn STRING,
    SeniorCitizen DOUBLE,
    tenure DOUBLE,
    MonthlyCharges DOUBLE,
    TotalCharges DOUBLE,
    prediction STRING
)
CLUSTER BY (customerID, tenure)

In [0]:
# Write the prediction DataFrame to the batch_inference table
prediction_fe_df.write \
    .mode("append") \
    .option("mergeSchema", True) \
    .saveAsTable(f"{catalog_name}.{schema_name}.batch_inference")

In [0]:
%sql
select *
from batch_inference