From ac6c8ac63d730fd03125b3798778e76267c59dcb Mon Sep 17 00:00:00 2001 From: vasia Date: Wed, 24 Jun 2015 18:09:13 +0200 Subject: [PATCH] [FLINK-2271] [FLINK-1522] [gelly] changed PageRank example to expect an unweighted edge list added PageRank and MusicProfiles tests got rid of unused suppress warning annotations in Graph and JaccardSimilarityMeasure changed GSAPageRank example to expect an unweighted edge list --- .../java/org/apache/flink/graph/Graph.java | 3 - .../flink/graph/example/GSAPageRank.java | 13 ++- .../example/JaccardSimilarityMeasure.java | 2 - .../apache/flink/graph/example/PageRank.java | 16 +-- .../example/utils/MusicProfilesData.java | 23 +++- .../graph/example/utils/PageRankData.java | 42 ++++++++ .../graph/test/GatherSumApplyITCase.java | 17 --- .../test/example/MusicProfilesITCase.java | 101 ++++++++++++++++++ .../graph/test/example/PageRankITCase.java | 78 ++++++++++++++ 9 files changed, 260 insertions(+), 35 deletions(-) create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java create mode 100644 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java create mode 100644 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 5e13ce17e1ca6..a7c75bccd90e5 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1025,7 +1025,6 @@ public Tuple2 map(Edge edge) throws Exception { * @param vertex the vertex to be added * @return the new graph containing the existing vertices as well as the one just added */ - @SuppressWarnings("unchecked") public Graph addVertex(final Vertex vertex) { List> newVertex = new ArrayList>(); newVertex.add(vertex); @@ -1040,7 +1039,6 @@ public Graph addVertex(final Vertex vertex) { * @param verticesToAdd the list of vertices to add * @return the new graph containing the existing and newly added vertices */ - @SuppressWarnings("unchecked") public Graph addVertices(List> verticesToAdd) { // Add the vertices DataSet> newVertices = this.vertices.union(this.context.fromCollection(verticesToAdd)).distinct(); @@ -1074,7 +1072,6 @@ public Graph addEdge(Vertex source, Vertex target, EV e * @param newEdges the data set of edges to be added * @return a new graph containing the existing edges plus the newly added edges. */ - @SuppressWarnings("unchecked") public Graph addEdges(List> newEdges) { DataSet> newEdgesDataSet = this.context.fromCollection(newEdges); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java index 47f67b86f57cb..45d4555794d48 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java @@ -31,14 +31,13 @@ import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; import org.apache.flink.util.Collector; /** * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration. * - * The edges input file is expected to contain one edge per line, with long IDs and double - * values, in the following format:"\t\t". + * The edges input file is expected to contain one edge per line, with long IDs and no + * values, in the following format:"\t". * * If no arguments are provided, the example runs with a random graph of 10 vertices * and random edge weights. @@ -187,8 +186,12 @@ private static DataSet> getLinksDataSet(ExecutionEnvironment return env.readCsvFile(edgeInputPath) .fieldDelimiter("\t") .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); + .types(Long.class, Long.class) + .map(new MapFunction, Edge>() { + public Edge map(Tuple2 input) { + return new Edge(input.f0, input.f1, 1.0); + } + }).withForwardedFields("f0; f1"); } return env.generateSequence(1, numPages).flatMap( diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java index be241ce8310ad..79de407b5f847 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java @@ -114,7 +114,6 @@ public String getDescription() { /** * Each vertex will have a HashSet containing its neighbor ids as value. */ - @SuppressWarnings("serial") private static final class GatherNeighbors implements ReduceNeighborsFunction> { @Override @@ -136,7 +135,6 @@ public HashSet reduceNeighbors(HashSet first, * * The Jaccard similarity coefficient is then, the intersection/union. */ - @SuppressWarnings("serial") private static final class ComputeJaccard implements MapFunction, Double>, Edge> { diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java index e4ad13f062621..10b4be4cbb041 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java @@ -28,17 +28,15 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.library.PageRankAlgorithm; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; import org.apache.flink.util.Collector; /** * This example implements a simple PageRank algorithm, using a vertex-centric iteration. * - * The edges input file is expected to contain one edge per line, with long IDs and double - * values, in the following format:"\t\t". + * The edges input file is expected to contain one edge per line, with long IDs and no + * values, in the following format:"\t". * - * If no arguments are provided, the example runs with a random graph of 10 vertices - * and random edge weights. + * If no arguments are provided, the example runs with a random graph of 10 vertices. * */ public class PageRank implements ProgramDescription { @@ -131,8 +129,12 @@ private static DataSet> getLinksDataSet(ExecutionEnvironment return env.readCsvFile(edgeInputPath) .fieldDelimiter("\t") .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap()); + .types(Long.class, Long.class) + .map(new MapFunction, Edge>() { + public Edge map(Tuple2 input) { + return new Edge(input.f0, input.f1, 1.0); + } + }).withForwardedFields("f0; f1"); } return env.generateSequence(1, numPages).flatMap( diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java index 0a7162d311303..6b963724a2c02 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java @@ -79,5 +79,26 @@ public static DataSet getMismatches(ExecutionEnvironment env) { errors.add("ERROR: Black Trees"); return env.fromCollection(errors); } -} + public static final String USER_SONG_TRIPLETS = "user_1 song_1 100\n" + "user_1 song_5 200\n" + + "user_2 song_1 10\n" + "user_2 song_4 20\n" + + "user_3 song_2 3\n" + + "user_4 song_2 1\n" + "user_4 song_3 2\n" + + "user_5 song_3 30"; + + public static final String MISMATCHES = "ERROR: Angie"; + + public static final String MAX_ITERATIONS = "2"; + + public static final String TOP_SONGS_RESULT = "user_1 song_1\n" + + "user_2 song_4\n" + + "user_3 song_2\n" + + "user_4 song_3\n" + + "user_5 song_3"; + + public static final String COMMUNITIES_RESULT = "user_1 1\n" + + "user_2 1\n" + + "user_3 3\n" + + "user_4 3\n" + + "user_5 4"; +} \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java new file mode 100644 index 0000000000000..077572e612916 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; + +public class PageRankData { + + public static final String EDGES = "2 1\n" + + "5 2\n" + + "5 4\n" + + "4 3\n" + + "4 2\n" + + "1 4\n" + + "1 2\n" + + "1 3\n" + + "3 5\n"; + + + public static final String RANKS_AFTER_3_ITERATIONS = "1 0.237\n" + + "2 0.248\n" + + "3 0.173\n" + + "4 0.175\n" + + "5 0.165\n"; + + private PageRankData() {} +} + diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java index ca40323401b8a..a883fa0e1d2ec 100755 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java @@ -20,7 +20,6 @@ import com.google.common.base.Charsets; import com.google.common.io.Files; -import org.apache.flink.graph.example.GSAPageRank; import org.apache.flink.graph.example.GSAConnectedComponents; import org.apache.flink.graph.example.GSASingleSourceShortestPaths; import org.apache.flink.test.util.MultipleProgramsTestBase; @@ -97,22 +96,6 @@ public void testSingleSourceShortestPaths() throws Exception { "7 Infinity\n"; } - // -------------------------------------------------------------------------------------------- - // Page Rank Test - // -------------------------------------------------------------------------------------------- - - @Test - public void testPageRank() throws Exception { - GSAPageRank.main(new String[]{edgesPath, resultPath, "16"}); - expectedResult = "1 7.47880014315678E21\n" + - "2 1.6383884499619055E21\n" + - "3 3.044048626469292E21\n" + - "4 1.6896936994425786E21\n" + - "5 4.214827876275491E21\n" + - "6 1.0\n" + - "7 8.157142857142858"; - } - // -------------------------------------------------------------------------------------------- // Sample data // -------------------------------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java new file mode 100644 index 0000000000000..0410d417472e7 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.example; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; + +import org.apache.flink.graph.example.MusicProfiles; +import org.apache.flink.graph.example.utils.MusicProfilesData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; + +@RunWith(Parameterized.class) +public class MusicProfilesITCase extends MultipleProgramsTestBase { + + private String tripletsPath; + + private String mismatchesPath; + + private String topSongsResultPath; + + private String communitiesResultPath; + + private String expectedTopSongs; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public MusicProfilesITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + topSongsResultPath = tempFolder.newFile().toURI().toString(); + communitiesResultPath = tempFolder.newFile().toURI().toString(); + + File tripletsFile = tempFolder.newFile(); + Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, Charsets.UTF_8); + tripletsPath = tripletsFile.toURI().toString(); + + File mismatchesFile = tempFolder.newFile(); + Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, Charsets.UTF_8); + mismatchesPath = mismatchesFile.toURI().toString(); + } + + @Test + public void testMusicProfilesExample() throws Exception { + MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath, + MusicProfilesData.MAX_ITERATIONS + ""}); + expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath); + + ArrayList list = new ArrayList(); + readAllResultLines(list, communitiesResultPath, new String[]{}, false); + + String[] result = list.toArray(new String[list.size()]); + Arrays.sort(result); + + // check that user_1 and user_2 are in the same community + Assert.assertEquals("users 1 and 2 are not in the same community", + result[0].substring(7), result[1].substring(7)); + + // check that user_3, user_4 and user_5 are in the same community + Assert.assertEquals("users 3 and 4 are not in the same community", + result[2].substring(7), result[3].substring(7)); + Assert.assertEquals("users 4 and 5 are not in the same community", + result[3].substring(7), result[4].substring(7)); + } +} \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java new file mode 100644 index 0000000000000..544cc66f01068 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.example; + +import java.io.File; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; + +import org.apache.flink.graph.example.GSAPageRank; +import org.apache.flink.graph.example.PageRank; +import org.apache.flink.graph.example.utils.PageRankData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class PageRankITCase extends MultipleProgramsTestBase { + + public PageRankITCase(TestExecutionMode mode){ + super(mode); + } + + private String edgesPath; + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + + File edgesFile = tempFolder.newFile(); + Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8); + + edgesPath = edgesFile.toURI().toString(); + } + + @After + public void after() throws Exception{ + compareKeyValueParisWithDelta(expected, resultPath, "\t", 0.01); + } + + @Test + public void testPageRankWithThreeIterations() throws Exception { + PageRank.main(new String[] {edgesPath, resultPath, "3"}); + expected = PageRankData.RANKS_AFTER_3_ITERATIONS; + } + + @Test + public void testGSAPageRankWithThreeIterations() throws Exception { + GSAPageRank.main(new String[] {edgesPath, resultPath, "3"}); + expected = PageRankData.RANKS_AFTER_3_ITERATIONS; + } +}