In [0]:
%pip install -e ..

In [0]:
%pip install git+https://github.com/end-to-end-mlops-databricks-3/marvelous

In [0]:
%restart_python

In [0]:
import sys
from pathlib import Path

# Make the project's local ``src`` directory importable from this notebook.
# The membership check keeps the cell idempotent: re-running it does not keep
# appending the same entry to ``sys.path``.
src_dir = str(Path.cwd().parent / "src")
if src_dir not in sys.path:
    sys.path.append(src_dir)

In [0]:
import os
import time
from typing import Dict, List

import requests
from loguru import logger
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

from hotel_reservations.config import ProjectConfig
from hotel_reservations.serving.fe_model_serving import FeatureLookupServing

In [0]:
# Reuse the active Spark session (or create one) and build a DBUtils handle on
# top of it; both are used below to read workspace settings and tables.
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

In [0]:
# Set the environment variables that call_endpoint() reads later:
# DBR_TOKEN comes from the current notebook context's API token and
# DBR_HOST from the workspace URL stored in the Spark configuration.
os.environ["DBR_TOKEN"] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
os.environ["DBR_HOST"] = spark.conf.get("spark.databricks.workspaceUrl")

In [0]:
# Read the project configuration and pull out the catalog/schema identifiers
# used to build fully qualified table and model names below.
config = ProjectConfig.from_yaml(config_path="../project_config.yml")
catalog_name, schema_name = config.catalog_name, config.schema_name

# Name of the serving endpoint managed and queried by this notebook.
endpoint_name = "hotel-reservations-model-serving-fe"

In [0]:
# Set up the feature-lookup serving manager: it is given the serving endpoint
# name plus the fully qualified model and feature table names.
feature_model_server = FeatureLookupServing(
    endpoint_name=endpoint_name,
    feature_table_name=f"{catalog_name}.{schema_name}.hotel_features",
    model_name=f"{catalog_name}.{schema_name}.hotel_reservations_model_fe",
)

In [0]:
# Create the online table for the hotel features.
# NOTE(review): presumably backed by the `hotel_features` table passed as
# feature_table_name above — confirm in FeatureLookupServing.
feature_model_server.create_online_table()

In [0]:
from databricks.sdk import WorkspaceClient

In [0]:
# True when a serving endpoint with our name is already listed in the workspace.
endpoint_exists = endpoint_name in (
    endpoint.name for endpoint in WorkspaceClient().serving_endpoints.list()
)

In [0]:
# Remove the existing endpoint so it can be recreated below. The call is
# skipped when the endpoint was not found by the existence check, because
# deleting a missing endpoint raises and would abort a "Run All".
if endpoint_exists:
    WorkspaceClient().serving_endpoints.delete(endpoint_name)

In [0]:
# Display the flag computed by the existence check above. It is not refreshed
# here, so it reflects the state from before the delete cell ran.
endpoint_exists

In [0]:
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

In [0]:
# Served-entity definition shared by the create and update_config calls below.
served_entities = [
    ServedEntityInput(
        entity_name=f"{catalog_name}.{schema_name}.hotel_reservations_model_fe",
        scale_to_zero_enabled=True,
        workload_size="Small",
        # The SDK declares entity_version as a string, so pass "1" rather than
        # the integer 1.
        entity_version="1",
    )
]

In [0]:
# Create the serving endpoint from the served-entity definition above.
WorkspaceClient().serving_endpoints.create(
    config=EndpointCoreConfigInput(served_entities=served_entities),
    name=endpoint_name,
)

In [0]:
# Push the same served-entity definition to the endpoint as a config update.
WorkspaceClient().serving_endpoints.update_config(
    served_entities=served_entities,
    name=endpoint_name,
)

In [0]:
# Deploy the model serving endpoint with feature lookup via the serving manager.
# NOTE(review): the cells above already create/update the same endpoint by hand
# through WorkspaceClient — confirm whether both paths are still needed.
feature_model_server.deploy_or_update_serving_endpoint()

In [0]:
# Build sample request bodies from the training set.

# Columns sent to the endpoint with each request.
required_columns = [
    "no_of_adults",
    "no_of_children",
    "no_of_weekend_nights",
    "avg_price_per_room",
    "no_of_special_requests",
    "no_of_previous_cancellations",
    "repeated_guest",
    "lead_time",
    "type_of_meal_plan",
    "room_type_reserved",
    "market_segment_type",
    "Booking_ID",
]

# `spark` is the session created near the top of the notebook; there is no need
# to call getOrCreate() a second time here.
train_set = spark.table(f"{config.catalog_name}.{config.schema_name}.train_set").toPandas()

# Sample with replacement so 1000 records can be drawn regardless of table size.
# The fixed random_state makes the request set reproducible across re-runs.
sampled_records = (
    train_set[required_columns]
    .sample(n=1000, replace=True, random_state=42)
    .to_dict(orient="records")
)

# call_endpoint() sends each element as the "dataframe_records" payload, so every
# record is wrapped in its own single-item list.
dataframe_records = [[record] for record in sampled_records]

logger.info(train_set.dtypes)
logger.info(dataframe_records[0])

In [0]:
# Call the endpoint with one sample record
def call_endpoint(record: List[Dict], timeout: float = 300.0) -> tuple[int, str]:
    """
    Call the model serving endpoint with a given input record.

    The request goes to the ``invocations`` URL of ``endpoint_name`` on the
    workspace host in ``DBR_HOST`` and is authenticated with the bearer token
    in ``DBR_TOKEN``.

    Args:
        record: List of row dictionaries, sent as the ``dataframe_records``
            field of the JSON body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so one stalled request
            could hang the notebook.

    Returns:
        A ``(status_code, response_text)`` tuple taken from the HTTP response.

    Raises:
        KeyError: If ``DBR_HOST`` or ``DBR_TOKEN`` is not set in the environment.
        requests.exceptions.RequestException: On connection errors or when the
            timeout elapses.
    """
    serving_endpoint = f"https://{os.environ['DBR_HOST']}/serving-endpoints/{endpoint_name}/invocations"

    response = requests.post(
        serving_endpoint,
        headers={"Authorization": f"Bearer {os.environ['DBR_TOKEN']}"},
        json={"dataframe_records": record},
        timeout=timeout,
    )
    return response.status_code, response.text


# Smoke test: send the first sampled record and show what the endpoint returned.
status_code, response_text = call_endpoint(dataframe_records[0])
print(
    f"Response Status: {status_code}",
    f"Response Text: {response_text}",
    sep="\n",
)

In [0]:
# Load test: send every sampled request in turn, pausing briefly after each one.
for request_body in dataframe_records:
    status_code, response_text = call_endpoint(request_body)
    print(
        f"Response Status: {status_code}",
        f"Response Text: {response_text}",
        sep="\n",
    )
    time.sleep(0.2)