In [2]:
import numpy as np

def createDataSet():
    """Return the sample training set used by the examples in this file.

    Returns:
        A ``(dataset, labels, labelvalues)`` tuple where

        * ``dataset`` is a list of rows; every row holds four integer-coded
          feature values followed by the class label (``'yes'`` / ``'no'``),
        * ``labels`` holds the four feature names, in column order,
        * ``labelvalues`` holds, per feature column, the values that feature
          may take (recorded by hand when the data was entered).
    """
    feature_names = ['F1-AGE', 'F2-WORK', 'F3-HOUSE', 'F4-LOAN']
    # Value domain of every feature column, recorded when entering the data.
    feature_domains = [[0, 1, 2], [0, 1], [0, 1], [0, 1, 2]]
    records = (
        (0, 0, 0, 0, 'no'),
        (0, 0, 0, 1, 'no'),
        (0, 1, 0, 1, 'yes'),
        (0, 1, 1, 0, 'yes'),
        (0, 0, 0, 0, 'no'),
        (1, 0, 0, 0, 'no'),
        (1, 0, 0, 1, 'no'),
        (1, 1, 1, 1, 'yes'),
        (1, 0, 1, 2, 'yes'),
        (1, 0, 1, 2, 'yes'),
        (2, 0, 1, 2, 'yes'),
        (2, 0, 1, 1, 'yes'),
        (2, 1, 0, 1, 'yes'),
        (2, 1, 0, 2, 'yes'),
        (2, 0, 0, 0, 'no'),
    )
    # Callers receive plain mutable lists, one per record.
    rows = [list(record) for record in records]
    return rows, feature_names, feature_domains

## ID3 C4.5

In [3]:
class DecisionTree(object):
    """Multi-way decision tree grown with the ID3 or C4.5 criterion.

    The tree is stored as nested dicts in ``self.root``:

    * leaf: ``{"node": True, "C_T": cost, "value": predicted_class}``
    * internal node: ``{"feature": column, "value": majority_class,
      "node": False, "C_T": cost, "subTree": {feature_value: child}}``

    ``C_T`` is the empirical entropy of the rows reaching the node multiplied
    by the number of those rows. A feature value that no row takes is stored
    as the string ``"no data to predict"`` instead of a child dict.

    Args:
        dataset: sequence of rows; the last item of every row is the class.
        labelvalues: per feature column, the list of values it may take.
        criterion: ``"ID3"`` (information gain) or ``"C4.5"`` (gain ratio).
    """

    def __init__(self, dataset, labelvalues, criterion="ID3"):
        self.dataset = dataset
        self.labelvalues = labelvalues
        self.criterion = criterion
        # Number of leaves currently in the tree (the |T| of the pruning cost).
        self.nodeNum = 0

    def createTree(self):
        """Grow the tree over the whole dataset and store it in ``self.root``."""
        # Start counting leaves from scratch so rebuilding does not double count.
        self.nodeNum = 0
        featureCount = len(self.dataset[0]) - 1 if len(self.dataset) > 0 else 0
        self.root = self.__creatTree(list(range(len(self.dataset))),
                                     list(range(featureCount)))

    def showFeatureSeq(self):
        """Print the nested-dict representation of the tree."""
        print(self.root)

    # Post-pruning
    def trim(self, alpha=0):
        """Prune the tree bottom-up using the cost ``C(T) + alpha * |T|``.

        Args:
            alpha: weight of the leaf count in the cost; larger values prune
                more aggressively.
        """
        self.alpha = alpha
        # An empty dataset yields the placeholder string: nothing to prune.
        if isinstance(self.root, dict):
            self.__trim(self.root)

    def __trim(self, node):
        """Try to collapse ``node`` into a leaf; return True if it is a leaf afterwards."""
        if node["node"]:
            # Already a leaf (e.g. the whole tree was collapsed by an earlier call).
            return True
        # Empty branches are placeholder strings, not nodes: skip them.
        children = [sn for sn in node["subTree"].values() if isinstance(sn, dict)]
        leafCount = 0
        loss = 0
        for sn in children:
            if sn["node"] or self.__trim(sn):
                leafCount += 1
                loss += sn["C_T"]

        # Only a node whose children are all leaves can be collapsed.
        if leafCount != len(children):
            return False

        costCollapsed = node["C_T"] + self.alpha * (self.nodeNum - leafCount + 1)
        costKept = loss + self.alpha * self.nodeNum
        if costCollapsed < costKept:
            node["node"] = True
            del node["subTree"]
            del node["feature"]
            # leafCount leaves disappear and the node itself becomes one.
            self.nodeNum -= leafCount - 1
            return True
        return False

    def __makeLeaf(self, data_indexes, value):
        """Create a leaf predicting ``value`` and count it in ``nodeNum``."""
        self.nodeNum += 1
        return {"node": True,
                "C_T": self.__calcEntropy(data_indexes) * len(data_indexes),
                "value": value}

    # data_indexes: indexes of the rows that reach this node
    # label_indexes: feature columns still available for splitting; the chosen
    #                feature is removed before recursing into the children
    def __creatTree(self, data_indexes, label_indexes):
        """Recursively build the subtree for the given rows and features."""
        if len(data_indexes) == 0:
            return "no data to predict"
        # Class of every row reaching this node.
        resList = [self.dataset[i][-1] for i in data_indexes]
        # All rows share one class: stop here.
        if resList.count(resList[0]) == len(resList):
            return self.__makeLeaf(data_indexes, resList[0])
        # Pick the most informative remaining feature (it is removed from
        # label_indexes); None means nothing is left or nothing helps.
        bestLabelIndex = None
        if len(label_indexes) > 0:
            bestLabelIndex = self.__chooseBestFeatureToSplit(data_indexes, label_indexes)
        if bestLabelIndex is None:
            return self.__makeLeaf(data_indexes, self.__majority(resList))
        treeNode = {"feature": bestLabelIndex,
                    "value": self.__majority(resList),
                    "node": False,
                    "C_T": self.__calcEntropy(data_indexes) * len(data_indexes),
                    "subTree": {}}
        # One branch per possible value of the chosen feature.
        for value in self.labelvalues[bestLabelIndex]:
            treeNode["subTree"][value] = self.__creatTree(
                self.__splitDataSet(data_indexes, bestLabelIndex, value),
                label_indexes.copy())
        return treeNode

    def __calcEntropy(self, dataList, index=-1):
        """Return the entropy (natural log) of column ``index`` over the rows in ``dataList``.

        An empty ``dataList`` has entropy 0.
        """
        total = len(dataList)
        counts = {}
        for i in dataList:
            key = self.dataset[i][index]
            counts[key] = counts.get(key, 0) + 1
        entropy = 0.0
        for count in counts.values():
            p = float(count) / total
            entropy -= p * np.log(p)
        # Plain float keeps the printed tree free of numpy scalar types.
        return float(entropy)

    def __majority(self, labels):
        """Return the class that is first to reach the highest count in ``labels``."""
        classCount = {}
        bestCount = -1
        bestRes = ''
        for res in labels:
            classCount[res] = classCount.get(res, 0) + 1
            if classCount[res] > bestCount:
                bestCount = classCount[res]
                bestRes = res
        return bestRes

    def __chooseBestFeatureToSplit(self, dataIndexes, labelIndexes):
        """Dispatch to the configured criterion.

        Returns the chosen feature column (also removed from ``labelIndexes``)
        or None when no feature improves on the current node.

        Raises:
            NotImplementedError: for ``"CART"``, which this class does not implement.
            ValueError: for any other unknown criterion.
        """
        if self.criterion == "ID3":
            return self.__ID3(dataIndexes, labelIndexes)
        elif self.criterion == "C4.5":
            return self.__C4(dataIndexes, labelIndexes)
        elif self.criterion == "CART":
            raise NotImplementedError('CART is not implemented by DecisionTree')
        else:
            raise ValueError('There is no %s' % (self.criterion,))

    def __C4(self, dataIndexes, labelIndexes):
        """Choose the feature with the highest information gain ratio."""
        return self.__selectFeature(dataIndexes, labelIndexes, True)

    def __ID3(self, dataIndexes, labelIndexes):
        """Choose the feature with the highest information gain."""
        return self.__selectFeature(dataIndexes, labelIndexes, False)

    def __selectFeature(self, dataIndexes, labelIndexes, useRatio):
        """Return the best-scoring feature and remove it from ``labelIndexes``.

        The score is the information gain, divided by the feature's own
        entropy when ``useRatio`` is true. Only strictly positive scores
        qualify; None is returned (and ``labelIndexes`` left untouched) when
        no feature qualifies.
        """
        curEntropy = self.__calcEntropy(dataIndexes)
        bestScore = 0
        bestLabel = None
        for label in labelIndexes:
            newEntropy = 0
            for val in self.labelvalues[label]:
                subset = self.__splitDataSet(dataIndexes, label, val)
                p = float(len(subset)) / len(dataIndexes)
                newEntropy += p * self.__calcEntropy(subset)
            score = curEntropy - newEntropy
            if useRatio:
                h_a = self.__calcEntropy(dataIndexes, label)
                # A feature with a single value here cannot split the rows
                # and its gain ratio is undefined.
                if h_a == 0:
                    continue
                score = score / h_a
            if score > bestScore:
                bestScore = score
                bestLabel = label
        if bestLabel is not None:
            labelIndexes.remove(bestLabel)
        return bestLabel

    def __splitDataSet(self, dataIndexes, label, val):
        """Return the row indexes whose feature ``label`` equals ``val``."""
        return [i for i in dataIndexes if self.dataset[i][label] == val]

In [4]:
# Load the sample rows, the feature names and the per-feature value lists.
DATASET, LABELS, LABELVALUES = createDataSet()
# Grow a tree with the "C4.5" criterion and print its nested-dict form.
DT = DecisionTree(DATASET,LABELVALUES,"C4.5")
DT.createTree()
DT.showFeatureSeq()
# Post-prune with alpha=10, then print the tree again for comparison.
DT.trim(10)
DT.showFeatureSeq()

{'feature': 2, 'value': 'yes', 'node': False, 'C_T': 10.095175005138849, 'subTree': {0: {'feature': 1, 'value': 'no', 'node': False, 'C_T': 5.728627514653315, 'subTree': {0: {'node': True, 'C_T': 0.0, 'value': 'no'}, 1: {'node': True, 'C_T': 0.0, 'value': 'yes'}}}, 1: {'node': True, 'C_T': 0.0, 'value': 'yes'}}}
{'value': 'yes', 'node': True, 'C_T': 10.095175005138849}


## CART

In [None]:
class CARTree(object):
    """Binary classification tree grown with the Gini index (CART).

    The tree is stored as nested dicts in ``self.root``:

    * leaf: ``{"node": True, "value": predicted_class}``
    * internal node: ``{"feature": column, "featureValue": split_value,
      "value": majority_class, "node": False,
      "subTree": {"yes": child, "no": child}}``

    For a categorical feature the ``"yes"`` branch holds the rows whose value
    equals ``featureValue``; for a continuous feature it holds the rows whose
    value is ``<= featureValue``.

    Args:
        dataset: sequence of rows; the last item of every row is the class.
        labelTypes: per feature column, ``"C"`` (categorical) or ``"R"``
            (continuous).
        mode: ``"normal"`` uses every feature at most once per path;
            ``"infinite"`` allows a feature to be reused until a stopping
            threshold is hit.
        numThreshold: a node with at most this many rows becomes a leaf.
        giniThreshold: a node whose Gini index is below this becomes a leaf.
    """

    def __init__(self, dataset, labelTypes, mode="normal", numThreshold=2, giniThreshold=0.01):
        self.dataset = dataset
        self.labelTypes = labelTypes
        # Number of leaves created by the last createTree() call.
        self.nodeNum = 0
        self.mode = mode
        self.numThreshold = numThreshold
        self.giniThreshold = giniThreshold

    # Show the constructed decision tree
    def showFeatureSeq(self):
        """Print the nested-dict representation of the tree."""
        print(self.root)

    # Build the tree
    def createTree(self):
        """Grow the tree over the whole dataset and store it in ``self.root``."""
        # Start counting leaves from scratch so rebuilding does not double count.
        self.nodeNum = 0
        featureCount = len(self.dataset[0]) - 1 if len(self.dataset) > 0 else 0
        self.root = self.__creatTree(list(range(len(self.dataset))),
                                     list(range(featureCount)))

    def __creatTree(self, data_indexes, label_indexes):
        """Recursively build the subtree for the given rows and features.

        Raises:
            ValueError: if ``self.mode`` is neither "normal" nor "infinite".
        """
        if len(data_indexes) == 0:
            return "no data to predict"

        # Pick the split with the largest Gini gain (-1 means "make a leaf").
        bestLabelIndex, dividedValue = self.__chooseBestFeature(data_indexes, label_indexes)
        majorityClass = self.__majority([self.dataset[i][-1] for i in data_indexes])

        # Stopping rule hit or nothing left to split on.
        if bestLabelIndex == -1:
            self.nodeNum += 1
            return {"node": True,
                    "value": majorityClass}

        treeNode = {"feature": bestLabelIndex,
                    "featureValue": dividedValue,
                    "value": majorityClass,
                    "node": False,
                    "subTree": {}}

        # In "infinite" mode a feature stays available for deeper splits.
        if self.mode == "normal":
            # label_indexes holds feature ids, so remove by value, not position.
            label_indexes.remove(bestLabelIndex)
        elif self.mode != "infinite":
            raise ValueError('There is no %s' % (self.mode,))

        keyList, rest = self.__splitDataSet(data_indexes, bestLabelIndex, dividedValue)

        # Every split is binary.
        treeNode["subTree"]["yes"] = self.__creatTree(keyList, label_indexes.copy())
        treeNode["subTree"]["no"] = self.__creatTree(rest, label_indexes.copy())

        return treeNode

    def __chooseBestFeature(self, dataIndexes, labelIndexes):
        """Return ``(feature, split_value)`` with the largest Gini gain.

        Returns ``(-1, -1)`` when the node should become a leaf: too few rows,
        Gini index below the threshold, or no feature able to split the rows.
        Features that take a single value over ``dataIndexes`` are removed
        from ``labelIndexes`` because they can never split a subset either.

        Raises:
            ValueError: if a feature type is neither "C" nor "R".
        """
        curGini = self.__calcGini(dataIndexes)
        bestLabelIndex = -1
        dividedValue = -1
        bestGiniGain = -1
        if len(dataIndexes) <= self.numThreshold or curGini < self.giniThreshold:
            return bestLabelIndex, dividedValue

        exhausted = []
        for labelIndex in labelIndexes:
            labelType = self.labelTypes[labelIndex]
            observed = [self.dataset[i][labelIndex] for i in dataIndexes]
            if labelType == "C":
                # Distinct categories in first-seen order.
                candidates = list(dict.fromkeys(observed))
                splittable = len(candidates) > 1
            elif labelType == "R":
                # Thresholds are the distinct values except the largest one,
                # which would leave the "no" branch empty.
                distinct = sorted(set(observed))
                splittable = len(distinct) > 1
                candidates = distinct[:-1]
            else:
                raise ValueError("There is no %s" % (labelType,))

            if not splittable:
                exhausted.append(labelIndex)
                continue

            for candidate in candidates:
                keyList, rest = self.__splitDataSet(dataIndexes, labelIndex, candidate)
                p = float(len(keyList)) / len(dataIndexes)
                giniGain = curGini - p * self.__calcGini(keyList) - (1 - p) * self.__calcGini(rest)
                if giniGain > bestGiniGain:
                    bestGiniGain = giniGain
                    bestLabelIndex = labelIndex
                    dividedValue = candidate

        # Drop the features that cannot split these rows any further.
        for labelIndex in exhausted:
            labelIndexes.remove(labelIndex)

        return bestLabelIndex, dividedValue

    def __splitDataSet(self, dataIndexes, label, val):
        """Partition rows on feature ``label`` into ``(matching, rest)``.

        A row matches when its value equals ``val`` (type "C") or is
        ``<= val`` (type "R").

        Raises:
            Exception: if the feature type is neither "C" nor "R".
        """
        resList = []
        rest = []
        labelType = self.labelTypes[label]
        if labelType == "C":
            for i in dataIndexes:
                if self.dataset[i][label] == val:
                    resList.append(i)
                else:
                    rest.append(i)
        elif labelType == "R":
            for i in dataIndexes:
                if self.dataset[i][label] <= val:
                    resList.append(i)
                else:
                    rest.append(i)
        else:
            raise Exception("label type error!")
        return resList, rest

    def __calcGini(self, dataList, index=-1):
        """Return the Gini index of column ``index`` over the rows in ``dataList``."""
        total = len(dataList)
        counts = {}
        for i in dataList:
            key = self.dataset[i][index]
            counts[key] = counts.get(key, 0) + 1
        gini = 1
        for count in counts.values():
            gini -= (float(count) / total) ** 2
        return gini

    def __majority(self, labels):
        """Return the class that is first to reach the highest count in ``labels``."""
        classCount = {}
        bestCount = -1
        bestRes = ''
        for res in labels:
            classCount[res] = classCount.get(res, 0) + 1
            if classCount[res] > bestCount:
                bestCount = classCount[res]
                bestRes = res
        return bestRes