From c24358a90324a32d74ca4da29d6a4573eeda756a Mon Sep 17 00:00:00 2001 From: Samia Date: Wed, 24 Jun 2015 12:41:34 +0200 Subject: [PATCH 1/3] [FLINK-2264] [gelly] changed the tests to use collect instead of temp files --- .../GatherSumApplyConfigurationITCase.java | 56 ++--- .../VertexCentricConfigurationITCase.java | 211 +++++++++--------- .../graph/test/operations/CompareResults.java | 58 +++++ .../graph/test/operations/DegreesITCase.java | 71 +++--- .../test/operations/FromCollectionITCase.java | 44 ++-- .../test/operations/GraphCreationITCase.java | 69 +++--- .../GraphCreationWithMapperITCase.java | 50 ++--- .../test/operations/GraphMutationsITCase.java | 153 ++++++++----- .../operations/GraphOperationsITCase.java | 110 +++++---- .../test/operations/JoinWithEdgesITCase.java | 144 ++++++------ .../operations/JoinWithVerticesITCase.java | 61 +++-- .../graph/test/operations/MapEdgesITCase.java | 50 ++--- .../test/operations/MapVerticesITCase.java | 53 ++--- .../ReduceOnEdgesMethodsITCase.java | 96 ++++---- .../ReduceOnNeighborMethodsITCase.java | 115 +++++----- 15 files changed, 712 insertions(+), 629 deletions(-) create mode 100644 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java index 5befafe809eb9..4ff14a25082e9 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java @@ -31,14 +31,11 @@ import org.apache.flink.graph.gsa.GatherSumApplyIteration; import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.test.operations.CompareResults; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.LongValue; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -51,22 +48,8 @@ public GatherSumApplyConfigurationITCase(TestExecutionMode mode) { super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testRunWithConfiguration() throws Exception { /* @@ -86,17 +69,19 @@ public void testRunWithConfiguration() throws Exception { parameters.registerAggregator("superstepAggregator", new LongSumAggregator()); parameters.setOptNumVertices(true); - Graph result = graph.runGatherSumApplyIteration(new Gather(), new Sum(), + Graph res = graph.runGatherSumApplyIteration(new Gather(), new Sum(), new Apply(), 10, parameters); - result.getVertices().writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + DataSet> data = res.getVertices(); + List> result= data.collect(); - expectedResult = "1 11\n" + - "2 11\n" + - "3 11\n" + - "4 11\n" + - "5 11"; + expectedResult = "1,11\n" + + "2,11\n" + + "3,11\n" + + "4,11\n" + + "5,11"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -122,15 +107,16 @@ public void testIterationConfiguration() throws Exception { Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism()); Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory()); - DataSet> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration); - - result.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); - expectedResult = "1 11\n" + - "2 12\n" + - "3 13\n" + - "4 14\n" + - "5 15"; + DataSet> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration); + List> result= data.collect(); + + expectedResult = "1,11\n" + + "2,12\n" + + "3,13\n" + + "4,14\n" + + "5,15"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java index 567c01501fe26..5732adb70a807 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; @@ -35,14 +36,11 @@ import org.apache.flink.graph.spargel.VertexCentricConfiguration; import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.test.operations.CompareResults; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.LongValue; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.flink.graph.utils.VertexToTuple2Map; @@ -54,22 +52,8 @@ public VertexCentricConfigurationITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testRunWithConfiguration() throws Exception { /* @@ -88,16 +72,19 @@ public void testRunWithConfiguration() throws Exception { parameters.registerAggregator("superstepAggregator", new LongSumAggregator()); parameters.setOptNumVertices(true); - Graph result = graph.runVertexCentricIteration( + Graph res = graph.runVertexCentricIteration( new UpdateFunction(), new MessageFunction(), 10, parameters); - result.getVertices().writeAsCsv(resultPath, "\n", "\t"); - env.execute(); - expectedResult = "1 11\n" + - "2 11\n" + - "3 11\n" + - "4 11\n" + - "5 11"; + DataSet> data = res.getVertices(); + List> result= data.collect(); + + expectedResult = "1,11\n" + + "2,11\n" + + "3,11\n" + + "4,11\n" + + "5,11"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -123,15 +110,16 @@ public void testIterationConfiguration() throws Exception { Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism()); Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory()); - DataSet> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration); + DataSet> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration); + List> result= data.collect(); + + expectedResult = "1,11\n" + + "2,12\n" + + "3,13\n" + + "4,14\n" + + "5,15"; - result.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); - expectedResult = "1 11\n" + - "2 12\n" + - "3 13\n" + - "4 14\n" + - "5 15"; + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -145,16 +133,20 @@ public void testDefaultConfiguration() throws Exception { Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper()); - Graph result = graph.runVertexCentricIteration( + Graph res = graph.runVertexCentricIteration( new UpdateFunctionDefault(), new MessageFunctionDefault(), 5); - result.getVertices().map(new VertexToTuple2Map()).writeAsCsv(resultPath, "\n", "\t"); - env.execute(); - expectedResult = "1 6\n" + - "2 6\n" + - "3 6\n" + - "4 6\n" + - "5 6"; + + DataSet> data = res.getVertices().map(new VertexToTuple2Map()); + List> result= data.collect(); + + expectedResult = "1,6\n" + + "2,6\n" + + "3,6\n" + + "4,6\n" + + "5,6"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -174,14 +166,15 @@ public void testIterationDefaultDirection() throws Exception { .runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerTrg(), 5) .getVertices(); - resultedVertices.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List>> result= resultedVertices.collect(); - expectedResult = "1 [5]\n" + - "2 [1]\n" + - "3 [1, 2]\n" + - "4 [3]\n" + - "5 [3, 4]"; + expectedResult = "1,[5]\n" + + "2,[1]\n" + + "3,[1, 2]\n" + + "4,[3]\n" + + "5,[3, 4]"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -206,14 +199,15 @@ public void testIterationINDirection() throws Exception { .runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerSrc(), 5, parameters) .getVertices(); - resultedVertices.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List>> result= resultedVertices.collect(); - expectedResult = "1 [2, 3]\n" + - "2 [3]\n" + - "3 [4, 5]\n" + - "4 [5]\n" + - "5 [1]"; + expectedResult = "1,[2, 3]\n" + + "2,[3]\n" + + "3,[4, 5]\n" + + "4,[5]\n" + + "5,[1]"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -238,14 +232,15 @@ public void testIterationALLDirection() throws Exception { .runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters) .getVertices(); - resultedVertices.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List>> result= resultedVertices.collect(); - expectedResult = "1 [2, 3, 5]\n" + - "2 [1, 3]\n" + - "3 [1, 2, 4, 5]\n" + - "4 [3, 5]\n" + - "5 [1, 3, 4]"; + expectedResult = "1,[2, 3, 5]\n" + + "2,[1, 3]\n" + + "3,[1, 2, 4, 5]\n" + + "4,[3, 5]\n" + + "5,[1, 3, 4]"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -262,14 +257,15 @@ public void testNumVerticesNotSet() throws Exception { DataSet> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(), new DummyMessageFunction(), 2).getVertices(); - verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List> result= verticesWithNumVertices.collect(); - expectedResult = "1 -1\n" + - "2 -1\n" + - "3 -1\n" + - "4 -1\n" + - "5 -1"; + expectedResult = "1,-1\n" + + "2,-1\n" + + "3,-1\n" + + "4,-1\n" + + "5,-1"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -293,14 +289,15 @@ public void testInDegreesSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); - verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List> result= verticesWithDegrees.collect(); - expectedResult = "1 1\n" + - "2 1\n" + - "3 2\n" + - "4 1\n" + - "5 2"; + expectedResult = "1,1\n" + + "2,1\n" + + "3,2\n" + + "4,1\n" + + "5,2"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -317,14 +314,15 @@ public void testInDegreesNotSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); - verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List> result= verticesWithDegrees.collect(); - expectedResult = "1 -1\n" + - "2 -1\n" + - "3 -1\n" + - "4 -1\n" + - "5 -1"; + expectedResult = "1,-1\n" + + "2,-1\n" + + "3,-1\n" + + "4,-1\n" + + "5,-1"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -348,14 +346,15 @@ public void testOutDegreesSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); - verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List> result= verticesWithDegrees.collect(); - expectedResult = "1 2\n" + - "2 1\n" + - "3 2\n" + - "4 1\n" + - "5 1"; + expectedResult = "1,2\n" + + "2,1\n" + + "3,2\n" + + "4,1\n" + + "5,1"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -372,14 +371,15 @@ public void testOutDegreesNotSet() throws Exception { DataSet> verticesWithDegrees = graph.runVertexCentricIteration( new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); - verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List> result= verticesWithDegrees.collect(); - expectedResult = "1 -1\n" + - "2 -1\n" + - "3 -1\n" + - "4 -1\n" + - "5 -1"; + expectedResult = "1,-1\n" + + "2,-1\n" + + "3,-1\n" + + "4,-1\n" + + "5,-1"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -403,14 +403,15 @@ public void testDirectionALLAndDegrees() throws Exception { DataSet> verticesWithNumNeighbors = graph.runVertexCentricIteration( new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices(); - verticesWithNumNeighbors.writeAsCsv(resultPath, "\n", "\t"); - env.execute(); + List> result= verticesWithNumNeighbors.collect(); - expectedResult = "1 true\n" + - "2 true\n" + - "3 true\n" + - "4 true\n" + - "5 true"; + expectedResult = "1,true\n" + + "2,true\n" + + "3,true\n" + + "4,true\n" + + "5,true"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java new file mode 100644 index 0000000000000..b8ceb28399812 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java @@ -0,0 +1,58 @@ +package org.apache.flink.graph.test.operations; + + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.java.tuple.Tuple; + +public class CompareResults { + + public static void compareResultAsTuples(List result, String expected) { + compareResult(result, expected, true); + } + + public static void compareResultAsText(List result, String expected) { + compareResult(result, expected, false); + } + + private static void compareResult(List result, String expected, boolean asTuples) { + String[] extectedStrings = expected.split("\n"); + String[] resultStrings = new String[result.size()]; + + for (int i = 0; i < resultStrings.length; i++) { + T val = result.get(i); + + if (asTuples) { + if (val instanceof Tuple) { + Tuple t = (Tuple) val; + Object first = t.getField(0); + StringBuilder bld = new StringBuilder(first == null ? "null" : first.toString()); + for (int pos = 1; pos < t.getArity(); pos++) { + Object next = t.getField(pos); + bld.append(',').append(next == null ? "null" : next.toString()); + } + resultStrings[i] = bld.toString(); + } + else { + throw new IllegalArgumentException(val + " is no tuple"); + } + } + else { + resultStrings[i] = (val == null) ? "null" : val.toString(); + } + } + + assertEquals("Wrong number of elements result", extectedStrings.length, resultStrings.length); + + Arrays.sort(extectedStrings); + Arrays.sort(resultStrings); + + for (int i = 0; i < extectedStrings.length; i++) { + assertEquals(extectedStrings[i], resultStrings[i]); + } + } + +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java index 0391dcb6239f2..50f97da454fcd 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java @@ -18,16 +18,15 @@ package org.apache.flink.graph.test.operations; +import java.util.List; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -38,21 +37,8 @@ public DegreesITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } @Test public void testOutDegrees() throws Exception { @@ -64,14 +50,18 @@ public void testOutDegrees() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - + DataSet> data =graph.outDegrees(); + List> result= data.collect(); + + expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); + } @Test @@ -84,14 +74,18 @@ public void testOutDegreesWithNoOutEdges() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - + + + DataSet> data =graph.outDegrees(); + List> result= data.collect(); + expectedResult = "1,3\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,0\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -104,13 +98,16 @@ public void testInDegrees() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.inDegrees().writeAsCsv(resultPath); - env.execute(); + + DataSet> data =graph.inDegrees(); + List> result= data.collect(); + expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n"; + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -123,13 +120,16 @@ public void testInDegreesWithNoInEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); - graph.inDegrees().writeAsCsv(resultPath); - env.execute(); + DataSet> data =graph.inDegrees(); + List> result= data.collect(); + expectedResult = "1,0\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,3\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -142,13 +142,16 @@ public void testGetDegrees() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.getDegrees().writeAsCsv(resultPath); - env.execute(); + DataSet> data =graph.getDegrees(); + List> result= data.collect(); + expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -161,12 +164,18 @@ public void testGetDegreesWithDisconnectedData() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); + DataSet> data =graph.outDegrees(); + List> result= data.collect(); + expectedResult = "1,2\n" + "2,1\n" + "3,0\n" + "4,1\n" + "5,0\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } + + + } \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java index 5d4b7d735b9d7..87a66ba276693 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java @@ -18,17 +18,18 @@ package org.apache.flink.graph.test.operations; + +import java.util.List; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -39,21 +40,8 @@ public FromCollectionITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } @Test public void testFromCollectionVerticesEdges() throws Exception { @@ -64,8 +52,9 @@ public void testFromCollectionVerticesEdges() throws Exception { Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); + expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + @@ -73,6 +62,8 @@ public void testFromCollectionVerticesEdges() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -84,13 +75,17 @@ public void testFromCollectionEdgesNoInitialValue() throws Exception { Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,(null)\n" + "2,(null)\n" + "3,(null)\n" + "4,(null)\n" + "5,(null)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -103,13 +98,16 @@ public void testFromCollectionEdgesWithInitialValue() throws Exception { Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), new InitVerticesMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java index 3fe69fd494191..abdde0dc9283d 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java @@ -18,6 +18,9 @@ package org.apache.flink.graph.test.operations; +import java.util.LinkedList; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -29,11 +32,7 @@ import org.apache.flink.graph.validation.InvalidVertexIdsValidator; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -44,21 +43,8 @@ public GraphCreationITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } + private String expectedResult; @Test public void testCreateWithoutVertexValues() throws Exception { @@ -68,13 +54,16 @@ public void testCreateWithoutVertexValues() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,(null)\n" + "2,(null)\n" + "3,(null)\n" + "4,(null)\n" + "5,(null)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -86,13 +75,16 @@ public void testCreateWithMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), new AssignIdAsValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -104,13 +96,16 @@ public void testCreateWithCustomVertexValue() throws Exception { Graph, Long> graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet>> data = graph.getVertices(); + List>> result= data.collect(); + expectedResult = "1,(2.0,0)\n" + "2,(4.0,1)\n" + "3,(6.0,2)\n" + "4,(8.0,3)\n" + "5,(10.0,4)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -123,12 +118,16 @@ public void testValidate() throws Exception { DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); Graph graph = Graph.fromDataSet(vertices, edges, env); - Boolean result = graph.validate(new InvalidVertexIdsValidator()); - - env.fromElements(result).writeAsText(resultPath); - env.execute(); - - expectedResult = "true\n"; + Boolean valid = graph.validate(new InvalidVertexIdsValidator()); + + //env.fromElements(result).writeAsText(resultPath); + + String res= valid.toString();//env.fromElements(valid); + List result= new LinkedList(); + result.add(res); + expectedResult = "true"; + + CompareResults.compareResultAsText(result, expectedResult); } @Test @@ -141,11 +140,15 @@ public void testValidateWithInvalidIds() throws Exception { DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); Graph graph = Graph.fromDataSet(vertices, edges, env); - Boolean result = graph.validate(new InvalidVertexIdsValidator()); - env.fromElements(result).writeAsText(resultPath); - env.execute(); + Boolean valid = graph.validate(new InvalidVertexIdsValidator()); + + String res= valid.toString();//env.fromElements(valid); + List result= new LinkedList(); + result.add(res); expectedResult = "false\n"; + + CompareResults.compareResultAsText(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java index 23d05acffb1fa..523cee59c4245 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java @@ -18,18 +18,18 @@ package org.apache.flink.graph.test.operations; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -40,21 +40,8 @@ public GraphCreationWithMapperITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } @Test public void testWithDoubleValueMapper() throws Exception { @@ -65,13 +52,16 @@ public void testWithDoubleValueMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), new AssignDoubleValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,0.1\n" + "2,0.1\n" + "3,0.1\n" + "4,0.1\n" + "5,0.1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -83,13 +73,16 @@ public void testWithTuple2ValueMapper() throws Exception { Graph, Long> graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet>> data = graph.getVertices(); + List>> result= data.collect(); + expectedResult = "1,(2,42)\n" + "2,(4,42)\n" + "3,(6,42)\n" + "4,(8,42)\n" + "5,(10,42)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -102,13 +95,17 @@ public void testWithConstantValueMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), new AssignDoubleConstantMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,0.1\n" + "2,0.1\n" + "3,0.1\n" + "4,0.1\n" + "5,0.1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); + } @Test @@ -120,13 +117,16 @@ public void testWithDCustomValueMapper() throws Exception { Graph graph = Graph.fromDataSet( TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); + expectedResult = "1,(F,0)\n" + "2,(F,1)\n" + "3,(F,2)\n" + "4,(F,3)\n" + "5,(F,4)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index 0d71b97d6db3c..c6a8012bbf959 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -21,17 +21,14 @@ import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -42,21 +39,8 @@ public GraphMutationsITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } @Test public void testAddVertex() throws Exception { @@ -70,8 +54,9 @@ public void testAddVertex() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.addVertex(new Vertex(6L, 6L)); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getVertices(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + @@ -79,6 +64,8 @@ public void testAddVertex() throws Exception { "4,4\n" + "5,5\n" + "6,6\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -98,8 +85,8 @@ public void testAddVertices() throws Exception { graph = graph.addVertices(vertices); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + @@ -108,6 +95,8 @@ public void testAddVertices() throws Exception { "5,5\n" + "6,6\n" + "7,7\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -121,14 +110,17 @@ public void testAddVertexExisting() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.addVertex(new Vertex(1L, 1L)); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getVertices(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -148,14 +140,16 @@ public void testAddVerticesBothExisting() throws Exception { graph = graph.addVertices(vertices); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -175,8 +169,8 @@ public void testAddVerticesOneExisting() throws Exception { graph = graph.addVertices(vertices); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getVertices(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + @@ -184,6 +178,8 @@ public void testAddVerticesOneExisting() throws Exception { "4,4\n" + "5,5\n" + "6,6\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -197,13 +193,16 @@ public void testRemoveVertex() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.removeVertex(new Vertex(5L, 5L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -222,12 +221,15 @@ public void testRemoveVertices() throws Exception { verticesToBeRemoved.add(new Vertex(2L, 2L)); graph = graph.removeVertices(verticesToBeRemoved); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -241,8 +243,9 @@ public void testRemoveInvalidVertex() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.removeVertex(new Vertex(6L, 6L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -251,6 +254,8 @@ public void testRemoveInvalidVertex() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -268,13 +273,16 @@ public void testRemoveOneValidOneInvalidVertex() throws Exception { verticesToBeRemoved.add(new Vertex(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -292,8 +300,9 @@ public void testRemoveBothInvalidVertices() throws Exception { verticesToBeRemoved.add(new Vertex(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -302,6 +311,8 @@ public void testRemoveBothInvalidVertices() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -319,14 +330,17 @@ public void testRemoveBothInvalidVerticesVertexResult() throws Exception { verticesToBeRemoved.add(new Vertex(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getVertices(); + List> result= data.collect(); expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -341,8 +355,9 @@ public void testAddEdge() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.addEdge(new Vertex(6L, 6L), new Vertex(1L, 1L), 61L); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -352,6 +367,8 @@ public void testAddEdge() throws Exception { "4,5,45\n" + "5,1,51\n" + "6,1,61\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -371,8 +388,8 @@ public void testAddEdges() throws Exception { graph = graph.addEdges(edgesToBeAdded); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -383,6 +400,8 @@ public void testAddEdges() throws Exception { "4,1,41\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -402,8 +421,8 @@ public void testAddEdgesInvalidVertices() throws Exception { graph = graph.addEdges(edgesToBeAdded); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -412,6 +431,8 @@ public void testAddEdgesInvalidVertices() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -426,8 +447,9 @@ public void testAddExistingEdge() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.addEdge(new Vertex(1L, 1L), new Vertex(2L, 2L), 12L); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,2,12\n" + @@ -437,6 +459,8 @@ public void testAddExistingEdge() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -450,8 +474,9 @@ public void testRemoveEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.removeEdge(new Edge(5L, 1L, 51L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -459,6 +484,8 @@ public void testRemoveEdge() throws Exception { "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -476,14 +503,17 @@ public void testRemoveEdges() throws Exception { edgesToBeRemoved.add(new Edge(2L, 3L, 23L)); graph = graph.removeEdges(edgesToBeRemoved); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -501,8 +531,9 @@ public void testRemoveSameEdgeTwice() throws Exception { edgesToBeRemoved.add(new Edge(5L, 1L, 51L)); graph = graph.removeEdges(edgesToBeRemoved); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -510,6 +541,8 @@ public void testRemoveSameEdgeTwice() throws Exception { "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -523,8 +556,9 @@ public void testRemoveInvalidEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); graph = graph.removeEdge(new Edge(6L, 1L, 61L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -533,6 +567,8 @@ public void testRemoveInvalidEdge() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -550,8 +586,9 @@ public void testRemoveOneValidOneInvalidEdge() throws Exception { edgesToBeRemoved.add(new Edge(6L, 1L, 61L)); graph = graph.removeEdges(edgesToBeRemoved); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); + + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -560,5 +597,7 @@ public void testRemoveOneValidOneInvalidEdge() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } } \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java index 1b9d5ac835d59..904b58ab8a3dc 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java @@ -22,17 +22,16 @@ import java.util.List; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Triplet; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,22 +42,8 @@ public GraphOperationsITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testUndirected() throws Exception { /* @@ -69,8 +54,9 @@ public void testUndirected() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.getUndirected().getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.getUndirected().getEdges(); + List> result= data.collect(); + expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2,23\n" + @@ -78,6 +64,8 @@ public void testUndirected() throws Exception { "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" + "5,1,51\n" + "1,5,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -90,8 +78,9 @@ public void testReverse() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.reverse().getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = graph.reverse().getEdges(); + List> result= data.collect(); + expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + @@ -99,6 +88,8 @@ public void testReverse() throws Exception { "5,3,35\n" + "5,4,45\n" + "1,5,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") @@ -111,7 +102,8 @@ public void testSubGraph() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.subgraph(new FilterFunction>() { + + DataSet> data= graph.subgraph(new FilterFunction>() { public boolean filter(Vertex vertex) throws Exception { return (vertex.getValue() > 2); } @@ -120,11 +112,14 @@ public boolean filter(Vertex vertex) throws Exception { public boolean filter(Edge edge) throws Exception { return (edge.getValue() > 34); } - }).getEdges().writeAsCsv(resultPath); + }).getEdges(); - env.execute(); + List> result= data.collect(); + expectedResult = "3,5,35\n" + "4,5,45\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") @@ -137,16 +132,20 @@ public void testFilterVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.filterOnVertices(new FilterFunction>() { + + DataSet> data = graph.filterOnVertices(new FilterFunction>() { public boolean filter(Vertex vertex) throws Exception { return (vertex.getValue() > 2); } - }).getEdges().writeAsCsv(resultPath); + }).getEdges(); - env.execute(); + List> result= data.collect(); + expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") @@ -159,16 +158,20 @@ public void testFilterEdges() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.filterOnEdges(new FilterFunction>() { + + DataSet> data = graph.filterOnEdges(new FilterFunction>() { public boolean filter(Edge edge) throws Exception { return (edge.getValue() > 34); } - }).getEdges().writeAsCsv(resultPath); + }).getEdges(); - env.execute(); + List> result= data.collect(); + expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -180,10 +183,13 @@ public void testNumberOfVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - env.fromElements(graph.numberOfVertices()).writeAsText(resultPath); + DataSet data = env.fromElements(graph.numberOfVertices()); - env.execute(); + List result= data.collect(); + expectedResult = "5"; + + CompareResults.compareResultAsText(result, expectedResult); } @Test @@ -195,10 +201,13 @@ public void testNumberOfEdges() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - env.fromElements(graph.numberOfEdges()).writeAsText(resultPath); + DataSet data = env.fromElements(graph.numberOfEdges()); - env.execute(); + List result= data.collect(); + expectedResult = "7"; + + CompareResults.compareResultAsText(result, expectedResult); } @Test @@ -210,10 +219,13 @@ public void testVertexIds() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.getVertexIds().writeAsText(resultPath); - - env.execute(); + + DataSet data = graph.getVertexIds(); + List result= data.collect(); + expectedResult = "1\n2\n3\n4\n5\n"; + + CompareResults.compareResultAsText(result, expectedResult); } @Test @@ -225,13 +237,16 @@ public void testEdgesIds() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.getEdgeIds().writeAsCsv(resultPath); - - env.execute(); + + DataSet> data = graph.getEdgeIds(); + List> result= data.collect(); + expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -252,9 +267,8 @@ public void testUnion() throws Exception { graph = graph.union(Graph.fromCollection(vertices, edges, env)); - graph.getEdges().writeAsCsv(resultPath); - - env.execute(); + DataSet> data = graph.getEdges(); + List> result= data.collect(); expectedResult = "1,2,12\n" + "1,3,13\n" + @@ -264,6 +278,8 @@ public void testUnion() throws Exception { "4,5,45\n" + "5,1,51\n" + "6,1,61\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -276,12 +292,14 @@ public void testTriplets() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - graph.getTriplets().writeAsCsv(resultPath); + DataSet> data = graph.getTriplets(); + List> result= data.collect(); - env.execute(); expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" + "3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } } \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java index c02f0bb13a58e..afdfbf2358c31 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java @@ -18,7 +18,10 @@ package org.apache.flink.graph.test.operations; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -28,11 +31,7 @@ import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; import org.apache.flink.graph.utils.EdgeToTuple3Map; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,22 +42,8 @@ public JoinWithEdgesITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testWithEdgesInputDataset() throws Exception { /* @@ -70,11 +55,11 @@ public void testWithEdgesInputDataset() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdges(graph.getEdges() + Graph res = graph.joinWithEdges(graph.getEdges() .map(new EdgeToTuple3Map()), new AddValuesMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -83,6 +68,8 @@ public void testWithEdgesInputDataset() throws Exception { "3,5,70\n" + "4,5,90\n" + "5,1,102\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -96,11 +83,11 @@ public void testWithLessElements() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdges(graph.getEdges().first(3) + Graph res = graph.joinWithEdges(graph.getEdges().first(3) .map(new EdgeToTuple3Map()), new AddValuesMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -109,6 +96,8 @@ public void testWithLessElements() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -122,11 +111,11 @@ public void testWithLessElementsDifferentType() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdges(graph.getEdges().first(3) + Graph res = graph.joinWithEdges(graph.getEdges().first(3) .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -135,6 +124,8 @@ public void testWithLessElementsDifferentType() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -148,11 +139,11 @@ public void testWithNoCommonKeys() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), + Graph res = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), new DoubleValueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -161,6 +152,8 @@ public void testWithNoCommonKeys() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -173,11 +166,11 @@ public void testWithCustomType() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), + Graph res = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), new CustomValueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,10\n" + "1,3,20\n" + @@ -186,6 +179,8 @@ public void testWithCustomType() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -199,11 +194,11 @@ public void testWithEdgesOnSource() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnSource(graph.getEdges() + Graph res = graph.joinWithEdgesOnSource(graph.getEdges() .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,25\n" + @@ -212,6 +207,8 @@ public void testWithEdgesOnSource() throws Exception { "3,5,69\n" + "4,5,90\n" + "5,1,102\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -225,11 +222,11 @@ public void testOnSourceWithLessElements() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + Graph res = graph.joinWithEdgesOnSource(graph.getEdges().first(3) .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,25\n" + @@ -238,6 +235,8 @@ public void testOnSourceWithLessElements() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -251,11 +250,11 @@ public void testOnSourceWithDifferentType() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + Graph res = graph.joinWithEdgesOnSource(graph.getEdges().first(3) .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -264,6 +263,8 @@ public void testOnSourceWithDifferentType() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -277,11 +278,11 @@ public void testOnSourceWithNoCommonKeys() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), + Graph res = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), new DoubleValueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,20\n" + "1,3,20\n" + @@ -290,6 +291,8 @@ public void testOnSourceWithNoCommonKeys() throws Exception { "3,5,80\n" + "4,5,120\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -302,11 +305,11 @@ public void testOnSourceWithCustom() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), + Graph res = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), new CustomValueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,10\n" + "1,3,10\n" + @@ -315,6 +318,8 @@ public void testOnSourceWithCustom() throws Exception { "3,5,40\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -328,11 +333,11 @@ public void testWithEdgesOnTarget() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnTarget(graph.getEdges() + Graph res = graph.joinWithEdgesOnTarget(graph.getEdges() .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -341,6 +346,8 @@ public void testWithEdgesOnTarget() throws Exception { "3,5,70\n" + "4,5,80\n" + "5,1,102\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -354,14 +361,11 @@ public void testWithOnTargetWithLessElements() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + Graph res = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -370,6 +374,8 @@ public void testWithOnTargetWithLessElements() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -383,11 +389,11 @@ public void testOnTargetWithDifferentType() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + Graph res = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,24\n" + "1,3,26\n" + @@ -396,6 +402,8 @@ public void testOnTargetWithDifferentType() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -409,11 +417,11 @@ public void testOnTargetWithNoCommonKeys() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), + Graph res = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), new DoubleValueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,20\n" + "1,3,40\n" + @@ -422,6 +430,8 @@ public void testOnTargetWithNoCommonKeys() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,140\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -434,11 +444,11 @@ public void testOnTargetWithCustom() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), + Graph res = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), new CustomValueMapper()); - result.getEdges().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getEdges(); + List> result= data.collect(); expectedResult = "1,2,10\n" + "1,3,20\n" + @@ -447,6 +457,8 @@ public void testOnTargetWithCustom() throws Exception { "3,5,35\n" + "4,5,45\n" + "5,1,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java index 120e97a1e01fc..99f893e664e95 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java @@ -18,7 +18,10 @@ package org.apache.flink.graph.test.operations; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; @@ -27,11 +30,7 @@ import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; import org.apache.flink.graph.utils.VertexToTuple2Map; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -42,22 +41,8 @@ public JoinWithVerticesITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testJoinWithVertexSet() throws Exception { /* @@ -69,17 +54,19 @@ public void testJoinWithVertexSet() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithVertices(graph.getVertices() + Graph res = graph.joinWithVertices(graph.getVertices() .map(new VertexToTuple2Map()), new AddValuesMapper()); - result.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getVertices(); + List> result= data.collect(); expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -93,17 +80,19 @@ public void testWithLessElements() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithVertices(graph.getVertices().first(3) + Graph res = graph.joinWithVertices(graph.getVertices().first(3) .map(new VertexToTuple2Map()), new AddValuesMapper()); - result.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getVertices(); + List> result= data.collect(); expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,4\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -117,17 +106,19 @@ public void testWithDifferentType() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithVertices(graph.getVertices().first(3) + Graph res = graph.joinWithVertices(graph.getVertices().first(3) .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper()); - result.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getVertices(); + List> result= data.collect(); expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,4\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -141,17 +132,19 @@ public void testWithDifferentKeys() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), + Graph res = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), new ProjectSecondMapper()); - result.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getVertices(); + List> result= data.collect(); expectedResult = "1,10\n" + "2,20\n" + "3,30\n" + "4,40\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -164,17 +157,19 @@ public void testWithCustomType() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - Graph result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), + Graph res = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), new CustomValueMapper()); - result.getVertices().writeAsCsv(resultPath); - env.execute(); + DataSet> data = res.getVertices(); + List> result= data.collect(); expectedResult = "1,10\n" + "2,20\n" + "3,30\n" + "4,40\n" + "5,5\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java index d1ba9a51c5ac5..b3e9b117b86d3 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.test.operations; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -28,11 +30,7 @@ import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,22 +41,8 @@ public MapEdgesITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testWithSameValue() throws Exception { /* @@ -70,9 +54,8 @@ public void testWithSameValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); + List> result= mappedEdges.collect(); + expectedResult = "1,2,13\n" + "1,3,14\n" + "2,3,24\n" + @@ -80,6 +63,8 @@ public void testWithSameValue() throws Exception { "3,5,36\n" + "4,5,46\n" + "5,1,52\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -93,9 +78,8 @@ public void testWithStringValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges(); + List> result= mappedEdges.collect(); - mappedEdges.writeAsCsv(resultPath); - env.execute(); expectedResult = "1,2,string(12)\n" + "1,3,string(13)\n" + "2,3,string(23)\n" + @@ -103,6 +87,8 @@ public void testWithStringValue() throws Exception { "3,5,string(35)\n" + "4,5,string(45)\n" + "5,1,string(51)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -116,9 +102,7 @@ public void testWithTuple1Type() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); + List>> result= mappedEdges.collect(); expectedResult = "1,2,(12)\n" + "1,3,(13)\n" + @@ -127,6 +111,8 @@ public void testWithTuple1Type() throws Exception { "3,5,(35)\n" + "4,5,(45)\n" + "5,1,(51)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -140,9 +126,7 @@ public void testWithCustomType() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); + List> result= mappedEdges.collect(); expectedResult = "1,2,(T,12)\n" + "1,3,(T,13)\n" + @@ -151,6 +135,8 @@ public void testWithCustomType() throws Exception { "3,5,(T,35)\n" + "4,5,(T,45)\n" + "5,1,(T,51)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -165,9 +151,7 @@ public void testWithParametrizedCustomType() throws Exception { DataSet>> mappedEdges = graph.mapEdges( new ToCustomParametrizedTypeMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); + List>> result= mappedEdges.collect(); expectedResult = "1,2,(12.0,12)\n" + "1,3,(13.0,13)\n" + @@ -176,6 +160,8 @@ public void testWithParametrizedCustomType() throws Exception { "3,5,(35.0,35)\n" + "4,5,(45.0,45)\n" + "5,1,(51.0,51)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java index f3a63bec2daca..0c63b4ee4dfac 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.test.operations; +import java.util.List; + import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -28,11 +30,7 @@ import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,22 +41,8 @@ public MapVerticesITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testWithSameValue() throws Exception { /* @@ -69,15 +53,16 @@ public void testWithSameValue() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); + DataSet> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices(); + List> result= mappedVertices.collect(); + expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,6\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -91,15 +76,15 @@ public void testWithStringValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); + List> result= mappedVertices.collect(); expectedResult = "1,one\n" + "2,two\n" + "3,three\n" + "4,four\n" + "5,five\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -113,15 +98,15 @@ public void testWithtuple1Value() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); + List>> result= mappedVertices.collect(); expectedResult = "1,(1)\n" + "2,(2)\n" + "3,(3)\n" + "4,(4)\n" + "5,(5)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -135,15 +120,15 @@ public void testWithCustomType() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); + List> result= mappedVertices.collect(); expectedResult = "1,(T,1)\n" + "2,(T,2)\n" + "3,(T,3)\n" + "4,(T,4)\n" + "5,(T,5)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -158,15 +143,15 @@ public void testWithCustomParametrizedType() throws Exception { DataSet>> mappedVertices = graph.mapVertices( new ToCustomParametrizedTypeMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); + List>> result= mappedVertices.collect(); expectedResult = "1,(1.0,1)\n" + "2,(2.0,2)\n" + "3,(3.0,3)\n" + "4,(4.0,4)\n" + "5,(5.0,5)\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index b03268c28e80c..c7aa45c24b247 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.test.operations; +import java.util.List; + import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; @@ -31,11 +33,7 @@ import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -46,22 +44,8 @@ public ReduceOnEdgesMethodsITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testLowestWeightOutNeighbor() throws Exception { /* @@ -74,14 +58,16 @@ public void testLowestWeightOutNeighbor() throws Exception { DataSet> verticesWithLowestOutNeighbor = graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithLowestOutNeighbor.collect(); + expectedResult = "1,2\n" + "2,3\n" + "3,4\n" + "4,5\n" + "5,1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -96,14 +82,15 @@ public void testLowestWeightInNeighbor() throws Exception { DataSet> verticesWithLowestOutNeighbor = graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithLowestOutNeighbor.collect(); expectedResult = "1,5\n" + "2,1\n" + "3,1\n" + "4,3\n" + "5,3\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -117,8 +104,7 @@ public void testAllOutNeighbors() throws Exception { DataSet> verticesWithAllOutNeighbors = graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT); - verticesWithAllOutNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllOutNeighbors.collect(); expectedResult = "1,2\n" + "1,3\n" + @@ -127,6 +113,8 @@ public void testAllOutNeighbors() throws Exception { "3,5\n" + "4,5\n" + "5,1"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -140,8 +128,7 @@ public void testAllOutNeighborsNoValue() throws Exception { DataSet> verticesWithAllOutNeighbors = graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT); - verticesWithAllOutNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllOutNeighbors.collect(); expectedResult = "1,2\n" + "1,3\n" + @@ -149,6 +136,8 @@ public void testAllOutNeighborsNoValue() throws Exception { "3,4\n" + "3,5\n" + "4,5"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -162,13 +151,14 @@ public void testAllOutNeighborsWithValueGreaterThanTwo() throws Exception { DataSet> verticesWithAllOutNeighbors = graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT); - verticesWithAllOutNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllOutNeighbors.collect(); expectedResult = "3,4\n" + "3,5\n" + "4,5\n" + "5,1"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -182,8 +172,7 @@ public void testAllInNeighbors() throws Exception { DataSet> verticesWithAllInNeighbors = graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN); - verticesWithAllInNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllInNeighbors.collect(); expectedResult = "1,5\n" + "2,1\n" + @@ -192,6 +181,8 @@ public void testAllInNeighbors() throws Exception { "4,3\n" + "5,3\n" + "5,4"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -205,14 +196,15 @@ public void testAllInNeighborsNoValue() throws Exception { DataSet> verticesWithAllInNeighbors = graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN); - verticesWithAllInNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllInNeighbors.collect(); expectedResult = "1,5\n" + "2,1\n" + "3,1\n" + "3,2\n" + "4,3"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -226,14 +218,15 @@ public void testAllInNeighborsWithValueGreaterThanTwo() throws Exception { DataSet> verticesWithAllInNeighbors = graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN); - verticesWithAllInNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllInNeighbors.collect(); expectedResult = "3,1\n" + "3,2\n" + "4,3\n" + "5,3\n" + "5,4"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -247,8 +240,7 @@ public void testAllNeighbors() throws Exception { DataSet> verticesWithAllNeighbors = graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL); - verticesWithAllNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllNeighbors.collect(); expectedResult = "1,2\n" + "1,3\n" + @@ -264,6 +256,8 @@ public void testAllNeighbors() throws Exception { "5,1\n" + "5,3\n" + "5,4"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -277,8 +271,7 @@ public void testAllNeighborsNoValue() throws Exception { DataSet> verticesWithAllNeighbors = graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL); - verticesWithAllNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllNeighbors.collect(); expectedResult = "1,2\n" + "1,3\n" + @@ -289,6 +282,8 @@ public void testAllNeighborsNoValue() throws Exception { "3,5\n" + "4,3\n" + "4,5"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -302,12 +297,13 @@ public void testAllNeighborsWithValueGreaterThanFour() throws Exception { DataSet> verticesWithAllNeighbors = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); - verticesWithAllNeighbors.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithAllNeighbors.collect(); expectedResult = "5,1\n" + "5,3\n" + "5,4"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -322,14 +318,15 @@ public void testMaxWeightEdge() throws Exception { DataSet> verticesWithMaxEdgeWeight = graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithMaxEdgeWeight.collect(); expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -344,14 +341,15 @@ public void testLowestWeightOutNeighborNoValue() throws Exception { DataSet> verticesWithLowestOutNeighbor = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithLowestOutNeighbor.collect(); expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -366,14 +364,15 @@ public void testLowestWeightInNeighborNoValue() throws Exception { DataSet> verticesWithLowestOutNeighbor = graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithLowestOutNeighbor.collect(); expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -388,14 +387,15 @@ public void testMaxWeightAllNeighbors() throws Exception { DataSet> verticesWithMaxEdgeWeight = graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithMaxEdgeWeight.collect(); expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index a9fb06e326f5b..f25391ebcd1e2 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.test.operations; import java.util.Iterator; +import java.util.List; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -34,11 +35,7 @@ import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -49,22 +46,8 @@ public ReduceOnNeighborMethodsITCase(TestExecutionMode mode){ super(mode); } - private String resultPath; private String expectedResult; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - @Test public void testSumOfOutNeighbors() throws Exception { /* @@ -77,14 +60,15 @@ public void testSumOfOutNeighbors() throws Exception { DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); + expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -99,14 +83,17 @@ public void testSumOfInNeighbors() throws Exception { DataSet> verticesWithSum = graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSum.collect(); + expectedResult = "1,255\n" + "2,12\n" + "3,59\n" + "4,102\n" + "5,285\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); + + } @Test @@ -122,15 +109,15 @@ public void testSumOfOAllNeighbors() throws Exception { DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -145,11 +132,12 @@ public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception { DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); + expectedResult = "4,5\n" + "5,1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -164,11 +152,12 @@ public void testSumOfInNeighborsIdGreaterThanThree() throws Exception { DataSet> verticesWithSum = graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSum.collect(); + expectedResult = "4,102\n" + "5,285\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -184,12 +173,12 @@ public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception { DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "4,12\n" + "5,13\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -204,15 +193,15 @@ public void testSumOfOutNeighborsNoValue() throws Exception { DataSet> verticesWithSumOfOutNeighborValues = graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -227,14 +216,15 @@ public void testSumOfInNeighborsNoValue() throws Exception { DataSet> verticesWithSum = graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); - verticesWithSum.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSum.collect(); expectedResult = "1,255\n" + "2,12\n" + "3,59\n" + "4,102\n" + "5,285\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -249,15 +239,15 @@ public void testSumOfAllNeighborsNoValue() throws Exception { DataSet> verticesWithSumOfAllNeighborValues = graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL); - - verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfAllNeighborValues.collect(); expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -272,9 +262,7 @@ public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "3,9\n" + "3,18\n" + @@ -282,6 +270,8 @@ public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws "4,10\n" + "5,1\n" + "5,2"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -296,8 +286,7 @@ public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN); - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "3,59\n" + "3,118\n" + @@ -305,6 +294,8 @@ public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws "4,102\n" + "5,570\n" + "5,285"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -319,9 +310,7 @@ public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws DataSet> verticesWithSumOfAllNeighborValues = graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL); - - verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfAllNeighborValues.collect(); expectedResult = "3,12\n" + "3,24\n" + @@ -329,6 +318,8 @@ public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws "4,16\n" + "5,8\n" + "5,16"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -343,9 +334,8 @@ public void testSumOfOutNeighborsMultipliedByTwo() throws Exception { DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); + expectedResult = "1,5\n" + "1,10\n" + "2,3\n" + @@ -356,6 +346,8 @@ public void testSumOfOutNeighborsMultipliedByTwo() throws Exception { "4,10\n" + "5,1\n" + "5,2"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -370,9 +362,8 @@ public void testSumOfInNeighborsSubtractOne() throws Exception { DataSet> verticesWithSum = graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSum.collect(); + expectedResult = "1,255\n" + "1,254\n" + "2,12\n" + @@ -383,6 +374,8 @@ public void testSumOfInNeighborsSubtractOne() throws Exception { "4,101\n" + "5,285\n" + "5,284"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @Test @@ -398,9 +391,7 @@ public void testSumOfOAllNeighborsAddFive() throws Exception { DataSet> verticesWithSumOfOutNeighborValues = graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); + List> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "1,11\n" + "1,16\n" + @@ -412,6 +403,8 @@ public void testSumOfOAllNeighborsAddFive() throws Exception { "4,17\n" + "5,13\n" + "5,18"; + + CompareResults.compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") From 866bc535b8d99e937841c9ce16372b024d1657cd Mon Sep 17 00:00:00 2001 From: Samia Date: Wed, 24 Jun 2015 13:12:48 +0200 Subject: [PATCH 2/3] [FLINK-2264] [gelly] added liecense --- .../graph/test/operations/CompareResults.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java index b8ceb28399812..2372f558faa3b 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/CompareResults.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flink.graph.test.operations; From a8e047bba80d723f257111f801f72b62cd062580 Mon Sep 17 00:00:00 2001 From: Samia Date: Thu, 2 Jul 2015 11:04:38 +0200 Subject: [PATCH 3/3] [FLINK-GELLY] added missing assumption to the pageRank example: pages must have at least one incoming and one outgoing link --- .../main/java/org/apache/flink/graph/example/PageRank.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java index e4ad13f062621..7308431c40e73 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java @@ -36,7 +36,9 @@ * * The edges input file is expected to contain one edge per line, with long IDs and double * values, in the following format:"\t\t". - * + * For this simple implementation it is required that each page has at least one incoming + * and one outgoing link, otherwise the rank will differ from the actual theoretical value. + * * If no arguments are provided, the example runs with a random graph of 10 vertices * and random edge weights. *