# Gradient Boosting

Recall how a boosted regression tree is fitted with the forward stagewise algorithm:
$$
\begin{array}{l}{f_{0}(x)=0} \\ {f_{m}(x)=f_{m-1}(x)+T\left(x ; \Theta_{m}\right), \quad m=1,2, \cdots, M} \\ {f_{M}(x)=\sum_{m=1}^{M} T\left(x ; \Theta_{m}\right)}\end{array}
$$
At step $m$, given the current model $f_{m-1}(x)$, we solve the empirical risk minimization problem
$$
\hat{\Theta}_{m}=\arg \min _{\Theta_{m}} \sum_{i=1}^{N} L\left(y_{i}, f_{m-1}\left(x_{i}\right)+T\left(x_{i} ; \Theta_{m}\right)\right)
$$
which yields the parameters of the $m$-th tree. With the squared loss $L(y, f(x))=(y-f(x))^{2}$, we have
$$
\begin{aligned} L\left(y, f_{m-1}(x)+T\left(x ; \Theta_{m}\right)\right) &=\left[y-f_{m-1}(x)-T\left(x ; \Theta_{m}\right)\right]^{2} \\ &=\left[r-T\left(x ; \Theta_{m}\right)\right]^{2} \end{aligned}
$$
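where $r = y - f_{m-1}(x)$ is the residual of the current model. The new regression tree therefore only has to fit the residual, that is, make $\left[r-T\left(x ; \Theta_{m}\right)\right]^{2}$ as small as possible, which is simple and easy to optimize. For a general loss function the optimization is not as easy.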
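
As a small illustration of residual fitting under the squared loss, the sketch below runs three boosting rounds in which a one-split regression stump is fitted to $r = y - f_{m-1}(x)$. The helper names `fit_stump` and `squared_loss` and the data are made up for this example; this is not the tree implemented later in this notebook.

```python
def fit_stump(xs, rs):
    """Fit a one-split regression stump to targets rs by least squares.

    Returns (threshold, left_value, right_value); samples with x < threshold
    go left. Hypothetical helper, for illustration only.
    """
    best = None
    for t in sorted(set(xs))[1:]:  # every threshold leaves both sides non-empty
        left = [r for x, r in zip(xs, rs) if x < t]
        right = [r for x, r in zip(xs, rs) if x >= t]
        cl, cr = sum(left) / len(left), sum(right) / len(right)
        sse = sum((r - cl) ** 2 for r in left) + sum((r - cr) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, cl, cr)
    return best[1:]


def squared_loss(ys, fs):
    return sum((y - f) ** 2 for y, f in zip(ys, fs))


# Made-up one-dimensional training data
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [1.0, 1.2, 0.9, 3.1, 2.9, 3.0]

f = [0.0] * len(xs)  # f_0(x) = 0
losses = [squared_loss(ys, f)]
thresholds = []
for m in range(3):
    # The m-th tree is fitted to the residual, not to y itself
    residual = [y - fx for y, fx in zip(ys, f)]
    t, cl, cr = fit_stump(xs, residual)
    f = [fx + (cl if x < t else cr) for x, fx in zip(xs, f)]
    thresholds.append(t)
    losses.append(squared_loss(ys, f))

print("thresholds:", thresholds)
print("training loss per round:", losses)
```

Because each stump is the least-squares fit to the current residual, the training loss cannot increase from one round to the next.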

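Gradient boosting was proposed to address this. The key idea is to take the negative gradient of the loss function, evaluated at the current model, as an approximation of the residual used by the boosted regression tree, and to fit a regression tree to it.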
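
To make "negative gradient as an approximate residual" concrete, the sketch below estimates $-\partial L / \partial f$ numerically for two losses. The function names and the data are assumptions made for this example. For the halved squared loss the negative gradient equals the residual $y - f$; for the absolute loss it only keeps the sign of the residual.

```python
def negative_gradient(loss, y, f, eps=1e-6):
    """Estimate -dL(y, f)/df with a central difference."""
    return -(loss(y, f + eps) - loss(y, f - eps)) / (2 * eps)


def half_squared_loss(y, f):
    return 0.5 * (y - f) ** 2


def absolute_loss(y, f):
    return abs(y - f)


ys = [3.0, -1.0, 2.5]  # made-up targets
fs = [2.0, 0.5, 2.0]   # made-up current predictions f_{m-1}(x_i)

for y, f in zip(ys, fs):
    residual = y - f
    g_sq = negative_gradient(half_squared_loss, y, f)
    g_abs = negative_gradient(absolute_loss, y, f)
    print(f"residual={residual:+.2f}  half-squared={g_sq:+.2f}  absolute={g_abs:+.2f}")
```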

## Comparison with Gradient Descent

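The usual optimization route is $L(y, f(x)) \rightarrow f = g(x; w) \rightarrow L(y, f(x); w)$: we move from the relation between the loss and the model to the relation between the loss and the model parameters, so in the end the parameters are updated along their negative gradient. This is the idea of gradient descent.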

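Gradient boosting essentially keeps only the first half, $L(y, f(x)) \rightarrow L(y, f(x); f)$. Instead of searching in parameter space it searches in function space, which lowers the barrier to optimization and greatly widens the choice of models. The update still follows the negative gradient.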
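
The contrast can be sketched on a toy problem. The data, the linear model $f(x) = wx$ and the step sizes are assumptions made for this example. In parameter space the gradient is taken with respect to $w$; in function space it is taken with respect to the predictions $f(x_i)$ themselves.

```python
# Made-up data, roughly y = 2x
xs = [1.0, 2.0, 3.0]
ys = [2.1, 3.9, 6.2]


def loss(ys, fs):
    return sum(0.5 * (y - f) ** 2 for y, f in zip(ys, fs))


# Gradient descent in parameter space: f(x) = w * x, and the chain rule
# turns dL/df into dL/dw = sum_i -(y_i - w * x_i) * x_i
w, lr = 0.0, 0.05
for _ in range(200):
    grad_w = sum(-(y - w * x) * x for x, y in zip(xs, ys))
    w -= lr * grad_w
fs_param = [w * x for x in xs]

# Gradient descent in function space: the "parameters" are the values f(x_i),
# and dL/df(x_i) = -(y_i - f(x_i))
fs_func = [0.0] * len(xs)
step = 0.5
for _ in range(200):
    grad_f = [-(y - f) for y, f in zip(ys, fs_func)]
    fs_func = [f - step * g for f, g in zip(fs_func, grad_f)]

print("parameter space:", w, loss(ys, fs_param))
print("function space: ", fs_func, loss(ys, fs_func))
```

In function space the negative gradient is only known at the training points, which is why gradient boosting fits a tree to it: the tree carries the update over to unseen $x$.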

## The GBDT Algorithm

Input: training dataset $T=\left\{\left(x_{1}, y_{1}\right),\left(x_{2}, y_{2}\right), \cdots,\left(x_{N}, y_{N}\right)\right\}$, where $x_{i} \in \mathcal{X} \subseteq \mathbf{R}^{n}, y_{i} \in \mathcal{Y} \subseteq \mathbf{R}$; loss function $L(y,f(x))$

Output: regression tree $\hat{f}(x)$

1. Initialize $f_{0}(x)=\arg \min _{c} \sum_{i=1}^{N} L\left(y_{i}, c\right)$

2. For $m=1,2,\ldots,M$:

   1. For $i = 1,2,\ldots,N$, compute

   $$
   r_{m i}=-\left[\frac{\partial L\left(y_{i}, f\left(x_{i}\right)\right)}{\partial f\left(x_{i}\right)}\right]_{f(x)=f_{m-1}(x)}
   $$

   2. Fit a regression tree to the $r_{mi}$, obtaining the leaf regions $R_{m j}, j=1,2, \cdots, J$ of the $m$-th tree.
   3. For $j=1,2, \cdots, J$, compute $c_{m j}=\arg \min _{c} \sum_{x_{i} \in R_{m j}} L\left(y_{i}, f_{m-1}\left(x_{i}\right)+c\right)$
   4. Update $f_{m}(x)=f_{m-1}(x)+\sum_{j=1}^{J} c_{m j} I\left(x \in R_{m j}\right)$

3. Output the regression tree

$$
\hat{f}(x)=f_{M}(x)=f_{0}(x)+\sum_{m=1}^{M} \sum_{j=1}^{J} c_{m j} I\left(x \in R_{m j}\right)
$$

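In step 2.1, for the squared loss $L\left(y, f(x)\right)=\frac{1}{2}\left(y-f(x)\right)^{2}$, the value $r_{mi} = y_{i}-f_{m-1}\left(x_{i}\right)$ is exactly the residual; for a general loss function it is an approximation of the residual.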
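
The steps above can be sketched for a loss other than the squared loss. The example below assumes the absolute loss $L(y, f) = |y - f|$ and two-leaf trees ($J = 2$). The helper names (`fit_stump`, `gbdt_absolute_loss`, `predict`) and the data are hypothetical. For this loss the minimizers in step 1 and step 2.3 are medians, and the negative gradient in step 2.1 is the sign of the residual.

```python
from statistics import median


def fit_stump(xs, targets):
    """Least-squares stump: return the threshold that best splits the targets."""
    best_t, best_sse = None, float("inf")
    for t in sorted(set(xs))[1:]:
        left = [r for x, r in zip(xs, targets) if x < t]
        right = [r for x, r in zip(xs, targets) if x >= t]
        ml, mr = sum(left) / len(left), sum(right) / len(right)
        sse = sum((r - ml) ** 2 for r in left) + sum((r - mr) ** 2 for r in right)
        if sse < best_sse:
            best_t, best_sse = t, sse
    return best_t


def sign(v):
    return (v > 0) - (v < 0)


def gbdt_absolute_loss(xs, ys, n_rounds):
    f0 = median(ys)  # step 1: arg min_c sum_i |y_i - c|
    f = [f0] * len(xs)
    trees = []
    losses = [sum(abs(y - v) for y, v in zip(ys, f))]
    for _ in range(n_rounds):  # step 2
        r = [sign(y - v) for y, v in zip(ys, f)]  # 2.1 negative gradient
        t = fit_stump(xs, r)                      # 2.2 regions R_m1, R_m2
        # 2.3 leaf values: arg min_c sum |y_i - f(x_i) - c| is a median
        c_left = median(y - v for x, y, v in zip(xs, ys, f) if x < t)
        c_right = median(y - v for x, y, v in zip(xs, ys, f) if x >= t)
        # 2.4 update the model
        f = [v + (c_left if x < t else c_right) for x, v in zip(xs, f)]
        trees.append((t, c_left, c_right))
        losses.append(sum(abs(y - v) for y, v in zip(ys, f)))
    return f0, trees, losses


def predict(x, f0, trees):
    """Step 3: f_M(x) = f_0 + sum over the trees of the matching leaf value."""
    return f0 + sum(cl if x < t else cr for t, cl, cr in trees)


# Made-up data with one outlier
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [1.0, 1.5, 0.8, 1.2, 4.0, 4.2, 3.9, 9.0]
f0, trees, losses = gbdt_absolute_loss(xs, ys, n_rounds=3)
print("training loss per round:", losses)
print("prediction at x = 2.5:", predict(2.5, f0, trees))
```

The tree structure comes from fitting the negative gradient, while the leaf values come from minimizing the original loss inside each region, which is why steps 2.2 and 2.3 are separate.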

## Implementation

**Import libraries**

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline

**Hardware and version information**

In [2]:
%load_ext watermark
%watermark -v -m -p ipywidgets,matplotlib,numpy,pandas,sklearn

CPython 3.7.3
IPython 7.6.1

ipywidgets 7.5.0
matplotlib 3.1.0
numpy 1.16.4
pandas 0.24.2
sklearn 0.21.2

compiler   : MSC v.1915 64 bit (AMD64)
system     : Windows
release    : 10
machine    : AMD64
processor  : Intel64 Family 6 Model 60 Stepping 3, GenuineIntel
CPU cores  : 4
interpreter: 64bit


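**Import the regression tree**

The GBDT implementation relies on regression trees. Here we reuse the regression tree implemented in the [decision tree](https://libertydream.github.io/statistical_learning_method/notebook/5.decision_tree.html) notebook.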

In [3]:
class DNode:
    """A leaf node or an internal node"""
    
    def __init__(self, feature_i=None, threshold=None, value=None,
                 yes_subtree=None, no_subtree=None):
        self.feature_i = feature_i
        self.threshold = threshold
        self.value = value
        self.yes_subtree = yes_subtree
        self.no_subtree = no_subtree   

In [4]:
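def divide_on_feature(X, feature_i, threshold):
    '''Split the samples of X into two subsets according to feature_i and threshold'''
    split_func = None
    # NumPy scalars such as np.int64 are not instances of int, so check them explicitly
    if isinstance(threshold, (int, float, np.integer, np.floating)):
        # Numeric feature: samples at or above the threshold form the first subset
        split_func = lambda sample:sample[feature_i] >= threshold
    else:
        # Categorical feature: samples equal to the threshold form the first subset
        split_func = lambda sample:sample[feature_i] == threshold
    
    X_1 = np.array([sample for sample in X if split_func(sample)])
    X_2 = np.array([sample for sample in X if not split_func(sample)])
    
    # Return a tuple: the two subsets usually differ in length, and recent
    # NumPy versions refuse to pack ragged arrays into a single np.array
    return X_1, X_2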

In [5]:
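class DecisionTree(object):
    """Parent class of the regression tree and the classification tree"""
    
    def __init__(self, min_split_num=2, min_impurity=1e-7):
        
        # Root node of the decision tree
        self.root = None
        
        # Minimum number of samples needed to split; smaller nodes are not split further
        self.min_split_num = min_split_num
        
        # Minimum gain; growth stops when the best split gains less than this
        self.min_impurity = min_impurity
        
        # Gain function: information gain for classification trees,
        # variance reduction for regression trees
        self._cal_impurity = None
        
        # Function that computes the prediction y given by a leaf node
        self._cal_leaf_val = None
        
        # Whether y is one-dimensional (True) or one-hot encoded (False); set in fit
        self.one_dim = None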
        
    def fit(self, X_train, y_train):
        self.one_dim = len(np.shape(y_train)) == 1
        self.root = self.__build_tree(X_train, y_train)
    
    def __build_tree(self, X_train, y_train):
        """Recursively build a decision tree
        
        Split X on each feature and compare the gain of the candidate splits to pick the best one
        """
        max_impurity = 0
        best_criteria = None  # best split: feature index and threshold
        best_sets = None  # subsets produced by the best split
        
        # Concatenate X and y into the usual training-set layout
        if len(np.shape(y_train)) == 1:
            y_train = np.expand_dims(y_train, axis=1)
        train_set = np.concatenate((X_train, y_train),axis=1)
        
        n_samples, n_features = np.shape(X_train)
        
        if n_samples >= self.min_split_num: # one of the stopping conditions
            
            for feature_i in range(n_features):
                
                # Values taken by the current feature
                feature_values = np.expand_dims(X_train[:,feature_i], axis=1)
                unique_values = np.unique(feature_values)
                
                # Compute the gain of splitting on this feature at each candidate threshold
                for threshold in unique_values:
                    
                    X_y1, X_y2 = divide_on_feature(train_set, feature_i, threshold)
                    
                    if len(X_y1) > 0 and len(X_y2) > 0:  # both subsets are non-empty
                        y1 = X_y1[:, n_features:]
                        y2 = X_y2[:, n_features:]
                        
                        impurity = self._cal_impurity(y_train, y1, y2)
                        
                        if impurity > max_impurity:
                            max_impurity = impurity
                            best_criteria = {"feature_i":feature_i, "threshold":threshold}
                            best_sets = {
                                "yes_X": X_y1[:, : n_features],
                                "yes_y": X_y1[:, n_features : ],
                                "no_X": X_y2[:, : n_features],
                                "no_y": X_y2[:, n_features : ]
                            }
                            
        # Still worth growing the tree; checked only after every feature has been tried
        if max_impurity > self.min_impurity:
        
            yes_subtree = self.__build_tree(best_sets["yes_X"],best_sets["yes_y"])
            no_subtree = self.__build_tree(best_sets["no_X"],best_sets["no_y"])
            return DNode(feature_i=best_criteria["feature_i"], threshold=best_criteria["threshold"],
                    yes_subtree=yes_subtree, no_subtree=no_subtree)
        
        # Stop splitting: this is a leaf node, so compute its prediction
        leaf_value = self._cal_leaf_val(y_train)
            
        return DNode(value=leaf_value)
    
    def predict_value(self, x, tree=None):
        '''Predict sample x with the (sub)tree `tree`, recursively'''
        
        if tree is None:
            tree = self.root
        
        # Reached a leaf node: return its prediction
        if tree.value is not None:
            return tree.value
        
        # Still at an internal node: choose which branch to follow
        feature_value = x[tree.feature_i]
        goto = tree.yes_subtree
        # NumPy scalars such as np.int64 are not instances of int, so check them explicitly
        if isinstance(feature_value, (int, float, np.integer, np.floating)):
            if feature_value < tree.threshold:
                goto = tree.no_subtree
        elif feature_value != tree.threshold:
            goto = tree.no_subtree
        
        return self.predict_value(x, goto)
    
    def predict(self, X_test):
        y_pred = [self.predict_value(sample) for sample in X_test]
        return y_pred

In [6]:
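def cal_var(X):
    '''Variance of each column of X'''
    mean = np.ones(np.shape(X)) * X.mean(0)
    n_samples = np.shape(X)[0]
    # The diagonal of (X - mean)^T (X - mean) holds the per-column sums of squared deviations
    var = (1 / n_samples) * np.diag((X - mean).T.dot(X - mean))
    
    return var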

In [7]:
class RegressDT(DecisionTree):
    
    def __cal_var_reduct(self, y_train, y1, y2):
        
        var_tot = cal_var(y_train)
        var_1 = cal_var(y1)
        var_2 = cal_var(y2)
        
        frac_1 = len(y1) / len(y_train)
        frac_2 = len(y2) / len(y_train)
        
        var_reduct = var_tot - (frac_1 * var_1 + frac_2 * var_2)
        
        return sum(var_reduct)
    
    def __mean(self, y):
        value = np.mean(y, axis=0)
        return value if len(value) > 1 else value[0]
    
    def fit(self, X_train, y_train):
        self._cal_impurity = self.__cal_var_reduct
        self._cal_leaf_val = self.__mean
        super(RegressDT, self).fit(X_train, y_train)
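
The split criterion used by the regression tree, variance reduction, can be checked by hand on a tiny example. The sketch below is a standalone, pure-Python restatement written for this illustration; the function names and the data are assumptions, not the notebook's own code.

```python
def variance(values):
    m = sum(values) / len(values)
    return sum((v - m) ** 2 for v in values) / len(values)


def variance_reduction(y, y1, y2):
    """Variance of the parent minus the size-weighted variance of the children."""
    frac_1, frac_2 = len(y1) / len(y), len(y2) / len(y)
    return variance(y) - (frac_1 * variance(y1) + frac_2 * variance(y2))


y = [1.0, 1.0, 5.0, 5.0]
good = variance_reduction(y, [1.0, 1.0], [5.0, 5.0])  # separates the two groups
bad = variance_reduction(y, [1.0, 5.0], [1.0, 5.0])   # leaves both children mixed
print(good, bad)  # prints 4.0 0.0
```

A split that separates the targets cleanly removes all of the parent's variance, while one that leaves both children as mixed as the parent gains nothing, so the tree builder prefers the former.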

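**Helper classes**

GBDT itself is built from regression trees, but its continuous output can serve both regression and classification. The two tasks correspond to the squared loss and the logarithmic (cross-entropy) loss respectively.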

In [8]:
class SquareLoss(object):
    def __init__(self): pass
    
    def loss(self, y_true, y_pred):
        return 0.5 * np.power((y_true - y_pred),2)
    
    def gradient(self, y_true, y_pred):
        return -(y_true - y_pred)

In [9]:
class CrossEntropy(object):
    def __init__(self): pass
    
    def loss(self, y_true, p_pred):
        
        # Avoid log(0) and division by zero
        p_pred = np.clip(p_pred, 1e-10, 1 - 1e-10)
        
        return -y_true * np.log(p_pred) - (1-y_true)*np.log(1- p_pred)
    
    def gradient(self, y_true, p_pred):
        
        p_pred = np.clip(p_pred, 1e-10, 1 - 1e-10)
        
        return -(y_true / p_pred) + (1 - y_true) / (1 - p_pred)
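
As a sanity check, the gradient formula of the cross-entropy loss can be compared with a numerical derivative with respect to the predicted probability. The sketch below restates the two formulas as plain functions (the function names are chosen for this example) and omits the clipping, since the probabilities used here stay away from 0 and 1.

```python
import math


def cross_entropy(y, p):
    return -y * math.log(p) - (1 - y) * math.log(1 - p)


def cross_entropy_gradient(y, p):
    # Derivative of the loss with respect to the predicted probability p
    return -(y / p) + (1 - y) / (1 - p)


eps = 1e-6
checks = []
for y in (0.0, 1.0):
    for p in (0.2, 0.5, 0.9):
        # Central difference approximation of dL/dp
        numeric = (cross_entropy(y, p + eps) - cross_entropy(y, p - eps)) / (2 * eps)
        analytic = cross_entropy_gradient(y, p)
        checks.append((y, p, numeric, analytic))
        print(f"y={y} p={p} numeric={numeric:.4f} analytic={analytic:.4f}")
```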

**GBDT**

In [10]:
class GBDT(object):
    '''Parent class of the gradient boosting classifier and the gradient boosting regressor
    
    n_estimators:
        number of learners (trees)
    learning_rate:
        step size of each boosting iteration
    min_impurity:
        gain threshold; a tree stops growing when the gain falls below it
    min_split_num:
        a node with fewer samples than this value is not split further
    regression:
        whether the task is regression (True) or classification (False)
    '''
    def __init__(self, n_estimators=10, learning_rate=0.3, 
                 min_split_num=2, min_impurity=1e-5, regression=True):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.min_split_num = min_split_num
        self.min_impurity = min_impurity
        self.regression = regression
        
        # Choose the loss function
        self.loss = SquareLoss()
        if not regression:
            self.loss = CrossEntropy()
        
        # Initialise the learners
        self.trees = []
        for _ in range(n_estimators):
            tree = RegressDT(min_split_num = self.min_split_num, 
                             min_impurity = self.min_impurity)
            self.trees.append(tree)
    
    def fit(self, X_train, y_train):
        
        # Initialise the prediction with a constant (the mean of y) and keep it for predict
        self.init_pred = np.mean(y_train, axis=0)
        y_pred = np.full(np.shape(y_train), self.init_pred, dtype=float)
        
        for i in range(self.n_estimators):
            gradient = self.loss.gradient(y_train, y_pred)
            self.trees[i].fit(X_train, gradient)
            gain = self.trees[i].predict(X_train)
            
            # Move along the negative gradient
            y_pred -= np.multiply(self.learning_rate, gain)
    
    def predict(self, X_test):
        
        # Start from the same constant that fit started from
        y_pred = np.full((np.shape(X_test)[0],) + np.shape(self.init_pred),
                         self.init_pred, dtype=float)
        
        for tree in self.trees:
            
            gain = tree.predict(X_test)
            y_pred -= np.multiply(self.learning_rate, gain)
    
        if not self.regression:
            
            # Softmax turns the scores into class probabilities
            y_pred = np.exp(y_pred) / np.expand_dims(np.sum(np.exp(y_pred), axis=1), axis=1)
            
            # Predict the most probable class
            y_pred = np.argmax(y_pred, axis=1)
        
        return y_pred
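
The effect of `learning_rate` can be seen in a simplified setting. The sketch below assumes, purely for illustration, a single sample, the squared loss, and a tree that reproduces the gradient exactly; the numbers are made up. Under these assumptions each round shrinks the residual by a factor of `1 - learning_rate`.

```python
learning_rate = 0.3         # the default of GBDT.__init__
y_true, y_pred = 10.0, 4.0  # made-up target and initial prediction

residuals = [y_true - y_pred]
for _ in range(10):
    gradient = -(y_true - y_pred)       # same formula as SquareLoss.gradient
    y_pred -= learning_rate * gradient  # assumes the tree outputs the gradient exactly
    residuals.append(y_true - y_pred)

for k, r in enumerate(residuals):
    print(k, round(r, 4))
```

A smaller step size needs more rounds to reach the same training error, which is the usual trade-off between `learning_rate` and `n_estimators`.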

In [11]:
class GBDTRegressor(GBDT):
    def __init__(self, n_estimators = 25, learning_rate = 0.5, min_split_num = 2,
                min_var_red = 1e-7):
        super(GBDTRegressor, self).__init__(n_estimators=n_estimators,
                                            learning_rate=learning_rate,
                                            min_impurity = min_var_red,
                                            min_split_num = min_split_num,
                                            regression=True)

In [12]:
def to_categorical(x, n_col = None):
    '''One-hot encoding'''
    if not n_col:
        n_col = np.amax(x) + 1
    one_hot = np.zeros((x.shape[0], n_col))
    one_hot[np.arange(x.shape[0]), x] = 1
    return one_hot

class GBDTClassifier(GBDT):
    def __init__(self, n_estimators = 25, learning_rate = 0.5, min_split_num = 2,
                min_info_gain = 1e-7):
        super(GBDTClassifier, self).__init__(n_estimators = n_estimators,
                                            learning_rate = learning_rate,
                                            min_split_num = min_split_num,
                                            min_impurity = min_info_gain,
                                            regression = False)
    def fit(self, X_train, y_train):
        y_train = to_categorical(y_train)
        super(GBDTClassifier, self).fit(X_train, y_train)
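
The classifier's data flow (integer labels to one-hot targets for training, then per-class scores to softmax probabilities to `argmax` for prediction) can be traced on a toy example. The sketch below is pure Python with made-up scores; `one_hot` and `softmax` are hypothetical stand-ins for `to_categorical` and the softmax expression in `GBDT.predict`.

```python
import math


def one_hot(labels, n_classes):
    return [[1.0 if k == label else 0.0 for k in range(n_classes)] for label in labels]


def softmax(scores):
    m = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


labels = [0, 2, 1]
targets = one_hot(labels, 3)  # what the trees are trained against

# Made-up per-class scores, standing in for the summed tree outputs
scores = [[2.0, 0.1, -1.0], [0.0, 0.5, 3.0], [-0.2, 1.5, 0.3]]
probs = [softmax(row) for row in scores]
predicted = [row.index(max(row)) for row in probs]

print(targets)
print(predicted)
```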

---

Author: Daniel Meng

GitHub: [LibertyDream](https://github.com/LibertyDream)

Blog: [明月轩](https://libertydream.github.io/)