/
merge.go
155 lines (133 loc) · 3.17 KB
/
merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package v1
// HeapIterator is a heap implementation of BlockQuerier backed by multiple blocks
// It is used to merge multiple blocks into a single ordered querier
// NB(owen-d): it uses a custom heap implementation because Pop() only returns a single
// value of the top-most iterator, rather than the iterator itself
type HeapIterator[T any] struct {
itrs []PeekingIterator[T]
less func(T, T) bool
zero T // zero value of T
cache T
ok bool
}
func NewHeapIterForSeriesWithBloom(queriers ...PeekingIterator[*SeriesWithBloom]) *HeapIterator[*SeriesWithBloom] {
return NewHeapIterator(
func(a, b *SeriesWithBloom) bool {
return a.Series.Fingerprint < b.Series.Fingerprint
},
queriers...,
)
}
func NewHeapIterator[T any](less func(T, T) bool, itrs ...PeekingIterator[T]) *HeapIterator[T] {
res := &HeapIterator[T]{
itrs: itrs,
less: less,
}
res.init()
return res
}
func (mbq HeapIterator[T]) Len() int {
return len(mbq.itrs)
}
func (mbq *HeapIterator[T]) Less(i, j int) bool {
a, aOk := mbq.itrs[i].Peek()
b, bOk := mbq.itrs[j].Peek()
if !bOk {
return true
}
if !aOk {
return false
}
return mbq.less(a, b)
}
func (mbq *HeapIterator[T]) Swap(a, b int) {
mbq.itrs[a], mbq.itrs[b] = mbq.itrs[b], mbq.itrs[a]
}
func (mbq *HeapIterator[T]) Next() (ok bool) {
mbq.cache, ok = mbq.pop()
return
}
// TODO(owen-d): don't swallow this error
func (mbq *HeapIterator[T]) Err() error {
return nil
}
func (mbq *HeapIterator[T]) At() T {
return mbq.cache
}
func (mbq *HeapIterator[T]) push(x PeekingIterator[T]) {
mbq.itrs = append(mbq.itrs, x)
mbq.up(mbq.Len() - 1)
}
func (mbq *HeapIterator[T]) pop() (T, bool) {
for {
if mbq.Len() == 0 {
return mbq.zero, false
}
cur := mbq.itrs[0]
if ok := cur.Next(); !ok {
mbq.remove(0)
continue
}
result := cur.At()
_, ok := cur.Peek()
if !ok {
// that was the end of the iterator. remove it from the heap
mbq.remove(0)
} else {
_ = mbq.down(0)
}
return result, true
}
}
func (mbq *HeapIterator[T]) remove(idx int) {
mbq.Swap(idx, mbq.Len()-1)
mbq.itrs[len(mbq.itrs)-1] = nil // don't leak reference
mbq.itrs = mbq.itrs[:mbq.Len()-1]
mbq.fix(idx)
}
// fix re-establishes the heap ordering after the element at index i has changed its value.
// Changing the value of the element at index i and then calling fix is equivalent to,
// but less expensive than, calling Remove(h, i) followed by a Push of the new value.
// The complexity is O(log n) where n = h.Len().
func (mbq *HeapIterator[T]) fix(i int) {
if !mbq.down(i) {
mbq.up(i)
}
}
func (mbq *HeapIterator[T]) up(j int) {
for {
i := (j - 1) / 2 // parent
if i == j || !mbq.Less(j, i) {
break
}
mbq.Swap(i, j)
j = i
}
}
func (mbq *HeapIterator[T]) down(i0 int) (moved bool) {
i := i0
n := mbq.Len()
for {
j1 := 2*i + 1
if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
break
}
j := j1 // left child
if j2 := j1 + 1; j2 < n && mbq.Less(j2, j1) {
j = j2 // take the higher priority child index
}
if !mbq.Less(j, i) {
break
}
mbq.Swap(i, j)
i = j
}
return i > i0
}
// establish heap invariants. O(n)
func (mbq *HeapIterator[T]) init() {
n := mbq.Len()
for i := n/2 - 1; i >= 0; i-- {
_ = mbq.down(i)
}
}