In [15]:
import random

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from tqdm import tqdm
import math
from typing import *
from dataclasses import dataclass, field

plotly_margin = dict(l=0, r=0, t=0, b=0)

Обзор датасета

In [16]:
df_in = pd.read_csv("data.csv", dtype='category')
df_in.describe()

Unnamed: 0,edible,cap_shape,cap_surface,cap_color,bruises,odor,gill_attachment,gill_spacing,gill_size,gill_color,...,stalk_surface_below_ring,stalk_color_above_ring,stalk_color_below_ring,veil_type,veil_color,ring_number,ring_type,spore_print_color,population,habitat
count,8124,8124,8124,8124,8124,8124,8124,8124,8124,8124,...,8124,8124,8124,8124,8124,8124,8124,8124,8124,8124
unique,2,6,4,10,2,9,2,2,2,12,...,4,9,9,1,4,3,5,9,6,7
top,e,x,y,n,f,n,f,c,b,b,...,s,w,w,p,w,o,p,w,v,d
freq,4208,3656,3244,2284,4748,3528,7914,6812,5612,1728,...,4936,4464,4384,8124,7924,7488,3968,2388,4040,3148


По документации, могут отсутствовать значения `stalk_root`, они отмечены `?`. Их порядка 30%, удалить не получится, заменим на наиболее частый класс.

In [17]:
print((df_in.stalk_root == "?").sum() / df_in.shape[0])
df_in.stalk_root = df_in.stalk_root.where(df_in.stalk_root != "?", df_in.stalk_root.mode()[0])

0.3052683407188577


Приведем в численный вид признаки 

In [18]:
df_in = df_in.apply(lambda x: x.cat.codes)
df_in.head()

Unnamed: 0,edible,cap_shape,cap_surface,cap_color,bruises,odor,gill_attachment,gill_spacing,gill_size,gill_color,...,stalk_surface_below_ring,stalk_color_above_ring,stalk_color_below_ring,veil_type,veil_color,ring_number,ring_type,spore_print_color,population,habitat
0,1,5,2,4,1,6,1,0,1,4,...,2,7,7,0,2,1,4,2,3,5
1,0,5,2,9,1,0,1,0,0,4,...,2,7,7,0,2,1,4,3,2,1
2,0,0,2,8,1,3,1,0,0,5,...,2,7,7,0,2,1,4,3,2,3
3,1,5,3,8,1,6,1,0,1,5,...,2,7,7,0,2,1,4,2,3,5
4,0,5,2,3,0,5,1,1,0,4,...,2,7,7,0,2,1,0,3,0,1


Разделим тренировочную и тестовую выборку

In [19]:
train_size = 0.7

df_train = df_in.sample(frac=train_size, random_state=42)
df_test = df_in.drop(df_train.index)

x_train, y_train = df_train.drop('edible', axis=1), df_train['edible']
x_test, y_test = df_test.drop('edible', axis=1), df_test['edible']

print(df_train.shape, df_test.shape)

(5687, 23) (2437, 23)


Определим метрики

In [20]:
def stat(y, y_pred):
    m = [
        [np.sum((y == r) & (y_pred == p)) for r in range(2)]
        for p in range(2)
    ]  # m[pred][true]
    return dict(
        tp=m[1][1],
        tn=m[0][0],
        fp=m[1][0],
        fn=m[0][1],
        n=y.shape[0],
        mat=m
    )


def accuracy(stats: dict):
    return (stats['tp'] + stats['tn']) / stats['n']


def precision(stats: dict):
    return stats['tp'] / (stats['tp'] + stats['fp'])


def recall(stats: dict):
    return stats['tp'] / (stats['tp'] + stats['fn'])


def f1(stats: dict):
    p, r = precision(stats), recall(stats)
    return 2 * p * r / (p + r)


def TPR(stats: dict):
    return stats['tp'] / (stats['tp'] + stats['fn'])


def FPR(stats: dict):
    return stats['fp'] / (stats['fp'] + stats['tn'])

Определим меры информации

In [21]:
# https://www.tandfonline.com/doi/pdf/10.1080/08839514.2018.1447479 page 122

# информация 
def info(x):
    _, cnt = np.unique(x, return_counts=True)
    freq = cnt / x.shape[0]
    return -(freq * np.log2(freq)).sum()


# прирост информации при делении по столбцу
def info_gain(X, y, attr):
    gain = info(y)
    for cat in np.unique(X[:, attr]):
        yi = y[X[:, attr] == cat]
        gain -= (yi.shape[0] / y.shape[0]) * info(yi)
    return gain


def split_info(x, attr):
    return info(x[:, attr])


# нормированный прирост
def gain_ratio(x, y, attr):
    if (si := split_info(x, attr)) == 0: return 0
    return info_gain(x, y, attr) / si

Решающее дерево по алгоритму C4.5 

In [48]:
def pad_lines(x: str, pad=0, char=' ') -> str:
    return x.replace('\n', '\n' + (char * pad))


@dataclass
class TreeNode:
    leaf_cat: Optional[int] = None
    leaf_prob: Optional[float] = None
    split_attr: Optional[int] = None
    split_child: Dict[int, 'TreeNode'] = field(default_factory=dict)

    def __str__(self):
        if (cat := self.leaf_prob) is not None:
            return f"-> {cat:.2f}"
        res = f"\nATTR {self.split_attr}: \n"
        res += '\n'.join([
            f"|  {i}: {pad_lines(str(j), 2)}"
            for i, j in self.split_child.items()
        ])
        return res


class Tree:
    def __init__(self, x, y):
        if type(x) is pd.DataFrame:
            x = x.to_numpy()
        if type(y) is pd.DataFrame:
            y = y.to_numpy()
        self.__root = self.__build(x, y)

    def __str__(self):
        return str(self.__root)

    def __build(self, x, y):
        out_cats = np.unique(y)
        if out_cats.shape[0] == 1:  # single out class
            cat = int(out_cats[0])
            return TreeNode(leaf_cat=cat, leaf_prob=cat)

        split_attr, max_gain = max((
            (i, gain_ratio(x, y, i))
            for i in range(x.shape[1])
        ), key=lambda i: i[1])

        if np.isclose(max_gain, 0, rtol=1e-9):  # all gains zero -> no definite class for leaf
            return TreeNode(leaf_cat=int(y.mode()[0]), leaf_prob=float(y.mean()))
        
        split_cats = np.unique(x[:, split_attr])
        split = [
            (cat, x[:, split_attr] == cat)
            for cat in split_cats
        ]
        return TreeNode(
            split_attr=split_attr,
            split_child={
                int(cat): self.__build(x[ids], y[ids])
                for cat, ids in split
            }
        )

    def __predict(self, x, root: TreeNode = None):
        """
        :return: (category, probability)
        """
        if root is None:
            root = self.__root
        if root.leaf_cat is not None:
            return root.leaf_cat, root.leaf_prob
        child = root.split_child[x[root.split_attr]]
        return self.__predict(x, child)

    def predict(self, x):
        """
        :return: (np.array(category), np.array(probability))
        """
        data = np.apply_along_axis(lambda r: self.__predict(r), 1, x)
        return data[:,0], data[:,1]

Построим дерево для заданного набора признаков

In [55]:
cols = x_train.columns.values

sel_cols = np.random.choice(cols, size=math.ceil(math.sqrt(len(cols))))
tree = Tree(x_train[sel_cols], y_train)
print(f"C4.5 tree for columns: {', '.join(sel_cols)}")
print(tree)

C4.5 tree for columns: gill_attachment, gill_color, gill_spacing, stalk_surface_above_ring, stalk_color_below_ring

ATTR 3: 
|  0: 
  ATTR 2: 
  |  0: -> 1.00
  |  1: -> 0.00
|  1: 
  ATTR 2: 
  |  0: -> 1.00
  |  1: -> 0.00
|  2: 
  ATTR 1: 
  |  0: -> 1.00
  |  1: -> 0.00
  |  2: 
    ATTR 2: 
    |  0: -> 0.22
    |  1: -> 0.30
  |  3: 
    ATTR 2: 
    |  0: -> 0.80
    |  1: -> 0.00
  |  4: 
    ATTR 2: 
    |  0: -> 0.30
    |  1: -> 0.00
  |  5: 
    ATTR 4: 
    |  3: -> 0.00
    |  5: -> 0.00
    |  6: -> 0.00
    |  7: 
      ATTR 2: 
      |  0: -> 0.22
      |  1: -> 0.17
  |  6: -> 0.00
  |  7: 
    ATTR 4: 
    |  3: -> 0.00
    |  6: -> 0.00
    |  7: 
      ATTR 2: 
      |  0: -> 0.37
      |  1: -> 0.12
  |  8: -> 1.00
  |  9: 
    ATTR 2: 
    |  0: 
      ATTR 4: 
      |  3: -> 0.00
      |  6: -> 0.00
      |  7: -> 0.15
    |  1: -> 1.00
  |  10: 
    ATTR 4: 
    |  2: -> 0.00
    |  3: -> 0.00
    |  4: -> 0.00
    |  6: -> 0.00
    |  7: 
      ATTR 2: 
      

Оценка качетсва

In [56]:
y_pred, y_prob = tree.predict(x_test[sel_cols])
stats = stat(y_test, y_pred)
print(f"""
    Accuracy:   {accuracy(stats):.3f}
    Precision:  {precision(stats):.3f}
    Recall:     {recall(stats):.3f}
    F1:         {f1(stats):.3f}
""")


    Accuracy:   0.930
    Precision:  0.997
    Recall:     0.852
    F1:         0.919


ROC-AUC, PR-AUC

In [58]:
@np.vectorize
def roc(thresh):
    y_pred = np.round(y_prob >= thresh)
    s = stat(y_test, y_pred)
    return FPR(s), TPR(s)

@np.vectorize
def pr(thresh):
    y_pred = np.round(y_prob >= thresh)
    s = stat(y_test, y_pred)
    return recall(s), precision(s)

pts = np.unique(y_prob)
roc_fpr, roc_tpr = roc(pts)
pr_r, pr_p = pr(pts)

roc_auc = -np.trapezoid(roc_tpr, roc_fpr)
pr_auc = -np.trapezoid(pr_p, pr_r)

fig = make_subplots(1, 2, subplot_titles=[f"ROC (AUC={roc_auc:.3f})", f"PR (AUC={pr_auc:.3f})"])

fig.add_trace(go.Scatter(x=roc_fpr, y=roc_tpr), 1, 1)
fig.update_xaxes(title_text="FPR", row=1, col=1)
fig.update_yaxes(title_text="TPR", row=1, col=1)

fig.add_trace(go.Scatter(x=pr_r, y=pr_p), 1, 2)
fig.update_xaxes(title_text="Recall", row=1, col=2)
fig.update_yaxes(title_text="Precision", row=1, col=2)

fig.show()