In [36]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, r2_score, recall_score
from sklearn.model_selection import GridSearchCV
from imblearn.over_sampling import SMOTE
from sklearn.impute import SimpleImputer

Структура ноды

In [2]:
class Node:
    """One node of a decision tree.

    A node stores a split description (``feature``, ``threshold``), links to
    its ``left`` and ``right`` children, and a ``value`` payload. Every field
    defaults to ``None``; the tree classes in this notebook treat a node whose
    ``value`` is not ``None`` as a leaf.
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        # Split description.
        self.feature, self.threshold = feature, threshold
        # Child subtrees.
        self.left, self.right = left, right
        # Prediction stored at the node.
        self.value = value

Классификатор

In [3]:
class MyDecisionTreeClassifier:
    """Binary-split decision tree classifier grown by Gini impurity gain.

    Parameters
    ----------
    max_depth : int or None, default=None
        Depth at which a node is forced to become a leaf. ``None`` disables
        the limit; ``0`` yields a single majority-class leaf.
    min_samples_split : int, default=2
        A candidate split is only considered when BOTH resulting sides hold
        at least this many samples.

    Attributes
    ----------
    tree_ : Node
        Root of the fitted tree, set by ``fit``.
    """

    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def _gini(self, y):
        """Return the Gini impurity ``1 - sum(p_k ** 2)`` of the labels ``y``."""
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)
        return 1.0 - np.sum(probs ** 2)

    def _leaf(self, y):
        """Build a leaf node predicting the most frequent label in ``y``."""
        values, counts = np.unique(y, return_counts=True)
        return Node(value=values[np.argmax(counts)])

    def _best_split(self, X, y):
        """Find the ``(feature, threshold)`` pair with the largest Gini gain.

        Every unique value of every column is tried as a threshold; samples
        with ``value <= threshold`` go left, all others go right.

        Returns ``(None, None)`` when no admissible split has a strictly
        positive gain.
        """
        n_samples, n_features = X.shape
        best_feature, best_thresh = None, None
        best_gain = 0
        parent_impurity = self._gini(y)

        for feature in range(n_features):
            # The column does not change while thresholds are scanned.
            column = X[:, feature]
            for t in np.unique(column):
                left_mask = column <= t
                # Complement instead of ``column > t``: rows that fail the
                # comparison (e.g. NaN) go right, exactly as in prediction,
                # instead of silently disappearing from both children.
                right_mask = ~left_mask

                n_left = np.count_nonzero(left_mask)
                n_right = n_samples - n_left
                if n_left < self.min_samples_split or n_right < self.min_samples_split:
                    continue

                left_y, right_y = y[left_mask], y[right_mask]
                impurity = (len(left_y) * self._gini(left_y) + len(right_y) * self._gini(right_y)) / n_samples
                gain = parent_impurity - impurity

                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_thresh = t

        return best_feature, best_thresh

    def _build_tree(self, X, y, depth=0):
        """Recursively grow the subtree for the samples ``(X, y)``."""
        # ``is not None`` rather than truthiness, so that max_depth=0 means
        # "no splits at all" instead of "unlimited depth".
        depth_exhausted = self.max_depth is not None and depth >= self.max_depth
        if depth_exhausted or len(np.unique(y)) == 1:
            return self._leaf(y)

        feature, threshold = self._best_split(X, y)
        if feature is None:
            return self._leaf(y)

        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask  # same routing rule as in _best_split
        left = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        return Node(feature, threshold, left, right)

    def fit(self, X, y):
        """Grow the tree on ``X`` (2-D, samples by features) and labels ``y``.

        Returns ``self``.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, if ``X`` and ``y`` differ in length, or if
            there are no samples.
        """
        X = np.array(X)
        y = np.array(y)
        if X.ndim != 2:
            raise ValueError(f"X must be 2-dimensional, got {X.ndim} dimension(s)")
        if len(X) != len(y):
            raise ValueError(f"X and y have different lengths: {len(X)} != {len(y)}")
        if len(y) == 0:
            raise ValueError("cannot fit a tree on zero samples")

        self.tree_ = self._build_tree(X, y)
        return self

    def _predict_one(self, x, node):
        """Walk from ``node`` down to a leaf for sample ``x``; return its label."""
        while node.value is None:
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.value

    def predict(self, X):
        """Return an array with the predicted label for every row of ``X``."""
        return np.array([self._predict_one(x, self.tree_) for x in np.array(X)])

Регрессор

In [4]:
class MyDecisionTreeRegressor:
    """Binary-split decision tree regressor that minimises weighted child MSE.

    Parameters
    ----------
    max_depth : int or None, default=None
        Depth at which a node is forced to become a leaf. ``None`` disables
        the limit; ``0`` yields a single leaf predicting the mean target.
    min_samples_split : int, default=2
        A node with fewer samples than this becomes a leaf, and a candidate
        split is only considered when BOTH resulting sides hold at least this
        many samples.

    Attributes
    ----------
    tree_ : Node
        Root of the fitted tree, set by ``fit``.
    """

    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def _mse(self, y):
        """Return the mean squared deviation of ``y`` from its own mean."""
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        """Find the ``(feature, threshold)`` pair with the lowest child MSE.

        Every unique value of every column is tried as a threshold; samples
        with ``value <= threshold`` go left, all others go right. No minimum
        improvement over the parent is required: the admissible split with
        the lowest size-weighted child MSE wins.

        Returns ``(None, None)`` when no split satisfies the size constraint.
        """
        n_samples, n_features = X.shape
        best_feature, best_thresh = None, None
        best_mse = np.inf

        for feature in range(n_features):
            # The column does not change while thresholds are scanned.
            column = X[:, feature]
            for t in np.unique(column):
                left_mask = column <= t
                # Complement instead of ``column > t``: rows that fail the
                # comparison (e.g. NaN) go right, exactly as in prediction,
                # instead of silently disappearing from both children.
                right_mask = ~left_mask

                n_left = np.count_nonzero(left_mask)
                n_right = n_samples - n_left
                if n_left < self.min_samples_split or n_right < self.min_samples_split:
                    continue

                left_y, right_y = y[left_mask], y[right_mask]
                mse = (len(left_y) * self._mse(left_y) + len(right_y) * self._mse(right_y)) / n_samples

                if mse < best_mse:
                    best_mse = mse
                    best_feature = feature
                    best_thresh = t

        return best_feature, best_thresh

    def _build_tree(self, X, y, depth=0):
        """Recursively grow the subtree for the samples ``(X, y)``."""
        # ``is not None`` rather than truthiness, so that max_depth=0 means
        # "no splits at all" instead of "unlimited depth".
        depth_exhausted = self.max_depth is not None and depth >= self.max_depth
        if depth_exhausted or len(y) < self.min_samples_split:
            return Node(value=np.mean(y))

        feature, threshold = self._best_split(X, y)
        if feature is None:
            return Node(value=np.mean(y))

        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask  # same routing rule as in _best_split
        left = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        right = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        return Node(feature, threshold, left, right)

    def fit(self, X, y):
        """Grow the tree on ``X`` (2-D, samples by features) and targets ``y``.

        Returns ``self``.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, if ``X`` and ``y`` differ in length, or if
            there are no samples.
        """
        X = np.array(X)
        y = np.array(y)
        if X.ndim != 2:
            raise ValueError(f"X must be 2-dimensional, got {X.ndim} dimension(s)")
        if len(X) != len(y):
            raise ValueError(f"X and y have different lengths: {len(X)} != {len(y)}")
        if len(y) == 0:
            raise ValueError("cannot fit a tree on zero samples")

        self.tree_ = self._build_tree(X, y)
        return self

    def _predict_one(self, x, node):
        """Walk from ``node`` down to a leaf for sample ``x``; return its value."""
        while node.value is None:
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.value

    def predict(self, X):
        """Return an array with the predicted target for every row of ``X``."""
        return np.array([self._predict_one(x, self.tree_) for x in np.array(X)])

Загрузка датасета классификации

In [None]:
# Read the fraud data set and keep a reproducible 10,000-row subsample.
df_fraud = pd.read_csv('creditcard.csv').sample(n=10000, random_state=42)

# 'Class' is the label column; every other column is a feature.
yc = df_fraud['Class']
Xc = df_fraud.drop('Class', axis=1)

# 80/20 hold-out split, stratified on the label, with a fixed seed.
Xc_train, Xc_test, yc_train, yc_test = train_test_split(
    Xc,
    yc,
    test_size=0.2,
    stratify=yc,
    random_state=42,
)

Запуск классификации sklearn

In [6]:
# Baseline: scikit-learn decision tree limited to depth 3.
# random_state is pinned because scikit-learn permutes the features at every
# split, so equally good splits could otherwise be resolved differently
# between runs and the reported metrics would not be reproducible.
sk_tree_c = DecisionTreeClassifier(max_depth=3, random_state=42)
sk_tree_c.fit(Xc_train, yc_train)
yc_pred_sk = sk_tree_c.predict(Xc_test)

Запуск собственной классификации

In [7]:
# Same depth limit as the scikit-learn baseline; fit() returns the estimator,
# so construction and training are chained.
my_tree_c = MyDecisionTreeClassifier(max_depth=3).fit(Xc_train, yc_train)
yc_pred_my = my_tree_c.predict(Xc_test)

Метрики

In [8]:

# Hold-out scores for both classifiers: accuracy, weighted F1 and recall of class 1.
acc_sk = accuracy_score(yc_test, yc_pred_sk)
f1_sk = f1_score(yc_test, yc_pred_sk, average='weighted')
rec_sk = recall_score(yc_test, yc_pred_sk, pos_label=1)

acc_my = accuracy_score(yc_test, yc_pred_my)
f1_my = f1_score(yc_test, yc_pred_my, average='weighted')
rec_my = recall_score(yc_test, yc_pred_my, pos_label=1)

print(f"Sklearn accuracy={acc_sk:.4f}, f1={f1_sk:.4f}, recall={rec_sk}")
print(f"Custom  accuracy={acc_my:.4f}, f1={f1_my:.4f}, recall={rec_my}")

Sklearn accuracy=0.9990, f1=0.9988, recall=0.3333333333333333
Custom  accuracy=0.9995, f1=0.9995, recall=0.6666666666666666


Регрессия

Загрузка датасета

In [9]:
# Read the crop-yield data set and keep a reproducible 10,000-row subsample.
df_crop = pd.read_csv('crop_yield.csv')
df_crop = df_crop.sample(n=10000, random_state=42)

# 'Yield_tons_per_hectare' is the regression target; the rest are features.
Xr = df_crop.drop('Yield_tons_per_hectare', axis=1)
yr = df_crop['Yield_tons_per_hectare']

# One-hot encode the categorical columns, then cast the whole frame to float.
# In pandas versions where get_dummies produces bool columns, a frame mixing
# bool and float columns converts to an object-dtype ndarray inside
# MyDecisionTreeRegressor.fit; a uniform float frame avoids that without
# changing any split (False/True and 0.0/1.0 compare identically).
Xr = pd.get_dummies(Xr, drop_first=True).astype(float)

# 80/20 hold-out split with a fixed seed.
Xr_train, Xr_test, yr_train, yr_test = train_test_split(
    Xr, yr, test_size=0.2, random_state=42
)


Запуск регрессии sklearn

In [10]:
# Baseline: scikit-learn regression tree limited to depth 4.
# random_state is pinned because scikit-learn permutes the features at every
# split, so equally good splits could otherwise be resolved differently
# between runs and the reported metrics would not be reproducible.
sk_tree_r = DecisionTreeRegressor(max_depth=4, random_state=42)
sk_tree_r.fit(Xr_train, yr_train)
yr_pred_sk = sk_tree_r.predict(Xr_test)

Запуск собственной регрессии

In [11]:
# Same depth limit as the scikit-learn baseline; fit() returns the estimator,
# so construction and training are chained.
my_tree_r = MyDecisionTreeRegressor(max_depth=4).fit(Xr_train, yr_train)
yr_pred_my = my_tree_r.predict(Xr_test)

Метрики

In [12]:

# Hold-out RMSE and R^2 for both regressors.
rmse_sk = np.sqrt(mean_squared_error(yr_test, yr_pred_sk))
r2_sk = r2_score(yr_test, yr_pred_sk)

rmse_my = np.sqrt(mean_squared_error(yr_test, yr_pred_my))
r2_my = r2_score(yr_test, yr_pred_my)

print(f"SkLearn RMSE={rmse_sk:.4f}, R2={r2_sk:.4f}")
print(f"Custom  RMSE={rmse_my:.4f}, R2={r2_my:.4f}")

SkLearn RMSE=0.6193, R2=0.8670
Custom  RMSE=0.6193, R2=0.8670


Теперь применим улучшения, полученные на этапе анализа данных

Сбалансируем классы

In [None]:
# Reload the fraud data set; this time all rows are kept (no subsampling here).
df_fraud = pd.read_csv('creditcard.csv')

# 'Class' is the label column; every other column is a feature.
yc = df_fraud['Class']
Xc = df_fraud.drop('Class', axis=1)

# 80/20 hold-out split, stratified on the label, with a fixed seed.
Xc_train, Xc_test, yc_train, yc_test = train_test_split(
    Xc,
    yc,
    test_size=0.2,
    stratify=yc,
    random_state=42,
)

In [26]:
# Oversample the training part with SMOTE (seeded).
sm = SMOTE(random_state=42)
Xc_train_bal, yc_train_bal = sm.fit_resample(Xc_train, yc_train)

# Draw the training subsample from a seeded generator: the previous unseeded
# np.random.choice picked different rows on every run, so every metric
# computed downstream changed between executions.
rng = np.random.default_rng(42)
# Never request more rows than exist (sampling without replacement would fail).
n_small = min(10000, len(Xc_train_bal))
idx = rng.choice(len(Xc_train_bal), size=n_small, replace=False)

Xc_train_bal_small = Xc_train_bal.iloc[idx]
yc_train_bal_small = yc_train_bal.iloc[idx]

Подбор гиперпараметров sklearn

In [28]:
# Hyper-parameter grid explored for the scikit-learn classifier.
params = dict(
    max_depth=[3, 5, 10],
    min_samples_split=[2, 5, 10],
    min_samples_leaf=[1, 2, 5],
)

# NOTE(review): the folds are cut from data that was already resampled by
# SMOTE, so the cross-validation scores may be optimistic — consider
# resampling inside each fold instead.
dt = DecisionTreeClassifier(class_weight='balanced', random_state=23456)
grid = GridSearchCV(estimator=dt, param_grid=params, scoring='accuracy', cv=5)
grid.fit(Xc_train_bal_small, yc_train_bal_small)

# Evaluate the best estimator found by the search on the untouched test split.
best_dt = grid.best_estimator_
yc_pred_best = best_dt.predict(Xc_test)

In [29]:
# Hold-out scores of the tuned scikit-learn classifier.
acc_best = accuracy_score(yc_test, yc_pred_best)
f1_best = f1_score(yc_test, yc_pred_best, average='weighted')
rec_best = recall_score(yc_test, yc_pred_best, pos_label=1)

print(f"accuracy={acc_best:.4f}")
print(f"f1={f1_best:.4f}")
print(f"recall={rec_best}")

accuracy=0.9728
f1=0.9847
recall=0.8979591836734694


Собственная реализация

In [30]:
# Train the custom classifier on the balanced subsample; fit() returns the
# estimator, so construction and training are chained.
my_tree_c = MyDecisionTreeClassifier(max_depth=3).fit(
    Xc_train_bal_small, yc_train_bal_small
)
yc_pred_my = my_tree_c.predict(Xc_test)

Метрики

In [31]:
# Hold-out scores of the custom classifier trained on the balanced subsample.
acc_my = accuracy_score(yc_test, yc_pred_my)
f1_my = f1_score(yc_test, yc_pred_my, average='weighted')
rec_my = recall_score(yc_test, yc_pred_my, pos_label=1)

print(f"accuracy={acc_my:.4f}")
print(f"f1={f1_my:.4f}")
print(f"recall={rec_my}")

accuracy=0.9755
f1=0.9860
recall=0.8775510204081632


Регрессия

sklearn

Обработка признаков

In [38]:
# Reload the crop-yield data set and keep a reproducible 10,000-row subsample.
df_crop = pd.read_csv('crop_yield.csv')
df_crop = df_crop.sample(n=10000, random_state=42)

# 'Yield_tons_per_hectare' is the regression target; the rest are features.
Xr = df_crop.drop('Yield_tons_per_hectare', axis=1)
yr = df_crop['Yield_tons_per_hectare']

# NOTE(review): both imputers are fitted on all rows, before the train/test
# split made in the following cell, so test rows contribute to the fill
# values — consider fitting them on the training part only.

# Numeric columns: fill missing values with the column mean.
num_cols = Xr.select_dtypes(include=np.number).columns
imputer_num = SimpleImputer(strategy='mean')
Xr[num_cols] = imputer_num.fit_transform(Xr[num_cols])

# Object (categorical) columns: fill missing values with the most frequent one.
cat_cols = Xr.select_dtypes(include='object').columns
imputer_cat = SimpleImputer(strategy='most_frequent')
Xr[cat_cols] = imputer_cat.fit_transform(Xr[cat_cols])

# One-hot encode the categorical columns, then cast the whole frame to float.
# In pandas versions where get_dummies produces bool columns, a frame mixing
# bool and float columns converts to an object-dtype ndarray inside
# MyDecisionTreeRegressor.fit; a uniform float frame avoids that without
# changing any split (False/True and 0.0/1.0 compare identically).
Xr = pd.get_dummies(Xr, drop_first=True).astype(float)

Разделение на выборки

In [39]:
# 80/20 hold-out split of the preprocessed regression data, with a fixed seed.
Xr_train, Xr_test, yr_train, yr_test = train_test_split(
    Xr,
    yr,
    random_state=42,
    test_size=0.2,
)

Подбор гиперпараметров

In [40]:
# Hyper-parameter grid explored for the scikit-learn regressor.
params = dict(
    max_depth=[5, 8, 12],
    min_samples_split=[5, 10, 20],
    min_samples_leaf=[2, 5, 10],
    max_features=['sqrt', 'log2', None],
)

dt = DecisionTreeRegressor(random_state=23)

# 3-fold search scored by R^2, using all available cores.
grid = GridSearchCV(estimator=dt, param_grid=params, scoring='r2', cv=3, n_jobs=-1)
grid.fit(Xr_train, yr_train)

# Evaluate the best estimator found by the search on the test split.
best_dt = grid.best_estimator_
yr_pred_best = best_dt.predict(Xr_test)

Оценка результата

In [42]:
# Hold-out RMSE and R^2 of the tuned scikit-learn regressor.
rmse_best = np.sqrt(mean_squared_error(yr_test, yr_pred_best))
r2_best = r2_score(yr_test, yr_pred_best)

print(f"RMSE={rmse_best:.4f}")
print(f"R2={r2_best:.4f}")

RMSE=0.5317
R2=0.9020


Собственная реализация

In [43]:
# Train the custom regressor on the preprocessed data; fit() returns the
# estimator, so construction and training are chained.
my_tree_r = MyDecisionTreeRegressor(max_depth=4).fit(Xr_train, yr_train)
yr_pred_my = my_tree_r.predict(Xr_test)

Метрики

In [44]:
# Hold-out RMSE and R^2 of the custom regressor.
rmse_my = np.sqrt(mean_squared_error(yr_test, yr_pred_my))
r2_my = r2_score(yr_test, yr_pred_my)

print(f"RMSE={rmse_my:.4f}")
print(f"R2={r2_my:.4f}")

RMSE=0.6193
R2=0.8670
