# Classification Algorithms Using Trees

## Decision Trees (From Scratch)


In [1]:
import numpy as np
from sklearn.model_selection import train_test_split

from run_algos import utils

from collections import Counter
from typing import Union, Optional


# Black code formatter (Optional)
%load_ext lab_black

%load_ext autoreload
%autoreload 2

## Decision Trees

[![image.png](https://i.postimg.cc/RCDcYtgh/image.png)](https://postimg.cc/5j8YYXdW)

<br>

[Source](https://www.youtube.com/watch?v=NxEHSAfFlK8&t=287)

<br>

[![image.png](https://i.postimg.cc/nVBF8bkm/image.png)](https://postimg.cc/vD8F9KC8)


### Decisions To Be Made

1. **Split feature**: Which feature should be used for the splitting?
2. **Split point**: At what point in a numerical variable should we split?
3. **When to stop splitting**: When should you stop splitting to keep the tree from growing too large?

### Steps

> The following steps are used to build the Decision Tree classifier from scratch.

#### Training
Given the entire dataset:
1. Calculate the information gain with each possible split. i.e using all the possible features, calculate the IG.
2. Divide the data with the feature and the value threshold (if it's numerical) that gives the most information gain.
3. The result from step 2 is used to create the branches.
4. Repeat steps 1 through 3 until a stopping criterion is reached.

#### Making Predictions
Given a data point:

1. Traverse the tree until you reach a leaf node.
2. Return the most common class label i.e (if a leaf node is pure, return the class label otherwise, return a majority vote)

<hr>

### Important Terms

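* **Entropy**: This measures how impure (mixed) the class labels in a node are, i.e. how random or unpredictable the node is. It is `0` for a pure node and largest when all classes are equally represented (e.g. 50% of each class in a binary problem). It ranges between `0` and `1` for a binary problem, and between `0` and $log_{2}(C)$ in general.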

$$
Entropy = - \sum^C_{i=1}(p_{i}*log_{2}(p_{i}))
$$

where:

$p_{i}$ is the probability of randomly picking an element of $class_{i}$ .

$C$ is the total number of classes. For a binary problem, $C = 2$. i.e $C_{unique} = [0, 1]$

[]

* **Information Gain (IG)**: This measures the quality of a split, i.e. how much entropy is removed by splitting on a feature. It's the basic criterion used to decide whether a feature should be used to split a node or not. The feature (and threshold) with the highest information gain at a node of a decision tree is used to split that node. For a binary problem it ranges between `0` and `1`.

$$
IG = Entropy_{parent} - (weighted_{average} * Entropy_{children})
$$

where:

$weighted_{average}* Entropy_{children}$: $((\frac{num_{LeftNodes}}{total} * entropy_{Left}) + (\frac{num_{RightNodes}}{total} * entropy_{Right}))$

### Stopping Criteria

1. **Maximum depth**: This refers to how deep you want the tree to grow.
2. **Minimum no of samples**: Refers to the minimum number of samples a node can have before splitting can take place.
3. **Minimum impurity decrease**: Refers to the minimum entropy change required for a split to take place.

In [2]:
from typing import Any, NewType, Union

# Distinct static type name for tree objects; at runtime `Tree(x)` simply returns `x`.
Tree = NewType("Tree", Any)

# Create 2 classes. The 1st class (Node) is used to implement
# the node and all its attributes while 2nd class (DecisionTree)
# contains all the logic for the classifier. The DecisionTree has
# the attributes used as stopping criteria which prevents the tree
# from growing uncontrollably.


class Node:
    """This class is used to implement the nodes
    of a Decision Tree classifier.

    An internal (decision) node stores the split (`feature`, `threshold`)
    and its two children (`left`, `right`). A leaf node stores only the
    predicted label in `value`.

    Params:
        left (Optional[Node]): Child holding the samples whose feature
            value is <= threshold.
        right (Optional[Node]): Child holding the samples whose feature
            value is > threshold.
        feature (Optional[int]): Column index of the feature used to split.
        threshold (Optional[Union[float, int]]): Value of the feature
            used to split.
        value (Optional[Union[float, int]]): Keyword-only. The label
            predicted by a leaf node; None for internal nodes.
    """

    def __init__(
        self,
        left: Optional["Node"] = None,
        right: Optional["Node"] = None,
        feature: Optional[int] = None,
        threshold: Optional[Union[float, int]] = None,
        *,
        value: Optional[Union[float, int]] = None,
    ) -> None:
        self.left = left
        self.right = right
        self.feature = feature
        self.threshold = threshold
        self.value = value

    def _is_leaf_node(self) -> bool:
        """It returns True if it's a leaf node and False otherwise."""
        # Compare against None (not truthiness) so that the label 0
        # still marks a leaf.
        return self.value is not None


# Training
# Given the entire dataset:
# 1. Calculate the information gain with each possible split.
# i.e using all the possible features, calculate the IG.
# 2. Divide the data with the feature and the value threshold (if it's numerical)
# that gives the most information gain.
# 3. The result from step 2 is used to create the branches (grow the tree)
# a. check the stopping criteria to prevent growing trees uncontrollably.
# b. find the best split using IG.
# c. create child nodes.
# 4. Repeat steps 1 thru 3 until a stopping criteria is reached.


class DecisionTree:
    """This class is used to implement Decision Tree classifier.

    Splits are chosen by maximising the entropy-based information gain.

    Params:
        max_depth (int): Depth at which a node is turned into a leaf
            instead of being split further.
        min_samples_split (int): A node holding fewer samples than this
            is turned into a leaf.
        n_features (Optional[int]): Number of features drawn at random
            (without replacement) as split candidates at every node.
            None means all the features. `fit` caps it at the number
            of columns of X.
    """

    def __init__(
        self,
        max_depth: int = 100,
        min_samples_split: int = 2,
        n_features: Optional[int] = None,
    ) -> None:
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.root = None

    def __repr__(self) -> str:
        """This returns the string representation of the class."""
        return (
            f"{self.__class__.__name__}(max_depth={self.max_depth}, "
            f"min_samples_split={self.min_samples_split}, "
            f"n_features={self.n_features}, "
            f"root={self.root})"
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "DecisionTree":
        """This is used to train the model.

        Params:
            X (np.ndarray): Feature matrix of shape (n_samples, n_features).
            y (np.ndarray): Labels. They must be non-negative integers
                because the entropy is computed with np.bincount.

        Returns:
            self: The fitted classifier.
        """
        # check that the n_features is valid. Note that the resolved
        # value overwrites self.n_features.
        all_feats = X.shape[1]
        self.n_features = (
            all_feats if self.n_features is None else min(all_feats, self.n_features)
        )
        # grow trees
        self.root = self._grow_tree(X, y)
        return self

    def _grow_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> "Node":
        """This is used to create child nodes recursively.

        It returns a Node object
        """
        n_samples, n_feats = X.shape
        n_K = len(np.unique(y))  # number of unique labels.

        # Base case: check the stopping criteria: if the n_K = 1
        # or if the n_samples < min_samples_split or depth >= max_depth,
        # return the only present label or the label with the
        # highest frequency.
        if n_K == 1 or n_samples < self.min_samples_split or depth >= self.max_depth:
            return Node(value=DecisionTree._get_most_common_label(input_=y))

        # Randomly select features (indices)
        feature_idxs = np.random.choice(n_feats, size=self.n_features, replace=False)

        # Find the best split: Select the feature and the
        # threshold of the feature used for splitting
        best_feature, best_thresh = self._best_split(X, y, feature_idxs)

        # No candidate split separates the samples (e.g. every selected
        # feature is constant in this node): stop here with a majority
        # vote instead of recursing on an empty child.
        if best_feature is None:
            return Node(value=DecisionTree._get_most_common_label(input_=y))

        left_idxs, right_idxs = DecisionTree._split_into_nodes(
            feat_matrix=X[:, best_feature], split_thresh=best_thresh
        )
        # Safety net: an empty child has no label to vote on.
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return Node(value=DecisionTree._get_most_common_label(input_=y))

        # Create child nodes (recursively) using the result from the best_split
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth=depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth=depth + 1)
        return Node(left, right, best_feature, best_thresh)

    @staticmethod
    def _best_split(
        X: np.ndarray, y: np.ndarray, feature_idxs: Union[list[int], np.ndarray]
    ) -> tuple[Optional[int], Optional[float]]:
        """This uses information gain to calculate the best split at
        every node.

        Returns:
            feat_split_idx, label_split_thresh: The best feature index
            and threshold respectively used to perform the split.
            (None, None) is returned when no threshold of the candidate
            features separates the samples into two non-empty groups.
        """
        # Initialize variables
        feat_split_idx, label_split_thresh = None, None
        best_gain = -1

        # Calculate the IG for each feature and determine the best
        for feat_idx in feature_idxs:
            feat_column = X[:, feat_idx]  # 1-D column of the feature
            # np.unique returns the sorted unique values. Splitting on the
            # largest one sends every sample to the left node, so it's
            # never a usable threshold and it's skipped.
            thresholds = np.unique(feat_column)[:-1]

            for thresh in thresholds:
                # Calculate the Info Gain
                info_gain = DecisionTree._calculate_info_gain(y, feat_column, thresh)
                # Update values. The strict comparison keeps the first
                # candidate among equally good ones.
                if info_gain > best_gain:
                    best_gain = info_gain
                    feat_split_idx, label_split_thresh = feat_idx, thresh

        return (feat_split_idx, label_split_thresh)

    @staticmethod
    def _calculate_info_gain(
        y: np.ndarray, feat_matrix: np.ndarray, split_thresh: Union[float, int]
    ) -> float:
        """This is used to calculate the information gain.
        It ranges between 0 and 1 for a binary problem.

        Params:
            y (np.ndarray): Labels of the samples in the parent node.
            feat_matrix (np.ndarray): 1-D array with the values of the
                feature for the same samples.
            split_thresh (Union[float, int]): Value used to split.
        """
        # Create children i.e split into left and right nodes
        left_idxs, right_idxs = DecisionTree._split_into_nodes(
            feat_matrix, split_thresh
        )

        # If the left or right node is empty, nothing was separated
        # so there's no information gain.
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0.0

        # Calculate the weighted average of the entropy of the children
        # using the fraction of samples that falls into each child.
        n_total = len(y)
        left_entropy = DecisionTree._calculate_entropy(y[left_idxs])
        right_entropy = DecisionTree._calculate_entropy(y[right_idxs])
        child_entropy = (len(left_idxs) / n_total * left_entropy) + (
            len(right_idxs) / n_total * right_entropy
        )

        # Calculate the IG
        return DecisionTree._calculate_entropy(y) - child_entropy

    @staticmethod
    def _calculate_entropy(input_: Union[list[int], np.ndarray]) -> float:
        """This is used to calculate the entropy (in bits) of the labels.
        The labels must be non-negative integers."""
        # An empty node has nothing to be uncertain about.
        if len(input_) == 0:
            return 0.0
        counts = np.bincount(input_)
        # Drop the labels that never occur: log2(0) is undefined.
        probs = counts[counts > 0] / len(input_)
        return -np.sum(probs * np.log2(probs))

    @staticmethod
    def _get_most_common_label(input_: np.ndarray) -> int:
        """This returns the most common label."""
        counter = Counter(input_)
        return counter.most_common(n=1)[0][0]

    @staticmethod
    def _split_into_nodes(
        feat_matrix: np.ndarray, split_thresh: Union[float, int]
    ) -> tuple[np.ndarray, np.ndarray]:
        """This is used to split a node into the left and right nodes.
        It returns a tuple of index arrays.

        Params:
            feat_matrix (np.ndarray): 1-D array with the values of a
                single feature.
            split_thresh (Union[float, int]): Samples with a value <= the
                threshold go to the left node, the ones with a value >
                the threshold go to the right node.
        """
        # Return the idxs that satisfy the condition
        left_idxs = np.flatnonzero(feat_matrix <= split_thresh)
        right_idxs = np.flatnonzero(feat_matrix > split_thresh)
        return (left_idxs, right_idxs)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """This is used to make inference on the entire data."""
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node: "Node") -> int:
        """This is used to walk down the tree, starting at `node`,
        until a leaf node is reached. It returns the label of the leaf."""
        while not node._is_leaf_node():
            # Same rule used for training: <= goes left, > goes right.
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.value

In [3]:
# Mock classification dataset from the project helper
# (assumed to return a feature matrix X and a label vector y — TODO confirm).
X, y = utils.generate_mock_data(type_="classification")

# Hold out a test split; the test fraction and the seed come from
# the constants defined in the project's utils module.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=utils.TEST_SIZE, random_state=utils.RANDOM_STATE
)

# Last expression of the cell: the notebook displays both shapes.
X_train.shape, X_test.shape

((1800, 11), (200, 11))

In [4]:
# Shallow tree: nodes at depth 5 become leaves and nodes holding
# fewer than 100 samples are not split.
d_tree = DecisionTree(max_depth=5, min_samples_split=100)
# fit() returns the estimator itself, so the notebook displays its repr.
d_tree.fit(X_train, y_train)

DecisionTree(max_depth=5, min_samples_split=100, n_features=11, root=<__main__.Node object at 0x132758c70>)

In [5]:
# Predict a label for every row of the held-out split.
y_pred = d_tree.predict(X_test)

# Accuracy: the fraction of predictions that match the true labels.
np.mean(y_pred == y_test)

# np.unique(y_test)

0.91

## Random Forests

[![image.png](https://i.postimg.cc/1R82hKM8/image.png)](https://postimg.cc/PNk21YgH)

### Overview
* This is a collection of many Decision Trees (hence `forest`).
* A **subset** of the data is chosen at **`random`** (hence `random`), and a decision tree is used to make predictions based on the subset of data chosen.
* The process is repeated `N` times, where `N` is the number of decision trees in the forest.
* The predictions made by all of the decision trees in the forest are used to make the final prediction by majority voting at inference.
  * Classification: majority vote.
  * Regression: mean of the predictions.

<hr><br>

### Training
1. A subset of the data is chosen at random with replacement, resulting in some data points being repeated and not all of the actual data being used for training (bootstrapping).
2. Each of the `N` decision trees is fit (trained) on its own randomly selected subset of the data.

### Inference (Making Predictions)
1. The trained decision trees are used to make predictions.
2. For classification, the predictions made by all of the decision trees in the forest are used to make the final prediction by majority voting.

In [6]:
# 1. Init hyperparams
# * n_trees, * max_depth, * min_samples_split, * n_features, * root
# 2. Create N number of decision trees
# 3. Bootstrap the samples (the training data) with repetition so that not
# all of the training data is used.


class RandomForest:
    """This class is used to implement the Random Forest classifier.

    Every tree is a DecisionTree trained on a bootstrap sample of the
    training data. The final prediction is the majority vote of the trees.

    Params:
        n_trees (int): Number of decision trees in the forest.
        max_depth (int): Passed to every DecisionTree.
        min_samples_split (int): Passed to every DecisionTree.
        n_features (Optional[int]): Passed to every DecisionTree.
        *args, **kwargs: Stored on the instance; not used by this class.
    """

    def __init__(
        self,
        n_trees: int = 20,
        max_depth: int = 100,
        min_samples_split: int = 2,
        n_features: Optional[int] = None,
        *args,
        **kwargs,
    ) -> None:
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.n_trees = n_trees
        self.trees = []
        self.args = args
        self.kwargs = kwargs

    def __repr__(self) -> str:
        """This returns the string representation of the class.
        The names match the arguments of the constructor."""
        return (
            f"{self.__class__.__name__}(n_trees={self.n_trees}, "
            f"max_depth={self.max_depth}, "
            f"min_samples_split={self.min_samples_split}, "
            f"n_features={self.n_features})"
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "RandomForest":
        """This is used to train the model. Trees from a previous
        call are discarded.

        Returns:
            self: The fitted classifier.
        """
        self.trees = []
        for _ in range(self.n_trees):
            tree = DecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                n_features=self.n_features,
            )
            # Every tree sees a different bootstrap sample.
            X_sampled, y_sampled = self._booststrap(X=X, y=y)
            # Fit and append
            tree.fit(X_sampled, y_sampled)
            self.trees.append(tree)
        return self

    @staticmethod
    def _booststrap(*, X: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """This returns random samples from the data having the
        same size as the training data. (The spelling of the method
        name is kept as is so that existing callers keep working.)"""
        n_samples = X.shape[0]
        # Sampling with replacement: some rows are drawn more than once,
        # so typically not every row is present even though the
        # number of drawn rows == n_samples.
        chosen_samples = np.random.choice(n_samples, n_samples, replace=True)
        # The same indices are used for X and y to keep them aligned.
        return (X[chosen_samples, :], y[chosen_samples])

    @staticmethod
    def _get_most_common_label(*, input_: np.ndarray) -> int:
        """This returns the most common label."""
        counter = Counter(input_)
        return counter.most_common(n=1)[0][0]

    def predict(self, X: np.ndarray) -> np.ndarray:
        """This is used to make inference on the entire data by
        majority voting.

        Raises:
            ValueError: If the forest has no trained trees.
        """
        if not self.trees:
            raise ValueError(
                f"This {self.__class__.__name__} has no trained trees. "
                "Call fit() before predict()."
            )
        # This returns the predicted labels for each data point per tree.
        # i.e tree_0_pred, tree_1_pred, tree_2_pred, ...
        # [[0,1,1], [1,1,0], [0,0,1], ...]
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        # But what we actually want are all the predictions by the trees
        # for each data point in a single array.
        # e.g. [[0,1,0] [1,1,0], [1,0,1], ...]
        predictions = np.swapaxes(tree_preds, axis1=0, axis2=1)
        predictions = [self._get_most_common_label(input_=pred) for pred in predictions]
        return np.array(predictions)

In [7]:
# Forest with the default number of trees (20); max_depth=50 is
# handed to every tree.
rf_clf = RandomForest(max_depth=50)

# Trains every tree on its own bootstrap sample of the training split.
rf_clf.fit(X_train, y_train)

In [None]:
# Majority-vote prediction of the forest for the held-out split.
y_pred = rf_clf.predict(X=X_test)

# Accuracy: the fraction of predictions that match the true labels.
np.mean(y_pred == y_test)

In [None]:
# NOTE: this import rebinds the name `RandomForest`. From here on it refers to
# the implementation in `src.random_forest`, not the class defined in this notebook.
from src.random_forest import RandomForest

# Uses whatever defaults the imported implementation defines
# (they are not visible in this notebook).
clf = RandomForest()
clf.fit(X_train, y_train)

In [None]:
# Predictions of the imported implementation for the held-out split
# (presumably one label per row — TODO confirm against src.random_forest).
y_pred = clf.predict(X=X_test)

# Accuracy: the fraction of predictions that match the true labels.
np.mean(y_pred == y_test)