Skip to content

Commit

Permalink
Merge pull request #101124 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-101077

release-23.1: changefeedccl: Fix TestChangefeedHandlesDrainingNodes test
  • Loading branch information
miretskiy committed Apr 10, 2023
2 parents d11c072 + f6df923 commit 843b120
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6039,7 +6039,7 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
sqlDB.Exec(t, "ALTER TABLE test.foo SCATTER")

// Create a factory which executes the CREATE CHANGEFEED statement on server 0.
// This statement should fail, but the job itself ought to be creaated.
// This statement should fail, but the job itself ought to be created.
// After some time, that job should be adopted by another node, and executed successfully.
f, closeSink := makeFeedFactory(t, randomSinkType(feedTestEnterpriseSinks), tc.Server(1), tc.ServerConn(0))
defer closeSink()
Expand All @@ -6048,22 +6048,21 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) {
feed := feed(t, f, "CREATE CHANGEFEED FOR foo")
defer closeFeed(t, feed)

// At this point, the job created by feed will fail to start running on node 0 due to draining
// registry. However, this job will be retried, and it should succeed.
// Note: This test is a bit unrealistic in that if the registry is draining, that
// means that the server is draining (i.e. being shut down). We don't do a full shutdown
// here, but we are simulating a restart by failing to start a flow the first time around.
assertPayloads(t, feed, []string{
`foo: [1]->{"after": {"k": 1, "v": 1}}`,
`foo: [2]->{"after": {"k": 2, "v": 0}}`,
`foo: [3]->{"after": {"k": 3, "v": 1}}`,
`foo: [4]->{"after": {"k": 4, "v": 0}}`,
`foo: [5]->{"after": {"k": 5, "v": 1}}`,
`foo: [6]->{"after": {"k": 6, "v": 0}}`,
`foo: [7]->{"after": {"k": 7, "v": 1}}`,
`foo: [8]->{"after": {"k": 8, "v": 0}}`,
`foo: [9]->{"after": {"k": 9, "v": 1}}`,
`foo: [10]->{"after": {"k": 10, "v": 0}}`,
jobID := feed.(cdctest.EnterpriseTestFeed).JobID()
registry := tc.Server(1).JobRegistry().(*jobs.Registry)
loadProgress := func() jobspb.Progress {
job, err := registry.LoadJob(context.Background(), jobID)
require.NoError(t, err)
return job.Progress()
}

// Wait until highwater advances.
testutils.SucceedsSoon(t, func() error {
progress := loadProgress()
if hw := progress.GetHighWater(); hw == nil || hw.IsEmpty() {
return errors.New("waiting for highwater")
}
return nil
})
}

Expand Down

0 comments on commit 843b120

Please sign in to comment.