# M5 Forecasting - Model Training and Inference

## Overview
This notebook implements the training pipeline using **Polars** for high-performance data engineering and **LightGBM** for gradient boosting. The strategy involves:

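1.  **Data Processing:** Converting the raw CSVs to Parquet, then casting columns to compact types (Int8/Int16/Float32/Categorical) during feature engineering to minimize memory usage.
2.  **Feature Engineering:** Generating lag and rolling-window features one store at a time (one Parquet file per store and horizon week) to avoid out-of-memory errors.
3.  **Modeling:** Training 40 LightGBM models, one per store and forecast week (10 stores × 4 weeks), using **Tweedie loss** to handle the zero-inflated nature of retail sales. The week-*k* model only uses sales lagged by at least 7·*k* days.
4.  **Inference:** Two runs. First, train through day 1913 and predict the validation days 1914–1941. Then retrain through day 1941 and predict the evaluation days 1942–1969. The two prediction files are concatenated into the final submission.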

In [None]:
from google.colab import drive
drive.mount('/content/drive')
import polars as pl
import gc
import os
import glob
import numpy as np
import pandas as pd
import lightgbm as lgb

# Assumes calendar.csv, sell_prices.csv and sales_train_evaluation.csv were already downloaded to the working directory, as shown in the EDA.

# Parquet copies of the raw CSVs (which are expected in the working directory).
os.makedirs('./data', exist_ok=True)

path_calendar = './data/calendar.parquet'
path_prices = './data/sell_prices.parquet'
path_sales = './data/sales_train_evaluation.parquet'
path_base_output = './data/train_features'  # prefix of the per-store / per-week feature files

# Convert each CSV only once, so Restart & Run All does not redo the expensive conversion.
# Writing to a temporary file first means an interrupted conversion is never mistaken for a
# finished one. NOTE: delete the Parquet files to force a refresh if the CSVs change.
for csv_file, parquet_file in [
    ('calendar.csv', path_calendar),
    ('sell_prices.csv', path_prices),
    ('sales_train_evaluation.csv', path_sales),
]:
    if not os.path.exists(parquet_file):
        tmp_file = f"{parquet_file}.tmp"
        pl.scan_csv(csv_file).sink_parquet(tmp_file)
        os.replace(tmp_file, parquet_file)

PREDICT_DAYS = 28                       # forecast horizon in days
ROLLING_WINDOWS = [7, 14, 30, 60, 140]  # window sizes (days) for the rolling mean/std features
STORES_LIST = ['CA_1', 'CA_2', 'CA_3', 'CA_4', 'TX_1', 'TX_2', 'TX_3', 'WI_1', 'WI_2', 'WI_3']
WEEKS_LAGS = [1, 2, 3, 4]               # forecast weeks; week k uses sales lagged by >= 7*k days

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# LightGBM hyper-parameters shared by every (store, week) model.
params = dict(
    boosting_type='gbdt',
    objective='tweedie',            # suits the zero-inflated, non-negative sales target
    tweedie_variance_power=1.1,
    metric='rmse',
    subsample=0.5,                  # row bagging on every iteration (see subsample_freq)
    subsample_freq=1,
    learning_rate=0.03,
    num_leaves=2**11 - 1,
    min_data_in_leaf=2**12 - 1,
    feature_fraction=0.5,
    max_bin=100,
    boost_from_average=False,
    verbose=-1,
    seed=69,
    n_jobs=-1,
    force_col_wise=True,
)

# Fixed number of boosting rounds per store (presumably tuned offline - TODO confirm source).
best_iters = dict(
    CA_1=450, CA_2=1000, CA_3=650, CA_4=350,
    TX_1=150, TX_2=90, TX_3=90,
    WI_1=550, WI_2=650, WI_3=200,
)

In [None]:
def _sales_long_with_horizon(store_name, train_limit_day, index_columns):
    """Return one store's daily sales in long format up to ``train_limit_day``, followed by
    ``PREDICT_DAYS`` future rows per item whose ``sales`` is null (the rows to forecast)."""
    history = (
        pl.scan_parquet(path_sales)
        .filter(pl.col("store_id") == store_name)
        .unpivot(index=index_columns, variable_name='d', value_name='sales')
        .with_columns(
            pl.col("d").str.slice(2).cast(pl.Int16),  # "d_123" -> 123
            pl.col("sales").cast(pl.Float32)
        )
        .filter(pl.col("d") <= train_limit_day)
    )

    future_days = pl.select(
        pl.int_range(
            start=train_limit_day + 1,
            end=train_limit_day + PREDICT_DAYS + 1,
            dtype=pl.Int16,
            eager=False
        ).alias("d")
    ).lazy()

    # Every item of the store gets one null-sales row per future day.
    horizon = (
        history.select(index_columns).unique()
        .join(future_days, how="cross")
        .with_columns(pl.lit(None).cast(pl.Float32).alias("sales"))
        .select(history.collect_schema().names())
    )

    return (
        pl.concat([history, horizon])
        .with_columns([pl.col(c).cast(pl.Categorical) for c in index_columns])
    )


def _release_weeks(store_name):
    """Return, per (store_id, item_id), the first ``wm_yr_wk`` with a listed price as ``release``."""
    return (
        pl.scan_parquet(path_prices)
        .filter(pl.col("store_id") == store_name)
        .with_columns([pl.col("store_id").cast(pl.Categorical), pl.col("item_id").cast(pl.Categorical)])
        .group_by(['store_id', 'item_id'])
        .agg(pl.col('wm_yr_wk').min().alias('release'))
    )


def _calendar_features():
    """Return the calendar keyed by integer day ``d`` with event, SNAP and compact date features."""
    icols = ['date', 'd', 'wm_yr_wk', 'event_name_1', 'event_type_1', 'event_name_2', 'event_type_2',
             'snap_CA', 'snap_TX', 'snap_WI', 'year', 'month', 'wday']
    return (
        pl.scan_parquet(path_calendar)
        .select(icols)
        .with_columns(pl.col("d").str.slice(2).cast(pl.Int16))
        .with_columns([
            pl.col('date').str.to_date().alias('dt_date'),
            pl.col(['event_name_1', 'event_type_1', 'event_name_2', 'event_type_2']).cast(pl.Categorical),
            pl.col(['snap_CA', 'snap_TX', 'snap_WI']).cast(pl.Int8),
            pl.col('wm_yr_wk').cast(pl.Int16)
        ])
        .with_columns([
            pl.col('dt_date').dt.day().cast(pl.Int8).alias('tm_d'),
            pl.col('dt_date').dt.week().cast(pl.Int8).alias('tm_w'),
            pl.col('month').cast(pl.Int8).alias('tm_m'),
            (pl.col('year') - pl.col('year').min()).cast(pl.Int8).alias('tm_y'),
            (pl.col('wday') - 1).cast(pl.Int8).alias('tm_dw'),
            (pl.col('dt_date').dt.day() / 7).ceil().cast(pl.Int8).alias('tm_wm'),
            # Weekend flag from the ISO weekday (6 = Saturday, 7 = Sunday). It cannot be built
            # from `tm_dw`: that column is created in this same context and is not visible yet.
            # M5's `wday` also starts the week on Saturday, so `wday - 1 >= 5` would flag Thu/Fri.
            (pl.col('dt_date').dt.weekday() >= 6).cast(pl.Int8).alias('tm_w_end')
        ])
        .drop(['date', 'dt_date', 'year', 'month', 'wday'])
    )


def _price_features(store_name):
    """Return per-(store_id, item_id, wm_yr_wk) price level, spread and momentum features for one store."""
    calendar_weeks = (
        pl.scan_parquet(path_calendar)
        .select(['wm_yr_wk', 'month', 'year'])
        # Cast to Int16 to match the price table's join key. Keep the first calendar row per week
        # (calendar presumably in date order) so weeks straddling a month/year boundary are
        # assigned deterministically.
        .with_columns(pl.col('wm_yr_wk').cast(pl.Int16))
        .unique(subset=['wm_yr_wk'], keep='first', maintain_order=True)
    )
    item_keys = ['store_id', 'item_id']
    return (
        pl.scan_parquet(path_prices)
        .filter(pl.col("store_id") == store_name)
        .with_columns([
            pl.col("store_id").cast(pl.Categorical), pl.col("item_id").cast(pl.Categorical), pl.col("wm_yr_wk").cast(pl.Int16)
        ])
        .join(calendar_weeks, on='wm_yr_wk', how='left')
        # Week order is required for the shift(1)-based momentum below.
        .sort(['store_id', 'item_id', 'wm_yr_wk'])
        .with_columns([
            pl.col('sell_price').max().over(item_keys).alias('price_max'),
            pl.col('sell_price').min().over(item_keys).alias('price_min'),
            pl.col('sell_price').std().over(item_keys).alias('price_std'),
            pl.col('sell_price').mean().over(item_keys).alias('price_mean'),
            pl.col('sell_price').n_unique().over(item_keys).alias('price_nunique'),
            (pl.col('sell_price') / pl.col('sell_price').shift(1).over(item_keys)).alias('price_momentum'),
            pl.col('sell_price').mean().over(item_keys + ['month']).alias('price_mean_m'),
            pl.col('sell_price').mean().over(item_keys + ['year']).alias('price_mean_y')
        ])
        .with_columns([
            (pl.col('sell_price') / pl.col('price_max')).alias('price_norm'),
            (pl.col('sell_price') / pl.col('price_mean_m')).alias('price_momentum_m'),
            (pl.col('sell_price') / pl.col('price_mean_y')).alias('price_momentum_y')
        ])
        .drop(['month', 'year', 'price_mean_m', 'price_mean_y'])
    )


def _history_feature_exprs(min_lag):
    """Return the sales-history expressions for a model whose shortest usable lag is ``min_lag``.

    Lags and rolling statistics only read sales shifted by at least ``min_lag`` days within each
    ``id``. Target encodings are per-group mean/std of ``sales`` over all non-null rows.
    """
    lags = [min_lag + (i * 7) for i in range(7)]
    exprs = [pl.col('sales').shift(l).over('id').cast(pl.Float32).alias(f'lag_{l}') for l in lags]
    exprs += [
        pl.col('sales').shift(min_lag).rolling_mean(window_size=w, min_samples=1).over('id')
        .cast(pl.Float32).alias(f'rolling_mean_{w}_shift{min_lag}')
        for w in ROLLING_WINDOWS
    ]
    exprs += [
        pl.col('sales').shift(min_lag).rolling_std(window_size=w, min_samples=1).over('id')
        .cast(pl.Float32).alias(f'rolling_std_{w}_shift{min_lag}')
        for w in ROLLING_WINDOWS
    ]

    target_encoding_groups = [['item_id'], ['dept_id'], ['cat_id'], ['item_id', 'dept_id'], ['store_id', 'cat_id'], ['store_id', 'dept_id']]
    for group in target_encoding_groups:
        name = '_'.join(group)
        exprs.append(pl.col('sales').mean().over(group).cast(pl.Float32).alias(f'enc_{name}_mean'))
        exprs.append(pl.col('sales').std().over(group).cast(pl.Float32).alias(f'enc_{name}_std'))
    return exprs


def process_store_week(store_name, min_lag, train_limit_day):
    """Build and write the feature table for one store and one forecast week.

    Parameters
    ----------
    store_name : str
        Store identifier, e.g. ``'CA_1'``.
    min_lag : int
        Shortest sales lag (days) the features may use; ``7 * week`` for the week-``week`` model.
    train_limit_day : int
        Last day ``d`` whose sales are kept. The ``PREDICT_DAYS`` days after it are appended
        with null sales.

    Side effects
    ------------
    Writes ``f"{path_base_output}_{store_name}_week{min_lag // 7}.parquet"``.
    Intended to run inside ``pl.StringCache()`` (as ``run_feature_generation`` does), because it
    joins Categorical columns produced by separate casts.
    """
    index_columns = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']

    train_df = _sales_long_with_horizon(store_name, train_limit_day, index_columns)
    train_df = train_df.join(_release_weeks(store_name), on=['store_id', 'item_id'], how='left')
    train_df = train_df.join(_calendar_features(), on='d', how='left')
    # Express each item's release week relative to the store's earliest release week.
    train_df = train_df.with_columns((pl.col('release') - pl.col('release').min()).cast(pl.Int16))
    train_df = train_df.join(_price_features(store_name), on=['store_id', 'item_id', 'wm_yr_wk'], how='left')

    # The shift/rolling window expressions rely on each id's rows being in day order.
    train_df = train_df.sort(['id', 'd'])

    output_file = f"{path_base_output}_{store_name}_week{min_lag // 7}.parquet"
    train_df.with_columns(_history_feature_exprs(min_lag)).sink_parquet(output_file)

    del train_df
    gc.collect()

In [None]:
def run_feature_generation(end_train_day):
    """Write one feature Parquet file per (store, forecast week) for the given training cutoff.

    For every store in ``STORES_LIST`` and every week ``w`` in ``WEEKS_LAGS``, calls
    ``process_store_week`` with a minimum lag of ``7 * w`` days and ``end_train_day`` as the
    last day whose sales are kept.

    All calls share one ``pl.StringCache`` because ``process_store_week`` joins Categorical
    columns that are created by separate casts.
    """
    print(f"Generating features for end day: {end_train_day}")
    # Derive the lags from the config constant instead of a second hard-coded list.
    min_lags = [7 * week for week in WEEKS_LAGS]
    with pl.StringCache():
        for store in STORES_LIST:
            for lag in min_lags:
                process_store_week(store, min_lag=lag, train_limit_day=end_train_day)

## Training Strategy

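We iterate through each store and train one model per forecast week (a direct multi-horizon strategy): the week-*k* model only uses sales lagged by at least 7·*k* days. Because of memory constraints, features are generated one store at a time and written to Parquet before training.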
*   **Objective:** `tweedie` (variance_power=1.1) captures the sparse distribution of daily sales.
*   **Granularity:** Training is performed at the store level to capture local patterns while maintaining sufficient data density.

#### We train a LightGBM model for each store and forecast week, and assemble the submission from their predictions. We use the Tweedie objective because the data are sparse: most daily sales are zero, as we saw in the EDA.

In [None]:
def train_and_predict(end_train_day, output_path, check_path):
    """Train one LightGBM model per (store, forecast week) and write a wide submission CSV.

    Parameters
    ----------
    end_train_day : int
        Last day ``d`` used for training; the following days are predicted.
    output_path : str
        Destination of the submission CSV with columns ``id, F1..F28``.
    check_path : str
        Directory holding one prediction checkpoint per (store, week). Existing checkpoints
        are reused, so an interrupted run resumes where it stopped.

    Notes
    -----
    Feature files are regenerated only when at least one checkpoint is missing.
    When ``end_train_day == 1913`` the ids are renamed from ``_evaluation`` to ``_validation``.

    Raises
    ------
    FileNotFoundError
        If no checkpoint file exists when consolidating.
    """
    print(f"Train until {end_train_day}. Predict next 28 days")

    os.makedirs(check_path, exist_ok=True)
    remove_cols = ['id', 'd', 'sales', 'date', 'wm_yr_wk']
    cat_cols = ['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id',
                'event_name_1', 'event_type_1', 'event_name_2', 'event_type_2',
                'snap_CA', 'snap_TX', 'snap_WI']

    def ckpt_file_for(store_name, week_num):
        return f"{check_path}/pred_{store_name}_week{week_num}.parquet"

    # Feature generation is the most expensive step; skip it when every prediction is cached.
    missing = [
        (store_name, week_num)
        for store_name in STORES_LIST
        for week_num in WEEKS_LAGS
        if not os.path.exists(ckpt_file_for(store_name, week_num))
    ]
    if missing:
        run_feature_generation(end_train_day)
    else:
        print("All checkpoints exist, skipping feature generation...")

    for store_name in STORES_LIST:
        best_it = best_iters[store_name]

        for week_num in WEEKS_LAGS:
            ckpt_file = ckpt_file_for(store_name, week_num)
            if os.path.exists(ckpt_file):
                print(f"{store_name} week {week_num} checkpoint exists, skipping...")
                continue

            file_path = f"{path_base_output}_{store_name}_week{week_num}.parquet"
            print(f"Training {store_name}, Week {week_num}")

            df = pl.read_parquet(file_path)
            features = [c for c in df.columns if c not in remove_cols]
            current_cat_feats = [c for c in features if c in cat_cols]

            if current_cat_feats:
                # LightGBM needs integer codes: use the physical representation of Categoricals.
                df = df.with_columns([pl.col(c).to_physical() for c in current_cat_feats])

            # Filter the training rows once and reuse them for both X and y.
            train_part = df.filter(pl.col('d') <= end_train_day)
            X_train = train_part.select(features).to_numpy()
            y_train = train_part.get_column('sales').to_numpy()
            del train_part

            train_set = lgb.Dataset(X_train, label=y_train, feature_name=features, categorical_feature=current_cat_feats)
            model = lgb.train(params, train_set, num_boost_round=best_it)

            # Week k covers days end_train_day + 7*(k-1) + 1 .. end_train_day + 7*k.
            start_d = end_train_day + (week_num - 1) * 7 + 1
            end_d = end_train_day + week_num * 7
            test_part = df.filter(pl.col('d').is_between(start_d, end_d))
            X_test = test_part.select(features).to_numpy()

            if len(X_test) > 0:
                preds = model.predict(X_test)
                # Store ids as plain strings so checkpoints concatenate without a string cache.
                (
                    test_part.select(pl.col('id').cast(pl.String), 'd')
                    .with_columns(pl.Series('sales', preds))
                    .write_parquet(ckpt_file)
                )

            del df, X_train, y_train, X_test, test_part, train_set, model
            gc.collect()

    print("Consolidating submissions...")

    checkpoint_files = sorted(glob.glob(f"{check_path}/*.parquet"))
    if not checkpoint_files:
        raise FileNotFoundError(f"No prediction checkpoints found in {check_path}")

    # Cast each file's id individually, so checkpoints whose id was written as Categorical
    # (older runs) can be combined with ones that store it as String.
    submission_long = pl.concat([
        pl.read_parquet(f).with_columns(pl.col("id").cast(pl.String))
        for f in checkpoint_files
    ]).to_pandas()

    # Day end_train_day + k becomes column "Fk" (vectorized instead of a row-wise apply).
    horizon = submission_long['d'].astype('int64') - end_train_day
    submission_long['F_col'] = 'F' + horizon.astype(str)

    submission_final = submission_long.pivot(index='id', columns='F_col', values='sales').reset_index()
    cols_order = ['id'] + [f"F{i}" for i in range(1, PREDICT_DAYS + 1)]
    submission_final = submission_final.reindex(columns=cols_order, fill_value=0)

    if end_train_day == 1913:
        submission_final['id'] = submission_final['id'].str.replace('_evaluation', '_validation')

    submission_final.to_csv(output_path, index=False)
    print(f"Submission saved to {output_path}")

In [None]:
# Stage 1: train through day 1913 and predict the validation window (days 1914-1941).
train_and_predict(
    end_train_day=1913,
    output_path='./data/submission.csv',
    check_path='./data/checkpoints_val'
)

# Stage 2: retrain through day 1941 and predict the evaluation window (days 1942-1969).
train_and_predict(
    end_train_day=1941,
    output_path='./data/submission2.csv',
    check_path='./data/checkpoints_eval'
)

df_val = pd.read_csv('./data/submission.csv')
df_eval = pd.read_csv('./data/submission2.csv')
submission_final = pd.concat([df_val, df_eval], ignore_index=True)

# Validate before writing, so a malformed submission is never produced silently.
EXPECTED_ROWS = 60980
print(f"Total rows: {len(submission_final)} (Expected: {EXPECTED_ROWS})")
assert len(submission_final) == EXPECTED_ROWS, f"expected {EXPECTED_ROWS} rows, got {len(submission_final)}"
# Validation ids were renamed to *_validation, so no id may appear in both halves.
assert submission_final['id'].is_unique, "duplicate ids in the combined submission"
assert not submission_final.isna().any().any(), "missing predictions in the combined submission"
submission_final.to_csv('./data/submission_final.csv', index=False)
print("Submission completed successfully")