In [0]:
from databricks import feature_store
from databricks.feature_store import feature_table, FeatureLookup

In [0]:
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id, expr, rand
import uuid
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error,r2_score
import mlflow


In [0]:
# Load the red-wine quality CSV: semicolon-separated, header row, Spark infers column types.
raw_data = (
    spark.read.format("csv")
    .options(sep=";", inferSchema="true", header="true")
    .load("dbfs:/FileStore/tables/4__winequality_red.csv")
)


In [0]:
def addIdcolumn(dataframe, id_column_name):
    """Return ``dataframe`` with a generated id column placed first.

    The id values come from Spark's ``monotonically_increasing_id()``. Per the Spark
    docs they are unique and increasing but not consecutive.

    Args:
        dataframe: Spark DataFrame to extend.
        id_column_name: Name of the id column to add.

    Returns:
        A DataFrame whose first column is ``id_column_name`` followed by the
        original columns in their original order. If ``dataframe`` already had a
        column with that name, it is replaced rather than emitted twice.
    """
    # Exclude an existing column of the same name: withColumn replaces it in place,
    # so keeping it in this list would project the id column a second time.
    other_columns = [name for name in dataframe.columns if name != id_column_name]
    with_id = dataframe.withColumn(id_column_name, monotonically_increasing_id())
    return with_id.select(id_column_name, *other_columns)

def renameColumns(df):
    """Return ``df`` with every space in its column names replaced by "_".

    This makes the names compatible with the Feature Store (e.g. "fixed acidity"
    becomes "fixed_acidity"). Column order and data are unchanged.
    """
    safe_names = [name.replace(" ", "_") for name in df.columns]
    # toDF renames positionally, so one call covers every column.
    return df.toDF(*safe_names)



In [0]:
# Inspect the raw CSV as loaded, before column names are normalized.
display(raw_data)

In [0]:
# Normalize column names (spaces -> underscores), then prepend a "wine_id" key
# column that is used below as the feature table's primary key.
renamed_df = renameColumns(raw_data)
df = addIdcolumn(renamed_df, id_column_name="wine_id")


In [0]:
# Feature-table content: every column except the "quality" label.
features_df = df.drop("quality")

In [0]:
# Inspect the renamed data with the generated wine_id key column.
display(df)

In [0]:
DATABASE_NAME = "wine_db"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")

# Create a unique table name for each run so re-running the notebook does not collide
# with a table from an earlier run. Qualify the name with the database created above;
# an unqualified name would put the feature table in the session's current database.
# (`uuid` is imported in the imports cell at the top of the notebook.)
table_name = f"{DATABASE_NAME}.wine_db_{uuid.uuid4().hex[:6]}"

print(table_name)


In [0]:
# Feature Store client used below to create/read/write the feature table and to log
# and batch-score the model. The class is `FeatureStoreClient` (capital "S");
# `FeaturestoreClient` does not exist and fails with AttributeError.
fs = feature_store.FeatureStoreClient()

In [0]:
# Create the feature table keyed by wine_id and populate it from features_df
# (all columns except the label).
fs.create_table(
    name=table_name,
    description="wine features",
    primary_keys=["wine_id"],
    schema=features_df.schema,
    df=features_df,
)

In [0]:
# Rows that drive training-set creation and batch scoring. They hold the key, the label,
# and a simulated "real-time" feature that is not stored in the feature table.
# rand() is seeded so the simulated values are the same on every Restart & Run All.
# An unseeded rand() picks a fresh seed each time this cell runs, which would make
# the seeded train/test split and model downstream non-reproducible.
inference_data_df = df.select(
    "wine_id",
    "quality",
    (10 * rand(seed=42)).alias("real_time_measurement"),
)
display(inference_data_df)

In [0]:
def load_data(table_name, lookup_key):
    """Build a Feature Store training set from ``inference_data_df`` and split it.

    Looks up every feature of ``table_name`` for the keys in the notebook-level
    ``inference_data_df`` via the global ``fs`` client. The ``quality`` label is
    kept, the lookup key column(s) are dropped, and the result is split 80/20.

    Args:
        table_name: Feature table to take features from.
        lookup_key: Column name (or list of names) in ``inference_data_df`` that
            matches the feature table's primary key.

    Returns:
        ``(X_train, X_test, y_train, y_test, training_set)``. The X/y values are
        pandas objects; ``training_set`` is the Feature Store training set, which
        ``fs.log_model`` takes as its ``training_set`` argument.
    """
    # Without `feature_names`, FeatureLookup returns all features except the primary keys.
    model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]

    # `exclude_columns` is documented as a list of column names, so pass the key(s)
    # as a list rather than a bare string.
    key_columns = [lookup_key] if isinstance(lookup_key, str) else list(lookup_key)

    # Joins the looked-up features onto inference_data_df by primary key.
    training_set = fs.create_training_set(
        inference_data_df,
        model_feature_lookups,
        label="quality",
        exclude_columns=key_columns,
    )
    training_pd = training_set.load_df().toPandas()

    # Create train and test datasets.
    X = training_pd.drop("quality", axis=1)
    y = training_pd["quality"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test, training_set


X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()


In [0]:
from mlflow.exceptions import MlflowException
from mlflow.tracking.client import MlflowClient

client = MlflowClient()

# Start from a clean slate: delete "wine_model" if a previous run registered it.
# "Model does not exist" is the only expected failure on a first run. Any other
# error (permissions, connectivity, ...) is re-raised instead of being silently swallowed.
try:
    client.delete_registered_model("wine_model")
except MlflowException as exc:
    if exc.error_code != "RESOURCE_DOES_NOT_EXIST":
        raise


In [0]:
# Keep MLflow sklearn autologging enabled but turn off its model logging; the model
# is logged through the Feature Store client inside train_model instead.
mlflow.sklearn.autolog(log_models=False)


def train_model(X_train, X_test, y_train, y_test, training_set, fs,
                max_depth=3, n_estimators=20, random_state=42,
                registered_model_name="wine_model"):
    """Fit a random forest and log its metrics and the model in a new MLflow run.

    Logs test MSE and R^2 to MLflow. The model is then logged and registered
    via ``fs.log_model``, using the Feature Store training set.

    Args:
        X_train, X_test, y_train, y_test: Train/test split (e.g. from ``load_data``).
        training_set: Feature Store training set the data was built from; passed
            to ``fs.log_model``.
        fs: Feature Store client used to log the model.
        max_depth, n_estimators, random_state: RandomForestRegressor settings. The
            defaults are the values previously hard-coded here.
        registered_model_name: Model Registry name to register the model under.
    """
    with mlflow.start_run():
        rf = RandomForestRegressor(
            max_depth=max_depth, n_estimators=n_estimators, random_state=random_state
        )
        rf.fit(X_train, y_train)
        y_pred = rf.predict(X_test)

        mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
        mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))

        fs.log_model(
            model=rf,
            artifact_path="wine_quality_prediction",
            flavor=mlflow.sklearn,
            training_set=training_set,
            registered_model_name=registered_model_name,
        )


train_model(X_train, X_test, y_train, y_test, training_set, fs)

In [0]:
# Batch-score every wine with the latest registered version of "wine_model".
# The input keeps wine_id and real_time_measurement but not the label. Per the
# Feature Store docs, score_batch looks up the feature-table columns itself.
batch_input_df = inference_data_df.drop("quality")
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
display(predictions_df.select("wine_id", "prediction"))

In [0]:
# Sulfur-dioxide columns to be averaged into the new "average_so2" feature.
so2_cols = [
    "free_sulfur_dioxide",
    "total_sulfur_dioxide",
]

In [0]:
# New feature: the mean of the SO2 columns. Divide by len(so2_cols) rather than a
# hard-coded 2, so the value stays an average if so2_cols changes.
new_features_df = features_df.withColumn(
    "average_so2", expr("+".join(so2_cols)) / len(so2_cols)
)

In [0]:
# Write the extended features (now including average_so2) into the existing table.
# mode="merge" upserts into the table instead of overwriting it.
fs.write_table(df=new_features_df, name=table_name, mode="merge")

In [0]:
# Confirm the feature table now contains the average_so2 column.
display(fs.read_table(name=table_name))

In [0]:
# Rebuild the training set now that the feature table has the extra average_so2 column.
# FeatureLookup without `feature_names` picks up every feature, so the new column is
# included automatically. load_data is already defined earlier in the notebook;
# redefining an identical copy here would only shadow it, so just call it again.
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()


In [0]:
# Retrain on the extended feature set and log/register the model again under "wine_model".
# train_model is already defined earlier in the notebook; reuse it instead of
# redefining an identical copy.
train_model(X_train, X_test, y_train, y_test, training_set, fs)

In [0]:
# Batch-score again. "latest" now refers to the most recently registered model,
# i.e. the one trained with the average_so2 feature.
batch_input_df = inference_data_df.drop("quality")
predictions_df = fs.score_batch("models:/wine_model/latest", batch_input_df)
display(predictions_df.select("wine_id", "prediction"))
