From 904857e5290ca38e31ea54e69099ebc4d3332bde Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 9 Jun 2016 11:34:34 +0200 Subject: [PATCH] [hotfix] [streaming-java] Harden IterateTest Adds retries and timeout scaling to all iteration tests, which rely on iteration timeouts. The way the tests rely on these timoeuts is prone to races. If the failures occur again, I vote to ignore the tests until iteration termination is fixed properly. Example test failures: - https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215892/log.txt - https://s3.amazonaws.com/archive.travis-ci.org/jobs/134215975/log.txt --- .../flink/streaming/api/IterateTest.java | 327 +++++++++++------- 1 file changed, 201 insertions(+), 126 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java index c4343f6544923..c6875dd409e9d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -17,11 +17,6 @@ package org.apache.flink.streaming.api; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -29,7 +24,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.util.MathUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -52,13 +46,25 @@ import org.apache.flink.streaming.util.ReceiveCheckNoOpSink; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; +import org.apache.flink.util.MathUtils; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings({ "unchecked", "unused", "serial" }) public class IterateTest extends StreamingMultipleProgramsTestBase { + private static final Logger LOG = LoggerFactory.getLogger(IterateTest.class); + private static boolean iterated[]; @Test(expected = UnsupportedOperationException.class) @@ -366,100 +372,135 @@ public void testmultipleHeadsTailsWithTailPartitioning() { @SuppressWarnings("rawtypes") @Test public void testSimpleIteration() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - iterated = new boolean[DEFAULT_PARALLELISM]; + int numRetries = 5; + int timeoutScale = 1; - DataStream source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) - .map(NoOpBoolMap).name("ParallelizeMap"); + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + iterated = new boolean[DEFAULT_PARALLELISM]; - IterativeStream iteration = source.iterate(3000); + DataStream source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) + .map(NoOpBoolMap).name("ParallelizeMap"); - DataStream increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap); + IterativeStream iteration = source.iterate(3000 * timeoutScale); - iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink()); + DataStream increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap); - iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink()); + iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink()); - env.execute(); + iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink()); - for (boolean iter : iterated) { - assertTrue(iter); - } + env.execute(); + for (boolean iter : iterated) { + assertTrue(iter); + } + + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } @Test public void testCoIteration() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); + int numRetries = 5; + int timeoutScale = 1; - DataStream otherSource = env.fromElements("1000", "2000") - .map(NoOpStrMap).name("ParallelizeMap"); + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + TestSink.collected = new ArrayList<>(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); - ConnectedIterativeStreams coIt = env.fromElements(0, 0) - .map(NoOpIntMap).name("ParallelizeMap") - .iterate(2000) - .withFeedbackType("String"); + DataStream otherSource = env.fromElements("1000", "2000") + .map(NoOpStrMap).name("ParallelizeMap"); - try { - coIt.keyBy(1, 2); - fail(); - } catch (InvalidProgramException e) { - // this is expected - } - DataStream head = coIt - .flatMap(new RichCoFlatMapFunction() { + ConnectedIterativeStreams coIt = env.fromElements(0, 0) + .map(NoOpIntMap).name("ParallelizeMap") + .iterate(2000 * timeoutScale) + .withFeedbackType("String"); + + try { + coIt.keyBy(1, 2); + fail(); + } catch (InvalidProgramException e) { + // this is expected + } - private static final long serialVersionUID = 1L; - boolean seenFromSource = false; + DataStream head = coIt + .flatMap(new RichCoFlatMapFunction() { - @Override - public void flatMap1(Integer value, Collector out) throws Exception { - out.collect(((Integer) (value + 1)).toString()); - } + private static final long serialVersionUID = 1L; + boolean seenFromSource = false; + + @Override + public void flatMap1(Integer value, Collector out) throws Exception { + out.collect(((Integer) (value + 1)).toString()); + } + + @Override + public void flatMap2(String value, Collector out) throws Exception { + Integer intVal = Integer.valueOf(value); + if (intVal < 2) { + out.collect(((Integer) (intVal + 1)).toString()); + } + if (intVal == 1000 || intVal == 2000) { + seenFromSource = true; + } + } + + @Override + public void close() { + assertTrue(seenFromSource); + } + }); + + coIt.map(new CoMapFunction() { @Override - public void flatMap2(String value, Collector out) throws Exception { - Integer intVal = Integer.valueOf(value); - if (intVal < 2) { - out.collect(((Integer) (intVal + 1)).toString()); - } - if (intVal == 1000 || intVal == 2000) { - seenFromSource = true; - } + public String map1(Integer value) throws Exception { + return value.toString(); } @Override - public void close() { - assertTrue(seenFromSource); + public String map2(String value) throws Exception { + return value; } - }); + }).addSink(new ReceiveCheckNoOpSink()); - coIt.map(new CoMapFunction() { + coIt.closeWith(head.broadcast().union(otherSource)); - @Override - public String map1(Integer value) throws Exception { - return value.toString(); - } + head.addSink(new TestSink()).setParallelism(1); - @Override - public String map2(String value) throws Exception { - return value; - } - }).addSink(new ReceiveCheckNoOpSink()); + assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size()); - coIt.closeWith(head.broadcast().union(otherSource)); + env.execute(); - head.addSink(new TestSink()).setParallelism(1); + Collections.sort(TestSink.collected); + assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected); - assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size()); - - env.execute(); + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); - Collections.sort(TestSink.collected); - assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected); + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } /** @@ -473,89 +514,123 @@ public String map2(String value) throws Exception { */ @Test public void testGroupByFeedback() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(DEFAULT_PARALLELISM - 1); + int numRetries = 5; + int timeoutScale = 1; - KeySelector key = new KeySelector() { + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM - 1); - @Override - public Integer getKey(Integer value) throws Exception { - return value % 3; - } - }; + KeySelector key = new KeySelector() { - DataStream source = env.fromElements(1, 2, 3) - .map(NoOpIntMap).name("ParallelizeMap"); + @Override + public Integer getKey(Integer value) throws Exception { + return value % 3; + } + }; - IterativeStream it = source.keyBy(key).iterate(3000); + DataStream source = env.fromElements(1, 2, 3) + .map(NoOpIntMap).name("ParallelizeMap"); - DataStream head = it.flatMap(new RichFlatMapFunction() { + IterativeStream it = source.keyBy(key).iterate(3000 * timeoutScale); - int received = 0; - int key = -1; + DataStream head = it.flatMap(new RichFlatMapFunction() { - @Override - public void flatMap(Integer value, Collector out) throws Exception { - received++; - if (key == -1) { - key = MathUtils.murmurHash(value % 3) % 3; - } else { - assertEquals(key, MathUtils.murmurHash(value % 3) % 3); - } - if (value > 0) { - out.collect(value - 1); - } - } + int received = 0; + int key = -1; - @Override - public void close() { - assertTrue(received > 1); - } - }); + @Override + public void flatMap(Integer value, Collector out) throws Exception { + received++; + if (key == -1) { + key = MathUtils.murmurHash(value % 3) % 3; + } else { + assertEquals(key, MathUtils.murmurHash(value % 3) % 3); + } + if (value > 0) { + out.collect(value - 1); + } + } - it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink()); + @Override + public void close() { + assertTrue(received > 1); + } + }); - env.execute(); + it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink()); + + env.execute(); + + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } @SuppressWarnings("deprecation") @Test public void testWithCheckPointing() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + int numRetries = 5; + int timeoutScale = 1; - env.enableCheckpointing(); + for (int numRetry = 0; numRetry < numRetries; numRetry++) { + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream source = env .fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) - .map(NoOpBoolMap).name("ParallelizeMap"); + env.enableCheckpointing(); + DataStream source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false)) + .map(NoOpBoolMap).name("ParallelizeMap"); - IterativeStream iteration = source.iterate(3000); - iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink()); + IterativeStream iteration = source.iterate(3000 * timeoutScale); - try { - env.execute(); + iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink()); - // this statement should never be reached - fail(); - } catch (UnsupportedOperationException e) { - // expected behaviour - } + try { + env.execute(); - // Test force checkpointing + // this statement should never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } - try { - env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false); - env.execute(); + // Test force checkpointing - // this statement should never be reached - fail(); - } catch (UnsupportedOperationException e) { - // expected behaviour - } + try { + env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false); + env.execute(); + + // this statement should never be reached + fail(); + } catch (UnsupportedOperationException e) { + // expected behaviour + } + + env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true); + env.getStreamGraph().getJobGraph(); - env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true); - env.getStreamGraph().getJobGraph(); + break; // success + } catch (Throwable t) { + LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t); + + if (numRetry >= numRetries - 1) { + throw t; + } else { + timeoutScale *= 2; + } + } + } } public static final class IterationHead extends RichFlatMapFunction {