# Model Evaluation

## Import libraries

In [486]:
import os
from dataclasses import dataclass, field
from enum import Enum
from typing import List

import numpy as np
import pandas as pd

## Settings

In [487]:
# Name of the directory holding the result files.
# NOTE(review): the main flow passes SETTING_RESULT_PARENT_DIRECTORY_PATH to
# read_result() and other cells read further SETTING_* names; none of them is
# defined in a visible cell — confirm whether this constant was renamed.
RESULT_PARENT_DIRECTORY = "Result"

## Enums

In [488]:
class Metric(Enum):
    """Forecast-accuracy metrics that ``evaluate`` dispatches on."""

    RMSSE = 0  # Root Mean Squared Scaled Error
    MASE = 1  # Mean Absolute Scaled Error
    WMSSE = 2  # Weighted Root Mean Squared Scaled Error

## Classes

In [489]:
@dataclass
class Evaluation:
    """A named data frame.

    NOTE(review): no visible cell references this class; it looks like an
    earlier form of ``Model`` and is kept unchanged for compatibility —
    confirm before removing.
    """

    name: str
    data_frame: pd.DataFrame


@dataclass
class Model:
    """One forecast file submitted by an author.

    Attributes:
        name: File name of the forecast up to its first dot.
        data_frame: Contents of the forecast CSV file.
        state: Name of the column in ``data_frame`` that is evaluated.
    """

    name: str
    data_frame: pd.DataFrame
    state: str


@dataclass
class MetricResult:
    """Score of one model under one metric.

    Attributes:
        model_name: ``Model.name`` of the scored model.
        state: Column the score was computed for.
        metric: Metric that produced ``value``.
        value: The computed score.
    """

    model_name: str
    state: str
    metric: Metric
    value: float


@dataclass
class Result:
    """All models submitted by one author together with their scores.

    Attributes:
        author: Name of the author's sub-directory.
        model_list: Models read from the author's CSV files.
        metric_result_list: Scores appended by ``evaluate_result``.
    """

    author: str
    model_list: List[Model] = field(default_factory=list)
    metric_result_list: List[MetricResult] = field(default_factory=list)

## Metric formulas

### Mean Absolute Scaled Error

In [490]:
def mase(train: np.ndarray, actual: np.ndarray, forecast: np.ndarray) -> float:
    """
    Compute the Mean Absolute Scaled Error (MASE).

    The mean absolute forecast error is divided by the mean absolute
    one-step difference of the training series, i.e. the in-sample error of
    a forecast that repeats the previous value.

    Args:
        train (np.ndarray): Array of historical (training) data
        actual (np.ndarray): Array of actual values for the test period
        forecast (np.ndarray): Array of forecasted values

    Returns:
        float: MASE value, or NaN when the scale is zero or the training
        series has fewer than two points.
    """

    # At least two training points are needed to form one difference.
    if np.size(train) < 2:
        return np.nan

    # Compute scale denominator
    denominator = np.mean(np.abs(np.diff(train)))

    if denominator == 0:
        return np.nan  # Avoid division by zero

    # Compute numerator
    numerator = np.mean(np.abs(actual - forecast))

    # MASE is a plain ratio of absolute errors; only the squared-error
    # variant (RMSSE) takes a square root.
    return numerator / denominator

### Root Mean Squared Scaled Error

In [491]:
def rmsse(train: np.ndarray, actual: np.ndarray, forecast: np.ndarray) -> float:
    """
    Return the Root Mean Squared Scaled Error (RMSSE) of a forecast.

    The mean squared forecast error is divided by the mean squared one-step
    difference of the training series, and the square root of that ratio is
    returned.

    Parameters:
        train (np.ndarray): Historical (training) observations
        actual (np.ndarray): Observed values for the forecast period
        forecast (np.ndarray): Predicted values for the forecast period

    Returns:
        float: RMSSE value, or NaN when the scale is exactly zero
    """

    # Scale: mean squared change between consecutive training points.
    step_changes = np.diff(train)
    scale = np.square(step_changes).mean()

    # A flat training series gives a zero scale; report NaN instead of dividing.
    if scale == 0:
        return np.nan

    # Mean squared error over the forecast period.
    squared_errors = np.square(actual - forecast)

    return np.sqrt(squared_errors.mean() / scale)

## Helper Functions

In [492]:
def create_test_data_frame() -> pd.DataFrame:
    """
    Load the wide test data frame, building and caching it when missing.

    When the file at SETTING_TEST_DATA_FRAME_PATH exists it is read and
    returned. Otherwise the long revenue table at
    SETTING_REVENUE_TEST_DATA_FRAME_PATH is pivoted to one column per
    ``store_id``, a 1-based ``id`` column is prepended, and the result is
    written to SETTING_TEST_DATA_FRAME_PATH before being returned.

    Returns:
        pd.DataFrame: The test data frame.

    Raises:
        FileNotFoundError: If neither the cached file nor the revenue file exists.
    """
    if os.path.isfile(SETTING_TEST_DATA_FRAME_PATH):
        print(f"Test data frame already exists at {SETTING_TEST_DATA_FRAME_PATH}")
        return pd.read_csv(SETTING_TEST_DATA_FRAME_PATH)

    if not os.path.isfile(SETTING_REVENUE_TEST_DATA_FRAME_PATH):
        raise FileNotFoundError(
            f"Revenue test data frame not found at {SETTING_REVENUE_TEST_DATA_FRAME_PATH}."
        )

    revenue_test_df = pd.read_csv(SETTING_REVENUE_TEST_DATA_FRAME_PATH)

    pivoted_revenue_test_df = revenue_test_df.pivot(
        index=["date", "d", "weekday"], columns="store_id", values="revenue"
    ).reset_index()

    # Number the rows from the actual row count; a fixed count would make
    # insert() fail for any input that does not have exactly that many rows.
    pivoted_revenue_test_df.insert(
        0, "id", range(1, len(pivoted_revenue_test_df) + 1)
    )

    pivoted_revenue_test_df.to_csv(SETTING_TEST_DATA_FRAME_PATH, index=False)

    return pivoted_revenue_test_df

### Read results

In [493]:
def read_result(parent_directory: str) -> List[Result]:
    """
    Read every author's forecast CSV files below ``parent_directory``.

    Each sub-directory of ``parent_directory`` is treated as one author and
    each ``.csv`` file inside it as one model. A model's state is the first
    entry of SETTING_STATES that appears among the file's columns.

    Args:
        parent_directory (str): Directory containing one sub-directory per author

    Returns:
        List[Result]: One Result per author that has at least one CSV file

    Raises:
        NotADirectoryError: If ``parent_directory`` is not a directory or has
            no sub-directories.
        ValueError: If a CSV file contains none of the SETTING_STATES columns.
    """
    # Check if the given path exists and is a directory
    if not os.path.isdir(parent_directory):
        raise NotADirectoryError(f"Directory '{parent_directory}' is not found.")

    # os.listdir() returns entries in arbitrary order; sort for stable output.
    member_subdirectories = sorted(
        d
        for d in os.listdir(parent_directory)
        if os.path.isdir(os.path.join(parent_directory, d))
    )

    # Check if there are any subdirectories; if not, raise an error
    if not member_subdirectories:
        raise NotADirectoryError("Author subdirectories are not found.")

    result_list: List[Result] = []

    for member in member_subdirectories:
        member_path = os.path.join(parent_directory, member)

        csv_files = sorted(
            f
            for f in os.listdir(member_path)
            if f.endswith(".csv") and os.path.isfile(os.path.join(member_path, f))
        )

        model_list = []
        for csv_file in csv_files:
            data_frame = pd.read_csv(os.path.join(member_path, csv_file))

            # Walk SETTING_STATES in order so that a file with several state
            # columns always resolves to the same one.
            matching_states = [
                state for state in SETTING_STATES if state in data_frame.columns
            ]

            if not matching_states:
                raise ValueError(
                    f"Data frame contains no state! Author: '{member}', Model: '{csv_file}'"
                )

            model_list.append(
                Model(
                    name=csv_file.split(".")[0],
                    data_frame=data_frame,
                    state=matching_states[0],
                )
            )

        # Authors without any CSV file are left out of the result.
        if model_list:
            result_list.append(
                Result(author=member, model_list=model_list, metric_result_list=[])
            )

    return result_list

### Evaluate results

In [494]:
def evaluate(
    evaluated_data_frame: pd.DataFrame,
    test_data_frame: pd.DataFrame,
    metric: Metric,
    state: str,
    weight: np.ndarray | None = None,
) -> float:
    """
    Score one forecast column against the test data.

    Rows of ``test_data_frame`` whose ``id`` is below
    SETTING_FIRST_FORECAST_DAY form the training series; the remaining rows
    are the actual values the forecast is compared with.

    Args:
        evaluated_data_frame (pd.DataFrame): Forecast; must contain column ``state``
        test_data_frame (pd.DataFrame): Data with an ``id`` column and column ``state``
        metric (Metric): Metric to compute
        state (str): Name of the column to evaluate
        weight (np.ndarray | None): Currently unused; reserved for Metric.WMSSE

    Returns:
        float: The metric value (NaN when the metric's scale is zero)

    Raises:
        NotImplementedError: If ``metric`` is Metric.WMSSE.
        ValueError: If ``metric`` is not supported, or the forecast and the
            actual values differ in length.
    """
    # Pick the scoring function first so an unsupported metric fails before
    # any data is touched. Returning 0.0 here would read as a perfect score.
    if metric is Metric.RMSSE:
        score = rmsse
    elif metric is Metric.MASE:
        score = mase
    elif metric is Metric.WMSSE:
        raise NotImplementedError("Metric.WMSSE is not implemented yet.")
    else:
        raise ValueError(f"Unsupported metric: {metric!r}")

    train = test_data_frame[test_data_frame["id"] < SETTING_FIRST_FORECAST_DAY]
    actual = test_data_frame[test_data_frame["id"] >= SETTING_FIRST_FORECAST_DAY]

    train_values = train[state].to_numpy()
    actual_values = actual[state].to_numpy()
    forecast_values = evaluated_data_frame[state].to_numpy()

    # Without this check a length-1 forecast would silently broadcast.
    if forecast_values.shape != actual_values.shape:
        raise ValueError(
            f"Forecast for '{state}' has {len(forecast_values)} rows "
            f"but the test period has {len(actual_values)}."
        )

    return score(train=train_values, actual=actual_values, forecast=forecast_values)


def evaluate_result(
    result_list: List[Result],
    test_data_frame: pd.DataFrame,
    metric: Metric,
    weight: np.ndarray = None,
):
    """
    Score every model of every author and record the scores in place.

    For each model one MetricResult is appended to the owning Result's
    ``metric_result_list``; nothing is returned.

    Args:
        result_list (List[Result]): Results whose models are scored
        test_data_frame (pd.DataFrame): Test data passed on to ``evaluate``
        metric (Metric): Metric passed on to ``evaluate``
        weight (np.ndarray): Optional weights passed on to ``evaluate``
    """
    for author_result in result_list:
        recorded_scores = author_result.metric_result_list
        for candidate in author_result.model_list:
            score = evaluate(
                candidate.data_frame, test_data_frame, metric, candidate.state, weight
            )
            recorded_scores.append(
                MetricResult(
                    model_name=candidate.name,
                    state=candidate.state,
                    metric=metric,
                    value=score,
                )
            )

### Summarize results

In [495]:
def summarize_result(result_list: List["Result"]) -> pd.DataFrame:
    """
    Flatten all metric results into one table and save it as CSV.

    One row is produced per MetricResult. The table is written to
    ``result_summary.csv`` in the current working directory.

    Args:
        result_list (List[Result]): Evaluated results; may be empty or None

    Returns:
        pd.DataFrame: Columns ``id``, ``author``, ``model``, ``state``,
        ``metric`` (the Metric member name) and ``value`` (float). For empty
        input an empty frame with these columns is returned and no file is
        written.
    """
    columns = ["id", "author", "model", "state", "metric", "value"]

    if not result_list:
        print("No results to summarize!")
        return pd.DataFrame(columns=columns)

    # Collect plain rows first; growing a DataFrame one row at a time copies
    # the whole frame on every insert.
    rows = []
    for result in result_list:
        for metric_result in result.metric_result_list:
            rows.append(
                {
                    "id": f"{result.author}_{metric_result.model_name}",
                    "author": f"{result.author}",
                    "model": f"{metric_result.model_name}",
                    "state": f"{metric_result.state}",
                    # The member name stays readable whatever the enum's
                    # underlying value is.
                    "metric": metric_result.metric.name,
                    # Keep scores numeric so they sort and plot as numbers.
                    "value": float(metric_result.value),
                }
            )

    df = pd.DataFrame(rows, columns=columns)

    df.to_csv("result_summary.csv", index=False)

    return df

### Revenue by year

In [496]:
def revenue_by_year(data_frame: pd.DataFrame, year: int = None):
    if "date" not in data_frame.columns:
        raise ValueError(
            "Invalid argument: The DataFrame must contain a 'date' column."
        )

    data_frame = data_frame.copy()

    # Extract the year from the 'date' column and add it as a new column
    data_frame["year"] = pd.to_datetime(data_frame["date"]).dt.year

    # Drop some columns
    data_frame = data_frame.drop(columns=["id", "date", "d", "weekday"])

    if year:
        min_year = data_frame["year"].min()
        max_year = data_frame["year"].max()
        if year < min_year or year > max_year:
            print(
                f"Invalid argument: Year {year} is out of range. Range: {min_year} - {max_year}."
            )
        else:
            data_frame = data_frame[data_frame["year"] == year]

    # Group by the year and sum the revenue
    revenue = data_frame.groupby("year").sum().reset_index()

    return revenue

## Visualize results

### Horizontal bar chart

In [497]:
def horizontal_bar_chart(summarize_result: pd.DataFrame, top_result: int = 5):
    """
    Show a horizontal bar chart of the rows with the largest ``value``.

    Args:
        summarize_result (pd.DataFrame): Summary table with ``id`` and
            ``value`` columns
        top_result (int): Number of rows to plot

    Returns:
        None. The figure is displayed; nothing is drawn for an empty table.
    """
    # NOTE(review): `go` is presumably plotly.graph_objects, but no visible
    # cell imports it — confirm the import exists.
    if summarize_result.empty:
        print("No results to visualize!")
        return

    # Cast before sorting: the values may arrive as strings, and sorting
    # strings would rank "10.2" below "9.8".
    ranked = (
        summarize_result.assign(value=summarize_result["value"].astype("float"))
        .sort_values(by="value", ascending=False)
        .head(top_result)
    )

    fig = go.Figure(
        go.Bar(
            x=ranked["value"].tolist(),
            y=ranked["id"].tolist(),
            orientation="h",
            text=ranked["value"].round(3).tolist(),
            textposition="outside",
        )
    )
    # Leave 10% headroom so the outside labels are not clipped.
    fig.update_xaxes(range=[0, float(ranked["value"].max()) * 1.1])
    fig.show()

## Main Flow

### Create train data frame

In [498]:
# Keep only the rows whose id precedes SETTING_FIRST_FORECAST_DAY — the same
# cut evaluate() uses for the training part of the series.
train_df = create_test_data_frame()
train_df = train_df[train_df["id"] < SETTING_FIRST_FORECAST_DAY]

Test data frame already exists at test_data_frame.csv


In [499]:
# Display the training rows as the cell output.
train_df

Unnamed: 0,id,date,d,weekday,CA_1,CA_2,CA_3,CA_4,TX_1,TX_2,TX_3,WI_1,WI_2,WI_3
0,1,2011-01-29,d_1,Saturday,10933.16,9101.52,11679.83,4561.59,6586.68,9915.78,7597.99,6454.72,5451.46,9367.88
1,2,2011-01-30,d_2,Sunday,9787.06,8417.53,12161.46,4681.41,6610.60,9804.54,7356.54,5645.77,4636.86,9868.80
2,3,2011-01-31,d_3,Monday,7201.38,5320.51,9123.86,3637.98,4551.97,6651.16,5406.70,3640.12,4621.58,7551.65
3,4,2011-02-01,d_4,Tuesday,7407.74,5550.56,10249.78,3708.92,5374.39,6985.60,5597.97,2949.96,5754.75,7181.53
4,5,2011-02-02,d_5,Wednesday,6566.12,5229.72,9538.65,3841.14,4347.07,6039.05,4069.74,2.96,2679.19,4646.31
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1908,1909,2016-04-20,d_1909,Wednesday,12347.85,11564.22,18113.40,7978.28,8950.16,11278.96,11092.53,9949.23,12212.21,9341.89
1909,1910,2016-04-21,d_1910,Thursday,11687.99,10746.54,16230.51,8169.58,8260.00,11409.73,11535.43,10361.39,12796.87,9212.23
1910,1911,2016-04-22,d_1911,Friday,14205.08,14654.14,18491.01,8301.77,9499.88,12494.57,11889.90,12292.56,14251.83,10959.56
1911,1912,2016-04-23,d_1912,Saturday,18317.93,19846.12,24861.53,9911.55,11373.50,14575.46,13093.63,14332.14,15761.02,13120.79


### Create test data frame

In [500]:
# Full frame (training and forecast rows); read from the cached CSV when it exists.
test_df = create_test_data_frame()

Test data frame already exists at test_data_frame.csv


In [501]:
# Display the full test frame as the cell output.
test_df

Unnamed: 0,id,date,d,weekday,CA_1,CA_2,CA_3,CA_4,TX_1,TX_2,TX_3,WI_1,WI_2,WI_3
0,1,2011-01-29,d_1,Saturday,10933.16,9101.52,11679.83,4561.59,6586.68,9915.78,7597.99,6454.72,5451.46,9367.88
1,2,2011-01-30,d_2,Sunday,9787.06,8417.53,12161.46,4681.41,6610.60,9804.54,7356.54,5645.77,4636.86,9868.80
2,3,2011-01-31,d_3,Monday,7201.38,5320.51,9123.86,3637.98,4551.97,6651.16,5406.70,3640.12,4621.58,7551.65
3,4,2011-02-01,d_4,Tuesday,7407.74,5550.56,10249.78,3708.92,5374.39,6985.60,5597.97,2949.96,5754.75,7181.53
4,5,2011-02-02,d_5,Wednesday,6566.12,5229.72,9538.65,3841.14,4347.07,6039.05,4069.74,2.96,2679.19,4646.31
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1936,1937,2016-05-18,d_1937,Wednesday,12920.62,12766.25,17780.83,8116.41,9851.97,11121.03,12078.07,9605.89,12928.75,9163.29
1937,1938,2016-05-19,d_1938,Thursday,13259.90,13432.94,18635.35,8367.57,8403.09,10474.33,11196.76,10478.86,13547.39,9660.13
1938,1939,2016-05-20,d_1939,Friday,13999.65,15545.28,18219.23,8960.17,11296.88,13832.01,14667.00,11358.75,14139.33,11982.37
1939,1940,2016-05-21,d_1940,Saturday,18637.70,24088.59,23849.52,9768.51,13137.35,15212.81,15696.64,14614.05,15020.25,12370.23


### Read results

In [502]:
# One Result per author sub-directory that contains at least one CSV file.
result_list = read_result(SETTING_RESULT_PARENT_DIRECTORY_PATH)

In [503]:
# read_result() omits authors without CSV files, so the list can be empty;
# stop here rather than evaluating nothing.
if not result_list:
    raise FileNotFoundError("No data frame found.")

### Create weight values

In [504]:
# revenue_by_year_df = revenue_by_year(train_df)
# revenue_by_year_df

In [505]:
# revenue_by_year_df_2011_2015 = revenue_by_year_df[revenue_by_year_df["year"] < 2016]
# revenue_by_year_df_2011 = revenue_by_year_df[revenue_by_year_df["year"] == 2011]
# revenue_by_year_df_2012 = revenue_by_year_df[revenue_by_year_df["year"] == 2012]
# revenue_by_year_df_2013 = revenue_by_year_df[revenue_by_year_df["year"] == 2013]
# revenue_by_year_df_2014 = revenue_by_year_df[revenue_by_year_df["year"] == 2014]
# revenue_by_year_df_2015 = revenue_by_year_df[revenue_by_year_df["year"] == 2015]

In [506]:
# fig = px.line(revenue_by_year_df_2011_2015, x="year", y=revenue_by_year_df.columns)
# fig.show()

In [507]:
# rows, cols = 1, 5

# fig = make_subplots(
#     rows=rows,
#     cols=cols,
#     subplot_titles=[
#         f"Revenue in {i}" for i in range(2011, 2016)
#     ],  # Auto-generate titles
#     specs=[
#         [{"type": "pie"} for _ in range(cols)] for _ in range(rows)
#     ],  # Generate pie types
# )

# count = 0
# for year in range(2011, 2016):
#     df = revenue_by_year_df.drop(columns="year")[
#         revenue_by_year_df["year"] == year
#     ].melt(var_name="state", value_name="revenue")

#     fig.add_trace(
#         go.Pie(
#             labels=df["state"],  # Categories
#             values=df["revenue"],  # Corresponding values
#             domain=dict(x=[0, 1], y=[0, 1]),  # Expands pie size
#         ),
#         row=1,
#         col=1 + count,
#     )
#     count += 1

# fig.update_layout(
#     showlegend=True,  # Hide legends for a cleaner look
#     margin=dict(l=10, r=10, t=50, b=10),  # Reduce margins
# )

# fig.show()

In [508]:
# revenue_by_year_df_2011_2015

In [509]:
# # Create a copy of the original DataFrame with only the 'year' column
# revenue_by_year_df_2011_2015_percentage = revenue_by_year_df_2011_2015[["year"]].copy()

# # Compute total revenue for each year
# total_revenue = revenue_by_year_df_2011_2015.iloc[:, 1:].sum(axis=1)

# # Calculate percentage for each state efficiently
# state_columns = revenue_by_year_df_2011_2015.columns[1:]
# revenue_by_year_df_2011_2015_percentage[state_columns + "_percentage"] = (
#     revenue_by_year_df_2011_2015[state_columns].div(total_revenue, axis=0) * 100
# )

# # Display the result
# revenue_by_year_df_2011_2015_percentage

In [510]:
# revenue_by_year_df_2011_2015_weight = revenue_by_year_df_2011_2015_percentage.drop(
#     columns="year"
# ).sum(axis=0)
# total_revenue = revenue_by_year_df_2011_2015_weight.sum()
# revenue_by_year_df_2011_2015_weight = (
#     revenue_by_year_df_2011_2015_weight / total_revenue
# ).to_numpy()
# revenue_by_year_df_2011_2015_weight

### Evaluate models

#### Metric: Mean Absolute Scaled Error

In [511]:
# evaluate_result(result_list=result_list, test_data_frame=test_df, metric=Metric.MASE)

#### Metric: Root Mean Squared Scaled Error

In [512]:
# Appends one RMSSE MetricResult per model to each Result, in place.
evaluate_result(result_list=result_list, test_data_frame=test_df, metric=Metric.RMSSE)

#### Metric: Weighted Root Mean Squared Scaled Error

In [513]:
# evaluate_result(
#     result_list=result_list,
#     test_data_frame=test_df,
#     metric=Metric.WMSSE,
#     weight=revenue_by_year_df_2011_2015_weight,
# )

### Summarize models

In [514]:
# Flatten all metric results into one table; summarize_result() also writes
# it to result_summary.csv.
summary_result = summarize_result(result_list)
summary_result

Unnamed: 0,id,author,model,state,metric,value
0,Trung_CA_1_model_name,Trung,CA_1_model_name,CA_1,RMSSE,2.5910163078296886
1,Trung_CA_2_model_name,Trung,CA_2_model_name,CA_2,RMSSE,2.769451424384504
2,Trung_CA_3_model_name,Trung,CA_3_model_name,CA_3,RMSSE,3.530289405571649
3,Trung_CA_4_model_name,Trung,CA_4_model_name,CA_4,RMSSE,5.510639465602722


### Visualize models

In [515]:
# Plot the top_result (default 5) rows selected by horizontal_bar_chart().
horizontal_bar_chart(summary_result)