Merge #96995 #97037
96995: changefeedccl: Fix initial scan checkpointing r=miretskiy a=miretskiy

An over-two-year-old change (#71848) that added support for checkpointing during backfills after schema changes inadvertently broke initial scan checkpointing.

Exacerbating the problem, the existing test `TestChangefeedBackfillCheckpoint` continued to pass. It passed because it looked for a checkpoint whose timestamp matched the backfill timestamp, while the bug was an incorrect initialization and use of a 0 timestamp. It just so happens that after the initial scan completes, the rangefeed starts, and the very first thing it does is emit a 0 timestamp checkpoint; the test observed that event and kept working.
This PR does not add a dedicated test because the existing tests work fine -- provided we ignore the 0 timestamp checkpoint, which this PR does in addition to addressing the root cause of the bug.
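
To illustrate the guard this adds, here is a minimal, self-contained sketch; it is not the actual changefeed code, and `resolvedSpan` / `shouldNoteResolvedSpan` are hypothetical stand-ins for `jobspb.ResolvedSpan` and the check inside `noteResolvedSpan`. The idea is simply that a resolved span with a zero timestamp -- the event the rangefeed emits right after it starts -- carries no checkpointing information and must be ignored.

```go
package main

import "fmt"

// resolvedSpan is a hypothetical stand-in for jobspb.ResolvedSpan; only the
// timestamp matters for this sketch.
type resolvedSpan struct {
	wallNanos int64 // zero means "no timestamp", as in hlc.Timestamp.IsEmpty()
}

// shouldNoteResolvedSpan mirrors the guard: a zero-timestamp resolved span is
// dropped before it can be mistaken for a real checkpoint.
func shouldNoteResolvedSpan(r resolvedSpan) bool {
	return r.wallNanos != 0
}

func main() {
	fmt.Println(shouldNoteResolvedSpan(resolvedSpan{wallNanos: 0}))                   // false: rangefeed startup event
	fmt.Println(shouldNoteResolvedSpan(resolvedSpan{wallNanos: 1676246400000000000})) // true: real progress
}
```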

Informs #96959

Release note (enterprise change): Fix a bug in changefeeds where long-running initial scans would fail to generate checkpoints. Failing to checkpoint is particularly bad if the changefeed restarts for any reason: without checkpoints, the changefeed restarts from the beginning, and in the worst case, when exporting substantially sized tables, the initial scan may have a hard time ever completing.

97037: acceptance: skip TestDockerCLI test_demo_partitioning.tcl only r=tbg a=herkolategan

Renamed `test_demo_partitioning.tcl` to `test_demo_partitioning.tcl.disabled`, which causes TestDockerCLI to skip the test file (a sketch of the filename-based selection follows below).

Refs: #96797

Reason: flaky test

Epic: None

Release note: None
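
For context, here is a tiny sketch of the filename-based selection this relies on. It is not the actual TestDockerCLI code -- the glob pattern and the directory are assumptions for illustration -- but it shows why renaming a file to `*.tcl.disabled` makes a `*.tcl` selector skip it.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// tclTestFiles sketches a filename-based selector: only files ending in ".tcl"
// match, so "test_demo_partitioning.tcl.disabled" is no longer picked up.
func tclTestFiles(dir string) ([]string, error) {
	return filepath.Glob(filepath.Join(dir, "*.tcl"))
}

func main() {
	// The directory below is a placeholder; point it at the directory that
	// holds the interactive .tcl tests.
	files, err := tclTestFiles("path/to/interactive_tests")
	if err != nil {
		panic(err)
	}
	fmt.Println(files)
}
```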

Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: Herko Lategan <herko@cockroachlabs.com>
3 people committed Feb 13, 2023
3 parents cdaf027 + 495dc98 + d5c90f7 commit 0c2cc47
Showing 4 changed files with 48 additions and 15 deletions.
40 changes: 31 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -79,9 +79,9 @@ type changeAggregator struct {
// eventConsumer consumes the event.
eventConsumer eventConsumer

// lastFlush and flushFrequency keep track of the flush frequency.
lastFlush time.Time
flushFrequency time.Duration
lastHighWaterFlush time.Time // last time high watermark was checkpointed.
flushFrequency time.Duration // how often high watermark can be checkpointed.
lastSpanFlush time.Time // last time expensive, span based checkpoint was written.

// frontier keeps track of resolved timestamps for spans along with schema change
// boundary information.
@@ -291,6 +291,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.cancel()
return
}

// Generate expensive checkpoints only after we have been running for a while.
ca.lastSpanFlush = timeutil.Now()
}

func (ca *changeAggregator) startKVFeed(
@@ -321,7 +324,6 @@ func (ca *changeAggregator) startKVFeed(
// Trying to call MoveToDraining here is racy (`MoveToDraining called in
// state stateTrailingMeta`), so return the error via a channel.
ca.errCh <- kvfeed.Run(ctx, kvfeedCfg)
ca.cancel()
}); err != nil {
// If err != nil then the RunAsyncTask closure never ran, which means we
// need to manually close ca.kvFeedDoneCh so `(*changeAggregator).close`
@@ -407,6 +409,12 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
if err != nil {
return nil, err
}
if initialHighWater.IsEmpty() {
// If we are performing an initial scan, set the frontier initialHighWater
// to the StatementTime -- this is the time at which we will be scanning spans.
// Spans that reach this time are eligible for checkpointing.
ca.frontier.initialHighWater = ca.spec.Feed.StatementTime
}

checkpointedSpanTs := ca.spec.Checkpoint.Timestamp

@@ -535,6 +543,13 @@ func (ca *changeAggregator) tick() error {
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
if resolved.Timestamp.IsEmpty() {
// @0.0 resolved timestamps can come in from rangefeed checkpoints.
// When the rangefeed starts running, it emits an @0.0 resolved timestamp.
// We don't care about those as far as checkpointing is concerned.
return nil
}

advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
if err != nil {
return err
@@ -543,18 +558,25 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE

checkpointFrontier := advanced &&
(forceFlush || timeutil.Since(ca.lastFlush) > ca.flushFrequency)
(forceFlush || timeutil.Since(ca.lastHighWaterFlush) > ca.flushFrequency)

if checkpointFrontier {
defer func() {
ca.lastHighWaterFlush = timeutil.Now()
}()
return ca.flushFrontier()
}

// At a lower frequency we checkpoint specific spans in the job progress
// either in backfills or if the highwater mark is excessively lagging behind
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, &ca.flowCtx.Cfg.Settings.SV)) &&
canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastFlush)
canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastSpanFlush)

if checkpointFrontier || checkpointSpans {
if checkpointSpans {
defer func() {
ca.lastFlush = timeutil.Now()
ca.lastSpanFlush = timeutil.Now()
}()
return ca.flushFrontier()
}
@@ -1029,7 +1051,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART) {
var err error
endTime := cf.spec.Feed.EndTime
if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime.Next()) {
if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime) {
err = pgerror.Newf(pgcode.SchemaChangeOccurred,
"schema change occurred at %v", cf.frontier.boundaryTime.Next().AsOfSystemTime())

11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -257,7 +257,8 @@ func (f *kvFeed) run(ctx context.Context) (err error) {

for i := 0; ; i++ {
initialScan := i == 0
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, rangeFeedResumeFrontier.Frontier())
initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, initialScanOnly, rangeFeedResumeFrontier.Frontier())
if err != nil {
return err
}
@@ -270,7 +271,6 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
}
}

initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
if initialScanOnly {
if err := emitResolved(f.initialHighWater, jobspb.ResolvedSpan_EXIT); err != nil {
return err
@@ -358,7 +358,7 @@ func filterCheckpointSpans(spans []roachpb.Span, completed []roachpb.Span) []roa
}

func (f *kvFeed) scanIfShould(
ctx context.Context, initialScan bool, highWater hlc.Timestamp,
ctx context.Context, initialScan bool, initialScanOnly bool, highWater hlc.Timestamp,
) ([]roachpb.Span, hlc.Timestamp, error) {
scanTime := highWater.Next()

@@ -425,11 +425,16 @@ func (f *kvFeed) scanIfShould(
defer f.onBackfillCallback()()
}

boundaryType := jobspb.ResolvedSpan_NONE
if initialScanOnly {
boundaryType = jobspb.ResolvedSpan_EXIT
}
if err := f.scanner.Scan(ctx, f.writer, scanConfig{
Spans: spansToBackfill,
Timestamp: scanTime,
WithDiff: !isInitialScan && f.withDiff,
Knobs: f.knobs,
Boundary: boundaryType,
}); err != nil {
return nil, hlc.Timestamp{}, err
}
8 changes: 5 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -39,6 +39,7 @@ type scanConfig struct {
Timestamp hlc.Timestamp
WithDiff bool
Knobs TestingKnobs
Boundary jobspb.ResolvedSpan_BoundaryType
}

type kvScanner interface {
@@ -116,7 +117,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
defer limAlloc.Release()
defer spanAlloc.Release(ctx)

err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.WithDiff, sink, cfg.Knobs)
err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.Boundary, cfg.WithDiff, sink, cfg.Knobs)
finished := atomic.AddInt64(&atomicFinished, 1)
if backfillDec != nil {
backfillDec()
@@ -134,6 +135,7 @@ func (p *scanRequestScanner) exportSpan(
ctx context.Context,
span roachpb.Span,
ts hlc.Timestamp,
boundaryType jobspb.ResolvedSpan_BoundaryType,
withDiff bool,
sink kvevent.Writer,
knobs TestingKnobs,
@@ -191,7 +193,7 @@
if res.ResumeSpan != nil {
consumed := roachpb.Span{Key: remaining.Key, EndKey: res.ResumeSpan.Key}
if err := sink.Add(
ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, jobspb.ResolvedSpan_NONE),
ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, boundaryType),
); err != nil {
return err
}
@@ -200,7 +202,7 @@
}
// p.metrics.PollRequestNanosHist.RecordValue(scanDuration.Nanoseconds())
if err := sink.Add(
ctx, kvevent.NewBackfillResolvedEvent(span, ts, jobspb.ResolvedSpan_NONE),
ctx, kvevent.NewBackfillResolvedEvent(span, ts, boundaryType),
); err != nil {
return err
}
@@ -1,5 +1,9 @@
#! /usr/bin/env expect -f

# This test is skipped (flaky test) -- its filename hides it from the selector in
# TestDockerCLI. Un-skip it by renaming it back after fixing
# https://github.com/cockroachdb/cockroach/issues/96797.

source [file join [file dirname $argv0] common.tcl]

# Set a larger timeout as partitioning can be slow.
