[FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input
kezhuw authored and rkhachatryan committed Jan 29, 2021
1 parent 38ba71f commit 4b1cacb
Showing 1 changed file with 85 additions and 0 deletions.
@@ -28,6 +28,7 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -52,6 +53,11 @@
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
@@ -364,6 +370,85 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
}
}

    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T>, BoundedOneInput {
        // Shared with the test code via statics; the MiniCluster runs the tasks in the same JVM.
        static volatile CountDownLatch progressLatch;
        static volatile boolean inputEnded;

        private transient boolean processed;

        BoundedPassThroughOperator() {
            chainingStrategy = ChainingStrategy.ALWAYS;
        }

        @Override
        public void endInput() throws Exception {
            inputEnded = true;
        }

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            output.collect(element);
            if (!processed) {
                processed = true;
                progressLatch.countDown();
            }
            // Slow down processing so records are still in flight when stop-with-savepoint fires.
            Thread.sleep(1000);
        }

        // --------------------------------------------------------------------

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        static void resetForTest(int parallelism) {
            progressLatch = new CountDownLatch(parallelism);
            inputEnded = false;
        }
    }

    @Test
    public void testStopSavepointWithBoundedInput() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;

        final MiniClusterResourceFactory clusterFactory =
                new MiniClusterResourceFactory(
                        numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        BoundedPassThroughOperator<Integer> operator = new BoundedPassThroughOperator<>();
        DataStream<Integer> stream =
                env.addSource(new InfiniteTestSource())
                        .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);

        stream.addSink(new DiscardingSink<>());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        final JobID jobId = jobGraph.getJobID();

        MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        ClusterClient<?> client = cluster.getClusterClient();

        try {
            BoundedPassThroughOperator.resetForTest(1);

            client.submitJob(jobGraph).get();

            // Wait until at least one element has been processed before stopping the job.
            BoundedPassThroughOperator.getProgressLatch().await();

            client.stopWithSavepoint(jobId, false, null).get();

            // Stop-with-savepoint must not end the input: endInput must not have been called.
            Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
        } finally {
            cluster.after();
        }
    }

@Test
public void testSubmitWithUnknownSavepointPath() throws Exception {
// Config
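For orientation beyond the diff, here is a minimal sketch (not part of this commit) of the inverse scenario, written against the classes the commit adds and assumed to sit in the same test class. The test name and element values are made up. With a finite source the job completes on its own, so BoundedOneInput#endInput is expected to run and inputEnded to become true, whereas the stop-with-savepoint test above expects the flag to stay false.

    // Sketch only, not part of this commit: sanity-check the inputEnded wiring when input
    // genuinely ends. Test name and element values are hypothetical.
    @Test
    public void testEndInputCalledOnNaturalCompletion() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Required before running: processElement counts down on this latch.
        BoundedPassThroughOperator.resetForTest(1);

        env.fromElements(1, 2, 3)
                .transform(
                        "pass-through",
                        BasicTypeInfo.INT_TYPE_INFO,
                        new BoundedPassThroughOperator<Integer>())
                .addSink(new DiscardingSink<>());

        // The source is finite, so the job terminates by itself ...
        env.execute();

        // ... and endInput should have been called on the way out.
        Assert.assertTrue(BoundedPassThroughOperator.inputEnded);
    }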
