In [1]:
import numpy as np


class DecisionTreeClassifierWithWeight:
    def __init__(self):
        self.best_err = 1  # 最小的加权错误率
        self.best_fea_id = 0  # 最优特征id
        self.best_thres = 0  # 选定特征的最优阈值
        self.best_op = 1  # 阈值符号，其中 1: >, 0: <

    def fit(self, X, y, sample_weight=None):
        if sample_weight is None:
            sample_weight = np.ones(len(X)) / len(X)
        n = X.shape[1]
        for i in range(n):
            feature = X[:, i]  # 选定特征列
            fea_unique = np.sort(np.unique(feature))  # 将所有特征值从小到大排序
            for j in range(len(fea_unique) - 1):
                thres = (fea_unique[j] + fea_unique[j + 1]) / 2  # 逐一设定可能阈值
                for op in (0, 1):
                    y_ = 2 * (feature >= thres) - 1 if op == 1 else 2 * (feature < thres) - 1  # 判断何种符号为最优
                    err = np.sum((y_ != y) * sample_weight)
                    if err < self.best_err:  # 当前参数组合可以获得更低错误率，更新最优参数
                        self.best_err = err
                        self.best_op = op
                        self.best_fea_id = i
                        self.best_thres = thres
        return self

    def predict(self, X):
        feature = X[:, self.best_fea_id]
        return 2 * (feature >= self.best_thres) - 1 if self.best_op == 1 else 2 * (feature < self.best_thres) - 1

    def score(self, X, y, sample_weight=None):
        y_pre = self.predict(X)
        if sample_weight is not None:
            return np.sum((y_pre == y) * sample_weight)
        return np.mean(y_pre == y)

In [2]:
class AdaBoostClassifier_:
    def __init__(self, n_estimators=50):
        self.n_estimators = n_estimators
        self.estimators = []
        self.alphas = []

    def fit(self, X, y):
        sample_weight = np.ones(len(X)) / len(X)  # 初始化样本权重为 1/N
        for _ in range(self.n_estimators):
            dtc = DecisionTreeClassifierWithWeight().fit(X, y, sample_weight)  # 训练弱学习器
            alpha = 1 / 2 * np.log((1 - dtc.best_err) / dtc.best_err)  # 权重系数
            y_pred = dtc.predict(X)
            sample_weight *= np.exp(-alpha * y_pred * y)  # 更新迭代样本权重
            sample_weight /= np.sum(sample_weight)  # 样本权重归一化
            self.estimators.append(dtc)
            self.alphas.append(alpha)
        return self

    def predict(self, X):
        y_pred = np.empty((len(X), self.n_estimators))  # 预测结果二维数组，其中每一列代表一个弱学习器的预测结果
        for i in range(self.n_estimators):
            y_pred[:, i] = self.estimators[i].predict(X)
        y_pred = y_pred * np.array(self.alphas)  # 将预测结果与训练权重乘积作为集成预测结果
        return 2 * (np.sum(y_pred, axis=1) > 0) - 1  # 以0为阈值，判断并映射为-1和1

    def score(self, X, y):
        y_pred = self.predict(X)
        return np.mean(y_pred == y)

In [3]:
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

X, y = load_breast_cancer(return_X_y=True)
y = 2 * y - 1  # 将0/1取值映射为-1/1取值
x_train, x_test, y_train, y_test = train_test_split(X, y, random_state=42)

In [4]:
from sklearn.ensemble import AdaBoostClassifier

print(AdaBoostClassifier_().fit(x_train, y_train).score(x_test, y_test))  # 0.986013986013986
print(AdaBoostClassifier().fit(x_train, y_train).score(x_test, y_test))  # 0.965034965034965

0.972027972027972
0.951048951048951


In [5]:
# 基尼指数计算
def gini_calculate(groups, class_values):
    gini = 0.0
    D = len(groups[0]) + len(groups[1])
    for class_value in class_values:
        for group in groups:
            size = len(group)
            if size == 0:
                continue
            proportion = [row[-1] for row in group].count(class_value) / float(size)
            gini += float(size) / D * (proportion * (1.0 - proportion))
    return gini


# 找出最优分割特征
def get_split(dataset, n_features):
    class_values = list(set(row[-1] for row in dataset))
    b_index, b_value, b_score, b_groups = 999, 999, 999, None
    features = list()
    while len(features) < n_features:
        index = randrange(len(dataset[0]) - 1)  #这里是随机森林第二个核心思想，随机选取特征列进行决策树构造!!
        if index not in features:
            features.append(index)
    for index in features:
        for row in dataset:
            groups = train_split(index, row[index], dataset)
            gini = gini_calculate(groups, class_values)
            if gini < b_score:
                b_index, b_value, b_score, b_groups = index, row[index], gini, groups
    return {'index': b_index, 'value': b_value, 'groups': b_groups}


# 找到分类最多的标签，作为最终预测标签,这里其实和bagging函数的构造方法类似
def to_terminal(group):
    outcomes = [row[-1] for row in group]
    return max(set(outcomes), key=outcomes.count)


# 递归分类函数构建
def split(node, max_depth, min_size, n_features, depth):
    left, right = node['groups']
    del (node['groups'])
    if not left or not right:
        node['left'] = node['right'] = to_terminal(left + right)
        return
    if depth >= max_depth:  #若分类还未结束，则选取数据中分类标签较多的作为结果，防止过拟合
        node['left'], node['right'] = to_terminal(left), to_terminal(right)
        return
    if len(left) <= min_size:
        node['left'] = to_terminal(left)
    else:
        node['left'] = get_split(left, n_features)
        split(node['left'], max_depth, min_size, n_features, depth + 1)
    if len(right) <= min_size:
        node['right'] = to_terminal(right)
    else:
        node['right'] = get_split(right, n_features)
        split(node['right'], max_depth, min_size, n_features, depth + 1)


# 构建决策树函数
def build_tree(train, max_depth, min_size, n_features):
    # 返回最优列和相关的信息
    root = get_split(train, n_features)
    split(root, max_depth, min_size, n_features, 1)
    return root

In [None]:
from random import randrange


# 决策树预测函数predict构建
def predict(node, row):
    if row[node['index']] < node['value']:
        if isinstance(node['left'], dict):
            return predict(node['left'], row)
        else:
            return node['left']
    else:
        if isinstance(node['right'], dict):
            return predict(node['right'], row)
        else:
            return node['right']


# bagging套袋法构建，这里是随机森林算法核心思想!!
def bagging(trees, row):
    predictions = [predict(tree, row) for tree in trees]
    return max(set(predictions), key=predictions.count)  #这里选取每个决策树预测最多的那个分类结果作为最终预测结果


# 决策树采样函数构建
def subsample(dataset, ratio):
    sample = list()
    n_sample = round(len(dataset) * ratio)
    while len(sample) < n_sample:
        index = randrange(len(dataset))  # 有放回的随机采样
        sample.append(dataset[index])
    return sample


# 随机森林模型构建
def random_forest(train, test, max_depth, min_size, sample_size, n_trees, n_features):
    trees = list()
    for i in range(n_trees):
        sample = subsample(train, sample_size)
        tree = build_tree(sample, max_depth, min_size, n_features)  # 创建一个决策树
        trees.append(tree)
    predictions = [bagging(trees, row) for row in test]  # bagging预测test
    return predictions


# 评估函数构造，用ACC作为评价指标
def acc(actual, predicted):
    correct = 0
    for i in range(len(actual)):
        if actual[i] == predicted[i]:
            correct += 1
    return correct / float(len(actual)) * 100.0

