# Training

Given the whole dataset:

1. Calculate the information gain with each possible split (which feature to split on and on which threshold value)
2. Divide the set with the feature and the threshold that gives the most information gain
3. Repeat for all subbranches, until a stopping criteria is reached

# Inferencing

1. Traverse the tree with the features of the data point until a leaf is reached
2. Return the label of the leaf

# Model

## Information gain

Information gain is defined as $$IG(node) = Entropy(parent) - [\text{weighted average}]entropy(children)$$

$$Entropy(node) = -\sum_{i=1}^{c}p_i\log_2p_i$$
$$p_i = \frac{N_i}{N}$$

## Stopping criteria

1. Maximum depth: when the tree reaches a maximum depth
2. Minimum number of samples per node: when the number of samples in a node falls below a threshold
3. Min impurity decrease: when a split results in a decrease of the impurity greater than or equal to this value

In [1]:
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
import random
from tqdm import tqdm

In [2]:
class TreeNode:

    def __init__(self, feature_idx, threshold, left_child=None, right_child=None, *, leaf_value=None):
        self.feature_idx = feature_idx
        self.threshold = threshold
        self.left_child = left_child
        self.right_child = right_child
        self.leaf_value = leaf_value
    
    def is_leaf_node(self):
        return self.leaf_value is not None

class DecisionTree:

    def __init__(self, min_samples_leaf=2, max_depth=100, n_features=None):
        self.min_samples_leaf = min_samples_leaf
        self.max_depth = max_depth
        self.n_features = n_features
        self.root = None

    def fit(self, X, y):
        self.n_features = X.shape[1] if not self.n_features else min(self.n_features, X.shape[1])
        self.root = self._grow_tree(X, y, level=0)

    def _grow_tree(self, X, y, level):
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))

        # If we have reached a stopping criteria, return a leaf node
        if (level == self.max_depth or n_labels == 1 or n_samples < self.min_samples_leaf):
            leaf_value = self._most_common_label(y)
            return TreeNode(None, None, leaf_value=leaf_value)

        # Select random features to consider (we don't have to in the case of a Decision Tree, but it's mandatory in the case of a Random Forest)
        feature_idxs = np.random.choice(n_features, self.n_features, replace=False)

        # Greedily select the best split according to information gain
        best_feature_idx, best_threshold = self._best_split_criteria(X, y, feature_idxs)
        # Split the data using the threshold
        left_idxs, right_idxs = self._split(X[:, best_feature_idx], best_threshold)

        # Assign data points to the left or right child
        left_child = self._grow_tree(X[left_idxs, :], y[left_idxs], level+1)
        right_child = self._grow_tree(X[right_idxs, :], y[right_idxs], level+1)
        return TreeNode(best_feature_idx, best_threshold, left_child, right_child)

    def _best_split_criteria(self, X, y, feature_idxs):
        best_gain = -1
        split_criteria = None
        # Iterate through all possible splits and select the pair (feature, threshold) that maximizes information gain
        for feature_idx in feature_idxs:
            feature = X[:, feature_idx]
            thresholds = np.unique(feature)
            for threshold in thresholds:
                gain = self._information_gain(y, feature, threshold)
                if gain > best_gain:
                    best_gain = gain
                    split_criteria = (feature_idx, threshold)
        return split_criteria
    
    def _information_gain(self, y, feature, threshold):
        # Calculate the entropy of the parent node
        parent_entropy = self._entropy(y)

        # Calculate the entropy of the left and right child nodes
        left_idxs, right_idxs = self._split(feature, threshold)
        left_entropy = self._entropy(y[left_idxs])
        right_entropy = self._entropy(y[right_idxs])
        
        # Calculate the information gain
        information_gain = parent_entropy - (len(left_idxs)/len(y))*left_entropy - (len(right_idxs)/len(y))*right_entropy
        return information_gain
    
    def _entropy(self, y):
        # Calculate the entropy of a node according to the formula -sum(p_i*log(p_i)) where p_i is the probability of the i-th class
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / counts.sum()
        entropy = sum(probabilities * -np.log2(probabilities))
        return entropy

    def _split(self, X_column, split_thresh):
        # The index of the data points that are less than or equal to the threshold
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        # The index of the data points that are greater than the threshold
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs
    
    def _most_common_label(self, y):
        # Return the most common label in the data
        return np.bincount(y).argmax()
    
    def predict(self, X):
        return np.array([self._predict(x) for x in X])
    
    def _predict(self, x):
        # Traverse the tree to find the leaf node that the data point belongs to
        node = self.root
        while not node.is_leaf_node():
            if x[node.feature_idx] <= node.threshold:
                node = node.left_child
            else:
                node = node.right_child
        return node.leaf_value



In [3]:
def get_data_split():
    MAGIC = 42
    random.seed(MAGIC)
    np.random.seed(MAGIC)
    data = datasets.load_breast_cancer()
    X, y = data.data, data.target

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=MAGIC
    )

    return data, X_train, X_test, y_train, y_test

def accuracy(y_test, y_pred):
    return np.sum(y_test == y_pred) / len(y_test)

In [4]:
# Load a sample dataset
data, X_train, X_test, y_train, y_test = get_data_split()

print(f'Training data shape: {X_train.shape}')

tree_settings = {
    'max_depth': 2,
}

tree = DecisionTree(**tree_settings)
tree.fit(X_train, y_train)
tree_predictions = tree.predict(X_test)

tree_acc = accuracy(y_test, tree_predictions)
print(f'Single tree accuracy: {tree_acc}')

# Let's check which feature is the most important one
feature_names = data.feature_names
print(feature_names[tree.root.feature_idx])


Training data shape: (455, 30)
Single tree accuracy: 0.9035087719298246
mean concave points


# Random Forest

A random forest is a model that fits a number of decision tree classifiers on various sub-samples of the dataset and uses averaging to improve the predictive accuracy and control over-fitting. Each tree is trained on a random subset of the training data. The random subsets are the same size as the original training set, but are created by sampling with replacement. The idea is that by training each tree on different samples, although each tree might have high variance with respect to a particular set of the training data, overall, the entire forest will have lower variance but not at the cost of increasing the bias.

In [5]:
class RandomForest:

    def __init__(self, n_trees=10, min_samples_leaf=2, max_depth=100, n_features=None):
        self.min_samples_leaf = min_samples_leaf
        self.max_depth = max_depth
        self.n_features = n_features
        self.n_trees = n_trees

    def fit(self, X, Y):
        self.trees = []
        tree_iterator = tqdm(range(self.n_trees), desc='Training trees')
        for _ in tree_iterator:
            tree = DecisionTree(min_samples_leaf=self.min_samples_leaf, max_depth=self.max_depth, n_features=self.n_features)
            X_sample, Y_sample = self._sample(X, Y)
            tree.fit(X_sample, Y_sample)
            self.trees.append(tree)
    
    def _sample(self, X, Y):
        n_samples = X.shape[0]
        indices = np.random.choice(n_samples, n_samples, replace=True)
        return X[indices], Y[indices]
    
    def _most_common(self, Y):
        return np.bincount(Y).argmax()
    
    def predict(self, X, agg_type='mean'):
        predictions = []
        tree_iterator = tqdm(self.trees, desc='Predicting')
        for tree in tree_iterator:
            predictions.append(tree.predict(X))
        predictions = np.array(predictions)
        if agg_type == 'mean':
            return np.mean(predictions, axis=0)
        elif agg_type == 'majority':
            return np.apply_along_axis(self._most_common, axis=0, arr=predictions)
        else:
            raise ValueError('agg_type must be either mean or majority')


In [6]:
data, X_train, X_test, y_train, y_test = get_data_split()

print(f'Training data shape: {X_train.shape}')

forest = RandomForest(n_trees=10, **tree_settings)
forest.fit(X_train, y_train)
forest_predictions = forest.predict(X_test, agg_type='majority')

tree_acc = accuracy(y_test, forest_predictions)
print(f'Random forest accuracy: {tree_acc}')

Training data shape: (455, 30)


Training trees: 100%|██████████| 10/10 [00:09<00:00,  1.10it/s]
Predicting: 100%|██████████| 10/10 [00:00<00:00, 18774.86it/s]

Random forest accuracy: 0.956140350877193





# Ada Boosting

AdaBoosting (which stands for Adaptive Boosting) is an algorithm that allows to combine multiple weak classifiers into a weighted sum to produce a boosted classifier. It is usually used for binary classification, but can be extended to multi-class classification.



In [7]:
class Stump:

    def __init__(self):
        self.feature_idx = None
        self.threshold = None
        self.alpha = None
        self.p = None

    def predict(self, X):
        n_samples = X.shape[0]
        X_column = X[:, self.feature_idx]
        # Initially all the samples are classified as 1
        predictions = np.ones(n_samples)
        if self.p == 1:
            predictions[X_column < self.threshold] = -1
        else:
            predictions[X_column > self.threshold] = -1
        return predictions
    
class AdaBoost:

    def __init__(self, num_stumps=5):
        self.num_stumps = num_stumps
        self.stumps = []

    def fit(self, X, y):
        n_samples, n_features = X.shape

        # Initial weights are 1 / n_samples
        w = np.full(n_samples, (1 / n_samples))

        for _ in range(self.num_stumps):
            stump = Stump()
            min_error = float('inf')

            # Iterate through every feature and threshold to find the best decision stump
            for feature_idx in range(n_features):
                X_column = X[:, feature_idx]
                thresholds = np.unique(X_column)

                for threshold in thresholds:
                    p = 1
                    prediction = np.ones(n_samples)
                    prediction[X_column < threshold] = -1

                    # Error = sum of weights of misclassified samples
                    error = sum(w[y != prediction])

                    # If error is over 50% we flip the prediction so that error will be 1 - error
                    if error > 0.5:
                        error = 1 - error
                        p = -1

                    # Store the best configuration
                    if error < min_error:
                        stump.feature_idx = feature_idx
                        stump.threshold = threshold
                        min_error = error
                        stump.p = p

            # Calculate alpha
            EPS = 1e-10
            stump.alpha = 0.5 * np.log((1.0 - min_error + EPS) / (min_error + EPS))

            # Calculate predictions and update weights
            predictions = stump.predict(X)

            # Only misclassified samples have non-zero weights
            w *= np.exp(-stump.alpha * y * predictions)
            w /= np.sum(w)

            # Save stump
            self.stumps.append(stump)
    
    def predict(self, X):
        preds = [stump.alpha * stump.predict(X) for stump in self.stumps]
        preds = np.sum(preds, axis=0)
        preds = np.sign(preds)
        return preds
    

In [8]:
data, X_train, X_test, y_train, y_test = get_data_split()

# Change labels to be 1 and -1
y_train[y_train == 0] = -1

print(f'Training data shape: {X_train.shape}')

adaboost = AdaBoost(num_stumps = 5)
adaboost.fit(X_train, y_train)
adaboost_predictions = adaboost.predict(X_test)

adaboost_acc = accuracy(y_test, adaboost_predictions)
print(f'Adaboost accuracy: {adaboost_acc}')

Training data shape: (455, 30)
Adaboost accuracy: 0.6052631578947368
