# ID3 Decision Tree

## Условие
* Реализирайте алгоритъма за класификационно дърво ID3
* Използвайте ***кросвалидация*** за изчисляване на точността на модела върху обучаващото множество. 

* За избягване на ***overfitting*** използвайте константа **K** -- минимален брой на обучаващи примери в множеството. 

In [135]:
import math
import pandas as pd
import numpy as np
from ucimlrepo import fetch_ucirepo, dotdict
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass, field

In [136]:
# fetch dataset 

breast_cancer: dotdict = fetch_ucirepo(id=14)

X_df: pd.core.frame.DataFrame = breast_cancer.data.features 
y_df: pd.core.frame.DataFrame = breast_cancer.data.targets
X_df = X_df.fillna("?")

values: List[str] = [X_df[name].unique() for idx, name in enumerate(X_df.columns)]

X: np.ndarray = X_df.values
y: np.ndarray = y_df.values
pos_target: str = "recurrence-events"



In [137]:

Split = Dict[str, "Data"]

class Data:
    def __init__(self, 
                 X_train_ref,
                 y_train_ref,
                 values: List[str],
                 feature: int = -1,  
                 data: List[int] = None):
        
        self.X_train_ref = X_train_ref
        self.y_train_ref = y_train_ref
        self.values: List[str] = values 
        self.num_pos: int = 0
        self.feature: int = feature
        self.data: List[int] = data if data else []
        self._count_pos()

    def _count_pos(self) -> None:
        for idx in self.data:
            if self.y_train_ref[idx] == pos_target:
                self.num_pos += 1
    @property
    def prop_predict(self) -> bool:
        return self.prop_pos > 0.5

    @property
    def prop_pos(self) -> float:
        return self.num_pos / len(self.data)
    @property
    def zero_entropy(self) -> bool:
        return self.num_pos == len(self) or self.num_pos == 0 
    
    @property
    def full_entropy(self) -> bool:
        return self.num_pos == (len(self) - self.num_pos) 
    
    @property
    def entropy(self) -> float:
        if self.zero_entropy:
            return 0

        num_neg: int = (len(self) - self.num_pos) 
        prop_neg: float = num_neg / len(self)

        return -self.num_pos * math.log(self.prop_pos) - num_neg * math.log(prop_neg)

    def __len__(self) -> int:
        return len(self.data)

    def insert_idx(self, idx: int) -> None:
        self.data.append(idx)
        if self.y_train_ref[idx] == pos_target:
            self.num_pos += 1

    def split_on_attribute(self, feature: int) -> Split:
        spl: Split = {val: Data(self.X_train_ref, 
                                self.y_train_ref, 
                                self.values,
                                feature=feature) for val in self.values[feature]}
        
        for idx in self.data:
            val: str = self.X_train_ref[idx][feature]
            spl[val].insert_idx(idx)

        return spl

    def __repr__(self) -> str:
        return repr(self.data)

# dt = Data(X, y, values, data=list(range(len(X))))
# spl: Split = dt.split_on_attribute(0)
# # print(dt.data)
# # print(dt.num_pos)
# # print(dt.prop_pos)
# print(dt.entropy)
# for key, value in spl.items():
#     print(f"{key}: {value.entropy}")


In [138]:
class DecisionTree:
    K: int = 30
    D: int = 2
    def __init__(self, 
                 attributes: List[bool], 
                 data: Data=Data, depth=-1) -> None:
        
        self.data: Data = data
        self.children: Dict[str, DecisionTree] = {}
        self.attributes: List[bool] = attributes

        if depth != -1:
            self._build_children_depth(depth)
        else:
            self._build_children()
    
    def feature_id(self) -> int:
        return self.data.feature

    def information_gain(self, spl: Split) -> float:
        sum_term: float = sum((len(val) / len(self.data)) * val.entropy for _, val in spl.items())
        return self.data.entropy - sum_term

    def get_best_split(self) -> Tuple[int, Split]:
        best: Split = {}
        best_score: float = float("-inf")
        feature: int = -1
        for i in range(len(self.attributes)):
            if not self.attributes[i]:
                spl: Split = self.data.split_on_attribute(i)
                ig: float = self.information_gain(spl)
                if ig > best_score:
                    best_score = ig
                    best = spl
                    feature = i
        return feature, best

    @property
    def leaf(self) -> bool:
        return False if self.children else True

    def predict(self, x: np.ndarray) -> bool:
        if self.leaf:
            return self.data.prop_predict
        
        child = self.children[x[self.attribute]]
        if child.leaf and child.data.full_entropy:
            return self.data.prop_predict
        return child.predict(x)
        
    @property
    def attribute(self) -> str:
        return self.data.feature

    @property
    def all_used(self) -> bool:
        return sum(self.attributes) == len(self.attributes)

    def _build_children(self) -> None:
        if self.data.zero_entropy or self.all_used or len(self.data) < DecisionTree.K + 1:
            return

        feature, best_split = self.get_best_split()
        self.data.feature = feature
        self.attributes[feature] = True
    
        for val, data in best_split.items():
            self.children[val] = DecisionTree(self.attributes, data, depth=-1)

    def _build_children_depth(self, depth: int) -> None:
        if self.data.zero_entropy or self.all_used or depth > DecisionTree.D:
            return

        feature, best_split = self.get_best_split()
        self.data.feature = feature
        self.attributes[feature] = True
    
        for val, data in best_split.items():
            self.children[val] = DecisionTree(self.attributes, data, depth=depth + 1)


# attributes = [False for _ in range(X.shape[1])]
# n = DecisionTree(attributes, Data(X[100:], y[100:], values, data=list(range(len(X[100:])))))

# for i in range(len(X)):


# print("age", n.information_gain(spl))
# print(n.get_best_split())
# print(n.children)
# print(n.predict(X[0]))




## Measuring Accuracy

In [139]:
from sklearn.model_selection import  train_test_split, KFold


def measure_accuracy_num_left_pruning(X, y):
    """
    10-fold cross validation
    """
    kf = KFold(n_splits=10, shuffle=True)

    accs = []

    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        attributes = [False for _ in range(X.shape[1])]
        n = DecisionTree(attributes, Data(X_train, y_train, values, data=list(range(len(X_train)))))

        mistakes = 0
        for i in range(len(X_test)):
            temp = y_test[i] == pos_target
            if temp != n.predict(X_test[i]):
                mistakes += 1
        accs.append(1 - mistakes / len(X_test)) 
 
    return sum(accs) / len(accs)

accs = [measure_accuracy_num_left_pruning(X, y) for _ in range(10)]
for elem in accs: 
    print(elem)

print(f"Average: {sum(accs) / len(accs): .2f}")

0.6885467980295565
0.6642857142857144
0.6921182266009853
0.6860837438423645
0.6711822660098521
0.6852216748768473
0.6875615763546798
0.6967980295566502
0.6779556650246304
0.6928571428571428
Average:  0.68


In [140]:
def measure_accuracy_depth_pruning(X, y):
    kf = KFold(n_splits=10, shuffle=True)

    accs = []

    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        attributes = [False for _ in range(X.shape[1])]
        n = DecisionTree(attributes, Data(X_train, y_train, 
                                          values, 
                                          data=list(range(len(X_train)))), depth=0)

        mistakes = 0
        for i in range(len(X_test)):
            temp = y_test[i] == pos_target
            if temp != n.predict(X_test[i]):
                mistakes += 1
        accs.append(1 - mistakes / len(X_test)) 

    return sum(accs) / len(accs)


accs = [measure_accuracy_depth_pruning(X, y) for _ in range(10)]
for elem in accs: 
    print(elem)

print(f"Average: {sum(accs) / len(accs): .2f}")


0.7023399014778324
0.6883004926108374
0.6988916256157636
0.6923645320197045
0.6986453201970444
0.7171182266009853
0.6915024630541872
0.699384236453202
0.6955665024630541
0.6924876847290641
Average:  0.70


* Сравнение със ***sklearn***

In [141]:
from sklearn.model_selection import cross_val_score
from sklearn import tree
from sklearn.preprocessing import OrdinalEncoder

encoder = OrdinalEncoder()

X_encoded = pd.DataFrame(encoder.fit_transform(X_df), columns=X_df.columns)
y_encoded = pd.DataFrame(encoder.fit_transform(y_df), columns=y_df.columns)

print(f"entropy criterion, min_samples_split={DecisionTree.K}, max_depth=None")
clf = tree.DecisionTreeClassifier(criterion="entropy", 
                                  min_samples_split=DecisionTree.K)
accs = cross_val_score(clf, X_encoded, y_encoded, cv=10)
print(f"Average: {accs.mean(): .2f}")


print(f"entropy criterion, min_samples_split=2, max_depth={DecisionTree.D}")
clf = tree.DecisionTreeClassifier(criterion="entropy", 
                                  max_depth=DecisionTree.D)
accs = cross_val_score(clf, X_encoded, y_encoded, cv=10)
print(f"Average: {accs.mean(): .2f}")



entropy criterion, min_samples_split=30, max_depth=None
Average:  0.73
entropy criterion, min_samples_split=2, max_depth=2
Average:  0.71


## Random Forest

In [142]:
class RandomForest:
    """
    Samples B times with replacement and build a forest of B trees. 
    K is the number of features to be used to build the tree. 
    Picks random subset of K features for each of the trees in the forest.
    """
    def __init__(self, X_train: np.ndarray, y_train, B: int = 100, K: int = -1) -> None:
        indices: np.ndarray = [np.random.choice(len(X_train), 
                                                size=len(X_train), 
                                                replace=True) for _ in range(B)]
        if K == -1:
            K = int(math.sqrt(X.shape[1]))
        
        features: np.ndarray = np.arange(X.shape[1])
        columns: List[np.ndarray] = [np.sort(np.random.choice(features, 
                                                              size=K, 
                                                              replace=False)) for _ in range(B)]
 
        X_trains: List[np.ndarray] = [X_train[indx] 
                                      for indx, X 
                                      in zip(indices, X)]

        y_trains: List[np.ndarray] = [y_train[indx] 
                                      for indx, y 
                                      in zip(indices, y)]
 

        values: List[List[str]] = [np.unique(X[:, i]) for i in range(X.shape[1])]
        
        
        self.models: List[DecisionTree] = [DecisionTree([i not in columns[k] for i in range(X_train.shape[1])], # forbidden use of attributes not chosen
                                                         Data(X, y, values, data=list(range(len(X)))), depth=0) 
                                                         for k, (X, y) in enumerate(zip(X_trains, y_trains))] # Build forest



    def predict(self, X: np.ndarray) -> bool:
        votes_pos = sum(model.predict(X) for model in self.models)
        return votes_pos / len(self.models) > 0.5


In [143]:
n_estimators: int = 64
K: int = X.shape[1]

In [144]:

def measure_accuracy_rf(X, y):
    """
    10-fold cross validation
    """
    global n_estimators, K
    kf = KFold(n_splits=10, shuffle=True)

    accs = []

    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        rf: RandomForest = RandomForest(X, y, n_estimators, K)

        mistakes = 0
        for i in range(len(X_test)):
            temp = y_test[i] == pos_target
            if temp != rf.predict(X_test[i]):
                mistakes += 1
        accs.append(1 - mistakes / len(X_test)) 
 
    return sum(accs) / len(accs)

accs = [measure_accuracy_rf(X, y) for _ in range(10)]

for elem in accs: 
    print(elem)

print(f"Average: {sum(accs) / len(accs): .2f}")

TypeError: measure_accuracy_rf() takes 2 positional arguments but 3 were given

In [None]:
from sklearn.ensemble import RandomForestClassifier


clf = RandomForestClassifier(n_estimators=n_estimators, 
                            criterion="entropy",
                            max_features=k, 
                            max_depth=DecisionTree.D)
accs = cross_val_score(clf, X_encoded, y_encoded['Class'], cv=10)

print(f"Average accuracy: {accs.mean()}")

Average accuracy: 0.7445812807881773
