In [1]:
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import train_test_split

In [2]:
class Node:
    """Один узел дерева решений.

    Attributes:
        feature (int | None): Индекс признака, по которому происходит разбиение.
        threshold (float | None): Пороговое значение для разбиения.
        left (Node | None): Левый потомок.
        right (Node | None): Правый потомок.
        value (float | int | None): Значение предсказания в листе.
    """
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf(self):
        """Проверяет, является ли узел листом.

        Returns:
            bool: True, если это лист (у узла есть value).
        """
        return self.value is not None

In [12]:
class DecisiontTree:
    """Простое дерево решений (CART) для классификации и регрессии.

    Поддерживает две задачи:
        - Классификация (criterion = 'gini')
        - Регрессия (criterion = 'mse')

    Args:
        task (str): Тип задачи — 'classification' или 'regression'.
        max_depth (int): Максимальная глубина дерева.
        min_samples_split (int): Минимальное количество примеров для разбиения.
    """
    def __init__(self, task="classification", max_depth=3, min_samples_split=2):
        self.task = task
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def fit(self, X, y):
        """Строит дерево решений.

        Args:
            X (np.ndarray): Матрица признаков (n_samples, n_features).
            y (np.ndarray): Вектор ответов (n_samples,).
        """
        X = np.array(X)
        y = np.array(y)
        self.root = self._build_tree(X, y, depth=0)

    def predict(self, X):
        """Делает предсказания для новых объектов.

        Args:
            X (np.ndarray): Матрица признаков (n_samples, n_features).

        Returns:
            np.ndarray: Вектор предсказаний (n_samples,).
        """
        X = np.array(X)
        return np.array([self._predict_one(x, self.root) for x in X])

    def _build_tree(self, X, y, depth):
        """Рекурсивно строит дерево решений.

        Args:
            X (np.ndarray): Матрица признаков текущего узла.
            y (np.ndarray): Ответы текущего узла.
            depth (int): Текущая глубина дерева.

        Returns:
            Node: Корень поддерева (ветвь или лист).
        """
        n_samples, n_features = X.shape

        if depth >= self.max_depth or n_samples < self.min_samples_split:
            return self._create_leaf(y)

        best_features, best_threshold, nest_score = None, None, float("inf")
        best_left, best_right = None, None

        for features in range(n_features):
            values = np.unique(X[:, feature])
            for threshold in values:
                left_mask = X[:, feature] <= threshold
                right_mask = X[:, feature] > threshold
                if len(y[left_mask]) == 0 or len(y[right_mask]) == 0:
                    continue

                if self.task == "classification":
                    score = self._gini(y[left_mask], y[right_mask])
                else:
                    score = self._mse(y[left_mask], y[right_mask])

                if score < best_score:
                    best_feature = feature
                    best_threshold = threshold
                    best_score = score
                    best_left = left_mask
                    best_right = right_mask

        if best_feature is None:
            return self._create_leaf(y)

        left_branch = self._build_tree(X[best_left], y[best_left], depth + 1)
        right_branch = self._build_tree(X[best_right], y[best_right], depth + 1)

        return Node(feature=best_feature, threshold=best_threshold, left=left_branch, right=right_branch)

    def _create_leaf(self, y):
        """Создаёт листовой узел (с предсказанием).

        Args:
            y (np.ndarray): Целевые значения в листе.

        Returns:
            Node: Узел-лист с заполненным value.
        """
        if self.task == "classification":
            values, counts = np.unique(y, return_counts=True)
            most_common = values[np.argmax(counts)]
            return Node(value=int(most_common))
        else:
            return Node(value=float(np.mean(y)))

    def _predict_one(self, x, node):
        """Спускает один объект по дереву до листа.

        Args:
            x (np.ndarray): Один объект (вектор признаков).
            node (Node): Текущий узел.

        Returns:
            float | int: Предсказание (число или класс).
        """
        if node.is_leaf():
            return node.value
            
        if x[node.feature] <= node.threshold:
            return self._predict_one(x, node.left)
        else:
            return self._predict_one(x, node.right)

    def _gini(self, left_y, right_y):
        """Вычисляет impurity (грязность) для классификации.

        Args:
            left_y (np.ndarray): Ответы левого подмножества.
            right_y (np.ndarray): Ответы правого подмножества.

        Returns:
            float: Средняя "грязность" двух частей.
        """
        def gini_part(y):
            if len(y) == 0:
                return 0
            _, counts = np.unique(y, return_counts=True)
            p = counts / len(y)
            return 1 - np.sum(p ** 2)
        n = len(left_y) + len(right_y)
        return (len(left_y)/n) * gini_part(left_y) + (len(right_y)/n) * gini_part(right_y)

    def _mse(self, left_y, right_y):
        """Вычисляет impurity (грязность) для регрессии (через MSE).

        Args:
            left_y (np.ndarray): Ответы левого подмножества.
            right_y (np.ndarray): Ответы правого подмножества.

        Returns:
            float: Средняя квадратичная ошибка.
        """
        def mse_part(y):
            if len(y) == 0:
                return 0
            return np.mean((y - np.mean(y)) ** 2)
        n = len(left_y) + len(right_y)
        return (len(left_y)/n) * mse_part(left_y) + (len(right_y)/n) * mse_part(right_y)

In [13]:
class RandomForest:
    """Простой Random Forest поверх DecisionTree (классификация/регрессия).

    Работает по принципу бэггинга:
    - Каждый базовый обучаемый алгоритм — это MyDecisionTree.
    - Для каждого дерева берём бутстрэп-выборку объектов (строки).
    - (Опционально) Для каждого дерева берём случайное подмножество признаков (столбцы).

    Args:
        task (str): 'classification' или 'regression'.
        n_estimators (int): Количество деревьев в лесе.
        max_depth (int): Максимальная глубина каждого дерева.
        min_samples_split (int): Минимум объектов для разбиения узла в дереве.
        bootstrap (bool): Использовать ли бутстрэп (выбор с возвращением).
        max_features_per_tree (str | int | None): Сколько признаков брать на дерево.
            - None: взять все признаки.
            - 'sqrt': взять round(sqrt(n_features)).
            - int: взять ровно указанное число признаков.
        random_state (int | None): Фиксирует случайность для воспроизводимости.
    """
    def __init__(self,
                 task="classification",
                 n_estimators=50,
                 max_depth=3,
                 min_samples_split=2,
                 bootstrap=True,
                 max_features_per_tree=None,
                 random_state=None):
        self.task = task
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.bootstrap = bootstrap
        self.max_features_per_tree = max_features_per_tree
        self.random_state = random_state

        self.trees_ = []
        self.features_idx_ = []
        self.rs_ = np.random.RandomState(random_state)

    def fit(self, X, y):
        """Обучает лес.

        Args:
            X (np.ndarray): Матрица признаков формы (n_samples, n_features).
            y (np.ndarray): Вектор ответов формы (n_samples,).

        Returns:
            SimpleRandomForest: self.
        """
        X = np.array(X)
        y = np.array(y)

        n_samples, n_features = X.shape
        self.trees_ = []
        self.features_idx_ = []

        for m in range(self.n_estimators):
            feat_idx = self._choose_features(n_features)
            self.features_idx_.append(feat_idx)

            if self.bootstrap:
                rows = self.rs_.randint(0, n_samples, size=n_samples)  # с возвращением
            else:
                rows = np.arange(n_samples)

            X_sub = X[rows][:, feat_idx]
            y_sub = y[rows]

            tree = DecisionTree(task=self.task,
                                max_depth=self.max_depth,
                                min_samples_split=self.min_samples_split)
            tree.fit(X_sub, y_sub)
            self.trees_.append(tree)

        return self

    def predict(self, X):
        """Делает предсказания ансамбля.

        Args:
            X (np.ndarray): Матрица признаков формы (n_samples, n_features).

        Returns:
            np.ndarray: Предсказания формы (n_samples,).
        """
        X = np.array(X)
        all_preds = []
        for tree, feat_idx in zip(self.trees_, self.features_idx_):
            preds = tree.predict(X[:, feat_idx])
            all_preds.append(preds)
        all_preds = np.vstack(all_preds)  # shape: (n_estimators, n_samples)

        if self.task == "classification":
            n_samples = X.shape[0]
            final = np.zeros(n_samples, dtype=int)
            for i in range(n_samples):
                votes = all_preds[:, i].astype(int)
                final[i] = np.bincount(votes).argmax()
            return final
        else:
            return np.mean(all_preds, axis=0)

    def _choose_features(self, n_features):
        """Выбирает индексы признаков для конкретного дерева.

        Args:
            n_features (int): Общее число признаков.

        Returns:
            np.ndarray: Отсортированный массив индексов выбранных признаков.
        """
        if self.max_features_per_tree is None:
            return np.arange(n_features)

        if isinstance(self.max_features_per_tree, str):
            if self.max_features_per_tree == "sqrt":
                k = max(1, int(round(np.sqrt(n_features))))
            else:
                k = n_features
        elif isinstance(self.max_features_per_tree, int):
            k = max(1, min(self.max_features_per_tree, n_features))
        else:
            k = n_features

        return np.sort(self.rs_.choice(n_features, size=k, replace=False))

In [14]:
def _sigmoid(z):
    """Сигмоида."""
    return 1.0 / (1.0 + np.exp(-z))

def _logit(p):
    """Обратная к сигмоиде (logit). Защищаемся от 0 и 1."""
    p = np.clip(p, 1e-6, 1 - 1e-6)
    return np.log(p / (1 - p))

In [15]:
class GradientBoosting:
    """Простой Gradient Boosting поверх MyDecisionTree.

    Реализует две задачи:
        - 'regression': градиентный бустинг по MSE (обучаем деревья на остатках y - F).
        - 'binary': логистический бустинг (log-loss). Держим логиты F, считаем p=sigmoid(F).
                    Псевдо-остатки r = y - p, обучаем регрессионное дерево на r.

    Базовый алгоритм (weak learner): MyDecisionTree с task='regression'.

    Args:
        task (str): 'regression' или 'binary'.
        n_estimators (int): Количество слабых деревьев.
        learning_rate (float): Шаг обучения (умножается на предсказание дерева).
        max_depth (int): Максимальная глубина каждого базового дерева.
        min_samples_split (int): Минимум объектов для разбиения узла в дереве.
        subsample (float): Доля объектов для стохастического бустинга (0 < subsample <= 1.0).
        random_state (int | None): Фиксирует случайность для воспроизводимости.
    """
    def __init__(self,
                 task="regression",
                 n_estimators=100,
                 learning_rate=0.1,
                 max_depth=3,
                 min_samples_split=2,
                 subsample=1.0,
                 random_state=None):
        self.task = task
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.subsample = subsample
        self.random_state = random_state

        self.init_ = 0.0
        self.trees_ = []
        self.rs_ = np.random.RandomState(random_state)

    def fit(self, X, y):
        """Обучает бустинг.

        Args:
            X (np.ndarray): Матрица признаков формы (n_samples, n_features).
            y (np.ndarray): Вектор целевой переменной формы (n_samples,).
                - Для task='regression' — любые вещественные значения.
                - Для task='binary' — метки 0/1.

        Returns:
            SimpleGradientBoosting: self.
        """
        X = np.array(X)
        y = np.array(y).astype(float)

        n_samples = X.shape[0]
        self.trees_ = []

        if self.task == "regression":
            # F0 = среднее по y
            self.init_ = float(np.mean(y))
            F = np.full(n_samples, self.init_, dtype=float)

            for m in range(self.n_estimators):
                residuals = y - F

                idx = self._subsample_indices(n_samples)

                base = MyDecisionTree(task="regression",
                                      max_depth=self.max_depth,
                                      min_samples_split=self.min_samples_split)
                base.fit(X[idx], residuals[idx])
                self.trees_.append(base)

                F += self.learning_rate * base.predict(X)

        else:
            unique = np.unique(y)
            if set(unique.tolist()) - {0.0, 1.0}:
                raise ValueError("Для task='binary' ожидаются метки 0/1.")

            prior = float(np.mean(y))
            self.init_ = _logit(prior)
            F = np.full(n_samples, self.init_, dtype=float)

            for m in range(self.n_estimators):
                p = _sigmoid(F)
                residuals = y - p

                idx = self._subsample_indices(n_samples)

                base = MyDecisionTree(task="regression",
                                      max_depth=self.max_depth,
                                      min_samples_split=self.min_samples_split)
                base.fit(X[idx], residuals[idx])
                self.trees_.append(base)

                F += self.learning_rate * base.predict(X)

        return self

    def predict(self, X):
        """Делает предсказания.

        Для 'regression' возвращает вещественные числа.
        Для 'binary' возвращает метки классов 0/1 по порогу 0.5.

        Args:
            X (np.ndarray): Матрица признаков (n_samples, n_features).

        Returns:
            np.ndarray: Вектор предсказаний формы (n_samples,).
        """
        X = np.array(X)
        if self.task == "regression":
            return self._predict_regression(X)
        else:
            proba = self.predict_proba(X)
            return (proba >= 0.5).astype(int)

    def predict_proba(self, X):
        """Возвращает вероятность класса 1 (только для 'binary').

        Args:
            X (np.ndarray): Матрица признаков (n_samples, n_features).

        Returns:
            np.ndarray: Вектор вероятностей формы (n_samples,).

        Raises:
            ValueError: Если task != 'binary'.
        """
        if self.task != "binary":
            raise ValueError("predict_proba доступен только для task='binary'.")
        X = np.array(X)
        F = self._predict_logit(X)
        return _sigmoid(F)

    def _subsample_indices(self, n_samples):
        """Возвращает индексы подвыборки для стохастического бустинга.

        Args:
            n_samples (int): Общее число объектов.

        Returns:
            np.ndarray: Индексы выбранных объектов.
        """
        if self.subsample >= 1.0:
            return np.arange(n_samples)
        k = max(1, int(self.subsample * n_samples))
        return self.rs_.choice(n_samples, size=k, replace=False)

    def _predict_regression(self, X):
        """Собирает сумму предсказаний деревьев (регрессия)."""
        y_hat = np.full(X.shape[0], self.init_, dtype=float)
        for t in self.trees_:
            y_hat += self.learning_rate * t.predict(X)
        return y_hat

    def _predict_logit(self, X):
        """Собирает логиты для 'binary' (до применения сигмоиды)."""
        F = np.full(X.shape[0], self.init_, dtype=float)
        for t in self.trees_:
            F += self.learning_rate * t.predict(X)
        return F