In [None]:
import contextlib
import inspect
import json
import os
import pathlib
import typing as tp
import uuid

import numpy as np
import pandas as pd
import sklearn
from sklearn.metrics._scorer import _check_multimetric_scoring
from sklearn.model_selection._validation import _score
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
from sklearn.base import BaseEstimator
from sklearn.base import clone
from sklearn.utils import check_random_state
from sklearn.utils.estimator_checks import check_estimator

class MyAdaBoostClassifier:
    """
    Multiclass AdaBoost implementation with SAMME.R algorithm
    """
    big_number = 1 << 32
    eps = 1e-8

    def __init__(
            self,
            n_estimators: int,
            base_estimator: tp.Type[sklearn.base.BaseEstimator],
            seed: int,
            **kwargs
    ):
        """
        :param n_estimators: count of estimators
        :param base_estimator: base estimator (practically tree classifier)
        :param seed: global seed
        :param kwargs: keyword arguments of base estimator
        """
        self.n_classes = None
        self.error_history = []  # this is to track model learning process
        self.n_estimators = n_estimators
        self.rng = np.random.default_rng(seed)
        self.base_estimator = base_estimator
        self.base_estimator_kwargs = kwargs
        # deduce which keywords are used to set seed for an estimator (sklearn or own tree implementation)
        signature = inspect.signature(self.base_estimator.__init__)
        self.seed_keyword = None
        if 'seed' in signature.parameters:
            self.seed_keyword = 'seed'
        elif 'random_state' in signature.parameters:
            self.seed_keyword = 'random_state'
        self.estimators = []

    def create_new_estimator(
            self,
            seed: int
    ):
        """
        create new base estiamtor with proper keywords
        and new *unique* seed
        :param seed:
        :return:
        """

        params = self.base_estimator_kwargs.copy()
        if self.seed_keyword:
            params[self.seed_keyword] = seed
#        new_estimator = DecisionTreeClassifier(random_state=seed) #self.base_estimator(**self.base_estimator_kwargs)
        
        return self.base_estimator(**params)

    def get_new_weights(self, 
                        true_labels: np.ndarray, 
                        predictions: np.ndarray, 
                        weights: np.ndarray):
        """
        Обновляет веса объектов на основе алгоритма SAMME.R.
        
        Parameters:
            true_labels (np.ndarray): Истинные метки классов (размер n).
            predict_proba (np.ndarray): Предсказанные вероятности классов (размер n x n_classes).
            weights (np.ndarray): Текущие веса объектов (размер n).
            n_classes (int): Общее количество классов.
            
        Returns:
            np.ndarray: Новый массив весов (размер n).
        """
        n_classes = self.n_classes
        # Масштабирование вероятностей (логарифм и нормализация по числу классов)
        eps = self.eps  # Чтобы избежать деления на ноль
        probabilities = np.clip(predictions, eps, 1 - eps)  # Убираем нули и единицы
        log_proba = np.log(probabilities)
        
        # Вычисляем значения обновления весов
        # Индекс правильного класса для каждого объекта
        true_class_log_proba = log_proba[np.arange(len(true_labels)), true_labels]
        # Отрицательная "ошибка"
        weight_update = -true_class_log_proba + (1 / n_classes) * log_proba.sum(axis=1)
        
        # Обновляем веса
        new_weights = weights * np.exp(weight_update)
        
        # Нормируем веса
        new_weights /= np.sum(new_weights)
        
        return new_weights

    @staticmethod
    def get_estimator_error(
            estimator: sklearn.base.BaseEstimator,
            X: np.ndarray,
            y: np.ndarray,
            weights: np.ndarray
    ):
     #-> float:
        """
        Вычисляет взвешенную ошибку эстиматора на выборке с весами.

        Parameters:
            estimator (BaseEstimator): Обученный оценщик (например, DecisionTreeClassifier).
            X (np.ndarray): Матрица признаков (размер n x m).
            y (np.ndarray): Вектор истинных меток классов (размер n).
            weights (np.ndarray): Вектор весов объектов (размер n).

        Returns:
            float: Взвешенная ошибка эстиматора.
        """

        # Предсказания модели
        predictions = estimator.predict(X)
        
        # Индикатор неправильной классификации (1 для ошибок, 0 для правильных)
        misclassified = (predictions != y).astype(float)
        
        # Взвешенная ошибка
#        weighted_error = np.sum(weights * misclassified) / np.sum(weights)
        predict_proba = np.clip(estimator.predict_proba(X), MyAdaBoostClassifier.eps, 1 - MyAdaBoostClassifier.eps)
        weighted_error = np.sum(weights * (np.argmax(predict_proba, axis=1) != y))        
        return weighted_error

    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        Sequentially fit estimators with updated weights on each iteration using SAMME.R algorithm.
        
        :param X: [n_samples, n_features]
        :param y: [n_samples]
        :return: self
        """
        self.error_history = []

        # Compute the number of classes for internal use
        self.n_classes = len(np.unique(y, return_counts=False))
        n_samples = X.shape[0]

        # Initialize weights uniformly over all samples
        weights = np.full((n_samples,), fill_value=1 / n_samples)

        # Sequentially fit each model and adjust weights
        for seed in self.rng.choice(
                max(MyAdaBoostClassifier.big_number, self.n_estimators),
                size=self.n_estimators,
                replace=False
        ):
            # Generate a unique seed for reproducibility
          #  seed = self.rng.integers(0, 1e6)

            # Add newly created estimator
            estimator = self.create_new_estimator(seed)
            self.estimators.append(estimator)

            # Fit the estimator to data with current sample weights
            estimator.fit(X=X, y=y, sample_weight=weights)

            # Get predicted probabilities for SAMME.R
            predict_proba = np.clip(estimator.predict_proba(X), self.eps, 1 - self.eps)

            # Compute estimator error (misclassification rate weighted by sample weights)
            estimator_error = self.get_estimator_error(estimator, X, y, weights)

            if estimator_error >= 1 - self.eps:
                estimator_error = 1 - self.eps  # Avoid division by zero or negative errors
            if estimator_error == 0:
                estimator_error = MyAdaBoostClassifier.eps

            # Calculate alpha (model weight) for SAMME.R
            estimator_alpha = 0.5 * np.log((1 - estimator_error) / estimator_error)

                # Update sample weights using SAMME.R formula
            true_class_log_proba = np.log(predict_proba[np.arange(n_samples), y])
            weights_update_factor = -true_class_log_proba + (1 / self.n_classes) * np.log(predict_proba).sum(axis=1)
            weights *= np.exp(estimator_alpha * weights_update_factor)

            # Normalize weights
            weights /= np.sum(weights)

            # Append the error to the error history
            self.error_history.append(estimator_error)

        return self


In [4]:
from sklearn.tree import DecisionTreeClassifier

# Инициализация AdaBoostClassifier
boost = MyAdaBoostClassifier(seed=42, n_estimators=1, base_estimator=DecisionTreeClassifier)

# Создаём новый базовый оценщик
new_estimator = boost.create_new_estimator(seed=42)

# Проверяем параметры нового оценщика
print("Новый базовый оценщик:", new_estimator)
print("Параметры нового оценщика:", new_estimator.get_params())

X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
y = np.array([0, 1, 0, 1])

boost.fit(X, y)

Новый базовый оценщик: DecisionTreeClassifier(random_state=42)
Параметры нового оценщика: {'ccp_alpha': 0.0, 'class_weight': None, 'criterion': 'gini', 'max_depth': None, 'max_features': None, 'max_leaf_nodes': None, 'min_impurity_decrease': 0.0, 'min_samples_leaf': 1, 'min_samples_split': 2, 'min_weight_fraction_leaf': 0.0, 'monotonic_cst': None, 'random_state': 42, 'splitter': 'best'}


  estimator_alpha = 0.5 * np.log((1 - estimator_error) / estimator_error)
  weights /= np.sum(weights)


<__main__.MyAdaBoostClassifier at 0x19689db9210>