-
Notifications
You must be signed in to change notification settings - Fork 239
/
ensemble_boxes_wbf.py
248 lines (210 loc) · 9.62 KB
/
ensemble_boxes_wbf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# coding: utf-8
__author__ = 'ZFTurbo: https://kaggle.com/zfturbo'
import warnings
import numpy as np
def _clip_to_unit(value, name):
    """Clamp one coordinate into [0, 1], warning when it had to be changed."""
    if value < 0:
        warnings.warn('{} < 0 in box. Set it to 0.'.format(name), stacklevel=2)
        return 0.0
    if value > 1:
        warnings.warn(
            '{} > 1 in box. Set it to 1. Check that you normalize boxes in [0, 1] range.'.format(name),
            stacklevel=2,
        )
        return 1.0
    return value


def prefilter_boxes(boxes, scores, labels, weights, thr):
    """
    Validate, clip and group the raw predictions of every model by label.

    :param boxes: per-model sequences of boxes; each box is (x1, y1, x2, y2)
        and is expected to be normalized to the [0, 1] range
    :param scores: per-model sequences of confidence scores
    :param labels: per-model sequences of class labels (converted with int())
    :param weights: per-model weights; every score is multiplied by the weight
        of the model that produced it
    :param thr: boxes whose raw (unweighted) score is below this value are dropped
    :return: dict mapping int label -> numpy array with one row per kept box,
        laid out as [label, score * weight, weight, model index, x1, y1, x2, y2]
        and sorted by the weighted score in descending order
    :raises ValueError: if the boxes of a model differ in number from its
        scores or its labels
    """
    # Create dict with boxes stored by its label
    new_boxes = dict()
    for t in range(len(boxes)):
        model_boxes, model_scores, model_labels = boxes[t], scores[t], labels[t]

        # Raise instead of print + exit(): a library must not terminate the
        # host process, and exit() is only an interactive helper.
        if len(model_boxes) != len(model_scores):
            raise ValueError(
                'Error. Length of boxes arrays not equal to length of scores array: {} != {}'.format(
                    len(model_boxes), len(model_scores)))
        if len(model_boxes) != len(model_labels):
            raise ValueError(
                'Error. Length of boxes arrays not equal to length of labels array: {} != {}'.format(
                    len(model_boxes), len(model_labels)))

        for box_part, score, raw_label in zip(model_boxes, model_scores, model_labels):
            if score < thr:
                continue
            label = int(raw_label)
            x1 = float(box_part[0])
            y1 = float(box_part[1])
            x2 = float(box_part[2])
            y2 = float(box_part[3])

            # Box data checks: put the corners in order first, then clamp.
            if x2 < x1:
                warnings.warn('X2 < X1 value in box. Swap them.')
                x1, x2 = x2, x1
            if y2 < y1:
                warnings.warn('Y2 < Y1 value in box. Swap them.')
                y1, y2 = y2, y1
            x1 = _clip_to_unit(x1, 'X1')
            x2 = _clip_to_unit(x2, 'X2')
            y1 = _clip_to_unit(y1, 'Y1')
            y2 = _clip_to_unit(y2, 'Y2')

            # Degenerate boxes cannot take part in IoU matching.
            if (x2 - x1) * (y2 - y1) == 0.0:
                warnings.warn("Zero area box skipped: {}.".format(box_part))
                continue

            # [label, score, weight, model index, x1, y1, x2, y2]
            b = [label, float(score) * weights[t], weights[t], t, x1, y1, x2, y2]
            new_boxes.setdefault(label, []).append(b)

    # Sort each list in dict by score and transform it to numpy array
    for k in new_boxes:
        current_boxes = np.array(new_boxes[k])
        new_boxes[k] = current_boxes[current_boxes[:, 1].argsort()[::-1]]

    return new_boxes
def get_weighted_box(boxes, conf_type='avg'):
    """
    Create weighted box for set of boxes

    Coordinates are averaged with each box's score (column 1) as its weight.

    :param boxes: non-empty sequence of numpy rows
        (label, score, weight, model index, x1, y1, x2, y2) to fuse
    :param conf_type: how the fused score is computed. 'avg',
        'box_and_model_avg' and 'absent_model_aware_avg' all store the mean
        score of the cluster here; 'max' stores the highest score. Any other
        value leaves the score at 0.
    :return: weighted box (label, score, weight, model index, x1, y1, x2, y2)
        as a float32 array; weight is the sum of the input weights
    """
    box = np.zeros(8, dtype=np.float32)
    conf = 0
    conf_list = []
    w = 0
    for b in boxes:
        box[4:] += (b[1] * b[4:])
        conf += b[1]
        conf_list.append(b[1])
        w += b[2]

    # The label of the cluster is the label of its first (highest-scored) box.
    box[0] = boxes[0][0]
    if conf_type in ('avg', 'box_and_model_avg', 'absent_model_aware_avg'):
        box[1] = conf / len(boxes)
    elif conf_type == 'max':
        box[1] = np.array(conf_list).max()
    box[2] = w
    box[3] = -1  # model index field is retained for consistency but is not used.

    if conf != 0:
        box[4:] /= conf
    else:
        # Every score in the cluster is zero (zero-score predictions or a
        # zero model weight). Dividing would yield NaN coordinates, so fall
        # back to the plain, unweighted mean of the corners.
        box[4:] = np.mean([b[4:] for b in boxes], axis=0)
    return box
def find_matching_box_fast(boxes_list, new_box, match_iou):
    """
    Vectorized lookup of the stored box that overlaps ``new_box`` the most.

    The IoU against every row of ``boxes_list`` is computed in one numpy pass
    instead of a Python loop, which matters because this runs once per input box.

    :param boxes_list: 2D numpy array, one row per stored box, laid out as
        (label, score, weight, model index, x1, y1, x2, y2)
    :param new_box: single row with the same layout
    :param match_iou: IoU a candidate has to exceed to count as a match
    :return: (index, iou) of the best candidate with the same label, or
        (-1, match_iou) when there is no stored box or none exceeds match_iou
    """
    # Nothing stored yet: there is nothing to match against.
    if boxes_list.shape[0] == 0:
        return -1, match_iou

    candidates = boxes_list[:, 4:]
    target = new_box[4:]

    # Corners of the intersection rectangle of each candidate with the target.
    left = np.maximum(candidates[:, 0], target[0])
    top = np.maximum(candidates[:, 1], target[1])
    right = np.minimum(candidates[:, 2], target[2])
    bottom = np.minimum(candidates[:, 3], target[3])
    overlap = np.maximum(right - left, 0) * np.maximum(bottom - top, 0)

    # Intersection over union.
    candidate_area = (candidates[:, 2] - candidates[:, 0]) * (candidates[:, 3] - candidates[:, 1])
    target_area = (target[2] - target[0]) * (target[3] - target[1])
    ious = overlap / (candidate_area + target_area - overlap)

    # Candidates carrying another label can never be selected.
    ious[boxes_list[:, 0] != new_box[0]] = -1

    winner = np.argmax(ious)
    winner_iou = ious[winner]
    if winner_iou <= match_iou:
        return -1, match_iou
    return winner, winner_iou
def weighted_boxes_fusion(
        boxes_list,
        scores_list,
        labels_list,
        weights=None,
        iou_thr=0.55,
        skip_box_thr=0.0,
        conf_type='avg',
        allows_overflow=False
):
    """
    Fuse the box predictions of several models with Weighted Boxes Fusion.

    :param boxes_list: list of boxes predictions from each model, each box is 4 numbers.
        It has 3 dimensions (models_number, model_preds, 4)
        Order of boxes: x1, y1, x2, y2. We expect float normalized coordinates [0; 1]
    :param scores_list: list of scores for each model
    :param labels_list: list of labels for each model
    :param weights: list of weights for each model. Default: None, which means weight == 1 for each model.
        A list of the wrong length is reported on stdout and replaced by all ones.
    :param iou_thr: IoU value for boxes to be a match
    :param skip_box_thr: exclude boxes with score lower than this variable
    :param conf_type: how to calculate confidence in weighted boxes.
        'avg': average value,
        'max': maximum value,
        'box_and_model_avg': box and model wise hybrid weighted average,
        'absent_model_aware_avg': weighted average that takes into account the absent model.
    :param allows_overflow: false if we want confidence score not exceed 1.0
        (only consulted for conf_type 'avg')

    :return: boxes: boxes coordinates (Order of boxes: x1, y1, x2, y2).
    :return: scores: confidence scores
    :return: labels: boxes labels
        All three are numpy arrays ordered by descending score; they are empty
        when no box survives the pre-filtering.
    :raises ValueError: if conf_type is not one of the supported values
    """
    if weights is None:
        weights = np.ones(len(boxes_list))
    if len(weights) != len(boxes_list):
        print('Warning: incorrect number of weights {}. Must be: {}. Set weights equal to 1.'.format(len(weights), len(boxes_list)))
        weights = np.ones(len(boxes_list))
    weights = np.array(weights)

    # Raise instead of print + exit(): a library must not terminate the host
    # process, and exit() is only an interactive helper.
    if conf_type not in ('avg', 'max', 'box_and_model_avg', 'absent_model_aware_avg'):
        raise ValueError(
            'Unknown conf_type: {}. Must be "avg", "max" or "box_and_model_avg", or "absent_model_aware_avg"'.format(conf_type))

    filtered_boxes = prefilter_boxes(boxes_list, scores_list, labels_list, weights, skip_box_thr)
    if len(filtered_boxes) == 0:
        return np.zeros((0, 4)), np.zeros((0,)), np.zeros((0,))

    # Loop-invariant normalizers.
    weights_total = weights.sum()
    weights_max = weights.max()

    overall_boxes = []
    for label in filtered_boxes:
        boxes = filtered_boxes[label]
        new_boxes = []  # new_boxes[i] holds the raw boxes fused into weighted_boxes[i]
        weighted_boxes = np.empty((0, 8))

        # Clusterize boxes: boxes arrive sorted by descending score, each one
        # either joins the best overlapping cluster or starts a new one.
        for j in range(0, len(boxes)):
            index, best_iou = find_matching_box_fast(weighted_boxes, boxes[j], iou_thr)
            if index != -1:
                new_boxes[index].append(boxes[j])
                weighted_boxes[index] = get_weighted_box(new_boxes[index], conf_type)
            else:
                new_boxes.append([boxes[j].copy()])
                weighted_boxes = np.vstack((weighted_boxes, boxes[j].copy()))

        # Rescale confidence based on number of models and boxes
        for i, clustered_boxes in enumerate(new_boxes):
            if conf_type == 'box_and_model_avg':
                clustered_boxes = np.array(clustered_boxes)
                # weighted average for boxes
                weighted_boxes[i, 1] = weighted_boxes[i, 1] * len(clustered_boxes) / weighted_boxes[i, 2]
                # identify unique model index by model index column
                _, idx = np.unique(clustered_boxes[:, 3], return_index=True)
                # rescale by unique model weights
                weighted_boxes[i, 1] = weighted_boxes[i, 1] * clustered_boxes[idx, 2].sum() / weights_total
            elif conf_type == 'absent_model_aware_avg':
                clustered_boxes = np.array(clustered_boxes)
                # get unique model index in the cluster
                models = np.unique(clustered_boxes[:, 3]).astype(int)
                # create a mask to get unused model weights
                mask = np.ones(len(weights), dtype=bool)
                mask[models] = False
                # absent model aware weighted average
                weighted_boxes[i, 1] = weighted_boxes[i, 1] * len(clustered_boxes) / (weighted_boxes[i, 2] + weights[mask].sum())
            elif conf_type == 'max':
                weighted_boxes[i, 1] = weighted_boxes[i, 1] / weights_max
            elif not allows_overflow:
                # Cap the multiplier at the number of models so the score cannot exceed 1.
                weighted_boxes[i, 1] = weighted_boxes[i, 1] * min(len(weights), len(clustered_boxes)) / weights_total
            else:
                weighted_boxes[i, 1] = weighted_boxes[i, 1] * len(clustered_boxes) / weights_total
        overall_boxes.append(weighted_boxes)

    overall_boxes = np.concatenate(overall_boxes, axis=0)
    overall_boxes = overall_boxes[overall_boxes[:, 1].argsort()[::-1]]
    fused_boxes = overall_boxes[:, 4:]
    fused_scores = overall_boxes[:, 1]
    fused_labels = overall_boxes[:, 0]
    return fused_boxes, fused_scores, fused_labels