In [1]:
import numpy as np

In [2]:
import numpy as np
import treelib


class Node:
    def __init__(self, depth, expected_class=None):
        self.split_feature = None
        self.split_value = None
        self.left = None
        self.right = None
        self.expected_class = expected_class
        self.depth = depth
        
    def is_leaf(self):
        return self.expected_class is not None
        
    def predict(self, row):
        if(self.is_leaf()):
            return self.expected_class
        else:
            if(row[self.split_feature] < self.split_value):
                return self.left.predict(row)
            else:
                return self.right.predict(row)

    def stop_criterion(self, max_depth):
        return self.depth >= max_depth

                
    def iterate(self, x, y, idxs, classes, max_depth):
        if(self.stop_criterion(max_depth)): 
            self.expected_class = self.answer(y, idxs, classes)
        else:
            l_idxs , r_idxs = self.split_node(x, y, idxs, classes)
            if(len(l_idxs) == 0 or len(r_idxs) == 0): #если произошло разделение на пустой узел, то превращаем разделяемый узел в лист
                self.expected_class = self.answer(y, idxs, classes)
            else:
                #print('left:')
                #for i in l_idxs:
                #    print(y[i], end=' ')
                #print(end='\n')

                #print('right:')
                #for i in r_idxs:
                #    print(y[i], end=' ')
                #print(end='\n')

                self.left = Node(self.depth + 1)
                self.right = Node(self.depth + 1)
                self.left.iterate(x, y, l_idxs, classes, max_depth)
                self.right.iterate(x, y, r_idxs, classes, max_depth)
                
                
    def split_node(self, x, y, idxs, classes):
        
        max_delta_impurity = -1 #максимизируемая величина
        node_impurity = self.missclassification_error(idxs, y, classes) #информативность текущего узла, если объявить его листом
        best_left_idxs, best_right_idxs = 0, 0
        
        #смотрим все возможные сплиты и выбираем лучший
        for feature in range(x.shape[1]):
            for value in x[:, feature]:
                left_idxs, right_idxs = self.get_split(x, idxs, feature, value)
                left_impurity = self.missclassification_error(left_idxs, y, classes)
                right_impurity = self.missclassification_error(right_idxs, y, classes)
                
                #считаем разность между информативностью узла и информативностью разбиения
                delta_impurity = node_impurity - (len(left_idxs)/len(idxs))*left_impurity - (len(right_idxs)/len(idxs))*right_impurity
                
                if(delta_impurity > max_delta_impurity):
                    max_delta_impurity = delta_impurity
                    self.split_feature = feature
                    self.split_value = value
                    best_left_idxs = left_idxs
                    best_right_idxs = right_idxs
                
        return best_left_idxs, best_right_idxs 
                
    def get_split(self, x, idxs, feature, value):
        left_idx = []
        right_idx = []
        for idx in idxs:
            if(x[idx][feature] < value):
                left_idx.append(idx)
            else:
                right_idx.append(idx)
        return left_idx, right_idx
        
    #критерий информативности missclassification error
    def missclassification_error(self, idxs, y, classes):
        if(len(idxs) != 0):
            p = []
            for cl in classes:
                num = 0
                for idx in idxs:
                    if(y[idx] == cl):
                        num += 1
                p.append(1 - (num / len(idxs)))
            return min(p)
        else: return 0

    #в качестве ответа листа берем самый частый класс 
    def answer(self, y, idxs, classes):
        ans = dict.fromkeys(classes, 0)
        for cl in classes:
            for idx in idxs:
                if(y[idx] == cl):
                    ans[cl] += 1
            
        #print(max(ans, key=ans.get))
        return max(ans, key=ans.get)
    
    def visualize(self, tree, parent, direction):
        if(self.is_leaf()):
            tree.create_node(f'class = {self.expected_class}', parent + direction, parent=parent)
        else:
            tree.create_node(f'feature {self.split_feature} < {self.split_value}', parent + direction, parent=parent)
            self.left.visualize(tree, parent+direction, 'l')
            self.right.visualize(tree, parent+direction, 'r')

                
class Tree:
    def __init__(self):
        self.root_node = Node(1)
        
    def fit(self, x_train: np.array, y_train: np.array, max_depth):
        classes = np.unique(y_train) #список классов в тренировочном датасете
        idxs = list(range(0, len(y_train))) #список индексов объектов датасета (в корневой узел идут все индексы)
        self.root_node.iterate(x_train, y_train, idxs, classes, max_depth) #запускаем обучение из корневого узла
        
    def predict(self, x_test):
        answer = []
        for row in x_test:
            answer.append(self.root_node.predict(row))
        return np.array(answer)
            
    def visualize(self):
        tree = treelib.Tree()
        tree.create_node('start', '')
        self.root_node.visualize(tree, '', 's')
        tree.show()

In [3]:
from sklearn import datasets
from sklearn.model_selection import train_test_split

iris = datasets.load_iris()
X = iris.data
y = iris.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

In [4]:
tr = Tree()
tr.fit(X_train, y_train, 5)

In [5]:
tr.visualize()

start
└── feature 2 < 3.7
    ├── feature 2 < 3.0
    │   ├── feature 0 < 5.5
    │   │   ├── class = 0
    │   │   └── class = 0
    │   └── feature 0 < 5.5
    │       ├── class = 1
    │       └── class = 1
    └── feature 3 < 1.6
        ├── class = 2
        └── feature 2 < 5.0
            ├── class = 2
            └── feature 0 < 5.5
                ├── class = 1
                └── class = 1



In [6]:
predictions = tr.predict(X_test)

In [7]:
type(predictions)

numpy.ndarray

In [8]:
from sklearn.metrics import precision_score, recall_score, f1_score

print('F1: {}'.format(f1_score(predictions, y_test, labels=[0,1,2], average='macro')))

F1: 0.9484126984126983


In [9]:
from sklearn.tree import DecisionTreeClassifier

tr_2 = DecisionTreeClassifier(max_depth=5)
tr_2.fit(X_train, y_train)

DecisionTreeClassifier(max_depth=5)

In [10]:
preds = tr_2.predict(X_test)
print('F1: {}'.format(f1_score(preds, y_test, labels=[0,1,2], average='macro')))

F1: 1.0
