In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import loadmat
from scipy.sparse import coo_matrix
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import copy
import pandas as pd
from tqdm import tqdm
from typing import Counter
import math
from sklearn.decomposition import PCA
from sklearn.metrics import f1_score, recall_score, precision_score
import sys
import time


def load_data():
    data = loadmat("data/mnist_all.mat")

    # print(data.keys())

    train_data = pd.DataFrame()
    test_data = pd.DataFrame()

    for i in range(10):
        temp_df = pd.DataFrame(data["train" + str(i)])
        temp_df['label'] = i
        train_data = train_data.append(temp_df)
        temp_df = pd.DataFrame(data["test" + str(i)])
        temp_df['label'] = i
        test_data = test_data.append(temp_df)

    train_data = shuffle(train_data)
    test_data = shuffle(test_data)

    train_labels = np.array(train_data['label'])
    test_labels = np.array(test_data['label'])

    train_data = train_data.drop('label', axis=1)
    test_data = test_data.drop('label', axis=1)
    
    train_data = np.array(train_data) / 255
    test_data = np.array(test_data) / 255
    
    pca = PCA(0.95)
    pca.fit(train_data)
    train_data = pca.transform(train_data)
    test_data = pca.transform(test_data)

    return train_data, test_data, train_labels, test_labels


X_train, X_test, y_train, y_test = load_data()

X_train, X_valid, y_train, y_valid = train_test_split(
    X_train, y_train, test_size=0.2, random_state=0)

In [2]:
import warnings
warnings.filterwarnings("ignore", category=Warning)

In [3]:
# 注意，经过PCA降维后，认为所有的特征都是连续值
X = np.concatenate((X_train, X_valid), axis=0)
y = np.concatenate((y_train, y_valid), axis=0)

In [4]:
print(X.shape, y.shape)

(60000, 154) (60000,)


In [5]:
from xgboost.sklearn import XGBClassifier
standard_xgboost = XGBClassifier(n_estimators=2, learning_rate=0.1, max_depth=3)
standard_xgboost.fit(X[:100], y[:100])

XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
              colsample_bynode=1, colsample_bytree=1, gamma=0, gpu_id=-1,
              importance_type='gain', interaction_constraints='',
              learning_rate=0.1, max_delta_step=0, max_depth=3,
              min_child_weight=1, missing=nan, monotone_constraints='()',
              n_estimators=2, n_jobs=0, num_parallel_tree=1,
              objective='multi:softprob', random_state=0, reg_alpha=0,
              reg_lambda=1, scale_pos_weight=None, subsample=1,
              tree_method='exact', validate_parameters=1, verbosity=None)

In [6]:
predict_standard = standard_xgboost.predict(X_test)
print('standard xgboost test acc: {}'.format((sum(predict_standard == np.array(y_test)))/len(X_test)))

standard xgboost test acc: 0.4237


In [7]:
# 未使用min_split_loss进行预剪枝
class XGBoostRegressionTree:
    def __init__(self, max_depth=6, 
                 min_child_weight=1, reg_lambda=1, reg_alpha=0):
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        
        
    def fit(self, x, grad, hess):
        self.tree = self.build_tree(x, grad, hess, 0)
    
    
    def build_tree(self, x, grad, hess, depth):
        
        # 当前节点是叶子节点
        if depth >= self.max_depth:
            return self.get_leaf_value(grad, hess)
        
        best_feature_index, best_threshold = self.choose_best_feature_to_split(x, grad, hess)
        
        # 当前节点是叶子节点
        if best_feature_index is None or best_threshold is None:
            return self.get_leaf_value(grad, hess)
                
        left_mask = x[:, best_feature_index] <= best_threshold
        right_mask = x[:, best_feature_index] > best_threshold
        
        tree = {}
        
        tree[(best_feature_index, best_threshold, '<=')] = self.build_tree(x[left_mask], grad[left_mask], hess[left_mask], depth + 1)
        tree[(best_feature_index, best_threshold, '>')] = self.build_tree(x[right_mask], grad[right_mask], hess[right_mask], depth + 1)
        
        return tree
    
    
    def get_leaf_value(self, grad, hess):
        G = np.sum(grad)
        H = np.sum(hess)
        
        return -G/(H + self.reg_lambda)

        
    def predict_value(self, x):
        tree = self.tree
        
        while type(tree).__name__ == 'dict':
            
            for key in tree.keys():
                
                if key[2] == '<=':
                    key1 = key
                elif key[2] == '>':
                    key2 = key
            
            feature_index = key1[0]
            threshold = key1[1]

            
            if x[feature_index] <= threshold:
                tree = tree[key1]
            elif x[feature_index] > threshold:
                tree = tree[key2]
        
        if type(tree).__name__ != 'dict':
            return tree
        else:
            pass
    
    
    def predict(self, X):
        return np.array([self.predict_value(x) for x in X])
    
    
    def calculate_gain(self, grad, hess, mask=None):
        if mask is None:
            G = np.sum(grad)
            G *= G
            H = np.sum(hess)
        else:
            G = np.sum(grad[mask])
            G *= G
            H = np.sum(hess[mask])
        return G/(H + self.reg_lambda)
    
    
    def choose_best_feature_to_split(self, x, grad, hess):
        n_features = x.shape[1]
        
        total_gain = self.calculate_gain(grad, hess)
        
        best_gain = -sys.maxsize
        best_feature_index = None
        best_threshold = None
        
        for feature_index in range(n_features):
            values = np.array(list(set(x[:, feature_index])))
            
            for value in values:
                left_mask = x[:, feature_index]<=value
                right_mask = x[:, feature_index]>value
                
                left_gain = self.calculate_gain(grad, hess, left_mask)
                right_gain = self.calculate_gain(grad, hess, right_mask)
                
                gain = (left_gain + right_gain - total_gain)/2 - self.reg_alpha
                
                if gain < 0 or self.not_valid_split(hess, left_mask) or self.not_valid_split(hess, right_mask):
                    continue
                
                if gain > best_gain:
                    best_gain = gain
                    best_feature_index = feature_index
                    best_threshold = value
               
        # todo        
        if best_threshold is None or best_feature_index is None:
            pass
        
        return best_feature_index, best_threshold
    
    
    def not_valid_split(self, hess, mask):
        if np.sum(hess[mask]) < self.min_child_weight:
            return True
        
        return False

In [8]:
# 未使用col_sample_ratio和row_sample_ratio 因为和随机森林的思想类似，很容易理解
class XGBoost:
    def __init__(self, n_estimators, n_classes, learning_rate=0.3, row_sample_ratio=1, col_sample_ratio=1,
                 reg_lambda=1, reg_alpha=0, max_depth=6):
        self.row_sample_ratio = row_sample_ratio
        self.col_sample_ratio = col_sample_ratio
        self.reg_lambda = reg_lambda
        self.reg_alpha = reg_alpha
        self.max_depth = max_depth
        self.n_estimators = n_estimators
        self.n_classes = n_classes
        self.learning_rate = learning_rate
        
        self.trees = {}
    
    
    def fit(self, x, y):
        
        y = np.array([self.to_one_hot(_y, self.n_classes) for _y in y])
        
        y_predict = np.full(y.shape, 0.5)
        
        for i in tqdm(range(self.n_estimators)):
            self.trees[i] = {}
            
            grad = np.array([self.grad(y[j], y_predict[j]) for j in range(len(y))])
            hess = np.array([self.hess(y_predict[j]) for j in range(len(y_predict))])
            
            for c in range(self.n_classes):           
                
                tree = XGBoostRegressionTree(max_depth=self.max_depth, 
                                             min_child_weight=1,
                                             reg_lambda=self.reg_lambda,
                                             reg_alpha=self.reg_alpha)
                
                tree.fit(x, grad[:, c], hess[:, c])
                
                self.trees[i][c] = tree
                
                # update y_predict
                # 这个 y_predict 就是 GBDT 中的 F
                for j in range(len(x)):
                    y_predict[j][c] += self.learning_rate * self.trees[i][c].predict_value(x[j])
    
    
    def predict_value(self, x):
        p = [0]*self.n_classes
        for m in self.trees:
            for k in range(self.n_classes):
                p[k] += self.learning_rate * self.trees[m][k].predict_value(x)
        
        return np.argmax(self.softmax(p))
    
    
    def predict(self, X):
        return np.array([self.predict_value(x) for x in X])
    
    
    # get first order gradient for single sample
    # https://zhuanlan.zhihu.com/p/149419189
    # 参考上面这篇文章，求一阶导时，i不等于j 和 i等于j 时 两者的形式虽然不同，
    # 但是真实标签是one-hot（只有一个1），所以恰好可以表示为 y_probability - y
    # 或许也可以直接由sigmoid的导数直接强推过来
    def grad(self, y, y_predict):
        y_probability = self.softmax(y_predict)
        return y_probability - y
    
    
    # get second order gradient for single sample
    # 注意 y_predict 和 y_probability 的关系，可以理解为softmax的过程在交叉熵损失函数里面
    # 这样交叉熵损失函数对y_predict求导就要涉及到softmax求导
    def hess(self, y_predict):
        y_probability = self.softmax(y_predict)
        return y_probability * (1 - y_probability)
    
    
    # convert label y to one-hot vector for single sample
    def to_one_hot(self, y, n_classes):
        one_hot = np.zeros(n_classes)
        one_hot[y] = 1
        
        return one_hot
    
    
    # convert y_predict to softmax probability for single sample
    def softmax(self, a):
        c = np.max(a)
        exp_a = np.exp(a - c)
        sum_exp_a = np.sum(exp_a)
        p = exp_a / sum_exp_a
        return p

In [9]:
custom_xgboost = XGBoost(n_estimators=2, learning_rate=0.1, n_classes=10, max_depth=3)
print('start training custom xgboost...')
start = time.time()
custom_xgboost.fit(X[:100], y[:100])
end = time.time()
print('finish training... time cost: {}s'.format(end-start))

  0%|                                                                                            | 0/2 [00:00<?, ?it/s]

start training custom xgboost...


100%|████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:39<00:00, 19.72s/it]

finish training... time cost: 39.445207834243774s





In [10]:
predict_custom = custom_xgboost.predict(X_test)

In [11]:
print('custom xgboost test acc: {}'.format((sum(predict_custom == np.array(y_test)))/len(X_test)))

custom xgboost test acc: 0.4024
