In [None]:
import os
import sys
import math
import random
import numpy as np
import pandas as pd
import xgboost as xgb
from dataclasses import dataclass, field
from typing import List, Set, Optional

@dataclass
class DataSet:
    features:    List[float] = field(default_factory=list)
    labels:      List[float] = field(default_factory=list)
    ex_ret:     List[float] = field(default_factory=list) 
    rows: int = 0
    cols: int = 0

@dataclass
class DataPack:
    train: DataSet = field(default_factory=DataSet)  # 60 %
    val:   DataSet = field(default_factory=DataSet)  # 20 % – threshold search
    test:  DataSet = field(default_factory=DataSet)  # 20 % – final verification

@dataclass
class ModelConfig:
    id:               int   = 0
    max_depth:        int   = 5
    eta:              float = 0.05
    num_round:        int   = 400
    objective:        str   = "binary:logistic"
    eval_metric:      str   = "auc"
    min_child_weight: float = 1.0
    scale_pos_weight: float = 1.0
    subsample:        float = 1.0
    colsample:        float = 1.0

@dataclass
class ValidationMetrics:
    sharpe_ratio:          float = 0.0
    avg_return:            float = 0.0
    avg_pd:                float = 0.0
    approved_count:        int   = 0
    approved_rate:         float = 0.0
    best_pd_threshold:     float = 0.0
    best_return_threshold: float = 0.0

@dataclass
class ExperimentResult:
    best_cls_config: ModelConfig       = field(default_factory=ModelConfig)
    best_reg_config: ModelConfig       = field(default_factory=ModelConfig)
    best_metrics:    ValidationMetrics = field(default_factory=ValidationMetrics)

@dataclass
class ExperimentContext:
    pred_pd:         List[float] = field(default_factory=list)
    pred_est_return: List[float] = field(default_factory=list)
    ex_returns:      List[float] = field(default_factory=list)
    bond_yields:     List[float] = field(default_factory=list)
    test_size:       int = 0


In [None]:
class CsvLoader:

    def __init__(self, filename: str, target_col_name: str, ignore_col_names: Set[str]):
        self._fname       = filename
        self._target_col  = target_col_name
        self._ignore_col  = set(ignore_col_names)
        self._feature_names: List[str] = []

    def get_feature_names(self) -> List[str]:
        return self._feature_names

    def _read_df(self):
        print("[CsvLoader] Configuring columns...")
        df = pd.read_csv(self._fname)

        if self._target_col not in df.columns:
            raise RuntimeError(f"Target column not found: {self._target_col}")

        feature_cols = [
            c for c in df.columns
            if c != self._target_col and c not in self._ignore_col
        ]
        self._feature_names = feature_cols
        return df, feature_cols

    def _df_to_dataset(self, df: pd.DataFrame, feature_cols: List[str]) -> DataSet:
        ds             = DataSet()
        ds.rows        = len(df)
        ds.cols        = len(feature_cols)
        ds.features    = df[feature_cols].astype("float32").values.flatten().tolist()
        ds.labels      = df[self._target_col].astype("float32").tolist()
        ds.ex_ret     = df["Excess_Return"].astype("float32").tolist() if "Excess_Return" in df.columns else [0.0] * ds.rows
        return ds
   
    def load(self) -> DataSet:
        df, feature_cols = self._read_df()
        print(f"[CsvLoader] Loaded {len(df)} rows, {len(feature_cols)} feature cols.")
        return self._df_to_dataset(df, feature_cols)

    def load_and_split(self, train_ratio: float = 0.6, val_ratio: float = 0.2) -> DataPack:
        print(">>> [CsvLoader] Loading raw data...")
        df, feature_cols = self._read_df()

        print(f">>> [CsvLoader] Shuffling and Splitting "
              f"(Train: {train_ratio}, Val: {val_ratio})...")

        df = df.sample(frac=1, random_state=42).reset_index(drop=True)

        n         = len(df)
        train_end = int(n * train_ratio)
        val_end   = train_end + int(n * val_ratio)

        pack = DataPack(
            train = self._df_to_dataset(df.iloc[:train_end],        feature_cols),
            val   = self._df_to_dataset(df.iloc[train_end:val_end], feature_cols),
            test  = self._df_to_dataset(df.iloc[val_end:],          feature_cols),
        )

        print(">>> Split Complete.")
        print(f" - Train Rows: {pack.train.rows}")
        print(f" - Val Rows:   {pack.val.rows}")
        print(f" - Test Rows:  {pack.test.rows}")
        return pack

In [None]:
class ExperimentManager:
    candidate_depths: List[int]   = [5, 7]
    candidate_etas:   List[float] = [0.05, 0.01]

    @staticmethod
    def _make_dmatrix(dataset: DataSet, use_returns: bool = False) -> xgb.DMatrix:
        X = np.array(dataset.features, dtype=np.float32).reshape(dataset.rows, dataset.cols)
        y = np.array(dataset.returns if use_returns else dataset.labels, dtype=np.float32)
        return xgb.DMatrix(X, label=y, missing=float("nan"))

    @staticmethod
    def _train_booster(dm: xgb.DMatrix, config: ModelConfig) -> xgb.Booster:
        params = {
            "verbosity":        0,
            "tree_method":      "hist",
            "objective":        config.objective,
            "eval_metric":      config.eval_metric,
            "max_depth":        config.max_depth,
            "eta":              config.eta,
            "min_child_weight": config.min_child_weight,
            "scale_pos_weight": config.scale_pos_weight,
            "subsample":        config.subsample,
            "colsample_bytree": config.colsample,
            "nthread":          os.cpu_count() // 4,
        }
        return xgb.train(params, dm, num_boost_round=config.num_round, verbose_eval=False)

    @staticmethod
    def _generate_grid(classification: bool) -> List[ModelConfig]:
        obj    = "binary:logistic"   if classification else "reg:absoluteerror"
        metric = "auc"               if classification else "mae"

        configs: List[ModelConfig] = []
        id_counter = 1
        for depth in ExperimentManager.candidate_depths:
            for eta in ExperimentManager.candidate_etas:
                rounds = int(20.0 / eta)
                rounds = max(100, min(3000, rounds))
                configs.append(ModelConfig(
                    id=id_counter, max_depth=depth, eta=eta, num_round=rounds,
                    objective=obj, eval_metric=metric,
                    min_child_weight=1.0, scale_pos_weight=1.0,
                    subsample=0.8, colsample=0.8,
                ))
                id_counter += 1
        return configs

    @staticmethod
    def _calculate_sharpe_ratio(ex_rets:List[float], approvals:List[bool]) -> float:
        n = len(ex_rets)
        ex_rets_arr = np.array(ex_rets)
        approvals_arr = np.array(approvals)

        if n <= 1:
            return 0.0

        excess = np.where(approvals_arr, ex_rets_arr, 0.0)
        mean_excess = np.mean(excess)
        std_dev = np.std(excess, ddof=1)
        return 0.0 if std_dev < 1e-9 else float(mean_excess / std_dev)

    def _prepare_experiment(self,
                            dataset:     DataSet,
                            cls_config:  ModelConfig,
                            reg_config:  ModelConfig,
                            split_ratio: float) -> ExperimentContext:
        total_rows  = dataset.rows
        split_point = int(total_rows * split_ratio)
        test_size   = total_rows - split_point

        if test_size <= 0:
            print("Error: Test set size is 0.", file=sys.stderr)
            sys.exit(1)

        nc = dataset.cols

        train_ds = DataSet(
            features    = dataset.features[: split_point * nc],
            labels      = dataset.labels[: split_point],
            returns     = dataset.ex_ret[: split_point],
            rows=split_point, cols=nc,
        )
        test_ds = DataSet(
            features    = dataset.features[split_point * nc :],
            labels      = dataset.labels[split_point:],
            returns     = dataset.ex_ret[split_point:],
            rows=test_size, cols=nc,
        )

        print(">>> [Common] Training Best Models...")
        booster_cls = self._train_booster(self._make_dmatrix(train_ds, False), cls_config)
        booster_reg = self._train_booster(self._make_dmatrix(train_ds, True),  reg_config)

        X_test  = np.array(test_ds.features, dtype=np.float32).reshape(test_size, nc)
        dm_test = xgb.DMatrix(X_test, missing=float("nan"))

        print(f">>> [Common] Predicting Test Set ({test_size} rows)...")
        return ExperimentContext(
            pred_pd         = booster_cls.predict(dm_test).tolist(),
            pred_est_return = booster_reg.predict(dm_test).tolist(),
            ex_returns  = test_ds.returns[:],
            bond_yields     = test_ds.bond_yields[:],
            test_size       = test_size,
        )

    def _find_best_thresholds(self, ctx: ExperimentContext) -> ValidationMetrics:
        best = ValidationMetrics(sharpe_ratio=-999.0)

        pd_candidates  = [0.05, 0.10, 0.15, 0.20, 0.25, 0.30]
        ret_candidates = [0.02, 0.04, 0.05, 0.06, 0.07, 0.08]

        n        = ctx.test_size
        approval = [False] * n

        for pd_th in pd_candidates:
            for ret_th in ret_candidates:
                approved_count = 0
                sum_ret = sum_pd = 0.0

                for i in range(n):
                    b_pass = (ctx.pred_pd[i] < pd_th) and (ctx.pred_est_return[i] > ret_th)
                    approval[i] = b_pass
                    if b_pass:
                        approved_count += 1
                        sum_ret += ctx.ex_returns[i]
                        sum_pd  += ctx.pred_pd[i]

                if approved_count < 10:
                    continue

                sharpe = self._calculate_sharpe_ratio(ctx.ex_returns, ctx.bond_yields, approval)
                if sharpe > best.sharpe_ratio:
                    best.sharpe_ratio          = sharpe
                    best.best_pd_threshold     = pd_th
                    best.best_return_threshold = ret_th
                    best.approved_count        = approved_count
                    best.approved_rate         = approved_count / n * 100.0
                    best.avg_return            = sum_ret / approved_count
                    best.avg_pd                = sum_pd  / approved_count

        return best

    def _train_model_on_set(self, train_set: DataSet, config: ModelConfig) -> xgb.Booster:
        use_returns = "binary" not in config.objective
        return self._train_booster(self._make_dmatrix(train_set, use_returns), config)

    def _predict_on_set(self,
                        booster_cls: xgb.Booster,
                        booster_reg: xgb.Booster,
                        target_set:  DataSet) -> ExperimentContext:
        X  = np.array(target_set.features, dtype=np.float32).reshape(target_set.rows, target_set.cols)
        dm = xgb.DMatrix(X, missing=float("nan"))
        return ExperimentContext(
            pred_pd         = booster_cls.predict(dm).tolist(),
            pred_est_return = booster_reg.predict(dm).tolist(),
            ex_returns  = target_set.ex_ret[:],
            test_size       = target_set.rows,
        )

    def run_grid_search_auto(self, pack: DataPack) -> ExperimentResult:
        cls_configs = self._generate_grid(True)
        reg_configs = self._generate_grid(False)
        total_iter  = len(cls_configs) * len(reg_configs)

        print("\n>>> Automatic Grid Search on Validation Set")
        print(">>> Training on 60% (Train), Evaluating on 20% (Val)")
        print(f">>> Total Combinations: {total_iter}")

        best_result = ExperimentResult()
        best_result.best_metrics.sharpe_ratio = -999.0
        cur_round = 0

        with open("grid_search_auto.csv", "w") as f:
            f.write("Iter,Cls_Depth,Cls_Eta,Reg_Depth,Reg_Eta,"
                    "Best_PD_Thresh,Best_Ret_Thresh,"
                    "Approved_Cnt,Avg_Return,Avg_PD,Sharpe_Ratio\n")

            for c_conf in cls_configs:
                for r_conf in reg_configs:
                    cur_round += 1

                    h_cls   = self._train_model_on_set(pack.train, c_conf)
                    h_reg   = self._train_model_on_set(pack.train, r_conf)
                    val_ctx = self._predict_on_set(h_cls, h_reg, pack.val)
                    metrics = self._find_best_thresholds(val_ctx)

                    marker = ""
                    if metrics.sharpe_ratio > best_result.best_metrics.sharpe_ratio:
                        best_result.best_metrics    = metrics
                        best_result.best_cls_config = c_conf
                        best_result.best_reg_config = r_conf
                        marker = " [★ BEST]"

                    print(f"[{cur_round}/{total_iter}] "
                          f"C(d{c_conf.max_depth} e{c_conf.eta:.2f}) "
                          f"R(d{r_conf.max_depth} e{r_conf.eta:.2f}) "
                          f"| Val-Sharpe: {metrics.sharpe_ratio:.5f}{marker}")

                    f.write(f"{cur_round},{c_conf.max_depth},{c_conf.eta},"
                            f"{r_conf.max_depth},{r_conf.eta},"
                            f"{metrics.best_pd_threshold},{metrics.best_return_threshold},"
                            f"{metrics.approved_count},{metrics.avg_return},"
                            f"{metrics.avg_pd},{metrics.sharpe_ratio}\n")
                    f.flush()

        print("\n>>> Grid Search Complete.")
        print(f">>> Best Validation Sharpe: {best_result.best_metrics.sharpe_ratio}")
        return best_result

    def run_standard_validation(self,
                                pack:     DataPack,
                                best_cls: ModelConfig,
                                best_reg: ModelConfig) -> None:
       
        print(f">>> Training Models on Train Set ({pack.train.rows} rows)...")
        h_cls = self._train_model_on_set(pack.train, best_cls)
        h_reg = self._train_model_on_set(pack.train, best_reg)

        print(f">>> Optimizing Thresholds on Validation Set ({pack.val.rows} rows)...")
        val_ctx      = self._predict_on_set(h_cls, h_reg, pack.val)
        best_metrics = self._find_best_thresholds(val_ctx)

        print(f"   -> Found Best Thresholds on Validation Set:")
        print(f"      PD < {best_metrics.best_pd_threshold},  "
              f"Return > {best_metrics.best_return_threshold}")
        print(f"      Val Sharpe Ratio: {best_metrics.sharpe_ratio}")

        print(f">>>Final Verification on Test Set ({pack.test.rows} rows)...")
        test_ctx = self._predict_on_set(h_cls, h_reg, pack.test)

        final_approval: List[bool] = []
        approved_count = 0
        for i in range(test_ctx.test_size):
            b_pass = (
                test_ctx.pred_pd[i]         < best_metrics.best_pd_threshold and
                test_ctx.pred_est_return[i] > best_metrics.best_return_threshold
            )
            final_approval.append(b_pass)
            if b_pass:
                approved_count += 1

        if approved_count >= 10:
            final_sharpe = self._calculate_sharpe_ratio(
                test_ctx.ex_returns, test_ctx.bond_yields, final_approval
            )
        else:
            print(f">>> [WARNING] Not enough approved samples ({approved_count} < 10). "
                  f"Sharpe set to 0.0.")
            final_sharpe = 0.0

        print("\n------------------------------------------------------")
        print(">>> [Final Result]")
        print(f"1. Validation Sharpe : {best_metrics.sharpe_ratio}")
        print(f"2. Test Set Sharpe   : {final_sharpe}")
        print(f"3. Approved Count    : {approved_count} / {test_ctx.test_size} "
              f"({approved_count / test_ctx.test_size * 100:.1f}%)")

        if abs(final_sharpe - best_metrics.sharpe_ratio) < 0.5:
            print(">>> SUCCESS: Model is Robust! (Val & Test scores are similar)")
        else:
            print(">>> WARNING: Large gap detected. Check for Overfitting.")
        print("------------------------------------------------------")

        self._perform_random_permutation_test(
            test_ctx.pred_pd, test_ctx.pred_est_return,
            test_ctx.ex_returns, test_ctx.bond_yields,
            best_metrics.best_pd_threshold,
            best_metrics.best_return_threshold,
            1000
        )

    def _perform_random_permutation_test(self,
                                         pred_pd:     List[float],
                                         pred_ret:    List[float],
                                         actual_ret:  List[float],
                                         bond_yields: List[float],
                                         best_pd_th:  float,
                                         best_ret_th: float,
                                         iterations:  int) -> None:
        print("\n======================================================")
        print(">>> [Permutation Test] Verifying Best Strategy Significance <<<")
        print("======================================================")

        test_size = len(actual_ret)
        if test_size == 0:
            return

        fixed_approval = [
            (pred_pd[i] < best_pd_th) and (pred_ret[i] > best_ret_th)
            for i in range(test_size)
        ]
        approved_count  = sum(fixed_approval)
        original_sharpe = self._calculate_sharpe_ratio(actual_ret, bond_yields, fixed_approval)

        print(f"Best Config: PD < {best_pd_th}, Ret > {best_ret_th}")
        print(f"Original Sharpe Ratio: {original_sharpe} (Count: {approved_count})")
        print(f"Running {iterations} permutations...")

        better_count   = 0
        indices        = list(range(test_size))
        rng            = random.Random()
        shuffled_ret   = [0.0] * test_size
        shuffled_bonds = [0.0] * test_size
        progress_step  = max(1, iterations // 10)

        for k in range(iterations):
            rng.shuffle(indices)
            for i in range(test_size):
                shuffled_ret[i]   = actual_ret[indices[i]]
                shuffled_bonds[i] = bond_yields[indices[i]]

            random_sharpe = self._calculate_sharpe_ratio(shuffled_ret, shuffled_bonds, fixed_approval)
            if random_sharpe >= original_sharpe:
                better_count += 1

            if (k + 1) % progress_step == 0:
                print(".", end="", flush=True)

        print()

        # p-value = (betterCount + 1) / (iterations + 1)
        p_value = (better_count + 1) / (iterations + 1)

        print("------------------------------------------------------")
        print("Permutation Test Result:")
        print(f" - Better Random Strategies: {better_count} / {iterations}")
        print(f" - P-Value: {p_value:.4f}")

        if p_value < 0.05:
            print(">>> SUCCESS: The strategy is Statistically Significant! (Not Luck)")
        else:
            print(">>> WARNING: The strategy might be due to randomness.")
        print("======================================================")


In [None]:

csv_file      = "Data1.csv"
target_column = ""
ignore_list: Set[str] = {
        "Actual_term", "total_pymnt", "last_pymnt_amnt", "내부수익률",
        "loan_status", "Return",
}

print(">>> [1/4] Data Loading...")
loader    = CsvLoader(csv_file, target_column, ignore_list)
data_pack = loader.load_and_split(0.6, 0.2)

if data_pack.test.rows == 0:
    print("Error: No data found.", file=sys.stderr)


total_rows = data_pack.train.rows + data_pack.val.rows + data_pack.test.rows
print(f" -> Load Complete: {total_rows} Rows, {data_pack.test.cols} Cols")

manager = ExperimentManager()
print("\n>>> [1/4] Find BestConfig from Robust Range...")
print(f"    - ETA Range:   {ExperimentManager.candidate_etas}")
print(f"    - Depth Range: {ExperimentManager.candidate_depths}")

result = manager.run_grid_search_auto(data_pack)

print("\n>>> [2/4] Starting Integrated Grid Search...")
print("    - Strategy: Dual Model (Classification + Regression)")
print("    - Optimization: Parameter Grid + Threshold Auto-Tuning")

manager.run_standard_validation(data_pack, result.best_cls_config, result.best_reg_config)