In [1]:
from __future__ import annotations

from typing import Dict, List, Tuple, Optional, cast, Callable
from numbers import Number

from collections import deque

import numpy as np
import pandas as pd
import random

In [2]:
class Node(object):
    def __init__(
        self,
        feature: str | int | None = None,
        value: Optional[object] = None,
        left: Optional[Node] = None,
        right: Optional[Node] = None
    ) -> None:
        self.value = value
        self.left = left
        self.right = right
        self.f = feature


    def __repr__(self) -> str:
        return "{{f: {}, v: {}, left: {}, right: {}}}".format(self.f, self.value, self.left, self.right)


class DecisionTree(object):
    def __init__(self, root: Node, verbose: bool = False) -> None:
        self.root = root
        self.__verbose = verbose


    def __debug(self, value = None) -> None:
        if self.__verbose:
            print(value)


    def predict(self, x: np.ndarray) -> object:
        self.__debug(f"predicting for input: {x}")
        curr = self.root
        
        while curr.f is not None:
            self.__debug(f'feature: {curr.f}, value: {curr.value}')
            if x[curr.f] >= curr.value:
                curr = curr.left
            else:
                curr = curr.right
            curr = cast(Node, curr)
        
        self.__debug()
        return curr.value


In [3]:
X_train = np.array([[1, 1, 1],
[0, 0, 1],
 [0, 1, 0],
 [1, 0, 1],
 [1, 1, 1],
 [1, 1, 0],
 [0, 0, 0],
 [1, 1, 0],
 [0, 1, 0],
 [0, 1, 0]])

y_train = np.array([1, 1, 0, 0, 1, 1, 0, 1, 0, 0])

In [4]:
tree = DecisionTree(
    Node(
        feature=0,
        value=1,
        left=Node(
            feature=1,
            value=1,
            left=Node(
                value=1
            ),
            right=Node(
                value=0
            )
        ),
        right=Node(
            feature=2,
            value=1,
            left=Node(
                value=1
            ),
            right=Node(
                value=0
            )
        )
    )
)

In [5]:
y_hat = [tree.predict(X_train[i]) for i in range(X_train.shape[0])]
y_train - y_hat

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [6]:
tree.predict(X_train[0])

1

In [7]:
def entropy(y: float) -> float:
    if y == 0 or y == 1:
        return 0
    else:
        return -y * np.log2(y) - (1 - y) * np.log2(1 - y)

In [8]:
for i in np.arange(0, 1.1, .1):
    print(entropy(i))

0
0.4689955935892812
0.7219280948873623
0.8812908992306927
0.9709505944546686
1.0
0.9709505944546686
0.8812908992306926
0.7219280948873623
0.4689955935892811
0


In [9]:
def information_gain(left_idx: np.ndarray, right_idx: np.ndarray, parent_idx: np.ndarray, y: np.ndarray) -> float:
    parent_y = y[parent_idx]
    p_root = np.sum(parent_y) / parent_y.shape[0]

    left = y[left_idx]

    w_left = len(left) / len(y)
    p_left = np.sum(left) / (len(left) + 1e-15)

    right = y[right_idx]

    w_right = len(right) / len(y)
    p_right = np.sum(right) / (len(right) + 1e-15)

    return inf_gain(p_root, p_left, w_left, p_right, w_right)

def inf_gain(p_root: float, p_left: float, w_left: float, p_right: float, w_right: float) -> float:
    return entropy(p_root) - (w_left * entropy(p_left) + w_right * entropy(p_right))

In [10]:
inf_gain(.5, 4/5, .5, 1/5, .5)

0.2780719051126377

In [11]:
def split_by_feature(X: np.ndarray, idx: np.ndarray, feature: str | int, value: Number) -> Tuple[np.ndarray, np.ndarray]:
    left_mask = X[idx, feature] >= value
    right_mask = ~left_mask

    return idx[left_mask], idx[right_mask]

def get_best_feature_idx(
    X: np.ndarray,
    y: np.ndarray,
    cost_fn: Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray], float],
    idx: np.ndarray,
    features: Dict[str | int, List[Number]],
    feature_keys: List[str | int]
) -> Tuple[str | int, int, float]:

    costs: List[Tuple[str | int, int, float]] = []

    for feature_key in feature_keys:
        for i, value in enumerate(features[feature_key]):
            left_idx, right_idx = split_by_feature(X, idx, feature_key, value)

            cost = cost_fn(left_idx, right_idx, idx, y)
            costs.append((feature_key, i, cost))
    
    return max(costs, key=lambda x: x[2])

In [12]:
features: Dict[str | int, List[Number]] = {i:[1] for i in range(X_train.shape[1])}
feature_keys = list(features.keys())

X = X_train
y = y_train

idx = np.arange(X.shape[0])

fkey, f_idx, cost_decrease = get_best_feature_idx(X, y, information_gain, idx, features, feature_keys)
print(f'first level feature: {fkey}:{f_idx}, inf gain: {cost_decrease}')

left_idx, right_idx = split_by_feature(X, idx, fkey, features[fkey][f_idx])
print(left_idx)
print(right_idx)

left_fkey, left_f_idx, cost_decrease = get_best_feature_idx(X, y, information_gain, left_idx, features, feature_keys)
print(f'second level left feature: {left_fkey}:{left_f_idx}, inf gain: {cost_decrease}')

left_left_idx, left_right_idx = split_by_feature(X, left_idx, left_fkey, features[left_fkey][left_f_idx])
print(left_left_idx)
print(left_right_idx)

right_fkey, right_f_idx, cost_decrease = get_best_feature_idx(X, y, information_gain, right_idx, features, feature_keys)
print(f'second level right feature: {right_fkey}:{right_f_idx}, inf gain: {cost_decrease}')

right_left_idx, right_right_idx = split_by_feature(X, right_idx, right_fkey, features[right_fkey][right_f_idx])
print(right_left_idx)
print(right_right_idx)

first level feature: 0:0, inf gain: 0.27807190511263746
[0 3 4 5 7]
[1 2 6 8 9]
second level left feature: 1:0, inf gain: 0.7219280948873575
[0 4 5 7]
[3]
second level right feature: 2:0, inf gain: 0.7219280948873567
[1]
[2 6 8 9]


In [13]:
def is_categorical(unique_feature_vals: np.ndarray) -> bool:
    return np.max(unique_feature_vals) == 1 and np.min(unique_feature_vals) == 0 and len(unique_feature_vals) == 2


def make_continuous_decision_boundaries(unique_feature_vals: np.ndarray) -> np.ndarray:
    return (unique_feature_vals[:-1] + unique_feature_vals[1:]) / 2


def make_features(X: np.ndarray) -> Dict[str | int, List[Number]]:
    features = dict()

    for feature in range(X.shape[1]):
        unique_fvals = np.unique(X[:, feature])
        
        if is_categorical(unique_fvals):
            features[feature] = [1]
        else:
            features[feature] = make_continuous_decision_boundaries(unique_fvals)

    return features


def build_tree(
    X: np.ndarray,
    y: np.ndarray,
    cost_fn: Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray], float],
    compute_leaf: Callable[[np.ndarray], np.number],
    max_height: int,
    inf_gain_threshold: float = 1e-15,
    min_sample_count: int = 1,
    num_features: int | None = None,
    rng_seed: int | None = None
) -> DecisionTree:
    num_features = X.shape[1] if num_features is None else min(num_features, X.shape[1])
    random.seed(rng_seed)

    features = make_features(X)
    feature_keys = list(features.keys())

    idx = np.arange(X.shape[0])
    root = Node()
    
    queue: deque[Tuple[np.ndarray, Node, int]] = deque([(idx, root, 0)])
    
    while len(queue) > 0:
        idx, node, height = queue.popleft()

        if num_features < X.shape[1]:
            keys = random.sample(feature_keys, k=num_features)
        else:
            keys = feature_keys

        best_fkey, best_f_idx, max_inf_gain = get_best_feature_idx(X, y, cost_fn, idx, features, keys)

        # If stopping criteria is met
        if height == max_height\
        or idx.shape[0] <= min_sample_count\
        or max_inf_gain <= inf_gain_threshold:
            # Set the leaf node to be the dominant class
            node.value = compute_leaf(y[idx])
            continue

        # Update the node with the best feature
        fval = features[best_fkey][best_f_idx]
        node.f = best_fkey
        node.value = fval
        node.left = Node()
        node.right = Node()

        # Create the children nodes
        left_idx, right_idx = split_by_feature(X, idx, node.f, node.value)

        queue.append((left_idx, node.left, height + 1))
        queue.append((right_idx, node.right, height + 1))
    
    return DecisionTree(root)

In [14]:
def get_dominant_class(y: np.ndarray) -> np.integer:
    return np.argmax(np.bincount(y))

In [15]:
tree = build_tree(X_train, y_train, information_gain, get_dominant_class, 2)

tree.predict(X_train[0])
y_hat = [tree.predict(X_train[i]) for i in range(X_train.shape[0])]
y_train - y_hat

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=int64)

In [16]:
rng_seed = np.random.default_rng(seed=0)

test_x = rng_seed.random(size=(20, 2))
test_y = (np.average(test_x, axis=1) > 0.5).astype(int)

In [17]:
print(test_x)
print(test_y)

[[0.63696169 0.26978671]
 [0.04097352 0.01652764]
 [0.81327024 0.91275558]
 [0.60663578 0.72949656]
 [0.54362499 0.93507242]
 [0.81585355 0.0027385 ]
 [0.85740428 0.03358558]
 [0.72965545 0.17565562]
 [0.86317892 0.54146122]
 [0.29971189 0.42268722]
 [0.02831967 0.12428328]
 [0.67062441 0.64718951]
 [0.61538511 0.38367755]
 [0.99720994 0.98083534]
 [0.68554198 0.65045928]
 [0.68844673 0.38892142]
 [0.13509651 0.72148834]
 [0.52535432 0.31024188]
 [0.48583536 0.88948783]
 [0.93404352 0.3577952 ]]
[0 0 1 1 1 0 0 0 1 0 0 1 0 1 1 1 0 0 1 1]


In [18]:
test_tree = build_tree(test_x, test_y, information_gain, get_dominant_class, max_height=2,)

In [19]:
print(test_tree.root)

{f: 1, v: 0.33401853613401294, left: {f: 0, v: 0.39277362468458693, left: {f: None, v: 1, left: None, right: None}, right: {f: None, v: 0, left: None, right: None}}, right: {f: None, v: 0, left: None, right: None}}


In [20]:
y_hat = [test_tree.predict(test_x[i]) for i in range(test_x.shape[0])]
print(y_hat - test_y)

[0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0]


In [21]:
from typing import Any
from numpy.typing import NDArray

def variance_gain(left_idx: np.ndarray, right_idx: np.ndarray, parent_idx: np.ndarray, y: np.ndarray) -> float:
    parent_y = y[parent_idx]
    parent_var = np.var(parent_y)

    left = y[left_idx]

    w_left = len(left) / len(y)
    left_var = np.var(left) if len(left) > 0 else 0.0

    right = y[right_idx]

    w_right = len(right) / len(y)
    right_var = np.var(right) if len(right) > 0 else 0.0

    return parent_var - (w_left * left_var + w_right * right_var)


def mse(y_hat: NDArray[np.floating[Any]], y: NDArray[np.floating[Any]]) -> np.floating[Any]:
    return np.average((y_hat - y) ** 2)


def get_mean_value(y: np.ndarray) -> np.floating:
    return np.average(y)

In [22]:
rng_seed = np.random.default_rng(seed=0)
X_reg = rng_seed.random(size=(100, 10))
y_reg = np.sum(X_reg ** 2, axis=1)

In [23]:
print(X_reg.shape)
print(y_reg.shape)

(100, 10)
(100,)


In [24]:
reg_tree = build_tree(X_reg, y_reg, variance_gain, get_mean_value, max_height=3)

In [25]:
print(reg_tree.root)

{f: 4, v: 0.8784579289518393, left: {f: 7, v: 0.7731027147673438, left: {f: 0, v: 0.8595724184235947, left: {f: None, v: 6.585205450572977, left: None, right: None}, right: {f: None, v: 5.5910333769445035, left: None, right: None}}, right: {f: 0, v: 0.21342352557953748, left: {f: None, v: 4.621060382833744, left: None, right: None}, right: {f: None, v: 3.8588176735664836, left: None, right: None}}}, right: {f: 9, v: 0.8338161758058992, left: {f: 1, v: 0.8479934402366498, left: {f: None, v: 5.241748979841595, left: None, right: None}, right: {f: None, v: 3.803558756762245, left: None, right: None}}, right: {f: 3, v: 0.760264000981916, left: {f: None, v: 4.002968667852741, left: None, right: None}, right: {f: None, v: 3.078938332071388, left: None, right: None}}}}


In [26]:
y_hat = np.asarray([reg_tree.predict(X_reg[i]) for i in range(X_reg.shape[0])])

print(f'mean squared error of prediction: {mse(y_hat, y_reg)}')

mean squared error of prediction: 0.3377934989274023


In [27]:
def build_forest(
    X: np.ndarray,
    y: np.ndarray,
    cost_fn: Callable[[np.ndarray, np.ndarray, np.ndarray, np.ndarray], float],
    compute_leaf: Callable[[np.ndarray], np.number],
    tree_height: int,
    num_features: int | None = None,
    num_trees: int = 100,
) -> List[DecisionTree]:

    forest = []

    for _ in range(num_trees):
        sampled_idx = rng_seed.integers(0, X.shape[0], size=X.shape[0])
        tree = build_tree(
            X[sampled_idx],
            y[sampled_idx],
            cost_fn,
            compute_leaf,
            max_height=tree_height,
            num_features=num_features,
            rng_seed=0
        )
        forest.append(tree)
    

    return forest

In [28]:
forest = build_forest(X_reg, y_reg, variance_gain, get_mean_value, tree_height=3, num_trees=100)

In [29]:
def predict(X: np.ndarray, trees: List[DecisionTree], voting_strategy: Callable[[np.ndarray], np.number]) -> np.ndarray:
    predictions = list()
    
    for i in range(X.shape[0]):
        votes = np.asarray([tree.predict(X[i]) for tree in trees])
        prediction = voting_strategy(votes)

        predictions.append(prediction)
    
    return np.asarray(predictions)


In [30]:
y_hat = predict(X_reg, forest, get_mean_value)

print(f'mean squared error of prediction: {mse(y_hat, y_reg)}')

mean squared error of prediction: 0.2273120061819379
