Skip to content

Commit

Permalink
deadlines for everyone
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Mar 20, 2018
1 parent b00ce5e commit ea7a8e4
Showing 1 changed file with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
Expand All @@ -33,6 +35,7 @@
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand All @@ -50,9 +53,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -136,6 +141,7 @@ public void testStreaming() throws Exception {
}

private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30));

ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();

Expand All @@ -144,12 +150,22 @@ private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exceptio

try {
NotifyingMapper.notifyLatch.await();
Thread.sleep(HEARTBEAT_INTERVAL * 4); // wait for heartbeat

Map<String, Object> initialAccumulators = client.getAccumulators(jobGraph.getJobID());
assertEquals(1, initialAccumulators.size());
assertTrue(initialAccumulators.containsKey(ACCUMULATOR_NAME));
assertEquals(NUM_ITERATIONS, (int) initialAccumulators.get(ACCUMULATOR_NAME));
FutureUtils.retrySuccesfulWithDelay(
() -> {
try {
return CompletableFuture.completedFuture(client.getAccumulators(jobGraph.getJobID()));
} catch (Exception e) {
return FutureUtils.completedExceptionally(e);
}
},
Time.milliseconds(20),
deadline,
accumulators -> accumulators.size() == 1
&& accumulators.containsKey(ACCUMULATOR_NAME)
&& (int) accumulators.get(ACCUMULATOR_NAME) == NUM_ITERATIONS,
TestingUtils.defaultScheduledExecutor()
).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

NotifyingMapper.shutdownLatch.trigger();
} finally {
Expand Down

0 comments on commit ea7a8e4

Please sign in to comment.