In [9]:
# =========================
# From-scratch WEIGHTED Regression Tree
# (only stdlib + NumPy inside the method)
# =========================
import numpy as np
from dataclasses import dataclass
from typing import Optional, Tuple

# -------- primary metric for selection/reporting --------
def wmae(y, yhat, w):
    """Return the weighted mean absolute error of predictions.

    Computes ``sum(w * |y - yhat|) / sum(w)``.

    Parameters
    ----------
    y : array-like of observed targets.
    yhat : array-like of predictions, broadcast-compatible with ``y``.
    w : array-like of weights, broadcast-compatible with ``y``.

    Returns
    -------
    A NumPy float scalar. When the weights sum to zero the division is 0/0
    and the result is NaN (NumPy emits a RuntimeWarning).
    """
    # Coerce every input, not just the weights: plain Python lists do not
    # support elementwise subtraction and would raise TypeError below.
    y = np.asarray(y, float)
    yhat = np.asarray(yhat, float)
    w = np.asarray(w, float)
    return (np.abs(y - yhat) * w).sum() / w.sum()

# -------- small helpers --------
def wmean(y, w):
    """Return the weighted mean of ``y`` under weights ``w``.

    The weights are converted to a float array; ``y`` is used as given.
    If the total weight is not strictly positive (all-zero, empty or
    net-negative weights) the function returns ``0.0`` rather than dividing.
    """
    weights = np.asarray(w, float)
    total_weight = weights.sum()
    # Guard against 0/0 (and against a non-positive normaliser).
    if not total_weight > 0:
        return 0.0
    weighted_total = (y * weights).sum()
    return weighted_total / total_weight

def leaf_sse(y, w):
    """Return the weighted sum of squared deviations of ``y`` from its weighted mean.

    The centre is ``wmean(y, w)``; each squared deviation is multiplied by
    its weight before summing.
    """
    centre = wmean(y, w)
    weighted_sq_dev = (y - centre) ** 2 * w
    return weighted_sq_dev.sum()

@dataclass
class Node:
    """One node of a binary regression tree.

    A node with ``value`` set acts as a leaf; otherwise it is an internal
    node described by ``feature``/``threshold`` with ``left``/``right``
    children.
    """
    # Column index tested at an internal node.
    feature: Optional[int] = None
    # Cut point compared against the feature value at an internal node.
    threshold: Optional[float] = None
    # Child subtrees of an internal node.
    left: Optional["Node"] = None
    right: Optional["Node"] = None
    # Prediction stored at a leaf; None marks an internal node.
    value: Optional[float] = None

    def is_leaf(self) -> bool:
        """Return True when this node carries a prediction (``value`` is set)."""
        has_prediction = self.value is not None
        return has_prediction

class DecisionTreeRegressorScratch:
    """
    Simple exposure-weighted regression tree for rates.
    - Splits minimize weighted SSE (sum of leaf SSEs).
    - Leaf prediction = exposure-weighted mean of y in the region.
    - Pre-pruning via max_depth and min_leaf_weight (exposure units).

    Parameters
    ----------
    max_depth : maximum number of split levels below the root; ``None``
        disables the depth cap.
    min_leaf_weight : minimum total weight that each side of a split must
        carry. A node is not split at all when its total weight is below
        twice this value.
    """
    def __init__(self, max_depth: Optional[int] = None, min_leaf_weight: float = 5.0):
        self.max_depth = max_depth
        self.min_leaf_weight = float(min_leaf_weight)
        self.root: Optional[Node] = None

    def fit(self, X: np.ndarray, y: np.ndarray, w: np.ndarray):
        """Grow the tree on the given data and return ``self``.

        Parameters
        ----------
        X : 2-D array-like of features, one row per observation.
        y : array-like of targets, one per row of ``X`` (column vectors are
            flattened).
        w : array-like of weights, one per row of ``X`` (column vectors are
            flattened).

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, or ``y``/``w`` do not have one entry per row.
        """
        X = np.asarray(X, float)
        # Flatten so that an (n, 1) target next to an (n,) weight vector does
        # not silently broadcast to an (n, n) matrix inside the helpers.
        y = np.asarray(y, float).ravel()
        w = np.asarray(w, float).ravel()
        if X.ndim != 2:
            raise ValueError(f"X must be 2-D (rows x features); got shape {X.shape}")
        n_rows = X.shape[0]
        if y.size != n_rows or w.size != n_rows:
            # Without this check a longer y/w would be indexed by row number
            # and quietly truncated.
            raise ValueError(
                f"X, y and w must have the same number of rows; "
                f"got {n_rows}, {y.size} and {w.size}"
            )
        self.root = self._build_tree(X, y, w, np.arange(n_rows), depth=0)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return one prediction per row of ``X`` as a float array.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if self.root is None:
            raise RuntimeError("This tree is not fitted yet; call fit() before predict().")
        X = np.asarray(X, float)
        return np.array([self._traverse(x, self.root) for x in X], float)

    # ----- internal: build tree -----
    def _build_tree(self, X, y, w, idx, depth) -> Node:
        """Recursively build the subtree for the rows selected by ``idx``."""
        y_node = y[idx]
        w_node = w[idx]

        # stopping rules (the empty check comes first so y_node[0] is safe)
        stop = (
            idx.size == 0
            or (self.max_depth is not None and depth >= self.max_depth)
            or w_node.sum() < 2 * self.min_leaf_weight
            or np.allclose(y_node, y_node[0])
        )
        if stop:
            return Node(value=wmean(y_node, w_node))

        feat, thr, L, R = self._best_split(X, y, w, idx)
        if feat is None:
            # No candidate satisfied the minimum-weight constraint.
            return Node(value=wmean(y_node, w_node))

        left = self._build_tree(X, y, w, L, depth + 1)
        right = self._build_tree(X, y, w, R, depth + 1)
        # Store plain Python scalars so the node fields match their annotations.
        return Node(feature=int(feat), threshold=float(thr), left=left, right=right)

    # ----- internal: best split -----
    def _best_split(self, X, y, w, idx) -> Tuple[Optional[int], Optional[float], Optional[np.ndarray], Optional[np.ndarray]]:
        """Search every feature for the split with the lowest weighted SSE.

        Returns ``(feature, threshold, left_idx, right_idx)``; all four are
        ``None`` when no admissible split exists. Ties keep the first split
        found (lowest feature index, then lowest threshold).
        """
        best_feat = best_thr = best_left = best_right = None
        best_sse = np.inf

        # Gather the node's targets/weights once; candidates then slice them
        # with a boolean mask instead of re-indexing the full arrays.
        y_node = y[idx]
        w_node = w[idx]

        for j in range(X.shape[1]):
            xj = X[idx, j]
            uniq = np.unique(xj)  # already sorted
            if uniq.size <= 1:
                continue
            # thresholds: midpoints between consecutive distinct values
            # (for a one-hot 0/1 column this is just 0.5)
            candidates = (uniq[:-1] + uniq[1:]) / 2.0

            for t in candidates:
                go_left = xj <= t
                # A midpoint can round onto a data value (or be NaN), leaving
                # one side empty; such candidates are not real splits.
                if not go_left.any() or go_left.all():
                    continue
                go_right = ~go_left
                w_left = w_node[go_left]
                w_right = w_node[go_right]
                # exposure-weighted minimum leaf size
                if w_left.sum() < self.min_leaf_weight or w_right.sum() < self.min_leaf_weight:
                    continue
                sse = leaf_sse(y_node[go_left], w_left) + leaf_sse(y_node[go_right], w_right)
                if sse < best_sse:
                    best_sse = sse
                    best_feat, best_thr = j, t
                    best_left, best_right = idx[go_left], idx[go_right]

        return best_feat, best_thr, best_left, best_right

    # ----- internal: predict one -----
    def _traverse(self, x, node: Node) -> float:
        """Walk from ``node`` to a leaf for feature vector ``x``; return the leaf value."""
        while not node.is_leaf():
            # Values <= threshold go left, everything else (incl. NaN) goes right.
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.value


In [10]:
import pandas as pd
from preprocessing.preprocessing_utils import preprocess_for_tree

# ---- load the raw claim tables ----
train = pd.read_csv("../data/claims_train.csv")
test = pd.read_csv("../data/claims_test.csv")

# preprocess_for_tree is a project helper; its result is unpacked here as
# (features, target, weights) -- presumably pandas objects, since .values is
# used below. NOTE(review): confirm train and test yield the same columns.
X_tr, y_tr, w_tr = preprocess_for_tree(train)
X_te, y_te, w_te = preprocess_for_tree(test)

# ---- convert to float NumPy arrays ----
X_np = X_tr.values.astype(float)
y_np = y_tr.values.astype(float)
w_np = w_tr.values.astype(float)

# ---- seeded 80/20 train/validation split over shuffled row positions ----
rng = np.random.default_rng(42)
idx = np.arange(len(y_np))
rng.shuffle(idx)
cut = int(0.8 * len(idx))
tr_idx = idx[:cut]
va_idx = idx[cut:]

X_tr_np = X_np[tr_idx]
y_tr_np = y_np[tr_idx]
w_tr_np = w_np[tr_idx]
X_va_np = X_np[va_idx]
y_va_np = y_np[va_idx]
w_va_np = w_np[va_idx]

# ---- small hyper-parameter grid, scored on the single hold-out split ----
depth_grid = [3, 5, 7, 9]
leafw_grid = [5.0, 10.0, 20.0]

best = None
for d in depth_grid:
    for m in leafw_grid:
        tree = DecisionTreeRegressorScratch(max_depth=d, min_leaf_weight=m)
        tree.fit(X_tr_np, y_tr_np, w_tr_np)
        yhat = tree.predict(X_va_np)
        score = wmae(y_va_np, yhat, w_va_np)
        # Keep the first candidate, then only strictly better ones.
        improved = best is None or score < best["wmae"]
        if improved:
            best = dict(max_depth=d, min_leaf_weight=m, wmae=score, model=tree)

chosen = {k: best[k] for k in ("max_depth", "min_leaf_weight", "wmae")}
print("Chosen caps:", chosen)

# ---- refit on the FULL training set (train + validation rows) with the chosen caps ----
final_tree = DecisionTreeRegressorScratch(
    max_depth=best["max_depth"],
    min_leaf_weight=best["min_leaf_weight"],
)
final_tree.fit(X_np, y_np, w_np)

# ---- score on the held-out test table when labels are available ----
if globals().get("y_te") is not None:
    X_te_np = X_te.values.astype(float)
    y_te_np = y_te.values.astype(float)
    w_te_np = w_te.values.astype(float)
    yhat_te = final_tree.predict(X_te_np)
    print("Test WMAE:", wmae(y_te_np, yhat_te, w_te_np))


ModuleNotFoundError: No module named 'preprocessing'