From ffbc93178be80ee54ca9a47f9ccdde66061f56f1 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 4 Apr 2018 19:22:53 +0200 Subject: [PATCH] [FLINK-9136][tests] Remove StreamingProgramTestBase --- .../ExclamationWithBoltITCase.java | 30 ++++------ .../ExclamationWithSpoutITCase.java | 27 ++++----- .../StormExclamationLocalITCase.java | 30 ++++------ .../flink/storm/join/SingleJoinITCase.java | 27 ++++----- .../tests/StormFieldsGroupingITCase.java | 54 ++++++++---------- .../storm/tests/StormMetaDataITCase.java | 9 +-- .../flink/storm/tests/StormUnionITCase.java | 24 +++----- .../BoltTokenizerWordCountITCase.java | 27 ++++----- .../BoltTokenizerWordCountPojoITCase.java | 27 ++++----- ...BoltTokenizerWordCountWithNamesITCase.java | 27 ++++----- .../wordcount/SpoutSourceWordCountITCase.java | 27 ++++----- .../storm/wordcount/WordCountLocalITCase.java | 27 ++++----- .../wordcount/WordCountLocalNamedITCase.java | 27 ++++----- .../TopSpeedWindowingExampleITCase.java | 25 +++----- .../ContinuousFileProcessingITCase.java | 14 ++--- .../python/api/PythonStreamBinderTest.java | 7 ++- .../scala/api/StatefulFunctionITCase.java | 10 ++-- .../util/StreamingProgramTestBase.java | 57 ------------------- .../outputformat/CsvOutputFormatITCase.java | 20 +++---- .../outputformat/TextOutputFormatITCase.java | 20 +++---- 20 files changed, 178 insertions(+), 338 deletions(-) delete mode 100644 flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java index 358919fea2d59..f0725e91baac9 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java @@ -19,33 +19,25 @@ package org.apache.flink.storm.exclamation; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the ExclamationWithBolt example. */ -public class ExclamationWithBoltITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; - protected String exclamationNum; +public class ExclamationWithBoltITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - this.exclamationNum = "3"; - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); + String exclamationNum = "3"; - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); - } + ExclamationWithBolt.main(new String[]{textPath, resultPath, exclamationNum}); - @Override - protected void testProgram() throws Exception { - ExclamationWithBolt.main(new String[]{this.textPath, this.resultPath, this.exclamationNum}); + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java index 64294d1dd0263..4d16c4a8a707d 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java @@ -19,30 +19,23 @@ package org.apache.flink.storm.exclamation; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the ExclamationWithSpout example. */ -public class ExclamationWithSpoutITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class ExclamationWithSpoutITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); - } + ExclamationWithSpout.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - ExclamationWithSpout.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java index bc09a3d4a50f7..c82da37c893cf 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java @@ -19,32 +19,24 @@ package org.apache.flink.storm.exclamation; import org.apache.flink.storm.exclamation.util.ExclamationData; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the ExclamationLocal example. */ -public class StormExclamationLocalITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; - protected String exclamationNum; +public class StormExclamationLocalITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - this.exclamationNum = "3"; - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); + String exclamationNum = "3"; - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, this.resultPath); - } + ExclamationLocal.main(new String[]{textPath, resultPath, exclamationNum}); - @Override - protected void testProgram() throws Exception { - ExclamationLocal.main(new String[]{this.textPath, this.resultPath, this.exclamationNum}); + compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java index 5d406dba72a7f..c00c1542e3ee6 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java @@ -18,14 +18,16 @@ package org.apache.flink.storm.join; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; +import org.junit.Test; + /** * Test for the SingleJoin example. */ -public class SingleJoinITCase extends StreamingProgramTestBase { +public class SingleJoinITCase extends AbstractTestBase { protected static String[] expectedOutput = { "(male,20)", @@ -40,23 +42,14 @@ public class SingleJoinITCase extends StreamingProgramTestBase { "(female,29)" }; - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - this.resultPath = this.getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), this.resultPath); - } - - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { + String resultPath = getTempDirPath("result"); // We need to remove the file scheme because we can't use the Flink file system. // (to remain compatible with Storm) - SingleJoinExample.main(new String[]{ this.resultPath.replace("file:", "") }); + SingleJoinExample.main(new String[]{resultPath.replace("file:", "")}); + + compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java index 6e02d81359d0b..69059537875b1 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -24,13 +24,14 @@ import org.apache.flink.storm.tests.operators.TaskIdBolt; import org.apache.flink.storm.util.BoltFileSink; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.MathUtils; import org.apache.storm.Config; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.junit.Assert; +import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; @@ -41,24 +42,36 @@ * This test relies on the hash function used by the {@link DataStream#keyBy}, which is * assumed to be {@link MathUtils#murmurHash}. */ -public class StormFieldsGroupingITCase extends StreamingProgramTestBase { +public class StormFieldsGroupingITCase extends AbstractTestBase { private static final String topologyId = "FieldsGrouping Test"; private static final String spoutId = "spout"; private static final String boltId = "bolt"; private static final String sinkId = "sink"; - private String resultPath; - @Override - protected void preSubmit() throws Exception { - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String resultPath = this.getTempDirPath("result"); + + final String[] tokens = resultPath.split(":"); + final String outputFile = tokens[tokens.length - 1]; + + final TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2)); + builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping( + spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number")); + builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId); + + final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); + Config conf = new Config(); + conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test + cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); + cluster.shutdown(); - @Override - protected void postSubmit() throws Exception { List expectedResults = Arrays.asList( - "-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947", - "-65105105", "-518907128", "-252332814"); + "-1155484576", "1033096058", "-1930858313", "1431162155", "-1557280266", "-1728529858", "1654374947", + "-65105105", "-518907128", "-252332814"); List actualResults = new ArrayList<>(); readAllResultLines(actualResults, resultPath, new String[0], false); @@ -82,23 +95,4 @@ protected void postSubmit() throws Exception { } } - @Override - protected void testProgram() throws Exception { - final String[] tokens = this.resultPath.split(":"); - final String outputFile = tokens[tokens.length - 1]; - - final TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2)); - builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping( - spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number")); - builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId); - - final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); - Config conf = new Config(); - conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test - cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); - cluster.shutdown(); - } - } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java index fe09dafd916a3..c24a95e30bbcd 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormMetaDataITCase.java @@ -22,24 +22,25 @@ import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.tests.operators.MetaDataSpout; import org.apache.flink.storm.tests.operators.VerifyMetaDataBolt; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; import org.junit.Assert; +import org.junit.Test; /** * Test for meta data spouts/bolts. */ -public class StormMetaDataITCase extends StreamingProgramTestBase { +public class StormMetaDataITCase extends AbstractTestBase { private static final String topologyId = "FieldsGrouping Test"; private static final String spoutId = "spout"; private static final String boltId1 = "bolt1"; private static final String boltId2 = "bolt2"; - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutId, new MetaDataSpout(), 2); diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java index 12e897acb896a..6f6e47fab0ff3 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormUnionITCase.java @@ -23,15 +23,16 @@ import org.apache.flink.storm.tests.operators.FiniteRandomSpout; import org.apache.flink.storm.tests.operators.MergerBolt; import org.apache.flink.storm.util.BoltFileSink; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.storm.Config; import org.apache.storm.topology.TopologyBuilder; +import org.junit.Test; /** * Test for the {@link MergerBolt}. */ -public class StormUnionITCase extends StreamingProgramTestBase { +public class StormUnionITCase extends AbstractTestBase { private static final String RESULT = "-1154715079\n" + "-1155869325\n" + "-1155484576\n" + "431529176\n" + "1260042744\n" + "1761283695\n" + "1749940626\n" + "892128508\n" @@ -47,20 +48,11 @@ public class StormUnionITCase extends StreamingProgramTestBase { private static final String spoutId3 = "spout3"; private static final String boltId = "merger"; private static final String sinkId = "sink"; - private String resultPath; - @Override - protected void preSubmit() throws Exception { - this.resultPath = this.getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(RESULT, this.resultPath); - } + @Test + public void testProgram() throws Exception { + String resultPath = this.getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); // get input data @@ -73,7 +65,7 @@ protected void testProgram() throws Exception { .shuffleGrouping(spoutId2, FiniteRandomSpout.STREAM_PREFIX + 0) .shuffleGrouping(spoutId3, FiniteRandomSpout.STREAM_PREFIX + 0); - final String[] tokens = this.resultPath.split(":"); + final String[] tokens = resultPath.split(":"); final String outputFile = tokens[tokens.length - 1]; builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId); @@ -83,6 +75,8 @@ protected void testProgram() throws Exception { conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true); // only required to stabilize integration test cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder)); cluster.shutdown(); + + compareResultsByLinesInMemory(RESULT, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java index d1cc5a282c59f..57e5d428ce9b3 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the BoltTokenizerWordCount example. */ -public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class BoltTokenizerWordCountITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + BoltTokenizerWordCount.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - BoltTokenizerWordCount.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java index 0eb4a6e95cb8d..656700ee33273 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the BoltTokenizerWordCountPojo example. */ -public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class BoltTokenizerWordCountPojoITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + BoltTokenizerWordCountPojo.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - BoltTokenizerWordCountPojo.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java index 8879d9e50c350..18e1f01e1803c 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the BoltTokenizerWordCountWithNames example. */ -public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class BoltTokenizerWordCountWithNamesITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + BoltTokenizerWordCountWithNames.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - BoltTokenizerWordCountWithNames.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java index ec2ca2c62d8d5..594f56e281e8e 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the SpoutSourceWordCount example. */ -public class SpoutSourceWordCountITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class SpoutSourceWordCountITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + SpoutSourceWordCount.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - SpoutSourceWordCount.main(new String[]{this.textPath, this.resultPath}); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java index 471afa93e51de..16844e5478522 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the WordCountLocal example. */ -public class WordCountLocalITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class WordCountLocalITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + WordCountLocal.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - WordCountLocal.main(new String[] { this.textPath, this.resultPath }); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java index 445ea37cbdaa8..0353c2c56b005 100644 --- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java @@ -18,31 +18,24 @@ package org.apache.flink.storm.wordcount; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Test for the WordCountLocalByName example. */ -public class WordCountLocalNamedITCase extends StreamingProgramTestBase { - - protected String textPath; - protected String resultPath; +public class WordCountLocalNamedITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - this.textPath = this.createTempFile("text.txt", WordCountData.TEXT); - this.resultPath = this.getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", WordCountData.TEXT); + String resultPath = getTempDirPath("result"); - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath); - } + WordCountLocalByName.main(new String[]{textPath, resultPath}); - @Override - protected void testProgram() throws Exception { - WordCountLocalByName.main(new String[] { this.textPath, this.resultPath }); + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); } } diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java index db27c60a38609..6a98096fd1d98 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java @@ -19,31 +19,24 @@ import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData; import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Tests for {@link TopSpeedWindowing}. */ -public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase { - protected String textPath; - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA); - resultPath = getTempDirPath("result"); - } +public class TopSpeedWindowingExampleITCase extends AbstractTestBase { - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath); - } + @Test + public void testProgram() throws Exception { + String textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA); + String resultPath = getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { TopSpeedWindowing.main(new String[]{ "--input", textPath, "--output", resultPath}); + compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath); } } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java index 42fddf5c9929c..65e46b50c2785 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; @@ -41,6 +41,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Test; import java.io.File; import java.io.IOException; @@ -58,7 +59,7 @@ /** * IT cases for the {@link ContinuousFileMonitoringFunction} and {@link ContinuousFileReaderOperator}. */ -public class ContinuousFileProcessingITCase extends StreamingProgramTestBase { +public class ContinuousFileProcessingITCase extends AbstractTestBase { private static final int NO_OF_FILES = 5; private static final int LINES_PER_FILE = 100; @@ -110,8 +111,8 @@ public void destroyHDFS() { // END OF PREPARATIONS - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { /* * This test checks the interplay between the monitor and the reader @@ -159,11 +160,6 @@ public void run() { Throwable th = e; for (int depth = 0; depth < 20; depth++) { if (th instanceof SuccessException) { - try { - postSubmit(); - } catch (Exception e1) { - e1.printStackTrace(); - } return; } else if (th.getCause() != null) { th = th.getCause(); diff --git a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java index 9b3ddd3538cc8..e03c9ef1d8d10 100644 --- a/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java +++ b/flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java @@ -24,9 +24,10 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.Preconditions; +import org.junit.Test; import org.python.core.PyException; import java.util.ArrayList; @@ -35,7 +36,7 @@ /** * Tests for the {@link PythonStreamBinder}. */ -public class PythonStreamBinderTest extends StreamingProgramTestBase { +public class PythonStreamBinderTest extends AbstractTestBase { private static Path getBaseTestPythonDir() { FileSystem fs = new LocalFileSystem(); @@ -60,7 +61,7 @@ private static List findTestFiles() throws Exception { return files; } - @Override + @Test public void testProgram() throws Exception { Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py"); List testFiles = findTestFiles(); diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java index 2cb87b6d121fe..40e8ad95fdbb0 100644 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java +++ b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/StatefulFunctionITCase.java @@ -18,15 +18,17 @@ package org.apache.flink.streaming.scala.api; import org.apache.flink.streaming.api.scala.StateTestPrograms; -import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * IT case using stateful functions. */ -public class StatefulFunctionITCase extends StreamingProgramTestBase { +public class StatefulFunctionITCase extends AbstractTestBase { - @Override - protected void testProgram() throws Exception { + @Test + public void testProgram() throws Exception { StateTestPrograms.testStatefulFunctions(); } } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java deleted file mode 100644 index a18be08dd18fd..0000000000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.test.util.AbstractTestBase; - -import org.junit.Test; - -/** - * Base class for unit tests that run a single test. - * - *

To write a unit test against this test base, simply extend it and implement the {@link #testProgram()} method. - */ -public abstract class StreamingProgramTestBase extends AbstractTestBase { - - // -------------------------------------------------------------------------------------------- - // Methods to create the test program and for pre- and post- test work - // -------------------------------------------------------------------------------------------- - - protected abstract void testProgram() throws Exception; - - protected void preSubmit() throws Exception {} - - protected void postSubmit() throws Exception {} - - // -------------------------------------------------------------------------------------------- - // Test entry point - // -------------------------------------------------------------------------------------------- - - @Test - public void testJob() throws Exception { - // pre-submit - preSubmit(); - - // call the test program - testProgram(); - - // post-submit - postSubmit(); - } -} diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java index a95865a7fc235..cd85d26b04312 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java @@ -20,24 +20,21 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.testfunctions.Tokenizer; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Integration tests for {@link org.apache.flink.api.java.io.CsvOutputFormat}. */ -public class CsvOutputFormatITCase extends StreamingProgramTestBase { - - protected String resultPath; +public class CsvOutputFormatITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String resultPath = getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream text = env.fromElements(WordCountData.TEXT); @@ -49,10 +46,7 @@ protected void testProgram() throws Exception { counts.writeAsCsv(resultPath); env.execute("WriteAsCsvTest"); - } - @Override - protected void postSubmit() throws Exception { //Strip the parentheses from the expected text like output compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES .replaceAll("[\\\\(\\\\)]", ""), resultPath); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java index 7f0ebc9470611..d5a5b5cd074ad 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java @@ -20,24 +20,21 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.testfunctions.Tokenizer; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; /** * Integration tests for {@link org.apache.flink.api.java.io.TextOutputFormat}. */ -public class TextOutputFormatITCase extends StreamingProgramTestBase { - - protected String resultPath; +public class TextOutputFormatITCase extends AbstractTestBase { - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Test + public void testProgram() throws Exception { + String resultPath = getTempDirPath("result"); - @Override - protected void testProgram() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream text = env.fromElements(WordCountData.TEXT); @@ -49,10 +46,7 @@ protected void testProgram() throws Exception { counts.writeAsText(resultPath); env.execute("WriteAsTextTest"); - } - @Override - protected void postSubmit() throws Exception { compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); }