In [27]:
import numpy as np
import pandas as pd
import operator

class ID3:
    def __init__(self,pruning = None):
        self.tree = None
        self.pruning = pruning

    def fit(self, dataset_train,dataset_test,features):
        """
        训练模型
        
        参数:
        - dataset_train: 训练数据集
        - dataset_test: 测试数据集
        - features: 特征列表 
        """

        if self.pruning == None:
            self.tree = self.createTree(dataset_train, features)
        elif self.pruning == 'prepruning':
            self.tree = self.prepruning(dataset_train,dataset_test, features)
        elif self.pruning == 'postpruning':
            self.tree = self.postpruning(dataset_train,dataset_test, features)
        else:
            raise ValueError("Unsupported pruning method: %s" % self.pruning)

    def createTree(self, Dataset, features):
        """ 
        创建决策树
        
        参数:
        - Dataset: 训练数据集
        - features: 特征列表
        
        返回:
        - 决策树
        """
        # 提取标签
        classList = [example[-1] for example in Dataset]

        # 如果所有样本的标签相同，则返回该标签
        if len(set(classList)) == 1:
            return classList[0]
        
        # 如果特征集为空，则返回出现次数最多的标签
        if len(Dataset[0]) == 1:
            return self.majorityCnt(classList)
        
        # 选择最优特征
        bestFeatureIndex, bestSplitValue = self.chooseBestFeature(Dataset)
        bestFeatureLabel = features[bestFeatureIndex]

        # 创建节点
        tree = {bestFeatureLabel: {}}
        # 使用副本避免修改原始列表
        subfeatures = features.copy()
        # 删除当前特征
        del subfeatures[bestFeatureIndex]
        # 连续特征
        if type(bestSplitValue).__name__ == 'float':
            tree[bestFeatureLabel]['<=' + str(bestSplitValue)] = self.createTree(self.splitDataSetByValue(Dataset, bestFeatureIndex, bestSplitValue, False), subfeatures)
            tree[bestFeatureLabel]['>' + str(bestSplitValue)] = self.createTree(self.splitDataSetByValue(Dataset, bestFeatureIndex, bestSplitValue, True), subfeatures)
        # 离散特征
        else:
            # 取出当前特征的取值
            featValue = [example[bestFeatureIndex] for example in Dataset]
            uniqueVals = set(featValue)
            # 遍历所有取值,开始递归
            for value in uniqueVals:
                subDataset = self.splitDataSet(Dataset, bestFeatureIndex, value)
                tree[bestFeatureLabel][value] = self.createTree(subDataset, subfeatures)
        return tree
    
    def prepruning(self, dataset_train, dataset_test, features):
        """ 
        创建决策树(预裁剪)
        
        参数:
        - dataset_train: 训练数据集
        - dataset_test: 测试数据集
        - features: 特征列表
        
        返回:
        - 决策树
        """
        # 取出所有样本的标签
        classList = [example[-1] for example in dataset_train]
        # 如果所有样本的标签相同，则返回该标签
        if classList.count(classList[0]) == len(classList):
            return classList[0]
        # 如果特征集为空，则返回出现次数最多的标签
        if len(dataset_train[0]) == 1:
            return self.majorityCnt(classList)
        
        # 计算不分裂时的准确率（叶节点多数类）
        majority_class = self.majorityCnt(classList)
        # print("majority_class:", majority_class)
        accuracy_before = sum(1 for ex in dataset_test if ex[-1] == majority_class) / len(dataset_test)
        # print("accuracy_before:", accuracy_before)

        # 选择最优特征进行数据集划分
        bestfeatureIndex, bestValue = self.chooseBestFeature(dataset_train)
        bestFeatLabel = features[bestfeatureIndex]

        # 计算分裂后的准确率
        # 连续特征
        if type(bestValue).__name__ == 'float':
            splitRightNums = 0
            # 将训练集划分为左右两部分，计算出每个分叉的多数类
            subLeftDataset = self.splitDataSetByValue(dataset_train, bestfeatureIndex, bestValue, False)
            subRightDataset = self.splitDataSetByValue(dataset_train, bestfeatureIndex, bestValue, True)
            majority_class_left = self.majorityCnt([ex[-1] for ex in subLeftDataset])
            majority_class_right = self.majorityCnt([ex[-1] for ex in subRightDataset])
            # 将测试集划分为左右两部分，计算出每个分叉的正确分类个数
            subLeftDataset_test = self.splitDataSetByValue(dataset_test, bestfeatureIndex, bestValue, False)
            subRightDataset_test = self.splitDataSetByValue(dataset_test, bestfeatureIndex, bestValue, True)
            splitRightNums = sum(1 for ex in subLeftDataset_test if ex[-1] == majority_class_left) + \
                            sum(1 for ex in subRightDataset_test if ex[-1] == majority_class_right)
            accuracy_after = splitRightNums / len(dataset_test)
        # 离散特征
        else:
            splitRightNums = 0
            featValue = [example[bestfeatureIndex] for example in dataset_train]
            uniqueVals = set(featValue) 
            # 统计分裂后每一个分叉中的正确分类个数
            for value in uniqueVals:
                subDataSet = self.splitDataSet(dataset_train, bestfeatureIndex, value)
                majority_class_sub = self.majorityCnt([ex[-1] for ex in subDataSet])
                subDataSet_test = self.splitDataSet(dataset_test, bestfeatureIndex, value)
                splitRightNums += sum(1 for ex in subDataSet_test if ex[-1] == majority_class_sub)
            accuracy_after = splitRightNums / len(dataset_test)
            # print("accuracy_after:", accuracy_after)

        # 比较准确率，决定是否继续分裂(根据奥卡姆剃刀准则，准确率相等时不分裂)
        if (accuracy_after > accuracy_before):
            myTree = {bestFeatLabel: {}}
        else:
        # 返回分裂前最多的标签
            return majority_class

        # 使用副本避免修改原始列表
        subfeatures = features.copy()  
        del subfeatures[bestfeatureIndex]

        # 连续特征
        if type(bestValue).__name__ == 'float':

            myTree[bestFeatLabel]['<=' + str(bestValue)] = self.prepruning(subLeftDataset, subLeftDataset_test, subfeatures)
            myTree[bestFeatLabel]['>' + str(bestValue)] = self.prepruning(subRightDataset, subRightDataset_test, subfeatures)
        # 离散特征
        else:
            # 递归每一个特征值
            for value in uniqueVals:
                subDataSet = self.splitDataSet(dataset_train, bestfeatureIndex, value)
                # 测试集也需要划分
                subDataSet_test = self.splitDataSet(dataset_test, bestfeatureIndex, value)
                myTree[bestFeatLabel][value] = self.prepruning(subDataSet, subDataSet_test,subfeatures)
                
        return myTree
    
    def postpruning(self, dataset_train, dataset_test, features):
        """ 
        创建决策树(后裁剪)
        
        参数:
        - dataset_train: 训练数据集
        - dataset_test: 测试数据集
        - features: 特征列表
        
        返回:
        - 决策树
        """
        # 取出所有样本的标签
        classList = [example[-1] for example in dataset_train]
        # 如果所有样本的标签相同，则返回该标签
        if classList.count(classList[0]) == len(classList):
            return classList[0]
        # 如果特征集为空，则返回出现次数最多的标签
        if len(dataset_train[0]) == 1:
            return self.majorityCnt(classList)

        # 选择最优特征进行数据集划分
        bestfeatureIndex, bestValue = self.chooseBestFeature(dataset_train)
        bestFeatLabel = features[bestfeatureIndex]

        # 创建节点
        myTree = {bestFeatLabel: {}}

        # 使用副本避免修改原始列表
        subfeatures = features.copy()  
        del subfeatures[bestfeatureIndex]

        # 连续特征
        if type(bestValue).__name__ == 'float':
            subLeftDataset = self.splitDataSetByValue(dataset_train, bestfeatureIndex, bestValue, False)
            subRightDataset = self.splitDataSetByValue(dataset_train, bestfeatureIndex, bestValue, True)
            subLeftDataset_test = self.splitDataSetByValue(dataset_test, bestfeatureIndex, bestValue, False)
            subRightDataset_test = self.splitDataSetByValue(dataset_test, bestfeatureIndex, bestValue, True)
            myTree[bestFeatLabel]['<=' + str(bestValue)] = self.postpruning(subLeftDataset, subLeftDataset_test, subfeatures)
            myTree[bestFeatLabel]['>' + str(bestValue)] = self.postpruning(subRightDataset, subRightDataset_test, subfeatures)
        # 离散特征
        else:
            # 递归每一个特征值
            featValue = [example[bestfeatureIndex] for example in dataset_train]
            uniqueVals = set(featValue) 
            for value in uniqueVals:
                subDataSet = self.splitDataSet(dataset_train, bestfeatureIndex, value)
                # 测试集也需要划分
                subDataSet_test = self.splitDataSet(dataset_test, bestfeatureIndex, value)
                myTree[bestFeatLabel][value] = self.postpruning(subDataSet, subDataSet_test,subfeatures)

        # 在每一条分支到达叶节点时，开始后裁剪(避免生成完整的树后再进行裁剪，提高效率)
        # 计算未裁剪树的准确率
        unpruneRightNums = 0
        # 连续特征
        if type(bestValue).__name__ == 'float':
            majority_class_left = self.majorityCnt([ex[-1] for ex in subLeftDataset])
            majority_class_right = self.majorityCnt([ex[-1] for ex in subRightDataset])
            unpruneRightNums = sum(1 for ex in dataset_test if ex[-1] == majority_class_left) + \
                            sum(1 for ex in dataset_test if ex[-1] == majority_class_right)
        # 离散特征
        else:
            for value in uniqueVals:
                subDataSet = self.splitDataSet(dataset_train, bestfeatureIndex, value)
                majority_class_sub = self.majorityCnt([ex[-1] for ex in subDataSet])
                subDataSet_test = self.splitDataSet(dataset_test, bestfeatureIndex, value)
                unpruneRightNums += sum(1 for ex in subDataSet_test if ex[-1] == majority_class_sub)
        accuracy_before = unpruneRightNums / len(dataset_test)
        # 计算裁剪后的准确率
        majority_class = self.majorityCnt(classList)
        accuracy_after = sum(1 for ex in dataset_test if ex[-1] == majority_class) / len(dataset_test)
        # 比较准确率，决定是否继续分裂(根据奥卡姆剃刀准则，准确率相等时裁剪)
        if (accuracy_after >= accuracy_before):
            return majority_class
        return myTree

    def majorityCnt(self, classList):
        """返回最多的标签"""
        # 统计标签出现的次数
        label_count = {}
        for label in classList:
            if label not in label_count:
                label_count[label] = 0
            label_count[label] += 1
        # 降序排序[(类标签,出现次数),(),()]
        sortedclassCount = sorted(label_count.items(), key=operator.itemgetter(1), reverse=True)
        return sortedclassCount[0][0]

    def chooseBestFeature(self,Dataset):
        """通过信息增益选择最优特征"""
        featureNum = len(Dataset[0]) - 1
        bestInfoGain = 0.0
        bestSplitValue = 0
        bestFeatureIndex = -1
        baseEntropy = self.calculateEntropy(Dataset)

        # 遍历所有特征
        for i in range(featureNum):
            # 提取当前特征下的取值
            featureValues = [example[i] for example in Dataset]
            # 连续特征
            if type(featureValues[0]).__name__ == 'float':
                # 对特征值进行排序
                sortedFeatureValues = sorted(featureValues)
                # 计算分割值（取相邻两个取值的中点）
                splitList = []
                for j in range(len(sortedFeatureValues) - 1):
                    splitList.append((sortedFeatureValues[j] + sortedFeatureValues[j + 1]) / 2.0)
                # 遍历所有分割值,相当于做二分类
                for splitValue in splitList:
                    currentEntropy = 0.0
                    subDataset1 = self.splitDataSetByValue(Dataset, i, splitValue, True)
                    subDataset2 = self.splitDataSetByValue(Dataset, i, splitValue, False)
                    prob1 = len(subDataset1) / float(len(Dataset))
                    prob2 = len(subDataset2) / float(len(Dataset))
                    currentEntropy  = prob1 * self.calculateEntropy(subDataset1) + prob2 * self.calculateEntropy(subDataset2)
                    # 计算信息增益
                    infoGain = baseEntropy - currentEntropy
                    if (infoGain > bestInfoGain):
                        bestInfoGain = infoGain
                        bestFeatureIndex = i
                        bestSplitValue = splitValue
            # 离散特征
            else:
                uniqueValues = set(featureValues)
                currentEntropy = 0.0
                # 遍历所有取值
                for value in uniqueValues:
                    subDataset = self.splitDataSet(Dataset, i, value)
                    prob = len(subDataset) / len(Dataset)
                    currentEntropy += prob * self.calculateEntropy(subDataset)
                # 计算信息增益
                infoGain = baseEntropy - currentEntropy
                if (infoGain >= bestInfoGain):
                    bestInfoGain = infoGain
                    bestFeatureIndex = i
                    bestSplitValue = None
        
        return bestFeatureIndex, bestSplitValue

    def calculateEntropy(self, Dataset):
        """计算信息熵,公式(4.1)"""
        sample_num = len(Dataset)
        # 统计标签出现的次数
        label_count = {}
        for featVec in Dataset:
            label = featVec[-1]
            if label not in label_count:
                label_count[label] = 0
            label_count[label] += 1
        
        # 计算信息熵
        entropy = 0.0
        for count in label_count.values():
            prob = float(count) / sample_num
            entropy -= prob * np.log2(prob)

        return entropy

    def splitDataSet(self,Dataset, axis, val):
        '''
        根据特征索引i和离散特征值value将数据集切分

        参数:
        - Dataset: 训练数据集
        - axis: 特征索引
        - val: 特征值

        返回:
        - 切分后的子集
        '''
        subDataset = []
        # 遍历每一行
        for featVec in Dataset:
            if featVec[axis] == val:
                reducedFeature = featVec[:axis]
                reducedFeature.extend(featVec[axis + 1:])
                subDataset.append(reducedFeature)
        return subDataset

    def splitDataSetByValue(self, Dataset, axis, val, isAbove):
        '''
        根据特征索引i和连续特征值value将数据集切分

        参数:
        - Dataset: 训练数据集
        - axis: 特征索引
        - val: 特征值
        - isAbove: True表示大于value,False表示小于等于value

        返回:
        - 切分后的子集
        '''
        subDataset = []
        # 遍历每一行
        for featVec in Dataset:
            if isAbove and featVec[axis] > val:
                reducedFeature = featVec[:axis]
                reducedFeature.extend(featVec[axis + 1:])
                subDataset.append(reducedFeature)
            elif not isAbove and featVec[axis] <= val:
                reducedFeature = featVec[:axis]
                reducedFeature.extend(featVec[axis + 1:])
                subDataset.append(reducedFeature)
        return subDataset

    def predict(self, inputTree,features, testVec):
        '''
        预测测试数据集

        参数:
        - inputTree: 训练好的决策树
        - features: 特征列表
        - testVec: 测试数据集

        返回:
        - 预测结果
        '''
        # 提取当前节点(每个决策树节点只有一个特征标签)
        firstStr = list(inputTree.keys())[0]
        # 提取当前节点下的子节点
        secondDict = inputTree[firstStr]
        # 获取当前节点的特征标签序号
        featureIndex = features.index(firstStr)

        # 遍历每个子节点
        for key in secondDict.keys():
            # 连续特征
            if type(key).__name__ == 'str' and ('<=' in key or '>' in key):
                # 去除字符串中的符号，取出阈值
                threshold = float(key.strip('<=').strip('>'))
                # 判断测试数据是否满足阈值
                if key.startswith('<=') and testVec[featureIndex] <= threshold:
                    childTree = secondDict[key]
                    # 判断当前是不是叶节点,如果不是，继续递归
                    if isinstance(childTree, dict):
                        return self.predict(childTree,features, testVec)
                    else:
                        return childTree
                elif key.startswith('>') and testVec[featureIndex] > threshold:
                    childTree = secondDict[key]
                    # 判断当前是不是叶节点
                    if isinstance(childTree, dict):
                        return self.predict(childTree,features, testVec)
                    else:
                        return childTree
            # 离散特征
            else:
                # 判断测试数据是否满足取值
                if testVec[featureIndex] == key:
                    childTree = secondDict[key]
                    # 判断当前是不是叶节点
                    if isinstance(childTree, dict):
                        return self.predict(childTree,features, testVec)
                    else:
                        return childTree
        return "Unknown !"
        
    def printTree(self):
        """打印决策树"""
        print(self.tree)

    def calculateAccuracy(self, dataset_test, features):
        """计算准确率"""
        correct_count = 0
        for example in dataset_test:
            label = example[-1]
            predict_label = self.predict(self.tree, features, example)
            if label == predict_label:
                correct_count += 1
        accuracy = float(correct_count) / len(dataset_test)
        return accuracy
      


if __name__ == '__main__':
    # 加载数据
    df = pd.DataFrame(pd.read_csv("../Data/watermelon3.0.csv", encoding="ansi"))
    df.drop(labels=["编号"], axis=1, inplace=True)  # 删除编号这一列，inplace=True表示直接在原对象修改
    # 转化为列表
    dataset = df.values.tolist()
    # 特征列表
    features = ['色泽', '根蒂', '敲声', '纹理', '脐部', '触感', '密度', '含糖率']
    # 创建决策树
    ID3_model = ID3()
    ID3_model.fit(dataset,None,features)
    # 打印决策树
    ID3_model.printTree()
    # 测试数据
    test_data = ['青绿', '蜷缩', '浊响', '清晰', '凹陷', '硬滑', 0.6, 0.3]
    # 预测结果
    result = ID3_model.predict(ID3_model.tree, features, test_data)
    print("预测结果:", result)
   


{'纹理': {'模糊': '否', '清晰': {'密度': {'<=0.3815': '否', '>0.3815': '是'}}, '稍糊': {'触感': {'硬滑': '否', '软粘': '是'}}}}
预测结果: 是


In [46]:
from sklearn import datasets
from sklearn.model_selection import train_test_split

# 加载鸢尾花
iris = datasets.load_iris()
X = iris.data
y = iris.target
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=30)
# 合并特征和标签
dataset_train = np.c_[X_train, y_train]
dataset_test = np.c_[X_test, y_test]
# 转化为列表
dataset_train = dataset_train.tolist()
dataset_test = dataset_test.tolist()
# 特征列表
features = ['花萼长度','花萼宽度','花瓣长度','花瓣宽度']
# 训练
ID3_model = ID3(pruning='postpruning')
ID3_model.fit(dataset_train,dataset_test,features)
# 打印
ID3_model.printTree()
# 计算准确度
accuracy = ID3_model.calculateAccuracy(dataset_test,features)
print("准确度：",accuracy)


{'花瓣长度': {'<=2.5999999999999996': 0.0, '>2.5999999999999996': {'花瓣宽度': {'<=1.7': {'花萼宽度': {'<=2.6': 1.0, '>2.6': 1.0}}, '>1.7': 2.0}}}}
准确度： 0.9333333333333333
