#### 사용 예시

In [4]:
import pandas as pd

df = pd.read_pickle('data.pkl')
car = pd.read_pickle('car_data.pkl')

In [5]:
# 차수 예측
from predict_model.round_model.onbid_map_round_predict import RoundPredictor
# 자동차 가격 예측
from predict_model.price_model.car_price_model.onbid_map_carp_predict import CarPricePredictor
# 자동차 외 가격 예측
from predict_model.price_model.etc_price_model.onbid_map_etcp_predict import EtcPricePredictor
# 확률 예측
from predict_model.probability_model.onbid_map_prob_predict import ProbPredictor

# 모델 개체 불러오기
prob_model = ProbPredictor()
round_model = RoundPredictor()
car1 = CarPricePredictor(1)
car2 = CarPricePredictor(2)
car3 = CarPricePredictor(3)
car4 = CarPricePredictor(4)
car5 = CarPricePredictor(5)
etc1 = EtcPricePredictor(1)
etc2 = EtcPricePredictor(2)
etc3 = EtcPricePredictor(3)
etc4 = EtcPricePredictor(4)
etc5 = EtcPricePredictor(5)

In [None]:
# 예측 로직
temp_df = None # 데이터 불러오기

# 기본 전처리
temp_df = preprocessor(temp_df)

# 자동차 전처리
if temp_df['대분류'] == '자동차':
    temp_df = car_preprocessor(temp_df)
else:
    pass

# 차수 예측
temp_round = round_model.predict(temp_df)[0]
temp_round = max(temp_round, temp_df[0, '차수'])

# 가격 예측
if temp_df["대분류"].iloc[0] == "자동차":
    if temp_round == 1:
        temp_price = car1.predict(temp_df)
    elif temp_round == 2:
        temp_price = car2.predict(temp_df)
    elif temp_round == 3:
        temp_price = car3.predict(temp_df)
    elif temp_round == 4:
        temp_price = car4.predict(temp_df)
    else:  # temp_round == 5 이상인 경우
        temp_price = car5.predict(temp_df)
else:
    if temp_round == 1:
        temp_price = etc1.predict(temp_df)
    elif temp_round == 2:
        temp_price = etc2.predict(temp_df)
    elif temp_round == 3:
        temp_price = etc3.predict(temp_df)
    elif temp_round == 4:
        temp_price = etc4.predict(temp_df)
    else:  # temp_round == 5 이상인 경우
        temp_price = etc5.predict(temp_df)

In [9]:
# 예측 예시
rpmodel = RoundPredictor()
print(rpmodel.predict(df.iloc[[7]]))
rpmodel = CarPricePredictor(5)
print(rpmodel.predict(car.iloc[[7]]))
rpmodel = EtcPricePredictor(5)
print(rpmodel.predict(df.iloc[[7]]))
sample_df = pd.DataFrame([{"대분류": "유가증권", "중분류": "유가증권(주식, 채권 등)", "낙찰가율_최초최저가기준": 120.0}])
print(prob_model.predict(sample_df))

[1]
71    148.283524
Name: predicted_낙찰가율_최초최저가기준, dtype: float32
7    55.747303
Name: predicted_낙찰가율_최초최저가기준, dtype: float32
0    0.782984
Name: prob_낙찰가율_최초최저가기준, dtype: float64


#### 하이퍼파라미터 튜닝

In [None]:
import warnings
import logging
import optuna

# 1) FutureWarning 무시
warnings.filterwarnings("ignore", category=FutureWarning, message=".*'squared' is deprecated.*")

# 2) Optuna 로그 레벨을 ERROR로 설정하여 정보 메시지 숨기기
optuna.logging.set_verbosity(optuna.logging.ERROR)
logging.getLogger("optuna").setLevel(logging.ERROR)

import os
import shutil
import stat
import pandas as pd
import joblib

from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import xgboost as xgb
import optuna

from optuna.samplers import TPESampler

# Optuna 탐색 횟수
N_TRIALS = 1000

# 결과를 저장할 최상위 폴더
OUTPUT_ROOT = "output_optuna"

# 차수 리스트
ORDERS = [1, 2, 3, 4, 5]

# 필수 컬럼 목록
REQUIRED_COLS = [
    "대분류", "중분류", "기관",
    "최초입찰시기",
    "1차최저입찰가", "2차최저입찰가", "3차최저입찰가",
    "4차최저입찰가", "5차최저입찰가",
    "낙찰가율_최초최저가기준", "낙찰차수"
]


# '자동차' 대분류 행 제거(필요한 경우)
if "대분류" in df.columns:
    df = df[~(df["대분류"] == "자동차")].reset_index(drop=True)

# '낙찰차수' 정수형 변환 + 5 이상은 5로 통일
df["낙찰차수"] = df["낙찰차수"].astype(int).apply(lambda x: x if x < 5 else 5)

# 필수 컬럼이 모두 있는지 확인
missing = [col for col in REQUIRED_COLS if col not in df.columns]
if missing:
    raise ValueError(f"필수 컬럼 누락: {missing}")

print(f"데이터 로드 완료: 총 {len(df)}개 행")

# %% [markdown]
# ## 2. Optuna Objective 함수 정의
# 각 차수별로 훈련/검증 데이터에 대해 RMSE를 최소화하도록 목적 함수를 설정합니다.  
# - `trial.suggest_int`나 `trial.suggest_float`를 사용해 탐색할 파라미터 범위를 지정합니다.  
# - `train_test_split`으로 80/20 비율로 나눈 뒤, 검증 RMSE를 반환하도록 합니다.

# %%
def create_objective(order: int, X: pd.DataFrame, y: pd.Series):
    """
    차수(order)에 대해 Optuna 목적 함수를 반환합니다.
    X: 피처 데이터프레임 (필요 시 '최초입찰시기' 컬럼은 int64로 변환되어 있어야 함)
    y: 타깃 시리즈
    """
    def objective(trial):
        # 하이퍼파라미터 서치 범위 설정
        params = {
            "max_depth": trial.suggest_int("max_depth", 3, 10),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "n_estimators": trial.suggest_int("n_estimators", 50, 300, step=50),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "reg_alpha": trial.suggest_float("reg_alpha", 0.0, 1.0),
            "reg_lambda": trial.suggest_float("reg_lambda", 0.0, 1.0),
            "random_state": 42,
            "objective": "reg:squarederror",
        }

        # train/validation 분리 (고정 시드)
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # 파이프라인: OneHotEncoder -> XGBRegressor
        base_cols = ["대분류", "중분류", "기관"]
        preprocessor = ColumnTransformer(
            transformers=[("ohe", OneHotEncoder(handle_unknown="ignore"), base_cols)],
            remainder="passthrough"
        )
        model = xgb.XGBRegressor(**params)

        pipeline = Pipeline([
            ("preprocessor", preprocessor),
            ("regressor", model)
        ])

        # 학습
        pipeline.fit(X_train, y_train)

        # 검증 RMSE 계산
        preds = pipeline.predict(X_val)
        rmse = mean_squared_error(y_val, preds, squared=False)
        return rmse

    return objective

# %% [markdown]
# ## 3. 차수별 Optuna 튜닝 및 모델 학습
# 차수마다:
# 1. 데이터를 필터링  
# 2. 날짜형 컬럼을 int64로 변환  
# 3. Optuna 스터디를 생성하고 `N_TRIALS`만큼 탐색  
# 4. 최적 파라미터로 전체 데이터를 학습  
# 5. 파이프라인을 저장  
# 6. 최적 파라미터와 RMSE를 기록

# %%
# Optuna 스터디 결과를 저장할 딕셔너리
study_results = {}

for order in ORDERS:
    print(f"\n=== 차수 {order} 처리 시작 ===")

    # 1) 차수별 데이터 필터링
    subset = df[df["낙찰차수"] == order].copy().reset_index(drop=True)
    if subset.empty:
        print(f"차수 {order} 데이터가 없습니다. 건너뜁니다.")
        study_results[order] = {"best_params": None, "best_rmse": None}
        continue

    # 2) 피처 및 타깃 설정
    base_cols = ["대분류", "중분류", "기관"]
    date_col = ["최초입찰시기"]
    if order == 1:
        bid_cols = ["1차최저입찰가"]
    elif order == 2:
        bid_cols = ["1차최저입찰가", "2차최저입찰가"]
    elif order == 3:
        bid_cols = ["1차최저입찰가", "2차최저입찰가", "3차최저입찰가"]
    elif order == 4:
        bid_cols = ["1차최저입찰가", "2차최저입찰가", "3차최저입찰가", "4차최저입찰가"]
    else:  # order == 5
        bid_cols = [
            "1차최저입찰가", "2차최저입찰가", "3차최저입찰가",
            "4차최저입찰가", "5차최저입찰가"
        ]

    X = subset[base_cols + date_col + bid_cols].copy()
    y = subset["낙찰가율_최초최저가기준"].copy()

    # 3) '최초입찰시기'를 int64로 변환 (UNIX 타임스탬프)
    X["최초입찰시기"] = X["최초입찰시기"].astype("int64")

    # 4) Optuna 스터디 생성 (TPE Sampler 사용)
    study = optuna.create_study(
        direction="minimize",
        sampler=TPESampler(seed=42)
    )
    objective = create_objective(order, X, y)

    # 5) 최적화 수행
    study.optimize(objective, n_trials=N_TRIALS, show_progress_bar=True)

    best_params = study.best_params
    best_rmse = study.best_value
    print(f"\n차수 {order} 최적 RMSE: {best_rmse:.4f}")
    print("최적 파라미터:")
    for k, v in best_params.items():
        print(f"  {k}: {v}")

    # 6) 전체 데이터를 사용해 최적 파라미터로 재학습
    preprocessor = ColumnTransformer(
        transformers=[("ohe", OneHotEncoder(handle_unknown="ignore"), base_cols)],
        remainder="passthrough"
    )
    best_model = xgb.XGBRegressor(**best_params, objective="reg:squarederror")
    pipeline = Pipeline([
        ("preprocessor", preprocessor),
        ("regressor", best_model)
    ])
    pipeline.fit(X, y)
    print(f"차수 {order} 전체 데이터 학습 완료")

    # 7) 파이프라인 저장
    output_dir = os.path.join(OUTPUT_ROOT, f"order{order}")
    os.makedirs(output_dir, exist_ok=True)
    joblib.dump(pipeline, os.path.join(output_dir, "pipeline_optuna.pkl"))
    print(f"최적 파이프라인 저장: {output_dir}/pipeline_optuna.pkl")

    # 8) 결과 기록
    study_results[order] = {"best_params": best_params, "best_rmse": best_rmse}

# %% [markdown]
# ## 4. 튜닝 결과 요약

# %%
print("\n=== 차수별 Optuna 튜닝 결과 ===")
for order, info in study_results.items():
    print(f"\n차수 {order}:")
    if info["best_params"] is None:
        print("  - 학습 데이터 없음")
    else:
        print(f"  - 최적 RMSE: {info['best_rmse']:.4f}")
        print("  - 최적 파라미터:")
        for k, v in info["best_params"].items():
            print(f"      {k}: {v}")

