Skip to content

Commit

Permalink
Merge 07845ec into blathers/backport-release-24.1.0-rc-123625
Browse files Browse the repository at this point in the history
  • Loading branch information
blathers-crl[bot] committed May 10, 2024
2 parents 87b0327 + 07845ec commit 4d307a0
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ go_test(
"alter_changefeed_test.go",
"avro_test.go",
"changefeed_dist_test.go",
"changefeed_processors_test.go",
"changefeed_test.go",
"csv_test.go",
"encoder_test.go",
Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,20 @@ func makeKVFeedMonitoringCfg(
func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) {
var initialHighWater hlc.Timestamp
spans = make([]roachpb.Span, 0, len(ca.spec.Watches))
for _, watch := range ca.spec.Watches {
if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) {

// Keep initialHighWater as the minimum of all InitialResolved timestamps.
// If there are any zero InitialResolved timestamps, initial scan is
// ongoing. If there are no zero InitialResolved timestamps, initial scan
// is not required.
for i, watch := range ca.spec.Watches {
spans = append(spans, watch.Span)
if i == 0 {
initialHighWater = watch.InitialResolved
continue
}
if watch.InitialResolved.Less(initialHighWater) {
initialHighWater = watch.InitialResolved
}
spans = append(spans, watch.Span)
}

ca.frontier, err = makeSchemaChangeFrontier(initialHighWater, spans...)
Expand Down
135 changes: 135 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestSetupSpansAndFrontier tests that the setupSpansAndFrontier function
// correctly sets up frontier for the changefeed aggregator frontier.
func TestSetupSpansAndFrontier(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, tc := range []struct {
name string
expectedFrontier hlc.Timestamp
watches []execinfrapb.ChangeAggregatorSpec_Watch
}{
{
name: "new initial scan",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{},
},
},
},
{
name: "incomplete initial scan with non-empty initial resolved in the middle",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 5},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
},
},
{
name: "incomplete initial scan with non-empty initial resolved in the front",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
},
},
{
name: "incomplete initial scan with empty initial resolved in the end",
expectedFrontier: hlc.Timestamp{},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{},
},
},
},
{
name: "complete initial scan",
expectedFrontier: hlc.Timestamp{WallTime: 5},
watches: []execinfrapb.ChangeAggregatorSpec_Watch{
{
Span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")},
InitialResolved: hlc.Timestamp{WallTime: 10},
},
{
Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
InitialResolved: hlc.Timestamp{WallTime: 20},
},
{
Span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")},
InitialResolved: hlc.Timestamp{WallTime: 5},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
ca := &changeAggregator{
spec: execinfrapb.ChangeAggregatorSpec{
Watches: tc.watches,
},
}
_, err := ca.setupSpansAndFrontier()
require.NoError(t, err)
require.Equal(t, tc.expectedFrontier, ca.frontier.Frontier())
})
}
}

0 comments on commit 4d307a0

Please sign in to comment.