In [0]:
import sys
print(sys.version)



3.11.11 (main, Dec  4 2024, 08:55:07) [GCC 11.4.0]


In [0]:
# install dependencies
%pip install -e ..
%pip install git+https://github.com/end-to-end-mlops-databricks-3/marvelous@0.1.0
%pip install python-dotenv
%pip install databricks-feature-engineering databricks-feature-lookup

Obtaining file:///Workspace/Users/giridharanvel%40gmail.com/.bundle/marvelous-databricks-course-Giri-Vel/dev/files_2/Giri-Vel-marvelous-databricks-course-Giri-Vel
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Checking if build backend supports build_editable: started
  Checking if build backend supports build_editable: finished with status 'done'
  Getting requirements to build editable: started
  Getting requirements to build editable: finished with status 'done'
  Preparing editable metadata (pyproject.toml): started
  Preparing editable metadata (pyproject.toml): finished with status 'done'
Collecting mlflow==2.17.0 (from hotel-reservations==0.0.1)
  Obtaining dependency information for mlflow==2.17.0 from https://files.pythonhosted.org/packages/bd/af/fdf92ad9f654f2210f225a56b4d45698f6f171d69c1195461b9fa18c5543/mlflow-2.17.0-py3-none-any.whl.metadata
  Downloading mlflow-2.17.0-py3-none-any.whl.metadata (29 kB)
Collecting cff

In [0]:
#restart python
%restart_python

In [0]:
# system path update, must be after %restart_python
# caution! This is not a great approach
from pathlib import Path
import sys
sys.path.append(str(Path.cwd().parent / 'src'))

In [0]:
# A better approach (this file must be present in a notebook folder, achieved via synchronization)
# %pip install hotel_reservation-1.0.1-py3-none-any.whl

In [0]:
from pyspark.sql import SparkSession
import mlflow

from hotel_reservations.config import ProjectConfig
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from lightgbm import LGBMClassifier
from mlflow.models import infer_signature
from marvelous.common import is_databricks
from dotenv import load_dotenv
import os
from mlflow import MlflowClient
import pandas as pd
from hotel_reservations import __version__
from mlflow.utils.environment import _mlflow_conda_env
from databricks import feature_engineering
from databricks.feature_engineering import FeatureFunction, FeatureLookup
from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.errors import AnalysisException
import numpy as np
from datetime import datetime
import boto3


In [0]:
if not is_databricks():
    load_dotenv()
    profile = os.environ["PROFILE"]
    mlflow.set_tracking_uri(f"databricks://{profile}")
    mlflow.set_registry_uri(f"databricks-uc://{profile}")


config = ProjectConfig.from_yaml(config_path="../project_config.yml", env="dev")

In [0]:
print(config.catalog_name)
print(config.schema_name)


mlops_dev
giridhar


In [0]:
spark = SparkSession.builder.getOrCreate()
fe = feature_engineering.FeatureEngineeringClient()

train_set = spark.table(f"{config.catalog_name}.{config.schema_name}.train_set")
test_set = spark.table(f"{config.catalog_name}.{config.schema_name}.test_set")

In [0]:
print(train_set)
print(test_set)


DataFrame[type_of_meal_plan: string, required_car_parking_space: bigint, room_type_reserved: string, market_segment_type: string, no_of_adults: bigint, no_of_children: bigint, no_of_weekend_nights: bigint, no_of_week_nights: bigint, lead_time: bigint, repeated_guest: bigint, no_of_previous_cancellations: bigint, no_of_previous_bookings_not_canceled: bigint, avg_price_per_room: double, no_of_special_requests: bigint, arrival_date: bigint, arrival_year: bigint, arrival_month: bigint, booking_status: string, Booking_ID: string, month_as_sin: double, month_as_cos: double, update_timestamp_utc: timestamp]
DataFrame[type_of_meal_plan: string, required_car_parking_space: bigint, room_type_reserved: string, market_segment_type: string, no_of_adults: bigint, no_of_children: bigint, no_of_weekend_nights: bigint, no_of_week_nights: bigint, lead_time: bigint, repeated_guest: bigint, no_of_previous_cancellations: bigint, no_of_previous_bookings_not_canceled: bigint, avg_price_per_room: double, no_o

In [0]:
# create feature table with information about houses

feature_table_name = f"{config.catalog_name}.{config.schema_name}.giridhar_hotres_features_demo"
lookup_features = ["type_of_meal_plan", "required_car_parking_space", "room_type_reserved", "market_segment_type", "no_of_adults", "no_of_children"]


In [0]:
# df = test_set.select("Booking_ID", *lookup_features)
# df.printSchema()

In [0]:
# Option 1: feature engineering client

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

feature_table = fe.create_table(
   name=feature_table_name,
   primary_keys=["Booking_ID"],
   df=train_set[["Booking_ID"]+lookup_features],
   description="Hotel Reservations features table",
)

spark.sql(f"ALTER TABLE {feature_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

int_cols = ["required_car_parking_space", "no_of_adults", "no_of_children"]
df = test_set.select("Booking_ID", *lookup_features)
for c in int_cols:
    df = df.withColumn(c, col(c).cast(IntegerType()))

fe.write_table(
   name=feature_table_name,
   df=df, #test_set[["Booking_ID"]+lookup_features],
   mode="merge",
)

In [0]:
# spark.sql(f"DESCRIBE TABLE {feature_table_name}").show(truncate=False)

In [0]:
# create feature table with information about houses
# Option 2: SQL

spark.sql(f"""
          CREATE OR REPLACE TABLE {feature_table_name}
          (Booking_ID STRING NOT NULL, type_of_meal_plan STRING, required_car_parking_space INT, room_type_reserved STRING, market_segment_type STRING, no_of_adults INT, no_of_children INT);
          """)
# primary key on Databricks is not enforced!
try:
    spark.sql(f"ALTER TABLE {feature_table_name} ADD CONSTRAINT hotres_pk_demo PRIMARY KEY(Booking_ID);")
except AnalysisException:
    pass
spark.sql(f"ALTER TABLE {feature_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true);")
spark.sql(f"""
          INSERT INTO {feature_table_name}
          SELECT Booking_ID, type_of_meal_plan, required_car_parking_space, room_type_reserved, market_segment_type, no_of_adults, no_of_children
          FROM {config.catalog_name}.{config.schema_name}.train_set
          """)
spark.sql(f"""
          INSERT INTO {feature_table_name}
          SELECT Booking_ID, type_of_meal_plan, required_car_parking_space, room_type_reserved, market_segment_type, no_of_adults, no_of_children
          FROM {config.catalog_name}.{config.schema_name}.test_set
          """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# create feature function
# docs: https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-ddl-create-sql-function

# problems with feature functions:
# functions are not versioned 
# functions may behave differently depending on the runtime (and version of packages and python)
# there is no way to enforce python version & package versions for the function 
# this is only supported from runtime 17
# advised to use only for simple calculations

function_name = f"{config.catalog_name}.{config.schema_name}.add_adults_children_demo"

In [0]:

# Option 1: with Python
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {function_name}(no_of_adults INT, no_of_children INT)
        RETURNS INT
        LANGUAGE PYTHON AS
        $$
        return no_of_adults + no_of_children
        $$
        """)

DataFrame[]

In [0]:
# it is possible to define simple functions in sql only without python
# Option 2
# commenting for now
# spark.sql(f"""
#         CREATE OR REPLACE FUNCTION {function_name}_sql (no_of_adults INT, no_of_children INT)
#         RETURNS INT
#         RETURN no_of_adults + no_of_children;
#         """)

In [0]:
# execute function
spark.sql(f"SELECT {function_name}_sql(5, 3) as adults_and_childs;").show()

+-----------------+
|adults_and_childs|
+-----------------+
|                8|
+-----------------+



In [0]:
# create a training set
training_set = fe.create_training_set(
    df=train_set.drop("type_of_meal_plan",
                      "required_car_parking_space",
                      "room_type_reserved",
                      "market_segment_type", 
                      "no_of_adults", 
                      "no_of_children"),
    label=config.target,
    feature_lookups=[
        FeatureLookup(
            table_name=feature_table_name,
            feature_names=["type_of_meal_plan",
                            "required_car_parking_space",
                            "room_type_reserved",
                            "market_segment_type", 
                            "no_of_adults", 
                            "no_of_children"],
            lookup_key="Booking_ID",
                ),
        FeatureFunction(
            udf_name=function_name,
            output_name="adults_and_childs",
            input_bindings={"no_of_adults": "no_of_adults", "no_of_children": "no_of_children"},
            ),
    ],
    # exclude_columns=["update_timestamp_utc"],
    )

In [0]:
# Train & register a model
training_df = training_set.load_df().toPandas()
X_train = training_df[config.num_features + config.cat_features + ["adults_and_childs"]]
y_train = training_df[config.target]

In [0]:
X_train[['no_of_adults', 'no_of_children', 'adults_and_childs']].head()

Unnamed: 0,no_of_adults,no_of_children,adults_and_childs
0,2,1,3
1,2,1,3
2,2,0,2
3,2,0,2
4,1,0,1


In [0]:
X_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 29020 entries, 0 to 29019
Data columns (total 18 columns):
 #   Column                                Non-Null Count  Dtype  
---  ------                                --------------  -----  
 0   no_of_adults                          29020 non-null  int32  
 1   no_of_children                        29020 non-null  int32  
 2   no_of_weekend_nights                  29020 non-null  int64  
 3   no_of_week_nights                     29020 non-null  int64  
 4   lead_time                             29020 non-null  int64  
 5   repeated_guest                        29020 non-null  int64  
 6   no_of_previous_cancellations          29020 non-null  int64  
 7   no_of_previous_bookings_not_canceled  29020 non-null  int64  
 8   avg_price_per_room                    29020 non-null  float64
 9   no_of_special_requests                29020 non-null  int64  
 10  arrival_date                          29020 non-null  int64  
 11  arrival_year   

In [0]:
y_train.info()

<class 'pandas.core.series.Series'>
RangeIndex: 29020 entries, 0 to 29019
Series name: booking_status
Non-Null Count  Dtype 
--------------  ----- 
29020 non-null  object
dtypes: object(1)
memory usage: 226.8+ KB


In [0]:
# set the classifier parameters 

train_parameters = {
    "learning_rate": 0.1,
    "n_estimators": 1000,
    "max_depth": 8,
    "num_leaves": 31,
    "min_child_samples": 20,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
    "random_state": 42,
}



In [0]:
pipeline = Pipeline(
        steps=[("preprocessor", ColumnTransformer(
            transformers=[("cat", OneHotEncoder(handle_unknown="ignore"),
                           config.cat_features)],
            remainder="passthrough")
            ),
               ("classifier", LGBMClassifier(**train_parameters))]
        )

pipeline.fit(X_train, y_train)

[LightGBM] [Info] Number of positive: 19551, number of negative: 9469
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.005833 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 669
[LightGBM] [Info] Number of data points in the train set: 29020, number of used features: 30
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.673708 -> initscore=0.725003
[LightGBM] [Info] Start training from score 0.725003


The format of the columns of the 'remainder' transformer in ColumnTransformer.transformers_ will change in version 1.7 to match the format of the other transformers.
At the moment the remainder columns are stored as indices (of type int). With the same ColumnTransformer configuration, in the future they will be stored as column names (of type str).



In [0]:
mlflow.set_experiment("/Shared/giridhar-hotres-model-fe")
with mlflow.start_run(run_name="giridhar-hotres-model-fe",
                      tags={"git_sha": "1234567890abcdefg",
                            "branch": "week3"},
                            description="demo run for FE model logging") as run:
    # Log parameters and metrics
    run_id = run.info.run_id
    mlflow.log_param("model_type", "LightGBM Classifier with preprocessing")
    mlflow.log_params(train_parameters)

    # Log the model
    signature = infer_signature(model_input=X_train, model_output=pipeline.predict(X_train))
    fe.log_model(
                model=pipeline,
                flavor=mlflow.sklearn,
                artifact_path="lightgbm-pipeline-model-fe",
                training_set=training_set,
                signature=signature,
            )
    

2025/06/09 17:39:51 INFO mlflow.tracking._tracking_service.client: 🏃 View run giridhar-hotres-model-fe at: https://dbc-c2e8445d-159d.cloud.databricks.com/ml/experiments/3264736349207955/runs/f7b8a3f538de44a892c182edfe78f8eb.
2025/06/09 17:39:51 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: https://dbc-c2e8445d-159d.cloud.databricks.com/ml/experiments/3264736349207955.


In [0]:
model_name = f"{config.catalog_name}.{config.schema_name}.giridhar_hotres_model_fe_demo"
model_version = mlflow.register_model(
    model_uri=f'runs:/{run_id}/lightgbm-pipeline-model-fe',
    name=model_name,
    tags={"git_sha": "1234567890abcdxyz"})

Registered model 'mlops_dev.giridhar.giridhar_hotres_model_fe_demo' already exists. Creating a new version of this model...
Created version '23' of model 'mlops_dev.giridhar.giridhar_hotres_model_fe_demo'.


In [0]:
print(config.num_features)
print(config.cat_features)
print(lookup_features)

['no_of_adults', 'no_of_children', 'no_of_weekend_nights', 'no_of_week_nights', 'lead_time', 'repeated_guest', 'no_of_previous_cancellations', 'no_of_previous_bookings_not_canceled', 'avg_price_per_room', 'no_of_special_requests', 'arrival_date', 'arrival_year', 'arrival_month']
['type_of_meal_plan', 'required_car_parking_space', 'room_type_reserved', 'market_segment_type']
['type_of_meal_plan', 'required_car_parking_space', 'room_type_reserved', 'market_segment_type', 'no_of_adults', 'no_of_children']


In [0]:
# features = [f for f in ["Booking_ID"] + config.num_features + config.cat_features]
# features += ['adults_and_childs', "update_timestamp_utc", "month_as_cos", "month_as_sin"]
# print(features)

In [0]:
from pyspark.sql.functions import expr, col, when, pandas_udf
from pyspark.sql.types import IntegerType, ArrayType, DoubleType, StringType
import mlflow.pyfunc


# preping the test set
test_set = (
    test_set
      .withColumn("no_of_adults",               col("no_of_adults").cast(IntegerType()))
      .withColumn("no_of_children",             col("no_of_children").cast(IntegerType()))
      .withColumn("required_car_parking_space", col("required_car_parking_space").cast(IntegerType()))
      # .withColumn("adults_and_childs",          col("adults_and_childs").cast(IntegerType()))
)


test_set = test_set.withColumn(
    "adults_and_childs",
    expr(f"{config.catalog_name}.{config.schema_name}"
         f".add_adults_children_demo_sql(no_of_adults, no_of_children)")
)

# prepare the features 
features = ["Booking_ID"] + config.num_features + config.cat_features + [
    "update_timestamp_utc", "month_as_cos", "month_as_sin", "adults_and_childs"
]
features = [f for f in ["Booking_ID"] + config.num_features + config.cat_features if f not in lookup_features]
features += ["update_timestamp_utc", "month_as_cos", "month_as_sin"]


fe = FeatureEngineeringClient()

# Score via FE client, asking for StringType output
preds_df = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set.select(*features),
    result_type=StringType()
)

# Rename that “prediction” column to your label
preds_labeled = preds_df.withColumnRenamed("prediction", "prediction_label")

preds_labeled.select("Booking_ID", "prediction_label").show(5)





# # now making the predictions
# # because it can only output double, we have to tweak it a little bit
# preds_df = fe.score_batch(
#     model_uri=f"models:/{model_name}/{model_version.version}",
#     df=test_set.select(*features)
# )

# # 2) Convert the `prediction` double into a label
# preds_with_label = preds_df.withColumn(
#     "prediction_label",
#     when(col("prediction") >= 0.5, "1").otherwise("0")
# )

# preds_with_label.show(5)

# # the below code also proved to be a bit tricky as it had to remove some dependencies
# from pyspark.sql.types import StringType  # or IntegerType()
# import mlflow.pyfunc

# # Create a UDF that returns e.g. String labels
# predict_udf = mlflow.pyfunc.spark_udf(
#     spark,
#     f"models:/{model_name}/{model_version.version}",
#     result_type=StringType()
# )

# # Apply it to your feature DataFrame
# predictions = test_set.select(
#     *features,
#     predict_udf(*features).alias("prediction")
# )

# predictions.show(5)


2025/06/09 17:39:58 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


+----------+----------------+
|Booking_ID|prediction_label|
+----------+----------------+
|  INN00626|    Not_Canceled|
|  INN10204|    Not_Canceled|
|  INN20020|    Not_Canceled|
|  INN16435|    Not_Canceled|
|  INN07143|    Not_Canceled|
+----------+----------------+
only showing top 5 rows



In [0]:
# predictions = test_set.withColumn("prediction", predict_udf(*features))
# predictions.select("prediction").show(5)


In [0]:
# for me this will also provide an error because since the model is output is a string and fe.client only outputs a double.
# predictions.select("prediction").show(5)

In [0]:
from pyspark.sql.functions import col, concat, lit


features = [f for f in ["Booking_ID"] + config.num_features + config.cat_features if f not in lookup_features]
# adding some more features 
features += ["update_timestamp_utc", "month_as_cos", "month_as_sin"]

# # commenting to make a minor change, see the immediate code below 
# test_set_with_new_id = test_set.select(*features).withColumn(
#     "Booking_ID",
#     (col("Booking_ID").cast("long") + 1000000).cast("string")
# )

test_set_with_new_id = test_set.select(*features).withColumn(
    "Booking_ID",
    (concat(lit("a"), col("Booking_ID")))
)

preds_df2 = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=test_set_with_new_id,
    result_type=StringType()
)


2025/06/09 17:40:13 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


In [0]:
# make predictions for a non-existing entry -> error!
# preds_df2.select("prediction").show(5)

In [0]:
baseadults_function = f"{config.catalog_name}.{config.schema_name}.base_adults_to_one"
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {baseadults_function}(no_of_adults INT)
        RETURNS INT
        LANGUAGE PYTHON AS
        $$
        if no_of_adults is None or no_of_adults == 0:
            return 1
        else:
            return no_of_adults
        $$
        """)

basechilds_function = f"{config.catalog_name}.{config.schema_name}.base_childs_to_one"
spark.sql(f"""
        CREATE OR REPLACE FUNCTION {basechilds_function}(no_of_children INT)
        RETURNS INT
        LANGUAGE PYTHON AS
        $$
        if no_of_children is None or no_of_children == 0:
            return 1
        else:
            return no_of_children
        $$
        """)


DataFrame[]

In [0]:
# what if we want to replace with a default value if entry is not found
# what if we want to look up value in another table? the logics get complex
# problems that arize: functions/ lookups always get executed (if statememt is not possible)
# it can get slow...

# step 1: create 3 feature functions

# step 2: redefine create training set

# try again

# create a training set
training_set = fe.create_training_set(
    df=train_set.drop("no_of_adults", "no_of_children"),
    label=config.target,
    feature_lookups=[
        FeatureLookup(
            table_name=feature_table_name,
            feature_names=["no_of_adults", "no_of_children"],
            lookup_key="Booking_ID",
            rename_outputs={"no_of_adults": "based_adults",
                            "no_of_children": "based_childs"}
                ),
        FeatureFunction(
            udf_name=baseadults_function,
            output_name="no_of_adults",
            input_bindings={"no_of_adults": "based_adults"},
            ),
        FeatureFunction(
            udf_name=basechilds_function,
            output_name="no_of_children",
            input_bindings={"no_of_children": "based_childs"},
            ),
        ],
    # exclude_columns=["update_timestamp_utc", "based_adults", "based_childs"],
      )
    #     ),
    #     FeatureFunction(
    #         udf_name=garagecars_function,
    #         output_name="GarageCars",
    #         input_bindings={"GarageCars": "lookup_GarageCars"},
    #     ),
    #     FeatureFunction(
    #         udf_name=function_name,
    #         output_name="house_age",
    #         input_bindings={"year_built": "YearBuilt"},
    #         ),
    # ],
    # exclude_columns=["update_timestamp_utc"],
    # )

In [0]:
# # Materialize the training set into a Spark DataFrame
# ts_df = training_set.load_df()

# # 1) Check the schema
# ts_df.printSchema()

# # 2) Peek at the key columns
# ts_df.select(
#     "Booking_ID",
#     "based_adults",
#     "based_childs",
#     "no_of_adults",
#     "no_of_children",
#     config.target
# ).show(5, truncate=False)


In [0]:
X_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 29020 entries, 0 to 29019
Data columns (total 18 columns):
 #   Column                                Non-Null Count  Dtype  
---  ------                                --------------  -----  
 0   no_of_adults                          29020 non-null  int32  
 1   no_of_children                        29020 non-null  int32  
 2   no_of_weekend_nights                  29020 non-null  int64  
 3   no_of_week_nights                     29020 non-null  int64  
 4   lead_time                             29020 non-null  int64  
 5   repeated_guest                        29020 non-null  int64  
 6   no_of_previous_cancellations          29020 non-null  int64  
 7   no_of_previous_bookings_not_canceled  29020 non-null  int64  
 8   avg_price_per_room                    29020 non-null  float64
 9   no_of_special_requests                29020 non-null  int64  
 10  arrival_date                          29020 non-null  int64  
 11  arrival_year   

In [0]:
training_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 29020 entries, 0 to 29019
Data columns (total 23 columns):
 #   Column                                Non-Null Count  Dtype         
---  ------                                --------------  -----         
 0   no_of_weekend_nights                  29020 non-null  int64         
 1   no_of_week_nights                     29020 non-null  int64         
 2   lead_time                             29020 non-null  int64         
 3   repeated_guest                        29020 non-null  int64         
 4   no_of_previous_cancellations          29020 non-null  int64         
 5   no_of_previous_bookings_not_canceled  29020 non-null  int64         
 6   avg_price_per_room                    29020 non-null  float64       
 7   no_of_special_requests                29020 non-null  int64         
 8   arrival_date                          29020 non-null  int64         
 9   arrival_year                          29020 non-null  int64         
 10

In [0]:
# Train & register a model
training_df = training_set.load_df().toPandas()
num_features_adjusted = config.num_features
num_features_adjusted = [feature for feature in num_features_adjusted if feature not in ["no_of_adults", "no_of_children"]]
X_train = training_df[num_features_adjusted + config.cat_features + ["based_adults"] + ["based_childs"]]
y_train = training_df[config.target]

#pipeline
pipeline = Pipeline(
        steps=[("preprocessor", ColumnTransformer(
            transformers=[("cat", OneHotEncoder(handle_unknown="ignore"),
                           config.cat_features)],
            remainder="passthrough")
            ),
               ("classifier", LGBMClassifier(**train_parameters))]
        )

pipeline.fit(X_train, y_train)

[LightGBM] [Info] Number of positive: 19551, number of negative: 9469
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.004096 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 662
[LightGBM] [Info] Number of data points in the train set: 29020, number of used features: 29
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.673708 -> initscore=0.725003
[LightGBM] [Info] Start training from score 0.725003


The format of the columns of the 'remainder' transformer in ColumnTransformer.transformers_ will change in version 1.7 to match the format of the other transformers.
At the moment the remainder columns are stored as indices (of type int). With the same ColumnTransformer configuration, in the future they will be stored as column names (of type str).



In [0]:
# ts = training_set.load_df()
# ts.printSchema()
# ts.select("Booking_ID", "based_adults", "based_childs", "no_of_adults", "no_of_children").show(5)


In [0]:
# just checking X_train

X_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 29020 entries, 0 to 29019
Data columns (total 17 columns):
 #   Column                                Non-Null Count  Dtype  
---  ------                                --------------  -----  
 0   no_of_weekend_nights                  29020 non-null  int64  
 1   no_of_week_nights                     29020 non-null  int64  
 2   lead_time                             29020 non-null  int64  
 3   repeated_guest                        29020 non-null  int64  
 4   no_of_previous_cancellations          29020 non-null  int64  
 5   no_of_previous_bookings_not_canceled  29020 non-null  int64  
 6   avg_price_per_room                    29020 non-null  float64
 7   no_of_special_requests                29020 non-null  int64  
 8   arrival_date                          29020 non-null  int64  
 9   arrival_year                          29020 non-null  int64  
 10  arrival_month                         29020 non-null  int64  
 11  type_of_meal_pl

In [0]:
# 1) The list of feature columns your model was trained on
train_feats = set(X_train.columns)

# 2) The columns emitted by your FE pipeline (which includes extra columns like label and metadata)
all_columns = set(training_set.load_df().columns)

# 3) Columns you need to ignore when comparing
to_ignore = {
    config.target,            # booking_status label
    "Booking_ID",             # just an ID, not a model input
    "update_timestamp_utc",   # time metadata
    "month_as_sin",           # these came from your feature script, but not in X_train
    "month_as_cos"
}

# 4) Compute the actual FE-produced feature set
fe_produced_feats = all_columns - to_ignore

missing = train_feats - fe_produced_feats
extra   = fe_produced_feats - train_feats

print("Missing in FE pipeline:", missing)
print("Extra    in FE pipeline:", extra)



Missing in FE pipeline: set()
Extra    in FE pipeline: {'no_of_children', 'no_of_adults'}


In [0]:
mlflow.set_experiment("/Shared/giridhar-hotres-model-fe")
with mlflow.start_run(run_name="giridhar-hotres-model-fe",
                      tags={"git_sha": "1234567890abcdefgh",
                            "branch": "week3"},
                            description="demo run for FE model logging") as run:
    # Log parameters and metrics
    run_id = run.info.run_id
    mlflow.log_param("model_type", "LightGBM classifier with preprocessing")
    mlflow.log_params(train_parameters)

    X_train["based_adults"] = X_train["based_adults"].astype(float)
    X_train["based_childs"] = X_train["based_childs"].astype(float)


    # Log the model
    signature = infer_signature(model_input=X_train, model_output=pipeline.predict(X_train))
    fe.log_model(
                model=pipeline,
                flavor=mlflow.sklearn,
                artifact_path="lightgbm-pipeline-model-fe",
                training_set=training_set,
                signature=signature,
            )
model_name = f"{config.catalog_name}.{config.schema_name}.giridhar_hotres_model_fe_demo"
model_version = mlflow.register_model(
    model_uri=f'runs:/{run_id}/lightgbm-pipeline-model-fe',
    name=model_name,
    tags={"git_sha": "1234567890abcd"})

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_train["based_adults"] = X_train["based_adults"].astype(float)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_train["based_childs"] = X_train["based_childs"].astype(float)
2025/06/09 17:40:47 INFO mlflow.tracking._tracking_service.client: 🏃 View run giridhar-hotres-model-fe at: https://dbc-c2e8445d-159d.cloud.databricks.com/ml/experiments/3264736349207955/runs/1ab48571924140a7a8f703f196d96cd8.
2025/06/09 17:40:47 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: https://dbc-c2e8445d-159d.cl

In [0]:
# features = [f for f in ["Booking_ID"] + num_features_adjusted + config.cat_features if f not in lookup_features]
# features += ['room_type_reserved', 'market_segment_type', 'required_car_parking_space', 'type_of_meal_plan', 'month_as_cos', 'month_as_sin', 'based_adults', 'based_childs', 'update_timestamp_utc', 'no_of_adults', 'no_of_children']
# print(features)



In [0]:
# # 1) Print the schema to confirm both lookup cols are present
# test_set_with_new_id.printSchema()

# # 2) Peek at a few rows of those columns
# test_set_with_new_id.select("Booking_ID","based_adults","based_childs").show(5, truncate=False)


In [0]:
from pyspark.sql.functions import concat, lit
from pyspark.sql.types import StringType

# 1) Build the raw scoring DF (keep all columns, just prefix IDs)
scoring_df = test_set.withColumn(
    "Booking_ID",
    concat(lit("a"), col("Booking_ID"))
)

# 2) Score with FE client against version 18
preds2 = fe.score_batch(
    model_uri=f"models:/{model_name}/{model_version.version}",
    df=scoring_df,
    result_type=StringType()
)

# 3) Inspect
preds2.select("Booking_ID", "prediction").show(5)


2025/06/09 17:40:52 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


+----------+------------+
|Booking_ID|  prediction|
+----------+------------+
| aINN04969|Not_Canceled|
| aINN34541|Not_Canceled|
| aINN36109|Not_Canceled|
| aINN01554|    Canceled|
| aINN24975|Not_Canceled|
+----------+------------+
only showing top 5 rows



In [0]:
# make predictions for a non-existing entry -> no error!
# predictions.select("prediction").show(5)

In [0]:
import boto3

region_name = "eu-west-1"
aws_access_key_id = os.environ["aws_access_key_id"]
aws_secret_access_key = os.environ["aws_secret_access_key"]

client = boto3.client(
    'dynamodb',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)

In [0]:
# to check if the table exists already 

import botocore

try:
    client.describe_table(TableName='GiridharHotelReservationFeatures')
    print("Table already exists; skipping creation.")
except client.exceptions.ResourceNotFoundException:
    client.create_table(
        TableName='GiridharHotelReservationFeatures',
        KeySchema=[{'AttributeName': 'Booking_ID', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'Booking_ID', 'AttributeType': 'S'}],
        ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
    )
    print("Table created.")


Table already exists; skipping creation.


In [0]:
## Commenting for now 

# response = client.create_table(
#     TableName='GiridharHotelReservationFeatures',
#     KeySchema=[
#         {
#             'AttributeName': 'Booking_ID',
#             'KeyType': 'HASH'  # Partition key
#         }
#     ],
#     AttributeDefinitions=[
#         {
#             'AttributeName': 'Booking_ID',
#             'AttributeType': 'S'  # String
#         }
#     ],
#     ProvisionedThroughput={
#         'ReadCapacityUnits': 5,
#         'WriteCapacityUnits': 5
#     }
# )

# print("Table creation initiated:", response['TableDescription']['TableName'])

In [0]:
client.put_item(
    TableName='GiridharHotelReservationFeatures',
    Item={
        'Booking_ID': {'S': 'eINN33712'},
        'no_of_adults': {'N': '4'},
        'no_of_children': {'N': '10'},
    }
)

{'ResponseMetadata': {'RequestId': 'MH5PCQ4INAVE8D0TE4KRKPNA13VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Mon, 09 Jun 2025 17:41:01 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'MH5PCQ4INAVE8D0TE4KRKPNA13VV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

In [0]:
response = client.get_item(
    TableName='GiridharHotelReservationFeatures',
    Key={
        'Booking_ID': {'S': 'eINN33712'}
    }
)

# Extract the item from the response
item = response.get('Item')
print(item)

{'no_of_adults': {'N': '4'}, 'no_of_children': {'N': '10'}, 'Booking_ID': {'S': 'eINN33712'}}


In [0]:
import time
import boto3

# (Assumes boto3 can already pick up credentials/region from your cluster)
# client = boto3.client("dynamodb")


rows = spark.table(feature_table_name).toPandas().to_dict(orient="records")

def to_dynamodb_item(row):
    return {
        'PutRequest': {
            'Item': {
                'Booking_ID': {'S': str(row['Booking_ID'])},
                'no_of_adults': {'N': str(row['no_of_adults'])},
                'no_of_children': {'N': str(row['no_of_children'])}
            }
        }
    }

items = [to_dynamodb_item(row) for row in rows]

# limiting the number of batches
limit = 1 * 25
sliced_items = items[:limit]


def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i : i + n]

for idx, batch in enumerate(chunks(sliced_items, 25), start=1):
    to_send = batch
    attempt = 1
    while to_send:
        response = client.batch_write_item(RequestItems={'GiridharHotelReservationFeatures': to_send})
        unprocessed = response.get('UnprocessedItems', {}).get('GiridharHotelReservationFeatures', [])
        if unprocessed:
            print(f"Batch {idx} attempt {attempt}: {len(unprocessed)} unprocessed, retrying...")
            to_send = unprocessed
            attempt += 1
            time.sleep(1)  # small backoff before retry
        else:
            print(f"Batch {idx}: all {len(batch)} items written.")
            to_send = None


Batch 1: all 25 items written.


In [0]:

from pprint import pprint

# look at the first batch payload
first_batch = list(chunks(sliced_items, 25))[0]
print("First batch payload:")
pprint(first_batch)


First batch payload:
[{'PutRequest': {'Item': {'Booking_ID': {'S': 'INN25630'},
                          'no_of_adults': {'N': '2'},
                          'no_of_children': {'N': '1'}}}},
 {'PutRequest': {'Item': {'Booking_ID': {'S': 'INN14474'},
                          'no_of_adults': {'N': '2'},
                          'no_of_children': {'N': '1'}}}},
 {'PutRequest': {'Item': {'Booking_ID': {'S': 'INN23721'},
                          'no_of_adults': {'N': '2'},
                          'no_of_children': {'N': '0'}}}},
 {'PutRequest': {'Item': {'Booking_ID': {'S': 'INN05844'},
                          'no_of_adults': {'N': '2'},
                          'no_of_children': {'N': '0'}}}},
 {'PutRequest': {'Item': {'Booking_ID': {'S': 'INN18710'},
                          'no_of_adults': {'N': '1'},
                          'no_of_children': {'N': '0'}}}},
 {'PutRequest': {'Item': {'Booking_ID': {'S': 'INN07412'},
                          'no_of_adults': {'N': '2'},
      

In [0]:
# We ran into more limitations when we tried complex data types as output of a feature function
# and then tried to use it for serving
# al alternatve solution: using an external database (we use DynamoDB here)

# create a DynamoDB table
# insert records into dynamo DB & read from dynamoDB

# create a pyfunc model

In [0]:

# class HousePriceModelWrapper(mlflow.pyfunc.PythonModel):
#     """Wrapper class for machine learning models to be used with MLflow.

#     This class wraps a machine learning model for predicting house prices.
#     """

#     def __init__(self, model: object) -> None:
#         """Initialize the HousePriceModelWrapper.

#         :param model: The underlying machine learning model.
#         """
#         self.model = model
    
#     def predict(self, context, model_input):
#         parsed = []
#         for lookup_id in model_input["Booking_ID"]:
#             response = client.get_item(
#                 TableName="GiridharHotelReservationFeatures",
#                 Key={"Booking_ID": {"S": lookup_id}}
#             )
#             raw_item = response.get("Item")
#             if not raw_item:
#                 # No record found; decide what makes sense (e.g., skip or fill with defaults)
#                 print(f"Warning: Booking_ID {lookup_id} not found in DynamoDB.")
#                 continue
            
#             parsed_dict = {
#                 key: int(value["N"]) if "N" in value else value["S"]
#                 for key, value in raw_item.items()
#             }
#             parsed.append(parsed_dict)

#         # Now `parsed` contains only the records that existed.
#         # Continue with whatever your model expects:
#         return super().predict(context=context, model_input=parsed)


#     # def predict(
#     #     self, context: mlflow.pyfunc.PythonModelContext, model_input: pd.DataFrame | np.ndarray
#     # ) -> dict[str, float]:
#     #     """Make predictions using the wrapped model.

#     #     :param context: The MLflow context (unused in this implementation).
#     #     :param model_input: Input data for making predictions.
#     #     :return: A dictionary containing the adjusted prediction.
#     #     """
#     #     client = boto3.client('dynamodb',
#     #                                aws_access_key_id=os.environ["aws_access_key_id"],
#     #                                aws_secret_access_key=os.environ["aws_secret_access_key"],
#     #                                region_name=os.environ["region_name"])
        
#     #     parsed = []
#     #     for lookup_id in model_input["Booking_ID"]:
#     #         raw_item = client.get_item(
#     #             TableName='GiridharHotelReservationFeatures',
#     #             Key={'Booking_ID': {'S': lookup_id}})["Item"]     
#     #         parsed_dict = {key: int(value['N']) if 'N' in value else value['S']
#     #                   for key, value in raw_item.items()}
#     #         parsed.append(parsed_dict)
#     #     lookup_df=pd.DataFrame(parsed)
#     #     merged_df = model_input.merge(lookup_df, on="Booking_ID", how="left").drop("Booking_ID", axis=1)
        
#     #     merged_df["no_of_adults"] = merged_df["no_of_adults"].fillna(2)
#     #     merged_df["no_of_children"] = merged_df["no_of_children"].fillna(2)
#     #     merged_df["adults_and_childs"] = merged_df["no_of_adults"] + merged_df["no_of_children"]
#     #     predictions = self.model.predict(merged_df)

#     #     return [int(x) for x in predictions]

In [0]:
import os
import boto3
import pandas as pd
import mlflow.pyfunc
from datetime import datetime

class HousePriceModelWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, model: object) -> None:
        self.model = model

    def predict(self, context, model_input: pd.DataFrame) -> list[int]:
        # create the DynamoDB client here
        client = boto3.client(
            'dynamodb',
            aws_access_key_id=os.environ["aws_access_key_id"],
            aws_secret_access_key=os.environ["aws_secret_access_key"],
            region_name=os.environ.get("region_name", "eu-west-1"),
        )

        parsed = []
        for lookup_id in model_input["Booking_ID"]:
            resp = client.get_item(
                TableName='GiridharHotelReservationFeatures',
                Key={'Booking_ID': {'S': lookup_id}}
            )
            item = resp.get("Item")
            # if not item:
            #     # default logic or skip
            #     parsed.append({"Booking_ID": lookup_id,
            #                    "no_of_adults": 1,       # default
            #                    "no_of_children": 1})
            # else:
            #     parsed.append({
            #         "Booking_ID": lookup_id,
            #         "no_of_adults": int(item["no_of_adults"]["N"]),
            #         "no_of_children": int(item["no_of_children"]["N"])
            #     })
            # Always produce `based_adults` and `based_childs`
            if not item:
                adults = 1
                childs = 1
            else:
                adults = int(item["no_of_adults"]["N"])
                childs = int(item["no_of_children"]["N"])
                parsed.append({
                    "Booking_ID": lookup_id,
                    "based_adults": adults,
                    "based_childs": childs
                })

        # merge and drop the ID column
        # merged = model_input.merge(lookup_df, on="Booking_ID", how="left").drop("Booking_ID", axis=1)
        lookup_df = pd.DataFrame(parsed)
        merged = (model_input.merge(lookup_df, on="Booking_ID", how="left")
            # .drop("Booking_ID", axis=1)
        )
        # Drop the ID and pass all features + based_* into the model
        X = merged.drop("Booking_ID", axis=1)
        # now run your pipeline
        # preds = self.model.predict(merged)
        preds = self.model.predict(X)
        return list(preds)

In [0]:
custom_model = HousePriceModelWrapper(pipeline)

In [0]:
from pyspark.sql.functions import col

# 25 raw IDs out of first_batch
ids25 = [rec["PutRequest"]["Item"]["Booking_ID"]["S"] for rec in first_batch]


raw_test_df = spark.table(feature_table_name)

matched = train_set.filter(col("Booking_ID").isin(ids25))

print(f"Found {matched.count()} of {len(ids25)} rows:")
matched.show(25, truncate=False)

found_ids = [r.Booking_ID for r in matched.select("Booking_ID").collect()]
missing = set(ids25) - set(found_ids)
print("Missing IDs:", missing)


Found 25 of 25 rows:
+-----------------+--------------------------+------------------+-------------------+------------+--------------+--------------------+-----------------+---------+--------------+----------------------------+------------------------------------+------------------+----------------------+------------+------------+-------------+--------------+----------+-----------------------+-----------------------+-----------------------+
|type_of_meal_plan|required_car_parking_space|room_type_reserved|market_segment_type|no_of_adults|no_of_children|no_of_weekend_nights|no_of_week_nights|lead_time|repeated_guest|no_of_previous_cancellations|no_of_previous_bookings_not_canceled|avg_price_per_room|no_of_special_requests|arrival_date|arrival_year|arrival_month|booking_status|Booking_ID|month_as_sin           |month_as_cos           |update_timestamp_utc   |
+-----------------+--------------------------+------------------+-------------------+------------+--------------+------------------

In [0]:

df_input = matched.toPandas().reset_index(drop=True)


train_cols = set(X_train.columns)
input_cols = set(df_input.columns)
missing = train_cols - input_cols
extra   = input_cols - train_cols
print("Missing columns in df_input:", missing)
print("Extra   columns in df_input:", extra)

Missing columns in df_input: {'based_childs', 'based_adults'}
Extra   columns in df_input: {'booking_status', 'no_of_adults', 'update_timestamp_utc', 'no_of_children', 'month_as_cos', 'Booking_ID', 'month_as_sin'}


In [0]:
# df_input = matched.toPandas().reset_index(drop=True)
# print("Input cols:", df_input.columns.tolist())

In [0]:
# Score
custom_model = HousePriceModelWrapper(pipeline)
preds25 = custom_model.predict(None, df_input)

# df_input["prediction"] = preds25
# display(df_input)

In [0]:
from mlflow.models.signature import infer_signature

# df_input: a small Pandas DataFrame with the three columns your wrapper.predict expects
# preds: the list of predictions from custom_model.predict(None, df_input)

preds25 = custom_model.predict(None, df_input)

# Now infer signature off df_input and its preds
sig = infer_signature(df_input, pd.Series(preds25, name="prediction"))

mlflow.set_experiment("/Shared/giridhar-demo-model-fe-pyfunc")
with mlflow.start_run(run_name="giridhar-demo-run-model-fe-pyfunc") as run:
    mlflow.pyfunc.log_model(
        python_model=custom_model,
        artifact_path="lightgbm-pipeline-model-fe",
        signature=sig
    )
    run_id = run.info.run_id


2025/06/09 17:41:15 INFO mlflow.tracking._tracking_service.client: 🏃 View run giridhar-demo-run-model-fe-pyfunc at: https://dbc-c2e8445d-159d.cloud.databricks.com/ml/experiments/3648643376455240/runs/8cf6087f84fb418a8d6d4172d1686d1f.
2025/06/09 17:41:15 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: https://dbc-c2e8445d-159d.cloud.databricks.com/ml/experiments/3648643376455240.


In [0]:
# set the the train parameters 

train_parameters = {
            "learning_rate": 0.1,
            "n_estimators": 1000,
            "max_depth": 8,
            "num_leaves": 31,
            "min_child_samples": 20,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "random_state": 42,
        }


In [0]:
# #log model
# mlflow.set_experiment("/Shared/giridhar-hotres-model-fe-pyfunc")
# with mlflow.start_run(run_name="giridhar-hoteres-model-fe-pyfunc",
#                       tags={"git_sha": "1234567890abcd",
#                             "branch": "week2"},
#                             description="demo run for FE model logging") as run:
#     # Log parameters and metrics
#     run_id = run.info.run_id
#     mlflow.log_param("model_type", "LightGBM with preprocessing")
#     mlflow.log_params(train_parameters)

#     # Log the model
#     signature = infer_signature(model_input=data, model_output=custom_model.predict(context=None, model_input=data))
#     mlflow.pyfunc.log_model(
#                 python_model=custom_model,
#                 artifact_path="lightgbm-pipeline-model-fe",
#                 signature=signature,
#             )
    

In [0]:
# predict
mlflow.models.predict(f"runs:/{run_id}/lightgbm-pipeline-model-fe", df_input[0:1])

2025/06/09 17:41:29 INFO mlflow.models.python_api: Your input data has been transformed to comply with the expected input format for the MLflow scoring server. If you want to deploy the model to online serving, make sure to apply the same preprocessing in your inference client. Please also refer to https://www.mlflow.org/docs/latest/deployment/deploy-model-locally.html#json-input for more details on the supported input format.

Original input data:
  type_of_meal_plan  ...    update_timestamp_utc
0       Meal Plan 1  ... 2025-05-19 18:42:57.888

[1 rows x 22 columns]

Transformed input data:
{"dataframe_split": {"index": [0], "columns": ["type_of_meal_plan", "required_car_parking_space", "room_type_reserved", "market_segment_type", "no_of_adults", "no_of_children", "no_of_weekend_nights", "no_of_week_nights", "lead_time", "repeated_guest", "no_of_previous_cancellations", "no_of_previous_bookings_not_canceled", "avg_price_per_room", "no_of_special_requests", "arrival_date", "arrival_yea

created virtual environment CPython3.11.11.final.0-64 in 358ms
  creator CPython3Posix(dest=/root/.mlflow/envs/mlflow-1a9947897d590dfc2e1c6471b401a21cb035bd17, clear=False, no_vcs_ignore=False, global=False)
  seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
    added seed packages: pip==23.2.1, setuptools==68.0.0, wheel==0.41.0
  activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
2025/06/09 17:44:40 INFO mlflow.utils.environment: === Running command '['bash', '-c', 'source /root/.mlflow/envs/mlflow-1a9947897d590dfc2e1c6471b401a21cb035bd17/bin/activate && python -c ""']'
2025/06/09 17:44:40 INFO mlflow.utils.environment: === Running command '['bash', '-c', 'source /root/.mlflow/envs/mlflow-1a9947897d590dfc2e1c6471b401a21cb035bd17/bin/activate && python /local_disk0/.ephemeral_nfs/envs/pythonEnv-9dad1bc5-209a-49be-b7b7-e91b639a028b/lib/python3.11/site-packages/mlflow

{"predictions": ["Not_Canceled"]}