In [1]:
#pip install optuna

#### 載入套件與資料

In [2]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, r2_score
import optuna

# 載入資料
df_train_full = pd.read_csv("AMAZON_FREIGHT_train.csv")
df_test_final = pd.read_csv("AMAZON_FREIGHT_test.csv")

#### 自訂 Target Encoder

In [3]:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np

class TargetEncoder(BaseEstimator, TransformerMixin):
    """Replace each category by the mean target observed for it during ``fit``.

    Several columns can be encoded at once. Categories that were not seen
    during ``fit`` (and missing values) are encoded with the overall mean of
    the training target.

    Attributes
    ----------
    mapping_ : dict
        ``{column: {category: mean target}}`` learned by ``fit``.
    defaults_ : dict
        ``{column: overall mean target}`` used as the fallback value.
    columns : list
        Column labels seen by ``fit``, in order.
    """

    def __init__(self):
        # No hyper-parameters. These attributes only exist up-front so that the
        # object has a well-defined (empty) state before ``fit`` is called.
        self.mapping_ = {}
        self.defaults_ = {}
        self.columns = []

    def fit(self, X, y):
        """Learn the per-category target means.

        Parameters
        ----------
        X : DataFrame or array-like of shape (n_samples, n_columns)
            Categorical columns to encode.
        y : array-like of shape (n_samples,)
            Target values, paired with the rows of ``X`` by position.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``y`` does not have one value per row of ``X``.
        """
        X = pd.DataFrame(X)
        # Pair X and y by row position rather than by index label: a Series y
        # whose index differs from X's would otherwise be label-aligned and
        # silently produce NaN means. Keeping the target in its own Series
        # also avoids clobbering a feature column that happens to be named
        # 'target'.
        target = pd.Series(np.asarray(y).ravel(), index=X.index)
        overall_mean = target.mean()

        # Build the learned state from scratch so that a refit never keeps
        # entries from a previous call.
        columns = X.columns.tolist()
        mapping = {}
        defaults = {}
        for col in columns:
            mapping[col] = target.groupby(X[col]).mean().to_dict()
            defaults[col] = overall_mean

        self.columns = columns
        self.mapping_ = mapping
        self.defaults_ = defaults
        return self

    def transform(self, X):
        """Return an array of shape (n_samples, n_columns) of encoded values.

        Values without a learned mean (unseen categories, NaN) receive the
        column's fallback value from ``defaults_``.
        """
        X = pd.DataFrame(X)
        return np.hstack([
            X[col].map(self.mapping_[col]).fillna(self.defaults_[col]).values.reshape(-1, 1)
            for col in self.columns
        ])

    def get_feature_names_out(self, input_features=None):
        """Return one ``<column>_target_encoded`` name per fitted column."""
        return [f"{col}_target_encoded" for col in self.columns]

#### 自訂 Frequency Encoder

In [4]:
from sklearn.base import BaseEstimator, TransformerMixin

class FrequencyEncoder(BaseEstimator, TransformerMixin):
    """
    Encode categorical features as their relative frequency (0 to 1).

    Several columns can be encoded at once, and ``get_feature_names_out`` is
    provided so the output columns can be named.

    Attributes
    ----------
    freq_maps_ : dict
        ``{column: {category: relative frequency}}`` learned by ``fit``
        (missing values are counted as their own category).
    global_freq_ : dict
        ``{column: 1 / n_samples}``, the fallback for values not seen in ``fit``.
    columns_ : list
        Column labels seen by ``fit``, in order.
    """
    def __init__(self):
        # No hyper-parameters. These attributes only exist up-front so that the
        # object has a well-defined (empty) state before ``fit`` is called.
        self.freq_maps_ = {}
        self.columns_ = []
        self.global_freq_ = {}

    def fit(self, X, y=None):
        """Learn the relative frequency of every category in every column.

        Parameters
        ----------
        X : DataFrame or array-like of shape (n_samples, n_columns)
        y : ignored

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``X`` has no rows (frequencies would be undefined).
        """
        X = pd.DataFrame(X)
        n_samples = len(X)
        if n_samples == 0:
            # Fail with a clear message instead of a ZeroDivisionError below.
            raise ValueError("FrequencyEncoder.fit requires at least one sample")

        # Build the learned state from scratch so that a refit never keeps
        # entries from a previous call.
        columns = X.columns.tolist()
        freq_maps = {}
        global_freq = {}
        for col in columns:
            freq = X[col].value_counts(dropna=False) / n_samples
            freq_maps[col] = freq.to_dict()
            # Fallback for unseen values: the frequency of a single occurrence,
            # i.e. the smallest frequency any seen category can have.
            global_freq[col] = 1.0 / n_samples

        self.columns_ = columns
        self.freq_maps_ = freq_maps
        self.global_freq_ = global_freq
        return self

    def transform(self, X):
        """Return an array of shape (n_samples, n_columns) of frequencies.

        Values without a learned frequency receive the column's fallback
        value from ``global_freq_``.
        """
        X = pd.DataFrame(X)
        encoded = []
        for col in self.columns_:
            encoded_col = X[col].map(self.freq_maps_[col]).fillna(self.global_freq_[col])
            encoded.append(encoded_col.values.reshape(-1, 1))
        return np.hstack(encoded)

    # Lets ColumnTransformer pick up the output column names automatically.
    def get_feature_names_out(self, input_features=None):
        """Return one ``<column>_freq_encoded`` name per fitted column."""
        return [f"{col}_freq_encoded" for col in self.columns_]


#### 把欄位清單和 ColumnTransformer 都包成一支小函式

In [5]:
from sklearn.compose import ColumnTransformer

# -----------------------------------------------------------
# 1. 產生 feature_cols：只告訴你有哪些欄位
# -----------------------------------------------------------
def build_feature_cols(
        numeric_cols,
        target_encode_cols=None,
        freq_encode_cols=None
    ):
    """Return the list of raw columns used for modelling.

    The result is a new list: the numeric columns first, then the
    target-encoded columns, then the frequency-encoded columns. A group that
    is ``None`` or empty contributes nothing.
    """
    selected = [*numeric_cols]
    for optional_group in (target_encode_cols, freq_encode_cols):
        if optional_group:
            selected.extend(optional_group)
    return selected


# -----------------------------------------------------------
# 2. 產生 preprocessor：給 Pipeline 用的 ColumnTransformer
# -----------------------------------------------------------
def build_preprocessor(
        numeric_cols,
        target_encode_cols=None,
        freq_encode_cols=None
    ):
    """Return a new ColumnTransformer ready to be used as a Pipeline step.

    A ``'te'`` step (TargetEncoder) is added when ``target_encode_cols`` is
    non-empty and a ``'freq'`` step (FrequencyEncoder) when
    ``freq_encode_cols`` is non-empty. ``numeric_cols`` is not referenced:
    every column not claimed by an encoder is forwarded unchanged through
    ``remainder='passthrough'``.
    """
    target_step = (
        [('te', TargetEncoder(), target_encode_cols)] if target_encode_cols else []
    )
    frequency_step = (
        [('freq', FrequencyEncoder(), freq_encode_cols)] if freq_encode_cols else []
    )

    # Encoder outputs come first (target, then frequency), followed by the
    # passed-through remainder columns.
    return ColumnTransformer(
        transformers=target_step + frequency_step,
        remainder='passthrough'
    )

#### 指定特徵欄位並進行encoding

In [6]:
# Column to predict
target_col = 'log_cost'

# Input columns, grouped by how they are preprocessed
numeric_cols = ["log_weight", "log_Hdis"]
target_encode_cols = ["to_state"]   # set to [] to disable target encoding
freq_encode_cols = []               # set to [] to disable frequency encoding

# 1. Column list (used when selecting X and for EDA)
feature_cols = build_feature_cols(
    numeric_cols=numeric_cols,
    target_encode_cols=target_encode_cols,
    freq_encode_cols=freq_encode_cols,
)

# 2. ColumnTransformer (plugged into the Pipeline)
preprocessor = build_preprocessor(
    numeric_cols=numeric_cols,
    target_encode_cols=target_encode_cols,
    freq_encode_cols=freq_encode_cols,
)

#### 劃分樣本大小間距

In [7]:
def choose_budget(train_n, param_dim=3):
    """
    Pick the Optuna budget ``(n_trials, timeout_seconds)`` from the sample size.

    Three tiers:
        small   (<  500) -> n_trials = 120, timeout =  600s
        medium  (<= 3000) -> n_trials = 80 , timeout = 1200s
        large   (>  3000) -> n_trials = 60 , timeout = 1800s

    ``n_trials`` is never lower than ``4 * param_dim``.
    """
    # (membership test, n_trials, timeout) checked in order; first match wins.
    tiers = (
        (lambda n: n < 500, 120, 600),      # small
        (lambda n: n <= 3000, 80, 1200),    # medium
    )
    # Anything that matches no tier above is treated as "large".
    n_trials, timeout = 60, 1800
    for in_tier, tier_trials, tier_timeout in tiers:
        if in_tier(train_n):
            n_trials, timeout = tier_trials, tier_timeout
            break

    # Floor of four trials per tuned hyper-parameter.
    floor_trials = 4 * param_dim
    if n_trials < floor_trials:
        n_trials = floor_trials
    return n_trials, timeout

#### 建立模型並輸出指標結果

In [None]:
# KFold and cross_val_score provide the inner cross-validation used during tuning.
from sklearn.model_selection import KFold, cross_val_score


def _make_svr_pipeline(svr_params):
    """Return a new, unfitted encode -> scale -> RBF-SVR pipeline.

    A fresh preprocessor is built on every call so that no encoder instance is
    shared between Optuna trials, shipping methods, or the final model.
    """
    return Pipeline(steps=[
        ('preprocessor', build_preprocessor(
            numeric_cols,
            target_encode_cols,
            freq_encode_cols
        )),                                                   # encoding only
        ('scaler_post',  StandardScaler(with_mean=False)),    # put features on one scale
        ('regressor',    SVR(kernel='rbf', **svr_params)),
    ])


results = []

# One model is tuned, refit and evaluated per shipping method.
# dropna(): a missing ship_method matches no rows under `==`, which would
# produce an empty subset and make the split below fail.
for method in df_train_full['ship_method'].dropna().unique():
    # Rows for this shipping method
    df_method = df_train_full[df_train_full['ship_method'] == method].copy()

    # Tuning budget is chosen from the row count before the train/val split
    train_n = len(df_method)
    n_trials, time_budget = choose_budget(train_n)

    X = df_method[feature_cols]
    y = df_method[target_col]

    # Hold out 20% as a validation set
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    # Optuna objective: mean 5-fold CV MSE on the training split
    def objective(trial):
        # suggest_float(..., log=True) replaces the deprecated suggest_loguniform;
        # parameter names and ranges are unchanged.
        params = {
            'C':       trial.suggest_float('C',       1e-3, 1e3, log=True),
            'epsilon': trial.suggest_float('epsilon', 1e-3, 1.0, log=True),
            'gamma':   trial.suggest_float('gamma',   1e-4, 10,  log=True),
        }

        # cross_val_score clones and fits the pipeline on every fold, so no
        # separate fit is needed before scoring.
        kf = KFold(n_splits=5, shuffle=True, random_state=42)
        scores = cross_val_score(
            _make_svr_pipeline(params), X_train, y_train,
            cv=kf, scoring='neg_mean_squared_error'
        )
        return -scores.mean()  # scores are negated MSE; flip the sign back

    # Seeded sampler so a re-run proposes the same trial sequence.
    # NOTE(review): the objective never reports intermediate values, so the
    # Hyperband pruner has nothing to act on; it is kept only to leave the
    # study configuration otherwise as it was.
    study = optuna.create_study(direction='minimize',
                                sampler=optuna.samplers.TPESampler(seed=42),
                                pruner=optuna.pruners.HyperbandPruner())
    study.optimize(objective, n_trials=n_trials, timeout=time_budget, show_progress_bar=True)

    # Best hyper-parameters found
    best_params = study.best_params
    print(f"[{method}] best params:", best_params)

    # Refit on the whole training split with the best hyper-parameters
    best_svr_model = _make_svr_pipeline(best_params)
    best_svr_model.fit(X_train, y_train)

    # Validation-set metrics
    y_val_pred = best_svr_model.predict(X_val)
    mse_val = mean_squared_error(y_val, y_val_pred)
    r2_val = r2_score(y_val, y_val_pred)

    # Test-set metrics (None when the test file has no rows for this method)
    df_test_method = df_test_final[df_test_final['ship_method'] == method].copy()
    if not df_test_method.empty:
        X_test = df_test_method[feature_cols]
        y_test = df_test_method[target_col]
        y_test_pred = best_svr_model.predict(X_test)
        mse_test = mean_squared_error(y_test, y_test_pred)
        r2_test  = r2_score(y_test, y_test_pred)
    else:
        mse_test = r2_test = None

    # Record the results for this method
    results.append({
        'ship_method': method,
        'mse_val': mse_val,
        'r2_val':  r2_val,
        'mse_test': mse_test,
        'r2_test':  r2_test,
        'best_params': best_params
    })
        

In [9]:
results_df = pd.DataFrame(results)

# Row counts per ship_method in the raw test and train tables, listed in the
# same order as `results` so they line up with the rows of results_df.
methods_in_order = [entry['ship_method'] for entry in results]
results_df['test_n'] = [
    df_test_final[df_test_final['ship_method'] == name].shape[0]
    for name in methods_in_order
]
results_df['train_n'] = [
    df_train_full[df_train_full['ship_method'] == name].shape[0]
    for name in methods_in_order
]

print("\nResults with Optuna optimization:")
ranked_by_test_mse = results_df.sort_values(by='mse_test')
print(ranked_by_test_mse)


Results with Optuna optimization:
      ship_method   mse_val    r2_val  mse_test   r2_test  \
0  AMAZON_FREIGHT  0.209166  0.921933  0.770382  0.689501   

                                         best_params  test_n  train_n  
0  {'C': 150.14618361731786, 'epsilon': 0.0066413...      32      290  


In [10]:
from sklearn.inspection import permutation_importance

# best_svr_model was already fitted on (X_train, y_train) at the end of the
# tuning loop, so it is scored as-is; refitting it here would only repeat
# that work.
result = permutation_importance(
    best_svr_model, X_val, y_val,
    n_repeats=10, random_state=42, scoring='neg_mean_squared_error'
)

# Importance table. Labels are taken from X_val itself so that each row is
# guaranteed to describe the column that was actually permuted.
feature_importance = pd.DataFrame({
    'feature': list(X_val.columns),
    'importance': result.importances_mean,
    'std': result.importances_std
})
print(feature_importance.sort_values(by='importance', ascending=False))

      feature  importance       std
0  log_weight    2.927363  0.406883
2    to_state    1.440990  0.309009
1    log_Hdis    0.962254  0.246745


In [11]:
""" import matplotlib.pyplot as plt

plt.figure(figsize=(6,6))
# 畫理想線
min_val = min(y_val.min(), y_test.min())
max_val = max(y_val.max(), y_test.max())
plt.plot([min_val, max_val], [min_val, max_val], 'r--', label='Ideal')

# 畫驗證集
plt.scatter(y_val, y_val_pred, alpha=0.7, label='Validation Set')
# 畫測試集
plt.scatter(y_test, y_test_pred, alpha=0.7, color='orange', label='Test Set')

plt.xlabel('Actual Value')
plt.ylabel('Predicted Value')
plt.title('Actual vs. Predicted - 'f'{method} Method')
plt.legend()
plt.show() """

" import matplotlib.pyplot as plt\n\nplt.figure(figsize=(6,6))\n# 畫理想線\nmin_val = min(y_val.min(), y_test.min())\nmax_val = max(y_val.max(), y_test.max())\nplt.plot([min_val, max_val], [min_val, max_val], 'r--', label='Ideal')\n\n# 畫驗證集\nplt.scatter(y_val, y_val_pred, alpha=0.7, label='Validation Set')\n# 畫測試集\nplt.scatter(y_test, y_test_pred, alpha=0.7, color='orange', label='Test Set')\n\nplt.xlabel('Actual Value')\nplt.ylabel('Predicted Value')\nplt.title('Actual vs. Predicted - 'f'{method} Method')\nplt.legend()\nplt.show() "

In [12]:
from pathlib import Path
from datetime import datetime
import json, numpy as np
from joblib import dump

#寫一個專用轉換器
def json_serial(obj):
    """Fallback converter for ``json.dump(..., default=json_serial)``.

    Converts NumPy values that the json module cannot encode into native
    Python equivalents:

    * ``np.bool_``    -> ``bool``
    * ``np.integer``  -> ``int``
    * ``np.floating`` -> ``float``
    * ``np.ndarray``  -> (nested) ``list``

    Any other object is converted with ``str(obj)`` as a last resort.
    """
    # np.bool_ is not an np.integer subclass; without this branch it would
    # reach the str() fallback and be written as the string "True"/"False".
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return str(obj)

# 1) Gather the key facts about the most recently tuned model into one dict
numeric_features = [name for name in feature_cols if name in numeric_cols]
categorical_features = [name for name in feature_cols if name not in numeric_cols]
# {feature: importance} is easier to read than two parallel columns
importance_by_feature = {
    name: score
    for name, score in zip(feature_importance["feature"], feature_importance["importance"])
}
# Row counts, kept so the data volume can be compared later
n_test_rows = len(df_test_method) if "df_test_method" in locals() else 0

metadata = {
    "ship_method": method,
    "target_col": target_col,
    "selected_features": feature_cols,
    "num_features": numeric_features,
    "cat_features": categorical_features,
    "best_params": best_params,
    "feature_importance": importance_by_feature,
    "n_train": len(X_train),
    "n_val": len(X_val),
    "n_test": n_test_rows,
    "timestamp": datetime.now().isoformat(),
}

# 2) Choose the output folder and make sure it exists
output_dir = Path("model_artifacts")
output_dir.mkdir(parents=True, exist_ok=True)

# 3) Write the JSON file (default=json_serial converts NumPy types to native Python)
metadata_path = output_dir / f"{method}_metadata.json"
with metadata_path.open("w", encoding="utf-8") as f:
    json.dump(metadata, f, indent=2, ensure_ascii=False, default=json_serial)

In [13]:
#pip install joblib 

In [14]:
import cloudpickle

# Serialize the fitted pipeline to model_artifacts/<method>_model.pkl.
# NOTE(review): cloudpickle is presumably used because the pipeline contains
# encoder classes defined inside this notebook - confirm before swapping it
# for plain pickle/joblib.
model_path = output_dir / f"{method}_model.pkl"
model_path.write_bytes(cloudpickle.dumps(best_svr_model))

#### 在存檔那台機器先記錄版本

In [15]:
%pip freeze > {method}_requirements.txt

Note: you may need to restart the kernel to use updated packages.
