# Деревья решений и ансамбли

Был выбран датасет, отражающий загрязнённость воздуха.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from sklearn.preprocessing import StandardScaler

from build_data import get_processed_data
from dt import DTclassifier_custom
from dt_lib import DT_lib
from start_tests import (
    run_random_forest_experiment_custom,
    run_random_forest_experiment_lib,
    run_gradient_boosting_experiment_lib,
    run_experiment_lib,
    run_experiment_custom
)
from drawing_plots import plot_tree_depth, plot_accuracy

Загрузка и предобработка данных:

In [None]:
def load_data(file_path):
    """Read the CSV at ``file_path`` and drop every row that has a missing value.

    Returns the resulting ``pandas.DataFrame``; surviving rows keep their
    original index labels.
    """
    raw_frame = pd.read_csv(file_path)
    complete_rows = raw_frame.dropna()
    return complete_rows

In [None]:
def preprocess_data(df):
    """Split the frame into a standardized feature matrix and encoded labels.

    The ``'Air Quality'`` column is encoded with a fixed ordinal mapping
    (``Hazardous`` = 0, ``Poor`` = 1, ``Moderate`` = 2, ``Good`` = 3) and all
    remaining columns are passed through ``StandardScaler.fit_transform``.

    The input frame is not modified. Earlier the encoded labels were assigned
    back into ``df``, so calling this function twice on the same frame mapped
    the already-encoded integers to NaN.

    Args:
        df: DataFrame with an ``'Air Quality'`` column; the other columns are
            presumably numeric features (they are fed to the scaler as-is).

    Returns:
        ``(X_scaled, y)`` where ``X_scaled`` is the scaler output and ``y`` is
        the Series of encoded labels. Labels absent from the mapping come out
        as NaN, which is how ``Series.map`` treats unmatched keys.
    """
    custom_mapping = {'Hazardous': 0, 'Poor': 1, 'Moderate': 2, 'Good': 3}

    # Encode into a fresh Series rather than writing into the caller's frame.
    y = df['Air Quality'].map(custom_mapping)
    X = df.drop('Air Quality', axis=1)

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return X_scaled, y

Уменьшение объема выборки с использованием стратификации, разделение данных на тренировочную и валидационную выборки

In [None]:
def stratified_sample(X, y, sample_size=1500, random_state=42):
    """Draw a class-stratified subsample of ``sample_size`` rows.

    A single ``StratifiedShuffleSplit`` is run and only its "train" side is
    kept. ``X`` is indexed with the integer index array directly and ``y``
    through ``.iloc`` (assumes ``X`` is a NumPy array and ``y`` a pandas
    Series, as produced by ``preprocess_data``).

    Returns:
        ``(X_sampled, y_sampled)`` restricted to the selected rows.
    """
    splitter = StratifiedShuffleSplit(
        n_splits=1, train_size=sample_size, random_state=random_state
    )
    # n_splits=1, so the generator yields exactly one (train, test) pair.
    chosen_rows, _ = next(splitter.split(X, y))
    return X[chosen_rows], y.iloc[chosen_rows]

In [None]:
def split_data(X, y, test_size=0.2, random_state=42):
    """Split ``X`` and ``y`` into train and validation parts, stratified on ``y``.

    Returns whatever ``train_test_split`` returns for two inputs:
    ``X_train, X_val, y_train, y_val``.
    """
    split_options = {
        'test_size': test_size,
        'random_state': random_state,
        'stratify': y,
    }
    return train_test_split(X, y, **split_options)

Общая функция подготовки и обработки данных

In [None]:
def get_processed_data(file_path='pollution_dataset.csv', sample_size=1500):
    """Run the whole data pipeline: load, preprocess, subsample, split.

    Args:
        file_path: CSV file handed to ``load_data``.
        sample_size: Number of rows kept by ``stratified_sample``.

    Returns:
        ``(X_train, y_train, X_val, y_val)``; the label parts are converted
        with ``.values`` before being returned.
    """
    features, labels = preprocess_data(load_data(file_path))

    features_small, labels_small = stratified_sample(features, labels, sample_size=sample_size)

    train_X, val_X, train_y, val_y = split_data(features_small, labels_small)

    # Note the return order differs from the order split_data produces.
    return train_X, train_y.values, val_X, val_y.values

## Decision Tree Classifier Custom 

Узел дерева

In [None]:
class DecisionNode:
    """One node of a binary decision tree.

    Internal nodes carry a split (``feature_index``, ``threshold``) and two
    children; leaves carry a predicted ``value``. ``value`` is keyword-only.
    """

    def __init__(self, feature_index=None, threshold=None, left=None, right=None, *, value=None):
        # Split description.
        self.feature_index, self.threshold = feature_index, threshold
        # Child subtrees.
        self.left, self.right = left, right
        # Prediction stored at the node; None when no prediction is attached.
        self.value = value

Реализация кастомного DT

In [None]:
class DTclassifier_custom:
    """Decision tree classifier grown by greedy binary threshold splits.

    Each internal node tests ``x[feature_index] <= threshold``; samples that
    pass go left, all others go right. Splits are chosen to maximize the
    decrease of the selected impurity (``'gini'`` or ``'entropy'``).

    Attributes set by ``fit``:
        root: Root ``DecisionNode`` of the fitted tree.
        classes_: Sorted array of the distinct labels seen during ``fit``.
        n_classes_: Number of distinct labels.
        n_features_: Number of columns of the training matrix.
    """

    def __init__(self, max_depth=None, criterion='gini', min_samples_split=4, min_samples_leaf=2):
        self.max_depth = max_depth
        self.criterion = criterion
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.root = None
        self.classes_ = None
        self.n_classes_ = None
        self.n_features_ = None

    def fit(self, X, y):
        """Grow the tree on ``X`` (2-D, samples by features) and labels ``y``."""
        X = np.asarray(X)
        y = np.asarray(y)
        # Remember the actual label values: leaves must predict labels, not
        # positions, so labels need not be exactly 0..k-1.
        self.classes_ = np.unique(y)
        self.n_classes_ = len(self.classes_)
        self.n_features_ = X.shape[1]
        self.root = self._grow_tree(X, y)

    def predict(self, X):
        """Return an array with one predicted label per row of ``X``."""
        return np.array([self._predict(inputs) for inputs in np.asarray(X)])

    def _gini(self, y):
        """Gini impurity of the label collection ``y``."""
        m = len(y)
        _, counts = np.unique(y, return_counts=True)
        return 1.0 - sum((count / m) ** 2 for count in counts)

    def _entropy(self, y):
        """Shannon entropy (base 2) of the label collection ``y``."""
        m = len(y)
        _, counts = np.unique(y, return_counts=True)
        entropy = 0.0
        for count in counts:
            p = count / m
            entropy -= p * np.log2(p) if p > 0 else 0
        return entropy

    def _impurity(self, y):
        """Impurity of ``y`` under ``self.criterion``.

        Raises:
            ValueError: If the criterion is neither ``'gini'`` nor ``'entropy'``.
        """
        if self.criterion == 'gini':
            return self._gini(y)
        if self.criterion == 'entropy':
            return self._entropy(y)
        raise ValueError("Criterion must be 'gini' or 'entropy'")

    def _best_split(self, X, y):
        """Find the split with the largest strictly positive impurity decrease.

        Every distinct value of every feature is tried as a threshold.
        Candidates leaving fewer than ``min_samples_leaf`` samples on either
        side are skipped.

        Returns:
            ``(feature_index, threshold)``, or ``(None, None)`` when there is
            at most one sample or no candidate improves on the parent.
        """
        m, n = X.shape
        if m <= 1:
            return None, None

        parent_score = self._impurity(y)

        best_gain = 0.0
        best_feature = None
        best_threshold = None

        for feature_index in range(n):
            X_column = X[:, feature_index]
            for threshold in np.unique(X_column):
                left_mask = X_column <= threshold
                # The right side is the complement of the left, matching how
                # _grow_tree partitions the data and how _predict routes rows.
                right_mask = ~left_mask

                n_left = np.count_nonzero(left_mask)
                n_right = m - n_left
                if n_left < self.min_samples_leaf or n_right < self.min_samples_leaf:
                    continue

                score_left = self._impurity(y[left_mask])
                score_right = self._impurity(y[right_mask])
                gain = parent_score - (n_left / m) * score_left - (n_right / m) * score_right

                # Strict comparison: the first best candidate wins ties.
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature_index
                    best_threshold = threshold

        return best_feature, best_threshold

    def _majority_class(self, y):
        """Most frequent label in ``y``; ties go to the smallest label.

        Counts are taken per entry of ``self.classes_`` so the result is an
        actual label even when labels are not the integers 0..k-1.
        """
        counts = [np.sum(y == label) for label in self.classes_]
        return self.classes_[np.argmax(counts)]

    def _grow_tree(self, X, y, depth=0):
        """Recursively build the subtree for the samples ``(X, y)``."""
        node = DecisionNode(value=self._majority_class(y))

        # Stop on depth limit, pure node, or too few samples to split.
        if (self.max_depth is not None and depth >= self.max_depth) or \
           len(np.unique(y)) == 1 or \
           len(y) < self.min_samples_split:
            return node

        feature_index, threshold = self._best_split(X, y)
        if feature_index is None:
            return node

        indices_left = X[:, feature_index] <= threshold
        X_left, y_left = X[indices_left], y[indices_left]
        X_right, y_right = X[~indices_left], y[~indices_left]

        node.feature_index = feature_index
        node.threshold = threshold
        node.left = self._grow_tree(X_left, y_left, depth + 1)
        node.right = self._grow_tree(X_right, y_right, depth + 1)
        # value is None marks the node as internal for _predict/_get_depth.
        node.value = None

        return node

    def _predict(self, inputs):
        """Walk from the root to a leaf for one sample and return its label."""
        node = self.root
        while node.value is None:
            if inputs[node.feature_index] <= node.threshold:
                node = node.left
            else:
                node = node.right
        return node.value

    def get_tree_depth(self):
        """Depth of the fitted tree counted in levels (0 before ``fit``).

        A tree consisting of a single leaf has depth 1.
        NOTE(review): libraries that count edges would report one less —
        confirm which convention the comparison plots expect.
        """
        return self._get_depth(self.root)

    def _get_depth(self, node):
        """Number of levels in the subtree rooted at ``node``."""
        if node is None:
            return 0
        if node.value is not None:
            return 1
        return 1 + max(self._get_depth(node.left), self._get_depth(node.right))

    @staticmethod
    def calculate_accuracy(y_true, y_pred):
        """Fraction of positions where ``y_true`` equals ``y_pred``."""
        correct_predictions = np.sum(y_true == y_pred)
        total_predictions = len(y_true)
        accuracy = correct_predictions / total_predictions
        return accuracy

    def evaluate_with_min_samples_leaf(self, X_train, y_train, X_val, y_val):
        """Sweep ``min_samples_leaf`` over 1..20 via ``run_experiment_custom``.

        The hyperparameters of this instance are not used; fresh trees are
        configured from ``fixed_params``.
        """
        hyperparam_name = "min_samples_leaf"
        hyperparam_values = range(1, 21)
        fixed_params = {
            "max_depth": None,
            "criterion": "gini",
            "min_samples_split": 2
        }
        run_experiment_custom(hyperparam_name, hyperparam_values, X_train, y_train, X_val, y_val, fixed_params)

    def evaluate_min_samples_split(self, X_train, y_train, X_val, y_val):
        """Sweep ``min_samples_split`` over 2..20 via ``run_experiment_custom``."""
        hyperparam_name = "min_samples_split"
        hyperparam_values = range(2, 21)
        fixed_params = {
            "max_depth": None,
            "criterion": "gini",
            "min_samples_leaf": 1
        }
        run_experiment_custom(hyperparam_name, hyperparam_values, X_train, y_train, X_val, y_val, fixed_params)

    def evaluate_with_criterion(self, X_train, y_train, X_val, y_val):
        """Compare the ``'gini'`` and ``'entropy'`` criteria via ``run_experiment_custom``."""
        hyperparam_name = "criterion"
        hyperparam_values = ["gini", "entropy"]
        fixed_params = {
            "max_depth": None,
            "min_samples_split": 2,
            "min_samples_leaf": 1
        }
        run_experiment_custom(hyperparam_name, hyperparam_values, X_train, y_train, X_val, y_val, fixed_params)

    def evaluate_with_max_depth(self, X_train, y_train, X_val, y_val):
        """Sweep ``max_depth`` over 1..20 and ``None``, requesting the accuracy plot."""
        hyperparam_name = "max_depth"
        hyperparam_values = list(range(1, 21)) + [None]
        fixed_params = {
            "criterion": "gini",
            "min_samples_split": 2,
            "min_samples_leaf": 1
        }
        run_experiment_custom(hyperparam_name, hyperparam_values, X_train, y_train, X_val, y_val, fixed_params, plot_type="accuracy")

## Библиотечная версия DT

In [None]:
class DT_lib:
    """Hyperparameter sweeps for the library decision tree on a fixed split.

    The train/validation data are stored once; every ``evaluate_*`` method
    forwards them to ``run_experiment_lib`` together with the swept
    hyperparameter and the parameters held fixed.
    """

    def __init__(self, X_train, y_train, X_val, y_val):
        self.X_train, self.y_train = X_train, y_train
        self.X_val, self.y_val = X_val, y_val

    def _sweep(self, hyperparam_name, values, fixed_params, **plot_options):
        """Forward one sweep to ``run_experiment_lib`` using the stored data."""
        run_experiment_lib(
            hyperparam_name,
            values,
            self.X_train,
            self.y_train,
            self.X_val,
            self.y_val,
            fixed_params,
            **plot_options,
        )

    def evaluate_min_samples_leaf(self, min_samples_leaf_values=range(1, 21)):
        """Sweep ``min_samples_leaf``.

        NOTE(review): this is the only sweep that fixes ``criterion`` to
        ``'entropy'``; all the others use ``'gini'`` — confirm it is intended.
        """
        self._sweep(
            'min_samples_leaf',
            min_samples_leaf_values,
            {
                'max_depth': None,
                'criterion': 'entropy',
                'min_samples_split': 2,
                'random_state': 42,
            },
        )

    def evaluate_min_samples_split(self, min_samples_split_values=range(2, 21)):
        """Sweep ``min_samples_split``."""
        self._sweep(
            'min_samples_split',
            min_samples_split_values,
            {
                'max_depth': None,
                'criterion': 'gini',
                'min_samples_leaf': 1,
                'random_state': 42,
            },
        )

    def evaluate_max_leaf_nodes(self, max_leaf_nodes_values=None):
        """Sweep ``max_leaf_nodes``; defaults to ``[None, 5, 10, 20, 50, 100]``."""
        if max_leaf_nodes_values is None:
            max_leaf_nodes_values = [None, 5, 10, 20, 50, 100]

        self._sweep(
            'max_leaf_nodes',
            max_leaf_nodes_values,
            {
                'max_depth': None,
                'criterion': 'gini',
                'min_samples_split': 2,
                'min_samples_leaf': 1,
                'random_state': 42,
            },
        )

    def evaluate_min_impurity_decrease(self, min_impurity_decrease_values=np.linspace(0.0, 0.15, 11)):
        """Sweep ``min_impurity_decrease``."""
        self._sweep(
            'min_impurity_decrease',
            min_impurity_decrease_values,
            {
                'max_depth': None,
                'criterion': 'gini',
                'min_samples_split': 2,
                'min_samples_leaf': 1,
                'random_state': 42,
            },
        )

    def evaluate_ccp_alpha(self, ccp_alpha_values=np.linspace(0.0, 0.05, 30)):
        """Sweep ``ccp_alpha``."""
        self._sweep(
            'ccp_alpha',
            ccp_alpha_values,
            {
                'max_depth': None,
                'criterion': 'gini',
                'min_samples_split': 2,
                'min_samples_leaf': 1,
                'random_state': 42,
            },
        )

    def evaluate_max_depth(self, max_depth_values=None):
        """Sweep ``max_depth`` (default 1..20 plus ``None``) with the accuracy plot."""
        if max_depth_values is None:
            max_depth_values = list(range(1, 21)) + [None]

        self._sweep(
            'max_depth',
            max_depth_values,
            {
                'criterion': 'gini',
                'min_samples_split': 2,
                'min_samples_leaf': 1,
                'random_state': 42,
            },
            plot_type="accuracy",
        )

## Реализация алгоритма кастомного случайного леса

In [None]:
class RandomForestClassifierCustom:
    """Bagged ensemble of ``DTclassifier_custom`` trees with majority voting.

    Each tree is trained on an optional bootstrap resample of the rows and on
    one random subset of the columns drawn once per tree (not per split).

    Args:
        n_estimators: Number of trees.
        max_features: ``'sqrt'`` or ``'log2'`` of the feature count; any other
            value means every feature is used.
        max_depth: Passed to each tree.
        bootstrap: Whether rows are resampled with replacement for each tree.
        random_state: Seed for the private random generator used by ``fit``.
    """

    def __init__(self, n_estimators=100, max_features='sqrt', max_depth=None, bootstrap=True, random_state=None):
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.max_depth = max_depth
        self.bootstrap = bootstrap
        self.random_state = random_state
        self.trees = []
        self.features_indices = []

    def fit(self, X_train, y_train):
        """Train ``n_estimators`` trees; any previously fitted trees are discarded."""
        # A private generator keeps the global NumPy random state untouched
        # while producing the same draws as seeding the global one would.
        rng = np.random.RandomState(self.random_state)
        self.trees = []
        self.features_indices = []

        n_samples, n_features = X_train.shape

        if self.max_features == 'sqrt':
            max_features = int(np.sqrt(n_features))
        elif self.max_features == 'log2':
            max_features = int(np.log2(n_features))
        else:
            max_features = n_features
        # log2(1) == 0 would otherwise train trees on zero columns.
        max_features = min(n_features, max(1, max_features))

        for _ in range(self.n_estimators):
            if self.bootstrap:
                indices = rng.choice(n_samples, n_samples, replace=True)
                X_sample = X_train[indices]
                y_sample = y_train[indices]
            else:
                X_sample = X_train
                y_sample = y_train

            features_idx = rng.choice(n_features, max_features, replace=False)
            self.features_indices.append(features_idx)

            tree = DTclassifier_custom(max_depth=self.max_depth)

            tree.fit(X_sample[:, features_idx], y_sample)
            self.trees.append(tree)

    def predict(self, X):
        """Return the majority vote of all trees for every row of ``X``.

        Ties are resolved by ``Counter.most_common``. With no fitted trees an
        empty array is returned.
        """
        # One row of votes per tree, each tree seeing only its own columns.
        votes_per_tree = [
            tree.predict(X[:, features_idx])
            for tree, features_idx in zip(self.trees, self.features_indices)
        ]

        # Transpose so that each row holds all votes for one sample.
        votes_per_sample = np.array(votes_per_tree).T

        return np.array([Counter(votes).most_common(1)[0][0] for votes in votes_per_sample])

## Взаимодействие с алгоритмами, перебор гиперпараметров, вызов функций построения графиков

### dt custom

In [None]:
def run_experiment_custom(hyperparam_name, hyperparam_values, X_train, y_train, X_val, y_val, fixed_params, plot_type="tree_depth"):
    """Sweep one hyperparameter of the custom decision tree.

    For every value a fresh ``DTclassifier_custom`` is built from
    ``fixed_params`` plus the swept value, fitted on the training data and
    scored on both sets; one summary line is printed per value.

    Args:
        hyperparam_name: Constructor keyword being varied.
        hyperparam_values: Values to try, in order.
        fixed_params: Constructor keywords held constant (not modified).
        plot_type: ``"tree_depth"`` or ``"accuracy"`` selects the final plot;
            any other value produces no plot.
    """
    from dt import DTclassifier_custom

    depth_history, train_history, val_history = [], [], []

    for candidate in hyperparam_values:
        model = DTclassifier_custom(**{**fixed_params, hyperparam_name: candidate})
        model.fit(X_train, y_train)

        depth = model.get_tree_depth()
        depth_history.append(depth)

        train_pred = model.predict(X_train)
        val_pred = model.predict(X_val)

        train_acc = DTclassifier_custom.calculate_accuracy(y_train, train_pred)
        val_acc = DTclassifier_custom.calculate_accuracy(y_val, val_pred)
        train_history.append(train_acc)
        val_history.append(val_acc)

        print(f"{hyperparam_name}: {candidate}, Tree Depth: {depth}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

    if plot_type == "accuracy":
        plot_accuracy(hyperparam_name, hyperparam_values, train_history, val_history, custom_realize=True)
    elif plot_type == "tree_depth":
        plot_tree_depth(hyperparam_name, hyperparam_values, depth_history, custom_realize=True)

### dt lib

In [None]:
def run_experiment_lib(hyperparam_name, hyperparam_values, X_train, y_train, X_val, y_val, fixed_params, plot_type="tree_depth"):
    """Sweep one hyperparameter of sklearn's ``DecisionTreeClassifier``.

    For every value a fresh classifier is built from ``fixed_params`` plus
    the swept value, fitted on the training data and scored on both sets;
    one summary line is printed per value.

    Args:
        hyperparam_name: Constructor keyword being varied.
        hyperparam_values: Values to try, in order.
        fixed_params: Constructor keywords held constant (not modified).
        plot_type: ``"tree_depth"`` or ``"accuracy"`` selects the final plot;
            any other value produces no plot.
    """
    from sklearn.tree import DecisionTreeClassifier

    depth_history, train_history, val_history = [], [], []

    for candidate in hyperparam_values:
        model = DecisionTreeClassifier(**{**fixed_params, hyperparam_name: candidate})
        model.fit(X_train, y_train)

        depth = model.get_depth()
        depth_history.append(depth)

        train_acc = model.score(X_train, y_train)
        val_acc = model.score(X_val, y_val)
        train_history.append(train_acc)
        val_history.append(val_acc)

        print(f"{hyperparam_name}: {candidate}, Tree Depth: {depth}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

    if plot_type == "accuracy":
        plot_accuracy(hyperparam_name, hyperparam_values, train_history, val_history, custom_realize=False)
    elif plot_type == "tree_depth":
        plot_tree_depth(hyperparam_name, hyperparam_values, depth_history, custom_realize=False)

### rf custom

In [None]:
def run_random_forest_experiment_custom(n_estimators_values, X_train, y_train, X_val, y_val):
    """Sweep ``n_estimators`` for the custom random forest and plot accuracy.

    Each forest is created with ``random_state=42``; train and validation
    accuracy are printed per value and plotted at the end.
    """
    from dt import DTclassifier_custom
    from random_forest_custom import RandomForestClassifierCustom

    score = DTclassifier_custom.calculate_accuracy
    train_history, val_history = [], []

    for tree_count in n_estimators_values:
        forest = RandomForestClassifierCustom(n_estimators=tree_count, random_state=42)
        forest.fit(X_train, y_train)

        train_pred = forest.predict(X_train)
        val_pred = forest.predict(X_val)

        train_acc = score(y_train, train_pred)
        val_acc = score(y_val, val_pred)

        train_history.append(train_acc)
        val_history.append(val_acc)

        print(f"n_estimators (rf_custom): {tree_count}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

    plot_accuracy('n_estimators (rf)', n_estimators_values, train_history, val_history, custom_realize=True)

### rf lib

In [None]:
def run_random_forest_experiment_lib(n_estimators_values, X_train, y_train, X_val, y_val):
    """Sweep ``n_estimators`` for sklearn's ``RandomForestClassifier`` and plot accuracy.

    Each forest is created with ``random_state=42``; train and validation
    accuracy are printed per value and plotted at the end.
    """
    from sklearn.ensemble import RandomForestClassifier

    train_history, val_history = [], []

    for tree_count in n_estimators_values:
        forest = RandomForestClassifier(n_estimators=tree_count, random_state=42)
        forest.fit(X_train, y_train)

        train_acc = forest.score(X_train, y_train)
        val_acc = forest.score(X_val, y_val)

        train_history.append(train_acc)
        val_history.append(val_acc)

        print(f"n_estimators (rf_lib): {tree_count}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

    plot_accuracy('n_estimators (rf)', n_estimators_values, train_history, val_history, custom_realize=False)

## Использование библиотечного алгоритма бустинга (выбран градиентный бустинг)

In [None]:
def run_gradient_boosting_experiment_lib(n_estimators_values, X_train, y_train, X_val, y_val):
    """Sweep ``n_estimators`` for sklearn's ``GradientBoostingClassifier`` and plot accuracy.

    Every model uses ``learning_rate=0.1``, ``max_depth=3`` and
    ``random_state=42``; train and validation accuracy are printed per value
    and plotted at the end.
    """
    from sklearn.ensemble import GradientBoostingClassifier

    train_history, val_history = [], []

    for stage_count in n_estimators_values:
        booster = GradientBoostingClassifier(
            n_estimators=stage_count,
            learning_rate=0.1,
            max_depth=3,
            random_state=42,
        )
        booster.fit(X_train, y_train)

        train_acc = booster.score(X_train, y_train)
        val_acc = booster.score(X_val, y_val)

        train_history.append(train_acc)
        val_history.append(val_acc)

        print(f"n_estimators (gradient boosting): {stage_count}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

    plot_accuracy('n_estimators (gradient boosting)', n_estimators_values, train_history, val_history, custom_realize=False)

## Функции построения графиков для глубины дерева и точности

In [None]:
def plot_tree_depth(hyperparam_name, hyperparam_values, tree_depths, custom_realize):
    """Plot tree depth against the swept hyperparameter values and show it.

    Args:
        hyperparam_name: Used as the x-axis label and in the title.
        hyperparam_values: Swept values; may contain ``None`` (for example the
            "no limit" setting of ``max_leaf_nodes``).
        tree_depths: One depth per swept value.
        custom_realize: Truthy for the custom implementation, falsy for the
            library one; only affects the title.
    """
    x_values = list(hyperparam_values)
    # None has no position on a numeric axis, so such sweeps are drawn on a
    # categorical axis where every value (including "None") gets its own tick.
    if any(value is None for value in x_values):
        x_values = [str(value) for value in x_values]

    realization = 'custom' if custom_realize else 'lib'

    plt.figure(figsize=(10, 6))
    plt.plot(x_values, tree_depths, marker='o')
    plt.xlabel(hyperparam_name)
    plt.ylabel('Tree Depth')
    plt.title(f'Tree depth from: {hyperparam_name} ({realization} realization)')
    plt.grid(True)
    plt.show()

def plot_accuracy(hyperparam_name, hyperparam_values, train_accuracies, val_accuracies, custom_realize):
    """Plot train and validation accuracy against the swept values and show it.

    Args:
        hyperparam_name: Used as the x-axis label and in the title.
        hyperparam_values: Swept values; may contain ``None`` (for example the
            unlimited setting of ``max_depth``).
        train_accuracies: One training accuracy per swept value.
        val_accuracies: One validation accuracy per swept value.
        custom_realize: Truthy for the custom implementation, falsy for the
            library one; only affects the title.
    """
    x_values = list(hyperparam_values)
    # None has no position on a numeric axis, so such sweeps are drawn on a
    # categorical axis where every value (including "None") gets its own tick.
    if any(value is None for value in x_values):
        x_values = [str(value) for value in x_values]

    realization = 'custom' if custom_realize else 'lib'

    plt.figure(figsize=(10, 6))
    plt.plot(x_values, train_accuracies, marker='o', label='Train Accuracy')
    plt.plot(x_values, val_accuracies, marker='s', label='Validation Accuracy')
    plt.xlabel(hyperparam_name)
    plt.ylabel('Accuracy')
    plt.title(f'Accuracy from: {hyperparam_name} ({realization} realization)')
    plt.legend()
    plt.grid(True)
    plt.show()

## Main

In [None]:
def main():
    """Run every experiment in order: custom tree, library tree, then ensembles."""
    # (X_train, y_train, X_val, y_val) as returned by get_processed_data.
    splits = get_processed_data()

    DTcustom = DTclassifier_custom(max_depth=None, criterion='entropy', min_samples_split=2, min_samples_leaf=1)
    custom_sweeps = (
        DTcustom.evaluate_with_min_samples_leaf,
        DTcustom.evaluate_min_samples_split,
        DTcustom.evaluate_with_criterion,
        DTcustom.evaluate_with_max_depth,
    )
    for sweep in custom_sweeps:
        sweep(*splits)

    DTlib = DT_lib(*splits)
    lib_sweeps = (
        DTlib.evaluate_min_samples_split,
        DTlib.evaluate_min_samples_leaf,
        DTlib.evaluate_min_impurity_decrease,
        DTlib.evaluate_max_leaf_nodes,
        DTlib.evaluate_ccp_alpha,
        DTlib.evaluate_max_depth,
    )
    for sweep in lib_sweeps:
        sweep()

    n_estimators_values = [5, 10, 20, 50, 100, 150]
    ensemble_runners = (
        run_random_forest_experiment_custom,
        run_random_forest_experiment_lib,
        run_gradient_boosting_experiment_lib,
    )
    for runner in ensemble_runners:
        runner(n_estimators_values, *splits)


if __name__ == "__main__":
    main()