forked from hyperledger-archives/burrow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
multi_iterator.go
115 lines (99 loc) · 2.58 KB
/
multi_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package storage
import (
"bytes"
"container/heap"
)
type MultiIterator struct {
start []byte
end []byte
// Acts as priority queue based on sort order of current key in each iterator
iterators []KVIterator
iteratorOrder map[KVIterator]int
lessComp int
}
// MultiIterator iterates in order over a series o
func NewMultiIterator(reverse bool, iterators ...KVIterator) *MultiIterator {
// reuse backing array
lessComp := -1
if reverse {
lessComp = 1
}
mi := &MultiIterator{
iterators: iterators,
iteratorOrder: make(map[KVIterator]int),
lessComp: lessComp,
}
mi.init()
return mi
}
func (mi *MultiIterator) init() {
validIterators := mi.iterators[:0]
for i, it := range mi.iterators {
mi.iteratorOrder[it] = i
if it.Valid() {
validIterators = append(validIterators, it)
start, end := it.Domain()
if i == 0 || CompareKeys(start, mi.start) == mi.lessComp {
mi.start = start
}
if i == 0 || CompareKeys(mi.end, end) == mi.lessComp {
mi.end = end
}
} else {
// Not clear if this is necessary - fairly sure it is permitted so can't hurt
it.Close()
}
}
mi.iterators = validIterators
heap.Init(mi)
}
// sort.Interface implementation
func (mi *MultiIterator) Len() int {
return len(mi.iterators)
}
func (mi *MultiIterator) Less(i, j int) bool {
comp := bytes.Compare(mi.iterators[i].Key(), mi.iterators[j].Key())
// Use order iterators passed to NewMultiIterator if keys are equal1
return comp == mi.lessComp || (comp == 0 && mi.iteratorOrder[mi.iterators[i]] < mi.iteratorOrder[mi.iterators[j]])
}
func (mi *MultiIterator) Swap(i, j int) {
mi.iterators[i], mi.iterators[j] = mi.iterators[j], mi.iterators[i]
}
func (mi *MultiIterator) Push(x interface{}) {
mi.iterators = append(mi.iterators, x.(KVIterator))
}
func (mi *MultiIterator) Pop() interface{} {
n := len(mi.iterators) - 1
it := mi.iterators[n]
mi.iterators = mi.iterators[:n]
return it
}
func (mi *MultiIterator) Domain() ([]byte, []byte) {
return mi.start, mi.end
}
func (mi *MultiIterator) Valid() bool {
return len(mi.iterators) > 0
}
func (mi *MultiIterator) Next() {
// Always advance the lowest iterator - the same one we serve the KV pair from
it := heap.Pop(mi).(KVIterator)
it.Next()
if it.Valid() {
heap.Push(mi, it)
}
}
func (mi *MultiIterator) Key() []byte {
return mi.Peek().Key()
}
func (mi *MultiIterator) Value() []byte {
return mi.Peek().Value()
}
func (mi *MultiIterator) Peek() KVIterator {
return mi.iterators[0]
}
func (mi *MultiIterator) Close() {
// Close any remaining valid iterators
for _, it := range mi.iterators {
it.Close()
}
}