# Building Advanced Pipelines

This goal of this notebook is to provide examples of advanced pipeline functionality that I find useful on most all machine learning projects. You can use this notebook as a utility script and import the pipeline components for use in your modeling notebooks.

In [1]:
%%bash

pip install mlforecast hierarchicalforecast

Collecting mlforecast
  Downloading mlforecast-1.0.2-py3-none-any.whl.metadata (13 kB)
Collecting hierarchicalforecast
  Downloading hierarchicalforecast-1.3.1-py3-none-any.whl.metadata (11 kB)
Collecting coreforecast>=0.0.15 (from mlforecast)
  Downloading coreforecast-0.0.16-cp312-cp312-macosx_11_0_arm64.whl.metadata (3.7 kB)
Collecting utilsforecast>=0.2.9 (from mlforecast)
  Downloading utilsforecast-0.2.15-py3-none-any.whl.metadata (5.4 kB)
Collecting qpsolvers[clarabel] (from hierarchicalforecast)
  Downloading qpsolvers-4.8.2-py3-none-any.whl.metadata (12 kB)
Downloading mlforecast-1.0.2-py3-none-any.whl (72 kB)
Downloading hierarchicalforecast-1.3.1-py3-none-any.whl (54 kB)
Downloading coreforecast-0.0.16-cp312-cp312-macosx_11_0_arm64.whl (225 kB)
Downloading utilsforecast-0.2.15-py3-none-any.whl (40 kB)
Downloading qpsolvers-4.8.2-py3-none-any.whl (92 kB)
Installing collected packages: coreforecast, qpsolvers, utilsforecast, mlforecast, hierarchicalforecast
[2K   [90m━━━━━━━

In [2]:
import numpy as np
import pandas as pd

# from mlforecast.target_transforms import BaseTargetTransform

from pandas.tseries.offsets import Day

from sklearn.compose import make_column_selector, ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder

from statsmodels.tsa.deterministic import CalendarFourier, DeterministicProcess

# Functions for loading CSV files as DataFrames

In [None]:
_start = pd.Timestamp(2013, 1, 1)
_end = pd.Timestamp(2017, 8, 31)
_CUTOFF = pd.Timestamp(2017, 8, 15)

_TRAIN_DATETIME_INDEX = pd.date_range(_start, _CUTOFF, freq="D", name="date")

_TEST_DATETIME_INDEX = pd.date_range(_CUTOFF + (1 * Day()), _end, freq="D", name="date")

_COMBINED_DATETIME_INDEX = pd.date_range(_start, _end, freq="D", name="date")


def load_train_df(
    filepath="/kaggle/input/store-sales-time-series-forecasting/train.csv", onpromotion=False
) -> pd.DataFrame:

    if onpromotion:
        _usecols = ["id", "store_nbr", "family", "date", "sales", "onpromotion"]
    else:
        _usecols = ["id", "store_nbr", "family", "date", "sales"]

    df = (
        pd.read_csv(
            filepath,
            usecols=_usecols,
            parse_dates=["date"],
            dtype={
                "store_nbr": str,
                "sales": np.float32,
            },
        )
        .pipe(lambda df: df.assign(family=df.loc[:, "family"].str.replace("/", " OR ")))
        .set_index(["store_nbr", "family", "date"])
        .reindex(_TRAIN_DATETIME_INDEX, level="date")
        .sort_index(axis=0)
        .sort_index(axis=1)
    )

    return df


def load_test_df(
    filepath="/kaggle/input/store-sales-time-series-forecasting/test.csv", onpromotion=False
) -> pd.DataFrame:

    if onpromotion:
        _usecols = ["id", "store_nbr", "family", "date", "onpromotion"]
    else:
        _usecols = ["id", "store_nbr", "family", "date"]

    df = (
        pd.read_csv(
            filepath,
            usecols=_usecols,
            parse_dates=["date"],
            dtype={
                "store_nbr": str,
            },
        )
        .pipe(lambda df: df.assign(family=df.loc[:, "family"].str.replace("/", " OR ")))
        .set_index(["store_nbr", "family", "date"])
        .reindex(_TEST_DATETIME_INDEX, level="date")
        .sort_index(axis=0)
        .sort_index(axis=1)
    )

    return df


def load_transactions_df(filepath="/kaggle/input/store-sales-time-series-forecasting/transactions.csv") -> pd.DataFrame:

    df = (
        pd.read_csv(
            filepath,
            parse_dates=["date"],
            dtype={
                "store_nbr": str,
                "transactions": np.float32,
            },
        )
        .set_index(["store_nbr", "date"])
        .reindex(_TRAIN_DATETIME_INDEX, level="date")
        .sort_index(axis=0)
        .sort_index(axis=1)
    )

    return df


def load_oil_df(filepath="/kaggle/input/store-sales-time-series-forecasting/oil.csv") -> pd.DataFrame:

    df = (
        pd.read_csv(
            filepath,
            parse_dates=["date"],
            dtype={
                "dcoilwtico": np.float32,
            },
        )
        .set_index("date")
        .reindex(_COMBINED_DATETIME_INDEX, level="date")
        .sort_index(axis=0)
    )

    return df


def load_stores_df(filepath="/kaggle/input/store-sales-time-series-forecasting/stores.csv") -> pd.DataFrame:

    df = (
        pd.read_csv(
            filepath,
            index_col=["store_nbr"],
            dtype={
                "store_nbr": str,
                "cluster": str,
            },
        )
        .rename(columns={"type": "store_type", "cluster": "store_cluster"})
        .assign(country="Ecuador")
        .sort_index(axis=0)
        .sort_index(axis=1)
    )

    return df


def load_holidays_events_df(
    filepath="/kaggle/input/store-sales-time-series-forecasting/holidays_events.csv",
) -> pd.DataFrame:

    df = (
        pd.read_csv(
            filepath,
            parse_dates=["date"],
        )
        .replace({"2013-04-29": pd.to_datetime("2013-03-29")})  # 'Good Friday' mistake correction
        .sort_index(axis=0)
        .sort_index(axis=1)
    )

    return df

# Data Preprocessing Pipeline

## Pipeline for cleaning the training data

The quality of the data prior to mid-2015 is poor for many of the data series so I decided to drop it; many of the remaining series have significant numbers of leading zero values for sales. These leading zero sales values are better coded as missing values. I replace all the leading zero values for sales and then drop all the missing values.

In [None]:
def _all_false_mask(df):
    mask_df = pd.DataFrame(False, columns=df.columns, index=df.index)
    return mask_df


def _create_leading_zero_sales_mask(df):
    mask_df = _all_false_mask(df)
    sales_all_zero = df.loc[:, "sales"].eq(0).all()
    if not sales_all_zero:
        mask_df["sales"] = df.loc[:, "sales"].eq(0).cummin()
    return mask_df


def _drop_leading_zero_sales(df):
    masked_df = _mask_leading_zero_sales(df)
    without_leading_zeros_df = masked_df.dropna(how="any", subset=["sales"], axis=0)
    return without_leading_zeros_df


def _mask_leading_zero_sales(df):
    cond = df.groupby(["store_nbr", "family"], group_keys=False).apply(lambda df: _create_leading_zero_sales_mask(df))
    masked_df = df.mask(cond, np.nan)
    return masked_df


def _use_only_recent_data(df, start):
    recent_df = df.loc[pd.IndexSlice[:, :, start:]]
    return recent_df


def make_data_cleaning_pipeline(start="2015-07-01", verbose=True) -> Pipeline:
    pipeline = Pipeline(
        steps=[
            (f"Use only data after {start}", FunctionTransformer(func=_use_only_recent_data, kw_args={"start": start})),
            (
                "Drop leading zero sales values",
                FunctionTransformer(
                    func=_drop_leading_zero_sales,
                ),
            ),
        ],
        verbose=verbose,
    ).set_output(transform="pandas")

    return pipeline

## Pipeline for joining the data sets

After cleaning the training data, I join all the `test.csv`, `stores.csv`, `oil.csv`, and `transactions.csv`. I leave the `holidays_events.csv` alone for now as I will add it later during the feature engineering step. 

In [None]:
def _load_and_join_oil_df(df, filepath):
    oil_df = load_oil_df(filepath)
    joined_df = df.join(oil_df, how="left", on=["date"]).sort_index(axis=0).sort_index(axis=1)
    return joined_df


def _load_and_join_stores_df(df, filepath):
    stores_df = load_stores_df(filepath)
    joined_df = df.join(stores_df, how="left", on=["store_nbr"]).sort_index(axis=0).sort_index(axis=1)
    return joined_df


def _load_and_concat_test_df(df, filepath, onpromotion):
    test_df = load_test_df(filepath, onpromotion)
    concat_df = pd.concat(
        [df, test_df],
        axis=0,
        sort=True,
    ).sort_index(axis=0)
    return concat_df


def _load_and_join_transactions_df(df, filepath):
    transactions_df = load_transactions_df(filepath)
    joined_df = df.join(transactions_df, how="left", on=["store_nbr", "date"]).sort_index(axis=0).sort_index(axis=1)
    return joined_df


def _make_oil_joining_pipeline(filepath, verbose):
    pipeline = Pipeline(
        steps=[
            ("Load and join the data", FunctionTransformer(func=_load_and_join_oil_df, kw_args={"filepath": filepath})),
            (
                "Impute missing values",
                ColumnTransformer(
                    transformers=[
                        (
                            "Forward fill missing dcoilwtico values",
                            FunctionTransformer(lambda df: df.fillna(method="ffill")),
                            ["dcoilwtico"],
                        ),
                    ],
                    n_jobs=1,
                    remainder="passthrough",
                    verbose=verbose,
                    verbose_feature_names_out=False,
                ),
            ),
        ],
        verbose=verbose,
    )
    return pipeline


def _make_transactions_joining_pipeline(filepath, verbose):
    pipeline = Pipeline(
        steps=[
            (
                "Load and join the data",
                FunctionTransformer(func=_load_and_join_transactions_df, kw_args={"filepath": filepath}),
            ),
            (
                "Impute missing values",
                ColumnTransformer(
                    transformers=[
                        (
                            "Fill missing transactions with zeros",
                            SimpleImputer(strategy="constant", fill_value=0.0),
                            ["transactions"],
                        )
                    ],
                    n_jobs=1,
                    remainder="passthrough",
                    verbose=verbose,
                    verbose_feature_names_out=False,
                ),
            ),
        ],
        verbose=verbose,
    )
    return pipeline


def _make_data_joining_pipeline(
    use_future_onpromotion, use_future_dcoilwtico, concat_test, join_oil, join_stores, join_transactions, verbose
):

    steps = []
    if join_transactions:
        _pipeline = _make_transactions_joining_pipeline(
            "/kaggle/input/store-sales-time-series-forecasting/transactions.csv", verbose
        )
        steps.append(("Load and join the transactions data", _pipeline))

    # if joining oil and also concatenating the test data, pipeline steps
    # order depends on whether you wish to use future dcoilwtico data.
    if join_oil and concat_test:
        _pipeline = _make_oil_joining_pipeline("/kaggle/input/store-sales-time-series-forecasting/oil.csv", verbose)

        if use_future_dcoilwtico:
            steps.extend(
                [
                    (
                        "Load and concatenate the test data",
                        FunctionTransformer(
                            func=_load_and_concat_test_df,
                            kw_args={
                                "filepath": "/kaggle/input/store-sales-time-series-forecasting/test.csv",
                                "onpromotion": use_future_onpromotion,
                            },
                        ),
                    ),
                    ("Load and join the oil price data", _pipeline),
                ]
            )
        else:
            steps.extend(
                [
                    ("Load and join the oil price data", _pipeline),
                    (
                        "Load and concatenate the test data",
                        FunctionTransformer(
                            func=_load_and_concat_test_df,
                            kw_args={
                                "filepath": "/kaggle/input/store-sales-time-series-forecasting/test.csv",
                                "onpromotion": use_future_onpromotion,
                            },
                        ),
                    ),
                ]
            )
    elif join_oil and not concat_test:
        steps.append(
            ("Load and join the oil price data", _pipeline),
        )
    elif not join_oil and concat_test:
        steps.append(
            (
                "Load and concatenate the test data",
                FunctionTransformer(
                    func=_load_and_concat_test_df,
                    kw_args={
                        "filepath": "/kaggle/input/store-sales-time-series-forecasting/test.csv",
                        "onpromotion": use_future_onpromotion,
                    },
                ),
            ),
        )
    # if not joining oil and not concatenating test data, then just move on
    # to joining the stores data.
    else:
        pass

    if join_stores:
        steps.append(
            (
                "Load and join the stores data",
                FunctionTransformer(
                    func=_load_and_join_stores_df,
                    kw_args={"filepath": "/kaggle/input/store-sales-time-series-forecasting/stores.csv"},
                ),
            )
        )

    pipeline = Pipeline(steps, verbose=verbose).set_output(transform="pandas")

    return pipeline

## Full preprocessing pipeline

The full preprocessing pipeline combines the cleaning steps, the joining steps, with a final step which clips outliers.

In [None]:
def _clip_outliers(df, q):
    expanding_quantiles_df = df.groupby(level=["store_nbr", "family"], group_keys=False).apply(
        lambda _: _compute_expanding_quantiles(_, q)
    )
    mask = _all_false_mask(df)
    is_outlier = df.gt(expanding_quantiles_df).to_dict()
    cond = mask.assign(**is_outlier)
    clipped_df = df.mask(cond, expanding_quantiles_df)
    return clipped_df


def _compute_expanding_quantiles(df, q):
    expanding_quantiles = df.expanding(min_periods=1).quantile(q).to_dict()
    expanding_quantile_df = df.assign(**expanding_quantiles).astype(np.float32)
    return expanding_quantile_df


def _make_clip_outliers_transformer(onpromotion, dcoilwtico, transactions, q, n_jobs, verbose):

    transformers = [("Clip sales outliers", FunctionTransformer(func=_clip_outliers, kw_args={"q": q}), ["sales"])]

    if onpromotion:
        transformers.append(
            ("Clip onpromotion outliers", FunctionTransformer(func=_clip_outliers, kw_args={"q": q}), ["onpromotion"])
        )

    if dcoilwtico:
        transformers.append(
            ("Clip dcoilwtico outliers", FunctionTransformer(func=_clip_outliers, kw_args={"q": q}), ["dcoilwtico"])
        )

    if transactions:
        transformers.append(
            ("Clip transactions outliers", FunctionTransformer(func=_clip_outliers, kw_args={"q": q}), ["transactions"])
        )

    transformer = ColumnTransformer(
        transformers, n_jobs=n_jobs, remainder="passthrough", verbose=verbose, verbose_feature_names_out=False
    ).set_output(transform="pandas")

    return transformer


def make_data_preprocessing_pipeline(
    use_future_onpromotion=False,
    use_future_dcoilwtico=False,
    concat_test=True,
    join_oil=True,
    join_stores=True,
    join_transactions=True,
    start="20150701",
    q=0.99,
    n_jobs=-1,
    verbose=True,
) -> Pipeline:

    pipeline = Pipeline(
        steps=[
            ("Clean the training data", make_data_cleaning_pipeline(start, verbose)),
            (
                "Join the additional data sets",
                _make_data_joining_pipeline(
                    use_future_onpromotion,
                    use_future_dcoilwtico,
                    concat_test,
                    join_oil,
                    join_stores,
                    join_transactions,
                    verbose,
                ),
            ),
            (
                "Clip outliers",
                _make_clip_outliers_transformer(
                    use_future_onpromotion, join_oil, join_transactions, q, n_jobs, verbose
                ),
            ),
            ("Sort the columns", FunctionTransformer(func=lambda df: df.sort_index(axis=1))),
        ],
        verbose=verbose,
    ).set_output(transform="pandas")

    return pipeline

## Example usage

Un-comment and run the code below to see the data preprocessing pipeline in action!

In [None]:
# _train_df = load_train_df(
#    "/kaggle/input/store-sales-time-series-forecasting/train.csv",
#    onpromotion=True,
# )
# _pipeline = make_data_preprocessing_pipeline(
#    use_future_onpromotion=True,
#    use_future_dcoilwtico=True,
# )
# _preprocessed_df = _pipeline.fit_transform(_train_df)

Note that the future values for `dcoilwtico`, `onpromotion`, and `transactions` would not be available at the time that we need to generate our sales forecasts in a real forecasting setting. For the purposes of the competition we are given the values for `dcoilwtico` and `onpromotion` for the test window. We are not given the `transactions` values and will need to forecast them.

In [None]:
# _preprocessed_df.info()

# Feature Engineering Pipeline

## Date features

In [None]:
def _create_date_features(df):
    _df = df.reset_index("date")
    date_features = {
        "month": _df.loc[:, "date"].dt.month.astype(np.uint8),
        "day_of_month": _df.loc[:, "date"].dt.day.astype(np.uint8),
        "day_of_week": _df.loc[:, "date"].dt.day_of_week.astype(np.uint8),
        "is_month_start": _df.loc[:, "date"].dt.day.eq(1).astype(np.uint8),
        "is_month_middle": _df.loc[:, "date"].dt.day.eq(15).astype(np.uint8),
        # "is_month_end": _df.loc[:, "date"].dt.day.eq(.astype(np.uint8),
        "is_weekend": _df.loc[:, "date"].dt.day_of_week.floordiv(5).astype(np.uint8),
    }
    with_date_features_df = (
        _df.assign(**date_features).set_index("date", append=True).sort_index(axis=0).sort_index(axis=1)
    )
    return with_date_features_df


def make_date_features_transformer(verbose=True):
    transformer = ColumnTransformer(
        transformers=[
            (
                "Engineer date features",
                FunctionTransformer(
                    func=_create_date_features,
                ),
                ["id"],
            )
        ],
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")

    return transformer

In [None]:
# _transformer = make_date_features_transformer()
# _transformer.fit_transform(_preprocessed_df).groupby("month")["day_of_month"].max()

## Holidays and Events Features

I make heavy use of Pandas string processing methods to extract a unique description for each holiday at the national, regional, and local level. I will use these descriptions to create indicator/dummy variables and add them as features in our model.

In [11]:
_HOLIDAYS_EVENTS_DF = load_holidays_events_df("/kaggle/input/store-sales-time-series-forecasting/holidays_events.csv")


HOLIDAY_TYPES = ["Additional", "Bridge", "Event", "Holiday", "Transfer", "Work Day"]


LOCAL_HOLIDAY_DESCRIPTIONS = (
    _HOLIDAYS_EVENTS_DF.query("(locale == 'Local') & (type in @HOLIDAY_TYPES) & (~transferred)")
    .loc[:, "description"]
    .str.removeprefix("Traslado ")
    .str.strip("-+0123456789")
    .unique()
    .tolist()
)

NATIONAL_HOLIDAY_DESCRIPTIONS = (
    _HOLIDAYS_EVENTS_DF.query("(locale == 'National') & (type in @HOLIDAY_TYPES) & (~transferred)")
    .loc[:, "description"]
    .str.removeprefix("Traslado ")
    .str.removeprefix("Puente ")
    .str.removeprefix("Inauguracion ")
    .str.replace(":.*", "", regex=True)
    .str.replace("puente", "Puente")
    .str.replace("primer", "Primer")
    .str.strip("-+0123456789")
    .unique()
    .tolist()
)


REGIONAL_HOLIDAY_DESCRIPTIONS = (
    _HOLIDAYS_EVENTS_DF.query("(locale == 'Regional') & (type in @HOLIDAY_TYPES) & (~transferred)")
    .loc[:, "description"]
    .unique()
    .tolist()
)

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/store-sales-time-series-forecasting/holidays_events.csv'

I define some functions which will use the unique descriptions and create an indicator for each individual holiday. I need separate functions for the different the locales as the joining logic is different. 

In [None]:
def _create_some_local_holiday_indicator(df, description):
    is_local_holiday = (
        _HOLIDAYS_EVENTS_DF.query(f"(locale == 'Local') & (type in {HOLIDAY_TYPES}) & (~transferred)")
        .loc[:, "description"]
        .str.contains(description, case=False)
    )
    indicator_name = f"is_{description.lower().replace(' ', '_')}"
    is_local_holiday_df = (
        _HOLIDAYS_EVENTS_DF.query(f"(locale == 'Local') & (type in {HOLIDAY_TYPES}) & (~transferred)")
        .loc[is_local_holiday, ["date", "locale_name"]]
        .rename(columns={"locale_name": "city"})
        .assign(**{indicator_name: 1})
        .drop_duplicates()
        .set_index(["date", "city"])
    )
    with_local_holiday_indicator_df = (
        df.join(is_local_holiday_df, how="left", on=["date", "city"])
        .fillna(value={indicator_name: 0})
        .astype({indicator_name: np.uint8})
        .sort_index(axis=1)
    )
    return with_local_holiday_indicator_df


def _create_some_national_holiday_indicator(df, description):
    is_national_holiday = (
        _HOLIDAYS_EVENTS_DF.query(f"(locale == 'National') & (type in {HOLIDAY_TYPES}) & (~transferred)")
        .loc[:, "description"]
        .str.contains(description, case=False)
    )
    indicator_name = f"is_{description.lower().replace(' ', '_')}"
    is_national_holiday_df = (
        _HOLIDAYS_EVENTS_DF.query(f"(locale == 'National') & (type in {HOLIDAY_TYPES}) & (~transferred)")
        .loc[is_national_holiday, ["date"]]
        .assign(**{indicator_name: 1})
        .drop_duplicates()
        .set_index("date")
    )
    with_national_holiday_indicator_df = (
        df.join(is_national_holiday_df, how="left", on=["date"])
        .fillna(value={indicator_name: 0})
        .astype({indicator_name: np.uint8})
        .sort_index(axis=1)
    )
    return with_national_holiday_indicator_df


def _create_some_regional_holiday_indicator(df, description):
    is_regional_holiday = (
        _HOLIDAYS_EVENTS_DF.query(f"(locale == 'Regional') & (type in {HOLIDAY_TYPES}) & (~transferred)")
        .loc[:, "description"]
        .str.contains(description, case=False)
    )
    indicator_name = f"is_{description.lower().replace(' ', '_')}"
    is_regional_holiday_df = (
        _HOLIDAYS_EVENTS_DF.query(f"(locale == 'Regional') & (type in {HOLIDAY_TYPES}) & (~transferred)")
        .loc[is_regional_holiday, ["date", "locale_name"]]
        .rename(columns={"locale_name": "state"})
        .assign(**{indicator_name: 1})
        .drop_duplicates()
        .set_index(["date", "state"])
    )
    with_regional_holiday_indicator_df = (
        df.join(is_regional_holiday_df, how="left", on=["date", "state"])
        .fillna(value={indicator_name: 0})
        .astype({indicator_name: np.uint8})
        .sort_index(axis=1)
    )
    return with_regional_holiday_indicator_df

Now I define functions that build column transformers for each locale's holidays and events. Using column transformers allows my to parallelize the feature creation process. 

In [None]:
def _make_local_holidays_transformer(n_jobs=-1, verbose=True):
    transformers = []
    for description in LOCAL_HOLIDAY_DESCRIPTIONS:
        transformers.append(
            (
                f"Create {description} indicator",
                FunctionTransformer(func=_create_some_local_holiday_indicator, kw_args={"description": description}),
                ["city"],
            )
        )

    transformer = ColumnTransformer(
        transformers, n_jobs=n_jobs, remainder="drop", verbose=verbose, verbose_feature_names_out=False
    ).set_output(transform="pandas")

    return transformer


def _make_national_holidays_transformer(n_jobs=-1, verbose=True):
    transformers = []
    for description in NATIONAL_HOLIDAY_DESCRIPTIONS:
        transformers.append(
            (
                f"Create {description} indicator",
                FunctionTransformer(func=_create_some_national_holiday_indicator, kw_args={"description": description}),
                ["id"],
            )
        )

    transformer = ColumnTransformer(
        transformers, n_jobs=n_jobs, remainder="drop", verbose=verbose, verbose_feature_names_out=False
    ).set_output(transform="pandas")

    return transformer


def _make_regional_holidays_transformer(n_jobs=-1, verbose=True):
    transformers = []
    for description in REGIONAL_HOLIDAY_DESCRIPTIONS:
        transformers.append(
            (
                f"Create {description} indicator",
                FunctionTransformer(func=_create_some_regional_holiday_indicator, kw_args={"description": description}),
                ["state"],
            )
        )

    transformer = ColumnTransformer(
        transformers, n_jobs=n_jobs, remainder="drop", verbose=verbose, verbose_feature_names_out=False
    ).set_output(transform="pandas")

    return transformer

Next, I use a `FeatureUnion` to join together the three `ColumnTransformer` objects. This adds a further level of parallelism because now each of these three transformers can be computed in parallel.

In [None]:
def _create_local_holiday_indicator(df):
    indicator = df.select_dtypes(include=np.uint8).max(axis=1)
    with_indicator_df = df.assign(is_local_holiday=indicator)
    return with_indicator_df


def _create_national_holiday_indicator(df):
    indicator = df.select_dtypes(include=np.uint8).max(axis=1)
    with_indicator_df = df.assign(is_national_holiday=indicator)
    return with_indicator_df


def _create_regional_holiday_indicator(df):
    indicator = df.select_dtypes(include=np.uint8).max(axis=1)
    with_indicator_df = df.assign(is_regional_holiday=indicator)
    return with_indicator_df


def _remove_duplicated_columns(df):
    return df.loc[:, ~df.columns.duplicated()]


def make_holidays_events_feature_union(n_jobs=-1, verbose=True):
    feature_union = FeatureUnion(
        transformer_list=[
            (
                "Engineer local holiday features",
                Pipeline(
                    steps=[
                        (
                            "Engineer individual local holiday indicators",
                            _make_local_holidays_transformer(
                                n_jobs=-1,
                                verbose=verbose,
                            ),
                        ),
                        ("Remove any duplicated columns", FunctionTransformer(func=_remove_duplicated_columns)),
                        ("Engineer local holiday indicator", FunctionTransformer(func=_create_local_holiday_indicator)),
                    ],
                    verbose=verbose,
                ),
            ),
            (
                "Engineer national holiday and event features",
                Pipeline(
                    steps=[
                        (
                            "Engineer indivdual national holiday and event indicators",
                            _make_national_holidays_transformer(n_jobs=-1, verbose=verbose),
                        ),
                        ("Remove any duplicated columns", FunctionTransformer(func=_remove_duplicated_columns)),
                        (
                            "Engineer national holiday and event indicator",
                            FunctionTransformer(func=_create_national_holiday_indicator),
                        ),
                    ],
                    verbose=verbose,
                ),
            ),
            (
                "Engineer regional holiday features",
                Pipeline(
                    steps=[
                        (
                            "Engineer individual regional holiday indicators",
                            _make_regional_holidays_transformer(n_jobs=-1, verbose=verbose),
                        ),
                        ("Remove any duplicated columns", FunctionTransformer(func=_remove_duplicated_columns)),
                        (
                            "Engineer regional holiday indicator",
                            FunctionTransformer(func=_create_regional_holiday_indicator),
                        ),
                    ],
                    verbose=verbose,
                ),
            ),
        ],
        n_jobs=n_jobs,
        verbose=verbose,
    ).set_output(transform="pandas")

    return feature_union

### Example usage

Un-comment and run the code in the cell below to see the pipeline in action.

In [None]:
# _feature_union = make_holidays_events_feature_union(
#    n_jobs=-1,
#    verbose=True
# )
# _df = _feature_union.fit_transform(_preprocessed_df)

## Lagged Feature Engineering Pipeline

Notice that in order to properly create the lagged features from the transactions data that we will need to train our model and then forecast sales, we will need a model to forecast the missing transactions over the test window. Typically we would also need models for other features like `dcoilwtico` and `onpromotion` (and any other features we use during training that are not known at the time we make our forecast).

In [None]:
def _totals_by_store_nbr(df, onpromotion, transactions):
    features = ["sales"]
    if onpromotion:
        features.append("onpromotion")
    if transactions:
        features.append(transactions)
    totals_df = (
        df.loc[:, features]
        .groupby(["store_nbr", "date"])
        .sum()
        .rename(columns={feature: f"total_{feature}" for feature in features})
    )
    return totals_df


def _sales_per_transaction(df):
    totals_df = _totals_by_store_nbr(df, onpromotion=False, transactions=True)
    sales_per_transaction = totals_df.loc[:, "sales"].div(totals_df.loc[:, "transactions"])
    return sales_per_transaction

In [None]:
def _create_lagged_features(s, lags, drop):
    feature = s.name
    lagged_features = {}
    for lag in lags:
        lagged_features[f"{feature}_lag{lag:02d}"] = (
            s.groupby(["store_nbr", "family"]).shift(periods=lag).fillna(method="bfill").astype(np.float32)
        )
    if drop:
        with_lagged_features_df = s.to_frame().assign(**lagged_features).drop(feature, axis=1).sort_index(axis=1)
    else:
        with_lagged_features_df = s.to_frame().assign(**lagged_features).sort_index(axis=1)
    return with_lagged_features_df


def _create_rolling_mean(s, lag, window):
    rolling_mean = {
        f"{s.name}_{window:02d}_day_rolling_mean": (
            s.groupby(["store_nbr", "family"])
            .rolling(min_periods=1, window=window)
            .mean()
            .astype(np.float32)
            .droplevel([0, 1])
        )
    }
    with_rolling_mean_df = s.to_frame().assign(**rolling_mean).sort_index(axis=1)
    return with_rolling_mean_df


def _fillna_with_forecast(df, forecast_df):
    with_forecast_df = df.fillna(forecast_df)
    return with_forecast_df


def _make_fillna_with_forecast_transformer(
    dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df, n_jobs, verbose
):

    transformer = ColumnTransformer(
        transformers=[
            (
                "Insert dcoilwtico forecast",
                FunctionTransformer(func=_fillna_with_forecast, kw_args={"forecast_df": dcoilwtico_forecast_df}),
                ["dcoilwtico"],
            ),
            (
                "Insert onpromotion forecast",
                FunctionTransformer(func=_fillna_with_forecast, kw_args={"forecast_df": onpromotion_forecast_df}),
                ["onpromotion"],
            ),
            (
                "Insert transactions forecast",
                FunctionTransformer(func=_fillna_with_forecast, kw_args={"forecast_df": transactions_forecast_df}),
                ["transactions"],
            ),
        ],
        n_jobs=n_jobs,
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")

    return transformer


def _make_lagged_feature_transformer(max_lags, n_jobs, verbose):
    transformer = ColumnTransformer(
        transformers=[
            (
                "Engineer lagged dcoilwtico features",
                FunctionTransformer(
                    func=_create_lagged_features,
                    kw_args={
                        "lags": range(1, max_lags + 1),
                        "drop": True,
                    },
                ),
                "dcoilwtico",
            ),
            (
                "Engineer lagged onpromotion features",
                FunctionTransformer(
                    func=_create_lagged_features,
                    kw_args={
                        "lags": range(1, max_lags + 1),
                        "drop": True,
                    },
                ),
                "onpromotion",
            ),
            (
                "Engineer lagged transactions features",
                FunctionTransformer(
                    func=_create_lagged_features,
                    kw_args={
                        "lags": range(1, max_lags + 1),
                        "drop": True,
                    },
                ),
                "transactions",
            ),
        ],
        n_jobs=n_jobs,
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    return transformer


def _make_rolling_means_transformer(lag, window, n_jobs, verbose) -> ColumnTransformer:
    transformer = ColumnTransformer(
        transformers=[
            (
                f"Engineer dcoilwtico_lag{lag:02d} {window:02d}-day rolling mean",
                FunctionTransformer(func=_create_rolling_mean, kw_args={"lag": lag, "window": window}),
                f"dcoilwtico_lag{lag:02d}",
            ),
            (
                f"Engineer onpromotion_lag{lag:02d} {window:02d}-day rolling mean",
                FunctionTransformer(func=_create_rolling_mean, kw_args={"lag": lag, "window": window}),
                f"onpromotion_lag{lag:02d}",
            ),
            (
                f"Engineer transactions_lag{lag:02d} {window:02d}-day rolling mean",
                FunctionTransformer(func=_create_rolling_mean, kw_args={"lag": lag, "window": window}),
                f"transactions_lag{lag:02d}",
            ),
        ],
        n_jobs=n_jobs,
        remainder="passthrough",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")

    return transformer


def make_lagged_feature_engineering_pipeline(
    max_lags, dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df, n_jobs=-1, verbose=True
) -> Pipeline:
    pipeline = Pipeline(
        steps=[
            (
                "Insert test window forecasts",
                _make_fillna_with_forecast_transformer(
                    dcoilwtico_forecast_df,
                    onpromotion_forecast_df,
                    transactions_forecast_df,
                    n_jobs,
                    verbose,
                ),
            ),
            ("Create lagged features", _make_lagged_feature_transformer(max_lags, n_jobs, verbose)),
            ("Create weekly rolling means", _make_rolling_means_transformer(1, 7, n_jobs, verbose)),
            ("Create monthly rolling means", _make_rolling_means_transformer(1, 28, n_jobs, verbose)),
        ],
        verbose=verbose,
    ).set_output(transform="pandas")

    return pipeline

### Example usage

We are given data on `dcoilwtico` and `onpromotion` for the test window. However, for a real deployment of our forecasting pipeline we would NOT know the future values for either the price of oil nor number of products on promotion nor transactions. If we want to use these series to create features for training our forecasting pipeline, then we need to also be able to generate forecasts of these variables prior to generating predictions from our model.

In [None]:
def forecast_dcoilwtico():
    test_df = load_test_df("/kaggle/input/store-sales-time-series-forecasting/test.csv")
    oil_df = load_oil_df("/kaggle/input/store-sales-time-series-forecasting/oil.csv")
    forecast_df = (
        test_df.join(oil_df, how="left", on=["date"])
        .groupby(["store_nbr", "family"])
        .fillna(method="ffill")
        .loc[:, ["dcoilwtico"]]
    )
    return forecast_df


def forecast_onpromotion():
    test_df = load_test_df("/kaggle/input/store-sales-time-series-forecasting/test.csv", onpromotion=True)
    forecast_df = test_df.loc[:, ["onpromotion"]]
    return forecast_df


def forecast_transactions():
    test_df = load_test_df("/kaggle/input/store-sales-time-series-forecasting/test.csv")
    train_df = load_train_df("/kaggle/input/store-sales-time-series-forecasting/train.csv")
    transactions_df = load_transactions_df("/kaggle/input/store-sales-time-series-forecasting/transactions.csv")

    joined_df = train_df.join(transactions_df, how="left", on=["store_nbr", "date"])
    concat_df = pd.concat(
        [joined_df, test_df],
        axis=0,
        sort=True,
    ).sort_index(axis=0)
    forecast_df = (
        concat_df.groupby(["store_nbr", "family"])
        .fillna(method="ffill")
        .loc[pd.IndexSlice[:, :, "2017-08-16":], ["transactions"]]
    )
    return forecast_df

In [None]:
# _pipeline = make_lagged_feature_engineering_pipeline(
#    max_lags=28,
#    dcoilwtico_forecast_df=forecast_dcoilwtico(),
#    onpromotion_forecast_df=forecast_onpromotion(),
#    transactions_forecast_df=forecast_transactions(),
#    n_jobs=-1,
#    verbose=True
# )
# _df = _pipeline.fit_transform(_preprocessed_df)

In [None]:
# _df.info()

## Pipeline to encode categorical features

In [None]:
def make_feature_encoding_transformer(n_jobs=-1, verbose=True):
    transformer = ColumnTransformer(
        transformers=[
            ("One-hot encode city feature", OneHotEncoder(sparse_output=False, dtype=np.uint8), ["city"]),
            ("One-hot encode state feature", OneHotEncoder(sparse_output=False, dtype=np.uint8), ["state"]),
            (
                "One-hot encode store_cluster feature",
                OneHotEncoder(sparse_output=False, dtype=np.uint8),
                ["store_cluster"],
            ),
            ("One-hot encode store_type feature", OneHotEncoder(sparse_output=False, dtype=np.uint8), ["store_type"]),
        ],
        n_jobs=n_jobs,
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")

    return transformer

## Combined Feature Engineering Pipeline

In [None]:
def make_feature_engineering_feature_union(
    max_lags, dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df, n_jobs=-1, verbose=True
) -> FeatureUnion:

    feature_union = FeatureUnion(
        transformer_list=[
            (
                "Passthrough sales target",
                ColumnTransformer(
                    transformers=[("Identity transform", FunctionTransformer(func=lambda df: df), ["sales"])],
                    n_jobs=1,
                    remainder="drop",
                    verbose=verbose,
                    verbose_feature_names_out=False,
                ),
            ),
            ("Engineer date features", make_date_features_transformer(verbose)),
            ("Engineer holidays and events features", make_holidays_events_feature_union(n_jobs, verbose)),
            (
                "Engineer lagged features",
                make_lagged_feature_engineering_pipeline(
                    max_lags, dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df, n_jobs, verbose
                ),
            ),
            ("Encode categorical features", make_feature_encoding_transformer(n_jobs=n_jobs, verbose=verbose)),
        ],
        n_jobs=n_jobs,
        verbose=verbose,
    ).set_output(transform="pandas")

    return feature_union

In [None]:
# _feature_union_kwargs = {
#    "max_lags": 28,
#    "dcoilwtico_forecast_df": forecast_dcoilwtico(),
#    "onpromotion_forecast_df": forecast_onpromotion(),
#    "transactions_forecast_df": forecast_transactions(),
#    "n_jobs": -1,
#    "verbose": True
# }
# _feature_union = make_feature_engineering_feature_union(
#    **_feature_union_kwargs
# )
# _df = _feature_union.fit_transform(_preprocessed_df)

In [None]:
# _df.info()

# Pipeline for converting to Nixlats format

In [None]:
def _create_unique_id(df):
    unique_ids = (
        df.loc[:, ["store_nbr", "family"]]
        .apply(lambda feature: feature.cat.codes, axis=0)
        .apply(lambda cs: "_".join(str(c) for c in cs), axis=1)
        .astype("category")
    )
    return unique_ids


def make_nixtlats_preparation_pipeline(verbose=True) -> Pipeline:
    pipeline = Pipeline(
        steps=[
            ("Reset multi-index", FunctionTransformer(func=lambda df: df.reset_index())),
            (
                "Convert store_nbr and family to category dtype",
                FunctionTransformer(func=lambda df: df.astype({"store_nbr": "category", "family": "category"})),
            ),
            (
                "Create unique_id column using store_nbr and family",
                FunctionTransformer(func=lambda df: df.assign(unique_id=_create_unique_id(df))),
            ),
            (
                "Encode remaining categorical features",
                ColumnTransformer(
                    transformers=[
                        (
                            "One-hot encode store_nbr and family",
                            OneHotEncoder(sparse_output=False, dtype=np.uint8),
                            ["store_nbr", "family"],
                        )
                    ],
                    remainder="passthrough",
                    verbose=verbose,
                    verbose_feature_names_out=False,
                ),
            ),
            (
                "Rename columns for use with nixlats libraries",
                FunctionTransformer(func=lambda df: df.rename(columns={"date": "ds", "sales": "y"})),
            ),
            ("Sort the columns", FunctionTransformer(func=lambda df: df.sort_index(axis=1))),
            (
                "Split into train and test data sets",
                FunctionTransformer(func=lambda df: (df.query("ds < 20170816"), df.query("ds >= 20170816"))),
            ),
        ],
        verbose=verbose,
    ).set_output(transform="pandas")

    return pipeline

# Complete Pipeline

In [None]:
def prepare_sales_forecasting_data(
    onpromotion=True,
    join_oil=True,
    join_transactions=True,
    start="20150701",
    q=0.99,
    max_lags=28,
    dcoilwtico_forecast_df=None,
    onpromotion_forecast_df=None,
    transactions_forecast_df=None,
    n_jobs=-1,
    verbose=True,
):

    # load the training, test, and transactions data sets
    _train_df = load_train_df("/kaggle/input/store-sales-time-series-forecasting/train.csv")

    # preprocess the data
    data_preprocessing_pipeline = make_data_preprocessing_pipeline(
        onpromotion, join_oil, join_transactions, start, q, verbose
    )
    _preprocessed_df = data_preprocessing_pipeline.fit_transform(_train_df)

    # forecast future values of exogenous variables
    if dcoilwtico_forecast_df is None:
        dcoilwtico_forecast_df = forecast_dcoilwtico()
    if onpromotion_forecast_df is None:
        onpromotion_forecast_df = forecast_onpromotion()
    if transactions_forecast_df is None:
        transactions_forecast_df = forecast_transactions()

    # engineer features
    feature_engineering_feature_union = make_feature_engineering_feature_union(
        max_lags, dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df, n_jobs, verbose
    )
    _with_engineered_features_df = feature_engineering_feature_union.fit_transform(_preprocessed_df)

    # prepate for nixlats
    nixtlats_preparation_pipeline = make_nixtlats_preparation_pipeline(verbose)
    train_df, test_df = nixtlats_preparation_pipeline.fit_transform(_with_engineered_features_df)

    return train_df, test_df

# Helper function for extracting static feature names

In [None]:
def get_static_feature_names(df) -> list:
    is_static_feature = (
        df.columns.str.startswith("city_")
        + df.columns.str.startswith("state_")
        + df.columns.str.startswith("store_nbr_")
        + df.columns.str.startswith("store_type_")
        + df.columns.str.startswith("store_cluster")
        + df.columns.str.startswith("family_")
    )
    static_feature_names = df.columns[is_static_feature].to_list()
    return static_feature_names

# Example Usage

Uncomment and run the cells below to prepare the data using my default settings and explore!

In [None]:
# train_df, test_df = prepare_sales_forecasting_data()

In [None]:
# train_df.info()

# Target Transformations

In [None]:
class Detrender(BaseTargetTransform):

    def __init__(self, kwargs):
        self.kwargs = kwargs

    def _create_deterministic_process(self, df):
        deterministic_process = DeterministicProcess(df.index.get_level_values(self.time_col), **self.kwargs)
        return deterministic_process

    def _linear_regression_fit(self, df):
        estimator = LinearRegression(fit_intercept=False).fit(
            df.drop(self.target_col, axis=1), df.loc[:, self.target_col]
        )
        return estimator

    def _linear_regression_predict(self, df):
        estimator = self.estimators_.loc[df.name, "LinearRegression"]
        predictions_df = pd.DataFrame({self.target_col: estimator.predict(df)}, index=df.index.get_level_values(-1))
        return predictions_df

    def fit_transform(self, df):

        in_sample_targets_df = df.loc[:, [self.target_col]]

        # create the deterministic processes
        self.deterministic_processes_ = (
            in_sample_targets_df.groupby(self.id_col)
            .apply(self._create_deterministic_process)
            .rename("DeterministicProcess")
            .to_frame()
        )

        # generate in-sample features
        self.in_sample_features_ = self.deterministic_processes_.groupby(self.id_col).apply(
            lambda df: df.loc[df.name, "DeterministicProcess"].in_sample()
        )

        # fit the linear regression models
        self.estimators_ = (
            in_sample_targets_df.join(
                self.in_sample_features_,
                how="left",
            )
            .groupby(self.id_col)
            .apply(self._linear_regression_fit)
            .rename("LinearRegression")
            .to_frame()
        )

        # predict the in-sample trend values
        in_sample_trend_forecasts_df = self.in_sample_features_.groupby(self.id_col).apply(
            self._linear_regression_predict
        )

        # detrend the in-sample targets
        in_sample_residuals_df = in_sample_targets_df.sub(in_sample_trend_forecasts_df)
        transformed_df = df.assign(**{self.target_col: in_sample_residuals_df})

        return transformed_df

    def inverse_transform(self, df):
        out_of_sample_residuals_df = df.loc[:, [self.target_col]]

        # compute out-of-sample features
        h = out_of_sample_residuals_df.index.get_level_values(self.time_col).unique().size
        self.out_of_sample_features_ = self.deterministic_processes_.groupby(self.id_col).apply(
            lambda df: df.loc[df.name, "DeterministicProcess"].out_of_sample(steps=h)
        )

        # predict the out-of-sample trend values
        out_of_sample_trend_forecasts_df = self.out_of_sample_features_.groupby(self.id_col).apply(
            self._linear_regression_predict
        )

        # re-introduce the trend to the residuals
        out_of_sample_forecasts_df = out_of_sample_residuals_df.add(out_of_sample_trend_forecasts_df)
        transformed_df = df.assign(**{self.target_col: out_of_sample_forecasts_df})

        return transformed_df

    def inverse_transform_fitted(self, df):
        in_sample_residuals_df = df.loc[:, [self.target_col]]

        # predict the in-sample trend values
        in_sample_trend_forecasts_df = self.in_sample_features_.groupby(self.id_col).apply(
            self._linear_regression_predict
        )

        # re-introduce the trend to the residuals
        in_sample_forecasts_df = in_sample_residuals_df.add(in_sample_trend_forecasts_df)
        transformed_df = df.assign(**{self.target_col: in_sample_forecasts_df})

        return transformed_df

In [None]:
# _deterministic_process_kwargs = {
#    "constant": True,
#    "order": 1,
#    "seasonal": True,
#    "period": 7,
#    "additional_terms": [
#        CalendarFourier(freq="M", order=4)
#    ],
# }
#
# _transformer = Detrender(
#    kwargs=_deterministic_process_kwargs
# )
# _transformer.set_column_names(
#    id_col=["store_nbr", "family"],
#    time_col="date",
#    target_col="sales"
# )

# detrended_target_df = (
#    _transformer.fit_transform(
#        _preprocessed_df.loc[pd.IndexSlice[:, :, :"2017-08-15"]]
#    )
# )

In [None]:
# _transformer.inverse_transform_fitted(detrended_target_df)

In [None]:
# out_of_sample_residuals_df = (
#    _preprocessed_df.assign(sales=detrended_target_df.loc[:, ["sales"]])
#                    .groupby(["store_nbr", "family"])
#                    .fillna(method="ffill")
#                    .loc[pd.IndexSlice[:, :, "2017-08-16":]]
# )
#
# _transformer.inverse_transform(out_of_sample_residuals_df)

In [None]:
# test_df.info()

In [None]:
_growth_rate_pipeline = Pipeline(
    steps=[
        (
            "log transform",
            FunctionTransformer(
                func=np.log,
                inverse_func=np.exp,
                check_inverse=False,
            ),
        ),
        ("difference transform", FunctionTransformer(func=lambda df: df.diff(periods=1))),
        ("lag transform", FunctionTransformer(func=lambda df: df.shift(periods=1))),
    ],
    verbose=True,
).set_output(transform="pandas")

_pipeline = Pipeline(
    steps=[
        (
            "growth rate transform",
            ColumnTransformer(
                transformers=[("growth rate transform", _growth_rate_pipeline, ["usd_index"])],
                remainder="passthrough",
                verbose=True,
                verbose_feature_names_out=False,
            ),
        ),
        (
            "lag transforms",
            ColumnTransformer(
                transformers=[
                    (
                        "weekly rolling mean",
                        FunctionTransformer(
                            func=lambda df: df.rolling(on="date", window=7)
                            .mean()
                            .rename(columns={"usd_index": "usd_index_rolling_mean_lag1_window_size7"})
                        ),
                        ["date", "usd_index"],
                    ),
                    (
                        "two-week rolling mean",
                        FunctionTransformer(
                            func=lambda df: df.rolling(on="date", window=14)
                            .mean()
                            .rename(columns={"usd_index": "usd_index_rolling_mean_lag1_window_size14"})
                        ),
                        ["date", "usd_index"],
                    ),
                ],
                remainder="passthrough",
                verbose=True,
                verbose_feature_names_out=False,
            ),
        ),
        ("remove duplicate columns", FunctionTransformer(func=lambda df: df.loc[:, ~df.columns.duplicated()])),
    ],
    verbose=True,
).set_output(transform="pandas")

# X_df = _pipeline.fit_transform(usd_index_df)

# train_df = (
#    oil_df.merge(X_df, how="left", on=["unique_id", "date"])
# )