## Minimal Random Forest Classifier Implementation

In [1]:
from concurrent.futures import ProcessPoolExecutor, as_completed

In [2]:
import numpy as np

In [3]:
from sklearn.base import BaseEstimator
from scipy.stats import mode

In [4]:
class Node:
    def __init__(
        self, *, feature=None, threshold=None, left=None, right=None, value=None
    ):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    @property
    def is_leaf_node(self):
        return self.value is not None

In [5]:
class DecisionTreeClassifier(BaseEstimator):
    def __init__(self, max_depth=100, min_samples_split=2, n_features=None):
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.root = None
        self.X_features_n = None

    @staticmethod
    def _most_frequent(x):  # Finding most frequent number of an array.
        return mode(x, keepdims=False)[0]

    @staticmethod
    def _entropy(x):
        return -np.sum(
            np.fromiter(
                (
                    p * np.log(p)
                    for p in np.divide(np.bincount(x), (x.shape[0]))
                    if p > 0
                ),
                dtype=np.float64,
            )
        )

    @staticmethod
    def _split(x, threshold):
        condition = x <= threshold
        return (
            np.argwhere(condition).flatten(),
            np.argwhere(np.invert(condition)).flatten(),
        )

    def _information_gain(self, x, y, threshold):
        parent_entropy = self._entropy(y)
        # child
        left_idx, right_idx = self._split(x, threshold)
        if (left_idx.size == 0) or (right_idx.size == 0):
            return 0

        # Weighted average of children entropy
        child_left_entropy = self._entropy(y[left_idx])
        child_right_entropy = self._entropy(y[right_idx])
        child_entropy = (
            left_idx.shape[0] * child_left_entropy
            + right_idx.shape[0] * child_right_entropy
        ) / y.shape[0]

        return parent_entropy - child_entropy

    def _best_split(self, X, y, features_idx):
        best_gain = 0
        split_idx, split_threshold = None, None

        for feature_idx in features_idx:
            thresholds = np.unique(X[:, feature_idx])
            for threshold in thresholds:
                gain = self._information_gain(X[:, feature_idx], y, threshold)

                if gain >= best_gain:
                    best_gain = gain
                    split_idx = feature_idx
                    split_threshold = threshold

        return split_idx, split_threshold

    def _grow_tree(self, X, y, depth=0):
        n_labels = np.unique(y).shape[0]

        # In these cases, we need to stop
        if (
            depth >= self.max_depth
            or n_labels == 1
            or X.shape[0] < self.min_samples_split
        ):
            return Node(value=self._most_frequent(y))

        best_feature_idx, best_threshold = self._best_split(
            X, y, np.random.choice(X.shape[1], self.n_features, replace=False)
        )
        # Create child
        left_idx, right_idx = self._split(X[:, best_feature_idx], best_threshold)
        left_child = self._grow_tree(X[left_idx, :], y[left_idx], depth=depth + 1)
        right_child = self._grow_tree(X[right_idx, :], y[right_idx], depth=depth + 1)

        return Node(
            feature=best_feature_idx,
            threshold=best_threshold,
            left=left_child,
            right=right_child,
        )

    def fit(self, X, y):
        assert X.shape[0] == y.shape[0]
        self.X_features_n = X.shape[1]

        self.n_features = X.shape[1] if self.n_features is None else self.n_features
        self.root = self._grow_tree(X, y)

        return self

    def _predict_one(self, x):
        node = self.root
        while not node.is_leaf_node:
            if x[node.feature] <= node.threshold:
                node = node.left
            else:
                node = node.right

        return node.value

    def predict(self, X):
        assert X.shape[1] == self.X_features_n, f"{X.shape[1]=} != {self.X_features_n=}"
        return np.array([self._predict_one(x) for x in X])


In [6]:
class RandomForsetClassifier(BaseEstimator):
    def __init__(
        self,
        n_estimators=25,
        max_depth=20,
        min_samples_split=2,
        n_features=None,
        undersampling=1,
        n_jobs=-1,
    ):
        self.n_estimators = n_estimators
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.n_features = n_features
        self.undersampling = undersampling
        self.trees = [
            DecisionTreeClassifier(
                min_samples_split=min_samples_split,
                max_depth=max_depth,
                n_features=n_features,
            )
            for _ in range(n_estimators)
        ]
        self.masks = None
        self.X_features_n = None
        self.n_jobs = None if n_jobs == -1 else n_jobs

    def _bagging(self, n):
        return np.random.choice(n, int(self.undersampling * n), replace=True)

    def fit(self, X, y):
        assert X.shape[0] == y.shape[0]
        self.X_features_n = X.shape[1]

        with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
            tasks = [
                executor.submit(tree.fit, X[mask, :], y[mask])
                for tree, mask in zip(
                    self.trees,
                    (self._bagging(X.shape[0]) for _ in range(self.n_estimators)),
                )
            ]

            trained_trees = []
            for task in as_completed(tasks):
                trained_trees.append(task.result())

            self.trees = trained_trees

    def predict(self, X):
        assert X.shape[1] == self.X_features_n
        predictions = []

        with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:
            tasks = [executor.submit(tree.predict, X) for tree in self.trees]
            for task in as_completed(tasks):
                predictions.append(task.result())

        return np.apply_along_axis(
            lambda col: mode(col, keepdims=False)[0], 0, np.vstack(predictions)
        )

In [7]:
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=2_000, n_features=10, n_classes=3, n_informative=8)

In [8]:
clf = RandomForsetClassifier(n_estimators=20, max_depth=10, undersampling=0.6, n_jobs=-1)
clf.fit(X, y)

In [9]:
from sklearn.metrics import accuracy_score
accuracy_score(y, clf.predict(X))

0.9665

In [11]:
from sklearn.model_selection import cross_val_score
from scipy.stats import hmean
hmean(cross_val_score(RandomForsetClassifier(n_estimators=25, max_depth=10, undersampling=0.6), X, y, scoring='accuracy', cv=10, n_jobs=1))

0.8550196171722431

In [12]:
from sklearn.ensemble import RandomForestClassifier as sk_clf
hmean(cross_val_score(sk_clf(max_depth=10), X, y, scoring='accuracy', cv=10, n_jobs=-1))

0.8657871057622333