<a href="https://colab.research.google.com/github/aeoner000/stackingmodel_notebook/blob/main/kaggle%E6%88%BF%E5%83%B9%E9%A0%90%E6%B8%AC.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1.匯入套件

In [None]:
# 基本套件
import os
import time
import warnings
import traceback
import datetime
# !ls -R /content/drive/MyDrive/機器學習練習檔/kaggle房價預測/project
# 數值運算與資料處理
import numpy as np
import pandas as pd
from scipy.stats import boxcox
from scipy.special import inv_boxcox
from sklearn.impute import KNNImputer
from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.tools.tools import add_constant

# 資料視覺化
import matplotlib.pyplot as plt
import seaborn as sns
import shap

# 類別編碼與特徵處理
import category_encoders as ce
from sklearn.preprocessing import StandardScaler, MinMaxScaler, PowerTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import TransformedTargetRegressor
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA

# 監督式模型
from sklearn.linear_model import (LinearRegression as lr,Ridge as rdg,RidgeCV as rdgcv,Lasso,SGDRegressor as sgd,LogisticRegression as logis)
from sklearn.ensemble import RandomForestRegressor as rf, StackingRegressor as skStack
from sklearn.tree import DecisionTreeRegressor as dt
from sklearn.svm import SVR as svr
from sklearn.neighbors import KNeighborsRegressor as knn
from sklearn.neural_network import MLPRegressor as mlp
from sklearn.kernel_ridge import KernelRidge as kr
import xgboost as xgb
import lightgbm as lgb
import catboost as cb

# 模型評估與交叉驗證
from sklearn.model_selection import train_test_split, cross_val_score, cross_val_predict
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_squared_log_error

# 並行處理與儲存
from joblib import Parallel, delayed, dump, load

# 超參數優化
import optuna
from skopt import gp_minimize
from skopt.space import Real, Integer, Categorical
from skopt.utils import use_named_args

# pip install xgboost lightgbm category_encoders scikit-optimize catboost optuna
warnings.filterwarnings('ignore')


# 2.資料探勘

In [None]:
train_df = pd.read_csv("/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/train.csv")

# df = pd.read_csv("/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/train.csv", keep_default_na=False, na_values=[])
# filepath：CSV 檔案路徑
# keep_default_na：是否保留 pandas 預設的 NA 標記
# na_values：指定哪些值要被視為 NA

#### 直方圖、散佈圖、盒鬚圖 --->多特徵子圖
# =====================================================================================================================================================
def plot_features(data, plot_type='hist', target=None, drop_cols=None, n_col=4, bins=20, kde=True):
    """
    data: pd.DataFrame
    plot_type: 'hist' -> 直方圖, 'scatter' -> 散佈圖(y軸:目標變數), 'box' -> 盒鬚圖
    target: 目標變數名稱，plot_type='scatter' 時必填
    drop_cols: scatter 時要排除的數值欄位 (list)
    n_col: 每列的圖數
    bins: hist 圖的 bins 數
    kde: hist 圖是否畫 KDE
    """

    if plot_type not in ['hist', 'scatter', 'box']:
        raise ValueError("plot_type 只能是 'hist', 'scatter', 'box'")

    if plot_type == 'scatter' and target is None:
        raise ValueError("plot_type='scatter' 時必須提供 target")

    if plot_type == 'scatter' or plot_type == 'box':
        numeric_cols = data.select_dtypes(include=np.number).columns
        if drop_cols:
            numeric_cols = numeric_cols.drop(drop_cols)
        cols_to_plot = numeric_cols
    else:
        cols_to_plot = data.columns

    n_var = len(cols_to_plot) # 變數數量
    n_row = n_var // n_col + (1 if n_var % n_col != 0 else 0 )# 子圖要幾行
    fig, ax = plt.subplots(n_row, n_col, figsize=(n_col*6, n_row*4)) # fig=大圖框，ax=多個小圖框==>2Darray
    axs = ax.flatten() # 攤平成一列，for迴圈比較好做

    for i, col in enumerate(cols_to_plot): # enumerate 會把每個元素包裝成一個 tuple 包含: 1.索引（index）2.元素本身（value）
        if plot_type == 'hist':
            sns.histplot(data[col], ax=axs[i], bins=bins, kde=kde) # axs[i]就是第i個子圖，ax參數若沒填就是自動生成新圖
        elif plot_type == 'scatter':
            sns.scatterplot(data=data, x=col, y=target, ax=axs[i])
        elif plot_type == 'box':
            sns.boxplot(data=data[col], ax=axs[i])
        axs[i].set_title(col)

    for j in range(i+1, len(axs)):
        fig.delaxes(axs[j]) # 刪除上面迴圈最後i的第i+1個到最後一張空白子圖

    plt.tight_layout()
    plt.show()

    if plot_type in ['scatter', 'box']:
        print(f'數值變數數量: {n_var}')
        print(f'數值變數: {list(cols_to_plot)}')

# # 直方圖
# plot_features(train_df, plot_type='hist', bins=30)

# # 散佈圖 y軸:SalePrice
# plot_features(train_df, plot_type='scatter', target='SalePrice', drop_cols=['Id', 'SalePrice'])

# # 盒鬚圖
# plot_features(train_df, plot_type='box', drop_cols=['Id', 'SalePrice'])

#### 熱力圖
# =====================================================================================================================================================
def plot_heatmap(data, numeric_only=True, target=None, annot=True, cmap='coolwarm', figsize=(12,10)):
    """
    繪製數值特徵的相關性熱力圖
    data: DataFrame
    numeric_only: 是否只選數值欄位
    target: 如果指定目標欄位，會畫該欄與其他數值欄的相關性
    annot: 是否在格子上標示數值
    cmap: 顏色地圖
    figsize: 圖片大小
    """
    if numeric_only:
        df = data.select_dtypes(include=np.number)
    else:
        df = data.copy()

    if target and target in df.columns:
        corr = df.corr()[[target]].sort_values(by=target, ascending=False)
    else:
        corr = df.corr()

    n = corr.shape[0]
    size = max(6, 40//n)  # 特徵太多文字縮小
    plt.figure(figsize=figsize)  # figsize 設定圖框長寬
    sns.heatmap(
      corr,               # 要畫的數據 (通常是相關矩陣)
      annot=annot,           # 是否在格子中標註數值
      fmt=".2f",            # 標註的數字格式 (小數點 2 位)
      cmap=cmap,            # 顏色映射 (colormap)，例如 "coolwarm", "viridis"
      cbar=True,            # 是否顯示右邊的顏色條 (colorbar)
      square=True,           # 是否讓每個格子是正方形
      annot_kws={"size": size}    # 控制標註字體大小
    )
    plt.title('Heatmap of Correlations', fontsize=16)
    plt.show()

# 針對所有數值欄位相關性
# plot_heatmap(train_df, numeric_only=True)

# 只看數值欄位與目標 SalePrice 的相關性
# plot_heatmap(train_df, target='SalePrice')

#### VIF值計算
# =====================================================================================================================================================
def vif_cal(data):
  df = data.select_dtypes(include=np.number)
  df = df.dropna()
  # 重要：計算 VIF 前，必須加上常數項（截距）
  X = add_constant(df)

  # 創建一個 DataFrame 來儲存 VIF 值
  vif_df = pd.DataFrame()
  vif_df["feature"] = X.columns

  # 計算 VIF 值並添加到 DataFrame           # i 是第i個特徵
  vif_df["VIF"] = [f'{variance_inflation_factor(X.values, i):.2f}' for i in range(X.shape[1])] # X.shape[1]是col
                        # X.values==>把 DataFrame X 轉成 NumPy 陣列（2D）
  return vif_df

#### SHAP特徵重要性
# =====================================================================================================================================================
def shap_analysis(data, y_name, model=None, Dependence=None):
  data = data.select_dtypes(include=np.number).dropna()
  y = data[y_name]
  X = data.drop(columns=[y_name])

  columns_to_drop = [col for col in ['Id', 'id', 'ID'] if col in X.columns]
  if columns_to_drop:
    X = X.drop(columns=columns_to_drop)
  if Dependence in X.columns:
    Dependence = Dependence
  elif Dependence is None:
    Dependence = X.columns[0]
  else:
    raise ValueError("Dependence not in X.columns")

  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
  if model is None:
    model = xgb()
  model.fit(X_train, y_train)

  # 計算 SHAP 值
  # 1. 建立 explainer 物件
  # 這裡使用 TreeExplainer，因為我們的模型是樹狀模型
  explainer = shap.TreeExplainer(model)

  # 2. 計算測試集上每個預測的 SHAP 值
  # shap_values 是一個包含 SHAP 值的 NumPy 陣列
  shap_values = explainer.shap_values(X_test)

  # SHAP 值視覺化
  # 3. 繪製摘要圖 (Summary Plot)
  # 這個圖可以顯示每個特徵對模型輸出的整體影響
  print("--- SHAP Summary Plot ---")
  shap.summary_plot(shap_values, X_test)

  # 4. 繪製依賴圖 (Dependence Plot)
  # 這個圖可以顯示單一特徵如何影響預測，並顯示特徵之間的交互作用
  print("\n--- SHAP Dependence Plot for bmi ---")
  shap.dependence_plot(Dependence, shap_values, X_test)

  # 5. 繪製單一預測的解釋 (Force Plot)
  # 這裡我們解釋測試集中的第一筆資料 (索引 0)
  print("\n--- SHAP Force Plot for a single prediction ---")
  shap.initjs()
  display(shap.force_plot(explainer.expected_value, shap_values[0,:], X_test.iloc[0,:]))

# shap_analysis(train_df, y_name='SalePrice', Dependence='LotFrontage')


# 3.ETL流程

In [None]:
class ETLPipeline:
    def __init__(self):
        self.save_dir = save_dir
        self.scaler = None
        self.encoder = None
        self.pca = None
        self.imputer = None
        self.y_name = None
        self.X_interaction_list = None
        self.pca_n_components = 20

        # NA為類別而不是缺失值的特徵列表
        self.str_na_col = [
            'Alley', 'BsmtQual', 'BsmtCond', 'BsmtFinType1', 'BsmtFinType2', 'GarageType',
            'GarageFinish','GarageQual', 'GarageCond', 'Fence', 'MiscFeature', 'PoolQC',
            'BsmtExposure', 'FireplaceQu', 'MasVnrType'
        ]
        self.na_per15up = None
        self.idx = None

        # 連續變數
        self.con_var = [
          'LotFrontage', 'LotArea', 'MasVnrArea', 'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF',
          'TotalBsmtSF', '1stFlrSF', '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'GarageCars',
          'GarageArea', 'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch',
          'PoolArea', 'MiscVal', 'TotRmsAbvGrd', 'Fireplaces', 'KitchenAbvGr', 'FullBath',
          'BsmtFullBath', 'BsmtHalfBath', 'BedroomAbvGr', 'HalfBath'
        ]
        # 有序類別變數
        self.ord_var = [
          'OverallQual', 'OverallCond', 'ExterQual', 'ExterCond', 'HeatingQC', 'KitchenQual',
          'FireplaceQu', 'PoolQC', 'Fence', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1',
          'BsmtFinType2', 'Functional', 'LandSlope', 'GarageFinish', 'GarageQual', 'GarageCond'
        ]
        # 無序類別變數
        self.nom_var = [
          'MSSubClass', 'MSZoning', 'Alley', 'LotShape', 'LandContour', 'Utilities',
          'LotConfig', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType', 'HouseStyle',
          'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'Foundation',
          'Heating', 'Electrical', 'GarageType', 'PavedDrive', 'SaleType', 'SaleCondition',
          'MiscFeature'
        ]
        # 直接轉為1、0
        self.one_zero = ['CentralAir', 'Street', 'HasGarage']
        # 時間
        self.time_ = ['YearBuilt', 'YearRemodAdd', 'GarageYrBlt', 'YrSold', 'MoSold']

        self.id_1 = ['Id']
        self.y_var = ['SalePrice']

        #有序編碼
        self.ordinal_mappings = {
            'ExterQual': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1},
            'ExterCond': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1},
            'HeatingQC': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1},
            'KitchenQual': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1},
            'FireplaceQu': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1, 'NO': 0},
            'PoolQC': {'Ex': 4, 'Gd': 3, 'Ta': 2, 'Fa': 1, 'NO': 0},
            'Fence': {'GdPrv': 4, 'MnPrv': 3, 'GdWo': 2, 'MnWw': 1, 'NO': 0},
            'BsmtQual': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1, 'NO': 0},
            'BsmtCond': {'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1, 'NO': 0},
            'BsmtExposure': {'Gd': 4, 'Av': 3, 'Mn': 2, 'No': 1, 'NO': 0},
            'BsmtFinType1': {'GLQ': 6, 'ALQ': 5, 'BLQ': 4, 'Rec': 3, 'LwQ': 2, 'Unf': 1, 'NO': 0},
            'BsmtFinType2': {'GLQ': 6, 'ALQ': 5, 'BLQ': 4, 'Rec': 3, 'LwQ': 2, 'Unf': 1, 'NO': 0},
            'Functional': {'Typ': 7, 'Min1': 6, 'Min2': 5, 'Mod': 4, 'Maj1': 3, 'Maj2': 2, 'Sev': 1, 'Sal': 0},
            'LandSlope': {'Gtl': 3, 'Mod': 2, 'Sev': 1},
            'GarageFinish': {'Fin': 3, 'RFn': 2, 'Unf': 1, 'NO': 0},
            'GarageQual': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1, 'NO': 0},
            'GarageCond': {'Ex': 5, 'Gd': 4, 'TA': 3, 'Fa': 2, 'Po': 1, 'NO': 0}
        }
    def _preprocess(self, df: pd.DataFrame):

        # 1. 挑出NA為類別而不是缺失值的欄，並替換NA為NO
        df[self.str_na_col] = df[self.str_na_col].fillna('NO')

        # 2. 新增一欄特徵為有無車庫
        df['HasGarage'] = df['GarageYrBlt'].notna().astype(int)

        # 2.1 將無車庫NA的部分設為-1，避免被認為是缺失值
        df['GarageYrBlt'] = df['GarageYrBlt'].fillna(-1)

        # 3. 丟棄NA值數量大於總樣本15%的特徵
        count_na_2 = df.isna().sum()
        total_sample = len(df)
        self.na_per15up = count_na_2[count_na_2 > total_sample * 0.15].index
        df = df.drop(columns=self.na_per15up)

        # 4. 刪除y中有缺失值的一整列
        if 'SalePrice' in df.columns:
          df = df.dropna(subset=['SalePrice'])

        # 5. 映射有序類別變數為數值
        for col, mapping in self.ordinal_mappings.items():
          df[col] = df[col].map(mapping)

        # 6. 將二元類別依順序轉為1、0
        for col in self.one_zero:
          if col == 'CentralAir':
            df[col] = df[col].map({'Y': 1, 'N': 0})
          else:
            df[col] = pd.factorize(df[col])[0] # 將二元類別依順序轉為1、0

        # 7. 將落成或改建日期與買賣日期整合為屋齡
        df['houseage'] = df['YrSold'] - df['YearBuilt']
        df['remodelage'] = df['YrSold'] - df['YearRemodAdd']

        # 8. 剔除日期特徵
        df = df.drop(columns=self.time_)

        return df

    def serch_interaction_feature(self, X, y=None, top_n=5):

        base_features = X.columns.tolist()
        # print('原始特徵數: ', len(base_features)+1)

        # 分割數據
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # 建立 LightGBM 模型
        train_data = lgb.Dataset(X_train, label=y_train)
        test_data = lgb.Dataset(X_test, label=y_test, reference=train_data)
        params = {"objective": "regression", "metric": "rmse", "boosting": "gbdt", "num_leaves": 31, "learning_rate": 0.05, "feature_fraction": 0.9, "verbose": -1}
        model = lgb.train(params,train_data, valid_sets=[test_data], num_boost_round=100, callbacks=[lgb.early_stopping(10), lgb.log_evaluation(0)])

        # 建立 SHAP explainer
        explainer = shap.TreeExplainer(model)
        shap_interaction = explainer.shap_interaction_values(X_test)
        if isinstance(shap_interaction, list):
            shap_interaction = shap_interaction[0]  # 取單目標 array
        # 計算交互作用強度
        interaction_strength = np.abs(shap_interaction).mean(axis=0)

        n_features = X_test.shape[1]

        # 生成所有可能的特徵對並計算交互強度
        pairs = []
        cols = X_test.columns.tolist()

        for i in range(n_features):
            for j in range(i+1, n_features):  # 只取上三角，避免重複
                feature1, feature2 = cols[i], cols[j]
                # 確保至少有一個特徵是連續變數
                if feature1 in self.con_var or feature2 in self.con_var:
                    # 確保兩個特徵都在原始特徵中
                    if feature1 in base_features and feature2 in base_features:
                        pairs.append(((feature1, feature2), interaction_strength[i, j]))

        # 按交互強度排序，取前n個
        top_interactions = sorted(pairs, key=lambda x: x[1], reverse=True)[:top_n]

        # 生成交互項特徵
        print("生成的二階交互項特徵：")
        print("-" * 50)

        X_interaction_list = []
        for idx, ((f1, f2), score) in enumerate(top_interactions, 1):
            X_interaction_list.append((f1, f2))
            print(f"{idx:2d}. {f1} × {f2} → (強度: {score:.4f})")

        # print(f"\n總共生成了 {len(top_interactions)} 個交互項特徵")
        # print('增加交互向後特徵數量: ', len(X.columns)+top_n)
        return X_interaction_list

    def add_interaction_feature(self, X, X_interaction_list):
        interaction_df = pd.DataFrame()
        for f1, f2 in X_interaction_list:
            interaction_df[f"interx{f1}_{f2}"] = X[f1] * X[f2]
        return interaction_df

    def fit(self, df: pd.DataFrame, y_name="SalePrice"):
        df = df.copy()
        df.set_index('Id', inplace=True)
        self.y_name = y_name
        # 1. 執行清理
        df = self._preprocess(df)

        # 2. 分開 y
        self.y = df[y_name]
        self.X = df.drop(columns=[y_name])

        # 3. 無序類別變數轉換(LeaveOneOutEncoder(LOO)編碼器)
        self.encoder = ce.LeaveOneOutEncoder(cols=self.nom_var, sigma=0.01)
        self.encoder.fit(self.X[self.nom_var], self.y)
        X_cat = self.encoder.transform(self.X[self.nom_var])

        # 3.1 將原始值替換
        X_temp = self.X.copy()
        X_temp[self.nom_var] = X_cat

        # 4. 數值標準化
        self.scaler = StandardScaler()
        self.scaler.fit(X_temp)
        X_scaled = self.scaler.transform(X_temp)

        # 5. 填補缺失值(knn插值)
        self.imputer = KNNImputer(n_neighbors=10, weights='distance')
        self.imputer.fit(X_scaled)
        X_imputed = self.imputer.transform(X_scaled)
        X_invers = self.scaler.inverse_transform(X_imputed)
        X_invers_imputed_df = pd.DataFrame(X_invers, columns=self.X.columns, index=self.X.index)

        # 6. 計算二階交互項前5重要特徵
        self.X_interaction_list = self.serch_interaction_feature(X_invers_imputed_df, self.y, top_n=5)

        # 7. 加入原始資料PCA後特徵，加入20項
        self.pca = PCA(n_components=self.pca_n_components)
        self.pca.fit(X_imputed)

        return self

    def transform(self, df: pd.DataFrame):
        df = df.copy()
        if 'Id' in df.columns:
          df.set_index('Id', inplace=True)
        print('原始特徵數: ', len(df.columns))
        df = self._preprocess(df)
        print('因缺失超過15%刪除特徵為: ', self.na_per15up)

        X = df.drop(columns=[self.y_name], errors='ignore')

        # 1. 類別編碼
        X[self.nom_var] = self.encoder.transform(X[self.nom_var])

        # 2. 標準化
        X_scaled = self.scaler.transform(X)

        # 3. 填補缺失值
        X_imputed = self.imputer.transform(X_scaled)
        X_imputed_inver = self.scaler.inverse_transform(X_imputed)

        X_imputed_inver_df = pd.DataFrame(X_imputed_inver, columns=X.columns, index=X.index)
        class_var = self.ord_var + self.one_zero
        X_imputed_inver_df[class_var] = X_imputed_inver_df[class_var].round(0).astype(int)

        # 4. 加入交互特徵
        X_interaction_df = self.add_interaction_feature(X_imputed_inver_df, self.X_interaction_list)
        print(f'增加交互項特徵數量: ', len(self.X_interaction_list))

        # 5. 加入PCA特徵
        X_pca = self.pca.transform(X_imputed)
        pca_df = pd.DataFrame(X_pca, columns=[f"pca_{i}" for i in range(X_pca.shape[1])], index=X.index)
        print(f'增加PCA特徵數量: ', len(pca_df.columns))

        # 6. 合併
        X_all = pd.concat([X_imputed_inver_df, X_interaction_df, pca_df], axis=1)

        if self.y_name in df.columns:
            # 對齊 y 的 index，只保留 X_all 中有的 index
            y_aligned = df[self.y_name].reindex(X_all.index)
            # 合併
            X_all[self.y_name] = y_aligned
            # 刪除 y 為 NaN 的列
            X_all = X_all.dropna(subset=[self.y_name])


        return X_all

    def fit_transform(self, df: pd.DataFrame, y_name="SalePrice", pca_n_components=20):
        self.pca_n_components = pca_n_components
        self.fit(df, y_name=y_name)
        return self.transform(df)

    def save(self, save_dir='save_model'):
        # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # file_path = f"{save_dir}/etl_pipeline_{timestamp}.joblib"
        file_path = f"{save_dir}/etl_pipeline_RMSLE.joblib"
        dump(self, file_path)
        print(f"ETLPipeline 已儲存到 {file_path}")

    @classmethod
    def load(cls, file_path):
        """載入已存的 ETLPipeline"""
        obj = load(file_path)
        print(f"ETLPipeline 已從 {file_path} 載入")
        return obj


# save_dir = '/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/project/saved_objects'
# train_df = pd.read_csv("/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/train.csv")
# etl_pipeline = ETLPipeline(save_dir=save_dir)
# train_df = etl_pipeline.fit_transform(train_df, pca_n_components=20)
# train_df.head(5)

# 4.偏態處理(log)

In [None]:

class SkeTransX(BaseEstimator, TransformerMixin):
    """
    特徵 (X) 轉換器，可選 log/box-cox/yeo-johnson 和標準化方式。
    scale_method: 'standard', 'minmax', 或 None
    method: 'log', 'box-cox', 'yeo-johnson', 或 None
    log_cols_index: 指定列索引進行偏態處理，None 表示所有列
    scale_cols_index: 指定列索引進行標準化，None 表示所有列
    """
    def __init__(self, method='yeo-johnson', scale=True, scale_method='standard', log_cols_index=None, scale_cols_index=None):
        self.method = method
        self.scale = scale
        self.scale_method = scale_method
        self.log_cols_index = log_cols_index
        self.scale_cols_index = scale_cols_index
        self.pt = None
        self.scaler = None

    def fit(self, X, y=None):
        X_copy = np.array(X, dtype=float)

        # 偏態處理
        if self.method is not None:
            if self.method == 'log':
                if self.log_cols_index is not None:
                    X_copy[:, self.log_cols_index] = np.log1p(X_copy[:, self.log_cols_index])
                else:
                    X_copy = np.log1p(X_copy)
            else:
                pt_method = 'box-cox' if self.method == 'box-cox' else 'yeo-johnson'
                self.pt = PowerTransformer(method=pt_method, standardize=False)

                if self.log_cols_index is not None:
                    self.pt.fit(X_copy[:, self.log_cols_index])
                    X_copy[:, self.log_cols_index] = self.pt.transform(X_copy[:, self.log_cols_index])
                else:
                    self.pt.fit(X_copy)
                    X_copy = self.pt.transform(X_copy)

        # 標準化處理
        if self.scale:
            if self.scale_method == 'standard':
                self.scaler = StandardScaler()
            elif self.scale_method == 'minmax':
                self.scaler = MinMaxScaler()
            else:
                self.scaler = None

            if self.scaler is not None:
                if self.scale_cols_index is not None:
                    self.scaler.fit(X_copy[:, self.scale_cols_index])
                else:
                    self.scaler.fit(X_copy)

        return self

    def transform(self, X):
        X_copy = np.array(X, dtype=float)

        # 偏態處理
        if self.method is not None:
            if self.method == 'log':
                if self.log_cols_index is not None:
                    X_copy[:, self.log_cols_index] = np.log1p(X_copy[:, self.log_cols_index])
                else:
                    X_copy = np.log1p(X_copy)
            else:
                if self.log_cols_index is not None:
                    X_copy[:, self.log_cols_index] = self.pt.transform(X_copy[:, self.log_cols_index])
                else:
                    X_copy = self.pt.transform(X_copy)

        # 標準化處理
        if self.scale and self.scaler is not None:
            if self.scale_cols_index is not None:
                X_copy[:, self.scale_cols_index] = self.scaler.transform(X_copy[:, self.scale_cols_index])
            else:
                X_copy = self.scaler.transform(X_copy)

        return X_copy


class SkeTransY(BaseEstimator, TransformerMixin):
    """
    目標變數 (y) 轉換器，可選 log/box-cox/yeo-johnson 和標準化方式。
    """
    def __init__(self, method='yeo-johnson', scale=True, scale_method='standard'):
        self.method = method
        self.scale = scale
        self.scale_method = scale_method
        self.pt = None
        self.scaler = None

    def fit(self, y, *_):
        y_arr = np.asarray(y, dtype=float).reshape(-1, 1)

        if self.method is not None:
            if self.method == 'log':
                y_arr = np.log1p(y_arr)
            else:
                pt_method = 'box-cox' if self.method == 'box-cox' else 'yeo-johnson'
                self.pt = PowerTransformer(method=pt_method, standardize=False)
                self.pt.fit(y_arr)
                y_arr = self.pt.transform(y_arr)

        if self.scale:
            if self.scale_method == 'standard':
                self.scaler = StandardScaler()
            elif self.scale_method == 'minmax':
                self.scaler = MinMaxScaler()
            else:
                self.scaler = None

            if self.scaler is not None:
                self.scaler.fit(y_arr)

        return self

    def transform(self, y):
        y_arr = np.asarray(y, dtype=float).reshape(-1, 1)

        if self.method is not None:
            if self.method == 'log':
                y_arr = np.log1p(y_arr)
            else:
                y_arr = self.pt.transform(y_arr)

        if self.scale and self.scaler is not None:
            y_arr = self.scaler.transform(y_arr)

        return y_arr

    def inverse_transform(self, y_trans):
        y_arr = np.asarray(y_trans, dtype=float).reshape(-1, 1)

        if self.scale and self.scaler is not None:
            y_arr = self.scaler.inverse_transform(y_arr)

        if self.method is not None:
            if self.method == 'log':
                y_arr = np.expm1(y_arr)
            else:
                y_arr = self.pt.inverse_transform(y_arr)

        return y_arr


def get_raw_model(model):
    """
    遞迴剝除包裝器，直到取得最底層的裸模型
    支援：
        - Pipeline
        - TransformedTargetRegressor
        - StackingRegressor
    """
    while True:
      if isinstance(model, TransformedTargetRegressor):
        model = model.regressor
      elif isinstance(model, Pipeline):
        model = model.steps[-1][1]  # 取最後一層
      elif isinstance(model, skStack):
        model = model.final_estimator
      else:
        break  # 已經到底層

    return model

class MetaFeatureGenerator(BaseEstimator, TransformerMixin):
    def __init__(self, base_models, cv_folds=5, n_jobs=-1):
        self.base_models = base_models
        self.cv_folds = cv_folds
        self.n_jobs = n_jobs
        self.fitted_models_ = {}  # 保存訓練好的 base_models

    def fit(self, X, y):
        """
        訓練每個 base model 並生成 OOF meta features
        """
        meta_feature = []
        start_time = time.time()

        for name, model in self.base_models.items():
            # cross_val_predict 產生 OOF 預測
            cv_pred = cross_val_predict(model, X, y, cv=self.cv_folds, n_jobs=self.n_jobs)
            meta_feature.append(cv_pred.reshape(-1,1))


        self.meta_features_ = np.hstack(meta_feature)
        print(f"Meta features generated in {time.time() - start_time:.2f}s")
        return self

    def transform(self, X):
        """
        對任意 X 生成 meta features (用已訓練好的 base_models 預測)
        """
        meta_feature = []
        for name, model in self.base_models.items():
            pred = model.predict(X).reshape(-1,1)
            meta_feature.append(pred)

        return np.hstack(meta_feature)



# 5.搜尋空間

In [None]:

# 快速搜尋
fast_search_spaces = {
    'rf': [
        # ✨ 優化：減少搜尋範圍加速訓練
        Integer(50, 200, name='n_estimators'),  # 原: 5-1000 → 50-200
        Integer(3, 15, name='max_depth'),       # 原: 1-30 → 3-15
        Integer(2, 4, name='min_samples_split'),
        Categorical(['sqrt'], name='max_features')  # 原: [None, 'sqrt', 'log2'] → 固定較快選項
    ],
    'lgb': [
        # ✨ 優化：縮小參數範圍
        Integer(50, 300, name='n_estimators'),   # 原: 100-2000 → 50-300
        Integer(3, 8, name='max_depth'),         # 原: -1-15 → 3-8
        Real(0.01, 0.3, name='learning_rate', prior='log-uniform'),
        Integer(10, 50, name='num_leaves')       # 原: 10-100 → 10-50
    ],
    'lr': [
        Categorical([True], name='fit_intercept')
    ],
    'rdg': [
        # ✨ 優化：縮小搜尋範圍
        Real(0.1, 10, name='alpha', prior='log-uniform')  # 原: 0.01-100 → 0.1-10
    ]
}

# 完整搜尋
full_search_spaces = {
    'rf_1': [
        Integer(10, 500, name='n_estimators'),
        Integer(10, 100, name='max_depth'),
        Integer(7, 12, name='min_samples_split'),
        Categorical([None, 'sqrt', 'log2'], name='max_features')# [None, 'sqrt', 'log2'] 考慮多少特徵去拆分
    ],
    'rf_2': [
        Integer(500, 1000, name='n_estimators'),
        Integer(100, 200, name='max_depth'),
        Integer(3, 6, name='min_samples_split'),
        Categorical([None, 'sqrt', 'log2'], name='max_features')# [None, 'sqrt', 'log2'] 考慮多少特徵去拆分
    ],
    'dt_1': [
        # ✨ 優化：大幅減少深度和範圍
        Integer(10, 50, name='max_depth'),        # 原: 20-100 → 5-20
        Integer(10, 20, name='min_samples_split'), # 原: 2-20 → 2-10
        Integer(5, 20, name='min_samples_leaf'),  # 原: 5-100 → 5-20
        Categorical([None, 'sqrt', 'log2'], name='max_features') # 原: [None, 'sqrt', 'log2'] → 固定較快選項
    ],
    'dt_2': [
        # ✨ 優化：大幅減少深度和範圍
        Integer(50, 100, name='max_depth'),        # 原: 20-100 → 5-20
        Integer(30, 60, name='min_samples_split'), # 原: 2-20 → 2-10
        Integer(40, 80, name='min_samples_leaf'),  # 原: 5-100 → 5-20
        Categorical([None, 'sqrt', 'log2'], name='max_features') # 原: [None, 'sqrt', 'log2'] → 固定較快選項
    ],
    # 'mlp': [
    #     Categorical(['200_100_50', '1000_500_250_100', '1000_1000_1000_1000'], name='hidden_layer_sizes'),
    #     Real(0.0001, 0.1, name='learning_rate_init', prior='log-uniform'),
    #     Real(0.0001, 2, name='alpha', prior='log-uniform'),
    #     Categorical([16, 32, 64, 128, 256, 512], name='batch_size'),
    # ],
    'lasso_1': [
        # ✨ 優化：縮小搜尋範圍
        Real(0.001, 0.01, name='alpha', prior='log-uniform')  # 原: 0.0001-100 → 0.001-10
    ],
    'lasso_2': [
        # ✨ 優化：縮小搜尋範圍
        Real(0.01, 0.1, name='alpha', prior='log-uniform')  # 原: 0.0001-100 → 0.001-10
    ],
    'xgb_1': [
        # ✨ 優化：大幅減少估計器數量和深度
        Integer(100, 500, name='n_estimators'),   # 原: 2000-4000 → 100-500
        Integer(2, 5, name='max_depth'),          # 原: 3-5 → 3-6 但實際更保守
        Real(0.01, 0.5, name='learning_rate', prior='log-uniform')  # 調整範圍
    ],
    'xgb_2': [
        # ✨ 優化：大幅減少估計器數量和深度
        Integer(600, 1500, name='n_estimators'),   # 原: 2000-4000 → 100-500
        Integer(6, 12, name='max_depth'),          # 原: 3-5 → 3-6 但實際更保守
        Real(0.01, 0.5, name='learning_rate', prior='log-uniform')  # 調整範圍
    ],
    'lgb_1': [
        # ✨ 優化：減少估計器數量
        Integer(100, 500, name='n_estimators'),   # 原: 100-2000 → 100-800
        Integer(5, 100, name='max_depth'),        # 原: -1-15 → -1-10
        Real(0.001, 0.1, name='learning_rate', prior='log-uniform'),
        Integer(3, 100, name='num_leaves')
    ],
    'lgb_2': [
        # ✨ 優化：減少估計器數量
        Integer(600, 1200, name='n_estimators'),   # 原: 100-2000 → 100-800
        Integer(200, 600, name='max_depth'),        # 原: -1-15 → -1-10
        Real(0.001, 0.1, name='learning_rate', prior='log-uniform'),
        Integer(3, 100, name='num_leaves')
    ],
    'lr': [
        Categorical([True], name='fit_intercept')
    ],
    'rdg_1': [
        Real(0.0001, 0.001, name='alpha', prior='log-uniform')
    ],
    'rdg_2': [
        Real(0.001, 0.1, name='alpha', prior='log-uniform')
    ],
    'sgd_1': [
        Real(0.0001, 0.01, prior='log-uniform', name='alpha'),
        Real(0.001, 0.01, prior='log-uniform', name='l1_ratio'),
        Real(0.01, 1, prior='log-uniform', name='epsilon'),
        Categorical(['squared_error', 'huber'], name='loss') # ['squared_error', 'huber', 'epsilon_insensitive', 'squared_epsilon_insensitive']
    ],
    'sgd_2': [
        Real(0.01, 0.5, prior='log-uniform', name='alpha'),
        Real(0.01, 0.9, prior='log-uniform', name='l1_ratio'),
        Real(0.01, 1, prior='log-uniform', name='epsilon'),
        Categorical(['epsilon_insensitive', 'squared_epsilon_insensitive'], name='loss') # ['squared_error', 'huber', 'epsilon_insensitive', 'squared_epsilon_insensitive']
    ],
    'knn_1': [
        # ✨ 優化：減少鄰居數量範圍
        Integer(3, 10, name='n_neighbors'),       # 原: 3-20 → 3-15
        Categorical(['uniform', 'distance'], name='weights'),  # 原: ['uniform', 'distance'] → 固定較快選項
        Categorical([1, 2], name='p')                # 原: [1, 2] → 固定歐氏距離
    ],
    'knn_2': [
        # ✨ 優化：減少鄰居數量範圍
        Integer(10, 20, name='n_neighbors'),       # 原: 3-20 → 3-15
        Categorical(['uniform', 'distance'], name='weights'),  # 原: ['uniform', 'distance'] → 固定較快選項
        Categorical([1, 2], name='p')                # 原: [1, 2] → 固定歐氏距離
    ],
    'kr_1': [
        # ✨ 優化：大幅縮小參數範圍
        Real(0.01, 0.1, name='alpha', prior='log-uniform'),     # 原: 0.01-100 → 0.1-10
        Real(0.001, 0.01, name='gamma', prior='log-uniform'),     # 原: 0.0001-10 → 0.01-1
    ],
    'kr_2': [
        # ✨ 優化：大幅縮小參數範圍
        Real(0.1, 1, name='alpha', prior='log-uniform'),     # 原: 0.01-100 → 0.1-10
        Real(0.01, 0.5, name='gamma', prior='log-uniform'),     # 原: 0.0001-10 → 0.01-1
    ],
    'cat_1': [
        Integer(50, 400, name='n_estimators'), # 估計器數量
        Integer(3, 7, name='depth'),         # 樹的深度，最大只能到16
        Real(0.001, 0.1, name='learning_rate', prior='log-uniform'), # 學習率
        Real(0.01, 1, name='l2_leaf_reg', prior='log-uniform') # L2 正則化
    ],
    'cat_2': [
        Integer(700, 1000, name='n_estimators'), # 估計器數量
        Integer(3, 7, name='depth'),         # 樹的深度，最大只能到16
        Real(0.001, 0.1, name='learning_rate', prior='log-uniform'), # 學習率
        Real(1, 10, name='l2_leaf_reg', prior='log-uniform') # L2 正則化
    ],
    'svr_1': [
        Real(0.01, 1, name='C', prior='log-uniform'),  # 正則化參數
        Real(0.001, 0.1, name='gamma', prior='log-uniform'), # 核函數係數
    ],
    'svr_2': [
        Real(1, 100, name='C', prior='log-uniform'),  # 正則化參數
        Real(0.1, 1, name='gamma', prior='log-uniform'), # 核函數係數
    ]
}

# 6.Stacking Model

In [None]:
class StackingModel:
    def __init__(self, x, y, random_state=42, search_ncall=10, cv_folds=5, test_size=0.2, fast_models_only=False, interx=None):
        self.x = x
        self.y = y
        etl = ETLPipeline()
        self.con_inter = etl.con_var + interx
        self.skew_col = [s for s in self.con_inter if s in self.x.columns and self.x[s].skew() > 0.5]
        self.log_cols_index = [self.x.columns.get_loc(c) for c in self.skew_col]
        self.scale_cols_index = [self.x.columns.get_loc(c) for c in self.x.columns if 'pca' not in c]

        self.random_state = random_state
        self.x_train, self.x_test, self.y_train, self.y_test = train_test_split(self.x, self.y, test_size=test_size, random_state=self.random_state)

        self.search_ncall = search_ncall
        # ✨ 新增屬性：可配置的加速選項
        self.cv_folds = cv_folds  # 減少 CV folds
        self.fast_models_only = fast_models_only

        # ✨ 針對低核心數CPU的智能並行配置
        self.n_cpu = os.cpu_count() or 4

        # 🎯 專門針對2核CPU優化的配置
        if self.n_cpu <= 2:
            # 2核CPU：採用序列模型搜尋，但單模型內部並行
            self.outer_n_jobs = 1    # 一次只訓練一個模型
            self.inner_n_jobs = 2    # 單模型使用全部核心
            print(f"⚡ 低核心CPU模式：序列搜尋模型，單模型使用{self.inner_n_jobs}核心")
        elif self.n_cpu <= 4:
            # 4核CPU：適度並行
            self.outer_n_jobs = 2    # 同時訓練2個模型
            self.inner_n_jobs = 2    # 每個模型用2核心
            print(f"⚡ 中等CPU模式：{self.outer_n_jobs}個模型並行，每個用{self.inner_n_jobs}核心")
        else:
            # 高核心CPU：正常配置
            self.outer_n_jobs = min(4, max(2, self.n_cpu // 2))
            self.inner_n_jobs = max(1, self.n_cpu // self.outer_n_jobs)
            print(f"⚡ 高性能CPU模式：{self.outer_n_jobs}個模型並行，每個用{self.inner_n_jobs}核心")

        # ✨ 新增：根據模式選擇搜尋空間
        self.search_spaces = fast_search_spaces if fast_models_only else full_search_spaces
        self.standard_method = 'minmax' # standard / minmax 用於偏態處理時
        self.standard_stack = StandardScaler() # MinMaxScaler() StandardScaler()
        self.trans_skew_type = 'yeo-johnson' # 'log' / 'box-cox' / 'yeo-johnson'

    def evaluate_model(self, name, params, stack=False, input_x=None): #name, space, stack=False, estimators=None, input_x=None
        try:
            if name.startswith('rf'):
                model = rf(random_state=self.random_state, n_jobs=self.inner_n_jobs, **params) # ** 為解開dict讓param的dict的key去對應模型參數名稱value作為參數設定
            elif name.startswith('dt'):
                model = dt(random_state=self.random_state, **params)
            elif name.startswith('lasso'):
                model = Lasso(max_iter=2000, **params)
            elif name.startswith('xgb'):
                model = xgb.XGBRegressor(random_state=self.random_state, objective='reg:squarederror', n_jobs=self.inner_n_jobs, tree_method='hist', **params)
            elif name.startswith('lgb'):
                model = lgb.LGBMRegressor(random_state=self.random_state, verbose=-1, n_jobs=self.inner_n_jobs, **params)
            elif name.startswith('lr'):
                model = lr(n_jobs=self.inner_n_jobs, **params)
            elif name.startswith('rdg'):
                model = rdg(**params)
            elif name.startswith('sgd'):
                model = sgd(random_state=self.random_state, learning_rate='adaptive', **params)
            elif name.startswith('knn'):
                model = knn(n_jobs=self.inner_n_jobs, **params)
            elif name.startswith('kr'):
                model = kr(kernel='rbf', **params)
            elif name.startswith('svr'):
                model = svr(**params)
            elif name.startswith('cat'):
                model = cb.CatBoostRegressor(random_state=self.random_state, verbose=0, thread_count=self.inner_n_jobs, early_stopping_rounds=100, **params)
            else:
                return 1e6

            cv_jobs = 1 if self.n_cpu <= 2 and self.cv_folds >= 3 else min(self.cv_folds, self.inner_n_jobs)

            if stack:
                pipeline = Pipeline([
                    ('Trans', SkeTransX(method=self.trans_skew_type, scale=True, scale_method=self.standard_method)),
                    ('model', model)
                ])
                y_transformer = SkeTransY(method=self.trans_skew_type, scale=True, scale_method=self.standard_method)
                model = TransformedTargetRegressor(regressor=pipeline, transformer=y_transformer)
                score = cross_val_score(model, input_x, self.y_train, cv=self.cv_folds,
                                        scoring='neg_root_mean_squared_error', n_jobs=cv_jobs).mean()
            else:
                best_pipeline = Pipeline([
                    ('Trans', SkeTransX(method=self.trans_skew_type, scale=True, scale_method=self.standard_method,
                                        log_cols_index=self.log_cols_index, scale_cols_index=self.scale_cols_index)),
                    ('model', model)])
                y_transformer = SkeTransY(method=self.trans_skew_type, scale=True, scale_method=self.standard_method)
                model = TransformedTargetRegressor(regressor=best_pipeline, transformer=y_transformer)
                score = cross_val_score(model, self.x_train, self.y_train, cv=self.cv_folds,
                                        scoring='neg_root_mean_squared_log_error', n_jobs=cv_jobs).mean()
                                            # neg_root_mean_squared_log_error neg_root_mean_squared_error
            return score
        except Exception as e:
            print(f"objective Error in {name}: {e}")
            return 1e6

    # objective = self.make_objective_optuna(name, space, stack=stack, estimators=estimators, input_x=input_x)
    def make_objective_optuna(self, name, space, stack=False, estimators=None, input_x=None, meta_g=None):
        def objective(trial):
            params = {}
            for p in space:
                if isinstance(p, Integer):
                    params[p.name] = trial.suggest_int(p.name, p.low, p.high)
                elif isinstance(p, Real):
                    params[p.name] = trial.suggest_float(p.name, p.low, p.high, log=(p.prior == "log-uniform"))
                elif isinstance(p, Categorical):
                    params[p.name] = trial.suggest_categorical(p.name, p.categories)
            return self.evaluate_model(name, params, stack, input_x)
        return objective

    def best_model_(self, params, name, stack=False, estimators=None, input_x=None, meta_g=None):
        # 用最佳參數建立模型
        if name.startswith('rf'):
            model = rf(random_state=self.random_state,n_jobs=self.inner_n_jobs, **params)
        elif name.startswith('dt'):
            model = dt(random_state=self.random_state, **params)
        # elif name == 'mlp':
        #     model = mlp(random_state=42, max_iter=500, hidden_layer_sizes=(1000,1000,1000,1000,1000,1000), alpha=0.0001, batch_size=128,
        #                   learning_rate_init=0.0001, learning_rate='adaptive', early_stopping=True)
        elif name.startswith('lasso'):
            model = Lasso(max_iter=2000, **params)
        elif name.startswith('xgb'):
            model = xgb.XGBRegressor(random_state=self.random_state, objective='reg:squarederror', n_jobs=self.inner_n_jobs, tree_method='hist', **params)
        elif name.startswith('lgb'):
            model = lgb.LGBMRegressor(random_state=self.random_state, verbose=-1,n_jobs=self.inner_n_jobs, **params)
        elif name.startswith('lr'):
            model = lr(n_jobs=self.inner_n_jobs, **params)
        elif name.startswith('rdg'):
            model = rdg(**params)
        elif name.startswith('sgd'):
            model = sgd(random_state=self.random_state, learning_rate='adaptive', **params)
        elif name.startswith('knn'):
            model = knn(n_jobs=self.inner_n_jobs, **params)
        elif name.startswith('kr'):
            model = kr(kernel='rbf', **params)
        elif name.startswith('svr'):
            model = svr(**params)
        elif name.startswith('cat'):
            model = cb.CatBoostRegressor(random_state=self.random_state, verbose=0, thread_count=self.inner_n_jobs, **params)

        if stack:                    # StandardScaler() MinMaxScaler()
            pipeline = Pipeline([('Trans', SkeTransX(method=self.trans_skew_type, scale=True,scale_method=self.standard_method,)),('model', model)])
            y_transformer = SkeTransY(method=self.trans_skew_type, scale=True, scale_method=self.standard_method)
            final_model = TransformedTargetRegressor(regressor=pipeline, transformer=y_transformer)
            final_model.fit(input_x, self.y_train)


        else:
            # 基學習器使用完整轉換
            # # 目標變數 (y) 轉換器，可選 log/box-cox/yeo-johnson 和標準化方式 standard / minmax
            pipeline = Pipeline([('Trans', SkeTransX(method=self.trans_skew_type, scale=True,scale_method=self.standard_method,
                                                     log_cols_index=self.log_cols_index, scale_cols_index=self.scale_cols_index)),
                        ('model', model)])
            y_transformer = SkeTransY(method=self.trans_skew_type, scale=True, scale_method=self.standard_method)
            final_model = TransformedTargetRegressor(regressor=pipeline, transformer=y_transformer)
            final_model.fit(self.x_train, self.y_train)

        return final_model

    def search_param(self, stack=False, meta_models_to_try=None, estimators=None, input_x=None, meta_g=None):
        # 選擇要搜尋的模型
        print('search model params ...')
        if not stack:
            search_spaces = self.search_spaces
        else:
            search_spaces = {name: self.search_spaces[name] for name in meta_models_to_try if name in self.search_spaces}

        result = {}

        def tune_model(name, space):
            try:
                start_time = time.time()
                print(f"Start tuning {name} with Optuna study name: {name}_optimization")
                study = optuna.create_study(direction='maximize', study_name=f'{name}_optimization', storage=None,
                                            sampler=optuna.samplers.TPESampler(seed=self.random_state))
                # objective = self.make_objective_gp(name, space, stack=stack, estimators=estimators, input_x=input_x)
                objective = self.make_objective_optuna(name, space, stack=stack, estimators=estimators, input_x=input_x)

                if name == 'mlp':
                  actual_calls = 3
                else:
                  actual_calls = max(3, self.search_ncall)  # 限制搜尋次數

                if stack and len(meta_models_to_try) == 1:
                  actual_calls = actual_calls * 5

                study.optimize(objective, n_trials=actual_calls, timeout=3000, show_progress_bar=True)

                # res = gp_minimize(objective, dimensions=space, n_calls=actual_calls, n_random_starts=3, acq_func='EI',
                #                   random_state=self.random_state, n_points=5000, n_initial_points=5) # 這種貝葉斯適合stack
                second_time = time.time()

                best_model = self.best_model_(study.best_params, name, stack=stack, estimators=estimators, input_x=input_x, meta_g=meta_g)

                if stack:
                  best_model = Pipeline([('meta_g', meta_g), ('model', best_model)])
                  y_pred = best_model.predict(self.x_test)
                else:
                  y_pred = best_model.predict(self.x_test)

                rmse = np.sqrt(mean_squared_error(self.y_test, y_pred))
                mae = mean_absolute_error(self.y_test, y_pred)
                rmsle = np.sqrt(mean_squared_log_error(self.y_test, y_pred))
                r2 = r2_score(self.y_test, y_pred)

                elapsed1 = second_time - start_time
                elapsed2 = time.time() - start_time

                r2_threshold = 0.2 if self.fast_models_only else 0.85  # 原: 固定 0.4
                if r2 < r2_threshold:
                    print(f"{name} removed due to low R2 ({r2:.4f})")
                    return None

                print(f"\033[1m{'stack_' if stack else ''}{name} | RMSE: {rmse:.4f} | RMSLE: {rmsle:.4f} | R2: {r2:.4f} | SearchTime: {elapsed1:.1f}s | FitTime: {elapsed2:.1f}s\033[0m")

                metrics = {"RMSE": rmse, "MAE": mae, "RMSLE": rmsle, "R2": r2}
                # p = study.best_params.copy()
                param_names = [dim.name for dim in space]   # 取得每個維度名稱
                best_params_dict = study.best_params # Optuna的 best_params 本身就是字典
                print(best_params_dict)
                return name, best_model, metrics, best_params_dict

            except Exception as e:
                print(f"Error tuning {name}: {e}")
                # traceback.print_exc()
                return None

        # ✨ 2核CPU的外層並行策略
        if self.n_cpu <= 2:
            print(f"🐌 2核CPU模式：序列搜尋{len(search_spaces)}個{'堆疊' if stack else '基礎'}模型（避免競爭）")
            print(f"   每個模型內部使用{self.inner_n_jobs}個線程")
        else:
            print(f"⚡ 並行搜尋{len(search_spaces)}個{'堆疊' if stack else '基礎'}模型，外層{self.outer_n_jobs}並行 × 內層{self.inner_n_jobs}線程")

        results = Parallel(n_jobs=self.outer_n_jobs)(delayed(tune_model)(name, space) for name, space in search_spaces.items())

        # 收集結果
        base_models = {}
        params = {}
        for r in results:
            if r is not None:
                name, best_model, metrics, best_params_dict = r
                base_models[name] = best_model
                result[name] = metrics
                params[name] = best_params_dict


        if stack:
            if result:
                best_name = min(result, key=lambda x: result[x]["RMSLE"])
                best_meta_model = base_models[best_name]
                best_meta_metrics = result[best_name]
                best_meta_params = params[best_name]
                best_meta_model1 = get_raw_model(best_meta_model)
                print(f"Best meta model: {type(best_meta_model1).__name__} - Evaluation: {best_meta_metrics} - params:{best_meta_params}")
                return best_meta_model, best_meta_metrics, best_meta_params
            else:
                print("No suitable meta model found, using Ridge regression")
                meta_pipeline = Pipeline([('Trans', SkeTransX(method=self.trans_skew_type, scale=True, scale_method=self.standard_method, log_cols_index=self.log_cols_index,
                                    scale_cols_index=self.scale_cols_index)), ('model', skStack(estimators=estimators, final_estimator=rdg(), n_jobs=self.inner_n_jobs))])
                y_transformer = SkeTransY(method=self.trans_skew_type, scale=True, scale_method=self.standard_method)
                best_meta_model = TransformedTargetRegressor(regressor=meta_pipeline, transformer=y_transformer)
                best_meta_model.fit(self.x_train, self.y_train)

                y_pred = best_meta_model.predict(self.x_test)

                rmse = np.sqrt(mean_squared_error(self.y_test, y_pred))
                mae = mean_absolute_error(self.y_test, y_pred)
                rmsle = np.sqrt(mean_squared_log_error(self.y_test, y_pred))
                r2 = r2_score(self.y_test, y_pred)
                best_meta_metrics = {"RMSE": rmse, "MAE": mae, "RMSLE": rmsle,"R2": r2}
                best_meta_params = None
                return best_meta_model, best_meta_metrics, best_meta_params
        else:
            return base_models, result, params

    def fit(self, meta_model=3):
        print(f"Starting {'fast' if self.fast_models_only else 'full'} model search...")
        start_time = time.time()

        base_models, result, params = self.search_param()

        if not base_models:
            print("No suitable base models found!")
            return None

        # 儲存基模型和結果
        self.base_models = base_models
        self.results = result
        self.params = params

        print(f"Base model search completed in {time.time() - start_time:.1f}s")
        print(f"Found {len(base_models)} suitable base models: {list(base_models.keys())}")

        # Stacking
        estimators = [(name, model) for name, model in base_models.items()]

        print('Starting stacking fit...')
        stack_start = time.time()

        best_name = min(result, key=lambda x: result[x]['RMSLE'])

        # 先計算x_meta矩陣，也就是給 meta model 的輸入
        self.meta_g = MetaFeatureGenerator(base_models)
        x_meta = self.meta_g.fit(self.x_train, self.y_train).transform(self.x_train)

        if meta_model == 1 :
            best_base_model = base_models[best_name]
            self.best_result = result[best_name]
            self.best_params = params[best_name]

            meta_model = get_raw_model(best_base_model)
            print(f"use meta model: {type(meta_model).__name__} \n - Evaluation: {best_result}\n - best_params: {best_params}")
            meta_pipeline = Pipeline([('Trans', SkeTransX(method=self.trans_skew_type, scale=True,scale_method=self.standard_method, log_cols_index=self.log_cols_index, scale_cols_index=self.scale_cols_index)),
                                                            ('model', skStack(estimators=estimators, final_estimator=meta_model, n_jobs=self.inner_n_jobs, cv=self.cv_folds))])
            y_transformer = SkeTransY(method=self.trans_skew_type, scale=True, scale_method=self.standard_method)
            self.final_model = TransformedTargetRegressor(regressor=meta_pipeline, transformer=y_transformer)
            self.final_model.fit(self.x_train, self.y_train)


        elif meta_model == 2:
            meta_models_to_try = [best_name]
            final_model, best_result, best_params = self.search_param(stack=True, meta_models_to_try=meta_models_to_try, estimators=estimators, input_x=x_meta, meta_g=self.meta_g)
            meta_model = get_raw_model(final_model)
            # final_model_f = Pipeline([('meta_feature', meta_g), ('final_model', final_model)])
            final_model.fit(self.x_train, self.y_train)
            self.final_model = final_model
            self.best_result = best_result
            self.best_params = best_params
            print(f"use meta model: {type(meta_model).__name__} \n - Evaluation: {best_result}\n - best_params: {best_params}")

        elif meta_model == 3:
            meta_models_to_try = list(base_models.keys())
            final_model, best_result, best_params = self.search_param(stack=True, meta_models_to_try=meta_models_to_try, estimators=estimators, input_x=x_meta, meta_g=self.meta_g)
            meta_model = get_raw_model(final_model)
            # final_model_f = Pipeline([('meta_feature', meta_g), ('final_model', final_model)])
            final_model.fit(self.x_train, self.y_train)
            self.final_model = final_model
            self.best_result = best_result
            self.best_params = best_params
            print(f"use meta model: {type(meta_model).__name__} \n - Evaluation: {best_result}\n - best_params: {best_params}")

        else:
            print("請正確輸入meta_model參數")

        # 預測
        y_pred_stack = self.final_model.predict(self.x_test)
        # 計算誤差等
        rmse_stack = np.sqrt(mean_squared_error(self.y_test, y_pred_stack))
        mae_stack = mean_absolute_error(self.y_test, y_pred_stack)
        rmsle_stack = np.sqrt(mean_squared_log_error(self.y_test, y_pred_stack))
        r2_stack = r2_score(self.y_test, y_pred_stack)
        self.results['stacking'] = {"RMSE": rmse_stack, "MAE": mae_stack, "RMSLE": rmsle_stack, "R2": r2_stack}
        self.params['stacking'] = best_params

        print(f"Final Stacking - Evaluation: {self.results['stacking']}")
        print(f"Stacking completed in {time.time() - stack_start:.1f}s")
        print(f"Total training time: {time.time() - start_time:.1f}s")

        return self


    def predict(self, X):
        """使用已訓練的 stacking model 預測，並可選擇輸出到檔案"""
        if not hasattr(self, 'final_model'):
            raise AttributeError("Model is not fitted. Call fit() first.")

        preds = self.final_model.predict(X)
        return preds
    def save(self, output_path):
        # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # model_path = os.path.join(output_path, f"model_{timestamp}.joblib")
        # txt_path = os.path.join(output_path, f"model_{timestamp}.txt")
        model_path = os.path.join(output_path, "model_RMSLE.joblib")
        txt_path = os.path.join(output_path, "result_RMSLE.txt")
        dump(self.final_model, model_path)
        # 輸出結果
        if output_path:
          with open(txt_path, "w", encoding="utf-8") as f:
            for name, metrics in self.results.items():
              if name != 'stacking':
                f.write(f"{name} - Evaluation: {metrics}\n")
                f.write(f"{name} - best_params: {self.params[name]}\n")

            f.write(f"\nFinal Stacking - Evaluation: {self.results['stacking']}\n")
            print(f" MODEL 已儲存到 {model_path}")

    def get_base_results(self):
        """取得所有 base model 評估結果"""
        return self.base_results

    def get_base_params(self):
        """取得所有 base model 最佳參數"""
        return self.base_params

    def get_meta_result(self):
        """取得 meta model 評估結果與最佳參數"""
        return self.best_result

    def get_meta_params(self):
        """取得 meta model 評估結果與最佳參數"""
        return self.best_params

    def get_final_model(self):
        """取得最終 stacking 模型"""
        return self.final_model


if __name__ == "__main__":
    save_dir = '/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/project/saved_model'

    # ====== data training =============================
    train_df = pd.read_csv("/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/train.csv")
    etl_pipeline = ETLPipeline()
    train_df = etl_pipeline.fit_transform(train_df, y_name="SalePrice", pca_n_components=20)
    etl_pipeline.save(save_dir)

    X_train = train_df.drop(columns=['SalePrice'])
    y_train = train_df['SalePrice']
    X_train = X_train.astype(np.float64)
    y_train = y_train.astype(np.float64)
    # df = X_train.copy()
    # df["target"] = y_train
    # df_sample = df.sample(n=100, random_state=41)
    # x_train_all = df_sample.drop(columns=["target"])
    # y_train = df_sample["target"]

    model = StackingModel(X_train, y_train, search_ncall=10, cv_folds=5, interx=etl_pipeline.X_interaction_list) # scale_col
    model.fit(meta_model=3)
    model.save(output_path=save_dir)

    # ====== data predict =============================
    test_df = pd.read_csv("/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/test.csv")
    etl_pipeline_pred = ETLPipeline.load('/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/project/saved_model/etl_pipeline_RMSLE.joblib')
    X_test = etl_pipeline_pred.transform(test_df)
    X_test = X_test.astype(np.float64)

    model_path = os.path.join(save_dir, 'model_RMSLE.joblib')
    model = load(model_path)

    y_pred = model.predict(X_test)
    result = pd.DataFrame({"Id": test_df["Id"], "SalePrice": y_pred})
    result.to_csv("/content/drive/MyDrive/機器學習練習檔/kaggle房價預測/project/result/prediction.csv", index=False)
    print(result)


# 預測

In [None]:
# 讀取模型
# loaded_model = load("trained_model.joblib")