This notebook provides a very quick example of a pyfunc serving endpoint that queries an online table with passthrough authentication.

In [0]:
%pip install databricks-feature-engineering
%restart_python

In [0]:
# Unity Catalog names shared by the cells below.
# Source table that holds the hourly turbine features.
feature_table = "shm.iot_turbine.turbine_hourly_features"
# Column used as the online table's primary key.
# NOTE(review): the daily-metrics SQL below also groups by `turbine_id`; if several
# turbines share an `hourly_timestamp`, this single-column key is not unique — confirm.
pk_col = "hourly_timestamp"
# Name of the online table created from `feature_table`.
online_table_name = "shm.iot_turbine.turbine_hourly_online"
# Name of the feature spec that the serving endpoint is configured to serve.
feature_spec_name = "shm.iot_turbine.turbine_hourly_spec"

Create an online table

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.service.catalog import OnlineTableSpec, OnlineTable

w = WorkspaceClient()

# Describe the online table: keyed on `pk_col`, populated by a full copy of `feature_table`.
spec = OnlineTableSpec(
    primary_key_columns=[pk_col],
    source_table_full_name=feature_table,
    perform_full_copy=True
)

online_table = OnlineTable(name=online_table_name, spec=spec)

# `online_tables.get` raises `NotFound` for a missing table instead of returning a
# falsy value, so existence has to be probed with try/except. Without this the cell
# fails on the very first run and never reaches the create call.
try:
    w.online_tables.get(online_table_name)
    online_table_exists = True
except NotFound:
    online_table_exists = False

# Only create (and block until provisioned) when the table is not there yet, so the
# cell stays safe to re-run.
if not online_table_exists:
    w.online_tables.create_and_wait(table=online_table)

Preview the source feature table

In [0]:
# Load the source feature table through Spark and render it with the notebook's `display`.
display(spark.table(feature_table))

Create a SQL table function that returns daily metrics per turbine

In [0]:
%sql
-- Parameterless table function that rolls the hourly feature table up to one row
-- per turbine per calendar day.
--   daily_avg_energy  : mean of the hourly `avg_energy` values for that day
--   abnormal_readings : number of hourly rows whose `abnormal_sensor` is not 'ok'
--                       (rows with a NULL `abnormal_sensor` fall into the ELSE branch
--                       and count as 0)
CREATE OR REPLACE FUNCTION shm.iot_turbine.calculate_daily_metrics()
RETURNS TABLE (
  turbine_id STRING,
  date DATE,
  daily_avg_energy DOUBLE,
  abnormal_readings INT
)
RETURN
  SELECT 
    turbine_id, 
    -- Truncate the hourly timestamp to its calendar date for the daily grouping.
    DATE(hourly_timestamp) as date, 
    AVG(avg_energy) as daily_avg_energy, 
    SUM(CASE WHEN abnormal_sensor != 'ok' THEN 1 ELSE 0 END) as abnormal_readings
  FROM 
    shm.iot_turbine.turbine_hourly_features
  GROUP BY 
    turbine_id, 
    DATE(hourly_timestamp);

In [0]:
from databricks.feature_engineering import (
  FeatureFunction,
  FeatureLookup,
  FeatureEngineeringClient,
)

fe = FeatureEngineeringClient()

# Features exposed by the spec: a single on-demand feature backed by the
# `calculate_daily_metrics` SQL function defined above.
# NOTE(review): `calculate_daily_metrics` is a parameterless *table* function and no
# `input_bindings` are supplied; confirm that `FeatureFunction` accepts this, since
# feature functions are normally scalar UDFs bound to request/lookup columns.
features = [
  FeatureFunction(
    udf_name="shm.iot_turbine.calculate_daily_metrics",
    output_name="daily_metrics",
  ),
]

# Create a `FeatureSpec` with the features defined above.
# The `FeatureSpec` can be accessed in Unity Catalog as a function.
# It is registered under `feature_spec_name` because that is the entity the serving
# endpoint below is configured to serve; the previous hard-coded name
# ("shm.iot_turbine.daily_metrics") left the endpoint pointing at a spec that was
# never created.
fe.create_feature_spec(
  name=feature_spec_name,
  features=features,
)

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

workspace = WorkspaceClient()

# Create endpoint
endpoint_name = "shm-fse-test"

# Probe for an existing endpoint first: creating an endpoint whose name is already
# taken fails, which would break a "Restart & Run All" of this notebook.
try:
    workspace.serving_endpoints.get(endpoint_name)
    endpoint_exists = True
except NotFound:
    endpoint_exists = False

# Serve the feature spec on a small, scale-to-zero endpoint and block until it is ready.
if not endpoint_exists:
    workspace.serving_endpoints.create_and_wait(
      name=endpoint_name,
      config=EndpointCoreConfigInput(
        served_entities=[
          ServedEntityInput(
            entity_name=feature_spec_name,
            scale_to_zero_enabled=True,
            workload_size="Small"
          )
        ]
      )
    )

Define a small pyfunc model

In [0]:
import os
import time

import pandas as pd
from mlflow.pyfunc import PythonModel
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import OnlineTableSpec
from databricks.sdk.service.sql import StatementState


class DailyAverageModel(PythonModel):
    """Pyfunc model that returns the per-day mean of one column of an online table.

    The table is read through the Databricks SQL Statement Execution API, so a SQL
    warehouse is required. It is taken from ``warehouse_id`` or, when that is not
    given, from the ``DATABRICKS_WAREHOUSE_ID`` environment variable.

    Args:
        online_table_name: Fully qualified name of the table to query.
        value_column: Numeric column to average.
        date_column: Timestamp/date column used to bucket rows by calendar day.
        warehouse_id: Optional SQL warehouse id used to run the query.
    """

    def __init__(self, online_table_name, value_column, date_column, warehouse_id=None):
        self.online_table_name = online_table_name
        self.value_column = value_column
        self.date_column = date_column
        self.warehouse_id = warehouse_id
        # The client is created lazily (see `workspace_client`) so that no live
        # connection or credential is captured when the model instance is pickled.
        self._workspace_client = None

    @property
    def workspace_client(self):
        """Return the `WorkspaceClient`, creating it on first access.

        NOTE(review): for on-behalf-of-user ("passthrough") authentication in model
        serving, the client presumably has to be built with a user-credentials
        strategy at request time — confirm against the serving documentation.
        """
        client = getattr(self, "_workspace_client", None)
        if client is None:
            client = WorkspaceClient()
            self._workspace_client = client
        return client

    def __getstate__(self):
        """Drop the live client from the pickled state; it is rebuilt after loading."""
        state = self.__dict__.copy()
        state["_workspace_client"] = None
        return state

    def load_context(self, context):
        """No artifacts to load; the workspace client is created on first use."""
        pass

    def _resolve_warehouse_id(self):
        """Return the warehouse id to query with, or raise `ValueError` if none is set."""
        warehouse_id = getattr(self, "warehouse_id", None) or os.environ.get(
            "DATABRICKS_WAREHOUSE_ID"
        )
        if not warehouse_id:
            raise ValueError(
                "A SQL warehouse is required: pass warehouse_id= to DailyAverageModel "
                "or set the DATABRICKS_WAREHOUSE_ID environment variable."
            )
        return warehouse_id

    def _run_query(self, query):
        """Run `query` on the SQL warehouse and return all rows as a DataFrame.

        Raises:
            RuntimeError: If the statement ends in any state other than SUCCEEDED.
        """
        executor = self.workspace_client.statement_execution
        response = executor.execute_statement(
            statement=query,
            warehouse_id=self._resolve_warehouse_id(),
        )

        # The initial call may return before the statement has finished; poll until
        # it reaches a terminal state.
        while response.status.state in (StatementState.PENDING, StatementState.RUNNING):
            time.sleep(1)
            response = executor.get_statement(response.statement_id)

        if response.status.state != StatementState.SUCCEEDED:
            error = response.status.error
            detail = error.message if error is not None else response.status.state
            raise RuntimeError(f"Query against {self.online_table_name} failed: {detail}")

        columns = [column.name for column in response.manifest.schema.columns]

        # Results may be split into chunks; follow the chain until it ends.
        rows = []
        chunk = response.result
        while chunk is not None:
            rows.extend(chunk.data_array or [])
            if chunk.next_chunk_index is None:
                break
            chunk = executor.get_statement_result_chunk_n(
                response.statement_id, chunk.next_chunk_index
            )

        return pd.DataFrame(rows, columns=columns)

    def predict(self, context, model_input):
        """Return one row per calendar day with the mean of `value_column`.

        `context` and `model_input` are ignored: the result depends only on the
        current contents of the table.

        Returns:
            DataFrame with columns ``date`` and ``avg_<value_column>``; empty when
            the query returns no rows.
        """
        # Only the two columns that are actually used are fetched.
        query = f"""
        SELECT {self.date_column}, {self.value_column}
        FROM {self.online_table_name}
        WHERE {self.date_column} IS NOT NULL
        """

        readings = self._run_query(query)
        avg_column = f'avg_{self.value_column}'

        # Nothing to aggregate: return an empty frame with the expected columns.
        if readings.empty:
            return pd.DataFrame({'date': [], avg_column: []})

        # The inline result format delivers values as strings, so both columns are
        # converted before aggregating.
        readings[self.date_column] = pd.to_datetime(readings[self.date_column])
        readings[self.value_column] = pd.to_numeric(readings[self.value_column])

        # Group by day and calculate average
        daily_avg = readings.groupby(readings[self.date_column].dt.date)[self.value_column].mean()

        return pd.DataFrame({
            'date': daily_avg.index,
            avg_column: daily_avg.values
        })

# Example usage:
# model = DailyAverageModel(
#     online_table_name="main.default.my_online_table",
#     value_column="sales",
#     date_column="transaction_date"
# )
# result = model.predict(None, None)


Instantiate the model and smoke-test it locally (registration to Unity Catalog is currently commented out below)

In [0]:
import mlflow

# Inputs for the smoke test: which table to read, and which columns hold the
# timestamp and the value to average.
date_column = "hourly_timestamp"
value_column = "avg_energy"
online_table_name = "shm.iot_turbine.turbine_hourly_online"

# Build the pyfunc model from the settings above.
model = DailyAverageModel(online_table_name, value_column, date_column)

# Run one prediction; the model ignores both arguments, so None is passed for each.
model.predict(None, None)

# # Log the model
# with mlflow.start_run():
#     mlflow.pyfunc.log_model(
#         artifact_path="daily_avg_model",
#         python_model=model,
#         registered_model_name="daily_average_calculator",
#         input_example=None
#     )

Test the registered model