In [0]:
# import sys
# from pathlib import Path
# sys.path
# import sys
# sys.path.append(str(Path.cwd().parent / 'src'))

In [0]:
import numpy as np
import pandas as pd
import mlflow
import pyspark.sql.functions as F
from sklearn.preprocessing import LabelEncoder
from databricks.connect import DatabricksSession
from src.pricing.config import ProjectConfig
from src.pricing.data_processor import DataProcessor, FeatureProducer
from src.pricing.models.baseline_model import LGBMModel
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

spark = DatabricksSession.builder.getOrCreate()
config = ProjectConfig.from_yaml(config_path="../project_config.yml", env="dev")

# Enable automatic reloading of modules
%load_ext autoreload
%autoreload 2

In [0]:
# Client for the Databricks Feature Engineering APIs.
fe = FeatureEngineeringClient()

# Disabled reference code: the steps that would preprocess the raw sales data
# and generate the features to publish (presumably the feature table already
# exists when this notebook runs -- confirm before relying on it).
# data_processor = DataProcessor(config, spark)
# df_raw_sales = data_processor.preprocess()
#
# # 2. Instantiate and run the FeatureProducer
# feature_producer = FeatureProducer(config, spark)
# df_features_to_publish = feature_producer.generate_features_spark(df_raw_sales)

# Fully qualified Feature Store table: <catalog>.<schema>.price_features
feature_table_name = f"{config.catalog_name}.{config.schema_name}.price_features"

# Columns that make up the label DataFrame. "demand" is the label itself;
# "item_store_id" has to be present because it is used as a lookup key.
training_label_cols = [
    "id",
    "date",
    "demand",
    "item_id",
    "store_id",
    "dept_id",
    "cat_id",
    "state_id",
    "item_store_id",
]

# Distinct label rows, read straight from the feature table.
label_columns_sql = ','.join(training_label_cols)
training_set_labels = spark.sql(
    f"select distinct {label_columns_sql} from {feature_table_name}"
)

# Feature columns to fetch from the feature table for every label row.
feature_names = [
    'wday',
    'month',
    'year',
    'dayofweek',
    'dayofyear',
    'week',
    'snap_CA',
    'snap_TX',
    'snap_WI',
    'sell_price',
    'days_since_first_sale',
    'is_event',
    'lag_t7',
    'rolling_mean_lag7_w7',
    'lag_t28',
    'rolling_mean_lag28_w7',
    'item_running_avg',
    'store_running_avg',
]

# A single lookup against the price feature table, keyed on "item_store_id"
# with "date" as the timestamp lookup key.
price_feature_lookup = FeatureLookup(
    table_name=feature_table_name,
    feature_names=feature_names,
    lookup_key="item_store_id",
    timestamp_lookup_key="date",
)
feature_lookups = [price_feature_lookup]

# Combine the label rows with the looked-up features; "demand" is the label.
training_set = fe.create_training_set(
    df=training_set_labels,
    feature_lookups=feature_lookups,
    label="demand",
    # exclude_columns=["item_store_id"],
)

# Materialise the training set as a Spark DataFrame.
training_df = training_set.load_df()

# Restrict the training data to a small sample of items and stores.
#
# `distinct().limit(n)` without an ordering gives no guarantee about WHICH n
# rows come back, so the sample could differ between runs (and re-running this
# cell could shrink the data further). Sorting before limiting makes the
# sample reproducible: the first n ids in ascending order.
n_sample_items = 10
n_sample_stores = 10

selected_items = [
    row.item_id
    for row in (
        training_df.select("item_id")
        .distinct()
        .orderBy("item_id")
        .limit(n_sample_items)
        .collect()
    )
]
selected_stores = [
    row.store_id
    for row in (
        training_df.select("store_id")
        .distinct()
        .orderBy("store_id")
        .limit(n_sample_stores)
        .collect()
    )
]

# Keep only rows whose item AND store are both part of the sample.
training_df = training_df.filter(
    F.col("item_id").isin(selected_items)
    & F.col("store_id").isin(selected_stores)
)

# Collect the Spark training DataFrame into pandas and discard every row that
# contains at least one missing value.
training_df_pandas = training_df.toPandas().dropna()

# Label-encode the categorical id columns into integer codes.
#
# One encoder is fitted per column and stored in `label_encoders`, so the
# code -> original value mapping of every column stays available afterwards,
# e.g. label_encoders['item_id'].inverse_transform(codes). Re-fitting a single
# shared encoder would keep only the mapping of the last column processed.
#
# NOTE(review): re-running this cell encodes the already-encoded integers and
# overwrites the stored mappings; re-run from the pandas conversion cell instead.
categorical_cols = ['item_id', 'store_id', 'dept_id', 'cat_id', 'state_id']
label_encoders = {}

for col in categorical_cols:
    # Skip columns that are missing from the frame instead of raising KeyError.
    if col not in training_df_pandas.columns:
        continue
    le = LabelEncoder()
    training_df_pandas[col] = le.fit_transform(training_df_pandas[col])
    label_encoders[col] = le

training_df_pandas.head()

In [0]:
# Hyper-parameters passed to the LightGBM model wrapper.
lgbm_params = dict(
    objective='poisson',
    metric='rmse',
    n_estimators=2000,
    learning_rate=0.05,
    feature_fraction=0.8,
    bagging_fraction=0.8,
    bagging_freq=1,
    verbose=-1,
    n_jobs=-1,
    seed=42,
)

# Build the demand-forecast model wrapper from the encoded pandas training
# frame and the LightGBM parameters, then run its cross-validated training.
demand_model = LGBMModel(
    spark=spark,
    config=config,
    training_df=training_df_pandas,
    params=lgbm_params,
    model_type='forecast',
)
demand_model.train_with_cv()

In [0]:
# Log the trained model through the wrapper's `log_model` method
# (presumably to MLflow, which this notebook imports -- confirm in LGBMModel).
demand_model.log_model()

In [0]:
# NOTE(review): `X_train` is not assigned anywhere in the visible notebook, so
# this cell will presumably raise NameError on a fresh kernel (Restart & Run All).
# TODO: remove this cell or display a frame that is actually defined here.
X_train.head()