Skip to content

Commit

Permalink
[FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test wit…
Browse files Browse the repository at this point in the history
…h chaining strategy
  • Loading branch information
rkhachatryan committed Jan 29, 2021
1 parent 4b1cacb commit 5edf84e
Showing 1 changed file with 32 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,8 @@ static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>

private transient boolean processed;

BoundedPassThroughOperator() {
chainingStrategy = ChainingStrategy.ALWAYS;
BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
this.chainingStrategy = chainingStrategy;
}

@Override
Expand All @@ -393,7 +393,6 @@ public void processElement(StreamRecord<T> element) throws Exception {
processed = true;
progressLatch.countDown();
}
Thread.sleep(1000);
}

// --------------------------------------------------------------------
Expand All @@ -413,39 +412,46 @@ public void testStopSavepointWithBoundedInput() throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;

final MiniClusterResourceFactory clusterFactory =
new MiniClusterResourceFactory(
numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig());
for (ChainingStrategy chainingStrategy : ChainingStrategy.values()) {
final MiniClusterResourceFactory clusterFactory =
new MiniClusterResourceFactory(
numTaskManagers,
numSlotsPerTaskManager,
getFileBasedCheckpointsConfig());

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

BoundedPassThroughOperator<Integer> operator = new BoundedPassThroughOperator<>();
DataStream<Integer> stream =
env.addSource(new InfiniteTestSource())
.transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
BoundedPassThroughOperator<Integer> operator =
new BoundedPassThroughOperator<>(chainingStrategy);
DataStream<Integer> stream =
env.addSource(new InfiniteTestSource())
.transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);

stream.addSink(new DiscardingSink<>());
stream.addSink(new DiscardingSink<>());

final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = jobGraph.getJobID();

MiniClusterWithClientResource cluster = clusterFactory.get();
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();
MiniClusterWithClientResource cluster = clusterFactory.get();
cluster.before();
ClusterClient<?> client = cluster.getClusterClient();

try {
BoundedPassThroughOperator.resetForTest(1);
try {
BoundedPassThroughOperator.resetForTest(1);

client.submitJob(jobGraph).get();
client.submitJob(jobGraph).get();

BoundedPassThroughOperator.getProgressLatch().await();
BoundedPassThroughOperator.getProgressLatch().await();

client.stopWithSavepoint(jobId, false, null).get();
client.stopWithSavepoint(jobId, false, null).get();

Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
} finally {
cluster.after();
Assert.assertFalse(
"input ended with chainingStrategy " + chainingStrategy,
BoundedPassThroughOperator.inputEnded);
} finally {
cluster.after();
}
}
}

Expand Down

0 comments on commit 5edf84e

Please sign in to comment.