# Ch05 决策树

决策树学习的本质是从训练数据集中归纳出一组分类规则    
决策树学习的损失函数通常是正则化的极大似然函数，学习策略是以损失函数为目标函数的最小化。    

## 特征选择

如果利用一个特征进行分类的结果与随机分类的结果没有很大差别,则称这个特征是没有分类能力的。经验上扔掉这样的特征对决策树学习的精度影响不大。    
通常特征选择的准则是**信息增益**或**信息增益比**            

**熵**的定义:           
$$
p(X = x_i) = p_i    \\
H(X) = -\sum_{i=1}^n p_i log p_i
$$

**条件熵**的定义:       
$$
H(Y|X) = \sum_{i=1}^n p_i H(Y|X=x_i) 
$$

信息增益(information gain)表示得知特征X的信息而使得类Y的信息的不确定性减少的程度   
特征X对训练数据集D的信息增益$g(D,X)$
$$
g(D, X) = H(D) - H(D|X)
$$

**信息增益算法**               
输入: 训练数据集D和特征A    
输出: 特征A对训练数据集D的信息增益$g(D,A)$   

![image.png](./img/gain.png)

**计算书上的例题 5.2**    

In [40]:
import numpy as np
import pandas as pd


def create_data():
    datasets = [['青年', '否', '否', '一般', '否'],
                ['青年', '否', '否', '好', '否'],
                ['青年', '是', '否', '好', '是'],
                ['青年', '是', '是', '一般', '是'],
                ['青年', '否', '否', '一般', '否'],
                ['中年', '否', '否', '一般', '否'],
                ['中年', '否', '否', '好', '否'],
                ['中年', '是', '是', '好', '是'],
                ['中年', '否', '是', '非常好', '是'],
                ['中年', '否', '是', '非常好', '是'],
                ['老年', '否', '是', '非常好', '是'],
                ['老年', '否', '是', '好', '是'],
                ['老年', '是', '否', '好', '是'],
                ['老年', '是', '否', '非常好', '是'],
                ['老年', '否', '否', '一般', '否'],
                ]
    labels = [u'年龄', u'有工作', u'有自己的房子', u'信贷情况', u'类别']
    # 返回数据集和每个维度的名称
    return datasets, labels

datasets, labels = create_data()
train_data = pd.DataFrame(datasets, columns=labels)
print(train_data)

    年龄 有工作 有自己的房子 信贷情况 类别
0   青年   否      否   一般  否
1   青年   否      否    好  否
2   青年   是      否    好  是
3   青年   是      是   一般  是
4   青年   否      否   一般  否
5   中年   否      否   一般  否
6   中年   否      否    好  否
7   中年   是      是    好  是
8   中年   否      是  非常好  是
9   中年   否      是  非常好  是
10  老年   否      是  非常好  是
11  老年   否      是    好  是
12  老年   是      否    好  是
13  老年   是      否  非常好  是
14  老年   否      否   一般  否


**1. 定义熵计算公式**

In [41]:
def entropy(p):
    if (p == 0.0):
        return 0

    return - p * np.log2(p)

**2. 定义计算经验熵公式**       
根据数据集的类别将数据集$D$划分为不同的子集$D_1, D_2, \dots, D_k$

$$
H(D) = sum(entropy(len(D_i) / len(D)))
$$

In [42]:
def calc_ent(df:pd.DataFrame, feature_name: str):
    res = 0.0
    unique_label = np.unique(df[feature_name])
    
    for label in unique_label:
        sub_df = df[df[feature_name]==label]
        res += entropy(len(sub_df) / len(df))
        
    return res

print(calc_ent(train_data, "类别"))

0.9709505944546686


**3. 计算信息增益**     
给定特征`feature_name`, 先计算整个数据集的$H(D)$, 根据特征的取值将数据集$D$划分为不同的子集$D_1, D_2, \dots$    

$$
H(D|A) = \sum \frac{len(D_i)}{len(D)} * calc\_ent(D_i) \\
g(D, A) = H(D) - H(D|A)
$$

In [43]:
def info_gain(df:pd.DataFrame, feature_name:str, label_name:str):
    H_D_A = 0.0
    H_D = calc_ent(df, label_name)
    
    unique_feature = np.unique(df[feature_name])
    
    for feature in unique_feature:
        sub_df = df[df[feature_name]==feature]
        H_D_A += len(sub_df) / len(df) * calc_ent(sub_df, label_name)
    
    return H_D - H_D_A

feature_names = train_data.columns.to_list()[:-1]
for feature_name in feature_names:
    gain = info_gain(train_data, feature_name, "类别")
    print(f"info gain of {feature_name}: {gain}")

info gain of 年龄: 0.08300749985576883
info gain of 有工作: 0.32365019815155627
info gain of 有自己的房子: 0.4199730940219749
info gain of 信贷情况: 0.36298956253708536


**信息增益比**  
以信息增益作为划分训练数据集的特征,存在偏向于选择取值较多的特征的问 题。使用信息增益比(information gain ratio)可以对这一问题进行校正

$$
g_R(D, A) = g(D, A) / H_A(D)
$$

In [44]:
feature_name = "年龄"
label_name = "类别"
gain = info_gain(train_data, feature_name, label_name)
h_a_d = calc_ent(train_data, feature_name)
print(gain / h_a_d)

0.05237190142858302


In [45]:
from typing import Optional
class Node:
    def __init__(self, feature=None, label=None):
        self.feature = feature
        self.label = label
        self.child = {}
    
    def is_leaf(self):
        return self.label is not None
    
    def __repr__(self):
        if self.is_leaf():
            return f"Leaf({self.label})"
        return f"Node({self.feature}, {self.child})"
    
class ID3DecisionTree:
    def __init__(self):
        self.root = None

    def best_split(self, df: pd.DataFrame) -> str:
        feature_names = df.columns.to_list()[:-1]
        label_name = df.columns.to_list()[-1]
        gain_map = {}
        
        for feature_name in feature_names:
            gain = info_gain(df, feature_name, label_name)
            gain_map[feature_name] = gain 
            
        return max(gain_map, key=gain_map.get)
        
    def train(self, df : pd.DataFrame):
        self.root = self.build_tree(df)
    
    def build_tree(self, df: pd.DataFrame) -> Optional[Node]:
        col_names = df.columns.to_list()
        feature_names = col_names[:-1]
        label_name = col_names[-1]
        df_y = df[label_name]
        unique_y = np.unique(df_y)
        
        if len(unique_y) == 1: # 只有一个类别, 返回叶子节点
            return Node(label=unique_y[0])
            
        if len(feature_names) == 0:
            # 没有特征可以划分
            return Node(label=df_y.mode())
        
        best_feature = self.best_split(df)
        root = Node(feature=best_feature)
        
        unique_feature_value = np.unique(df[best_feature])
        
        for feature_value in unique_feature_value:
            sub_df = df[df[best_feature] == feature_value].drop(columns=[best_feature])
            root.child[feature_value] = self.build_tree(sub_df)

        return root
    
    def predict(self, x):
        node = self.root 
        
        while not node.is_leaf():
            value = x.get(node.feature)    # 获取样本在当前feature上的取值
            if value in node.child:
                node = node.child[value]
            else:
                return None 
        
        return node.label  

In [46]:
model = ID3DecisionTree()
model.train(train_data)

for _, data in train_data.iterrows():
    x = data.iloc[:-1]
    y = data.iloc[-1]
    y_pred = model.predict(x)
    
    print(y, y_pred)

否 否
否 否
是 是
是 是
否 否
否 否
否 否
是 是
是 是
是 是
是 是
是 是
是 是
是 是
否 否


## CART算法

其实纵观决策树的生成代码, 重要的不过两个函数:   
- best_split: 根据不同的指标得到数据集的最佳划分点, 将这个划分点作为根节点, 划分出来的子数据集作为子树生成使用      
- build_tree: 确定边界条件; 根据best_split函数得到的划分点生成根节点以及递归调用build_tree生成子树      

In [None]:
from __future__ import annotations  # 启用新的类型提示解析
import numpy as np

class Node:
    def __init__(self, feature: str = None, thr: float = None, left: Node = None, right: Node = None, value: float = None):
        self.feature = feature
        self.thr = thr
        self.value = value
        self.left: Node = left
        self.right: Node = right

    def is_leaf(self):
        return self.value is not None


def gini(y: np.ndarray) -> float:
    _, counts = np.unique(y, return_counts=True)
    probs = counts / len(y)  # 计算每个类别的概率
    return 1.0 - np.sum(probs ** 2)  # 计算基尼指数


class CARTClassfier:
    def __init__(self, min_sample: int = 2, max_depth: int = 3):
        self.min_sample = min_sample
        self.max_depth = max_depth
        self.root: Node = None

    def best_split(self, x: np.ndarray, y: np.ndarray):
        """ 返回最佳分割的特征和特征值 """
        best_gini = np.inf
        best_feature, best_thr = None, None
        best_left, best_right = None, None

        n_samples, n_feats = x.shape
        for feat in range(n_feats):
            thrs = np.unique(x[:, feat])    # 对特征值进行去重操作
            for thr in thrs:
                left_mask = x[:, feat] <= thr   # 小于当前阈值的划归左子树
                right_mask = ~left_mask # 大于当前阈值的划归右子树

                if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
                    continue

                left_gini = gini(y[left_mask])
                right_gini = gini(y[right_mask])
                final_gini = (np.sum(left_mask) * left_gini + np.sum(right_mask) * right_gini) / n_samples

                if final_gini < best_gini:
                    best_gini = final_gini
                    best_feature, best_thr = feat, thr
                    best_left, best_right = left_mask, right_mask

        return (best_feature, best_thr, best_left, best_right)

    def build_tree(self, x: np.ndarray, y: np.ndarray, depth=0):
        if len(set(y)) == 1 or len(y) < self.min_sample or (self.max_depth and depth >= self.max_depth):
            return Node(value=np.argmax(np.bincount(y)))    # 返回多数类别

        feature, thr, left_mask, right_mask = self.best_split(x, y)
        if feature == None:
            return Node(value=np.argmax(np.bincount(y)))

        left_tree = self.build_tree(x[left_mask], y[left_mask], depth+1)
        right_tree = self.build_tree(x[right_mask], y[right_mask], depth+1)

        return Node(feature=feature, thr=thr, left=left_tree, right=right_tree)

    def train(self, x: np.ndarray, y: np.ndarray):
        self.root = self.build_tree(x, y)

    def predict(self, x: np.ndarray):
        node = self.root

        while not node.is_leaf():
            if x[node.feature] <= node.thr:
                node = node.left
            else:
                node = node.right

        return node.value