forked from lovoo/goka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
merge_iterator.go
143 lines (114 loc) · 2.94 KB
/
merge_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package storage
import (
"bytes"
"container/heap"
)
type iterHeap []Iterator
func (h iterHeap) Len() int {
return len(h)
}
func (h iterHeap) Less(i, j int) bool {
return bytes.Compare(h[i].Key(), h[j].Key()) == -1
}
func (h iterHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *iterHeap) Push(x interface{}) {
*h = append(*h, x.(Iterator))
}
func (h *iterHeap) Pop() interface{} {
dref := *h
x := dref[len(dref)-1]
*h = dref[:len(dref)-1]
return x
}
type mergeIterator struct {
key []byte
value []byte
err error
heap iterHeap
iters []Iterator
}
// NewMultiIterator returns an Iterator that iterates over the given subiterators.
// Iteration happens in lexicographical order given that the subiterators also
// return values in order.
func NewMultiIterator(iters []Iterator) Iterator {
miter := &mergeIterator{
iters: iters,
heap: make([]Iterator, 0, len(iters)),
}
miter.buildHeap(func(i Iterator) bool { return i.Next() })
return miter
}
func (m *mergeIterator) buildHeap(hasValue func(i Iterator) bool) {
m.heap = m.heap[:0]
for _, iter := range m.iters {
if !hasValue(iter) {
if m.err = iter.Err(); m.err != nil {
return
}
continue
}
heap.Push(&m.heap, iter)
}
}
// Key returns the current key. Caller should not keep references to the
// buffer or modify its contents.
func (m *mergeIterator) Key() []byte {
return m.key
}
// Value returns the current value. Caller should not keep references to the
// buffer or modify its contents.
func (m *mergeIterator) Value() ([]byte, error) {
return m.value, nil
}
// Seek moves the iterator to the beginning of a key-value pair sequence that
// is greater or equal to the given key. It returns whether at least one
// such key-value pairs exist.
func (m *mergeIterator) Seek(key []byte) bool {
if m.err != nil {
return false
}
m.buildHeap(func(i Iterator) bool { return i.Seek(key) })
return m.err == nil && len(m.heap) > 0
}
// Next advances the iterator to the next key-value pair. If there is no next
// pair, false is returned. Error should be checked after receiving false by
// calling Error().
func (m *mergeIterator) Next() bool {
if m.err != nil || len(m.heap) == 0 {
return false
}
iter := heap.Pop(&m.heap).(Iterator)
// cache the values as the underlying iterator might reuse its buffers on
// call to Next
m.key = append(m.key[:0], iter.Key()...)
val, err := iter.Value()
if err != nil {
m.err = err
return false
}
m.value = append(m.value[:0], val...)
if iter.Next() {
heap.Push(&m.heap, iter)
} else if m.err = iter.Err(); m.err != nil {
return false
}
return true
}
// Err returns the possible iteration error.
func (m *mergeIterator) Err() error {
return m.err
}
// Release frees up the resources used by the iterator. This will also release
// the subiterators.
func (m *mergeIterator) Release() {
for i := range m.iters {
m.iters[i].Release()
}
m.iters = nil
m.heap = nil
m.key = nil
m.value = nil
m.err = nil
}