# Random Forest

In [8]:
from typing import Optional
from dataclasses import dataclass

import numpy as np
import pandas as pd

## Some Helper Functions

In [186]:
def compute_entropy(labels: np.ndarray) -> float:
    """Computes the entropy of a given set of labels (assumed to be either 0 or 1)."""

    entropy: float = 0.0

    # there is zero entropy in an empty set of labels
    if labels.shape[0] == 0:
        return 0.0

    # calculate ratio of true to false labels
    true_rate = labels.sum() / labels.shape[0]
    false_rate = 1 - true_rate

    # return zero entropy if either of these rates are 0
    if true_rate == 0 or false_rate == 0:
        return 0.0

    return -1 * false_rate * np.log2(false_rate) - true_rate * np.log2(true_rate)

def compute_information_gain(old_labels: np.ndarray, new_labels_left: np.ndarray, new_labels_right: np.ndarray) -> float:
    """Computes the information gained from splitting up the labels in old_labels to the two new sets."""

    information_gain: float = 0.0

    # compute old entropy
    old_entropy = compute_entropy(old_labels)

    # compute new entropy
    left_weight = new_labels_left.shape[0] / old_labels.shape[0]
    right_weight = new_labels_right.shape[0] / old_labels.shape[0]
    new_entropy = (left_weight * compute_entropy(new_labels_left)) + (right_weight * compute_entropy(new_labels_right))

    return old_entropy - new_entropy

def split_on_attribute(X: np.ndarray, y: np.ndarray, split_attribute_index: int, split_value: float) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Splits the data in X and y according to the chosen split attribute index (a column in X) and
        the defined split value."""
    
    X_left = []
    X_right = []
    y_left = []
    y_right = []

    for i, feature_vector in enumerate(X):
        if feature_vector[split_attribute_index] <= split_value:
            X_left.append(feature_vector)
            y_left.append(y[i])
        else:
            X_right.append(feature_vector)
            y_right.append(y[i])
    
    return np.array(X_left), np.array(X_right), np.array(y_left), np.array(y_right)

def find_best_split(X: np.ndarray, y: np.ndarray) -> tuple[int, float, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Finds the best way to split up the current dataset for the highest information gain."""

    N = X.shape[0]
    d = X.shape[1]

    highest_information_gain = np.NINF
    attribute_index = 0
    split_value = 0
    X_left = np.array([])
    X_right = np.array([])
    y_left = np.array([])
    y_right = np.array([])

    # loop through a random choice of attributes
    for new_attribute_index in np.random.choice(np.arange(d), int(d**0.5), replace=False).tolist():

        # first calculate the value to split on, which will be the mean in this example
        new_split_value = X[:, attribute_index].sum() / N

        # build partitions
        X_left_tmp, X_right_tmp, y_left_tmp, y_right_tmp = split_on_attribute(X, y, new_attribute_index, new_split_value)

        # then find the information gain for this split
        new_info_gain = compute_information_gain(y, y_left_tmp, y_right_tmp)

        # if this is higher than any previous gain, save off the result
        if new_info_gain > highest_information_gain:
            highest_information_gain = new_info_gain
            attribute_index = new_attribute_index
            split_value = new_split_value
            X_left = X_left_tmp
            X_right = X_right_tmp
            y_left = y_left_tmp
            y_right = y_right_tmp

    return attribute_index, split_value, X_left, X_right, y_left, y_right

def is_leaf(y: np.ndarray) -> bool:
    """Determines if the labels array is for a leaf node."""

    return np.all(y == y[0]).item()

def get_mode(y: np.ndarray) -> int:
    """Finds the mode of the labels."""

    # could have also used -> 1 if a.sum() > len(a) // 2 else 0
    return max(set(y.tolist()), key=y.tolist().count)

def load_data() -> tuple[np.ndarray, np.ndarray]:
    df = pd.read_csv("./data/pima-diabetes.csv", names=["A", "B", "C", "D", "E", "F", "G", "H", "label"], header=None)
    X = df[["A", "B", "C", "D", "E", "F", "G", "H"]].to_numpy()
    y = df[["label"]].to_numpy().reshape(len(df))
    return X, y



print(compute_entropy(np.array([0,0,0,1,1,1,1,1,1])))
print(compute_information_gain(np.array([0,0,0,1,1,1]), np.array([0,0]), np.array([0,1,1,1])))
print(split_on_attribute(np.array([[3, 10],[1,22],[2,28],[5,32],[4,32]]), np.array([1,1,0,0,1]), 0, 3))
load_data()

0.9182958340544896
0.4591479170272448
(array([[ 3, 10],
       [ 1, 22],
       [ 2, 28]]), array([[ 5, 32],
       [ 4, 32]]), array([1, 1, 0]), array([0, 1]))


(array([[  6.   , 148.   ,  72.   , ...,  33.6  ,   0.627,  50.   ],
        [  1.   ,  85.   ,  66.   , ...,  26.6  ,   0.351,  31.   ],
        [  8.   , 183.   ,  64.   , ...,  23.3  ,   0.672,  32.   ],
        ...,
        [  5.   , 121.   ,  72.   , ...,  26.2  ,   0.245,  30.   ],
        [  1.   , 126.   ,  60.   , ...,  30.1  ,   0.349,  47.   ],
        [  1.   ,  93.   ,  70.   , ...,  30.4  ,   0.315,  23.   ]]),
 array([1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0,
        1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1,
        0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0,
        1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0,
        1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
        1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1,
        1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,
        1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 

## Define a Decision Tree Class

In [58]:
@dataclass()
class Node:
    split_attribute_index: Optional[int] = None
    split_value: Optional[float] = None
    left_branch: Optional["Node"] = None
    right_branch: Optional["Node"] = None
    leaf_label: Optional[int] = None

    def is_leaf_node(self) -> bool:
        return self.leaf_label != None

In [195]:
class DecisionTree:
    def __init__(self, max_depth: int) -> None:
        self.__root_node: Node = Node()
        
        if max_depth > 0:
            self.__max_depth = max_depth
        else:
            raise Exception("tree must have a max depth of greater than zero")
    
    def learn(self, X: np.ndarray, y: np.ndarray, current_node: Node = None, depth: int = 1) -> None:
        """Construct the tree with recursive nodes."""

        if depth == 1:
            # this is the first iteration and there is no parent node, so operate on the root node variable

            # find best split in data
            split_attribute_index, split_value, X_left, X_right, y_left, y_right = find_best_split(X, y)
            # print(X_left.shape, X_right.shape, y_left.shape, y_right.shape)

            # assign the split data to the root node
            self.__root_node.split_attribute_index = split_attribute_index
            self.__root_node.split_value = split_value

            # build the left side of the tree recursively
            self.__root_node.left_branch = Node()
            self.learn(X_left, y_left, self.__root_node.left_branch, depth + 1)

            # build the right side of the tree recursively
            self.__root_node.right_branch = Node()
            self.learn(X_right, y_right, self.__root_node.right_branch, depth + 1)

        elif depth < self.__max_depth:
            # continue building the tree and populating the current node information

            # first check if the labels presented are for a leaf node
            if is_leaf(y):
                current_node.leaf_label = y[0]
            else:
                # split the data again
                split_attribute_index, split_value, X_left, X_right, y_left, y_right = find_best_split(X, y)

                # determine if we have resulted in any empty set of labels on either side
                if len(y_left) == 0 or len(y_right) == 0:
                    # if this is the case for either of the sides, we know that we don't want to hand off
                    #   another recursive call with an empty dataset. In this case, make it a leaf node
                    #   with the mode of the labels found on the populated side.
                    current_node.leaf_label = get_mode(y)
                else:
                    # assign split data to the current node
                    current_node.split_attribute_index = split_attribute_index
                    current_node.split_value = split_value

                    # continue building the left side of the tree
                    current_node.left_branch = Node()
                    self.learn(X_left, y_left, current_node.left_branch, depth + 1)

                    # continue building the right side of the tree
                    current_node.right_branch = Node()
                    self.learn(X_right, y_right, current_node.right_branch, depth + 1)

        else:
            # depth limit has been reached, do not split anymore and create a leaf node
            current_node.leaf_label = get_mode(y)

    def classify(self, record: np.ndarray) -> int:
        """Uses the tree to classify a feature vector."""

        node = self.__root_node

        while not node.is_leaf_node():
            # we are still traversing the tree, determine which side to go down
            if record[node.split_attribute_index] <= node.split_value:
                # go down the left side of the tree
                node = node.left_branch
            else:
                # go down the right side of the tree
                node = node.right_branch
        
        return node.leaf_label


X, y = load_data()
tree = DecisionTree(10)
tree.learn(X, y)

correct = 0
for i in range(len(X)):
    if tree.classify(X[i]) == y[i]:
        correct += 1

print(f"Correctly classified {correct}/{len(y)} for an accuracy of {correct/len(y)}")

Correctly classified 520/768 for an accuracy of 0.6770833333333334


## Grow a Random Forest

In [178]:
class RandomForest:
    def __init__(self, size: int, tree_depth: int) -> None:
        self.__size = size
        self.__trees = [DecisionTree(tree_depth) for _ in range(self.__size)]
        self.__bootstrap_datasets = []
        self.__bootstrap_labels = []

    def __run_bootstrapping(self, X: np.ndarray, y: np.ndarray, size) -> tuple[np.ndarray, np.ndarray]:
        indicies = np.random.randint(len(X), size=size)
        return X[indicies], y[indicies]
    
    def train(self, X: np.ndarray, y: np.ndarray) -> None:
        for i in range(self.__size):
            # build bootstrap dataset for this tree
            X_boot, y_boot = self.__run_bootstrapping(X, y, len(X))
            # print(len(X_boot), " ", len(y_boot))
            # print(y_boot)
            self.__bootstrap_datasets.append(X_boot)
            self.__bootstrap_labels.append(y_boot)

            # train the tree
            self.__trees[i].learn(X_boot, y_boot)
    
    def voting(self, X: np.ndarray) -> np.ndarray:
        y = []

        for record in X:
            votes = []

            for i, dataset in enumerate(self.__bootstrap_datasets):
                if record not in dataset:
                    # record vote for out-of-bag tree
                    votes.append(self.__trees[i].classify(record))

            counts = np.bincount(votes)

            if len(counts) == 0:
                # this is when record was found in all bootstrap datasets, classify on all trees instead
                for tree in self.__trees:
                    votes.append(tree.classify(record))
            
                # reassign counts to this new bincount
                counts = np.bincount(votes)

            y.append(np.argmax(counts))
        
        return np.array(y)


## Train the Forest

In [179]:
np.random.seed(98238923)
FOREST_SIZE = 30
TREE_DEPTH = 10

In [197]:
X, y = load_data()

forest = RandomForest(FOREST_SIZE, TREE_DEPTH)
forest.train(X, y)
predictions = forest.voting(X)
accuracy = np.sum(predictions == y) / len(y)

print(f"Accuracy: {round(accuracy, 4)}")


Accuracy: 0.651
