In [4]:
import pandas as pd
import numpy as np
# Central prediction intervals that enter the weighted interval score.
# Each row is (lower quantile column, upper quantile column, alpha); the
# nominal coverage of an interval is 1 - alpha.
# NOTE(review): the 95% interval (alpha=0.05) reads columns 'q_2'/'q_97' --
# presumably these hold the 2.5% / 97.5% quantiles; confirm against the CSVs.
_INTERVAL_TABLE = (
    ('q_1', 'q_99', 0.02),   # 98% PI
    ('q_2', 'q_97', 0.05),   # 95% PI
    ('q_5', 'q_95', 0.1),    # 90% PI
    ('q_10', 'q_90', 0.2),   # 80% PI
    ('q_15', 'q_85', 0.3),   # 70% PI
    ('q_20', 'q_80', 0.4),   # 60% PI
    ('q_25', 'q_75', 0.5),   # 50% PI
    ('q_30', 'q_70', 0.6),   # 40% PI
    ('q_35', 'q_65', 0.7),   # 30% PI
    ('q_40', 'q_60', 0.8),   # 20% PI
    ('q_45', 'q_55', 0.9),   # 10% PI
)
intervals = [
    {'lower': lower_col, 'upper': upper_col, 'alpha': alpha}
    for lower_col, upper_col, alpha in _INTERVAL_TABLE
]

def symmetric_mean_absolute_percentage_error(y_true, y_pred):
    """Return mean(|y_pred - y_true| / (|y_true| + |y_pred|)) * 100.

    Both inputs are converted to NumPy arrays first. Note that this variant
    has no factor of 2 in the numerator, so the result lies in [0, 100].

    Args:
        y_true: array-like of observed values.
        y_pred: array-like of predicted values.

    Returns:
        The score as a percentage.
    """
    actual = np.asarray(y_true)
    forecast = np.asarray(y_pred)

    scale = np.abs(actual) + np.abs(forecast)
    # Where both values are zero the ratio would be 0/0; substitute a tiny
    # denominator so those entries contribute 0 instead of NaN.
    safe_scale = np.where(scale == 0, 1e-10, scale)

    ratios = np.abs(forecast - actual) / safe_scale
    return np.mean(ratios) * 100

def interval_score(y, lower, upper, alpha):
    """Interval score of the central (1 - alpha) interval [lower, upper].

    The score is the interval width plus a penalty of (2 / alpha) times the
    distance by which `y` falls outside the interval (zero when inside).

    Args:
        y: observed value(s).
        lower: lower interval bound(s).
        upper: upper interval bound(s).
        alpha: nominal miss rate of the interval (e.g. 0.05 for a 95% PI).

    Returns:
        The interval score, scalar or element-wise for array inputs.
    """
    miss_weight = 2 / alpha
    # The boolean factors switch each penalty on only when y is outside.
    undershoot = miss_weight * (lower - y) * (y < lower)
    overshoot = miss_weight * (y - upper) * (y > upper)
    return (upper - lower) + undershoot + overshoot

def calculate_coverate(t, rate='95'):
    """Empirical coverage of a central prediction interval.

    Args:
        t: table (e.g. a pandas DataFrame) with a 'true' column and the
            quantile columns of the requested interval.
        rate: nominal coverage level, one of 95, 80 or 50. May be given as a
            string ('95') or an integer (95).

    Returns:
        Fraction of rows whose 'true' value lies within
        [lower quantile, upper quantile], bounds inclusive.

    Raises:
        KeyError: if `rate` is not a supported level, or a required column
            is missing from `t`.
    """
    # Quantile columns bounding each supported interval.
    # NOTE(review): the 95% interval uses 'q_2'/'q_97' -- presumably the
    # 2.5% / 97.5% quantiles; confirm against the forecast files.
    bounds_by_rate = {
        "95": ('q_2', 'q_97'),
        "80": ('q_10', 'q_90'),
        "50": ('q_25', 'q_75'),
    }

    # Normalise so that both rate=95 and rate='95' are accepted.
    rate_key = str(rate)
    if rate_key not in bounds_by_rate:
        raise KeyError(
            f"Unsupported coverage rate {rate!r}; expected one of {sorted(bounds_by_rate)}"
        )
    lower_col, upper_col = bounds_by_rate[rate_key]

    y = t['true']
    covered = (y >= t[lower_col]) & (y <= t[upper_col])
    return np.mean(covered)

def calculate_wis(row, median_col=None):
    """Weighted interval score (WIS) for a single forecast row.

    WIS = (0.5 * |y - median| + sum_k (alpha_k / 2) * IS_k) / (K + 0.5),
    where the sum runs over the K entries of the module-level `intervals`
    list and IS_k is computed by `interval_score`.

    Args:
        row: mapping (e.g. one DataFrame row) holding 'true' and every
            quantile column named in `intervals`.
        median_col: name of the column holding the predictive median. When
            None, 'q_50' is used if present, otherwise 'point'.
            NOTE(review): treating 'point' as the median is an assumption --
            confirm that the point forecast is the predictive median.

    Returns:
        The WIS of this row.
    """
    y = row['true']
    K = len(intervals)  # number of prediction intervals

    # alpha/2-weighted interval scores over all intervals.
    weighted_interval_sum = sum(
        0.5 * spec['alpha']
        * interval_score(y, row[spec['lower']], row[spec['upper']], spec['alpha'])
        for spec in intervals
    )

    # Median term. The previous implementation set median = y, which made
    # this term identically zero; use the forecast median instead.
    if median_col is None:
        median_col = next((col for col in ('q_50', 'point') if col in row), None)
    if median_col is not None:
        median_penalty = 0.5 * abs(y - row[median_col])
    else:
        # No median available in this row: keep the old zero-penalty result.
        median_penalty = 0.0

    return (weighted_interval_sum + median_penalty) / (K + 0.5)

def median_absolute_error(y_true, y_pred):
    """Return the median of the element-wise absolute errors |y_true - y_pred|.

    Args:
        y_true: observed values (must support `-` with `y_pred`).
        y_pred: predicted values.
    """
    return np.median(np.abs(y_true - y_pred))

def calculate_accuracy_with_tolerance(true_values, pred_values, tolerance=0.25):
    """Share of predictions within a relative tolerance band around the truth.

    A prediction counts as correct when it lies between
    true * (1 - tolerance) and true * (1 + tolerance), bounds inclusive.

    Args:
        true_values: observed values (NumPy array or pandas Series).
        pred_values: predicted values, same shape as `true_values`.
        tolerance: relative half-width of the band (0.25 means +/-25%).

    Returns:
        Fraction of predictions inside the band.
    """
    edge_a = true_values * (1 - tolerance)
    edge_b = true_values * (1 + tolerance)
    # For negative true values edge_a > edge_b, so order the edges explicitly;
    # otherwise the band is empty and nothing is ever counted as correct.
    lower_bound = np.minimum(edge_a, edge_b)
    upper_bound = np.maximum(edge_a, edge_b)

    within_band = (pred_values >= lower_bound) & (pred_values <= upper_bound)
    return np.mean(within_band)

def smape_score(y_true, y_pred):
    """Return mean(2 * |y_pred - y_true| / (|y_true| + |y_pred| + 1e-8)).

    The result is a fraction in [0, 2], not a percentage. The 1e-8 term keeps
    the denominator non-zero when both values are zero.

    Args:
        y_true: observed values (must support `-` with `y_pred`).
        y_pred: predicted values.
    """
    abs_error = np.abs(y_pred - y_true)
    scale = np.abs(y_true) + np.abs(y_pred) + 1e-8
    return np.mean(2 * abs_error / scale)

In [5]:
import pandas as pd
from datetime import timedelta
from sklearn.metrics import mean_absolute_error
import numpy as np

# The metric helpers used below are assumed to be defined already (see the cell above):
# def calculate_wis(row):
# def calculate_coverate(t, rate='95'):
# def smape_score(y_true, y_pred):

models = ['constant', 'gru', 'tcn', 'Nbeats', 'itransformer', 'ensemble_model']
selected_steps = [0, 1, 2, 3, 4, 5, 6, 7, 8]
modes = [ 'base_week_holi_pf','base_week']

# Forecasts whose origin date is earlier than this are excluded.
EVAL_START_DATE = '2024-02-11'


def _forecast_path(model, mode):
    """Return the CSV path with the forecasts of `model`/`mode`, or None for an unknown model."""
    if model == 'constant':
        return 'forc_baseline.csv'
    if model == 'ensemble_model':
        return 'ensemble_model_with_intervals.csv'
    if model == 'ensemble_model_no_ol':
        return 'ensemble_model_with_intervals_no_ol.csv'
    if model in ['gru', 'tcn', 'Nbeats', 'itransformer']:
        return f'{model}/{model}_{mode}_42test.csv'
    return None


def _load_filtered_forecasts(path):
    """Read one forecast CSV and apply the row filters shared by every horizon."""
    res = pd.read_csv(path)
    if 'mode' in res.columns:
        # .copy() so the column assignments below do not write into a slice view.
        res = res[(res['mode'] == 'train') | (res['mode'] == 'train_seed42')].copy()
    res['date'] = pd.to_datetime(res['date'])
    res['date_origin'] = res['date'] - res['week_ahead'] * timedelta(days=7)

    # Dates on which any row has true <= 1; forecasts issued on them are dropped.
    invalid_dates = res.loc[res['true'] <= 1, 'date'].unique()
    res = res[~res['date_origin'].isin(invalid_dates)]
    res = res[res['date_origin'] >= EVAL_START_DATE]
    return res.dropna()


def _step_metrics(t, mode):
    """Compute the metrics of one horizon slice, keyed by '<mode>_<metric>' column names."""
    wis_mean = np.mean(t.apply(calculate_wis, axis=1))
    return {
        f"{mode}_WIS": wis_mean,
        f"{mode}_MAE": mean_absolute_error(t['true'], t['point']),
        f"{mode}_Coverage_95": calculate_coverate(t, rate='95'),
        f"{mode}_SMAPE": smape_score(t['true'], t['point']),
    }


# Two workbooks: the original layout (rows = horizons) and its transpose.
with pd.ExcelWriter('all_models_results_original.xlsx', engine='openpyxl') as writer_original, \
        pd.ExcelWriter('all_models_results_transposed.xlsx', engine='openpyxl') as writer_transposed:

    for model in models:
        # Each CSV is read and filtered once per model, then reused for every
        # horizon (and for every mode that maps to the same file).
        loaded_forecasts = {}
        step_rows = []
        step_labels = []

        for step in selected_steps:
            step_row = {}

            for mode in modes:
                # NOTE: 'constant' and the ensemble models use a path that does
                # not depend on `mode`, so their per-mode columns are identical.
                path = _forecast_path(model, mode)
                if path is None:
                    continue

                try:
                    if path not in loaded_forecasts:
                        loaded_forecasts[path] = _load_filtered_forecasts(path)
                    res = loaded_forecasts[path]

                    # Rows for the current forecast horizon.
                    t = res[res['week_ahead'] == step]
                    if len(t) == 0:
                        continue

                    step_row.update(_step_metrics(t, mode))

                except Exception as e:
                    # Best effort: report the failure and continue with the next mode.
                    print(f"Error processing {model} with mode {mode} at step {step}: {e}")
                    continue

            # Keep the horizon only if at least one mode produced metrics.
            if step_row:
                step_rows.append(step_row)
                step_labels.append(f"week{step}")

        if step_rows:
            # Build the frame once instead of concatenating inside the loop.
            model_results = pd.DataFrame(step_rows, index=step_labels)

            # Original layout.
            model_results.to_excel(writer_original, sheet_name=model)

            # Transposed layout.
            model_results.T.to_excel(writer_transposed, sheet_name=model)
        else:
            print(f"No data found for model: {model}")

print("所有模型结果已保存到两个文件：")
print("1. all_models_results_original.xlsx (不转置版本)")
print("2. all_models_results_transposed.xlsx (转置版本)")

所有模型结果已保存到两个文件：
1. all_models_results_original.xlsx (不转置版本)
2. all_models_results_transposed.xlsx (转置版本)


In [6]:
# import pandas as pd
# from datetime import timedelta
# from sklearn.metrics import mean_absolute_error
# import numpy as np

# # 假设您已经有这些函数定义
# # def calculate_wis(row):
# # def calculate_coverate(df, rate='95'):
# # def smape_score(y_true, y_pred):

# models = ['constant', 'gru', 'tcn', 'Nbeats', 'itransformer', 'ensemble_model']
# selected_steps = [0,1, 2,3, 4,5, 6,7, 8]
# # modes = ['base_stage_holi_pf_ol', 'base_stage_holi_pf','base_stage_holi','base_stage_pf','base_stage','base']
# modes = ['base_stage_holi_pf','base']

# # 创建新的Excel文件
# with pd.ExcelWriter('all_models_results_by_mode.xlsx', engine='openpyxl') as writer:
#     for model in models:
#         # 为每个模型创建空的DataFrame
#         model_results = pd.DataFrame()

#         for mode in modes:
#             # 为每个mode创建一行数据
#             mode_data = {}

#             for step in selected_steps:
#                 # 构建文件路径
#                 path = ''
#                 if model == 'constant':
#                     path = 'forc_baseline.csv'
#                 elif model == 'ensemble_model':
#                     path = 'ensemble_model_with_intervals.csv'
#                 elif model == 'ensemble_model_no_ol':
#                     path = 'ensemble_model_with_intervals_no_ol.csv'
#                 elif model in ['gru', 'tcn', 'Nbeats', 'itransformer']:
#                     path = f'{model}/{model}_{mode}_42test.csv'
#                 else:
#                     continue

#                 try:
#                     # 读取数据并过滤
#                     res = pd.read_csv(path)
#                     if 'mode' in res.columns:
#                         res = res[(res['mode'] == 'train')|(res['mode'] == 'train_seed42')]
#                     res['date'] = pd.to_datetime(res['date'])
#                     res['date_origin'] = res['date'] - res['week_ahead'] * timedelta(days=7)

#                     invalid_dates = res.groupby('date').filter(lambda x: (x['true'] <= 1).any())['date'].unique()
#                     res = res[~res['date_origin'].isin(invalid_dates)]
#                     res = res[(res['date_origin'] >= '2023-11-26')]
#                     # res = res[(res['date_origin'] < '2024-11-26')]
#                     # res = res[(res['date_origin'] > '2024-11-26')]
#                     res = res.dropna()

#                     # 获取当前步长的数据
#                     t = res[res['week_ahead'] == step]
#                     if len(t) == 0:
#                         continue

#                     # 计算指标
#                     wis = t.apply(calculate_wis, axis=1)
#                     wis_mean = np.mean(wis)
#                     mae = mean_absolute_error(t['true'], t['point'])
#                     cov_95 = calculate_coverate(t, rate='95')
#                     smape = smape_score(t['true'], t['point'])

#                     # 添加指标到mode_data字典
#                     mode_data[f"week{step}_WIS"] = wis_mean
#                     mode_data[f"week{step}_MAE"] = mae
#                     # mode_data[f"week{step}_Coverage_95"] = cov_95
#                     mode_data[f"week{step}_SMAPE"] = smape

#                 except Exception as e:
#                     print(f"Error processing {model} with mode {mode} at step {step}: {e}")
#                     continue

#             # 将当前mode的结果添加到模型结果中
#             if mode_data:
#                 mode_df = pd.DataFrame(mode_data, index=[mode])
#                 model_results = pd.concat([model_results, mode_df], axis=0)

#         # 保存到Excel文件
#         if not model_results.empty:
#             model_results.to_excel(writer, sheet_name=model)
#         else:
#             print(f"No data found for model: {model}")

# print("所有模型结果已保存到文件：all_models_results_by_mode.xlsx")
# print("格式说明：每个模型一个sheet，每行代表一个mode，列按照week顺序排列，每个week包含所有指标")