In [1]:
import numpy as np

In [2]:
class BasicTree:
    
    def __init__(self):
        raise NotImplementedError("Метод требует переопределения в классе наследования")
    
    @staticmethod
    def find_best_split_entropy(X, y):
        """
        Находит лучшее разбиение для вектора признаков X и вектора целевой переменной y, используя критерий энтропии.
        Аргументы:
        - X: вектор numpy с вещественными значениями признаков.
        - y: вектор numpy с дискретными значениями целевой переменной.

        Возвращает:
        - best_feature: индекс признака, по которому было найдено лучшее разбиение.
        - best_threshold: значение порога, по которому было найдено лучшее разбиение.
        - best_gain: значение критерия энтропии для лучшего разбиения.
        """
        def entropy(y):
            """
            Вычисляет энтропию вектора y со значениями дискретных переменных.
            Аргументы:
            - y: вектор numpy с дискретными значениями.
            Возвращает:
            - entropy: значение энтропии типа float.
            """
            # Подсчитываем количество каждого уникального значения в y.
            _, counts = np.unique(y, return_counts=True)
            # Вычисляем вероятность каждого уникального значения.
            probs = counts / len(y)
            # Вычисляем значение энтропии.
            return -np.sum(probs * np.log2(probs))
        
        best_feature, best_threshold, best_gain = None, None, 0
        # Итерируемся по всем признакам.
        for feature in range(X.shape[1]):
            # Находим уникальные значения признака.
            thresholds = np.unique(X[:, feature])
            # Итерируемся по всем возможным пороговым значениям признака.
            for threshold in thresholds:
                # Определяем индексы объектов, которые относятся к левому поддереву и правому поддереву.
                left_indices = X[:, feature] <= threshold
                right_indices = X[:, feature] > threshold
                # Пропускаем текущую итерацию, если не найдены объекты, которые относятся к левому или правому поддереву.
                if len(left_indices) == 0 or len(right_indices) == 0:
                    continue
                # Определяем вектор целевой переменной для объектов, которые относятся к левому и правому поддереву.
                left_y, right_y = y[left_indices], y[right_indices]
                # Вычисляем значение критерия энтропии для текущего разбиения.
                gain = entropy(y) - (len(left_y) / len(y)) * entropy(left_y) \
                                       - (len(right_y) / len(y)) * entropy(right_y)
                # Обновляем значения лучшего разбиения, если найдено разбиение с большим значением
                if gain > best_gain:
                    best_feature, best_threshold, best_gain = feature, threshold, gain
        return best_feature, best_threshold, best_gain
    
        
    @staticmethod   
    def find_best_split_mse(X, y):
        """
        Находит лучшее разбиение для вектора признаков X и вектора целевой переменной y, 
        используя критерий среднеквадратичной ошибки (MSE).

        Аргументы:
        - X: вектор numpy с вещественными значениями признаков.
        - y: вектор numpy с вещественными значениями целевой переменной.

        Возвращает:
        - best_feature: индекс признака, по которому было найдено лучшее разбиение.
        - best_threshold: значение порога, по которому было найдено лучшее разбиение.
        - best_mse: значение критерия среднеквадратичной ошибки для лучшего разбиения.
        """
        best_feature, best_threshold, best_mse = None, None, float('inf')
        for feature in range(X.shape[1]):
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_indices = X[:, feature] <= threshold
                right_indices = X[:, feature] > threshold
                if len(left_indices) == 0 or len(right_indices) == 0:
                    continue
                left_y, right_y = y[left_indices], y[right_indices]
                mse = np.mean((left_y - np.mean(left_y))**2) + np.mean((right_y - np.mean(right_y))**2)
                if mse < best_mse:
                    best_feature, best_threshold, best_mse = feature, threshold, mse
        return best_feature, best_threshold, best_mse

    def fit(self, X, y, y_pred=None):
        """
        Обучает дерево регрессии на обучающих данных X и y.

        Аргументы:
        - X: вектор numpy с вещественными значениями признаков.
        - y: вектор numpy с вещественными значениями целевой переменной.
        - y_pred (опционально): вектор numpy с вещественными значениями предсказаний (для Gradient boosting)
        """
        if y_pred is not None:
            self.tree = self._build_tree(X, y, depth=0, y_pred=y_pred)
        else:
            self.tree = self._build_tree(X, y, depth=0)

    def predict(self, X):
        """
        Выполняет предсказание для входных данных X.

        Аргументы:
        - X: вектор numpy с вещественными значениями признаков.

        Возвращает:
        - predictions: вектор numpy с предсказанными вещественными значениями.
        """
        return np.array([self._traverse_tree(x, self.tree) for x in X])

    def _build_tree(self, X, y, depth, y_pred=None):
        """
        Рекурсивно строит дерево регрессии, используя входные данные X и y.

        Аргументы:
        - X: вектор numpy с вещественными значениями признаков.
        - y: вектор numpy с вещественными значениями целевой переменной.
        - y_pred (опционально): вектор numpy с вещественными значениями предсказаний (для Gradient boosting)
        - depth: текущая глубина дерева.

        Возвращает:
        - node: словарь, представляющий узел дерева.

        """
        # Проверка условий останова по максимальной глубине и другим критериям
        if self.max_depth is not None and depth >= self.max_depth:
            # Создание листового узла
            return self._create_leaf_node(y, y_pred)
        
        # Нахождение лучшего разбиения по критерию энтропии
        if self.criterion == "entropy":
            best_feature, best_threshold, _ = self.find_best_split_entropy(X, y)

        # Нахождение лучшего разбиения по критерию mse
        elif self.criterion == "mse":
            best_feature, best_threshold, _ = self.find_best_split_mse(X, y)
            
        else:
            raise Exception('Следует задать критерий разбиения из списка ["mse", "entropy"]')

        # Проверка условия останова, если не удалось найти лучшее разбиение
        if best_feature is None or best_threshold is None:
            # Создание листового узла
            if y_pred is not None:
                return self._create_leaf_node(y, y_pred)
            return self._create_leaf_node(y)

        # Разделение данных на левое и правое поддеревья
        left_indices = X[:, best_feature] <= best_threshold
        right_indices = X[:, best_feature] > best_threshold

        # Рекурсивное построение левого и правого поддеревьев
        if y_pred is not None:
            left_tree = self._build_tree(X[left_indices], y[left_indices], y_pred[left_indices], depth + 1)
            right_tree = self._build_tree(X[right_indices], y[right_indices], y_pred[right_indices], depth + 1)
        else:
            left_tree = self._build_tree(X[left_indices], y[left_indices], depth + 1)
            right_tree = self._build_tree(X[right_indices], y[right_indices], depth + 1)

        # Создание узла с информацией о лучшем разбиении
        node = {'feature': best_feature, 'threshold': best_threshold,
                'left': left_tree, 'right': right_tree}

        return node


    def _create_leaf_node(self, y, y_pred=None):
        raise NotImplementedError("Метод требует переопределения в классе наследования")


    def _traverse_tree(self, x, node):
        """
        Обходит дерево регрессии для выполнения предсказания на входных данных x.

        Аргументы:
        - x: вектор numpy с вещественными значениями признаков.
        - node: текущий узел дерева.

        Возвращает:
        - value: предсказанное вещественное значение.
        """
        if 'value' in node:
            return node['value']

        if x[node['feature']] <= node['threshold']:
            return self._traverse_tree(x, node['left'])
        else:
            return self._traverse_tree(x, node['right'])


In [3]:
class RegressionTree(BasicTree):
    def __init__(self, max_depth=None, criterion="mse"):
        """
        Инициализирует объект RegressionTree.

        Аргументы:
        - max_depth: максимальная глубина дерева (опционально). 
        Если значение None, то дерево будет строиться без ограничения глубины.
        - criterion: выбор способа разбиений деревьев. Выбирается из списка: 
        ["mse", "entropy", "gini"]
        """
        self.max_depth = max_depth
        self.criterion = criterion
        self.tree = None


    def _create_leaf_node(self, y, y_pred=None):
        """
        Создает листовой узел дерева регрессии.

        Аргументы:
        - y: вектор numpy с вещественными значениями целевой переменной.

        Возвращает:
        - node: словарь, представляющий листовой узел среднего значения целевой переменной.
        """
        return {'value': np.mean(y)}

In [5]:
X_train = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
y_train = np.array([1, 2, 3, 4])

regressor = RegressionTree(max_depth=3)
regressor.fit(X_train, y_train)

X_test = np.array([[2, 3], [6, 7]])
y_test = regressor.predict(X_test)

print(y_test)

[2. 4.]
