
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img
    src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png"
    alt="Databricks Learning"
  >
</div>



# Custom Model Deployment with Model Serving

Databricks Model Serving provides an easy way to deploy ML models for real-time inference. In some cases, you may need to deploy a custom pipeline around your model, for example to pre-process the input or post-process the inference result.

In this demo, we will demonstrate how you could use **MLflow's `PythonModel`** to implement a post-processing step for your model and serve it with Model Serving.

**Learning Objectives:**

*By the end of this demo, you will be able to:*

* Deploy a model with custom logic using Model Serving.

* Create and manage serving endpoints using the API.



## REQUIRED - SELECT CLASSIC COMPUTE
Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:
1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

2. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

   - Click **More** in the drop-down.

   - In the **Attach to an existing compute resource** window, use the first drop-down to select your unique cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

2. Find the triangle icon to the right of your compute cluster name and click it.

3. Wait a few minutes for the cluster to start.

4. Once the cluster is running, complete the steps above to select your cluster.

## Requirements

Please review the following requirements before starting the lesson:

* To run this notebook, you need to use one of the following Databricks runtime(s): **17.3.x-cpu-ml-scala2.13**


## Classroom Setup

Before starting the demo, run the provided classroom setup script. This script will define configuration variables necessary for the demo. Execute the following cell:

In [0]:
%pip install databricks-sdk --upgrade


dbutils.library.restartPython()

In [0]:
%run ../Includes/Classroom-Setup-4.2

**Other Conventions:**

Throughout this demo, we'll refer to the object `DA`. This object, provided by Databricks Academy, contains variables such as your username, catalog name, schema name, working directory, and dataset locations. Run the code block below to view these details:

In [0]:
print(f"Username:          {DA.username}")
print(f"Catalog Name:      {DA.catalog_name}")
print(f"Schema Name:       {DA.schema_name}")
print(f"Working Directory: {DA.paths.working_dir}")
print(f"User DB Location:  {DA.paths.datasets}")

## Data Preparation

For this demonstration, we will use a fictional dataset from a telecom company. It contains customer information: **customer demographics** and internet subscription details such as subscription plans, monthly charges, and payment methods.

After loading the dataset, we will perform simple **data cleaning and feature selection**. 

In the final step, we will split the dataset into **features** and **response** sets.

In [0]:
from pyspark.sql import functions as F  # F.expr is used when reading the dataset
from pyspark.sql.functions import col
import mlflow


# Point to UC model registry
client = mlflow.MlflowClient()
mlflow.set_registry_uri("databricks-uc")

# Load dataset with spark
shared_volume_name = 'telco' # From Marketplace
csv_name = 'telco-customer-churn-missing' # CSV file name
dataset_p_telco = f"{DA.paths.datasets.telco}/{shared_volume_name}/{csv_name}.csv" # Full path

# Dataset specs
primary_key = "customerID"
response = "Churn"
features = ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"] # Keeping numerical only for simplicity and demo purposes


# Read dataset and cast the numeric fields to double for Spark compatibility
# try_cast turns blank or non-numeric TotalCharges values into nulls, which na.drop then removes
telco_df = spark.read.csv(dataset_p_telco, inferSchema=True, header=True, multiLine=True, escape='"')\
            .withColumn("TotalCharges", F.expr("try_cast(trim(TotalCharges) as double)"))\
            .withColumn("SeniorCitizen", col("SeniorCitizen").cast('double'))\
            .withColumn("tenure", col("tenure").cast('double'))\
            .na.drop(how='any')

# Separate features and ground-truth
features_df = telco_df.select(primary_key, *features)
response_df = telco_df.select(primary_key, response)

# Convert data to pandas dataframes
X_train_pdf = features_df.drop(primary_key).toPandas()
Y_train_pdf = response_df.drop(primary_key).toPandas()
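
The cleaning step above relies on `try_cast(trim(...) as double)` returning a null for values that cannot be parsed, so that `na.drop` can remove those rows. The following is a rough plain-Python analogue of that behavior, not Spark's implementation. The helper name `try_cast_double` and the sample values are hypothetical and are not taken from the dataset.

```python
from typing import Optional


def try_cast_double(raw: Optional[str]) -> Optional[float]:
    """Approximate try_cast(trim(x) as double) for a single value."""
    if raw is None:
        return None
    text = raw.strip()
    try:
        return float(text)
    except ValueError:
        # Blank or non-numeric text becomes a null instead of raising an error
        return None


# Hypothetical raw values for illustration only
raw_total_charges = ["29.85", " 1889.5 ", " ", "", None]
cleaned = [try_cast_double(value) for value in raw_total_charges]

# Mirrors the effect of .na.drop(how='any') on a single column
kept = [value for value in cleaned if value is not None]

print(cleaned)
print(kept)
```

Python's `float` accepts a few spellings that Spark does not (for example digit separators such as `1_000`), so treat this only as an illustration of the null-on-failure idea.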


## Fit and Register a Custom Model

Before we start the model deployment process, we will **fit and register a custom model**.

Deploying a custom pipeline for a model typically involves the following steps:

1. Declare **wrapper classes** for custom models

2. Train base model

3. Instantiate custom model using trained base model & log to registry

### Define Wrapper Class

We will use MLflow's `PythonModel` class to create a custom pipeline. The class's `predict` method implements the custom logic.

In [0]:
import pandas as pd


# Model wrapper class to output labels and associated probabilities
class CustomProbaModel(mlflow.pyfunc.PythonModel):
    # Initialize model in the constructor
    def __init__(self, model):
        self.model = model
 
    # Prediction function
    def predict(self, context, model_input):
        # Predict the probabilities and class
        prediction_probabilities = self.model.predict_proba(model_input)
        predictions = self.model.predict(model_input)
 
        # Organize multiple outputs
        class_labels = ["No", "Yes"]
        result = pd.DataFrame(prediction_probabilities, columns=[f'prob_{label}' for label in class_labels])
        result['prediction'] = predictions
        
        return result
    
# Dummy model with no trained estimator: returns the row indices of the input as a list
class CustomCodeModel(mlflow.pyfunc.PythonModel):
    def __init__(self):
        pass
    def predict(self, context, data):
        return [ j for j in range(0, data.shape[0]) ]
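
The post-processing idea in `CustomProbaModel.predict` can be sketched without MLflow, pandas, or scikit-learn. In the sketch below, `StubClassifier` and `post_process` are hypothetical names, and the probabilities come from a toy rule rather than a trained model. One deliberate difference from the wrapper above: the column names are read from the model's `classes_` attribute instead of a hard-coded list, because scikit-learn orders the columns of `predict_proba` according to `classes_`.

```python
class StubClassifier:
    """Hypothetical stand-in for a fitted scikit-learn classifier."""

    classes_ = ["No", "Yes"]

    def predict_proba(self, rows):
        # Toy rule on the first feature; not a trained model
        return [[0.8, 0.2] if row[0] < 50 else [0.3, 0.7] for row in rows]

    def predict(self, rows):
        # Pick the class with the highest probability for each row
        return [self.classes_[p.index(max(p))] for p in self.predict_proba(rows)]


def post_process(model, rows):
    """Return one record per row with per-class probabilities and the label."""
    probabilities = model.predict_proba(rows)
    predictions = model.predict(rows)
    records = []
    for probs, label in zip(probabilities, predictions):
        record = {f"prob_{cls}": p for cls, p in zip(model.classes_, probs)}
        record["prediction"] = label
        records.append(record)
    return records


records = post_process(StubClassifier(), [[20.0], [90.0]])
print(records)
```

Each record carries the same three fields that the wrapper's DataFrame has as columns: `prob_No`, `prob_Yes`, and `prediction`.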

### Train Base Model

In this step, **we will fit the model as usual**.

The next, and most important, step is **wrapping the model with the custom class that we created**. Then, the **wrapped model is logged with MLflow**.

In [0]:
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier


X_train, X_test, y_train, y_test = train_test_split(X_train_pdf, Y_train_pdf, test_size=0.2, random_state=42)
 
# Initialize and train DecisionTreeClassifier
rf = DecisionTreeClassifier(max_depth=3, random_state=42)
rf.fit(X_train, y_train)

### Wrap and Log the Custom Model
Wrap the model and define the input and output schemas. Then start an MLflow run and log the model using the `pyfunc` flavor.

In [0]:
from mlflow.models import infer_signature

# Wrap the trained model in the CustomProbaModel wrapper
wrapped_model = CustomProbaModel(rf)

# Define the input and output schemas from a one-row example
input_example = X_train[:1]
output_example = wrapped_model.predict(None, input_example)  # the wrapper does not use the context argument
signature = infer_signature(input_example, output_example)
 
# Start an MLflow run and log the model
custom_model_name = f"{DA.catalog_name}.{DA.schema_name}.custom_ml_model"
with mlflow.start_run(run_name="Custom Model Example"):
    mlflow.pyfunc.log_model("model", 
                            python_model=wrapped_model, 
                            input_example=input_example, 
                            signature=signature,
                            registered_model_name=custom_model_name)

### Test Wrapped Model

Before serving the model, let's test it and review the result to make sure the post-processing is implemented successfully.

In [0]:
# Load the model from the run
run_id = mlflow.last_active_run().info.run_id
loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
 
# Use the loaded model to predict on the test data
y_test_ = loaded_model.predict(X_test)
display(y_test_)

In [0]:
# Test custom code model
custom_code_model = CustomCodeModel()
y_cc_test = custom_code_model.predict([], X_train[:1])
print(y_cc_test)


## Serve the Custom Model

Let's serve the model with Model Serving. Here, we will use the API to create the endpoint and serve the model.

Please note that you could also use the UI for this task.

In [0]:
from databricks.sdk.service.serving import EndpointCoreConfigInput
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointTag


# Create/Update endpoint and deploy model+version
w = WorkspaceClient()
endpoint_name = f"ML_AS_03_Demo4_Custom_{DA.unique_name('_')}"
model_version = "1"
endpoint_config_dict = {
    "served_models": [
        {
            "model_name": custom_model_name,
            "model_version": model_version,
            "scale_to_zero_enabled": True,
            "workload_size": "Small"
        }
    ]
}
endpoint_config = EndpointCoreConfigInput.from_dict(endpoint_config_dict)

try:
  # create_and_wait blocks until the endpoint is ready
  w.serving_endpoints.create_and_wait(
    name=endpoint_name,
    config=endpoint_config,
    tags=[EndpointTag.from_dict({"key": "db_academy", "value": "serve_custom_model_example"})]
  )
  print(f"Created endpoint {endpoint_name} serving model {custom_model_name} version {model_version}")

except Exception as e:
  # Ignore the error only when the endpoint was already created by a previous run
  if "already exists" in str(e):
    print(f"Endpoint with name {endpoint_name} already exists")

  else:
    raise
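
The configuration dictionary in the cell above can also be produced by a small helper, which avoids hard-coding the model version. This is a minimal sketch: `build_endpoint_config` and `latest_version` are hypothetical helpers, and the model name and version list are placeholder values. In the notebook, the version list might come from the `version` attribute of the objects returned by `client.search_model_versions`, but that wiring is an assumption and is not shown here.

```python
def latest_version(versions):
    """Return the highest version from an iterable of version strings or ints."""
    # Compare numerically so that "10" ranks above "2"
    return str(max(int(v) for v in versions))


def build_endpoint_config(model_name, model_version,
                          workload_size="Small", scale_to_zero=True):
    """Build the dictionary passed to EndpointCoreConfigInput.from_dict."""
    return {
        "served_models": [
            {
                "model_name": model_name,
                "model_version": str(model_version),
                "scale_to_zero_enabled": scale_to_zero,
                "workload_size": workload_size,
            }
        ]
    }


# Placeholder values for illustration only
config = build_endpoint_config(
    "my_catalog.my_schema.custom_ml_model",
    latest_version(["1", "2", "10"]),
)
print(config)
```

The resulting dictionary has the same keys as `endpoint_config_dict`, so it could be passed to `EndpointCoreConfigInput.from_dict` in the same way.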

## Query the Endpoint

Now that the endpoint is ready, we can query it using the test sample shown below. Note that each prediction contains the class probabilities and the predicted label, which is returned as a string (Yes/No), as implemented in the wrapper class.

In [0]:
# Hard-coded test sample
dataframe_records = [
    {"SeniorCitizen": 0, "tenure":12, "MonthlyCharges":65, "TotalCharges":800},
    {"SeniorCitizen": 1, "tenure":24, "MonthlyCharges":40, "TotalCharges":500}
]

print("Inference results:")
query_response = w.serving_endpoints.query(name=endpoint_name, dataframe_records=dataframe_records)
print(query_response.predictions)
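
If you query the endpoint over REST instead of through the SDK, the same rows travel as a JSON request body. The sketch below builds two layouts that Model Serving accepts for tabular input, `dataframe_records` and `dataframe_split`. The helper `to_dataframe_split` is a hypothetical name, and sending the request (endpoint URL and authentication) is deliberately left out.

```python
import json


def to_dataframe_split(records):
    """Convert a list of row dictionaries into the columns/data 'split' layout."""
    columns = list(records[0].keys())
    data = [[row[column] for column in columns] for row in records]
    return {"columns": columns, "data": data}


dataframe_records = [
    {"SeniorCitizen": 0, "tenure": 12, "MonthlyCharges": 65, "TotalCharges": 800},
    {"SeniorCitizen": 1, "tenure": 24, "MonthlyCharges": 40, "TotalCharges": 500},
]

# Row-oriented body: one JSON object per input row
records_payload = json.dumps({"dataframe_records": dataframe_records})

# Column-oriented body: column names once, then one list of values per row
split_payload = json.dumps({"dataframe_split": to_dataframe_split(dataframe_records)})

print(records_payload)
print(split_payload)
```

The split layout lists the column names only once, which keeps the body smaller when many rows are sent.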

## Conclusion

In this demo, we showed how to build a custom model pipeline using MLflow's `PythonModel` class and serve it with Databricks Model Serving. First, we defined the wrapper class with custom post-processing logic. Next, we fitted the model as usual and wrapped it with the custom class. Finally, we deployed the model with Model Serving and queried the serving endpoint using the API.

&copy; 2026 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a>.<br/><br/><a href="https://databricks.com/privacy-policy" target="_blank">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use" target="_blank">Terms of Use</a> | <a href="https://help.databricks.com/" target="_blank">Support</a>