-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecision_tree.py
More file actions
245 lines (188 loc) · 6.99 KB
/
decision_tree.py
File metadata and controls
245 lines (188 loc) · 6.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
"""
Simple decision tree implementation using Numpy.
"""
from collections import Counter
import numpy as np
class _Node:
"""
Internal structure representing a general Node type.
"""
def __init__(self, value):
self.value = value
@property
def is_leaf(self):
return False
def __eq__(self, other):
if self.is_leaf and other.is_leaf:
return self.value == other.value
return False
class Node(_Node):
    """
    Structure representing inner nodes of a decision tree.

    Each inner node has two branches, a dataset feature column index,
    a threshold value and a couple of additional parameters helpful for
    visualization purposes or inspecting learned tree properties.
    """
    def __init__(self, feature, value, gini, counts, depth, left, right):
        # super().__init__ already stores `value`; the original also
        # re-assigned self.value redundantly — that duplicate is removed.
        super().__init__(value)
        self.feature = feature  # index of the feature column used for the split
        self.gini = gini        # Gini score of the chosen split
        self.counts = counts    # per-class sample counts reaching this node
        self.depth = depth      # depth of this node within the tree
        self.left = left        # branch for samples with feature value <= threshold
        self.right = right      # branch for samples with feature value > threshold
class Leaf(_Node):
    """
    Terminal node of a decision tree.

    A leaf has no branches; its ``value`` holds the class predicted for
    every sample that reaches it.
    """

    @property
    def is_leaf(self):
        """Always True: a leaf terminates its branch of the tree."""
        return True
def learn_tree(X, y, max_depth=5, min_split_size=10,
               features_subset_size=None, min_leaf_size=None) -> Node:
    """Creates a decision tree based on provided dataset.

    The tree is grown recursively; at each node every observed value of each
    considered feature is tried as a split threshold, and the split with the
    lowest Gini impurity is kept.

    Args:
        X: 2-D array of observations (rows) by features (columns).
        y: 1-D array of target classes. majority_vote() feeds these through
            np.bincount(arr.astype(int)), so labels are assumed to cast to
            non-negative integers — TODO confirm with callers.
        max_depth: Maximum tree depth; None disables the depth limit.
        min_split_size: Nodes with fewer observations become leaves;
            None disables this check.
        features_subset_size: When set, each split considers only this many
            randomly chosen features (without replacement).
        min_leaf_size: Minimum observations allowed in each child node;
            falsy values (None, 0) fall back to 1.

    Returns:
        Node: The root node of the created tree.
    """
    features = np.arange(X.shape[1])
    # NOTE: `or` maps any falsy value (None, 0) to the default of 1.
    min_leaf_size = min_leaf_size or 1
    def learn(x_index, y_index, depth):
        """Recursive function called to split the node into left and right
        children. Returns majority class in case if node cannot be split or
        contains observations belonging to the same class.
        Note that the function operates with array indexes instead of arrays
        themselves. It means that the original arrays are not copied or
        modified during training process.
        Args:
            x_index: Subset of observations in the node.
            y_index: Subset of respective observation targets.
            depth: Current depth level.
        Returns:
            Node: Decision tree node (or a Leaf when the node is terminal).
        """
        # Stop early when the node is pure, too small, or too deep.
        shortcut = (
            single_class_node(y_index) or
            too_small_for_split(x_index) or
            max_depth_exceeded(depth))
        if shortcut:
            return majority_vote(y_index)
        best_gini = np.inf
        best_feature = None
        best_split = None
        best_value = None
        targets = y[y_index]
        counts = Counter(targets)
        classes = np.unique(targets)
        if features_subset_size is None:
            considered_features = features
        else:
            # Random feature subset sampled without replacement.
            considered_features = np.random.choice(
                features, size=features_subset_size, replace=False)
        # Exhaustive threshold search: every observed value of each
        # considered feature is a candidate (O(n^2) work per feature).
        for feature in considered_features:
            column = X[x_index][:, feature]
            for threshold in column:
                # Boolean mask selecting samples that go to the left child.
                left = mask(column <= threshold, column)
                if too_few_samples_per_child(left):
                    continue
                gini = gini_index(y_index, classes, left)
                if gini < best_gini:
                    best_gini = gini
                    best_feature = feature
                    best_split = left
                    best_value = threshold
        # No admissible split (every candidate violated min_leaf_size).
        if best_split is None:
            return majority_vote(y_index)
        x_left = x_index[best_split]
        x_right = x_index[~best_split]
        y_left = y_index[best_split]
        y_right = y_index[~best_split]
        left = learn(x_left, y_left, depth + 1)
        right = learn(x_right, y_right, depth + 1)
        # If both children collapsed into identical leaves, merge them.
        if left == right:
            return Leaf(left.value)
        return Node(feature=best_feature, value=best_value, counts=counts,
                    gini=best_gini, depth=depth, left=left, right=right)
    def gini_index(node_subset, classes, left):
        """
        Computes Gini index for a node split.
        Args:
            node_subset: Subset of observations in the node.
            classes: List of unique classes represented in the node.
            left: Boolean mask over the node's observations selecting the
                left child (its negation selects the right child).
        Return:
            total_gini: The node split's Gini score.
        """
        total_gini = 0.0
        n_total = node_subset.shape[0]
        right = ~left
        for branch_subset in (left, right):
            group = y[node_subset][branch_subset]
            size = group.shape[0]
            # An empty branch contributes nothing to the weighted score.
            if size == 0:
                continue
            keys, values = np.unique(group, return_counts=True)
            counts = dict(zip(keys, values))
            ratios = np.array([counts.get(value, 0)/size for value in classes])
            score = (ratios ** 2).sum()
            # Weight each branch's impurity by its share of the samples.
            total_gini += size*(1.0 - score)/n_total
        return total_gini
    def majority_vote(index):
        """
        Returns the majority value of an array as a Leaf.
        NOTE: np.bincount requires labels to cast to non-negative integers.
        """
        arr = y[index]
        value = np.argmax(np.bincount(arr.astype(int)))
        return Leaf(value)
    def single_class_node(y_index):
        """
        Returns true if all elements of sequence are equal to the same value.
        """
        return np.unique(y[y_index]).shape[0] == 1
    def too_small_for_split(index):
        """
        Checks if node is too small to being split.
        """
        return min_split_size is not None and index.shape[0] < min_split_size
    def max_depth_exceeded(depth):
        """
        Checks if maximum tree depth is reached.
        """
        return max_depth is not None and depth >= max_depth
    def too_few_samples_per_child(index):
        """
        Checks if leaf split generates enough instances in the child node.
        Note that "index" array is a boolean mask which selects a subset of
        parent nodes to group them into child node. True values select values
        of left node, while False values - for the right one.
        """
        total = len(index)
        left_node_samples = index.sum()
        right_node_samples = total - left_node_samples
        return (left_node_samples < min_leaf_size or
                right_node_samples < min_leaf_size)
    # Train on index arrays so X and y are never copied or modified.
    x_root_index = np.arange(X.shape[0])
    y_root_index = np.arange(y.shape[0])
    return learn(x_root_index, y_root_index, 0)
def predict_tree(tree, X):
    """
    Makes a prediction for each sample of provided subset of data using
    a single decision tree.

    Args:
        tree: Root node of a learned tree; nodes expose ``is_leaf``,
            ``value``, ``feature``, ``left`` and ``right``.
        X: Iterable of samples, each indexable by feature index.

    Returns:
        np.ndarray: Predicted class value for each sample in X.
    """
    def predict(node, sample):
        # Walk the tree iteratively rather than recursively so very deep
        # trees (e.g. trained with max_depth=None) cannot exhaust the
        # interpreter's recursion limit.
        while not node.is_leaf:
            if sample[node.feature] <= node.value:
                node = node.left
            else:
                node = node.right
        return node.value
    return np.array([predict(tree, sample) for sample in X])
def mask(condition, arr):
    """
    Returns a boolean mask selecting array values meeting the condition.

    Args:
        condition: Boolean array-like, one entry per element of ``arr``.
        arr: The array the mask refers to (kept for interface
            compatibility; only the condition determines the mask).

    Returns:
        np.ndarray: Boolean array, True where the condition holds.
    """
    # Fix: the previous ``np.ma.masked_where(condition, arr).mask``
    # collapses to the scalar ``np.ma.nomask`` (plain ``False``) when no
    # element satisfies the condition, and a scalar cannot be used for
    # boolean indexing. Converting the condition directly always yields a
    # proper per-element array.
    return np.asarray(condition, dtype=bool)