In [103]:
import math
from typing import Optional
from typing import SupportsFloat
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
import numpy as np

In [104]:
# is threshold smaller or equal to the feature of sample
def split_func(sample: np.ndarray, feature_i: int, threshold: float) -> bool:
    return sample[feature_i] >= threshold

In [105]:
def divide_on_feature(x: np.ndarray, idx: int, val: float):
    x_1 = np.array([sample for sample in x if split_func(sample, idx, val)])
    x_2 = np.array([sample for sample in x if not split_func(sample, idx, val)])
    return np.array([x_1, x_2])

In [106]:
def log2(n: SupportsFloat) -> float:
    return math.log(n) / math.log(2)

In [107]:
def calculate_entropy(y: np.ndarray) -> float:
    unique_labels = np.unique(y)
    entropy = 0.0
    for label in unique_labels:
        labelCount = len(y[y == label])
        percentage = labelCount / len(y)
        entropy += -percentage * log2(percentage)
    return entropy

In [108]:
def calculate_info_gain(y: np.ndarray, y1: np.ndarray, y2: np.ndarray) -> float:
    p1 = len(y1) / len(y)
    p2 = len(y2) / len(y)
    entropy = calculate_entropy(y)
    info_gain = entropy - (p1 * calculate_entropy(y1) + p2 * calculate_entropy(y2))
    return info_gain

In [109]:
def majority_vote(y: np.ndarray) -> float:
    most_common = 0.0
    max_count = 0
    for label in np.unique(y):
        count = len(y[y == label])
        if count > max_count:
            most_common = label
    return most_common

In [110]:
class DecisionNode:
    """Class representing a node in a Decision Tree (Classification Tree)."""

    def __init__(
            self,
            feature_i: int = None,
            threshold: float = None,
            value: float = None,
            true_branch: DecisionNode = None,
            false_branch: DecisionNode = None,
    ) -> None:
        self.feature_i = feature_i
        self.threshold = threshold
        self.value = value
        self.true_branch = true_branch
        self.false_branch = false_branch

In [111]:
class ClassificationTree:
    """Class of Classification Tree"""

    def __init__(
            self,
            min_samples_split: int = 2,
            min_impurity: float = 1e-7,
            max_depth: float = np.inf
    ) -> None:
        self.root: DecisionNode = None
        self.min_samples = min_samples_split
        self.min_impurity = min_impurity
        self.max_depth = max_depth

    def fit(self, x: np.ndarray, y: np.ndarray):
        self.root = self._build_tree(x, y)

    def _build_tree(self, x: np.ndarray, y: np.ndarray, current_depth: int = 0):
        """Recursive method that creates a subtree of DecisionNodes based on splitting the given input data according to the minimum impurity, the maximum depth and the minimum amount of samples."""


        largest_impurity = 0.0
        best_criteria = {}
        best_sets = {}

        # add y (1D) as last column to x (2D)
        full_x = np.concatenate((x, np.reshape(y, newshape=(-1, 1))), axis=1)
        n_samples, n_features = full_x.shape

        # only split the data if there are more than min_samples and the depth is not deeper than max_depth
        if n_samples >= self.min_samples and current_depth <= self.max_depth:
            # for each feature, check the impurity of the inputs at that distinct feature
            for feature_i in range(n_features):
                if feature_i < len(x):
                    break
                feature_values = x[:, feature_i]
                unique_values = np.unique(feature_values) # no need to check impurity more than once per each distinct value

                for val in unique_values:
                    x_1, x_2 = divide_on_feature(full_x, feature_i, val)
                    # length of x_1 and x_2 needs to be greater than 0
                    if len(x_1) > 0 and len(x_2) > 0:
                        y1 = x_1[:, n_features:]
                        y2 = x_2[:, n_features:]
                        x1 = x_1[:, :n_features]
                        x2 = x_2[:, :n_features]

                        impurity = calculate_info_gain(y, y1, y2)
                        if impurity > largest_impurity:
                            largest_impurity = impurity
                            best_criteria = {
                                "feature_i": feature_i,
                                "threshold": val,
                            }
                            best_sets = {
                                "x_left": x1,
                                "x_right": x2,
                                "y_left": y1,
                                "y_right": y2
                            }
        # as long as the largest found impurity is greater than the min_impurity
        if largest_impurity > self.min_impurity:
            # build a subtree with the left half (smaller than threshold)
            true_branch = self._build_tree(best_sets["x_left"], best_sets["y_left"], current_depth+1)
            # build a subtree with the right values (greater than threshold)
            false_branch = self._build_tree(best_sets["x_right"], best_sets["y_right"], current_depth+1)
            return DecisionNode(
                feature_i=best_criteria["feature_i"],
                threshold=best_criteria["threshold"],
                true_branch=true_branch,
                false_branch=false_branch,
            )

        leaf_value = majority_vote(y)
        return DecisionNode(value=leaf_value)

    def predict_value(self, x: np.ndarray, tree: Optional[DecisionNode] = None):
        if tree is None:
            tree = self.root
        # in case of leaf simply return the leaf value
        if tree.value is not None:
            return tree.value

        # determine whether to check true_branch or false_branch
        feature_value = x[tree.feature_i]
        branch = tree.false_branch
        if feature_value >= tree.threshold:
            branch = tree.true_branch

        return self.predict_value(x, branch)

    def predict(self, x: np.ndarray) -> np.ndarray:
        y_pred = np.array([self.predict_value(xe) for xe in x])
        return y_pred

    def score(self, x: np.ndarray, y: np.ndarray):
        y_pred = self.predict(x)
        y_true = np.sum([y_pred_i == y_i for y_pred_i, y_i in zip(y_pred, y)])
        length = len(y)

        accuracy = y_true / length
        return accuracy

In [112]:
dataset = load_wine()
x = dataset.data
y = dataset.target

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3)

In [113]:
tree = ClassificationTree(min_samples_split=2, max_depth=6)
tree.fit(x_train, y_train)

accuracy = tree.score(x_test, y_test)
print(f"accuracy: {accuracy*100:.4}%")

accuracy: 24.07%
