From 71be2093ad5c626c5b6c02195b4293382af38c8c Mon Sep 17 00:00:00 2001 From: elon-X <919928110@qq.com> Date: Mon, 6 May 2024 22:19:24 +0800 Subject: [PATCH 1/5] [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish --- .../source/coordinator/SourceCoordinator.java | 7 ++- .../SourceCoordinatorAlignmentTest.java | 52 +++++++++++++++++++ .../api/operators/SourceOperator.java | 8 ++- .../SourceOperatorAlignmentTest.java | 31 +++++++++++ 4 files changed, 94 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index e64eb0424ca9a..0f0bf01410005 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -202,8 +202,11 @@ void announceCombinedWatermark() { // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule // the period task if it throws an exception). for (Integer subtaskId : subTaskIds) { - context.sendEventToSourceOperatorIfTaskReady( - subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); + // when subtask have been finished, do not send event. + if (!context.hasNoMoreSplits(subtaskId)) { + context.sendEventToSourceOperatorIfTaskReady( + subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java index d479c34f63f30..1440c4c1a2142 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java @@ -310,6 +310,53 @@ void testWatermarkAggregatorRandomly() { testWatermarkAggregatorRandomly(10, 10000, true, false); } + @Test + void testWatermarkAlignmentWhileSubtaskFinished() throws Exception { + long maxDrift = 1000L; + WatermarkAlignmentParams params = + new WatermarkAlignmentParams(maxDrift, "group1", maxDrift); + + final Source> mockSource = + createMockSource(); + + sourceCoordinator = + new SourceCoordinator>( + OPERATOR_NAME, + mockSource, + getNewSourceCoordinatorContext(), + new CoordinatorStoreImpl(), + params, + null) { + @Override + void announceCombinedWatermark() { + super.announceCombinedWatermark(); + } + }; + + sourceCoordinator.start(); + + int subtask0 = 0; + int subtask1 = 1; + + setReaderTaskReady(sourceCoordinator, subtask0, 0); + setReaderTaskReady(sourceCoordinator, subtask1, 0); + registerReader(subtask0); + registerReader(subtask1); + + reportWatermarkEvent(sourceCoordinator, subtask0, 42); + assertLatestWatermarkAlignmentEvent(subtask0, 1042); + + reportWatermarkEvent(sourceCoordinator, subtask1, 44); + assertLatestWatermarkAlignmentEvent(subtask1, 1042); + + //mock noMoreSplits event + assertHasNoMoreSplits(subtask0, true); + reportWatermarkEvent(sourceCoordinator, subtask0, Long.MAX_VALUE); + assertLatestWatermarkAlignmentEvent(subtask1, 1044); + + sourceCoordinator.close(); + } + private void testWatermarkAggregatorRandomly( int roundNumber, int keyNumber, boolean checkResult, boolean testSourceIdle) { final SourceCoordinator.WatermarkAggregator combinedWatermark = @@ -377,4 +424,9 @@ private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWater assertThat(events.get(events.size() - 1)) .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark)); } + + private void assertHasNoMoreSplits(int subtask, boolean expected) { + sourceCoordinator.getContext().signalNoMoreSplits(0); + assertThat(sourceCoordinator.getContext().hasNoMoreSplits(subtask)).isEqualTo(expected); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index d49de8c622cad..219da1ebf37c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -434,7 +434,7 @@ private DataInputStatus emitNextNotReading(DataOutput output) throws Except // introduces a small performance regression (probably because of an extra // virtual call) processingTimeService.scheduleWithFixedDelay( - this::emitLatestWatermark, + time -> emitLatestWatermark(), watermarkAlignmentParams.getUpdateInterval(), watermarkAlignmentParams.getUpdateInterval()); } @@ -507,7 +507,7 @@ private DataInputStatus convertToInternalStatus(InputStatus inputStatus) { } } - private void emitLatestWatermark(long time) { + private void emitLatestWatermark() { checkState(currentMainOutput != null); if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) { return; @@ -572,6 +572,10 @@ public void handleOperatorEvent(OperatorEvent event) { } else if (event instanceof SourceEventWrapper) { sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent()); } else if (event instanceof NoMoreSplitsEvent) { + if (watermarkAlignmentParams.isEnabled()) { + latestWatermark = Watermark.MAX_WATERMARK.getTimestamp(); + emitLatestWatermark(); + } sourceReader.notifyNoMoreSplits(); } else if (event instanceof IsProcessingBacklogEvent) { if (eventTimeLogic != null) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index db3f8f0ed1fe7..5e2b037d25236 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -27,6 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.ReportedWatermarkEvent; import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent; import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; @@ -276,6 +277,36 @@ void testReportedWatermarkDoNotDecrease() throws Exception { assertLatestReportedWatermarkEvent(record1); } + @Test + void testWatermarkAlignmentWhileSubtaskFinished() throws Exception { + operator.initializeState(context.createStateContext()); + operator.getReaderState().clear(); + operator.open(); + + MockSourceSplit newSplit = new MockSourceSplit(1, 0, 1); + int record1 = 1; + newSplit.addRecord(record1); + + operator.handleOperatorEvent( + new AddSplitEvent<>( + Collections.singletonList(newSplit), new MockSourceSplitSerializer())); + + CollectingDataOutput actualOutput = new CollectingDataOutput<>(); + List expectedOutput = new ArrayList<>(); + + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); + expectedOutput.add(record1); + assertOutput(actualOutput, expectedOutput); + + //no more split event, verify that the final watermark is emitted + operator.handleOperatorEvent(new NoMoreSplitsEvent()); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_DATA); + + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_INPUT); + context.getTimeService().advance(1); + assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp()); + } + private void assertOutput( CollectingDataOutput actualOutput, List expectedOutput) { assertThat( From 1d52c20e57deb29b97b96d1b70835a08e6e89747 Mon Sep 17 00:00:00 2001 From: elon-X <919928110@qq.com> Date: Tue, 7 May 2024 00:02:43 +0800 Subject: [PATCH 2/5] spotless-check --- .../source/coordinator/SourceCoordinatorAlignmentTest.java | 2 +- .../streaming/api/operators/SourceOperatorAlignmentTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java index 1440c4c1a2142..f0dfd97c5df77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java @@ -349,7 +349,7 @@ void announceCombinedWatermark() { reportWatermarkEvent(sourceCoordinator, subtask1, 44); assertLatestWatermarkAlignmentEvent(subtask1, 1042); - //mock noMoreSplits event + // mock noMoreSplits event assertHasNoMoreSplits(subtask0, true); reportWatermarkEvent(sourceCoordinator, subtask0, Long.MAX_VALUE); assertLatestWatermarkAlignmentEvent(subtask1, 1044); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index 5e2b037d25236..30b8fdd5063dd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -298,7 +298,7 @@ void testWatermarkAlignmentWhileSubtaskFinished() throws Exception { expectedOutput.add(record1); assertOutput(actualOutput, expectedOutput); - //no more split event, verify that the final watermark is emitted + // no more split event, verify that the final watermark is emitted operator.handleOperatorEvent(new NoMoreSplitsEvent()); assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_DATA); From 422d16dbb0e631589809c7e3454a28d6fbd0e6e2 Mon Sep 17 00:00:00 2001 From: elon-X <919928110@qq.com> Date: Mon, 3 Jun 2024 23:19:00 +0800 Subject: [PATCH 3/5] adjusted the code layout and added an ITCase --- .../api/operators/SourceOperator.java | 8 +-- ...kFinishedWithWatermarkAlignmentITCase.java | 52 +++++++++++++++++++ 2 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 219da1ebf37c6..972415c1ff0c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -449,6 +449,10 @@ private DataInputStatus emitNextNotReading(DataOutput output) throws Except sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_DATA; case DATA_FINISHED: + if (watermarkAlignmentParams.isEnabled()) { + latestWatermark = Watermark.MAX_WATERMARK.getTimestamp(); + emitLatestWatermark(); + } sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_INPUT; case WAITING_FOR_ALIGNMENT: @@ -572,10 +576,6 @@ public void handleOperatorEvent(OperatorEvent event) { } else if (event instanceof SourceEventWrapper) { sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent()); } else if (event instanceof NoMoreSplitsEvent) { - if (watermarkAlignmentParams.isEnabled()) { - latestWatermark = Watermark.MAX_WATERMARK.getTimestamp(); - emitLatestWatermark(); - } sourceReader.notifyNoMoreSplits(); } else if (event instanceof IsProcessingBacklogEvent) { if (eventTimeLogic != null) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java new file mode 100644 index 0000000000000..da7917784507a --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java @@ -0,0 +1,52 @@ +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.Test; + +import java.time.Duration; +import java.util.List; +import java.util.stream.LongStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** This ITCase class tests the behavior of task execution with watermark alignment. */ +public class SubTaskFinishedWithWatermarkAlignmentITCase { + + /** + * Test method to verify whether the watermark alignment works well with finished task. + * + * @throws Exception if any error occurs during the execution. + */ + @Test + public void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception { + // Set up the execution environment with parallelism of 2 + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + // Create a stream from a custom source with watermark strategy + DataStream stream = + env.fromSource( + new NumberSequenceSource(0, 100), + WatermarkStrategy.forMonotonousTimestamps() + .withTimestampAssigner( + (SerializableTimestampAssigner) + (aLong, l) -> aLong) + .withWatermarkAlignment( + "g1", Duration.ofMillis(10), Duration.ofSeconds(2)), + "Sequence Source") + .filter((FilterFunction) aLong -> true); + + // Execute the stream and collect the results + final List result = stream.executeAndCollect(101); + + // Assert that the collected result contains all numbers from 0 to 100 in any order + assertThat(result, containsInAnyOrder(LongStream.rangeClosed(0, 100).boxed().toArray())); + } +} From 460ffd0e9ddec983f73a16cca6279f94e37ba178 Mon Sep 17 00:00:00 2001 From: elon-X <919928110@qq.com> Date: Tue, 4 Jun 2024 23:03:04 +0800 Subject: [PATCH 4/5] fixed watermarkAlignmentITCase --- ...ase.java => WatermarkAlignmentITCase.java} | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) rename flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/{SubTaskFinishedWithWatermarkAlignmentITCase.java => WatermarkAlignmentITCase.java} (64%) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java similarity index 64% rename from flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java index da7917784507a..d17ee7a831003 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.test.streaming.api.datastream; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; @@ -7,17 +25,17 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.LongStream; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; - /** This ITCase class tests the behavior of task execution with watermark alignment. */ -public class SubTaskFinishedWithWatermarkAlignmentITCase { +public class WatermarkAlignmentITCase { /** * Test method to verify whether the watermark alignment works well with finished task. @@ -45,8 +63,10 @@ public void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception { // Execute the stream and collect the results final List result = stream.executeAndCollect(101); + Collections.sort(result); - // Assert that the collected result contains all numbers from 0 to 100 in any order - assertThat(result, containsInAnyOrder(LongStream.rangeClosed(0, 100).boxed().toArray())); + // Assert that the collected result contains all numbers from 0 to 100 + Assertions.assertIterableEquals( + result, LongStream.rangeClosed(0, 100).boxed().collect(Collectors.toList())); } } From 8ac3d8af3aefac55f13a37ed969abd79fc97a65a Mon Sep 17 00:00:00 2001 From: elon-X <919928110@qq.com> Date: Thu, 13 Jun 2024 10:55:00 +0800 Subject: [PATCH 5/5] fixed test code --- .../source/coordinator/SourceCoordinator.java | 7 +-- .../SourceCoordinatorAlignmentTest.java | 52 ------------------- .../datastream/WatermarkAlignmentITCase.java | 4 +- 3 files changed, 6 insertions(+), 57 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 0f0bf01410005..3133bbe7ce753 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -198,12 +198,13 @@ void announceCombinedWatermark() { subTaskIds, operatorName); - // Subtask maybe during deploying or restarting, so we only send WatermarkAlignmentEvent - // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule - // the period task if it throws an exception). for (Integer subtaskId : subTaskIds) { // when subtask have been finished, do not send event. if (!context.hasNoMoreSplits(subtaskId)) { + // Subtask maybe during deploying or restarting, so we only send + // WatermarkAlignmentEvent to ready task to avoid period task fail + // (Java-ThreadPoolExecutor will not schedule the period task if it throws an + // exception). context.sendEventToSourceOperatorIfTaskReady( subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java index f0dfd97c5df77..d479c34f63f30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java @@ -310,53 +310,6 @@ void testWatermarkAggregatorRandomly() { testWatermarkAggregatorRandomly(10, 10000, true, false); } - @Test - void testWatermarkAlignmentWhileSubtaskFinished() throws Exception { - long maxDrift = 1000L; - WatermarkAlignmentParams params = - new WatermarkAlignmentParams(maxDrift, "group1", maxDrift); - - final Source> mockSource = - createMockSource(); - - sourceCoordinator = - new SourceCoordinator>( - OPERATOR_NAME, - mockSource, - getNewSourceCoordinatorContext(), - new CoordinatorStoreImpl(), - params, - null) { - @Override - void announceCombinedWatermark() { - super.announceCombinedWatermark(); - } - }; - - sourceCoordinator.start(); - - int subtask0 = 0; - int subtask1 = 1; - - setReaderTaskReady(sourceCoordinator, subtask0, 0); - setReaderTaskReady(sourceCoordinator, subtask1, 0); - registerReader(subtask0); - registerReader(subtask1); - - reportWatermarkEvent(sourceCoordinator, subtask0, 42); - assertLatestWatermarkAlignmentEvent(subtask0, 1042); - - reportWatermarkEvent(sourceCoordinator, subtask1, 44); - assertLatestWatermarkAlignmentEvent(subtask1, 1042); - - // mock noMoreSplits event - assertHasNoMoreSplits(subtask0, true); - reportWatermarkEvent(sourceCoordinator, subtask0, Long.MAX_VALUE); - assertLatestWatermarkAlignmentEvent(subtask1, 1044); - - sourceCoordinator.close(); - } - private void testWatermarkAggregatorRandomly( int roundNumber, int keyNumber, boolean checkResult, boolean testSourceIdle) { final SourceCoordinator.WatermarkAggregator combinedWatermark = @@ -424,9 +377,4 @@ private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWater assertThat(events.get(events.size() - 1)) .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark)); } - - private void assertHasNoMoreSplits(int subtask, boolean expected) { - sourceCoordinator.getContext().signalNoMoreSplits(0); - assertThat(sourceCoordinator.getContext().hasNoMoreSplits(subtask)).isEqualTo(expected); - } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java index d17ee7a831003..e113345235ec8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java @@ -35,7 +35,7 @@ import java.util.stream.LongStream; /** This ITCase class tests the behavior of task execution with watermark alignment. */ -public class WatermarkAlignmentITCase { +class WatermarkAlignmentITCase { /** * Test method to verify whether the watermark alignment works well with finished task. @@ -43,7 +43,7 @@ public class WatermarkAlignmentITCase { * @throws Exception if any error occurs during the execution. */ @Test - public void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception { + void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception { // Set up the execution environment with parallelism of 2 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2);