In [1]:
import numpy as np
import threading
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'
MODEL_VERSION = 8

## 数据预处理

In [2]:
# 训练集
data = np.genfromtxt("adult/adult.data", delimiter=",", dtype=str)
data = data[~np.any(data == '?', axis=1)]
X = data[:, :14]
y = data[:, 14]

# 01编码
y = y == y[0]

# 数值编码
for i in [1, 3, 5, 6, 7, 8, 9, 13]:
    values = np.unique(X[:, i])
    encoding = {value: idx for idx, value in enumerate(values)}
    for j in range(X.shape[0]):
        X[j][i] = encoding[X[j][i]]

X = X.astype(int)
y = y.astype(int)

X_train = X[:X.shape[0] // 3 * 2, :]
y_train = y[:X.shape[0] // 3 * 2]

X_test = X[X.shape[0] // 3 * 2:, :]
y_test = y[X.shape[0] // 3 * 2:]


## 得分

In [3]:
class score(object):
    def __init__(self, y_true, y_pred):
        self.y_true = y_true
        self.y_pred = y_pred
    
    def accuracy_score(self):
        correct = np.sum(self.y_true == self.y_pred)
        total = len(self.y_true)
        accuracy = correct / total
        return accuracy
    
    def precision_score(self):
        true_positive = np.sum((self.y_true == 1) & (self.y_pred == 1))
        false_positive = np.sum((self.y_true == 0) & (self.y_pred == 1))
        precision = true_positive / (true_positive + false_positive)
        return precision
    
    def recall_score(self):
        true_positive = np.sum((self.y_true == 1) & (self.y_pred == 1))
        false_negative = np.sum((self.y_true == 1) & (self.y_pred == 0))
        recall = true_positive / (true_positive + false_negative)
        return recall
    
    def f1_score(self):
        precision = self.precision_score()
        recall = self.recall_score()
        f1 = 2 * precision * recall / (precision + recall)
        return f1
    
    def show(self, title=None):
        if title:
            print(title)
        print(f"""准确率: {self.accuracy_score()}
精确率: {self.precision_score()}
召回率: {self.recall_score()} 
F1 值: {self.f1_score()}
""")


## 数据划分（基于信息熵）

In [4]:
def calculate_entropy(y):
    entropy = 0
    for c in np.unique(y):
        p = np.sum(y == c) / y.shape[0]
        entropy -= p * np.log2(p)
    return entropy

def find_best_split(X, y):
    if np.unique(y).shape[0] < 0:
        return None, None
    
    best_feature_index = None
    best_threshold = None
    best_entropy = np.inf
    
    for feature_index in range(X.shape[1]):
        values = X[:, feature_index]
        unique_values = np.unique(values)
        if unique_values.shape[0] <= 1:
            continue
        for value in unique_values:
            left_indices = values <= value
            right_indices = ~left_indices
            
            entropy = np.sum(left_indices) / y.shape[0] * calculate_entropy(y[left_indices]) + np.sum(right_indices) / y.shape[0] * calculate_entropy(y[right_indices])
            
            if entropy < best_entropy:
                best_entropy = entropy
                best_feature_index = feature_index
                best_threshold = value
    
    return best_feature_index, best_threshold

## 结点

In [5]:
class TreeNode:
    def __init__(
            self, feature_index=None, threshold=None, 
            value=None, left=None, right=None
    ):
        self.feature_index = feature_index
        self.threshold = threshold
        self.value = value
        self.left = left
        self.right = right

## 决策树

In [6]:
class DecisionTree:
    def __init__(self, max_depth=3):
        self.max_depth = max_depth
        self.root = None
    
    def generateTree(self, X, y, depth=1):
        if depth == self.max_depth or len(set(y)) == 1:
            node = TreeNode(value=np.sort(y)[y.shape[0] >> 1])
        else:
            best_feature_index, best_threshold = find_best_split(X, y)
            left_indices = X[:, best_feature_index] <= best_threshold
            right_indices = ~left_indices
            
            left = self.generateTree(X[left_indices], y[left_indices], depth + 1)
            right = self.generateTree(X[right_indices], y[right_indices], depth + 1)
            
            node = TreeNode(
                feature_index=best_feature_index, threshold=best_threshold,
                left=left, right=right
            )
        return node
    
    def fit(self, X, y, info=True):
        info and print('training')
        self.root = self.generateTree(X, y)
        info and print('ok')
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        y_pred = []
        for sample in X:
            node = self.root
            while node.left is not None and node.right is not None:
                if sample[node.feature_index] <= node.threshold:
                    node = node.left
                else:
                    node = node.right
            y_pred.append(node.value)
        return np.array(y_pred)

## 决策森林

In [7]:
class DecisionForest(object):
    def __init__(self, tree_number: int = 1, max_depth: int = 3):
        self.tree: list[DecisionTree|None] = []
        self.weight: list[float|None] = []
        self.tree_number = tree_number
        self.max_depth = max_depth
    
    def fit(self, X: np.ndarray, y: np.ndarray):
        print('Forest training start')

        # 训练
        threads = []
        for i in range(self.tree_number):
            self.tree.append(DecisionTree(max_depth=self.max_depth))
            indices = np.random.choice(
                a=range(X.shape[0]), 
                size=int(X.shape[0] * 0.3), 
                replace=True
            )
            threads.append(
                threading.Thread(
                    target=self.tree[i].fit, args=(
                        X[indices], y[indices], 
                        False,
                    )
                )
            )
            threads[i].start()
        
        for i in range(self.tree_number):
            threads[i].join()

        # 加权
        pred = np.array([sum(tree.predict(X) == y) for tree in self.tree])
        self.weight = np.array([p / sum(pred) for p in pred])
        print('Forest is ok')

    def predict(self, X):
        votes = np.zeros((self.tree_number, X.shape[0]))
        for i in range(self.tree_number):
            votes[i] = self.tree[i].predict(X)
        votes = np.array([np.dot(vote, self.weight) >= 0.5 for vote in votes.T])
        return votes

## 模型训练

In [8]:
clf = DecisionTree(max_depth=10)
clf.fit(X_train, y_train)

training
ok


In [9]:
forest = DecisionForest(tree_number=10, max_depth=10)
forest.fit(X_train, y_train)
print(forest.weight)

Forest training start
Forest is ok
[0.1005636  0.10035185 0.0998686  0.1001238  0.09991747 0.09926047
 0.10021067 0.09947223 0.10010208 0.10012923]


## 模型测试

In [10]:
y_pred_tree = clf.predict(X_test)
score(y_test, y_pred_tree).show('决策树')
y_pred_forest = forest.predict(X_test)
score(y_test, y_pred_forest).show('决策森林')

决策树
准确率: 0.8525103638876094
精确率: 0.8627515671395579
召回率: 0.9569407172481093 
F1 值: 0.9074084783991672

决策森林
准确率: 0.8563795485951174
精确率: 0.8667550546900895
召回率: 0.9569407172481093 
F1 值: 0.909617948866601
