所需库

In [371]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.datasets import load_wine

决策树分类模型

In [372]:
'''决策树分类模型
    先定义树节点类DecisonNode,再定义决策树类DecisionTreeClassifier
    使用信息增益infomation gain,建立ID3算法
'''
class DecisionNode:
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, info_gain=None, value=None):
        # 对于决策节点
        self.feature_index = feature_index  #决策节点的特征的索引
        self.threshold = threshold          #决策节点的阈值
        self.left = left                    #决策节点的左孩子
        self.right = right                  #决策节点的右孩子
        self.info_gain = info_gain          #信息增益
        
        # 对于叶节点
        self.value = value                  #叶节点的值

class DecisionTreeClassifier:
    def __init__(self,min_samples_split=2, max_depth=2) -> None:
        # 初始化树的根节点
        self.root=None
        # 停止条件
        self.min_samples_split=min_samples_split    #最小分裂样本
        self.max_depth=max_depth                    #决策树最大深度

    '''
    训练模型
    X:样本数据中的变量
    y:样本数据中的类别
    '''   
    def fit(self, X, y):
        # 训练模型
        self.root = self.build_tree(X, y)    

    '''
    计算信息熵
    y:样本数据中的类别结果
    '''    
    def entropy(self, y):
        class_labels = np.unique(y)                 #样本有多少类别
        entropy = 0
        for cls in class_labels:
            p_cls = len(y[y == cls]) / len(y)       #每一类别的比例
            entropy += -p_cls * np.log2(p_cls)      #信息熵
        return entropy
    
    '''
    对样本数据进行分裂
    dataset:样本数据(X,Y)
    ''' 
    def split(self, X,y, feature_index, threshold):
        #根据阈值分割数据
        left_idx = X[:, feature_index] <= threshold
        right_idx = X[:, feature_index] > threshold
        return {'left': (X[left_idx], y[left_idx]), 'right': (X[right_idx], y[right_idx])}

    '''
    计算信息增益
    '''   
    def information_gain(self, parent, l_child, r_child):
        weight_l = len(l_child) / len(parent)               #左孩子占比
        weight_r = len(r_child) / len(parent)               #右孩子占比
        gain = self.entropy(parent) - (weight_l*self.entropy(l_child) + weight_r*self.entropy(r_child))
        return gain
    
    '''
    求最佳分割
    '''   
    def get_best_split(self, X, y, num_features):
        # 寻找最佳分割点
        best_split = {}
        # 初始化最大信息增益为负无穷大
        max_info_gain = -float('inf')
        # 遍历所有特征
        for feature_index in range(num_features):
            feature_values = X[:, feature_index]
            thresholds = np.unique(feature_values)
            # 遍历所有阈值
            for threshold in thresholds:
                # 数据分裂
                datasets = self.split(X, y, feature_index, threshold)
                # 如果
                if len(datasets['left'][1]) > 0 and len(datasets['right'][1]) > 0:
                # 计算信息增益
                    curr_info_gain = self.information_gain(y, datasets['left'][1], datasets['right'][1])
                    if curr_info_gain  > max_info_gain:
                        max_info_gain = curr_info_gain 
                        best_split = {'feature_index': feature_index, 'threshold': threshold, 'datasets': datasets,
                                      'info_gain':curr_info_gain, 'value': None}
        return best_split

    '''
    递归构建决策树
    '''   
    def build_tree(self, X, y, current_depth=0):
        # 样本数和特征数
        num_samples, num_features = X.shape
        
        #如果样本数大于最小分裂数，并且当前深度比最大深度小，则往下建树
        if num_samples >= self.min_samples_split and current_depth <= self.max_depth:
            
            best_split = self.get_best_split(X, y, num_features)

            # 如果信息增益大于0说明还能分，小于0说明不能分了，则为叶子节点
            if best_split['info_gain'] >0:
                left_subtree = self.build_tree(*best_split['datasets']['left'], current_depth+1)
                right_subtree = self.build_tree(*best_split['datasets']['right'], current_depth+1)
                
                return DecisionNode(best_split['feature_index'], best_split['threshold'], left_subtree, right_subtree,best_split['info_gain'])
            
                

        leaf_value = self.calculate_leaf_value(y)
        return DecisionNode(value=leaf_value)  
    
    def calculate_leaf_value(self, Y):
        Y = list(Y)
        return max(Y, key=Y.count)
        
    def predict(self, X):
        predictions = [self.make_prediction(x, self.root) for x in X]
        return predictions
    
    def make_prediction(self, x, tree):
        if tree.value != None: return tree.value
        feature_val = x[tree.feature_index]
        if feature_val <= tree.threshold:
            return self.make_prediction(x, tree.left)
        else:
            return self.make_prediction(x, tree.right)
    
    def train_test_split(self,X, y, test_size=0.2, random_state=None):
        """将数据集划分为训练集和测试集"""
        if random_state:
            np.random.seed(random_state)
        
        # 随机打乱索引
        indices = np.arange(X.shape[0])
        np.random.shuffle(indices)
        
        # 根据test_size计算测试集大小
        test_set_size = int(X.shape[0] * test_size)
        
        # 划分测试集和训练集
        test_indices = indices[:test_set_size]
        train_indices = indices[test_set_size:]
        
        # 划分数据集
        X_train = X[train_indices]
        X_test = X[test_indices]
        y_train = y[train_indices]
        y_test = y[test_indices]
        
        return X_train, X_test, y_train, y_test 
    
    def accuracy_score(self,y_true, y_pred):
        """计算准确率"""
        correct_predictions = sum(y_pred[i] == y_true[i] for i in range(len(y_true)))
        accuracy = correct_predictions / len(y_true)
        return accuracy
    
   
    

对鸢尾花数据进行训练，使用sklearn库中自带的鸢尾花数据集

In [373]:
'''鸢尾花数据集
    150个样本,按7:3划分为训练集和测试集
    3个类别('Iris-setosa', 'Iris-versicolor', 'Iris-virginica'),
    4个属性('sepal length','sepal width','petal length','petal width'),
    构建决策树分类模型并测试准确率
    呈现训练过程,绘制分类结果,评估模型准确性
'''
from sklearn.datasets import load_iris
# 加载数据集
iris = load_iris()
X, y = iris.data, iris.target
# 分割数据集为训练集和测试集
clf=DecisionTreeClassifier(min_samples_split=2, max_depth=3)
X_train, X_test, y_train, y_test = clf.train_test_split(X, y, test_size=0.3, random_state=42)

clf.fit(X_train,y_train)
y_pred=np.array(clf.predict(X_test))
accuracy = clf.accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy}")


Accuracy: 0.9555555555555556


对葡萄酒识别数据进行训练，使用sklearn库中自带的葡萄酒识别数据集

In [378]:
'''葡萄酒识别数据集
    178个样本,按7:3划分为训练集和测试集
    3个类别,
    13个属性,
    构建决策树分类模型并测试准确率
    呈现训练过程,绘制分类结果,评估模型准确性
'''
from sklearn.datasets import load_wine
wine=load_wine()
X,y=wine.data,wine.target
# 分割数据集为训练集和测试集
clf=DecisionTreeClassifier(min_samples_split=2, max_depth=3)
X_train, X_test, y_train, y_test = clf.train_test_split(X, y, test_size=0.3, random_state=42)

clf.fit(X_train,y_train)
y_pred=np.array(clf.predict(X_test))
accuracy = clf.accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy}")

Accuracy: 0.8490566037735849


In [375]:
'''决策树回归模型
    先定义树节点类DecisonNode,再定义决策树类DecisionTreeRegressor
    将信息增益改为方差减少量即可
'''
class DecisionNode:
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, var_reduction=None, value=None):
        # 对于决策节点
        self.feature_index = feature_index  #决策节点的特征的索引
        self.threshold = threshold          #决策节点的阈值
        self.left = left                    #决策节点的左孩子
        self.right = right                  #决策节点的右孩子
        self.var_reduction = var_reduction          #方差减少量
        
        # 对于叶节点
        self.value = value                  #叶节点的值



class DecisionTreeRegressor:
    def __init__(self,min_samples_split=2, max_depth=3) -> None:
        # 初始化树的根节点
        self.root=None
        # 停止条件
        self.min_samples_split=min_samples_split    #最小分裂样本
        self.max_depth=max_depth                    #决策树最大深度

    '''
    训练模型
    X:样本数据中的变量
    y:样本数据中的类别
    '''   
    def fit(self, X, y):
        # 训练模型
        self.root = self.build_tree(X, y)    
    
    '''
    对样本数据进行分裂
    dataset:样本数据(X,Y)
    ''' 
    def split(self, X,y, feature_index, threshold):
        #根据阈值分割数据
        left_idx = X[:, feature_index] <= threshold
        right_idx = X[:, feature_index] > threshold
        return {'left': (X[left_idx], y[left_idx]), 'right': (X[right_idx], y[right_idx])}

    '''
    计算方差减少量
    '''   
    def variance_reduction(self, parent, l_child, r_child):
        #计算方差减少量
        weight_left = len(l_child) / len(parent)
        weight_right = len(r_child) / len(parent)
        reduction = np.var(parent) - (weight_left * np.var(l_child) + weight_right * np.var(r_child))
        return reduction
    
    '''
    求最佳分割
    '''   
    def get_best_split(self, X, y, num_features):
        # 寻找最佳分割点
        best_split = {}
        # 初始化最大信息增益为负无穷大
        max_var_reduction = -float('inf')
        # 遍历所有特征
        for feature_index in range(num_features):
            feature_values = X[:, feature_index]
            thresholds = np.unique(feature_values)
            # 遍历所有阈值
            for threshold in thresholds:
                # 数据分裂
                datasets = self.split(X, y, feature_index, threshold)
                # 如果
                if len(datasets['left'][1]) > 0 and len(datasets['right'][1]) > 0:
                # 计算信息增益
                    curr_var_reduction = self.variance_reduction(y, datasets['left'][1], datasets['right'][1])
                    if curr_var_reduction > max_var_reduction:
                        max_var_reduction = curr_var_reduction 
                        best_split = {'feature_index': feature_index, 'threshold': threshold, 'datasets': datasets,
                                      'var_reduction':curr_var_reduction, 'value': None}
        return best_split

    '''
    递归构建决策树
    '''   
    def build_tree(self, X, y, current_depth=0):
        # 样本数和特征数
        num_samples, num_features = X.shape
        
        #如果样本数大于最小分裂数，并且当前深度比最大深度小，则往下建树,不满足条件则为叶子节点
        if num_samples >= self.min_samples_split and current_depth <= self.max_depth:
            
            best_split = self.get_best_split(X, y, num_features)

            # 如果信息增益大于0说明还能分，小于0说明不能分了，则为叶子节点
            if best_split['var_reduction'] > 0:
                left_subtree = self.build_tree(*best_split['datasets']['left'], current_depth+1)
                right_subtree = self.build_tree(*best_split['datasets']['right'], current_depth+1)
                
                return DecisionNode(best_split['feature_index'], best_split['threshold'], 
                                    left_subtree, right_subtree,best_split['var_reduction'])
            
        return DecisionNode(value=np.mean(y))  
    
    
        
    def predict(self, X):
        predictions = [self.make_prediction(x, self.root) for x in X]
        return predictions
    
    def make_prediction(self, x, tree):
        if tree.value != None: return tree.value

        feature_val = x[tree.feature_index]
        if feature_val <= tree.threshold:
            return self.make_prediction(x, tree.left)
        else:
            return self.make_prediction(x, tree.right)
    
    def train_test_split(self,X, y, test_size=0.2, random_state=None):
        """将数据集划分为训练集和测试集"""
        if random_state:
            np.random.seed(random_state)
        
        # 随机打乱索引
        indices = np.arange(X.shape[0])
        np.random.shuffle(indices)
        
        # 根据test_size计算测试集大小
        test_set_size = int(X.shape[0] * test_size)
        
        # 划分测试集和训练集
        test_indices = indices[:test_set_size]
        train_indices = indices[test_set_size:]
        
        # 划分数据集
        X_train = X[train_indices]
        X_test = X[test_indices]
        y_train = y[train_indices]
        y_test = y[test_indices]
        
        return X_train, X_test, y_train, y_test 
    
    

    

    

对波士顿房价预测数据进行训练

In [376]:
import pandas as pd
from sklearn.metrics import r2_score
#读取房价数据集

raw_df=pd.read_csv(r'data/housing.csv',sep="\s+",header=None)
data = raw_df.values[:,:13]
target = raw_df.values[:, 13]
X,y=data,target   
# 分割数据集为训练集和测试集
model=DecisionTreeRegressor(min_samples_split=2, max_depth=3)
X_train, X_test, y_train, y_test = model.train_test_split(X, y, test_size=0.3, random_state=42)

model.fit(X_train,y_train)
y_pred=np.array(model.predict(X_test))
# 计算R²分数
r2 = r2_score(y_test, y_pred)
print(f'R²分数: {r2}')

R²分数: 0.7660068638828825


对房地产估计预测数据进行训练

In [377]:
from ucimlrepo import fetch_ucirepo 
  
# fetch dataset 
real_estate_valuation = fetch_ucirepo(id=477) 
  
# data (as pandas dataframes) 
X = real_estate_valuation.data.features 
y = real_estate_valuation.data.targets 
X=np.array(X)
X=X[:,1:]
y=np.array(y)

#分割数据集为训练集和测试集
model=DecisionTreeRegressor(min_samples_split=2, max_depth=3)
X_train, X_test, y_train, y_test = model.train_test_split(X, y, test_size=0.3, random_state=42)

model.fit(X_train,y_train)
y_pred=np.array(model.predict(X_test))

y_pred=[int(item) for item in y_pred]
# 计算R²分数
r2 = r2_score(y_test, y_pred)


print(f'R²分数: {r2}')

R²分数: 0.6790198690514948
