From b372c77de18aef2ceea6b00ad64a3278fd91a8d1 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 25 Nov 2022 15:43:15 +0100 Subject: [PATCH 1/9] [FLINK-30202][tests] Do not assert on checkpointId Capturing the checkpointId for a generated record is impossible since the notifyCheckpointComplete notification may arrive at any time (or not at all). Instead just assert that each subtask got exactly as many records as expected, which can only happen (reliably) if the rate-limiting works as expected. --- .../source/DataGeneratorSourceITCase.java | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java index 76f5eca8494b0..3a6ad6398e3a0 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java @@ -20,11 +20,9 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -184,8 +182,9 @@ void testGatedRateLimiter() throws Exception { final GeneratorFunction generatorFunction = index -> 1L; + int numCycles = 3; 
// Allow each subtask to produce at least 3 cycles, gated by checkpoints - int count = capacityPerCycle * 3; + int count = capacityPerCycle * numCycles; final DataGeneratorSource generatorSource = new DataGeneratorSource<>( generatorFunction, @@ -195,24 +194,18 @@ void testGatedRateLimiter() throws Exception { final DataStreamSource streamSource = env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator"); - final DataStream> map = - streamSource.map(new SubtaskAndCheckpointMapper()); - final List> results = map.executeAndCollect(1000); - - final Map, Integer> collect = - results.stream() - .collect( - Collectors.groupingBy( - x -> (new Tuple2<>(x.f0, x.f1)), summingInt(x -> 1))); - for (Map.Entry, Integer> entry : collect.entrySet()) { - assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle); + final DataStream map = streamSource.map(new SubtaskMapper()); + final List results = map.executeAndCollect(1000); + + final Map collect = + results.stream().collect(Collectors.groupingBy(x -> x, summingInt(x -> 1))); + for (Map.Entry entry : collect.entrySet()) { + assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle * numCycles); } } - private static class SubtaskAndCheckpointMapper - extends RichMapFunction> implements CheckpointListener { + private static class SubtaskMapper extends RichMapFunction { - private long checkpointId = 0; private int subtaskIndex; @Override @@ -221,13 +214,8 @@ public void open(Configuration parameters) { } @Override - public Tuple2 map(Long value) { - return new Tuple2<>(subtaskIndex, checkpointId); - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - this.checkpointId = checkpointId; + public Integer map(Long value) { + return subtaskIndex; } } From 03d91f4a06df1f39a5e4da6af694eccc1b3da3a3 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Tue, 29 Nov 2022 15:19:34 +0100 Subject: [PATCH 2/9] Force first call to go through isAvailable --- 
.../source/DataGeneratorSourceITCase.java | 37 ++++++++++--------- .../ratelimit/RateLimitedSourceReader.java | 4 ++ 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java index 3a6ad6398e3a0..687db0f67039f 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java @@ -19,16 +19,19 @@ package org.apache.flink.connector.datagen.source; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.jupiter.api.Disabled; @@ -37,11 +40,9 @@ import 
org.junit.jupiter.api.extension.RegisterExtension; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.LongStream; -import static java.util.stream.Collectors.summingInt; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; @@ -194,29 +195,31 @@ void testGatedRateLimiter() throws Exception { final DataStreamSource streamSource = env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator"); - final DataStream map = streamSource.map(new SubtaskMapper()); - final List results = map.executeAndCollect(1000); + final DataStream map = streamSource.flatMap(new FirstCheckpointFilter()); + final List results = map.executeAndCollect(1000); - final Map collect = - results.stream().collect(Collectors.groupingBy(x -> x, summingInt(x -> 1))); - for (Map.Entry entry : collect.entrySet()) { - assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle * numCycles); - } + assertThat(results).hasSize(capacityPerCycle); } - private static class SubtaskMapper extends RichMapFunction { + private static class FirstCheckpointFilter + implements FlatMapFunction, CheckpointedFunction { - private int subtaskIndex; + private volatile boolean firstCheckpoint = true; @Override - public void open(Configuration parameters) { - subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + public void flatMap(Long value, Collector out) throws Exception { + if (firstCheckpoint) { + out.collect(value); + } } @Override - public Integer map(Long value) { - return subtaskIndex; + public void snapshotState(FunctionSnapshotContext context) throws Exception { + firstCheckpoint = false; } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception {} } private DataStream getGeneratorSourceStream( diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java index 403ba36200c2e..aff9b5c266eb8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java @@ -60,6 +60,10 @@ public void start() { @Override public InputStatus pollNext(ReaderOutput output) throws Exception { + if (availabilityFuture == null) { + // force isAvailable() to be called first to evaluate rate-limiting + return InputStatus.NOTHING_AVAILABLE; + } // reset future because the next record may hit the rate limit availabilityFuture = null; final InputStatus inputStatus = sourceReader.pollNext(output); From 67520f3e366f7dd6b2bdac43c5089f5f6e0c7699 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Wed, 30 Nov 2022 17:28:26 +0100 Subject: [PATCH 3/9] fix loop - the test was never calling isAvailable(), relying on the previous (bugged) behavior of rate-limiting not being enforced - The loop was difficult to understand in terms of how many records are actually being processed and was refactored accordingly - there was a series of math errors in here; 563-177=386, but 128(elementsPerCycle)*3 = 384. This was hidden by the final call to pollNext() in the while loop (emitting 1 additional record), and the final range assertion also incrementing `to` by 1.
--- .../source/DataGeneratorSourceTest.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java index 2caf93f6e08a8..95cdf70fa3c9b 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java @@ -29,7 +29,6 @@ import org.apache.flink.api.connector.source.lib.NumberSequenceSource; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputStatus; import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.SimpleUserCodeClassLoader; @@ -88,7 +87,7 @@ void testRestoreEnumerator() throws Exception { void testReaderCheckpoints() throws Exception { final long from = 177; final long mid = 333; - final long to = 563; + final long to = 561; final long elementsPerCycle = (to - from) / 3; final TestingReaderOutput out = new TestingReaderOutput<>(); @@ -99,25 +98,27 @@ void testReaderCheckpoints() throws Exception { new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid), new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to))); - long remainingInCycle = elementsPerCycle; - while (reader.pollNext(out) != InputStatus.END_OF_INPUT) { - if (--remainingInCycle <= 0) { - remainingInCycle = elementsPerCycle; - // checkpoint - List splits = reader.snapshotState(1L); - - // re-create and restore - reader = createReader(); - if (splits.isEmpty()) { - 
reader.notifyNoMoreSplits(); - } else { - reader.addSplits(splits); - } + for (int cycle = 0; cycle < 3; cycle++) { + // this call is not required but mimics what happens at runtime + reader.pollNext(out); + for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) { + reader.isAvailable().get(); + reader.pollNext(out); + } + // checkpoint + List splits = reader.snapshotState(1L); + + // re-create and restore + reader = createReader(); + if (splits.isEmpty()) { + reader.notifyNoMoreSplits(); + } else { + reader.addSplits(splits); } } final List result = out.getEmittedRecords(); - final Iterable expected = LongStream.range(from, to + 1)::iterator; + final Iterable expected = LongStream.range(from, to)::iterator; assertThat(result).containsExactlyElementsOf(expected); } From 2e3fc19a9e14cebaf5a95c9ff03a25fb63465517 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Thu, 1 Dec 2022 13:03:46 +0100 Subject: [PATCH 4/9] - more loop fixes - use 0-383 to make off-by-one error obvious (the splits included 385 values, not 384) - assert that we reach END_OF_INPUT - correctly assert all 384 elements --- .../datagen/source/DataGeneratorSourceTest.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java index 95cdf70fa3c9b..1a69daf354b2b 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.connector.source.lib.NumberSequenceSource; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.metrics.groups.SourceReaderMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.SimpleUserCodeClassLoader; @@ -85,10 +86,10 @@ void testRestoreEnumerator() throws Exception { @Test @DisplayName("Uses the underlying NumberSequenceSource correctly for checkpointing.") void testReaderCheckpoints() throws Exception { - final long from = 177; - final long mid = 333; - final long to = 561; - final long elementsPerCycle = (to - from) / 3; + final long from = 0; + final long mid = 156; + final long to = 383; + final long elementsPerCycle = (to - from + 1) / 3; final TestingReaderOutput out = new TestingReaderOutput<>(); @@ -117,8 +118,11 @@ void testReaderCheckpoints() throws Exception { } } + reader.isAvailable().get(); + assertThat(reader.pollNext(out)).isEqualTo(InputStatus.END_OF_INPUT); + final List result = out.getEmittedRecords(); - final Iterable expected = LongStream.range(from, to)::iterator; + final Iterable expected = LongStream.range(from, to + 1)::iterator; assertThat(result).containsExactlyElementsOf(expected); } From c2d400e6215ac0f2b878bfa9c8b134e8a2c17cca Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 2 Dec 2022 14:13:49 +0100 Subject: [PATCH 5/9] Add numCycles --- .../connector/datagen/source/DataGeneratorSourceTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java index 1a69daf354b2b..e5fd75f83dbe5 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java +++ 
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java @@ -86,10 +86,11 @@ void testRestoreEnumerator() throws Exception { @Test @DisplayName("Uses the underlying NumberSequenceSource correctly for checkpointing.") void testReaderCheckpoints() throws Exception { + final int numCycles = 3; final long from = 0; final long mid = 156; final long to = 383; - final long elementsPerCycle = (to - from + 1) / 3; + final long elementsPerCycle = (to - from + 1) / numCycles; final TestingReaderOutput out = new TestingReaderOutput<>(); @@ -99,7 +100,7 @@ void testReaderCheckpoints() throws Exception { new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid), new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to))); - for (int cycle = 0; cycle < 3; cycle++) { + for (int cycle = 0; cycle < numCycles; cycle++) { // this call is not required but mimics what happens at runtime reader.pollNext(out); for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) { From d3d72be6a2b7f42f6f9d861bd2b1a4d62a27d001 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 2 Dec 2022 14:13:58 +0100 Subject: [PATCH 6/9] assert number of splits --- .../connector/datagen/source/DataGeneratorSourceTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java index e5fd75f83dbe5..99287d4d74bca 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java @@ -109,6 +109,10 @@ void testReaderCheckpoints() throws Exception { } 
// checkpoint List splits = reader.snapshotState(1L); + // first cycle partially consumes the first split + // second cycle consumes the remaining first split and partially consumes the second + // third cycle consumes remaining second split + assertThat(splits).hasSize(numCycles - cycle - 1); // re-create and restore reader = createReader(); From 8640f760bf3de7afc06a82613bcb57d8970c5b0e Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 2 Dec 2022 14:14:25 +0100 Subject: [PATCH 7/9] Update DataGeneratorSourceTest.java more assertions --- .../source/DataGeneratorSourceTest.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java index 99287d4d74bca..dc6a444f734da 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java @@ -102,10 +102,19 @@ void testReaderCheckpoints() throws Exception { for (int cycle = 0; cycle < numCycles; cycle++) { // this call is not required but mimics what happens at runtime - reader.pollNext(out); + assertThat(reader.pollNext(out)) + .as( + "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable") + .isSameAs(InputStatus.NOTHING_AVAILABLE); for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) { - reader.isAvailable().get(); - reader.pollNext(out); + assertThat(reader.isAvailable()) + .as( + "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.") + .isCompleted(); 
+ assertThat(reader.pollNext(out)) + .as( + "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable") + .isSameAs(InputStatus.NOTHING_AVAILABLE); } // checkpoint List splits = reader.snapshotState(1L); @@ -123,8 +132,11 @@ void testReaderCheckpoints() throws Exception { } } - reader.isAvailable().get(); - assertThat(reader.pollNext(out)).isEqualTo(InputStatus.END_OF_INPUT); + assertThat(reader.isAvailable()) + .as( + "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.") + .isCompleted(); + assertThat(reader.pollNext(out)).isSameAs(InputStatus.END_OF_INPUT); final List result = out.getEmittedRecords(); final Iterable expected = LongStream.range(from, to + 1)::iterator; From 1493da0fbac50b0d6ffe6abc5ceafa377c61e203 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 2 Dec 2022 14:15:40 +0100 Subject: [PATCH 8/9] comments --- .../connector/datagen/source/DataGeneratorSourceTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java index dc6a444f734da..ab81be508bbea 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java @@ -111,6 +111,8 @@ void testReaderCheckpoints() throws Exception { .as( "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.") .isCompleted(); + // this never returns END_OF_INPUT because IteratorSourceReaderBase#pollNext does + // not immediately return 
END_OF_INPUT when the input is exhausted assertThat(reader.pollNext(out)) .as( "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through in SourceReader.isAvailable") @@ -132,6 +134,8 @@ void testReaderCheckpoints() throws Exception { } } + // we need to go again through isAvailable because IteratorSourceReaderBase#pollNext does + // not immediately return END_OF_INPUT when the input is exhausted assertThat(reader.isAvailable()) .as( "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.") From e52d31eaae6b392f79640043ddb5210c293e7832 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Sun, 4 Dec 2022 17:46:55 +0100 Subject: [PATCH 9/9] comments --- .../datagen/source/DataGeneratorSourceITCase.java | 15 +++++++-------- .../datagen/source/DataGeneratorSourceTest.java | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java index 687db0f67039f..842982ac5846a 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java @@ -177,20 +177,19 @@ void testGatedRateLimiter() throws Exception { env.setParallelism(PARALLELISM); - int capacityPerSubtaskPerCycle = 2; - int capacityPerCycle = // avoid rounding errors when spreading records among subtasks - PARALLELISM * capacityPerSubtaskPerCycle; + int capacityPerSubtaskPerCheckpoint = 2; + int capacityPerCheckpoint = // avoid rounding errors when spreading records among subtasks + PARALLELISM * capacityPerSubtaskPerCheckpoint; 
final GeneratorFunction generatorFunction = index -> 1L; - int numCycles = 3; - // Allow each subtask to produce at least 3 cycles, gated by checkpoints - int count = capacityPerCycle * numCycles; + // produce slightly more elements than the checkpoint-rate-limit would allow + int count = capacityPerCheckpoint + 1; final DataGeneratorSource generatorSource = new DataGeneratorSource<>( generatorFunction, count, - RateLimiterStrategy.perCheckpoint(capacityPerCycle), + RateLimiterStrategy.perCheckpoint(capacityPerCheckpoint), Types.LONG); final DataStreamSource streamSource = @@ -198,7 +197,7 @@ void testGatedRateLimiter() throws Exception { final DataStream map = streamSource.flatMap(new FirstCheckpointFilter()); final List results = map.executeAndCollect(1000); - assertThat(results).hasSize(capacityPerCycle); + assertThat(results).hasSize(capacityPerCheckpoint); } private static class FirstCheckpointFilter diff --git a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java index ab81be508bbea..ebda0c941ccfc 100644 --- a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java +++ b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java @@ -109,7 +109,7 @@ void testReaderCheckpoints() throws Exception { for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) { assertThat(reader.isAvailable()) .as( - "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.") + "There should be always data available because the test utilizes no rate-limiting strategy and splits are provided.") .isCompleted(); // this never returns END_OF_INPUT because 
IteratorSourceReaderBase#pollNext does // not immediately return END_OF_INPUT when the input is exhausted @@ -138,7 +138,7 @@ void testReaderCheckpoints() throws Exception { // not immediately return END_OF_INPUT when the input is exhausted assertThat(reader.isAvailable()) .as( - "There should be always data available because the test doesn't rely on any no rate-limiting strategy and splits are provided.") + "There should be always data available because the test utilizes no rate-limiting strategy and splits are provided.") .isCompleted(); assertThat(reader.pollNext(out)).isSameAs(InputStatus.END_OF_INPUT);