In [1]:
import pandas as pd
import numpy as np
import logging
from sklearn.model_selection import train_test_split

# Configure the root logger at DEBUG level. Each record shows the timestamp,
# logger name, level name and the name of the function that emitted it.
logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s-[%(name)s]\t[%(levelname)s]\t[%(funcName)s]: %(message)s")
# File header kept as a bare string expression (it has no runtime effect):
# application name, author and creation date (14 October 2020).
"""
application: Decision_tree-ID3
writer: Flysky
Date: 2020年10月14日
"""


class DecisionTree:
    """ID3 decision tree classifier working on categorical pandas DataFrames.

    The fitted tree is a nested ``dict``. Every node has a ``"next"`` mapping
    which always contains the fallback key ``"其他"`` (the majority class of
    the training rows that reached the node). Internal nodes additionally
    carry a ``"column"`` key naming the attribute they split on, and their
    ``"next"`` mapping has one child node per observed attribute value.
    A leaf is a node whose ``"next"`` mapping holds only the fallback key.
    """

    def __init__(self):
        # Default dataset used by the entropy helpers when none is passed.
        # Nothing in this class assigns it; callers may set it themselves.
        self.DataSet = None
        # A node becomes a leaf when its entropy drops below this value.
        self._threshold = 0.1
        # Number of leaves created by the most recent fit.
        self._leafCount = 0
        # Root node of the fitted (or loaded) tree.
        self.decision_tree = None
        # Result of the most recent predict() call, consumed by score().
        self.y_predict = None

    def check_dataset(self, dataset: pd.DataFrame, dimension=2):
        """Validate that ``dataset`` has exactly ``dimension`` dimensions.

        :param dataset: DataFrame, Series, ndarray or nested list to check
        :param dimension: expected number of dimensions
        :raises ValueError: if the number of dimensions differs
        """
        # np.shape also accepts plain lists, which have no ``.shape`` attribute.
        actual = len(np.shape(dataset))
        if actual != dimension:
            raise ValueError(f"data dimension not {dimension} but {actual}")

    def empirical_entropy(self, dataset=None):
        """Compute the empirical entropy of the last column of ``dataset``.

        $$H = -\\sum^n_{i=1}p(x_i)log_2(p(x_i))$$

        :param dataset: DataFrame whose last column holds the class labels;
            defaults to ``self.DataSet``
        :return: the entropy in bits
        """
        if dataset is None:
            dataset = self.DataSet
        class_counts = dataset.iloc[:, -1].value_counts()
        total_count = class_counts.sum()
        entropy = 0
        for count in class_counts:
            p = count / total_count
            entropy -= p * np.log2(p)
        return entropy

    def extract_dataset(self, dataset: pd.DataFrame, column, label):
        """Select the rows where ``column`` equals ``label`` and drop that column.

        :param dataset: DataFrame to filter
        :param column: a plain ``int`` is treated as a column *position*;
            anything else is treated as a column label
        :param label: attribute value the rows must have
        :return: the filtered DataFrame without the selected column
        """
        # bool is a subclass of int; keep treating it as a label, as before.
        if isinstance(column, int) and not isinstance(column, bool):
            mask = dataset.iloc[:, column] == label
            return dataset[mask].drop(dataset.columns[column], axis=1)
        return self._split_by_label(dataset, column, label)

    def _split_by_label(self, dataset: pd.DataFrame, column, value):
        """Filter rows by the column *labelled* ``column`` and drop that column."""
        return dataset[dataset.loc[:, column] == value].drop(column, axis=1)

    def best_empirical_entropy(self, dataset: pd.DataFrame = None):
        """Pick the attribute column with the largest information gain.

        :param dataset: DataFrame whose last column holds the class labels;
            defaults to ``self.DataSet``
        :return: label of the selected column
        """
        if dataset is None:
            dataset = self.DataSet
        columns = dataset.columns[:-1]
        total_count = dataset.shape[0]
        empirical_entropy = self.empirical_entropy(dataset)
        logging.debug(f"now dataset shape is {dataset.shape}, column is {dataset.columns.tolist()}")
        logging.debug(f"empirical_entropy is {empirical_entropy}")
        informationGain_max = -1
        best_column = None
        for column in columns:
            conditional_entropy = 0
            logging.debug(f"now is {column}")
            for label in dataset.loc[:, column].value_counts().index:
                # ``column`` is a label here, so split by label. A positional
                # split would pick the wrong column for integer-labelled
                # frames once earlier columns have been dropped.
                split_dataset = self._split_by_label(dataset, column, label)
                subset_entropy = self.empirical_entropy(split_dataset)
                conditional_entropy += split_dataset.shape[0] / total_count * subset_entropy
                logging.debug(f"now label is {label}, chooseData shape is {split_dataset.shape}, "
                              f"Ans count: {split_dataset.iloc[:, -1].value_counts().tolist()}, "
                              f"entropy: {subset_entropy}")
            informationGain = empirical_entropy - conditional_entropy
            logging.debug(f"entropy： {conditional_entropy}, {column} informationGain:{informationGain}")
            if informationGain > informationGain_max:
                best_column = column
                informationGain_max = informationGain
        logging.debug(f"Choose {best_column}:{informationGain_max}")
        return best_column

    def id3(self, dataset: pd.DataFrame = None):
        """Recursively build a decision tree with the ID3 algorithm.

        :param dataset: DataFrame whose last column holds the class labels;
            defaults to ``self.DataSet``
        :return: dict describing the (sub)tree rooted at this node
        """
        if dataset is None:
            dataset = self.DataSet
        result_count = dataset.iloc[:, -1].value_counts()
        result_max = result_count.idxmax()
        # Every node stores the majority class as a fallback answer.
        next_tree = {"其他": result_max}
        below_threshold = self.empirical_entropy(dataset) < self._threshold
        # Leaf when the node is pure, no attribute is left, or entropy is low enough.
        if result_count.shape[0] == 1 or dataset.shape[1] < 2 or below_threshold:
            self._leafCount += 1
            logging.debug(
                f"select decision {result_max}, result_type:{result_count.tolist()}, dataset column:{dataset.shape}, lower than threshold:{below_threshold}")
            return {"next": next_tree}

        best_column = self.best_empirical_entropy(dataset)
        for label in dataset[best_column].value_counts().index:
            logging.debug(f"now choose_column:{best_column}, label: {label}")
            split_dataset = self._split_by_label(dataset, best_column, label)
            next_tree[label] = self.id3(split_dataset)
        return {"column": best_column, "next": next_tree}

    def fit(self, x: pd.DataFrame, y=None, algorithm: str = "id3", threshold=0.1):
        """Fit the tree on a dataset.

        :param x: attribute DataFrame; when ``y`` is None its last column must
            hold the class labels. ``x`` is never modified.
        :param y: one-dimensional, list-like class labels, or None
        :param algorithm: name of the tree-building method (only "id3" exists)
        :param threshold: entropy below which a node is turned into a leaf
        :return: root node of the fitted tree
        :raises ValueError: on wrong input dimensions or an unknown algorithm
        """
        self.check_dataset(x, dimension=2)
        if y is not None:
            self.check_dataset(y, dimension=1)
        builder = getattr(self, algorithm, None)
        if not callable(builder):
            raise ValueError(f"unknown algorithm: {algorithm!r}")
        self._threshold = threshold
        # Count only the leaves of this fit, not of earlier ones.
        self._leafCount = 0
        dataset = x
        if y is not None:
            # Work on a copy so the caller's frame does not gain a label column.
            dataset = x.copy()
            dataset.insert(dataset.shape[1], 'DECISION_tempADD', y)
        self.decision_tree = builder(dataset)
        logging.info(f"decision_tree leaf:{self._leafCount}")
        return self.decision_tree

    def leaf_count(self):
        """Return the number of leaf nodes created by the most recent fit.

        :return: number of leaves
        """
        return self._leafCount

    def predict(self, x: pd.DataFrame):
        """Predict the class of every row of ``x``.

        :param x: DataFrame of attribute values
        :return: per-row predictions; also stored in ``self.y_predict``
        """
        self.y_predict = x.apply(self._predict_line, axis=1)
        return self.y_predict

    def _predict_line(self, line):
        """Walk the tree for a single row and return its predicted class.

        :param line: one row of the DataFrame given to ``predict``
        :return: predicted class; the majority class of the last reachable
            node when the row has a missing column or an unseen value
        """
        tree = self.decision_tree
        while True:
            branches = tree["next"]
            if len(branches) == 1:
                # Only the fallback entry is present: this is a leaf.
                return branches["其他"]
            try:
                tree = branches[line[tree["column"]]]
            except (KeyError, IndexError, TypeError):
                # Column missing from the row, or value not seen in training.
                return branches["其他"]

    def score(self, y):
        """Print accuracy, precision and recall of the last prediction.

        Classes are assumed to be encoded as 1 (positive) and 0 (negative).
        A metric whose denominator is zero is reported as NaN.

        :param y: actual class labels
        :return: None
        :raises Exception: if ``predict`` has not been called yet
        """
        if self.y_predict is None:
            raise Exception("before score should predict first!")
        y_acutalTrue = y[(y == 1) & (self.y_predict == 1)].shape[0]
        y_acutalFalse = y[(y == 0) & (self.y_predict == 0)].shape[0]
        y_predictTrue = self.y_predict[self.y_predict == 1].shape[0]
        y_true = y[y == 1].shape[0]
        y_total = y.shape[0]
        logging.debug(f"y_acutalTrue:{y_acutalTrue}, y_acutalFalse:{y_acutalFalse}, y_predictTrue:{y_predictTrue}, "
                      f"y_true:{y_true}, y_total:{y_total}")

        def ratio(numerator, denominator, metric):
            # Avoid ZeroDivisionError, e.g. when no positive was predicted.
            if denominator == 0:
                logging.warning(f"{metric} is undefined: denominator is zero")
                return float("nan")
            return numerator / denominator

        Accuracy = ratio(y_acutalTrue + y_acutalFalse, y_total, "Accuracy")
        Precision = ratio(y_acutalTrue, y_predictTrue, "Precision")
        Recall = ratio(y_acutalTrue, y_true, "Recall")
        print("Accuracy: ", Accuracy,
              "Precision: ", Precision,
              "Recall: ", Recall
              )

    def visualOutput(self, savePath="", outputFormat="html", direction="TD"):
        """Render the fitted tree as mermaid markdown or as an HTML page.

        :param savePath: file to write the text to; nothing is written when empty
        :param outputFormat: "md" or "html"; any other value yields empty text
        :param direction: mermaid graph direction ("LR", "RL", "TD" or "DT")
        :return: the rendered text
        :raises Exception: if no tree has been fitted or loaded
        """
        if self.decision_tree is None:
            raise Exception("should fit first!")
        text = ""
        if outputFormat == "md":
            text = self._format_md(direction=direction)
        elif outputFormat == "html":
            text = self._format_html(direction=direction)
        if savePath != "":
            with open(savePath, "w", encoding="utf-8") as handle:
                handle.write(text)
        return text

    def _format_html(self, direction="TD"):
        """Render the tree as a standalone HTML page using mermaid.js.

        :param direction: mermaid graph direction
        :return: HTML source
        """
        html_start = '<!DOCTYPE html><html lang="en"><head>' \
                     '<meta charset="UTF-8">' \
                     '<meta name="viewport" content="width=device-width, initial-scale=1.0">' \
                     '<meta http-equiv="X-UA-Compatible" content="ie=edge">' \
                     '<title>DecisionTree</title>' \
                     '<script src="https://cdn.bootcss.com/mermaid/8.0.0-rc.8/mermaid.min.js">' \
                     '</script></head><body><div class="mermaid">{}</div>'
        html_end = '</body></html>'
        mermaid = self._format_md(end=";", direction=direction)
        # Strip the markdown code fence; only the graph definition goes in the div.
        mermaid = mermaid.replace("```mermaid\n", "").replace("```", "")
        return html_start.replace("{}", mermaid) + html_end

    def _format_md(self, direction="TD", end="\n"):
        """Render the tree as a fenced mermaid graph.

        :param direction: mermaid graph direction
        :param end: terminator appended after every graph statement
        :return: markdown text containing the mermaid code block
        """
        md = "```mermaid\n"
        md += f"graph {direction}{end}"
        if len(self.decision_tree) != 2:
            # The root has no "column" key, so the whole tree is a single leaf.
            code_line = f"0(start)-->{self.decision_tree['next']['其他']}"
            return md + code_line + "\n```"

        # Breadth-first walk. Internal nodes get ids in the order they are
        # queued, so the id written on an edge matches the id used when the
        # child is later dequeued.
        total_node = 1
        current_nodeID = 0
        queue = [self.decision_tree]
        while queue:
            node = queue.pop(0)
            # Distinct decisions below this node; equal decisions share one leaf box.
            ans_node = []
            for key, child in node["next"].items():
                if isinstance(child, dict) and len(child) != 1:
                    code_line = f"{current_nodeID}({node['column']})--{key}-->{total_node}"
                    queue.append(child)
                    total_node += 1
                else:
                    # Either a leaf node dict or the bare fallback value.
                    decision = child['next']['其他'] if isinstance(child, dict) else child
                    if decision not in ans_node:
                        ans_node.append(decision)
                    nodeID_ans = ans_node.index(decision)
                    code_line = f"{current_nodeID}({node['column']})--{key}-->" \
                                f"L{current_nodeID}_{nodeID_ans}({decision})"
                md += code_line + end
            current_nodeID += 1
        return md + "```"

    def load(self, savePath: str):
        """Load a tree previously written by ``save``.

        :param savePath: path of the saved tree
        :raises Exception: if the file content does not evaluate to a dict
        """
        with open(savePath, "r") as handle:
            content = handle.read()
        # SECURITY: eval() executes arbitrary code, so only load trusted files.
        # It is kept because saved trees may contain reprs such as
        # ``np.int64(1)`` that ast.literal_eval cannot parse.
        tree = eval(content)
        if isinstance(tree, dict):
            self.decision_tree = tree
        else:
            raise Exception("Load Faild!")

    def save(self, savePath: str):
        """Write the textual representation of this instance's tree to a file.

        :param savePath: destination path
        """
        with open(savePath, "w") as handle:
            handle.write(str(self.decision_tree))
        logging.info(f"决策树已保存，位置:{savePath}")

if __name__ == '__main__':
    # Create the decision tree.
    decisionTree = DecisionTree()

    # Self-contained demo that needs no external data file.
    # The last column of each row is the class label.
    demo_data = [[0, 2, 0, 0, 0],
                 [0, 2, 0, 1, 0],
                 [1, 2, 0, 0, 1],
                 [2, 1, 0, 0, 1],
                 [2, 0, 1, 0, 1],
                 [2, 0, 1, 1, 0],
                 [1, 0, 1, 1, 1],
                 [0, 1, 0, 0, 0],
                 [0, 0, 1, 0, 1],
                 [2, 1, 1, 0, 1],
                 [0, 1, 1, 1, 1],
                 [1, 1, 0, 1, 1],
                 [1, 2, 1, 0, 1],
                 [2, 1, 0, 1, 0]]
    dataset = pd.DataFrame(demo_data)
    dataset.columns = ['年龄', '有工作', '是学生', '信贷情况', "借贷"]

    # Alternative: the UCI "Caesarian Section Classification" dataset.

    # dataset = pd.read_csv("caesarian.csv", header=None)
    # dataset.columns = ["Age", "Delivery_number", "Delivery_time", "Blood_of_Pressure", "Heart_Problem", "Caesarian"]
    # age = dataset["Age"].value_counts().sort_index()  # bin Age into three levels: under 24, under 31, over 30
    # dataset["Age"][dataset["Age"] < 24] = 0
    # dataset["Age"][(dataset["Age"] > 23) & (dataset["Age"] < 31)] = 1
    # dataset["Age"][30 < dataset["Age"]] = 2
    # print(dataset.info())

    # Separate the attribute columns from the class label column.
    X = dataset.iloc[:, :-1]
    Y = dataset.iloc[:, -1]

    # Either split the dataset with sklearn:
    # X_train, X_test, Y_train, Y_test = train_test_split(X, Y, train_size=0.7, shuffle=True)
    # or, as here, evaluate on the training data itself.
    # Train on an independent copy: fitting may add a label column to the
    # frame it is given, and that must not leak into the test frame or into
    # the slice of `dataset`.
    X_train = X.copy()
    X_test = X
    Y_train = Y_test = Y

    # Fit (threshold=-1 disables entropy-based early stopping).
    e = decisionTree.fit(X_train, Y_train, threshold=-1)
    # Save the decision tree.
    decisionTree.save("decisionTree.txt")
    # Load the decision tree back.
    decisionTree.load("decisionTree.txt")
    # Predict.
    predict_y = decisionTree.predict(X_test)
    # Evaluate.
    decisionTree.score(Y_test)
    # Visual output (HTML format).
    # visualOutput accepts outputFormat in ["md", "html"] and a graph
    # direction in ["LR", "RL", "TD", "DT"]; the default "TD" is top-down.
    decisionTree.visualOutput(savePath="decisionTree.html", outputFormat="html")



2023-06-03 17:41:29,567-[root]	[DEBUG]	[best_empirical_entropy]: now dataset shape is (14, 5), column is ['年龄', '有工作', '是学生', '信贷情况', 'DECISION_tempADD']
2023-06-03 17:41:29,568-[root]	[DEBUG]	[best_empirical_entropy]: empirical_entropy is 0.9402859586706311
2023-06-03 17:41:29,569-[root]	[DEBUG]	[best_empirical_entropy]: now is 年龄
2023-06-03 17:41:29,608-[root]	[DEBUG]	[best_empirical_entropy]: now label is 2, chooseData shape is (5, 4), Ans count: [3, 2], entropy: 0.9709505944546686
2023-06-03 17:41:29,616-[root]	[DEBUG]	[best_empirical_entropy]: now label is 0, chooseData shape is (5, 4), Ans count: [3, 2], entropy: 0.9709505944546686
2023-06-03 17:41:29,623-[root]	[DEBUG]	[best_empirical_entropy]: now label is 1, chooseData shape is (4, 4), Ans count: [4], entropy: 0.0
2023-06-03 17:41:29,625-[root]	[DEBUG]	[best_empirical_entropy]: entropy： 0.6935361388961918, 年龄 informationGain:0.24674981977443933
2023-06-03 17:41:29,629-[root]	[DEBUG]	[best_empirical_entropy]: now is 有工作
2023-06

Accuracy:  1.0 Precision:  1.0 Recall:  1.0
