# 勾配ブースティング木アルゴリズム

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from sklearn import datasets
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from mh_tree_algorithm import DecisionTreeMH

1. foは、最初の決定木モデル（通常の決定木と同様の方法で生成する）

2. m = 1, 2, …, Mとして以下を繰り返す
   -  サンプルiごとに損失関数の勾配の計算
   - 損失関数の勾配をy(多分目的変数)として、決定木モデルを構築
   - 決定木モデルの葉ノードをRmj( jは、葉ノードの番号)とする
   - 葉ノードごとに、以下を最小化するmjを計算
      - 計算式は、資料を参照する。
   - 以下のようにモデルfmを計算
     - fm( xi) = fm( xi) + xiが含まれるRmjにおけるmj

## テストデータ生成用関数

In [3]:
def generate_data(N=1000, test_size=0.2, seed=123):
    np.random.seed(seed)
    X = np.linspace(0, 2 * np.pi, N)
    X = X.reshape(-1, 1)
    y = 10 * np.sin(X[:, 0]) + np.random.standard_normal(N)
     
    return train_test_split(X, y, test_size=test_size, random_state=seed)

## 決定木
GBDTを構成する弱学習器である決定木あるいは回帰木は、情報利得が最大となる特徴でデータを再帰的に分割するアルゴリズムである。

通常は、二分決定木となる。分割条件は、分類問題の場合はエントロピー、ジニ不純度、分類誤差などで、回帰問題の場合は、MSE(mean squared error)、LAD(least absolute deviation)などがある。

### Treeクラス

In [4]:
class Tree(object):
    def __init__(self, pre_pruning=False, max_depth=6):
        self.feature = None
        self.label = None
        self.n_samples = None
        self.gain = None
        self.left = None
        self.right = None
        self.threshold = None
        self.pre_pruning = pre_pruning
        self.max_depth = max_depth
        self.depth = 0
 
    def build(self, features, target, criterion='gini'):
        self.n_samples = features.shape[0]
 
        if len(np.unique(target)) == 1:
            self.label = target[0]
            return
 
        best_gain = 0.0
        best_feature = None
        best_threshold = None
 
        if criterion in {'gini', 'entropy', 'error'}:
            self.label = max(target, key=lambda c: len(target[target==c]))
        else:
            self.label = np.mean(target)
 
        impurity_node = self._calc_impurity(criterion, target)
 
        for col in range(features.shape[1]):
            feature_level = np.unique(features[:,col])
            thresholds = (feature_level[:-1] + feature_level[1:]) / 2.0
 
            for threshold in thresholds:
                target_l = target[features[:,col] <= threshold]
                impurity_l = self._calc_impurity(criterion, target_l)
                n_l = target_l.shape[0] / self.n_samples
 
                target_r = target[features[:,col] > threshold]
                impurity_r = self._calc_impurity(criterion, target_r)
                n_r = target_r.shape[0] / self.n_samples
 
                ig = impurity_node - (n_l * impurity_l + n_r * impurity_r)
 
                if ig > best_gain or best_threshold is None or best_feature is None:
                    best_gain = ig
                    best_feature = col
                    best_threshold = threshold
 
        self.feature = best_feature
        self.gain = best_gain
        self.threshold = best_threshold
        if self.pre_pruning is False or self.depth < self.max_depth:
            self._divide_tree(features, target, criterion)
        else:
            self.feature = None
 
    def _divide_tree(self, features, target, criterion):
        features_l = features[features[:, self.feature] <= self.threshold]
        target_l = target[features[:, self.feature] <= self.threshold]
        self.left = Tree(self.pre_pruning, self.max_depth)
        self.left.depth = self.depth + 1
        self.left.build(features_l, target_l, criterion)
 
        features_r = features[features[:, self.feature] > self.threshold]
        target_r = target[features[:, self.feature] > self.threshold]
        self.right = Tree(self.pre_pruning, self.max_depth)
        self.right.depth = self.depth + 1
        self.right.build(features_r, target_r, criterion)
 
 
    def _calc_impurity(self, criterion, target):
        c = np.unique(target)
        s = target.shape[0]
 
        if criterion == 'gini':
            return self._gini(target, c, s)
        elif criterion == 'entropy':
            return self._entropy(target, c, s)
        elif criterion == 'error':
            return self._classification_error(target, c, s)
        elif criterion == 'mse':
            return self._mse(target)
        else:
            return self._gini(target, c, s)
 
    def _gini(self, target, classes, n_samples):
        gini_index = 1.0
        gini_index -= sum([(len(target[target==c]) / n_samples) ** 2 for c in classes])
        return gini_index
 
    def _entropy(self, target, classes, n_samples):
        entropy = 0.0
        for c in classes:
            p = len(target[target==c]) / n_samples
            if p > 0.0:
                entropy -= p * np.log2(p)
        return entropy
 
    def _classification_error(self, target, classes, n_samples):
        return 1.0 - max([len(target[target==c]) / n_samples for c in classes])
 
    def _mse(self, target):
        y_hat = np.mean(target)
        return np.square(target - y_hat).mean()
 
    # 決定木の事後剪定
    def prune(self, method, max_depth, min_criterion, n_samples):
        if self.feature is None:
            return
 
        self.left.prune(method, max_depth, min_criterion, n_samples)
        self.right.prune(method, max_depth, min_criterion, n_samples)
 
        pruning = False
 
        if method == 'impurity' and self.left.feature is None and self.right.feature is None: # Leaf
            if (self.gain * self.n_samples / n_samples) < min_criterion:
                pruning = True
        elif method == 'depth' and self.depth >= max_depth:
            pruning = True
 
        if pruning is True:
            self.left = None
            self.right = None
            self.feature = None
 
    def predict(self, d):
        if self.feature is None: # Leaf
            return self.label
        else: # Node
            if d[self.feature] <= self.threshold:
                return self.left.predict(d)
            else:
                return self.right.predict(d)

### 回帰木

In [5]:
class DecisionTreeRegressor(object):
    def __init__(self, criterion='mse', pre_pruning=False, pruning_method='depth', max_depth=3, min_criterion=0.05):
        self.root = None
        self.criterion = criterion
        self.pre_pruning = pre_pruning
        self.pruning_method = pruning_method
        self.max_depth = max_depth
        self.min_criterion = min_criterion
 
    def fit(self, features, target):
        self.root = Tree(self.pre_pruning, self.max_depth)
        self.root.build(features, target, self.criterion)
        if self.pre_pruning is False: # post-pruning
            self.root.prune(self.pruning_method, self.max_depth, self.min_criterion, self.root.n_samples)
 
    def predict(self, features):
        return np.array([self.root.predict(f) for f in features])

### 動作確認

In [6]:
X_train, X_test, y_train, y_test = generate_data()
regressor = DecisionTreeRegressor(criterion='mse', pre_pruning=True, pruning_method='depth', max_depth=3)
regressor.fit(X_train, y_train)

In [7]:
def mse(y, pred):
     return np.square(y - pred).mean()

print('MSE of the Train: %.2f, MSE of the Test: %.2f' % (mse(y_train, regressor.predict(X_train)), mse(y_test, regressor.predict(X_test))))

MSE of the Train: 2.92, MSE of the Test: 2.86


## 勾配ブースティング木

In [8]:
X_train, X_test, y_train, y_test = generate_data()
 
M = 10
predictions_history = [np.repeat(y_train.mean(), len(y_train))]
test_predictions_history = [np.repeat(y_test.mean(), len(y_test))]
 
# LS_TreeBoost (Algorithm-2)
for m in range(M):
    y_tilde = y_train - predictions_history[-1]
    base_learner = DecisionTreeRegressor(criterion='mse', pre_pruning=True, pruning_method='depth', max_depth=3)
    base_learner.fit(X_train, y_tilde)
 
    prediction = predictions_history[-1] + base_learner.predict(X_train)
    test_prediction = test_predictions_history[-1] + base_learner.predict(X_test)
 
    train_mse = mse(y_train, prediction)
    test_mse = mse(y_test, test_prediction)
      
    predictions_history.append(prediction)
    test_predictions_history.append(test_prediction)
     
    print("[%d] column: %d, threshold: %f, mse-of-train: %.2f, mse-of-test: %.2f\n" %
                  (m+1, base_learner.root.feature, base_learner.root.threshold,
                   train_mse, test_mse) + "-" * 50)

[1] column: 0, threshold: 3.179330, mse-of-train: 2.92, mse-of-test: 2.84
--------------------------------------------------
[2] column: 0, threshold: 0.210697, mse-of-train: 2.29, mse-of-test: 2.43
--------------------------------------------------
[3] column: 0, threshold: 5.430961, mse-of-train: 1.44, mse-of-test: 1.69
--------------------------------------------------
[4] column: 0, threshold: 0.688697, mse-of-train: 1.20, mse-of-test: 1.50
--------------------------------------------------
[5] column: 0, threshold: 2.638435, mse-of-train: 1.08, mse-of-test: 1.55
--------------------------------------------------
[6] column: 0, threshold: 3.915198, mse-of-train: 0.92, mse-of-test: 1.40
--------------------------------------------------
[7] column: 0, threshold: 6.251738, mse-of-train: 0.89, mse-of-test: 1.41
--------------------------------------------------
[8] column: 0, threshold: 0.015724, mse-of-train: 0.87, mse-of-test: 1.42
--------------------------------------------------
