In [1]:
from sklearn.preprocessing import LabelEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
from sklearn import tree

import numpy as np
import pandas as pd
from collections import Counter, defaultdict
import random

## 一、数据预处理

In [2]:
columns = ['age', 'workclass', 'fnlwgt', 'education', 'educationNum', 
           'maritalStatus', 'occupation', 'relationship', 'race', 'sex',
           'capitalGain', 'capitalLoss', 'hoursPerWeek', 'nativeCountry', 'income']
continues_columns = ['age', 'fnlwgt', 'educationNum', 'capitalGain', 'capitalLoss', 'hoursPerWeek']
discrete_columns = [_ for _ in columns if _ not in continues_columns]


# 读取数据
train_data = pd.read_csv('data/adult.data', delimiter=', ', header=None, engine='python')
test_data = pd.read_csv('data/adult.test', delimiter=', ', header=None, skiprows=1, engine='python')
train_data.columns = columns
test_data.columns = columns


# 清洗数据：删除含有缺失值的样本
print(f"清洗前的数据规模：训练集 -> {train_data.shape} 测试集 -> {test_data.shape}")
for column in columns:
    train_data = train_data[~train_data[column].astype(str).str.contains('\?')]
    test_data = test_data[~test_data[column].astype(str).str.contains('\?')]
print(f"清洗后的数据规模：训练集 -> {train_data.shape} 测试集 -> {test_data.shape}")


# 离散数据：按列编码为float类型
LE = LabelEncoder()
for i in range(len(columns)):
    if columns[i] in continues_columns: continue
    train_data[columns[i]] = LE.fit_transform(train_data[columns[i]]).astype(float)
    test_data[columns[i]] = LE.fit_transform(test_data[columns[i]]).astype(float)


# 连续数据：离散化为分位数区间的中值
for column in continues_columns:
    train_qcut = pd.qcut(train_data[column].astype(float), 5, duplicates='drop')
    train_data[column] = train_qcut.apply(lambda x: x.mid)
    test_qcut = pd.qcut(test_data[column].astype(float), 5, duplicates='drop')
    test_data[column] = test_qcut.apply(lambda x: x.mid)


# 划分数据集
X_train = train_data.iloc[:, :-1]
y_train = train_data.iloc[:, -1]
X_test = test_data.iloc[:, :-1]
y_test = test_data.iloc[:, -1]

清洗前的数据规模：训练集 -> (32561, 15) 测试集 -> (16281, 15)
清洗后的数据规模：训练集 -> (30162, 15) 测试集 -> (15060, 15)


## 二、自定义决策树模型

In [3]:
class Node:
    def __init__(self, attribute=None, children=None, label=None):
        self.attribute = attribute  # 当前结点属性
        self.children = children    # 子节点字典，value: Node
        self.label = label          # 叶结点标签

In [22]:
class MyDecisionTreeClassifier:
    def __init__(self, criterion='entropy'):
        self.criterion = criterion
        self.tree = None


    def fit(self, X_train, y_train):
        self.tree = self._build_tree(X_train, y_train)


    def _build_tree(self, X: pd.DataFrame, y: pd.Series):
        # 如果数据集为空，返回 None
        if len(y) == 0:
            return None

        # 如果标签相同，返回叶子结点
        if len(set(y)) == 1:
            return Node(label=y.iloc[0])

        # 选择最佳划分属性
        best_attribute, best_gain = self._choose_best_attribute(X, y)

        # 如果没有合适的划分，返回叶子结点
        if best_gain == 0:
            return Node(label=Counter(y).most_common(1)[0][0])

        # 构建叶子结点
        children = {}
        for value in X[best_attribute].unique():
            # 根据属性值划分数据集
            value_indices = (X[best_attribute] == value)
            X_subset = X.loc[value_indices]
            y_subset = y.loc[value_indices]

            # 递归构建叶子结点
            child_node = self._build_tree(X_subset, y_subset)
            children[value] = child_node

        # 返回内叶子结点
        return Node(attribute=best_attribute, children=children)


    def _choose_best_attribute(self, X, y):
        best_gain = 0
        best_attribute = None

        # 遍历所有特征
        for attribute in X.columns:
            # 获取当前特征的唯一值
            values = X[attribute].unique()

            # 划分数据集
            subsets = defaultdict(list)
            for value in values:
                indices = (X[attribute] == value)
                y_subset = y.loc[indices]
                subsets[value] = y_subset

            gain = self._calculate_gain(y, subsets)

            if gain > best_gain:
                best_gain = gain
                best_attribute = attribute

        return best_attribute, best_gain


    def _calculate_gain(self, y, subsets):
        # 根据选择的标准计算增益
        if self.criterion == 'entropy':
            gain = self._entropy_gain(y, subsets)
        elif self.criterion == 'log_loss':
            gain = self._ratio_gain(y, subsets)
        elif self.criterion == 'gini':
            gain = self._gini_gain(y, subsets)
        else:
            raise ValueError("Unsupported criterion")
        return gain


    def _calculate_entropy(self, y: pd.Series) -> float:
        counts = Counter(y)
        total = len(y)
        entropy = 0

        for count in counts.values():
            prob = count / total
            entropy -= prob * np.log2(prob)

        return entropy


    def _entropy_gain(self, y: pd.Series, subsets: dict) -> float:
        entropy = self._calculate_entropy(y)
        child_entropy = np.sum([len(subset) / len(y) * self._calculate_entropy(subset) for subset in subsets.values()])        
        return entropy - child_entropy


    def _ratio_gain(self, y: pd.Series, subsets: dict) -> float:
        gain = self._entropy_gain(y, subsets)
        iv = np.sum([-len(subset) / len(y) * np.log2(len(subset) / len(y)) for subset in subsets.values()])
        return 0 if iv == 0 else gain / iv
    

    def _gini_gain(self, y: pd.Series, subsets: dict) -> float:
        gini_index = 0
        for subset in subsets.values():
            gini_index += len(subset) / len(y) * (1 - np.sum([(len(subset.loc[subset == label]) / len(subset)) ** 2 for label in subset.unique()]))
        return -gini_index


    def predict(self, X_test) -> np.array:
        y_pred = [self._predict_single_sample(sample) for _, sample in X_test.iterrows()]
        return np.array(y_pred)


    def _predict_single_sample(self, sample):
        node = self.tree
        while node.label is None:
            value = sample[node.attribute]

            # 如果叶子结点为空，返回子结点中最多的标签
            if value not in node.children or node.children[value] is None:
                child_labels = []

                # 仅当子节点存在且是叶节点时，获取其标签
                for child_node in node.children.values():
                    if child_node is not None and child_node.label is not None:
                        child_labels.append(child_node.label)
                
                if len(child_labels) == 0: return 0.0
                most_common_label = Counter(child_labels).most_common(1)[0][0]
                return most_common_label

            else:
                node = node.children[value]

        return node.label

## 三、多模型对照测试

### 3.1 使用 entropy 作为划分标准

In [7]:
clf_std = DecisionTreeClassifier(criterion='entropy')
clf_std.fit(X_train, y_train)
y_pred_std = clf_std.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_std)
print(f'标准包准确率：{accuracy:.2f}')

clf_my = MyDecisionTreeClassifier(criterion='entropy')
clf_my.fit(X_train, y_train)
y_pred_my = clf_my.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_my)
print(f'自实现准确率：{accuracy:.2f}')

# tree.plot_tree(clf_std, filled=True)

标准包准确率：0.78
自实现准确率：0.77


### 3.2 使用 log_loss 作为划分标准

In [18]:
clf_std = DecisionTreeClassifier(criterion='log_loss')
clf_std.fit(X_train, y_train)
y_pred_std = clf_std.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_std)
print(f'标准包准确率：{accuracy:.2f}')


clf_my = MyDecisionTreeClassifier(criterion='log_loss')
clf_my.fit(X_train, y_train)
y_pred_my = clf_my.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_my)
print(f'自实现准确率：{accuracy:.2f}')

标准包准确率：0.78
自实现准确率：0.78


### 3.3 使用 gini 作为划分标准

In [23]:
clf_std = DecisionTreeClassifier(criterion='gini')
clf_std.fit(X_train, y_train)
y_pred_std = clf_std.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_std)
print(f'标准包准确率：{accuracy:.2f}')


clf_my = MyDecisionTreeClassifier(criterion='gini')
clf_my.fit(X_train, y_train)
y_pred_my = clf_my.predict(X_test)
accuracy = accuracy_score(y_test, y_pred_my)
print(f'自实现准确率：{accuracy:.2f}')

标准包准确率：0.78
自实现准确率：0.75
