# Моя реализация алгоритма "Дерево решений"

В данной работе я хотел бы самостоятельно написать один из базовых алгоритмов машинного обучения - Decision tree, и сравнить его с встроенной в sklearn моделью по точности и скорости. Здесь представлена вариация для классификации, и для создания датасета я использую функцию make_classification из sklearn.

In [26]:
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score

Процесс обучения строится следующим образом:

1) Модель принимает на вход фичи и таргет

2) Создается словарь, в котором для каждой фичи будут храниться ее потенциальные разделители. В качестве разделителей в базовом случае используются разделители для отсортированных уникальных значений. В случае использования гистограмного метода, если число разделителей гистограммы меньше исходного, записываем их как разделители для фичи

3) Строим дерево, для каждого узла отбирая лучший разделитель либо с помощью прироста информации, либо с помощью неопределенности Джини

4) Параллельно ведем подсчет листьев и подсчет глубины для каждой ветви, чтобы не превысить заданные ограничения.

### Для лучшей структуризации кода, пропишем нужные в процессе обучения функции до создания самого класса:

In [27]:
def calculateGini(arr):
  left_gini = 1 - (np.sum(arr[0]==0)/len(arr[0]))**2 - (np.sum(arr[0]==1)/len(arr[0]))**2
  right_gini = 1 - (np.sum(arr[1]==0)/len(arr[1]))**2 - (np.sum(arr[1]==1)/len(arr[1]))**2
  gini = (len(arr[0])*left_gini + len(arr[1])*right_gini)/(len(arr[0])+len(arr[1]))
  return gini


def calculateEntropy(y): # считаем энтропию таргета
  values, counts = np.unique(y, return_counts=True)
  probabilities = counts/np.sum(counts)
  entropy = -np.sum(probabilities*np.log2(probabilities))
  return entropy


def calculateIG(arr, s): # получаем на вход список массивов, на которые разбили изначальный, и начальную энтропию
  lens = [len(arr[i]) for i in range(len(arr))]
  weights = np.array(lens)/np.sum(lens)
  return s - np.sum([weights[i]*calculateEntropy(arr[i]) for i in range(len(arr))])


class TreeNode:
  def __init__(self, X=None, y=None, feature=None, splitter=None):
    self.y = y
    self.X = X
    self.feature = feature
    self.splitter = splitter
    self.val = None
    self.left = None
    self.right = None

### Реализуем класс, включив в него функцию для отбора лучшего разделителя, поскольку в процессе используется атрибут класса, и так будет проще вести его изменение прямо во время работы. Класс поддерживает следующие модификации:
- Гистограмный метод создания разделителей (будет полезен, когда мы используем данный алгоритм при создании градиентного бустинга)
- выбор критерия для отбора лучшего разделителя (Энтропия или неопределенность Джини)
- предсказание вероятностей классов
- отрисовка дерева в понятном виде для отладки кода

In [28]:
class MyTreeClf:
  def __init__(self, max_depth = 5, min_samples_split = 2, max_leafs = 20, bins=None, criterion = 'entropy'):
    self.max_depth = max_depth
    self.min_samples_split = min_samples_split
    self.max_leaves = max_leafs
    self.bins = bins
    self.criterion = criterion
    self.fi = {}


  def __str__(self):
    return f'class MyTreeClf: max_depth = {self.max_depth}, min_samples_split = {self.min_samples_split}, max_leaves = {self.max_leaves}'

  def get_best_split(self, X, y):
    features = X.columns
    best_feature = features[0]
    best_splitter = None
    best_IG = 0
    best_gini = 1
    s = calculateEntropy(y)

    for name in features:
      feature = X[name]
      #splitters = np.unique((np.unique(feature)[1:]+np.unique(feature)[:-1])/2)
      splitters = self.splitters[name]

      #if self.bins:
        #if len(splitters)>self.bins-1:
          #splitters = self.histogram[name]

      if len(splitters)>0:
        for splitter in splitters:
          arr = [y[feature<=splitter], y[feature>splitter]]
          if len(arr[0])==0 or len(arr[1])==0:
            continue
          if self.criterion == 'entropy':
            IG = calculateIG(arr, s)
            if IG>best_IG:
              best_IG = IG
              best_splitter = splitter
              best_feature = name
          else:
            gini = calculateGini(arr)
            if gini<best_gini:
              best_splitter = splitter
              best_feature = name
              best_gini = gini

    if self.criterion == 'entropy':
      return best_feature, best_splitter, best_IG
    else:
      return best_feature, best_splitter, best_gini


  def fit(self, X, y):
    self.y = y
    self.root = TreeNode(X, y)
    self.root.val = np.mean(y)
    self.histogram = pd.DataFrame()
    self.splitters = {}
    for name in X.columns:
      self.fi[name] = 0
      self.splitters[name] = (np.unique(X[name])[1:]+np.unique(X[name])[:-1])/2

    if self.bins:
      #self.histogram = pd.DataFrame(columns = X.columns)
      for name in X.columns:
        if len(self.splitters[name])>self.bins-1:
          self.splitters[name] = np.histogram(X[name], bins=self.bins)[1][1:-1]
        #self.histogram[name] = np.histogram(X[name], bins=self.bins)[1][1:-1]

    def inorder(root, depth=1):
      if (self.leaves+1<=self.max_leaves or self.leaves<2) and depth<=self.max_depth and root and len(root.y)>=self.min_samples_split and np.mean(root.y)!=0 and np.mean(root.y)!=1:
        # если условия позволяют, создаем левого и правого соседей и обходим их
        root.val = np.mean(root.y)
        best_feature, best_splitter = self.get_best_split(root.X, root.y)[:-1]

        if not best_splitter:
          root.left = None
          root.right = None
          root.val = np.mean(root.y)
          return None

        root.feature = best_feature
        root.splitter = best_splitter

        left_X = root.X[root.X[best_feature]<=best_splitter]
        right_X = root.X[root.X[best_feature]>best_splitter]
        left_y = root.y[left_X.index]
        right_y = root.y[right_X.index]

        root.left = TreeNode(left_X, left_y)
        root.right = TreeNode(right_X, right_y)

        if len(root.left.y)>0 and len(root.right.y)>0:
          if self.criterion == 'entropy':
            self.fi[best_feature]+=len(root.y)/len(self.y)*(calculateEntropy(root.y) - len(left_y)/len(root.y)*calculateEntropy(left_y) - len(right_y)/len(root.y)*calculateEntropy(right_y))
          else:
            left_gini = 1 - (np.sum(left_y==0)/len(left_y))**2 - (np.sum(left_y==1)/len(left_y))**2
            right_gini = 1 - (np.sum(right_y==0)/len(right_y))**2 - (np.sum(right_y==1)/len(right_y))**2
            gini = 1 - (np.sum(root.y==0)/len(root.y))**2 - (np.sum(root.y==1)/len(root.y))**2
            self.fi[best_feature]+=len(root.y)/len(self.y)*(gini - len(left_y)/len(root.y)*left_gini - len(right_y)/len(root.y)*right_gini)

          self.leaves+=1
          inorder(root.left, depth+1)
          inorder(root.right, depth+1)
        else:
          root.left = None
          root.right = None
          root.val = np.mean(root.y)

      else:
        # превращаем в лист
        root.val = np.mean(root.y)

    self.leaves = 1
    inorder(self.root)
    self.leafs_cnt = self.leaves


  def print_tree(self):
    def traversal(root, depth=0):
      if root:
        if root.left or root.right:
          print('    '*depth, f'{root.feature} > {root.splitter}')
          traversal(root.left, depth+1)
          traversal(root.right, depth+1)
        else:
          print('    '*depth, f'leaf val = {root.val}')
    traversal(self.root)


  def predict_proba(self, X):
    self.pred_proba = pd.Series(index = X.index)
    def searchLeaf(X, root):
      if not root.feature:
        self.pred_proba[X.index] = root.val
      else:
        searchLeaf(X[X[root.feature]<=root.splitter], root.left)
        searchLeaf(X[X[root.feature]>root.splitter], root.right)

    searchLeaf(X, self.root)
    return self.pred_proba


  def predict(self, X):
    self.pred_y = pd.Series(index = X.index)
    def searchLeaf(X, root):
      if not root.feature:
        self.pred_y[X.index] = (root.val>=0.5)
      else:
        searchLeaf(X[X[root.feature]<=root.splitter], root.left)
        searchLeaf(X[X[root.feature]>root.splitter], root.right)

    searchLeaf(X, self.root)
    return self.pred_y.astype(int)

### Создадим датасет для тестирования, обучим модель и изобразим дерево

In [29]:
X, y = make_classification(n_samples=1000, n_features=8, n_informative=5, random_state=42)
X = pd.DataFrame(X).round(2)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]
test = X.sample(200, random_state=42)

tree = MyTreeClf(
    max_depth=10,
    min_samples_split=2,
    max_leafs=30,
    criterion = 'gini'
)

tree.fit(X, y)
tree.print_tree()

 col_7 > -0.2
     col_1 > -1.6150000000000002
         col_4 > -0.11499999999999999
             col_0 > -0.175
                 col_1 > -2.865
                     col_1 > -2.875
                         leaf val = 0.0
                         leaf val = 1.0
                     leaf val = 0.0
                 col_6 > 0.875
                     col_2 > 0.945
                         leaf val = 1.0
                         leaf val = 0.0
                     leaf val = 0.0
             col_2 > -0.495
                 col_4 > 0.46499999999999997
                     leaf val = 0.0
                     leaf val = 1.0
                 leaf val = 1.0
         col_1 > -0.685
             col_4 > -0.015
                 col_7 > -0.69
                     col_2 > -1.105
                         col_0 > -1.315
                             leaf val = 1.0
                             leaf val = 0.0
                         leaf val = 1.0
                     col_0 > -1.73
                      

Обучим встроенную модель

In [30]:
sk_model = DecisionTreeClassifier(max_depth=10, max_leaf_nodes=30)
sk_model.fit(X, y)

Сравним точность их предсказаний на том же датасете, на котором модели были обучены

In [31]:
(tree.predict(X) == y).sum()

  self.pred_y[X.index] = (root.val>=0.5)


932

In [32]:
(sk_model.predict(X)==y).sum()

971

In [33]:
(tree.predict(X)==sk_model.predict(X)).sum()

  self.pred_y[X.index] = (root.val>=0.5)


933

In [34]:
print(f'Accuracy of our model = {accuracy_score(y, tree.predict(X))} and sklearn model = {accuracy_score(y, sk_model.predict(X))}')
print(f'Precision of our model = {precision_score(y, tree.predict(X))} and sklearn model = {precision_score(y, sk_model.predict(X))}')
print(f'Recall of our model = {recall_score(y, tree.predict(X))} and sklearn model = {recall_score(y, sk_model.predict(X))}')
print()

Accuracy of our model = 0.932 and sklearn model = 0.971
Precision of our model = 0.9520833333333333 and sklearn model = 0.9701789264413518
Recall of our model = 0.9103585657370518 and sklearn model = 0.9721115537848606



  self.pred_y[X.index] = (root.val>=0.5)
  self.pred_y[X.index] = (root.val>=0.5)
  self.pred_y[X.index] = (root.val>=0.5)


Как мы видим, модель дает результаты, по точности сопоставимые встроенной в sklearn, хотя и уступает ей в скорости обучения. Исходя из этого я делаю вывод, что реализация вполне успешна.