In [1]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
import arch
from statsmodels.tsa.arima.model import ARIMA
from scipy.stats import norm
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.sandwich_covariance import cov_hac
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.stats.diagnostic import acorr_lm


In [2]:
def extract_model_info(model):
    coef_names = list(model.params.index)
    xreg_names = list(model.model.exog_names) if model.model.exog is not None else []
    coef_filtered = [name for name in coef_names if name not in xreg_names]
    drift = "drift" in xreg_names
    xreg_names = [x for x in xreg_names if x != "drift"]
    return {"drift": drift, "xreg": xreg_names, "coef": coef_filtered}


In [3]:
def _predict_t1(nobs, J, n_ahead):
    """
    Purpose:
    Compute the argument "t" to get ex-post forecasts (obtained from .predit(..., fixed.n.ahead=TRUE)) as function of:
    - nobs = number of observations in the time series
    - J = how many ex-post forecasts to compute
    - n.ahead = horizon = how many steps ahead to go
    The function is implemented because 'hand' calculation of "t" is a bit tricky for students.

    Arguments
    nobs:   (numeric[1]) number of observations.
    J:      (numeric[1]) number of ex-post forecasts to compute.
    n.ahead: (numeric[1]) number of steps ahead (horizon).

    Value
    The argument x extended some steps ahead.
    """
    return nobs - J - n_ahead + 1

In [4]:
def _sqrt_inv(x): return x ** 2
def _log_inv(x): return np.exp(x)
def _log10_inv(x): return 10 ** x

In [5]:
def inverse_transform(ans, g):
    if g == "sqrt":
        return np.square(ans)
    elif g == "log":
        return np.exp(ans)
    elif g == "log10":
        return np.power(10, ans)
    return ans

def predict_naive(model_fit, J, n_ahead, g="id"):
    d = model_fit.model.k_diff
    D = model_fit.model.k_seasonal_diff
    S = model_fit.model.seasonal_periods or 0
    y = model_fit.model.endog

    naive = S if D > 0 else (1 if d > 0 else 0)
    if naive == 0:
        ans = np.full(J, np.mean(y))
    else:
        steps = naive * ((n_ahead // naive) + int(n_ahead % naive > 0))
        ans = y[-J - steps: -steps]

    return inverse_transform(ans, g)

In [6]:
def predict_changing_h(object_model, n_ahead, t, y, xreg=None):
    y1 = y[:t]
    ind_out = range(t, t + n_ahead)
    if xreg is not None and xreg.shape[0] > 0:
        xreg1 = xreg.iloc[:t, :]
        xreg2 = xreg.iloc[t:t + n_ahead, :]
    else:
        xreg1 = xreg2 = None

    mod_info = extract_model_info(object_model)
    if mod_info['xreg']:
        model = SARIMAX(y1, exog=xreg1, order=object_model.model.order)
    else:
        model = ARIMA(y1, order=object_model.model.order)
    
    fit1 = model.fit()
    forecast_res = fit1.get_forecast(steps=n_ahead, exog=xreg2)
    mean = forecast_res.predicted_mean
    se = (forecast_res.conf_int(alpha=0.05).iloc[:, 1] - mean) / norm.ppf(0.975)
    
    return pd.DataFrame({
        "t": list(range(t + 1, t + n_ahead + 1)),
        "mean": mean,
        "se": se
    })


In [7]:
def predict_fixed_h(object_model, n_ahead, t, y, xreg=None):
    nobs = len(y)
    mod_info = extract_model_info(object_model)
    drift = mod_info['drift']

    if drift:
        x_drift = np.arange(start=1, stop=nobs + 1)
        if xreg is not None:
            xreg['drift'] = x_drift
        else:
            xreg = pd.DataFrame({"drift": x_drift})

    if xreg is not None and mod_info['xreg']:
        xreg = xreg[mod_info['xreg'] + (["drift"] if drift else [])]

    results = []
    for pos, current_t in enumerate(range(t, nobs - n_ahead + 1)):
        y_train = y[:current_t]
        y_test_idx = current_t + n_ahead - 1
        x_train = xreg.iloc[:current_t, :] if xreg is not None else None
        x_test = xreg.iloc[current_t:current_t + n_ahead, :] if xreg is not None else None

        if mod_info['xreg']:
            model = SARIMAX(y_train, exog=x_train, order=object_model.model.order)
        else:
            model = ARIMA(y_train, order=object_model.model.order)
        
        fit1 = model.fit()
        forecast_res = fit1.get_forecast(steps=n_ahead, exog=x_test)
        mean = forecast_res.predicted_mean.iloc[-1]
        se = (forecast_res.conf_int(alpha=0.05).iloc[-1, 1] - mean) / norm.ppf(0.975)
        results.append([y_test_idx + 1, mean, se])

    return pd.DataFrame(results, columns=["t", "mean", "se"])


In [8]:
def predict(object_model, n_ahead, t, y=None, xreg=None, fixed_n_ahead=True):
    if y is None:
        y = object_model.model.endog
    if xreg is None and hasattr(object_model.model, 'exog') and object_model.model.exog is not None:
        xreg = pd.DataFrame(object_model.model.exog)

    if t > len(y):
        raise ValueError("'t' must be <= len(y)")
    
    if not fixed_n_ahead:
        pred = predict_changing_h(object_model, n_ahead, t, y, xreg)
    else:
        pred = predict_fixed_h(object_model, n_ahead, t, y, xreg)

    return {"t": t, "n.ahead": n_ahead, "fixed.n.ahead": fixed_n_ahead, "pred": pred}


In [9]:
def coefs2poly(fit):
    # Placeholder: extract AR/MA coefficients and structure them into polynomial terms
    return {
        'ar': fit.arparams if hasattr(fit, "arparams") else [],
        'ma': fit.maparams if hasattr(fit, "maparams") else []
    }

def outliers_effects(mo, n, weights, delta, pars, freq):
    # Placeholder for building xreg matrix of outlier effects
    # mo is expected to be a DataFrame with 'type', 'time', etc.
    xreg = np.zeros((n, len(mo)))
    
    for idx, row in mo.iterrows():
        outlier_type = row["type"]
        time = int(row["time"])  # the time index of outlier
        
        if outlier_type == "AO":
            xreg[time, idx] = 1
        elif outlier_type == "LS":
            xreg[time:, idx] = 1
        elif outlier_type == "TC":
            for t in range(time, n):
                xreg[t, idx] = delta ** (t - time)
        else:
            raise ValueError(f"Unknown outlier type: {outlier_type}")
    
    return pd.DataFrame(xreg, columns=[f"{row['type']}_{row['time']}" for _, row in mo.iterrows()])


In [10]:
def oeff_4_predict(obj, n_ahead, delta=0.7, type="."):
    """
    Translate .oeff.4.predict from R.
    
    Parameters:
    - obj: a model object from TSO-like process, with `outliers`, `y`, and `fit` attributes
    - n_ahead: number of steps ahead for forecast
    - delta: a tuning parameter for outlier effect decay
    - type: either "." or "", indicating whether to return full xreg or just forecast horizon
    
    Returns:
    - xreg: DataFrame of outlier effects as external regressors
    """
    
    type = type[0] if isinstance(type, (list, tuple)) else type
    n_ahead = int(n_ahead)

    if obj.get("outliers") is None or len(obj["outliers"]) == 0 or type not in ("", "."):
        return None

    n_obs = len(obj["y"])
    freq = obj.get("freq", 1)  # Assuming frequency is stored, else default 1
    pars = coefs2poly(obj["fit"])  # Placeholder for ARIMA to poly coefficients
    mo = obj["outliers"]

    xreg = outliers_effects(mo=mo, n=n_obs + n_ahead, weights=False,
                            delta=delta, pars=pars, freq=freq)

    if type == "":
        xreg = xreg.iloc[n_obs:n_obs + n_ahead, :]

    return xreg


In [11]:
def error_measures(y, fit, naive):
    nf = len(fit)
    y1 = y[-nf:]
    u = y1 - fit

    ME = np.mean(u)
    MAE = np.mean(np.abs(u))
    RMSE = np.sqrt(np.mean(u ** 2))

    if np.all(y1 > 0):
        ur = u / y1
        MPE = np.mean(ur)
        MAPE = np.mean(np.abs(ur))
        RMSPE = np.sqrt(np.mean(ur ** 2))
    else:
        MPE = MAPE = RMSPE = None

    u1 = y1 - naive
    ScMAE = MAE / np.mean(np.abs(u1))
    ScRMSE = RMSE / np.sqrt(np.mean(u1 ** 2))

    return {
        "ME": ME, "MAE": MAE, "RMSE": RMSE,
        "MPE": MPE, "MAPE": MAPE, "RMSPE": RMSPE,
        "ScMAE": ScMAE, "ScRMSE": ScRMSE
    }


In [12]:
def mincer_zarnowitz(y, fit):
    X = sm.add_constant(fit)
    model = sm.OLS(y, X).fit()
    robust_cov = cov_hac(model, maxlags=1)
    summary = model.get_robustcov_results(cov_type='HAC', cov_kwds={'maxlags':1})
    return summary.summary()


In [13]:
def diebold_mariano(e1, e2, h, power=2, msg=""):
    """
    Perform the Diebold-Mariano test for equal predictive accuracy.

    Parameters:
    -----------
    e1, e2 : array-like
        Forecast errors from model 1 and model 2 (y_true - y_pred)
    h : int
        Forecast horizon (usually 1)
    power : int, optional
        The power of the loss function (default=2 for squared errors)
    msg : str, optional
        Optional message to print

    Returns:
    --------
    dict with keys:
        'statistic': DM test statistic
        'p_value': p-value of the test
        'horizon': forecast horizon h
        'power': power used in loss function
    """
    e1 = np.asarray(e1)
    e2 = np.asarray(e2)
    
    if len(e1) != len(e2):
        raise ValueError("Error vectors e1 and e2 must have the same length.")

    # Compute loss differential d_t = |e1_t|^power - |e2_t|^power
    d = np.abs(e1)**power - np.abs(e2)**power
    T = len(d)
    
    # Mean of loss differential
    d_bar = np.mean(d)

    # Compute the autocovariance of d up to lag h-1
    def autocovariance(x, lag):
        return np.cov(x[lag:], x[:-lag])[0,1] if lag > 0 else np.var(x, ddof=1)

    gamma = [autocovariance(d, lag) for lag in range(h)]
    
    # Variance estimate with Newey-West estimator
    var_d = gamma[0] + 2 * sum((1 - lag/h) * gamma[lag] for lag in range(1, h))
    
    # DM test statistic
    DM_stat = d_bar / np.sqrt(var_d / T)
    
    # Two-sided p-value
    p_value = 2 * (1 - norm.cdf(np.abs(DM_stat)))

    if msg:
        print(f"{msg} Horizon: {h}, Power: {power}, DM Statistic: {DM_stat:.4f}, p-value: {p_value:.4f}")

    return {
        "statistic": DM_stat,
        "p_value": p_value,
        "horizon": h,
        "power": power
    }


In [14]:
def sqrt_inv(x): return x**2
def sqrt_der(x): return 2 * x
def log_inv(x): return np.exp(x)
def log_der(x): return np.exp(x)
def log10_inv(x): return 10**x
def log10_der(x): return np.log(10) * 10**x


In [15]:
import numpy as np

def loglik(fit, g="id"):
    """Calculate adjusted log-likelihood, AIC, and BIC based on transformation g."""
    loglik = fit.llf
    aic = fit.aic
    bic = fit.bic

    valid_g = ["id", "sqrt", "log", "log10"]
    if g not in valid_g:
        raise ValueError(f"g must be one of {valid_g}")

    if g in ["sqrt", "log", "log10"]:
        w = fit.fittedvalues + fit.resid  # Estimated original y (pre-transform)
        f_inv = {"sqrt": sqrt_inv, "log": log_inv, "log10": log10_inv}[g]
        f_der = {"sqrt": sqrt_der, "log": log_der, "log10": log10_der}[g]
        y = f_inv(w)
        x1 = np.sum(np.log(f_der(y)))
        loglik += x1
        aic -= 2 * x1
        bic -= 2 * x1

    return {"g": g, "loglik": loglik, "aic": aic, "bic": bic}


In [16]:
def transformed_mean(mean, sd, g="id"):
    """Compute the mean of the original variable given a transformed mean/sd."""
    if g == "id":
        return {"g": g, "mean": mean}
    elif g == "sqrt":
        return {"g": g, "mean": mean**2 + sd**2}
    elif g == "log":
        return {"g": g, "mean": np.exp(mean + 0.5 * sd**2)}
    elif g == "log10":
        log10_base = np.log(10)
        mean_adj = mean * log10_base
        sd_adj = sd * log10_base
        return {"g": g, "mean": np.exp(mean_adj + 0.5 * sd_adj**2)}
    else:
        raise ValueError("Unsupported transformation")


In [17]:
def pred_bands(pred, alpha=0.05, g="id"):
    median = pred["pred"]["mean"]
    mean = transformed_mean(median, pred["pred"]["se"], g)["mean"]

    z = norm.ppf(1 - alpha / 2)
    lower = median - z * pred["pred"]["se"]
    upper = median + z * pred["pred"]["se"]

    if g != "id":
        f_inv = {"sqrt": sqrt_inv, "log": log_inv, "log10": log10_inv}.get(g)
        if f_inv is None:
            raise ValueError("Unsupported transformation")
        median = f_inv(median)
        lower = f_inv(lower)
        upper = f_inv(upper)

    return {
        "g": g, "t": pred["pred"]["t"], "mean": mean, "median": median,
        "lower": lower, "upper": upper, "se": pred["pred"]["se"], "alpha": alpha
    }
