-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
fake_span_resolver.go
230 lines (204 loc) · 7.17 KB
/
fake_span_resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
// Copyright 2017 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package physicalplan
import (
"bytes"
"context"
"math/rand"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)
// avgRangesPerNode controls how finely Seek() splits a span: each Seek() call
// targets len(nodes) * avgRangesPerNode random splits on average (fewer when
// the span contains fewer candidate split keys).
const avgRangesPerNode = 5
// fakeSpanResolver is a SpanResolver that fabricates range boundaries: every
// Seek() on one of its iterators splits the requested span at randomly chosen
// keys and hands each piece to a randomly chosen node, aiming for
// avgRangesPerNode ranges per node on average.
type fakeSpanResolver struct {
	// nodes is the pool of nodes that fake ranges are assigned to.
	nodes []*roachpb.NodeDescriptor
}

// Compile-time check that *fakeSpanResolver implements SpanResolver.
var _ SpanResolver = (*fakeSpanResolver)(nil)
// NewFakeSpanResolver returns a SpanResolver that assigns randomly generated
// fake ranges to the given nodes. The nodes slice is retained as-is, not
// copied.
func NewFakeSpanResolver(nodes []*roachpb.NodeDescriptor) SpanResolver {
	resolver := &fakeSpanResolver{nodes: nodes}
	return resolver
}
// fakeRange describes one fabricated range: the keys between startKey and
// endKey are treated as being owned by the node in replica.
type fakeRange struct {
	startKey, endKey roachpb.Key
	replica          *roachpb.NodeDescriptor
}
// fakeSpanResolverIterator is the iterator handed out by
// fakeSpanResolver.NewSpanResolverIterator. Seek() fills ranges with a random
// partitioning of the requested span; Next() then walks through those ranges
// one at a time.
type fakeSpanResolverIterator struct {
// fsr is the resolver that created this iterator; it supplies the list of
// nodes that fake ranges are assigned to.
fsr *fakeSpanResolver
// the fake span resolver needs to perform scans as part of Seek(); these
// scans are performed outside of the context of a txn and with a weak
// isolation so that using the resolver doesn't introduce conflicts.
db *kv.DB
// err is the error recorded by Seek(), if any; it is what Valid() and
// Error() report.
err error
// rng drives every random choice made by Seek(): the number of splits, which
// keys are split on, and which node each range is assigned to.
rng *rand.Rand
// ranges are ordered by the key; the start key of the first one is the
// beginning of the current range and the end key of the last one is the end
// of the queried span. ranges[0] is the current range; Next() drops it to
// advance to the following one.
ranges []fakeRange
}
// NewSpanResolverIterator is part of the SpanResolver interface. The returned
// iterator scans through the DB that txn belongs to (not through txn itself)
// and draws its randomness from a fresh test RNG. optionalOracle is ignored.
func (fsr *fakeSpanResolver) NewSpanResolverIterator(
	txn *kv.Txn, optionalOracle replicaoracle.Oracle,
) SpanResolverIterator {
	// Only the generator is needed; the second return value of NewTestRand is
	// deliberately discarded.
	testRNG, _ := randutil.NewTestRand()
	it := &fakeSpanResolverIterator{
		fsr: fsr,
		db:  txn.DB(),
		rng: testRNG,
	}
	return it
}
// Seek is part of the SpanResolverIterator interface. Each Seek call generates
// a random distribution of the given span.
//
// The span is scanned to discover the keys it contains, a random subset of the
// safe split keys derived from them is chosen, and the resulting fake ranges
// are each assigned a random node from fit.fsr.nodes. If the span starts in
// the same row that the previous Seek() ended in, the first range reuses the
// previous Seek's last replica.
//
// On failure the error is stored in fit.err (surfaced through Valid() and
// Error()) and fit.ranges is left untouched. Seek panics if the resolver has
// no nodes, because rand.Intn panics for a non-positive argument.
func (fit *fakeSpanResolverIterator) Seek(
	ctx context.Context, span roachpb.Span, scanDir kvcoord.ScanDirection,
) {
	// Set aside the last range from the previous seek. Checking the length
	// (rather than nil-ness) keeps the index below safe even for an empty,
	// non-nil slice.
	var prevRange fakeRange
	if len(fit.ranges) > 0 {
		prevRange = fit.ranges[len(fit.ranges)-1]
	}

	// Scan the range and keep a list of all potential split keys. Do so using a
	// read_uncommitted scan outside of the txn to avoid undesired side effects
	// like breaking tracing and blocking on locks.
	var b kv.Batch
	b.Header.ReadConsistency = kvpb.READ_UNCOMMITTED
	if len(span.EndKey) == 0 {
		// If the EndKey is omitted, then the span represents a point request.
		// In such case we manually set the EndKey so that the Scan below
		// doesn't complain.
		span.EndKey = span.Key.Next()
	}
	b.Scan(span.Key, span.EndKey)
	if err := fit.db.Run(ctx, &b); err != nil {
		log.Errorf(ctx, "error in fake span resolver scan: %s", err)
		fit.err = err
		return
	}
	rows := b.Results[0].Rows

	// Populate splitKeys with potential split keys; all keys are strictly
	// between span.Key and span.EndKey. Consecutive rows that map to the same
	// safe split key contribute it only once.
	var splitKeys []roachpb.Key
	lastKey := span.Key
	// The loop variable is deliberately not called "kv" so that it does not
	// shadow the imported kv package.
	for _, row := range rows {
		// Extract the key for the row.
		splitKey, err := keys.EnsureSafeSplitKey(row.Key)
		if err != nil {
			fit.err = err
			return
		}
		if !splitKey.Equal(lastKey) && span.ContainsKey(splitKey) {
			splitKeys = append(splitKeys, splitKey)
			lastKey = splitKey
		}
	}

	// Generate fake splits. The number of splits is selected randomly between 0
	// and a maximum value; we want to generate
	//   x = #nodes * avgRangesPerNode
	// splits on average, so the maximum number is 2x:
	//   Expected[ rand(2x+1) ] = (0 + 1 + 2 + .. + 2x) / (2x + 1) = x.
	// The maximum is additionally capped by the number of available split keys.
	maxSplits := 2 * len(fit.fsr.nodes) * avgRangesPerNode
	if maxSplits > len(splitKeys) {
		maxSplits = len(splitKeys)
	}
	numSplits := fit.rng.Intn(maxSplits + 1)

	// Use Robert Floyd's algorithm to generate numSplits distinct integers
	// in [0, len(splitKeys)), just because it's so cool!
	chosen := make(map[int]struct{}, numSplits)
	for j := len(splitKeys) - numSplits; j < len(splitKeys); j++ {
		t := fit.rng.Intn(j + 1)
		if _, alreadyChosen := chosen[t]; !alreadyChosen {
			// Insert T.
			chosen[t] = struct{}{}
		} else {
			// Insert J.
			chosen[j] = struct{}{}
		}
	}

	// splits holds the range boundaries in key order: the span start, the
	// chosen split keys, and the span end.
	splits := make([]roachpb.Key, 0, numSplits+2)
	splits = append(splits, span.Key)
	for i := range splitKeys {
		if _, ok := chosen[i]; ok {
			splits = append(splits, splitKeys[i])
		}
	}
	splits = append(splits, span.EndKey)

	if scanDir == kvcoord.Descending {
		// Reverse the order of the splits so that ranges are produced from the
		// end of the span towards its beginning.
		// NOTE(review): after the reversal each fakeRange built below has
		// startKey > endKey; confirm that callers of Desc() expect this for
		// descending scans.
		for i := 0; i < len(splits)/2; i++ {
			j := len(splits) - i - 1
			splits[i], splits[j] = splits[j], splits[i]
		}
	}

	// Build ranges corresponding to the fake splits and assign them random
	// replicas. splits always has at least two entries, so there is always at
	// least one range.
	fit.ranges = make([]fakeRange, len(splits)-1)
	for i := range fit.ranges {
		fit.ranges[i] = fakeRange{
			startKey: splits[i],
			endKey:   splits[i+1],
			replica:  fit.fsr.nodes[fit.rng.Intn(len(fit.fsr.nodes))],
		}
	}

	// Check for the case where the last range of the previous Seek() describes
	// the same row as this seek. In this case we'll assign the same replica so we
	// don't "split" column families of the same row across different replicas.
	if prevRange.endKey != nil {
		prefix, err := keys.EnsureSafeSplitKey(span.Key)
		// EnsureSafeSplitKey returns an error for keys which do not specify a
		// column family. In this case we don't need to worry about splitting the
		// row.
		if err == nil && bytes.HasPrefix(prevRange.endKey, prefix) {
			fit.ranges[0].replica = prevRange.replica
		}
	}
}
// Valid is part of the SpanResolverIterator interface. It reports whether the
// iterator is free of a recorded error, i.e. whether Error() would return nil.
func (fit *fakeSpanResolverIterator) Valid() bool {
	failed := fit.err != nil
	return !failed
}
// Error is part of the SpanResolverIterator interface. It returns the error
// recorded by Seek(), or nil if no error has been recorded. None of the code
// visible in this file clears the error once it has been set.
func (fit *fakeSpanResolverIterator) Error() error {
return fit.err
}
// NeedAnother is part of the SpanResolverIterator interface. It reports
// whether at least one more fake range follows the current one, i.e. whether
// Next() may be called.
func (fit *fakeSpanResolverIterator) NeedAnother() bool {
	remaining := len(fit.ranges)
	return remaining >= 2
}
// Next is part of the SpanResolverIterator interface. It advances to the
// following fake range by dropping the current one from the front of
// fit.ranges. It panics when the current range is the last one (or when there
// are no ranges at all).
func (fit *fakeSpanResolverIterator) Next(_ context.Context) {
	if remaining := len(fit.ranges); remaining < 2 {
		panic("Next called with no more ranges")
	}
	fit.ranges = fit.ranges[1:]
}
// Desc is part of the SpanResolverIterator interface. It returns a range
// descriptor for the current fake range in which only StartKey and EndKey are
// populated; every other field is left at its zero value.
func (fit *fakeSpanResolverIterator) Desc() roachpb.RangeDescriptor {
	current := fit.ranges[0]
	var desc roachpb.RangeDescriptor
	desc.StartKey = roachpb.RKey(current.startKey)
	desc.EndKey = roachpb.RKey(current.endKey)
	return desc
}
// ReplicaInfo is part of the SpanResolverIterator interface. It returns a
// replica descriptor carrying only the NodeID of the node that the current
// fake range was assigned to. The boolean result is always false and the error
// is always nil.
func (fit *fakeSpanResolverIterator) ReplicaInfo(
	_ context.Context,
) (roachpb.ReplicaDescriptor, bool, error) {
	owner := fit.ranges[0].replica
	repl := roachpb.ReplicaDescriptor{NodeID: owner.NodeID}
	return repl, false, nil
}