From f6df923c5452b76775b72e4a7e57101a040a2b17 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Mon, 10 Apr 2023 14:31:10 +0000 Subject: [PATCH] changefeedccl: Fix TestChangefeedHandlesDrainingNodes test The test becamse flaky after #100476 merged Fixes #100903 Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 33 ++++++++++++------------ 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 81d20ad2072b..3e979d35924f 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6039,7 +6039,7 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { sqlDB.Exec(t, "ALTER TABLE test.foo SCATTER") // Create a factory which executes the CREATE CHANGEFEED statement on server 0. - // This statement should fail, but the job itself ought to be creaated. + // This statement should fail, but the job itself ought to be created. // After some time, that job should be adopted by another node, and executed successfully. f, closeSink := makeFeedFactory(t, randomSinkType(feedTestEnterpriseSinks), tc.Server(1), tc.ServerConn(0)) defer closeSink() @@ -6048,22 +6048,21 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { feed := feed(t, f, "CREATE CHANGEFEED FOR foo") defer closeFeed(t, feed) - // At this point, the job created by feed will fail to start running on node 0 due to draining - // registry. However, this job will be retried, and it should succeed. - // Note: This test is a bit unrealistic in that if the registry is draining, that - // means that the server is draining (i.e. being shut down). We don't do a full shutdown - // here, but we are simulating a restart by failing to start a flow the first time around. - assertPayloads(t, feed, []string{ - `foo: [1]->{"after": {"k": 1, "v": 1}}`, - `foo: [2]->{"after": {"k": 2, "v": 0}}`, - `foo: [3]->{"after": {"k": 3, "v": 1}}`, - `foo: [4]->{"after": {"k": 4, "v": 0}}`, - `foo: [5]->{"after": {"k": 5, "v": 1}}`, - `foo: [6]->{"after": {"k": 6, "v": 0}}`, - `foo: [7]->{"after": {"k": 7, "v": 1}}`, - `foo: [8]->{"after": {"k": 8, "v": 0}}`, - `foo: [9]->{"after": {"k": 9, "v": 1}}`, - `foo: [10]->{"after": {"k": 10, "v": 0}}`, + jobID := feed.(cdctest.EnterpriseTestFeed).JobID() + registry := tc.Server(1).JobRegistry().(*jobs.Registry) + loadProgress := func() jobspb.Progress { + job, err := registry.LoadJob(context.Background(), jobID) + require.NoError(t, err) + return job.Progress() + } + + // Wait until highwater advances. + testutils.SucceedsSoon(t, func() error { + progress := loadProgress() + if hw := progress.GetHighWater(); hw == nil || hw.IsEmpty() { + return errors.New("waiting for highwater") + } + return nil }) }