In [None]:
# 06 Combined Error Metrics
## Diebold-Mariano (DM) test

In [None]:
# import libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, mean_absolute_error
import itertools
import statsmodels.api as sm
import os
from joblib import Parallel, delayed
import properscoring as ps

In [None]:
from epiweeks import Week, Year
from datetime import date
def create_epiweek(date):
    """Convert a calendar date into an epidemiological week via ``Week.fromdate``."""
    epiweek = Week.fromdate(date)
    return epiweek
def create_epiweekplot(epiweek):
    """Format an epiweek as a ``Y<first four characters>W<remaining characters>`` label.

    The argument is passed through ``str`` first, so any object whose string
    form is the year digits followed by the week digits (e.g. ``202015``)
    can be used.
    """
    text = str(epiweek)
    year_part, week_part = text[:4], text[4:]
    return 'Y' + year_part + 'W' + week_part
def create_epiweek_fromstr(str):
    """Parse an epiweek string into a ``Week`` object via ``Week.fromstring``.

    Note: the parameter name shadows the ``str`` builtin inside this function;
    it is kept unchanged because it is part of the public signature.
    """
    parsed_week = Week.fromstring(str)
    return parsed_week

In [None]:
# def generate_error_metrics(dataset, target_var):
#     pred = dataset.copy()
#     model_list = list(pred.columns.values)
#     y = pred[[target_var]]
#     model_list.remove(target_var)

#     error_df = pd.DataFrame()
#     #print(model_list)

#     for model in model_list:
#         model_val = pred[[model]].dropna()
#         window_start = model_val.index[0]
#         window_end = model_val.index[-1]
#         y_val = y.loc[window_start:window_end].copy()

#         ## Diebold-Mariano against Naive
#         if model == 'naive':
#             dm_stat, pvalue = 0, 0
#         else:
#             dm_stat, pvalue = dm_test(y_val, naive_val, model_val)
#             if pvalue < 0.05:
#                 pvalue = 'R'
#             else:
#                 pvalue = 'A'

#         error_df.at[model, 'DM'], error_df.at[model, 'pval'] = dm_stat, pvalue

#     return error_df


In [None]:
from itertools import islice
from typing import Sequence, Callable, List, Tuple
from math import lgamma, fabs, isnan, nan, exp, log, log1p, sqrt


class InvalidParameterException(Exception):
    """Raised when a function in this notebook is called with invalid arguments.

    A single, mandatory message describes what was wrong.
    """

    def __init__(self, message: str):
        Exception.__init__(self, message)


class ZeroVarianceException(ArithmeticError):
    """Raised when a variance that is needed as a divisor turns out to be zero.

    A single, mandatory message describes the situation.
    """

    def __init__(self, message: str):
        ArithmeticError.__init__(self, message)


def autocovariance(X: Sequence[float], k: int, mean: float) -> float:
    """
    Compute the lag-``k`` autocovariance of ``X`` around the supplied ``mean``.

    Each element from position ``k`` onwards is paired with the element ``k``
    positions earlier; the products of their deviations from ``mean`` are
    summed and divided by the full length of ``X`` (not by the number of pairs).
    """
    lagged = islice(X, k, None)
    deviation_products = (
        (later - mean) * (earlier - mean) for later, earlier in zip(lagged, X)
    )
    return sum(deviation_products) / len(X)


def log_beta(a: float, b: float) -> float:
    """
    Natural logarithm of the beta function B(a, b).

    Evaluated through log-gamma values as
    ``lgamma(a) + lgamma(b) - lgamma(a + b)``.
    """
    log_gamma_a = lgamma(a)
    log_gamma_b = lgamma(b)
    log_gamma_sum = lgamma(a + b)
    return log_gamma_a + log_gamma_b - log_gamma_sum


def evaluate_continuous_fraction(
    fa: Callable[[int, float], float],
    fb: Callable[[int, float], float],
    x: float,
    *,
    epsilon: float = 1e-10,
    maxiter: int = 10000,
    small: float = 1e-50
) -> float:
    """
    Evaluate a continued fraction at ``x`` with a modified-Lentz style recurrence.

    The fraction has the form
    ``fa(0, x) + fb(1, x) / (fa(1, x) + fb(2, x) / (fa(2, x) + ...))``.

    Parameters
    ----------
    fa:
        Callable ``(n, x)`` returning the n-th partial denominator
        (``fa(0, x)`` is the leading term).
    fb:
        Callable ``(n, x)`` returning the n-th partial numerator (n >= 1).
    x:
        Point at which the fraction is evaluated; only forwarded to ``fa``/``fb``.
    epsilon:
        Iteration stops once the multiplicative update differs from 1 by less
        than this amount.
    maxiter:
        Upper bound on the iteration counter. When it is reached without
        convergence the latest approximation is returned silently.
    small:
        Replacement magnitude for intermediate values that are too close to
        zero, so that no division by zero occurs.

    Returns
    -------
    The approximated value of the continued fraction.
    """
    h_prev = fa(0, x)
    # Fix: compare the magnitude of the leading term with `small`. The previous
    # `fabs(h_prev < small)` took the absolute value of a boolean, which replaced
    # every negative leading term by `small`.
    if fabs(h_prev) < small:
        h_prev = small

    n: int = 1
    d_prev: float = 0.0
    c_prev: float = h_prev
    hn: float = h_prev

    while n < maxiter:
        a = fa(n, x)
        b = fb(n, x)

        # Guard both recurrences against (near-)zero values before dividing.
        dn = a + b * d_prev
        if fabs(dn) < small:
            dn = small

        cn = a + b / c_prev
        if fabs(cn) < small:
            cn = small

        dn = 1 / dn
        delta_n = cn * dn
        hn = h_prev * delta_n

        # Converged: the latest multiplicative correction is essentially 1.
        if fabs(delta_n - 1.0) < epsilon:
            break

        d_prev = dn
        c_prev = cn
        h_prev = hn

        n += 1

    return hn


def regularized_incomplete_beta(
    x: float, a: float, b: float, *, epsilon: float = 1e-10, maxiter: int = 10000
) -> float:
    """
    Regularized incomplete beta function I_x(a, b).

    Parameters
    ----------
    x:
        Upper integration limit; must lie in [0, 1].
    a, b:
        Shape parameters; both must be strictly positive.
    epsilon, maxiter:
        Convergence tolerance and iteration cap forwarded to
        ``evaluate_continuous_fraction``.

    Returns
    -------
    ``nan`` for NaN or out-of-range arguments, ``0.0`` at ``x == 0``,
    ``1.0`` at ``x == 1``, otherwise the continued-fraction approximation.
    """
    if isnan(x) or isnan(a) or isnan(b) or x < 0 or x > 1 or a <= 0 or b <= 0:
        return nan

    # The end points are known in closed form. Without these guards the
    # general formula below evaluates log(0) (directly for x == 0, and through
    # the symmetry branch for x == 1) and raises "math domain error".
    if x == 0:
        return 0.0
    if x == 1:
        return 1.0

    # Use the symmetry I_x(a, b) = 1 - I_{1-x}(b, a) when x is in the upper
    # region, so the continued fraction is evaluated at the smaller argument.
    if x > (a + 1) / (2 + b + a) and 1 - x <= (b + 1) / (2 + b + a):
        return 1 - regularized_incomplete_beta(
            1 - x, b, a, epsilon=epsilon, maxiter=maxiter
        )

    def fa(n: int, x: float) -> float:
        # Every partial denominator of the continued fraction is 1.
        return 1.0

    def fb(n: int, x: float) -> float:
        # Partial numerators alternate between an even-index and an odd-index form.
        if n % 2 == 0:
            m = n / 2.0
            return (m * (b - m) * x) / ((a + (2 * m) - 1) * (a + (2 * m)))

        m = (n - 1.0) / 2.0
        return -((a + m) * (a + b + m) * x) / ((a + (2 * m)) * (a + (2 * m) + 1.0))

    # Prefactor is computed in log space and then divided by the fraction value.
    return exp(
        a * log(x) + b * log1p(-x) - log(a) - log_beta(a, b)
    ) / evaluate_continuous_fraction(fa, fb, x, epsilon=epsilon, maxiter=maxiter)


def dm_test(
    P1: Sequence[float],
    P2: Sequence[float],
    *,
    h: int = 1,
    one_sided: bool = False,
    harvey_correction: bool = True
) -> Tuple[float, float]:
    r"""
    Performs the Diebold-Mariano test using precomputed loss values.
    The null hypothesis is that the two forecasts (`P1`, `P2`) have the same accuracy.

    Parameters
    ----------
    P1: Sequence[float]
        First loss series.

    P2: Sequence[float]
        Second loss series. Must have the same length as `P1`.

    h: int
        The forecast horizon. Default is 1. Autocovariances of the loss
        differential at lags 0 .. h-1 enter the variance estimate.

    one_sided: bool
        If set to true, the two-sided p-value is returned unchanged when the
        statistic is positive (mean loss of `P1` larger than that of `P2`) and
        the p-value is set to 1 otherwise. Default is false.
        NOTE(review): a conventional one-sided p-value would be half of the
        two-sided value; confirm that this more conservative rule is intended.

    harvey_correction: bool
        If set to true, uses a modified test statistic as per Harvey, Leybourne and Newbold (1997).

    Returns
    -------
    A tuple of two values. The first is the test statistic (as a plain Python
    number), the second is the p-value.

    Raises
    ------
    InvalidParameterException
        If the series differ in length, are empty, or `h` is not positive.
    ZeroVarianceException
        If the estimated variance of the mean loss differential is zero.
    """
    if not (len(P1) == len(P2)):
        raise InvalidParameterException(
            "Prediction series must have the same length."
        )

    if h <= 0:
        raise InvalidParameterException(
            "Invalid parameter for horizon length. Must be a positive integer."
        )

    n = len(P1)
    if n == 0:
        # Fail with a descriptive error instead of a bare ZeroDivisionError below.
        raise InvalidParameterException("Loss series must not be empty.")

    # Loss differential and its mean.
    D = [l1 - l2 for l1, l2 in zip(P1, P2)]
    mean = sum(D) / n

    # Variance of the mean differential: lag-0 autocovariance plus twice the
    # autocovariances at lags 1 .. h-1, divided by n.
    V_d = 0.0
    for i in range(h):
        cov = autocovariance(D, i, mean)
        if i != 0:
            cov *= 2
        V_d += cov

    V_d /= n

    if V_d == 0:
        raise ZeroVarianceException(
            "Variance of the DM statistic is zero. Maybe the loss series are identical?"
        )

    if harvey_correction:
        harvey_adj = sqrt((n + 1 - 2 * h + h * (h - 1) / n) / n)
        dmstat = harvey_adj * mean / sqrt(V_d)
    else:
        dmstat = mean / sqrt(V_d)

    # Two-sided p-value of a Student-t variate with n - 1 degrees of freedom,
    # expressed through the regularized incomplete beta function.
    pvalue = regularized_incomplete_beta(
        (n - 1) / ((n - 1) + dmstat ** 2), 0.5 * (n - 1), 0.5
    )

    if one_sided and not dmstat > 0:
        pvalue = 1

    # Fix: plain Python floats have no `.item()`; only NumPy scalars/arrays do.
    # Accept both so that ordinary float sequences work as the signature promises.
    dmstat_value = dmstat.item() if hasattr(dmstat, "item") else float(dmstat)

    return dmstat_value, pvalue

In [None]:
def crps(y_val, y_pred, model, target_var):
    """Score every week in ``y_val`` with ``ps.crps_ensemble``.

    For each label in ``y_val.index`` the observed value is read from column
    ``target_var`` of ``y_val`` and the matching row of ``y_pred`` is converted
    to a float64 array and used as the forecast ensemble (presumably one
    column per sample/ensemble member - confirm against the prediction files).

    Returns a DataFrame indexed by those labels with a single column named
    ``model`` holding the scores.

    Side effect: resets NumPy's global random seed to 0 on every call.
    """
    np.random.seed(0)
    scores = pd.DataFrame()

    for week in y_val.index:
        observed = y_val.loc[week, target_var]
        ensemble = np.array(y_pred.loc[week], dtype='float64')
        scores.at[week, model] = ps.crps_ensemble(observed, ensemble)

    return scores

In [None]:
def prepare_diebold_mariano(pred_models_path, pred_combis_path, target_var, model_1, model_2, model_list, combi_list):
    """Compute the per-week CRPS series of two models for a Diebold-Mariano comparison.

    Parameters
    ----------
    pred_models_path : directory holding ``<model>.csv`` files of individual
        models; it must also contain ``ar_pure.csv``, from which the observed
        ``target_var`` column is taken.
    pred_combis_path : directory holding ``<model>.csv`` files of combinations.
    target_var : name of the observed-value column in ``ar_pure.csv``.
    model_1, model_2 : names of the two models to compare.
    model_list : names looked up in ``pred_models_path``; every other name is
        looked up in ``pred_combis_path``.
    combi_list : not used by this function; kept for interface compatibility.

    Returns
    -------
    ``(model_1_val_crps, model_2_val_crps)`` as produced by ``crps``.

    Raises
    ------
    FileNotFoundError
        If the prediction file of either model does not exist. (Previously
        this case fell through to the ``return`` and failed with an
        ``UnboundLocalError``.)
    """

    def _prediction_file(name):
        # Individual models and combinations are stored in different directories.
        base_dir = pred_models_path if name in model_list else pred_combis_path
        return os.path.join(base_dir, f'{name}.csv')

    def _read_epiweek_indexed(path):
        # Shared loader: read the CSV and index it by parsed epiweek.
        frame = pd.read_csv(path, parse_dates = [0], dayfirst = True)
        frame['epiweek'] = frame['epiweek'].apply(create_epiweek_fromstr)
        return frame.set_index('epiweek')

    pred_model1_file = _prediction_file(model_1)
    pred_model2_file = _prediction_file(model_2)

    missing_files = [
        path for path in (pred_model1_file, pred_model2_file) if not os.path.isfile(path)
    ]
    if missing_files:
        raise FileNotFoundError(
            'Prediction file(s) not found: ' + ', '.join(missing_files)
        )

    # To get y_val, so use the model 'ar_pure'
    ar_pure_val = _read_epiweek_indexed(os.path.join(pred_models_path, 'ar_pure.csv'))
    y_val = ar_pure_val[[target_var]]

    # `.iloc[:, 1:]` skips the first remaining column after indexing
    # (presumably the observed target - confirm against the CSV layout).
    model_1_val = _read_epiweek_indexed(pred_model1_file)
    model_1_val_crps = crps(y_val.copy(), model_1_val.iloc[:,1:].copy(), model_1, target_var)

    model_2_val = _read_epiweek_indexed(pred_model2_file)
    model_2_val_crps = crps(y_val.copy(), model_2_val.iloc[:,1:].copy(), model_2, target_var)

    return model_1_val_crps, model_2_val_crps


In [None]:
def evaluate_pvalue(pvalue):
    """Turn a p-value into a significance flag at the 5% level.

    Returns ``-1`` when ``pvalue < 0.05``: the null hypothesis of equal
    predictive capability is rejected (non-equivalence, shown in RED).
    Returns ``1`` otherwise: there is not enough evidence that one model
    predicts better than the other, so the models are treated as equivalent.
    """
    is_significant = pvalue < 0.05
    return -1 if is_significant else 1

In [None]:
# def generate_diebold_mariano(dataset, target_var, step_name):
    
#     pred_models_path = os.path.join(pred_directory_path,step_name)
#     if os.path.isdir(pred_models_path):
#         for model_name in os.listdir(pred_models_path): # 'model_name' here includes the '.csv'
#             pred_file = os.path.join(pred_models_path, model_name)
#             model = model_name[0:-4]
#             if os.path.isfile(pred_file):
#                 y_pred = pd.read_csv(pred_file, parse_dates = [0], dayfirst = True)  
#                 y_pred['epiweek'] = y_pred['epiweek'].apply(create_epiweek_fromstr)
#                 y_pred = y_pred.set_index('epiweek') 
    
#     model_list = list(pred.columns.values)
#     y = pred[[target_var]]
#     model_list.remove(target_var)

#     diebold_mariano_dmstat_df = pd.DataFrame(index=model_list, columns=model_list)
#     diebold_mariano_pvalue_df = pd.DataFrame(index=model_list, columns=model_list)
    
#     for model_1 in model_list:
#         for model_2 in model_list:
#             if model_1 == model_2:
#                 dm_stat, pvalue = 0, 0
#             else:
#                 if pd.isna(diebold_mariano_pvalue_df.loc[model_2, model_1]):
#                     model_1_val, model_2_val, y_val = prepare_diebold_mariano(pred, target_var, model_1, model_2)
#                     dm_stat, pvalue = dm_test(y_val, model_1_val, model_2_val, one_sided=True)
#                     pvalue = evaluate_pvalue(pvalue)
#                 else:
#                     dm_stat, pvalue = 0, 0
#             diebold_mariano_dmstat_df.at[model_1, model_2], diebold_mariano_pvalue_df.at[model_1, model_2] = dm_stat, pvalue
#     return diebold_mariano_dmstat_df, diebold_mariano_pvalue_df

In [None]:
def generate_diebold_mariano(target_var, pred_directory, pred_combi_directory):
    """Run pairwise Diebold-Mariano tests for every forecast step of one target.

    For each sub-directory ``<step>`` of ``<target_var>/<pred_directory>`` the
    model names are the ``*.csv`` files in that directory plus the ``*.csv``
    files in ``<target_var>/<pred_combi_directory>/<step>`` (if it exists).
    Every unordered pair is tested once on its CRPS series; the mirrored cell
    and the diagonal are filled with 0.

    Outputs, one CSV per step:
    - ``<target_var>/dmstat/<step>.csv`` with the test statistics,
    - ``<target_var>/pvalue/<step>.csv`` with the flags from ``evaluate_pvalue``.
    """

    def _csv_model_names(directory):
        # Only '*.csv' entries are prediction files; stray entries such as
        # '.DS_Store' or checkpoint folders are ignored. A missing directory
        # contributes no models. Sorting keeps the output tables reproducible,
        # because os.listdir returns entries in arbitrary order.
        if not os.path.isdir(directory):
            return []
        return sorted(
            entry[:-4] for entry in os.listdir(directory) if entry.endswith('.csv')
        )

    pred_directory_path = os.path.join(target_var, pred_directory)
    pred_combi_directory_path = os.path.join(target_var, pred_combi_directory)
    dmstat_path = os.path.join(target_var, 'dmstat')
    pvalue_path = os.path.join(target_var, 'pvalue')

    for step_name in sorted(os.listdir(pred_directory_path)):
        pred_models_path = os.path.join(pred_directory_path,step_name)
        pred_combis_path = os.path.join(pred_combi_directory_path,step_name)
        if not os.path.isdir(pred_models_path):
            continue

        model_list = _csv_model_names(pred_models_path)
        combi_list = _csv_model_names(pred_combis_path)
        model_combi_list = model_list + combi_list

        diebold_mariano_dmstat_df = pd.DataFrame(index=model_combi_list, columns=model_combi_list)
        diebold_mariano_pvalue_df = pd.DataFrame(index=model_combi_list, columns=model_combi_list)

        for model_1 in model_combi_list:
            for model_2 in model_combi_list:
                dm_stat, pvalue = 0, 0
                # Test a pair only if its mirrored cell has not been filled yet,
                # so each unordered pair is evaluated exactly once.
                if model_1 != model_2 and pd.isna(diebold_mariano_pvalue_df.loc[model_2, model_1]):
                    model_1_val_crps, model_2_val_crps = prepare_diebold_mariano(pred_models_path, pred_combis_path, target_var, model_1, model_2, model_list, combi_list)
                    # Flatten the single-column CRPS frames to 1-D float arrays so
                    # dm_test works on scalars and not on (n, 1) arrays, whose
                    # implicit size-1-array-to-scalar conversion is deprecated in NumPy.
                    losses_1 = np.array(model_1_val_crps, dtype='float64').ravel()
                    losses_2 = np.array(model_2_val_crps, dtype='float64').ravel()
                    dm_stat, pvalue = dm_test(losses_1, losses_2, one_sided=True)
                    pvalue = evaluate_pvalue(pvalue)
                diebold_mariano_dmstat_df.at[model_1, model_2], diebold_mariano_pvalue_df.at[model_1, model_2] = dm_stat, pvalue

        os.makedirs(dmstat_path, exist_ok=True)
        diebold_mariano_dmstat_df.to_csv(os.path.join(dmstat_path, f'{step_name}.csv'))

        os.makedirs(pvalue_path, exist_ok=True)
        diebold_mariano_pvalue_df.to_csv(os.path.join(pvalue_path, f'{step_name}.csv'))



In [None]:
def full_generate_diebold_mariano(target_variables_file, pred_directory, pred_combi_directory):
    """Run ``generate_diebold_mariano`` in parallel for every listed target variable.

    Parameters
    ----------
    target_variables_file : path to a text file with one target variable per line.
    pred_directory, pred_combi_directory : forwarded unchanged to
        ``generate_diebold_mariano`` for each target.

    The parsed list of targets is printed. Nothing is returned.
    """
    with open(target_variables_file, 'r') as file:
        # Strip surrounding whitespace instead of blindly cutting the last
        # character: a final line without a trailing newline used to lose its
        # last real character. Blank lines are skipped so they do not become
        # empty target names.
        target_variables = [line.strip() for line in file if line.strip()]
    print(target_variables)

    if not target_variables:
        # Nothing to do; avoid starting worker processes for an empty list.
        return

    Parallel(n_jobs=-1, verbose=51)(
        delayed(generate_diebold_mariano)(target_var, pred_directory, pred_combi_directory)
        for target_var in target_variables
    )
    
# Notebook entry point: runs the Diebold-Mariano comparison for every target listed in
# 'target_variables_new.txt', using the 'pred' and 'combi_samples' sub-directories of each target.
# NOTE: this executes (and spawns parallel workers) as soon as the cell is run.
full_generate_diebold_mariano('target_variables_new.txt', 'pred','combi_samples')