In [1]:
import numpy as np
from collections import Counter
from math import log2

### 经验熵：$H(D) = -\sum_{k} \frac{|C_k|}{|D|} \log_2 \frac{|C_k|}{|D|}$
### 信息增益：$H(D) - H(D|A)$
### 基尼指数（不纯度）：$Gini(D) = 1 - \sum_{k} \left(\frac{|C_k|}{|D|}\right)^2$
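- ID3：通过信息增益来选择划分特征，基础公式：$g(D, A) = H(D) - H(D|A)$
- C4.5：通过信息增益比来选择划分特征：$g_R(D, A) = g(D, A) / H_A(D)$，其中 $H_A(D) = -\sum_{i} \frac{|D_i|}{|D|} \log_2 \frac{|D_i|}{|D|}$
- CART：通过基尼指数（Gini）来选择划分特征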

### $H(D|A) = \sum_{i} \frac{|D_i|}{|D|} H(D_i)$，$D_i$表示特征A取值为i的样本子集

In [None]:
class Node:
    """One node of a decision tree.

    Attributes:
        attribute: the attribute this node splits on.
        value: the value taken by the split attribute.
        label: class label carried by a leaf node.
        children: dict mapping an attribute value to the child node reached
            through that value.
    """

    def __init__(self, attribute=None, value=None, label=None):
        self.attribute, self.value, self.label = attribute, value, label
        # A fresh dict per instance, so nodes never share children.
        self.children = dict()

In [None]:
class DecisionTreeID3:
    """ID3 decision tree classifier for categorical features.

    Every internal node splits on one attribute (a column index of ``X``) and
    has one child per distinct value of that attribute present in the samples
    that reached the node. The split attribute is the one with the largest
    information gain ``H(D) - H(D|A)``.
    """

    def __init__(self):
        # Root node of the fitted tree; stays None until fit() is called.
        self.root = None

    def fit(self, X, y):
        """Build the tree from training data.

        Args:
            X: 2-D array-like, one row per sample and one column per
                attribute. Values are compared with ``==``, i.e. every column
                is treated as categorical.
            y: 1-D array-like of class labels, one per row of ``X``.
        """
        self.root = self._build_tree(np.asarray(X), np.asarray(y))

    def _build_tree(self, X, y):
        """Start the recursive construction with every column available."""
        attributes = list(range(X.shape[1]))  # indices of all attributes
        return self._recursive_build_tree(X, y, attributes)

    def _majority_label(self, y):
        """Return the most frequent label in ``y`` (smallest label on ties)."""
        labels, counts = np.unique(y, return_counts=True)
        return labels[np.argmax(counts)]

    def _recursive_build_tree(self, X, y, attributes):
        """Recursively build the subtree for the samples ``(X, y)``.

        Args:
            X: samples that reached this node.
            y: their labels.
            attributes: column indices still available for splitting.

        Returns:
            The ``Node`` rooting the subtree.
        """
        node = Node()

        # All samples share one class: pure leaf.
        if np.all(y == y[0]):
            node.label = y[0]
            return node

        # Every impure node remembers its majority class. It is the prediction
        # of a leaf with no attributes left, and the fallback used at
        # prediction time when a sample carries an attribute value that was
        # never seen at this node.
        node.label = self._majority_label(y)

        # No attribute left to split on: majority-class leaf.
        if not attributes:
            return node

        best_attr, best_value = self._choose_best_attr(X, y, attributes)
        node.attribute = best_attr
        node.value = best_value

        attr_col = X[:, best_attr]
        # The chosen attribute is consumed on this path.
        remaining = [attr for attr in attributes if attr != best_attr]

        # One child per distinct value of the chosen attribute.
        for value in np.unique(attr_col):
            mask = attr_col == value
            node.children[value] = self._recursive_build_tree(X[mask], y[mask], remaining)

        return node

    def _choose_best_attr(self, X, y, attributes):
        """Pick the attribute with the largest information gain.

        The gain of attribute ``A`` is ``H(D) - sum_v |D_v|/|D| * H(D_v)``,
        where ``D_v`` is the subset of samples whose ``A`` equals ``v``.
        Ties are resolved in favour of the attribute listed first.

        Returns:
            ``(best_attr, None)``. ID3 branches on every value of the chosen
            attribute, so there is no single split value; the second element
            is kept only so the return shape stays a 2-tuple.
        """
        best_attr = None
        best_info_gain = -np.inf
        initial_entropy = self._calculate_entropy(y)
        num_samples = len(y)

        for attr in attributes:
            attr_col = X[:, attr]

            # Weighted entropy of the partition induced by this attribute.
            conditional_entropy = 0.0
            for value in np.unique(attr_col):
                y_subset = y[attr_col == value]
                weight = len(y_subset) / num_samples
                conditional_entropy += weight * self._calculate_entropy(y_subset)

            info_gain = initial_entropy - conditional_entropy
            if info_gain > best_info_gain:
                best_info_gain = info_gain
                best_attr = attr

        return best_attr, None

    def _calculate_entropy(self, y):
        """Return the Shannon entropy (base 2) of the label array ``y``.

        Works for any label dtype ``np.unique`` can handle. Only classes that
        actually occur are counted, so no ``log2(0)`` can arise.
        """
        _, class_cnt = np.unique(y, return_counts=True)
        class_probs = class_cnt / len(y)
        # "+ 0.0" turns the -0.0 of a pure set into a plain 0.0.
        return float(-np.sum(class_probs * np.log2(class_probs))) + 0.0

    def _traverse_tree(self, node: "Node", instance):
        """Walk down from ``node`` following ``instance`` and return a label.

        Descent stops at a leaf, or at an internal node that has no branch for
        the instance's attribute value; in the latter case that node's
        majority label is returned.
        """
        while node.children:
            child = node.children.get(instance[node.attribute])
            if child is None:
                break
            node = child
        return node.label

    def predict(self, x):
        """Predict the class label of a single sample.

        Args:
            x: one sample, indexable by attribute (column) position.

        Returns:
            The label of the leaf reached, or the majority label of the
            deepest node reached when a value of ``x`` was not seen in
            training at that node.
        """
        return self._traverse_tree(self.root, x)

In [None]:
# Toy training and test data. Each row pairs a number with a letter code;
# because the rows mix ints and strings, NumPy stores every cell as a string.
X_train = np.array([
    [1, 'S'], [1, 'M'], [1, 'M'], [1, 'S'], [1, 'S'],
    [2, 'S'], [2, 'M'], [2, 'M'], [2, 'L'], [2, 'L'],
    [3, 'L'], [3, 'M'], [3, 'M'], [3, 'L'], [3, 'L'],
])
y_train = np.array(list('NNYYNNNYYYYYYYN'))

X_test = np.array([
    [2, 'S'],
    [1, 'M'],
    [3, 'L'],
    [3, 'M'],
    [2, 'L'],
])
y_test = np.array(list('NNYYY'))

In [None]:
from sklearn.preprocessing import LabelEncoder

# Encode the string-valued feature column and the class labels as integer codes.
# Each gets its own encoder: refitting a single encoder on the labels would
# throw away the feature mapping, which is needed to encode new data the same
# way or to decode the codes again.
feature_encoder = LabelEncoder()
X_train_encoded = X_train.copy()
X_train_encoded[:, 1] = feature_encoder.fit_transform(X_train[:, 1])

label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)

# `encoder` is still referenced by later cells; it stays bound to an encoder
# fitted on the training labels, which is the state it ended in before.
encoder = label_encoder

In [None]:
# Encode the test data with mappings learned from the TRAINING data only.
# Fitting a new encoder on the test split assigns codes from whatever values
# happen to be present there, so the same category could receive a different
# code than it had during training. `transform` raises ValueError when the
# test data contains a category that never appeared in training.
X_test_encoded = X_test.copy()
X_test_encoded[:, 1] = LabelEncoder().fit(X_train[:, 1]).transform(X_test[:, 1])

y_test_encoded = LabelEncoder().fit(y_train).transform(y_test)

In [None]:
# Train the decision tree on the encoded training set.
dt = DecisionTreeID3()
dt.fit(X=X_train_encoded, y=y_train_encoded)

In [None]:
# Predict the test samples one at a time and count the misclassified ones.
err_cnt = 0
for idx, sample in enumerate(X_test_encoded):
    y_pred = dt.predict(sample)
    if y_pred != y_test_encoded[idx]:
        err_cnt += 1

# Accuracy is the share of test samples that were predicted correctly.
acc = 1 - err_cnt / len(y_test_encoded)
print(f'ACC is {acc:.2f}')

In [None]:
class DecisionTreeID3:
    """Decision tree classifier for categorical features (ID3 by default).

    Every internal node splits on one attribute (a column index of ``X``) and
    has one child per distinct value of that attribute present in the samples
    that reached the node.

    NOTE(review): this cell defines ``DecisionTreeID3`` a second time, which
    replaces any earlier class of the same name for code run afterwards;
    objects created before this cell keep their old class.
    """

    _CRITERIA = ("info_gain", "gain_ratio")

    def __init__(self, criterion="info_gain"):
        """Create an unfitted tree.

        Args:
            criterion: ``"info_gain"`` (ID3, the default) selects the
                attribute maximising ``H(D) - H(D|A)``; ``"gain_ratio"``
                (C4.5 style) divides that gain by the entropy of the
                attribute's own value distribution.

        Raises:
            ValueError: if ``criterion`` is not one of the two names above.
        """
        if criterion not in self._CRITERIA:
            raise ValueError(f"criterion must be one of {self._CRITERIA}, got {criterion!r}")
        self.criterion = criterion
        # Root node of the fitted tree; stays None until fit() is called.
        self.root = None

    def fit(self, X, y):
        """Build the tree from training data.

        Args:
            X: 2-D array-like, one row per sample and one column per
                attribute. Values are compared with ``==``, i.e. every column
                is treated as categorical.
            y: 1-D array-like of class labels, one per row of ``X``.
        """
        self.root = self._build_tree(np.asarray(X), np.asarray(y))

    def _build_tree(self, X, y):
        """Start the recursive construction with every column available."""
        attributes = list(range(X.shape[1]))  # indices of all attributes
        return self._recursive_build_tree(X, y, attributes)

    def _majority_label(self, y):
        """Return the most frequent label in ``y`` (smallest label on ties)."""
        labels, counts = np.unique(y, return_counts=True)
        return labels[np.argmax(counts)]

    def _recursive_build_tree(self, X, y, attributes):
        """Recursively build the subtree for the samples ``(X, y)``.

        Args:
            X: samples that reached this node.
            y: their labels.
            attributes: column indices still available for splitting.

        Returns:
            The ``Node`` rooting the subtree.
        """
        node = Node()

        # All samples share one class: pure leaf.
        if np.all(y == y[0]):
            node.label = y[0]
            return node

        # Every impure node remembers its majority class. It is the prediction
        # of a leaf with no attributes left, and the fallback used at
        # prediction time when a sample carries an attribute value that was
        # never seen at this node.
        node.label = self._majority_label(y)

        # No attribute left to split on: majority-class leaf.
        if not attributes:
            return node

        best_attr, best_value = self._choose_best_attr(X, y, attributes)
        node.attribute = best_attr
        node.value = best_value

        attr_col = X[:, best_attr]
        # The chosen attribute is consumed on this path.
        remaining = [attr for attr in attributes if attr != best_attr]

        # One child per distinct value of the chosen attribute.
        for value in np.unique(attr_col):
            mask = attr_col == value
            node.children[value] = self._recursive_build_tree(X[mask], y[mask], remaining)

        return node

    def _choose_best_attr(self, X, y, attributes):
        """Pick the attribute with the best score under ``self.criterion``.

        The information gain of attribute ``A`` is
        ``H(D) - sum_v |D_v|/|D| * H(D_v)``, where ``D_v`` is the subset of
        samples whose ``A`` equals ``v``. With ``criterion="gain_ratio"`` the
        gain is divided by the entropy of ``A``'s value distribution (an
        attribute with a single value scores 0). Ties are resolved in favour
        of the attribute listed first.

        Returns:
            ``(best_attr, None)``. The tree branches on every value of the
            chosen attribute, so there is no single split value; the second
            element is kept only so the return shape stays a 2-tuple.
        """
        best_attr = None
        best_score = -np.inf
        initial_entropy = self._calculate_entropy(y)
        num_samples = len(y)

        for attr in attributes:
            attr_col = X[:, attr]

            # Weighted entropy of the partition induced by this attribute.
            conditional_entropy = 0.0
            for value in np.unique(attr_col):
                y_subset = y[attr_col == value]
                weight = len(y_subset) / num_samples
                conditional_entropy += weight * self._calculate_entropy(y_subset)

            score = initial_entropy - conditional_entropy
            if self.criterion == "gain_ratio":
                split_info = self._calculate_entropy(attr_col)
                score = score / split_info if split_info > 0 else 0.0

            if score > best_score:
                best_score = score
                best_attr = attr

        return best_attr, None

    def _calculate_entropy(self, y):
        """Return the Shannon entropy (base 2) of the value array ``y``.

        Works for any dtype ``np.unique`` can handle. Only values that
        actually occur are counted, so no ``log2(0)`` can arise.
        """
        _, class_cnt = np.unique(y, return_counts=True)
        class_probs = class_cnt / len(y)
        # "+ 0.0" turns the -0.0 of a pure set into a plain 0.0.
        return float(-np.sum(class_probs * np.log2(class_probs))) + 0.0

    def _traverse_tree(self, node: "Node", instance):
        """Walk down from ``node`` following ``instance`` and return a label.

        Descent stops at a leaf, or at an internal node that has no branch for
        the instance's attribute value; in the latter case that node's
        majority label is returned.
        """
        while node.children:
            child = node.children.get(instance[node.attribute])
            if child is None:
                break
            node = child
        return node.label

    def predict(self, x):
        """Predict the class label of a single sample.

        Args:
            x: one sample, indexable by attribute (column) position.

        Returns:
            The label of the leaf reached, or the majority label of the
            deepest node reached when a value of ``x`` was not seen in
            training at that node.
        """
        return self._traverse_tree(self.root, x)

In [49]:
# Score the test set sample by sample. `dt` is not re-created in this cell, so
# this evaluates whichever model instance an earlier cell bound to that name.
err_cnt = 0
for idx, sample in enumerate(X_test_encoded):
    y_pred = dt.predict(sample)
    if y_pred != y_test_encoded[idx]:
        err_cnt += 1

# Accuracy is the share of test samples that were predicted correctly.
acc = 1 - err_cnt / len(y_test_encoded)
print(f"ACC is {acc:.2f}")

ACC is 1.00


In [None]:
import numpy as np
from collections import Counter
from math import log2

class Node:
    """One node of a decision tree.

    The constructor stores ``attribute``, ``value``, ``label`` and
    ``split_point`` unchanged (each defaults to ``None``) and gives the node
    an empty ``children`` dict of its own.
    """

    def __init__(self, attribute=None, value=None, label=None, split_point=None):
        self.attribute, self.value = attribute, value
        self.label, self.split_point = label, split_point
        # A fresh dict per instance, so nodes never share children.
        self.children = dict()

class C45DecisionTree:
    """C4.5-style decision tree using the gain ratio as split criterion.

    Columns whose values are all strings are treated as discrete and get one
    child per distinct value (stored in ``children`` under that value).
    All other columns are treated as continuous and get a binary split at a
    threshold; the two children are stored under the keys ``"<="`` and ``">"``.
    """

    def __init__(self, min_samples_split=2):
        """Create an unfitted tree.

        Args:
            min_samples_split: a node holding fewer samples than this becomes
                a leaf instead of being split further.
        """
        self.min_samples_split = min_samples_split
        self.root = None
        # attribute name -> column index in X, recorded when the tree is built.
        self._columns = {}

    def fit(self, X, y, attributes):
        """Build the tree from training data.

        Args:
            X: 2-D array-like, one row per sample. Pass an object-dtype array
                when string and numeric columns are mixed; a plain
                ``np.array`` of mixed values turns everything into strings.
            y: 1-D array-like of class labels, one per row of ``X``.
            attributes: attribute names; the i-th name refers to column i.

        Raises:
            ValueError: if ``X`` is not 2-D, is empty, or ``X`` and ``y``
                differ in length.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array")
        if len(y) == 0 or len(X) != len(y):
            raise ValueError("X and y must be non-empty and of equal length")
        self.root = self._build_tree(X, y, attributes)

    def predict(self, x):
        """Predict the class label of a single sample.

        Args:
            x: one sample, indexable by column position.

        Returns:
            The label of the leaf reached, or the majority label of the
            deepest node reached when a discrete value of ``x`` has no branch.

        Raises:
            RuntimeError: if the tree has not been fitted.
        """
        if self.root is None:
            raise RuntimeError("fit must be called before predict")
        node = self.root
        while node.children:
            value = x[self._columns[node.attribute]]
            if node.split_point is not None:
                key = "<=" if value <= node.split_point else ">"
            else:
                key = value
            child = node.children.get(key)
            if child is None:
                break
            node = child
        return node.label

    def _build_tree(self, X, y, attributes):
        """Record the attribute-to-column mapping and grow the tree.

        The mapping is fixed here, for the full attribute list, because the
        list shrinks during recursion while the columns of ``X`` do not.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        attributes = list(attributes)
        columns = {}
        for index, name in enumerate(attributes):
            columns.setdefault(name, index)  # first occurrence wins, like list.index
        self._columns = columns
        return self._grow(X, y, attributes, columns)

    def _grow(self, X, y, attributes, columns):
        """Recursively build the subtree for the samples ``(X, y)``."""
        if len(set(y)) == 1:
            # All samples share one class: pure leaf.
            return Node(label=y[0])

        majority = Counter(y).most_common(1)[0][0]

        if len(attributes) == 0 or len(y) < self.min_samples_split:
            # Nothing left to split on, or too few samples to split.
            return Node(label=majority)

        best_attribute, split_point = self._choose_best_attribute(X, y, attributes, columns)
        if best_attribute is None:
            # No attribute yields a positive gain ratio.
            return Node(label=majority)

        # Internal nodes keep the majority label as a prediction fallback.
        node = Node(attribute=best_attribute, split_point=split_point, label=majority)
        column = X[:, columns[best_attribute]]

        if split_point is not None:
            # Continuous attribute: binary split. The attribute stays
            # available so deeper nodes can split it at another threshold.
            left = np.asarray(column <= split_point, dtype=bool)
            node.value = split_point
            node.children["<="] = self._grow(X[left], y[left], attributes, columns)
            node.children[">"] = self._grow(X[~left], y[~left], attributes, columns)
        else:
            # Discrete attribute: one child per value, attribute consumed.
            remaining = [name for name in attributes if name != best_attribute]
            unique_values = np.unique(column)
            node.value = unique_values
            for value in unique_values:
                mask = column == value
                node.children[value] = self._grow(X[mask], y[mask], remaining, columns)

        return node

    @staticmethod
    def _is_discrete(column):
        """Return True when every value of ``column`` is a string."""
        kind = getattr(column.dtype, "kind", "O")
        if kind in "US":
            return True
        if kind == "O":
            return all(isinstance(value, str) for value in column)
        return False

    def _choose_best_attribute(self, X, y, attributes, columns=None):
        """Find the attribute (and threshold) with the highest gain ratio.

        Args:
            X: samples at the current node.
            y: their labels.
            attributes: candidate attribute names.
            columns: optional mapping attribute name -> column index of ``X``.
                When omitted, the position of the name in ``attributes`` is
                used, which is only right for the full attribute list.

        Returns:
            ``(attribute, split_point)``; ``split_point`` is ``None`` for a
            discrete attribute. ``(None, None)`` when no candidate has a
            gain ratio above zero.
        """
        num_samples = len(y)
        entropy = self._calculate_entropy(y)

        best_attribute = None
        best_gain_ratio = 0.0
        best_split_point = None

        for attribute in attributes:
            index = columns[attribute] if columns is not None else attributes.index(attribute)
            attribute_values = X[:, index]

            if self._is_discrete(attribute_values):
                information_gain = entropy - self._calculate_attribute_entropy(attribute_values, y)
                gain_ratio = self._calculate_gain_ratio(information_gain, attribute_values, num_samples)

                if gain_ratio > best_gain_ratio:
                    best_gain_ratio = gain_ratio
                    best_attribute = attribute
                    best_split_point = None
                continue

            # Continuous attribute: candidate thresholds are the midpoints
            # between consecutive distinct values.
            distinct = np.unique(attribute_values)
            for low, high in zip(distinct[:-1], distinct[1:]):
                split_point = (low + high) / 2
                left = np.asarray(attribute_values <= split_point, dtype=bool)

                # Masks and labels are both in the original sample order.
                left_labels = y[left]
                right_labels = y[~left]

                information_gain = (
                    entropy
                    - (len(left_labels) / num_samples) * self._calculate_entropy(left_labels)
                    - (len(right_labels) / num_samples) * self._calculate_entropy(right_labels)
                )
                # Normalise by the split information of the two-way partition
                # itself, not of the raw column values.
                gain_ratio = self._calculate_gain_ratio(information_gain, left, num_samples)

                if gain_ratio > best_gain_ratio:
                    best_gain_ratio = gain_ratio
                    best_attribute = attribute
                    best_split_point = split_point

        return best_attribute, best_split_point

    def _calculate_entropy(self, labels):
        """Return the Shannon entropy (base 2) of ``labels``; 0.0 if empty."""
        num_samples = len(labels)
        entropy = 0.0

        for count in Counter(labels).values():
            probability = count / num_samples
            entropy -= probability * log2(probability)

        return entropy

    def _calculate_attribute_entropy(self, attribute, labels):
        """Return H(D|A): label entropy weighted over the values of ``attribute``."""
        num_samples = len(labels)
        entropy = 0.0

        for value in np.unique(attribute):
            value_labels = labels[attribute == value]
            entropy += (len(value_labels) / num_samples) * self._calculate_entropy(value_labels)

        return entropy

    def _calculate_gain_ratio(self, information_gain, attribute, num_samples):
        """Divide the gain by the intrinsic value of the partition.

        ``attribute`` is any iterable assigning each sample to a partition
        cell (column values, or a boolean mask for a binary split). Returns 0
        when the intrinsic value is 0, i.e. everything falls into one cell.
        """
        intrinsic_value = self._calculate_intrinsic_value(attribute, num_samples)
        if intrinsic_value == 0:
            return 0
        return information_gain / intrinsic_value

    def _calculate_intrinsic_value(self, attribute, num_samples):
        """Return the entropy (base 2) of the cell sizes of a partition."""
        intrinsic_value = 0.0

        for count in Counter(attribute).values():
            probability = count / num_samples
            intrinsic_value -= probability * log2(probability)

        return intrinsic_value