diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java index 28e795f5c4ece..e3fd00da7cfdf 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java @@ -32,8 +32,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.state.api.utils.OperatorLatch; -import org.apache.flink.state.api.utils.WaitingFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -57,6 +55,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.flink.state.api.utils.SavepointTestBase.waitForAllRunningOrSomeTerminal; + /** IT case for reading state. */ public abstract class SavepointReaderITTestBase extends AbstractTestBase { static final String UID = "stateful-operator"; @@ -98,7 +98,7 @@ public void testOperatorStateInputFormat() throws Exception { JobGraph jobGraph = streamEnv.getStreamGraph().getJobGraph(); - String savepoint = takeSavepoint(jobGraph, statefulOperator); + String savepoint = takeSavepoint(jobGraph); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); @@ -168,8 +168,7 @@ private void verifyBroadcastState(String path, ExecutionEnvironment batchEnv) th broadcastStateValues); } - private String takeSavepoint(JobGraph jobGraph, WaitingFunction waitingFunction) - throws Exception { + private String takeSavepoint(JobGraph jobGraph) throws Exception { SavepointSource.initializeForTest(); ClusterClient client = miniClusterResource.getClusterClient(); @@ -182,7 +181,7 @@ private String takeSavepoint(JobGraph jobGraph, WaitingFunction waitingFunction) try { JobID jobID = client.submitJob(jobGraph).get(); - waitingFunction.await(); + waitForAllRunningOrSomeTerminal(jobID, miniClusterResource); boolean finished = false; while (deadline.hasTimeLeft()) { if (SavepointSource.isFinished()) { @@ -254,8 +253,7 @@ private static List getElements() { } private static class StatefulOperator extends BroadcastProcessFunction - implements CheckpointedFunction, WaitingFunction { - private final OperatorLatch startLatch = new OperatorLatch(); + implements CheckpointedFunction { private final ListStateDescriptor list; private final ListStateDescriptor union; private final MapStateDescriptor broadcast; @@ -283,8 +281,6 @@ public void open(Configuration parameters) { @Override public void processElement(Integer value, ReadOnlyContext ctx, Collector out) { - startLatch.trigger(); - elements.add(value); } @@ -311,10 +307,5 @@ public void initializeState(FunctionInitializationContext context) throws Except unionState = context.getOperatorStateStore().getUnionListState(union); } - - @Override - public void await() throws RuntimeException { - startLatch.await(); - } } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java index d66885427de55..489eb67585e58 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java @@ -25,9 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.state.api.functions.KeyedStateReaderFunction; -import org.apache.flink.state.api.utils.OperatorLatch; import org.apache.flink.state.api.utils.SavepointTestBase; -import org.apache.flink.state.api.utils.WaitingFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; @@ -58,24 +56,19 @@ public abstract class SavepointReaderKeyedStateITCase @Test public void testUserKeyedStateReader() throws Exception { - String savepointPath = - takeSavepoint( - new KeyedStatefulOperator(), - process -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(elements)) - .rebalance() - .keyBy(id -> id.key) - .process(process) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(elements)) + .returns(Pojo.class) + .rebalance() + .keyBy(id -> id.key) + .process(new KeyedStatefulOperator()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -88,9 +81,7 @@ public void testUserKeyedStateReader() throws Exception { "Unexpected results from keyed state", expected, new HashSet<>(results)); } - private static class KeyedStatefulOperator extends KeyedProcessFunction - implements WaitingFunction { - private final OperatorLatch startLatch = new OperatorLatch(); + private static class KeyedStatefulOperator extends KeyedProcessFunction { private transient ValueState state; @Override @@ -100,19 +91,12 @@ public void open(Configuration parameters) { @Override public void processElement(Pojo value, Context ctx, Collector out) throws Exception { - startLatch.trigger(); - state.update(value.state); value.eventTimeTimer.forEach(timer -> ctx.timerService().registerEventTimeTimer(timer)); value.processingTimeTimer.forEach( timer -> ctx.timerService().registerProcessingTimeTimer(timer)); } - - @Override - public void await() throws RuntimeException { - startLatch.await(); - } } private static class Reader extends KeyedStateReaderFunction { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java index f8927629d1e36..8d7a7036529d4 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWindowReaderITCase.java @@ -30,7 +30,6 @@ import org.apache.flink.state.api.utils.AggregateSum; import org.apache.flink.state.api.utils.ReduceSum; import org.apache.flink.state.api.utils.SavepointTestBase; -import org.apache.flink.state.api.utils.WaitingWindowAssigner; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; @@ -64,29 +63,22 @@ public abstract class SavepointWindowReaderITCase @Test public void testReduceWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - windowAssigner -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner((event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .reduce(new ReduceSum()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .reduce(new ReduceSum()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -105,30 +97,24 @@ public void testReduceWindowStateReader() throws Exception { @Test public void testReduceEvictorWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - windowAssigner -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner((event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .evictor(new NoOpEvictor<>()) - .reduce(new ReduceSum()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .evictor(new NoOpEvictor<>()) + .reduce(new ReduceSum()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -148,29 +134,23 @@ public void testReduceEvictorWindowStateReader() throws Exception { @Test public void testAggregateWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - windowAssigner -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner((event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .aggregate(new AggregateSum()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .aggregate(new AggregateSum()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -189,30 +169,24 @@ public void testAggregateWindowStateReader() throws Exception { @Test public void testAggregateEvictorWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - windowAssigner -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner((event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .evictor(new NoOpEvictor<>()) - .aggregate(new AggregateSum()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .evictor(new NoOpEvictor<>()) + .aggregate(new AggregateSum()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -232,29 +206,23 @@ public void testAggregateEvictorWindowStateReader() throws Exception { @Test public void testProcessWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - windowAssigner -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner((event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .process(new NoOpProcessWindowFunction()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .process(new NoOpProcessWindowFunction()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -273,30 +241,24 @@ public void testProcessWindowStateReader() throws Exception { @Test public void testProcessEvictorWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - windowAssigner -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner((event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .evictor(new NoOpEvictor<>()) - .process(new NoOpProcessWindowFunction()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .evictor(new NoOpEvictor<>()) + .process(new NoOpProcessWindowFunction()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -316,29 +278,23 @@ public void testProcessEvictorWindowStateReader() throws Exception { @Test public void testApplyWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - windowAssigner -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner((event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .apply(new NoOpWindowFunction()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .apply(new NoOpWindowFunction()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -357,35 +313,24 @@ public void testApplyWindowStateReader() throws Exception { @Test public void testApplyEvictorWindowStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap( - TumblingEventTimeWindows.of(Time.milliseconds(10))), - (windowAssigner) -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - try { - env.addSource(createSource(numbers)) - .rebalance() - .assignTimestampsAndWatermarks( - WatermarkStrategy.noWatermarks() - .withTimestampAssigner( - (event, timestamp) -> 0)) - .keyBy(id -> id) - .window(windowAssigner) - .evictor(new NoOpEvictor<>()) - .apply(new NoOpWindowFunction()) - .uid(uid) - .addSink(new DiscardingSink<>()); - } catch (Exception e) { - e.printStackTrace(); - } - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .assignTimestampsAndWatermarks( + WatermarkStrategy.noWatermarks() + .withTimestampAssigner((event, timestamp) -> 0)) + .keyBy(id -> id) + .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) + .evictor(new NoOpEvictor<>()) + .apply(new NoOpWindowFunction()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); @@ -405,26 +350,21 @@ public void testApplyEvictorWindowStateReader() throws Exception { @Test public void testWindowTriggerStateReader() throws Exception { - String savepointPath = - takeSavepoint( - WaitingWindowAssigner.wrap(GlobalWindows.create()), - source -> { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(getStateBackend()); - env.setParallelism(4); - - env.addSource(createSource(numbers)) - .rebalance() - .keyBy(id -> id) - .window(source) - .trigger(PurgingTrigger.of(CountTrigger.of(10))) - .reduce(new ReduceSum()) - .uid(uid) - .addSink(new DiscardingSink<>()); - - return env; - }); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(getStateBackend()); + env.setParallelism(4); + + env.addSource(createSource(numbers)) + .rebalance() + .keyBy(id -> id) + .window(GlobalWindows.create()) + .trigger(PurgingTrigger.of(CountTrigger.of(10))) + .reduce(new ReduceSum()) + .uid(uid) + .addSink(new DiscardingSink<>()); + + String savepointPath = takeSavepoint(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint savepoint = Savepoint.load(batchEnv, savepointPath, getStateBackend()); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java index 64808ec505101..f39f7275ae8a0 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/SavepointTestBase.java @@ -23,33 +23,34 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.AbstractID; -import org.junit.Ignore; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; /** A test base that includes utilities for taking a savepoint. */ -@Ignore public abstract class SavepointTestBase extends AbstractTestBase { - public String takeSavepoint( - T waitingFunction, Function jobGraphFactory) { - + public String takeSavepoint(StreamExecutionEnvironment executionEnvironment) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableClosureCleaner(); - StreamExecutionEnvironment executionEnvironment = jobGraphFactory.apply(waitingFunction); JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph(); JobID jobId = jobGraph.getJobID(); @@ -59,9 +60,9 @@ public String takeSavepoint( try { JobID jobID = client.submitJob(jobGraph).get(); - return CompletableFuture.runAsync(waitingFunction::await) - .thenCompose(ignore -> triggerSavepoint(client, jobID)) - .get(5, TimeUnit.MINUTES); + waitForAllRunningOrSomeTerminal(jobID, miniClusterResource); + + return triggerSavepoint(client, jobID).get(5, TimeUnit.MINUTES); } catch (Exception e) { throw new RuntimeException("Failed to take savepoint", e); } finally { @@ -69,11 +70,29 @@ public String takeSavepoint( } } - public WaitingSource createSource(T[] data) { + public static void waitForAllRunningOrSomeTerminal( + JobID jobID, MiniClusterWithClientResource miniClusterResource) throws Exception { + while (true) { + JobDetailsInfo jobInfo = + miniClusterResource.getRestClusterClient().getJobDetails(jobID).get(); + Set vertexStates = + jobInfo.getJobVertexInfos().stream() + .map(JobDetailsInfo.JobVertexDetailsInfo::getExecutionState) + .collect(Collectors.toSet()); + if (vertexStates.equals(EnumSet.of(RUNNING)) + || vertexStates.stream().anyMatch(ExecutionState::isTerminal)) { + return; + } else { + Thread.sleep(500); + } + } + } + + public SourceFunction createSource(T[] data) { return createSource(Arrays.asList(data)); } - public WaitingSource createSource(Collection data) { + public SourceFunction createSource(Collection data) { T first = data.iterator().next(); if (first == null) { throw new IllegalArgumentException("Collection must not contain null elements"); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingFunction.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingFunction.java deleted file mode 100644 index 980924d7e0d0d..0000000000000 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingFunction.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.state.api.utils; - -/** Interface which allow to await of certain action inside of function. */ -public interface WaitingFunction { - /** This method blocks until the function will be called. */ - void await() throws RuntimeException; -} diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java index c8dc51a679588..92145b77ded09 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingSource.java @@ -22,41 +22,27 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - /** - * A wrapper class that allows threads to block until the inner source completes. It's run method - * does not return until explicitly canceled so external processes can perform operations such as - * taking savepoints. + * A wrapper class that does not return until explicitly canceled so external processes can perform + * operations such as taking savepoints. * * @param The output type of the inner source. */ -@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") -public class WaitingSource extends RichSourceFunction - implements ResultTypeQueryable, WaitingFunction { - - private static final Map guards = new HashMap<>(); +public class WaitingSource extends RichSourceFunction implements ResultTypeQueryable { private final SourceFunction source; private final TypeInformation returnType; - private final String guardId; - private volatile boolean running; public WaitingSource(SourceFunction source, TypeInformation returnType) { this.source = source; this.returnType = returnType; - this.guardId = UUID.randomUUID().toString(); - guards.put(guardId, new OneShotLatch()); this.running = true; } @@ -83,12 +69,7 @@ public void close() throws Exception { @Override public void run(SourceContext ctx) throws Exception { - OneShotLatch latch = guards.get(guardId); - try { - source.run(ctx); - } finally { - latch.trigger(); - } + source.run(ctx); while (running) { try { @@ -105,16 +86,6 @@ public void cancel() { running = false; } - /** This method blocks until the inner source has completed. */ - @Override - public void await() throws RuntimeException { - try { - guards.get(guardId).await(); - } catch (InterruptedException e) { - throw new RuntimeException("Failed to initialize source"); - } - } - @Override public TypeInformation getProducedType() { return returnType; diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingWindowAssigner.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingWindowAssigner.java deleted file mode 100644 index a6df050d7b267..0000000000000 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/utils/WaitingWindowAssigner.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.state.api.utils; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; -import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.windows.Window; - -import java.util.Collection; - -/** A wrapper class that allows threads to block until the inner window assigner starts. */ -public class WaitingWindowAssigner extends WindowAssigner - implements WaitingFunction { - private final OperatorLatch startLatch = new OperatorLatch(); - - private final WindowAssigner delegate; - - private WaitingWindowAssigner(WindowAssigner delegate) { - this.delegate = delegate; - } - - public static WaitingWindowAssigner wrap( - WindowAssigner delegate) { - return new WaitingWindowAssigner<>(delegate); - } - - @Override - public Collection assignWindows( - Object element, long timestamp, WindowAssignerContext context) { - startLatch.trigger(); - return delegate.assignWindows(element, timestamp, context); - } - - @Override - public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { - return delegate.getDefaultTrigger(env); - } - - @Override - public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) { - return delegate.getWindowSerializer(executionConfig); - } - - @Override - public boolean isEventTime() { - return delegate.isEventTime(); - } - - /** This method blocks until the inner window assigner has started. */ - public void await() throws RuntimeException { - startLatch.await(); - } -}