In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import itertools
import multiprocessing
import os
import warnings
import hyperopt
import tsfresh
import numpy as np
import pandas as pd
import altair as alt
import lightgbm as lgb
from hyperopt import fmin, hp, space_eval, STATUS_OK, tpe, Trials
from sklearn.compose import make_column_transformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_tweedie_deviance
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OrdinalEncoder
from tsfresh import extract_features
from tsfresh.feature_extraction.settings import (
    EfficientFCParameters, 
    MinimalFCParameters,
)
from tsfresh.utilities.dataframe_functions import roll_time_series
from utils.evaluation import calc_eval_metric, WRMSSEEvaluator
from utils.misc import dump_pickle, load_pickle

np.random.seed(42)
warnings.filterwarnings("ignore")
n_jobs = multiprocessing.cpu_count() - 1

The Kaggle dataset was saved in the local directory `~/data/mofc-demand-forecast` in advance.

In [3]:
DATA_PATH = "../../data/mofc-demand-forecast"
MODEL_PATH = "models"
TUNE_PARAMS = False

calendar = pd.read_csv(os.path.join(DATA_PATH, "calendar.csv"))
selling_prices = pd.read_csv(os.path.join(DATA_PATH, "sell_prices.csv"))
# df_train_valid = pd.read_csv(os.path.join(DATA_PATH, "sales_train_validation.csv"))
df_train_eval = pd.read_csv(os.path.join(DATA_PATH, "sales_train_evaluation.csv"))
# sample_submission = pd.read_csv(os.path.join(DATA_PATH, "sample_submission.csv"))

In [4]:
key_ids = ["id", "date"]
all_ids = df_train_eval["id"].unique()
date_names = ["d_" + str(i) for i in range(1, 1942)]
calendar["date"] = pd.to_datetime(calendar["date"])
dates = calendar["date"].unique()
test_steps = 28

key_pairs = list(itertools.product(all_ids, dates))
key_pairs = pd.DataFrame(key_pairs, columns=key_ids)

sample_ratio = 0.01

if sample_ratio == 1.0:
    sampled_ids = all_ids
else:
    sampled_ids = np.random.choice(
        all_ids, round(sample_ratio * len(all_ids)), replace=False
    ).tolist()
    
print(
    f"{len(sampled_ids)} out of {len(all_ids)} IDs were selected for validation and testing."
)

305 out of 30490 IDs were selected for validation and testing.


# Data Preprocessing

In [5]:
sales = df_train_eval[["id"] + date_names]
date_dict = calendar[["date", "d"]].set_index("d").to_dict()["date"]
sales.columns = pd.Series(sales.columns).replace(date_dict)
sales = pd.melt(
    sales,
    id_vars="id",
    value_vars=sales.columns[1:],
    var_name="date",
    value_name="sales",
)

In [6]:
def split_list(lst, n):
    q = len(lst) // n
    chunks = []
    
    for i in range(n):
        if i == n - 1:
            chunks.append(lst[q * i : len(lst)])
        else:
            chunks.append(lst[q * i : q * (i + 1)])
            
    return chunks

In [7]:
%%time
split = date_dict[date_names[-test_steps]]
sales_train = (
    sales[sales["date"] < split].set_index("id").loc[sampled_ids].reset_index()
)

frequencies = [7, 30, 90, 365]
default_fc_parameters = MinimalFCParameters()
# default_fc_parameters = EfficientFCParameters()

chunks = split_list(sampled_ids, 10)

for i, chunk in enumerate(chunks):
    for j, frequency in enumerate(frequencies):
        df_rolled = roll_time_series(
            sales_train.set_index("id").loc[chunk].reset_index(),
            column_id="id",
            column_sort="date",
            max_timeshift=frequency,
            min_timeshift=frequency,
            n_jobs=n_jobs,
            disable_progressbar=False,
        )

        df_extracted = extract_features(
            df_rolled[["id", "date", "sales"]],
            default_fc_parameters=default_fc_parameters,
            column_id="id",
            column_sort="date",
            n_jobs=n_jobs,
            pivot=True,
        )

        df_extracted.columns = df_extracted.columns + f"__D{frequency}"

        if j == 0:
            df_part = df_extracted
        else:
            df_part = df_part.merge(
                df_extracted, left_index=True, right_index=True
            )
            
    if i == 0:
        feat_dynamic_real = df_part
    else:
        feat_dynamic_real = pd.concat([feat_dynamic_real, df_part])

feat_dynamic_real = feat_dynamic_real.reset_index()
feat_dynamic_real.columns = key_ids + feat_dynamic_real.columns[2:].tolist()
feat_dynamic_real = feat_dynamic_real.merge(sales_train, on=key_ids).rename(
    {"sales": "sales__D1"}, axis=1
)

Rolling: 100%|██████████| 25/25 [00:14<00:00,  1.76it/s]
Feature Extraction: 100%|██████████| 25/25 [00:31<00:00,  1.25s/it]
Rolling: 100%|██████████| 25/25 [00:12<00:00,  2.04it/s]
Feature Extraction: 100%|██████████| 25/25 [00:30<00:00,  1.22s/it]
Rolling: 100%|██████████| 25/25 [00:13<00:00,  1.88it/s]
Feature Extraction: 100%|██████████| 25/25 [00:30<00:00,  1.22s/it]
Rolling: 100%|██████████| 25/25 [00:18<00:00,  1.34it/s]
Feature Extraction: 100%|██████████| 25/25 [00:27<00:00,  1.09s/it]
Rolling: 100%|██████████| 25/25 [00:11<00:00,  2.26it/s]
Feature Extraction: 100%|██████████| 25/25 [00:29<00:00,  1.19s/it]
Rolling: 100%|██████████| 25/25 [00:12<00:00,  2.02it/s]
Feature Extraction: 100%|██████████| 25/25 [00:30<00:00,  1.21s/it]
Rolling: 100%|██████████| 25/25 [00:13<00:00,  1.87it/s]
Feature Extraction: 100%|██████████| 25/25 [00:29<00:00,  1.19s/it]
Rolling: 100%|██████████| 25/25 [00:18<00:00,  1.36it/s]
Feature Extraction: 100%|██████████| 25/25 [00:27<00:00,  1.09s/it]


CPU times: user 27min 22s, sys: 2min 32s, total: 29min 55s
Wall time: 35min 31s


In [8]:
prices = (
    df_train_eval[["id", "store_id", "item_id"]]
    .merge(selling_prices, how="left")
    .drop(["store_id", "item_id"], axis=1)
)
week_to_date = calendar[["date", "wm_yr_wk"]].drop_duplicates()
prices = week_to_date.merge(prices, how="left").drop(
    ["wm_yr_wk"], axis=1
)

snap = calendar[["date", "snap_CA", "snap_TX", "snap_WI"]]
snap.columns = ["date", "CA", "TX", "WI"]
snap = pd.melt(
    snap,
    id_vars="date",
    value_vars=["CA", "TX", "WI"],
    var_name="state_id",
    value_name="snap",
)
snap = key_pairs.merge(df_train_eval[["id", "state_id"]], how="left").merge(
    snap, on=["date", "state_id"], how="left"
).drop(["state_id"], axis=1)

feat_dynamic_real = feat_dynamic_real.merge(prices, on=key_ids).merge(
    snap, on=key_ids
)

num_feature_names = feat_dynamic_real.columns.difference(key_ids)

In [9]:
feat_dynamic_cat = calendar[
    [
        "date",
        "wday",
        "month",
        "event_name_1",
        "event_type_1",
        "event_name_2",
        "event_type_2",
    ]
]
feat_dynamic_cat["day"] = (
    feat_dynamic_cat["date"].astype("str").map(lambda x: int(x.split("-")[2]))
)

feat_static_cat = df_train_eval[
    ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
]

cat_feature_names = feat_dynamic_cat.columns
cat_feature_names = cat_feature_names.union(feat_static_cat.columns).difference(key_ids)

In [10]:
df_train = feat_dynamic_real.merge(feat_dynamic_cat).merge(feat_static_cat)
df_train["date"] = df_train["date"].map(lambda x: x + pd.Timedelta("1 days"))
df_recur = df_train[df_train["date"] == split].set_index(key_ids)
df_train = df_train.merge(sales_train, on=key_ids)

all_feature_names = num_feature_names.union(cat_feature_names)
feature_names_to_remove = all_feature_names[
    (df_train[all_feature_names].isna().sum() / df_train.shape[0] == 1.0)
    | (df_train[all_feature_names].std() == 0.0)
]
num_feature_names = num_feature_names.difference(feature_names_to_remove)
cat_feature_names = cat_feature_names.difference(feature_names_to_remove)
all_feature_names = all_feature_names.difference(feature_names_to_remove)

train_dates = df_train["date"].unique()
df_x_train = (
    df_train[key_ids + all_feature_names.difference(["sales"]).tolist()]
    .set_index(key_ids)
    .swaplevel()
)
df_y_train = df_train[key_ids + ["sales"]].set_index(key_ids).swaplevel()

# Hyperparameter Tuning

In [11]:
def objective(args):
    global n_jobs
    global train_dates
    global df_x_train, df_y_train
    global all_feature_names, num_feature_names, cat_feature_names

    tscv = TimeSeriesSplit(n_splits=3)
    cat_pipeline = make_pipeline(
        SimpleImputer(strategy="constant", fill_value="<unknown>"),
        OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
    )
    num_pipeline = SimpleImputer(strategy="median")
    processor = make_column_transformer(
        (cat_pipeline, cat_feature_names), (num_pipeline, num_feature_names)
    )

    default_params = {
        "objective": "tweedie",
        "num_threads": n_jobs,
        "device_type": "cpu",
        "seed": 42,
        "force_col_wise": True,
        "max_cat_threshold": 32,
        "verbosity": 1,
        "max_bin_by_feature": None,
        "min_data_in_bin": 3,
        "feature_pre_filter": True,
        "tweedie_variance_power": 1.5,
        "metric": "tweedie",
    }
    params = {
        "boosting": args["boosting"],
        "learning_rate": args["learning_rate"],
        "num_iterations": int(args["num_iterations"]),
        "num_leaves": int(args["num_leaves"]),
        "max_depth": int(args["max_depth"]),
        "min_data_in_leaf": int(args["min_data_in_leaf"]),
        "min_sum_hessian_in_leaf": args["min_sum_hessian_in_leaf"],
        "bagging_fraction": args["bagging_fraction"],
        "bagging_freq": int(args["bagging_freq"]),
        "feature_fraction": args["feature_fraction"],
        "extra_trees": args["extra_trees"],
        "lambda_l1": args["lambda_l1"],
        "lambda_l2": args["lambda_l2"],
        "path_smooth": args["path_smooth"],
        "max_bin": int(args["max_bin"]),
    }
    default_params.update(params)

    losses = []

    for train_index, test_index in tscv.split(train_dates):
        dtrain = df_x_train.loc[train_dates[train_index], :]
        dvalid = df_x_train.loc[train_dates[test_index], :]

        dtrain = processor.fit_transform(dtrain)
        dvalid = processor.transform(dvalid)
        dtrain = lgb.Dataset(dtrain, label=df_y_train.loc[train_dates[train_index], :])

        model = lgb.train(
            default_params,
            dtrain,
            feature_name=all_feature_names.tolist(),
            categorical_feature=cat_feature_names.tolist(),
            verbose_eval=False,
        )

        y_true = df_y_train.loc[train_dates[test_index], :]
        y_pred = model.predict(dvalid)
        loss = mean_tweedie_deviance(
            y_true, y_pred, power=default_params["tweedie_variance_power"]
        )

        losses.append(loss)

    return {"loss": np.mean(losses), "status": STATUS_OK}

In [12]:
%%time
%%capture
if TUNE_PARAMS:
    space = {
        "boosting": hp.pchoice("boosting", [(0.75, "gbdt"), (0.25, "dart")]),
        "learning_rate": 10 ** hp.uniform("learning_rate", -2, 0),
        "num_iterations": hp.quniform("num_iterations", 1, 1000, 1),
        "num_leaves": 2 ** hp.uniform("num_leaves", 1, 8),
        "max_depth": -1,
        "min_data_in_leaf": 2 * 10 ** hp.uniform("min_data_in_leaf", 0, 2),
        "min_sum_hessian_in_leaf": hp.uniform("min_sum_hessian_in_leaf", 1e-4, 1e-2),
        "bagging_fraction": hp.uniform("bagging_fraction", 0.5, 1.0),
        "bagging_freq": hp.qlognormal("bagging_freq", 0.0, 1.0, 1),
        "feature_fraction": hp.uniform("feature_fraction", 0.5, 1.0),
        "extra_trees": hp.pchoice("extra_trees", [(0.75, False), (0.25, True)]),
        "lambda_l1": hp.lognormal("lambda_l1", 0.0, 1.0),
        "lambda_l2": hp.lognormal("lambda_l2", 0.0, 1.0),
        "path_smooth": hp.lognormal("path_smooth", 0.0, 1.0),
        "max_bin": 2 ** hp.quniform("max_bin", 6, 10, 1) - 1,
    }

    trials = Trials()

    best = fmin(
        objective,
        space=space,
        algo=tpe.suggest,
        max_evals=100,
        trials=trials,
    )

    best_params = space_eval(space, best)
    best_params["num_iterations"] = int(best_params["num_iterations"])
    best_params["num_leaves"] = int(best_params["num_leaves"])
    best_params["min_data_in_leaf"] = int(best_params["min_data_in_leaf"])
    best_params["bagging_freq"] = int(best_params["bagging_freq"])
    best_params["max_bin"] = int(best_params["max_bin"])

    os.makedirs(os.path.join(MODEL_PATH, "lgb"), exist_ok=True)
    dump_pickle(os.path.join(MODEL_PATH, "lgb", "best_params.pkl"), best_params)
    
else:
    best_params = load_pickle(os.path.join(MODEL_PATH, "lgb", "best_params.pkl"))

CPU times: user 2h 23min 32s, sys: 1min 14s, total: 2h 24min 47s
Wall time: 35min


# Model Validation

In [13]:
%%capture
cat_pipeline = make_pipeline(
    SimpleImputer(strategy="constant", fill_value="<unknown>"),
    OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
)
num_pipeline = SimpleImputer(strategy="median")
processor = make_column_transformer(
    (cat_pipeline, cat_feature_names), (num_pipeline, num_feature_names)
)

default_params = {
    "objective": "tweedie",
    "num_threads": n_jobs,
    "device_type": "cpu",
    "seed": 42,
    "force_col_wise": True,
    "max_cat_threshold": 32,
    "verbosity": 1,
    "max_bin_by_feature": None,
    "min_data_in_bin": 3,
    "feature_pre_filter": True,
    "tweedie_variance_power": 1.5,
    "metric": "tweedie",
}
default_params.update(best_params)

dtrain = processor.fit_transform(df_x_train)
dtrain = lgb.Dataset(dtrain, label=df_y_train)

model = lgb.train(
    default_params,
    dtrain,
    feature_name=all_feature_names.tolist(),
    categorical_feature=cat_feature_names.tolist(),
    verbose_eval=False,
)

In [14]:
%%time
df_test = df_recur.copy()
sales_pred = sales_train.copy()

for i in range(test_steps):
    dtest = processor.transform(df_test[all_feature_names])
    y_pred = model.predict(dtest)

    sales_recur = pd.DataFrame(
        y_pred, columns=["sales"], index=df_test.index
    ).reset_index()
    sales_pred = pd.concat([sales_pred, sales_recur])
    pred_dates = sales_pred["date"].unique()

    for j, frequency in enumerate(frequencies):
        df_extracted = extract_features(
            sales_pred.set_index("date").loc[pred_dates[-frequency:], :].reset_index(),
            default_fc_parameters=default_fc_parameters,
            column_id="id",
            column_sort="date",
            n_jobs=n_jobs,
            disable_progressbar=False,
        )

        df_extracted.columns = df_extracted.columns + f"__D{frequency}"

        if j == 0:
            feat_dynamic_real = df_extracted
        else:
            feat_dynamic_real = feat_dynamic_real.merge(
                df_extracted, left_index=True, right_index=True
            )

    feat_dynamic_real = feat_dynamic_real.reset_index()
    feat_dynamic_real.columns = ["id"] + feat_dynamic_real.columns[1:].tolist()
    feat_dynamic_real = feat_dynamic_real.merge(sales_recur[["id", "sales"]]).rename(
        {"sales": "sales__D1"}, axis=1
    )
    feat_dynamic_real["date"] = pred_dates[-1] + pd.Timedelta("1 days")

    feat_dynamic_real = feat_dynamic_real.merge(prices, on=key_ids).merge(
        snap, on=key_ids
    )

    df_test = (
        feat_dynamic_real.merge(feat_dynamic_cat)
        .merge(feat_static_cat)
        .set_index(key_ids)
    )

Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 136.69it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 136.87it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 159.12it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 135.86it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 160.86it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 201.22it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 135.17it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 131.46it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 162.28it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 205.25it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 146.11it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 136.84it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 157.70it/s]
Feature Extraction: 100%|██████████| 24/24 [00:00<00:00, 170.46it/s]
Feature Extraction: 100%|█████████

CPU times: user 8min 4s, sys: 3min 31s, total: 11min 36s
Wall time: 11min 29s


In [15]:
key_names = ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
df_sampled = df_train_eval.set_index("id").loc[sampled_ids].reset_index()
df_train_sampled = df_sampled.loc[:, key_names + date_names[:-test_steps]]
df_test_sampled = df_sampled.loc[:, date_names[-test_steps:]]

evaluator = WRMSSEEvaluator(
    df_train_sampled, df_test_sampled, calendar, selling_prices, test_steps
)

  0%|          | 0/12 [00:00<?, ?it/s]

In [16]:
predictions = sales_pred[sales_pred["date"] > dates[-2 * test_steps - 1]]
predictions = predictions.pivot(index="date", columns="id", values="sales")

df_pred_sampled = predictions.T
df_pred_sampled = df_pred_sampled.loc[sampled_ids]
df_pred_sampled.columns = df_test_sampled.columns
df_pred_sampled.index = range(len(sampled_ids))

wrmsse = evaluator.score(df_pred_sampled)
eval_metrics = calc_eval_metric(df_test_sampled, df_pred_sampled)

print(f"LightGBM WRMSSE: {wrmsse:.6f}")
display(eval_metrics.describe())

LightGBM WRMSSE: 0.905456


Unnamed: 0,mae,rmse,smape,mase
count,305.0,305.0,305.0,303.0
mean,0.91555,1.181758,1.403536,0.983602
std,0.845065,1.108177,0.476744,0.377991
min,0.127039,0.196797,0.300186,0.555676
25%,0.441045,0.556466,0.979287,0.755983
50%,0.684438,0.864121,1.562656,0.878861
75%,1.10153,1.409298,1.835181,1.080003
max,9.503409,12.527784,2.0,3.113883


In [17]:
def plot_forecast(source, test_steps, plot_id=None, model_name=None, start_date=None):
    if start_date is not None:
        source = source[source["time"] >= start_date]

    points = (
        alt.Chart(source)
        .mark_circle(size=10.0, color="#000000")
        .encode(
            x=alt.X("time:T", axis=alt.Axis(title="Date")),
            y=alt.Y("y", axis=alt.Axis(title="Demand")),
            tooltip=["time:T", "y:Q"],
        )
    )

    line = (
        alt.Chart(source)
        .mark_line(size=1.0, color="#4267B2")
        .encode(
            x="time:T",
            y="fcst",
        )
    )

    band = (
        alt.Chart(source)
        .mark_area(opacity=0.25, color="#4267B2")
        .encode(
            x="time:T",
            y="fcst_lower",
            y2="fcst_upper",
        )
    )

    rule = (
        alt.Chart(source[["time"]].iloc[-test_steps : -test_steps + 1])
        .mark_rule(size=1.0, color="#FF0000", strokeDash=[2, 2])
        .encode(x="time:T")
    )

    title = "Demand Forecast"
    if plot_id is not None:
        title += f" for '{plot_id}'"
    if model_name is not None:
        title = f"{model_name}: " + title

    return (points + line + band + rule).properties(title=title, width=1000, height=300)

In [18]:
best_perf_indices = eval_metrics["smape"].dropna().sort_values()[:3].index
plots = []

for index in best_perf_indices:
    plot_id = pd.Series(sampled_ids).iloc[index]

    y = (df_train_eval[df_train_eval["id"] == plot_id].loc[:, date_names]).T
    y.columns = ["y"]
    y = calendar.merge(y, left_on="d", right_index=True)[["date", "y"]]
    y["time"] = pd.to_datetime(y["date"])

    source = y.merge(predictions[plot_id].reset_index(), how="left").drop(["date"], axis=1)
    source.columns = ["y", "time", "fcst"]
    source["fcst_lower"] = np.nan
    source["fcst_upper"] = np.nan

    p = plot_forecast(
        source, test_steps, plot_id=plot_id, model_name="LightGBM", start_date="2015-05-23"
    )
    
    plots.append(p)
    
alt.VConcatChart(vconcat=plots)