In [1]:
import numpy as np

def gini_impurity(y):
    """
    计算标签向量 y 的 Gini 不纯度
    
    :param y: 一维数组 (m,)
    :return: Gini 不纯度（标量）
    """
    # 统计每个类别出现的次数
    label_counts = {}
    for label in y:
        label_counts[label] = label_counts.get(label, 0) + 1
    
    m = len(y)  # 样本数
    gini = 1.0
    for label, count in label_counts.items():
        p = count / m  # 该标签在节点中的比例
        gini -= p**2   # 累减 p^2

    return gini



def split_dataset(X, y, feature_idx, threshold):
    """
    根据 feature_idx 列的阈值 threshold 划分数据集
    返回: (X_left, y_left, X_right, y_right)
    """
    # 选取该特征列
    col_values = X[:, feature_idx]
    # 根据阈值进行划分（小于阈值的归左，否则归右）
    left_mask = col_values < threshold
    right_mask = ~left_mask  # 取反，即大于等于 threshold

    X_left, y_left = X[left_mask], y[left_mask]
    X_right, y_right = X[right_mask], y[right_mask]
    return X_left, y_left, X_right, y_right

def best_split(X, y):
    """
    在所有特征的所有可能划分阈值上遍历，
    找出能使加权 Gini 不纯度最小的 (best_feature, best_threshold)
    同时返回对应划分的数据集 (X_left, y_left, X_right, y_right)
    如果无法再划分，则返回 None, None, None, None, None
    
    注意：这里最简单地把 X[:, feature_idx] 的所有可能取值都当做一个潜在阈值
    """
    m, n = X.shape
    if m <= 1:
        return None, None, None, None, None  # 数据太少，无法划分
    
    # 计算当前节点的 Gini，以便遇到无法找到更好划分时可以短路
    current_gini = gini_impurity(y)
    best_gain = 0.0
    best_feature, best_threshold = None, None
    best_splits = (None, None, None, None)
    
    for feature_idx in range(n):
        # 取该列特征的所有值，并去重排序，作为潜在阈值
        feature_values = X[:, feature_idx]
        unique_values = np.unique(feature_values)
        
        # 遍历这些唯一值
        for threshold in unique_values:
            X_left, y_left, X_right, y_right = split_dataset(X, y, feature_idx, threshold)
            if len(y_left) == 0 or len(y_right) == 0:
                # 如果划分后，有一边为空，则跳过
                continue
            
            # 计算左右子节点的 Gini
            gini_left = gini_impurity(y_left)
            gini_right = gini_impurity(y_right)
            # 加权 Gini（按左右子节点的样本比例）
            p_left = len(y_left) / m
            p_right = 1.0 - p_left
            weighted_gini = p_left * gini_left + p_right * gini_right
            
            # 信息增益 = current_gini - weighted_gini
            gain = current_gini - weighted_gini
            
            # 找到最优增益
            if gain > best_gain:
                best_gain = gain
                best_feature = feature_idx
                best_threshold = threshold
                best_splits = (X_left, y_left, X_right, y_right)
    
    if best_gain > 0:
        return best_feature, best_threshold, best_splits[0], best_splits[1], best_splits[2], best_splits[3]
    else:
        return None, None, None, None, None  # 找不到更好的划分


In [4]:
class DecisionTreeNode:
    def __init__(self, feature_idx=None, threshold=None,
                 left_child=None, right_child=None, 
                 leaf_value=None):
        """
        决策树的节点结构：
        :param feature_idx: 划分的特征列索引
        :param threshold: 划分阈值
        :param left_child: 左子节点
        :param right_child: 右子节点
        :param leaf_value: 如果是叶节点，存储预测类别
        """
        self.feature_idx = feature_idx
        self.threshold = threshold
        self.left_child = left_child
        self.right_child = right_child
        self.leaf_value = leaf_value

def most_common_label(y):
    """
    返回向量 y 中出现频次最高的类别
    作为叶节点的输出
    """
    labels, counts = np.unique(y, return_counts=True)
    max_count_index = np.argmax(counts)
    return labels[max_count_index]

class DecisionTreeClassifierScratch:
    def __init__(self, max_depth=None, min_samples_split=2):
        """
        :param max_depth: 决策树最大深度 (None 表示不限制)
        :param min_samples_split: 节点最少样本数，小于该值则不再划分
        """
        self.root = None
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
    
    def fit(self, X, y):
        """
        构建决策树
        """
        self.root = self._build_tree(X, y, depth=0)
    
    def _build_tree(self, X, y, depth):
        """
        递归地构建树，返回当前节点
        """
        # 1. 停止条件检查
        if len(np.unique(y)) == 1:
            # 如果当前节点只有一种类别，直接变成叶节点
            leaf_val = y[0]
            return DecisionTreeNode(leaf_value=leaf_val)
        
        if self.max_depth is not None and depth >= self.max_depth:
            # 达到最大深度，返回叶节点
            leaf_val = most_common_label(y)
            return DecisionTreeNode(leaf_value=leaf_val)
        
        if len(y) < self.min_samples_split:
            # 样本数太少，不再划分
            leaf_val = most_common_label(y)
            return DecisionTreeNode(leaf_value=leaf_val)
        
        # 2. 找最优划分
        feature_idx, threshold, X_left, y_left, X_right, y_right = best_split(X, y)
        
        if feature_idx is None:
            # 找不到更好的划分
            leaf_val = most_common_label(y)
            return DecisionTreeNode(leaf_value=leaf_val)
        
        # 3. 递归构建左右子树
        left_child = self._build_tree(X_left, y_left, depth+1)
        right_child = self._build_tree(X_right, y_right, depth+1)
        
        # 4. 返回当前节点
        return DecisionTreeNode(feature_idx=feature_idx,
                                threshold=threshold,
                                left_child=left_child,
                                right_child=right_child)


    def predict(self, X):
        """
        对于输入 X，每一行做一次预测，输出分类结果
        """
        predictions = [self._predict_sample(sample, self.root) for sample in X]
        return np.array(predictions)
    
    def _predict_sample(self, x, node):
        """
        从根节点开始，递归地对单条样本 x 进行预测
        """
        # 如果是叶节点，则直接返回叶节点中的类别
        if node.leaf_value is not None:
            return node.leaf_value
        
        # 根据当前节点的分割特征和阈值来判断走左还是右
        feature_val = x[node.feature_idx]
        if feature_val < node.threshold:
            return self._predict_sample(x, node.left_child)
        else:
            return self._predict_sample(x, node.right_child)


In [10]:
if __name__ == "__main__":
    # 准备一份示例数据
    X_train = np.array([
        [2.7, 2.5],
        [1.3, 1.0],
        [3.0, 2.1],
        [6.0, 2.3],
        [3.0, 3.0],
        [7.0, 2.0],
        [2.0, 5.0],
        [9.0, 1.0],
    ])
    y_train = np.array([0, 0, 0, 1, 0, 1, 0, 1])

    # 初始化并训练决策树
    tree = DecisionTreeClassifierScratch(max_depth=3, min_samples_split=2)
    tree.fit(X_train, y_train)

    # 预测
    X_test = np.array([
        [2.5, 3.0],
        [8.0, 2.0],
        [3.0, 2.4],
    ])
    y_pred = tree.predict(X_test)

    # 打印预测结果
    print("Test Samples:\n", X_test)
    print("Predictions:", y_pred)


Test Samples:
 [[2.5 3. ]
 [8.  2. ]
 [3.  2.4]]
Predictions: [0 1 0]
