[FLINK-1523] [gelly] Added Vertex Centric Configuration Tests
This commit squashes the following:

Partially addressed inline comments

Added test for removal of a non-SP-edge
andralungu authored and vasia committed May 19, 2015
1 parent 585d27d commit 1dfbcba
Showing 13 changed files with 887 additions and 110 deletions.
110 changes: 68 additions & 42 deletions docs/libs/gelly_guide.md
@@ -352,11 +352,11 @@ When the aggregation computation does not require access to the vertex value (fo
 Vertex-centric Iterations
 -----------

-Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations.
-Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value
-based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
-These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`.
-This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:
+Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations. Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
+These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`. This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:
+
+A vertex-centric iteration can be extended with information such as the total number of vertices, the in degree and out degree.
+Additionally, the neighborhood type (in/out/all) over which to run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used to modify the current vertex's state and messages are sent to out-neighbors.

 {% highlight java %}
 Graph<Long, Double, Double> graph = ...
@@ -388,6 +388,14 @@ all aggregates globally once per superstep and makes them available in the next

 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables]({{site.baseurl}}/apis/programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.

+* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
+The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method.
+
+* <strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set using the `setOptDegrees()` method.
+The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using `vertex.getInDegree()` or `vertex.getOutDegree()`.
+
+* <strong>Messaging Direction</strong>: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to either `EdgeDirection.IN`, `EdgeDirection.OUT`, `EdgeDirection.ALL`. The messaging direction also dictates the update direction which would be `EdgeDirection.OUT`, `EdgeDirection.IN` and `EdgeDirection.ALL`, respectively. This property can be set using the `setDirection()` method.
+
 {% highlight java %}

 Graph<Long, Double, Double> graph = ...
@@ -438,60 +446,35 @@ public static final class Messenger extends MessagingFunction {...}

 {% endhighlight %}

-### Vertex-Centric Iteration Extensions
-A vertex-centric iteration can be extended with information such as the total number of vertices,
-the in degree and out degree. Additionally, the neighborhood type (in/out/all) over which to
-run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used
-to modify the current vertex's state and messages are sent to out-neighbors.
-
-In order to activate these options, the following parameters must be set to true:
-
-<strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property
-can be set using the `setOptNumVertices()` method.
-
-The number of vertices can then be accessed in the vertex update function and in the messaging function
-using the `getNumberOfVertices()` method.
-
-<strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set
-using the `setOptDegrees()` method.
-
-The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex
-using `vertex.getInDegree()` or `vertex.getOutDegree()`.
-
-<strong>Messaging Direction</strong>: The direction in which messages are sent. This can be either EdgeDirection.IN,
-EdgeDirection.OUT, EdgeDirection.ALL. The messaging direction also dictates the update direction which would be
-EdgeDirection.OUT, EdgeDirection.IN and EdgeDirection.ALL, respectively. This property can be set using the
-`setDirection()` method.
+The following example illustrates the usage of the degree as well as the number of vertices options.

 {% highlight java %}
-Graph<Long, Double, Double> graph = ...

-// create the vertex-centric iteration
-VertexCentricIteration<Long, Double, Double, Double> iteration =
-graph.createVertexCentricIteration(
-new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
+Graph<Long, Double, Double> graph = ...

-// set the messaging direction
-iteration.setDirection(EdgeDirection.IN);
+// configure the iteration
+IterationConfiguration parameters = new IterationConfiguration();

 // set the number of vertices option to true
-iteration.setOptNumVertices(true);
+parameters.setOptNumVertices(true);

 // set the degree option to true
-iteration.setOptDegrees(true);
+parameters.setOptDegrees(true);

-// run the computation
-graph.runVertexCentricIteration(iteration);
+// run the vertex-centric iteration, also passing the configuration parameters
+Graph<Long, Double, Double> result =
+graph.runVertexCentricIteration(
+new VertexUpdater(), new Messenger(), maxIterations, parameters);

 // user-defined functions
-public static final class VertexDistanceUpdater {
+public static final class VertexUpdater {
 ...
 // get the number of vertices
 long numVertices = getNumberOfVertices();
 ...
 }

-public static final class MinDistanceMessenger {
+public static final class Messenger {
 ...
 // decrement the number of out-degrees
 outDegree = vertex.getOutDegree() - 1;
@@ -500,6 +483,49 @@ public static final class MinDistanceMessenger {

 {% endhighlight %}

+The following example illustrates the usage of the edge direction option. Vertices update their values to contain a list of all their in-neighbors.
+
+{% highlight java %}
+
+Graph<Long, HashSet<Long>, Double> graph = ...
+
+// configure the iteration
+IterationConfiguration parameters = new IterationConfiguration();
+
+// set the messaging direction
+parameters.setDirection(EdgeDirection.IN);
+
+// run the vertex-centric iteration, also passing the configuration parameters
+DataSet<Vertex<Long, HashSet<Long>>> result =
+graph.runVertexCentricIteration(
+new VertexUpdater(), new Messenger(), maxIterations, parameters)
+.getVertices();
+
+// user-defined functions
+public static final class VertexUpdater {
+@Override
+public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
+vertex.getValue().clear();
+
+for(long msg : messages) {
+vertex.getValue().add(msg);
+}
+
+setNewVertexValue(vertex.getValue());
+}
+}
+
+public static final class Messenger {
+@Override
+public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
+for (Edge<Long, Long> edge : getEdges()) {
+sendMessageTo(edge.getSource(), vertex.getId());
+}
+}
+}
+
+{% endhighlight %}
+
 [Back to top](#top)




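The Messaging Direction option in the guide text above pairs each messaging direction with the opposite update direction. A minimal sketch of that pairing follows. The `Direction` enum and the `updateDirectionFor` helper are hypothetical stand-ins written for illustration, not Gelly's `EdgeDirection` or any method added by this commit; only the IN/OUT/ALL pairing itself comes from the guide.

```java
/** Illustrative only: mirrors the IN/OUT/ALL pairing described in the guide text. */
final class MessagingDirectionSketch {

    /** Hypothetical stand-in for Gelly's EdgeDirection. */
    enum Direction { IN, OUT, ALL }

    /** Returns the update direction implied by the given messaging direction. */
    static Direction updateDirectionFor(Direction messaging) {
        switch (messaging) {
            case IN:
                return Direction.OUT; // messages go to in-neighbors, updates come from out-neighbors
            case OUT:
                return Direction.IN;  // the default described in the guide
            default:
                return Direction.ALL; // both directions on both sides
        }
    }

    public static void main(String[] args) {
        for (Direction d : Direction.values()) {
            System.out.println("messaging " + d + " -> update " + updateDirectionFor(d));
        }
    }
}
```

Keeping the pairing in one function makes the rule easy to check against the guide: a vertex never sends and receives over the same one-sided neighborhood unless `ALL` is chosen.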
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.graph;
+
+/**
+* The exception that gets thrown when the degree option or the number of vertices
+* option in {@link org.apache.flink.graph.spargel.IterationConfiguration} was not set.
+*/
+public class InaccessibleMethodException extends Exception {
+
+public InaccessibleMethodException() {}
+
+public InaccessibleMethodException(String text) {
+super(text);
+}
+}
@@ -35,8 +35,8 @@ public class Vertex<K, V> extends Tuple2<K, V> {
 private Long outDegree;

 public Vertex(){
-inDegree = 0L;
-outDegree = 0L;
+inDegree = -1L;
+outDegree = -1L;
 }

 public Vertex(K k, V val) {
@@ -62,15 +62,23 @@ public void setValue(V val) {
 this.f1 = val;
 }

-public Long getInDegree() {
+public Long getInDegree() throws Exception{
+if(inDegree == -1) {
+throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
+"call iterationConfiguration.setOptDegrees(true).");
+}
 return inDegree;
 }

 public void setInDegree(Long inDegree) {
 this.inDegree = inDegree;
 }

-public Long getOutDegree() {
+public Long getOutDegree() throws Exception{
+if(outDegree == -1) {
+throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
+"call iterationConfiguration.setOptDegrees(true).");
+}
 return outDegree;
 }


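The `Vertex` change above uses `-1` as a "not set" marker and makes the getters throw when the marker is still in place. The pattern can be sketched on its own as below. `DegreeGuardSketch` and `DegreeNotSetException` are hypothetical names standing in for `Vertex` and `InaccessibleMethodException`, and the sketch stores a primitive `long` where the diff uses a boxed `Long`, so it is an illustration of the idea rather than the committed code.

```java
/** Illustrative stand-in for the guarded degree accessors; not the Gelly Vertex class. */
final class DegreeGuardSketch {

    /** Hypothetical exception playing the role of InaccessibleMethodException. */
    static final class DegreeNotSetException extends Exception {
        DegreeNotSetException(String message) {
            super(message);
        }
    }

    // -1 marks "degree option not enabled", mirroring the constructor change in the diff.
    private long inDegree = -1L;

    void setInDegree(long inDegree) {
        this.inDegree = inDegree;
    }

    /** Returns the in-degree, or throws if it was never populated. */
    long getInDegree() throws DegreeNotSetException {
        if (inDegree == -1L) {
            throw new DegreeNotSetException(
                "The degree option was not set. Enable it before reading degrees.");
        }
        return inDegree;
    }

    public static void main(String[] args) throws Exception {
        DegreeGuardSketch vertex = new DegreeGuardSketch();
        try {
            vertex.getInDegree();
        } catch (DegreeNotSetException e) {
            System.out.println("caught: " + e.getMessage());
        }
        vertex.setInDegree(3L);
        System.out.println("in-degree: " + vertex.getInDegree());
    }
}
```

A sentinel works here because a real degree is never negative, so `-1` cannot collide with a legitimate value, which the previous default of `0` could.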
@@ -20,11 +20,8 @@

 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
@@ -34,12 +31,21 @@
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;

 /**
-* Incremental Single Sink Shortest Paths Example.
+* Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
+* upon edge removal.
+*
+* This example illustrates the usage of vertex-centric iteration's
+* messaging direction configuration option.
 *
 * The program takes as input the resulted graph after a SSSP computation,
 * an edge to be removed and the initial graph(i.e. before SSSP was computed).
+* In the following description, SP-graph is used as an abbreviation for
+* the graph resulted from the SSSP computation. We denote the edges that belong to this
+* graph by SP-edges.
 *
 * - If the removed edge does not belong to the SP-graph, no computation is necessary.
 * The edge is simply removed from the graph.
@@ -55,7 +61,8 @@
 * or when we reach a vertex with no SP-in-neighbors.
 *
 * Usage <code>IncrementalSSSPExample &lt;vertex path&gt; &lt;edge path&gt; &lt;edges in SSSP&gt;
-* &lt;edge to be removed&gt; &lt;result path&gt; &lt;number of iterations&gt;</code><br>
+* &lt;src id edge to be removed&gt; &lt;trg id edge to be removed&gt; &lt;val edge to be removed&gt;
+* &lt;result path&gt; &lt;number of iterations&gt;</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
 */
@@ -137,7 +144,7 @@ public String getDescription() {
 * @param edgesInSSSP
 * @return
 */
-private static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
+public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {

 return edgesInSSSP.filter(new FilterFunction<Edge<Long, Double>>() {
 @Override
@@ -204,27 +211,35 @@ public void sendMessages(Vertex<Long, Double> vertex) throws Exception {

 private static String edgesInSSSPInputPath = null;

-private static String edgeToBeRemoved = null;
+private static Long srcEdgeToBeRemoved = null;
+
+private static Long trgEdgeToBeRemoved = null;
+
+private static Double valEdgeToBeRemoved = null;

 private static String outputPath = null;

 private static int maxIterations = 5;

 private static boolean parseParameters(String[] args) {
 if (args.length > 0) {
-if (args.length == 6) {
+if (args.length == 8) {
 fileOutput = true;
 verticesInputPath = args[0];
 edgesInputPath = args[1];
 edgesInSSSPInputPath = args[2];
-edgeToBeRemoved = args[3];
-outputPath = args[4];
-maxIterations = Integer.parseInt(args[5]);
+srcEdgeToBeRemoved = Long.parseLong(args[3]);
+trgEdgeToBeRemoved = Long.parseLong(args[4]);
+valEdgeToBeRemoved = Double.parseDouble(args[5]);
+outputPath = args[6];
+maxIterations = Integer.parseInt(args[7]);
 } else {
 System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
 System.out.println("Provide parameters to read input data from files.");
 System.out.println("See the documentation for the correct format of input files.");
-System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> <output path> <max iterations>");
+System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
+"<output path> <max iterations>");

 return false;
 }
@@ -237,15 +252,10 @@ private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnviron
 return env.readCsvFile(verticesInputPath)
 .lineDelimiter("\n")
 .types(Long.class, Double.class)
-.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
-
-@Override
-public Vertex<Long, Double> map(Tuple2<Long, Double> tuple2) throws Exception {
-return new Vertex<Long, Double>(tuple2.f0, tuple2.f1);
-}
-});
+.map(new Tuple2ToVertexMap<Long, Double>());
 } else {
-System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
 "<output path> <max iterations>");
 return IncrementalSSSPData.getDefaultVertexDataSet(env);
 }
@@ -256,15 +266,10 @@ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment
 return env.readCsvFile(edgesInputPath)
 .lineDelimiter("\n")
 .types(Long.class, Long.class, Double.class)
-.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-
-@Override
-public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
-return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
-}
-});
+.map(new Tuple3ToEdgeMap<Long, Double>());
 } else {
-System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
 "<output path> <max iterations>");
 return IncrementalSSSPData.getDefaultEdgeDataSet(env);
 }
@@ -275,28 +280,21 @@ private static DataSet<Edge<Long, Double>> getEdgesinSSSPDataSet(ExecutionEnviro
 return env.readCsvFile(edgesInSSSPInputPath)
 .lineDelimiter("\n")
 .types(Long.class, Long.class, Double.class)
-.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-
-@Override
-public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
-return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
-}
-});
+.map(new Tuple3ToEdgeMap<Long, Double>());
 } else {
-System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
 "<output path> <max iterations>");
 return IncrementalSSSPData.getDefaultEdgesInSSSP(env);
 }
 }

 private static Edge<Long, Double> getEdgeToBeRemoved() {
 if (fileOutput) {
-String [] edgeComponents = edgeToBeRemoved.split(",");
-
-return new Edge<Long, Double>(Long.parseLong(edgeComponents[0]), Long.parseLong(edgeComponents[1]),
-Double.parseDouble(edgeComponents[2]));
+return new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved);
 } else {
-System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
 "<output path> <max iterations>");
 return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
 }
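The example's command line grows from six to eight positional arguments in this commit, because the edge to be removed is now passed as three separate values instead of one comma-separated string. A minimal, self-contained sketch of that argument layout follows. The class name `IncrementalSsspArgsSketch`, its fields, and the choice to return `null` on a count mismatch are assumptions made for illustration; the committed code stores the values in static fields and returns a boolean instead.

```java
/** Illustrative parser for the eight positional arguments listed in the usage string. */
final class IncrementalSsspArgsSketch {

    final String verticesPath;
    final String edgesPath;
    final String edgesInSsspPath;
    final long srcId;       // source id of the edge to be removed
    final long trgId;       // target id of the edge to be removed
    final double edgeValue; // value of the edge to be removed
    final String outputPath;
    final int maxIterations;

    private IncrementalSsspArgsSketch(String[] a) {
        verticesPath = a[0];
        edgesPath = a[1];
        edgesInSsspPath = a[2];
        srcId = Long.parseLong(a[3]);
        trgId = Long.parseLong(a[4]);
        edgeValue = Double.parseDouble(a[5]);
        outputPath = a[6];
        maxIterations = Integer.parseInt(a[7]);
    }

    /**
     * Returns the parsed arguments, or null when the count is not eight so the
     * caller can fall back to built-in defaults. Malformed numbers surface as
     * NumberFormatException.
     */
    static IncrementalSsspArgsSketch parse(String[] args) {
        if (args.length != 8) {
            return null;
        }
        return new IncrementalSsspArgsSketch(args);
    }

    public static void main(String[] args) {
        IncrementalSsspArgsSketch parsed = parse(args);
        if (parsed == null) {
            System.out.println("Expected 8 arguments, got " + args.length);
        } else {
            System.out.println("Removing edge " + parsed.srcId + " -> " + parsed.trgId
                + " with value " + parsed.edgeValue);
        }
    }
}
```

Passing the three edge components separately removes the need to split a string at run time, which is what the deleted `edgeToBeRemoved.split(",")` call did.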
