In [0]:
# Databricks notebook source
# Install dependencies:
# - the project package from the parent directory in editable mode (-e ..)
# - the course helper package "marvelous", pinned to the 0.1.0 tag
%pip install -e ..
%pip install git+https://github.com/end-to-end-mlops-databricks-3/marvelous@0.1.0

In [0]:
# Restart the Python process so that the packages installed with %pip above can be imported.
%restart_python

In [0]:
# System path update; must run after %restart_python (the restart starts a fresh Python process).
# Caution: this is a fragile workaround -- installing the package as a wheel is preferable.
from pathlib import Path
import sys

# Only append once, so that re-running this cell does not add duplicate sys.path entries.
_src_dir = str(Path.cwd().parent / 'src')
if _src_dir not in sys.path:
    sys.path.append(_src_dir)

In [0]:
# A better approach: install the project wheel instead of editing sys.path, e.g.
# %pip install insurance-1.0.1-py3-none-any.whl
# (the wheel file must be present in the notebook folder, e.g. copied there via synchronization)

In [0]:
from pyspark.sql import SparkSession
import mlflow

from insurance.config import ProjectConfig
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from lightgbm import LGBMRegressor
from mlflow.models import infer_signature
from marvelous.common import is_databricks
from dotenv import load_dotenv
import os
from mlflow import MlflowClient
import pandas as pd
from insurance import __version__
from mlflow.utils.environment import _mlflow_conda_env
from databricks import feature_engineering
from databricks.feature_engineering import FeatureFunction, FeatureLookup
from pyspark.errors import AnalysisException
import numpy as np
from datetime import datetime
import boto3

In [0]:
# Project configuration (catalog, schema, features, target, ...) for the "dev" environment.
config = ProjectConfig.from_yaml(
    config_path="../project_config.yml",
    env="dev",
)

In [0]:
# Spark session and Databricks Feature Engineering client used throughout the notebook.
spark = SparkSession.builder.getOrCreate()
fe = feature_engineering.FeatureEngineeringClient()

# Train/test splits read from the configured catalog and schema.
train_set, test_set = (
    spark.table(f"{config.catalog_name}.{config.schema_name}.{split}")
    for split in ("train_set", "test_set")
)

##### Option 1: feature engineering client

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

In [0]:
# Step 1: add a surrogate "Id" key to the training data.
# NOTE(review): Spark documents monotonically_increasing_id() as non-deterministic (values depend on
# partitioning). The feature table and the training set below are both derived from this lazy
# DataFrame, so their Ids only line up if both are computed with the same partitioning; consider
# persisting train_set_with_id.
train_set_with_id = train_set.withColumn("Id", monotonically_increasing_id())

feature_table_name = f"{config.catalog_name}.{config.schema_name}.insurance_features_demo"
feature_table_name_sql = f"{feature_table_name}_sql"

# Features we pretend are NOT in train_set; they are looked up from the feature table instead.
lookup_features = ["age", "bmi", "children"]


In [0]:

# Step 2: create the feature table, keyed by "Id" and holding only the lookup features.
feature_table = fe.create_table(
    name=feature_table_name,
    primary_keys=["Id"],
    df=train_set_with_id.select("Id", *lookup_features),
    description="Insurance features table",
)

In [0]:
# Turn on Delta Change Data Feed for the feature table.
spark.sql(
    f"ALTER TABLE {feature_table_name} "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

In [0]:
# Sanity check: row count of the feature table. This uses the configured catalog/schema rather than a
# hard-coded "mlops_dev.pirvugeo" name, so the cell works in every environment.
spark.sql(f"SELECT count(*) FROM {feature_table_name}").display()


In [0]:
# Upsert the lookup features into the feature table (merge on the "Id" primary key).
fe.write_table(
    name=feature_table_name,
    df=train_set_with_id.select("Id", *lookup_features),
    mode="merge",
)


##### Option 2: SQL - create feature table with information about insurance


In [0]:


# (Re)create the SQL-managed feature table. "Id" is an identity column generated by Delta
# (1, 2, 3, ...), so rows inserted later only need to provide age, bmi and children.
spark.sql(f"""
          CREATE OR REPLACE TABLE {feature_table_name_sql}
          (Id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1), age BIGINT, bmi DOUBLE, children BIGINT);
          """)

In [0]:
# Declare "Id" as the primary key. Primary keys on Databricks are informational and NOT enforced!
try:
    spark.sql(f"ALTER TABLE {feature_table_name_sql} ADD CONSTRAINT insurance_pk PRIMARY KEY(Id);")
except AnalysisException as exc:
    # Best effort: presumably the constraint already exists when the cell is re-run. Report the error
    # instead of silently swallowing it, so that genuine failures (missing table, typo) stay visible.
    print(f"Could not add primary key constraint to {feature_table_name_sql}: {exc}")

In [0]:
# Turn on Delta Change Data Feed for the SQL feature table as well.
spark.sql(
    f"ALTER TABLE {feature_table_name_sql} "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)

In [0]:
# Fill the SQL feature table from train_set. Ids come from the identity column (starting at 1), so they
# do NOT match the monotonically_increasing_id() Ids used for the Option 1 feature table.
spark.sql(f"""
          INSERT INTO {feature_table_name_sql} (age, bmi, children)
          SELECT age, bmi, children
          FROM {config.catalog_name}.{config.schema_name}.train_set
          """)

#### Create feature function

In [0]:
# create feature function
# docs: https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-ddl-create-sql-function

# problems with feature functions:
# - functions are not versioned
# - functions may behave differently depending on the runtime (and on the versions of Python and packages)
# - there is no way to enforce the Python version & package versions for the function
#   (this is only supported from runtime 17)
# - advised to use only for simple calculations

In [0]:
# Fully-qualified Unity Catalog name of the function that maps an age to "minor"/"adult"/"senior".
# NOTE(review): the "average_age" part of the name does not describe what the function computes.
function_name = f"{config.catalog_name}.{config.schema_name}.average_age_demo"

In [0]:
# Lazily define a DataFrame over the SQL feature table; nothing runs until an action is called.
# NOTE(review): `fet` is not used anywhere else in this notebook.
fet = spark.sql( f"""
select * from {feature_table_name_sql} """)


In [0]:

# Option 1: with Python
# Python UDF mapping an age to an age category. A NULL age arrives as None, and `None < 18` raises
# TypeError in Python, so the function fails whenever a lookup returns no age.
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {function_name}(age long)
        RETURNS string
        LANGUAGE PYTHON AS
        $$
        if age < 18:
            return "minor"
        elif age < 65:
            return "adult"
        else:
            return "senior"
        $$
        """)

In [0]:
# execute function
# Call the Python UDF for a sample age of 70 and display the result.
maturity = spark.sql(f"SELECT {function_name}(70) as age_maturity;")
maturity.display()

In [0]:
# Option 2: the same mapping as a pure SQL function (CASE expression), named "<function_name>_sql".
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {function_name}_sql (age long)
        RETURNS string
        RETURN 
            CASE 
                WHEN age < 18 THEN 'minor'
                WHEN age < 65 THEN 'adult'
                ELSE 'senior'
        End
        """)

In [0]:
# Call the SQL function for a sample age of 17 and display the result.
maturity = spark.sql(f"SELECT {function_name}_sql(17) as age_maturity;")
maturity.display()

In [0]:
# Build the training set: drop the lookup features from the input DataFrame, join them back from the
# feature table by "Id", and derive "maturity_age" from the looked-up age with the Python UDF.
training_set = fe.create_training_set(
    df=train_set_with_id.drop(*lookup_features),
    label=config.target,
    feature_lookups=[
        FeatureLookup(
            table_name=feature_table_name,
            feature_names=["age", "bmi", "children"],
            lookup_key="Id",
        ),
        FeatureFunction(
            udf_name=function_name,
            output_name="maturity_age",
            input_bindings={"age": "age"},
        ),
    ],
)

In [0]:
# Train & register a model: materialise the training set and split it into features and target.
# NOTE(review): only config.num_features + config.cat_features are used, so "maturity_age" is ignored
# unless the config lists it -- confirm.
training_df = training_set.load_df().toPandas()
X_train, y_train = (
    training_df[config.num_features + config.cat_features],
    training_df[config.target],
)

In [0]:
# Inspect the feature dtypes and the first rows (rich display instead of plain-text print).
display(X_train.dtypes.to_frame("dtype"))
display(X_train.head())

In [0]:
# Hyper-parameters the pipeline below is actually fitted with.
params = {"learning_rate": 0.1, "n_estimators": 100, "num_leaves": 31}

# One-hot encode the categorical features; every other column passes through to LightGBM unchanged.
pipeline = Pipeline(
    steps=[
        (
            "preprocessor",
            ColumnTransformer(
                transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), config.cat_features)],
                remainder="passthrough",
            ),
        ),
        ("regressor", LGBMRegressor(**params)),
    ]
)

# NOTE: config.parameters is only printed for reference; the model is fitted with `params`.
print(config.parameters)
pipeline.fit(X_train, y_train)

In [0]:
mlflow.set_experiment("/Shared/demo-model-insurance-fe")
with mlflow.start_run(
    run_name="demo-model-insurance-fe",
    tags={"git_sha": "01234567890george", "branch": "feature3"},
    description="demo run for FE model logging",
) as run:
    run_id = run.info.run_id
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    # Log the hyper-parameters the pipeline was actually fitted with (`params`). config.parameters is
    # only printed by the training cell and was never passed to LGBMRegressor.
    mlflow.log_params(params)

    # Log the model with the Feature Engineering client (important): the lookups/functions of
    # `training_set` are stored with the model, so fe.score_batch can resolve them by "Id" later.
    signature = infer_signature(model_input=X_train, model_output=pipeline.predict(X_train))
    fe.log_model(
        model=pipeline,
        flavor=mlflow.sklearn,
        artifact_path="lightgbm-pipeline-model-fe",
        training_set=training_set,
        signature=signature,
    )
    

In [0]:
# Register the FE-logged model from the run above in Unity Catalog.
model_name = f"{config.catalog_name}.{config.schema_name}.model_insurance_fe_demo"
model_version = mlflow.register_model(
    model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe",
    name=model_name,
    tags={"git_sha": "01234567890george"},
)

In [0]:
# Inspect the schema and a few rows of the test split (head() alone would only show the first Row).
test_set.printSchema()
test_set.limit(5).display()

In [0]:
from pyspark.sql.functions import current_timestamp, monotonically_increasing_id

# Add a surrogate "Id" and an update timestamp to the test split.
# NOTE(review): monotonically_increasing_id() produces the same kind of Ids as for train_set_with_id,
# so test Ids can coincide with training Ids, and feature lookups for test rows may then return the
# features of *training* rows -- confirm this is intended for the demo.
test_set_with_id = (
    test_set
    .withColumn("Id", monotonically_increasing_id())
    .withColumn("update_timestamp_utc", current_timestamp())
)

In [0]:
# Make predictions. fe.score_batch only works with a PySpark DataFrame; it fails with pandas.
# The lookup features are left out, so the model joins them back from the feature table by "Id".
features = [
    column
    for column in ["Id", "update_timestamp_utc", *config.num_features, *config.cat_features]
    if column not in lookup_features
]
predictions = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set_with_id.select(*features),
)

In [0]:
from mlflow import MlflowClient

# Read the registered model version (including its tags) back from the registry. The client gets its
# own name because `client` is rebound to a boto3 DynamoDB client later in this notebook.
mlflow_client = MlflowClient()
model_version = mlflow_client.get_model_version(name=model_name, version=model_version.version)
print(model_version.tags)

In [0]:
# Show the first five predictions.
predictions.select("prediction").show(n=5)

In [0]:

from pyspark.sql.functions import col

# Shift every test Id by 1,000,000 to obtain Ids that (presumably) do not exist in the feature table,
# then score again to see how missing lookups are handled.
features = [
    column
    for column in ["Id", "update_timestamp_utc", *config.num_features, *config.cat_features]
    if column not in lookup_features
]
test_set_with_new_id = (
    test_set_with_id
    .select(*features)
    .withColumn("Id", col("Id").cast("bigint") + 1000000)
)

predictions = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set_with_new_id.select(*features),
)

In [0]:
# make predictions for a non-existing entry -> error!
# (the lookups return NULL features; presumably the age UDF then fails on the NULL age -- confirm
# from the stack trace)
predictions.select("prediction").show(5)

In [0]:
# Feature functions that replace missing (None) values with fixed defaults: age 35, bmi 30.0, children 2.
# Each function's return type matches its input type, so a found value is passed through unchanged.

age_function = f"{config.catalog_name}.{config.schema_name}.replace_age_missing"
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {age_function}(age bigint)
        RETURNS bigint
        LANGUAGE PYTHON AS
        $$
        if age is None:
            return 35
        else:
            return age
        $$
        """)

# The function is declared RETURNS DOUBLE, so the default is a float literal (30.0) rather than the
# int 30: a Python int is not guaranteed to be converted to a SQL DOUBLE.
bmi_function = f"{config.catalog_name}.{config.schema_name}.replace_bmi_missing"
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {bmi_function}(bmi DOUBLE)
        RETURNS DOUBLE
        LANGUAGE PYTHON AS
        $$
        if bmi is None:
            return 30.0
        else:
            return bmi
        $$
        """)

children_function = f"{config.catalog_name}.{config.schema_name}.replace_children_missing"
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {children_function}(children bigint)
        RETURNS bigint
        LANGUAGE PYTHON AS
        $$
        if children is None:
            return 2
        else:
            return children
        $$
        """)

In [0]:

# What if we want to use a default value when an entry is not found?
# What if we want to look up a value in another table? The logic gets complex.
# Problems that arise: functions/lookups are always executed (conditional execution is not possible),
# so it can get slow...
#
# Step 1: create 3 feature functions that replace missing values (previous cell).
# Step 2: redefine the training set. The looked-up features are renamed to lookup_*, and the
#         functions map them back to age/bmi/children, filling in defaults for missing entries.
# Then try again.
training_set = fe.create_training_set(
    df=train_set_with_id.drop(*lookup_features),
    label=config.target,
    feature_lookups=[
        FeatureLookup(
            table_name=feature_table_name,
            feature_names=["age", "bmi", "children"],
            lookup_key="Id",
            rename_outputs={"age": "lookup_age", "bmi": "lookup_bmi", "children": "lookup_children"},
        ),
        FeatureFunction(udf_name=age_function, output_name="age", input_bindings={"age": "lookup_age"}),
        FeatureFunction(udf_name=bmi_function, output_name="bmi", input_bindings={"bmi": "lookup_bmi"}),
        FeatureFunction(
            udf_name=children_function,
            output_name="children",
            input_bindings={"children": "lookup_children"},
        ),
        # maturity_age is computed from the default-filled "age" produced by age_function.
        FeatureFunction(udf_name=function_name, output_name="maturity_age", input_bindings={"age": "age"}),
    ],
    exclude_columns=["update_timestamp_utc"],
)


In [0]:

# Train & register a model on the default-filled training set: materialise it and split into X / y.
training_df = training_set.load_df().toPandas()
X_train, y_train = (
    training_df[config.num_features + config.cat_features],
    training_df[config.target],
)

In [0]:
# Hyper-parameters the pipeline below is actually fitted with.
params = {"learning_rate": 0.1, "n_estimators": 100, "num_leaves": 31}

# One-hot encode the categorical features; every other column passes through to LightGBM unchanged.
pipeline = Pipeline(
    steps=[
        (
            "preprocessor",
            ColumnTransformer(
                transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), config.cat_features)],
                remainder="passthrough",
            ),
        ),
        ("regressor", LGBMRegressor(**params)),
    ]
)

# NOTE: config.parameters is only printed for reference; the model is fitted with `params`.
print(config.parameters)
pipeline.fit(X_train, y_train)

In [0]:
mlflow.set_experiment("/Shared/demo-model-insurance-fe")
with mlflow.start_run(
    run_name="demo-model-insurance-fe",
    tags={"git_sha": "01234567890george", "branch": "feature3"},
    description="demo run for FE model logging",
) as run:
    run_id = run.info.run_id
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    # Log the hyper-parameters the pipeline was actually fitted with (`params`). config.parameters is
    # only printed by the training cell and was never passed to LGBMRegressor.
    mlflow.log_params(params)

    # Log the model with the Feature Engineering client (important): the lookups and default-filling
    # functions of `training_set` are stored with the model and resolved by fe.score_batch.
    signature = infer_signature(model_input=X_train, model_output=pipeline.predict(X_train))
    fe.log_model(
        model=pipeline,
        flavor=mlflow.sklearn,
        artifact_path="lightgbm-pipeline-model-fe",
        training_set=training_set,
        signature=signature,
    )

In [0]:
# Register the new FE-logged model (with default-filling functions) as a new version in Unity Catalog.
model_name = f"{config.catalog_name}.{config.schema_name}.model_insurance_fe_demo"
model_version = mlflow.register_model(
    model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-fe",
    name=model_name,
    tags={"git_sha": "01234567890george"},
)

In [0]:
from pyspark.sql.functions import col

# Score the shifted (presumably non-existing) Ids again, this time with the default-filling model.
features = [
    column
    for column in ["Id", "update_timestamp_utc", *config.num_features, *config.cat_features]
    if column not in lookup_features
]
test_set_with_new_id = (
    test_set_with_id
    .select(*features)
    .withColumn("Id", col("Id").cast("bigint") + 1000000)
)

predictions = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set_with_new_id.select(*features),
)

In [0]:
# Peek at the first row of the test set (None if it is empty).
test_set_with_id.first()

In [0]:
# make predictions for a non-existing entry -> no error!
# (the replace_*_missing functions substitute defaults for the NULL lookup results)
predictions.select("prediction").show(5)


#### Feature Engineering with DynamoDB

In [0]:
import boto3

# DynamoDB client. The AWS credentials come from environment variables, never from the notebook itself.
region_name = "eu-west-1"
aws_access_key_id, aws_secret_access_key = (
    os.environ[key] for key in ("aws_access_key_id", "aws_secret_access_key")
)

client = boto3.client(
    "dynamodb",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
)

In [0]:
# Create the DynamoDB table with a numeric "Id" partition key and small provisioned throughput.
# NOTE: CreateTable fails with ResourceInUseException if the table already exists (e.g. on re-run).
response = client.create_table(
    TableName='InsuranceFeatures',
    KeySchema=[
        {
            'AttributeName': 'Id',
            'KeyType': 'HASH'  # Partition key
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'Id',
            'AttributeType': 'N'  # Number
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)

# CreateTable is asynchronous: the table starts in CREATING status. Block until it exists and is usable,
# so the writes in the following cells do not fail when the notebook runs top to bottom.
client.get_waiter('table_exists').wait(TableName='InsuranceFeatures')

In [0]:
# Confirm which table the create_table call above targeted.
print(f"Table creation initiated: {response['TableDescription']['TableName']}")


In [0]:
# Write one sample item (Id 261898). DynamoDB's wire format passes numbers ('N') as strings.
client.put_item(
    TableName='InsuranceFeatures',
    Item={
        'Id': {'N': '261898'},
        'age': {'N': '35'},
        'bmi': {'N': '30'},
        'children': {'N': '2'}
    }
)

In [0]:
# Read the sample item back by its key; the response has no 'Item' entry when the key does not exist.
response = client.get_item(TableName='InsuranceFeatures', Key={'Id': {'N': '261898'}})
item = response.get('Item')
print(item)

In [0]:
# NOTE(review): islice is imported but not used anywhere in this notebook.
from itertools import islice
# Collect every row of the feature table onto the driver as a list of dicts (one dict per row).
rows = spark.table(feature_table_name).toPandas().to_dict(orient="records")

In [0]:
def to_dynamodb_item(row):
    """Convert one feature-table record into a DynamoDB ``BatchWriteItem`` put request.

    :param row: Mapping with the keys ``Id``, ``age``, ``bmi`` and ``children``.
    :return: ``{'PutRequest': {'Item': {...}}}`` where every attribute is typed as a number (``'N'``).
        DynamoDB expects number values to be passed as strings. Feature attributes whose value is
        missing (``None``/NaN) are omitted, because ``str()`` would turn them into the invalid
        numbers ``'None'``/``'nan'`` and DynamoDB would reject the whole batch.
    """
    item = {'Id': {'N': str(row['Id'])}}
    for attribute in ('bmi', 'children', 'age'):
        value = row[attribute]
        if not pd.isna(value):
            item[attribute] = {'N': str(value)}
    return {'PutRequest': {'Item': item}}

In [0]:
# Turn every feature-table record into a BatchWriteItem put request.
items = [to_dynamodb_item(row) for row in rows]
# Print a short summary instead of dumping every request into the cell output.
print(f"{len(items)} put requests prepared; first: {items[0] if items else None}")

In [0]:
def chunks(lst, n):
    """Lazily split ``lst`` into consecutive slices of at most ``n`` elements.

    The last slice holds the remainder and may be shorter. An empty ``lst`` yields nothing.
    """
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

In [0]:
import time

# BatchWriteItem accepts at most 25 put/delete requests per call. Items DynamoDB could not process
# (e.g. when throttled) are returned in UnprocessedItems and are retried here with exponential backoff.
MAX_BATCH_RETRIES = 5

for batch in chunks(items, 25):
    pending = {'InsuranceFeatures': batch}
    for attempt in range(MAX_BATCH_RETRIES + 1):
        if attempt:
            time.sleep(0.1 * 2 ** (attempt - 1))
        response = client.batch_write_item(RequestItems=pending)
        pending = response.get('UnprocessedItems', {})
        if not pending:
            break
    else:
        n_left = sum(len(requests) for requests in pending.values())
        print(f"Warning: {n_left} items were not processed after {MAX_BATCH_RETRIES} retries.")

In [0]:

# We ran into more limitations when we tried to use complex data types as the output of a feature function
# and then to use that function for serving.
# An alternative solution: use an external database (we use DynamoDB here):

# - create a DynamoDB table
# - insert records into DynamoDB & read them back

# - create a pyfunc model that looks up the features at prediction time

In [0]:
class InsuranceModelWrapper(mlflow.pyfunc.PythonModel):
    """MLflow pyfunc wrapper that enriches the model input with features stored in DynamoDB.

    At prediction time the ``age``, ``bmi`` and ``children`` features are looked up by ``Id`` in the
    ``InsuranceFeatures`` DynamoDB table. Missing values are filled with fixed defaults, and the
    wrapped insurance model is then applied to the result.
    """

    # DynamoDB table holding the looked-up features, keyed by a numeric "Id".
    TABLE_NAME = "InsuranceFeatures"
    # Region used when the "region_name" environment variable is not set; it matches the region the
    # table is created in by this notebook.
    DEFAULT_REGION = "eu-west-1"
    # Fill values for features missing in DynamoDB. They match the defaults of the replace_*_missing
    # feature functions (age 35, bmi 30, children 2).
    FEATURE_DEFAULTS = {"age": 35, "bmi": 30, "children": 2}

    def __init__(self, model: object) -> None:
        """Initialize the wrapper.

        :param model: The underlying fitted model; it must expose ``predict(DataFrame)``.
        """
        self.model = model

    def _parse_dynamo_item(self, raw_item):
        """Convert a DynamoDB attribute-value map into plain Python values.

        Number (``'N'``) attributes become ``int`` when integral and ``float`` otherwise. Every other
        attribute is read as a string (``'S'``).
        """
        parsed = {}
        for key, val in raw_item.items():
            if 'N' in val:
                num_val = float(val['N'])
                parsed[key] = int(num_val) if num_val.is_integer() else num_val
            else:
                parsed[key] = val['S']
        return parsed

    def predict(self, context, model_input: pd.DataFrame | np.ndarray) -> list[int]:
        """Look up the DynamoDB features for every ``Id`` and return integer predictions.

        :param context: MLflow context (unused).
        :param model_input: DataFrame with an ``Id`` column plus the remaining model features.
        :return: One prediction per input row, in input order, truncated to ``int``.
        """
        client = boto3.client(
            'dynamodb',
            aws_access_key_id=os.environ["aws_access_key_id"],
            aws_secret_access_key=os.environ["aws_secret_access_key"],
            region_name=os.environ.get("region_name", self.DEFAULT_REGION),
        )

        parsed = []
        # Look up each distinct Id only once: duplicate rows in lookup_df would make the merge below
        # multiply the input rows.
        for lookup_id in pd.unique(model_input["Id"]):
            raw_item = client.get_item(
                TableName=self.TABLE_NAME,
                Key={'Id': {'N': str(int(lookup_id))}},
            ).get("Item")
            # get_item omits "Item" when the key does not exist. Leave such Ids unmatched, so their
            # features are filled with the defaults below instead of raising KeyError.
            if raw_item is not None:
                parsed.append(self._parse_dynamo_item(raw_item))

        lookup_df = pd.DataFrame(parsed)
        # Make sure the join key and feature columns exist even if no (or only partial) items were found.
        for column in ("Id", *self.FEATURE_DEFAULTS):
            if column not in lookup_df.columns:
                lookup_df[column] = np.nan

        merged_df = model_input.merge(lookup_df, on="Id", how="left").drop("Id", axis=1)
        merged_df = merged_df.fillna(self.FEATURE_DEFAULTS)

        predictions = self.model.predict(merged_df)
        return [int(x) for x in predictions]

In [0]:
# Wrap the most recently fitted pipeline (trained on the default-filled FE training set) in the
# DynamoDB-backed pyfunc wrapper.
custom_model = InsuranceModelWrapper(pipeline)


In [0]:
# Input for the pyfunc wrapper: "Id" plus every feature that is NOT looked up. The wrapper fetches
# age/bmi/children from DynamoDB by "Id".
features = [f for f in ["Id"] + config.num_features + config.cat_features if f not in lookup_features]
data = test_set_with_id.select(*features).toPandas()
# Show only the first rows instead of rendering the whole test set.
data.head()

In [0]:
# Local smoke test of the wrapper; this performs one live DynamoDB get_item call per Id.
custom_model.predict(context=None, model_input=data)

In [0]:
# Log the DynamoDB-backed wrapper as a plain MLflow pyfunc model (not via the Feature Engineering client).
mlflow.set_experiment("/Shared/demo-model-insurance-fe-pyfunc")
with mlflow.start_run(
    run_name="demo-model-insurance-fe-pyfunc",
    tags={"git_sha": "01234567890george", "branch": "feature3"},
    description="demo run for FE model logging",
) as run:
    run_id = run.info.run_id
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    # Log the hyper-parameters the wrapped pipeline was actually fitted with (`params`).
    # config.parameters is only printed by the training cell and was never passed to LGBMRegressor.
    mlflow.log_params(params)

    # The signature is inferred from a real call on `data` (this performs live DynamoDB lookups).
    signature = infer_signature(model_input=data, model_output=custom_model.predict(context=None, model_input=data))
    mlflow.pyfunc.log_model(
        artifact_path="lightgbm-pipeline-model-fe",
        python_model=custom_model,
        signature=signature,
    )
    
    

In [0]:
# predict: load the pyfunc model logged in the run above and score the first row of `data`.
mlflow.models.predict(f"runs:/{run_id}/lightgbm-pipeline-model-fe", data[0:1])