From 87072bdee1da392386c3bd26b740f980ffee4017 Mon Sep 17 00:00:00 2001 From: Anton Kalashnikov Date: Thu, 22 Apr 2021 17:22:27 +0200 Subject: [PATCH] [FLINK-22379][runtime] CheckpointCoordinator checks the state of all subtasks before triggering the checkpoint --- .../sink/writer/FileSinkMigrationITCase.java | 73 +++++++++++- .../state/api/SavepointReaderITTestBase.java | 22 +++- .../api/SavepointReaderKeyedStateITCase.java | 22 +++- .../api/SavepointWindowReaderITCase.java | 108 ++++++++++-------- .../flink/state/api/utils/OperatorLatch.java | 52 +++++++++ .../state/api/utils/SavepointTestBase.java | 35 +++--- .../state/api/utils/WaitingFunction.java | 25 ++++ .../flink/state/api/utils/WaitingSource.java | 6 +- .../api/utils/WaitingWindowAssigner.java | 72 ++++++++++++ .../DefaultCheckpointPlanCalculator.java | 2 +- .../CheckpointCoordinatorTestingUtils.java | 2 +- .../DefaultCheckpointPlanCalculatorTest.java | 51 +++++++-- 12 files changed, 377 insertions(+), 93 deletions(-) create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/OperatorLatch.java create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingFunction.java create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingWindowAssigner.java diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java index 6d0f14ba324e7..eb6a09e68bd61 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java @@ -20,6 +20,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -37,6 +40,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -48,6 +52,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.Serializable; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -142,7 +147,7 @@ private JobGraph createStreamingFileSinkJobGraph(String outputPath) { env.addSource(new StatefulSource(true, latchId)) .uid(SOURCE_UID) .setParallelism(NUM_SOURCES) - .addSink(sink) + .addSink(new WaitingRunningSink<>(latchId, sink)) .setParallelism(NUM_SINKS) .uid(SINK_UID); return env.getStreamGraph().getJobGraph(); @@ -200,6 +205,68 @@ 
private void loadSavepointAndExecute(
         }
     }
 
+    private static class WaitingRunningSink<T>
+            implements RichFunction,
+                    Serializable,
+                    SinkFunction<T>,
+                    CheckpointedFunction,
+                    CheckpointListener {
+        private final String latchId;
+        private final StreamingFileSink<T> streamingFileSink;
+
+        /**
+         * Creates a sink that counts down the savepoint latch for every record before
+         * delegating the call to the given {@link StreamingFileSink}.
+         */
+        protected WaitingRunningSink(String latchId, StreamingFileSink<T> streamingFileSink) {
+            this.latchId = latchId;
+            this.streamingFileSink = streamingFileSink;
+        }
+
+        public void setRuntimeContext(RuntimeContext t) {
+            streamingFileSink.setRuntimeContext(t);
+        }
+
+        public RuntimeContext getRuntimeContext() {
+            return streamingFileSink.getRuntimeContext();
+        }
+
+        public IterationRuntimeContext getIterationRuntimeContext() {
+            return streamingFileSink.getIterationRuntimeContext();
+        }
+
+        public void open(Configuration parameters) throws Exception {
+            streamingFileSink.open(parameters);
+        }
+
+        public void close() throws Exception {
+            streamingFileSink.close();
+        }
+
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            streamingFileSink.initializeState(context);
+        }
+
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            streamingFileSink.notifyCheckpointComplete(checkpointId);
+        }
+
+        public void notifyCheckpointAborted(long checkpointId) {
+            streamingFileSink.notifyCheckpointAborted(checkpointId);
+        }
+
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            streamingFileSink.snapshotState(context);
+        }
+
+        @Override
+        public void invoke(T value, Context context) throws Exception {
+            SAVEPOINT_LATCH_MAP.get(latchId).countDown();
+
+            streamingFileSink.invoke(value, context);
+        }
+    }
+
     private static class StatefulSource extends RichParallelSourceFunction<Integer>
             implements CheckpointedFunction, CheckpointListener {
 
@@ -237,10 +304,6 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
     public void run(SourceContext<Integer> ctx) throws Exception {
         if (takingSavepointMode) {
             sendRecordsUntil(NUM_RECORDS / 3, 0, ctx);
-
-            CountDownLatch latch = SAVEPOINT_LATCH_MAP.get(latchId);
-            latch.countDown();
-
             sendRecordsUntil(NUM_RECORDS / 2, 100, ctx);
 
             while (true) {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
index ecaa3c3cff4f3..28e795f5c4ece 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java
@@ -32,6 +32,8 @@
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.state.api.utils.OperatorLatch;
+import org.apache.flink.state.api.utils.WaitingFunction;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -88,14 +90,15 @@ public void testOperatorStateInputFormat() throws Exception {
 
         DataStream<Integer> data = streamEnv.addSource(new SavepointSource()).rebalance();
 
+        StatefulOperator
statefulOperator = new StatefulOperator(list, union, broadcast); data.connect(data.broadcast(broadcast)) - .process(new StatefulOperator(list, union, broadcast)) + .process(statefulOperator) .uid(UID) .addSink(new DiscardingSink<>()); JobGraph jobGraph = streamEnv.getStreamGraph().getJobGraph(); - String savepoint = takeSavepoint(jobGraph); + String savepoint = takeSavepoint(jobGraph, statefulOperator); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); @@ -165,7 +168,8 @@ private void verifyBroadcastState(String path, ExecutionEnvironment batchEnv) th broadcastStateValues); } - private String takeSavepoint(JobGraph jobGraph) throws Exception { + private String takeSavepoint(JobGraph jobGraph, WaitingFunction waitingFunction) + throws Exception { SavepointSource.initializeForTest(); ClusterClient client = miniClusterResource.getClusterClient(); @@ -178,6 +182,7 @@ private String takeSavepoint(JobGraph jobGraph) throws Exception { try { JobID jobID = client.submitJob(jobGraph).get(); + waitingFunction.await(); boolean finished = false; while (deadline.hasTimeLeft()) { if (SavepointSource.isFinished()) { @@ -249,8 +254,8 @@ private static List getElements() { } private static class StatefulOperator extends BroadcastProcessFunction - implements CheckpointedFunction { - + implements CheckpointedFunction, WaitingFunction { + private final OperatorLatch startLatch = new OperatorLatch(); private final ListStateDescriptor list; private final ListStateDescriptor union; private final MapStateDescriptor broadcast; @@ -278,6 +283,8 @@ public void open(Configuration parameters) { @Override public void processElement(Integer value, ReadOnlyContext ctx, Collector out) { + startLatch.trigger(); + elements.add(value); } @@ -304,5 +311,10 @@ public void initializeState(FunctionInitializationContext context) throws Except unionState = context.getOperatorStateStore().getUnionListState(union); } + + @Override + public void await() throws RuntimeException { + startLatch.await(); + } } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java index 99576edb307d8..d66885427de55 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java @@ -25,7 +25,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.state.api.functions.KeyedStateReaderFunction; +import org.apache.flink.state.api.utils.OperatorLatch; import org.apache.flink.state.api.utils.SavepointTestBase; +import org.apache.flink.state.api.utils.WaitingFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; @@ -58,17 +60,17 @@ public abstract class SavepointReaderKeyedStateITCase public void testUserKeyedStateReader() throws Exception { String savepointPath = takeSavepoint( - elements, - source -> { + new KeyedStatefulOperator(), + process -> { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(getStateBackend()); 
env.setParallelism(4); - env.addSource(source) + env.addSource(createSource(elements)) .rebalance() .keyBy(id -> id.key) - .process(new KeyedStatefulOperator()) + .process(process) .uid(uid) .addSink(new DiscardingSink<>()); @@ -86,8 +88,9 @@ public void testUserKeyedStateReader() throws Exception { "Unexpected results from keyed state", expected, new HashSet<>(results)); } - private static class KeyedStatefulOperator extends KeyedProcessFunction { - + private static class KeyedStatefulOperator extends KeyedProcessFunction + implements WaitingFunction { + private final OperatorLatch startLatch = new OperatorLatch(); private transient ValueState state; @Override @@ -97,12 +100,19 @@ public void open(Configuration parameters) { @Override public void processElement(Pojo value, Context ctx, Collector out) throws Exception { + startLatch.trigger(); + state.update(value.state); value.eventTimeTimer.forEach(timer -> ctx.timerService().registerEventTimeTimer(timer)); value.processingTimeTimer.forEach( timer -> ctx.timerService().registerProcessingTimeTimer(timer)); } + + @Override + public void await() throws RuntimeException { + startLatch.await(); + } } private static class Reader extends KeyedStateReaderFunction { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java index 79e9f6d7d9f75..f8927629d1e36 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java @@ -30,13 +30,17 @@ import org.apache.flink.state.api.utils.AggregateSum; import org.apache.flink.state.api.utils.ReduceSum; import org.apache.flink.state.api.utils.SavepointTestBase; +import org.apache.flink.state.api.utils.WaitingWindowAssigner; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -62,20 +66,21 @@ public abstract class SavepointWindowReaderITCase public void testReduceWindowStateReader() throws Exception { String savepointPath = takeSavepoint( - numbers, - source -> { + WaitingWindowAssigner.wrap( + TumblingEventTimeWindows.of(Time.milliseconds(10))), + windowAssigner -> { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(getStateBackend()); env.setParallelism(4); - env.addSource(source) + env.addSource(createSource(numbers)) .rebalance() .assignTimestampsAndWatermarks( WatermarkStrategy.noWatermarks() 
.withTimestampAssigner((event, timestamp) -> 0)) .keyBy(id -> id) - .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .window(windowAssigner) .reduce(new ReduceSum()) .uid(uid) .addSink(new DiscardingSink<>()); @@ -102,20 +107,21 @@ public void testReduceWindowStateReader() throws Exception { public void testReduceEvictorWindowStateReader() throws Exception { String savepointPath = takeSavepoint( - numbers, - source -> { + WaitingWindowAssigner.wrap( + TumblingEventTimeWindows.of(Time.milliseconds(10))), + windowAssigner -> { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(getStateBackend()); env.setParallelism(4); - env.addSource(source) + env.addSource(createSource(numbers)) .rebalance() .assignTimestampsAndWatermarks( WatermarkStrategy.noWatermarks() .withTimestampAssigner((event, timestamp) -> 0)) .keyBy(id -> id) - .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .window(windowAssigner) .evictor(new NoOpEvictor<>()) .reduce(new ReduceSum()) .uid(uid) @@ -144,20 +150,21 @@ public void testReduceEvictorWindowStateReader() throws Exception { public void testAggregateWindowStateReader() throws Exception { String savepointPath = takeSavepoint( - numbers, - source -> { + WaitingWindowAssigner.wrap( + TumblingEventTimeWindows.of(Time.milliseconds(10))), + windowAssigner -> { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(getStateBackend()); env.setParallelism(4); - env.addSource(source) + env.addSource(createSource(numbers)) .rebalance() .assignTimestampsAndWatermarks( WatermarkStrategy.noWatermarks() .withTimestampAssigner((event, timestamp) -> 0)) .keyBy(id -> id) - .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .window(windowAssigner) .aggregate(new AggregateSum()) .uid(uid) .addSink(new DiscardingSink<>()); @@ -184,20 +191,21 @@ public void testAggregateWindowStateReader() throws Exception { public void testAggregateEvictorWindowStateReader() throws Exception { String savepointPath = takeSavepoint( - numbers, - source -> { + WaitingWindowAssigner.wrap( + TumblingEventTimeWindows.of(Time.milliseconds(10))), + windowAssigner -> { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(getStateBackend()); env.setParallelism(4); - env.addSource(source) + env.addSource(createSource(numbers)) .rebalance() .assignTimestampsAndWatermarks( WatermarkStrategy.noWatermarks() .withTimestampAssigner((event, timestamp) -> 0)) .keyBy(id -> id) - .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .window(windowAssigner) .evictor(new NoOpEvictor<>()) .aggregate(new AggregateSum()) .uid(uid) @@ -226,20 +234,21 @@ public void testAggregateEvictorWindowStateReader() throws Exception { public void testProcessWindowStateReader() throws Exception { String savepointPath = takeSavepoint( - numbers, - source -> { + WaitingWindowAssigner.wrap( + TumblingEventTimeWindows.of(Time.milliseconds(10))), + windowAssigner -> { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(getStateBackend()); env.setParallelism(4); - env.addSource(source) + env.addSource(createSource(numbers)) .rebalance() .assignTimestampsAndWatermarks( WatermarkStrategy.noWatermarks() .withTimestampAssigner((event, timestamp) -> 0)) .keyBy(id -> id) - .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .window(windowAssigner) .process(new NoOpProcessWindowFunction()) 
                                 .uid(uid)
                                 .addSink(new DiscardingSink<>());
@@ -266,20 +275,21 @@ public void testProcessEvictorWindowStateReader() throws Exception {
         String savepointPath =
                 takeSavepoint(
-                        numbers,
-                        source -> {
+                        WaitingWindowAssigner.wrap(
+                                TumblingEventTimeWindows.of(Time.milliseconds(10))),
+                        windowAssigner -> {
                             StreamExecutionEnvironment env =
                                     StreamExecutionEnvironment.getExecutionEnvironment();
                             env.setStateBackend(getStateBackend());
                             env.setParallelism(4);
 
-                            env.addSource(source)
+                            env.addSource(createSource(numbers))
                                     .rebalance()
                                     .assignTimestampsAndWatermarks(
                                             WatermarkStrategy.noWatermarks()
                                                     .withTimestampAssigner((event, timestamp) -> 0))
                                     .keyBy(id -> id)
-                                    .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
+                                    .window(windowAssigner)
                                     .evictor(new NoOpEvictor<>())
                                     .process(new NoOpProcessWindowFunction())
                                     .uid(uid)
@@ -308,20 +318,21 @@ public void testApplyWindowStateReader() throws Exception {
         String savepointPath =
                 takeSavepoint(
-                        numbers,
-                        source -> {
+                        WaitingWindowAssigner.wrap(
+                                TumblingEventTimeWindows.of(Time.milliseconds(10))),
+                        windowAssigner -> {
                             StreamExecutionEnvironment env =
                                     StreamExecutionEnvironment.getExecutionEnvironment();
                             env.setStateBackend(getStateBackend());
                             env.setParallelism(4);
 
-                            env.addSource(source)
+                            env.addSource(createSource(numbers))
                                     .rebalance()
                                     .assignTimestampsAndWatermarks(
                                             WatermarkStrategy.noWatermarks()
                                                     .withTimestampAssigner((event, timestamp) -> 0))
                                     .keyBy(id -> id)
-                                    .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
+                                    .window(windowAssigner)
                                     .apply(new NoOpWindowFunction())
                                     .uid(uid)
                                     .addSink(new DiscardingSink<>());
@@ -348,24 +359,30 @@ public void testApplyEvictorWindowStateReader() throws Exception {
         String savepointPath =
                 takeSavepoint(
-                        numbers,
-                        source -> {
+                        WaitingWindowAssigner.wrap(
+                                TumblingEventTimeWindows.of(Time.milliseconds(10))),
+                        windowAssigner -> {
                             StreamExecutionEnvironment env =
                                     StreamExecutionEnvironment.getExecutionEnvironment();
                             env.setStateBackend(getStateBackend());
                             env.setParallelism(4);
 
-                            env.addSource(source)
-                                    .rebalance()
-                                    .assignTimestampsAndWatermarks(
-                                            WatermarkStrategy.noWatermarks()
-                                                    .withTimestampAssigner((event, timestamp) -> 0))
-                                    .keyBy(id -> id)
-                                    .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
-                                    .evictor(new NoOpEvictor<>())
-                                    .apply(new NoOpWindowFunction())
-                                    .uid(uid)
-                                    .addSink(new DiscardingSink<>());
+                            try {
+                                env.addSource(createSource(numbers))
+                                        .rebalance()
+                                        .assignTimestampsAndWatermarks(
+                                                WatermarkStrategy.noWatermarks()
+                                                        .withTimestampAssigner(
+                                                                (event, timestamp) -> 0))
+                                        .keyBy(id -> id)
+                                        .window(windowAssigner)
+                                        .evictor(new NoOpEvictor<>())
+                                        .apply(new NoOpWindowFunction())
+                                        .uid(uid)
+                                        .addSink(new DiscardingSink<>());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
 
                             return env;
                         });
@@ -375,7 +392,7 @@ public void testApplyEvictorWindowStateReader() throws Exception {
         List<Integer> results =
                 savepoint
-                        .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
+                        .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
                         .evictor()
                         .process(uid, new BasicReaderFunction(), Types.INT, Types.INT, Types.INT)
                         .collect();
@@ -390,17 +407,18 @@ public void testWindowTriggerStateReader() throws Exception {
         String savepointPath =
                 takeSavepoint(
-                        numbers,
-                        source -> {
+                        WaitingWindowAssigner.wrap(GlobalWindows.create()),
+                        windowAssigner -> {
                             StreamExecutionEnvironment env =
                                     StreamExecutionEnvironment.getExecutionEnvironment();
                             env.setStateBackend(getStateBackend());
                             env.setParallelism(4);
 
-                            env.addSource(source)
+                            env.addSource(createSource(numbers))
                                     .rebalance()
                                     .keyBy(id -> id)
-                                    .countWindow(10)
+                                    .window(windowAssigner)
+                                    .trigger(PurgingTrigger.of(CountTrigger.of(10)))
                                     .reduce(new ReduceSum())
                                     .uid(uid)
                                     .addSink(new DiscardingSink<>());
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/OperatorLatch.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/OperatorLatch.java
new file mode 100644
index 0000000000000..8da28436046ef
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/OperatorLatch.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.utils;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link org.apache.flink.core.testutils.OneShotLatch} which is able to survive
+ * serialization/deserialization.
+ */
+public class OperatorLatch implements Serializable {
+    private static final Map<String, OneShotLatch> guards = new ConcurrentHashMap<>();
+
+    private final String guardId = UUID.randomUUID().toString();
+
+    public OperatorLatch() {
+        guards.put(guardId, new OneShotLatch());
+    }
+
+    public void trigger() {
+        guards.get(guardId).trigger();
+    }
+
+    public void await() throws RuntimeException {
+        try {
+            guards.get(guardId).await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to await operator", e);
+        }
+    }
+}
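Reviewer note on the file above: `OperatorLatch` survives serialization because the `OneShotLatch` instances live in a static, JVM-wide map and only the `guardId` string travels with the serialized function, so every deserialized copy resolves back to the same latch. This only works in single-JVM MiniCluster tests. A minimal usage sketch — the `LatchedMap` class below is illustrative and not part of this patch:

```java
import org.apache.flink.api.common.functions.RichMapFunction;

// Hypothetical operator: triggers its latch on the first processed element so
// the test driver can block in await() until the subtask is demonstrably running.
class LatchedMap extends RichMapFunction<Integer, Integer> implements WaitingFunction {
    private final OperatorLatch startLatch = new OperatorLatch();

    @Override
    public Integer map(Integer value) {
        startLatch.trigger(); // idempotent: OneShotLatch ignores repeated triggers
        return value;
    }

    @Override
    public void await() throws RuntimeException {
        startLatch.await(); // blocks the calling (test) thread, not the task
    }
}
```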
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
index caf2a06781545..30291a252f7ad 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java
@@ -40,23 +40,15 @@
 /** A test base that includes utilities for taking a savepoint. */
 public abstract class SavepointTestBase extends AbstractTestBase {
 
-    public <T> String takeSavepoint(
-            T[] data, Function<SourceFunction<T>, StreamExecutionEnvironment> jobGraphFactory)
-            throws Exception {
-        return takeSavepoint(Arrays.asList(data), jobGraphFactory);
-    }
-
-    public <T> String takeSavepoint(
-            Collection<T> data,
-            Function<SourceFunction<T>, StreamExecutionEnvironment> jobGraphFactory)
-            throws Exception {
+    public <T extends WaitingFunction> String takeSavepoint(
+            T waitingFunction, Function<T, StreamExecutionEnvironment> jobGraphFactory) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().disableClosureCleaner();
 
-        WaitingSource<T> waitingSource = createSource(data);
+        StreamExecutionEnvironment executionEnvironment = jobGraphFactory.apply(waitingFunction);
+        JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
 
-        JobGraph jobGraph = jobGraphFactory.apply(waitingSource).getStreamGraph().getJobGraph();
         JobID jobId = jobGraph.getJobID();
         ClusterClient<?> client = miniClusterResource.getClusterClient();
 
@@ -64,7 +56,7 @@ public <T> String takeSavepoint(
         try {
             JobID jobID = client.submitJob(jobGraph).get();
 
-            return CompletableFuture.runAsync(waitingSource::awaitSource)
+            return CompletableFuture.runAsync(waitingFunction::await)
                     .thenCompose(ignore -> triggerSavepoint(client, jobID))
                     .get(5, TimeUnit.MINUTES);
         } catch (Exception e) {
@@ -74,16 +66,25 @@ public <T> String takeSavepoint(
         }
     }
 
-    private <T> WaitingSource<T> createSource(Collection<T> data) throws Exception {
+    public <T> WaitingSource<T> createSource(T[] data) {
+        return createSource(Arrays.asList(data));
+    }
+
+    public <T> WaitingSource<T> createSource(Collection<T> data) {
         T first = data.iterator().next();
         if (first == null) {
             throw new IllegalArgumentException("Collection must not contain null elements");
         }
 
         TypeInformation<T> typeInfo = TypeExtractor.getForObject(first);
-        SourceFunction<T> inner =
-                new FromElementsFunction<>(typeInfo.createSerializer(new ExecutionConfig()), data);
-        return new WaitingSource<>(inner, typeInfo);
+        try {
+            SourceFunction<T> inner =
+                    new FromElementsFunction<>(
+                            typeInfo.createSerializer(new ExecutionConfig()), data);
+            return new WaitingSource<>(inner, typeInfo);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private CompletableFuture<String> triggerSavepoint(ClusterClient<?> client, JobID jobID)
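The reshaped `takeSavepoint` above encodes the ordering this PR needs: build the job around a `WaitingFunction`, submit it, block on `await()` until that function has demonstrably started, and only then trigger the savepoint — satisfying the stricter all-subtasks-running check introduced in `DefaultCheckpointPlanCalculator` further down. A hedged caller sketch; `elements`, `uid`, and `KeyedStatefulOperator` stand in for the real ITCase members shown earlier:

```java
// Sketch of a caller inside a test extending SavepointTestBase.
String savepointPath =
        takeSavepoint(
                new KeyedStatefulOperator(), // any WaitingFunction implementation works
                process -> {
                    StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment();
                    env.addSource(createSource(elements))
                            .rebalance()
                            .keyBy(id -> id.key)
                            .process(process)
                            .uid(uid)
                            .addSink(new DiscardingSink<>());
                    return env; // the base class derives the JobGraph from this
                });
```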
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingFunction.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingFunction.java
new file mode 100644
index 0000000000000..980924d7e0d0d
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.utils;
+
+/** Interface which allows awaiting a certain action inside a function. */
+public interface WaitingFunction {
+    /** This method blocks until the function has been called. */
+    void await() throws RuntimeException;
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java
index b11c342a8aa1d..c8dc51a679588 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java
@@ -38,7 +38,8 @@
  * @param <T> The output type of the inner source.
  */
 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-public class WaitingSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
+public class WaitingSource<T> extends RichSourceFunction<T>
+        implements ResultTypeQueryable<T>, WaitingFunction {
 
     private static final Map<String, OneShotLatch> guards = new HashMap<>();
 
@@ -105,7 +106,8 @@ public void cancel() {
     }
 
     /** This method blocks until the inner source has completed. */
-    public void awaitSource() throws RuntimeException {
+    @Override
+    public void await() throws RuntimeException {
         try {
             guards.get(guardId).await();
         } catch (InterruptedException e) {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingWindowAssigner.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingWindowAssigner.java
new file mode 100644
index 0000000000000..a6df050d7b267
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingWindowAssigner.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.Collection;
+
+/** A wrapper class that allows threads to block until the inner window assigner starts. */
+public class WaitingWindowAssigner<W extends Window> extends WindowAssigner<Object, W>
+        implements WaitingFunction {
+    private final OperatorLatch startLatch = new OperatorLatch();
+
+    private final WindowAssigner<Object, W> delegate;
+
+    private WaitingWindowAssigner(WindowAssigner<Object, W> delegate) {
+        this.delegate = delegate;
+    }
+
+    public static <W extends Window> WaitingWindowAssigner<W> wrap(
+            WindowAssigner<Object, W> delegate) {
+        return new WaitingWindowAssigner<>(delegate);
+    }
+
+    @Override
+    public Collection<W> assignWindows(
+            Object element, long timestamp, WindowAssignerContext context) {
+        startLatch.trigger();
+        return delegate.assignWindows(element, timestamp, context);
+    }
+
+    @Override
+    public Trigger<Object, W> getDefaultTrigger(StreamExecutionEnvironment env) {
+        return delegate.getDefaultTrigger(env);
+    }
+
+    @Override
+    public TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig) {
+        return delegate.getWindowSerializer(executionConfig);
+    }
+
+    @Override
+    public boolean isEventTime() {
+        return delegate.isEventTime();
+    }
+
+    /** This method blocks until the inner window assigner has started. */
+    public void await() throws RuntimeException {
+        startLatch.await();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java
index c81a5dde859a0..8045d0e41f743 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java
@@ -111,7 +111,7 @@ public CompletableFuture<CheckpointPlan> calculateCheckpointPlan() {
                                     ? calculateAfterTasksFinished()
                                     : calculateWithAllTasksRunning();
 
-                            checkTasksStarted(result.getTasksToTrigger());
+                            checkTasksStarted(result.getTasksToWaitFor());
 
                             return result;
                         } catch (Throwable throwable) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 479717d163286..83403f659e139 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -805,7 +805,7 @@ public String deserialize(int version, byte[] serialized) throws IOException {
 
     // ----------------- Mock class builders ---------------
 
-    public static final class MockOperatorCheckpointCoordinatorContextBuilder {
+    static final class MockOperatorCheckpointCoordinatorContextBuilder {
         private BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator = null;
         private Consumer<Long> onCallingAfterSourceBarrierInjection = null;
         private OperatorID operatorID = null;
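The one-line `DefaultCheckpointPlanCalculator` change above is the heart of FLINK-22379: the started-state check now covers `getTasksToWaitFor()` — every task whose acknowledgement the checkpoint needs — rather than only `getTasksToTrigger()` (the source tasks). For orientation, a sketch of what `checkTasksStarted` plausibly looks like; the body is reconstructed for illustration (the `jobId` field and the exact message are assumptions) and is not part of this diff:

```java
// Illustrative reconstruction: abort plan calculation when any task that must
// acknowledge the checkpoint has not reached RUNNING yet.
private void checkTasksStarted(List<Execution> toWaitFor) throws CheckpointException {
    for (Execution execution : toWaitFor) {
        if (execution.getState() != ExecutionState.RUNNING) {
            throw new CheckpointException(
                    String.format(
                            "Task %s of job %s has not started yet, aborting checkpoint.",
                            execution.getVertex().getTaskNameWithSubtaskIndex(), jobId),
                    CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }
}
```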
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java
index 492988f87b287..d9a9b34bc0b8d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -48,6 +49,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.EnumSet.complementOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
@@ -149,18 +151,34 @@ public void testComputeWithMultipleLevels() throws Exception {
     }
 
     @Test
-    public void testWithTriggeredTasksNotRunning() throws Exception {
-        for (ExecutionState state : EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) {
-            JobVertexID jobVertexID = new JobVertexID();
+    public void testPlanCalculationWhenOneTaskNotRunning() throws Exception {
+        // when: All combinations of source/non-source for one RUNNING and one NOT RUNNING task.
+        runWithNotRunningTask(true, true);
+        runWithNotRunningTask(true, false);
+        runWithNotRunningTask(false, false);
+        runWithNotRunningTask(false, true);
+
+        // then: Each plan calculation fails because one task is not in RUNNING state.
+    }
+
+    private void runWithNotRunningTask(
+            boolean isRunningVertexSource, boolean isNotRunningVertexSource) throws Exception {
+        for (ExecutionState notRunningState : complementOf(EnumSet.of(ExecutionState.RUNNING))) {
+            JobVertexID runningVertex = new JobVertexID();
+            JobVertexID notRunningVertex = new JobVertexID();
+
             ExecutionGraph graph =
-                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
-                            .addJobVertex(jobVertexID)
+                    new CheckpointExecutionGraphBuilder()
+                            .addJobVertex(runningVertex, isRunningVertexSource)
+                            .addJobVertex(notRunningVertex, isNotRunningVertexSource)
                             .setTransitToRunning(false)
                             .build();
-            graph.getJobVertex(jobVertexID)
-                    .getTaskVertices()[0]
-                    .getCurrentExecutionAttempt()
-                    .transitionState(state);
+
+            // The first vertex is always RUNNING.
+            transitVertexToState(graph, runningVertex, ExecutionState.RUNNING);
+            // The second vertex takes every state except RUNNING.
+            transitVertexToState(graph, notRunningVertex, notRunningState);
+
             DefaultCheckpointPlanCalculator checkpointPlanCalculator =
                     createCheckpointPlanCalculator(graph);
 
@@ -168,7 +186,7 @@ public void testWithTriggeredTasksNotRunning() throws Exception {
                 checkpointPlanCalculator.calculateCheckpointPlan().get();
                 fail(
                         "The computation should fail since some tasks to trigger are in "
-                                + state
+                                + notRunningState
                                 + " state");
             } catch (ExecutionException e) {
                 Throwable cause = e.getCause();
@@ -180,6 +198,16 @@
         }
     }
 
+    private void transitVertexToState(
+            ExecutionGraph graph, JobVertexID jobVertexID, ExecutionState state) {
+        Arrays.stream(graph.getJobVertex(jobVertexID).getTaskVertices())
+                .filter(vertex -> vertex.getJobvertexId().equals(jobVertexID))
+                .findFirst()
+                .get()
+                .getCurrentExecutionAttempt()
+                .transitionState(state);
+    }
+
     // ------------------------- Utility methods ---------------------------------------
 
     private void runSingleTest(
@@ -239,7 +267,8 @@ private void runSingleTest(
                 chooseTasks(
                         graph, expectedToTriggerTaskDeclarations.toArray(new TaskDeclaration[0]));
 
-        // Tests computing checkpoint plan
+        // Tests computing the checkpoint plan (the isUnalignedCheckpoint flag doesn't influence
+        // the result because all tasks are in RUNNING state here).
CheckpointPlan checkpointPlan = planCalculator.calculateCheckpointPlan().get(); checkCheckpointPlan( expectedToTriggerTasks,