In [70]:
from forest_class import MyForestClf
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import numpy as np
import pandas as pd
import random

In [121]:
class MyBoostClf:
    def __init__(
        self,
        n_estimators=10,
        learning_rate=0.1,
        max_depth=5,
        min_samples_split=2,
        max_leafs=20,
        bins=16,
        metric=None,  # параметр для метрики
        max_features=0.5, # стохаст. градиентный бустинг
        max_samples=0.5,
        random_state=42,
        reg=0.1,  # Новый параметр регуляризации
        early_stopping=None,  # Параметр ранней остановки
    ):
        # Сохранение параметров как атрибутов класса
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.max_leafs = max_leafs
        self.bins = bins
        
        self.pred_0 = None  # Изначальное предсказание (среднее по таргету)
        self.trees = []  # Список для хранения обученных деревьев
        self.metric = metric  # Метрика для оценки
        self.best_score = None  # Лучший результат
        self.random_state = random_state
        self.max_features = max_features
        self.max_samples = max_samples
        self.reg = reg
        self.leafs_cnt = 0  # Инициализация счётчика листьев
        self.fi = {}  # Словарь для хранения важности фичей
        self.early_stopping = early_stopping  # Сохраняем параметр ранней остановки

    def __str__(self):
        # Формируем строку с параметрами экземпляра
        params = vars(self)
        params_str = ', '.join(f"{key}={value}" for key, value in params.items())
        return f"MyBoostClf class: {params_str}"
    
    def _get_learning_rate(self, step):
        #Возвращает значение learning_rate для текущего шага
        if callable(self.learning_rate):  # Если learning_rate - лямбда-функция
            return self.learning_rate(step)
        return self.learning_rate  # Иначе возвращаем фиксированное значение
    
    def _get_metric_score(self, y_true, y_pred):
        if self.metric == 'accuracy':
            return accuracy_score(y_true, y_pred)
        elif self.metric == 'precision':
            return precision_score(y_true, y_pred)
        elif self.metric == 'recall':
            return recall_score(y_true, y_pred)
        elif self.metric == 'f1':
            return f1_score(y_true, y_pred)
        elif self.metric == 'roc_auc':
            return roc_auc_score(y_true, y_pred)
        else:
            return None  # Если метрика не указана
        
    
    def fit(self, X: pd.DataFrame, y: pd.Series, X_eval=None, y_eval=None, early_stopping=None, verbose=None):
        # Фиксируем seed
        random.seed(self.random_state)

        # Преобразуем X в DataFrame, если это ndarray
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X)

        # Преобразуем y в Series, если это ndarray
        if isinstance(y, np.ndarray):
            y = pd.Series(y)

        self.pred_0 = y.mean()  # Изначальное предсказание (среднее по таргету)
        current_prediction = np.full(y.shape, self.pred_0)  # Инициализируем предсказания

        init_cols = list(X.columns)  # Список всех колонок
        init_rows_cnt = X.shape[0]  # Общее количество строк

        # Инициализируем важность фичей
        self.fi = {col: 0.0 for col in init_cols}

        # Для ранней остановки
        best_eval_score = None
        epochs_without_improvement = 0
        best_trees = []

        # Основной цикл для обучения деревьев
        for step in range(1, self.n_estimators + 1):
            # Определяем количество колонок и строк для подвыборки
            cols_smpl_cnt = round(len(init_cols) * self.max_features)
            rows_smpl_cnt = round(init_rows_cnt * self.max_samples)

            # Генерируем случайные индексы колонок и строк
            cols_idx = random.sample(init_cols, cols_smpl_cnt)
            rows_idx = random.sample(range(init_rows_cnt), rows_smpl_cnt)

            # Создаем подвыборку данных
            X_sub = X.iloc[rows_idx][cols_idx]
            y_sub = y.iloc[rows_idx]

            # Вычисляем остатки (градиенты логистической функции потерь)
            residuals = y_sub - 1 / (1 + np.exp(-current_prediction[rows_idx]))

            # Обучаем дерево на остатках
            tree = MyForestClf(
                n_estimators=1,
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                max_leafs=self.max_leafs,
                bins=self.bins
            )
            tree.fit(X_sub, residuals)
            # Добавляем дерево в список
            self.trees.append(tree)

            # Обновляем важность фичей (из каждого дерева)
            tree_feature_importances = tree.feature_importances()
            for feature, importance in tree_feature_importances.items():
                self.fi[feature] += importance  # Суммируем важность фичей

            # Подсчет листьев и регуляризационный штраф
            num_leaves = tree.leafs_cnt  # Используем листы из текущего дерева
            self.leafs_cnt += num_leaves  # Обновляем общее количество листьев
            reg_penalty = self.reg * num_leaves  # Регуляризационный штраф для текущего дерева

            # Обновляем предсказания с учетом регуляризации
            tree_predictions = np.array(tree.predict(X))
            current_learning_rate = self._get_learning_rate(step)
            current_prediction += current_learning_rate * tree_predictions - reg_penalty

            # Логирование метрик
            if verbose and step % verbose == 0:
                y_pred = (1 / (1 + np.exp(-current_prediction)) > 0.5).astype(int)
                score = self._get_metric_score(y, y_pred) if self.metric else None
                print(f"{step}. LogLoss: {self._log_loss(y, 1 / (1 + np.exp(-current_prediction))):.6f} | "
                      f"Регуляризационный штраф: {reg_penalty:.4f} | "
                      f"{self.metric.capitalize() if self.metric else 'N/A'}: {score:.2f}")

            # Оценка на валидационных данных для ранней остановки
            if X_eval is not None and y_eval is not None and early_stopping is not None:
                y_pred_eval = (1 / (1 + np.exp(-current_prediction)) > 0.5).astype(int)
                eval_score = self._get_metric_score(y_eval, y_pred_eval) if self.metric else self._log_loss(y_eval, 1 / (1 + np.exp(-current_prediction)))
                print(f"Validation score at step {step}: {eval_score:.4f}")

                if best_eval_score is None or eval_score > best_eval_score:  # Для метрики с максимизацией
                    best_eval_score = eval_score
                    epochs_without_improvement = 0
                    best_trees = self.trees.copy()
                else:
                    epochs_without_improvement += 1

                # Если улучшения нет в течение `early_stopping` шагов, останавливаем обучение
                if epochs_without_improvement >= early_stopping:
                    print(f"Early stopping triggered at step {step}.")
                    self.trees = best_trees  # Откатываем модель к лучшему состоянию
                    break

        # Нормализуем важность фичей
        total_importance = sum(self.fi.values())
        if total_importance > 0:
            for feature in self.fi:
                self.fi[feature] /= total_importance

        # Финальная метрика по окончанию обучения
        y_pred_final = (1 / (1 + np.exp(-current_prediction)) > 0.5).astype(int)
        if self.metric:
            self.best_score = self._get_metric_score(y, y_pred_final)
        else:
            self.best_score = self._log_loss(y, 1 / (1 + np.exp(-current_prediction)))


    def _log_loss(self, y_true, y_pred, eps=1e-15):
        y_pred = np.clip(y_pred, eps, 1 - eps)  # Ограничиваем предсказания
        return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
    
    def predict_proba(self, X):
        # Преобразуем X в DataFrame, если это ndarray
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X)
        
        # Начальные предсказания
        F_x = np.full((X.shape[0],), self.pred_0)
        
        # Предсказания деревьев
        for step, tree in enumerate(self.trees, start=1):
            tree_predictions = np.array(tree.predict(X))
            
            # Рассчитываем коэффициент learning_rate для текущего шага
            if callable(self.learning_rate):
                lr = self.learning_rate(step)  # Вычисление динамического learning_rate
            else:
                lr = self.learning_rate  # Статическое значение
            
            F_x += lr * tree_predictions  # Обновляем предсказания
        
        # Преобразуем логарифмические шансы в вероятности
        odds = np.exp(F_x)
        probs = odds / (1 + odds)
        
        return probs
    
    def predict(self, X):
        probs = self.predict_proba(X)
        # Преобразуем вероятности в классы, используя порог 0.5
        return (probs > 0.5).astype(int)
    
    def feature_importances(self):
        return self.fi

In [122]:
#Тест остановки
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)

# Параметры модели
early_stopping = 3  # Останавливаем обучение, если не будет улучшений в течение 3 шагов
metric = 'accuracy'  # Используем метрику точности (accuracy)

# Инициализируем модель
model = MyBoostClf(n_estimators=100, learning_rate=0.1, metric=metric, max_depth=5, early_stopping=early_stopping)

# Обучаем модель с параметрами ранней остановки и на всех данных
model.fit(X, y, X_eval=X, y_eval=y, early_stopping=early_stopping, verbose=10)

# Выводим лучший результат
print(f"Лучший результат (best_score): {model.best_score:.4f}")

Validation score at step 1: 0.5000
Validation score at step 2: 0.5000
Validation score at step 3: 0.5000
Validation score at step 4: 0.5000
Early stopping triggered at step 4.
Лучший результат (best_score): 0.5000


In [115]:
#Тест важности фичей
X, y = make_classification(n_samples=300, n_features=10, random_state=41)

# Инициализируем модель
model = MyBoostClf(n_estimators=5, max_depth=3, learning_rate=0.4)

# Обучаем модель
model.fit(X, y)

# Получаем важность фичей
feature_importances = model.feature_importances()
print("Важность фичей:", feature_importances)

Важность фичей: {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 0.0}


In [94]:
#Тест регуляризации
X, y = make_classification(n_samples=40, n_features=10, random_state=42)
y = pd.Series(y)
X = pd.DataFrame(X)

# Тест без регуляризации
model_no_reg = MyBoostClf(n_estimators=10, learning_rate=0.1, metric='accuracy', reg=0.0, random_state=42)
model_no_reg.fit(X, y)
print(f"No regularization best_score: {model_no_reg.best_score}")

# Тест с умеренной регуляризацией
model_moderate_reg = MyBoostClf(n_estimators=10, learning_rate=0.1, metric='accuracy', reg=0.1, random_state=42)
model_moderate_reg.fit(X, y)
print(f"Moderate regularization best_score: {model_moderate_reg.best_score}")

# Тест с сильной регуляризацией
model_high_reg = MyBoostClf(n_estimators=10, learning_rate=0.1, metric='accuracy', reg=1.0, random_state=42)
model_high_reg.fit(X, y)
print(f"High regularization best_score: {model_high_reg.best_score}")

No regularization best_score: 0.5
Moderate regularization best_score: 0.5
High regularization best_score: 0.5


In [83]:
#Тест learning rate
X, y = make_classification(n_samples=100, n_features=10, random_state=42)
y = pd.Series(y)
X = pd.DataFrame(X)

# Тест с фиксированным learning_rate
model_fixed = MyBoostClf(n_estimators=10, learning_rate=0.1, metric='accuracy', random_state=42)
model_fixed.fit(X, y)
predictions_fixed = model_fixed.predict(X)
print(f"Fixed learning_rate predictions sum: {np.sum(predictions_fixed)}")

# Тест с линейно уменьшающимся learning_rate
model_linear = MyBoostClf(n_estimators=10, learning_rate=lambda step: 0.5 / step, metric='accuracy', random_state=42)
model_linear.fit(X, y)
predictions_linear = model_linear.predict(X)
print(f"Linear decreasing learning_rate predictions sum: {np.sum(predictions_linear)}")

# Тест с экспоненциально уменьшающимся learning_rate
model_exponential = MyBoostClf(n_estimators=10, learning_rate=lambda step: 0.1 * (0.9 ** step), metric='accuracy', random_state=42)
model_exponential.fit(X, y)
predictions_exponential = model_exponential.predict(X)
print(f"Exponential decreasing learning_rate predictions sum: {np.sum(predictions_exponential)}")

Fixed learning_rate predictions sum: 100
Linear decreasing learning_rate predictions sum: 100
Exponential decreasing learning_rate predictions sum: 100


In [78]:
# Тест стохастического градиентного бустинга
X, y = make_classification(n_samples=500, n_features=10, n_classes=2, random_state=42)

# Список наборов параметров для тестирования
param_sets = [
    {"n_estimators": 10, "learning_rate": 0.1, "max_depth": 3, "max_features": 0.6, "max_samples": 0.7, "metric": "accuracy"},
    {"n_estimators": 15, "learning_rate": 0.05, "max_depth": 5, "max_features": 0.5, "max_samples": 0.6, "metric": "precision"},
    {"n_estimators": 20, "learning_rate": 0.1, "max_depth": 4, "max_features": 0.4, "max_samples": 0.5, "metric": "recall"},
    {"n_estimators": 25, "learning_rate": 0.1, "max_depth": 3, "max_features": 0.7, "max_samples": 0.8, "metric": "f1"},
    {"n_estimators": 30, "learning_rate": 0.15, "max_depth": 6, "max_features": 0.6, "max_samples": 0.5, "metric": "roc_auc"}
]

# Обучение моделей с разными параметрами и вывод best_score
for i, params in enumerate(param_sets):
    print(f"Training model {i+1}")
    
    # Создание и обучение модели
    model = MyBoostClf(**params)
    model.fit(X, y)
    
    # Вывод лучшей метрики
    print(f"Best {params['metric']} score: {model.best_score:.4f}\n")

Training model 1
Best accuracy score: 0.5000

Training model 2
Best precision score: 0.5000

Training model 3
Best recall score: 1.0000

Training model 4
Best f1 score: 0.6667

Training model 5
Best roc_auc score: 0.5000



In [68]:
#Тестирование метрик
X, y = make_classification(
    n_samples=100,  # Общее количество выборок
    n_features=10,  # Количество признаков
    n_classes=2,    # Бинарная классификация
    random_state=42
)

# Список метрик для тестирования
metrics = ["accuracy", "precision", "recall"]
# Параметры для модели
params = {
    "n_estimators": 10,
    "learning_rate": 0.1,
    "max_depth": 3,
    "min_samples_split": 2,
    "max_leafs": 10,
    "bins": 16,
}
# Тестирование модели с разными метриками
for metric in metrics:
    print(f"=== Testing with metric: {metric} ===")
    params["metric"] = metric  # Задаем текущую метрику
    model = MyBoostClf(**params)
    model.fit(X, y)
    
    probs = model.predict_proba(X)
    preds = model.predict(X)
    # Итоговая метрика
    print(f"Final {metric}: {model.best_score:.4f}")

=== Testing with metric: accuracy ===
Final accuracy: 0.5000
=== Testing with metric: precision ===
Final precision: 0.5000
=== Testing with metric: recall ===
Final recall: 1.0000


In [59]:
#Тестирование предсказания
X, y = make_classification(n_samples=50, n_features=10, n_classes=2, random_state=42)

# Параметры для модели
params = {
    "n_estimators": 5,
    "learning_rate": 0.1,
    "max_depth": 3,
    "min_samples_split": 2,
    "max_leafs": 10,
    "bins": 16
}

# Обучение модели
model = MyBoostClf(**params)
model.fit(X, y)

# Получение вероятностей
probs = model.predict_proba(X)
print(f"Predicted probabilities: {probs[:5]}")  # Печатаем первые 5 вероятностей

# Получение предсказанных классов
predictions = model.predict(X)
print(f"Predicted classes: {predictions[:5]}")  # Печатаем первые 5 предсказанных классов

# Сумма листьев всех деревьев
leaf_count = sum(tree.leafs_cnt for tree in model.trees)
print(f"Total leaf count: {leaf_count}")

Predicted probabilities: [0.62245933 0.62245933 0.62245933 0.62245933 0.62245933]
Predicted classes: [1 1 1 1 1]
Total leaf count: 20


In [60]:
#Тестирование обучения
X1, y1 = make_classification(n_samples=50, n_features=10, n_classes=2, random_state=42)

# Параметры для модели
params = {
    "n_estimators": 5,
    "learning_rate": 0.1,
    "max_depth": 3,
    "min_samples_split": 2,
    "max_leafs": 10,
    "bins": 16
}

# Обучение модели на первом наборе данных
model1 = MyBoostClf(**params)
model1.fit(X1, y1)
print(f"Model 1 - pred_0: {model1.pred_0:.2f}")
leaf_count1 = sum(tree.leafs_cnt for tree in model1.trees)
print(f"Model 1 - Total leaf count: {leaf_count1}")


X2, y2 = make_classification(n_samples=50, n_features=5, n_classes=2, random_state=43)
# Обучение модели на втором наборе данных
model2 = MyBoostClf(**params)
model2.fit(X2, y2)
print(f"Model 2 - pred_0: {model2.pred_0:.2f}")
leaf_count2 = sum(tree.leafs_cnt for tree in model2.trees)
print(f"Model 2 - Total leaf count: {leaf_count2}")

Model 1 - pred_0: 0.50
Model 1 - Total leaf count: 20
Model 2 - pred_0: 0.48
Model 2 - Total leaf count: 20


In [61]:

# Тестирование класса
X, y = make_classification(n_samples=20, n_features=5, random_state=42)
X = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(X.shape[1])])
y = pd.Series(y)

# Создание модели
model = MyBoostClf(n_estimators=5, learning_rate=0.1, max_depth=3)

# Обучение
model.fit(X, y, verbose=1)

1. LogLoss: 0.724077
2. LogLoss: 0.724077
3. LogLoss: 0.724077
4. LogLoss: 0.724077
5. LogLoss: 0.724077


In [62]:
# Тестирование класса
model1 = MyBoostClf()
model2 = MyBoostClf(n_estimators=10,  max_depth=5, min_samples_split=5, max_leafs=10, bins=6)
model3 = MyBoostClf(n_estimators=8, max_depth=5, min_samples_split=5, max_leafs=20, bins=16)

# Проверка
print(model1)
print(model2)
print(model3)

MyBoostClf class: n_estimators=10, learning_rate=0.1, max_depth=5, min_samples_split=2, max_leafs=20, bins=16, pred_0=None, trees=[], metric=None
MyBoostClf class: n_estimators=10, learning_rate=0.1, max_depth=5, min_samples_split=5, max_leafs=10, bins=6, pred_0=None, trees=[], metric=None
MyBoostClf class: n_estimators=8, learning_rate=0.1, max_depth=5, min_samples_split=5, max_leafs=20, bins=16, pred_0=None, trees=[], metric=None
