# Лабораторная работа №3 "Проведение исследований с решающим деревом"

### Ход работы

Импортируем библиотеки перед работой

In [3]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

import warnings

from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder
from sklearn.decomposition import PCA
from sklearn.model_selection import StratifiedKFold, GridSearchCV, train_test_split, KFold, RandomizedSearchCV
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, mean_absolute_error, r2_score
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from scipy.stats import randint
from sklearn.exceptions import ConvergenceWarning
from collections import Counter
from abc import ABC, abstractmethod

##### Создание бейзлайна для модели классификации

Проведём те же манипуляции, что и ранне: выгрузим датасет и минимально его обработаем

In [6]:
c_base_df = pd.read_csv("../classification.csv").sample(frac=1, random_state=42).reset_index(drop=True)

c_base_df = c_base_df.drop(columns=['instance_id', 'obtained_date', 'track_name', 'artist_name'])

c_base_df.drop_duplicates()

c_base_df['tempo'] = pd.to_numeric(c_base_df['tempo'], errors='coerce')

le = LabelEncoder()
c_base_df['mode'] = le.fit_transform(c_base_df['mode'])
c_base_df['music_genre'] = le.fit_transform(c_base_df['music_genre'])
c_base_df['key'] = le.fit_transform(c_base_df['key'])

median_tempo = c_base_df['tempo'].median()
c_base_df['tempo'] = c_base_df['tempo'].fillna(median_tempo)

X_c_base = c_base_df.drop(columns=["music_genre"])
y_c_base = c_base_df["music_genre"]

X_c_base_train, X_c_base_test, y_c_base_train, y_c_base_test = train_test_split(
    X_c_base,
    y_c_base,
    test_size=0.2,
    random_state=42,
    stratify=y_c_base
)

Теперь обучим модель из sklearn

In [4]:
model = DecisionTreeClassifier(random_state=42)

model.fit(X_c_base_train, y_c_base_train)

y_c_base_pred = model.predict(X_c_base_test)

accuracy = accuracy_score(y_c_base_test, y_c_base_pred)
print("Accuracy:", accuracy)

Accuracy: 0.4298


##### Улучшение бейзлайна для модели классификации

In [5]:
print(classification_report(y_c_base_test, y_c_base_pred))

              precision    recall  f1-score   support

           0       0.27      0.27      0.27      1000
           1       0.65      0.66      0.66      1000
           2       0.39      0.41      0.40      1000
           3       0.74      0.74      0.74      1000
           4       0.43      0.43      0.43      1000
           5       0.46      0.47      0.46      1000
           6       0.31      0.34      0.32      1000
           7       0.38      0.37      0.38      1000
           8       0.25      0.23      0.24      1000
           9       0.41      0.38      0.39      1000

    accuracy                           0.43     10000
   macro avg       0.43      0.43      0.43     10000
weighted avg       0.43      0.43      0.43     10000



Это лучше чем регрессия. Теперь преобразуем датасет как ранее

In [15]:
c_df = pd.read_csv("../classification.csv").sample(frac=1, random_state=42).reset_index(drop=True)

c_df = c_df.drop(columns=['instance_id', 'obtained_date', 'track_name', 'artist_name'])


sc = StandardScaler()
scaled = sc.fit_transform(c_df[['loudness', 'acousticness', 'energy']]) # ранее я не замечал, что loudness просто съедала остальные фичи 
pca = PCA(n_components=2)
c_df[['pc1', 'pc2']] = pca.fit_transform(scaled)
c_df = c_df.drop(columns=['loudness', 'acousticness', 'energy'])

c_df['tempo'] = pd.to_numeric(c_df['tempo'], errors='coerce')

le = LabelEncoder()
c_df['music_genre'] = le.fit_transform(c_df['music_genre'])
c_df['mode'] = le.fit_transform(c_df['mode'])

ohe = OneHotEncoder(sparse_output=False, drop='first')
encoded_key = ohe.fit_transform(c_df[['key']])
encoded_df_key = pd.DataFrame(encoded_key, columns=ohe.get_feature_names_out(['key']))
c_df = c_df.drop(columns=['key']).reset_index(drop=True)
c_df = pd.concat([c_df, encoded_df_key], axis=1)

c_df['duration_ms'] = c_df['duration_ms'].replace(-1, np.nan)

c_df['instrumental_flag'] = (c_df['instrumentalness'] > 0.05).astype(int)
c_df = c_df.drop(columns=['instrumentalness'])

c_df['undefined_tempo'] = c_df['tempo'].isna().astype(int)

median_tempo = c_df['tempo'].median()
c_df['tempo'] = c_df['tempo'].fillna(median_tempo)

median_duration = c_df['duration_ms'].median()
c_df['duration_ms'] = c_df['duration_ms'].fillna(median_duration)

float_features = [
    'popularity', 'danceability', 'duration_ms',
    'liveness', 'speechiness', 'tempo',
    'valence', 'pc1', 'pc2'
]

other_features = [
    'mode', 'instrumental_flag', 'undefined_tempo'
] + list(encoded_df_key.columns) 

X_c = c_df[float_features + other_features]
y_c = c_df['music_genre']

X_c_train, X_c_test, y_c_train, y_c_test = train_test_split(
    X_c, y_c, test_size=0.2, stratify=y_c, random_state=42
)

Перейдём к обучению

In [None]:
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

pipe = Pipeline([
    ('dt', DecisionTreeClassifier(random_state=42))
])

param_dist = {
    'dt__criterion': ['gini', 'entropy'],
    'dt__max_depth': [None, 5, 8, 12, 16, 20, 30],
    'dt__min_samples_split': randint(2, 60),
    'dt__min_samples_leaf': randint(1, 40),
    'dt__max_features': [None, 'sqrt', 'log2', 'auto'],
    'dt__class_weight': [None, 'balanced']
}

rand_search = RandomizedSearchCV(
    estimator=pipe,
    param_distributions=param_dist,
    n_iter=60,                  
    scoring='accuracy',         
    cv=cv,
    verbose=1,
    random_state=42,
    n_jobs=-1,
    return_train_score=True
)

rand_search.fit(X_c_train, y_c_train)

print("Best params:", rand_search.best_params_)
best_model = rand_search.best_estimator_

y_pred = best_model.predict(X_c_test)

print("Score:", accuracy_score(y_c_test, y_pred))

print(classification_report(y_c_test, y_pred))

Fitting 5 folds for each of 60 candidates, totalling 300 fits
Best params: {'dt__class_weight': 'balanced', 'dt__criterion': 'entropy', 'dt__max_depth': 12, 'dt__max_features': None, 'dt__min_samples_leaf': 37, 'dt__min_samples_split': 33}
Score: 0.5141
              precision    recall  f1-score   support

           0       0.36      0.29      0.32      1000
           1       0.69      0.66      0.68      1000
           2       0.50      0.40      0.44      1000
           3       0.82      0.79      0.81      1000
           4       0.48      0.50      0.49      1000
           5       0.53      0.48      0.51      1000
           6       0.42      0.43      0.42      1000
           7       0.47      0.46      0.47      1000
           8       0.41      0.44      0.42      1000
           9       0.47      0.69      0.56      1000

    accuracy                           0.51     10000
   macro avg       0.52      0.51      0.51     10000
weighted avg       0.52      0.51      0.5

##### Создание бейзлайна для модели регрессии

Сделаем всё то же, что и ранее

In [23]:
r_base_df = pd.read_csv("../regression.csv").sample(frac=1, random_state=42).reset_index(drop=True)

r_base_df['Date'] = pd.to_datetime(r_base_df['Date'], dayfirst=True)

r_base_df["Year"] = r_base_df["Date"].dt.year
r_base_df["Month"] = r_base_df["Date"].dt.month
r_base_df["Day"] = r_base_df["Date"].dt.day

r_base_df = r_base_df.drop(columns=['Date'])

per_store_count = r_base_df.groupby('Store').size().iloc[0]
k = max(1, int(np.round(0.8 * per_store_count))) 
store_counts = r_base_df['Store'].nunique()

train = r_base_df.iloc[: store_counts * k]
test = r_base_df.iloc[store_counts * k :]

X_r_base_train = train.drop(columns=['Weekly_Sales'])
X_r_base_test = test.drop(columns=['Weekly_Sales'])
y_r_base_train = train['Weekly_Sales']
y_r_base_test = test['Weekly_Sales']

И теперь обучим модель

In [28]:
dt_reg = DecisionTreeRegressor(random_state=42, max_depth=3)
dt_reg.fit(X_r_base_train, y_r_base_train)

y_test_pred  = dt_reg.predict(X_r_base_test)

mae = mean_absolute_error(y_r_base_test, y_test_pred)
rmse = np.sqrt(mean_squared_error(y_r_base_test, y_test_pred))
r2 = r2_score(y_r_base_test, y_test_pred)

print("MAE:", mae)
print("RMSE:", rmse)
print("R2:", r2)

MAE: 330834.6335131109
RMSE: 433606.2436003742
R2: 0.41886084325173134


##### Улучшение бейзлайна для модели регрессии

Сначала повторим техники из предыдущей ЛР

In [26]:
r_df = pd.read_csv("../regression.csv").sample(frac=1, random_state=42).reset_index(drop=True)

r_df['Date'] = pd.to_datetime(r_df['Date'], dayfirst=True)

r_df['Year'] = r_df['Date'].dt.year
r_df['Week'] = r_df['Date'].dt.isocalendar().week

r_df = r_df.drop(columns=['Date'])

cat_store = ['Store', 'Week']
other_feats = ['Temperature', 'Fuel_Price', 'CPI', 'Unemployment', 'Year', 'Holiday_Flag']

per_store_count = r_df.groupby('Store').size().iloc[0]
k = max(1, int(np.round(0.8 * per_store_count))) 
store_counts = r_df['Store'].nunique()

train = r_df.iloc[: store_counts * k]
test = r_df.iloc[store_counts * k :]

X_r_train = train.drop(columns=['Weekly_Sales'])
X_r_test = test.drop(columns=['Weekly_Sales'])
y_r_train = train['Weekly_Sales']
y_r_test = test['Weekly_Sales']

warnings.filterwarnings("ignore")

preprocessor = ColumnTransformer(
    transformers=[
        ('ohe', OneHotEncoder(handle_unknown='ignore', sparse_output=False), cat_store),
        ('passth', 'passthrough', other_feats)
    ],
    remainder='drop'
)

Перейдём к обучению

In [30]:
dt_pipeline = Pipeline([
    ('preproc', preprocessor),
    ('dt', DecisionTreeRegressor(random_state=42))
])

param_dist = {
    'dt__max_depth': [None, 3, 4, 5, 6],
    'dt__min_samples_split': randint(2, 10),
    'dt__min_samples_leaf': randint(1, 10),
    'dt__max_features': [None, 'sqrt', 'log2'],
    'dt__criterion': ['squared_error', 'friedman_mse', 'absolute_error', 'poisson']
}

kf = KFold(n_splits=5, shuffle=True, random_state=42)

rand_search = RandomizedSearchCV(
    estimator=dt_pipeline,
    param_distributions=param_dist,
    n_iter=50,  
    cv=kf,
    scoring='neg_mean_squared_error', 
    n_jobs=-1,
    verbose=1,
    random_state=42,
    return_train_score=True
)

rand_search.fit(X_r_train, y_r_train)

print("Best params:", rand_search.best_params_)
best_model = rand_search.best_estimator_
y_test_pred  = best_model.predict(X_r_test)

mae = mean_absolute_error(y_r_test, y_test_pred)
rmse = np.sqrt(mean_squared_error(y_r_test, y_test_pred))
r2 = r2_score(y_r_test, y_test_pred)

print("MAE:", mae)
print("RMSE:", rmse)
print("R2:", r2)

Fitting 5 folds for each of 50 candidates, totalling 250 fits


Best params: {'dt__criterion': 'poisson', 'dt__max_depth': None, 'dt__max_features': None, 'dt__min_samples_leaf': 1, 'dt__min_samples_split': 3}
MAE: 68922.4129770115
RMSE: 132851.9622745847
R2: 0.9454461969234228


##### Имплементация базового класса

класс ноды

In [2]:
class TreeNode:
    """
    Узел дерева. Минимальный набор полей для экономии памяти.
    """
    __slots__ = ("feature", "threshold", "left", "right", "value")

    def __init__(self):
        self.feature = None      # индекс признака для сплита
        self.threshold = None    # порог для сплита
        self.left = None         # левый потомок
        self.right = None        # правый потомок
        self.value = None        # значение в листе (предсказание)

сам базовый класс

In [None]:
class BaseDecisionTree(ABC):
    """
    Базовый класс дерева (для классификации и регрессии).
    Поддерживает criterion='entropy' или 'poisson'.
    """

    def __init__(
        self,
        criterion="entropy",
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        max_features=None
    ):
        if criterion not in ("entropy", "poisson"):
            raise ValueError("Поддерживаются только criterion='entropy' или 'poisson'")
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_features = max_features
        self.root = None
        self.n_features_ = None

    @abstractmethod
    def _calculate_impurity(self, y, sample_weight=None):
        """
        Вычисляет "нечистоту" узла.
        Для классификации — энтропия.
        Для Poisson-регрессии — Poisson deviance.
        """
        pass

    @abstractmethod
    def _leaf_value(self, y, sample_weight=None):
        """
        Значение в листе.
        Для классификации — наиболее частый класс.
        Для Poisson-регрессии — среднее.
        """
        pass

    def _best_split(self, X, y, sample_weight):
        """
        Ищет оптимальный сплит по всем (или случайным) признакам,
        который максимизирует прирост качества (gain).
        """
        n_samples, n_features = X.shape

        # выбираем признаки для рассмотрения (все или подвыборка)
        features = np.arange(n_features)
        if self.max_features is not None:
            size = min(self.max_features, n_features)
            features = np.random.choice(features, size, replace=False)

        best_gain = -np.inf
        best_feature = None
        best_threshold = None
        base_impurity = self._calculate_impurity(y, sample_weight)

        # перебираем признаки и пороги для разделения
        for feature in features:
            values = X[:, feature]
            thresholds = np.unique(values)

            for thr in thresholds:
                left_mask = values <= thr
                right_mask = ~left_mask

                # проверяем минимальное количество объектов в листе
                if (
                    left_mask.sum() < self.min_samples_leaf
                    or right_mask.sum() < self.min_samples_leaf
                ):
                    continue

                y_left, y_right = y[left_mask], y[right_mask]
                w_left = sample_weight[left_mask]
                w_right = sample_weight[right_mask]

                w_total = np.sum(sample_weight)
                w_l = np.sum(w_left)
                w_r = np.sum(w_right)

                # вычисляем прирост качества (information gain)
                impurity_left = self._calculate_impurity(y_left, w_left)
                impurity_right = self._calculate_impurity(y_right, w_right)

                gain = base_impurity - (w_l / w_total) * impurity_left - (w_r / w_total) * impurity_right

                # сохраняем лучший сплит
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = thr

        return best_feature, best_threshold, best_gain


    def _build_tree(self, X, y, sample_weight, depth):
        """
        Рекурсивная постройка дерева.
        """
        node = TreeNode()  # создаем узел
        node.value = self._leaf_value(y, sample_weight)  # значение узла (листа)

        # проверка условий остановки
        if (
            len(y) < self.min_samples_split
            or (self.max_depth is not None and depth >= self.max_depth)
            or len(set(y)) == 1 
        ):
            return node  

        # ищем лучший сплит
        feature, threshold, gain = self._best_split(X, y, sample_weight)
        if feature is None or gain <= 0:
            return node  

        node.feature = feature
        node.threshold = threshold

        # рекурсивно строим левое и правое поддеревья
        mask = X[:, feature] <= threshold
        node.left = self._build_tree(X[mask], y[mask], sample_weight[mask], depth + 1)
        node.right = self._build_tree(X[~mask], y[~mask], sample_weight[~mask], depth + 1)

        return node


    def fit(self, X, y, sample_weight=None):
        """
        Обучение дерева на данных X, y с возможностью учета весов объектов.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        self.n_features_ = X.shape[1]

        if sample_weight is None:
            sample_weight = np.ones(len(y))
        else:
            sample_weight = np.asarray(sample_weight)

        # создаем дерево начиная с корня
        self.root = self._build_tree(X, y, sample_weight, depth=0)
        return self


    def _predict_one(self, x, node):
        """
        Проход по дереву для одного объекта x.
        """
        while node.feature is not None:
            if x[node.feature] <= node.threshold:
                node = node.left
            else:
                node = node.right
        return node.value  


    def predict(self, X):
        """
        Предсказание значений для массива объектов X.
        """
        X = np.asarray(X)
        return np.array([self._predict_one(x, self.root) for x in X])


##### Классификатор

In [20]:
class MyDecisionTreeClassifier(BaseDecisionTree):
    """
    Реализация решающего дерева для классификации на основе BaseDecisionTree.
    """
    def __init__(
        self,
        criterion="entropy",
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        max_features=None,
        class_weight=None
    ):
        super().__init__(
            criterion=criterion,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            max_features=max_features
        )
        if criterion != "entropy":
            raise ValueError("Поддерживается только criterion='entropy'")
        self.criterion = criterion
        self.class_weight = class_weight
        self.classes_ = None
        self.class_weights_ = None

    def _compute_class_weights(self, y):
        """
        class_weight='balanced'
        """
        counts = Counter(y)
        n_samples = len(y)
        n_classes = len(counts)
        weights = {cls: n_samples / (n_classes * cnt) for cls, cnt in counts.items()}
        return weights

    def _calculate_impurity(self, y, sample_weight=None):
        """
        Энтропия с учётом весов объектов.
        """
        if len(y) == 0:
            return 0.0

        if sample_weight is None:
            counts = np.bincount(y)
            probs = counts[counts > 0] / len(y)
        else:
            total_weight = np.sum(sample_weight)
            weights_per_class = {}
            for label, w in zip(y, sample_weight):
                weights_per_class[label] = weights_per_class.get(label, 0) + w
            probs = np.array(list(weights_per_class.values())) / total_weight

        return -np.sum(probs * np.log2(probs + 1e-9))

    def _leaf_value(self, y, sample_weight=None):
        """
        Возвращает класс с наибольшим суммарным весом (или частотой).
        """
        if sample_weight is None:
            counts = Counter(y)
        else:
            counts = {}
            for label, w in zip(y, sample_weight):
                counts[label] = counts.get(label, 0) + w
        return max(counts, key=counts.get)

    def fit(self, X, y):
        X = np.asarray(X)
        y = np.asarray(y)
        self.classes_ = np.unique(y)

        if self.class_weight == "balanced":
            self.class_weights_ = self._compute_class_weights(y)
            sample_weight = np.array([self.class_weights_[cls] for cls in y])
        else:
            sample_weight = np.ones(len(y))

        return super().fit(X, y, sample_weight)

Обучим на данных бейзлайна

In [None]:
model = MyDecisionTreeClassifier(max_depth=3)

model.fit(X_c_base_train, y_c_base_train)

y_c_base_pred = model.predict(X_c_base_test)

accuracy = accuracy_score(y_c_base_test, y_c_base_pred)
print("Accuracy:", accuracy)

Accuracy: 0.3827


(я забыл вывести отчёт, а колаб уже всё затёр. Эта штучка обучалась около часа, поэтому да...)

Обучим на данных улучшенной модели. Сразу будем использовать параметры, полученные при подборе гиперпараметров

In [None]:
pipe = Pipeline([
    ('dt', MyDecisionTreeClassifier(max_depth=12, min_samples_leaf=37, min_samples_split=3))
])

pipe.fit(X_c_train, y_c_train)


y_pred = pipe.predict(X_c_test)

print("Score:", accuracy_score(y_c_test, y_pred))

print(classification_report(y_c_test, y_pred))

Score: 0.5154
              precision    recall  f1-score   support

           0       0.37      0.28      0.32      1000
           1       0.70      0.65      0.67      1000
           2       0.50      0.40      0.45      1000
           3       0.82      0.79      0.81      1000
           4       0.48      0.51      0.49      1000
           5       0.53      0.48      0.51      1000
           6       0.42      0.42      0.42      1000
           7       0.47      0.47      0.47      1000
           8       0.41      0.45      0.43      1000
           9       0.47      0.69      0.56      1000

    accuracy                           0.52     10000
   macro avg       0.52      0.52      0.51     10000
weighted avg       0.52      0.52      0.51     10000



Имеем скоры:

**Бейзлайн sklearn** - 0.4298

**Улучшенная модель sklearn** - 0.5141

**Мой бейзлайн** - 0.3827

**Мой улучшенный бейзлайн** - 0.5154

Моя модель несильно отличается от модели sklearn по скору

##### Регрессор

In [32]:
class MyDecisionTreerRegressor(BaseDecisionTree):
    """
    Решающее дерево для регрессии с Poisson loss.
    """

    def __init__(
        self,
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        max_features=None
    ):
        super().__init__(
            criterion="poisson",
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            max_features=max_features
        )

    def _calculate_impurity(self, y, sample_weight=None):
        """
        Poisson deviance: sum(y * log(y / mean) - (y - mean))
        Для узла: используем среднее по y как baseline.
        """
        if len(y) == 0:
            return 0.0

        if sample_weight is None:
            w = np.ones_like(y, dtype=float)
        else:
            w = sample_weight

        y = np.asarray(y, dtype=float)
        mean_y = np.average(y, weights=w)
        mean_y = max(mean_y, 1e-9)  # чтобы избежать log(0)

        # Poisson deviance
        deviance = np.sum(w * (y * np.log((y + 1e-9) / mean_y) - (y - mean_y)))
        return deviance

    def _leaf_value(self, y, sample_weight=None):
        """
        Значение листа — взвешенное среднее y.
        """
        if len(y) == 0:
            return 0.0

        if sample_weight is None:
            return np.mean(y)
        else:
            return np.average(y, weights=sample_weight)

Обучим на данных бейзлайна

In [33]:
dt_reg = MyDecisionTreerRegressor(max_depth=3)
dt_reg.fit(X_r_base_train, y_r_base_train)

y_test_pred  = dt_reg.predict(X_r_base_test)

mae = mean_absolute_error(y_r_base_test, y_test_pred)
rmse = np.sqrt(mean_squared_error(y_r_base_test, y_test_pred))
r2 = r2_score(y_r_base_test, y_test_pred)

print("MAE:", mae)
print("RMSE:", rmse)
print("R2:", r2)

MAE: 384023.38268405484
RMSE: 484455.802051078
R2: 0.2745668328456484


Обучим на данных улучшенной модели. Сразу будем использовать параметры, полученные при подборе гиперпараметров

In [34]:
dt_pipeline = Pipeline([
    ('preproc', preprocessor),
    ('dt', MyDecisionTreerRegressor(min_samples_leaf=1, min_samples_split=3))
])

dt_pipeline.fit(X_r_train, y_r_train)

y_test_pred  = dt_pipeline.predict(X_r_test)

mae = mean_absolute_error(y_r_test, y_test_pred)
rmse = np.sqrt(mean_squared_error(y_r_test, y_test_pred))
r2 = r2_score(y_r_test, y_test_pred)

print("MAE:", mae)
print("RMSE:", rmse)
print("R2:", r2)

MAE: 100450.66639846744
RMSE: 215020.25432678152
R2: 0.8570948674402575


Получили

| Метрика | Бейзлайн линейной регрессии | Бейзлайн имплементации | Улучшенная линейная рергессия | Улучшенная имплементация |
|-|-|-|-|-|
| MAE | 330835 | 384023 | 68922 | 100451 |
| RMSE | 433606 | 484456 | 132852 | 215020 |
| R2 | 0.419 | 0.275 | 0.945 | 0.857 |

Моя имплементация оказалась слабее