In [0]:
# Reference - Databricks Feature Store overview: https://www.databricks.com/product/feature-store

In [0]:
from databricks import feature_store
from databricks.feature_store import feature_table, FeatureLookup

In [0]:
from pyspark.sql.functions import *

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error,r2_score

In [0]:
%fs ls dbfs:/databricks-datasets/wine-quality/

In [0]:
# The red-wine CSV is read with an explicit ";" separator, a header row, and schema inference.
df=spark.read.csv("dbfs:/databricks-datasets/wine-quality/winequality-red.csv",sep=";",header=True,inferSchema=True)

In [0]:
# Preview the data exactly as loaded from the CSV.
display(df)

In [0]:
def addIdColumn(dataframe, id_column_name):
    """Return ``dataframe`` with a generated id column placed first.

    Args:
        dataframe: Spark DataFrame to tag with an id.
        id_column_name: Name of the id column to create (or regenerate).

    Returns:
        A new DataFrame whose first column is ``id_column_name`` followed by
        the remaining original columns in their original order.

    Notes:
        Ids come from ``monotonically_increasing_id()``; they are unique but
        not consecutive.
    """
    # Leave a pre-existing id column out of the trailing columns so that calling
    # this on an already-tagged frame does not select the id column twice.
    other_columns = [name for name in dataframe.columns if name != id_column_name]
    with_id = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return with_id.select(id_column_name, *other_columns)
 
def renameColumns(df):
    """Return a copy of ``df`` whose column names use underscores instead of spaces.

    Every column is passed through ``withColumnRenamed``; names without spaces
    are renamed to themselves.
    """
    result = df
    for original_name in df.columns:
        safe_name = original_name.replace(' ', '_')
        result = result.withColumnRenamed(original_name, safe_name)
    return result

In [0]:
# Replace spaces in the column names with underscores, then add a generated `wine_id` key column.
# NOTE(review): this rebinds `df`, so re-running this cell operates on the already-processed frame.
renamed_df=renameColumns(df)
df=addIdColumn(renamed_df,'wine_id')

In [0]:
# Preview the frame after renaming columns and adding the id column.
display(df)

In [0]:
# Feature table contents: every column except the `quality` label.
features_df=df.drop("quality")
display(features_df)

In [0]:
# Client object used for every Feature Store call in this notebook.
fs=feature_store.FeatureStoreClient()

In [0]:
# Create the feature table keyed by `wine_id`, passing features_df as both data and schema.
# NOTE(review): presumably this fails if the table already exists, so a full re-run
# of the notebook may need the table dropped first - confirm.
fs.create_table(
    name="ml.naval.wine_quality",
    primary_keys=["wine_id"],
    df=features_df,
    schema=features_df.schema,
    description="Wine Feature"
)

In [0]:
# Create the inference data: lookup key, label, and a simulated real-time measurement (10 * rand()).
# rand() is seeded because this lazy frame is evaluated several times (display,
# training-set creation, batch scoring); unseeded, each evaluation would draw
# different values. Repeatability still assumes the same partitioning.
inference_df=df.select("wine_id","quality",(10*rand(seed=42)).alias("real_time_measurement"))

In [0]:
# Preview the key / label / real-time-measurement frame.
display(inference_df)

In [0]:
def load_data(table_name,lookup_key):
    """Build a training set from the feature table and split it into train/test.

    Reads the notebook-level ``inference_df`` and ``fs`` client: features from
    ``table_name`` are looked up by ``lookup_key``, ``quality`` is the label,
    and the ``wine_id`` column is excluded from the result.

    Args:
        table_name: Name of the feature table to look features up in.
        lookup_key: Column(s) of ``inference_df`` used as the lookup key.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test, training_set)`` where the
        first four are pandas objects from an 80/20 split (``random_state=42``)
        and ``training_set`` is the object returned by ``fs.create_training_set``.
    """
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]

    # exclude_columns is documented as a list of column names; a bare string
    # would at best work by accident.
    training_set = fs.create_training_set(
        inference_df,
        feature_lookups=model_feature_lookups,
        label="quality",
        exclude_columns=["wine_id"],
    )
    training_pd = training_set.load_df().toPandas()

    # Create train and test datasets.
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set

In [0]:
# Feature table to read training features from (same name used when the table was created).
table_name="ml.naval.wine_quality"

In [0]:
# Build the training set keyed on `wine_id` and split it into train/test parts.
X_train,X_test,y_train,y_test,training_set=load_data(table_name,"wine_id")

In [0]:
# Inspect the first rows of the training features.
X_train.head()

In [0]:
import mlflow

In [0]:
# Enable sklearn autologging with log_models=False; the model itself is logged
# explicitly through fs.log_model inside train_model.
mlflow.sklearn.autolog(log_models=False)


In [0]:
def train_model(X_train, X_test, y_train, y_test, training_set, fs):
    """Fit a random-forest regressor, log its test metrics, and log the model.

    All work happens inside one MLflow run. The regressor is fitted on the
    training split, evaluated on the test split (``test_mse`` and
    ``test_r2_score``), and then logged via ``fs.log_model`` together with
    ``training_set`` under the registered model name ``wine_model``.

    Returns:
        None.
    """
    with mlflow.start_run():
        regressor = RandomForestRegressor(n_estimators=20, max_depth=3, random_state=42)
        regressor.fit(X_train, y_train)
        predictions = regressor.predict(X_test)

        # Log each test metric under its MLflow metric name, in a fixed order.
        for metric_name, metric_fn in (
            ("test_mse", mean_squared_error),
            ("test_r2_score", r2_score),
        ):
            mlflow.log_metric(metric_name, metric_fn(y_test, predictions))

        # Log through the Feature Store client so training_set travels with the model.
        fs.log_model(
            model=regressor,
            artifact_path="wine_quality_prediction",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name="wine_model",
        )
 
# Train, evaluate, and register the model using the splits produced by load_data.
train_model(X_train, X_test, y_train, y_test,training_set, fs)

In [0]:
# Helper function
def get_latest_model_version(model_name):
    """Return the highest version number registered for ``model_name``.

    Args:
        model_name: Registered model name, used in the ``name='...'`` search filter.

    Returns:
        int: The largest version found, or 1 when the search returns no versions.
    """
    # Imported locally so the helper does not depend on a later notebook cell
    # having imported MlflowClient before this function is called.
    from mlflow.tracking.client import MlflowClient

    latest_version = 1
    mlflow_client = MlflowClient()
    # Explicit comparison instead of max(): the notebook's
    # `from pyspark.sql.functions import *` can shadow the builtin.
    for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
        version_int = int(mv.version)
        if version_int > latest_version:
            latest_version = version_int
    return latest_version

In [0]:
# Registered model to score with.
# NOTE(review): train_model registers under "wine_model"; confirm this three-level
# name resolves to that same model in the workspace's registry.
model_name="dev.default.wine_model"
# NOTE(review): placeholder only - reassigned from get_latest_model_version() in the
# scoring cell before it is used.
latest_model_version=7

In [0]:
from mlflow.tracking.client import MlflowClient
# NOTE(review): `client` is not referenced by any other cell visible in this notebook.
client = MlflowClient()

In [0]:
## For simplicity, this example reuses inference_df as the input data for prediction
batch_input_df = inference_df.drop("quality") # Drop the label column

# Use the highest registered version of the model.
latest_model_version = get_latest_model_version(model_name)

# Only the key and the real-time column are passed in; presumably score_batch
# looks up the stored features itself - TODO confirm.
predictions_df = fs.score_batch(model_uri=f"models:/{model_name}/{latest_model_version}", df=batch_input_df)

# Show just the key and the predicted value.
display(predictions_df["wine_id", "prediction"])

In [0]:
## Modify the dataframe containing the features
so2_cols = ["free_sulfur_dioxide", "total_sulfur_dioxide"]
# expr() receives "free_sulfur_dioxide+total_sulfur_dioxide"; dividing by 2 gives the
# mean only because so2_cols has exactly two entries.
new_features_df = (features_df.withColumn("average_so2", expr("+".join(so2_cols)) / 2))

display(new_features_df)

In [0]:
# Write mode for fs.write_table. The documented alternatives are "merge" and "overwrite".
# The previous `"merge" or "overwrite"` expression always evaluated to "merge".
mode = "merge"

In [0]:
# Write the frame with the extra `average_so2` column back to the feature table.
# Reuses the notebook-level `table_name` and `mode` rather than repeating the
# literals, so the target table and write mode are each defined in one place.
fs.write_table(
    name=table_name,
    df=new_features_df,
    mode=mode,
)