# Оценка влияния

In [1]:
from typing import Tuple, Optional, Dict, Callable, List

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.tree import DecisionTreeClassifier, export_graphviz

In [2]:
def calculate_entropy(samples: np.ndarray):
    """Return the Shannon entropy, in bits, of a vector of frequency counts.

    Zero entries are discarded first, so they contribute nothing to the sum
    and never reach the logarithm. The remaining values are normalised by
    their total to obtain probabilities.
    """
    nonzero = samples[samples != 0].astype(float)
    total = nonzero.sum()

    p = nonzero / total
    weighted_logs = p * np.log2(p)

    return -1 * weighted_logs.sum()

In [3]:
def calculate_uncertainty(table: np.ndarray):
    """Return three uncertainty coefficients (in percent) for a contingency table.

    The mutual information of the row and column variables is computed from
    the marginal and joint entropies and then normalised three ways:

    1. by the entropy of the row variable,
    2. by the entropy of the column variable,
    3. symmetrically, by the mean of the two entropies.

    The result is the tuple ``(by_rows, by_columns, symmetric)``.
    """
    row_entropy = calculate_entropy(table.sum(axis=1))  # one total per row
    column_entropy = calculate_entropy(table.sum(axis=0))  # one total per column
    joint_entropy = calculate_entropy(table.flatten())

    mutual_information = row_entropy + column_entropy - joint_entropy

    by_rows = mutual_information / row_entropy * 100
    by_columns = mutual_information / column_entropy * 100
    symmetric = 2 * mutual_information / (row_entropy + column_entropy) * 100

    return by_rows, by_columns, symmetric

In [4]:
# Variable pairs of interest (craving versus each symptom):
# craving.to.alcohol.1 -- depressed.mood.1
# craving.to.alcohol.1 -- headache.1
# craving.to.alcohol.1 -- weakness.1

data = pd.read_csv('data_big.csv', index_col=0)

# Fold the three symptom columns into a single combined code per row.
# NOTE(review): whether the weights (1, 3, 2) give every symptom combination
# a distinct code depends on the value ranges of the columns -- confirm.
z = data['headache.1'] + 3 * data['weakness.1'] + 2 * (data['depressed.mood.1'] - 1)

# Contingency table: one row per craving level, one column per combined code.
crosstab = pd.crosstab(data['craving.to.alcohol.1'], z).to_numpy()
print(crosstab)
# Last expression of the cell, so the notebook displays the three coefficients.
calculate_uncertainty(crosstab)

[[0 1 1 3 4 2 0 1 0 0]
 [1 0 0 4 3 5 2 1 0 1]
 [0 0 0 1 0 2 0 0 1 1]]


(28.23769864972048, 14.554127872724257, 19.208111005213784)

# Дерево классификации

In [5]:
# A chosen split: (column name, threshold value, split metric).
Splitter = Tuple[str, float, float]

In [6]:
def calculate_diversity_index(table: np.ndarray) -> float:
    """Return the diversity index ``N*ln(N) - sum(n_i*ln(n_i))`` of *table*.

    ``n_i`` is the number of occurrences of each distinct value in *table*
    and ``N`` is the total number of values. The index is 0 when all values
    are equal and grows as the values become more mixed (it equals ``N``
    times the Shannon entropy in nats).

    An empty *table* has no diversity, so 0.0 is returned for it; without
    this guard the formula would evaluate ``0 * log(0)`` and yield NaN.
    """
    _, counts = np.unique(table, return_counts=True)
    total = counts.sum()
    if total == 0:
        return 0.0

    return float(total * np.log(total) - (counts * np.log(counts)).sum())


def calculate_split_metric(first: pd.DataFrame, second: pd.DataFrame) -> float:
    """Return the diversity removed by separating the labels into two groups.

    This is the diversity index of the pooled labels minus the diversity
    indices of *first* and *second* taken separately.
    """
    pooled = pd.concat([first, second])
    separate_first, separate_second, combined = [
        calculate_diversity_index(part.to_numpy())
        for part in (first, second, pooled)
    ]

    return combined - separate_first - separate_second

In [7]:
def get_best_splitter(X: pd.DataFrame, y: pd.DataFrame) -> Optional[Splitter]:
    """Search every column of *X* for the threshold that best separates *y*.

    Candidate thresholds are the midpoints between consecutive sorted unique
    values of a column. Rows whose value is below the threshold form one
    group and the remaining rows the other; the candidate is scored with
    ``calculate_split_metric`` on the labels of the two groups.

    Returns ``(column, threshold, metric)`` for the highest-scoring candidate
    (the first one found wins ties), or ``None`` when no candidate scores
    above zero.
    """
    best: Optional[Splitter] = None
    best_metric = 0

    for name in X.columns:
        feature = X[name]
        levels = np.sort(feature.unique())
        midpoints = (levels[:-1] + levels[1:]) / 2

        for threshold in midpoints:
            below = feature[feature < threshold].index
            at_or_above = feature[feature >= threshold].index

            score = calculate_split_metric(y[below], y[at_or_above])
            if score > best_metric:
                best_metric = score
                best = (name, threshold, score)

    return best

In [8]:
class Node:
    """A node of a binary classification tree.

    A node without children is a leaf and predicts the most frequent class
    in ``classes``. An inner node sends a sample to ``left`` when the
    sample's value in ``column`` is below ``value`` and to ``right``
    otherwise.
    """

    # Children are attached after construction; both stay None for a leaf.
    left: Optional['Node'] = None
    right: Optional['Node'] = None

    def __init__(
            self,
            classes: Dict[str, int],
            column: Optional[str] = None,
            value: Optional[float] = None,
            metric: Optional[float] = None,
    ):
        """Create a node.

        Args:
            classes: mapping from class label to the number of samples of
                that class that reached this node.
            column: name of the attribute tested by this node (inner nodes).
            value: threshold the attribute is compared against.
            metric: score of the split, shown in the Graphviz label.
        """
        self.column = column
        self.value = value
        self.classes = classes
        self.metric = metric

    def _majority_class(self):
        """Return the label with the highest count in ``classes``."""
        return max(self.classes, key=self.classes.get)

    def predict(self, attributes: pd.Series):
        """Return the predicted class label for one sample.

        *attributes* is indexed by column name. If the branch the sample
        should follow has no child, the majority class of this node is
        returned instead of failing, mirroring ``to_graphviz``, which also
        accepts a node with a single child.
        """
        if self.left is None and self.right is None:
            return self._majority_class()

        branch = self.left if attributes[self.column] < self.value else self.right
        if branch is None:
            return self._majority_class()

        return branch.predict(attributes)

    def to_graphviz(self) -> Tuple[List[str], List[str]]:
        """Return DOT statements describing the subtree rooted at this node.

        The result is ``(nodes, transitions)``: node declarations and edge
        declarations, both in pre-order (this node, left subtree, right
        subtree). Nodes are identified by ``hash(node)``.
        """
        dot_node = f'"{hash(self)}"'

        if self.left is None and self.right is None:
            return ([f'{dot_node} [shape=record, label="{self.classes}"]'], [])

        # "<" is written as "\<" in the emitted label; the backslash is
        # doubled here so that Python sees a valid escape sequence.
        label = (
            f'{{ {self.column} \\< {round(self.value, 4)} '
            f'| metric={round(self.metric, 4)} | {self.classes} }}'
        )
        nodes = [f'{dot_node} [shape=record, label="{label}"]']
        transitions = []

        for child in (self.left, self.right):
            if child is None:
                continue
            transitions.append(f'{dot_node} -> "{hash(child)}"')
            child_nodes, child_transitions = child.to_graphviz()
            nodes.extend(child_nodes)
            transitions.extend(child_transitions)

        return nodes, transitions

In [9]:
class DecisionTree:
    """Binary classification tree built from threshold splits.

    Splits are chosen by ``get_best_splitter``; the tree is stored as linked
    ``Node`` objects starting at ``_root``.
    """

    # Root of the fitted tree. None until ``fit`` is called, and also when
    # ``fit`` could not create even a single node.
    _root: Optional['Node'] = None

    def fit(
            self,
            X: pd.DataFrame,
            y: pd.Series,
            *,
            max_depth: Optional[int] = None,
            min_samples_split: int = 2,
            min_samples_leaf: int = 1,
    ):
        """Build the tree from features *X* and class labels *y*.

        Args:
            X: one row per sample, one column per attribute.
            y: class labels, indexed like *X*.
            max_depth: maximum number of split levels below the root
                (0 yields a single leaf). Defaults to ``len(X)``.
            min_samples_split: a node with fewer labelled samples than this
                is never split.
            min_samples_leaf: a node with fewer labelled samples than this
                is not created; the parent that would have produced it
                becomes a leaf instead.
        """
        if max_depth is None:
            max_depth = len(X)

        self._root = self._create_tree(X, y, max_depth, min_samples_split, min_samples_leaf)

    def predict(self, X: pd.DataFrame):
        """Return the predicted class for every row of *X*.

        The tree must have been fitted first.
        """
        return X.apply(self._root.predict, axis=1)

    @classmethod
    def _create_tree(
            cls,
            X: pd.DataFrame,
            y: pd.Series,
            remain_depth: int,
            min_samples_split: int,
            min_samples_leaf: int,
    ) -> Optional['Node']:
        """Recursively build the subtree for the samples in *X* / *y*.

        Returns ``None`` when the node may not exist at all (depth budget
        already exceeded or fewer than *min_samples_leaf* labelled samples).
        """
        if remain_depth < 0:
            return None

        classes = y.value_counts().to_dict()
        n_samples = sum(classes.values())

        if n_samples < min_samples_leaf:
            return None

        # With no depth left the children could never be created, and a node
        # below min_samples_split is never split, so skip the split search.
        if remain_depth == 0 or n_samples < min_samples_split:
            return Node(classes)

        best_splitter = get_best_splitter(X, y)
        if best_splitter is None:
            return Node(classes)

        column, value, metric = best_splitter

        left_X, right_X = X[X[column] < value], X[X[column] >= value]
        left_y, right_y = y.loc[left_X.index], y.loc[right_X.index]

        # A child smaller than min_samples_leaf would be rejected by the
        # recursive call, turning this node into a leaf. Check the sizes up
        # front so that no subtree is built only to be thrown away.
        # count() ignores missing labels, exactly like value_counts() above.
        if min(left_y.count(), right_y.count()) < min_samples_leaf:
            return Node(classes)

        root = Node(classes, column, value, metric)
        root.left = cls._create_tree(left_X, left_y, remain_depth - 1, min_samples_split, min_samples_leaf)
        root.right = cls._create_tree(right_X, right_y, remain_depth - 1, min_samples_split, min_samples_leaf)

        return root

    def export_graphviz(self) -> str:
        """Return the tree as Graphviz DOT source.

        An empty digraph is returned when there is no root node.
        """
        if self._root is None:
            return 'digraph {\n}\n'

        nodes, transitions = self._root.to_graphviz()
        return ''.join([
            'digraph {\n',
            '\t', '\n\t'.join(nodes),
            '\n\n\t', '\n\t'.join(transitions),
            '\n',
            '}\n',
        ])

# Эксперименты с деревом

In [10]:
# Small hand-entered sample: three numeric attributes and a class label (1 or 2).
data = pd.DataFrame.from_dict({
    'Moris': [15, 12, 10, 8, 17, 1, 13, 14, 3, 3, 1, 15, 13, 1, 13, 1, 15],
    'TC': [29, 40, 52, 48, 53, 44, 51, 52, 26, 64, 22, 51, 52, 40, 78, 45, 57],
    'TL': [38, 27, 30, 47, 69, 26, 33, 49, 30, 50, 30, 51, 39, 39, 46, 36, 39],
    'class': [2, 2, 2, 1, 2, 1, 1, 1, 2, 1, 2, 2, 1, 2, 1, 2, 2]
})

In [11]:
# Fit the hand-written tree on the sample, limited to two levels of splits,
# and print its Graphviz DOT source.
clf = DecisionTree()
clf.fit(data[['Moris', 'TC', 'TL']], data['class'], max_depth=2)
print(clf.export_graphviz())

digraph {
	"8777272993464" [shape=record, label="{ TC \< 42.0 | metric=3.3671 | {2: 10, 1: 7} }"]
	"8777273004758" [shape=record, label="{2: 5}"]
	"8777273004704" [shape=record, label="{ Moris \< 14.5 | metric=3.383 | {1: 7, 2: 5} }"]
	"8777316770247" [shape=record, label="{1: 7, 2: 2}"]
	"8777316770154" [shape=record, label="{2: 3}"]

	"8777272993464" -> "8777273004758"
	"8777272993464" -> "8777273004704"
	"8777273004704" -> "8777316770247"
	"8777273004704" -> "8777316770154"
}



In [12]:
# Load the maternal health risk data set; the bare `data` expression makes
# the notebook display the frame.
data = pd.read_csv('Maternal Health Risk Data Set.csv')
data

Unnamed: 0,Age,SystolicBP,DiastolicBP,BS,BodyTemp,HeartRate,RiskLevel
0,25,130,80,15.0,98.0,86,high risk
1,35,140,90,13.0,98.0,70,high risk
2,29,90,70,8.0,100.0,80,high risk
3,30,140,85,7.0,98.0,70,high risk
4,35,120,60,6.1,98.0,76,low risk
...,...,...,...,...,...,...,...
1009,22,120,60,15.0,98.0,80,high risk
1010,55,120,90,18.0,98.0,60,high risk
1011,35,85,60,19.0,98.0,86,high risk
1012,43,120,90,18.0,98.0,70,high risk


In [13]:
# Every column except the target is used as a feature.
x_columns = data.columns.to_list()
x_columns.remove('RiskLevel')

# Fixed random_state keeps the train/test partition reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    data[x_columns], 
    data['RiskLevel'], 
    random_state=42,
)

In [14]:
# Fit the hand-written tree with size limits on nodes and leaves, then print
# its Graphviz DOT source.
clf = DecisionTree()
clf.fit(X_train, y_train, min_samples_leaf=15, min_samples_split=80)
print(clf.export_graphviz())

digraph {
	"8777273027639" [shape=record, label="{ BS \< 7.95 | metric=177.3193 | {'low risk': 304, 'mid risk': 241, 'high risk': 215} }"]
	"8777272981348" [shape=record, label="{ SystolicBP \< 132.5 | metric=69.4568 | {'low risk': 298, 'mid risk': 200, 'high risk': 58} }"]
	"8777272983959" [shape=record, label="{ BodyTemp \< 99.5 | metric=36.4813 | {'low risk': 298, 'mid risk': 197, 'high risk': 28} }"]
	"8777272986031" [shape=record, label="{ SystolicBP \< 125.0 | metric=23.6684 | {'low risk': 268, 'mid risk': 142, 'high risk': 6} }"]
	"8777272989073" [shape=record, label="{ BS \< 7.005 | metric=19.7314 | {'low risk': 268, 'mid risk': 121, 'high risk': 6} }"]
	"8777272991058" [shape=record, label="{ HeartRate \< 79.0 | metric=7.4385 | {'low risk': 119, 'mid risk': 94, 'high risk': 4} }"]
	"8777272960678" [shape=record, label="{'low risk': 98, 'mid risk': 81}"]
	"8777272962027" [shape=record, label="{'low risk': 21, 'mid risk': 13, 'high risk': 4}"]
	"8777272960371" [shape=record, lab

In [15]:
# Predict once and reuse the result: the hand-written tree predicts row by
# row, so calling predict() separately for each report doubles the work.
y_pred = clf.predict(X_test)

# Rows are true classes and columns predicted classes, in the listed order.
print(confusion_matrix(y_test, y_pred, labels=['low risk', 'mid risk', 'high risk']))
print(classification_report(y_test, y_pred))

[[95  5  2]
 [54 31 10]
 [ 6  5 46]]
              precision    recall  f1-score   support

   high risk       0.79      0.81      0.80        57
    low risk       0.61      0.93      0.74       102
    mid risk       0.76      0.33      0.46        95

    accuracy                           0.68       254
   macro avg       0.72      0.69      0.67       254
weighted avg       0.71      0.68      0.65       254



In [16]:
# Reference model: scikit-learn's decision tree on the same split.
clf = DecisionTreeClassifier(min_samples_leaf=20, max_depth=7)
clf.fit(X_train, y_train)

# Predict once and reuse the result for both reports.
y_pred = clf.predict(X_test)

# Rows are true classes and columns predicted classes, in the listed order.
print(confusion_matrix(y_test, y_pred, labels=['low risk', 'mid risk', 'high risk']))
print(classification_report(y_test, y_pred))
print(export_graphviz(clf, feature_names=clf.feature_names_in_))

[[73 27  2]
 [28 54 13]
 [ 4  7 46]]
              precision    recall  f1-score   support

   high risk       0.75      0.81      0.78        57
    low risk       0.70      0.72      0.71       102
    mid risk       0.61      0.57      0.59        95

    accuracy                           0.68       254
   macro avg       0.69      0.70      0.69       254
weighted avg       0.68      0.68      0.68       254

digraph Tree {
node [shape=box, fontname="helvetica"] ;
edge [fontname="helvetica"] ;
0 [label="BS <= 7.95\ngini = 0.659\nsamples = 760\nvalue = [215, 304, 241]"] ;
1 [label="SystolicBP <= 132.5\ngini = 0.572\nsamples = 556\nvalue = [58, 298, 200]"] ;
0 -> 1 [labeldistance=2.5, labelangle=45, headlabel="True"] ;
2 [label="BS <= 7.055\ngini = 0.531\nsamples = 523\nvalue = [28, 298, 197]"] ;
1 -> 2 ;
3 [label="BodyTemp <= 99.5\ngini = 0.544\nsamples = 286\nvalue = [14, 130, 142]"] ;
2 -> 3 ;
4 [label="Age <= 15.5\ngini = 0.515\nsamples = 231\nvalue = [4, 121, 106]"] ;
3 -> 4 ;
