In [None]:
# default_exp core

# Core Hierarchical 

> Module for hierarchical forecast reconciliation and evaluation.

In [None]:
#export
from functools import partial
from inspect import signature
from typing import Callable, Dict, List, Optional

import numpy as np
import pandas as pd

In [None]:
#hide
from fastcore.test import test_close, test_fail

In [None]:
#exporti
def _build_fn_name(fn) -> str:
    fn_name = type(fn).__name__
    func_params = fn.__dict__
    func_params = [f'{name}-{value}' for name, value in func_params.items()]
    if func_params:
        fn_name += '_' + '_'.join(func_params)
    return fn_name

In [None]:
#export
class HierarchicalReconciliation:
    """Apply a list of reconciliation methods to base forecasts.

    Parameters
    ----------
    reconcilers: List[Callable]
        Reconciliation callables. Each one is called as
        `reconciler(y_hat=..., **kwargs)`, where `kwargs` holds the
        entries among `y`, `S`, `idx_bottom`, `levels` and `residuals`
        that appear in the reconciler's signature.
    """

    def __init__(self, reconcilers: List[Callable]):
        self.reconcilers = reconcilers

    def reconcile(self, Y_h: pd.DataFrame, Y_df: pd.DataFrame, S: pd.DataFrame,
                  tags: Dict[str, np.ndarray]):
        """Reconcile base forecasts.

        Parameters
        ----------
        Y_h: pd.DataFrame
            Base forecasts with columns ['ds']
            and models to reconcile indexed by 'unique_id'.
            A 'y' column, if present, is not treated as a model.
        Y_df: pd.DataFrame
            Training set of base time series with columns
            ['ds', 'y'] indexed by 'unique_id'.
            If a callable of `self.reconcilers` receives
            residuals, `Y_df` must include them as columns
            named after the corresponding model.
        S: pd.DataFrame
            Summing matrix of size (hierarchies, bottom).
            Its index holds the series ids and its columns
            the ids of the bottom series.
        tags: Dict[str, np.ndarray]
            Dictionary of levels.
            Each key is a level and its value
            contains the series ids associated to that level.

        Returns
        -------
        pd.DataFrame
            Copy of `Y_h` with one extra column per model and reconciler,
            named '{model}/{reconciler name}'.
        """
        drop_cols = ['ds', 'y'] if 'y' in Y_h.columns else ['ds']
        model_names = Y_h.drop(columns=drop_cols).columns.to_list()
        uids = Y_h.index.unique()
        # same order of Y_h to prevent errors
        S_ = S.loc[uids]
        common_vals = dict(
            y=Y_df.pivot(columns='ds', values='y').loc[uids].values,
            S=S_.values,
            idx_bottom=[S_.index.get_loc(col) for col in S.columns],
            levels={key: S_.index.get_indexer(val) for key, val in tags.items()},
        )
        # Wide (series x dates) base forecasts. They only depend on the model,
        # so they are built once instead of once per reconciler.
        # Remember: pivot sorts uid, hence the `.loc[uids]`.
        y_hat_wide = {
            model_name: Y_h.pivot(columns='ds', values=model_name).loc[uids]
            for model_name in model_names
        }
        # Position of every row of Y_h in the wide layout. Used to write the
        # reconciled values back by (unique_id, ds) instead of relying on
        # Y_h being sorted by series and date.
        row_pos = uids.get_indexer(Y_h.index)
        ds_values = pd.Index(Y_h['ds'])
        fcsts = Y_h.copy()
        for reconcile_fn in self.reconcilers:
            reconcile_fn_name = _build_fn_name(reconcile_fn)
            fn_params = signature(reconcile_fn).parameters
            has_res = 'residuals' in fn_params
            for model_name in model_names:
                wide = y_hat_wide[model_name]
                available = dict(common_vals)
                if has_res:
                    if model_name in Y_df:
                        available['residuals'] = Y_df.pivot(columns='ds', values=model_name).loc[uids].values.T
                    else:
                        # some methods have the residuals argument
                        # but they don't need them
                        # e.g. MinTrace(method='ols')
                        available['residuals'] = None
                kwargs = {key: available[key] for key in fn_params if key in available}
                # hand over a copy so a reconciler cannot alter the cached forecasts
                fcsts_model = reconcile_fn(y_hat=wide.values.copy(), **kwargs)
                fcsts_model = np.asarray(fcsts_model).reshape(wide.shape)
                col_pos = wide.columns.get_indexer(ds_values)
                fcsts[f'{model_name}/{reconcile_fn_name}'] = fcsts_model[row_pos, col_pos]
        return fcsts

In [None]:
#hide
from hierarchicalforecast.methods import (
    BottomUp, TopDown, MinTrace, ERM, bottom_up
)
from hierarchicalforecast.utils import hierarchize

In [None]:
#hide
# Fixture for the tests below: tourism dataset downloaded from GitHub
# (this cell needs network access).
df = pd.read_csv('https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/tourism.csv')
# use the 'ds' / 'y' column names expected by `reconcile`
df = df.rename({'Trips': 'y', 'Quarter': 'ds'}, axis=1)
# constant first column so that every hierarchy below starts at a single 'Country' level
df.insert(0, 'Country', 'Australia')

# non strictly hierarchical (grouped) structure:
# 'Purpose' is combined with 'Country', 'State' and 'Region'
hiers_grouped = [
    ['Country'],
    ['Country', 'State'], 
    ['Country', 'Purpose'], 
    ['Country', 'State', 'Region'], 
    ['Country', 'State', 'Purpose'], 
    ['Country', 'State', 'Region', 'Purpose']
]
# strictly hierarchical structure: each level refines the previous one
hiers_strictly = [
    ['Country'],
    ['Country', 'State'], 
    ['Country', 'State', 'Region'], 
]

# `hierarchize` returns three objects per structure; judging by how they are
# used below they are the series frame, the summing matrix S and the level tags
# (TODO confirm against hierarchicalforecast.utils)
hier_grouped_df, S_grouped, tags_grouped = hierarchize(df, hiers_grouped)
hier_strict_df, S_strict, tags_strict = hierarchize(df, hiers_strictly)

In [None]:
#hide
# 'y_model' is a copy of 'y', i.e. a perfect base forecast
hier_grouped_df['y_model'] = hier_grouped_df['y']
# we should be able to recover y using the methods
# last 12 rows of every series are used as the forecast frame
hier_grouped_df_h = hier_grouped_df.groupby('unique_id').tail(12)
ds_h = hier_grouped_df_h['ds'].unique()
# the remaining dates are the training set
# NOTE: `hier_grouped_df` is reassigned here, so re-running this cell
# removes 12 more periods; restart the kernel before re-running
hier_grouped_df = hier_grouped_df.query('~(ds in @ds_h)')

# hierarchical reconciliation
hrec = HierarchicalReconciliation(reconcilers=[
    # these methods should reconstruct the original y
    BottomUp(),
    MinTrace(method='ols'),
    MinTrace(method='wls_struct'),
    MinTrace(method='wls_var'),
    MinTrace(method='mint_shrink')
])
reconciled = hrec.reconcile(hier_grouped_df_h, hier_grouped_df, S_grouped, tags_grouped)
# every model column (including 'y_model' itself) must match 'y'
for model in reconciled.drop(columns=['ds', 'y']).columns:
    test_close(reconciled['y'], reconciled[model])

In [None]:
#hide
# TopDown should fail with non strictly hierarchical (grouped) structures:
# `test_fail` checks that `reconcile` raises and that the error message
# contains the expected text
hrec = HierarchicalReconciliation([TopDown(method='average_proportions')])
test_fail(
    hrec.reconcile,
    contains='requires strictly hierarchical structures',
    args=(hier_grouped_df_h, hier_grouped_df, S_grouped, tags_grouped)
)

In [None]:
#hide
# all methods, TopDown included, should work with
# strictly hierarchical structures
# 'y_model' is a copy of 'y', i.e. a perfect base forecast
hier_strict_df['y_model'] = hier_strict_df['y']
# we should be able to recover y using the methods
# last 12 rows of every series are used as the forecast frame
hier_strict_df_h = hier_strict_df.groupby('unique_id').tail(12)
ds_h = hier_strict_df_h['ds'].unique()
# the remaining dates are the training set
# NOTE: `hier_strict_df` is reassigned here, so re-running this cell
# removes 12 more periods; restart the kernel before re-running
hier_strict_df = hier_strict_df.query('~(ds in @ds_h)')

# hierarchical reconciliation
hrec = HierarchicalReconciliation(reconcilers=[
    # these methods should reconstruct the original y
    BottomUp(),
    MinTrace(method='ols'),
    MinTrace(method='wls_struct'),
    MinTrace(method='wls_var'),
    MinTrace(method='mint_shrink'),
    # top down doesn't recover the original y
    TopDown(method='average_proportions'),
    TopDown(method='proportion_averages'),
])
reconciled = hrec.reconcile(hier_strict_df_h, hier_strict_df, S_strict, tags_strict)
for model in reconciled.drop(columns=['ds', 'y']).columns:
    if 'TopDown' in model:
        # TopDown columns are expected to differ from 'y',
        # so `test_close` itself has to fail
        test_fail(
            test_close,
            args=(reconciled['y'], reconciled[model]),
        )
    else:
        test_close(reconciled['y'], reconciled[model])

In [None]:
#hide
# test methods that don't use residuals
# even if their signature includes that argument:
# 'y_model' is dropped from the training frame, so `reconcile`
# passes `residuals=None` to the reconciler
hrec = HierarchicalReconciliation([MinTrace(method='ols')])
reconciled = hrec.reconcile(hier_grouped_df_h, hier_grouped_df.drop(columns=['y_model']), S_grouped, tags_grouped)
for model in reconciled.drop(columns=['ds', 'y']).columns:
    test_close(reconciled['y'], reconciled[model])

In [None]:
#hide
# visual check: reconciled forecasts of the series tagged as 'Country/State'
reconciled.loc[tags_grouped['Country/State']]

In [None]:
#export
class HierarchicalEvaluation:
    """Evaluate forecasts on every level of a hierarchy.

    Parameters
    ----------
    evaluators: List[Callable]
        Loss functions called as `fn(y, y_hat)`.
        The name of each one labels its rows in the evaluation.
    """

    def __init__(self, evaluators: List[Callable]):
        self.evaluators = evaluators

    @staticmethod
    def _evaluator_name(fn: Callable) -> str:
        """Name of an evaluator: `__name__` when available, class name otherwise."""
        if isinstance(fn, partial):
            # partial objects do not expose the `__name__` of the wrapped function
            fn = fn.func
        return getattr(fn, '__name__', type(fn).__name__)

    def evaluate(self, 
                 Y_h: pd.DataFrame, 
                 Y_test: pd.DataFrame, 
                 tags: Dict[str, np.ndarray],
                 benchmark: Optional[str] = None):
        """Evaluate hierarchical forecasts.

        Parameters
        ----------
        Y_h: pd.DataFrame
            Forecasts with columns ['ds']
            and models to evaluate.
        Y_test: pd.DataFrame
            True values with columns ['ds', 'y']
        tags: Dict[str, np.ndarray]
            Dictionary of levels.
            Each key is a level and its value
            contains tags associated to that level.
        benchmark: Optional[str]
            Optional benchmark model.
            When passed, the evaluators are scaled by
            the error of this benchmark.
            If passed, should be part of `Y_h`.

        Returns
        -------
        pd.DataFrame
            One column per model, indexed by ('level', 'metric').
            The first level is 'Overall', which gathers the tags of
            every level. Metric names get the '-scaled' suffix when
            `benchmark` is passed.

        Raises
        ------
        KeyError
            If `benchmark` is passed and is not a column of `Y_h`.
        """
        if benchmark is not None and benchmark not in Y_h.columns:
            raise KeyError(f'benchmark {benchmark!r} is not a column of `Y_h`')
        drop_cols = ['ds', 'y'] if 'y' in Y_h.columns else ['ds']
        model_names = Y_h.drop(columns=drop_cols).columns.to_list()
        fn_names = [self._evaluator_name(fn) for fn in self.evaluators]
        if benchmark is not None:
            fn_names = [f'{fn_name}-scaled' for fn_name in fn_names]
        # 'Overall' goes first; a user level called 'Overall' takes precedence
        tags_ = {'Overall': np.concatenate(list(tags.values())), **tags}
        index = pd.MultiIndex.from_product([list(tags_), fn_names], names=['level', 'metric'])
        evaluation = pd.DataFrame(columns=model_names, index=index)
        eps = np.finfo(float).eps
        for level, cats in tags_.items():
            Y_h_cats = Y_h.loc[cats]
            y_test_cats = Y_test.loc[cats, 'y'].values
            for fn, fn_name in zip(self.evaluators, fn_names):
                if benchmark is not None:
                    # the benchmark error only depends on the level and the
                    # metric, so it is computed once for all the models
                    benchmark_loss = fn(y_test_cats, Y_h_cats[benchmark].values)
                    benchmark_is_zero = np.isclose(benchmark_loss, 0., atol=eps)
                for model in model_names:
                    loss = fn(y_test_cats, Y_h_cats[model].values)
                    if benchmark is not None:
                        scale = benchmark_loss
                        if benchmark_is_zero:
                            # avoid dividing by zero
                            scale = benchmark_loss + eps
                            # benchmark and model errors are both (nearly)
                            # zero: leave the loss unscaled
                            if np.isclose(scale, loss, atol=1e-8):
                                scale = 1.
                        loss = loss / scale
                    evaluation.loc[(level, fn_name), model] = loss
        return evaluation

In [None]:
#hide
def mse(y, y_hat):
    """Mean squared error between the actual values `y` and the forecasts `y_hat`."""
    errors = y - y_hat
    return np.mean(errors ** 2)


def rmse(y, y_hat):
    """Root mean squared error, i.e. the square root of `mse`."""
    mean_squared_error = mse(y, y_hat)
    return np.sqrt(mean_squared_error)
# Smoke run of the evaluation, scaled by the error of 'y_model'.
# When the notebook is run top to bottom, 'y_model' is a copy of 'y',
# so the benchmark error is zero and the zero-scale branch is exercised.
evaluator = HierarchicalEvaluation([mse, rmse])
evaluator.evaluate(Y_h=reconciled.drop(columns='y'), 
                   Y_test=reconciled[['ds', 'y']], 
                   tags=tags_grouped,
                   benchmark='y_model')