In [None]:
# default_exp methods

# Hierarchical Reconciliation Methods

In [None]:
#export
from typing import List

import numpy as np
from statsmodels.stats.moment_helpers import cov2corr

In [None]:
#hide
from fastcore.test import ExceptionExpected, test_close, test_eq

In [None]:
#export
def _reconcile(S: np.ndarray, P: np.ndarray, W: np.ndarray, 
               y_hat: np.ndarray, SP: np.ndarray = None):
    """Apply a reconciliation projection to the base forecasts.

    Returns ``SP @ y_hat``, where ``SP`` is ``S @ P`` unless a precomputed
    product is supplied by the caller.

    Parameters
    ----------
    S, P : matrices whose product is applied to ``y_hat``.
    W : accepted so every reconciliation method shares one call signature;
        it is not used in this function.
    y_hat : base forecasts.
    SP : optional precomputed ``S @ P``; when given, ``S`` and ``P`` are
        not multiplied again.
    """
    projection = S @ P if SP is None else SP
    return np.matmul(projection, y_hat)

In [None]:
#export
def bottom_up(S: np.ndarray,
              y_hat: np.ndarray):
    """Reconcile base forecasts bottom-up.

    Builds a selection matrix that keeps only the last ``n_bottom`` rows of
    ``y_hat`` and lets ``_reconcile`` re-aggregate them through ``S``.

    Parameters
    ----------
    S : summing matrix; its shape is read as ``(n_hiers, n_bottom)``.
    y_hat : base forecasts, one row per row of ``S``.

    Returns
    -------
    The result of ``_reconcile`` for the bottom-up selection matrix.
    """
    total_nodes, bottom_nodes = S.shape
    # Shift the diagonal right past the leading rows so the ones line up with
    # the trailing ``bottom_nodes`` rows of ``y_hat``.
    shift = total_nodes - bottom_nodes
    selector = np.eye(bottom_nodes, total_nodes, k=shift, dtype=np.float32)
    identity_weights = np.eye(total_nodes, dtype=np.float32)
    return _reconcile(S, selector, identity_weights, y_hat)

In [None]:
#hide
# Toy hierarchy shared by the test cells: one total, two groups and four
# bottom series (7 nodes in all).
S = np.array([
    [1., 1., 1., 1.],  # total
    [1., 1., 0., 0.],  # group made of bottom series 1 and 2
    [0., 0., 1., 1.],  # group made of bottom series 3 and 4
    [1., 0., 0., 0.],  # bottom series 1
    [0., 1., 0., 0.],  # bottom series 2
    [0., 0., 1., 0.],  # bottom series 3
    [0., 0., 0., 1.],  # bottom series 4
])
h = 10
_y = np.array([10., 5., 4., 2., 1.])
# Row i (1-based) of the bottom-level history is i * _y.
y_bottom = np.outer(np.arange(1, 5), _y)
# Residuals of a naive "previous value" forecast along each row; the first
# column has no predecessor, so it is left as NaN.
resids_bottom = np.full(y_bottom.shape, np.nan)
resids_bottom[:, 1:] = np.diff(y_bottom, axis=1)
# Constant base forecasts: bottom series i is forecast as i for all h steps.
y_hat_bottom = np.outer(np.arange(1, 5), np.ones(h))

In [None]:
#hide
# Forecasts that already add up across the hierarchy must come back unchanged.
test_eq(bottom_up(S, S @ y_hat_bottom), S @ y_hat_bottom)

In [None]:
#export
def top_down(S: np.ndarray, 
             y_hat: np.ndarray,
             y: np.ndarray,
             idx_bottom: List[int],
             method: str):
    """Reconcile base forecasts top-down using historical proportions.

    The top node is taken to be the row of ``S`` with the largest row sum.
    Its forecast is split across the bottom series with proportions estimated
    from ``y`` and then re-aggregated through ``S``.

    Parameters
    ----------
    S : summing matrix; its shape is read as ``(n_hiers, n_bottom)``.
    y_hat : base forecasts, one row per row of ``S``.
    y : historical values; ``y[idx_top]`` is read as the top series and
        ``y[idx_bottom]`` as the bottom series (one row per series).
    idx_bottom : row indices of the bottom series in ``y``.
    method : ``'average_proportions'`` (mean over time of bottom / top) or
        ``'proportion_averages'`` (mean of bottom / mean of top).

    Returns
    -------
    The result of ``_reconcile`` for the top-down projection.

    Raises
    ------
    NotImplementedError
        For ``method='forecast_proportions'``.
    ValueError
        For any other unrecognised ``method``.
    """
    n_hiers, n_bottom = S.shape
    idx_top = int(S.sum(axis=1).argmax())
    # TODO: assert that the hierarchy is strictly hierarchical

    if method == 'forecast_proportions':
        raise NotImplementedError(f'Method {method} not implemented yet')

    y_top = y[idx_top]
    y_btm = y[idx_bottom]
    if method == 'average_proportions':
        prop = np.mean(y_btm / y_top, axis=1)
    elif method == 'proportion_averages':
        prop = np.mean(y_btm, axis=1) / np.mean(y_top)
    else:
        raise ValueError(f'Unknown method {method}')

    # Allocate P with a floating dtype: inheriting an integer dtype from S
    # would truncate every proportion to 0 on assignment.
    P = np.zeros((n_bottom, n_hiers), dtype=np.result_type(S.dtype, np.float32))
    P[:, idx_top] = prop
    W = np.eye(n_hiers, dtype=np.float32)
    return _reconcile(S, P, W, y_hat)

In [None]:
#hide
# Coherent history and coherent base forecasts: both proportion schemes
# should hand the forecasts back (numerically) unchanged.
for method in ('average_proportions', 'proportion_averages'):
    test_close(
        top_down(S, S @ y_hat_bottom, S @ y_bottom, [3, 4, 5, 6], method),
        S @ y_hat_bottom,
    )

In [None]:
#export
def crossprod(x):
    """Return the cross-product of ``x`` with itself, i.e. ``x.T @ x``."""
    transposed = x.T
    return transposed @ x

In [None]:
#export
def min_trace(S: np.ndarray, 
              y_hat: np.ndarray,
              residuals: np.ndarray,
              method: str):
    """Reconcile base forecasts with a minimum-trace style projection.

    A weight matrix ``W`` is built according to ``method``; the projection
    ``P = (S' W^-1 S)^-1 S' W^-1`` is then handed to ``_reconcile``.

    Parameters
    ----------
    S : summing matrix; its shape is read as ``(n_hiers, n_bottom)``.
    y_hat : base forecasts, one row per row of ``S``.
    residuals : array of shape ``(obs, n_hiers)``, NaNs allowed. Required by
        ``'wls_var'``, ``'mint_cov'`` and ``'mint_shrink'``; may be ``None``
        for the other methods.
    method : one of
        ``'ols'`` (identity weights),
        ``'wls_struct'`` (diagonal of the row sums of ``S``),
        ``'wls_var'`` (diagonal of the residual covariance),
        ``'mint_cov'`` (full residual covariance),
        ``'mint_shrink'`` (covariance shrunk towards its diagonal).

    Returns
    -------
    The result of ``_reconcile`` for the computed projection.

    Raises
    ------
    ValueError
        If residuals are required but missing, if ``method`` is unknown, if
        ``'mint_shrink'`` has fewer than two NaN-free residual rows, or if
        ``W`` is not positive definite (smallest eigenvalue below 1e-8).
    """
    # shape residuals (obs, n_hiers)
    res_methods = ['wls_var', 'mint_cov', 'mint_shrink']
    if method in res_methods and residuals is None:
        raise ValueError(f"For methods {', '.join(res_methods)} you need to pass residuals")
    n_hiers, n_bottom = S.shape
    if method == 'ols':
        W = np.eye(n_hiers)
    elif method == 'wls_struct':
        W = np.diag(S @ np.ones((n_bottom,)))
    elif method in res_methods:
        # Mask NaNs so they are ignored when estimating the covariance.
        masked_res = np.ma.array(residuals, mask=np.isnan(residuals))
        covm = np.ma.cov(masked_res, rowvar=False, allow_masked=True).data
        if method == 'wls_var':
            W = np.diag(np.diag(covm))
        elif method == 'mint_cov':
            W = covm
        else:  # 'mint_shrink'
            tar = np.diag(np.diag(covm))
            corm = cov2corr(covm)
            xs = np.divide(residuals, np.sqrt(np.diag(covm)))
            xs = xs[~np.isnan(xs).any(axis=1), :]
            # The cross-products below only sum over the NaN-free rows, so the
            # sample size must be that row count, not the raw number of rows.
            n = xs.shape[0]
            if n < 2:
                raise ValueError('min_trace (mint_shrink) needs at least two residual rows without NaNs.')
            v = (1 / (n * (n - 1))) * (crossprod(xs ** 2) - (1 / n) * (crossprod(xs) ** 2))
            np.fill_diagonal(v, 0)
            corapn = cov2corr(tar)
            d = (corm - corapn) ** 2
            d_total = d.sum()
            # With no off-diagonal correlation covm already equals its
            # diagonal target, so any intensity yields the same W.
            lmd = 1.0 if d_total == 0 else v.sum() / d_total
            lmd = max(min(lmd, 1), 0)
            W = lmd * tar + (1 - lmd) * covm
    else:
        raise ValueError(f'Unknown reconciliation method {method}')

    # W is symmetric: eigvalsh returns real eigenvalues. The ">=" form also
    # rejects NaN eigenvalues, which a "<" comparison would let through.
    eigenvalues = np.linalg.eigvalsh(W)
    if not np.all(eigenvalues >= 1e-8):
        raise ValueError(f'min_trace ({method}) needs covariance matrix to be positive definite.')

    # R = S' W^-1 and P = (R S)^-1 R, via linear solves instead of explicit inverses.
    R = np.linalg.solve(W.T, S).T
    P = np.linalg.solve(R @ S, R)

    return _reconcile(S, P, W, y_hat)

In [None]:
#hide
# Coherent base forecasts must pass through these variants unchanged.
# Residuals are passed as (obs, n_hiers), hence the transpose.
for method in ['ols', 'wls_struct', 'wls_var', 'mint_shrink']:
    test_close(
        min_trace(S, S @ y_hat_bottom, 
                  np.transpose(S @ resids_bottom), 
                  method),
        S @ y_hat_bottom
    )
# The residuals are aggregated from only 4 bottom series, so the full 7x7
# sample covariance is singular and 'mint_cov' must be rejected.
# The parentheses are escaped so the regex really checks for "(mint_cov)";
# unescaped they form an optional group that matches any "min_trace " message.
with ExceptionExpected(regex=r'min_trace \(mint_cov\)'):
    min_trace(S, 
              S @ y_hat_bottom, 
              np.transpose(S @ resids_bottom), 
              'mint_cov')

In [None]:
#export
def empirical_risk_minimization(S: np.ndarray,
                                y_hat: np.ndarray, 
                                method: str,
                                lambda_reg: float = 1e-2):
    """Reconcile base forecasts with a ridge-regularised ERM projection.

    Computes ``B' = (S'S)^-1 S' y_hat`` and the projection
    ``P = B' y_hat' (y_hat y_hat' + lambda_reg * I)^-1``, then hands ``P`` to
    ``_reconcile``.

    NOTE(review): the projection is fitted on ``y_hat`` itself; no observed
    values are passed in. Confirm this is intended.

    Parameters
    ----------
    S : summing matrix; its shape is read as ``(n_hiers, n_bottom)``.
    y_hat : base forecasts, one row per row of ``S``.
    method : only ``'exact'`` is supported.
    lambda_reg : ridge term added to the diagonal of ``y_hat @ y_hat.T``
        before it is inverted.

    Returns
    -------
    The result of ``_reconcile`` for the computed projection.

    Raises
    ------
    ValueError
        If ``method`` is not ``'exact'``.
    """
    if method != 'exact':
        raise ValueError(f'Unknown reconciliation method {method}')

    n_hiers, n_bottom = S.shape
    # B' = (S'S)^-1 S' y_hat, obtained with a linear solve rather than an
    # explicit inverse.
    B_t = np.linalg.solve(S.T @ S, S.T @ y_hat)
    gram = y_hat @ y_hat.T + lambda_reg * np.eye(n_hiers)
    # P = B' y_hat' gram^-1  <=>  P' = gram^-T (y_hat B); the Gram matrix can
    # be badly conditioned for small lambda_reg, so solve instead of inverting.
    P = np.linalg.solve(gram.T, y_hat @ B_t.T).T

    W = np.eye(n_hiers, dtype=np.float32)

    return _reconcile(S, P, W, y_hat)

In [None]:
#hide
# Coherent base forecasts with a small ridge penalty should come back
# (numerically) unchanged.
for method in ('exact',):
    test_close(
        empirical_risk_minimization(S, S @ y_hat_bottom, method, lambda_reg=1e-3),
        S @ y_hat_bottom,
    )