forked from cockroachdb/cockroach
/
multi_iterator.go
177 lines (158 loc) · 6.78 KB
/
multi_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright 2017 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/LICENSE
package engineccl
import (
"bytes"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)
// invalidIdxSentinel is the value held in multiIterator.currentIdx when no
// underlying iterator is currently selected.
const invalidIdxSentinel = -1
// multiIterator multiplexes iteration over a number of
// engine.SimpleIterators.
type multiIterator struct {
	// The underlying iterators being multiplexed. Their position in this
	// slice is the index used by the fields below.
	iters []engine.SimpleIterator
	// The index into `iters` of the iterator currently being pointed at, or
	// invalidIdxSentinel when none is.
	currentIdx int
	// The indexes of every iterator with the same key as the one in currentIdx.
	itersWithCurrentKey []int
	// The indexes of every iterator with the same key and timestamp as the one
	// in currentIdx.
	itersWithCurrentKeyTimestamp []int

	// err, if non-nil, is an error encountered by one of the underlying
	// Iterators or in some incompatibility between them. If err is non-nil,
	// Valid must return false.
	err error
}
// Compile-time check that *multiIterator implements engine.SimpleIterator.
var _ engine.SimpleIterator = (*multiIterator)(nil)
// MakeMultiIterator returns an engine.SimpleIterator that multiplexes the
// given engine.SimpleIterators. The passed slice is retained as-is (not
// copied). The returned iterator is not positioned: Valid reports false until
// Seek has been called.
//
// The returned multiIterator never closes the iterators it was given; the
// caller must close them itself, after closing the multiIterator.
//
// When two iterators hold an entry with exactly the same key and timestamp,
// the entry from the iterator with the higher index in iters is the one
// surfaced and the other is skipped.
func MakeMultiIterator(iters []engine.SimpleIterator) engine.SimpleIterator {
	n := len(iters)

	mi := new(multiIterator)
	mi.iters = iters
	mi.currentIdx = invalidIdxSentinel
	// At most every iterator can share the current key, so n is an upper
	// bound on both index lists.
	mi.itersWithCurrentKey = make([]int, 0, n)
	mi.itersWithCurrentKeyTimestamp = make([]int, 0, n)
	return mi
}
// Close does nothing. multiIterator doesn't close the underlying iterators,
// so closing them remains the caller's job.
func (f *multiIterator) Close() {
	// No-op, multiIterator doesn't close the underlying iterators.
}
// Seek positions the iterator at the first key which is >= the provided key.
// It does so by seeking every underlying iterator to key and then selecting
// the best candidate among them.
func (f *multiIterator) Seek(key engine.MVCCKey) {
	for i := range f.iters {
		f.iters[i].Seek(key)
	}

	// All underlying iterators have been seeked to key. Pick currentIdx and
	// rebuild itersWithCurrentKey from where they now stand; this step does
	// not move any of them further.
	f.advance()
}
// Valid must be called after any call to Seek(), Next(), or similar methods.
// The possible results are:
//
//   - (true, nil): the iterator points to a valid key. Calling UnsafeKey(),
//     UnsafeValue(), or similar methods is undefined unless Valid() has
//     returned this.
//   - (false, nil): the iterator has moved past the end of the valid range.
//   - (false, err): an error has occurred.
//
// Valid() never returns true together with a non-nil error.
func (f *multiIterator) Valid() (bool, error) {
	if f.err != nil {
		return false, f.err
	}
	return f.currentIdx != invalidIdxSentinel, nil
}
// UnsafeKey returns the key of the underlying iterator currently pointed at.
// The memory is invalidated on the next call to {NextKey,Seek}.
// NOTE(review): Next also advances underlying iterators, so it presumably
// invalidates the memory as well - confirm against engine.SimpleIterator.
//
// Only call this after Valid() has returned (true, nil); when no iterator is
// selected, currentIdx is invalidIdxSentinel and indexing iters panics.
func (f *multiIterator) UnsafeKey() engine.MVCCKey {
	cur := f.iters[f.currentIdx]
	return cur.UnsafeKey()
}
// UnsafeValue returns, as a byte slice, the value of the underlying iterator
// currently pointed at. The memory is invalidated on the next call to
// {NextKey,Seek}.
// NOTE(review): Next also advances underlying iterators, so it presumably
// invalidates the memory as well - confirm against engine.SimpleIterator.
//
// Only call this after Valid() has returned (true, nil); when no iterator is
// selected, currentIdx is invalidIdxSentinel and indexing iters panics.
func (f *multiIterator) UnsafeValue() []byte {
	cur := f.iters[f.currentIdx]
	return cur.UnsafeValue()
}
// Next moves the iterator to the next key/value in the iteration. After this
// call, Valid() will be true if the iterator was not positioned at the last
// key.
func (f *multiIterator) Next() {
	// Every underlying iterator sitting on the current key and timestamp has
	// to step forward; otherwise the entries shadowed by the current one
	// would be surfaced on the following call.
	sameKeyTS := f.itersWithCurrentKeyTimestamp
	for i := range sameKeyTS {
		f.iters[sameKeyTS[i]].Next()
	}

	// Recompute currentIdx from the new positions.
	f.advance()
}
// NextKey moves the iterator to the next MVCC key. Unlike Next, which moves to
// the next version of the current key (or to the next key when positioned at
// the last version of a key), NextKey skips all remaining versions of the
// current key.
func (f *multiIterator) NextKey() {
	// Every underlying iterator sitting on the current key, at any timestamp,
	// is moved on to its next key.
	sameKey := f.itersWithCurrentKey
	for i := range sameKey {
		f.iters[sameKey[i]].NextKey()
	}

	// Recompute currentIdx from the new positions.
	f.advance()
}
// advance recomputes which underlying iterator this multiIterator points at.
// It only inspects the underlying iterators (via Valid and UnsafeKey); moving
// them is done by Seek, Next, and NextKey before they call advance.
//
// On a normal return:
//   - currentIdx is the index of the iterator whose entry sorts first (key
//     bytes first, then MVCCKey.Less), or invalidIdxSentinel if every
//     underlying iterator is exhausted. If several iterators hold exactly the
//     same key and timestamp, the one with the highest index is chosen.
//   - itersWithCurrentKey holds the index of every iterator positioned at that
//     key, at any timestamp.
//   - itersWithCurrentKeyTimestamp holds the index of every iterator
//     positioned at that key and timestamp.
//
// If an underlying iterator reports an error, the error is stored in f.err and
// advance returns immediately without updating currentIdx; Valid then reports
// the error.
func (f *multiIterator) advance() {
	// Start from empty index lists. Without this, the lists were only cleared
	// once some iterator compared strictly below the sentinel key, so they
	// kept stale indices whenever every iterator was exhausted, and a later
	// Next/NextKey would then step iterators that are no longer valid.
	f.itersWithCurrentKey = f.itersWithCurrentKey[:0]
	f.itersWithCurrentKeyTimestamp = f.itersWithCurrentKeyTimestamp[:0]

	// Loop through every iterator, storing the best next value for currentIdx
	// in proposedNextIdx as we go. If it's still invalidIdxSentinel at the end,
	// then all the underlying iterators are exhausted and so is this one.
	proposedNextIdx := invalidIdxSentinel
	for iterIdx, iter := range f.iters {
		// If this iterator is exhausted skip it (or error if it's errored).
		// TODO(dan): Iterators that are exhausted could be removed to save
		// having to check them on all future calls to advance.
		ok, err := iter.Valid()
		if err != nil {
			f.err = err
			return
		}
		if !ok {
			continue
		}

		// Fill proposedMVCCKey with the mvcc key of the current best for the
		// next value for currentIdx (or a sentinel that sorts after everything
		// if this is the first non-exhausted iterator).
		proposedMVCCKey := engine.MVCCKey{Key: keys.MaxKey}
		if proposedNextIdx != invalidIdxSentinel {
			proposedMVCCKey = f.iters[proposedNextIdx].UnsafeKey()
		}

		iterMVCCKey := iter.UnsafeKey()
		if cmp := bytes.Compare(iterMVCCKey.Key, proposedMVCCKey.Key); cmp < 0 {
			// The iterator at iterIdx has a lower key than any seen so far.
			// Make it the current best and restart both index lists, because
			// everything seen so far must have had a higher key.
			f.itersWithCurrentKey = append(f.itersWithCurrentKey[:0], iterIdx)
			f.itersWithCurrentKeyTimestamp = append(f.itersWithCurrentKeyTimestamp[:0], iterIdx)
			proposedNextIdx = iterIdx
		} else if cmp == 0 {
			// The iterator at iterIdx has the same key as the current best, add
			// it to itersWithCurrentKey and check how the timestamps compare.
			f.itersWithCurrentKey = append(f.itersWithCurrentKey, iterIdx)
			if proposedMVCCKey.Timestamp == iterMVCCKey.Timestamp {
				// We have two exactly equal mvcc keys (both key and timestamps
				// match). The one in the later iterator takes precedence and
				// the one in the earlier iterator should be omitted from
				// iteration.
				f.itersWithCurrentKeyTimestamp = append(f.itersWithCurrentKeyTimestamp, iterIdx)
				proposedNextIdx = iterIdx
			} else if iterMVCCKey.Less(proposedMVCCKey) {
				// This iterator sorts before the current best in mvcc sort
				// order, so update the current best. Only the timestamp list
				// restarts; the key is unchanged.
				f.itersWithCurrentKeyTimestamp = append(f.itersWithCurrentKeyTimestamp[:0], iterIdx)
				proposedNextIdx = iterIdx
			}
		}
		// cmp > 0: this iterator is positioned after the current best and
		// does not take part in the current position.
	}

	// NB: proposedNextIdx will still be invalidIdxSentinel here if this
	// iterator is exhausted.
	f.currentIdx = proposedNextIdx
}