-
Notifications
You must be signed in to change notification settings - Fork 3
/
random_forest.py
96 lines (73 loc) · 3.98 KB
/
random_forest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import numpy as np
from decision_tree import DecisionTree
class RandomForestClassifier():
"""
Random Forest Classifier
Training: Use "train" function with train set features and labels
Predicting: Use "predict" function with test set features
"""
def __init__(self, n_base_learner=10, max_depth=5, min_samples_leaf=1, min_information_gain=0.0, \
numb_of_features_splitting=None, bootstrap_sample_size=None) -> None:
self.n_base_learner = n_base_learner
self.max_depth = max_depth
self.min_samples_leaf = min_samples_leaf
self.min_information_gain = min_information_gain
self.numb_of_features_splitting = numb_of_features_splitting
self.bootstrap_sample_size = bootstrap_sample_size
def _create_bootstrap_samples(self, X, Y) -> tuple:
"""
Creates bootstrap samples for each base learner
"""
bootstrap_samples_X = []
bootstrap_samples_Y = []
for i in range(self.n_base_learner):
if not self.bootstrap_sample_size:
self.bootstrap_sample_size = X.shape[0]
sampled_idx = np.random.choice(X.shape[0], size=self.bootstrap_sample_size, replace=True)
bootstrap_samples_X.append(X[sampled_idx])
bootstrap_samples_Y.append(Y[sampled_idx])
return bootstrap_samples_X, bootstrap_samples_Y
def train(self, X_train: np.array, Y_train: np.array) -> None:
"""Trains the model with given X and Y datasets"""
bootstrap_samples_X, bootstrap_samples_Y = self._create_bootstrap_samples(X_train, Y_train)
self.base_learner_list = []
for base_learner_idx in range(self.n_base_learner):
base_learner = DecisionTree(max_depth=self.max_depth, min_samples_leaf=self.min_samples_leaf, \
min_information_gain=self.min_information_gain,
numb_of_features_splitting=self.numb_of_features_splitting)
base_learner.train(bootstrap_samples_X[base_learner_idx], bootstrap_samples_Y[base_learner_idx])
self.base_learner_list.append(base_learner)
# Calculate feature importance
self.feature_importances = self._calculate_rf_feature_importance(self.base_learner_list)
def _predict_proba_w_base_learners(self, X_set: np.array) -> list:
"""
Creates list of predictions for all base learners
"""
pred_prob_list = []
for base_learner in self.base_learner_list:
pred_prob_list.append(base_learner.predict_proba(X_set))
return pred_prob_list
def predict_proba(self, X_set: np.array) -> list:
"""Returns the predicted probs for a given data set"""
pred_probs = []
base_learners_pred_probs = self._predict_proba_w_base_learners(X_set)
# Average the predicted probabilities of base learners
for obs in range(X_set.shape[0]):
base_learner_probs_for_obs = [a[obs] for a in base_learners_pred_probs]
# Calculate the average for each index
obs_average_pred_probs = np.mean(base_learner_probs_for_obs, axis=0)
pred_probs.append(obs_average_pred_probs)
return pred_probs
def predict(self, X_set: np.array) -> np.array:
"""Returns the predicted labels for a given data set"""
pred_probs = self.predict_proba(X_set)
preds = np.argmax(pred_probs, axis=1)
return preds
def _calculate_rf_feature_importance(self, base_learners):
"""Calcalates the average feature importance of the base learners"""
feature_importance_dict_list = []
for base_learner in base_learners:
feature_importance_dict_list.append(base_learner.feature_importances)
feature_importance_list = [list(x.values()) for x in feature_importance_dict_list]
average_feature_importance = np.mean(feature_importance_list, axis=0)
return average_feature_importance