In [1]:
import os
import time

import mlflow
import pandas as pd
import requests
from databricks import feature_engineering
from pyspark.dbutils import DBUtils
from databricks.connect import DatabricksSession

from airbnb_listing.config import config
from airbnb_listing.serving.feature_serving import FeatureServing
from airbnb_listing.env import DB_HOST,DB_TOKEN

In [2]:
# Get the Databricks Connect Spark session (reusing an existing one if present);
# later cells use it to read tables, build Spark DataFrames and run SQL.
session_builder = DatabricksSession.builder
spark = session_builder.getOrCreate()

In [3]:
# Point MLflow at the Databricks workspace for tracking and at the
# Unity Catalog model registry ("databricks-uc") for registered models.
tracking_uri = "databricks"
registry_uri = "databricks-uc"
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_registry_uri(registry_uri)

In [4]:
# Feature Engineering client; used further down to create the predictions feature table.
fe = feature_engineering.FeatureEngineeringClient()

In [5]:
# Publish the Databricks host and token (imported from airbnb_listing.env) as
# process environment variables.
# NOTE(review): presumably consumed by FeatureServing / REST calls later on — confirm.
os.environ.update({"DB_HOST": DB_HOST, "DB_TOKEN": DB_TOKEN})

In [6]:
# Resolve the catalog and schema names from the project config
catalog_name = config.general.DEV_CATALOG
model_asset_schema_name = config.general.ML_ASSET_SCHEMA
silver_schema_name = config.general.SILVER_SCHEMA

# Fully qualified names of the feature table and feature spec; both live in
# the ML asset schema of the dev catalog.
ml_asset_namespace = f"{catalog_name}.{model_asset_schema_name}"
feature_table_name = f"{ml_asset_namespace}.airbnb_listing_preds"
feature_spec_name = f"{ml_asset_namespace}.return_predictions"

# Name of the feature serving endpoint.
# NOTE(review): the double hyphen in the name looks unusual — confirm it is intentional.
endpoint_name = "airbnb-listing--feature-serving"

In [None]:
# Read the train and test splits from the silver schema into pandas and
# stack them into a single dataframe.
train_set = spark.table(f"{catalog_name}.{silver_schema_name}.airbnb_listing_price_train").toPandas()
test_set = spark.table(f"{catalog_name}.{silver_schema_name}.airbnb_listing_price_test").toPandas()
# ignore_index=True gives the combined frame a fresh 0..n-1 index. Without it
# each split keeps its own row labels, so labels repeat in full_df and any
# label-based lookup or index-aligned assignment becomes ambiguous.
full_df = pd.concat([train_set, test_set], ignore_index=True)
full_df.head()

In [None]:
# Base name of the registered model, taken from the project config
model_name = config.model.MODEL_NAME

In [None]:
# Fetch the "basic" model variant through its `latest-model` alias in the registry
model_uri = f"models:/{catalog_name}.{model_asset_schema_name}.{model_name}_basic@latest-model"
model = mlflow.sklearn.load_model(model_uri)

In [None]:
# Build the inference (predictions) table: listing id and coordinates plus
# the price predicted by the trained model for every row of full_df.
id_and_location_columns = [config.model.ID_COLUMN, "latitude", "longitude"]
model_input_columns = config.model.SELECTED_NUMERIC_FEATURES + config.model.SELECTED_CATEGORICAL_FEATURES
preds_pdf = full_df[id_and_location_columns].assign(
    predicted_listing_price=model.predict(full_df[model_input_columns])
)
# Convert the pandas result to a Spark dataframe so it can be stored as a feature table
preds_df = spark.createDataFrame(preds_pdf)

In [None]:
# Store preds_df (the inference table in Spark) as a feature table keyed by the listing id.
# Creating the table only works the first time, so to keep this cell re-runnable
# (Restart & Run All) an already existing table is refreshed by merging the new
# predictions into it on the primary key instead of being created again.
if spark.catalog.tableExists(feature_table_name):
    fe.write_table(name=feature_table_name, df=preds_df, mode="merge")
else:
    fe.create_table(
        name=feature_table_name,
        primary_keys=[config.model.ID_COLUMN],
        df=preds_df,
        description="Airbnb listing prices predictions feature table",
    )

In [None]:
# To serve the offline predictions table, a low-latency copy of it (the online
# table) has to be created. Enabling Change Data Feed on the offline feature
# table avoids copying the full offline table to the online table at each trigger.
enable_cdf_statement = f"""
          ALTER TABLE {feature_table_name}
          SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
        """
spark.sql(enable_cdf_statement)


In [None]:
# Set up the FeatureServing helper with the feature table, feature spec and
# endpoint names defined above; it is used next to create the online table.
feature_serving = FeatureServing(
    feature_table_name=feature_table_name,
    feature_spec_name=feature_spec_name,
    endpoint_name=endpoint_name,
)

In [None]:
# Create the online table via the FeatureServing helper.
# NOTE(review): presumably this is the online copy of `feature_table_name` — confirm in FeatureServing.
feature_serving.create_online_table()