From ac7dde016e4284a6deb3bfe4529e27968899f825 Mon Sep 17 00:00:00 2001 From: Martin Junghanns Date: Thu, 29 Oct 2015 14:42:37 +0100 Subject: [PATCH 1/2] [FLINK-2905] [gelly] Add Graph Intersection method * added two strategies for edge intersection * added Scala methods * added tests * updated docs --- docs/libs/gelly_guide.md | 6 +- .../org/apache/flink/graph/scala/Graph.scala | 40 +++++++ .../java/org/apache/flink/graph/Graph.java | 111 +++++++++++++++++- .../operations/GraphOperationsITCase.java | 85 +++++++++++++- 4 files changed, 236 insertions(+), 6 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 59e1a3b74a4d8..fd117cf29823a 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -485,13 +485,15 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Do * Undirected: In Gelly, a `Graph` is always directed. Undirected graphs can be represented by adding all opposite-direction edges to a graph. For this purpose, Gelly provides the `getUndirected()` method. -* Union: Gelly's `union()` method performs a union operation on the vertex and edge sets of the specified graph and current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained. +* Union: Gelly's `union()` method performs an union operation on the vertex and edge sets of the specified graph and the current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained.

Union Transformation

-* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and specified graph. +* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and the specified graph. + +* Intersect: Gelly's `intersect()` method performs an intersect on the edge sets of the current graph and the specified graph. Edges are considered equal, if they have the same source identifier, target identifier and edge value. Vertex values are replaced with `NullValue` and, depending on a parameter, matching edges are either contained once in the resulting `Graph` or as often as there are matching edge pairs between the input graphs. -[Back to top](#top) diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index e51453eacee10..acf32beeb856d 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -967,6 +967,46 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { wrapGraph(jgraph.difference(graph.getWrappedGraph)) } + /** + * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they + * have the same source identifier, target identifier and edge value. + *

+ * The algorithm computes pairs of matching edges from the input graphs. If the same edge occurs + * multiple times in the input graphs, there will be multiple edge pairs to be considered. The + * output graph will contain exactly one edge representing all matching pairs. + *

+ * Vertices in the output graph will have no vertex values. + * + * @param graph the graph to perform intersect with + * @return a new graph which contains common vertices and edges from the input graphs + */ + def intersect(graph: Graph[K, VV, EV]): Graph[K, NullValue, EV] = { + wrapGraph(jgraph.intersect(graph.getWrappedGraph)) + } + + /** + * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they + * have the same source identifier, target identifier and edge value. + *

+ * The algorithm computes pairs of matching edges from the input graphs. If the same edge occurs + * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each + * edge instance can only be part of one matching pair. If the given parameter + * {@code distinctEdges} is set to {@code true}, there will be exactly one edge in the output + * graph representing all matching pairs. If the parameter is set to {@code false}, both edges of + * each matching pair will be in the output. + *

+ * Vertices in the output graph will have no vertex values. + * + * @param graph the graph to perform intersect with + * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output graph + * representing all matching pairs, otherwise, for each matching pair, both + * edges will be in the output graph + * @return a new graph which contains only common vertices and edges from the input graphs + */ + def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV] = { + wrapGraph(jgraph.intersect(graph.getWrappedGraph, distinctEdges)) + } + /** * Compute a reduce transformation over the neighbors' vertex values of each vertex. * For each vertex, the transformation consecutively calls a diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 4f603f7bb70eb..1175fb100e141 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -60,8 +60,8 @@ import org.apache.flink.graph.utils.Tuple3ToEdgeMap; import org.apache.flink.graph.utils.VertexToTuple2Map; import org.apache.flink.graph.validation.GraphValidator; -import org.apache.flink.util.Collector; import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; /** * Represents a Graph consisting of {@link Edge edges} and {@link Vertex @@ -174,7 +174,7 @@ public static Graph fromDataSet( } private static final class EmitSrcAndTarget implements FlatMapFunction< - Edge, Vertex> { + Edge, Vertex> { public void flatMap(Edge edge, Collector> out) { out.collect(new Vertex(edge.f0, NullValue.getInstance())); @@ -1477,7 +1477,6 @@ public void coGroup(Iterable> edge, Iterable> edgeToBeRe * @return a new graph */ public Graph union(Graph graph) { - DataSet> unionedVertices = graph.getVertices().union(this.getVertices()).distinct(); 
DataSet> unionedEdges = graph.getEdges().union(this.getEdges()); return new Graph(unionedVertices, unionedEdges, this.context); @@ -1496,6 +1495,112 @@ public Graph difference(Graph graph) { return this.removeVertices(removeVerticesData); } + /** + * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they + * have the same source identifier, target identifier and edge value. + *

+ * The algorithm computes pairs of matching edges from the input graphs. The output graph will + * contain exactly one edge representing all matching pairs, independent from the number of + * matching pairs. + *

+ * Vertices in the output graph will have no vertex values. + * + * @param graph the graph to perform intersect with + * @return a new graph which contains common vertices and edges from the input graphs + */ + public Graph intersect(Graph graph) { + return intersect(graph, true); + } + + /** + * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they + * have the same source identifier, target identifier and edge value. + *

+ * The algorithm computes pairs of matching edges from the input graphs. If the same edge occurs + * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each + * edge instance can only be part of one matching pair. If the given parameter + * {@code distinctEdges} is set to {@code true}, there will be exactly one edge in the output + * graph representing all matching pairs. If the parameter is set to {@code false}, both edges of + * each matching pair will be in the output. + *

+ * Vertices in the output graph will have no vertex values. + * + * @param graph the graph to perform intersect with + * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output graph + * representing all matching pairs, otherwise, for each matching pair, both + * edges will be in the output graph + * @return a new graph which contains only common vertices and edges from the input graphs + */ + public Graph intersect(Graph graph, boolean distinctEdges) { + DataSet> intersectEdges; + if (distinctEdges) { + intersectEdges = getDistinctEdgeIntersection(graph.getEdges()); + } else { + intersectEdges = getPairwiseEdgeIntersection(graph.getEdges()); + } + + return Graph.fromDataSet(intersectEdges, getContext()); + } + + /** + * Computes the intersection between the edge set and the given edge set. For all matching pairs, + * only one edge will be in the resulting data set. + * + * @param edges edges to compute intersection with + * @return edge set containing one edge for all matching pairs of the same edge + */ + private DataSet> getDistinctEdgeIntersection(DataSet> edges) { + return this.getEdges() + .join(edges) + .where(0, 1, 2) + .equalTo(0, 1, 2) + .with(new JoinFunction, Edge, Edge>() { + @Override + public Edge join(Edge first, Edge second) throws Exception { + return first; + } + }).withForwardedFieldsFirst("*") + .distinct(); + } + + /** + * Computes the intersection between the edge set and the given edge set. For all matching pairs, both edges will be + * in the resulting data set. + * + * @param edges edges to compute intersection with + * @return edge set containing both edges from all matching pairs of the same edge + */ + private DataSet> getPairwiseEdgeIntersection(DataSet> edges) { + return this.getEdges() + .coGroup(edges) + .where(0, 1, 2) + .equalTo(0, 1, 2) + .with(new MatchingEdgeReducer()); + } + + /** + * As long as both input iterables have more edges, the reducer outputs each edge of a pair. 
+ * + * @param vertex identifier type + * @param edge value type + */ + private static final class MatchingEdgeReducer + implements CoGroupFunction, Edge, Edge> { + + @Override + public void coGroup(Iterable> edgesLeft, Iterable> edgesRight, Collector> out) + throws Exception { + Iterator> leftIt = edgesLeft.iterator(); + Iterator> rightIt = edgesRight.iterator(); + + // collect pairs once + while(leftIt.hasNext() && rightIt.hasNext()) { + out.collect(leftIt.next()); + out.collect(rightIt.next()); + } + } + } + /** * Runs a Vertex-Centric iteration on the graph. * No configuration options are provided. diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java index ffc9da94d2885..e6c925c0a940f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java @@ -21,9 +21,11 @@ import java.util.ArrayList; import java.util.List; +import com.google.common.collect.Lists; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; @@ -31,6 +33,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -302,7 +305,6 @@ public void testDifference() throws Exception { compareResultAsTuples(result, expectedResult); } - @Test 
public void testDifferenceVertices() throws Exception{ /*Test difference() method by checking the output for getVertices() on the resultant graph @@ -355,6 +357,87 @@ public void testDifference2() throws Exception { compareResultAsTuples(result, expectedResult); } + @Test + public final void testIntersect() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + List> edges1 = Lists.newArrayList( + new Edge<>(1L, 3L, 12L), + new Edge<>(1L, 3L, 13L), // needs to be in the output + new Edge<>(1L, 3L, 14L) + ); + + List> edges2 = Lists.newArrayList( + new Edge<>(1L, 3L, 13L) + ); + + Graph graph1 = Graph.fromCollection(edges1, env); + Graph graph2 = Graph.fromCollection(edges2, env); + + Graph intersect = graph1.intersect(graph2, true); + + List> vertices = Lists.newArrayList(); + List> edges = Lists.newArrayList(); + + intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices)); + intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges)); + + env.execute(); + + String expectedVertices = "1,(null)\n" + + "3,(null)\n"; + + String expectedEdges = "1,3,13\n"; + + compareResultAsTuples(vertices, expectedVertices); + compareResultAsTuples(edges, expectedEdges); + } + + @Test + public final void testIntersectWithPairs() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + List> edges1 = Lists.newArrayList( + new Edge<>(1L, 3L, 12L), + new Edge<>(1L, 3L, 13L), + new Edge<>(1L, 3L, 13L), // output + new Edge<>(1L, 3L, 13L), // output + new Edge<>(1L, 3L, 14L) // output + ); + + List> edges2 = Lists.newArrayList( + new Edge<>(1L, 3L, 13L), // output + new Edge<>(1L, 3L, 13L), // output + new Edge<>(1L, 3L, 14L) // output + ); + + Graph graph1 = Graph.fromCollection(edges1, env); + Graph graph2 = Graph.fromCollection(edges2, env); + + Graph intersect = graph1.intersect(graph2, false); + + List> vertices = Lists.newArrayList(); + List> edges = 
Lists.newArrayList(); + + intersect.getVertices().output(new LocalCollectionOutputFormat<>(vertices)); + intersect.getEdges().output(new LocalCollectionOutputFormat<>(edges)); + + env.execute(); + + String expectedVertices = "1,(null)\n" + + "3,(null)\n"; + + String expectedEdges = "1,3,13\n" + + "1,3,13\n" + + "1,3,13\n" + + "1,3,13\n" + + "1,3,14\n" + + "1,3,14"; + + compareResultAsTuples(vertices, expectedVertices); + compareResultAsTuples(edges, expectedEdges); + } + @Test public void testTriplets() throws Exception { /* From d4d4726d9713061f9a5242ea65aa66e3d785ad73 Mon Sep 17 00:00:00 2001 From: Martin Junghanns Date: Thu, 5 Nov 2015 19:45:51 +0100 Subject: [PATCH 2/2] [FLINK-2905] [gelly] updated documentation * removed convenient method --- docs/libs/gelly_guide.md | 55 ++++++++++++++++++- .../org/apache/flink/graph/scala/Graph.scala | 33 +++-------- .../java/org/apache/flink/graph/Graph.java | 29 ++-------- 3 files changed, 67 insertions(+), 50 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index fd117cf29823a..db0fa4c9a7fcf 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -485,7 +485,7 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Do * Undirected: In Gelly, a `Graph` is always directed. Undirected graphs can be represented by adding all opposite-direction edges to a graph. For this purpose, Gelly provides the `getUndirected()` method. -* Union: Gelly's `union()` method performs an union operation on the vertex and edge sets of the specified graph and the current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained. +* Union: Gelly's `union()` method performs a union operation on the vertex and edge sets of the specified graph and the current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained.

Union Transformation
@@ -493,7 +493,58 @@ val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Do
 * Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and the specified graph.
-* Intersect: Gelly's `intersect()` method performs an intersect on the edge sets of the current graph and the specified graph. Edges are considered equal, if they have the same source identifier, target identifier and edge value. Vertex values are replaced with `NullValue` and, depending on a parameter, matching edges are either contained once in the resulting `Graph` or as often as there are matching edge pairs between the input graphs.
+* Intersect: Gelly's `intersect()` method performs an intersection on the edge
+  sets of the current graph and the specified graph. The result is a new `Graph` that contains all
+  edges that exist in both input graphs. Two edges are considered equal if they have the same source
+  identifier, target identifier and edge value. Vertices in the resulting graph have no
+  value. If values are required, one can, for example, retrieve them from one of the input graphs using
+  the `joinWithVertices()` method.
+  Depending on the parameter `distinct`, equal edges are either contained once in the resulting
+  `Graph` or, for each pair of equal edges in the input graphs, both edges of the pair are contained.
+
+

+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// create first graph from edges {(1, 3, 12), (1, 3, 13), (1, 3, 13)}
+List<Edge<Long, Long>> edges1 = ...
+Graph<Long, NullValue, Long> graph1 = Graph.fromCollection(edges1, env);
+
+// create second graph from edges {(1, 3, 13)}
+List<Edge<Long, Long>> edges2 = ...
+Graph<Long, NullValue, Long> graph2 = Graph.fromCollection(edges2, env);
+
+// Using distinct = true results in {(1,3,13)}
+Graph<Long, NullValue, Long> intersect1 = graph1.intersect(graph2, true);
+
+// Using distinct = false results in {(1,3,13),(1,3,13)}: there is one pair of equal edges, and both edges of the pair are kept
+Graph<Long, NullValue, Long> intersect2 = graph1.intersect(graph2, false);
+
+{% endhighlight %}
+
+ +
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// create first graph from edges {(1, 3, 12), (1, 3, 13), (1, 3, 13)}
+val edges1: List[Edge[Long, Long]] = ...
+val graph1 = Graph.fromCollection(edges1, env)
+
+// create second graph from edges {(1, 3, 13)}
+val edges2: List[Edge[Long, Long]] = ...
+val graph2 = Graph.fromCollection(edges2, env)
+
+
+// Using distinct = true results in {(1,3,13)}
+val intersect1 = graph1.intersect(graph2, true)
+
+// Using distinct = false results in {(1,3,13),(1,3,13)}: there is one pair of equal edges, and both edges of the pair are kept
+val intersect2 = graph1.intersect(graph2, false)
+{% endhighlight %}
+
+
-[Back to top](#top) diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index acf32beeb856d..3a0843ac2a00d 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -971,36 +971,19 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they * have the same source identifier, target identifier and edge value. *

- * The algorithm computes pairs of matching edges from the input graphs. If the same edge occurs - * multiple times in the input graphs, there will be multiple edge pairs to be considered. The - * output graph will contain exactly one edge representing all matching pairs. - *

- * Vertices in the output graph will have no vertex values. - * - * @param graph the graph to perform intersect with - * @return a new graph which contains common vertices and edges from the input graphs - */ - def intersect(graph: Graph[K, VV, EV]): Graph[K, NullValue, EV] = { - wrapGraph(jgraph.intersect(graph.getWrappedGraph)) - } - - /** - * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they - * have the same source identifier, target identifier and edge value. - *

- * The algorithm computes pairs of matching edges from the input graphs. If the same edge occurs + * The method computes pairs of equal edges from the input graphs. If the same edge occurs * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each - * edge instance can only be part of one matching pair. If the given parameter - * {@code distinctEdges} is set to {@code true}, there will be exactly one edge in the output - * graph representing all matching pairs. If the parameter is set to {@code false}, both edges of - * each matching pair will be in the output. + * edge instance can only be part of one pair. If the given parameter {@code distinctEdges} is set + * to {@code true}, there will be exactly one edge in the output graph representing all pairs of + * equal edges. If the parameter is set to {@code false}, both edges of each pair will be in the + * output. *

* Vertices in the output graph will have no vertex values. * * @param graph the graph to perform intersect with - * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output graph - * representing all matching pairs, otherwise, for each matching pair, both - * edges will be in the output graph + * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output + * graph representing all pairs of equal edges, otherwise, for each pair, + * both edges will be in the output graph * @return a new graph which contains only common vertices and edges from the input graphs */ def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV] = { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 1175fb100e141..cf16fd3945cda 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1499,35 +1499,18 @@ public Graph difference(Graph graph) { * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they * have the same source identifier, target identifier and edge value. *

- * The algorithm computes pairs of matching edges from the input graphs. The output graph will - * contain exactly one edge representing all matching pairs, independent from the number of - * matching pairs. - *

- * Vertices in the output graph will have no vertex values. - * - * @param graph the graph to perform intersect with - * @return a new graph which contains common vertices and edges from the input graphs - */ - public Graph intersect(Graph graph) { - return intersect(graph, true); - } - - /** - * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they - * have the same source identifier, target identifier and edge value. - *

- * The algorithm computes pairs of matching edges from the input graphs. If the same edge occurs + * The method computes pairs of equal edges from the input graphs. If the same edge occurs * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each - * edge instance can only be part of one matching pair. If the given parameter - * {@code distinctEdges} is set to {@code true}, there will be exactly one edge in the output - * graph representing all matching pairs. If the parameter is set to {@code false}, both edges of - * each matching pair will be in the output. + * edge instance can only be part of one pair. If the given parameter {@code distinctEdges} is set + * to {@code true}, there will be exactly one edge in the output graph representing all pairs of + * equal edges. If the parameter is set to {@code false}, both edges of each pair will be in the + * output. *

* Vertices in the output graph will have no vertex values. * * @param graph the graph to perform intersect with * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output graph - * representing all matching pairs, otherwise, for each matching pair, both + * representing all pairs of equal edges, otherwise, for each pair, both * edges will be in the output graph * @return a new graph which contains only common vertices and edges from the input graphs */
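
Review note: the pairing rule described in the Javadoc above (each edge instance belongs to at most one pair; `distinctEdges` chooses between one representative edge and both edges of every pair) can be checked without a Flink cluster. The following is only a minimal sketch on plain Java collections, not the Gelly implementation: the class `EdgeIntersectSketch`, the helper `edge(...)`, and the representation of an edge as a `List<Long>` of (source, target, value) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EdgeIntersectSketch {

    // Hypothetical stand-in for a Gelly edge: (source id, target id, edge value).
    // List equality compares all three fields, which mirrors the equality rule in the Javadoc.
    public static List<Long> edge(long source, long target, long value) {
        return Arrays.asList(source, target, value);
    }

    // Counts how often each (source, target, value) triple occurs, keeping first-seen order.
    private static Map<List<Long>, Integer> countEdges(List<List<Long>> edges) {
        Map<List<Long>, Integer> counts = new LinkedHashMap<>();
        for (List<Long> e : edges) {
            counts.merge(e, 1, Integer::sum);
        }
        return counts;
    }

    // Emulates the documented semantics of intersect(graph, distinctEdges) on edge lists.
    public static List<List<Long>> intersect(
            List<List<Long>> left, List<List<Long>> right, boolean distinctEdges) {
        Map<List<Long>, Integer> leftCounts = countEdges(left);
        Map<List<Long>, Integer> rightCounts = countEdges(right);
        List<List<Long>> result = new ArrayList<>();
        for (Map.Entry<List<Long>, Integer> entry : leftCounts.entrySet()) {
            // Each edge instance can be part of only one pair, so the number of
            // pairs is the smaller of the two occurrence counts.
            int pairs = Math.min(entry.getValue(), rightCounts.getOrDefault(entry.getKey(), 0));
            if (pairs == 0) {
                continue; // edge is missing from one of the inputs
            }
            // distinctEdges = true: one edge represents all pairs;
            // distinctEdges = false: both edges of every pair are emitted.
            int copies = distinctEdges ? 1 : 2 * pairs;
            for (int i = 0; i < copies; i++) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Long>> edges1 = Arrays.asList(edge(1, 3, 12), edge(1, 3, 13), edge(1, 3, 13));
        List<List<Long>> edges2 = Arrays.asList(edge(1, 3, 13));
        System.out.println(intersect(edges1, edges2, true));  // [[1, 3, 13]]
        System.out.println(intersect(edges1, edges2, false)); // [[1, 3, 13], [1, 3, 13]]
    }
}
```

With the inputs of `testIntersectWithPairs` from the first patch (three copies of (1,3,13) and one (1,3,14) on the left; two copies of (1,3,13) and one (1,3,14) on the right), the sketch yields four (1,3,13) edges and two (1,3,14) edges, matching the expected edges listed in that test.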