# 决策树模型构建

In [30]:
#import necessary lib
from math import log
import operator
import numpy as np


class Decision_Tree_C45:
    # define entropy calculation
    def Entropy(self, train_data):
        inst_num = len(train_data)  # instances number
        label_counts = {}  # count instances of each class
        for i in range(inst_num):
            label = train_data[i][-1]  #get instance class
            if label not in label_counts.keys():
                label_counts[label] = 0
            label_counts[label] += 1  #count
        ent = 0
        for key in label_counts.keys():
            #calculate each class proportion
            prob = float(label_counts[key]) / inst_num
            ent -= prob * log(prob, 2)  # see Eq.(3.1)
        return ent

    #split data according to feature and feature value
    def split_data(self, train_data, feature_index, feature_value, feature_type):
        splitedData = []  # store splited data
        if feature_type == "D":  # for discrete feature
            for feat_vect in train_data:
                if feat_vect[feature_index] == feature_value:
                    reducedVect = []
                    #delete used discrete feature from data
                    for i in range(len(feat_vect)):
                        if i < feature_index or i > feature_index:
                            reducedVect.append(feat_vect[i])
                    splitedData.append(reducedVect)
        if feature_type == "L":  #for continous feature
            for feat_vect in train_data:
                if feat_vect[feature_index] <= feature_value:
                    splitedData.append(feat_vect)
        if feature_type == "R":  #for continous feature
            for feat_vect in train_data:
                if feat_vect[feature_index] > feature_value:
                    splitedData.append(feat_vect)
        return splitedData

    #choose best feature to split
    def choose_split_feature(self, train_data):
        feat_num = len(train_data[0]) - 1  # get available features
        base_ent = self.Entropy(train_data)
        bestInforGain = 0.0
        best_feat_index = -1
        best_feat_value = 0
        for i in range(feat_num):
            if isinstance(train_data[0][i], str):  #for discrete feature
                feat_list = [example[i] for example in train_data]
                unique_values = set(feat_list)
                newEnt = 0
                for value in unique_values:
                    sub_data = self.split_data(train_data, i, value, "D")
                    prop = float(len(sub_data)) / len(train_data)
                    newEnt += prop * self.Entropy(sub_data)  #see Eq.(3.2)
                    inforgain = base_ent - newEnt
                    if inforgain > bestInforGain:
                        best_feat_index = i
                        bestInforGain = inforgain
                else:  #for continous feature
                    feat_list = [example[i] for example in train_data]
                    unique_values = set(feat_list)
                    sort_unique_values = sorted(unique_values)
                    minEnt = np.inf
                    for j in range(len(sort_unique_values) - 1):
                        div_value = (sort_unique_values[j] + sort_unique_values[j + 1]) / 2
                        sub_data_left = self.split_data(train_data, i, div_value, "L")
                        sub_data_right = self.split_data(train_data, i, div_value, "R")
                        prop_left = float(len(sub_data_left)) / len(train_data)
                        prop_right = float(len(sub_data_right)) / len(train_data)
                        ent = prop_left * self.Entropy(sub_data_left) + \
                              prop_right * self.Entropy(sub_data_right)  #see Eq.(3.6)
                        if ent < minEnt:
                            minEnt = ent
                            best_feat_value = div_value
                    inforgain = base_ent - minEnt
                    if inforgain > bestInforGain:
                        bestInforGain = inforgain
                        best_feat_index = i
            return best_feat_index, best_feat_value

    # get major class
    def get_major_class(self, classList):
        classcount = {}
        for vote in classList:
            if vote not in classcount.key():
                classcount[vote] = 0
            classcount[vote] += 1
            sortedclasscount = sorted(classcount.iteritems(), key=operator.itemgetter(1), reverse=True)
            major = sortedclasscount[0][0]
            return major

    # create decision tree
    def create_decision_tree(self, train_data, feat_names):
        classList = [example[-1] for example in train_data]
        if classList.count(classList[0]) == len(classList):  #see condition A
            return classList[0]
        if len(train_data[0]) == 1:  #see condition B
            return self.get_major_class(classList)
        if len(train_data) == 0:  #see condition C
            return
        # choose best division feature
        best_feat, best_div_value = self.choose_split_feature(train_data)
        if isinstance(train_data[0][best_feat], str):  # for discrete feature
            feat_name = feat_names[best_feat]
            tree_model = {feat_name: {}}  # generate a root node
            del (feat_names[best_feat])  # del feature used
            feat_values = [example[best_feat] for example in train_data]
            unique_feat_values = set(feat_values)
            #create a node for each value of the best feature
            for value in unique_feat_values:
                sub_feat_names = feat_names[:]
                tree_model[feat_name][value] = self.create_decision_tree(self.split_data(train_data,
                                                                                         best_feat, value, "D"),
                                                                         sub_feat_names)
        else:  #for contiunous feature
            best_feat_name = feat_names[best_feat] + "<" + str(best_div_value)
            tree_model = {best_feat_name: {}}  # generate a root node
            sub_feat_names = feat_names
            # generate left node
            tree_model[best_feat_name]["Y"] = self.create_decision_tree(self.split_data(train_data,
                                                                                        best_feat,
                                                                                        best_div_value, "L"),
                                                                        sub_feat_names)
            #generate right node
            tree_model[best_feat_name]["N"] = self.create_decision_tree(self.split_data(train_data,
                                                                                        best_feat,
                                                                                        best_div_value, "R"),
                                                                        sub_feat_names)
        return tree_model

    #define predict function
    def predict(self, tree_model, feat_names, feat_vect):
        firstStr = list(tree_model.keys())[0]  # get tree root
        lessIndex = str(firstStr).find('<')
        if lessIndex > -1:  # if root is a continous feature
            # recursively search untill leaft node
            secondDict = tree_model[firstStr]
            feat_name = str(firstStr)[:lessIndex]
            featIndex = feat_names.index(feat_name)
            div_value = float(str(firstStr)[lessIndex + 1:])
            if feat_vect[featIndex] <= div_value:
                if isinstance(secondDict["Y"], dict):
                    classLabel = self.predict(secondDict["Y"],
                                              feat_names, feat_vect)
                else:
                    classLabel = secondDict["Y"]
            else:
                if isinstance(secondDict["N"], dict):
                    classLabel = self.predict(secondDict["N"],
                                              feat_names, feat_vect)
                else:
                    classLabel = secondDict["N"]
            return classLabel
        else:  #if root is a discrete feature
            # recursively search untill leaft node
            secondDict = tree_model[firstStr]
            featIndex = feat_names.index(firstStr)
            key = feat_vect[featIndex]
            valueOfFeat = secondDict[key]
            if isinstance(valueOfFeat, dict):
                classLabel = self.predict(valueOfFeat, feat_names, feat_vect)
            else:
                classLabel = valueOfFeat
            return classLabel


# 案例实践

In [31]:

#导入数据集
#Add code here to load data from file and preprocess data
import pandas as pd
from sklearn.model_selection import train_test_split

data = pd.read_csv("Client_Info.csv", encoding='gb2312')
data = np.array(data)
feat_names = ['x1', 'x2', 'x3', 'x4', 'x5']
train, test = train_test_split(data, test_size=0.3, random_state=2022)

#Add code here to build decision tree model using Decision_Tree_C45
model = Decision_Tree_C45()
tree = model.create_decision_tree(train, feat_names)
pred_labels = []
for i in range(len(test)):
    label = model.predict(tree, feat_names, test[i])
    pred_labels.append(label)
acc = 0
for i in range(len(test)):
    if pred_labels[i] == test[i, -1]:
        acc += 1.0
print("Decision_Tree_C45 accuracy:%.3f" % (acc / len(test)))





# s-klearn包
from sklearn.tree import DecisionTreeClassifier

#Add code here to build decision tree model using sklearn
model_skl = DecisionTreeClassifier(criterion="entropy")
model_skl.fit(train[:, 0:-1], train[:, -1])
pred_labels_skl = model_skl.predict(test[:, 0:-1])
acc = 0
for i in range(len(test)):
    if pred_labels_skl[i] == test[i, -1]:
        acc += 1.0
print("Decision_Tree_SKL accuracy:%.3f" % (acc / len(test)))

Decision_Tree_C45 accuracy:0.760
Decision_Tree_SKL accuracy:0.757


# 拓展训练

In [33]:
# -*- coding: utf-8 -*-
#import necessary lib
from math import log
import operator
import numpy as np


class Decision_Tree_C45:
    def __init__(self, max_depth=float('inf'), min_samples_split=2, criterion='gini'):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.criterion = criterion  # "gini" or "entropy"

    # define entropy calculation
    def Entropy(self, train_data):
        inst_num = len(train_data)  # instances number
        label_counts = {}  # count instances of each class
        for i in range(inst_num):

            label = train_data[i][-1]  #get instance class
            if label not in label_counts.keys():
                label_counts[label] = 0
            label_counts[label] += 1  #count
        ent = 0
        for key in label_counts.keys():
            #calculate each class proportion
            prob = float(label_counts[key]) / inst_num
            ent -= prob * log(prob, 2)  # see Eq.(3.1)
        return ent
        #split data according to feature and feature value

    def split_data(self, train_data, feature_index, feature_value, feature_type):
        splitedData = []  # store splited data
        if feature_type == "D":  # for discrete feature
            for feat_vect in train_data:
                if feat_vect[feature_index] == feature_value:
                    reducedVect = []
                    #delete used discrete feature from data
                    for i in range(len(feat_vect)):
                        if i < feature_index or i > feature_index:
                            reducedVect.append(feat_vect[i])
                    splitedData.append(reducedVect)
        if feature_type == "L":  #for continous feature
            for feat_vect in train_data:
                if feat_vect[feature_index] <= feature_value:
                    splitedData.append(feat_vect)
        if feature_type == "R":  #for continous feature
            for feat_vect in train_data:
                if feat_vect[feature_index] > feature_value:
                    splitedData.append(feat_vect)
        return splitedData

    # #choose best feature to split
    # def choose_split_feature(self, train_data, depth):
    #     # 检查最大深度和最小样本数的条件
    #     if depth >= self.max_depth or len(train_data) < self.min_samples_split:
    #         return None, None
    #
    #     # 选择划分标准：基尼指数或熵
    #     if self.criterion == 'gini':
    #         base_measure = self.Gini(train_data)
    #     else:
    #         base_measure = self.Entropy(train_data)
    #
    #     feat_num = len(train_data[0]) - 1  # get available features
    #     base_ent = self.Entropy(train_data)
    #     bestInforGain = 0.0
    #     best_feat_index = -1
    #     best_feat_value = 0
    #     for i in range(feat_num):
    #         if isinstance(train_data[0][i], str):  #for discrete feature
    #             feat_list = [example[i] for example in train_data]
    #             unique_values = set(feat_list)
    #             newEnt = 0
    #             for value in unique_values:
    #                 sub_data = self.split_data(train_data, i, value, "D")
    #                 prop = float(len(sub_data)) / len(train_data)
    #                 newEnt += prop * self.Entropy(sub_data)  #see Eq.(3.2)
    #                 inforgain = base_ent - newEnt
    #                 if inforgain > bestInforGain:
    #                     best_feat_index = i
    #                     bestInforGain = inforgain
    #             else:  #for continous feature
    #                 feat_list = [example[i] for example in train_data]
    #                 unique_values = set(feat_list)
    #                 sort_unique_values = sorted(unique_values)
    #                 minEnt = np.inf
    #                 for j in range(len(sort_unique_values) - 1):
    #                     div_value = (sort_unique_values[j] + sort_unique_values[j + 1]) / 2
    #                     sub_data_left = self.split_data(train_data, i, div_value, "L")
    #                     sub_data_right = self.split_data(train_data, i, div_value, "R")
    #                     prop_left = float(len(sub_data_left)) / len(train_data)
    #                     prop_right = float(len(sub_data_right)) / len(train_data)
    #                     ent = prop_left * self.Entropy(sub_data_left) + \
    #                           prop_right * self.Entropy(sub_data_right)  #see Eq.(3.6)
    #                     if ent < minEnt:
    #                         minEnt = ent
    #                         best_feat_value = div_value
    #                 inforgain = base_ent - minEnt
    #                 if inforgain > bestInforGain:
    #                     bestInforGain = inforgain
    #                     best_feat_index = i
    #         return best_feat_index, best_feat_value


    def choose_split_feature(self, train_data, depth):
        # 检查最大深度和最小样本数的条件
        if depth >= self.max_depth or len(train_data) < self.min_samples_split:
            return None, None

        feat_num = len(train_data[0]) - 1  # 获取可用特征的数量
        bestInforGain = 0.0
        best_feat_index = -1
        best_feat_value = None

        # 根据选择的标准计算基础度量（基尼指数或熵）
        if self.criterion == 'gini':
            base_measure = self.Gini(train_data)
        else:
            base_measure = self.Entropy(train_data)

        for i in range(feat_num):
            feat_list = [example[i] for example in train_data]
            unique_values = set(feat_list)

            for value in unique_values:
                sub_data = self.split_data(train_data, i, value, "D")
                prop = float(len(sub_data)) / len(train_data)
                if self.criterion == 'gini':
                    new_measure = prop * self.Gini(sub_data)
                else:
                    new_measure = prop * self.Entropy(sub_data)

                inforgain = base_measure - new_measure
                if inforgain > bestInforGain:
                    bestInforGain = inforgain
                    best_feat_index = i
                    best_feat_value = value

        return best_feat_index, best_feat_value

    # get major class
    def get_major_class(self, classList):
        classcount = {}
        for vote in classList:
            if vote not in classcount:
                classcount[vote] = 0
            classcount[vote] += 1

        sortedclasscount = sorted(classcount.items(), key=operator.itemgetter(1), reverse=True)
        major = sortedclasscount[0][0]
        return major

    # # create decision tree
    # def create_decision_tree(self, train_data, feat_names):
    #     classList = [example[-1] for example in train_data]
    #     if classList.count(classList[0]) == len(classList):  #see condition A
    #         return classList[0]
    #     if len(train_data[0]) == 1:  #see condition B
    #         return self.get_major_class(classList)
    #     if len(train_data) == 0:  #see condition C
    #         return
    #     # choose best division feature
    #     best_feat, best_div_value = self.choose_split_feature(train_data)
    #     if isinstance(train_data[0][best_feat], str):  # for discrete feature
    #         feat_name = feat_names[best_feat]
    #         tree_model = {feat_name: {}}  # generate a root node
    #         del (feat_names[best_feat])  # del feature used
    #         feat_values = [example[best_feat] for example in train_data]
    #         unique_feat_values = set(feat_values)
    #         #create a node for each value of the best feature
    #         for value in unique_feat_values:
    #             sub_feat_names = feat_names[:]
    #             tree_model[feat_name][value] = self.create_decision_tree(self.split_data(train_data,
    #                                                                                      best_feat, value, "D"),
    #                                                                      sub_feat_names)
    #     else:  #for contiunous feature
    #         best_feat_name = feat_names[best_feat] + "<" + str(best_div_value)
    #         tree_model = {best_feat_name: {}}  # generate a root node
    #         sub_feat_names = feat_names
    #         # generate left node
    #         tree_model[best_feat_name]["Y"] = self.create_decision_tree(self.split_data(train_data,
    #                                                                                     best_feat,
    #                                                                                     best_div_value, "L"),
    #                                                                     sub_feat_names)
    #         #generate right node
    #         tree_model[best_feat_name]["N"] = self.create_decision_tree(self.split_data(train_data,
    #                                                                                     best_feat,
    #                                                                                     best_div_value, "R"),
    #                                                                     sub_feat_names)
    #     return tree_model
    def create_decision_tree(self, train_data, feat_names, depth=0):
        # 检查是否所有的类标签都相同
        classList = [example[-1] for example in train_data]
        if classList.count(classList[0]) == len(classList):
            return classList[0]

        # 检查是否还有可用的特征或者是否达到最大深度
        if len(train_data[0]) == 1 or depth >= self.max_depth:
            return self.get_major_class(classList)

        # 选择最佳分割特征
        best_feat, best_div_value = self.choose_split_feature(train_data)
        if best_feat is None:  # 如果没有更多特征可用于进一步分割
            return self.get_major_class(classList)

        # 根据最佳特征的类型进行分割
        if isinstance(train_data[0][best_feat], str):  # 离散特征
            feat_name = feat_names[best_feat]
            tree_model = {feat_name: {}}
            del feat_names[best_feat]  # 删除已使用的特征
            unique_feat_values = set([example[best_feat] for example in train_data])
            for value in unique_feat_values:
                sub_feat_names = feat_names[:]  # 复制特征列表
                sub_data = self.split_data(train_data, best_feat, value, "D")
                tree_model[feat_name][value] = self.create_decision_tree(sub_data, sub_feat_names, depth + 1)
        else:  # 连续特征
            feat_name = feat_names[best_feat] + "<=" + str(best_div_value)
            tree_model = {feat_name: {}}
            sub_feat_names = feat_names[:]
            # 分割为左右子树
            left_subtree = self.split_data(train_data, best_feat, best_div_value, "L")
            right_subtree = self.split_data(train_data, best_feat, best_div_value, "R")
            tree_model[feat_name]["yes"] = self.create_decision_tree(left_subtree, sub_feat_names, depth + 1)
            tree_model[feat_name]["no"] = self.create_decision_tree(right_subtree, sub_feat_names, depth + 1)

        return tree_model

    #define predict function
    def predict(self, tree_model, feat_names, feat_vect):
        firstStr = list(tree_model.keys())[0]  # get tree root
        lessIndex = str(firstStr).find('<')
        if lessIndex > -1:  # if root is a continous feature
            # recursively search untill leaft node
            secondDict = tree_model[firstStr]
            feat_name = str(firstStr)[:lessIndex]
            featIndex = feat_names.index(feat_name)
            div_value = float(str(firstStr)[lessIndex + 1:])
            if feat_vect[featIndex] <= div_value:
                if isinstance(secondDict["Y"], dict):
                    classLabel = self.predict(secondDict["Y"],
                                              feat_names, feat_vect)
                else:
                    classLabel = secondDict["Y"]
            else:
                if isinstance(secondDict["N"], dict):
                    classLabel = self.predict(secondDict["N"],
                                              feat_names, feat_vect)
                else:
                    classLabel = secondDict["N"]
            return classLabel
        else:  #if root is a discrete feature
            # recursively search untill leaft node
            secondDict = tree_model[firstStr]
            featIndex = feat_names.index(firstStr)
            key = feat_vect[featIndex]
            valueOfFeat = secondDict[key]
            if isinstance(valueOfFeat, dict):
                classLabel = self.predict(valueOfFeat, feat_names, feat_vect)
            else:
                classLabel = valueOfFeat
            return classLabel

    def Gini(self, train_data):
        inst_num = len(train_data)
        label_counts = {}
        for feat_vect in train_data:
            label = feat_vect[-1]
            if label not in label_counts:
                label_counts[label] = 0
            label_counts[label] += 1
        gini = 1.0
        for key in label_counts:
            prob = float(label_counts[key]) / inst_num
            gini -= prob ** 2
        return gini


In [34]:
# -*- coding: utf-8 -*-

#Add code here to load data from file and preprocess data
import pandas as pd
from sklearn.model_selection import train_test_split

data = pd.read_csv("Client_Info.csv", encoding='gb2312')
data = np.array(data)
feat_names = ['x1', 'x2', 'x3', 'x4', 'x5']
train, test = train_test_split(data, test_size=0.3, random_state=2021)
#Add code here to build decision tree model using Decision_Tree_C45
model = Decision_Tree_C45(max_depth=5, min_samples_split=10, criterion='gini')
tree = model.create_decision_tree(train, feat_names)
# model = Decision_Tree_C45()
# tree = model.create_decision_tree(train, feat_names)
pred_labels = []
for i in range(len(test)):
    label = model.predict(tree, feat_names, test[i])
    pred_labels.append(label)
acc = 0
for i in range(len(test)):
    if pred_labels[i] == test[i, -1]:
        acc += 1.0
print("Decision_Tree_C45 accuracy:%.3f" % (acc / len(test)))
from sklearn.tree import DecisionTreeClassifier

#Add code here to build decision tree model using sklearn
model_skl = DecisionTreeClassifier(criterion="entropy")
model_skl.fit(train[:, 0:-1], train[:, -1])
pred_labels_skl = model_skl.predict(test[:, 0:-1])
acc = 0
for i in range(len(test)):
    if pred_labels_skl[i] == test[i, -1]:
        acc += 1.0
print("Decision_Tree_SKL accuracy:%.3f" % (acc / len(test)))

TypeError: choose_split_feature() missing 1 required positional argument: 'depth'

# 测试

In [29]:
train[-1][-1]

1

第一个控制行，第二个控制列（-1代表最后一列）