Skip to content

Commit

Permalink
[FLINK-1758] [gelly] Neighborhood Methods Extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
andralungu authored and vasia committed Apr 26, 2015
1 parent 6e24879 commit 9de640a
Show file tree
Hide file tree
Showing 12 changed files with 1,090 additions and 153 deletions.
59 changes: 30 additions & 29 deletions docs/libs/gelly_guide.md
Expand Up @@ -266,7 +266,10 @@ Neighborhood Methods


Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood. Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood.


`reduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `reduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). `groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `groupReduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors).

The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more values per vertex.
When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called as they are more efficient.


For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph: For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph:


Expand All @@ -279,25 +282,28 @@ The following code will collect the out-edges for each vertex and apply the `Sel
{% highlight java %} {% highlight java %}
Graph<Long, Long, Double> graph = ... Graph<Long, Long, Double> graph = ...


DataSet<Tuple2<Long, Double>> minWeights = graph.reduceOnEdges( DataSet<Tuple2<Long, Double>> minWeights = graph.groupReduceOnEdges(
new SelectMinWeight(), EdgeDirection.OUT); new SelectMinWeight(), EdgeDirection.OUT);


// user-defined function to select the minimum weight // user-defined function to select the minimum weight
static final class SelectMinWeight implements EdgesFunction<Long, Double, Tuple2<Long, Double>> { static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {


public Tuple2<Long, Double> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges) { @Override
public void iterateEdges(Vertex<Long, Long> v,
Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {


long minWeight = Double.MAX_VALUE; long weight = Long.MAX_VALUE;
long vertexId = -1; long minNeighborId = 0;


for (Tuple2<Long, Edge<Long, Double>> edge: edges) { for (Edge<Long, Long> edge: edges) {
if (edge.f1.getValue() < weight) { if (edge.getValue() < weight) {
weight = edge.f1.getValue(); weight = edge.getValue();
vertexId = edge.f0; minNeighborId = edge.getTarget();
} }
return new Tuple2<Long, Double>(vertexId, minWeight); }
} out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
} }
}
{% endhighlight %} {% endhighlight %}


<p class="text-center"> <p class="text-center">
Expand All @@ -313,28 +319,23 @@ DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(
new SumValues(), EdgeDirection.IN); new SumValues(), EdgeDirection.IN);


// user-defined function to sum the neighbor values // user-defined function to sum the neighbor values
static final class SumValues implements NeighborsFunction<Long, Long, Double, Tuple2<Long, Long>> { static final class SumValues implements ReduceNeighborsFunction<Long, Long, Double> {

public Tuple2<Long, Long> iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Double>,
Vertex<Long, Long>>> neighbors) {

long sum = 0;
long vertexId = -1;


for (Tuple3<Long, Edge<Long, Double>, Vertex<Long, Long>> neighbor : neighbors) { public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
vertexId = neighbor.f0; Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
sum += neighbor.f2.getValue();
} long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
return new Tuple2<Long, Long>(vertexId, sum); return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
} new Vertex<Long, Long>(firstNeighbor.f0, sum));
}
} }
{% endhighlight %} {% endhighlight %}


<p class="text-center"> <p class="text-center">
<img alt="reduseOnNeighbors Example" width="70%" src="fig/gelly-reduceOnNeighbors.png"/> <img alt="reduceOnNeighbors Example" width="70%" src="img/gelly-reduceOnNeighbors.png"/>
</p> </p>


When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead. When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead.


[Back to top](#top) [Back to top](#top)


Expand Down
Expand Up @@ -20,10 +20,10 @@


/** /**
* The EdgeDirection is used to select a node's neighborhood * The EdgeDirection is used to select a node's neighborhood
* by the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)}, * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
* {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}, * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
* {@link Graph#reduceOnNeighbors(NeighborsFunction, EdgeDirection)} and * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and
* {@link Graph#reduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)} * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
* methods. * methods.
*/ */
public enum EdgeDirection { public enum EdgeDirection {
Expand Down
Expand Up @@ -22,10 +22,11 @@


import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


/** /**
* Interface to be implemented by the function applied to a vertex neighborhood * Interface to be implemented by the function applied to a vertex neighborhood
* in the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)} method. * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method.
* *
* @param <K> the vertex key type * @param <K> the vertex key type
* @param <EV> the edge value type * @param <EV> the edge value type
Expand All @@ -34,5 +35,5 @@
public interface EdgesFunction<K extends Comparable<K> & Serializable, public interface EdgesFunction<K extends Comparable<K> & Serializable,
EV extends Serializable, O> extends Function, Serializable { EV extends Serializable, O> extends Function, Serializable {


O iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges) throws Exception; void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
} }
Expand Up @@ -21,10 +21,11 @@
import java.io.Serializable; import java.io.Serializable;


import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Function;
import org.apache.flink.util.Collector;


/** /**
* Interface to be implemented by the function applied to a vertex neighborhood * Interface to be implemented by the function applied to a vertex neighborhood
* in the {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)} * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
* method. * method.
* *
* @param <K> the vertex key type * @param <K> the vertex key type
Expand All @@ -35,5 +36,5 @@
public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & Serializable, public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { VV extends Serializable, EV extends Serializable, O> extends Function, Serializable {


O iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) throws Exception; void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
} }
Expand Up @@ -28,10 +28,11 @@
import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
Expand Down Expand Up @@ -722,8 +723,8 @@ public Graph<K, VV, EV> getUndirected() {
* @return a dataset of a T * @return a dataset of a T
* @throws IllegalArgumentException * @throws IllegalArgumentException
*/ */
public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction, public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
EdgeDirection direction) throws IllegalArgumentException { EdgeDirection direction) throws IllegalArgumentException {


switch (direction) { switch (direction) {
case IN: case IN:
Expand Down Expand Up @@ -753,8 +754,8 @@ public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> e
* @return a dataset of T * @return a dataset of T
* @throws IllegalArgumentException * @throws IllegalArgumentException
*/ */
public <T> DataSet<T> reduceOnEdges(EdgesFunction<K, EV, T> edgesFunction, public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
EdgeDirection direction) throws IllegalArgumentException { EdgeDirection direction) throws IllegalArgumentException {


switch (direction) { switch (direction) {
case IN: case IN:
Expand Down Expand Up @@ -796,7 +797,7 @@ public ApplyGroupReduceFunction(EdgesFunction<K, EV, T> fun) {
} }


public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception { public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
out.collect(function.iterateEdges(edges)); function.iterateEdges(edges, out);
} }


@Override @Override
Expand Down Expand Up @@ -832,7 +833,7 @@ public ApplyCoGroupFunction(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {


public void coGroup(Iterable<Vertex<K, VV>> vertex, public void coGroup(Iterable<Vertex<K, VV>> vertex,
Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception { Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception {
out.collect(function.iterateEdges(vertex.iterator().next(), edges)); function.iterateEdges(vertex.iterator().next(), edges, out);
} }


@Override @Override
Expand Down Expand Up @@ -880,7 +881,7 @@ public Iterator<Edge<K, EV>> iterator() {
} }
}; };


out.collect(function.iterateEdges(vertex.iterator().next(), edgesIterable)); function.iterateEdges(vertex.iterator().next(), edgesIterable, out);
} }


@Override @Override
Expand Down Expand Up @@ -1238,8 +1239,8 @@ public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exceptio
* @return a dataset of a T * @return a dataset of a T
* @throws IllegalArgumentException * @throws IllegalArgumentException
*/ */
public <T> DataSet<T> reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction, public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
EdgeDirection direction) throws IllegalArgumentException { EdgeDirection direction) throws IllegalArgumentException {
switch (direction) { switch (direction) {
case IN: case IN:
// create <edge-sourceVertex> pairs // create <edge-sourceVertex> pairs
Expand Down Expand Up @@ -1281,8 +1282,8 @@ public <T> DataSet<T> reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV,
* @return a dataset of a T * @return a dataset of a T
* @throws IllegalArgumentException * @throws IllegalArgumentException
*/ */
public <T> DataSet<T> reduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction, public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
EdgeDirection direction) throws IllegalArgumentException { EdgeDirection direction) throws IllegalArgumentException {
switch (direction) { switch (direction) {
case IN: case IN:
// create <edge-sourceVertex> pairs // create <edge-sourceVertex> pairs
Expand Down Expand Up @@ -1322,8 +1323,7 @@ public ApplyNeighborGroupReduceFunction(NeighborsFunction<K, VV, EV, T> fun) {
} }


public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception { public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception {
out.collect(function.iterateNeighbors(edges)); function.iterateNeighbors(edges, out);

} }


@Override @Override
Expand Down Expand Up @@ -1368,7 +1368,7 @@ public ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue<K, VV, EV,


public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
Collector<T> out) throws Exception { Collector<T> out) throws Exception {
out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors)); function.iterateNeighbors(vertex.iterator().next(), neighbors, out);
} }


@Override @Override
Expand Down Expand Up @@ -1417,13 +1417,134 @@ public Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> iterator() {
} }
}; };


out.collect(function.iterateNeighbors(vertex.iterator().next(), function.iterateNeighbors(vertex.iterator().next(), neighborsIterable, out);
neighborsIterable));
} }


@Override @Override
public TypeInformation<T> getProducedType() { public TypeInformation<T> getProducedType() {
return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null); return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
} }
} }
}
/**
* Compute an aggregate over the neighbors (edges and vertices) of each
* vertex. The function applied on the neighbors only has access to the
* vertex id (not the vertex value).
*
* @param reduceNeighborsFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @return a dataset containing one value per vertex
* @throws IllegalArgumentException
*/
public DataSet reduceOnNeighbors(ReduceNeighborsFunction<K, VV, EV> reduceNeighborsFunction,
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
final DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
.join(this.vertices).where(0).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(1));
return edgesWithSources.groupBy(0).reduce(new ApplyNeighborReduceFunction<K,VV,EV>(reduceNeighborsFunction))
.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(0));
return edgesWithTargets.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
.flatMap(new EmitOneEdgeWithNeighborPerNode<K, VV, EV>())
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectEdgeWithNeighbor<K, VV, EV>());

return edgesWithNeighbors.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}

private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements ReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {

private ReduceNeighborsFunction<K, VV, EV> function;

public ApplyNeighborReduceFunction(ReduceNeighborsFunction<K, VV, EV> fun) {
this.function = fun;
}

@Override
public Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduce(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> first,
Tuple3<K, Edge<K, EV>, Vertex<K, VV>> second) throws Exception {
return function.reduceNeighbors(first, second);
}
}

public static final class ApplyNeighborhoodMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements MapFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>> ,Tuple2<K, VV>> {

@Override
public Tuple2<K, VV> map(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> edgesWithSrc) throws Exception {
return new Tuple2<K, VV>(edgesWithSrc.f0, edgesWithSrc.f2.getValue());
}
}

/**
* Compute an aggregate over the edges of each vertex. The function applied
* on the edges only has access to the vertex id (not the vertex value).
*
* @param reduceEdgesFunction
* the function to apply to the neighborhood
* @param direction
* the edge direction (in-, out-, all-)
* @return a dataset containing one value per vertex
* @throws IllegalArgumentException
*/
public DataSet reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
EdgeDirection direction) throws IllegalArgumentException {

switch (direction) {
case IN:
return edges.map(new ProjectVertexIdMap<K, EV>(1))
.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
.map(new ApplyEdgesMapFunction());
case OUT:
return edges.map(new ProjectVertexIdMap<K, EV>(0))
.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
.map(new ApplyEdgesMapFunction());
case ALL:
return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
.map(new ApplyEdgesMapFunction());
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}

private static final class ApplyReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable>
implements ReduceFunction<Tuple2<K, Edge<K, EV>>> {

private ReduceEdgesFunction<K, EV> function;

public ApplyReduceFunction(ReduceEdgesFunction<K, EV> fun) {
this.function = fun;
}

@Override
public Tuple2<K, Edge<K, EV>> reduce(Tuple2<K, Edge<K, EV>> first, Tuple2<K, Edge<K, EV>> second) throws Exception {
return function.reduceEdges(first, second);
}
}

public static final class ApplyEdgesMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements MapFunction<Tuple2<K, Edge<K, EV>> ,Tuple2<K, EV>> {

@Override
public Tuple2<K, EV> map(Tuple2<K, Edge<K, EV>> edge) throws Exception {
return new Tuple2<K, EV>(edge.f0, edge.f1.getValue());
}
}
}

0 comments on commit 9de640a

Please sign in to comment.