Skip to content

Commit

Permalink
[FLINK-1201] [gelly] reduceOnEdges method for in-/out- and all edges
Browse files Browse the repository at this point in the history
  • Loading branch information
vasia authored and StephanEwen committed Feb 11, 2015
1 parent 2d26e27 commit 69426eb
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 33 deletions.
@@ -0,0 +1,7 @@
package flink.graphs;

public enum EdgeDirection {
IN,
OUT,
ALL
}
Expand Up @@ -5,8 +5,8 @@
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple2;

public interface OutEdgesFunction<K extends Comparable<K> & Serializable,
public interface EdgesFunction<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, O> extends Function, Serializable {

Tuple2<K, O> iterateOutEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> outEdges) throws Exception;
Tuple2<K, O> iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) throws Exception;
}
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
Expand Down Expand Up @@ -330,44 +331,111 @@ public Graph<K, VV, EV> getUndirected() throws UnsupportedOperationException {
}

/**
* Utility function that allows each vertex of the graph
* to access its out-edges
* @param edgesFunction the function to apply to the neighborhood
* Compute an aggregate over the edges of each vertex.
* @param edgesFunction edgesFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @return a dataset of a Tuple2 with the vertex id and the computed value
* @throws IllegalArgumentException
*/
public <T> DataSet<Tuple2<K, T>> foreachEdge(OutEdgesFunction<K, VV, EV, T> edgesFunction) {
return vertices.coGroup(edges).where(0).equalTo(0).with(
new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
public <T> DataSet<Tuple2<K, T>> reduceOnEdges(EdgesFunction<K, VV, EV, T> edgesFunction,
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
return vertices.coGroup(edges).where(0).equalTo(1).with(
new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
case OUT:
return vertices.coGroup(edges).where(0).equalTo(0).with(
new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
case ALL:
return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
.where(0).equalTo(0)
.with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}

private static final class EmitOneEdgePerNode<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable> implements FlatMapFunction<
Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
}
}

/**
*
* @param <K>
* @param <VV>
* @param <EV>
* @param <T>
*/
private static final class ApplyCoGroupFunction<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, T>
implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, T>>,
ResultTypeQueryable<Tuple2<K, T>> {

private OutEdgesFunction<K, VV, EV, T> function;
private EdgesFunction<K, VV, EV, T> function;

public ApplyCoGroupFunction (OutEdgesFunction<K, VV, EV, T> fun) {
public ApplyCoGroupFunction (EdgesFunction<K, VV, EV, T> fun) {
this.function = fun;
}
public void coGroup(Iterable<Vertex<K, VV>> vertex,
Iterable<Edge<K, EV>> outEdges, Collector<Tuple2<K, T>> out) throws Exception {
out.collect(function.iterateOutEdges(vertex.iterator().next(), outEdges));
Iterable<Edge<K, EV>> edges, Collector<Tuple2<K, T>> out) throws Exception {
out.collect(function.iterateEdges(vertex.iterator().next(), edges));
}
@Override
public TypeInformation<Tuple2<K, T>> getProducedType() {
return new TupleTypeInfo<Tuple2<K, T>>(keyType,
TypeExtractor.createTypeInfo(OutEdgesFunction.class, function.getClass(), 3, null, null));
TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 3, null, null));
}
}

private static final class ApplyCoGroupFunctionOnAllEdges<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, T>
implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, Tuple2<K, T>>,
ResultTypeQueryable<Tuple2<K, T>> {

private EdgesFunction<K, VV, EV, T> function;

public ApplyCoGroupFunctionOnAllEdges (EdgesFunction<K, VV, EV, T> fun) {
this.function = fun;
}

public void coGroup(Iterable<Vertex<K, VV>> vertex,
final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, Collector<Tuple2<K, T>> out)
throws Exception {

final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K,EV>>() {

final Iterator<Tuple2<K, Edge<K, EV>>> keysWithEdgesIterator = keysWithEdges.iterator();

@Override
public boolean hasNext() {
return keysWithEdgesIterator.hasNext();
}

@Override
public Edge<K, EV> next() {
return keysWithEdgesIterator.next().f1;
}

@Override
public void remove() {
keysWithEdgesIterator.remove();
}
};

Iterable<Edge<K, EV>> edgesIterable = new Iterable<Edge<K,EV>>() {
public Iterator<Edge<K, EV>> iterator() {
return edgesIterator;
}
};

out.collect(function.iterateEdges(vertex.iterator().next(), edgesIterable));
}

@Override
public TypeInformation<Tuple2<K, T>> getProducedType() {
return new TupleTypeInfo<Tuple2<K, T>>(keyType,
TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 3, null, null));
}
}

@ConstantFields("0->1;1->0;2->2")
private static final class ReverseEdgesMap<K extends Comparable<K> & Serializable,
EV extends Serializable> implements MapFunction<Edge<K, EV>,
Expand Down
Expand Up @@ -197,8 +197,8 @@ public DummyCustomParameterizedType<Double> map(Long vertexId) {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
Graph<Long, Long, Long> graph = new Graph(vertices, edges, env);
DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator());
Graph<Long, Long, Long> graph = new Graph<Long, Long, Long>(vertices, edges, env);
DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
result.writeAsText(resultPath);
env.execute();

Expand All @@ -212,8 +212,8 @@ public DummyCustomParameterizedType<Double> map(Long vertexId) {
DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);

Graph<Long, Long, Long> graph = new Graph(vertices, edges, env);
DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator());
Graph<Long, Long, Long> graph = new Graph<Long, Long, Long>(vertices, edges, env);
DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
result.writeAsText(resultPath);
env.execute();

Expand Down
Expand Up @@ -15,15 +15,15 @@
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class TestForeachEdge extends JavaProgramTestBase {
public class TestNeighborMethods extends JavaProgramTestBase {

private static int NUM_PROGRAMS = 1;
private static int NUM_PROGRAMS = 3;

private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
private String expectedResult;

public TestForeachEdge(Configuration config) {
public TestNeighborMethods(Configuration config) {
super(config);
}

Expand Down Expand Up @@ -70,26 +70,26 @@ public static String runProgram(int progId, String resultPath) throws Exception
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);

DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
graph.foreachEdge(new OutEdgesFunction<Long, Long, Long, Long>() {
graph.reduceOnEdges(new EdgesFunction<Long, Long, Long, Long>() {

public Tuple2<Long, Long> iterateOutEdges(
public Tuple2<Long, Long> iterateEdges(
Vertex<Long, Long> v,
Iterable<Edge<Long, Long>> outEdges) {
Iterable<Edge<Long, Long>> edges) {

long weight = Long.MAX_VALUE;
long minNeighorId = 0;

for (Edge<Long, Long> edge: outEdges) {
for (Edge<Long, Long> edge: edges) {
if (edge.getValue() < weight) {
weight = edge.getValue();
minNeighorId = edge.getTarget();
}
}
return new Tuple2<Long, Long>(v.getId(), minNeighorId);
}
});
}, EdgeDirection.OUT);
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
env.execute();
return "1,2\n" +
Expand All @@ -98,6 +98,75 @@ public Tuple2<Long, Long> iterateOutEdges(
"4,5\n" +
"5,1\n";
}
case 2: {
/*
* Get the lowest-weight in-neighbor
* for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);

DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
graph.reduceOnEdges(new EdgesFunction<Long, Long, Long, Long>() {

public Tuple2<Long, Long> iterateEdges(
Vertex<Long, Long> v,
Iterable<Edge<Long, Long>> edges) {

long weight = Long.MAX_VALUE;
long minNeighorId = 0;

for (Edge<Long, Long> edge: edges) {
if (edge.getValue() < weight) {
weight = edge.getValue();
minNeighorId = edge.getSource();
}
}
return new Tuple2<Long, Long>(v.getId(), minNeighorId);
}
}, EdgeDirection.IN);
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
env.execute();
return "1,5\n" +
"2,1\n" +
"3,1\n" +
"4,3\n" +
"5,3\n";
}
case 3: {
/*
* Get the maximum weight among all edges
* of a vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);

DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
graph.reduceOnEdges(new EdgesFunction<Long, Long, Long, Long>() {

public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
Iterable<Edge<Long, Long>> edges) {

long weight = Long.MIN_VALUE;

for (Edge<Long, Long> edge: edges) {
if (edge.getValue() > weight) {
weight = edge.getValue();
}
}
return new Tuple2<Long, Long>(v.getId(), weight);
}
}, EdgeDirection.ALL);
verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
env.execute();
return "1,51\n" +
"2,23\n" +
"3,35\n" +
"4,45\n" +
"5,51\n";
}
default:
throw new IllegalArgumentException("Invalid program id");
}
Expand Down

0 comments on commit 69426eb

Please sign in to comment.