In [0]:
%pip install databricks-feature-store

In [0]:
import pandas as pd
 
from pyspark.sql.functions import monotonically_increasing_id, expr, rand
import uuid
 
from databricks import feature_store
from databricks.feature_store import feature_table, FeatureLookup
 
import mlflow
import mlflow.sklearn
 
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score



In [0]:
raw_data = spark.read.load("/databricks-datasets/wine-quality/winequality-red.csv",format="csv",sep=";",inferSchema="true",header="true" )
 
def addIdColumn(dataframe, id_column_name):
    """Add id column to dataframe"""
    columns = dataframe.columns
    new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return new_df[[id_column_name] + columns]
 
def renameColumns(df):
    """Rename columns to be compatible with Feature Store"""
    renamed_df = df
    for column in df.columns:
        renamed_df = renamed_df.withColumnRenamed(column, column.replace(' ', '_'))
    return renamed_df
 
# Run functions
renamed_df = renameColumns(raw_data)
df = addIdColumn(renamed_df, 'wine_id')
 
# Drop target column ('quality') as it is not included in the feature table
features_df = df.drop('quality')
display(features_df)

wine_id,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4
5,7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4
6,7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4
7,7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0
8,7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5
9,7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5


In [0]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS wine_db")
 
# Create a unique table name for each run. This prevents errors if you run the notebook multiple times.
table_name = f"wine_db_" + str(uuid.uuid4())[:6]
print(table_name)

wine_db_f4dd69


In [0]:
fs = feature_store.FeatureStoreClient()

In [0]:
fs.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    df=features_df,
    schema=features_df.schema,
    description="wine features"
)

2023/06/07 07:41:46 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.default.wine_db_f4dd69'.
  yield prop, self.__getattribute__(prop)
Out[6]: <FeatureTable: keys=['wine_id'], tags={}>

In [0]:
inference_data_df = df.select("wine_id", "quality", (10 * rand()).alias("real_time_measurement"))
display(inference_data_df)

wine_id,quality,real_time_measurement
0,5,1.4030322740365764
1,5,7.24617823637944
2,5,7.413309515041585
3,6,0.7364247252640899
4,5,2.976502691741947
5,5,0.5493283859065468
6,5,3.4261188025354903
7,7,3.4139312978653944
8,7,1.600369900155294
9,5,7.692793982839701


In [0]:
def load_data(table_name, lookup_key):
    # In the FeatureLookup, if you do not provide the `feature_names` parameter, all features except primary keys are returned
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]
 
    # fs.create_training_set looks up features in model_feature_lookups that match the primary key from inference_data_df
    training_set = fs.create_training_set(inference_data_df, model_feature_lookups, label="quality", exclude_columns="wine_id")
    training_pd = training_set.load_df().toPandas()
 
    # Create train and test datasets
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set
 
# Create the train and test datasets
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()

Unnamed: 0,real_time_measurement,alcohol,chlorides,citric_acid,density,fixed_acidity,free_sulfur_dioxide,pH,residual_sugar,sulphates,total_sulfur_dioxide,volatile_acidity
493,9.94684,11.6,0.086,0.31,1.0002,8.7,23.0,3.48,3.0,0.74,81.0,0.69
354,7.678684,11.9,0.066,0.4,0.9912,6.1,40.5,3.25,1.4,0.59,165.0,0.21
342,8.441447,9.8,0.118,0.47,0.9982,10.9,6.0,3.3,1.8,0.75,14.0,0.39
834,3.5445,9.4,0.088,0.26,0.99694,8.8,16.0,3.32,1.6,0.47,23.0,0.685
705,9.960135,9.9,0.073,0.15,0.999,8.4,11.0,3.37,6.0,0.49,54.0,1.035


In [0]:
from mlflow.tracking.client import MlflowClient
 
client = MlflowClient()
 
try:
    client.delete_registered_model("wine_model") # Delete the model if already created
except:
    None

In [0]:
mlflow.sklearn.autolog(log_models=False)
 
def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    ## fit and log model
    with mlflow.start_run() as run:
 
        rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)
 
        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))
 
        fs.log_model(
            model=rf,
            artifact_path="wine_quality_prediction",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )
 
train_model(X_train, X_test, y_train, y_test, training_set, fs)

Successfully registered model 'wine_model'.
2023/06/07 07:55:17 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: wine_model, version 1
Created version '1' of model 'wine_model'.


In [0]:
display(fs.read_table(name=table_name))

wine_id,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4
5,7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4
6,7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4
7,7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0
8,7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5
9,7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5


Use score_batch to apply a packaged feature store model to new data for inference. The input data only needs the primary key column wine_id and the realtime feature real_time_measurement. The model automatically looks up all of the other feature values from the feature store.

In [0]:
batch_input_df = inference_data_df.drop("quality") # Drop the label column
 
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
                                  
display(predictions_df["wine_id", "prediction"])



wine_id,prediction
0,5.208354320707907
1,5.463040444836221
2,5.463040444836221
3,5.471787694191734
4,5.208354320707907
5,5.217882051592743
6,5.137947590620711
7,4.941020299709653
8,5.23625661929692
9,5.578680452146556


wine_id,real_time_measurement,alcohol,chlorides,citric_acid,density,fixed_acidity,free_sulfur_dioxide,pH,residual_sugar,sulphates,total_sulfur_dioxide,volatile_acidity,prediction
0,1.4030322740365764,9.4,0.076,0.0,0.9978,7.4,11.0,3.51,1.9,0.56,34.0,0.7,5.208354320707907
1,7.24617823637944,9.8,0.098,0.0,0.9968,7.8,25.0,3.2,2.6,0.68,67.0,0.88,5.463040444836221
2,7.413309515041585,9.8,0.092,0.04,0.997,7.8,15.0,3.26,2.3,0.65,54.0,0.76,5.463040444836221
3,0.7364247252640899,9.8,0.075,0.56,0.998,11.2,17.0,3.16,1.9,0.58,60.0,0.28,5.471787694191734
4,2.976502691741947,9.4,0.076,0.0,0.9978,7.4,11.0,3.51,1.9,0.56,34.0,0.7,5.208354320707907
5,0.5493283859065468,9.4,0.075,0.0,0.9978,7.4,13.0,3.51,1.8,0.56,40.0,0.66,5.217882051592743
6,3.4261188025354903,9.4,0.069,0.06,0.9964,7.9,15.0,3.3,1.6,0.46,59.0,0.6,5.137947590620711
7,3.4139312978653944,10.0,0.065,0.0,0.9946,7.3,15.0,3.39,1.2,0.47,21.0,0.65,4.941020299709653
8,1.600369900155294,9.5,0.073,0.02,0.9968,7.8,9.0,3.36,2.0,0.57,18.0,0.58,5.23625661929692
9,7.692793982839701,10.5,0.071,0.36,0.9978,7.5,17.0,3.35,6.1,0.8,102.0,0.5,5.578680452146556
