In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

# General Set up

In [2]:
from ml_example_project.train.train import Args
# Run settings for this session: company "GL" in the "dev" environment,
# executed outside Databricks and without the feature store.
args = Args(
    company="GL", env="dev",
    is_run_on_databricks=False, is_use_feature_store=False,
)

In [3]:
from ml_example_project.db import get_spark_session
# Spark session shared by the training-data, wrapper and prediction cells below.
spark = get_spark_session()

In [4]:
from ml_example_project.train.configs import get_company_train_configs
from constants.companies import get_company_by_code
# Look up the training configuration and the company record for the company
# code selected in `args`. Later cells read `company_train_configs.model_params`
# and `company_properties.company_id`.
company_train_configs = get_company_train_configs(company_code=args.company)
company_properties = get_company_by_code(company_code=args.company)


# Get training data

In [5]:
from ml_example_project.train.data import create_training_set
from ml_example_project.train.configs import feature_lookup_config_list
from databricks.feature_engineering import FeatureEngineeringClient
# Build the training set, going through the feature store only when both run
# flags ask for it. Logical `and` (not bitwise `&`) is the correct operator for
# combining two truth values: it short-circuits and does not depend on the
# operands being strict bools.
if args.is_run_on_databricks and args.is_use_feature_store:
    fe = FeatureEngineeringClient()
else:
    fe = None
    # The feature store can't be used unless you are running on Databricks, so
    # force the flag off; the next cell branches on it to decide how to
    # materialise the training set.
    args.is_use_feature_store = False

training_set = create_training_set(
    spark=spark,
    company_id=company_properties.company_id,
    feature_lookup_config_list=feature_lookup_config_list,
    company_train_configs=company_train_configs,
    fe=fe,
)

In [6]:
# Materialise the training data as a pandas DataFrame. On the feature-store
# path the training set is first unwrapped with `load_df()`; otherwise
# `toPandas()` is called on it directly.
df_training = (
    training_set.load_df() if args.is_use_feature_store else training_set
).toPandas()

In [None]:
# Preview the first rows instead of rendering the whole training frame.
df_training.head()

# An very minimal EDA

In [None]:
import plotly.express as px
# Histogram of the number of recipe steps, with bars coloured by difficulty level.
fig = px.histogram(
    df_training,
    x="number_of_recipe_steps",
    color="recipe_difficulty_level_id",
    # Title and labels describe the column actually plotted on the x axis.
    title="Number of Recipe Steps by Recipe Difficulty Level",
    # `labels` is keyed by column name, so the key must match the plotted
    # column for the axis label to be applied.
    labels={
        "number_of_recipe_steps": "Number of Recipe Steps",
        "recipe_difficulty_level_id": "Recipe Difficulty Level ID",
    },
)

# Show the plot
fig.show()

# Build a model

In [9]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the rows for validation. The split is seeded so that
# Restart & Run All reproduces the same train/validation partition (and hence
# comparable metrics) on every run.
target = "recipe_difficulty_level_id"
X_train, X_val, y_train, y_val = train_test_split(
    df_training.drop(columns=[target]),
    df_training[target],
    test_size=0.2,
    random_state=42,
)

In [13]:
from ml_example_project.train.preprocessor import PreProcessor
from sklearn.ensemble import RandomForestClassifier
# Feature columns handed to the project's PreProcessor, grouped by how they
# are declared (numeric vs. categorical).
preprocessor = PreProcessor(
    numeric_features=["number_of_ingredients", "number_of_taxonomies"],
    categorical_features=["cooking_time_from"],
)

# Random forest whose keyword arguments come from the company training config.
rf = RandomForestClassifier(**company_train_configs.model_params)

In [14]:
from ml_example_project.train.model import ClassificationPipeline
# Combine the preprocessor and the classifier into the project's pipeline wrapper.
pipeline = ClassificationPipeline(preprocessor=preprocessor, model=rf, task="classify")

In [15]:
# Fit the pipeline on the training split.
pipeline.fit(X_train=X_train, y_train=y_train)

In [18]:
# Predict on the held-out validation features.
# NOTE(review): the `context` keyword looks like the MLflow pyfunc
# `predict(context, model_input)` signature — confirm against ClassificationPipeline.
y_pred = pipeline.predict(model_input=X_val, context=None)

In [19]:
from sklearn.metrics import accuracy_score
# Validation accuracy of the fitted pipeline.
accuracy = accuracy_score(y_true=y_val, y_pred=y_pred)
# End the cell with the value so the metric is actually displayed.
accuracy

# Use the big wrapper

In [None]:
from ml_example_project.train.train import Args, train_model

# Run the end-to-end training wrapper for company "AMK".
# NOTE(review): this rebinds `args`, which was previously built for company
# "GL". `company_properties` is not recomputed, so the Predict cells below
# still use the earlier company's id — confirm that is intended.
args = Args(
    company="AMK",
    env="dev",
    is_run_on_databricks=False,
)
train_model(args=args, spark=spark)

# Predict

In [4]:
import mlflow
# Point MLflow at a Databricks tracking server; the URI has the form
# `databricks://<profile>`, with the profile name read from `args.profile_name`.
mlflow.set_tracking_uri(f"databricks://{args.profile_name}")

In [76]:
# Load the logged model as a generic MLflow pyfunc model.
# NOTE(review): the run id and artifact path are hard-coded; this cell only
# works against a tracking server that contains exactly this run.
model_uri = 'runs:/d69a11fda38b434aaec4db2377fb2cea/test'
loaded_model = mlflow.pyfunc.load_model(model_uri)

In [65]:
from ml_example_project.predict.data import create_predict_dataframe
# Build the prediction inputs for weeks 202450 through 202451 without the feature store.
# Two objects come back: the primary-key frame and the data passed to the model.
df_predict_pk, predict_data = create_predict_dataframe(
    spark=spark,
    company_id=company_properties.company_id,
    predict_start_yyyyww=202450, predict_end_yyyyww=202451,
    is_use_feature_store=False,
)

In [66]:
# Score the prediction data with the loaded MLflow model.
# Note: this rebinds `y_pred`, which earlier held the validation-set predictions.
y_pred = loaded_model.predict(predict_data.toPandas())

In [73]:
# Attach the predictions to the primary-key rows in pandas, then convert the
# result back to a Spark DataFrame for saving.
df_result = df_predict_pk.toPandas().assign(
    recipe_difficulty_level_id_prediction=y_pred
)
spark_df_result = spark.createDataFrame(df_result)

In [75]:
from ml_example_project.db import save_outputs
# Persist the predictions via the project's save helper, targeting
# table "ml_example_project_predictions" in schema "mloutputs".
save_outputs(
    table_schema="mloutputs",
    table_name="ml_example_project_predictions",
    spark_df=spark_df_result,
)