# Домашнее задание №3: «Одеревенеть от страха»

In [1]:
import numpy as np
import pandas as pd

## 1. Определяем класс-предикат:

In [2]:
from collections import namedtuple

class Predicate(namedtuple("Predicate", ["attr", "value", "is_nominal"])):
    def __str__(self):
        return "{} {} {}".format(self.attr, "==" if self.is_nominal else "<=", self.value)
    
    def split(self, X, y):
        betta = X[self.attr] == self.value if self.is_nominal else X[self.attr] <= self.value
        return X[~betta], y[~betta], X[betta], y[betta]
    
    def apply(self, x):
        return x[self.attr] == self.value if self.is_nominal else x[self.attr] <= self.value

## 2. Вычисление энтропии Шеннона для множества:
$$H(U) = -\sum\limits_{i = 1}^{C}{p_i \log_2(p_i)}$$
$\texttt{@numba.jit}$ вроде немножечко ускорил (замерял с $\texttt{%timeit}$). Можно убрать, если у вас нету пакета $\texttt{numba}$.

In [3]:
import numba as nb

@nb.jit
def entropy(y):
    p = np.unique(y, return_counts=True)[1] / y.size
    return (-p * np.log2(p)).sum()

## 3. Реализация поиска оптимального предиката с энтропийным критерием
Нам надо максимизировать:
$$H(U) - \sum\limits_{v}{\frac{|U_v|}{|U|}H(U_v)}$$
Используем обычные пороговые условия $\leqslant$ для численных признаков и $==$ для номинальных. Тогда, задача выше эквивалентна минимизации (для разбиения на 2 группы):
$$|U_{false}|H(U_{false}) + |U_{true}|H(U_{true})$$
Оптимизировал этот кусок по максимуму, т.к. это самая затратная часть вычислений. Самый большой прирост дали бинпоиски. Можно улучшить дальше, используя $\texttt{numba.jit}$ или $\texttt{Cython}$, но я решил не ставить, чтобы легче было проверять.

In [4]:
class EntropyCriteria:
    @staticmethod
    def _is_nominal(x):
        return not np.issubdtype(x, np.number)
    
    def __init__(self, entropy):
        def _split(value, sx, sy):
            if not EntropyCriteria._is_nominal(sx.dtype):
                m = sx.searchsorted(value, side="right")[0]
                return sy[m:], sy[:m]
            else:
                l = sx.searchsorted(value, side="left")[0]
                r = sx.searchsorted(value, side="right")[0]
                return sy[:l].append(sy[r:]), sy[l:r]
        
        def _i_gain_part(*args):
            y0, y1 = _split(*args)
            return entropy(y0) * y0.size + entropy(y1) * y1.size
        
        self._vecalc = np.vectorize(_i_gain_part, excluded=[1, 2])

    def _do(self, column, X, y):
        sx = column.astype(X[column.name].dtype).sort_values()
        sy = y[sx.index]
        values = np.unique(sx)
        vsx = self._vecalc(values, sx, sy)
        ind = vsx.argmin()
        return vsx[ind], column.name, values[ind]
    
    def get_predicate(self, X, y):
        """
        Find the most informative predicate using score method.
        
        param X: features table
        param y: class array
        result: predicate instanceof Predicate class
        """
        col, val = X.apply(lambda col: self._do(col, X, y)).min()[1:]
        return Predicate(col, val, EntropyCriteria._is_nominal(X.dtypes[col]))

## 4. Определим классы для работы с деревом:

In [5]:
from abc import ABCMeta

class Node(metaclass=ABCMeta):
    pass

class Inner(Node, namedtuple("Inner", ["predicate", "false_branch", "true_branch"])):
    pass

class Leaf(Node, namedtuple("Leaf", ["c"])):
    pass

## 5. Наконец, реализуем алгоритм построения дерева и предсказания класса:

In [6]:
from collections import Counter

class DecisionTree:
    def _learnID3(self, X, y, criteria):
        if (np.unique(y).size == 1):
            return Leaf(y[y.index[0]])
        
        predicate = criteria.get_predicate(X, y)
        X_false, y_false, X_true, y_true = predicate.split(X, y)
        
        if not y_false.size or not y_true.size:
            return Leaf(Counter(y.tolist()).most_common(1)[0])
        
        false_branch = self._learnID3(X_false, y_false, criteria)
        true_branch = self._learnID3(X_true, y_true, criteria)
        return Inner(predicate, false_branch, true_branch)
    
    def build(self, X, y, score=entropy):
        self.root = self._learnID3(X, y, EntropyCriteria(score))
        return self
    
    def _visit_tree(self, n, x):
        if isinstance(n, Leaf):
            return n.c
        else:
            if not n.predicate.apply(x):
                return self._visit_tree(n.false_branch, x)
            else:
                return self._visit_tree(n.true_branch, x)
    
    def predict(self, x):
        assert "root" in self.__dict__, "The decision tree is not built yet!"
        return self._visit_tree(self.root, x)

## 6. Визуализация

In [7]:
import functools as ft
from PIL import Image, ImageDraw

def tree_visit(node, base, func):
    if isinstance(node, Leaf):
        return base
    else:
        return func(tree_visit(node.false_branch, base, func),
                    tree_visit(node.true_branch, base, func))

@ft.lru_cache(maxsize=None)
def getwidth(node):
    return tree_visit(node, 1, lambda l,r: l + r + 1)

@ft.lru_cache(maxsize=None)
def getdepth(node):
    return tree_visit(node, 1, lambda l,r: max(l, r) + 1)

def drawtree(tree, path="tree.jpg"):
    root = tree.root
    w = getwidth(root) * 100
    h = getdepth(root) * 100
    
    img = Image.new("RGB", (w, h), (255, 255, 255))
    draw = ImageDraw.Draw(img)
    
    drawnode(draw, root, w / 2, 20)
    img.save(path, "JPEG")

def drawnode(draw, node, x, y):
    if isinstance(node, Leaf):
        draw.text((x - 20, y), node.c, (0, 0, 0))
    else:
        shift = 100
        width1 = getwidth(node.false_branch) * shift
        width2 = getwidth(node.true_branch) * shift
        left = x - (width1 + width2) / 2
        right = x + (width1 + width2) / 2
        draw.text((x - 20, y - 10), str(node.predicate), (0, 0, 0))
        draw.line((x, y, left + width1 / 2, y + shift), fill=(255, 0, 0))
        draw.line((x, y, right - width2 / 2, y + shift), fill=(255, 0, 0))
        drawnode(draw, node.false_branch, left + width1 / 2, y + shift)
        drawnode(draw, node.true_branch, right - width2 / 2, y + shift)

## 7. Реализуем функции для работы с данными:

In [8]:
def read_data(path="halloween.csv", class_column="type"):
    csv = pd.read_csv(path)
    return csv.drop([class_column], axis=1), csv[class_column]

def train_test_split(X, y, ratio=0.8):
    indicies = X.sample(frac=ratio).index.sort_values()
    X_train = X.loc[indicies].reset_index(drop=True)
    y_train = y.loc[indicies].reset_index(drop=True)
    X_test = X.drop(indicies).reset_index(drop=True)
    y_test = y.drop(indicies).reset_index(drop=True)
    return X_train, y_train, X_test, y_test

## 8. Реализуем функции для оценки результата:

In [9]:
def precision_recall(y_test, y_pred, n_class):
    tp = ((y_pred == n_class) & (y_test == n_class)).sum()
    fp = ((y_pred == n_class) & (y_test != n_class)).sum()
    fn = ((y_pred != n_class) & (y_test == n_class)).sum()
    return tp / (tp + fp), tp / (tp + fn)

def print_precision_recall(y_test, y_pred, classes):
    for clazz in classes:
        precision, recall = precision_recall(y_test, y_pred, clazz)
        print("Precision: {}, Recall: {}, Class: {}".format(precision, recall, clazz))

## 9. Основная часть
Должно работать не более 6 секунд.

In [10]:
X, y = read_data()
X_train, y_train, X_test, y_test = train_test_split(X, y)

decision_tree = DecisionTree()
decision_tree.build(X_train, y_train)

y_pred = X_test.apply(decision_tree.predict, axis=1)
classes = np.unique(y_pred)
print_precision_recall(y_test, y_pred, classes)

drawtree(decision_tree)

Precision: 0.7142857142857143, Recall: 0.7142857142857143, Class: Ghost
Precision: 0.7142857142857143, Recall: 0.8, Class: Ghoul
Precision: 0.6, Recall: 0.5357142857142857, Class: Goblin
