#### Обработка неполных данных в деревьях решений

##### Цель работы
Исследовать методы обработки пропущенных значений в табличных данных и реализовать модифицированное дерево решений, не игнорирующее наблюдени с пропусками. Провести сравнительный анализ предложенного метода с классическими деревьями решений и градиентным бустингом (XGBoost).

##### Идея алгоритма
Во время разбиения по признаку, который отсутствует у наблюдения, добавлять наблюдение в обе формируемые ветви. Из-за этого вместо точного ответа будет получаться ответ вероятностный (как именно выбрать итоговый класс остаётся на личное усмотрение).

##### Задачи
* Изучить существующие методы обработки неполных данных и типы пропусков.
* Реализовать класс `AdaptiveDecisionTreeClassifier` для дерева решений с обработкой пропусков (в качестве критерия разбиения Gini или энтропия, входные данные числовые).
* Провести сравнение предложенного метода с обычным деревом решений (`sklearn.tree.DecisionTreeClassifier`) и градиентным бустингом (`XGBoost`).
* Использовать учебные наборы данных (например, [Iris](https://www.kaggle.com/datasets/himanshunakrani/iris-dataset), [Titanic](https://www.kaggle.com/datasets/yasserh/titanic-dataset), [Adult Income](https://www.kaggle.com/datasets/wenruliu/adult-income-dataset), любые другие табличные наборы для классификации).
* Оценить влияние пропусков на качество моделей, используя метрики (Accuracy, F1-score).


##### Ожидаемые результаты
* Теоретический обзор (краткое описание методов).
* Код реализации дерева решений.
* Графики / таблицы, демонстрирующие результаты сравнения.
* Выводы о преимуществах и недостатках предложенного метода.

##### Требования к реализации
* Код должен быть написан на Python версии 3.9 и выше.
* Рекомендуемые библиотеки: `numpy`, `pandas`, `scikit-learn`, `xgboost`, `matplotlib`).
* Стиль кода соответствует PEP8.
* Реализация дерева решений должна быть в виде отдельного класса с методами `fit`, `predict`.
* Датасеты и код для экспериментов должны быть воспроизводимыми.


##### Заготовка класса:
```python
import numpy as np


class AdaptiveDecisionTreeClassifier:
   def __init__(self, criterion: str = "gini", min_samples_split: int = 2, min_samples_leaf: int = 1):
      pass

   def fit(self, x: np.ndarray, y: np.array) -> "AdaptiveDecisionTreeClassifier":
      pass

   def predict(self, x: np.ndarray) -> np.array:
      pass

   def get_depth(self) -> int:
      pass
```

##### найденные источники

https://loginom.ru/blog/decision-tree-c45-2

https://habr.com/ru/companies/productstar/articles/523044/

##### имопрт библиотек

In [1]:
import pandas as pd
import numpy as np
from abc import ABC, abstractmethod
from copy import copy, deepcopy

from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

##### Модиффицированное дерево решений

In [7]:
class Criterion(ABC):
    @staticmethod
    @abstractmethod
    def compute(cls, x: np.ndarray, y: np.ndarray, y_probs: np.array, 
               min_samples_leaf: int) -> tuple:
        pass


class CriterionGini:
    @staticmethod
    def gini(x_atr: np.ndarray, y: np.array, y_probs: np.array, min_samples_leaf: int) -> tuple:
        '''finding the best threshold using gini for a specific attribute x_atr, \
            the target variable y, and y probabilities y_probs'''
        x_nan_idx = np.isnan(x_atr.astype('float'))
        x_atr_complete = x_atr[~x_nan_idx]
        y_complete = y[~x_nan_idx]
        y_complete_probs = y_probs[~x_nan_idx]
        y_nan = y[x_nan_idx]
        y_nan_probs = y_probs[x_nan_idx]
        unique_atrs = np.sort(np.unique(x_atr_complete))
        unique_anses = np.sort(np.unique(y))
        
        #if all answers are frome one class
        if unique_anses.size < 2:
            return None, 0.0

        #processing data with complete attribute
        sorted_complete_idx = np.argsort(x_atr_complete)
        x_atr_sort_complete = x_atr_complete[sorted_complete_idx]
        y_sort_complete = y_complete[sorted_complete_idx]

        #all possible thresholds for attribute
        thresholds = (unique_atrs[:-1] + unique_atrs[1:]) / 2
        gini_min = 1.0
        best_threshold = None

        #finding best threshold
        for threshold in thresholds:
            #separating y
            mask = x_atr_sort_complete > threshold
            y_right = np.concatenate((y_sort_complete[mask], y_nan))
            y_right_probs = np.concatenate((y_complete_probs[mask], y_nan_probs * np.mean(mask)))
            y_left = np.concatenate((y_sort_complete[~mask], y_nan))
            y_left_probs = np.concatenate((y_complete_probs[~mask], y_nan_probs * (1 - np.mean(mask))))
            
            total_left_weight = np.sum(y_left_probs)
            total_right_weight = np.sum(y_right_probs)
            
            if total_left_weight < min_samples_leaf or total_right_weight < min_samples_leaf:
                continue

            #calculation left based on the probabilities of occurrence of y
            left_counts = np.bincount(y_left, y_left_probs)
            p_left = left_counts / total_left_weight
            gini_left = 1 - np.sum(p_left ** 2)

            #calculation right based on the probabilities of occurrence of y
            right_counts = np.bincount(y_right, y_right_probs)
            p_right = right_counts / total_right_weight
            gini_right = 1 - np.sum(p_right ** 2)
            
            #calculating weighted gini
            total_wight = total_left_weight + total_right_weight
            gini_temp = (gini_left * total_left_weight + gini_right * total_right_weight) / total_wight

            if gini_temp < gini_min:
                gini_min = gini_temp
                best_threshold = threshold

        return best_threshold, gini_min
    
    @staticmethod
    def compute(x: np.ndarray, y: np.ndarray, y_probs: np.array, min_samples_leaf: int) -> tuple:
        '''finding best threshold with gini from all attributes'''
        best_gini = 1.0
        best_threshold = None
        best_feature_idx = -1
        prob_right = None

        for feature_idx in range(x.shape[1]):
            x_atr = x[:, feature_idx]
            threshold_temp, gini_temp = CriterionGini.gini(x_atr, y, y_probs, min_samples_leaf)
            if threshold_temp is None:
                continue
            if gini_temp < best_gini:
                best_gini = gini_temp
                best_threshold = threshold_temp
                best_feature_idx = feature_idx
                
                #finding the probability of hitting the right branch\
                    # is necessary for future predictions
                x_atr_complete = ~np.isnan(x_atr.astype('float'))
                mask = x_atr_complete > best_threshold
                prob_right = np.mean(mask)

        return best_feature_idx, best_threshold, best_gini, prob_right


class CriterionEntropy:
    @staticmethod
    def entropy(x_atr: np.ndarray, y: np.array, y_probs: np.array, min_samples_leaf: int) -> tuple:
        '''finding the best threshold using entropy for a specific attribute x_atr, \
            the target variable y, and y probabilities y_probs'''
        x_nan_idx = np.isnan(x_atr.astype('float'))
        x_atr_complete = x_atr[~x_nan_idx]
        y_complete = y[~x_nan_idx]
        y_complete_probs = y_probs[~x_nan_idx]
        y_nan = y[x_nan_idx]
        y_nan_probs = y_probs[x_nan_idx]
        unique_atrs = np.sort(np.unique(x_atr_complete))
        unique_anses = np.sort(np.unique(y))
        
        #if all answers are frome one class
        if unique_anses.size < 2:
            return None, 0.0

        #processing data with complete attribute
        sorted_complete_idx = np.argsort(x_atr_complete)
        x_atr_sort_complete = x_atr_complete[sorted_complete_idx]
        y_sort_complete = y_complete[sorted_complete_idx]
        y_sort_complete_probs = y_complete_probs[sorted_complete_idx]

        #all possible thresholds for attribute
        thresholds = (unique_atrs[:-1] + unique_atrs[1:]) / 2


        #finding max possible entropy
        parent_counts = np.bincount(y, weights=y_probs)
        parent_total = np.sum(parent_counts)
        p_parent = parent_counts / (parent_total + 1e-10)
        parent_entropy = -np.sum(p_parent * np.log2(p_parent + 1e-10))
        entropy_min = parent_entropy
        best_threshold = None

        #finding best threshold
        for threshold in thresholds:
            #separating y
            mask = x_atr_sort_complete > threshold
            y_right = np.concatenate((y_sort_complete[mask], y_nan))
            y_right_probs = np.concatenate((y_sort_complete_probs[mask], y_nan_probs * np.mean(mask)))
            y_left = np.concatenate((y_sort_complete[~mask], y_nan))
            y_left_probs = np.concatenate((y_sort_complete_probs[~mask], y_nan_probs * (1 - np.mean(mask))))
            
            total_left = np.sum(y_left_probs)
            total_right = np.sum(y_right_probs)
            
            if total_left < min_samples_leaf or total_right < min_samples_leaf:
                continue

            #calculation left based on the probabilities of occurrence of y
            left_counts = np.bincount(y_left, weights=y_left_probs, minlength=unique_anses.size)
            left_total = np.sum(left_counts)
            p_left = left_counts / (left_total + 1e-10)
            entropy_left = -np.sum(p_left * np.log2(p_left + 1e-10))

            #calculation right based on the probabilities of occurrence of y
            right_counts = np.bincount(y_right, weights=y_right_probs, minlength=unique_anses.size)
            right_total = np.sum(right_counts)
            p_right = right_counts / (right_total + 1e-10)
            entropy_right = -np.sum(p_right * np.log2(p_right + 1e-10))

            #calculating weighted entropy
            total_weight = total_left + total_right
            current_entropy = (entropy_left * total_left + entropy_right * total_right) / total_weight
            
            if current_entropy < entropy_min:
                entropy_min = current_entropy
                best_threshold = threshold

        return best_threshold, entropy_min
    
    @staticmethod
    def compute(x: np.ndarray, y: np.ndarray, y_probs: np.array, min_samples_leaf: int) -> tuple:
        '''finding best threshold with gini from all attributes'''
        best_entropy = float('inf')
        best_threshold = None
        best_feature = -1
        prob_right = None

        for feature_idx in range(x.shape[1]):
            x_atr = x[:, feature_idx]
            threshold_temp, entropy_temp = CriterionEntropy.entropy(x_atr, y, y_probs, min_samples_leaf)
            if threshold_temp is None:
                continue
            if entropy_temp < best_entropy:
                best_entropy = entropy_temp
                best_threshold = threshold_temp
                best_feature = feature_idx

                #finding the probability of hitting the right branch\
                    # is necessary for future predictions
                x_atr_complete = ~np.isnan(x_atr.astype('float'))
                mask = x_atr_complete > best_threshold
                prob_right = np.mean(mask)

        return best_feature, best_threshold, best_entropy, prob_right


In [8]:
#node of updated decision tree
class TreeNode:
    def __init__(self, criterion: Criterion, depth: int, num_classes: int, min_samples_split: int = 2, min_samples_leaf: int = 1):
        self.feature_idx = None
        self.threshold = None
        self.left = None
        self.right = None
        self.criterion = criterion
        self.prediction = None
        self.depth = depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.prob_right = None
        self.num_classes = num_classes
    
    def use_criterion(self, x: np.ndarray, y: np.array, y_probs: np.array):
        '''used to build a tree and calculate children'''
        #the moment of stopping
        if y.size < self.min_samples_split:
            counts = np.bincount(y, y_probs)
            most_frequent_index = np.argmax(counts)
            self.prediction = y[most_frequent_index]
            return
        
        #calculating threshold
        self.feature_idx, self.threshold, _, self.prob_right = self.criterion.compute(x=x, y=y, y_probs=y_probs, min_samples_leaf=self.min_samples_leaf)
        
        if self.threshold is None:
            #the moment of stopping
            counts = np.bincount(y, y_probs)
            most_frequent_index = np.argmax(counts)
            self.prediction = most_frequent_index
        else:
            #mask for complete elements with attribute <= threshold
            feature_col = x[:, self.feature_idx].astype(float)
            non_nan_mask = ~np.isnan(feature_col)
            filled_feature = np.where(non_nan_mask, feature_col, np.inf)
            mask = non_nan_mask & (filled_feature <= self.threshold)
            
            #making left data with NaN elements
            x_left = np.concatenate((x[mask], x[~non_nan_mask]))
            y_left = np.concatenate((y[mask], y[~non_nan_mask]))
            y_left_probs = np.concatenate((y_probs[mask], y_probs[~non_nan_mask] * self.prob_right))
            
            #making right data with NaN elements
            x_right = np.concatenate((x[~mask], x[~non_nan_mask]))
            y_right = np.concatenate((y[~mask], y[~non_nan_mask]))
            y_right_probs = np.concatenate((y_probs[~mask], y_probs[~non_nan_mask] * (1 - self.prob_right)))
            
            #recursive calls to calculate children
            self.left = TreeNode(criterion=self.criterion, depth=self.depth + 1, num_classes=self.num_classes, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf)
            self.left.use_criterion(x_left, y_left, y_left_probs)
            
            self.right = TreeNode(criterion=self.criterion, depth=self.depth + 1, num_classes=self.num_classes, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf)
            self.right.use_criterion(x_right, y_right, y_right_probs)
        
    def make_predict(self, x: np.ndarray) -> np.array:
        '''used to predict results for x'''
        if self.prediction is not None: 
            #it is leaf, return matrix of classes probabilities
            probs = np.zeros((x.shape[0], self.num_classes), dtype=float)
            probs[:, self.prediction] = 1
            return probs
        else:
            #making mask for complete data with attribute >= threshold
            feature_col = x[:, self.feature_idx].astype(float)
            non_nan_mask = ~np.isnan(feature_col)
            filled_feature = np.where(non_nan_mask, feature_col, -np.inf)
            mask = non_nan_mask & (filled_feature >= self.threshold)
            
            indices_right = np.where(mask & non_nan_mask)[0]
            indices_left = np.where(~mask & non_nan_mask)[0]
            indices_nan = np.where(~non_nan_mask)[0]
            
            predictions_left = self.left.make_predict(x[indices_left])
            predictions_right = self.right.make_predict(x[indices_right])
            #making weighted predict
            predictions_nan = self.prob_right * self.right.make_predict(x[indices_nan]) + (1 - self.prob_right) * self.left.make_predict(x[indices_nan])
            
            predictions = np.empty((x.shape[0], self.num_classes), dtype=int)
            predictions[indices_left] = predictions_left
            predictions[indices_right] = predictions_right
            predictions[indices_nan] = predictions_nan
            
            return predictions
    
    def calculate_tree_depth(self) -> int:
        if self.prediction is not None:
            return self.get_depth()
        return max(self.left.calculate_tree_depth(), self.right.calculate_tree_depth())
    
    def get_depth(self) -> int:
        return self.depth



In [9]:
class AdaptiveDecisionTreeClassifier:
   def __init__(self, criterion: str = "gini", min_samples_split: int = 2, min_samples_leaf: int = 1):
      if criterion == "gini":
         self.criterion = CriterionGini()
      elif criterion == "entropy":
         self.criterion = CriterionEntropy()
      else:
         raise ValueError("this criterion doesnt exist")
      self.min_samples_split = min_samples_split
      self.min_samples_leaf = min_samples_leaf
      self.head : TreeNode = None
      self.depth = 0

   def fit(self, x: np.ndarray, y: np.array) -> "AdaptiveDecisionTreeClassifier":
      self.head = TreeNode(criterion=self.criterion, depth=0, num_classes=np.unique(y).size, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf)
      self.head.use_criterion(x, y, np.ones(y.size))
      return self

   def predict(self, x: np.ndarray) -> np.array:
      predictions = self.head.make_predict(x)
      return np.argmax(predictions, axis=1)

   def get_depth(self) -> int:
      return self.head.calculate_tree_depth()

##### Подготовка данных

In [44]:
#load iris dataset
file_path = './archive/iris.csv'
data_iris = pd.read_csv(file_path)


In [45]:
data_iris

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,virginica
146,6.3,2.5,5.0,1.9,virginica
147,6.5,3.0,5.2,2.0,virginica
148,6.2,3.4,5.4,2.3,virginica


In [46]:
data_iris_np = data_iris.to_numpy()
x_iris = data_iris_np[:, :-1]
y_iris = data_iris_np[:, -1]

#making train and test datasets
x_iris_train, x_iris_test, y_iris_train, y_iris_test = train_test_split(x_iris, y_iris, test_size=0.2, random_state=28)

#converting the names of irises into numbers
_, y_iris_train = np.unique(y_iris_train, return_inverse=True)
_, y_iris_test = np.unique(y_iris_test, return_inverse=True)


##### Сравнение с обычным деревом на полных данных

In [47]:
sk_clf = DecisionTreeClassifier(criterion='entropy', splitter='best', min_samples_split=2, min_samples_leaf=2, max_depth=20)
sk_clf.fit(x_iris_train, y_iris_train)


y_iris_pred_sk = sk_clf.predict(x_iris_test)
sk_clf_accuracy = accuracy_score(y_iris_test, y_iris_pred_sk)
sk_clf_f1 = f1_score(y_iris_test, y_iris_pred_sk, average="weighted")

In [48]:
adaptive_clf = AdaptiveDecisionTreeClassifier(criterion='gini', min_samples_split=2, min_samples_leaf=2)
adaptive_clf.fit(x_iris_train, y_iris_train)
y_iris_pred_adaptive = adaptive_clf.predict(x_iris_test)
adaptive_clf_accuracy = accuracy_score(y_iris_test, y_iris_pred_adaptive)
adaptive_clf_f1 = f1_score(y_iris_test, y_iris_pred_adaptive, average="weighted")

In [49]:
print(f"sklearn accuracy = {sk_clf_accuracy}, custom tree accuracy = {adaptive_clf_accuracy}")

sklearn accuracy = 0.9666666666666667, custom tree accuracy = 0.9666666666666667


In [50]:
print(f"sklearn f1 = {sk_clf_f1}, custom tree f1 = {adaptive_clf_f1}")

sklearn f1 = 0.9666666666666667, custom tree f1 = 0.9666666666666667


In [51]:
print(f"sklearn depth = {sk_clf.get_depth()}, custom tree depth = {adaptive_clf.get_depth()}")

sklearn depth = 4, custom tree depth = 4


In [52]:
results = pd.DataFrame({"x_0" : x_iris_test[:, 0], "x_1" : x_iris_test[:, 1], "x_2" : x_iris_test[:, 2],  \
    "x_3" : x_iris_test[:, 3],"y_test" : y_iris_test, "y_pred" : y_iris_pred_sk, "y_custom_pred" : y_iris_pred_adaptive})
results

Unnamed: 0,x_0,x_1,x_2,x_3,y_test,y_pred,y_custom_pred
0,4.8,3.4,1.9,0.2,0,0,0
1,6.1,2.6,5.6,1.4,2,2,2
2,5.5,2.4,3.7,1.0,1,1,1
3,5.8,4.0,1.2,0.2,0,0,0
4,5.8,2.8,5.1,2.4,2,2,2
5,5.0,2.3,3.3,1.0,1,1,1
6,6.4,2.8,5.6,2.1,2,2,2
7,5.6,3.0,4.5,1.5,1,1,1
8,5.2,2.7,3.9,1.4,1,1,1
9,5.0,3.2,1.2,0.2,0,0,0


Раздел демонстрирует абсолютное совпадение результатов полученных встроенным и самостоятельно реализованным деревом на данных, не содержащих пропусков

##### Подготовка данных с пропущенными атрибутами

Для проверки работы дерева на данных с пропущенными атрибутами с вероятностью 10 процентов превратим каждый атрибут тестовой и тренировочной выборок в nan

In [53]:
np.random.seed(125)
random_mask_train = np.random.random(x_iris_train.shape)
random_mask_test = np.random.random(x_iris_test.shape)
prob = 0.1
x_with_nan_train = x_iris_train.copy()
y_with_nan_train = y_iris_train.copy()
x_with_nan_test = x_iris_test.copy()
y_with_nan_test = y_iris_test.copy()
x_with_nan_train[random_mask_train < prob] = np.nan
x_with_nan_test[random_mask_test < prob] = np.nan


##### Проверка работы с пропущенными данными

In [54]:
adaptive_clf_with_nan = AdaptiveDecisionTreeClassifier(criterion="entropy", min_samples_leaf=2)
adaptive_clf_with_nan.fit(x_with_nan_train, y_iris_train)
y_pred_adaptive_nan = adaptive_clf_with_nan.predict(x_with_nan_test)
accuracy_adaptive_nan = accuracy_score(y_iris_test, y_pred_adaptive_nan)
f1_adaptive_nan = f1_score(y_iris_test, y_pred_adaptive_nan, average="weighted")

In [55]:
accuracy_adaptive_nan

0.8666666666666667

In [56]:
f1_adaptive_nan

0.866265664160401

In [57]:
results = pd.DataFrame({"x_0" : x_with_nan_test[:, 0], "x_1" : x_with_nan_test[:, 1], "x_2" : x_with_nan_test[:, 2], \
    "x_3" : x_with_nan_test[:, 3],"y_test" : y_with_nan_test, "y_pred" : y_pred_adaptive_nan})
results

Unnamed: 0,x_0,x_1,x_2,x_3,y_test,y_pred
0,4.8,3.4,1.9,0.2,0,0
1,6.1,2.6,5.6,1.4,2,1
2,,2.4,,1.0,1,0
3,5.8,4.0,,0.2,0,0
4,5.8,2.8,5.1,2.4,2,2
5,5.0,2.3,3.3,1.0,1,1
6,6.4,2.8,5.6,2.1,2,2
7,5.6,3.0,4.5,1.5,1,1
8,5.2,2.7,3.9,1.4,1,1
9,5.0,3.2,1.2,0.2,0,0


Для полученного набора данных с добавлением пропусков, точность равна 86 процентам, то есть снизилась на 10 процентов по сравнению с работой дерева на тех же данных без пропусков