In [0]:
# install dependencies: this project as an editable install from the parent folder,
# plus the `marvelous` helper package pinned to tag 0.1.0
%pip install -e ..
%pip install git+https://github.com/end-to-end-mlops-databricks-3/marvelous@0.1.0

In [0]:
# A better approach than the editable install: install the pre-built wheel
# (the wheel must be present under ../dist relative to the notebook folder, achieved via synchronization).
# NOTE(review): running both this cell and the editable-install cell installs the package twice — pick one.
%pip install ../dist/fifa_players-0.0.1-py3-none-any.whl

In [0]:
# Restart the Python process so the packages installed above are picked up on import.
%restart_python

In [0]:
# System path update: make ../src importable without relying on the installed package.
# Must run after %restart_python, because restarting the Python process discards this change.
# Caution: this is a fallback, not a great approach — prefer installing the package.
from pathlib import Path
import sys

src_dir = str(Path.cwd().parent / 'src')
# Guard so re-running the cell does not append duplicate entries to sys.path.
if src_dir not in sys.path:
    sys.path.append(src_dir)

In [0]:
from pyspark.sql import SparkSession
import mlflow

from fifa_players import __version__
from fifa_players.config import ProjectConfig
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from lightgbm import LGBMRegressor
from mlflow.models import infer_signature
from marvelous.common import is_databricks
from dotenv import load_dotenv
import os
from mlflow import MlflowClient
import pandas as pd
from mlflow.utils.environment import _mlflow_conda_env
from databricks import feature_engineering
from databricks.feature_engineering import FeatureFunction, FeatureLookup
from pyspark.errors import AnalysisException
import numpy as np
from datetime import datetime
import boto3


In [0]:
# Outside Databricks (e.g. a local IDE), read the CLI profile name from .env and
# point both MLflow tracking and the Unity Catalog model registry at that workspace.
if not is_databricks():
    load_dotenv()
    profile = os.environ["PROFILE"]  # raises KeyError when PROFILE is not configured
    tracking_uri, registry_uri = f"databricks://{profile}", f"databricks-uc://{profile}"
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_registry_uri(registry_uri)

# Project settings for the dev environment (catalog/schema names, features, target, model parameters).
config = ProjectConfig.from_yaml(config_path="../project_config.yml", env="dev")

In [0]:
# Spark session and the Databricks Feature Engineering client used by the cells below.
spark = SparkSession.builder.getOrCreate()
fe = feature_engineering.FeatureEngineeringClient()

# Train/test splits stored in <catalog>.<schema>; read train first, then test.
train_set, test_set = (
    spark.table(f"{config.catalog_name}.{config.schema_name}.{split_name}")
    for split_name in ("train_set", "test_set")
)

In [0]:
# Names used by the feature-table cells below: the columns served by the
# player feature table, and the table's fully qualified name.
lookup_features = ["overall_rating", "potential", "release_clause_euro"]
feature_table_name = f"{config.catalog_name}.{config.schema_name}.player_features"



In [0]:
# Option 1: Feature Engineering API.
# Create the table from the training rows (keyed on Id), enable Change Data Feed,
# then merge the test rows into it.
feature_table_columns = ["Id", *lookup_features]

feature_table = fe.create_table(
    name=feature_table_name,
    primary_keys=["Id"],
    df=train_set.select(*feature_table_columns),
    description="Player features table",
)

spark.sql(f"ALTER TABLE {feature_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

fe.write_table(
    name=feature_table_name,
    df=test_set.select(*feature_table_columns),
    mode="merge",
)

In [0]:
# create feature table with information about players
# Option 2: SQL (alternative to Option 1 — CREATE OR REPLACE discards any existing table with this name)

spark.sql(f"""
          CREATE OR REPLACE TABLE {feature_table_name}
          (Id STRING NOT NULL, overall_rating INT, potential INT, release_clause_euro INT);
          """)
# Primary keys on Databricks are informational only — they are NOT enforced.
try:
    spark.sql(f"ALTER TABLE {feature_table_name} ADD CONSTRAINT player_features_pk PRIMARY KEY(Id);")
except AnalysisException as exc:
    # Best effort (e.g. the table may already carry a primary key): report why instead of hiding it.
    print(f"Skipping primary key constraint on {feature_table_name}: {exc}")
spark.sql(f"ALTER TABLE {feature_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true);")
# Load the lookup columns from both splits.
spark.sql(f"""
          INSERT INTO {feature_table_name}
          SELECT Id, overall_rating, potential, release_clause_euro
          FROM {config.catalog_name}.{config.schema_name}.train_set
          """)
spark.sql(f"""
          INSERT INTO {feature_table_name}
          SELECT Id, overall_rating, potential, release_clause_euro
          FROM {config.catalog_name}.{config.schema_name}.test_set
          """)

In [0]:
# Feature function: a Unity Catalog function that FeatureFunction evaluates
# when the training sets below are built.
# docs: https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-ddl-create-sql-function
#
# Caveats of feature functions:
# - functions are not versioned
# - behavior may differ between runtimes (Python and package versions)
# - Python & package versions for the function cannot be enforced
#   (NOTE(review): the original note says "this is only supported from runtime 17" — confirm which capability that refers to)
# - advised for simple calculations only
function_name = f"{config.catalog_name}.{config.schema_name}.calculate_potential_ratio"

In [0]:

# Option 1: with Python
# The ratio is fractional, so the function returns DOUBLE (RETURNS INT would truncate
# or reject the float result). A missing input (e.g. a player absent from the feature
# table) or a zero potential yields NULL instead of raising inside the UDF.
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {function_name} (rating INT, future_potential INT)
        RETURNS DOUBLE
        LANGUAGE PYTHON AS
        $$
        if rating is None or not future_potential:
            return None
        return rating / future_potential
        $$
        """)

In [0]:
# it is possible to define simple functions in SQL only, without Python
# Option 2
# `/` on two INTs already yields DOUBLE, so declare DOUBLE to keep the fraction
# (RETURNS INT truncated every ratio below 1 to 0). NULLIF turns a zero potential
# into NULL instead of a division-by-zero error under ANSI mode.
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {function_name}_sql (rating INT, future_potential INT)
        RETURNS DOUBLE
        RETURN rating / NULLIF(future_potential, 0);
        """)

In [0]:
# execute the SQL function on sample values; .show() materializes the result
# (a bare DataFrame expression only displays its schema, not the computed value)
spark.sql(f"SELECT {function_name}_sql(65, 76) as potential_ratio;").show()

In [0]:
# Training set: base rows with the lookup columns dropped (they are re-added by the
# feature-table lookup on Id), plus potential_ratio computed by the feature function.
training_set = fe.create_training_set(
    df=train_set.drop(*lookup_features),
    label=config.target,
    feature_lookups=[
        FeatureLookup(
            table_name=feature_table_name,
            feature_names=lookup_features,
            lookup_key="Id",
        ),
        FeatureFunction(
            udf_name=function_name,
            output_name="potential_ratio",
            input_bindings={"rating": "overall_rating", "future_potential": "potential"},
        ),
    ],
    exclude_columns=["update_timestamp_utc"],
)

In [0]:
# Train & register a model: materialize the training set in pandas, then split it
# into model inputs (configured features + computed ratio) and the label.
training_df = training_set.load_df().toPandas()
feature_columns = [*config.num_features, *config.cat_features, "potential_ratio"]
X_train, y_train = training_df[feature_columns], training_df[config.target]

In [0]:
# One-hot encode the categorical columns (unknown categories ignored at predict time),
# pass all other columns through unchanged, and fit a LightGBM regressor on top.
pipeline = Pipeline(
    steps=[
        (
            "preprocessor",
            ColumnTransformer(
                transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), config.cat_features)],
                remainder="passthrough",
            ),
        ),
        ("regressor", LGBMRegressor(**config.parameters)),
    ]
)

pipeline.fit(X_train, y_train)

In [0]:
# Log a run in the shared experiment. fe.log_model stores the training_set's
# lookup/function metadata with the model, which fe.score_batch relies on later.
# NOTE(review): the git_sha/branch tags are hard-coded — keep them in sync with the actual commit.
mlflow.set_experiment("/Shared/fifa-players-model-fe")
with mlflow.start_run(
    run_name="fifa-players-model-fe",
    tags={"git_sha": "77889944bc", "branch": "feat/week3"},
    description="run for feature engineering of potential ratio of fifa players",
) as run:
    run_id = run.info.run_id

    # Parameters only — no metrics are computed or logged in this run.
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    mlflow.log_params(config.parameters)

    # Signature inferred from the training inputs and the fitted pipeline's predictions on them.
    signature = infer_signature(model_input=X_train, model_output=pipeline.predict(X_train))
    fe.log_model(
        model=pipeline,
        flavor=mlflow.sklearn,
        artifact_path="lightgbm-pipeline-model-fe",
        training_set=training_set,
        signature=signature,
    )
    

In [0]:
# Register the model logged in the run above in Unity Catalog.
model_name = f"{config.catalog_name}.{config.schema_name}.fifa_players_model_fe_demo"
model_uri = f'runs:/{run_id}/lightgbm-pipeline-model-fe'
model_version = mlflow.register_model(model_uri=model_uri, name=model_name, tags={"git_sha": "77889944bc"})

In [0]:
# make predictions: pass Id plus the non-lookup features only; the logged model
# fetches the lookup features and computes potential_ratio itself.
features = [
    column
    for column in ["Id", *config.num_features, *config.cat_features]
    if column not in lookup_features
]
predictions = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set.select(*features),
)

In [0]:
predictions.select("prediction").show(n=5)  # first 5 predicted values

In [0]:
from pyspark.sql.functions import col

# Simulate players unknown to the feature table by shifting every Id by 1,000,000,
# so (presumably) no lookup row matches and the lookup features come back NULL.
# NOTE(review): assumes Ids are numeric strings — non-numeric Ids cast to NULL.
features = [
    column
    for column in ["Id", *config.num_features, *config.cat_features]
    if column not in lookup_features
]
shifted_id = (col("Id").cast("long") + 1000000).cast("string")
test_set_with_new_id = test_set.select(*features).withColumn("Id", shifted_id)

predictions = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set_with_new_id,
)

In [0]:
# Null-replacement feature functions: each returns its INT input unchanged, or a fixed
# default when the input is NULL (e.g. the feature-table lookup found no row).
# NOTE(review): the defaults (5, 1000, 2) are magic numbers — document or confirm their choice.


def _create_null_replacement_function(udf_name: str, arg_name: str, default_value: int) -> str:
    """Create (or replace) a Python UC function returning its argument, or a default if it is NULL.

    Args:
        udf_name: Fully qualified function name (catalog.schema.function).
        arg_name: Name of the function's single INT argument.
        default_value: Value returned when the argument is NULL.

    Returns:
        ``udf_name``, so the call can be assigned directly to a variable.
    """
    spark.sql(f"""
        CREATE OR REPLACE FUNCTION {udf_name}({arg_name} INT)
        RETURNS INT
        LANGUAGE PYTHON AS
        $$
        if {arg_name} is None:
            return {default_value}
        else:
            return {arg_name}
        $$
        """)
    return udf_name


_function_prefix = f"{config.catalog_name}.{config.schema_name}"

overallrating_function = _create_null_replacement_function(
    f"{_function_prefix}.replace_overallrating_missing", "overall_rating", 5
)
potential_function = _create_null_replacement_function(
    f"{_function_prefix}.replace_potential_missing", "potential", 1000
)
relclause_function = _create_null_replacement_function(
    f"{_function_prefix}.replace_relclause_missing", "release_clause_euro", 2
)

In [0]:
# What if we want to fall back to a default value when an entry is not found?
# (Looking the value up in another table instead makes the logic complex.)
# Problems that arise: functions and lookups always get executed (no conditional
# if-statement evaluation is possible), so this can get slow.
#
# Approach (the three feature functions are created in the previous cell):
# 1. look up the raw values under lookup_* names,
# 2. pass each through its null-replacement function to produce the final feature column,
# 3. compute potential_ratio from those cleaned values.
training_set = fe.create_training_set(
    df=train_set.drop(*lookup_features),
    label=config.target,
    feature_lookups=[
        FeatureLookup(
            table_name=feature_table_name,
            feature_names=lookup_features,
            lookup_key="Id",
            rename_outputs={name: f"lookup_{name}" for name in lookup_features},
        ),
        *(
            FeatureFunction(
                udf_name=replacement_udf,
                output_name=column,
                input_bindings={column: f"lookup_{column}"},
            )
            for replacement_udf, column in (
                (overallrating_function, "overall_rating"),
                (potential_function, "potential"),
                (relclause_function, "release_clause_euro"),
            )
        ),
        FeatureFunction(
            udf_name=function_name,
            output_name="potential_ratio",
            input_bindings={"rating": "overall_rating", "future_potential": "potential"},
        ),
    ],
    exclude_columns=["update_timestamp_utc"],
)

In [0]:
# Train & register a model: retrain on the default-filled training set with the
# same feature/label split and the same one-hot + LightGBM pipeline.
training_df = training_set.load_df().toPandas()
X_train = training_df[[*config.num_features, *config.cat_features, "potential_ratio"]]
y_train = training_df[config.target]

# pipeline: one-hot encode categoricals, pass everything else through, then regress.
pipeline = Pipeline(
    steps=[
        (
            "preprocessor",
            ColumnTransformer(
                transformers=[("cat", OneHotEncoder(handle_unknown="ignore"), config.cat_features)],
                remainder="passthrough",
            ),
        ),
        ("regressor", LGBMRegressor(**config.parameters)),
    ]
)

pipeline.fit(X_train, y_train)

In [0]:
# Log the default-filled model in the same experiment, then register it.
# NOTE(review): the git_sha/branch tags are hard-coded — keep them in sync with the actual commit.
mlflow.set_experiment("/Shared/fifa-players-model-fe")
with mlflow.start_run(
    run_name="fifa-players-model-fe",
    tags={"git_sha": "77889944bc", "branch": "feat/week3"},
    description="run for feature engineering of potential ratio of fifa players",
) as run:
    run_id = run.info.run_id

    # Parameters only — no metrics are computed or logged in this run.
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    mlflow.log_params(config.parameters)

    # fe.log_model stores the training_set's lookup/function metadata with the model.
    signature = infer_signature(model_input=X_train, model_output=pipeline.predict(X_train))
    fe.log_model(
        model=pipeline,
        flavor=mlflow.sklearn,
        artifact_path="lightgbm-pipeline-model-fe",
        training_set=training_set,
        signature=signature,
    )

# Register under the project-prefixed name used for the first model (as a new version)
# instead of the generic "model_fe_demo" name.
model_name = f"{config.catalog_name}.{config.schema_name}.fifa_players_model_fe_demo"
model_version = mlflow.register_model(
    model_uri=f'runs:/{run_id}/lightgbm-pipeline-model-fe',
    name=model_name,
    tags={"git_sha": "77889944bc"},
)

In [0]:
from pyspark.sql.functions import col

# Score the registered model on Ids shifted by 1,000,000, presumably absent from the
# feature table, so the null-replacement functions have to supply the defaults.
# NOTE(review): assumes Ids are numeric strings — non-numeric Ids cast to NULL.
features = [
    column
    for column in ["Id", *config.num_features, *config.cat_features]
    if column not in lookup_features
]
shifted_id = (col("Id").cast("long") + 1000000).cast("string")
test_set_with_new_id = test_set.select(*features).withColumn("Id", shifted_id)

predictions = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set_with_new_id,
)

In [0]:
# Predictions for non-existing entries: missing lookups are filled with defaults, so scoring does not error.
predictions.select("prediction").show(n=5)