Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: checkpoint for lagging high-water #77763

Merged
merged 5 commits into from May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 22 additions & 22 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Expand Up @@ -30,13 +30,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -575,8 +575,8 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)

//Ensure Scan Requests are always small enough that we receive multiple
//resolved events during a backfill
// Ensure Scan Requests are always small enough that we receive multiple
// resolved events during a backfill
knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
b.Header.MaxSpanRequestKeys = 10
return nil
Expand Down Expand Up @@ -1129,18 +1129,18 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t)

rnd, _ := randutil.NewTestRand()
var maxCheckpointSize int64 = 100
var rndMu syncutil.Mutex
const maxCheckpointSize = 1 << 20
const numRowsPerTable = 1000

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo(val INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo (val) SELECT * FROM generate_series(0, 999)`)
sqlDB.Exec(t, `INSERT INTO foo (val) SELECT * FROM generate_series(0, $1)`, numRowsPerTable-1)

sqlDB.Exec(t, `CREATE TABLE bar(val INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO bar (val) SELECT * FROM generate_series(0, 999)`)
sqlDB.Exec(t, `INSERT INTO bar (val) SELECT * FROM generate_series(0, $1)`, numRowsPerTable-1)

fooDesc := desctestutils.TestingGetPublicTableDescriptor(
f.Server().DB(), keys.SystemSQLCodec, "d", "foo")
Expand All @@ -1153,14 +1153,19 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
// Ensure Scan Requests are always small enough that we receive multiple
// resolved events during a backfill
knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
rndMu.Lock()
defer rndMu.Unlock()
b.Header.MaxSpanRequestKeys = 1 + rnd.Int63n(100)
return nil
}

// Emit resolved events for majority of spans. Be extra paranoid and ensure that
// Emit resolved events for the majority of spans. Be extra paranoid and ensure that
// we have at least 1 span for which we don't emit a resolved timestamp (to force checkpointing).
haveGaps := false
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
rndMu.Lock()
defer rndMu.Unlock()

if r.Span.Equal(fooTableSpan) {
// Do not emit resolved events for the entire table span.
// We "simulate" large table by splitting single table span into many parts, so
Expand All @@ -1186,23 +1191,16 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
registry := f.Server().JobRegistry().(*jobs.Registry)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)

// Kafka feeds are not buffered, so we have to consume messages.
g := ctxgroup.WithContext(context.Background())
g.Go(func() error {
expectedValues := make([]string, 1000)
for j := 0; j <= 999; j++ {
// Kafka feeds are not buffered, so we have to consume messages.
// We just want to ensure that eventually, we get all the rows from foo and bar.
expectedValues := make([]string, 2*numRowsPerTable)
for j := 0; j < numRowsPerTable; j++ {
expectedValues[j] = fmt.Sprintf(`foo: [%d]->{"after": {"val": %d}}`, j, j)
expectedValues[j+numRowsPerTable] = fmt.Sprintf(`bar: [%d]->{"after": {"val": %d}}`, j, j)
}
err := assertPayloadsBaseErr(testFeed, expectedValues, false, false)
if err != nil {
return err
}

for j := 0; j <= 999; j++ {
expectedValues[j] = fmt.Sprintf(`bar: [%d]->{"after": {"val": %d}}`, j, j)
}
err = assertPayloadsBaseErr(testFeed, expectedValues, false, false)
return err
return assertPayloadsBaseErr(testFeed, expectedValues, false, false)
})

defer func() {
Expand Down Expand Up @@ -1269,6 +1267,8 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
require.NotNil(t, progress.GetChangefeed())
require.Equal(t, 0, len(progress.GetChangefeed().Checkpoint.Spans))

require.NoError(t, jobFeed.Pause())

// Verify that none of the resolvedFoo spans after resume were checkpointed.
for _, sp := range resolvedFoo {
require.Falsef(t, checkpoint.Contains(sp.Key), "span should not have been resolved: %s", sp)
Expand Down