In [None]:
import numpy as np

%run DecisionTree.ipynb

# Random Forest

In [2]:
from collections import namedtuple
from collections import Counter

class RandomForest(BasicModel):
    """Ensemble of ``DecisionTree`` models combined by voting or averaging.

    Every tree is trained on a random subset of the feature columns and,
    when ``bootstrap`` is enabled, on rows resampled with replacement.
    Criteria ``'mse'`` and ``'mae'`` put the forest into regression mode
    (tree answers are averaged); every other accepted criterion means
    classification (tree answers are combined by majority vote).

    Parameter checking is delegated to ``BasicModel`` helpers defined
    outside this cell. They are presumed to validate each value and store
    it as an attribute of the same name -- TODO confirm against BasicModel.
    """

    # Pairs a fitted tree with the column indexes (into the full feature
    # matrix) it was trained on, so predictions can map the tree's local
    # feature numbers back to the original columns.
    DecisionTreeAndSelectedFeaturesTuple = \
        namedtuple(
            'DecisionTreeAndSelectedFeaturesTuple',
            ['decision_tree', 'selected_features']
        )

    def __init__(
        self,
        n_estimators=100,
        criterion='gini',
        max_depth=None,
        min_samples_split=2,
        max_features='auto',
        random_state=42,
        bootstrap=True,
        delete_tree_datasets=True
    ):
        """Store and check the forest hyper-parameters.

        Parameters
        ----------
        n_estimators : int
            Number of trees grown by ``fit``.
        criterion : {'gini', 'entropy', 'gain_ratio', 'mse', 'mae'}
            Split criterion forwarded to every tree; ``'mse'``/``'mae'``
            select regression, anything else classification.
        max_depth : int or None
            Forwarded unchanged to every ``DecisionTree``.
        min_samples_split : int or float
            Forwarded unchanged to every ``DecisionTree``.
        max_features : int, float, None or {'auto', 'sqrt', 'log2'}
            How many features each tree sees. ``'auto'`` is replaced by
            ``'sqrt'``; the final count is resolved in ``fit`` by
            ``DecisionTree.process_max_features``.
        random_state : int or numpy.random.RandomState
            An int is wrapped into a ``numpy.random.RandomState``.
        bootstrap : bool
            Whether each tree is trained on rows drawn with replacement.
        delete_tree_datasets : bool
            Whether the ``X``/``y`` attributes of each fitted tree are
            deleted after training.
        """
        super().check_value_type_and_set(
            'n_estimators',
            n_estimators,
            int
        )

        super().check_value_and_set(
            'criterion',
            criterion,
            ['gini', 'entropy', 'gain_ratio', 'mse', 'mae']
        )

        # The criterion alone decides how tree answers are aggregated.
        if criterion in ['mse', 'mae']:
            self.__task = 'regression'
        else:
            self.__task = 'classification'

        super().check_value_type_and_set(
            'max_depth',
            max_depth,
            (int, type(None))
        )

        super().check_value_type_and_set(
            'min_samples_split',
            min_samples_split,
            (int, float)
        )

        super().check_value_type_and_set(
            'max_features',
            max_features,
            (int, float, str, type(None))
        )

        # String values are additionally restricted to the known keywords.
        if isinstance(max_features, str):
            super().check_value_and_set(
                'max_features',
                max_features,
                ['auto', 'sqrt', 'log2']
            )
        if self.max_features == 'auto':
            self.max_features = 'sqrt'

        super().check_value_type_and_set(
            'random_state',
            random_state,
            (np.random.RandomState, int)
        )
        # Normalise an integer seed to a generator object so that fit()
        # can always call ``self.random_state.choice``.
        if isinstance(random_state, int):
            self.random_state = np.random.RandomState(random_state)

        super().check_value_type_and_set(
            'bootstrap',
            bootstrap,
            bool
        )

        super().check_value_type_and_set(
            'delete_tree_datasets',
            delete_tree_datasets,
            bool
        )

        self.ensemble = []

    def fit(self, X, y):
        """Grow ``n_estimators`` trees on ``X``/``y``.

        The transformed ``X`` and ``y`` are kept on the instance, and
        ``self.max_features`` is overwritten with the value returned by
        ``DecisionTree.process_max_features`` (used below as the number of
        columns to draw). Any previously fitted trees are discarded, so the
        ensemble always holds exactly ``n_estimators`` entries afterwards.
        """
        X = super().check_and_transform_X(X)
        y = super().check_and_transform_y(X, y)

        self.X, self.y = X, y

        self.max_features = DecisionTree.process_max_features(
            self.max_features,
            X
        )

        all_features = np.arange(self.X.shape[1])

        # Start from an empty ensemble: without this a second fit() would
        # append to the trees of the first one and predict() would keep
        # voting with stale models.
        self.ensemble = []

        for _ in range(self.n_estimators):
            # Each tree gets its own feature subset, drawn without repeats.
            selected_features = self.random_state.choice(
                all_features,
                self.max_features,
                replace=False
            )

            if self.bootstrap:
                training_data = self.__generate_bootstrap_sample(
                    selected_features
                )
            else:
                training_data = (
                    self.X[:, selected_features],
                    self.y
                )

            dt = DecisionTree(
                criterion=self.criterion,
                splitter='best',
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                random_state=self.random_state
            )

            dt.fit(*training_data)

            # Optionally drop the per-tree copies of the training data.
            if self.delete_tree_datasets:
                del dt.X
                del dt.y

            self.ensemble.append(
                self.DecisionTreeAndSelectedFeaturesTuple(
                    dt,
                    selected_features
                )
            )

    def __generate_bootstrap_sample(self, selected_features):
        """Return ``(X, y)`` with rows resampled with replacement.

        As many rows as the stored training set has are drawn; ``X`` is
        additionally restricted to the ``selected_features`` columns.
        """
        observation_indexes = np.arange(self.X.shape[0])

        selected_observation_indexes = self.random_state.choice(
            observation_indexes,
            self.X.shape[0],
            replace=True
        )

        X = self.X[selected_observation_indexes, :][:, selected_features]
        y = self.y[selected_observation_indexes, :]

        return X, y

    @staticmethod
    def __predict_observation(
        decision_tree,
        selected_features,
        el
    ):
        """Walk one tree from its root to a leaf for observation ``el``.

        ``node.feature`` indexes the columns the tree was trained on, so it
        is translated through ``selected_features`` before reading ``el``,
        which holds the full feature row.
        """
        node = decision_tree.root

        while not node.is_leaf():
            if el[selected_features[node.feature]] <= node.split_value:
                node = node.left
            else:
                node = node.right

        return node.answer

    def predict(self, X):
        """Predict every row of ``X`` by aggregating all trees.

        Classification returns the most frequent tree answer per row,
        regression the mean of the tree answers. The result is a NumPy
        array reshaped to a single column.

        Raises
        ------
        AssertionError
            If the ensemble is empty (``fit`` has not been called).
        """
        assert self.ensemble != [], "Not fitted"

        X = super().check_and_transform_X(X)

        final_predictions = []

        for el in X:
            el_predictions = [
                self.__predict_observation(*model, el)
                for model in self.ensemble
            ]

            if self.__task == 'classification':
                # Majority vote across the trees.
                final_prediction = (
                    Counter(el_predictions)
                    .most_common(1)[0][0]
                )
            else:
                final_prediction = np.mean(el_predictions)

            final_predictions.append(final_prediction)

        return np.array(final_predictions).reshape((-1, 1))

# Testing

In [3]:
from sklearn.datasets import make_classification, make_regression
from sklearn.metrics import roc_auc_score, mean_squared_error, mean_absolute_error
import unittest
import time

# Shared fixtures for the tests below: 100 observations with 20 features each.
# The generators are seeded so that the printed scores and the test outcomes
# are the same on every Restart & Run All.
cl_X, cl_y = make_classification(100, 20, random_state=42)
cl_y = cl_y.reshape((-1, 1))  # column vector of targets

regr_X, regr_y = make_regression(100, 20, random_state=42)
regr_y = regr_y.reshape((-1, 1))  # column vector of targets

def time_fit_predict(
    X,
    y,
    score_names=('ROC AUC',),
    score_funcs=(roc_auc_score,),
    *args,
    **kwargs
):
    """Fit a ``RandomForest`` on ``X``/``y``, score it on the same data and print timings.

    Parameters
    ----------
    X, y
        Training data; the forest is also evaluated on this same data.
    score_names : sequence of str
        Labels printed next to each score.
    score_funcs : sequence of callables
        Metrics called as ``score_func(y, predictions)``; paired with
        ``score_names`` by position.
    *args, **kwargs
        Forwarded to the ``RandomForest`` constructor.

    Prints one line per metric followed by the elapsed time; returns nothing.
    """
    start = time.perf_counter()

    rf = RandomForest(*args, **kwargs)
    rf.fit(X, y)

    # Score against the data that was actually passed in (not the global
    # classification set) and predict only once for all metrics.
    predictions = rf.predict(X)

    for score_name, score_func in zip(score_names, score_funcs):
        score = score_func(y, predictions)

        # Read the criterion from the model so the label is also available
        # when the caller relies on the constructor default.
        print("{} criterion {} score: {}".format(
            rf.criterion.capitalize(),
            score_name,
            score
        ))
    print("Time: {}\n\n".format(time.perf_counter() - start))

class TestRandomForest(unittest.TestCase):    
    def test_gini(self):
        time_fit_predict(cl_X, cl_y, criterion='gini')
        
    def test_entropy(self):
        time_fit_predict(cl_X, cl_y, criterion='entropy')
    
    def test_gain_ratio(self):
        time_fit_predict(cl_X, cl_y, criterion='gain_ratio')
        
    def test_mse(self):
        time_fit_predict(
            regr_X, 
            regr_y,
            ['MSE', 'MAE'],
            [mean_squared_error, mean_absolute_error],
            criterion='mse'
        )
    
    def test_mae(self):
        time_fit_predict(
            regr_X, 
            regr_y,
            ['MSE', 'MAE'],
            [mean_squared_error, mean_absolute_error],
            criterion='mae'
        )
        
    def test_n_estimators(self):
        n_estimators = 25
        
        rf = RandomForest(n_estimators=n_estimators)
        rf.fit(cl_X, cl_y)
        
        self.assertEqual(
            len(rf.ensemble),
            n_estimators
        )
        
    def test_max_features(self):
        max_features_values = ['log2', 6, 0.3, 'auto']
        numeric_max_features_values = [
            np.int(np.log2(20)), 
            6,
            np.int(20 * 0.3),
            np.int(np.sqrt(20))
        ]
        
        for max_features, numeric_max_features in zip(
            max_features_values,
            numeric_max_features_values
        ):            
            rf = RandomForest(
                max_features=max_features,
                delete_tree_datasets=False
            )
            rf.fit(cl_X, cl_y)

            for decision_tree, selected_features in rf.ensemble:
                self.assertEqual(
                    selected_features.shape[0],
                    numeric_max_features
                )
                
                self.assertEqual(
                    decision_tree.X.shape[1],
                    numeric_max_features
                )
    
    def test_bootstrap(self):
        bootstraps = [True, False]
        
        for bootstrap in bootstraps:
            rf = RandomForest(
                bootstrap=bootstrap,
                delete_tree_datasets=False
            )
            rf.fit(cl_X, cl_y)
            
            for decision_tree, selected_features in rf.ensemble:
                self.assertEqual(
                    np.all(decision_tree.X == rf.X[:, selected_features]),
                    not bootstrap
                )
    
    def test_max_depth(self):
        max_depth = 5
        
        def check_depth(node, depth=1):
            if not node.is_leaf():
                return check_depth(node.left, depth+1) and \
                       check_depth(node.right, depth+1)
            
            if depth > max_depth:
                result = False
            
            return True
        
        rf = RandomForest(max_depth=max_depth)
        rf.fit(cl_X, cl_y)
        
        for decision_tree, _ in rf.ensemble:
            self.assertEqual(
                check_depth(decision_tree.root),
                True
            )
    
    def test_min_samples_split(self):
        min_samples_split = 10
        
        def check_samples_split(node):
            if node.is_leaf():
                return True
            
            is_satisfied = \
                node.observation_indexes.shape[0] >= min_samples_split
            
            return is_satisfied and \
                   check_samples_split(node.left) and \
                   check_samples_split(node.right)
        
        rf = RandomForest(min_samples_split=min_samples_split)
        rf.fit(cl_X, cl_y)
        
        for decision_tree, _ in rf.ensemble:
            self.assertEqual(
                check_samples_split(decision_tree.root),
                True
            )

In [None]:
import io

# Collect the unittest report in memory instead of a scratch file named 'tmp'
# in the working directory; this avoids the shell-only `cat`/`rm` steps and
# cannot clobber an existing file or directory of that name.
report = io.StringIO()
runner = unittest.TextTestRunner(report)
obj = unittest.main(
    argv=['first-arg-is-ignored', '--verbose', 'TestRandomForest'],
    testRunner=runner,
    exit=False  # keep the kernel alive after the tests finish
)

# Output printed by the tests themselves appears first; the report follows.
print(report.getvalue())

Entropy criterion ROC AUC score: 1.0
Time: 32.513078689575195




