In [24]:
import numpy as np
np.random.seed(42)

# Decission Tree Interface

In [25]:
from typing import Optional, Literal


class Node:

    def __init__(
        self,
        feature: int, 
        threshold: np.number,
        preds: Optional[dict] = None,
        left: Optional['Node'] = None,
        right: Optional['Node'] = None,
    ):
        self.feature = feature
        self.threshold = threshold
        self.preds = preds
        self.left = left
        self.right = right


class Tree:
    
    def __init__(
        self,
        max_depth: int = 5, 
        min_samples_split: int = 2, 
        min_samples_leaf: int = 2,
        max_features: Optional[int] = None,
    ):
        self.tree = None
        self.criterion = lambda a, b=None: None
        self.prediction_func = lambda _: None
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_features = max_features


    def information_gain(
        self,
        x: np.array, 
        y: np.array, 
        criterion_idx: int, 
        criterion_threshold: np.number,
        sample_weight: Optional[np.array] = None,
    ):
        parent_crit_val = self.criterion(y, sample_weight)

        left_mask = x[:, criterion_idx] < criterion_threshold
        right_mask = ~left_mask

        left_y = y[left_mask]
        right_y = y[right_mask]

        left_w = sample_weight[left_mask] if sample_weight is not None else None
        right_w = sample_weight[right_mask] if sample_weight is not None else None

        left_crit_val = self.criterion(left_y, left_w)
        right_crit_val = self.criterion(right_y, right_w)

        return len(y) * parent_crit_val - len(left_y) * left_crit_val - len(right_y) * right_crit_val


    def split_node(
            self, 
            x: np.array, 
            y: np.array, 
            depth: int, 
            used: set, 
            sample_weight: Optional[np.array] = None,
        ):
        n_samples, n_features = x.shape

        preds = self.prediction_func(y)

        if (n_samples < self.min_samples_split) or (depth >= self.max_depth):
            return preds
        
        best_info_gain, best_feature, best_threshold = -np.inf, None, None
        
        features = set(range(n_features)) - used
        if self.max_features is not None:
            features = np.random.choice(list(features), size=self.max_features)
        
        for feature in features:
            col_vals = np.unique(x[:, feature])
            thresholds = (col_vals[:-1] + col_vals[1:]) / 2
            for threshold in thresholds:
                tmp_x = x[x[:, feature] < threshold]
                if (len(tmp_x) < self.min_samples_leaf) or (len(x) - len(tmp_x) < self.min_samples_leaf):
                    continue

                gain = self.information_gain(
                    x, y, criterion_idx=feature, criterion_threshold=threshold, sample_weight=sample_weight,
                )
                
                if gain > best_info_gain:
                    best_info_gain = gain
                    best_feature = feature
                    best_threshold = threshold
        
        if best_info_gain in [0, -np.inf]:
            return preds
        
        mask = x[:, best_feature] < best_threshold
        sw_left = sample_weight[mask] if sample_weight is not None else None
        sw_right = sample_weight[~mask] if sample_weight is not None else None

        left_node = self.split_node(x[mask], y[mask], depth + 1, used | {best_feature}, sw_left)
        right_node = self.split_node(x[~mask], y[~mask], depth + 1, used | {best_feature}, sw_right)
        
        return Node(
            feature=best_feature, 
            threshold=best_threshold, 
            preds=preds,
            left=left_node, 
            right=right_node, 
        )


    def fit(self, x: np.array, y: np.array, sample_weight: Optional[np.array] = None):
        self.tree = self.split_node(x, y, depth=0, used=set(), sample_weight=sample_weight)

# Decission Tree for classification

In [26]:
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report


ds = load_breast_cancer()

clf_x_tr, clf_x_test, clf_y_tr, clf_y_test = train_test_split(ds["data"], ds["target"], test_size=0.2, random_state=42, shuffle=True)

### Ручная реализация на np

In [None]:
class ClfDecissionTree(Tree):

    def __init__(
        self, 
        criterion: Literal["gini", "entropy"],
        max_depth: int = 5, 
        min_samples_split: int = 2, 
        min_samples_leaf: int = 2,
        max_features: Optional[int] = None,
    ):
        super().__init__(
            max_depth=max_depth,
            min_samples_leaf=min_samples_leaf,
            min_samples_split=min_samples_split,
            max_features=max_features,
        )

        if criterion == "gini":
            self.criterion = ClfDecissionTree.gini
        elif criterion == "entropy":
            self.criterion = ClfDecissionTree.entropy
        else:
            raise ValueError(criterion)
        
        self.prediction_func = self._calc_probs


    @staticmethod
    def _calc_weighted_probs(y: np.array, sample_weight: np.array):
        if (sample_weight is None) or (np.sum(sample_weight) == 0):
            _, counts = np.unique(y, return_counts=True)
            return counts / np.sum(counts)
        else:  # Поведение под AdaBoost - расчет вероятности для взвешенных объектов
            classes = np.unique(y)
            sum_weights = np.sum(sample_weight)
            return np.array([np.sum(sample_weight[y == cl]) / sum_weights for cl in classes])
        

    @staticmethod
    def entropy(y: np.array, sample_weight: Optional[np.array] = None):
        p = ClfDecissionTree._calc_weighted_probs(y, sample_weight)
        
        return -np.sum(p * np.log2(p))
    

    @staticmethod
    def gini(y: np.array, sample_weight: Optional[np.array] = None):
        p = ClfDecissionTree._calc_weighted_probs(y, sample_weight)
            
        return 1 - np.sum(p ** 2)
    

    def _calc_probs(self, y: np.array):
        classes, counts = np.unique(y, return_counts=True)
        probs_dict = dict(zip(classes, counts / len(y)))

        return [probs_dict.get(c, np.float64(0.)) for c in self.classes]


    def fit(self, x: np.array, y: np.array, sample_weight: Optional[np.array] = None):
        self.classes = sorted(np.unique(y))
        self.tree = self.split_node(x, y, depth=0, used=set(), sample_weight=sample_weight)
    
    
    def predict_proba(self, x: np.array):
        preds = []
        for x_obj in x:
            node = self.tree 
            while isinstance(node, Node):
                node = node.left if x_obj[node.feature] < node.threshold else node.right
            
            preds.append(node)

        return np.array(preds)
    
    
    def predict(self, x: np.array):
        probs = self.predict_proba(x)
        inds = np.argmax(probs, axis=-1)

        return np.array(self.classes)[inds]  # Такая реализация безопасна для ситуаций, когда классы не [0, 1, 2...] (к примеру, AdaBoost)

In [28]:
tree = ClfDecissionTree(
    criterion="gini",
    max_depth=3, 
    min_samples_split=10, 
    min_samples_leaf=10,
    max_features=10,
)
tree.fit(clf_x_tr, clf_y_tr)

print("Результаты на test:\n\n", classification_report(clf_y_test, tree.predict(clf_x_test)))
print("ROC-AUC на test:", roc_auc_score(clf_y_test, tree.predict_proba(clf_x_test)[:, 1]).round(3))

Результаты на test:

               precision    recall  f1-score   support

           0       1.00      0.93      0.96        43
           1       0.96      1.00      0.98        71

    accuracy                           0.97       114
   macro avg       0.98      0.97      0.97       114
weighted avg       0.97      0.97      0.97       114

ROC-AUC на test: 0.97


In [29]:
weights = np.ones_like(clf_y_tr)
weights[clf_y_tr == 0] = 4

tree = ClfDecissionTree(
    criterion="gini",
    max_depth=3, 
    min_samples_split=10, 
    min_samples_leaf=10,
    max_features=10,
)
tree.fit(clf_x_tr, clf_y_tr, sample_weight=weights)

print("Результаты на test:\n\n", classification_report(clf_y_test, tree.predict(clf_x_test)))
print("ROC-AUC на test:", roc_auc_score(clf_y_test, tree.predict_proba(clf_x_test)[:, 1]).round(3))

Результаты на test:

               precision    recall  f1-score   support

           0       0.86      1.00      0.92        43
           1       1.00      0.90      0.95        71

    accuracy                           0.94       114
   macro avg       0.93      0.95      0.94       114
weighted avg       0.95      0.94      0.94       114

ROC-AUC на test: 0.948


### Сравнение с sklearn

In [30]:
from sklearn.tree import DecisionTreeClassifier


sklearn_tree = DecisionTreeClassifier(
    criterion="gini", 
    max_depth=3, 
    min_samples_split=10, 
    min_samples_leaf=10,
    max_features=10,
)
sklearn_tree.fit(clf_x_tr, clf_y_tr)

In [31]:
print("Результаты на test:\n\n", classification_report(clf_y_test, sklearn_tree.predict(clf_x_test)))
print("ROC-AUC на test:", roc_auc_score(clf_y_test, sklearn_tree.predict_proba(clf_x_test)[:, 1]).round(3))

Результаты на test:

               precision    recall  f1-score   support

           0       0.95      0.93      0.94        43
           1       0.96      0.97      0.97        71

    accuracy                           0.96       114
   macro avg       0.96      0.95      0.95       114
weighted avg       0.96      0.96      0.96       114

ROC-AUC на test: 0.993


# Decission Tree for regression

In [35]:
from sklearn.datasets import load_diabetes
from sklearn.metrics import mean_squared_error


ds = load_diabetes(scaled=False)

reg_x_tr, reg_x_test, reg_y_tr, reg_y_test = train_test_split(ds["data"], ds["target"], test_size=0.2, random_state=42, shuffle=True)

### Ручная реализация на np

In [33]:
class RegDecissionTree(Tree):

    def __init__(
        self, 
        criterion: Literal["mae", "mse"],
        max_depth: int = 5, 
        min_samples_split: int = 2, 
        min_samples_leaf: int = 2,
    ):
        super().__init__(
            max_depth=max_depth,
            min_samples_leaf=min_samples_leaf,
            min_samples_split=min_samples_split,
        )

        if criterion == "mse":
            self.criterion = RegDecissionTree.mse
            self.prediction_func = RegDecissionTree._predict_mean
        elif criterion == "mae":
            self.criterion = RegDecissionTree.mae
            self.prediction_func = RegDecissionTree._predict_median
        else:
            raise ValueError(criterion)


    @staticmethod
    def mse(y: np.array):
        return np.sum((y - np.mean(y)) ** 2)
    

    @staticmethod
    def mae(y: np.array):
        return np.sum(np.abs(y - np.median(y)))
    
    @staticmethod
    def _predict_mean(y: np.array):
        return np.mean(y)
    

    @staticmethod
    def _predict_median(y: np.array):
        return np.median(y)

    
    def predict(self, x: np.array):
        preds = []
        for x_obj in x:
            node = self.tree 
            while isinstance(node, Node):
                node = node.left if x_obj[node.feature] < node.threshold else node.right
            
            preds.append(node)

        return np.array(preds)

In [36]:
tree = RegDecissionTree(
    criterion="mse",
    max_depth=10, 
    min_samples_split=5, 
    min_samples_leaf=5,
)
tree.fit(reg_x_tr, reg_y_tr)

TypeError: RegDecissionTree.mse() takes 1 positional argument but 2 were given

In [34]:
mean_squared_error(reg_y_test, tree.predict(reg_x_test))

3796.5563812188416

### Сравнение с sklearn

In [35]:
from sklearn.tree import DecisionTreeRegressor


sklearn_tree = DecisionTreeRegressor(
    criterion="squared_error", 
    max_depth=10, 
    min_samples_split=5, 
    min_samples_leaf=5,
)
sklearn_tree.fit(reg_x_tr, reg_y_tr)

In [36]:
mean_squared_error(reg_y_test, sklearn_tree.predict(reg_x_test))

3400.8392631942497