-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
watcher.go
370 lines (340 loc) · 10.9 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvnemesis
import (
"context"
"fmt"
"math"
"math/rand"
"reflect"
"strings"
"time"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
kvpb "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
// Watcher slurps all changes that happen to some span of kvs using RangeFeed.
type Watcher struct {
	env *Env
	mu  struct {
		syncutil.Mutex
		// kvs is the local copy of every write the rangefeed has delivered.
		kvs *Engine
		// frontier tracks the resolved timestamp across the watched span.
		frontier span.Frontier
		// frontierWaiters holds, per target timestamp, the channels to
		// notify once the frontier reaches that timestamp (see
		// WaitForFrontier / handleCheckpoint).
		frontierWaiters map[hlc.Timestamp][]chan error
	}
	// cancel stops the watcher goroutines; set to nil by Finish so that
	// Finish is idempotent.
	cancel func()
	// g owns the rangefeed and event-processing goroutines.
	g ctxgroup.Group
}
// Watch starts a new Watcher over the given span of kvs. See Watcher.
func Watch(ctx context.Context, env *Env, dbs []*kv.DB, dataSpan roachpb.Span) (*Watcher, error) {
	if len(dbs) < 1 {
		return nil, errors.New(`at least one db must be given`)
	}
	firstDB := dbs[0]
	w := &Watcher{
		env: env,
	}
	var err error
	if w.mu.kvs, err = MakeEngine(); err != nil {
		return nil, err
	}
	w.mu.frontier, err = span.MakeFrontier(dataSpan)
	if err != nil {
		return nil, err
	}
	w.mu.frontierWaiters = make(map[hlc.Timestamp][]chan error)
	ctx, w.cancel = context.WithCancel(ctx)
	w.g = ctxgroup.WithContext(ctx)
	// Unwrap each DB's sender down to its DistSender so we can drive
	// RangeFeed directly, rotating through the nodes on retryable errors.
	dss := make([]*kvcoord.DistSender, len(dbs))
	for i := range dbs {
		sender := dbs[i].NonTransactionalSender()
		dss[i] = sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
	}
	startTs := firstDB.Clock().Now()
	eventC := make(chan kvcoord.RangeFeedMessage, 128)
	w.g.GoCtx(func(ctx context.Context) error {
		ts := startTs
		// Round-robin over the DistSenders, restarting the feed on
		// retryable errors from wherever the frontier has reached so we
		// never re-ingest already-resolved history.
		for i := 0; ; i = (i + 1) % len(dbs) {
			w.mu.Lock()
			ts.Forward(w.mu.frontier.Frontier())
			w.mu.Unlock()
			ds := dss[i]
			// WithDiff so events carry the previous value, which
			// handleValueLocked cross-checks against the local engine.
			err := ds.RangeFeed(ctx, []roachpb.Span{dataSpan}, ts, eventC, kvcoord.WithDiff())
			if isRetryableRangeFeedErr(err) {
				log.Infof(ctx, "got retryable RangeFeed error: %+v", err)
				continue
			}
			return err
		}
	})
	w.g.GoCtx(func(ctx context.Context) error {
		return w.processEvents(ctx, eventC)
	})
	// Make sure the RangeFeed has started up, else we might lose some events.
	if err := w.WaitForFrontier(ctx, startTs); err != nil {
		// Best-effort teardown; the interesting error is the wait failure.
		_ = w.Finish()
		return nil, err
	}
	return w, nil
}
func isRetryableRangeFeedErr(err error) bool {
switch {
case errors.Is(err, context.Canceled):
return false
default:
return true
}
}
// Finish tears down the Watcher and returns all the kvs it has ingested. It may
// be called multiple times, though not concurrently.
func (w *Watcher) Finish() *Engine {
	// A nil cancel means a previous Finish already tore everything down;
	// just hand back the engine again.
	if w.cancel != nil {
		defer w.mu.frontier.Release()
		w.cancel()
		w.cancel = nil
		// Only WaitForFrontier cares about errors.
		_ = w.g.Wait()
	}
	return w.mu.kvs
}
// WaitForFrontier blocks until all kv changes <= the given timestamp are
// guaranteed to have been ingested.
func (w *Watcher) WaitForFrontier(ctx context.Context, ts hlc.Timestamp) (retErr error) {
	log.Infof(ctx, `watcher waiting for %s`, ts)
	// Temporarily shrink the closed timestamp interval so resolved
	// timestamps (and thus the frontier) advance quickly; the deferred
	// reset restores the previous setting on the way out.
	if err := w.env.SetClosedTimestampInterval(ctx, 1*time.Millisecond); err != nil {
		return err
	}
	defer func() {
		if err := w.env.ResetClosedTimestampInterval(ctx); err != nil {
			// NOTE(review): if retErr is nil here, WithSecondaryError may
			// return nil and silently drop the reset error — confirm the
			// cockroachdb/errors semantics for a nil primary.
			retErr = errors.WithSecondaryError(retErr, err)
		}
	}()
	// Buffered so handleCheckpoint's notification never blocks, even if we
	// return early below on ctx cancellation and nobody ever receives.
	resultCh := make(chan error, 1)
	w.mu.Lock()
	w.mu.frontierWaiters[ts] = append(w.mu.frontierWaiters[ts], resultCh)
	w.mu.Unlock()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-resultCh:
		return err
	}
}
// processEvents consumes rangefeed messages from eventC until the context is
// canceled (normal shutdown, returns nil) or an event fails to apply.
func (w *Watcher) processEvents(ctx context.Context, eventC chan kvcoord.RangeFeedMessage) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case event := <-eventC:
			if err := w.processOneEvent(ctx, event); err != nil {
				return err
			}
		}
	}
}

// processOneEvent applies a single rangefeed message to the Watcher's state.
func (w *Watcher) processOneEvent(ctx context.Context, event kvcoord.RangeFeedMessage) error {
	switch e := event.GetValue().(type) {
	case *kvpb.RangeFeedError:
		return e.Error.GoError()
	case *kvpb.RangeFeedValue:
		return w.handleValue(ctx, roachpb.Span{Key: e.Key}, e.Value, &e.PrevValue)
	case *kvpb.RangeFeedDeleteRange:
		// Ranged tombstone: only the span and timestamp matter downstream.
		return w.handleValue(ctx, e.Span, roachpb.Value{Timestamp: e.Timestamp}, nil /* prevV */)
	case *kvpb.RangeFeedCheckpoint:
		return w.handleCheckpoint(ctx, e.Span, e.ResolvedTS)
	case *kvpb.RangeFeedSSTable:
		return w.handleSSTable(ctx, e.Data)
	default:
		return errors.Errorf("unknown event: %T", e)
	}
}
// handleValue is the locking wrapper around handleValueLocked; see there for
// the actual ingestion logic.
func (w *Watcher) handleValue(
	ctx context.Context, span roachpb.Span, v roachpb.Value, prevV *roachpb.Value,
) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.handleValueLocked(ctx, span, v, prevV)
}
// handleValueLocked ingests a single rangefeed event — a ranged tombstone when
// span.EndKey is set, otherwise a point write — into the local Engine copy.
// For point writes with a known previous value it additionally cross-checks
// that previous value against what the Engine already holds, failing the run
// on a mismatch. w.mu must be held.
func (w *Watcher) handleValueLocked(
	ctx context.Context, span roachpb.Span, v roachpb.Value, prevV *roachpb.Value,
) error {
	var buf strings.Builder
	fmt.Fprintf(&buf, `rangefeed %s %s -> %s`, span, v.Timestamp, v.PrettyPrint())
	if prevV != nil {
		fmt.Fprintf(&buf, ` (prev %s)`, prevV.PrettyPrint())
	}
	// Previously buf was built but never emitted; log it like the matching
	// `rangefeed AddSSTable` trace in handleSSTable.
	log.Infof(ctx, "%s", buf.String())
	// TODO(dan): If the exact key+ts is put into kvs more than once, the
	// Engine will keep the last. This matches our txn semantics (if a key
	// is written in a transaction more than once, only the last is kept)
	// but it means that we won't catch it if we violate those semantics.
	// Consider first doing a Get and somehow failing if this exact key+ts
	// has previously been put with a different value.
	if len(span.EndKey) > 0 {
		// Ranged event (MVCC range tombstone).
		//
		// If we have two operations that are not atomic (i.e. aren't in a batch)
		// and they produce touching tombstones at the same timestamp, then
		// `.mu.kvs` will merge them but they wouldn't be merged in pebble, since
		// their MVCCValueHeader will contain different seqnos (and thus the value
		// isn't identical). To work around that, we put random stuff in here. This
		// is never interpreted - the seqno is only pulled out via an interceptor at
		// the rangefeed boundary, and handed to the tracker. This is merely our
		// local copy.
		//
		// See https://github.com/cockroachdb/cockroach/issues/92822.
		var vh enginepb.MVCCValueHeader
		vh.KVNemesisSeq.Set(kvnemesisutil.Seq(rand.Int63n(math.MaxUint32)))
		mvccV := storage.MVCCValue{
			MVCCValueHeader: vh,
		}
		sl, err := storage.EncodeMVCCValue(mvccV)
		if err != nil {
			return err
		}
		w.mu.kvs.DeleteRange(span.Key, span.EndKey, v.Timestamp, sl)
		return nil
	}
	// Handle a point write.
	w.mu.kvs.Put(storage.MVCCKey{Key: span.Key, Timestamp: v.Timestamp}, v.RawBytes)
	if prevV != nil {
		prevTs := v.Timestamp.Prev()
		getPrevV := w.mu.kvs.Get(span.Key, prevTs)
		// RangeFeed doesn't send the timestamps of the previous values back
		// because changefeeds don't need them. It would likely be easy to
		// implement, but would add unnecessary allocations in changefeeds,
		// which don't need them. This means we'd want to make it an option in
		// the request, which seems silly to do for only this test.
		getPrevV.Timestamp = hlc.Timestamp{}
		// Additionally, ensure that deletion tombstones and missing keys are
		// normalized as the nil slice, so that they can be matched properly
		// between the RangeFeed and the Engine.
		if len(prevV.RawBytes) == 0 {
			prevV.RawBytes = nil
		}
		if !reflect.DeepEqual(prevV, &getPrevV) {
			// Dump the engine contents to aid debugging before failing.
			log.Infof(ctx, "rangefeed mismatch\n%s", w.mu.kvs.DebugPrint(" "))
			return errors.Errorf(
				`expected (%s, %s) has previous value %s in kvs, but rangefeed has: %s`,
				span, prevTs, mustGetStringValue(getPrevV.RawBytes), mustGetStringValue(prevV.RawBytes))
		}
	}
	return nil
}
// handleCheckpoint advances the tracked frontier for the checkpointed span
// and, whenever the overall frontier moves, wakes every WaitForFrontier
// caller whose target timestamp has now been reached.
func (w *Watcher) handleCheckpoint(
	ctx context.Context, span roachpb.Span, resolvedTS hlc.Timestamp,
) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	frontierAdvanced, err := w.mu.frontier.Forward(span, resolvedTS)
	if err != nil {
		return errors.Wrapf(err, "unexpected frontier error advancing to %s@%s", span, resolvedTS)
	}
	if !frontierAdvanced {
		return nil
	}
	frontier := w.mu.frontier.Frontier()
	log.Infof(ctx, `watcher reached frontier %s lagging by %s`,
		frontier, timeutil.Since(frontier.GoTime()))
	for ts, waiters := range w.mu.frontierWaiters {
		if frontier.Less(ts) {
			// This waiter's timestamp hasn't been reached yet.
			continue
		}
		log.Infof(ctx, `watcher notifying %s`, ts)
		delete(w.mu.frontierWaiters, ts)
		for _, ch := range waiters {
			ch <- nil
		}
	}
	return nil
}
// handleSSTable ingests every point key and MVCC range key contained in an
// AddSSTable rangefeed event into the local Engine copy, and registers any
// kvnemesis sequence numbers found in the values with the env's Tracker.
func (w *Watcher) handleSSTable(ctx context.Context, data []byte) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(data) == 0 {
		return errors.AssertionFailedf("no SST data found")
	}
	// Iterate over both point and range keys across the whole keyspace; the
	// SST itself bounds what we actually see.
	iter, err := storage.NewMemSSTIterator(data, false /* verify */, storage.IterOptions{
		KeyTypes:   storage.IterKeyTypePointsAndRanges,
		LowerBound: keys.MinKey,
		UpperBound: keys.MaxKey,
	})
	if err != nil {
		return err
	}
	defer iter.Close()
	for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() {
		if ok, err := iter.Valid(); !ok {
			// err is nil on normal exhaustion, non-nil on iteration failure.
			return err
		}
		// Add range keys.
		if iter.RangeKeyChanged() {
			hasPoint, hasRange := iter.HasPointAndRange()
			if hasRange {
				// Clone so the versions survive subsequent iterator movement.
				rangeKeys := iter.RangeKeys().Clone()
				for _, v := range rangeKeys.Versions {
					mvccValue, err := storage.DecodeMVCCValue(v.Value)
					if err != nil {
						return err
					}
					// The version carries the timestamp separately; stitch it
					// back onto the value before handing it downstream.
					mvccValue.Value.Timestamp = v.Timestamp
					if seq := mvccValue.KVNemesisSeq.Get(); seq > 0 {
						w.env.Tracker.Add(rangeKeys.Bounds.Key, rangeKeys.Bounds.EndKey, v.Timestamp, seq)
					}
					if err := w.handleValueLocked(ctx, rangeKeys.Bounds, mvccValue.Value, nil); err != nil {
						return err
					}
				}
			}
			if !hasPoint { // can only happen at range key start bounds
				continue
			}
		}
		// Add point keys.
		key := iter.UnsafeKey().Clone()
		rawValue, err := iter.Value()
		if err != nil {
			return err
		}
		mvccValue, err := storage.DecodeMVCCValue(rawValue)
		if err != nil {
			return err
		}
		mvccValue.Value.Timestamp = key.Timestamp
		if seq := mvccValue.KVNemesisSeq.Get(); seq > 0 {
			// nil end key: a single-key span for the tracker.
			w.env.Tracker.Add(key.Key, nil, key.Timestamp, seq)
		}
		if err := w.handleValueLocked(ctx, roachpb.Span{Key: key.Key}, mvccValue.Value, nil); err != nil {
			return err
		}
		log.Infof(ctx, `rangefeed AddSSTable %s %s -> %s`,
			key.Key, key.Timestamp, mvccValue.Value.PrettyPrint())
	}
}