<a href="https://colab.research.google.com/github/angel870326/Monthly-Revenue-Forecasting/blob/main/Function/MonRevForecast.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

> 2023.04.08 Ssu-Yun Wang<br/>
[Github @angel870326](https://github.com/angel870326)

# **Monthly Revenue Forecasting with Random Forest Regressor & XGB Regressor - Model**

### Contents

##### 4. Functions
##### 5. Model Training
##### 6. Predicting and Evaluation
##### 9. Best and Worst Model

In [None]:
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import STL

## **4. Functions**


In [None]:
# Step a year/month backwards by a number of months.
def back_month(yr: int, mon: int, back: int):
    '''
    Return the year-month that lies `back` months before (yr, mon).

    Parameters
    ----------
    yr: year of period t, mon: month of period t, back: how many months to step back

    Returns
    ----------
    A '<year>-<month>' string; the month is NOT zero-padded (e.g. '2019-12', '2020-3').
    '''

    # While the target month is not positive we have crossed a year boundary:
    # borrow one year and give twelve months back to the offset.
    while mon - back <= 0:
        yr -= 1
        back -= 12

    # The target now lies inside year `yr`.
    return f'{yr}-{mon - back}'

In [None]:
# Select the X columns for periods t-back_most ... t-1.
def X_months(data: pd.DataFrame, y_yr: int, y_mon: int, back_most: int):
    '''
    Return the columns of `data` for periods t-back_most through t-1.

    Parameters
    ----------
    data: data set whose columns are labelled with the year-month strings produced by back_month,
    y_yr: year of period t, y_mon: month of period t,
    back_most: how many months before period t the earliest X column lies

    Returns
    ----------
    A DataFrame holding the selected columns, ordered from the oldest month (t-back_most) to the newest (t-1).
    '''

    # Offsets run back_most, back_most-1, ..., 1 so the oldest month comes first.
    wanted = [back_month(y_yr, y_mon, offset) for offset in reversed(range(1, back_most + 1))]

    return data[wanted]

In [None]:
from datetime import datetime

# List every year-month from a start year-month to an end year-month (both inclusive).
def month_range(y_start_yr: int, y_start_mon: int, y_end_yr: int, y_end_mon: int):
    '''
    Return every year-month between the start and the end year-month, both inclusive.

    Parameters
    ----------
    y_start_yr: start year, y_start_mon: start month, y_end_yr: end year, y_end_mon: end month

    Returns
    ----------
    A numpy array of zero-padded 'YYYY-MM' strings in chronological order
    (empty when the end precedes the start).
    '''

    # Monthly periods are inclusive on both ends, so there is no need to push the
    # end one month forward, and the month-end offset alias of date_range
    # (deprecated in recent pandas) is avoided.
    first = pd.Period(year=y_start_yr, month=y_start_mon, freq='M')
    last = pd.Period(year=y_end_yr, month=y_end_mon, freq='M')

    month_list = pd.period_range(start=first, end=last, freq='M').strftime('%Y-%m').to_numpy()

    return month_list

In [None]:
# Standardize every row of X with that row's own mean and standard deviation.
def standardize_X(data: pd.DataFrame):
    '''
    Standardize X row by row: (value - row mean) / row standard deviation.

    Note: the statistics are taken across the columns of each row (axis=1) and use
    pandas' default sample standard deviation (ddof=1). This differs from sklearn's
    StandardScaler, which works per column with the population standard deviation.

    Parameters
    ----------
    data: X

    Returns
    ----------
    mean: per-row mean (Series indexed like data),
    std: per-row standard deviation (Series indexed like data),
    std_data: standardized X (same shape and labels as data)
    '''
    mean = data.mean(axis=1)
    std = data.std(axis=1)
    # Vectorised row-wise broadcast: avoids a Python-level apply and per-row label
    # lookups, which also misbehave when the index contains duplicate labels.
    std_data = data.sub(mean, axis=0).div(std, axis=0)
    return mean, std, std_data

# Standardize y with the mean and standard deviation computed from X.
def standardize_y(mean: pd.Series, std: pd.Series, data: pd.Series):
    '''
    Standardize y with the per-row mean and standard deviation of X.

    Values are matched by POSITION (first y with first mean/std, and so on), so
    the three inputs must have the same length and row order. Index labels are
    ignored, which keeps the function working for integer-labelled indexes.

    Parameters
    ----------
    mean: mean of X, std: standard deviation of X, data: y

    Returns
    ----------
    std_data: list with the standardized y values
    '''
    # Convert to plain arrays first: `series[i]` with an integer is a label lookup
    # on integer indexes (KeyError) and a deprecated positional fallback otherwise.
    values = np.asarray(data, dtype=float)
    center = np.asarray(mean, dtype=float)
    scale = np.asarray(std, dtype=float)
    std_data = list((values - center) / scale)
    return std_data

# Convert standardized y values back to the original scale.
def standardized_y_back(mean: pd.Series, std: pd.Series, std_data: np.array):
    '''
    Undo the standardization of y: value * std + mean.

    Values are matched by POSITION, so `mean`, `std` and `std_data` must have the
    same length and row order. Index labels are ignored, which keeps the function
    working for integer-labelled indexes.

    Parameters
    ----------
    mean: mean of X, std: standard deviation of X, std_data: standardized y

    Returns
    ----------
    data: list with the y values on the original scale
    '''
    # Positional arrays avoid `series[i]`, which is a label lookup on integer
    # indexes (KeyError) and a deprecated positional fallback otherwise.
    scaled = np.asarray(std_data, dtype=float)
    data = list(scaled * np.asarray(std, dtype=float) + np.asarray(mean, dtype=float))
    return data


In [None]:
# Split every row of the data into trend, seasonal and residual components.
def decompose_data(data: pd.DataFrame):
    '''
    Decompose each row of `data` with STL and return the three components as separate frames.

    Each row is treated as one series and decomposed with STL(period=12, seasonal=13);
    the component values are rounded to 4 decimals.

    Parameters
    ----------
    data: data set (column labels must be strings, a suffix is appended to them)

    Returns
    ----------
    trend, seasonal, residual: DataFrames indexed like `data`, with the original column
    labels suffixed by '_trend', '_season' and '_resid' respectively
    '''

    def _blank(suffix):
        # Float frame shaped like `data`, with the component name appended to each column label.
        return pd.DataFrame(index=data.index, columns=[name + suffix for name in data.columns], dtype=float)

    trend = _blank('_trend')
    seasonal = _blank('_season')
    residual = _blank('_resid')

    for label, series in data.iterrows():
        fit = STL(series.to_numpy(), period=12, seasonal=13).fit()
        for frame, component in ((trend, fit.trend), (seasonal, fit.seasonal), (residual, fit.resid)):
            frame.loc[label] = np.round(component, 4)

    return trend, seasonal, residual

In [None]:
# Build the training and test sets for one target month.
def get_train_test(data: pd.DataFrame, y_test_yr: int, y_test_mon: int, back_most: int, y_back: int):
    '''
    Build the training and test sets for one target month.

    Parameters
    ----------
    data: data set, y_test_yr: year to predict, y_test_mon: month to predict,
    back_most: how many months before period t the earliest X column lies,
    y_back: how many months the training target lies before the test target

    Returns
    ----------
    X_train, y_train, X_test, y_test
    (both X frames have their columns renamed to 't-<back_most>' ... 't-1')
    '''

    # Test set: the target month and the back_most months before it.
    y_test = data[f'{y_test_yr}-{y_test_mon}']
    X_test = X_months(data, y_test_yr, y_test_mon, back_most)

    # Training set: same layout, shifted y_back months into the past.
    y_train_ym = back_month(y_test_yr, y_test_mon, y_back)
    ym_parts = y_train_ym.split("-")
    y_train = data[y_train_ym]
    X_train = X_months(data, int(ym_parts[0]), int(ym_parts[1]), back_most)

    # Relabel the X columns as lags relative to each target so train and test share feature names.
    lag_labels = [f't-{lag}' for lag in range(back_most, 0, -1)]
    X_test.columns = list(lag_labels)
    X_train.columns = list(lag_labels)

    return X_train, y_train, X_test, y_test

## **5. Model Training**

[RandomForestRegressor](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html) & [XGBRegressor](https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.XGBRegressor)

In [None]:
import time
# Evaluation
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_absolute_percentage_error
# Model
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestRegressor
from xgboost.sklearn import XGBRegressor

### **5.1 Parameter Grid for Grid Search**

In [None]:
# Hyper-parameter grids handed to GridSearchCV (every value is a list of candidates).
rf_params = dict(
    n_estimators=[100],
    random_state=[0],
    n_jobs=[-1],
)

xgb_params = dict(
    n_estimators=[100, 250, 500],
    objective=['reg:squarederror'],
    learning_rate=[0.1, 0.2],  # usually range from 0.01 to 0.2
    random_state=[0],
    n_jobs=[-1],
)

### **5.2 訓練單一模型（Random Forest or XGB）**

In [None]:
# Train a model with fixed, pre-chosen hyper-parameters.
def train_model(modelName, X_train, y_train):
    '''
    Fit a model using the preset hyper-parameters.

    Parameters
    ----------
    modelName: | 'rf' | 'xgb' |, X_train: X of the training data, y_train: y of the training data

    Returns
    ----------
    model: the fitted estimator

    Raises
    ----------
    ValueError: if modelName is neither 'rf' nor 'xgb'
    '''
    if modelName == 'rf':
        model = RandomForestRegressor(n_estimators=100, random_state=0, n_jobs=-1)
    elif modelName == 'xgb':
        model = XGBRegressor(n_estimators=500, objective='reg:squarederror', learning_rate=0.1, random_state=0, n_jobs=-1, eval_metric=mean_squared_error)
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(f"Unknown modelName {modelName!r}; expected 'rf' or 'xgb'.")

    model.fit(X_train, y_train)
    return model


# Grid search for the best hyper-parameters (tuned by k-fold cross validation).
def search_best_model(modelName, X_train, y_train, cv: int, print_best_params: bool = False):
    '''
    Run a cross-validated grid search and return the best model.

    The grid is rf_params for 'rf' and xgb_params for 'xgb'. Candidates are scored
    with negative RMSE, and the best one is refitted on the whole training data.

    Parameters
    ----------
    modelName: | 'rf' | 'xgb' |, X_train: X of the training data, y_train: y of the training data,
    cv: number of folds (K-Fold),
    print_best_params: whether to print the best parameters (default: False)

    Returns
    ----------
    best_model: the refitted best estimator

    Raises
    ----------
    ValueError: if modelName is neither 'rf' nor 'xgb'
    '''
    if modelName == 'rf':
        base_estimator = RandomForestRegressor()
        params = rf_params.copy()
    elif modelName == 'xgb':
        base_estimator = XGBRegressor()
        params = xgb_params.copy()
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(f"Unknown modelName {modelName!r}; expected 'rf' or 'xgb'.")

    search = GridSearchCV(base_estimator, params, scoring='neg_root_mean_squared_error', cv=cv, refit=True, n_jobs=-1)
    search.fit(X_train, y_train)          # cross-validate on the training data, then refit the winner on all of it
    best_model = search.best_estimator_

    if print_best_params:
        print(search.best_params_)

    return best_model

### **5.3 預先訓練多個模型並儲存**

2023.04.06

In [None]:
import pickle

# Train and save the models (for grid search).
def trainMonthlyRevenue(data: pd.DataFrame, y_test_start_yr: int, y_test_start_mon: int, y_test_end_yr: int, y_test_end_mon: int, modelName: str, save_path: str):
    '''
    Pre-train and pickle the grid-searched models for every month in a range.

    Use this when grid search is too slow to run at prediction time. For each target
    month six models are tuned, one per data variant, and written to
    '<save_path>/<YYYY-MM>/<name>.pkl' with <name> in: model, def_model, dec_model,
    dec_def_model, season_model, season_def_model. The month folder is created if
    it does not exist.

    Parameters
    ----------
    data: data set,
    y_test_start_yr: first year to predict, y_test_start_mon: first month to predict,
    y_test_end_yr: last year to predict, y_test_end_mon: last month to predict,
    modelName: | 'rf' | 'xgb' |,
    save_path: directory in which the models are stored
    '''
    import os   # local import: only needed to create the per-month output folder

    test_y_m = month_range(y_test_start_yr, y_test_start_mon, y_test_end_yr, y_test_end_mon)  # every month to predict
    back_most = 48              # period t is predicted from the 48 months before it
    cv = 5
    print_best_params = False   # set to True to print the best parameters of every search

    for i in test_y_m:
        start = time.time()

        y_test_yr = int(i.split("-")[0])    # year to predict
        y_test_mon = int(i.split("-")[1])   # month to predict

        #----------------------- training data -----------------------
        # Raw data (y_train lies 12 months before y_test); the test split is not needed here.
        X_train, y_train, _, _ = get_train_test(data, y_test_yr, y_test_mon, back_most, y_back=12)

        # Standardized data
        mean_train, std_train, X_train_def = standardize_X(X_train)
        y_train_def = standardize_y(mean_train, std_train, y_train)

        # Decomposed data (trend + seasonal + residual columns side by side)
        trend_train, season_train, resid_train = decompose_data(X_train)
        X_train_dec = pd.concat([trend_train, season_train, resid_train], axis=1)

        # Decomposed + standardized data
        mean_train_dec, std_train_dec, X_train_dec_def = standardize_X(X_train_dec)
        y_train_dec_def = standardize_y(mean_train_dec, std_train_dec, y_train)

        # Seasonal effect removed (column labels must match before subtracting)
        season_train.columns = X_train.columns.copy()
        X_train_season = X_train - season_train

        # Seasonal effect removed + standardized data
        mean_train_season, std_train_season, X_train_season_def = standardize_X(X_train_season)
        y_train_season_def = standardize_y(mean_train_season, std_train_season, y_train)

        #----------------------- model training -----------------------
        # Pickle file stem -> (X, y) used to tune that model.
        training_sets = {
            'model': (X_train, y_train),
            'def_model': (X_train_def, y_train_def),
            'dec_model': (X_train_dec, y_train),
            'dec_def_model': (X_train_dec_def, y_train_dec_def),
            'season_model': (X_train_season, y_train),
            'season_def_model': (X_train_season_def, y_train_season_def),
        }
        fitted = {
            stem: search_best_model(modelName, X, y, cv, print_best_params)
            for stem, (X, y) in training_sets.items()
        }

        #----------------------- save the models -----------------------
        month_dir = f'{save_path}/{i}'
        os.makedirs(month_dir, exist_ok=True)
        for stem, fitted_model in fitted.items():
            # `with` guarantees the file is flushed and closed even if pickling fails.
            with open(f'{month_dir}/{stem}.pkl', 'wb') as fh:
                pickle.dump(fitted_model, fh)

        print(f"{i} model saved. Using time:", "%.3f"%(time.time() - start), " secs.")


## **6. Predicting and Evaluation**

衡量指標：

*   RMSE (Root Mean Square Error)

$$RMSE = \sqrt{\frac{1}{n}\sum_{i=1}^n(\hat{y}_i - y_i)^2}$$

<br>

*   MAE (Mean Absolute Error)

$$MAE = \frac{1}{n}\sum_{i=1}^n|\hat{y}_i - y_i|$$

<br>

*   MAE% (MAE divided by the mean of y_true)

$$MAE\% = \frac{\frac{1}{n}\sum_{i=1}^n|\hat{y}_i - y_i|}{\frac{1}{n}\sum_{i=1}^n y_i} = \frac{\sum_{i=1}^n|\hat{y}_i - y_i|}{\sum_{i=1}^n y_i}$$

<br>

*   MAPE (Mean Absolute Percentage Error)

$$MAPE(\%) = \frac{1}{n}\sum_{i=1}^n \frac{|\hat{y}_i - y_i|}{|y_i|}$$

<br>


In [None]:
# Score one set of predictions.
def evaluatePerformance(y_true, y_pred, rmse: list, mae: list, mae_percent: list, mape: list):
    '''
    Score the predictions and append one value to each metric list (in place).

    Appended values: RMSE and MAE rounded to 0 decimals; MAE% (MAE divided by the
    mean of y_true) and MAPE rounded to 4 decimals.

    Parameters
    ----------
    y_true: actual monthly revenue (must provide .mean()), y_pred: predicted monthly revenue,
    rmse, mae, mae_percent, mape: lists that collect the score of each metric
    '''
    # Take the root explicitly: the `squared=False` argument of mean_squared_error
    # is deprecated and no longer accepted by newer scikit-learn releases.
    root_mse = np.sqrt(mean_squared_error(y_true, y_pred))
    mean_abs_err = mean_absolute_error(y_true, y_pred)   # computed once, reused for MAE and MAE%

    rmse.append(round(root_mse, 0))
    mae.append(round(mean_abs_err, 0))
    mae_percent.append(round(mean_abs_err / y_true.mean(), 4))
    mape.append(round(mean_absolute_percentage_error(y_true, y_pred), 4))

# Collect the scores of the different metrics in one table.
def savePerformace(scoresD, rmse: list, mae: list, mae_percent: list, mape: list, dataName: str):
    '''
    Store the four metric lists in `scoresD` (in place) under '<metric>-<dataName>' keys.

    Parameters
    ----------
    scoresD: dataframe that gathers all scores,
    rmse, mae, mae_percent, mape: scores of each metric,
    dataName: label of the data variant, e.g. | 'org' | 'dec' | 'season' |
    '''
    labelled = (
        ('RMSE', rmse),
        ('MAE', mae),
        ('MAE%', mae_percent),
        ('MAPE', mape),
    )
    for metric, values in labelled:
        scoresD[f'{metric}-{dataName}'] = values

In [None]:
import pickle

# Predict and collect the results.
def predictMonthlyRevenue(data: pd.DataFrame, y_test_start_yr: int, y_test_start_mon: int, y_test_end_yr: int, y_test_end_mon: int, modelName: str, search: bool = False, save_path: str = 'None'):
    '''
    Predict monthly revenue for every month in a range, for six data variants.

    Variants: raw ('org'), standardized ('def'), decomposed ('dec'),
    decomposed + standardized ('dec_def'), seasonal effect removed ('season'),
    seasonal effect removed + standardized ('season_def').

    Parameters
    ----------
    data: data set,
    y_test_start_yr: first year to predict, y_test_start_mon: first month to predict,
    y_test_end_yr: last year to predict, y_test_end_mon: last month to predict,
    modelName: | 'rf' | 'xgb' |,
    search: grid search or not (default = False). When True the models are not trained
            here but loaded from the pickles under save_path.
    save_path: where the models are stored, only needed when search = True (default = 'None')

    Returns
    ----------
    pred =
    {"org": {"org": org_pred, "dec": dec_pred, "season": season_pred},
     "def": {"org": def_pred, "dec": dec_def_pred, "season": season_def_pred}
    },

    feature_importance =
    {"org": {"org": feature_importance, "dec": feature_importance_dec, "season": feature_importance_season},
     "def": {"org": feature_importance_def, "dec": feature_importance_dec_def, "season": feature_importance_season_def}
    },

    scores: DataFrame indexed by month with the columns '<metric>-<variant>'
    '''

    start = time.time()
    test_y_m = month_range(y_test_start_yr, y_test_start_mon, y_test_end_yr, y_test_end_mon)  # every month to predict
    back_most = 48    # period t is predicted from the 48 months before it

    # Variant -> stem of the pickle file that holds its pre-trained model.
    model_files = {
        'org': 'model',
        'def': 'def_model',
        'dec': 'dec_model',
        'dec_def': 'dec_def_model',
        'season': 'season_model',
        'season_def': 'season_def_model',
    }
    variants = list(model_files)

    # Columns are gathered in plain dicts and turned into DataFrames once at the end.
    # Inserting them one by one fragments the frame after ~100 columns.
    pred_cols = {v: {} for v in variants}          # month -> rounded predictions
    importance_cols = {v: {} for v in variants}    # month -> rounded feature importances
    metrics = {v: ([], [], [], []) for v in variants}   # (rmse, mae, mae_percent, mape)
    last_model = {}                                # most recent model of each variant

    for i in test_y_m:

        y_test_yr = int(i.split("-")[0])    # year to predict
        y_test_mon = int(i.split("-")[1])   # month to predict

        #----------------------- training and test data -----------------------
        # Raw data (y_train lies 12 months before y_test)
        X_train, y_train, X_test, y_test = get_train_test(data, y_test_yr, y_test_mon, back_most, y_back=12)

        # Standardized data
        mean_test, std_test, X_test_def = standardize_X(X_test)
        mean_train, std_train, X_train_def = standardize_X(X_train)
        y_train_def = standardize_y(mean_train, std_train, y_train)

        # Decomposed data (trend + seasonal + residual columns side by side)
        trend_test, season_test, resid_test = decompose_data(X_test)
        trend_train, season_train, resid_train = decompose_data(X_train)
        X_test_dec = pd.concat([trend_test, season_test, resid_test], axis=1)
        X_train_dec = pd.concat([trend_train, season_train, resid_train], axis=1)

        # Decomposed + standardized data
        mean_test_dec, std_test_dec, X_test_dec_def = standardize_X(X_test_dec)
        mean_train_dec, std_train_dec, X_train_dec_def = standardize_X(X_train_dec)
        y_train_dec_def = standardize_y(mean_train_dec, std_train_dec, y_train)

        # Seasonal effect removed (column labels must match before subtracting)
        season_test.columns = X_test.columns.copy()
        season_train.columns = X_train.columns.copy()
        X_test_season = X_test - season_test
        X_train_season = X_train - season_train

        # Seasonal effect removed + standardized data
        mean_test_season, std_test_season, X_test_season_def = standardize_X(X_test_season)
        mean_train_season, std_train_season, X_train_season_def = standardize_X(X_train_season)
        y_train_season_def = standardize_y(mean_train_season, std_train_season, y_train)

        # Variant -> (X_train, y_train, X_test, (mean, std) to undo standardization, or None).
        datasets = {
            'org': (X_train, y_train, X_test, None),
            'def': (X_train_def, y_train_def, X_test_def, (mean_test, std_test)),
            'dec': (X_train_dec, y_train, X_test_dec, None),
            'dec_def': (X_train_dec_def, y_train_dec_def, X_test_dec_def, (mean_test_dec, std_test_dec)),
            'season': (X_train_season, y_train, X_test_season, None),
            'season_def': (X_train_season_def, y_train_season_def, X_test_season_def, (mean_test_season, std_test_season)),
        }

        for v in variants:
            X_tr, y_tr, X_te, scaler = datasets[v]

            #----------------------- model -----------------------
            if search:
                # Grid search means the models were trained and pickled beforehand.
                # NOTE: pickle can execute arbitrary code; only load from a trusted save_path.
                with open(f'{save_path}/{i}/{model_files[v]}.pkl', 'rb') as fh:
                    fitted = pickle.load(fh)
            else:
                fitted = train_model(modelName, X_tr, y_tr)
            last_model[v] = fitted

            #----------------------- predictions -----------------------
            raw_pred = fitted.predict(X_te)
            if scaler is not None:
                # Bring standardized predictions back to the original scale.
                raw_pred = standardized_y_back(scaler[0], scaler[1], raw_pred)
            rounded_pred = np.round(raw_pred, 0)
            pred_cols[v][i] = rounded_pred

            #----------------------- feature importance -----------------------
            importance_cols[v][i] = np.round(fitted.feature_importances_, 4)

            #----------------------- scores -----------------------
            evaluatePerformance(y_test, rounded_pred, *metrics[v])

    # Build every result frame in one go.
    row_labels = data.index.tolist()
    pred_frames = {v: pd.DataFrame(pred_cols[v], index=row_labels) for v in variants}

    importance_frames = {}
    for v in variants:
        frame = pd.DataFrame(importance_cols[v])
        if v in last_model:   # absent only when the month range is empty
            frame.index = last_model[v].feature_names_in_
        importance_frames[v] = frame

    # Gather the scores of all metrics, one column group per variant.
    scores = pd.DataFrame(index=test_y_m)
    for v in variants:
        savePerformace(scores, *metrics[v], v)

    # Predictions and feature importances grouped as {raw/standardized: {data variant: frame}}.
    pred = {"org": {"org": pred_frames['org'],
                    "dec": pred_frames['dec'],
                    "season": pred_frames['season']
                    },
            "def": {"org": pred_frames['def'],
                    "dec": pred_frames['dec_def'],
                    "season": pred_frames['season_def']
                    }
            }

    feature_importance = {"org": {"org": importance_frames['org'],
                                  "dec": importance_frames['dec'],
                                  "season": importance_frames['season']
                                  },
                          "def": {"org": importance_frames['def'],
                                  "dec": importance_frames['dec_def'],
                                  "season": importance_frames['season_def']
                                  }
                          }

    print("Using time:", "%.3f"%(time.time() - start), " secs.")

    return pred, feature_importance, scores

## **9. Best and Worst Model**

2023.04.05

In [None]:
class BestWorstModel():
    '''
    Find the best and the worst models.

    Attributes
    ----------
    best_rf, worst_rf: dict
        Best and worst Random Forest model under each metric.
    best_xgb, worst_xgb: dict
        Best and worst XGB model under each metric.
    best_all, worst_all: dict
        Best and worst model among all models under each metric.
    ----------
    best_worst_rf, best_worst_xgb: pd.DataFrame
        Best and worst predicted month of every data variant (sorted by scoreType).
    best_worst: pd.DataFrame
        Best and worst predicted month of every data variant / model combination.

    Methods
    ----------
    findBestWorstForAll(scoreType: str):
        For the chosen metric, return the best and the worst predicted month of every
        data variant / model combination.
    '''

    # Column layout shared by all summary frames.
    _SUMMARY_COLUMNS = ['modelName', 'dataType', 'scoreType', 'min_month', 'min_score', 'max_month', 'max_score']

    def __init__(self, score_rf: pd.DataFrame, score_xgb: pd.DataFrame):
        self.score_rf = score_rf
        self.score_xgb = score_xgb
        self.best_worst_rf, self.best_worst_xgb, self.best_worst = self.createBestWorstForAll()
        self.best_rf, self.worst_rf = self.findBestWorstByModel('rf')
        self.best_xgb, self.worst_xgb = self.findBestWorstByModel('xgb')
        self.best_all, self.worst_all = self.findBestWorstByModel('all')

    @staticmethod
    def _summarize(scores: pd.DataFrame, modelLabel: str):
        '''
        One row per score column ('<scoreType>-<dataType>'): the month with the lowest
        and the month with the highest score.
        '''
        rows = [
            [modelLabel, colName.split("-")[-1], colName.split("-")[0],
             colData.idxmin(), colData.min(), colData.idxmax(), colData.max()]
            # .items() replaces DataFrame.iteritems(), which was removed in pandas 2.0
            for colName, colData in scores.items()
        ]
        return pd.DataFrame(rows, columns=BestWorstModel._SUMMARY_COLUMNS)

    def createBestWorstForAll(self):
        '''
        Best and worst predicted month of every data variant / model combination.

        Returns
        ----------
        best_worst_rf, best_worst_xgb: per-model summaries sorted by scoreType,
        best_worst: both summaries stacked (Random Forest rows first, in score-column order)
        '''
        best_worst_rf = self._summarize(self.score_rf, 'Random Forest')
        best_worst_xgb = self._summarize(self.score_xgb, 'XGB')

        # All
        best_worst = pd.concat([best_worst_rf, best_worst_xgb])

        # Sort by scoreType. sort_values returns a new frame, so the result must be
        # assigned; a stable sort keeps the column order inside each metric.
        best_worst_rf = best_worst_rf.sort_values(by=['scoreType'], kind='mergesort')
        best_worst_xgb = best_worst_xgb.sort_values(by=['scoreType'], kind='mergesort')

        return best_worst_rf, best_worst_xgb, best_worst


    def findBestWorstForAll(self, scoreType: str):
        '''
        For the chosen metric, return the best and the worst predicted month of every
        data variant / model combination.

        Parameters
        ----------
        scoreType: | 'RMSE' | 'MAPE' | 'MAE' | 'MAE%' |

        Returns
        ----------
        best: rows of the metric without the max_* columns,
        worst: rows of the metric without the min_* columns
        '''

        scores = self.best_worst[self.best_worst['scoreType'] == scoreType]   # filter
        best = scores.drop(['max_month', 'max_score'], axis=1)
        worst = scores.drop(['min_month', 'min_score'], axis=1)

        return best, worst


    def findBestWorstByModel(self, modelName: str):
        '''
        For the chosen model, find the best and the worst predicted month under each metric.

        Parameters
        ----------
        modelName: | 'rf' | 'xgb' | 'all' |

        Returns
        ----------
        best_dict: {"RMSE", "MAPE", "MAE", "MAE%"} -> rows holding the lowest min_score
        worst_dict: {"RMSE", "MAPE", "MAE", "MAE%"} -> rows holding the highest max_score

        Raises
        ----------
        ValueError: if modelName is not one of 'rf', 'xgb', 'all'
        '''

        if modelName == 'rf':
            scoreD = self.best_worst_rf.copy()
        elif modelName == 'xgb':
            scoreD = self.best_worst_xgb.copy()
        elif modelName == 'all':
            scoreD = self.best_worst.copy()
        else:
            # Fail with a clear message instead of an UnboundLocalError further down.
            raise ValueError(f"Unknown modelName {modelName!r}; expected 'rf', 'xgb' or 'all'.")

        best_dict = {}
        worst_dict = {}

        scoreTypes = ['RMSE', 'MAPE', 'MAE', 'MAE%']
        for s in scoreTypes:
            scores = scoreD[scoreD['scoreType'] == s]   # filter
            # Ties are kept: every row that reaches the extreme score is returned.
            best = scores[scores.min_score == scores.min_score.min()]
            best_dict[s] = best.drop(['max_month', 'max_score'], axis=1)
            worst = scores[scores.max_score == scores.max_score.max()]
            worst_dict[s] = worst.drop(['min_month', 'min_score'], axis=1)

        return best_dict, worst_dict
