# CART

## 基于NumPy的实现

In [1]:
import numpy as np

### 二叉决策树

In [2]:
### 定义树结点
class TreeNode:
    def __init__(self, feature_ix=None, threshold=None, leaf_value=None, left_branch=None, right_branch=None):
        # 特征索引
        self.feature_ix = feature_ix
        # 特征的划分阈值
        self.threshold = threshold
        # 叶子结点的取值
        self.leaf_value = leaf_value
        # 左子树
        self.left_branch = left_branch
        # 右子树
        self.right_branch = right_branch

In [3]:
### 定义二叉决策树
class BinaryDecisionTree:
    ### 决策树初始参数
    def __init__(self, min_samples_split=3, min_gini_impurity=999, max_depth=float("inf"), loss=None):
        # 根结点
        self.root = None
        # 结点的最小分裂样本数
        self.min_samples_split = min_samples_split
        # 结点的基尼不纯度
        self.min_gini_impurity = min_gini_impurity
        # 树的最大深度
        self.max_depth = max_depth
        # 基尼不纯度计算函数
        self.gini_impurity_calc = None
        # 叶子结点的值预测函数
        self.leaf_value_calc = None
        # 损失函数
        self.loss = loss

    ### 决策树拟合函数
    def fit(self, X, y, loss=None):
        # 递归构建决策树
        self.root = self._construct_tree(X, y)
        self.loss = None

    ### 决策树构建函数
    def _construct_tree(self, X, y, current_depth=0):
        # 初始化最小基尼不纯度
        init_gini_impurity = 999
        # 初始化最优特征索引和阈值
        best_criteria = None
        # 初始化数据子集
        best_subsets = None

        # 合并输入和标签
        Xy = np.concatenate((X, y), axis=1)
        # 获取样本数和特征数
        m, n = X.shape
        # 设定决策树构建条件
        # 训练样本量大于结点最小分裂样本数且当前树深度小于最大深度
        if m >= self.min_samples_split and current_depth <= self.max_depth:
            # 遍历计算每个特征的基尼不纯度
            for f_i in range(n):
                # 获取第i个特征的所有取值
                f_values = np.expand_dims(X[:, f_i], axis=1)
                # 获取第i个特征的唯一取值
                unique_values = np.unique(f_values)

                # 遍历取值并寻找最优特征分裂阈值
                for threshold in unique_values:
                    # 特征结点二叉分裂
                    Xy1, Xy2 = feature_split(Xy, f_i, threshold)
                    # 如果分裂后的子集大小都不为0
                    if len(Xy1) != 0 and len(Xy2) != 0:
                        # 获取两个子集的标签值
                        y1, y2 = Xy1[:, n:], Xy2[:, n:]

                        # 计算基尼不纯度
                        impurity = self.gini_impurity_calc(y, y1, y2)
                        # 获取最小基尼不纯度
                        # 最优特征索引和分类阈值
                        if impurity < init_gini_impurity:
                            init_gini_impurity = impurity
                            best_criteria = {
                                "feature_ix": f_i,
                                "threshold": threshold
                            }
                            best_subsets = {
                                "leftX": Xy1[:, :n],
                                "lefty": Xy1[:, n:],
                                "rightX": Xy2[:, :n],
                                "righty": Xy2[:, n:]
                            }

        # 如果计算的最小基尼不纯度小于设定的最小基尼不纯度
        if init_gini_impurity < self.min_gini_impurity:
            # 分别构建左右子树
            left_branch = self._construct_tree(best_subsets["leftX"], best_subsets["lefty"], current_depth + 1)
            right_branch = self._construct_tree(best_subsets["rightX"], best_subsets["righty"], current_depth + 1)
            return TreeNode(feature_ix=best_criteria["feature_ix"], threshold=best_criteria["threshold"],
                            left_branch=left_branch, right_branch=right_branch)

        # 计算叶子结点取值
        leaf_value = self.leaf_value_calc(y)
        return TreeNode(leaf_value=leaf_value)

    ### 数据集预测函数
    def predict(self, X):
        y_pred = [self.predict_value(sample) for sample in X]
        return y_pred

    ### 定义二叉树值的预测函数
    def predict_value(self, x, tree=None):
        if tree is None:
            tree = self.root
        # 如果叶子结点已有值，则直接返回已有值
        if tree.leaf_value is not None:
            return tree.leaf_value
        # 选择特征并获取特征值
        feature_value = x[tree.feature_ix]
        # 判断落入左子树还是右子树
        branch = tree.right_branch
        if isinstance(feature_value, int) or isinstance(feature_value, float):
            if feature_value >= tree.threshold:
                branch = tree.left_branch
        elif feature_value == tree.threshold:
            branch = tree.left_branch
        # 测试子集
        return self.predict_value(x, branch)

In [10]:
# 定义二叉特征分裂函数
def feature_split(X, feature_i, threshold):
    """
    输入:
    X: 训练数据集
    feature_i: 特征索引
    threshold: 特征分裂阈值
    输出:
    X_left, X_right: 分裂后的子数据集
    """
    if isinstance(threshold, int) or isinstance(threshold, float):
        split_func = lambda sample: sample[feature_i] >= threshold
    else:
        split_func = lambda sample: sample[feature_i] == threshold
    X_left = np.array([sample for sample in X if split_func(sample)])
    X_right = np.array([sample for sample in X if not split_func(sample)])
    return X_left, X_right

### 分类树

In [5]:
### CART分类树
class ClassificationTree(BinaryDecisionTree):
    ### 定义基尼不纯度的计算过程
    def _calculate_gini_impurity(self, y, y1, y2):
        p = len(y1) / len(y)
        gini_impurity = p * calculate_gini(y1) + (1 - p) * calculate_gini(y2)
        return gini_impurity

    ### 多数投票
    def _majority_vote(self, y):
        most_common = None
        max_count = 0
        for label in np.unique(y):
            # 统计多数
            count = len(y[y == label])
            if count > max_count:
                most_common = label
                max_count = count
        return most_common

    # 分类树拟合
    def fit(self, X, y):
        self.gini_impurity_calc = self._calculate_gini_impurity
        self.leaf_value_calc = self._majority_vote
        super(ClassificationTree, self).fit(X, y)

In [6]:
### 基尼指数计算函数
def calculate_gini(nums):
    """
    输入:
    nums: 包含类别取值的列表
    输出:
    gini: 基尼指数值
    """
    nums = nums.tolist()
    # 获取列表类别的概率分布
    probs = [nums.count(i) / len(nums) for i in np.unique(nums)]
    # 计算基尼指数
    gini = sum([p * (1-p) for p in probs])
    return gini

### 分类树测试

In [7]:
from sklearn import datasets
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

In [8]:
# 导入iris数据集
data = datasets.load_iris()
# 获取输入和标签
X, y = data.data, data.target
y = y.reshape((-1, 1))
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

In [11]:
# 创建分类树模型实例
clf = ClassificationTree()
# 分类树训练
clf.fit(X_train, y_train)
#分类树预测
y_pred = clf.predict(X_test)
# 打印模型分类准确率
print("Accuracy of CART classification tree based on NumPy: ", accuracy_score(y_test, y_pred))

Accuracy of CART classification tree based on NumPy:  0.9555555555555556


## 回归树

In [20]:
### CART回归树
class RegressionTree(BinaryDecisionTree):
    ### 计算方差减少量
    def _calculate_variance_reduction(self, y, y1, y2):
        var_tot = np.var(y, axis=0)
        var_y1 = np.var(y1, axis=0)
        var_y2 = np.var(y2, axis=0)
        frac_1 = len(y1) / len(y)
        frac_2 = len(y2) / len(y)
        # 计算方差减少量
        variance_reduction = var_tot - (frac_1 * var_y1 + frac_2 * var_y2)
        return sum(variance_reduction)

    ### 结点值取平均
    def _mean_of_y(self, y):
        value = np.mean(y, axis=0)
        return value if len(value) > 1 else value[0]

    # 回归树拟合
    def fit(self, X, y):
        self.gini_impurity_calc = self._calculate_variance_reduction
        self.leaf_value_calc = self._mean_of_y
        super(RegressionTree, self).fit(X, y)

### 回归树测试

In [15]:
from sklearn.datasets import load_boston
from sklearn.metrics import mean_squared_error

In [16]:
# 获取输入和标签
X, y = load_boston(return_X_y=True)
y = y.reshape((-1, 1))
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

In [21]:
# 创建回归树模型实例
reg = RegressionTree()
# 模型训练
reg.fit(X_train, y_train)
# 模型预测
y_pred = reg.predict(X_test)
# 评估均方误差
mse = mean_squared_error(y_test, y_pred)
print("MSE of CART regression tree based on NumPy: ", mse)

MSE of CART regression tree based on NumPy:  84.1635855263158


## 基于sklearn的实现

In [22]:
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor

In [13]:
# 创建分类树实例
clf = DecisionTreeClassifier()
# 分类树训练
clf.fit(X_train, y_train)
# 分类树预测
y_pred = clf.predict(X_test)
print("Accuracy of CART classification tree based on sklearn:", accuracy_score(y_test, y_pred))

Accuracy of CART classification tree based on sklearn: 0.9555555555555556


In [23]:
# 创建回归树实例
reg = DecisionTreeRegressor()
# 回归树训练
reg.fit(X_train, y_train)
# 回归树预测
y_pred = reg.predict(X_test)
print("MSE of CART regression tree based on sklearn:", mean_squared_error(y_test, y_pred))

MSE of CART regression tree based on sklearn: 25.763815789473686
