From c9976e046649474ada2af58bca881686e81154a5 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 28 Apr 2016 11:12:43 -0400 Subject: [PATCH 1/2] [FLINK-3846] [gelly] Graph.removeEdges also removes duplicate edges All original edges are now emitted when no matching edge is found in the edge removal set. --- .../src/main/java/org/apache/flink/graph/Graph.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index d46056a520b92..12ac099f341c1 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1459,14 +1459,8 @@ private static final class EdgeRemovalCoGroup implements CoGroupFunction> edge, Iterable> edgeToBeRemoved, Collector> out) throws Exception { - - final Iterator> edgeIterator = edge.iterator(); - final Iterator> edgeToBeRemovedIterator = edgeToBeRemoved.iterator(); - Edge next; - - if (edgeIterator.hasNext()) { - if (!edgeToBeRemovedIterator.hasNext()) { - next = edgeIterator.next(); + if (!edgeToBeRemoved.iterator().hasNext()) { + for (Edge next : edge) { out.collect(next); } } From 4a91b70a289474b9906dd2f919222ae6c90d7b9b Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 29 Apr 2016 12:58:17 -0400 Subject: [PATCH 2/2] Updated tests --- .../operations/GraphMutationsITCase.scala | 20 ++++++++++--------- .../java/org/apache/flink/graph/Graph.java | 4 ++-- .../test/operations/GraphMutationsITCase.java | 9 +++++++++ 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala index f6acdc1576c04..6375ecd7bd4b9 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala @@ -211,11 +211,13 @@ MultipleProgramsTestBase(mode) { @throws(classOf[Exception]) def testRemoveEdge() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + var graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L)) - val res = newgraph.getEdges.collect().toList - expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n" + graph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L, 2L), 12L) + graph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L)) + val res = graph.getEdges.collect().toList + expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + + "4,5,45\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } @@ -236,12 +238,12 @@ MultipleProgramsTestBase(mode) { @throws(classOf[Exception]) def testRemoveEdges() { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + var graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val newgraph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L), - new Edge(4L, 5L, 45L))) - val res = newgraph.getEdges.collect().toList - expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n" + graph = graph.addEdge(new Vertex[Long, Long](3L, 3L), new Vertex[Long, Long](4L, 4L), 34L) + graph = graph.removeEdges(List[Edge[Long, Long]](new Edge(1L, 2L, 12L), new Edge(4L, 5L, 45L))) + val res = graph.getEdges.collect().toList + expectedResult = "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,4,34\n" + "3,5,35\n" + "5,1,51\n" TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 12ac099f341c1..aabc466abd540 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1421,8 +1421,8 @@ public Edge join(Vertex vertex, Edge edge) throws Exception * the removed edges */ public Graph removeEdge(Edge edge) { - DataSet> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter(edge)); - return new Graph(this.vertices, newEdges, this.context); + DataSet> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<>(edge)); + return new Graph<>(this.vertices, newEdges, this.context); } private static final class EdgeRemovalEdgeFilter diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index 704a913d2fab3..8770274aad19a 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -471,12 +471,17 @@ public void testRemoveEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); + + // duplicate edge should be preserved in output + graph = graph.addEdge(new Vertex<>(1L, 1L), new Vertex<>(2L, 2L), 12L); + graph = graph.removeEdge(new Edge<>(5L, 1L, 51L)); DataSet> data = graph.getEdges(); List> result= data.collect(); expectedResult = "1,2,12\n" + + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + @@ -500,12 +505,16 @@ public void testRemoveEdges() throws Exception { edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L)); edgesToBeRemoved.add(new Edge<>(2L, 3L, 23L)); + // duplicate edge should be preserved in output + graph = graph.addEdge(new Vertex<>(1L, 1L), new Vertex<>(2L, 2L), 12L); + graph = graph.removeEdges(edgesToBeRemoved); DataSet> data = graph.getEdges(); List> result= data.collect(); expectedResult = "1,2,12\n" + + "1,2,12\n" + "1,3,13\n" + "3,4,34\n" + "3,5,35\n" +