In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import xgboost as xgb
import warnings
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from typing import Tuple, List
import os
import shap
import copy

# Silence every warning category for the rest of the session.
# NOTE(review): this is a blanket filter, so library deprecation and
# chained-assignment warnings are hidden as well — consider narrowing it.
warnings.filterwarnings(action="ignore")

# Hex colour codes shared by the plots (plot_result uses GRAY, GREEN and ORANGE).
GRAY = "#595959"
GREEN = "#42B049"
PURPLE = "#6A3289"
ORANGE = "#F2620F"

In [None]:
# Before the first run, set these values to match your own files.
# This block only needs to be executed once.
# When the dataset is large, reading origin_df takes a long time.
# If the dataset is very large, close Chrome and other programs before running
# (confirmed to work with files up to 8.9GB).
SALE_FILE_NAME="FULL(0806).csv"
# NOTE(review): "PROMOTINO" looks like a typo for "PROMOTION"; the name is kept
# as-is because other cells may reference it.
PROMOTINO_FILE_NAME="promotion_[ITEM_CD, YMD_CD]_left_join.csv"
STOCK_HIGH_FILE_NAME="y_high.csv"
# Date columns are parsed to datetime while reading.
origin_df = pd.read_csv(SALE_FILE_NAME, parse_dates=["YMD_CD"])
promotion_df = pd.read_csv(PROMOTINO_FILE_NAME, parse_dates=["YMD_CD"])
y_high = pd.read_csv(STOCK_HIGH_FILE_NAME, parse_dates=['ORD_YMD'])

In [None]:
def custom_gradient(predt: np.ndarray, dtrain: "xgb.DMatrix", alpha: float, const: float) -> np.ndarray:
    """First derivative of the asymmetric loss with respect to the prediction.

    The loss being differentiated is, per sample,

        const * (predt - y) ** 2    when predt <  y  (under-prediction)
        alpha * |predt - y|         when predt >= y  (over-prediction)

    Args:
        predt: Current predictions, one per training row.
        dtrain: Object exposing ``get_label()`` (an ``xgb.DMatrix`` in practice).
        alpha: Slope of the linear over-prediction penalty.
        const: Scale of the quadratic under-prediction penalty.

    Returns:
        Array of per-sample gradients, negative for under-predictions so that
        boosting raises them.
    """
    y = dtrain.get_label()
    residual = predt - y
    # The previous version returned the loss value itself here, which is never
    # negative and therefore could never push an under-prediction upwards.
    grad = np.where(predt < y, 2.0 * const * residual, alpha * np.sign(residual))
    return grad


def custom_hessian(predt: np.ndarray, dtrain: "xgb.DMatrix", alpha: float, const: float) -> np.ndarray:
    """Second derivative (curvature) matching ``custom_gradient``.

    Args:
        predt: Current predictions, one per training row.
        dtrain: Object exposing ``get_label()`` (an ``xgb.DMatrix`` in practice).
        alpha: Curvature stand-in used on the over-prediction branch.
        const: Scale of the quadratic under-prediction penalty.

    Returns:
        Array of per-sample Hessian values.
    """
    y = dtrain.get_label()
    # Quadratic branch: d2/dp2 [const * (p - y)^2] = 2 * const.
    # Linear branch: the true second derivative is 0; ``alpha`` is kept as the
    # constant stand-in so the Newton step stays bounded.
    # NOTE(review): both values should be positive for XGBoost's leaf-weight
    # formula to behave; confirm the ranges of alpha and const supplied by callers.
    hess = np.where(predt < y, 2.0 * const, alpha)
    return hess


def custom_loss(predt: np.ndarray,
                dtrain: "xgb.DMatrix",
                alpha: float = 1.0,
                const: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
    """Custom XGBoost objective penalising under- and over-prediction differently.

    Args:
        predt: Current predictions, one per training row.
        dtrain: Training matrix providing the labels through ``get_label()``.
        alpha: Weight of the over-prediction branch.
        const: Weight of the under-prediction branch.

    Returns:
        ``(grad, hess)`` arrays in the form expected from a custom objective.
    """
    grad = custom_gradient(predt, dtrain, alpha, const)
    hess = custom_hessian(predt, dtrain, alpha, const)
    return grad, hess

In [None]:
def filtering_df(origin_df: pd.DataFrame, prst_no: int | None, item_cd: int | None) -> pd.DataFrame:
    if prst_no and item_cd:
        return origin_df[(origin_df["PRST_NO"] == prst_no) & (origin_df["ITEM_CD"] == item_cd)]
    elif prst_no:
        return origin_df[origin_df["PRST_NO"] == prst_no]
    elif item_cd:
        return origin_df[origin_df["ITEM_CD"] == item_cd]
    return origin_df

In [None]:
def plot_result(y_test, y_pred, model_name, prst_no, item_cd):
    """Plot actual vs. predicted values with their differences and save the figure.

    Three stacked panels are drawn: the two series, the signed difference
    ``y_test - y_pred`` and the absolute difference. The figure is written to
    ``<YYYYMMDD>/fig/<model_name>/<model_name>_PRST_NO(<prst_no>)_ITEM_CD(<item_cd>).png``
    relative to the working directory.

    Args:
        y_test: Actual values as a Series whose index holds the dates.
        y_pred: Predictions aligned position-by-position with ``y_test``.
        model_name: Used for the output directory and file name.
        prst_no: Store code, used in the file name only.
        item_cd: Item code, used in the file name only.
    """
    # Label each point from its own index entry so the labels always match the
    # data length, even when some dates are missing from the series.
    date_array = pd.DatetimeIndex(y_test.index).strftime("%m-%d").tolist()

    differences = y_test - y_pred
    abs_differences = np.abs(differences)

    # Aim for about ten ticks; the step must be at least 1 because
    # np.arange rejects a zero step when there are fewer than ten points.
    tick_step = max(1, len(date_array) // 10)
    tick_positions = np.arange(0, len(date_array), step=tick_step)

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 18), gridspec_kw={"height_ratios": [2, 1, 1]})

    sns.lineplot(x=date_array, y=y_test, label="Actual Values", marker="o", ax=ax1, color=GRAY)
    sns.lineplot(x=date_array, y=y_pred, label="Predicted Values", marker="o", ax=ax1, color=ORANGE)

    ax1.set_ylabel("Sales")
    ax1.set_title("Actual vs Predicted Sales")
    ax1.set_xticks(tick_positions)
    ax1.legend()

    # Green bars mark days where the actual value exceeded the prediction.
    bar_colors = [GREEN if diff > 0 else GRAY for diff in differences]
    sns.barplot(x=date_array, y=differences, palette=bar_colors, ax=ax2)
    ax2.set_ylabel("Difference")
    ax2.set_title("Differences (y_test - y_pred)")
    ax2.set_xticks(tick_positions)

    sns.barplot(x=date_array, y=abs_differences, color=GRAY, ax=ax3)
    ax3.set_ylabel("Absolute Difference")
    ax3.set_title("Absolute Differences |y_test - y_pred|")
    ax3.set_xticks(tick_positions)

    plt.tight_layout()
    dir_path = os.path.join(datetime.today().strftime("%Y%m%d"), "fig", model_name)
    os.makedirs(dir_path, exist_ok=True)
    fig.savefig(f"{dir_path}/{model_name}_PRST_NO({prst_no})_ITEM_CD({item_cd}).png")

In [None]:
def plot_xgb(origin_df: pd.DataFrame, prst_no: int, item_cds: List[int], **kwargs) -> Tuple[xgb.Booster | None, dict]:
    """Train one XGBoost model per item and collect its test-period predictions.

    For each item the rows are split by ``YMD_CD``: the training year before the
    validation month is used for fitting, the rest of that year for early
    stopping, and the test year for prediction. SHAP values on the training
    matrix are plotted and printed for every item.

    Args:
        origin_df: Sales frame for one store. The columns listed in
            ``categorical_features`` are converted to ``category`` in place.
        prst_no: Store code; recorded in the results and used in figure names.
        item_cds: Item codes to model.
        **kwargs: Optional settings:
            categorical_features (list[str]): columns to cast to category.
            params (dict): XGBoost parameters. A callable under the key
                ``"obj"`` is passed to ``xgb.train`` as the custom objective.
            process_fig (bool, default True): save a figure via ``plot_result``.
            train_year (int, default 2023), test_year (int, default 2024),
            validation_start_month (int, default 10): date split boundaries.

    Returns:
        ``(bst, model_results)`` where ``bst`` is the booster of the last item
        (``None`` when ``item_cds`` is empty) and ``model_results`` has the keys
        ``"prst_no"``, ``"y_true"`` and ``"y_pred"``, the latter two keyed by item code.
    """
    options = kwargs
    categorical_features = options.get("categorical_features", [])
    if categorical_features:
        origin_df[categorical_features] = origin_df[categorical_features].astype("category")

    train_year = options.get("train_year", 2023)
    test_year = options.get("test_year", 2024)
    validation_start_month = options.get("validation_start_month", 10)
    non_feature_columns = ["SALE_QTY", "YMD_CD", "PRST_NO", "ITEM_CD", "SMCL_CD"]

    # Copy so that per-item adjustments never leak into the caller's dict.
    base_params = dict(options.get("params", {"objective": "reg:quantileerror", "tree_method": "hist", "seed": 42}))

    weight_df = None  # weights.csv is read at most once, and only when needed
    bst = None
    model_results = {
        "prst_no": prst_no,
        "y_true": {},
        "y_pred": {}}

    for item_cd in item_cds:
        item_df = origin_df[origin_df["ITEM_CD"] == item_cd]
        # assign() returns a new frame, so no values are written into a slice.
        item_df = item_df.assign(MONTH=item_df["YMD_CD"].dt.month, DAY=item_df["YMD_CD"].dt.day)
        year = item_df["YMD_CD"].dt.year
        in_train_year = year == train_year
        train_df = item_df[in_train_year & (item_df["MONTH"] < validation_start_month)]
        validation_df = item_df[in_train_year & (item_df["MONTH"] >= validation_start_month)]
        test_df = item_df[year == test_year]

        X_train = train_df.drop(columns=non_feature_columns)
        X_validation = validation_df.drop(columns=non_feature_columns)
        y_train = train_df["SALE_QTY"]
        y_validation = validation_df["SALE_QTY"]
        feature_names = X_train.columns.tolist()
        dtrain = xgb.DMatrix(X_train, label=y_train, enable_categorical=True, feature_names=feature_names)
        dvalidation = xgb.DMatrix(X_validation, label=y_validation, enable_categorical=True, feature_names=feature_names)

        params = dict(base_params)
        # A Python objective has to go through xgb.train's ``obj`` argument;
        # left inside ``params`` it is not used as the objective.
        objective_fn = params.pop("obj", None)
        if objective_fn is custom_loss and params.get("objective") is None:
            if weight_df is None:
                weight_df = pd.read_csv("weights.csv")
            weight = weight_df.loc[weight_df["ITEM_CD"] == item_cd, "weight"].unique()
            item_alpha = weight[0] - 1 if weight.size != 0 else 0
            item_const = np.mean(y_train)

            # Bind the per-item weights to the objective instead of writing them
            # into ``params``, where "alpha" is XGBoost's L1 regularisation term.
            # NOTE(review): this makes the custom objective take effect for the
            # first time; check custom_gradient/custom_hessian numerically
            # before trusting the results.
            def objective_fn(predt, dmatrix, _alpha=item_alpha, _const=item_const):
                return custom_loss(predt, dmatrix, _alpha, _const)

        bst = xgb.train(
            params,
            dtrain,
            num_boost_round=100,
            evals=[(dtrain, 'train'), (dvalidation, 'eval')],
            obj=objective_fn,
            early_stopping_rounds=10,
        )

        X_test = test_df.drop(columns=non_feature_columns)
        y_test = test_df["SALE_QTY"]

        dtest = xgb.DMatrix(X_test, label=y_test, enable_categorical=True, feature_names=feature_names)
        y_pred = bst.predict(dtest)

        explainer = shap.TreeExplainer(bst)
        explanation = explainer(dtrain)
        explanation.feature_names = feature_names
        train_y_pred = bst.predict(dtrain)
        shap_values = explanation.values
        # Largest gap between summed SHAP values plus base value and the prediction.
        print(f"SHAP: {np.abs(shap_values.sum(axis=1) + explanation.base_values - train_y_pred).max()}")
        shap.plots.beeswarm(copy.deepcopy(explanation), order=explanation.abs.max(0))
        plt.title(f"{item_cd}")
        shap_interaction_values = explainer.shap_interaction_values(dtrain)
        print(f"SHAP interaction values: {shap_interaction_values[0]}")

        model_results["y_true"][item_cd] = y_test.to_numpy()
        model_results["y_pred"][item_cd] = y_pred

        if options.get("process_fig", True):
            # plot_result reads the dates from the Series index.
            y_test_by_date = test_df.set_index("YMD_CD")["SALE_QTY"]
            plot_result(y_test_by_date, y_pred, bst.__class__.__name__, prst_no, item_cd)

    return bst, model_results

In [None]:
def make_prediction_dataframe(prediction_results, start_date="2024-01-01"):
    """Flatten per-item prediction results into one long DataFrame.

    Each item's values are dated consecutively, one day per value, starting at
    ``start_date``. Dates are generated per item, so items may have different
    numbers of values.

    Args:
        prediction_results: Dict with ``"prst_no"`` plus ``"y_true"`` and
            ``"y_pred"`` mappings from item code to a sequence of values.
        start_date: First date as ``"YYYY-MM-DD"`` text or a ``datetime``.

    Returns:
        DataFrame with columns ``PRST_NO``, ``ITEM_CD``, ``y_true``, ``y_pred``
        and ``YMD_CD``; empty (columns only) when there are no items.
    """
    if isinstance(start_date, str):
        first_day = datetime.strptime(start_date, "%Y-%m-%d")
    else:
        first_day = start_date

    prst_no = prediction_results.get('prst_no')
    rows = []
    for item_cd, y_tests in prediction_results['y_true'].items():
        y_preds = prediction_results['y_pred'][item_cd]
        # Restart the day counter for every item so each row gets its own date,
        # whatever the length of the other items.
        for day_offset, (test, pred) in enumerate(zip(y_tests, y_preds)):
            rows.append({
                'PRST_NO': prst_no,
                'ITEM_CD': item_cd,
                'y_true': test,
                'y_pred': pred,
                'YMD_CD': first_day + timedelta(days=day_offset),
            })

    return pd.DataFrame(rows, columns=['PRST_NO', 'ITEM_CD', 'y_true', 'y_pred', 'YMD_CD'])

In [None]:
# Driver: for every store code in origin_df, prepare that store's data and train
# per-item models through plot_xgb.
# 점포코드들 = the distinct store codes (PRST_NO values).
점포코드들 = origin_df['PRST_NO'].unique()
# NOTE(review): `alpha` is not read anywhere in the lines visible here.
alpha=0.8
for idx in range(len(점포코드들)):
    # 점포코드 = the store code handled in this iteration.
    점포코드 = 점포코드들[idx]
    # Only edit this part, then re-run from here down.
    param_dict = {
            "prst_no": 점포코드,
            "item_cd": None,
            "categorical_features": [
            "ITEM_CD",
            "SMCL_CD",
            "holidays",
            "weekdays",
            "before_holidays",
            "mon",
            "thu",
            "fri",
            "tue",
            "wed",
            "sat",
            "sun",
        ],
    }

    # Keep this store's rows and attach promotion columns by item and date.
    # fillna(0) replaces every NaN in the merged frame, not only those produced
    # by rows without a promotion match.
    filtered_df = filtering_df(origin_df, param_dict['prst_no'], param_dict['item_cd'])
    filtered_df = pd.merge(filtered_df, promotion_df, on=["ITEM_CD", "YMD_CD"], how="left").fillna(0)

    # Drop items whose SALE_QTY sums to zero for the store.
    result = filtered_df.groupby(by=['PRST_NO', 'ITEM_CD']).agg({'SALE_QTY': 'sum'})
    non_zero_item_cd = result[result['SALE_QTY'] != 0].index.get_level_values('ITEM_CD').unique()
    filtered_df = filtered_df[filtered_df['ITEM_CD'].isin(non_zero_item_cd)]

    # NOTE(review): PRST_NOS is not read anywhere in the lines visible here.
    PRST_NOS = filtered_df['PRST_NO'].unique()
    prst_no = param_dict['prst_no']
    # Copy so the in-place scaling below does not write into a slice of filtered_df.
    input_df = filtered_df[filtered_df['PRST_NO'] == prst_no].copy()
    scaler_dict = {}
    item_cd = sorted(input_df['ITEM_CD'].unique())

    # Scale SALE_QTY to [0, 1] separately per item, keeping each fitted scaler
    # in scaler_dict keyed by item code.
    # NOTE(review): each scaler is fitted on all of the item's rows, including
    # the rows plot_xgb later holds out for validation and testing.
    for cd in item_cd:
        scaler = MinMaxScaler()
        mask = input_df['ITEM_CD'] == cd
        input_df.loc[mask, 'SALE_QTY'] = scaler.fit_transform(input_df.loc[mask, 'SALE_QTY'].values.reshape(-1, 1))
        scaler_dict[cd] = scaler
    print("Training cutom loss xgb model")
    # `model` and `custom_results` are rebound on every iteration, so after the
    # loop they refer to the last store processed.
    # NOTE(review): plot_xgb as defined in this notebook does not read `show_fig`.
    model, custom_results = plot_xgb(
        origin_df=input_df,
        prst_no=prst_no,
        item_cds=item_cd,
        show_fig=False,
        process_fig=False,
        params={
            "obj": custom_loss,
            "tree_method": "hist",
            "seed": 42,
        },
        categorical_features=param_dict["categorical_features"]
    )