In [34]:
import numpy as np
import pandas as pd
from typing_extensions import Self

class Node:
    """
    Node in a decision tree.
    """
    def __init__(
        self,
        df: pd.DataFrame,
        target_col: str
    ) -> None:
        # For training
        self.df = df
        self.target_col = target_col
        self.pk = self._set_pk()
        self.gini = self._set_gini()

        # For training/inference
        self.left = None
        self.right = None

        # For inference
        self.feature = None
        self.threshold = None

    def _set_pk(self) -> float:
        """
        Sets pk, the prop samples that are the positive class.
        Assumes samples are an array of ints, where 1 is the positive
        class and 0 is the negative class.
        """
        return np.mean(self.df[self.target_col].values)

    def _set_gini(self) -> float:
        """
        Sets the Gini impurity.
        """
        return 1 - self.pk**2 - (1 - self.pk)**2
    
    def split_on_feature(
        self,
        feature: str
    ) -> tuple[float, int|float, Self, Self]:
        """
        Iterate through values of a feature and identify split that
        minimizes weighted Gini impurity in child nodes. Returns tuple
        of weighted Gini impurity, feature threshold, and left and
        right child nodes.
        """
        values = []

        for thresh in self.df[feature].unique():
            values.append(self._process_split(feature, thresh))

        values = [v for v in values if v[1] is not None]
        if values:
            return min(values, key=lambda x: x[0])
        return None, None, None, None
    
    def _process_split(
        self,
        feature: str,
        threshold: int|float
    ) -> tuple[float, int|float, Self|None, Self|None]:
        """
        Splits df on the feature threshold. Returns weighted Gini
        impurity, inputted threshold, and child nodes. If split
        results in empty subset, returns Gini impurity and None's.
        """
        df_lower = self.df[self.df[feature] <= threshold]
        df_upper = self.df[self.df[feature] > threshold]

        # If threshold doesn't split the data at all, end early
        if len(df_lower) == 0 or len(df_upper) == 0:
            return self.gini, None, None, None

        node_lower = Node(df_lower, self.target_col)
        node_upper = Node(df_upper, self.target_col)

        prop_lower = len(df_lower) / len(self.df)
        prop_upper = len(df_upper) / len(self.df)

        weighted_gini = node_lower.gini * prop_lower \
          + node_upper.gini * prop_upper

        return weighted_gini, threshold, node_lower, node_upper
    
class DecisionTree:
    """
    Decision tree classifier, constructed with Nodes
    """
    def __init__(
        self,
        df: pd.DataFrame,
        target_col: str,
        feature_select: float = 1.0,
        max_depth: int = 4
    ) -> None:
        self.root = Node(df, target_col)
        self.features = list(df)
        self.features.remove(target_col)
        self.feature_select = feature_select
        self.max_depth = max_depth
        
    def build_tree(self) -> None:
        """
        Builds tree using depth-first traversal
        """
        stack = [(self.root, 0)]

        while stack:
            current_node, depth = stack.pop()

            if depth <= self.max_depth:
                left, right = self._process_node(current_node)

                if left and right:
                    current_node.left = left
                    current_node.right = right
                    stack.append((right, depth+1))
                    stack.append((left, depth+1))

        return self
    
    def _process_node(
        self,
        node: Node,
        features: list[str]
    ) -> tuple[Node|None, Node|None]:
        """
        Iterates through features, identifies split that minimizes
        Gini impurity in child nodes, and identifies feature whose
        split minimizes Gini impurity the most. Then returns child
        nodes split on that feature.
        """
        # Randomly select features. No randomness if
        # self.feature_select = 1.0 (default).
        features = list(
            np.random.choice(
                self.features,
                int(self.feature_select*len(self.features)),
                replace=False
            )
        )

        # Get Gini impurity for best split for each column
        d = {}
        for col in features:
            feature_info = node.split_on_feature(col)
            if feature_info[0] is not None:
                d[col] = feature_info

        # Select best column to split on
        min_gini = np.inf
        best_feature = None
        for col, tup in d.items():
            if tup[0] < min_gini:
                min_gini = tup[0]
                best_feature = col

        # Only update if the best split reduces Gini impurity
        if min_gini < node.gini:
            # Update node
            node.feature = best_feature
            node.threshold = d[col][1]
            return d[col][2:]

        return None, None

    
    def classify(self, feature_df: pd.DataFrame) -> list[int]:
        """
        Given a dataframe where each row is a feature vector, traverses
        the tree to generate a predicted label.
        """
        return [
          self._classify(self.root, f) for i, f in feature_df.iterrows()
        ]

    def _classify(self, node: Node, features: pd.Series) -> int:
        """
        Given a vector of features, traverse the node's children until
        a leaf is reached, then return the most frequent class in the
        node. If there are an equal number of positive and negative
        labels, predicts the negative class.
        """
        # Child node
        if node.feature is None or node.threshold is None:
            return int(node.pk > 0.5)

        if features[node.feature] < node.threshold:
            return self._classify(node.left, features)
        return self._classify(node.right, features)

    
class RandomForest:
    """
    Forest of decision trees.
    """
    def __init__(self, df: pd.DataFrame, target_col: str, n_trees: int = 100, feature_select: float = 0.5, max_depth: int = 4) -> None:
        self.df = df
        self.target_col = target_col
        self.n_trees = n_trees
        self.feature_select = feature_select
        self.max_depth = max_depth
        self.forest = []
        
    def train(self) -> None:
        """
        Fit the forest to self.df
        """
        bootstrap_dfs = [self._bootstrap() for _ in range(self.n_trees)]
        self.forest = [
            DecisionTree(
              bdf,
              self.target_col,
              self.feature_select,
              self.max_depth
            )
            for bdf in bootstrap_dfs
        ]
        self.forest = [tree.build_tree() for tree in self.forest]
        return None

    def _bootstrap(self) -> pd.DataFrame:
        """
        Sample rows from self.df with replacement
        """
        return self.df.sample(len(self.df), replace=True)
    
    def classify(self, feature_df: pd.DataFrame) -> int:
        """
        Classify inputted feature vectors. Each tree in the forest
        generates a predicted label and the most common label for
        each feature vector is returned.
        """
        preds = pd.DataFrame(
          [tree.classify(feature_df) for tree in self.forest]
        )
        # Return most common predicted label
        return list(preds.mode().iloc[0])
    

In [35]:
from functools import partial
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier

# Set seed for reproducibility
np.random.seed(0)

# Data generation helpers. Only parameter to pass in is the mean.
npn = partial(np.random.normal, scale=1, size=1)
npc = partial(np.random.choice, a=[0,1], size=1)

# Use this function for a detailed look at decision tree formation
def gen_df(n: int) -> pd.DataFrame:
    labels = np.random.choice([0,1], n)
    return pd.DataFrame({
        'strong_continuous': [npn(3)[0] if x else npn(0)[0] for x in labels],
        'weak_continuous': [npn(1)[0] if x else npn(0)[0] for x in labels],
        'strong_categorical': [
            npc(p=[0.8, 0.2])[0] if x else npc(p=[0.5,0.5])[0]
            for x in labels
        ],
        'label': labels
    })

# Use this function for a random forest using many features
def gen_df_hd(n_rows: int, n_cols: int) -> pd.DataFrame:
    """
    Note: actual df has 2*n_cols, since we produce both
    a continuous and categorical feature for each col.
    """

    labels = np.random.choice([0,1], n_rows)
    d = {'label': labels}

    for i in range(n_cols):
        mean = np.random.uniform(0, 1, 1)[0]
        d[f"continuous_{i}"] = [
            npn(mean)[0] if x else npn(0)[0] for x in labels
        ]
        ratio = np.random.uniform(0.4, 0.6, 1)[0]
        d[f"categorical_{i}"] = [
            npc(p=[ratio, 1-ratio])[0] if x else npc(p=[0.5,0.5])[0]
            for x in labels
        ]

    return pd.DataFrame(d)

# Generate data
print("Generating train and test data")
train_df = gen_df_hd(400, 50)
test_df = gen_df_hd(100, 50)

# 1. Decision Tree
print("1. Fitting a decision tree")
decision_tree = DecisionTree(train_df, target_col='label')
decision_tree.build_tree()
tree_preds = decision_tree.classify(test_df)
tree_accuracy = round(accuracy_score(test_df['label'], tree_preds), 3)

# 2. Random Forest
print("2. Fitting a random forest")
forest = RandomForest(train_df, target_col='label', n_trees=50)
forest.train()
forest_preds = forest.classify(test_df)
forest_accuracy = round(accuracy_score(test_df['label'], forest_preds), 3)

# 3. Average tree in forest
print("3. Calculating average tree accuracy")
tree_accs = []
for i in range(forest.n_trees):
    forest_tree_preds = forest.forest[i].classify(test_df)
    tree_accs.append(accuracy_score(test_df['label'], forest_tree_preds))
forest_tree_accuracy = np.mean(tree_accs).round(3)

# 4. Scikit-learn
print("4. Fitting scikit-learn forest")
X_train = train_df.copy()
X_train = X_train.drop('label', axis=1)

X_test = test_df.copy()
X_test = X_test.drop('label', axis=1)

y_train = train_df['label']
y_test = test_df['label']

rf = RandomForestClassifier(n_estimators=50)
rf.fit(X_train, y_train)

sklearn_preds = rf.predict(X_test)
sklearn_acc = round(accuracy_score(y_test, sklearn_preds), 3)

# Display results
print("Accuracy")
print(f" * Single decision tree:   {tree_accuracy}")
print(f" * Avg random forest tree: {forest_tree_accuracy}")
print(f" * Full random forest:     {forest_accuracy}")
print(f" * Scikit-learn forest:    {sklearn_acc}")

Generating train and test data
1. Fitting a decision tree


TypeError: DecisionTree._process_node() missing 1 required positional argument: 'features'