In [10]:
# ==========================================================
# C5.0 (custom): gain-ratio + pessimistic pruning + raising
# ==========================================================
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
import math
import numpy as np
import pandas as pd
from collections import Counter, defaultdict
from sklearn.metrics import accuracy_score, classification_report
import copy

# ---------- helpers

def entropy_from_counts(counts: Dict[Any, float]) -> float:
    n = float(sum(counts.values()))
    if n <= 0: return 0.0
    ent = 0.0
    for c in counts.values():
        if c > 0:
            p = c / n
            ent -= p * math.log2(p)
    return ent

def majority_label(counts: Dict[Any, float]) -> Any:
    return max(counts.items(), key=lambda kv: kv[1])[0] if counts else None

def _norm_ppf(p: float) -> float:
    # Acklam approximation of inverse normal CDF
    a = [-3.969683028665376e+01,  2.209460984245205e+02, -2.759285104469687e+02,
          1.383577518672690e+02, -3.066479806614716e+01,  2.506628277459239e+00]
    b = [-5.447609879822406e+01,  1.615858368580409e+02, -1.556989798598866e+02,
          6.680131188771972e+01, -1.328068155288572e+01]
    c = [-7.784894002430293e-03, -3.223964580411365e-01, -2.400758277161838e+00,
         -2.549732539343734e+00,  4.374664141464968e+00,  2.938163982698783e+00]
    d = [ 7.784695709041462e-03,  3.224671290700398e-01,  2.445134137142996e+00,
          3.754408661907416e+00]
    plow, phigh = 0.02425, 1 - 0.02425

    if p < plow:
        q = math.sqrt(-2 * math.log(p))
        num = (((((c[0]*q + c[1])*q + c[2])*q + c[3])*q + c[4])*q + c[5])
        den = ((((d[0]*q + d[1])*q + d[2])*q + d[3])*q + 1.0)
        return num / den

    if p > phigh:
        q = math.sqrt(-2 * math.log(1 - p))
        num = (((((c[0]*q + c[1])*q + c[2])*q + c[3])*q + c[4])*q + c[5])
        den = ((((d[0]*q + d[1])*q + d[2])*q + d[3])*q + 1.0)
        return - (num / den)

    q = p - 0.5
    r = q * q
    num = (((((a[0]*r + a[1])*r + a[2])*r + a[3])*r + a[4])*r + a[5]) * q
    den = (((((b[0]*r + b[1])*r + b[2])*r + b[3])*r + b[4]) * r + 1.0)
    return num / den


def pessimistic_error_upper(e: float, n: float, cf: float) -> float:
    if n <= 0: return 0.0
    f = e / n
    z = _norm_ppf(1 - cf)   # cf=0.25 -> z≈0.674
    denom = 1 + (z*z)/n
    centre = f + (z*z)/(2*n)
    adj = z * math.sqrt((f*(1-f) + (z*z)/(4*n))/n)
    return (centre + adj)/denom

# ---------- tree node

@dataclass
class Node:
    is_leaf: bool
    prediction: Any
    depth: int
    n_samples: int
    class_counts: Dict[Any, float]
    # split info
    feature: Optional[str] = None
    threshold: Optional[float] = None          # numeric
    branches: Optional[Dict[Any, "Node"]] = None  # categorical
    left: Optional["Node"] = None              # numeric
    right: Optional["Node"] = None             # numeric
    default_child: Optional[Any] = None        # categorical unseen routing
    gain_ratio: float = 0.0
    # bookkeeping for raising
    idx: Optional[np.ndarray] = None           # training indices that reach this node

# ---------- main C5.0

class C50DecisionTree:
    def __init__(self,
                 max_depth: Optional[int]=None,
                 max_leaf_nodes: Optional[int]=None,
                 min_samples_split: int=20,
                 min_samples_leaf: int=10,
                 min_gain_ratio: float=0.0,
                 cf: float=0.25,
                 subtree_raising: bool=True,
                 class_weight: Optional[Dict[Any, float]]=None,
                 random_state: int=42,
                 tie_eps: float=1e-12,
                 viz_max_depth: Optional[int]=3):
        self.max_depth = max_depth
        self.max_leaf_nodes = max_leaf_nodes
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.min_gain_ratio = min_gain_ratio
        self.cf = cf
        self.subtree_raising = subtree_raising
        self.class_weight = class_weight or {}
        self.random_state = random_state
        self.tie_eps = tie_eps
        self.viz_max_depth = viz_max_depth

        self._tree_: Optional[Node] = None
        self._leaf_count = 0
        self._feature_types: Dict[str, str] = {}
        self._num_median: Dict[str, float] = {}
        self._cat_unknown_token = "Unknown"

        # keep training data for raising evaluation
        self._X: Optional[pd.DataFrame] = None
        self._y: Optional[np.ndarray] = None
        self._w: Optional[np.ndarray] = None

    # ---- preprocessing

    def _infer_types(self, df: pd.DataFrame):
        for c in df.columns:
            self._feature_types[c] = 'numeric' if pd.api.types.is_numeric_dtype(df[c]) else 'categorical'

    def _fit_imputers(self, X: pd.DataFrame):
        for c, t in self._feature_types.items():
            if t == 'numeric':
                self._num_median[c] = float(pd.to_numeric(X[c], errors='coerce').median())

    def _transform(self, X: pd.DataFrame) -> pd.DataFrame:
        X2 = X.copy()
        for c, t in self._feature_types.items():
            if t == 'numeric':
                X2[c] = pd.to_numeric(X2[c], errors='coerce').fillna(self._num_median[c])
            else:
                X2[c] = X2[c].astype('object').where(X2[c].notna(), self._cat_unknown_token)
        return X2

    # ---- gain-ratio split search

    def _best_split(self, X: pd.DataFrame, y: np.ndarray, w: np.ndarray) -> Tuple[float, dict]:
        n = len(y)
        # weighted parent counts
        parent_counts = Counter()
        for yi, wi in zip(y, w): parent_counts[yi] += wi
        parent_entropy = entropy_from_counts(parent_counts)

        best_gr, best_spec = -1.0, None
        rng = np.random.RandomState(self.random_state)

        for col, t in self._feature_types.items():
            if t == 'numeric':
                xs = X[col].values
                order = np.argsort(xs)
                xs_sorted, y_sorted, w_sorted = xs[order], y[order], w[order]
                uniq = np.unique(xs_sorted)
                if len(uniq) <= 1: continue
                thresholds = (uniq[:-1] + uniq[1:]) / 2.0

                left_counts = Counter(); left_w = 0.0
                total_counts = Counter()
                total_w = float(w_sorted.sum())
                for yi, wi in zip(y_sorted, w_sorted): total_counts[yi] += wi

                ptr = 0
                for thr in thresholds:
                    while ptr < n and xs_sorted[ptr] <= thr:
                        left_counts[y_sorted[ptr]] += w_sorted[ptr]
                        left_w += w_sorted[ptr]
                        ptr += 1
                    nL, nR = left_w, total_w - left_w
                    if nL < self.min_samples_leaf or nR < self.min_samples_leaf:
                        continue
                    right_counts = {k: total_counts[k] - left_counts[k] for k in total_counts}
                    gain = parent_entropy \
                        - (nL/total_w)*entropy_from_counts(left_counts) \
                        - (nR/total_w)*entropy_from_counts(right_counts)
                    split_info = 0.0
                    for m in (nL, nR):
                        p = m/total_w
                        if p > 0: split_info -= p*math.log2(p)
                    if split_info <= 1e-12: continue
                    gr = gain / split_info
                    # tie-break slightly random within eps
                    if gr > best_gr + self.tie_eps or (abs(gr - best_gr) <= self.tie_eps and rng.rand() < 0.5):
                        best_gr, best_spec = gr, dict(kind='numeric', feature=col, threshold=float(thr))
            else:
                groups = defaultdict(list)
                for i, v in enumerate(X[col].values): groups[v].append(i)
                if len(groups) <= 1: continue

                valid = True
                child_entropy = 0.0
                total_w = float(w.sum())
                for idxs in groups.values():
                    wk = float(w[idxs].sum())
                    if wk < self.min_samples_leaf: valid = False; break
                    ck = Counter()
                    for ii in idxs: ck[y[ii]] += w[ii]
                    child_entropy += (wk/total_w)*entropy_from_counts(ck)
                if not valid: continue
                gain = parent_entropy - child_entropy
                split_info = entropy_from_counts(Counter({k: float(w[idxs].sum()) for k, idxs in groups.items()}))
                if split_info <= 1e-12: continue
                gr = gain / split_info
                if gr > best_gr + self.tie_eps or (abs(gr - best_gr) <= self.tie_eps and rng.rand() < 0.5):
                    best_gr = gr
                    best_spec = dict(kind='categorical', feature=col,
                                     groups={k: np.asarray(v, dtype=int) for k, v in groups.items()})
        return best_gr, best_spec

    # ---- build

    def _weighted_counts(self, y, w) -> Dict[Any, float]:
        c = Counter()
        for yi, wi in zip(y, w):
            c[yi] += wi * (self.class_weight.get(yi, 1.0))
        return c

    def _build(self, X: pd.DataFrame, y: np.ndarray, w: np.ndarray, depth: int, idx: np.ndarray) -> Node:
        counts = self._weighted_counts(y, w)
        pred = majority_label(counts)
        total_w = float(w.sum())

        # stop
        if (self.max_depth is not None and depth >= self.max_depth) or \
           (self.max_leaf_nodes is not None and self._leaf_count >= self.max_leaf_nodes) or \
           total_w < self.min_samples_split or \
           len(counts) == 1:
            self._leaf_count += 1
            return Node(True, pred, depth, int(total_w), dict(counts), idx=idx)

        best_gr, spec = self._best_split(X, y, w)
        if spec is None or best_gr <= self.min_gain_ratio:
            self._leaf_count += 1
            return Node(True, pred, depth, int(total_w), dict(counts), idx=idx)

        # split
        if spec['kind'] == 'numeric':
            f, thr = spec['feature'], spec['threshold']
            mask = (X[f].values <= thr)
            XL, yL, wL, idxL = X[mask], y[mask], w[mask], idx[mask]
            XR, yR, wR, idxR = X[~mask], y[~mask], w[~mask], idx[~mask]
            if float(wL.sum()) < self.min_samples_leaf or float(wR.sum()) < self.min_samples_leaf:
                self._leaf_count += 1
                return Node(True, pred, depth, int(total_w), dict(counts), idx=idx)
            left  = self._build(XL, yL, wL, depth+1, idxL)
            right = self._build(XR, yR, wR, depth+1, idxR)
            node = Node(False, pred, depth, int(total_w), dict(counts),
                        feature=f, threshold=thr, left=left, right=right,
                        gain_ratio=best_gr, idx=idx)
        else:
            f, groups = spec['feature'], spec['groups']
            branches = {}
            # default branch = largest weight child
            largest_k = None; largest_w = -1
            for k, id_arr in groups.items():
                Xi, yi, wi = X.iloc[id_arr], y[id_arr], w[id_arr]
                wsum = float(wi.sum())
                if wsum < self.min_samples_leaf: continue
                branches[k] = self._build(Xi, yi, wi, depth+1, idx[id_arr])
                if wsum > largest_w: largest_w, largest_k = wsum, k
            node = Node(False, pred, depth, int(total_w), dict(counts),
                        feature=f, branches=branches, default_child=largest_k,
                        gain_ratio=best_gr, idx=idx)
        return node

    # ---- pruning + raising

    def _subtree_empirical_error(self, node: Node, idx: np.ndarray) -> Tuple[float, float]:
        """Return (errors, total_w) evaluated on training subset idx with weights/self.class_weight."""
        if len(idx) == 0: return 0.0, 0.0
        Xs = self._X.iloc[idx]
        ys = self._y[idx]
        ws = self._w[idx]
        # predictions
        preds = self._predict_batch_rows(Xs, node)
        err = 0.0; tot = 0.0
        for y_true, y_pred, wi in zip(ys, preds, ws):
            w_eff = wi * self.class_weight.get(y_true, 1.0)
            tot += w_eff
            if y_true != y_pred: err += w_eff
        return err, tot

    def _prune(self, node: Node) -> Tuple[float, float]:
        if node.is_leaf:
            e = node.n_samples - node.class_counts.get(node.prediction, 0.0)
            return e, float(node.n_samples)

        # post-order
        if node.branches is not None:
            child_err = 0.0; child_w = 0.0
            for ch in node.branches.values():
                e, w = self._prune(ch)
                child_err += e; child_w += w
        else:
            eL, wL = self._prune(node.left)
            eR, wR = self._prune(node.right)
            child_err, child_w = eL+eR, wL+wR

        # leaf error if collapsed
        leaf_err = node.n_samples - node.class_counts.get(node.prediction, 0.0)

        child_rate = pessimistic_error_upper(child_err, child_w, self.cf)
        leaf_rate  = pessimistic_error_upper(leaf_err,  float(node.n_samples), self.cf)

        if leaf_rate <= child_rate + 1e-12:
            node.is_leaf = True
            node.feature = node.threshold = None
            node.branches = None; node.left = node.right = None
            return leaf_err, float(node.n_samples)
        return child_err, child_w

    def _assign_indices(self, node: Node, idx: np.ndarray):
        """Push parent idx down the subtree to refresh children's idx after raising."""
        node.idx = idx
        if node.is_leaf: return
        Xs = self._X.iloc[idx]
        if node.branches is not None:
            # categorical
            buckets = defaultdict(list)
            col = node.feature
            for i, v in zip(idx, Xs[col].values):
                buckets[v].append(i)
            for k, ch in node.branches.items():
                self._assign_indices(ch, np.array(buckets.get(k, []), dtype=int))
        else:
            # numeric
            col, thr = node.feature, node.threshold
            mask = (Xs[col].values <= thr)
            self._assign_indices(node.left,  idx[mask])
            self._assign_indices(node.right, idx[~mask])

    def _try_raising_here(self, node: Node):
        """Try subtree raising: replace 'node' with one of its children if pessimistic error doesn't get worse."""
        if node.is_leaf: return
        # evaluate current subtree on node.idx
        cur_err, cur_w = self._subtree_empirical_error(node, node.idx)
        cur_rate = pessimistic_error_upper(cur_err, cur_w, self.cf)

        candidates = []
        if node.branches is not None:
            candidates = list(node.branches.values())
        else:
            candidates = [node.left, node.right]

        for child in candidates:
            ch_err, ch_w = self._subtree_empirical_error(child, node.idx)
            ch_rate = pessimistic_error_upper(ch_err, ch_w, self.cf)
            if ch_rate <= cur_rate - 1e-12:   # strictly better
                # raise: copy child's structure into node
                # (deep copy to avoid aliasing other references)
                clone = copy.deepcopy(child)
                node.is_leaf = clone.is_leaf
                node.prediction = clone.prediction
                node.depth = node.depth  # keep same
                node.n_samples = node.n_samples
                node.class_counts = node.class_counts
                node.feature = clone.feature
                node.threshold = clone.threshold
                node.branches = clone.branches
                node.left = clone.left
                node.right = clone.right
                node.default_child = clone.default_child
                node.gain_ratio = clone.gain_ratio
                # re-distribute indices to refreshed structure
                self._assign_indices(node, node.idx)
                # after one successful raising,可以再嘗試連鎖提升
                return self._try_raising_here(node)

    def _raising(self, node: Node):
        if node.is_leaf: return
        # post-order
        if node.branches is not None:
            for ch in node.branches.values():
                self._raising(ch)
        else:
            self._raising(node.left)
            self._raising(node.right)
        # then try raising at this node
        self._try_raising_here(node)

    # ---- API

    def fit(self, X: pd.DataFrame, y: pd.Series, sample_weight: Optional[np.ndarray]=None):
        self._X = X.copy()
        yy = y.values if isinstance(y, pd.Series) else np.asarray(y)
        self._y = yy
        self._w = sample_weight if sample_weight is not None else np.ones(len(yy), dtype=float)

        self._infer_types(self._X)
        self._fit_imputers(self._X)
        X2 = self._transform(self._X)

        self._leaf_count = 0
        idx_all = np.arange(len(yy), dtype=int)
        self._tree_ = self._build(X2, yy, self._w, depth=0, idx=idx_all)
        # pruning
        self._prune(self._tree_)
        # raising
        if self.subtree_raising:
            self._assign_indices(self._tree_, idx_all)
            self._raising(self._tree_)
        return self

    def _predict_row(self, row: pd.Series, node: Node):
        while not node.is_leaf:
            if node.branches is not None:
                v = row[node.feature]
                child = node.branches.get(v)
                if child is None:
                    child = node.branches[node.default_child]
                node = child
            else:
                node = node.left if row[node.feature] <= node.threshold else node.right
        return node.prediction

    def _predict_batch_rows(self, X: pd.DataFrame, node: Optional[Node]=None):
        if node is None: node = self._tree_
        return [self._predict_row(X.iloc[i], node) for i in range(len(X))]

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        X2 = self._transform(X)
        return np.array(self._predict_batch_rows(X2))

    # ---- pretty print

    def print_tree(self, node: Optional[Node]=None, depth: int=0, max_depth: Optional[int]=None):
        if node is None: node = self._tree_
        if node is None:
            print("(empty)"); return
        if max_depth is None: max_depth = self.viz_max_depth
        indent = "  " * depth
        if node.is_leaf or depth >= max_depth:
            print(f"{indent}Leaf[n={node.n_samples}] -> {node.prediction}  counts={node.class_counts}")
            return
        if node.branches is not None:
            print(f"{indent}Split @[#{node.feature}]  GR={node.gain_ratio:.4f}  n={node.n_samples}")
            for k, ch in node.branches.items():
                print(f"{indent} ├─ {k}:")
                self.print_tree(ch, depth+1, max_depth)
        else:
            print(f"{indent}Split @[{node.feature} <= {node.threshold:.4f}]  GR={node.gain_ratio:.4f}  n={node.n_samples}")
            print(f"{indent} ├─ True:");  self.print_tree(node.left,  depth+1, max_depth)
            print(f"{indent} └─ False:"); self.print_tree(node.right, depth+1, max_depth)

# =========================
# Dataset(標籤欄位：class）
# =========================
# ======================
# 參數
# ======================
import pandas as pd
import numpy as np

TRAIN_PATH = "adult_data_no_duplicates.csv"
TEST_PATH  = "adult_test_no_duplicates.csv"

MAX_DEPTH         = 10
MAX_LEAF_NODES    = 64
MIN_SAMPLES_SPLIT = 20
MIN_SAMPLES_LEAF  = 10         
MIN_GAIN          = 1e-4        # C4.5/C5.0：視為 min_gain_ratio 門檻
CF                = 0.25        # 悲觀誤差修剪的信賴係數 (C4.5/C5.0)
USE_RAISING       = True        # C5.0 子樹提升

# ======================
# 讀資料（標籤欄位=class）
# ======================
# ====== 對齊標籤欄（放在讀完 CSV 之後） ======

def load_adult(train_path=TRAIN_PATH, test_path=TEST_PATH,
               target_candidates=("class","income","label","target","y")):
    # 1) 讀檔
    train_df = pd.read_csv(train_path)
    test_df  = pd.read_csv(test_path)

    # 2) 欄名正規化
    def _norm_cols(df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        df.columns = [str(c).replace("\ufeff","").strip().lower() for c in df.columns]
        return df
    train_df = _norm_cols(train_df)
    test_df  = _norm_cols(test_df)

    # 3) 由訓練集決定目標欄；測試集改名對齊
    target_col = next((c for c in train_df.columns if c in target_candidates),
                      train_df.columns[-1])
    if target_col not in test_df.columns:
        test_df = test_df.rename(columns={test_df.columns[-1]: target_col})

    # 4) 標籤正規化（移除句點、統一大小寫）
    def _norm_labels(s: pd.Series) -> pd.Series:
        if s.dtype == object or s.dtype.name == "category":
            return (s.astype(str).str.strip()
                    .str.replace(r"\.$","", regex=True)
                    .str.replace(">50k", ">50K", case=False)
                    .str.replace("<=50k","<=50K", case=False))
        return s

    X_train = train_df.drop(columns=[target_col]).replace("?", np.nan)
    y_train = _norm_labels(train_df[target_col])
    X_test  = test_df.drop(columns=[target_col]).replace("?", np.nan)
    y_test  = _norm_labels(test_df[target_col])

    print("使用的目標欄位:", target_col)
    print("train 最後5個欄位:", train_df.columns[-5:].tolist())
    print("test  最後5個欄位:",  test_df.columns[-5:].tolist())
    return X_train, y_train, X_test, y_test

# 一鍵取得資料
X_train, y_train, X_test, y_test = load_adult()


# ======================
# C5.0
# 需先已定義好 C50DecisionTree 類別
# ======================
c50 = C50DecisionTree(
    max_depth=MAX_DEPTH,
    max_leaf_nodes=MAX_LEAF_NODES,
    min_samples_split=MIN_SAMPLES_SPLIT,
    min_samples_leaf=MIN_SAMPLES_LEAF,
    min_gain_ratio=MIN_GAIN,    # ← 用MIN_GAIN 當 gain-ratio 門檻
    cf=CF,
    subtree_raising=USE_RAISING,
    viz_max_depth=3
).fit(X_train, y_train)
# ========= Auto-tune C5.0 by ΔAcc (inline) =========
from sklearn.metrics import accuracy_score

GAP_THRESHOLD = 0.025
MAX_TUNE_STEPS = 5

def _fit_eval_c50(_params):
    _model = C50DecisionTree(
        max_depth=_params.get("max_depth"),
        max_leaf_nodes=_params.get("max_leaf_nodes"),
        min_samples_split=_params.get("min_samples_split", 20),
        min_samples_leaf=_params.get("min_samples_leaf", 10),
        min_gain_ratio=_params.get("min_gain_ratio", 1e-4),
        cf=_params.get("cf", 0.25),
        subtree_raising=_params.get("subtree_raising", True),
        random_state=_params.get("random_state", 42),
        viz_max_depth=3
    ).fit(X_train, y_train)
    _tr = accuracy_score(y_train, _model.predict(X_train))
    _te = accuracy_score(y_test,  _model.predict(X_test))
    return _model, _tr, _te, _tr - _te

# 初始參數
_init = dict(
    max_depth=globals().get("MAX_DEPTH", 10),
    max_leaf_nodes=globals().get("MAX_LEAF_NODES", 64),
    min_samples_split=globals().get("MIN_SAMPLES_SPLIT", 20),
    min_samples_leaf=globals().get("MIN_SAMPLES_LEAF", 10),
    min_gain_ratio=globals().get("MIN_GAIN", 1e-4),
    cf=globals().get("CF", 0.25),
    subtree_raising=globals().get("USE_RAISING", True),
    random_state=42
)

_hist_rows = []
_params = _init.copy()
c50_final = None
for _step in range(1, MAX_TUNE_STEPS + 1):
    _m, _tr, _te, _gap = _fit_eval_c50(_params)
    _hist_rows.append({"step": _step, "train_acc": _tr, "test_acc": _te, "gap": _gap, **_params})
    print(f"[C5.0][step {_step}] Train={_tr:.4f} Test={_te:.4f} Δ={_gap:.4f} params={_params}")
    if _gap <= GAP_THRESHOLD:
        c50_final = _m
        break
    # 收緊
    _params["min_gain_ratio"]    = min(_params.get("min_gain_ratio", 1e-4) * 1.7, 1e-1)
    _params["min_samples_split"] = min(int(_params.get("min_samples_split", 20) * 1.25), 300)
    _params["min_samples_leaf"]  = min(int(_params.get("min_samples_leaf", 10)  * 1.25), 200)
    if _params.get("max_depth", None) is not None:
        _params["max_depth"] = max(3, int(_params["max_depth"]) - 1)
    if _params.get("max_leaf_nodes", None) is not None:
        _params["max_leaf_nodes"] = max(16, int(_params["max_leaf_nodes"] * 0.8))
    _params["cf"] = min(0.5, float(_params.get("cf", 0.25)) * 1.1)

if c50_final is None:
    c50_final = _m

import pandas as _pd
_c50_hist = _pd.DataFrame(_hist_rows).round(6)
_c50_hist.to_csv("c50_tune_log.csv", index=False)
print("[C5.0] 調參歷程已輸出：c50_tune_log.csv")

from sklearn.metrics import accuracy_score as _acc
print("\n=== C5.0 最終模型（inline tuned）===")
print("Train_Acc=", _acc(y_train, c50_final.predict(X_train)))
print("Test_Acc =", _acc(y_test,  c50_final.predict(X_test)))

try:
    c50_final.print_tree(max_depth=3)
except Exception as e:
    print("[warn] print_tree 失敗：", e)
# ========= /Auto-tune C5.0 by ΔAcc (inline) =========



使用的目標欄位: class
train 最後5個欄位: ['capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'class']
test  最後5個欄位: ['capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'class']
[C5.0][step 1] Train=0.8520 Test=0.8533 Δ=-0.0013 params={'max_depth': 10, 'max_leaf_nodes': 64, 'min_samples_split': 20, 'min_samples_leaf': 10, 'min_gain_ratio': 0.0001, 'cf': 0.25, 'subtree_raising': True, 'random_state': 42}
[C5.0] 調參歷程已輸出：c50_tune_log.csv

=== C5.0 最終模型（inline tuned）===
Train_Acc= 0.8520453637397425
Test_Acc = 0.853342344556402
Split @[capital-gain <= 7073.5000]  GR=0.3400  n=32537
 ├─ True:
  Split @[capital-gain <= 5119.0000]  GR=0.0852  n=31138
   ├─ True:
    Split @[age <= 24.5000]  GR=0.0733  n=29378
     ├─ True:
      Leaf[n=5511] -> <=50K  counts={'<=50K': np.float64(5473.0), '>50K': np.float64(38.0)}
     └─ False:
      Leaf[n=23867] -> <=50K  counts={'<=50K': np.float64(18648.0), '>50K': np.float64(5219.0)}
   └─ False:
    Split @[capital-gain <= 5316.5000

In [13]:
# === 匯出 C5.0 樹圖（前 3 層；與先前風格一致，Graphviz 優先，無則用 Matplotlib 後備） ===
import matplotlib.pyplot as plt

try:
    from graphviz import Source
    _GV_OK = True
except Exception:
    _GV_OK = False
    print("[warn] 找不到 graphviz，將改用 Matplotlib 簡易繪圖。")

def _c50_to_dot(node, view_max_depth=3):
    lines = ['digraph Tree {',
             'node [shape=box, fontname="Helvetica"];',
             'edge [fontname="Helvetica"];']
    nid = [0]
    def walk(n, depth):
        my = nid[0]; nid[0]+=1
        if n.is_leaf or depth >= view_max_depth:
            lbl = f'leaf\\nclass={n.prediction}\\nsamples={n.n_samples}'
            lines.append(f'{my} [label="{lbl}", style="rounded,filled"];')
            return my
        if n.branches is not None:
            lines.append(f'{my} [label="{n.feature}"];')
            for k, ch in n.branches.items():
                cid = walk(ch, depth+1)
                lab = str(k).replace('"','\\"')
                lines.append(f'{my} -> {cid} [label="{lab}"];')
        else:
            lines.append(f'{my} [label="{n.feature} <= {n.threshold:.4f}"];')
            L = walk(n.left,  depth+1)
            R = walk(n.right, depth+1)
            lines.append(f'{my} -> {L} [label="True"];')
            lines.append(f'{my} -> {R} [label="False"];')
        return my
    walk((c50_final if "c50_final" in globals() and c50_final is not None else c50)._tree_, 0)
    lines.append('}')
    return "\n".join(lines)

# Matplotlib 後備簡易排版
def _c50_count_leaves(n):
    if n.is_leaf: return 1
    if n.branches is not None:
        return sum(_c50_count_leaves(ch) for ch in n.branches.values())
    return _c50_count_leaves(n.left) + _c50_count_leaves(n.right)

def _c50_layout(n, x0, x1, y, step, coords, edges, labels, depth, view_max_depth):
    my_id = id(n); coords[my_id] = ((x0+x1)/2, y, n)
    if n.is_leaf or depth >= view_max_depth: return
    if n.branches is not None:
        kids = list(n.branches.items())
        sizes = [ _c50_count_leaves(ch) for _, ch in kids ]
        total = sum(sizes)
        cur = x0
        for (lab, ch), sz in zip(kids, sizes):
            w = (x1-x0) * sz / total
            nx0, nx1 = cur, cur+w; cur += w
            _c50_layout(ch, nx0, nx1, y-step, step, coords, edges, labels, depth+1, view_max_depth)
            edges.append((my_id, id(ch))); labels[(my_id, id(ch))] = str(lab)
    else:
        sizes = [_c50_count_leaves(n.left), _c50_count_leaves(n.right)]
        total = sum(sizes); wL = (x1-x0) * sizes[0] / total
        _c50_layout(n.left,  x0,     x0+wL, y-step, step, coords, edges, labels, depth+1, view_max_depth)
        _c50_layout(n.right, x0+wL, x1,     y-step, step, coords, edges, labels, depth+1, view_max_depth)
        edges.append((my_id, id(n.left)));  labels[(my_id, id(n.left))]  = "True"
        edges.append((my_id, id(n.right))); labels[(my_id, id(n.right))] = "False"

def save_c50_tree_png(model, out_png="Tree_C50.png", view_max_depth=3):
    root = model._tree_
    if _GV_OK:
        dot = _c50_to_dot(root, view_max_depth=view_max_depth)
        src = Source(dot); src.format = "png"; src.render(out_png.replace(".png",""), cleanup=True)
        print(f"[OK] C5.0 樹圖片：{out_png}")
        return
    # 後備：Matplotlib
    coords, edges, labels = {}, [], {}
    _c50_layout(root, 0.0, 1.0, 1.0, 0.18, coords, edges, labels, 0, view_max_depth)
    fig, ax = plt.subplots(figsize=(12,6))
    for (x, y, n) in [(*coords[k][:2], coords[k][2]) for k in coords]:
        if n.is_leaf:
            txt = f'leaf\nclass={n.prediction}\nN={n.n_samples}'
        else:
            txt = n.feature if n.branches is not None else f'{n.feature} <= {n.threshold:.3f}'
        ax.text(x, y, txt, ha="center", va="center", bbox=dict(boxstyle="round,pad=0.3", fc="w", ec="k"))
    for (u,v) in edges:
        x1,y1,_ = coords[u]; x2,y2,_ = coords[v]
        ax.annotate("", xy=(x2,y2+0.01), xytext=(x1,y1-0.01), arrowprops=dict(arrowstyle="-"))
        ax.text((x1+x2)/2, (y1+y2)/2, labels[(u,v)], ha="center", va="center")
    ax.set_axis_off(); plt.tight_layout(); plt.savefig(out_png, dpi=300); plt.close()
    print(f"[OK] C5.0 樹圖片(後備)：{out_png}")

# 取最終模型並輸出樹圖（只畫 3 層）
_c50_model = c50_final if "c50_final" in globals() and c50_final is not None else c50
save_c50_tree_png(_c50_model, out_png="Tree_C50.png", view_max_depth=3)

# === 匯出測試結果到 Excel：加入 C5.0 ===
import numpy as np, pandas as pd
from pathlib import Path
import time

# 1) 整理 y_true（字串與 0/1）
y_true_str = pd.Series(y_test).astype(str).str.strip().str.replace('.', '', regex=False)
map01 = {'<=50K': 0, '>50K': 1}
y_true01 = y_true_str.map(map01).astype(int).to_numpy()

# 2) 各模型預測（若未定義就跳過）
def as01(x): 
    return np.asarray(x).astype(int).ravel()

cols = {
    "ID3_pred":  None,
    "C4.5_pred": None,
    "CART_pred": None,
    "C5.0_pred": None,
}


# 2b) C5.0 預測（用最終模型）
c50_pred_str = pd.Series(_c50_model.predict(X_test)).astype(str).str.strip().str.replace('.', '', regex=False)
cols["C5.0_pred"] = c50_pred_str.map(map01).astype(int).to_numpy()

# 3) 組合總表（含字串真值 + 各模型 0/1 與是否正確）
out = pd.DataFrame({
    'row': np.arange(len(y_true01)),
    'true': y_true_str,
    'true01': y_true01,
})
for name, arr in cols.items():
    if arr is not None:
        out[name] = arr
        out[name + "_correct"] = (arr == y_true01)

# 4) 輸出 Excel（優先 openpyxl；無則 xlsxwriter；避免檔案被鎖時自動換名）
try:
    import openpyxl
    _eng = "openpyxl"
except Exception:
    _eng = "xlsxwriter"

export_dir = Path("exports")
export_dir.mkdir(exist_ok=True)
base = export_dir / "Test_Predictions.xlsx"

def _write_xlsx(path: Path):
    with pd.ExcelWriter(path, engine=_eng, mode='w') as xw:
        out.to_excel(xw, sheet_name='combined', index=False)
        # 分表
        for name, arr in cols.items():
            if arr is None: 
                continue
            pd.DataFrame({'true01': y_true01, name: arr, 'correct': (arr == y_true01)}).to_excel(
                xw, sheet_name=name.replace("_pred",""), index=False
            )
        # 也輸出 C5.0 字串版方便看
        pd.DataFrame({'true': y_true_str, 'C5.0_pred_str': c50_pred_str}).to_excel(
            xw, sheet_name='C5.0_str', index=False
        )

try:
    _write_xlsx(base)
    print(f"[OK] 已輸出：{base.resolve()}")
except PermissionError:
    ts = time.strftime("%Y%m%d_%H%M%S")
    alt = base.with_name(f"{base.stem}_{ts}{base.suffix}")
    print("[warn] 原檔案可能被 Excel 鎖住，改存新檔。")
    _write_xlsx(alt)
    print(f"[OK] 已輸出：{alt.resolve()}")


[OK] C5.0 樹圖片：Tree_C50.png
[OK] 已輸出：C:\Users\rui0731\anaconda_projects\0e478e30-a257-4faa-aa14-a67f5fcce6cf\exports\Test_Predictions.xlsx


In [12]:
# === 輸出混淆矩陣圖（依據可用的模型預測自動輸出） ===
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score
import matplotlib.pyplot as plt

def _save_cm(y_true, y_pred, title, filename, class_names=('< =50K','>50K')):
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    fig, ax = plt.subplots(figsize=(4, 4))
    disp.plot(ax=ax, values_format='d', colorbar=False)
    ax.set_title(f"{title}  Acc={accuracy_score(y_true, y_pred):.4f}")
    plt.tight_layout(); plt.savefig(filename, dpi=300); plt.close()
    print(f"[OK] 混淆矩陣：{filename}")

name_map = {
    "ID3_pred":  ("ID3",  "CM_ID3.png"),
    "C4.5_pred": ("C4.5", "CM_C45.png"),
    "CART_pred": ("CART", "CM_CART.png"),
    "C5.0_pred": ("C5.0", "CM_C50.png"),  
}

for key, (title, fname) in name_map.items():
    if key in cols and cols[key] is not None:
        _save_cm(y_true01, cols[key], title, fname, class_names=('< =50K','>50K'))


[OK] 混淆矩陣：CM_C50.png
