Skip to content

Commit

Permalink
[FLINK-1694] [gelly] added IterationConfiguration as a way to configu…
Browse files Browse the repository at this point in the history
…re a VertexCentricIteration
  • Loading branch information
vasia committed Apr 10, 2015
1 parent e45f13f commit e98bd85
Show file tree
Hide file tree
Showing 12 changed files with 565 additions and 175 deletions.
79 changes: 68 additions & 11 deletions docs/gelly_guide.md
Expand Up @@ -347,29 +347,86 @@ Vertex-centric Iterations
Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations. Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations.
Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value
based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep. based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
These functions are given as parameters to Gelly's `createVertexCentricIteration`, which returns a `VertexCentricIteration`. These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`.
The user can configure this iteration (set the name, the parallelism, aggregators, etc.) and then run the computation, using the `runVertexCentricIteration` method: This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:


{% highlight java %} {% highlight java %}
Graph<Long, Double, Double> graph = ... Graph<Long, Double, Double> graph = ...


// create the vertex-centric iteration // run Single-Source-Shortest-Paths vertex-centric iteration
VertexCentricIteration<Long, Double, Double, Double> iteration = Graph<Long, Double, Double> result =
graph.createVertexCentricIteration( graph.runVertexCentricIteration(
new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations); new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);


// user-defined functions
public static final class VertexDistanceUpdater {...}
public static final class MinDistanceMessenger {...}

{% endhighlight %}

### Configuring a Vertex-Centric Iteration
A vertex-centric iteration can be configured using an `IterationConfiguration` object.
Currently, the following parameters can be specified:

* <strong>Name</strong>: The name for the vertex-centric iteration. The name is displayed in logs and messages
and can be specified using the `setName()` method.

* <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.

* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.

* <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`.

* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.

{% highlight java %}

Graph<Long, Double, Double> graph = ...

// configure the iteration
IterationConfiguration parameters = new IterationConfiguration();

// set the iteration name // set the iteration name
iteration.setName("Single Source Shortest Paths"); parameters.setName("Gelly Iteration");


// set the parallelism // set the parallelism
iteration.setParallelism(16); parameters.setParallelism(16);

// register an aggregator
parameters.registerAggregator("sumAggregator", new LongSumAggregator());


// run the computation // run the vertex-centric iteration, also passing the configuration parameters
graph.runVertexCentricIteration(iteration); Graph<Long, Double, Double> result =
graph.runVertexCentricIteration(
new VertexUpdater(), new Messenger(), maxIterations, parameters);


// user-defined functions // user-defined functions
public static final class VertexDistanceUpdater {...} public static final class VertexUpdater extends VertexUpdateFunction {
public static final class MinDistanceMessenger {...}
LongSumAggregator aggregator = new LongSumAggregator();

public void preSuperstep() {

// retrieve the Aggregator
aggregator = getIterationAggregator("sumAggregator");
}


public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator inMessages) {
//do some computation
Long partialValue = ...

// aggregate the partial value
aggregator.aggregate(partialValue);

// update the vertex value
setNewVertexValue(...);
}
}

public static final class Messenger extends MessagingFunction {...}


{% endhighlight %} {% endhighlight %}


Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration; import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
Expand Down Expand Up @@ -1149,30 +1150,52 @@ public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
} }


/** /**
* Create a Vertex-Centric iteration on the graph. * Runs a Vertex-Centric iteration on the graph.
* * No configuration options are provided.
* @param vertexUpdateFunction the vertex update function * @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function * @param messagingFunction the messaging function
* @param maximumNumberOfIterations maximum number of iterations to perform * @param maximumNumberOfIterations maximum number of iterations to perform
* @return *
* @return the updated Graph after the vertex-centric iteration has converged or
* after maximumNumberOfIterations.
*/ */
public <M> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration( public <M> Graph<K, VV, EV> runVertexCentricIteration(
VertexUpdateFunction<K, VV, M> vertexUpdateFunction, VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
MessagingFunction<K, VV, M, EV> messagingFunction, MessagingFunction<K, VV, M, EV> messagingFunction,
int maximumNumberOfIterations) { int maximumNumberOfIterations) {
return VertexCentricIteration.withEdges(edges, vertexUpdateFunction,
messagingFunction, maximumNumberOfIterations); VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);

DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);

return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
} }

/** /**
* Runs a Vertex-Centric iteration on the graph. * Runs a Vertex-Centric iteration on the graph with configuration options.
*
* @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function
* @param maximumNumberOfIterations maximum number of iterations to perform
* @param parameters the iteration configuration parameters
* *
* @param iteration the Vertex-Centric iteration to run * @return the updated Graph after the vertex-centric iteration has converged or
* @return * after maximumNumberOfIterations.
*/ */
public <M> Graph<K, VV, EV> runVertexCentricIteration( public <M> Graph<K, VV, EV> runVertexCentricIteration(
VertexCentricIteration<K, VV, M, EV> iteration) { VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
MessagingFunction<K, VV, M, EV> messagingFunction,
int maximumNumberOfIterations, IterationConfiguration parameters) {

VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);

iteration.configure(parameters);

DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration); DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);

return new Graph<K, VV, EV>(newVertices, this.edges, this.context); return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
} }


Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.NullValue; import org.apache.flink.types.NullValue;


Expand Down Expand Up @@ -56,9 +55,8 @@ public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {


// iteratively adopt the most frequent label among the neighbors // iteratively adopt the most frequent label among the neighbors
// of each vertex // of each vertex
VertexCentricIteration<K, Long, Long, NullValue> iteration = input.createVertexCentricIteration( return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), maxIterations); maxIterations);
return input.runVertexCentricIteration(iteration);
} }


/** /**
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;


public class PageRank<K extends Comparable<K> & Serializable> implements public class PageRank<K extends Comparable<K> & Serializable> implements
Expand All @@ -43,11 +42,8 @@ public PageRank(double beta, int maxIterations) {
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception { public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {


final long numberOfVertices = network.numberOfVertices(); final long numberOfVertices = network.numberOfVertices();

return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
maxIterations); maxIterations);
return network.runVertexCentricIteration(iteration);
} }


/** /**
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;


import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -65,11 +64,8 @@ public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
.mapVertices(new AddScoreToVertexValuesMapper()); .mapVertices(new AddScoreToVertexValuesMapper());


VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double> return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
iteration = graphWithScoredVertices.createVertexCentricIteration(new VertexLabelUpdater(delta), new LabelMessenger(), maxIterations)
new LabelMessenger(), maxIterations);

return graphWithScoredVertices.runVertexCentricIteration(iteration)
.mapVertices(new RemoveScoreFromVertexValuesMapper()); .mapVertices(new RemoveScoreFromVertexValuesMapper());
} }


Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.graph.Vertex; import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction;


import java.io.Serializable; import java.io.Serializable;
Expand All @@ -45,12 +44,9 @@ public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
@Override @Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {


Graph<K, Double, Double> mappedInput = input.mapVertices(new InitVerticesMapper<K>(srcVertexId)); return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))

.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
VertexCentricIteration<K, Double, Double, Double> iteration = mappedInput.createVertexCentricIteration( maxIterations);
new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(), maxIterations);

return mappedInput.runVertexCentricIteration(iteration);
} }


public static final class InitVerticesMapper<K extends Comparable<K> & Serializable> public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
Expand Down

0 comments on commit e98bd85

Please sign in to comment.