In [1]:
import numpy as np

In [2]:
# Toy training set: 15 samples (rows) x 4 integer-coded features (columns).
# The code below groups samples by exact feature value, i.e. it treats every
# column as categorical.
X = np.array([
    [0, 0, 0, 0],
    [0, 0, 0, 1],
    [0, 1, 0, 1],
    [0, 1, 1, 0],
    [0, 0, 0, 0],
    [1, 0, 0, 0],
    [1, 0, 0, 1],
    [1, 1, 1, 1],
    [1, 0, 1, 2],
    [1, 0, 1, 2],
    [2, 0, 1, 2],
    [2, 0, 1, 1],
    [2, 1, 0, 1],
    [2, 1, 0, 2],
    [2, 0, 0, 0],
])
# Binary class label (0 or 1) for each row of X.
y = np.array([0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0])


In [3]:
# Dataset dimensions: rows of X are samples, columns are features.
n_samples = X.shape[0]
n_features = X.shape[1]

# Distinct class labels (sorted by np.unique) and how many there are.
classes = np.unique(y)
n_classes = len(classes)

 ## 特征选择

 ### 熵

In [4]:
# Empirical entropy of the labels, worked through step by step:
#     H(D) = -sum_k p_k * log2(p_k)
# The result is stored in `entropy_` rather than `entropy` so that re-running
# this cell can never clobber the `entropy` function defined in the next cell.
values, counts = np.unique(y, return_counts=True)
entropy_ = 0
for count in counts:
    p = count / n_samples  # relative frequency of one class
    entropy_ -= p * np.log2(p)

print(entropy_)

0.9709505944546686


In [5]:
def entropy(x):
    """Return the empirical (Shannon) entropy of a sequence of labels, in bits.

    Computes ``H = -sum_k p_k * log2(p_k)`` where ``p_k`` is the relative
    frequency of the k-th distinct value found in ``x``.

    Parameters
    ----------
    x : array_like
        Labels. Lists and tuples are accepted as well as NumPy arrays; the
        values are counted over all elements of the input.

    Returns
    -------
    float
        The entropy; 0 for empty input and for input with a single distinct
        value.
    """
    # Accept plain Python sequences too (a bare list has no `.shape`).
    x = np.asarray(x)
    n_samples = x.size
    if n_samples == 0:
        return 0.0

    _, counts = np.unique(x, return_counts=True)

    # Accumulate term by term (instead of negating a vectorised sum) so a pure
    # input yields +0.0 rather than -0.0.
    entropy_ = 0
    for count in counts:
        p = count / n_samples
        entropy_ -= p * np.log2(p)
    return entropy_
# Sanity check: the function must reproduce the value printed by the previous cell.
print(entropy(y))

0.9709505944546686


In [None]:
# Quick check on a 3:1 label split; pass an ndarray, the input type used
# everywhere else in this notebook.
entropy(np.array([0, 0, 0, 1]))

 ### 条件熵

In [6]:
# Empirical conditional entropy, worked through step by step:
#     H(D|A) = sum_v |D_v| / |D| * H(D_v)
# Feature 0 is used here: it is the feature the function version and the
# information-gain cells below use, and the one the recorded result
# (0.8879...) corresponds to.
# The result is stored in `conditional_entropy_` so that re-running this cell
# can never clobber the `conditional_entropy` function defined in the next cell.
conditional_entropy_ = 0
splitter_idxs = {}

# Group the sample indices by the value they take on the feature.
values = X[:, 0]
for idx, item in enumerate(values):
    splitter_idxs.setdefault(item, []).append(idx)

# Weight the entropy of each group by the fraction of samples it holds.
for key in splitter_idxs.keys():
    x = y[splitter_idxs[key]]
    conditional_entropy_ += len(x) / n_samples * entropy(x)

print(conditional_entropy_)

0.8879430945988998


In [7]:
def conditional_entropy(y, x):
    """Return the empirical conditional entropy H(y | x), in bits.

    The samples are grouped by the exact value they take in ``x``; the result
    is the entropy of ``y`` inside each group, weighted by the fraction of
    samples that fall in the group.

    Parameters
    ----------
    y : array_like
        Labels, one per sample.
    x : array_like
        Values of a single feature, one per sample (same length as ``y``).

    Returns
    -------
    float
        The conditional entropy; 0 when there are no samples.

    Raises
    ------
    ValueError
        If ``x`` and ``y`` do not have the same number of entries.
    """
    # Accept plain Python sequences too: the fancy indexing below needs arrays.
    y = np.asarray(y)
    x = np.asarray(x)
    n_samples = y.shape[0]
    if x.shape[0] != n_samples:
        raise ValueError(
            "x and y must have the same length, got {} and {}".format(x.shape[0], n_samples)
        )

    # Sample indices grouped by feature value, in order of first occurrence.
    splitter_idxs = {}
    for idx, item in enumerate(x):
        splitter_idxs.setdefault(item, []).append(idx)

    conditional_entropy_ = 0
    for idxs in splitter_idxs.values():
        subset = y[idxs]
        conditional_entropy_ += len(subset) / n_samples * entropy(subset)

    return conditional_entropy_

print(conditional_entropy(y, X[:, 0]))

0.8879430945988998


 ### 信息增益

In [8]:
# Information gain of feature 0, step by step: g(D, A) = H(D) - H(D|A).
conditional_entropy_ = conditional_entropy(y, X[:, 0])
entropy_ = entropy(y)
information_gain_ = entropy_ - conditional_entropy_

print(information_gain_)

0.08300749985576883


In [9]:
def information_gain(y, x):
    """Return the information gain of feature ``x`` with respect to labels ``y``.

    Defined as ``entropy(y) - conditional_entropy(y, x)``: how much knowing the
    feature value reduces the entropy of the labels.
    """
    prior = entropy(y)
    posterior = conditional_entropy(y, x)
    return prior - posterior

print(information_gain(y, X[:, 0]))

0.08300749985576883


 ## 决策树的生成

### 决策树的生成

In [10]:
class Node(object):
    """A node of a multiway decision tree.

    A node is a *split node* when it has children and a *leaf node* when
    ``label`` is set. ``children[k]`` is the subtree followed when the sample's
    value on ``feature`` equals ``thresholds[k]``.
    """

    # TODO: the parameter types are too loosely defined.
    def __init__(self, feature=None, thresholds=None, sample_idxs=None, n_samples=None, impurity=None, label=None):
        """Create a node.

        Parameters
        ----------
        feature : optional
            Index of the feature this node splits on (split nodes only).
        thresholds : iterable, optional
            Feature values that lead to the children, in child order. Copied
            into a fresh list; defaults to an empty list.
        sample_idxs : optional
            Indices of the training samples that reach this node.
        n_samples : optional
            Number of training samples that reach this node.
        impurity : optional
            Impurity of the labels at this node.
        label : optional
            Predicted label (leaf nodes only).
        """
        # Attributes shared by every node.
        self.n_samples = n_samples
        self.impurity = impurity
        self.sample_idxs = sample_idxs

        # Attributes of split nodes.
        self.feature = feature
        self.children = []
        # One entry condition per child; honour the constructor argument
        # instead of silently discarding it.
        self.thresholds = [] if thresholds is None else list(thresholds)

        # Attribute of leaf nodes.
        self.label = label

In [11]:
class Tree(object):
    """Thin container for a decision tree; it only holds the root node."""

    def __init__(self):
        # No tree has been grown yet; `root` is assigned once the tree is built.
        self.root = None

In [12]:
# Indices of the features still available for splitting (all of them to start with).
feature_idxs = list(range(n_features))

In [13]:
# Empty tree; its root is filled in by the cells below.
decision_tree = Tree()

In [14]:
# Pick the feature with the largest information gain over the whole training set.
max_information_gain = -np.inf
selected_feature = None
for i, column in enumerate(X.T):
    information_gain_ = information_gain(y, column)
    if not information_gain_ > max_information_gain:
        # Not strictly better than the best so far; ties keep the earlier feature.
        continue
    max_information_gain = information_gain_
    selected_feature = i

print(selected_feature)

2


In [15]:
# Drop the chosen feature from the candidate list. Rebuilding the list (rather
# than calling `feature_idxs.remove`) keeps this cell idempotent: running it a
# second time no longer raises ValueError.
feature_idxs = [f for f in feature_idxs if f != selected_feature]

In [16]:
# Root node: it covers the whole training set.
decision_tree.root = Node(n_samples=n_samples, impurity=entropy(y))

# Group the sample indices by the value they take on the selected feature.
splitter_idxs = {}
values = X[:, selected_feature]
for idx, item in enumerate(values):
    splitter_idxs.setdefault(item, []).append(idx)

In [17]:
# The split feature belongs to the node that performs the split (the root):
# that is how `build` and `predict` below read `node.feature`. The children
# only record their own statistics.
decision_tree.root.feature = selected_feature

# Start from empty lists so that re-running this cell does not append duplicates.
decision_tree.root.children = []
decision_tree.root.thresholds = []

# One child per distinct value of the selected feature.
for key in splitter_idxs.keys():
    sample_idxs = splitter_idxs[key]
    child = Node()
    child.n_samples = len(sample_idxs)
    child.impurity = entropy(y[sample_idxs])

    decision_tree.root.children.append(child)
    decision_tree.root.thresholds.append(key)

In [18]:
def build(X, y, features=None, samples=None, epsilon=1e-3):
    """Recursively grow a multiway decision tree using information gain (ID3-style).

    At each node the feature with the largest information gain is selected and
    one child is created per distinct value of that feature among the node's
    samples. A node becomes a leaf, labelled with its majority class, when

    * no candidate feature is left, or
    * all its samples belong to one class, or
    * the best information gain does not exceed ``epsilon``.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features)
        Feature matrix of the whole training set.
    y : ndarray of shape (n_samples,)
        Labels of the whole training set.
    features : iterable of int, optional
        Column indices that may still be used for splitting. Defaults to every
        column of ``X``. The argument is never modified.
    samples : iterable of int, optional
        Row indices of the samples that reach this node. Defaults to every row
        of ``X``.
    epsilon : float, default 1e-3
        Minimum information gain required to split; applied at every depth.

    Returns
    -------
    Node
        Root of the (sub)tree. Split nodes have ``label`` left as ``None``.

    Raises
    ------
    ValueError
        If ``samples`` is empty.
    """
    # Default to every feature / every sample when the caller does not restrict them.
    if features is None:
        features = range(X.shape[1])
    if samples is None:
        samples = range(X.shape[0])
    # Private copies: neither the caller's containers nor the ones shared with
    # sibling branches may be mutated.
    features = list(features)
    samples = list(samples)

    n_samples = len(samples)
    if n_samples == 0:
        raise ValueError("cannot build a tree node from an empty sample set")

    # Select rows only, so column indices keep referring to the original features.
    X_ = X[samples, :]
    y_ = y[samples]

    impurity = entropy(y_)

    # Majority class: the label of this node if it turns out to be a leaf.
    values, counts = np.unique(y_, return_counts=True)
    label = values[np.argmax(counts)]

    # Stop: the feature set is empty, or all samples belong to the same class.
    if len(features) == 0 or len(values) == 1:
        return Node(n_samples=n_samples, impurity=impurity, label=label)

    max_information_gain = -np.inf
    selected_feature = None
    for feature in features:
        information_gain_ = information_gain(y_, X_[:, feature])
        if information_gain_ > max_information_gain:
            max_information_gain = information_gain_
            selected_feature = feature

    # Stop: even the best split gains no more than the threshold epsilon.
    if selected_feature is None or max_information_gain <= epsilon:
        return Node(n_samples=n_samples, impurity=impurity, label=label)

    root = Node(
        feature=selected_feature,
        n_samples=n_samples,
        impurity=impurity
    )

    # Every branch receives its own copy of the remaining features; removing
    # the feature from a shared list would let one subtree use up features
    # that its sibling subtrees are still entitled to.
    remaining_features = [f for f in features if f != selected_feature]

    # Original row indices grouped by the value of the selected feature,
    # in order of first occurrence.
    splitter_idxs = {}
    for sample, item in zip(samples, X_[:, selected_feature]):
        splitter_idxs.setdefault(item, []).append(sample)

    for threshold, child_samples in splitter_idxs.items():
        # Forward epsilon so the stopping rule is the same at every depth.
        child = build(X, y, remaining_features, child_samples, epsilon)
        root.children.append(child)
        root.thresholds.append(threshold)

    return root

In [19]:
# Grow the full tree from all features and all samples; this replaces the root assembled by hand above.
decision_tree.root = build(X, y, list(range(n_features)), list(range(n_samples)))

### 决策树的输出

In [20]:
# https://scikit-learn.org/stable/auto_examples/tree/plot_unveil_tree_structure.html

# Depth-first walk over the tree with an explicit stack. Nodes are numbered in
# the order they are visited and indented with one tab per level of depth.
stack = [(decision_tree.root, 0)]
i = 0
while stack:
    node, depth = stack.pop()
    stack.extend([(child, depth + 1) for child in node.children])
    indent = depth * '\t'
    if node.children:
        message = "{space}node {node_id} is a split node on feature {node_feature} with thresholds {node_thresholds}".format(
            space=indent, node_id=i, node_feature=node.feature, node_thresholds=node.thresholds)
    else:
        message = "{space}node {node_id} is a leaf node with label {node_label}, samples {node_samples} and impurity {node_impurity}".format(
            space=indent, node_id=i, node_label=node.label, node_samples=node.n_samples, node_impurity=node.impurity)
    print(message)
    i += 1

node 0 is a split node on feature 2 with thresholds [0, 1]
	node 1 is a leaf node with label 1, samples 6 and impurity 0.0
	node 2 is a split node on feature 1 with thresholds [0, 1]
		node 3 is a leaf node with label 1, samples 3 and impurity 0.0
		node 4 is a leaf node with label 0, samples 6 and impurity 0.0


### 决策树的预测

In [21]:
X_test = np.array([[0, 0, 0, 0]])
y_test = np.array([0])

# Walk down from the root: at each split node follow the child whose entry
# value equals the sample's value on the split feature, until a labelled
# (leaf) node is reached.
sample = X_test[0]
node = decision_tree.root
while node.label is None:
    selected_feature = node.feature
    sample_value = sample[selected_feature]
    next_node_idx = node.thresholds.index(sample_value)
    node = node.children[next_node_idx]

node.label

0

In [22]:
def predict(X, decision_tree):
    """Predict a label for every sample in ``X`` by walking the decision tree.

    Starting at ``decision_tree.root``, each sample follows, at every split
    node, the child whose entry value (``node.thresholds[k]``) equals the
    sample's value on ``node.feature``, until a node with a label is reached.

    Parameters
    ----------
    X : array_like
        Samples, one per row. A single one-dimensional sample is accepted and
        treated as one row.
    decision_tree : object
        Tree whose ``root`` node exposes ``label``, ``feature``, ``thresholds``
        and ``children``.

    Returns
    -------
    ndarray
        Predicted labels, one per sample.

    Raises
    ------
    ValueError
        If a sample carries a feature value that the split node it reaches has
        no child for.
    """
    # Accept nested lists and a lone 1-D sample as well as 2-D arrays.
    X = np.atleast_2d(np.asarray(X))

    y_pred = []
    for i, sample in enumerate(X):
        node = decision_tree.root
        while node.label is None:
            sample_value = sample[node.feature]
            try:
                next_node_idx = node.thresholds.index(sample_value)
            except ValueError:
                # Replace the bare "x is not in list" with an actionable message.
                raise ValueError(
                    "sample {}: value {!r} of feature {} has no branch in the tree "
                    "(known values: {})".format(i, sample_value, node.feature, node.thresholds)
                ) from None
            node = node.children[next_node_idx]
        y_pred.append(node.label)
    return np.array(y_pred)

In [23]:
# Predictions on the training set; compare with the true labels displayed in the next cell.
predict(X, decision_tree)

array([0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0])

In [26]:
# True training labels, shown for comparison with the predictions above.
y

array([0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0])

In [25]:
# !TODO 待完成代码改进
#   * 将以上代码抽象化为一个类，并增加绘图函数
#   1. 将决策树存储结构简化为二叉树
#   2. 使之能够处理连续性协变量
#   3. 增加 Gini 指数这一信息增益计算方法
#   4. 修改代码使之可以处理回归问题