From bbc8a80b5f163f1b1de024995586a586a5987225 Mon Sep 17 00:00:00 2001 From: andralungu Date: Tue, 7 Apr 2015 22:29:18 +0200 Subject: [PATCH 1/6] [FLINK-1758][gelly] Neighborhood Methods Extensions --- docs/gelly_guide.md | 37 +- .../org/apache/flink/graph/EdgesFunction.java | 3 +- .../graph/EdgesFunctionWithVertexValue.java | 3 +- .../java/org/apache/flink/graph/Graph.java | 18 +- .../apache/flink/graph/NeighborsFunction.java | 3 +- .../NeighborsFunctionWithVertexValue.java | 3 +- .../flink/graph/example/MusicProfiles.java | 6 +- .../ReduceOnEdgesMethodsITCase.java | 395 ++++++++++++++- .../ReduceOnNeighborMethodsITCase.java | 457 ++++++++++++++++-- 9 files changed, 843 insertions(+), 82 deletions(-) diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md index cc852962acee6..f65d4d95c038c 100644 --- a/docs/gelly_guide.md +++ b/docs/gelly_guide.md @@ -286,21 +286,24 @@ DataSet> minWeights = graph.reduceOnEdges( new SelectMinWeight(), EdgeDirection.OUT); // user-defined function to select the minimum weight -static final class SelectMinWeight implements EdgesFunction> { +static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue> { - public Tuple2 iterateEdges(Iterable>> edges) { + @Override + public void iterateEdges(Vertex v, + Iterable> edges, Collector> out) throws Exception { - long minWeight = Double.MAX_VALUE; - long vertexId = -1; + long weight = Long.MAX_VALUE; + long minNeighborId = 0; - for (Tuple2> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - vertexId = edge.f0; - } - return new Tuple2(vertexId, minWeight); - } -} + for (Edge edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighborId = edge.getTarget(); + } + } + out.collect(new Tuple2(v.getId(), minNeighborId)); + } + } {% endhighlight %}

@@ -318,8 +321,8 @@ DataSet> verticesWithSum = graph.reduceOnNeighbors( // user-defined function to sum the neighbor values static final class SumValues implements NeighborsFunction> { - public Tuple2 iterateNeighbors(Iterable, - Vertex>> neighbors) { + public void iterateNeighbors(Iterable, + Vertex>> neighbors, Collector> out) { long sum = 0; long vertexId = -1; @@ -328,17 +331,19 @@ static final class SumValues implements NeighborsFunction(vertexId, sum); + out.collect(new Tuple2(vertexId, sum)); } } {% endhighlight %}

- reduseOnNeighbors Example + reduceOnNeighbors Example

When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead. +The neighborhood methods return zero, one or more values per vertex. + [Back to top](#top) Vertex-centric Iterations diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java index d35385f90aba2..30f73f4706c1d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; /** * Interface to be implemented by the function applied to a vertex neighborhood @@ -34,5 +35,5 @@ public interface EdgesFunction & Serializable, EV extends Serializable, O> extends Function, Serializable { - O iterateEdges(Iterable>> edges) throws Exception; + void iterateEdges(Iterable>> edges, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java index dd0f518eaa179..1bce0a098697f 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java @@ -21,6 +21,7 @@ import java.io.Serializable; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; /** * Interface to be implemented by the function applied to a vertex neighborhood @@ -35,5 +36,5 @@ public interface EdgesFunctionWithVertexValue & Serializable, VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { - O iterateEdges(Vertex v, Iterable> edges) throws Exception; + void iterateEdges(Vertex v, Iterable> edges, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 62173e3c6e730..da97980419726 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -28,10 +28,10 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -792,7 +792,7 @@ public ApplyGroupReduceFunction(EdgesFunction fun) { } public void reduce(Iterable>> edges, Collector out) throws Exception { - out.collect(function.iterateEdges(edges)); + function.iterateEdges(edges, out); } @Override @@ -828,7 +828,7 @@ public ApplyCoGroupFunction(EdgesFunctionWithVertexValue fun) { public void coGroup(Iterable> vertex, Iterable> edges, Collector out) throws Exception { - out.collect(function.iterateEdges(vertex.iterator().next(), edges)); + function.iterateEdges(vertex.iterator().next(), edges, out); } @Override @@ -876,7 +876,7 @@ public Iterator> iterator() { } }; - out.collect(function.iterateEdges(vertex.iterator().next(), edgesIterable)); + function.iterateEdges(vertex.iterator().next(), edgesIterable, out); } @Override @@ -1293,8 +1293,7 @@ public ApplyNeighborGroupReduceFunction(NeighborsFunction fun) { } public void reduce(Iterable, Vertex>> edges, Collector out) throws Exception { - out.collect(function.iterateNeighbors(edges)); - + function.iterateNeighbors(edges, out); } @Override @@ -1339,7 +1338,7 @@ public ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue> vertex, Iterable, Vertex>> neighbors, Collector out) throws Exception { - out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors)); + function.iterateNeighbors(vertex.iterator().next(), neighbors, out); } @Override @@ -1388,8 +1387,7 @@ public Iterator, Vertex>> iterator() { } }; - out.collect(function.iterateNeighbors(vertex.iterator().next(), - neighborsIterable)); + function.iterateNeighbors(vertex.iterator().next(), neighborsIterable, out); } @Override diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java index a2d28b26d5ae6..6fd31f4eeab0d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.Collector; /** * Interface to be implemented by the function applied to a vertex neighborhood @@ -36,5 +37,5 @@ public interface NeighborsFunction & Serializable, VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { - O iterateNeighbors(Iterable, Vertex>> neighbors) throws Exception; + void iterateNeighbors(Iterable, Vertex>> neighbors, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java index 438ed8a45373d..6f1953a094afe 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; /** * Interface to be implemented by the function applied to a vertex neighborhood @@ -36,5 +37,5 @@ public interface NeighborsFunctionWithVertexValue & Serializable, VV extends Serializable, EV extends Serializable, O> extends Function, Serializable { - O iterateNeighbors(Vertex vertex, Iterable, Vertex>> neighbors) throws Exception; + void iterateNeighbors(Vertex vertex, Iterable, Vertex>> neighbors, Collector out) throws Exception; } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index 9b186231ff913..9d642db926e74 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -185,8 +185,8 @@ public boolean filter(Tuple2 value) throws Exception { public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue> { - public Tuple2 iterateEdges(Vertex vertex, - Iterable> edges) { + public void iterateEdges(Vertex vertex, + Iterable> edges, Collector> out) throws Exception { int maxPlaycount = 0; String topSong = ""; @@ -196,7 +196,7 @@ public Tuple2 iterateEdges(Vertex vertex, topSong = edge.getTarget(); } } - return new Tuple2(vertex.getId(), topSong); + out.collect(new Tuple2(vertex.getId(), topSong)); } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index ec0c84c6f713d..e60da1ee856a3 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -104,6 +105,210 @@ public void testLowestWeightInNeighbor() throws Exception { "5,3\n"; } + @Test + public void testAllOutNeighbors() throws Exception { + /* + * Get the all the out-neighbors for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllOutNeighbors = + graph.reduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT); + verticesWithAllOutNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "1,3\n" + + "2,3\n" + + "3,4\n" + + "3,5\n" + + "4,5\n" + + "5,1"; + } + + @Test + public void testAllOutNeighborsNoValue() throws Exception { + /* + * Get the all the out-neighbors for each vertex except for the vertex with id 5. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllOutNeighbors = + graph.reduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT); + verticesWithAllOutNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "1,3\n" + + "2,3\n" + + "3,4\n" + + "3,5\n" + + "4,5"; + } + + @Test + public void testAllOutNeighborsWithValueGreaterThanTwo() throws Exception { + /* + * Get the all the out-neighbors for each vertex that have a value greater than two. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllOutNeighbors = + graph.reduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT); + verticesWithAllOutNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "3,4\n" + + "3,5\n" + + "4,5\n" + + "5,1"; + } + + @Test + public void testAllInNeighbors() throws Exception { + /* + * Get the all the in-neighbors for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllInNeighbors = + graph.reduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN); + verticesWithAllInNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "3,2\n" + + "4,3\n" + + "5,3\n" + + "5,4"; + } + + @Test + public void testAllInNeighborsNoValue() throws Exception { + /* + * Get the all the in-neighbors for each vertex except for the vertex with id 5. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllInNeighbors = + graph.reduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN); + verticesWithAllInNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "3,2\n" + + "4,3"; + } + + @Test + public void testAllInNeighborsWithValueGreaterThanTwo() throws Exception { + /* + * Get the all the in-neighbors for each vertex that have a value greater than two. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllInNeighbors = + graph.reduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN); + verticesWithAllInNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "3,1\n" + + "3,2\n" + + "4,3\n" + + "5,3\n" + + "5,4"; + } + + @Test + public void testAllNeighbors() throws Exception { + /* + * Get the all the neighbors for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllNeighbors = + graph.reduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL); + verticesWithAllNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "1,3\n" + + "1,5\n" + + "2,1\n" + + "2,3\n" + + "3,1\n" + + "3,2\n" + + "3,4\n" + + "3,5\n" + + "4,3\n" + + "4,5\n" + + "5,1\n" + + "5,3\n" + + "5,4"; + } + + @Test + public void testAllNeighborsNoValue() throws Exception { + /* + * Get the all the neighbors for each vertex except for vertices with id 5 and 2. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllNeighbors = + graph.reduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL); + verticesWithAllNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "1,3\n" + + "1,5\n" + + "3,1\n" + + "3,2\n" + + "3,4\n" + + "3,5\n" + + "4,3\n" + + "4,5"; + } + + @Test + public void testAllNeighborsWithValueGreaterThanFour() throws Exception { + /* + * Get the all the neighbors for each vertex that have a value greater than four. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithAllNeighbors = + graph.reduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); + verticesWithAllNeighbors.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "5,1\n" + + "5,3\n" + + "5,4"; + } + @Test public void testMaxWeightEdge() throws Exception { /* @@ -195,28 +400,29 @@ public void testMaxWeightAllNeighbors() throws Exception { @SuppressWarnings("serial") private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue> { - public Tuple2 iterateEdges( - Vertex v, - Iterable> edges) { + @Override + public void iterateEdges(Vertex v, + Iterable> edges, Collector> out) throws Exception { long weight = Long.MAX_VALUE; - long minNeighorId = 0; - + long minNeighborId = 0; + for (Edge edge: edges) { if (edge.getValue() < weight) { weight = edge.getValue(); - minNeighorId = edge.getTarget(); + minNeighborId = edge.getTarget(); } } - return new Tuple2(v.getId(), minNeighorId); + out.collect(new Tuple2(v.getId(), minNeighborId)); } } @SuppressWarnings("serial") private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue> { - public Tuple2 iterateEdges(Vertex v, - Iterable> edges) { + @Override + public void iterateEdges(Vertex v, + Iterable> edges, Collector> out) throws Exception { long weight = Long.MIN_VALUE; @@ -225,37 +431,41 @@ public Tuple2 iterateEdges(Vertex v, weight = edge.getValue(); } } - return new Tuple2(v.getId(), weight); + out.collect(new Tuple2(v.getId(), weight)); } } @SuppressWarnings("serial") private static final class SelectMinWeightNeighborNoValue implements EdgesFunction> { - public Tuple2 iterateEdges(Iterable>> edges) { + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { long weight = Long.MAX_VALUE; - long minNeighorId = 0; + long minNeighborId = 0; long vertexId = -1; long i=0; for (Tuple2> edge: edges) { if (edge.f1.getValue() < weight) { weight = edge.f1.getValue(); - minNeighorId = edge.f1.getTarget(); + minNeighborId = edge.f1.getTarget(); } if (i==0) { vertexId = edge.f0; } i++; } - return new Tuple2(vertexId, minNeighorId); + out.collect(new Tuple2(vertexId, minNeighborId)); } } @SuppressWarnings("serial") private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction> { - public Tuple2 iterateEdges(Iterable>> edges) { + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { long weight = Long.MIN_VALUE; long vertexId = -1; @@ -269,16 +479,16 @@ public Tuple2 iterateEdges(Iterable>> vertexId = edge.f0; } i++; } - return new Tuple2(vertexId, weight); + out.collect(new Tuple2(vertexId, weight)); } } @SuppressWarnings("serial") private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue> { - public Tuple2 iterateEdges( - Vertex v, - Iterable> edges) { + @Override + public void iterateEdges(Vertex v, + Iterable> edges, Collector> out) throws Exception { long weight = Long.MAX_VALUE; long minNeighorId = 0; @@ -289,14 +499,16 @@ public Tuple2 iterateEdges( minNeighorId = edge.getSource(); } } - return new Tuple2(v.getId(), minNeighorId); + out.collect(new Tuple2(v.getId(), minNeighorId)); } } @SuppressWarnings("serial") private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction> { - public Tuple2 iterateEdges(Iterable>> edges) { + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { long weight = Long.MAX_VALUE; long minNeighorId = 0; @@ -312,7 +524,146 @@ public Tuple2 iterateEdges(Iterable>> vertexId = edge.f0; } i++; } - return new Tuple2(vertexId, minNeighorId); + out.collect(new Tuple2(vertexId, minNeighorId)); + } + } + + @SuppressWarnings("serial") + private static final class SelectOutNeighbors implements EdgesFunction> { + + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { + + for(Tuple2> edge : edges) { + out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + } + } + } + + @SuppressWarnings("serial") + private static final class SelectOutNeighborsExcludeFive implements EdgesFunction> { + + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { + + for(Tuple2> edge : edges) { + if(edge.f0 != 5) { + out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + } + } + } + } + + @SuppressWarnings("serial") + private static final class SelectOutNeighborsValueGreaterThanTwo implements + EdgesFunctionWithVertexValue> { + + @Override + public void iterateEdges(Vertex v, Iterable> edges, + Collector> out) throws Exception { + for (Edge edge: edges) { + if(v.getValue() > 2) { + out.collect(new Tuple2(v.getId(), edge.getTarget())); + } + } + } + } + + @SuppressWarnings("serial") + private static final class SelectInNeighbors implements EdgesFunction> { + + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { + + for(Tuple2> edge : edges) { + out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + } + } + } + + @SuppressWarnings("serial") + private static final class SelectInNeighborsExceptFive implements EdgesFunction> { + + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { + + for(Tuple2> edge : edges) { + if(edge.f0 != 5) { + out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + } + } + } + } + + @SuppressWarnings("serial") + private static final class SelectInNeighborsValueGreaterThanTwo implements + EdgesFunctionWithVertexValue> { + + @Override + public void iterateEdges(Vertex v, Iterable> edges, + Collector> out) throws Exception { + for (Edge edge: edges) { + if(v.getValue() > 2) { + out.collect(new Tuple2(v.getId(), edge.getSource())); + } + } + } + } + + @SuppressWarnings("serial") + private static final class SelectNeighbors implements EdgesFunction> { + + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { + for (Tuple2> edge : edges) { + if (edge.f0 == edge.f1.getTarget()) { + out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + } else { + out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + } + } + } + } + + @SuppressWarnings("serial") + private static final class SelectNeighborsExceptFiveAndTwo implements EdgesFunction> { + + @Override + public void iterateEdges(Iterable>> edges, + Collector> out) throws Exception { + for (Tuple2> edge : edges) { + if(edge.f0 != 5 && edge.f0 != 2) { + if (edge.f0 == edge.f1.getTarget()) { + out.collect(new Tuple2(edge.f0, edge.f1.getSource())); + } else { + out.collect(new Tuple2(edge.f0, edge.f1.getTarget())); + } + } + } + } + } + + @SuppressWarnings("serial") + private static final class SelectNeighborsValueGreaterThanFour implements + EdgesFunctionWithVertexValue> { + + @Override + public void iterateEdges(Vertex v, Iterable> edges, + Collector> out) throws Exception { + for(Edge edge : edges) { + if(v.getValue() > 4) { + if(v.getId().equals(edge.getTarget())) { + out.collect(new Tuple2(v.getId(), edge.getSource())); + } else { + out.collect(new Tuple2(v.getId(), edge.getTarget())); + } + } + } } } } \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index 785552c7172a5..fdb99893be6cb 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -131,6 +132,65 @@ public void testSumOfOAllNeighbors() throws Exception { "5,13\n"; } + @Test + public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex with id greater than three. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + expectedResult = "4,5\n" + + "5,1\n"; + } + + @Test + public void testSumOfInNeighborsIdGreaterThanThree() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex with id greater than three. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + expectedResult = "4,102\n" + + "5,285\n"; + } + + @Test + public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception { + /* + * Get the sum of all neighbor values + * including own vertex value + * for each vertex with id greater than three. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "4,12\n" + + "5,13\n"; + } + @Test public void testSumOfOutNeighborsNoValue() throws Exception { /* @@ -166,14 +226,13 @@ public void testSumOfInNeighborsNoValue() throws Exception { DataSet> verticesWithSum = graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); - verticesWithSum.writeAsCsv(resultPath); env.execute(); expectedResult = "1,255\n" + - "2,12\n" + + "2,12\n" + "3,59\n" + - "4,102\n" + + "4,102\n" + "5,285\n"; } @@ -187,10 +246,10 @@ public void testSumOfAllNeighborsNoValue() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> verticesWithSumOfOutNeighborValues = + DataSet> verticesWithSumOfAllNeighborValues = graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL); - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath); env.execute(); expectedResult = "1,10\n" + @@ -200,33 +259,191 @@ public void testSumOfAllNeighborsNoValue() throws Exception { "5,8\n"; } + @Test + public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex with id greater than two as well as the same sum multiplied by two. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "3,9\n" + + "3,18\n" + + "4,5\n" + + "4,10\n" + + "5,1\n" + + "5,2"; + } + + @Test + public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception { + /* + * Get the sum of in-neighbor values + * for each vertex with id greater than two as well as the same sum multiplied by two. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN); + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "3,59\n" + + "3,118\n" + + "4,204\n" + + "4,102\n" + + "5,570\n" + + "5,285"; + } + + @Test + public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception { + /* + * Get the sum of all neighbor values + * for each vertex with id greater than two as well as the same sum multiplied by two. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSumOfAllNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL); + + verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "3,12\n" + + "3,24\n" + + "4,8\n" + + "4,16\n" + + "5,8\n" + + "5,16"; + } + + @Test + public void testSumOfOutNeighborsMultipliedByTwo() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex as well as the sum of out-neighbor values multiplied by two. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,5\n" + + "1,10\n" + + "2,3\n" + + "2,6\n" + + "3,9\n" + + "3,18\n" + + "4,5\n" + + "4,10\n" + + "5,1\n" + + "5,2"; + } + + @Test + public void testSumOfInNeighborsSubtractOne() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex as well as the same sum minus one. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,255\n" + + "1,254\n" + + "2,12\n" + + "2,11\n" + + "3,59\n" + + "3,58\n" + + "4,102\n" + + "4,101\n" + + "5,285\n" + + "5,284"; + } + + @Test + public void testSumOfOAllNeighborsAddFive() throws Exception { + /* + * Get the sum of all neighbor values + * including own vertex value + * for each vertex as well as the same sum plus five. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,11\n" + + "1,16\n" + + "2,6\n" + + "2,11\n" + + "3,15\n" + + "3,20\n" + + "4,12\n" + + "4,17\n" + + "5,13\n" + + "5,18"; + } + @SuppressWarnings("serial") private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue> { - public Tuple2 iterateNeighbors(Vertex vertex, - Iterable, Vertex>> neighbors) { + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { long sum = 0; for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - return new Tuple2(vertex.getId(), sum); + out.collect(new Tuple2(vertex.getId(), sum)); } } @SuppressWarnings("serial") private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue> { - - public Tuple2 iterateNeighbors(Vertex vertex, - Iterable, Vertex>> neighbors) { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { long sum = 0; for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f0.getValue() * neighbor.f1.getValue(); } - return new Tuple2(vertex.getId(), sum); + out.collect(new Tuple2(vertex.getId(), sum)); } } @@ -234,14 +451,73 @@ public Tuple2 iterateNeighbors(Vertex vertex, private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue> { - public Tuple2 iterateNeighbors(Vertex vertex, - Iterable, Vertex>> neighbors) { + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { long sum = 0; for (Tuple2, Vertex> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - return new Tuple2(vertex.getId(), sum + vertex.getValue()); + out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); + } + } + + @SuppressWarnings("serial") + private static final class SumOutNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + if(vertex.getId() > 3) { + out.collect(new Tuple2(vertex.getId(), sum)); + } + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); + } + if(vertex.getId() > 3) { + out.collect(new Tuple2(vertex.getId(), sum)); + } + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + if(vertex.getId() > 3) { + out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); + } } } @@ -249,8 +525,9 @@ public Tuple2 iterateNeighbors(Vertex vertex, private static final class SumOutNeighborsNoValue implements NeighborsFunction> { - public Tuple2 iterateNeighbors( - Iterable, Vertex>> neighbors) { + @Override + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { long sum = 0; Tuple3, Vertex> next = null; @@ -260,17 +537,18 @@ public Tuple2 iterateNeighbors( next = neighborsIterator.next(); sum += next.f2.getValue(); } - return new Tuple2(next.f0, sum); + out.collect(new Tuple2(next.f0, sum)); } } @SuppressWarnings("serial") private static final class SumInNeighborsNoValue implements NeighborsFunction> { - - public Tuple2 iterateNeighbors( - Iterable, Vertex>> neighbors) { - + + @Override + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + long sum = 0; Tuple3, Vertex> next = null; Iterator, Vertex>> neighborsIterator = @@ -279,7 +557,7 @@ public Tuple2 iterateNeighbors( next = neighborsIterator.next(); sum += next.f2.getValue() * next.f1.getValue(); } - return new Tuple2(next.f0, sum); + out.collect(new Tuple2(next.f0, sum)); } } @@ -287,8 +565,9 @@ public Tuple2 iterateNeighbors( private static final class SumAllNeighborsNoValue implements NeighborsFunction> { - public Tuple2 iterateNeighbors( - Iterable, Vertex>> neighbors) { + @Override + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { long sum = 0; Tuple3, Vertex> next = null; @@ -298,7 +577,131 @@ public Tuple2 iterateNeighbors( next = neighborsIterator.next(); sum += next.f2.getValue(); } - return new Tuple2(next.f0, sum); + out.collect(new Tuple2(next.f0, sum)); + } + } + + @SuppressWarnings("serial") + private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction> { + + @Override + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + Tuple3, Vertex> next = null; + Iterator, Vertex>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); + } + if(next.f0 > 2) { + out.collect(new Tuple2(next.f0, sum)); + out.collect(new Tuple2(next.f0, sum * 2)); + } + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction> { + + @Override + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + Tuple3, Vertex> next = null; + Iterator, Vertex>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue() * next.f1.getValue(); + } + if(next.f0 > 2) { + out.collect(new Tuple2(next.f0, sum)); + out.collect(new Tuple2(next.f0, sum * 2)); + } + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction> { + + @Override + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + Tuple3, Vertex> next = null; + Iterator, Vertex>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); + } + if(next.f0 > 2) { + out.collect(new Tuple2(next.f0, sum)); + out.collect(new Tuple2(next.f0, sum * 2)); + } + } + } + + @SuppressWarnings("serial") + private static final class SumOutNeighborsMultipliedByTwo implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + out.collect(new Tuple2(vertex.getId(), sum)); + out.collect(new Tuple2(vertex.getId(), sum * 2)); + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighborsSubtractOne implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); + } + out.collect(new Tuple2(vertex.getId(), sum)); + out.collect(new Tuple2(vertex.getId(), sum - 1)); } } -} \ No newline at end of file + + @SuppressWarnings("serial") + private static final class SumAllNeighborsAddFive implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue())); + out.collect(new Tuple2(vertex.getId(), sum + vertex.getValue() + 5)); + } + } +} + From d59a3cc36861438cca6ee9a86ac40458c077ca4e Mon Sep 17 00:00:00 2001 From: andralungu Date: Fri, 10 Apr 2015 17:54:14 +0200 Subject: [PATCH 2/6] [FLINK-1758][gelly][work in progress] Faulty behavior reduce --- docs/gelly_guide.md | 32 ++-- .../org/apache/flink/graph/EdgeDirection.java | 8 +- .../org/apache/flink/graph/EdgesFunction.java | 2 +- .../graph/EdgesFunctionWithVertexValue.java | 2 +- .../java/org/apache/flink/graph/Graph.java | 139 +++++++++++++++++- .../apache/flink/graph/NeighborsFunction.java | 2 +- .../NeighborsFunctionWithVertexValue.java | 2 +- .../flink/graph/ReduceEdgesFunction.java | 36 +++++ .../flink/graph/ReduceNeighborsFunction.java | 40 +++++ .../flink/graph/example/MusicProfiles.java | 2 +- .../ReduceOnEdgesMethodsITCase.java | 94 ++++++------ .../ReduceOnNeighborMethodsITCase.java | 87 +++++------ 12 files changed, 311 insertions(+), 135 deletions(-) create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md index f65d4d95c038c..49d9cae7c494e 100644 --- a/docs/gelly_guide.md +++ b/docs/gelly_guide.md @@ -269,7 +269,10 @@ Neighborhood Methods Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood. -`reduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `reduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). +`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `groupReduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). + +The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more values per vertex. +When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called as they are more efficient. For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph: @@ -282,7 +285,7 @@ The following code will collect the out-edges for each vertex and apply the `Sel {% highlight java %} Graph graph = ... -DataSet> minWeights = graph.reduceOnEdges( +DataSet> minWeights = graph.groupReduceOnEdges( new SelectMinWeight(), EdgeDirection.OUT); // user-defined function to select the minimum weight @@ -319,20 +322,15 @@ DataSet> verticesWithSum = graph.reduceOnNeighbors( new SumValues(), EdgeDirection.IN); // user-defined function to sum the neighbor values -static final class SumValues implements NeighborsFunction> { - - public void iterateNeighbors(Iterable, - Vertex>> neighbors, Collector> out) { - - long sum = 0; - long vertexId = -1; +static final class SumValues implements ReduceNeighborsFunction { - for (Tuple3, Vertex> neighbor : neighbors) { - vertexId = neighbor.f0; - sum += neighbor.f2.getValue(); - } - out.collect(new Tuple2(vertexId, sum)); - } + public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, + Tuple3, Vertex> secondNeighbor) { + + long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue(); + return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, + new Vertex(firstNeighbor.f0, sum)); + } } {% endhighlight %} @@ -340,9 +338,7 @@ static final class SumValues implements NeighborsFunction

-When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead. - -The neighborhood methods return zero, one or more values per vertex. +When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead. [Back to top](#top) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java index 379b302781484..65d40986f2870 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java @@ -20,10 +20,10 @@ /** * The EdgeDirection is used to select a node's neighborhood - * by the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)}, - * {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}, - * {@link Graph#reduceOnNeighbors(NeighborsFunction, EdgeDirection)} and - * {@link Graph#reduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)} + * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)}, + * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}, + * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and + * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)} * methods. */ public enum EdgeDirection { diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java index 30f73f4706c1d..aac63db7fb14c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java @@ -26,7 +26,7 @@ /** * Interface to be implemented by the function applied to a vertex neighborhood - * in the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)} method. + * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method. * * @param the vertex key type * @param the edge value type diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java index 1bce0a098697f..f4f43208e56c3 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java @@ -25,7 +25,7 @@ /** * Interface to be implemented by the function applied to a vertex neighborhood - * in the {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)} + * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)} * method. * * @param the vertex key type diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index da97980419726..8b640ed83e71b 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -718,8 +719,8 @@ public Graph getUndirected() { * @return a dataset of a T * @throws IllegalArgumentException */ - public DataSet reduceOnEdges(EdgesFunctionWithVertexValue edgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue edgesFunction, + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -749,8 +750,8 @@ public DataSet reduceOnEdges(EdgesFunctionWithVertexValue e * @return a dataset of T * @throws IllegalArgumentException */ - public DataSet reduceOnEdges(EdgesFunction edgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + public DataSet groupReduceOnEdges(EdgesFunction edgesFunction, + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -1209,8 +1210,8 @@ public Graph run(GraphAlgorithm algorithm) throws Exceptio * @return a dataset of a T * @throws IllegalArgumentException */ - public DataSet reduceOnNeighbors(NeighborsFunctionWithVertexValue neighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue neighborsFunction, + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1252,8 +1253,8 @@ public DataSet reduceOnNeighbors(NeighborsFunctionWithVertexValue DataSet reduceOnNeighbors(NeighborsFunction neighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + public DataSet groupReduceOnNeighbors(NeighborsFunction neighborsFunction, + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1395,4 +1396,126 @@ public TypeInformation getProducedType() { return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null); } } + + /** + * Compute an aggregate over the neighbors (edges and vertices) of each + * vertex. The function applied on the neighbors only has access to the + * vertex id (not the vertex value). + * + * @param reduceNeighborsFunction the function to apply to the neighborhood + * @param direction the edge direction (in-, out-, all-) + * @return a dataset containing one value per vertex + * @throws IllegalArgumentException + */ + public DataSet reduceOnNeighbors(ReduceNeighborsFunction reduceNeighborsFunction, + EdgeDirection direction) throws IllegalArgumentException { + switch (direction) { + case IN: + // create pairs + final DataSet, Vertex>> edgesWithSources = edges + .join(this.vertices).where(0).equalTo(0) + .with(new ProjectVertexIdJoin(1)); + return edgesWithSources.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)) + .map(new ApplyNeighborhoodMapFunction()); + case OUT: + // create pairs + DataSet, Vertex>> edgesWithTargets = edges + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectVertexIdJoin(0)); + return edgesWithTargets.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)) + .map(new ApplyNeighborhoodMapFunction()); + case ALL: + // create pairs + DataSet, Vertex>> edgesWithNeighbors = edges + .flatMap(new EmitOneEdgeWithNeighborPerNode()) + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectEdgeWithNeighbor()); + + return edgesWithNeighbors.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)) + .map(new ApplyNeighborhoodMapFunction()); + default: + throw new IllegalArgumentException("Illegal edge direction"); + } + } + + private static final class ApplyNeighborReduceFunction & Serializable, VV extends Serializable, EV extends Serializable> + implements ReduceFunction, Vertex>> { + + private ReduceNeighborsFunction function; + + public ApplyNeighborReduceFunction(ReduceNeighborsFunction fun) { + this.function = fun; + } + + @Override + public Tuple3, Vertex> reduce(Tuple3, Vertex> first, + Tuple3, Vertex> second) throws Exception { + return function.reduceNeighbors(first, second); + } + } + + public static final class ApplyNeighborhoodMapFunction & Serializable, VV extends Serializable, EV extends Serializable> + implements MapFunction, Vertex> ,Tuple2> { + + @Override + public Tuple2 map(Tuple3, Vertex> edgesWithSrc) throws Exception { + return new Tuple2(edgesWithSrc.f0, edgesWithSrc.f2.getValue()); + } + } + + /** + * Compute an aggregate over the edges of each vertex. The function applied + * on the edges only has access to the vertex id (not the vertex value). + * + * @param reduceEdgesFunction + * the function to apply to the neighborhood + * @param direction + * the edge direction (in-, out-, all-) + * @return a dataset containing one value per vertex + * @throws IllegalArgumentException + */ + public DataSet reduceOnEdges(ReduceEdgesFunction reduceEdgesFunction, + EdgeDirection direction) throws IllegalArgumentException { + + switch (direction) { + case IN: + return edges.map(new ProjectVertexIdMap(1)) + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) + .map(new ApplyEdgesMapFunction()); + case OUT: + return edges.map(new ProjectVertexIdMap(0)) + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) + .map(new ApplyEdgesMapFunction()); + case ALL: + return edges.flatMap(new EmitOneEdgePerNode()) + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) + .map(new ApplyEdgesMapFunction()); + default: + throw new IllegalArgumentException("Illegal edge direction"); + } + } + + private static final class ApplyReduceFunction & Serializable, EV extends Serializable> + implements ReduceFunction>> { + + private ReduceEdgesFunction function; + + public ApplyReduceFunction(ReduceEdgesFunction fun) { + this.function = fun; + } + + @Override + public Tuple2> reduce(Tuple2> first, Tuple2> second) throws Exception { + return function.reduceEdges(first, second); + } + } + + public static final class ApplyEdgesMapFunction & Serializable, VV extends Serializable, EV extends Serializable> + implements MapFunction> ,Tuple2> { + + @Override + public Tuple2 map(Tuple2> edge) throws Exception { + return new Tuple2(edge.f0, edge.f1.getValue()); + } + } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java index 6fd31f4eeab0d..b43f9d11bd4c4 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java @@ -26,7 +26,7 @@ /** * Interface to be implemented by the function applied to a vertex neighborhood - * in the {@link Graph#reduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)} + * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} * method. * * @param the vertex key type diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java index 6f1953a094afe..32d184dffcaa8 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java @@ -26,7 +26,7 @@ /** * Interface to be implemented by the function applied to a vertex neighborhood - * in the {@link Graph#reduceOnNeighbors(NeighborsFunction, EdgeDirection)} + * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)} * method. * * @param the vertex key type diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java new file mode 100644 index 0000000000000..0b5d2cf918de3 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.java.tuple.Tuple2; + +import java.io.Serializable; + +/** + * Interface to be implemented by the function applied to a vertex neighborhood + * in the {@link Graph#reduceOnEdges(org.apache.flink.graph.ReduceEdgesFunction, EdgeDirection)} method. + * + * @param the vertex key type + * @param the edge value type + */ +public interface ReduceEdgesFunction & Serializable, + EV extends Serializable> { + + Tuple2> reduceEdges(Tuple2> firstEdge, Tuple2> secondEdge); +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java new file mode 100644 index 0000000000000..50c0d35571054 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.tuple.Tuple3; + +import java.io.Serializable; + +/** + * Interface to be implemented by the function applied to a vertex neighborhood + * in the {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)} + * method. + * + * @param the vertex key type + * @param the vertex value type + * @param the edge value type + */ +public interface ReduceNeighborsFunction & Serializable, VV extends Serializable, + EV extends Serializable> extends Function, Serializable { + + Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, + Tuple3, Vertex> secondNeighbor); +} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index 9d642db926e74..e8871eb3b4276 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -100,7 +100,7 @@ public static void main(String[] args) throws Exception { * Get the top track (most listened) for each user */ DataSet> usersWithTopTrack = userSongGraph - .reduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT) + .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT) .filter(new FilterSongNodes()); if (fileOutput) { diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index e60da1ee856a3..2452cba04517a 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.graph.EdgesFunction; import org.apache.flink.graph.EdgesFunctionWithVertexValue; import org.apache.flink.graph.Graph; +import org.apache.flink.graph.ReduceEdgesFunction; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; @@ -72,7 +73,7 @@ public void testLowestWeightOutNeighbor() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); + graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); verticesWithLowestOutNeighbor.writeAsCsv(resultPath); env.execute(); @@ -94,7 +95,7 @@ public void testLowestWeightInNeighbor() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); verticesWithLowestOutNeighbor.writeAsCsv(resultPath); env.execute(); @@ -115,7 +116,7 @@ public void testAllOutNeighbors() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllOutNeighbors = - graph.reduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT); + graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT); verticesWithAllOutNeighbors.writeAsCsv(resultPath); env.execute(); @@ -138,7 +139,7 @@ public void testAllOutNeighborsNoValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllOutNeighbors = - graph.reduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT); + graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT); verticesWithAllOutNeighbors.writeAsCsv(resultPath); env.execute(); @@ -160,7 +161,7 @@ public void testAllOutNeighborsWithValueGreaterThanTwo() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllOutNeighbors = - graph.reduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT); + graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT); verticesWithAllOutNeighbors.writeAsCsv(resultPath); env.execute(); @@ -180,7 +181,7 @@ public void testAllInNeighbors() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllInNeighbors = - graph.reduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN); verticesWithAllInNeighbors.writeAsCsv(resultPath); env.execute(); @@ -203,7 +204,7 @@ public void testAllInNeighborsNoValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllInNeighbors = - graph.reduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN); verticesWithAllInNeighbors.writeAsCsv(resultPath); env.execute(); @@ -224,7 +225,7 @@ public void testAllInNeighborsWithValueGreaterThanTwo() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllInNeighbors = - graph.reduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN); verticesWithAllInNeighbors.writeAsCsv(resultPath); env.execute(); @@ -245,7 +246,7 @@ public void testAllNeighbors() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllNeighbors = - graph.reduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL); verticesWithAllNeighbors.writeAsCsv(resultPath); env.execute(); @@ -275,7 +276,7 @@ public void testAllNeighborsNoValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllNeighbors = - graph.reduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL); verticesWithAllNeighbors.writeAsCsv(resultPath); env.execute(); @@ -300,7 +301,7 @@ public void testAllNeighborsWithValueGreaterThanFour() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithAllNeighbors = - graph.reduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); verticesWithAllNeighbors.writeAsCsv(resultPath); env.execute(); @@ -320,7 +321,7 @@ public void testMaxWeightEdge() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); verticesWithMaxEdgeWeight.writeAsCsv(resultPath); env.execute(); @@ -364,7 +365,7 @@ public void testLowestWeightInNeighborNoValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); verticesWithLowestOutNeighbor.writeAsCsv(resultPath); env.execute(); @@ -436,50 +437,51 @@ public void iterateEdges(Vertex v, } @SuppressWarnings("serial") - private static final class SelectMinWeightNeighborNoValue implements EdgesFunction> { + private static final class SelectMinWeightNeighborNoValue implements ReduceEdgesFunction { @Override - public void iterateEdges(Iterable>> edges, - Collector> out) throws Exception { + public Tuple2> reduceEdges(Tuple2> firstEdge, + Tuple2> secondEdge) { - long weight = Long.MAX_VALUE; - long minNeighborId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighborId = edge.f1.getTarget(); - } - if (i==0) { - vertexId = edge.f0; - } i++; + if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) { + return firstEdge; + } else { + return secondEdge; } - out.collect(new Tuple2(vertexId, minNeighborId)); + } } @SuppressWarnings("serial") - private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction> { + private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction { + +// @Override +// public void iterateEdges(Iterable>> edges, +// Collector> out) throws Exception { +// +// long weight = Long.MIN_VALUE; +// long vertexId = -1; +// long i=0; +// +// for (Tuple2> edge: edges) { +// if (edge.f1.getValue() > weight) { +// weight = edge.f1.getValue(); +// } +// if (i==0) { +// vertexId = edge.f0; +// } i++; +// } +// out.collect(new Tuple2(vertexId, weight)); +// } @Override - public void iterateEdges(Iterable>> edges, - Collector> out) throws Exception { - - long weight = Long.MIN_VALUE; - long vertexId = -1; - long i=0; - - for (Tuple2> edge: edges) { - if (edge.f1.getValue() > weight) { - weight = edge.f1.getValue(); - } - if (i==0) { - vertexId = edge.f0; - } i++; + public Tuple2> reduceEdges(Tuple2> firstEdge, + Tuple2> secondEdge) { + if(firstEdge.f1.getValue() > secondEdge.f1.getValue()) { + return firstEdge; + } else { + return secondEdge; } - out.collect(new Tuple2(vertexId, weight)); } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index fdb99893be6cb..5300d24405f1b 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -20,6 +20,7 @@ import java.util.Iterator; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; @@ -29,6 +30,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.NeighborsFunction; import org.apache.flink.graph.NeighborsFunctionWithVertexValue; +import org.apache.flink.graph.ReduceNeighborsFunction; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; @@ -75,7 +77,7 @@ public void testSumOfOutNeighbors() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); + graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -97,7 +99,7 @@ public void testSumOfInNeighbors() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); verticesWithSum.writeAsCsv(resultPath); env.execute(); @@ -120,7 +122,7 @@ public void testSumOfOAllNeighbors() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); + graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -143,7 +145,7 @@ public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT); + graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -162,7 +164,7 @@ public void testSumOfInNeighborsIdGreaterThanThree() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN); verticesWithSum.writeAsCsv(resultPath); env.execute(); @@ -182,7 +184,7 @@ public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL); + graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -270,7 +272,7 @@ public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT); + graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -294,7 +296,7 @@ public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -317,7 +319,7 @@ public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfAllNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL); + graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL); verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -341,7 +343,7 @@ public void testSumOfOutNeighborsMultipliedByTwo() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT); + graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -368,7 +370,7 @@ public void testSumOfInNeighborsSubtractOne() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN); verticesWithSum.writeAsCsv(resultPath); env.execute(); @@ -396,7 +398,7 @@ public void testSumOfOAllNeighborsAddFive() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL); + graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL); verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); env.execute(); @@ -522,62 +524,39 @@ public void iterateNeighbors(Vertex vertex, } @SuppressWarnings("serial") - private static final class SumOutNeighborsNoValue implements NeighborsFunction> { + private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction { @Override - public void iterateNeighbors(Iterable, Vertex>> neighbors, - Collector> out) throws Exception { - - long sum = 0; - Tuple3, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - out.collect(new Tuple2(next.f0, sum)); + public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, + Tuple3, Vertex> secondNeighbor) { + long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue(); + return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, + new Vertex(firstNeighbor.f0, sum)); } } @SuppressWarnings("serial") - private static final class SumInNeighborsNoValue implements NeighborsFunction> { + private static final class SumInNeighborsNoValue implements ReduceNeighborsFunction { @Override - public void iterateNeighbors(Iterable, Vertex>> neighbors, - Collector> out) throws Exception { - - long sum = 0; - Tuple3, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue() * next.f1.getValue(); - } - out.collect(new Tuple2(next.f0, sum)); + public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, + Tuple3, Vertex> secondNeighbor) { + long sum = firstNeighbor.f2.getValue() * firstNeighbor.f1.getValue() + + secondNeighbor.f2.getValue() * secondNeighbor.f1.getValue(); + return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, + new Vertex(firstNeighbor.f0, sum)); } } @SuppressWarnings("serial") - private static final class SumAllNeighborsNoValue implements NeighborsFunction> { + private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction { @Override - public void iterateNeighbors(Iterable, Vertex>> neighbors, - Collector> out) throws Exception { - - long sum = 0; - Tuple3, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - out.collect(new Tuple2(next.f0, sum)); + public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, + Tuple3, Vertex> secondNeighbor) { + long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue(); + return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, + new Vertex(firstNeighbor.f0, sum)); } } From 72e258c6ff08dca42db183a1d3647e87acb0b1ef Mon Sep 17 00:00:00 2001 From: andralungu Date: Mon, 13 Apr 2015 10:26:55 +0200 Subject: [PATCH 3/6] [FLINK-1758][gelly] Replaced groupReduce with reduce --- docs/gelly_guide.md | 35 ++++++++- .../java/org/apache/flink/graph/Graph.java | 12 +-- .../flink/graph/ReduceEdgesFunction.java | 3 +- .../flink/graph/ReduceNeighborsFunction.java | 2 +- .../ReduceOnEdgesMethodsITCase.java | 74 ++++++------------- .../ReduceOnNeighborMethodsITCase.java | 23 +++--- 6 files changed, 78 insertions(+), 71 deletions(-) diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md index 49d9cae7c494e..151f0d201a8c9 100644 --- a/docs/gelly_guide.md +++ b/docs/gelly_guide.md @@ -269,10 +269,15 @@ Neighborhood Methods Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood. -`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `groupReduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). +`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, +while `groupReduceOnNeighbors()` has access to both the neighboring edges and vertices. The neighborhood scope +is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more values per vertex. -When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called as they are more efficient. +When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called +as they are more efficient. Nevertheless, when the reduce on edges modifies the value produced per vertex, for +instance by multiplying it with a constant, `groupReduceOnEdges()` or `groupReduceOnNeighbors()` must be used +as illustrated in the third code snippet. For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph: @@ -338,6 +343,32 @@ static final class SumValues implements ReduceNeighborsFunction

+The following code will collect the in-edges for each vertex and apply the `SumInNeighbors()` user-defined function on each of the resulting neighborhoods: + +{% highlight java %} +Graph graph = ... + +DataSet> verticesWithSum = + graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); + +// user-defined function to sum up the in-neighbor values. +static final class SumInNeighbors implements NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + + long sum = 0; + for (Tuple2, Vertex> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); + } + out.collect(new Tuple2(vertex.getId(), sum)); + } +} +{% endhighlight %} + When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead. [Back to top](#top) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 8b640ed83e71b..a0d9fcc24a803 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -720,7 +720,7 @@ public Graph getUndirected() { * @throws IllegalArgumentException */ public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue edgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -751,7 +751,7 @@ public DataSet groupReduceOnEdges(EdgesFunctionWithVertexValue DataSet groupReduceOnEdges(EdgesFunction edgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: @@ -1211,7 +1211,7 @@ public Graph run(GraphAlgorithm algorithm) throws Exceptio * @throws IllegalArgumentException */ public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue neighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1254,7 +1254,7 @@ public DataSet groupReduceOnNeighbors(NeighborsFunctionWithVertexValue DataSet groupReduceOnNeighbors(NeighborsFunction neighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1408,7 +1408,7 @@ public TypeInformation getProducedType() { * @throws IllegalArgumentException */ public DataSet reduceOnNeighbors(ReduceNeighborsFunction reduceNeighborsFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: // create pairs @@ -1475,7 +1475,7 @@ public Tuple2 map(Tuple3, Vertex> edgesWithSrc) thr * @throws IllegalArgumentException */ public DataSet reduceOnEdges(ReduceEdgesFunction reduceEdgesFunction, - EdgeDirection direction) throws IllegalArgumentException { + EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java index 0b5d2cf918de3..53c7934e520b6 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.graph; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.tuple.Tuple2; import java.io.Serializable; @@ -30,7 +31,7 @@ * @param the edge value type */ public interface ReduceEdgesFunction & Serializable, - EV extends Serializable> { + EV extends Serializable> extends Function, Serializable { Tuple2> reduceEdges(Tuple2> firstEdge, Tuple2> secondEdge); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java index 50c0d35571054..f5e978f093a04 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java @@ -36,5 +36,5 @@ public interface ReduceNeighborsFunction & Serializable EV extends Serializable> extends Function, Serializable { Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor); + Tuple3, Vertex> secondNeighbor); } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index 2452cba04517a..29fa6e4e916a5 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -335,8 +335,8 @@ public void testMaxWeightEdge() throws Exception { @Test public void testLowestWeightOutNeighborNoValue() throws Exception { /* - * Get the lowest-weight out-neighbor - * for each vertex + * Get the lowest-weight out of all the out-neighbors + * of each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), @@ -347,33 +347,33 @@ public void testLowestWeightOutNeighborNoValue() throws Exception { verticesWithLowestOutNeighbor.writeAsCsv(resultPath); env.execute(); - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; + expectedResult = "1,12\n" + + "2,23\n" + + "3,34\n" + + "4,45\n" + + "5,51\n"; } @Test public void testLowestWeightInNeighborNoValue() throws Exception { /* - * Get the lowest-weight in-neighbor - * for each vertex + * Get the lowest-weight out of all the in-neighbors + * of each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithLowestOutNeighbor = - graph.groupReduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); + graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); verticesWithLowestOutNeighbor.writeAsCsv(resultPath); env.execute(); - expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; + expectedResult = "1,51\n" + + "2,12\n" + + "3,13\n" + + "4,34\n" + + "5,35\n"; } @Test @@ -455,25 +455,6 @@ public Tuple2> reduceEdges(Tuple2> @SuppressWarnings("serial") private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction { -// @Override -// public void iterateEdges(Iterable>> edges, -// Collector> out) throws Exception { -// -// long weight = Long.MIN_VALUE; -// long vertexId = -1; -// long i=0; -// -// for (Tuple2> edge: edges) { -// if (edge.f1.getValue() > weight) { -// weight = edge.f1.getValue(); -// } -// if (i==0) { -// vertexId = edge.f0; -// } i++; -// } -// out.collect(new Tuple2(vertexId, weight)); -// } - @Override public Tuple2> reduceEdges(Tuple2> firstEdge, Tuple2> secondEdge) { @@ -506,27 +487,16 @@ public void iterateEdges(Vertex v, } @SuppressWarnings("serial") - private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction> { + private static final class SelectMinWeightInNeighborNoValue implements ReduceEdgesFunction { @Override - public void iterateEdges(Iterable>> edges, - Collector> out) throws Exception { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getSource(); - } - if (i==0) { - vertexId = edge.f0; - } i++; + public Tuple2> reduceEdges(Tuple2> firstEdge, + Tuple2> secondEdge) { + if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) { + return firstEdge; + } else { + return secondEdge; } - out.collect(new Tuple2(vertexId, minNeighorId)); } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index 5300d24405f1b..5f235699e5ff0 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -20,7 +20,6 @@ import java.util.Iterator; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; @@ -227,7 +226,7 @@ public void testSumOfInNeighborsNoValue() throws Exception { TestGraphUtils.getLongLongEdgeData(env), env); DataSet> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); verticesWithSum.writeAsCsv(resultPath); env.execute(); @@ -536,15 +535,21 @@ public Tuple3, Vertex> reduceNeighbors(Tuple3 } @SuppressWarnings("serial") - private static final class SumInNeighborsNoValue implements ReduceNeighborsFunction { + private static final class SumInNeighborsNoValue implements NeighborsFunction> { @Override - public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor) { - long sum = firstNeighbor.f2.getValue() * firstNeighbor.f1.getValue() + - secondNeighbor.f2.getValue() * secondNeighbor.f1.getValue(); - return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, - new Vertex(firstNeighbor.f0, sum)); + public void iterateNeighbors(Iterable, Vertex>> neighbors, + Collector> out) throws Exception { + long sum = 0; + Tuple3, Vertex> next = null; + Iterator, Vertex>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue() * next.f1.getValue(); + } + out.collect(new Tuple2(next.f0, sum)); } } From 1fd05745f5afb87f201801e5dd63095d16e779b8 Mon Sep 17 00:00:00 2001 From: andralungu Date: Tue, 14 Apr 2015 23:46:01 +0200 Subject: [PATCH 4/6] [FLINK-1758][gelly] Rebased against current master --- .../graph/example/JaccardSimilarityMeasureExample.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java index c81aeb3b74ecf..2783a29ce2254 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java @@ -32,6 +32,7 @@ import org.apache.flink.graph.Triplet; import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; import java.util.HashSet; @@ -68,7 +69,7 @@ public static void main(String [] args) throws Exception { Graph graph = Graph.fromDataSet(edges, env); DataSet>> verticesWithNeighbors = - graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL); Graph, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env); @@ -106,7 +107,8 @@ public String getDescription() { private static final class GatherNeighbors implements EdgesFunction>> { @Override - public Vertex> iterateEdges(Iterable>> edges) throws Exception { + public void iterateEdges(Iterable>> edges, + Collector>> out) throws Exception { HashSet neighborsHashSet = new HashSet(); long vertexId = -1; @@ -115,7 +117,7 @@ public Vertex> iterateEdges(Iterable>(vertexId, neighborsHashSet); + out.collect(new Vertex>(vertexId, neighborsHashSet)); } } From 7331768f41e43207ec59b112fc0d4b6a74df8b8e Mon Sep 17 00:00:00 2001 From: andralungu Date: Mon, 20 Apr 2015 15:50:14 +0200 Subject: [PATCH 5/6] [FLINK-1758][gelly] Addressed inline comments --- docs/gelly_guide.md | 18 +++++++----------- .../org/apache/flink/graph/EdgeDirection.java | 6 ++++-- .../operations/ReduceOnEdgesMethodsITCase.java | 4 ++-- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md index 151f0d201a8c9..13980ec4103a0 100644 --- a/docs/gelly_guide.md +++ b/docs/gelly_guide.md @@ -274,10 +274,8 @@ while `groupReduceOnNeighbors()` has access to both the neighboring edges and ve is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors). The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more values per vertex. -When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called -as they are more efficient. Nevertheless, when the reduce on edges modifies the value produced per vertex, for -instance by multiplying it with a constant, `groupReduceOnEdges()` or `groupReduceOnNeighbors()` must be used -as illustrated in the third code snippet. + +When the user-defined function to be applied on the neighborhood is associative and commutative, it is highly advised to use the `reduceOnEdges()` and `reduceOnNeighbors()` methods. These methods exploit combiners internally, significantly improving performance. For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph: @@ -294,22 +292,20 @@ DataSet> minWeights = graph.groupReduceOnEdges( new SelectMinWeight(), EdgeDirection.OUT); // user-defined function to select the minimum weight -static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue> { +static final class SelectMinWeight implements EdgesFunctionWithVertexValue> { @Override public void iterateEdges(Vertex v, Iterable> edges, Collector> out) throws Exception { - long weight = Long.MAX_VALUE; - long minNeighborId = 0; + long minWeight = Long.MAX_VALUE; for (Edge edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighborId = edge.getTarget(); + if (edge.getValue() < minWeight) { + minWeight = edge.getValue(); } } - out.collect(new Tuple2(v.getId(), minNeighborId)); + out.collect(new Tuple2(v.getId(), minWeight)); } } {% endhighlight %} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java index 65d40986f2870..0a055bbf4a138 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java @@ -22,8 +22,10 @@ * The EdgeDirection is used to select a node's neighborhood * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)}, * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}, - * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and - * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)} + * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)}, + * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}, + * {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and + * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)} * methods. */ public enum EdgeDirection { diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index 29fa6e4e916a5..3ace49a3826bf 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -78,9 +78,9 @@ public void testLowestWeightOutNeighbor() throws Exception { env.execute(); expectedResult = "1,2\n" + - "2,3\n" + + "2,3\n" + "3,4\n" + - "4,5\n" + + "4,5\n" + "5,1\n"; } From 8d61b4b222b8421a65bb688dd23218680464bd88 Mon Sep 17 00:00:00 2001 From: andralungu Date: Mon, 20 Apr 2015 23:09:36 +0200 Subject: [PATCH 6/6] [FLINK-1758][gelly] Made reduce methods operate on values --- docs/gelly_guide.md | 11 +- .../java/org/apache/flink/graph/Graph.java | 142 ++++++++++-------- .../flink/graph/ReduceEdgesFunction.java | 2 +- .../flink/graph/ReduceNeighborsFunction.java | 9 +- .../ReduceOnEdgesMethodsITCase.java | 24 +-- .../ReduceOnNeighborMethodsITCase.java | 22 ++- 6 files changed, 114 insertions(+), 96 deletions(-) diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md index 13980ec4103a0..e8b88cc0efda5 100644 --- a/docs/gelly_guide.md +++ b/docs/gelly_guide.md @@ -323,14 +323,13 @@ DataSet> verticesWithSum = graph.reduceOnNeighbors( new SumValues(), EdgeDirection.IN); // user-defined function to sum the neighbor values -static final class SumValues implements ReduceNeighborsFunction { +static final class SumValues implements ReduceNeighborsFunction { - public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor) { + public Tuple2 reduceNeighbors(Tuple2 firstNeighbor, + Tuple2 secondNeighbor) { - long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue(); - return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, - new Vertex(firstNeighbor.f0, sum)); + long sum = firstNeighbor.f1 + secondNeighbor.f1; + return new Tuple2(firstNeighbor.f0, sum)); } } {% endhighlight %} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index a0d9fcc24a803..0f31c9c235712 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -783,6 +783,21 @@ public Tuple2> map(Edge edge) { } } + private static final class ProjectVertexWithEdgeValueMap & Serializable, EV extends Serializable> + implements MapFunction, Tuple2> { + + private int fieldPosition; + + public ProjectVertexWithEdgeValueMap(int position) { + this.fieldPosition = position; + } + + @SuppressWarnings("unchecked") + public Tuple2 map(Edge edge) { + return new Tuple2((K) edge.getField(fieldPosition), edge.getValue()); + } + } + private static final class ApplyGroupReduceFunction & Serializable, EV extends Serializable, T> implements GroupReduceFunction>, T>, ResultTypeQueryable { @@ -810,6 +825,14 @@ public void flatMap(Edge edge, Collector>> out) { } } + private static final class EmitOneVertexWithEdgeValuePerNode & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatMapFunction, Tuple2> { + public void flatMap(Edge edge, Collector> out) { + out.collect(new Tuple2(edge.getSource(), edge.getValue())); + out.collect(new Tuple2(edge.getTarget(), edge.getValue())); + } + } + private static final class EmitOneEdgeWithNeighborPerNode & Serializable, VV extends Serializable, EV extends Serializable> implements FlatMapFunction, Tuple3>> { public void flatMap(Edge edge, Collector>> out) { @@ -1303,27 +1326,50 @@ public TypeInformation getProducedType() { } } - private static final class ProjectVertexIdJoin & Serializable, VV extends Serializable, EV extends Serializable> - implements FlatJoinFunction, Vertex, Tuple3, Vertex>> { + private static final class ProjectVertexWithNeighborValueJoin & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatJoinFunction, Vertex, Tuple2> { private int fieldPosition; - public ProjectVertexIdJoin(int position) { + public ProjectVertexWithNeighborValueJoin(int position) { this.fieldPosition = position; } @SuppressWarnings("unchecked") public void join(Edge edge, Vertex otherVertex, - Collector, Vertex>> out) { + Collector> out) { + out.collect(new Tuple2((K) edge.getField(fieldPosition), otherVertex.getValue())); + } + } + + private static final class ProjectVertexIdJoin & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatJoinFunction, Vertex, Tuple3, Vertex>> { + private int fieldPosition; + public ProjectVertexIdJoin(int position) { + this.fieldPosition = position; + } + @SuppressWarnings("unchecked") + public void join(Edge edge, Vertex otherVertex, + Collector, Vertex>> out) { out.collect(new Tuple3, Vertex>((K) edge.getField(fieldPosition), edge, otherVertex)); } } + private static final class ProjectNeighborValue & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatJoinFunction>, Vertex, Tuple2> { + @SuppressWarnings("unchecked") + public void join(Tuple3> keysWithEdge, Vertex neighbor, + Collector> out) { + + out.collect(new Tuple2(keysWithEdge.f0, neighbor.getValue())); + } + } + private static final class ProjectEdgeWithNeighbor & Serializable, VV extends Serializable, EV extends Serializable> implements FlatJoinFunction>, Vertex, Tuple3, Vertex>> { + @SuppressWarnings("unchecked") public void join(Tuple3> keysWithEdge, Vertex neighbor, - Collector, Vertex>> out) { - + Collector, Vertex>> out) { out.collect(new Tuple3, Vertex>(keysWithEdge.f0, keysWithEdge.f2, neighbor)); } } @@ -1404,65 +1450,53 @@ public TypeInformation getProducedType() { * * @param reduceNeighborsFunction the function to apply to the neighborhood * @param direction the edge direction (in-, out-, all-) - * @return a dataset containing one value per vertex + * @return a dataset containing one value per vertex(vertex id, vertex value) * @throws IllegalArgumentException */ - public DataSet reduceOnNeighbors(ReduceNeighborsFunction reduceNeighborsFunction, + public DataSet> reduceOnNeighbors(ReduceNeighborsFunction reduceNeighborsFunction, EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: - // create pairs - final DataSet, Vertex>> edgesWithSources = edges + // create pairs + final DataSet> verticesWithSourceNeighborValues = edges .join(this.vertices).where(0).equalTo(0) - .with(new ProjectVertexIdJoin(1)); - return edgesWithSources.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)) - .map(new ApplyNeighborhoodMapFunction()); + .with(new ProjectVertexWithNeighborValueJoin(1)); + return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)); case OUT: - // create pairs - DataSet, Vertex>> edgesWithTargets = edges + // create pairs + DataSet> verticesWithTargetNeighborValues = edges .join(this.vertices).where(1).equalTo(0) - .with(new ProjectVertexIdJoin(0)); - return edgesWithTargets.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)) - .map(new ApplyNeighborhoodMapFunction()); + .with(new ProjectVertexWithNeighborValueJoin(0)); + return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)); case ALL: - // create pairs - DataSet, Vertex>> edgesWithNeighbors = edges + // create pairs + DataSet> verticesWithNeighborValues = edges .flatMap(new EmitOneEdgeWithNeighborPerNode()) .join(this.vertices).where(1).equalTo(0) - .with(new ProjectEdgeWithNeighbor()); + .with(new ProjectNeighborValue()); - return edgesWithNeighbors.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)) - .map(new ApplyNeighborhoodMapFunction()); + return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } } - private static final class ApplyNeighborReduceFunction & Serializable, VV extends Serializable, EV extends Serializable> - implements ReduceFunction, Vertex>> { + private static final class ApplyNeighborReduceFunction & Serializable, VV extends Serializable> + implements ReduceFunction> { - private ReduceNeighborsFunction function; + private ReduceNeighborsFunction function; - public ApplyNeighborReduceFunction(ReduceNeighborsFunction fun) { + public ApplyNeighborReduceFunction(ReduceNeighborsFunction fun) { this.function = fun; } @Override - public Tuple3, Vertex> reduce(Tuple3, Vertex> first, - Tuple3, Vertex> second) throws Exception { + public Tuple2 reduce(Tuple2 first, + Tuple2 second) throws Exception { return function.reduceNeighbors(first, second); } } - public static final class ApplyNeighborhoodMapFunction & Serializable, VV extends Serializable, EV extends Serializable> - implements MapFunction, Vertex> ,Tuple2> { - - @Override - public Tuple2 map(Tuple3, Vertex> edgesWithSrc) throws Exception { - return new Tuple2(edgesWithSrc.f0, edgesWithSrc.f2.getValue()); - } - } - /** * Compute an aggregate over the edges of each vertex. The function applied * on the edges only has access to the vertex id (not the vertex value). @@ -1471,32 +1505,29 @@ public Tuple2 map(Tuple3, Vertex> edgesWithSrc) thr * the function to apply to the neighborhood * @param direction * the edge direction (in-, out-, all-) - * @return a dataset containing one value per vertex + * @return a dataset containing one value per vertex(vertex key, edge value) * @throws IllegalArgumentException */ - public DataSet reduceOnEdges(ReduceEdgesFunction reduceEdgesFunction, + public DataSet> reduceOnEdges(ReduceEdgesFunction reduceEdgesFunction, EdgeDirection direction) throws IllegalArgumentException { switch (direction) { case IN: - return edges.map(new ProjectVertexIdMap(1)) - .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) - .map(new ApplyEdgesMapFunction()); + return edges.map(new ProjectVertexWithEdgeValueMap(1)) + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)); case OUT: - return edges.map(new ProjectVertexIdMap(0)) - .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) - .map(new ApplyEdgesMapFunction()); + return edges.map(new ProjectVertexWithEdgeValueMap(0)) + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)); case ALL: - return edges.flatMap(new EmitOneEdgePerNode()) - .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)) - .map(new ApplyEdgesMapFunction()); + return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode()) + .groupBy(0).reduce(new ApplyReduceFunction(reduceEdgesFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } } private static final class ApplyReduceFunction & Serializable, EV extends Serializable> - implements ReduceFunction>> { + implements ReduceFunction> { private ReduceEdgesFunction function; @@ -1505,17 +1536,8 @@ public ApplyReduceFunction(ReduceEdgesFunction fun) { } @Override - public Tuple2> reduce(Tuple2> first, Tuple2> second) throws Exception { + public Tuple2 reduce(Tuple2 first, Tuple2 second) throws Exception { return function.reduceEdges(first, second); } } - - public static final class ApplyEdgesMapFunction & Serializable, VV extends Serializable, EV extends Serializable> - implements MapFunction> ,Tuple2> { - - @Override - public Tuple2 map(Tuple2> edge) throws Exception { - return new Tuple2(edge.f0, edge.f1.getValue()); - } - } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java index 53c7934e520b6..a411aec74a8cb 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java @@ -33,5 +33,5 @@ public interface ReduceEdgesFunction & Serializable, EV extends Serializable> extends Function, Serializable { - Tuple2> reduceEdges(Tuple2> firstEdge, Tuple2> secondEdge); + Tuple2 reduceEdges(Tuple2 firstEdge, Tuple2 secondEdge); } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java index f5e978f093a04..8115a5d4462af 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java @@ -19,7 +19,7 @@ package org.apache.flink.graph; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple2; import java.io.Serializable; @@ -32,9 +32,8 @@ * @param the vertex value type * @param the edge value type */ -public interface ReduceNeighborsFunction & Serializable, VV extends Serializable, - EV extends Serializable> extends Function, Serializable { +public interface ReduceNeighborsFunction & Serializable, VV extends Serializable> extends Function, Serializable { - Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor); + Tuple2 reduceNeighbors(Tuple2 firstNeighbor, + Tuple2 secondNeighbor); } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index 3ace49a3826bf..8a0e258826327 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -440,10 +440,10 @@ public void iterateEdges(Vertex v, private static final class SelectMinWeightNeighborNoValue implements ReduceEdgesFunction { @Override - public Tuple2> reduceEdges(Tuple2> firstEdge, - Tuple2> secondEdge) { + public Tuple2 reduceEdges(Tuple2 firstEdge, + Tuple2 secondEdge) { - if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) { + if(firstEdge.f1 < secondEdge.f1) { return firstEdge; } else { return secondEdge; @@ -456,9 +456,9 @@ public Tuple2> reduceEdges(Tuple2> private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction { @Override - public Tuple2> reduceEdges(Tuple2> firstEdge, - Tuple2> secondEdge) { - if(firstEdge.f1.getValue() > secondEdge.f1.getValue()) { + public Tuple2 reduceEdges(Tuple2 firstEdge, + Tuple2 secondEdge) { + if(firstEdge.f1 > secondEdge.f1) { return firstEdge; } else { return secondEdge; @@ -474,15 +474,15 @@ public void iterateEdges(Vertex v, Iterable> edges, Collector> out) throws Exception { long weight = Long.MAX_VALUE; - long minNeighorId = 0; + long minNeighborId = 0; for (Edge edge: edges) { if (edge.getValue() < weight) { weight = edge.getValue(); - minNeighorId = edge.getSource(); + minNeighborId = edge.getSource(); } } - out.collect(new Tuple2(v.getId(), minNeighorId)); + out.collect(new Tuple2(v.getId(), minNeighborId)); } } @@ -490,9 +490,9 @@ public void iterateEdges(Vertex v, private static final class SelectMinWeightInNeighborNoValue implements ReduceEdgesFunction { @Override - public Tuple2> reduceEdges(Tuple2> firstEdge, - Tuple2> secondEdge) { - if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) { + public Tuple2 reduceEdges(Tuple2 firstEdge, + Tuple2 secondEdge) { + if(firstEdge.f1 < secondEdge.f1) { return firstEdge; } else { return secondEdge; diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index 5f235699e5ff0..62d6dc9fb864e 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -523,14 +523,13 @@ public void iterateNeighbors(Vertex vertex, } @SuppressWarnings("serial") - private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction { + private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction { @Override - public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor) { - long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue(); - return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, - new Vertex(firstNeighbor.f0, sum)); + public Tuple2 reduceNeighbors(Tuple2 firstNeighbor, + Tuple2 secondNeighbor) { + long sum = firstNeighbor.f1 + secondNeighbor.f1; + return new Tuple2(firstNeighbor.f0, sum); } } @@ -554,14 +553,13 @@ public void iterateNeighbors(Iterable, Vertex { + private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction { @Override - public Tuple3, Vertex> reduceNeighbors(Tuple3, Vertex> firstNeighbor, - Tuple3, Vertex> secondNeighbor) { - long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue(); - return new Tuple3, Vertex>(firstNeighbor.f0, firstNeighbor.f1, - new Vertex(firstNeighbor.f0, sum)); + public Tuple2 reduceNeighbors(Tuple2 firstNeighbor, + Tuple2 secondNeighbor) { + long sum = firstNeighbor.f1 + secondNeighbor.f1; + return new Tuple2(firstNeighbor.f0, sum); } }