From be46073703bc91441eda4e91c8199dd866c845ec Mon Sep 17 00:00:00 2001 From: balidani Date: Fri, 20 Feb 2015 11:30:57 +0100 Subject: [PATCH 1/2] [FLINK-1522][FLINK-1576] Updated LabelPropagationExample and test --- flink-staging/flink-gelly/pom.xml | 31 ++++++ .../example/LabelPropagationExample.java | 87 +++++++++++++++-- .../test/LabelPropagationExampleITCase.java | 97 +++++++++++++++++++ 3 files changed, 208 insertions(+), 7 deletions(-) mode change 100644 => 100755 flink-staging/flink-gelly/pom.xml create mode 100755 flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml old mode 100644 new mode 100755 index af1fcb40623ce..13ed002161e4b --- a/flink-staging/flink-gelly/pom.xml +++ b/flink-staging/flink-gelly/pom.xml @@ -52,4 +52,35 @@ under the License. test + + + + + hadoop-1 + + + + hadoop.profile1 + + + + + + com.google.guava + guava + ${guava.version} + provided + + + + + hadoop-2 + + + + !hadoop.profile + + + + diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java index c490bb3af0256..78cb5d599c4c0 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -42,31 +43,84 @@ public class LabelPropagationExample implements ProgramDescription { public static void main(String[] args) throws Exception { + if(!parseParameters(args)) { + return; + } + + // Set up the execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + // Set up the graph DataSet> vertices = getVertexDataSet(env); DataSet> edges = getEdgeDataSet(env); Graph graph = Graph.fromDataSet(vertices, edges, env); + // Set up the program DataSet> verticesWithCommunity = graph.run( new LabelPropagation(maxIterations)).getVertices(); - verticesWithCommunity.print(); + // Emit results + if(fileOutput) { + verticesWithCommunity.writeAsCsv(outputPath, "\n", ","); + } else { + verticesWithCommunity.print(); + } - env.execute(); + // Execute the program + env.execute("Label Propagation Example"); } - @Override - public String getDescription() { - return "Label Propagation Example"; - } + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + private static boolean fileOutput = false; + private static String vertexInputPath = null; + private static String edgeInputPath = null; + private static String outputPath = null; private static long numVertices = 100; - private static int maxIterations = 20; + private static int maxIterations = 10; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 5) { + System.err.println("Usage: LabelPropagation "); + return false; + } + + fileOutput = true; + vertexInputPath = args[0]; + edgeInputPath = args[1]; + outputPath = args[2]; + numVertices = Integer.parseInt(args[3]); + maxIterations = Integer.parseInt(args[4]); + } else { + System.out.println("Executing LabelPropagation example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: LabelPropagation "); + } + return true; + } @SuppressWarnings("serial") private static DataSet> getVertexDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(vertexInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction, Vertex>() { + @Override + public Vertex map(Tuple2 value) throws Exception { + return new Vertex(value.f0, value.f1); + } + }); + } + return env.generateSequence(1, numVertices).map( new MapFunction>() { public Vertex map(Long l) throws Exception { @@ -77,6 +131,20 @@ public Vertex map(Long l) throws Exception { @SuppressWarnings("serial") private static DataSet> getEdgeDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter(" ") + .lineDelimiter("\n") + .types(Long.class, Long.class) + .map(new MapFunction, Edge>() { + @Override + public Edge map(Tuple2 value) throws Exception { + return new Edge(value.f0, value.f1, NullValue.getInstance()); + } + }); + } + return env.generateSequence(1, numVertices).flatMap( new FlatMapFunction>() { @Override @@ -91,4 +159,9 @@ public void flatMap(Long key, } }); } + + @Override + public String getDescription() { + return "Label Propagation Example"; + } } \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java new file mode 100755 index 0000000000000..d5b2239877b8e --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.example.LabelPropagationExample; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class LabelPropagationExampleITCase extends MultipleProgramsTestBase { + + public LabelPropagationExampleITCase(ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + private String verticesPath; + private String edgesPath; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + + final String vertices = "1 1\n" + + "2 2\n" + + "3 3\n" + + "4 4\n" + + "5 5\n"; + + final String edges = "1 2\n" + + "1 3\n" + + "2 3\n" + + "3 4\n" + + "3 5\n" + + "4 5\n" + + "5 1\n"; + + File verticesFile = tempFolder.newFile(); + Files.write(vertices, verticesFile, Charsets.UTF_8); + + File edgesFile = tempFolder.newFile(); + Files.write(edges, edgesFile, Charsets.UTF_8); + + verticesPath = verticesFile.toURI().toString(); + edgesPath = edgesFile.toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testLabelPropagation() throws Exception { + /* + * Test the label propagation example + */ + LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "5", "16"}); + + expectedResult = "1,5\n" + + "2,5\n" + + "3,5\n" + + "4,5\n" + + "5,5\n"; + } +} From d93ff8f6026a0f31a7529b86c3754f585f785d33 Mon Sep 17 00:00:00 2001 From: balidani Date: Tue, 3 Mar 2015 18:58:59 +0100 Subject: [PATCH 2/2] [FLINK-1522][FLINK-1576] Added more test cases --- .../test/LabelPropagationExampleITCase.java | 145 ++++++++++++++---- 1 file changed, 112 insertions(+), 33 deletions(-) diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java index d5b2239877b8e..dfb0f3feeb208 100755 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java @@ -42,56 +42,135 @@ public LabelPropagationExampleITCase(ExecutionMode mode){ private String resultPath; private String expectedResult; - private String verticesPath; - private String edgesPath; - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @Before public void before() throws Exception{ resultPath = tempFolder.newFile().toURI().toString(); + } - final String vertices = "1 1\n" + - "2 2\n" + - "3 3\n" + - "4 4\n" + - "5 5\n"; - - final String edges = "1 2\n" + - "1 3\n" + - "2 3\n" + - "3 4\n" + - "3 5\n" + - "4 5\n" + - "5 1\n"; + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } - File verticesFile = tempFolder.newFile(); - Files.write(vertices, verticesFile, Charsets.UTF_8); + @Test + public void testSingleIteration() throws Exception { + /* + * Test one iteration of label propagation example with a simple graph + */ - File edgesFile = tempFolder.newFile(); - Files.write(edges, edgesFile, Charsets.UTF_8); + final String vertices = "1 10\n" + + "2 10\n" + + "3 30\n" + + "4 40\n" + + "5 40\n" + + "6 40\n" + + "7 70\n"; - verticesPath = verticesFile.toURI().toString(); - edgesPath = edgesFile.toURI().toString(); + final String edges = "1 3\n" + + "2 3\n" + + "4 7\n" + + "5 7\n" + + "6 7\n" + + "7 3\n"; + + String verticesPath = createTempFile(vertices); + String edgesPath = createTempFile(edges); + + LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "7", "1"}); + + expectedResult = "1,10\n" + + "2,10\n" + + "3,10\n" + + "4,40\n" + + "5,40\n" + + "6,40\n" + + "7,40\n"; } - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); + @Test + public void testTieBreaker() throws Exception { + /* + * Test the label propagation example where a tie must be broken + */ + + final String vertices = "1 10\n" + + "2 10\n" + + "3 10\n" + + "4 10\n" + + "5 0\n" + + "6 20\n" + + "7 20\n" + + "8 20\n" + + "9 20\n"; + + final String edges = "1 5\n" + + "2 5\n" + + "3 5\n" + + "4 5\n" + + "6 5\n" + + "7 5\n" + + "8 5\n" + + "9 5\n"; + + String verticesPath = createTempFile(vertices); + String edgesPath = createTempFile(edges); + + LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "9", "1"}); + + expectedResult = "1,10\n" + + "2,10\n" + + "3,10\n" + + "4,10\n" + + "5,20\n" + + "6,20\n" + + "7,20\n" + + "8,20\n" + + "9,20\n"; } @Test - public void testLabelPropagation() throws Exception { + public void testTermination() throws Exception { /* - * Test the label propagation example + * Test the label propagation example where the algorithm terminates on the first iteration */ - LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "5", "16"}); - expectedResult = "1,5\n" + - "2,5\n" + - "3,5\n" + - "4,5\n" + - "5,5\n"; + final String vertices = "1 10\n" + + "2 10\n" + + "3 10\n" + + "4 40\n" + + "5 40\n" + + "6 40\n"; + + final String edges = "1 2\n" + + "2 3\n" + + "3 1\n" + + "4 5\n" + + "5 6\n" + + "6 4\n"; + + String verticesPath = createTempFile(vertices); + String edgesPath = createTempFile(edges); + + LabelPropagationExample.main(new String[]{verticesPath, edgesPath, resultPath, "6", "2"}); + + expectedResult = "1,10\n" + + "2,10\n" + + "3,10\n" + + "4,40\n" + + "5,40\n" + + "6,40\n"; + } + + // ------------------------------------------------------------------------- + // Util methods + // ------------------------------------------------------------------------- + + private String createTempFile(final String rows) throws Exception { + File tempFile = tempFolder.newFile(); + Files.write(rows, tempFile, Charsets.UTF_8); + return tempFile.toURI().toString(); } }