From ab0e1ba2f7c61ec4b378a7514043c63304f553c3 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Fri, 28 Oct 2016 14:09:42 +0200 Subject: [PATCH] [FLINK-4956] Enhance StreamTaskTestHarness with easy snapshot/restore methods --- .../state/RocksDBAsyncSnapshotTest.java | 8 +-- .../runtime/tasks/OneInputStreamTaskTest.java | 54 +++---------------- .../runtime/tasks/StreamMockEnvironment.java | 31 ++++++++++- .../runtime/tasks/StreamTaskTestHarness.java | 33 +++++++++++- .../flink/core/testutils/OneShotLatch.java | 9 ++++ 5 files changed, 82 insertions(+), 53 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 98d46bbf52bad..dd709a1702e32 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; @@ -73,6 +74,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; /** @@ -141,8 +143,6 @@ public void acknowledgeCheckpoint( CheckpointMetaData checkpointMetaData, SubtaskState checkpointStateHandles) { - 
super.acknowledgeCheckpoint(checkpointMetaData); - // block on the latch, to verify that triggerCheckpoint returns below, // even though the async checkpoint would not finish try { @@ -152,7 +152,9 @@ public void acknowledgeCheckpoint( } // should be one k/v state - assertNotNull(checkpointStateHandles.getManagedKeyedState()); + KeyGroupsStateHandle keyedState = checkpointStateHandles.getManagedKeyedState(); + assertNotNull(keyedState); + assertEquals(1, keyedState.getNumberOfKeyGroups()); // we now know that the checkpoint went through ensureCheckpointLatch.trigger(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index d31990a93cce4..dac7e8479c435 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ListState; @@ -26,10 +25,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; 
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; @@ -356,7 +354,7 @@ public void testSnapshottingAndRestoring() throws Exception { configureChainedTestingStreamOperator(streamConfig, numberChainedTasks, seed, recoveryTimestamp); - AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment( + StreamMockEnvironment env = new StreamMockEnvironment( testHarness.jobConfig, testHarness.taskConfig, testHarness.executionConfig, @@ -372,22 +370,20 @@ public void testSnapshottingAndRestoring() throws Exception { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp); - while(!streamTask.triggerCheckpoint(checkpointMetaData)); + Tuple2 checkpoint = testHarness.performCheckpoint(checkpointMetaData); // since no state was set, there shouldn't be restore calls assertEquals(0, TestingStreamOperator.numberRestoreCalls); - env.getCheckpointLatch().await(); - - assertEquals(checkpointId, env.getCheckpointId()); + assertEquals(checkpointId, checkpoint.f0.getCheckpointId()); testHarness.endInput(); testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis()); final OneInputStreamTask restoredTask = new OneInputStreamTask(); - restoredTask.setInitialState(new TaskStateHandles(env.getCheckpointStateHandles())); - final OneInputStreamTaskTestHarness restoredTaskHarness = new OneInputStreamTaskTestHarness(restoredTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + final OneInputStreamTaskTestHarness restoredTaskHarness = + new OneInputStreamTaskTestHarness(restoredTask, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO); restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO); StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig(); @@ -396,6 +392,7 @@ public void testSnapshottingAndRestoring() throws Exception { TestingStreamOperator.numberRestoreCalls = 0; + restoredTaskHarness.initializeState(checkpoint.f1); restoredTaskHarness.invoke(); restoredTaskHarness.endInput(); restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis()); @@ -475,43 +472,6 @@ public IN getKey(IN value) throws Exception { } } - private static class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment { - private volatile long checkpointId; - private volatile SubtaskState checkpointStateHandles; - - private final OneShotLatch checkpointLatch = new OneShotLatch(); - - public long getCheckpointId() { - return checkpointId; - } - - AcknowledgeStreamMockEnvironment( - Configuration jobConfig, Configuration taskConfig, - ExecutionConfig executionConfig, long memorySize, - MockInputSplitProvider inputSplitProvider, int bufferSize) { - super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize); - } - - - @Override - public void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, - SubtaskState checkpointStateHandles) { - - this.checkpointId = checkpointMetaData.getCheckpointId(); - this.checkpointStateHandles = checkpointStateHandles; - checkpointLatch.trigger(); - } - - public OneShotLatch getCheckpointLatch() { - return checkpointLatch; - } - - public SubtaskState getCheckpointStateHandles() { - return checkpointStateHandles; - } - } - private static class TestingStreamOperator extends AbstractStreamOperator implements OneInputStreamOperator, StreamCheckpointedOperator { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 2376a6046e70b..885ad5091324b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -22,9 +22,11 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -101,6 +103,10 @@ public class StreamMockEnvironment implements Environment { private volatile boolean wasFailedExternally = false; + private final OneShotLatch checkpointLatch; + private Tuple2<CheckpointMetaData, SubtaskState> checkpointStates; + + public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { this.taskInfo = new TaskInfo( @@ -124,6 +130,8 @@ public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, KvStateRegistry registry = new KvStateRegistry(); this.kvStateRegistry = registry.createTaskRegistry(jobID, getJobVertexId()); + this.checkpointLatch = new OneShotLatch(); + this.checkpointLatch.trigger(); // start triggered: acknowledgements are rejected until prepareForCheckpoint() resets the latch } public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, @@ -314,11 +322,20 @@ public TaskKvStateRegistry getTaskKvStateRegistry() { @Override public void
acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) { + acknowledgeCheckpoint(checkpointMetaData, null); } @Override - public void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) { + public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) { + // a triggered latch means prepareForCheckpoint() was not called, so this acknowledgement is unexpected + if (checkpointLatch.isTriggered()) { + throw new IllegalStateException("Unexpected acknowledged checkpoint on unprepared environment: " + checkpointMetaData); + } + + this.checkpointStates = new Tuple2<>(checkpointMetaData, subtaskState); + // store the result first, then release the caller blocked in receiveCheckpointResult() + checkpointLatch.trigger(); + } @Override @@ -339,5 +356,15 @@ public TaskManagerRuntimeInfo getTaskManagerInfo() { public TaskMetricGroup getMetricGroup() { return new UnregisteredTaskMetricsGroup(); } + /** Arms the environment for one checkpoint acknowledgement: resets the latch and discards any previously received result. */ + public void prepareForCheckpoint() { + checkpointLatch.reset(); + checkpointStates = null; + } + /** Waits on the checkpoint latch and returns the acknowledged (meta data, subtask state) pair; the state is null when the acknowledgement carried none. */ + public Tuple2<CheckpointMetaData, SubtaskState> receiveCheckpointResult() throws InterruptedException { + checkpointLatch.await(); + return checkpointStates; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index ab7bf69f35c7b..6308ecaaa8bc1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -21,12 +21,17 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.event.AbstractEvent; import
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -35,8 +40,8 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.junit.Assert; import java.io.IOException; @@ -393,5 +398,31 @@ public Throwable getError() { return error; } } + /** Triggers a checkpoint on the task under test, retrying every 5 ms until the trigger is accepted, and returns the acknowledged (meta data, subtask state) pair; throws IllegalStateException if the task is not a StatefulTask. */ + public Tuple2<CheckpointMetaData, SubtaskState> performCheckpoint( + CheckpointMetaData checkpointMetaData) throws Exception { + // the mock environment must be prepared before triggering, otherwise it rejects the acknowledgement + if (task instanceof StatefulTask) { + StatefulTask statefulTask = (StatefulTask) task; + mockEnv.prepareForCheckpoint(); + while (!statefulTask.triggerCheckpoint(checkpointMetaData)) { + Thread.sleep(5); + } + return mockEnv.receiveCheckpointResult(); + } else { + throw new IllegalStateException("Can not perform checkpoint for a task that is not stateful!"); + } + } + /** Wraps the given state in TaskStateHandles and sets it as the initial state of the task under test (the accompanying test calls this before invoke()); throws IllegalStateException if the task is not a StatefulTask. */ + public void initializeState(SubtaskState restoreState) throws Exception { + + if (task instanceof StatefulTask) { + TaskStateHandles taskStateHandles = new TaskStateHandles(restoreState); + StatefulTask statefulTask = (StatefulTask) task; + statefulTask.setInitialState(taskStateHandles); + } else { + throw new IllegalStateException("Can not set state for a task
that is not stateful!"); + } + } } diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java index 0418bf53b6119..b3d86e5855fb6 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java @@ -107,4 +107,13 @@ public void await(long timeout, TimeUnit timeUnit) throws InterruptedException, public boolean isTriggered() { return triggered; } + + /** + * Resets the latch to the untriggered state ({@code triggered = false}) while holding the lock, so that it can be triggered again. + */ + public void reset() { + synchronized (lock) { + triggered = false; + } + } }