In [123]:
import numpy as np
import pandas as pd
import copy
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from collections import Counter

## Load Data (例5.1)

In [124]:
# 书上题目5.1
def create_data():
    dataset = [['青年', '否', '否', '一般', '否'],
               ['青年', '否', '否', '好', '否'],
               ['青年', '是', '否', '好', '是'],
               ['青年', '是', '是', '一般', '是'],
               ['青年', '否', '否', '一般', '否'],
               ['中年', '否', '否', '一般', '否'],
               ['中年', '否', '否', '好', '否'],
               ['中年', '是', '是', '好', '是'],
               ['中年', '否', '是', '非常好', '是'],
               ['中年', '否', '是', '非常好', '是'],
               ['老年', '否', '是', '非常好', '是'],
               ['老年', '否', '是', '好', '是'],
               ['老年', '是', '否', '好', '是'],
               ['老年', '是', '否', '非常好', '是'],
               ['老年', '否', '否', '一般', '否'],
               ]
    label = [u'年龄', u'有工作', u'有自己的房子', u'信贷情况', u'类别']
    # 返回数据集和每个维度的名称
    return dataset, label

In [125]:
data_set, label = create_data()
train_data = pd.DataFrame(data_set, columns=label)
# print(np.array(train_data.groupby('年龄')['类别'].apply(lambda x: np.array(x))))
X_train, y_train = train_data.iloc[:, :4], train_data.iloc[:, -1]

## Load Data(Iris)

In [126]:
iris = load_iris()
df_iris = pd.DataFrame(iris.data, columns=iris.feature_names)
df_iris['label'] = iris.target
df_iris.columns = ['sepal length', 'sepal width', 'petal length', 'petal width', 'label']
iris_X, iris_y = df_iris.iloc[:, :4], df_iris.iloc[:, -1]

## 熵函数和基尼指数

In [127]:
def entropy(label):
    """
    计算数据集的熵
    label: 数据集样本的类别
    """
    label = np.array(label)
    hist = np.array(list(Counter(label).values()))
    ps = hist / np.sum(hist)
    return -np.sum([p * np.log2(p) for p in ps if p > 0])

def conditional_entropy(label):
    """
    计算数据集的条件熵
    label: 按特征值分组后的每个子数据集的类别集合
    """
    label = np.array(label)
    N = 0
    for i in label:
        N += len(i)
    cond_ent = 0
    for i in label:
        cond_ent += (len(i) / N) * entropy(i)
    return cond_ent

def info_gain(label_father, label_child):
    """
    计算信息增益
    """
    return entropy(label_father) - conditional_entropy(label_child)

def info_gain_ratio(label_father, label_child):
    """
    计算信息增益比
    """
    info_gain_res = info_gain(label_father, label_child)
    label_father = np.array(label_father)
    N = 0
    for i in label:
        N += len(i)
    cond_ent = 0
    for i in label:
        cond_ent += -(len(i) / N) * np.log2(len(i) / N)
    feature_entropy = info_gain_res / cond_ent
    return feature_entropy

def gini(label):
    """
    计算基尼指数
    label: 按特征值分组后的每个子数据集的类别集合
    """
    label = np.array(label)
    N = 0
    for i in label:
        N += len(i)
    gini = 0
    for i in label:
        num = np.array(list(dict(Counter(i)).values()))
        p = num / len(i)
        gini_i = sum(p * (1 - p))
        gini += (len(i) / N) * gini_i
    return gini

## ID3 C4.5 CART

In [128]:
class Node:
    def __init__(self, leaf=True, label=None, feature_name=None, feature_value=None, data_label=None):
        """
        leaf: 是否是叶节点
        label: 当前叶节点所属类
        feature_name: 节点特征名称
        feature_value: 节点特征分类值
        """
        self.leaf = leaf
        self.label = label
        self.feature_name = feature_name
        self.feature_value = feature_value
        self.tree = {}
        self.data_label = data_label
        
    def addNode(self, val, node):
        """
        val: 节点对应的特征值
        node: 节点
        """
        self.leaf = False
        self.tree[val] = node
        
    def predict(self, features):
        """
        feature: 特征向量
        """
        node = self
        while True:
            if node.leaf is True:
                return node.label
            node = node.tree[features[node.feature_name]]
            
    def score(self, X_test, y_test):
        """
        得分
        X_test: 测试集样本特征
        y_test: 测试集样本类别
        """
        accurate_count = 0
        N = 0
        for i in range(len(X_test)):
            if self.predict(dict(X_test.iloc[i])) == y_test.iloc[i]:
                accurate_count += 1
            N += 1
        return accurate_count / N

In [129]:
class DTree:
    def __init__(self, epsilon=0.01, ID3=True, C45=False, CART=False):
        self.epsilon = epsilon
        self.ID3 = ID3
        self.C45 = C45
        self.CART = CART
        self._tree = {}
        
    def info_gain_train(self, dataset, feature_names):
        """
        返回当前数据集信息增益最大的特征名称，以及信息增益
        dataset(DataFrame): 数据集
        """
        label_name = dataset.columns[-1]
        labels = dataset.iloc[:, -1]
        dataset_ent = entropy(labels)
        min_cond_entropy = float(np.inf)
        best_feature = None
        for feature in feature_names:
            label_splited = np.array(dataset.groupby(feature)[label_name].apply(lambda x: np.array(x)))
            cond_entropy = conditional_entropy(label_splited)
            if cond_entropy < min_cond_entropy:
                min_cond_entropy = cond_entropy
                best_feature  = feature
        return best_feature, (dataset_ent - min_cond_entropy)
    
    def info_gain_ratio_train(self, dataset, feature_names):
        """
        返回当前数据集信息增益比最大的特征名称，以及信息增益比
        dataset(DataFrame): 数据集
        """
        label_name = dataset.columns[-1]
        labels = dataset.iloc[:, -1]
        max_info_gain_ratio = float(-np.inf)
        best_feature = None
        for feature in feature_names:
            label_splited = np.array(dataset.groupby(feature)[label_name].apply(lambda x: np.array(x)))
            res = info_gain_ratio(labels, label_splited)
            if res > max_info_gain_ratio:
                max_info_gain_ratio = res
                best_feature  = feature
        return best_feature, max_info_gain_ratio
    
    def gini_train(self, dataset, feature_names):
        """
        返回当前数据集尼基指数最小的特征名称，以及基尼指数
        dataset(DataFrame): 数据集
        """
        label_name = dataset.columns[-1]
        min_gini = float(np.inf)
        best_feature = None
        for feature in feature_names:
            label_splited = np.array(dataset.groupby(feature)[label_name].apply(lambda x: np.array(x)))
            res = gini(label_splited)
            if res < min_gini:
                min_gini = res
                best_feature = feature
        return best_feature, min_gini
    
    def train(self, dataset, features):
        """
        生成树辅助函数
        dataset(DataFrame): 数据集
        """
        y_train = dataset.iloc[:, -1]
        if len(y_train.value_counts()) == 1:
            return Node(leaf=True, label=y_train.iloc[0], data_label=y_train)
        if len(features) == 0:
            return Node(leaf=True, label=y_train.value_counts().sort_values(ascending=False).index[0], data_label=y_train)
        
        if self.ID3==True and self.C45==False and self.CART==False:
            best_feature, max_info_gain = self.info_gain_train(dataset, features)
        elif self.ID3==False and self.C45==True and self.CART==False:
            best_feature, max_info_gain = self.info_gain_ratio_train(dataset, features)
        elif self.ID3==False and self.C45==False and self.CART==True:
            best_feature, max_info_gain = self.gini_train(dataset, features)
        else:
            raise('Didn\'t set the right decision tree type.')
        
        if max_info_gain < self.epsilon and not (self.ID3==False and self.C45==False and self.CART==True):
            return Node(leaf=True, label=y_train.value_counts().sort_values(ascending=False).index[0], data_label=y_train)
        
        
        node_tree = Node(leaf=False, feature_name=best_feature, data_label=y_train)
        feature_values = dataset.loc[:, best_feature].value_counts().index
        print(best_feature)
        features.remove(best_feature)
        for value in feature_values:
            sub_dataset = dataset[dataset[best_feature] == value]
            sub_tree = self.train(sub_dataset, features)
            node_tree.addNode(value, sub_tree)
            
        return node_tree
    
    def fit(self, dataset):
        """
        生成树
        dataset(DataFrame): 数据集
        """
        features = list(dataset.columns[:-1])
        self._tree = self.train(dataset, features)
        return self._tree
    
    def predict(self, feature):
        """
        预测
        feature(DataFrame): 特征向量
        """
        feature = dict(feature)
        return self._tree.predict(feature)
    
    def score(self, X_test, y_test):
        """
        得分
        X_test: 测试集样本特征
        y_test: 测试集样本类别
        """
        accurate_count = 0
        N = 0
        for i in range(len(X_test)):
            if self.predict(X_test.iloc[i]) == y_test.iloc[i]:
                accurate_count += 1
            N += 1
        return accurate_count / N

In [130]:
dt = DTree(ID3=True, C45=False, CART=False)
dt.fit(train_data)
dt.predict(train_data.iloc[-1])

有自己的房子
有工作


'否'

In [131]:
dt = DTree(ID3=False, C45=True, CART=False)
dt.fit(train_data)
dt.predict(train_data.iloc[-1])

有自己的房子
有工作


'否'

In [132]:
dt = DTree(ID3=False, C45=False, CART=True)
dt.fit(train_data)
dt.predict(train_data.iloc[-1])

有自己的房子
有工作


'否'

## 剪枝

In [133]:
def cal_alpha(decision_tree):
    num_leaves = 0
    leaves_cost = 0
    single_node_cost = 0
    traversal_list = []
    traversal_list.append(decision_tree)
    while len(traversal_list) > 0:
        current_node = traversal_list[0]
        for node in current_node.tree.values():
            traversal_list.append(node)
        if current_node.leaf == True:
            node_entropy = entropy(current_node.data_label)
            leaves_cost += len(current_node.data_label) * node_entropy
            num_leaves += 1
        traversal_list.remove(current_node)
    single_node_cost = len(decision_tree.data_label) * entropy(decision_tree.data_label)
    alpha = (single_node_cost - leaves_cost) / (num_leaves - 1)
    return alpha

In [134]:
def post_pruning_helper(decision_tree):
    min_alpha = float(np.inf)
    pruning_node = None
    traversal_list = []
    for node in decision_tree.tree.values():
        traversal_list.append(node)
    while len(traversal_list) > 0:
        current_node = traversal_list[0]
        for node in current_node.tree.values():
            if not node.leaf:
                traversal_list.append(node)
        alpha = cal_alpha(current_node)
        if alpha < min_alpha:
            min_alpha = alpha
            pruning_node = current_node
        traversal_list.remove(current_node)
    pruning_node.leaf = True
    return decision_tree
    
def judge(decision_tree):
    """
    To judge whether the children of decision tree are all leaves.
    """
    for child in decision_tree.tree.values():
        if child.leaf == False:
            return False
    return True

def post_pruning(decision_tree, test_data=[], test_label=[]):
    """
    对决策树进行后剪枝操作
    :param decision_tree: 决策树根节点
    :param test_data: 测试数据集
    :param test_label: 测试数据集的标签
    :param train_label: 训练数据集的标签
    :return:
    """
    max_acc = 0
    best_tree = None
    sub_tree = [decision_tree]
    while not judge(decision_tree):
        decision_tree = post_pruning_helper(copy.deepcopy(decision_tree))
        sub_tree.append(decision_tree)
        
    for tree in sub_tree:
        acc = tree.score(test_data, test_label)
        if acc > max_acc:
            max_acc = acc
            best_tree = tree
    
    return best_tree

In [135]:
dt = DTree(ID3=False, C45=False, CART=True)
dt.fit(train_data)
dt = post_pruning(dt._tree, X_train, y_train)
dt.predict(train_data.iloc[-1])

有自己的房子
有工作




'否'

## sklearn

In [136]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree import export_graphviz

In [137]:
X_train , X_test, y_train, y_test = train_test_split(iris_X, iris_y, test_size=0.3)
clf = DecisionTreeClassifier()
clf.fit(X_train, y_train)

DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                       max_depth=None, max_features=None, max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, presort='deprecated',
                       random_state=None, splitter='best')

In [138]:
clf.score(X_test, y_test)

0.9555555555555556

In [139]:
with open('tree_pic.dot', 'w') as f:
    f = export_graphviz(clf, out_file=f)

参考代码: https://github.com/fengdu78/lihang-code

python: 3.7.6