In [1]:
import numpy as np
import time
from sklearn.metrics import accuracy_score

In [2]:
class Tree:
    """Regression tree fitted on residuals, used as one boosting round.

    An internal node is a dict with the keys 'split_feature_idx',
    'split_value', 'left_child' and 'right_child'; a leaf is the plain
    number returned by ``compute_output``.

    Parameters
    ----------
    max_depth : int
        Maximum number of splits on any root-to-leaf path.
    min_samples : int
        Minimum number of samples each child of a split must receive. A node
        holding at most this many samples is not split further.
    min_child_weight : float
        Minimum cover (sum of ``p * (1 - p)``) required in each child.
    lambda_ : float
        Value added to the denominator of the similarity and of the leaf output.
    gamma : float
        Minimum gain a split must reach to be kept.
    """

    def __init__(self, max_depth = 3, min_samples = 1, min_child_weight = 1, lambda_ = 0, gamma = 0):
        self.max_depth = max_depth
        self.min_samples = min_samples
        self.min_child_weight = min_child_weight
        self.lambda_ = lambda_
        self.gamma = gamma
        self.tree = {}
        # Cumulative seconds spent inside find_best_split (profiling aid).
        self.fbs_time = 0

    def similarity(self, residual, probs):
        """Return ``sum(residual)**2 / (sum(p * (1 - p)) + lambda_)``."""
        nu = np.sum(residual) ** 2
        de = np.sum(probs * (1 - probs)) + self.lambda_
        return nu / de

    def compute_output(self, residual, probs):
        """Return the leaf value ``sum(residual) / (sum(p * (1 - p)) + lambda_)``."""
        nu = np.sum(residual)
        de = np.sum(probs * (1 - probs)) + self.lambda_
        return nu / de

    def cover(self, probs):
        """Return the cover of a node: ``sum(p * (1 - p))``."""
        return np.sum(probs * (1 - probs))

    def split_data(self, X, feature_idx, split_value):
        """Return boolean row masks (left, right) for ``X[:, feature_idx] <= split_value``.

        Both masks have ``len(X)`` entries; use a count of the True entries,
        not ``len``, to get the number of rows on each side.
        """
        left_idx = X[:, feature_idx] <= split_value
        right_idx = X[:, feature_idx] > split_value
        return left_idx, right_idx

    def find_best_split(self, X, residual, probs):
        """Search every feature for the threshold with the largest gain.

        Candidate thresholds are the midpoints between consecutive unique
        values of a feature. A candidate is skipped when either child would
        hold fewer than ``min_samples`` rows or have a cover below
        ``min_child_weight``.

        Returns
        -------
        (feature_idx, split_value), or (None, None) when no candidate is
        admissible or the best gain is smaller than ``gamma``.
        """
        best_gain = -np.inf
        best_split_feature_idx = None
        best_split_value = None

        # The parent's similarity is the same for every candidate split.
        parent_similarity = self.similarity(residual, probs)

        for feature_idx in range(X.shape[1]):
            list_unique = np.unique(X[:, feature_idx])

            for i in range(len(list_unique) - 1):
                value = (list_unique[i] + list_unique[i + 1]) / 2
                left_idx, right_idx = self.split_data(X, feature_idx, value)

                # The masks always have len(X) entries, so the child sizes are
                # the number of True values, not the mask length.
                if (np.count_nonzero(left_idx) < self.min_samples
                        or np.count_nonzero(right_idx) < self.min_samples):
                    continue

                p_left = probs[left_idx]
                p_right = probs[right_idx]
                if (self.cover(p_left) < self.min_child_weight
                        or self.cover(p_right) < self.min_child_weight):
                    continue

                r_left = residual[left_idx]
                r_right = residual[right_idx]

                gain = self.similarity(r_left, p_left) + self.similarity(r_right, p_right) - parent_similarity

                if gain > best_gain:
                    best_gain = gain
                    best_split_feature_idx = feature_idx
                    best_split_value = value

        # Prune: a split whose gain does not reach gamma is not worth keeping.
        if best_gain - self.gamma < 0:
            best_split_feature_idx = None
            best_split_value = None

        return best_split_feature_idx, best_split_value

    def build_tree(self, X, residual, probs, depth):
        """Recursively build and return the (sub)tree for the given rows.

        Returns a leaf value when the depth limit is reached, the node holds
        at most ``min_samples`` rows, or no admissible split exists; otherwise
        returns a node dict. The node is only returned: ``self.tree`` is set
        by ``fit``, not here.
        """
        if depth >= self.max_depth or len(X) <= self.min_samples:
            return self.compute_output(residual, probs)

        start = time.perf_counter()
        split_feature_idx, split_value = self.find_best_split(X, residual, probs)
        self.fbs_time += time.perf_counter() - start

        if split_feature_idx is None:
            return self.compute_output(residual, probs)

        left_idx, right_idx = self.split_data(X, split_feature_idx, split_value)
        left = self.build_tree(X[left_idx], residual[left_idx], probs[left_idx], depth + 1)
        right = self.build_tree(X[right_idx], residual[right_idx], probs[right_idx], depth + 1)

        return {
            'split_feature_idx': split_feature_idx,
            'split_value': split_value,
            'left_child': left,
            'right_child': right
        }

    def get_output(self, x, tree):
        """Walk ``tree`` for the single sample ``x`` and return the leaf value."""
        if isinstance(tree, dict):
            split_feature_idx = tree['split_feature_idx']
            split_value = tree['split_value']
            if x[split_feature_idx] <= split_value:
                return self.get_output(x, tree['left_child'])
            else:
                return self.get_output(x, tree['right_child'])
        else:
            return tree

    def fit(self, X, residual, probs):
        """Build the tree from the root (depth 0) and store it in ``self.tree``."""
        depth = 0
        self.tree = self.build_tree(X, residual, probs, depth)

    def predict(self, X):
        """Return an array with the leaf value reached by every row of ``X``."""
        return np.array([self.get_output(x, self.tree) for x in X])

In [3]:
class XGBoost:
    """Boosted ensemble of ``Tree`` models for binary classification.

    Every round fits a ``Tree`` on the residuals ``y - p`` and adds
    ``lr * tree_output`` to the current log-odds.

    Parameters
    ----------
    n_estimators : int
        Number of boosting rounds (trees).
    lr : float
        Learning rate applied to every tree's output.
    lambda_, gamma, min_child_weight, max_depth
        Forwarded unchanged to every ``Tree``.

    The ``*_time`` attributes accumulate the seconds spent in the individual
    steps of ``fit`` and are cumulative across calls.
    """

    # Probabilities are clipped to [_EPS, 1 - _EPS] before taking log-odds so
    # that saturated predictions (exactly 0 or 1) do not yield +/-inf.
    _EPS = 1e-15

    def __init__(self, n_estimators, lr, lambda_ = 1e-7, gamma = 0, min_child_weight = 1, max_depth = 3):
        self.n_estimators = n_estimators
        self.lr = lr
        self.initial_pred = 0.5
        self.lambda_ = lambda_
        self.min_child_weight = min_child_weight
        self.max_depth = max_depth
        self.gamma = gamma
        self.models = []
        self.fbs_time = 0
        self.logodds_time = 0
        self.residual_time = 0
        self.logodds_predict_time = 0
        self.pred_time = 0

    def compute_logodds(self, p):
        """Return ``log(p / (1 - p))`` with ``p`` clipped away from 0 and 1."""
        p = np.clip(p, self._EPS, 1 - self._EPS)
        return np.log(p / (1 - p))

    @staticmethod
    def _sigmoid(log_odds):
        """Map log-odds to probabilities without overflowing.

        ``exp(z) / (1 + exp(z))`` evaluates to inf / inf = NaN for large ``z``;
        ``exp(-logaddexp(0, -z))`` is the same function, ``1 / (1 + exp(-z))``,
        computed stably.
        """
        log_odds = np.asarray(log_odds, dtype=float)
        return np.exp(-np.logaddexp(0.0, -log_odds))

    def residual(self, y_true, y_pred):
        """Return ``y_true - y_pred``."""
        return (y_true - y_pred)

    def fit(self, X, y):
        """Fit ``n_estimators`` trees on ``X`` with binary targets ``y``.

        Any previously fitted trees are discarded, because training always
        restarts from ``initial_pred``.
        """
        self.models = []
        p = np.full(len(y), self.initial_pred)

        for _ in range(self.n_estimators):
            probs = np.copy(p)
            start = time.perf_counter()
            residual = self.residual(y, p)
            self.residual_time += time.perf_counter() - start

            model = Tree(lambda_ = self.lambda_, gamma = self.gamma, max_depth = self.max_depth, min_child_weight = self.min_child_weight)
            model.fit(X, residual, probs)
            self.fbs_time += model.fbs_time

            start = time.perf_counter()
            log_odds = self.compute_logodds(p)
            self.logodds_time += time.perf_counter() - start

            start = time.perf_counter()
            logodds_p = log_odds + self.lr * model.predict(X)
            self.logodds_predict_time += time.perf_counter() - start

            start = time.perf_counter()
            p = self._sigmoid(logodds_p)
            self.pred_time += time.perf_counter() - start

            self.models.append(model)

    def predict_proba(self, X):
        """Return the predicted probability of the positive class for every row of ``X``."""
        pred = np.full(len(X), self.initial_pred)
        for model in self.models:
            logodds_p = self.compute_logodds(pred) + self.lr * model.predict(X)
            pred = self._sigmoid(logodds_p)
        return pred

In [19]:
# Load the train / test arrays. The NpzFile handles are closed as soon as the
# arrays have been read.
# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can
# execute arbitrary code -- only use it on trusted files, and drop it if the
# stored arrays are plain numeric.
with np.load('train_data.npz', allow_pickle = True) as train:
    X_train = train['data']
    y_train = train['label']

with np.load('test_data.npz', allow_pickle = True) as test:
    X_test = test['data']
    y_test = test['label']

# Small subsets (first 3000 train rows / first 600 test rows) for quick runs.
minimal_X_train = X_train[:3000]
minimal_y_train = y_train[:3000]
minimal_X_test = X_test[:600]
minimal_y_test = y_test[:600]

In [20]:
# Binary task: 1 where the label equals 0, otherwise 0.
binary_y_train = (minimal_y_train == 0).astype(int)
binary_y_test = (minimal_y_test == 0).astype(int)

xgb_model = XGBoost(n_estimators = 3, lr = 0.3)
xgb_model.fit(minimal_X_train, binary_y_train)

# Threshold the predicted probabilities at 0.5 to get hard 0/1 predictions.
y_prob_pred = xgb_model.predict_proba(minimal_X_test)
binary_y_pred = (y_prob_pred > 0.5).astype(int)
accuracy_score(binary_y_test, binary_y_pred)

0.9566666666666667

In [None]:
''' Reading the data
train = np.load('<file name>.npz',allow_pickle=True)
X_train = train['data']
y_train = train['label']

test = np.load('<file name>.npz',allow_pickle=True)
X_test = test['data']
y_test = test['label']
'''

In [None]:
''' Pseudocode for MultiClassifier
class Multi:
    __init__():
        self.models = []
        self.time = 0

    fit():
        for y_i in labels:
            y_2labels = (y == y_i).astype(int)
            model = XGBoost(...)
            model.fit(X, y_2labels)
            self.models.append(model)
            Measure the elapsed time + store it

    predict():
        preds = []
        for model in self.models:
            preds.append(model.predict_proba(X_test))
        y_pred = np.argmax(preds, axis = 0)

        return y_pred

    show_time():  -> shows which function takes the most time & should be parallelized -> optional
        ....
'''

In [24]:
class MultiClassifier:
    """One-vs-rest multi-class classifier built from binary ``XGBoost`` models.

    Parameters
    ----------
    n_estimators : int
        Number of boosting rounds of every binary model (default 3).
    lr : float
        Learning rate of every binary model (default 0.3).

    Attributes
    ----------
    models : list
        One fitted binary model per class, in the order of ``classes_``.
    classes_ : numpy.ndarray
        Sorted unique labels seen by the last ``fit`` call.
    training_time : float
        Seconds spent in ``fit``, cumulative across calls.
    """

    def __init__(self, n_estimators = 3, lr = 0.3):
        self.n_estimators = n_estimators
        self.lr = lr
        self.models = []
        self.classes_ = np.array([])
        self.training_time = 0

    def fit(self, X, y):
        """Train one "this class vs. the rest" binary model per unique label in ``y``."""
        start_time = time.perf_counter()
        y = np.asarray(y)
        self.classes_ = np.unique(y)
        # Start from scratch so that a refit does not keep stale models around.
        self.models = []
        for label in self.classes_:
            binary_labels = (y == label).astype(int)
            model = XGBoost(self.n_estimators, self.lr)
            model.fit(X, binary_labels)
            self.models.append(model)
        self.training_time += time.perf_counter() - start_time

    def predict(self, X):
        """Return, for every row of ``X``, the label whose binary model is most confident.

        Raises
        ------
        ValueError
            If no model has been fitted yet.
        """
        if not self.models:
            raise ValueError("MultiClassifier.predict called before fit")

        scores = np.array([model.predict_proba(X) for model in self.models])
        best = np.argmax(scores, axis = 0)

        # Map argmax positions back to the real labels. When ``models`` was
        # filled in by hand (no matching ``classes_``), fall back to positions.
        if len(self.classes_) != len(self.models):
            return best
        return self.classes_[best]


In [25]:
# Load the 3-label train / test arrays. The NpzFile handles are closed as soon
# as the arrays have been read.
# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can
# execute arbitrary code -- only use it on trusted files.
with np.load('train_data_3labels.npz', allow_pickle = True) as train_3labels:
    X_train_3labels = train_3labels['data']
    y_train_3labels = train_3labels['label']

with np.load('test_data_3labels.npz', allow_pickle = True) as test_3labels:
    X_test_3labels = test_3labels['data']
    y_test_3labels = test_3labels['label']

# NOTE(review): these four slices are taken from X_train / y_train / X_test /
# y_test (arrays defined by an earlier cell), not from the *_3labels arrays
# loaded above. This looks like a copy-paste leftover -- confirm whether the
# 3-label arrays were meant to be sliced here.
minimal_X_train = X_train[:3000]
minimal_y_train = y_train[:3000]
minimal_X_test = X_test[:600]
minimal_y_test = y_test[:600]


In [26]:
# Train the one-vs-rest classifier on the full 3-label training set.
multi_classifier = MultiClassifier()
multi_classifier.fit(X = X_train_3labels, y = y_train_3labels)

# Accuracy on the 3-label test set (displayed as the cell output).
y_pred = multi_classifier.predict(X = X_test_3labels)
accuracy_score(y_true = y_test_3labels, y_pred = y_pred)

0.9566666666666667