# Лабораторная работа №4: Проведение исследований со случайным лесом

## Задача классификации

## 2. Создание бейзлайна и оценка качества

In [None]:
X = bl_cdata.drop('HeartDisease', axis=1) # Все столбцы кроме 'HeartDisease' - это признаки
y = bl_cdata['HeartDisease'] # 'HeartDisease' - это целевая переменная, которую мы хотим предсказать

# Разделяем данные на обучающую и тестовую выборки
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Масштабируем признаки, это важно для некоторых моделей (хотя для случайного леса это не обязательно, но оставим для согласованности)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train) # Масштабируем обучающие данные
X_test_scaled = scaler.transform(X_test) # Масштабируем тестовые данные

# Создаем модель случайного леса
model = RandomForestClassifier(random_state=42)

# Обучаем модель на масштабированных обучающих данных
model.fit(X_train_scaled, y_train)

# Делаем предсказания на масштабированных тестовых данных
y_pred = model.predict(X_test_scaled)

# Вычисляем метрики
accuracy = accuracy_score(y_test, y_pred) # Доля правильных ответов
precision = precision_score(y_test, y_pred, average='weighted', zero_division=0) # Точность предсказаний для каждого класса, взвешенная
recall = recall_score(y_test, y_pred, average='weighted', zero_division=0) # Полнота предсказаний для каждого класса, взвешенная

# Выводим метрики
print(f"Метрики для классификации (Случайный лес):")
print(f"  Accuracy: {accuracy:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall: {recall:.4f}")

In [None]:
  Accuracy: 0.8859
  Precision: 0.8880
  Recall: 0.8859


## 3. Улучшение бейзлайна

Подбор оптимальных гиперпараметров для модели случайного леса с помощью GridSearchCV улучшит ее производительность по сравнению с использованием параметров по умолчанию. Гиперпараметры модели существенно влияют на ее способность к обобщению и точности предсказаний. Поиск наилучших параметров через GridSearchCV позволит обучить модель более эффективно.

In [None]:
X = bl_cdata.drop('HeartDisease', axis=1)
y = bl_cdata['HeartDisease']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

def evaluate_model_cv(model, X, y, cv=2): 
    scores_acc = cross_val_score(model, X, y, cv=cv, scoring='accuracy')
    scores_prec = cross_val_score(model, X, y, cv=cv, scoring='precision_weighted', error_score=0)
    scores_rec = cross_val_score(model, X, y, cv=cv, scoring='recall_weighted', error_score=0)
    return np.mean(scores_acc), np.mean(scores_prec), np.mean(scores_rec)


base_rf = RandomForestClassifier(random_state=42)
base_rf_acc, base_rf_prec, base_rf_rec = evaluate_model_cv(base_rf, X_train_scaled, y_train)


param_distributions = {
    'n_estimators': [100, 200, 300, 500],
    'max_depth': [5, 10, 15, None],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

random_search = RandomizedSearchCV(
    RandomForestClassifier(random_state=42),
    param_distributions,
    n_iter=10,  
    cv=2,  
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)
random_search.fit(X_train_scaled, y_train)

best_rf = random_search.best_estimator_
best_params = random_search.best_params_
best_rf_acc, best_rf_prec, best_rf_rec = evaluate_model_cv(best_rf, X_train_scaled, y_train)

Обучаем модели с лучшими параметрами

In [None]:
best_rf_improved = RandomForestClassifier(random_state=42, **best_params)
best_rf_improved.fit(X_train_scaled, y_train)
best_rf_improved_y_pred = best_rf_improved.predict(X_test_scaled)

best_rf_improved_accuracy = accuracy_score(y_test, best_rf_improved_y_pred)
best_rf_improved_precision = precision_score(y_test, best_rf_improved_y_pred, average='weighted', zero_division=0)
best_rf_improved_recall = recall_score(y_test, best_rf_improved_y_pred, average='weighted', zero_division=0)

print("\nМетрики улучшенной модели на тестовых данных:")
print(f"  Улучшенный Случайный лес:")
print(f"  Accuracy: {best_rf_improved_accuracy:.4f}, Precision: {best_rf_improved_precision:.4f}, Recall: {best_rf_improved_recall:.4f}")

In [None]:
Метрики улучшенной модели на тестовых данных:
  Улучшенный Случайный лес:
  Accuracy: 0.8859, Precision: 0.8901, Recall: 0.8859

Можно заметить небольшое улучшение на Precision

## 4. Имплементация алгоритма машинного обучения

Напишем собственную реализацию решающего дерева для классификации и регрессии

In [None]:
class Node:
    """Узел дерева решений."""
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature  # Индекс признака для разбиения
        self.threshold = threshold  # Пороговое значение для разбиения
        self.left = left  # Левый дочерний узел
        self.right = right  # Правый дочерний узел
        self.value = value  # Значение в листовом узле

class DecisionTree:
    """Дерево решений для регрессии и классификации."""
    def __init__(self, min_samples_split=2, max_depth=100, mode = 'classification'):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.root = None
        self.mode = mode # 'classification' or 'regression'

    def _entropy(self, y):
        """Вычисляет энтропию для классификации."""
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _variance(self, y):
       """Вычисляет дисперсию для регрессии."""
       if len(y) == 0:
         return 0
       return np.var(y)

    def _information_gain(self, y, y_left, y_right):
        """Вычисляет прирост информации."""
        if self.mode == 'classification':
           p = len(y_left) / len(y)
           return self._entropy(y) - p * self._entropy(y_left) - (1 - p) * self._entropy(y_right)
        elif self.mode == 'regression':
          p = len(y_left) / len(y)
          return self._variance(y) - p * self._variance(y_left) - (1-p) * self._variance(y_right)


    def _best_split(self, X, y):
        """Находит наилучшее разбиение для заданных данных."""
        best_gain = -1
        split_idx, split_thresh = None, None

        for feature_idx in range(X.shape[1]):
            X_column = X[:, feature_idx]
            thresholds = np.unique(X_column)
            for threshold in thresholds:
                y_left = y[X_column <= threshold]
                y_right = y[X_column > threshold]
                if len(y_left) == 0 or len(y_right) == 0:
                    continue
                gain = self._information_gain(y, y_left, y_right)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feature_idx
                    split_thresh = threshold

        return split_idx, split_thresh

    def _build_tree(self, X, y, depth=0):
        """Рекурсивно строит дерево решений."""
        n_samples, n_features = X.shape

        # Критерии остановки
        if depth >= self.max_depth or n_samples < self.min_samples_split or len(np.unique(y)) == 1:
           if self.mode == 'classification':
               most_common = Counter(y).most_common(1)[0][0]
               return Node(value=most_common)
           elif self.mode == 'regression':
               return Node(value=np.mean(y))


        best_feature, best_thresh = self._best_split(X, y)

        if best_feature is None: # Не нашли хорошего разбиения
             if self.mode == 'classification':
               most_common = Counter(y).most_common(1)[0][0]
               return Node(value=most_common)
             elif self.mode == 'regression':
                return Node(value=np.mean(y))


        left_idxs = X[:, best_feature] <= best_thresh
        right_idxs = X[:, best_feature] > best_thresh

        left_child = self._build_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right_child = self._build_tree(X[right_idxs, :], y[right_idxs], depth + 1)

        return Node(feature=best_feature, threshold=best_thresh, left=left_child, right=right_child)


    def fit(self, X, y):
        """Обучает дерево решений."""
        self.root = self._build_tree(X, y)

    def _predict_one(self, x, node):
       """Предсказывает значение для одного экземпляра."""
       if node.value is not None:
           return node.value

       if x[node.feature] <= node.threshold:
           return self._predict_one(x, node.left)
       else:
           return self._predict_one(x, node.right)


    def predict(self, X):
        """Предсказывает значения для набора данных."""
        return [self._predict_one(x, self.root) for x in X]



class RandomForest:
    """Случайный лес для классификации и регрессии."""
    def __init__(self, n_trees=10, max_features='sqrt', min_samples_split=2, max_depth=100, mode='classification'):
        self.n_trees = n_trees
        self.max_features = max_features # 'sqrt' or int
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.trees = []
        self.mode = mode # 'classification' or 'regression'


    def fit(self, X, y):
        """Обучает случайный лес."""
        self.trees = []
        n_features = X.shape[1]

        if self.max_features == 'sqrt':
            n_feat_sub = int(np.sqrt(n_features))
        elif isinstance(self.max_features, int):
           n_feat_sub = self.max_features
        else:
           n_feat_sub = n_features # использовать все признаки


        for _ in range(self.n_trees):
            # Bootstrap агрегирование (случайная выборка с возвращением)
            idxs = np.random.choice(len(y), len(y), replace=True)
            X_sample = X[idxs]
            y_sample = y[idxs]

            # Случайный выбор подмножества признаков
            feat_idxs = np.random.choice(n_features, n_feat_sub, replace=False)
            X_sample_subset = X_sample[:, feat_idxs]


            tree = DecisionTree(min_samples_split = self.min_samples_split, max_depth=self.max_depth, mode=self.mode)
            tree.fit(X_sample_subset, y_sample)
            self.trees.append((tree, feat_idxs))


    def predict(self, X):
        """Предсказывает значения для набора данных."""
        if not self.trees:
           raise ValueError("Случайный лес не обучен. Сначала вызовите fit.")

        predictions = np.zeros((X.shape[0], self.n_trees))

        for i, (tree, feat_idxs) in enumerate(self.trees):
            predictions[:, i] = tree.predict(X[:, feat_idxs])


        if self.mode == 'classification':
          final_predictions = np.array([Counter(row).most_common(1)[0][0] for row in predictions])
        elif self.mode == 'regression':
          final_predictions = np.mean(predictions, axis=1) # Среднее предсказание всех деревьев

        return final_predictions


In [None]:
X_clf = bl_cdata.drop("HeartDisease", axis=1).values
y_clf = bl_cdata["HeartDisease"].values

X_train_clf, X_test_clf, y_train_clf, y_test_clf = train_test_split(X_clf, y_clf, test_size=0.2, random_state=42)

rf_clf = RandomForest(n_trees=100, mode='classification')
rf_clf.fit(X_train_clf, y_train_clf)
y_pred_clf = rf_clf.predict(X_test_clf)


accuracy = accuracy_score(y_test_clf, y_pred_clf)
recall = recall_score(y_test_clf, y_pred_clf)
precision = precision_score(y_test_clf, y_pred_clf)


print("\nКлассификация (Heart Disease Dataset):")
print(f"Accuracy: {accuracy}")
print(f"Recall: {recall}")
print(f"Precision: {precision}")

In [None]:
Accuracy: 0.8858
Precision: 0.9134
Recall: 0.8878


Результаты лучше, чем у библиотечной реализации

## Задача регрессии

## 2. Создание бейзлайна и оценка качества

In [None]:
X = bl_rdata.drop(target_variable, axis=1)
y = bl_rdata[target_variable]


y_reg = y.astype(float)

X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(X, y_reg, test_size=0.2, random_state=42)


scaler = StandardScaler()
X_train_scaled_reg = scaler.fit_transform(X_train_reg)
X_test_scaled_reg = scaler.transform(X_test_reg)


model_reg = RandomForestRegressor(random_state=42)


model_reg.fit(X_train_scaled_reg, y_train_reg)


y_pred_reg = model_reg.predict(X_test_scaled_reg)


mae = mean_absolute_error(y_test_reg, y_pred_reg) 
r2 = r2_score(y_test_reg, y_pred_reg) 


print(f"\nМетрики для регрессии (Случайный лес):")
print(f"  MAE: {mae:.4f}")
print(f"  R2: {r2:.4f}")

In [None]:
  MAE: 0.1571
  R2: 0.7764

## 3. Улучшение бейзлайна

In [None]:
Воспользуемся тем же методом, что и в случае с классификацией

In [None]:
X = bl_cdata.drop('HeartDisease', axis=1)
y = bl_cdata['HeartDisease'].astype(float)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)


param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [5, 10, 15],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

grid_search = GridSearchCV(RandomForestRegressor(random_state=42),
                            param_grid,
                            cv=3,  
                            scoring='neg_mean_absolute_error',  
                            n_jobs=-1) 

grid_search.fit(X_train_scaled, y_train)

best_params = grid_search.best_params_
best_score = -grid_search.best_score_ # Возвращаем положительный MAE
print(f"Лучшие гиперпараметры: {best_params}")
print(f"Лучший MAE на кросс-валидации: {best_score:.4f}")


best_model = RandomForestRegressor(**best_params, random_state=42)
best_model.fit(X_train_scaled, y_train)
y_pred = best_model.predict(X_test_scaled)

mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"\nМетрики на тестовой выборке с лучшими гиперпараметрами:")
print(f"  MAE: {mae:.4f}")
print(f"  R2: {r2:.4f}")


  MAE: 0.2061
  R2: 0.6102

Результат стал хуже в сравнении с изначальной версией

## 4. Имплементация алгоритма машинного обучения

Воспользуемся теми же классами, что использовались для задачи регрессии

In [None]:
class Node:
    """Узел дерева решений."""
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature  # Индекс признака для разбиения
        self.threshold = threshold  # Пороговое значение для разбиения
        self.left = left  # Левый дочерний узел
        self.right = right  # Правый дочерний узел
        self.value = value  # Значение в листовом узле

class DecisionTree:
    """Дерево решений для регрессии и классификации."""
    def __init__(self, min_samples_split=2, max_depth=100, mode = 'classification'):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.root = None
        self.mode = mode # 'classification' or 'regression'

    def _entropy(self, y):
        """Вычисляет энтропию для классификации."""
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _variance(self, y):
       """Вычисляет дисперсию для регрессии."""
       if len(y) == 0:
         return 0
       return np.var(y)

    def _information_gain(self, y, y_left, y_right):
        """Вычисляет прирост информации."""
        if self.mode == 'classification':
           p = len(y_left) / len(y)
           return self._entropy(y) - p * self._entropy(y_left) - (1 - p) * self._entropy(y_right)
        elif self.mode == 'regression':
          p = len(y_left) / len(y)
          return self._variance(y) - p * self._variance(y_left) - (1-p) * self._variance(y_right)


    def _best_split(self, X, y):
        """Находит наилучшее разбиение для заданных данных."""
        best_gain = -1
        split_idx, split_thresh = None, None

        for feature_idx in range(X.shape[1]):
            X_column = X[:, feature_idx]
            thresholds = np.unique(X_column)
            for threshold in thresholds:
                y_left = y[X_column <= threshold]
                y_right = y[X_column > threshold]
                if len(y_left) == 0 or len(y_right) == 0:
                    continue
                gain = self._information_gain(y, y_left, y_right)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feature_idx
                    split_thresh = threshold

        return split_idx, split_thresh

    def _build_tree(self, X, y, depth=0):
        """Рекурсивно строит дерево решений."""
        n_samples, n_features = X.shape

        # Критерии остановки
        if depth >= self.max_depth or n_samples < self.min_samples_split or len(np.unique(y)) == 1:
           if self.mode == 'classification':
               most_common = Counter(y).most_common(1)[0][0]
               return Node(value=most_common)
           elif self.mode == 'regression':
               return Node(value=np.mean(y))


        best_feature, best_thresh = self._best_split(X, y)

        if best_feature is None: # Не нашли хорошего разбиения
             if self.mode == 'classification':
               most_common = Counter(y).most_common(1)[0][0]
               return Node(value=most_common)
             elif self.mode == 'regression':
                return Node(value=np.mean(y))


        left_idxs = X[:, best_feature] <= best_thresh
        right_idxs = X[:, best_feature] > best_thresh

        left_child = self._build_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right_child = self._build_tree(X[right_idxs, :], y[right_idxs], depth + 1)

        return Node(feature=best_feature, threshold=best_thresh, left=left_child, right=right_child)


    def fit(self, X, y):
        """Обучает дерево решений."""
        self.root = self._build_tree(X, y)

    def _predict_one(self, x, node):
       """Предсказывает значение для одного экземпляра."""
       if node.value is not None:
           return node.value

       if x[node.feature] <= node.threshold:
           return self._predict_one(x, node.left)
       else:
           return self._predict_one(x, node.right)


    def predict(self, X):
        """Предсказывает значения для набора данных."""
        return [self._predict_one(x, self.root) for x in X]



class RandomForest:
    """Случайный лес для классификации и регрессии."""
    def __init__(self, n_trees=10, max_features='sqrt', min_samples_split=2, max_depth=100, mode='classification'):
        self.n_trees = n_trees
        self.max_features = max_features # 'sqrt' or int
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.trees = []
        self.mode = mode # 'classification' or 'regression'


    def fit(self, X, y):
        """Обучает случайный лес."""
        self.trees = []
        n_features = X.shape[1]

        if self.max_features == 'sqrt':
            n_feat_sub = int(np.sqrt(n_features))
        elif isinstance(self.max_features, int):
           n_feat_sub = self.max_features
        else:
           n_feat_sub = n_features # использовать все признаки


        for _ in range(self.n_trees):
            # Bootstrap агрегирование (случайная выборка с возвращением)
            idxs = np.random.choice(len(y), len(y), replace=True)
            X_sample = X[idxs]
            y_sample = y[idxs]

            # Случайный выбор подмножества признаков
            feat_idxs = np.random.choice(n_features, n_feat_sub, replace=False)
            X_sample_subset = X_sample[:, feat_idxs]


            tree = DecisionTree(min_samples_split = self.min_samples_split, max_depth=self.max_depth, mode=self.mode)
            tree.fit(X_sample_subset, y_sample)
            self.trees.append((tree, feat_idxs))


    def predict(self, X):
        """Предсказывает значения для набора данных."""
        if not self.trees:
           raise ValueError("Случайный лес не обучен. Сначала вызовите fit.")

        predictions = np.zeros((X.shape[0], self.n_trees))

        for i, (tree, feat_idxs) in enumerate(self.trees):
            predictions[:, i] = tree.predict(X[:, feat_idxs])


        if self.mode == 'classification':
          final_predictions = np.array([Counter(row).most_common(1)[0][0] for row in predictions])
        elif self.mode == 'regression':
          final_predictions = np.mean(predictions, axis=1) # Среднее предсказание всех деревьев

        return final_predictions


In [None]:
X_reg = bl_rdata.drop("Depression_Yes", axis=1).values
y_reg = bl_rdata["Depression_Yes"].values


X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(X_reg, y_reg, test_size=0.2, random_state=42)

rf_reg = RandomForest(n_trees=100, mode='regression')
rf_reg.fit(X_train_reg, y_train_reg)
y_pred_reg = rf_reg.predict(X_test_reg)


mae = mean_absolute_error(y_test_reg, y_pred_reg)
r2 = r2_score(y_test_reg, y_pred_reg)


print("Регрессия (Depression Dataset):")
print(f"MAE: {MAE}")
print(f"R2: {R2}")

In [None]:
  MAE: 0.2376
  R2: 0.2244

Результат снова стал хуже