### Import

In [1]:
import numpy as np
import math

### Define train and test data

In [2]:
X_train = np.array([[1.5, 0.8, 0.4],
                   [2.0, 1.0, 2.3],
                   [2.2, 1.2, 8.4],
                   [2.4, 1.5, 3.1],
                   [3.0, 2.0, 6.8],
                   [3.5, 2.5, 5.3],
                   [4.0, 3.0, 1.7],
                   [4.5, 3.5, 12.3],
                   [5.0, 4.0, 4.2],
                   [5.5, 4.5, 15.2]])
Y_train = np.array([2.3, 1.2, 2.54, 4.6, 1.2, 1.9, 2.7, 1.2, 2.5, 3.8])

X_unseen= np.array([[3.0, 4.2, 7.9], [7.2, 8.6, 10.5], [10.2, 12.3, 6.9]])

### Define a decision tree

In [3]:
class Node:
    def __init__(self, left=None, right=None, feature_index=None, threshold=None, value=None):
        self.left = left
        self.right = right
        self.feature_index = feature_index
        self.threshold = threshold
        self.value = value

In [4]:
class DTree:
    def __init__(self, max_depth):
        self.max_depth = max_depth
        self.model = None

    def train(self, X, Y):
        self.model = self._grow(X, Y)

    def predict(self, X):
        # Predicting every x examples' feature values with the calculated decision tree
        return np.array([self._predict(x, self.model) for x in X])

    def _grow(self, X, Y, depth=0):
        num_samples, num_features = X.shape
        num_classes = len(np.unique(Y))
        
        best_mape = math.inf
        best_index = None
        best_threshold = None
        best_sets = None

        # Base case, if there's only 1 class or has reach max depth
        if num_classes == 1 or depth == self.max_depth:
            return Node(value=np.mean(Y))

        # Loop through all the features
        for feature_index in range(num_features):
            # Find thresholds
            thresholds = np.unique(X[:, feature_index])

            # For each threshold, split the chosen feature set into half base on the value of chosen threshold.
            # Our goal is to find the best threshold to divide classes
            for threshold in thresholds:
                left_indices = np.where(X[:, feature_index] <= threshold)[0]
                right_indices = np.where(X[:, feature_index] > threshold)[0]

                # If one or the other side is empty, we don't consider it
                if len(left_indices) == 0 or len(right_indices) == 0:
                    continue

                # Perform MAPE on both left and right
                Y_left = Y[left_indices]
                Y_right = Y[right_indices]

                Y_pred_left = np.mean(Y_left)
                Y_pred_right = np.mean(Y_right)
                
                mape_left = self._mean_absolute_percentage_error(Y_left, Y_pred_left)
                mape_right = self._mean_absolute_percentage_error(Y_right, Y_pred_right)

                mape = (np.sum(left_indices) * mape_left + np.sum(right_indices) * mape_right) / num_samples

                # If that mape is better than the current best, we store it to use later
                if mape < best_mape:
                    best_mape = mape
                    best_index = feature_index
                    best_threshold = threshold
                    best_sets = (left_indices, right_indices)

        # If for any reason mape score hasn't been calculated
        if best_mape == math.inf:
            return Node(value=np.mean(Y))

        # Continue to grow recursively of both left and right side and append it to the current node
        # using the best sets that we have found above
        left = self._grow(X[best_sets[0]], Y[best_sets[0]], depth + 1)
        right = self._grow(X[best_sets[1]], Y[best_sets[1]], depth + 1)
        
        return Node(feature_index=best_index, threshold=best_threshold, left=left, right=right)

    def _mean_absolute_percentage_error(self, Y_actual, Y_pred):
        return np.mean(np.abs((Y_actual - Y_pred) / Y_actual)) * 100
        
    def _predict(self, x, node):
        # Return prediction value if it's available, aka. bottom of the tree
        if node.value is not None:
            return node.value

        # Else descend lower into the tree and return the final prediction all the way up
        if x[node.feature_index] <= node.threshold:
            return self._predict(x, node.left)
        else:
            return self._predict(x, node.right)

#### Testing out decision tree

In [5]:
dtree = DTree(5)
dtree.train(X_train, Y_train)
print("Our model prediction: " + str(dtree.predict(X_unseen)))

# Comparing to Sklearn
from sklearn.tree import DecisionTreeRegressor

# Since our implementation doesn't have any randomness involved, we need to set a fixed random seed
dtree = DecisionTreeRegressor(max_depth=5, random_state=1)
dtree.fit(X_train, Y_train)
print("Sklearn model prediction: " + str(dtree.predict(X_unseen)))

Our model prediction: [1.2 3.8 3.8]
Sklearn model prediction: [1.2 3.8 3.8]


### Define a random forest using the decision tree

In [6]:
class RandomForest:
    def __init__(self, num_tree, max_tree_depth, max_feature, seed):
        self.num_tree = num_tree
        self.trees = []
        self.max_tree_depth = max_tree_depth
        self.max_feature = max_feature
        self.seed = seed

    def train(self, X, Y):
        np.random.seed(self.seed)
        
        for _ in range(self.num_tree):
            # Choosing random examples to train with, or bootstrapping with replacement
            sample_indices = np.random.choice(X.shape[0], size=X.shape[0], replace=True)

            # Choosing random features to train with without replacement
            feature_indices = np.random.choice(X.shape[1], size=self.max_feature, replace=False)

            # Compiling into a training data format
            X_training = X[sample_indices][:, feature_indices]
            Y_training = Y[sample_indices]

            dtree = DTree(self.max_tree_depth)
            dtree.train(X_training, Y_training)
            self.trees.append(dtree)

    def predict(self, X):
        predictions = []

        # Calculate result for individual trees
        for tree in self.trees:
            predictions.append(tree.predict(X))

        # Averaging from all the predictions each trees made
        return np.mean(predictions, axis=0)    

#### Testing out random forest

In [7]:
rf = RandomForest(num_tree=100, max_tree_depth=5, max_feature=2, seed=1)
rf.train(X_train, Y_train)
print("Our model prediction: " + str(rf.predict(X_unseen)))

# Comparing to Sklearn
from sklearn.ensemble import RandomForestRegressor

rf = RandomForestRegressor(max_depth=5, random_state=1, criterion='absolute_error', max_features=2, n_estimators=100)
rf.fit(X_train, Y_train)
print("Sklearn model prediction: " + str(rf.predict(X_unseen)))

Our model prediction: [2.36033333 3.0078     2.9712    ]
Sklearn model prediction: [2.0686 2.9921 2.9761]
