-
Notifications
You must be signed in to change notification settings - Fork 29
/
sgm.py
431 lines (363 loc) · 17.9 KB
/
sgm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
"""
python implementation of the semi-global matching algorithm from Stereo Processing by Semi-Global Matching
and Mutual Information (https://core.ac.uk/download/pdf/11134866.pdf) by Heiko Hirschmuller.
author: David-Alexandre Beaupre
date: 2019/07/12
"""
import argparse
import sys
import time as t
import cv2
import numpy as np
class Direction:
def __init__(self, direction=(0, 0), name='invalid'):
"""
represent a cardinal direction in image coordinates (top left = (0, 0) and bottom right = (1, 1)).
:param direction: (x, y) for cardinal direction.
:param name: common name of said direction.
"""
self.direction = direction
self.name = name
# 8 defined directions for sgm
N = Direction(direction=(0, -1), name='north')
NE = Direction(direction=(1, -1), name='north-east')
E = Direction(direction=(1, 0), name='east')
SE = Direction(direction=(1, 1), name='south-east')
S = Direction(direction=(0, 1), name='south')
SW = Direction(direction=(-1, 1), name='south-west')
W = Direction(direction=(-1, 0), name='west')
NW = Direction(direction=(-1, -1), name='north-west')
class Paths:
def __init__(self):
"""
represent the relation between the directions.
"""
self.paths = [N, NE, E, SE, S, SW, W, NW]
self.size = len(self.paths)
self.effective_paths = [(E, W), (SE, NW), (S, N), (SW, NE)]
class Parameters:
def __init__(self, max_disparity=64, P1=5, P2=70, csize=(7, 7), bsize=(3, 3)):
"""
represent all parameters used in the sgm algorithm.
:param max_disparity: maximum distance between the same pixel in both images.
:param P1: penalty for disparity difference = 1
:param P2: penalty for disparity difference > 1
:param csize: size of the kernel for the census transform.
:param bsize: size of the kernel for blurring the images and median filtering.
"""
self.max_disparity = max_disparity
self.P1 = P1
self.P2 = P2
self.csize = csize
self.bsize = bsize
def load_images(left_name, right_name, parameters):
"""
read and blur stereo image pair.
:param left_name: name of the left image.
:param right_name: name of the right image.
:param parameters: structure containing parameters of the algorithm.
:return: blurred left and right images.
"""
left = cv2.imread(left_name, 0)
left = cv2.GaussianBlur(left, parameters.bsize, 0, 0)
right = cv2.imread(right_name, 0)
right = cv2.GaussianBlur(right, parameters.bsize, 0, 0)
return left, right
def get_indices(offset, dim, direction, height):
"""
for the diagonal directions (SE, SW, NW, NE), return the array of indices for the current slice.
:param offset: difference with the main diagonal of the cost volume.
:param dim: number of elements along the path.
:param direction: current aggregation direction.
:param height: H of the cost volume.
:return: arrays for the y (H dimension) and x (W dimension) indices.
"""
y_indices = []
x_indices = []
for i in range(0, dim):
if direction == SE.direction:
if offset < 0:
y_indices.append(-offset + i)
x_indices.append(0 + i)
else:
y_indices.append(0 + i)
x_indices.append(offset + i)
if direction == SW.direction:
if offset < 0:
y_indices.append(height + offset - i)
x_indices.append(0 + i)
else:
y_indices.append(height - i)
x_indices.append(offset + i)
return np.array(y_indices), np.array(x_indices)
def get_path_cost(slice, offset, parameters):
"""
part of the aggregation step, finds the minimum costs in a D x M slice (where M = the number of pixels in the
given direction)
:param slice: M x D array from the cost volume.
:param offset: ignore the pixels on the border.
:param parameters: structure containing parameters of the algorithm.
:return: M x D array of the minimum costs for a given slice in a given direction.
"""
other_dim = slice.shape[0]
disparity_dim = slice.shape[1]
disparities = [d for d in range(disparity_dim)] * disparity_dim
disparities = np.array(disparities).reshape(disparity_dim, disparity_dim)
penalties = np.zeros(shape=(disparity_dim, disparity_dim), dtype=slice.dtype)
penalties[np.abs(disparities - disparities.T) == 1] = parameters.P1
penalties[np.abs(disparities - disparities.T) > 1] = parameters.P2
minimum_cost_path = np.zeros(shape=(other_dim, disparity_dim), dtype=slice.dtype)
minimum_cost_path[offset - 1, :] = slice[offset - 1, :]
for i in range(offset, other_dim):
previous_cost = minimum_cost_path[i - 1, :]
current_cost = slice[i, :]
costs = np.repeat(previous_cost, repeats=disparity_dim, axis=0).reshape(disparity_dim, disparity_dim)
costs = np.amin(costs + penalties, axis=0)
minimum_cost_path[i, :] = current_cost + costs - np.amin(previous_cost)
return minimum_cost_path
def aggregate_costs(cost_volume, parameters, paths):
"""
second step of the sgm algorithm, aggregates matching costs for N possible directions (8 in this case).
:param cost_volume: array containing the matching costs.
:param parameters: structure containing parameters of the algorithm.
:param paths: structure containing all directions in which to aggregate costs.
:return: H x W x D x N array of matching cost for all defined directions.
"""
height = cost_volume.shape[0]
width = cost_volume.shape[1]
disparities = cost_volume.shape[2]
start = -(height - 1)
end = width - 1
aggregation_volume = np.zeros(shape=(height, width, disparities, paths.size), dtype=cost_volume.dtype)
path_id = 0
for path in paths.effective_paths:
print('\tProcessing paths {} and {}...'.format(path[0].name, path[1].name), end='')
sys.stdout.flush()
dawn = t.time()
main_aggregation = np.zeros(shape=(height, width, disparities), dtype=cost_volume.dtype)
opposite_aggregation = np.copy(main_aggregation)
main = path[0]
if main.direction == S.direction:
for x in range(0, width):
south = cost_volume[0:height, x, :]
north = np.flip(south, axis=0)
main_aggregation[:, x, :] = get_path_cost(south, 1, parameters)
opposite_aggregation[:, x, :] = np.flip(get_path_cost(north, 1, parameters), axis=0)
if main.direction == E.direction:
for y in range(0, height):
east = cost_volume[y, 0:width, :]
west = np.flip(east, axis=0)
main_aggregation[y, :, :] = get_path_cost(east, 1, parameters)
opposite_aggregation[y, :, :] = np.flip(get_path_cost(west, 1, parameters), axis=0)
if main.direction == SE.direction:
for offset in range(start, end):
south_east = cost_volume.diagonal(offset=offset).T
north_west = np.flip(south_east, axis=0)
dim = south_east.shape[0]
y_se_idx, x_se_idx = get_indices(offset, dim, SE.direction, None)
y_nw_idx = np.flip(y_se_idx, axis=0)
x_nw_idx = np.flip(x_se_idx, axis=0)
main_aggregation[y_se_idx, x_se_idx, :] = get_path_cost(south_east, 1, parameters)
opposite_aggregation[y_nw_idx, x_nw_idx, :] = get_path_cost(north_west, 1, parameters)
if main.direction == SW.direction:
for offset in range(start, end):
south_west = np.flipud(cost_volume).diagonal(offset=offset).T
north_east = np.flip(south_west, axis=0)
dim = south_west.shape[0]
y_sw_idx, x_sw_idx = get_indices(offset, dim, SW.direction, height - 1)
y_ne_idx = np.flip(y_sw_idx, axis=0)
x_ne_idx = np.flip(x_sw_idx, axis=0)
main_aggregation[y_sw_idx, x_sw_idx, :] = get_path_cost(south_west, 1, parameters)
opposite_aggregation[y_ne_idx, x_ne_idx, :] = get_path_cost(north_east, 1, parameters)
aggregation_volume[:, :, :, path_id] = main_aggregation
aggregation_volume[:, :, :, path_id + 1] = opposite_aggregation
path_id = path_id + 2
dusk = t.time()
print('\t(done in {:.2f}s)'.format(dusk - dawn))
return aggregation_volume
def compute_costs(left, right, parameters, save_images):
"""
first step of the sgm algorithm, matching cost based on census transform and hamming distance.
:param left: left image.
:param right: right image.
:param parameters: structure containing parameters of the algorithm.
:param save_images: whether to save census images or not.
:return: H x W x D array with the matching costs.
"""
assert left.shape[0] == right.shape[0] and left.shape[1] == right.shape[1], 'left & right must have the same shape.'
assert parameters.max_disparity > 0, 'maximum disparity must be greater than 0.'
height = left.shape[0]
width = left.shape[1]
cheight = parameters.csize[0]
cwidth = parameters.csize[1]
y_offset = int(cheight / 2)
x_offset = int(cwidth / 2)
disparity = parameters.max_disparity
left_img_census = np.zeros(shape=(height, width), dtype=np.uint8)
right_img_census = np.zeros(shape=(height, width), dtype=np.uint8)
left_census_values = np.zeros(shape=(height, width), dtype=np.uint64)
right_census_values = np.zeros(shape=(height, width), dtype=np.uint64)
print('\tComputing left and right census...', end='')
sys.stdout.flush()
dawn = t.time()
# pixels on the border will have no census values
for y in range(y_offset, height - y_offset):
for x in range(x_offset, width - x_offset):
left_census = np.int64(0)
center_pixel = left[y, x]
reference = np.full(shape=(cheight, cwidth), fill_value=center_pixel, dtype=np.int64)
image = left[(y - y_offset):(y + y_offset + 1), (x - x_offset):(x + x_offset + 1)]
comparison = image - reference
for j in range(comparison.shape[0]):
for i in range(comparison.shape[1]):
if (i, j) != (y_offset, x_offset):
left_census = left_census << 1
if comparison[j, i] < 0:
bit = 1
else:
bit = 0
left_census = left_census | bit
left_img_census[y, x] = np.uint8(left_census)
left_census_values[y, x] = left_census
right_census = np.int64(0)
center_pixel = right[y, x]
reference = np.full(shape=(cheight, cwidth), fill_value=center_pixel, dtype=np.int64)
image = right[(y - y_offset):(y + y_offset + 1), (x - x_offset):(x + x_offset + 1)]
comparison = image - reference
for j in range(comparison.shape[0]):
for i in range(comparison.shape[1]):
if (i, j) != (y_offset, x_offset):
right_census = right_census << 1
if comparison[j, i] < 0:
bit = 1
else:
bit = 0
right_census = right_census | bit
right_img_census[y, x] = np.uint8(right_census)
right_census_values[y, x] = right_census
dusk = t.time()
print('\t(done in {:.2f}s)'.format(dusk - dawn))
if save_images:
cv2.imwrite('left_census.png', left_img_census)
cv2.imwrite('right_census.png', right_img_census)
print('\tComputing cost volumes...', end='')
sys.stdout.flush()
dawn = t.time()
left_cost_volume = np.zeros(shape=(height, width, disparity), dtype=np.uint32)
right_cost_volume = np.zeros(shape=(height, width, disparity), dtype=np.uint32)
lcensus = np.zeros(shape=(height, width), dtype=np.int64)
rcensus = np.zeros(shape=(height, width), dtype=np.int64)
for d in range(0, disparity):
rcensus[:, (x_offset + d):(width - x_offset)] = right_census_values[:, x_offset:(width - d - x_offset)]
left_xor = np.int64(np.bitwise_xor(np.int64(left_census_values), rcensus))
left_distance = np.zeros(shape=(height, width), dtype=np.uint32)
while not np.all(left_xor == 0):
tmp = left_xor - 1
mask = left_xor != 0
left_xor[mask] = np.bitwise_and(left_xor[mask], tmp[mask])
left_distance[mask] = left_distance[mask] + 1
left_cost_volume[:, :, d] = left_distance
lcensus[:, x_offset:(width - d - x_offset)] = left_census_values[:, (x_offset + d):(width - x_offset)]
right_xor = np.int64(np.bitwise_xor(np.int64(right_census_values), lcensus))
right_distance = np.zeros(shape=(height, width), dtype=np.uint32)
while not np.all(right_xor == 0):
tmp = right_xor - 1
mask = right_xor != 0
right_xor[mask] = np.bitwise_and(right_xor[mask], tmp[mask])
right_distance[mask] = right_distance[mask] + 1
right_cost_volume[:, :, d] = right_distance
dusk = t.time()
print('\t(done in {:.2f}s)'.format(dusk - dawn))
return left_cost_volume, right_cost_volume
def select_disparity(aggregation_volume):
"""
last step of the sgm algorithm, corresponding to equation 14 followed by winner-takes-all approach.
:param aggregation_volume: H x W x D x N array of matching cost for all defined directions.
:return: disparity image.
"""
volume = np.sum(aggregation_volume, axis=3)
disparity_map = np.argmin(volume, axis=2)
return disparity_map
def normalize(volume, parameters):
"""
transforms values from the range (0, 64) to (0, 255).
:param volume: n dimension array to normalize.
:param parameters: structure containing parameters of the algorithm.
:return: normalized array.
"""
return 255.0 * volume / parameters.max_disparity
def get_recall(disparity, gt, args):
"""
computes the recall of the disparity map.
:param disparity: disparity image.
:param gt: path to ground-truth image.
:param args: program arguments.
:return: rate of correct predictions.
"""
gt = np.float32(cv2.imread(gt, cv2.IMREAD_GRAYSCALE))
gt = np.int16(gt / 255.0 * float(args.disp))
disparity = np.int16(np.float32(disparity) / 255.0 * float(args.disp))
correct = np.count_nonzero(np.abs(disparity - gt) <= 3)
return float(correct) / gt.size
def sgm():
"""
main function applying the semi-global matching algorithm.
:return: void.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--left', default='cones/im2.png', help='name (path) to the left image')
parser.add_argument('--right', default='cones/im6.png', help='name (path) to the right image')
parser.add_argument('--left_gt', default='cones/disp2.png', help='name (path) to the left ground-truth image')
parser.add_argument('--right_gt', default='cones/disp6.png', help='name (path) to the right ground-truth image')
parser.add_argument('--output', default='disparity_map.png', help='name of the output image')
parser.add_argument('--disp', default=64, type=int, help='maximum disparity for the stereo pair')
parser.add_argument('--images', default=False, type=bool, help='save intermediate representations')
parser.add_argument('--eval', default=True, type=bool, help='evaluate disparity map with 3 pixel error')
args = parser.parse_args()
left_name = args.left
right_name = args.right
left_gt_name = args.left_gt
right_gt_name = args.right_gt
output_name = args.output
disparity = args.disp
save_images = args.images
evaluation = args.eval
dawn = t.time()
parameters = Parameters(max_disparity=disparity, P1=10, P2=120, csize=(7, 7), bsize=(3, 3))
paths = Paths()
print('\nLoading images...')
left, right = load_images(left_name, right_name, parameters)
print('\nStarting cost computation...')
left_cost_volume, right_cost_volume = compute_costs(left, right, parameters, save_images)
if save_images:
left_disparity_map = np.uint8(normalize(np.argmin(left_cost_volume, axis=2), parameters))
cv2.imwrite('disp_map_left_cost_volume.png', left_disparity_map)
right_disparity_map = np.uint8(normalize(np.argmin(right_cost_volume, axis=2), parameters))
cv2.imwrite('disp_map_right_cost_volume.png', right_disparity_map)
print('\nStarting left aggregation computation...')
left_aggregation_volume = aggregate_costs(left_cost_volume, parameters, paths)
print('\nStarting right aggregation computation...')
right_aggregation_volume = aggregate_costs(right_cost_volume, parameters, paths)
print('\nSelecting best disparities...')
left_disparity_map = np.uint8(normalize(select_disparity(left_aggregation_volume), parameters))
right_disparity_map = np.uint8(normalize(select_disparity(right_aggregation_volume), parameters))
if save_images:
cv2.imwrite('left_disp_map_no_post_processing.png', left_disparity_map)
cv2.imwrite('right_disp_map_no_post_processing.png', right_disparity_map)
print('\nApplying median filter...')
left_disparity_map = cv2.medianBlur(left_disparity_map, parameters.bsize[0])
right_disparity_map = cv2.medianBlur(right_disparity_map, parameters.bsize[0])
cv2.imwrite(f'left_{output_name}', left_disparity_map)
cv2.imwrite(f'right_{output_name}', right_disparity_map)
if evaluation:
print('\nEvaluating left disparity map...')
recall = get_recall(left_disparity_map, left_gt_name, args)
print('\tRecall = {:.2f}%'.format(recall * 100.0))
print('\nEvaluating right disparity map...')
recall = get_recall(right_disparity_map, right_gt_name, args)
print('\tRecall = {:.2f}%'.format(recall * 100.0))
dusk = t.time()
print('\nFin.')
print('\nTotal execution time = {:.2f}s'.format(dusk - dawn))
if __name__ == '__main__':
sgm()