From cffd9440599cf56e7e22da69a3a1a94168789b17 Mon Sep 17 00:00:00 2001 From: andralungu Date: Thu, 4 Jun 2015 14:13:21 +0200 Subject: [PATCH] [FLINK-2140][gelly] Allowed access to the number of vertices in the GSA functions --- docs/libs/gelly_guide.md | 45 ++++++++++++++++++- .../flink/graph/IterationConfiguration.java | 23 ++++++++++ .../apache/flink/graph/gsa/ApplyFunction.java | 21 +++++++++ .../flink/graph/gsa/GatherFunction.java | 21 +++++++++ .../graph/gsa/GatherSumApplyIteration.java | 19 ++++++++ .../apache/flink/graph/gsa/SumFunction.java | 21 +++++++++ .../spargel/VertexCentricConfiguration.java | 23 ---------- .../GatherSumApplyConfigurationITCase.java | 17 +++++++ 8 files changed, 166 insertions(+), 24 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index d7b9bd823afea..064ed9f60082c 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -459,7 +459,6 @@ The number of vertices can then be accessed in the vertex update function and in The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using the `getInDegree()` and `getOutDegree()` methods. If the degrees option is not set in the configuration, these methods will return -1. - * Messaging Direction: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to either `EdgeDirection.IN`, `EdgeDirection.OUT`, `EdgeDirection.ALL`. The messaging direction also dictates the update direction which would be `EdgeDirection.OUT`, `EdgeDirection.IN` and `EdgeDirection.ALL`, respectively. This property can be set using the `setDirection()` method. {% highlight java %} @@ -684,6 +683,50 @@ Currently, the following parameters can be specified: * Broadcast Variables: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/programming_guide.html#broadcast-variables) to the `GatherFunction`, `SumFunction` and `ApplyFunction`, using the methods `addBroadcastSetForGatherFunction()`, `addBroadcastSetForSumFunction()` and `addBroadcastSetForApplyFunction` methods, respectively. +* Number of Vertices: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method. +The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1. + +The following example illustrates the usage of the number of vertices option. + +{% highlight java %} + +Graph graph = ... + +// configure the iteration +GSAConfiguration parameters = new GSAConfiguration(); + +// set the number of vertices option to true +parameters.setOptNumVertices(true); + +// run the gather-sum-apply iteration, also passing the configuration parameters +Graph result = graph.runGatherSumApplyIteration( + new Gather(), new Sum(), new Apply(), + maxIterations, parameters); + +// user-defined functions +public static final class Gather { + ... + // get the number of vertices + long numVertices = getNumberOfVertices(); + ... +} + +public static final class Sum { + ... + // get the number of vertices + long numVertices = getNumberOfVertices(); + ... +} + +public static final class Apply { + ... + // get the number of vertices + long numVertices = getNumberOfVertices(); + ... +} + +{% endhighlight %} + [Back to top](#top) ### Vertex-centric and GSA Comparison diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java index 3086172f93f7c..321519438c4e7 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java @@ -40,6 +40,9 @@ public abstract class IterationConfiguration { /** flag that defines whether the solution set is kept in managed memory **/ private boolean unmanagedSolutionSet = false; + + /** flag that defines whether the number of vertices option is set **/ + private boolean optNumVertices = false; public IterationConfiguration() {} @@ -108,6 +111,26 @@ public boolean isSolutionSetUnmanagedMemory() { return this.unmanagedSolutionSet; } + /** + * Gets whether the number of vertices option is set. + * By default, the number of vertices option is not set. + * + * @return True, if the number of vertices option is set, false otherwise. + */ + public boolean isOptNumVertices() { + return optNumVertices; + } + + /** + * Sets the number of vertices option. + * By default, the number of vertices option is not set. + * + * @param optNumVertices True, to set this option, false otherwise. + */ + public void setOptNumVertices(boolean optNumVertices) { + this.optNumVertices = optNumVertices; + } + /** * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates * via {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getIterationAggregator(String)} and diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java index d88fe0d5cdd30..ed0cf70542863 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java @@ -30,6 +30,27 @@ @SuppressWarnings("serial") public abstract class ApplyFunction implements Serializable { + // -------------------------------------------------------------------------------------------- + // Attribute that allows access to the total number of vertices inside an iteration. + // -------------------------------------------------------------------------------------------- + + private long numberOfVertices = -1L; + + /** + * Retrieves the number of vertices in the graph. + * @return the number of vertices if the {@link IterationConfigurationion#setOptNumVertices(boolean)} + * option has been set; -1 otherwise. + */ + public long getNumberOfVertices() { + return numberOfVertices; + } + + void setNumberOfVertices(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + //--------------------------------------------------------------------------------------------- + public abstract void apply(M newValue, VV currentValue); /** diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java index 37ff2d696a2df..5a09a5ace8da1 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java @@ -28,6 +28,27 @@ @SuppressWarnings("serial") public abstract class GatherFunction implements Serializable { + // -------------------------------------------------------------------------------------------- + // Attribute that allows access to the total number of vertices inside an iteration. + // -------------------------------------------------------------------------------------------- + + private long numberOfVertices = -1L; + + /** + * Retrieves the number of vertices in the graph. + * @return the number of vertices if the {@link IterationConfigurationion#setOptNumVertices(boolean)} + * option has been set; -1 otherwise. + */ + public long getNumberOfVertices() { + return numberOfVertices; + } + + void setNumberOfVertices(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + //--------------------------------------------------------------------------------------------- + public abstract M gather(Neighbor neighbor); /** diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java index a80369d16ac07..389cf024b70e5 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.operators.CustomUnaryOperation; @@ -38,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; import java.util.Map; @@ -116,6 +118,23 @@ public DataSet> createResult() { TypeInformation> innerType = new TupleTypeInfo>(keyType, messageType); TypeInformation> outputType = vertexDataSet.getType(); + // create a graph + Graph graph = + Graph.fromDataSet(vertexDataSet, edgeDataSet, ExecutionEnvironment.getExecutionEnvironment()); + + // check whether the numVertices option is set and, if so, compute the total number of vertices + // and set it within the gather, sum and apply functions + if (this.configuration != null && this.configuration.isOptNumVertices()) { + try { + long numberOfVertices = graph.numberOfVertices(); + gather.setNumberOfVertices(numberOfVertices); + sum.setNumberOfVertices(numberOfVertices); + apply.setNumberOfVertices(numberOfVertices); + } catch (Exception e) { + e.printStackTrace(); + } + } + // Prepare UDFs GatherUdf gatherUdf = new GatherUdf(gather, innerType); SumUdf sumUdf = new SumUdf(sum, innerType); diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java index 16cd682e2f884..69baae4966a8e 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java @@ -28,6 +28,27 @@ @SuppressWarnings("serial") public abstract class SumFunction implements Serializable { + // -------------------------------------------------------------------------------------------- + // Attribute that allows access to the total number of vertices inside an iteration. + // -------------------------------------------------------------------------------------------- + + private long numberOfVertices = -1L; + + /** + * Retrieves the number of vertices in the graph. + * @return the number of vertices if the {@link IterationConfigurationion#setOptNumVertices(boolean)} + * option has been set; -1 otherwise. + */ + public long getNumberOfVertices() { + return numberOfVertices; + } + + void setNumberOfVertices(long numberOfVertices) { + this.numberOfVertices = numberOfVertices; + } + + //--------------------------------------------------------------------------------------------- + public abstract M sum(M arg0, M arg1); /** diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java index e76c174c645a4..afd4ffd045e44 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java @@ -47,9 +47,6 @@ public class VertexCentricConfiguration extends IterationConfiguration { /** flag that defines whether the degrees option is set **/ private boolean optDegrees = false; - /** flag that defines whether the number of vertices option is set **/ - private boolean optNumVertices = false; - /** the direction in which the messages should be sent **/ private EdgeDirection direction = EdgeDirection.OUT; @@ -115,26 +112,6 @@ public void setOptDegrees(boolean optDegrees) { this.optDegrees = optDegrees; } - /** - * Gets whether the number of vertices option is set. - * By default, the number of vertices option is not set. - * - * @return True, if the number of vertices option is set, false otherwise. - */ - public boolean isOptNumVertices() { - return optNumVertices; - } - - /** - * Sets the number of vertices option. - * By default, the number of vertices option is not set. - * - * @param optNumVertices True, to set this option, false otherwise. - */ - public void setOptNumVertices(boolean optNumVertices) { - this.optNumVertices = optNumVertices; - } - /** * Gets the direction in which messages are sent in the MessagingFunction. * By default the messaging direction is OUT. diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java index ca5d5d968e7db..5befafe809eb9 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java @@ -84,6 +84,7 @@ public void testRunWithConfiguration() throws Exception { parameters.addBroadcastSetForSumFunction("sumBcastSet", env.fromElements(4, 5, 6)); parameters.addBroadcastSetForApplyFunction("applyBcastSet", env.fromElements(7, 8, 9)); parameters.registerAggregator("superstepAggregator", new LongSumAggregator()); + parameters.setOptNumVertices(true); Graph result = graph.runGatherSumApplyIteration(new Gather(), new Sum(), new Apply(), 10, parameters); @@ -151,6 +152,9 @@ public void preSuperstep() { Assert.assertEquals(7, aggrValue); } + + // test number of vertices + Assert.assertEquals(5, getNumberOfVertices()); } public Long gather(Neighbor neighbor) { @@ -175,6 +179,9 @@ public void preSuperstep() { // test aggregator aggregator = getIterationAggregator("superstepAggregator"); + + // test number of vertices + Assert.assertEquals(5, getNumberOfVertices()); } public Long sum(Long newValue, Long currentValue) { @@ -201,6 +208,9 @@ public void preSuperstep() { // test aggregator aggregator = getIterationAggregator("superstepAggregator"); + + // test number of vertices + Assert.assertEquals(5, getNumberOfVertices()); } public void apply(Long summedValue, Long origValue) { @@ -213,6 +223,13 @@ public void apply(Long summedValue, Long origValue) { @SuppressWarnings("serial") private static final class DummyGather extends GatherFunction { + @Override + public void preSuperstep() { + // test number of vertices + // when the numVertices option is not set, -1 is returned + Assert.assertEquals(-1, getNumberOfVertices()); + } + public Long gather(Neighbor neighbor) { return neighbor.getNeighborValue(); }