From da73b7c74cabedafbc31fdf2403a4f01d77d46d0 Mon Sep 17 00:00:00 2001 From: andralungu Date: Mon, 27 Apr 2015 00:26:41 +0200 Subject: [PATCH] [FLINK-1944][gelly] Added GSA-PageRank example --- .../flink/graph/example/GSAPageRank.java | 209 ++++++++++++++++++ .../flink/graph/example/PageRankExample.java | 8 +- .../apache/flink/graph/gsa/ApplyFunction.java | 16 +- .../flink/graph/gsa/GatherFunction.java | 16 +- .../apache/flink/graph/gsa/SumFunction.java | 16 +- .../graph/test/GatherSumApplyITCase.java | 16 ++ 6 files changed, 265 insertions(+), 16 deletions(-) create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java new file mode 100644 index 0000000000000..b6f0c87ad3b09 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.gsa.ApplyFunction; +import org.apache.flink.graph.gsa.GatherFunction; +import org.apache.flink.graph.gsa.Neighbor; +import org.apache.flink.graph.gsa.SumFunction; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; +import org.apache.flink.util.Collector; + +/** + * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration. + * + * The edges input file is expected to contain one edge per line, with long IDs and double + * values, in the following format: {@code "<sourceVertexID>\t<targetVertexID>\t<weight>"}. + * + * If no arguments are provided, the example runs with a randomly generated graph of 10 vertices + * in which every edge initially has the value 1.0. 
+ */ +public class GSAPageRank implements ProgramDescription { + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> links = getLinksDataSet(env); + + Graph network = Graph.fromDataSet(links, new MapFunction() { + + @Override + public Double map(Long value) throws Exception { + return 1.0; + } + }, env); + + DataSet> vertexOutDegrees = network.outDegrees(); + + // Assign the transition probabilities as the edge weights + Graph networkWithWeights = network + .joinWithEdgesOnSource(vertexOutDegrees, + new MapFunction, Double>() { + + @Override + public Double map(Tuple2 value) { + return value.f0 / value.f1; + } + }); + + long numberOfVertices = networkWithWeights.numberOfVertices(); + + // Execute the GSA iteration + Graph result = networkWithWeights + .runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(), + new UpdateRanks(numberOfVertices), maxIterations); + + // Extract the vertices as the result + DataSet> pageRanks = result.getVertices(); + + // emit result + if (fileOutput) { + pageRanks.writeAsCsv(outputPath, "\n", "\t"); + } else { + pageRanks.print(); + } + + env.execute("GSA Page Ranks"); + } + + // -------------------------------------------------------------------------------------------- + // Page Rank UDFs + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("serial") + private static final class GatherRanks extends GatherFunction { + + long numberOfVertices; + + public GatherRanks(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + @Override + public Double gather(Neighbor neighbor) { + double neighborRank = neighbor.getNeighborValue(); + + if(getSuperstepNumber() == 1) { + neighborRank = 1.0 / numberOfVertices; + } + + return neighborRank * neighbor.getEdgeValue(); + } + } + + 
@SuppressWarnings("serial") + private static final class SumRanks extends SumFunction { + + @Override + public Double sum(Double newValue, Double currentValue) { + return newValue + currentValue; + } + } + + @SuppressWarnings("serial") + private static final class UpdateRanks extends ApplyFunction { + + long numberOfVertices; + + public UpdateRanks(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + @Override + public void apply(Double rankSum, Double currentValue) { + setResult((1-DAMPENING_FACTOR)/numberOfVertices + DAMPENING_FACTOR * rankSum); + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static final double DAMPENING_FACTOR = 0.85; + private static long numPages = 10; + private static String edgeInputPath = null; + private static String outputPath = null; + private static int maxIterations = 10; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + if(args.length != 3) { + System.err.println("Usage: GSAPageRank "); + return false; + } + + fileOutput = true; + edgeInputPath = args[0]; + outputPath = args[1]; + maxIterations = Integer.parseInt(args[2]); + } else { + System.out.println("Executing GSAPageRank example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: GSAPageRank "); + } + return true; + } + + @SuppressWarnings("serial") + private static DataSet> getLinksDataSet(ExecutionEnvironment env) { + + if (fileOutput) { + return env.readCsvFile(edgeInputPath) + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new Tuple3ToEdgeMap()); + } + + return 
env.generateSequence(1, numPages).flatMap( + new FlatMapFunction>() { + @Override + public void flatMap(Long key, + Collector> out) throws Exception { + int numOutEdges = (int) (Math.random() * (numPages / 2)); + for (int i = 0; i < numOutEdges; i++) { + long target = (long) (Math.random() * numPages) + 1; + out.collect(new Edge(key, target, 1.0)); + } + } + }); + } + + @Override + public String getDescription() { + return "GSA Page Rank"; + } +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java index cc0b702e1850d..2e36ad9ad096d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java @@ -104,15 +104,15 @@ public String getDescription() { private static boolean parseParameters(String[] args) { if(args.length > 0) { - if(args.length != 4) { + if(args.length != 3) { System.err.println("Usage: PageRank "); return false; } fileOutput = true; - edgeInputPath = args[1]; - outputPath = args[2]; - maxIterations = Integer.parseInt(args[3]); + edgeInputPath = args[0]; + outputPath = args[1]; + maxIterations = Integer.parseInt(args[2]); } else { System.out.println("Executing PageRank example with default parameters and built-in default data."); System.out.println(" Provide parameters to read input data from files."); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java index 75f64f93210d5..a4963e0fa0e31 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java @@ -45,20 +45,28 @@ public void setResult(VV result) { * * 
@throws Exception Exceptions in the pre-superstep phase cause the superstep to fail. */ - public void preSuperstep() {}; + public void preSuperstep() {} /** * This method is executed once per superstep after the vertex update function has been invoked for each vertex. * * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail. */ - public void postSuperstep() {}; + public void postSuperstep() {} + + /** + * Gets the number of the superstep, starting at 1. + * + * @return The number of the current superstep. + */ + public int getSuperstepNumber() { + return this.runtimeContext.getSuperstepNumber(); + } // -------------------------------------------------------------------------------------------- // Internal methods // -------------------------------------------------------------------------------------------- - @SuppressWarnings("unused") private IterationRuntimeContext runtimeContext; private Collector> out; @@ -67,7 +75,7 @@ public void setResult(VV result) { public void init(IterationRuntimeContext iterationRuntimeContext) { this.runtimeContext = iterationRuntimeContext; - }; + } public void setOutput(Vertex vertex, Collector> out) { this.out = out; diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java index 0b0caf9f4cbd4..4ffae8d2d7c63 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java @@ -32,23 +32,31 @@ public abstract class GatherFunction1. + * + * @return The number of the current superstep. 
+ */ + public int getSuperstepNumber() { + return this.runtimeContext.getSuperstepNumber(); + } // -------------------------------------------------------------------------------------------- // Internal methods // -------------------------------------------------------------------------------------------- - @SuppressWarnings("unused") private IterationRuntimeContext runtimeContext; public void init(IterationRuntimeContext iterationRuntimeContext) { this.runtimeContext = iterationRuntimeContext; - }; + } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java index 6ed85c413b2d4..4836af687c244 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java @@ -32,23 +32,31 @@ public abstract class SumFunction1. + * + * @return The number of the current superstep. 
+ */ + public int getSuperstepNumber() { + return this.runtimeContext.getSuperstepNumber(); + } // -------------------------------------------------------------------------------------------- // Internal methods // -------------------------------------------------------------------------------------------- - @SuppressWarnings("unused") private IterationRuntimeContext runtimeContext; public void init(IterationRuntimeContext iterationRuntimeContext) { this.runtimeContext = iterationRuntimeContext; - }; + } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java index 23ebfa5edd4ec..a0c1de4a4bed1 100755 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java @@ -21,6 +21,7 @@ import com.google.common.base.Charsets; import com.google.common.io.Files; import org.apache.flink.graph.example.GSAConnectedComponentsExample; +import org.apache.flink.graph.example.GSAPageRank; import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.After; @@ -96,6 +97,21 @@ public void testSingleSourceShortestPath() throws Exception { "7 Infinity\n"; } + // -------------------------------------------------------------------------------------------- + // Page Rank Test + // -------------------------------------------------------------------------------------------- + + @Test + public void testPageRank() throws Exception { + GSAPageRank.main(new String[]{edgesPath, resultPath, "16"}); + expectedResult = "1 7.47880014315678E21\n" + + "2 1.6383884499619055E21\n" + + "3 3.044048626469292E21\n" + + "4 1.6896936994425786E21\n" + + "5 4.214827876275491E21\n" + + "6 1.0\n" + + "7 8.157142857142858"; + } // 
-------------------------------------------------------------------------------------------- // Sample data