From 9ad04a672b1b277d1853e94f7dd792ef8448c19d Mon Sep 17 00:00:00 2001 From: Martin Junghanns Date: Sun, 13 Mar 2016 18:01:52 +0100 Subject: [PATCH] [FLINK-3272] [Gelly] Generic value in Connected Components * Updated scatter-gather and GSA implementation * tests * updated documentation --- docs/apis/batch/libs/gelly.md | 27 +++-- .../graph/examples/ConnectedComponents.java | 4 +- .../scala/examples/ConnectedComponents.scala | 2 +- .../graph/test/GatherSumApplyITCase.java | 2 +- .../graph/library/ConnectedComponents.java | 75 +++++++++----- .../graph/library/GSAConnectedComponents.java | 98 +++++++++++++++---- ...edComponentsWithRandomisedEdgesITCase.java | 2 +- .../graph/spargel/SpargelCompilerTest.java | 11 ++- 8 files changed, 160 insertions(+), 61 deletions(-) diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 8d17de76f8042..b22e362d37e3d 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1595,14 +1595,15 @@ The constructor takes one parameter: ### Connected Components #### Overview -This is an implementation of the Weakly Connected Components algorithm. Upon convergence, two vertices belong to the same component, if there is a path from one to the other, -without taking edge direction into account. +This is an implementation of the Weakly Connected Components algorithm. Upon convergence, two vertices belong to the +same component, if there is a path from one to the other, without taking edge direction into account. #### Details The algorithm is implemented using [scatter-gather iterations](#scatter-gather-iterations). -This implementation assumes that the vertex values of the input Graph are initialized with Long component IDs. -The vertices propagate their current component ID in iterations. Upon receiving component IDs from its neighbors, a vertex adopts a new component ID if its value -is lower than its current component ID. 
The algorithm converges when vertices no longer update their component ID value or when the maximum number of iterations has been reached. +This implementation uses a comparable vertex value as initial component identifier (ID). Vertices propagate their +current value in each iteration. Upon receiving component IDs from its neighbors, a vertex adopts a new component ID if +its value is lower than its current component ID. The algorithm converges when vertices no longer update their component +ID value or when the maximum number of iterations has been reached. #### Usage The result is a `DataSet` of vertices, where the vertex value corresponds to the assigned component. @@ -1612,9 +1613,23 @@ The constructor takes one parameter: ### GSA Connected Components +#### Overview +This is an implementation of the Weakly Connected Components algorithm. Upon convergence, two vertices belong to the +same component, if there is a path from one to the other, without taking edge direction into account. + +#### Details The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations). +This implementation uses a comparable vertex value as initial component identifier (ID). In the gather phase, each +vertex collects the vertex values of its adjacent vertices. In the sum phase, the minimum among those values is +selected. In the apply phase, the algorithm sets the minimum value as the new vertex value if it is smaller than +the current value. The algorithm converges when vertices no longer update their component ID value or when the +maximum number of iterations has been reached. + +#### Usage +The result is a `DataSet` of vertices, where the vertex value corresponds to the assigned component. +The constructor takes one parameter: -See the [Connected Components](#connected-components) library method for implementation details and usage information. +* `maxIterations`: the maximum number of iterations to run. 
### PageRank diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java index 93c801ff91423..835703b8d4854 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java @@ -69,7 +69,7 @@ public Long map(Long value) throws Exception { }, env); DataSet> verticesWithMinIds = graph - .run(new GSAConnectedComponents(maxIterations)); + .run(new GSAConnectedComponents(maxIterations)); // emit result if (fileOutput) { @@ -131,7 +131,7 @@ private static DataSet> getEdgesDataSet(ExecutionEnvironme .map(new MapFunction, Edge>() { @Override public Edge map(Tuple2 value) throws Exception { - return new Edge(value.f0, value.f1, NullValue.getInstance()); + return new Edge<>(value.f0, value.f1, NullValue.getInstance()); } }); } else { diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala index 704d4765e955d..b49a520135f47 100644 --- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala +++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala @@ -57,7 +57,7 @@ object ConnectedComponents { val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env) val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env) - val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations)) + val components = graph.run(new GSAConnectedComponents[Long, Long, NullValue](maxIterations)) // emit 
result diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java index 7a3d550df7f97..56d57f0263e8d 100755 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java @@ -56,7 +56,7 @@ public void testConnectedComponents() throws Exception { new InitMapperCC(), env); List> result = inputGraph.run( - new GSAConnectedComponents(16)).collect(); + new GSAConnectedComponents(16)).collect(); expectedResult = "1,1\n" + "2,1\n" + diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java index 12047e7dc835d..efc32c120b465 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java @@ -18,7 +18,10 @@ package org.apache.flink.graph.library; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; @@ -31,20 +34,22 @@ /** * A scatter-gather implementation of the Weakly Connected Components algorithm. * - * This implementation assumes that the vertex values of the input Graph are initialized with Long component IDs. - * The vertices propagate their current component ID in iterations. 
- * Upon receiving component IDs from its neighbors, a vertex adopts a new component ID if its value - * is lower than its current component ID. + * This implementation uses a comparable vertex value as initial component + * identifier (ID). Vertices propagate their current value in each iteration. + * Upon receiving component IDs from its neighbors, a vertex adopts a new + * component ID if its value is lower than its current component ID. * - * The algorithm converges when vertices no longer update their component ID value - * or when the maximum number of iterations has been reached. + * The algorithm converges when vertices no longer update their component ID + * value or when the maximum number of iterations has been reached. * - * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID. + * The result is a DataSet of vertices, where the vertex value corresponds to + * the assigned component ID. * * @see GSAConnectedComponents */ @SuppressWarnings("serial") -public class ConnectedComponents implements GraphAlgorithm>> { +public class ConnectedComponents, EV> + implements GraphAlgorithm>> { private Integer maxIterations; @@ -61,46 +66,66 @@ public ConnectedComponents(Integer maxIterations) { } @Override - public DataSet> run(Graph graph) throws Exception { + public DataSet> run(Graph graph) throws Exception { - Graph undirectedGraph = graph.mapEdges(new NullValueEdgeMapper()) - .getUndirected(); + // get type information for vertex value + TypeInformation valueTypeInfo = ((TupleTypeInfo) graph.getVertices().getType()).getTypeAt(1); + + Graph undirectedGraph = graph + .mapEdges(new NullValueEdgeMapper()) + .getUndirected(); - // initialize vertex values and run the Scatter-Gather Iteration return undirectedGraph.runScatterGatherIteration( - new CCUpdater(), new CCMessenger(), maxIterations) - .getVertices(); + new CCUpdater(), + new CCMessenger(valueTypeInfo), + maxIterations).getVertices(); } /** - * Updates the 
value of a vertex by picking the minimum neighbor ID out of all the incoming messages. + * Updates the value of a vertex by picking the minimum neighbor value out of all the incoming messages. */ - public static final class CCUpdater extends VertexUpdateFunction { + public static final class CCUpdater> + extends VertexUpdateFunction { @Override - public void updateVertex(Vertex vertex, MessageIterator messages) throws Exception { - long min = Long.MAX_VALUE; + public void updateVertex(Vertex vertex, MessageIterator messages) throws Exception { + VV current = vertex.getValue(); + VV min = current; - for (long msg : messages) { - min = Math.min(min, msg); + for (VV msg : messages) { + if (msg.compareTo(min) < 0) { + min = msg; + } } - // update vertex value, if new minimum - if (min < vertex.getValue()) { + if (!min.equals(current)) { setNewVertexValue(min); } } } /** - * Distributes the minimum ID associated with a given vertex among all the target vertices. + * Sends the current vertex value to all adjacent vertices. 
*/ - public static final class CCMessenger extends MessagingFunction { + public static final class CCMessenger> + extends MessagingFunction + implements ResultTypeQueryable { + + private final TypeInformation typeInformation; + + public CCMessenger(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } @Override - public void sendMessages(Vertex vertex) throws Exception { + public void sendMessages(Vertex vertex) throws Exception { // send current minimum to neighbors sendMessageToAllNeighbors(vertex.getValue()); } + + @Override + public TypeInformation getProducedType() { + return typeInformation; + } } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java index 0354da4055f91..a12cb20f758c6 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java @@ -18,7 +18,10 @@ package org.apache.flink.graph.library; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; @@ -30,13 +33,24 @@ import org.apache.flink.types.NullValue; /** - * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration. - * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs. - * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID. + * A gather-sum-apply implementation of the Weakly Connected Components algorithm. 
+ * + * This implementation uses a comparable vertex value as initial component + * identifier (ID). In the gather phase, each vertex collects the vertex value + * of their adjacent vertices. In the sum phase, the minimum among those values + * is selected. In the apply phase, the algorithm sets the minimum value as the + * new vertex value if it is smaller than the current value. + * + * The algorithm converges when vertices no longer update their component ID + * value or when the maximum number of iterations has been reached. + * + * The result is a DataSet of vertices, where the vertex value corresponds to + * the assigned component ID. * * @see ConnectedComponents */ -public class GSAConnectedComponents implements GraphAlgorithm>> { +public class GSAConnectedComponents, EV> + implements GraphAlgorithm>> { private Integer maxIterations; @@ -53,15 +67,20 @@ public GSAConnectedComponents(Integer maxIterations) { } @Override - public DataSet> run(Graph graph) throws Exception { + public DataSet> run(Graph graph) throws Exception { - Graph undirectedGraph = graph.mapEdges(new NullValueEdgeMapper()) - .getUndirected(); + // get type information for vertex value + TypeInformation valueTypeInfo = ((TupleTypeInfo) graph.getVertices().getType()).getTypeAt(1); + + Graph undirectedGraph = graph + .mapEdges(new NullValueEdgeMapper()) + .getUndirected(); - // initialize vertex values and run the Vertex Centric Iteration return undirectedGraph.runGatherSumApplyIteration( - new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId(), - maxIterations).getVertices(); + new GatherNeighborIds<>(valueTypeInfo), + new SelectMinId<>(valueTypeInfo), + new UpdateComponentId(valueTypeInfo), + maxIterations).getVertices(); } // -------------------------------------------------------------------------------------------- @@ -69,28 +88,67 @@ public DataSet> run(Graph graph) throws Exception { // 
-------------------------------------------------------------------------------------------- @SuppressWarnings("serial") - private static final class GatherNeighborIds extends GatherFunction { + private static final class GatherNeighborIds> + extends GatherFunction + implements ResultTypeQueryable { + + private final TypeInformation typeInformation; - public Long gather(Neighbor neighbor) { + private GatherNeighborIds(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public VV gather(Neighbor neighbor) { return neighbor.getNeighborValue(); } - }; + + @Override + public TypeInformation getProducedType() { + return typeInformation; + } + } @SuppressWarnings("serial") - private static final class SelectMinId extends SumFunction { + private static final class SelectMinId> + extends SumFunction + implements ResultTypeQueryable { + + private final TypeInformation typeInformation; - public Long sum(Long newValue, Long currentValue) { - return Math.min(newValue, currentValue); + private SelectMinId(TypeInformation typeInformation) { + this.typeInformation = typeInformation; } - }; + + public VV sum(VV newValue, VV currentValue) { + return newValue.compareTo(currentValue) < 0 ? 
newValue : currentValue; + } + + @Override + public TypeInformation getProducedType() { + return typeInformation; + } + } @SuppressWarnings("serial") - private static final class UpdateComponentId extends ApplyFunction { + private static final class UpdateComponentId> + extends ApplyFunction + implements ResultTypeQueryable { - public void apply(Long summedValue, Long origValue) { - if (summedValue < origValue) { + private final TypeInformation typeInformation; + + private UpdateComponentId(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public void apply(VV summedValue, VV origValue) { + if (summedValue.compareTo(origValue) < 0) { setResult(summedValue); } } + + @Override + public TypeInformation getProducedType() { + return typeInformation; + } } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java index 2f619a67ccac7..b602fe6fbd842 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java @@ -59,7 +59,7 @@ protected void testProgram() throws Exception { Graph graph = Graph.fromDataSet(initialVertices, edges, env); - DataSet> result = graph.run(new ConnectedComponents(100)); + DataSet> result = graph.run(new ConnectedComponents(100)); result.writeAsCsv(resultPath, "\n", " "); env.execute(); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java index f4f9859b8c01d..3a750af57c755 100644 --- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.util.CompilerTestBase; @@ -72,10 +73,10 @@ public Edge map(Tuple2 edge) { }); Graph graph = Graph.fromDataSet(initialVertices, edges, env); - + DataSet> result = graph.runScatterGatherIteration( - new ConnectedComponents.CCUpdater(), - new ConnectedComponents.CCMessenger(), 100) + new ConnectedComponents.CCUpdater(), + new ConnectedComponents.CCMessenger(BasicTypeInfo.LONG_TYPE_INFO), 100) .getVertices(); result.output(new DiscardingOutputFormat>()); @@ -160,8 +161,8 @@ public Edge map(Tuple2 edge) { parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar); DataSet> result = graph.runScatterGatherIteration( - new ConnectedComponents.CCUpdater(), - new ConnectedComponents.CCMessenger(), 100) + new ConnectedComponents.CCUpdater(), + new ConnectedComponents.CCMessenger(BasicTypeInfo.LONG_TYPE_INFO), 100) .getVertices(); result.output(new DiscardingOutputFormat>());