From ee9cc269118c04c9bb72b532745cd693043908ad Mon Sep 17 00:00:00 2001 From: andralungu Date: Fri, 15 May 2015 09:03:35 +0200 Subject: [PATCH 1/4] [FLINK-2012][gelly] Added methods to remove/add multiple edges/vertices --- docs/libs/gelly_guide.md | 15 + .../java/org/apache/flink/graph/Graph.java | 171 +++++++- .../test/operations/GraphMutationsITCase.java | 367 +++++++++++++++++- 3 files changed, 545 insertions(+), 8 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index fa49dcececcd6..26d99b3a52290 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -251,14 +251,29 @@ Gelly includes the following methods for adding and removing vertices and edges // adds a Vertex and the given edges to the Graph. If the Vertex already exists, it will not be added again, but the given edges will. Graph addVertex(final Vertex vertex, List> edges) +// adds a data set of vertices and a list of edges to the Graph. If the vertices already exist in the graph, they will not be added once more, however the edges will. +Graph addVertices(DataSet> verticesToAdd, List> edges) + // adds an Edge to the Graph. If the source and target vertices do not exist in the graph, they will also be added. Graph addEdge(Vertex source, Vertex target, EV edgeValue) +// adds a data set of edges to the Graph. If the vertices already exist in the graph, they will not be added, however the edges will. +Graph addEdges(DataSet> newEdges, DataSet> newVertices) + +// adds a data set of existing edges to the Graph +Graph addEdges(DataSet> newEdges) + // removes the given Vertex and its edges from the Graph. Graph removeVertex(Vertex vertex) +// removes the given data set of Vertices and their edges from the Graph +Graph removeVertices(DataSet> verticesToBeRemoved) + // removes *all* edges that match the given Edge from the Graph. Graph removeEdge(Edge edge) + +// removes *all* edges that match the edges in the given data set +Graph removeEdges(DataSet> edgesToBeRemoved) {% endhighlight %} Neighborhood Methods diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 6b632bc52541f..153acf2e6f92d 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; @@ -1009,22 +1010,38 @@ public Tuple2 map(Edge edge) throws Exception { * will. * * @param vertex the vertex to add to the graph - * @param edges a list of edges to add to the grap - * @return the new graph containing the existing and newly added vertices + * @param edges a list of edges to add to the graph + * @return the new graph containing the existing and newly added vertex * and edges */ @SuppressWarnings("unchecked") public Graph addVertex(final Vertex vertex, List> edges) { DataSet> newVertex = this.context.fromElements(vertex); - // Take care of empty edge set + return addVertices(newVertex, edges); + } + + /** + * Adds the data set of vertices and the edges, passed as input, to the graph. + * If the vertices already exist in the graph, they will not be added once more, + * however the edges will. + * + * @param verticesToAdd the data set of vertices to add + * @param edges a list of edges to add to the graph + * @return the new graph containing the existing and newly added vertices + * and edges + */ + @SuppressWarnings("unchecked") + public Graph addVertices(DataSet> verticesToAdd, List> edges) { + + // Consider empty edge set if (edges.isEmpty()) { - return new Graph(this.vertices.union(newVertex) + return new Graph(this.vertices.union(verticesToAdd) .distinct(), this.edges, this.context); } // Add the vertex and its edges - DataSet> newVertices = this.vertices.union(newVertex).distinct(); + DataSet> newVertices = this.vertices.union(verticesToAdd).distinct(); DataSet> newEdges = this.edges.union(context.fromCollection(edges)); return new Graph(newVertices, newEdges, this.context); @@ -1048,6 +1065,34 @@ public Graph addEdge(Vertex source, Vertex target, EV e return this.union(partialGraph); } + /** + * Adds the given data set of edges to the graph. if the vertices do not already exist in the + * graph, they will also be added. + * + * If the vertex values are not required during the computation, it is recommended to use + * addEdges(edges) + * + * @param newEdges the data set of edges to be added + * @param newVertices their corresponding vertices and vertexValues + * @return a new graph containing the existing vertices and edges plus the newly added edges and vertices. + */ + @SuppressWarnings("unchecked") + public Graph addEdges(DataSet> newEdges, DataSet> newVertices) { + Graph partialGraph = fromDataSet(newVertices, newEdges, context); + return this.union(partialGraph); + } + + /** + * Adds the given data set of edges to the graph provided that the source and target values already exist. + * + * @param newEdges the data set of edges to be added + * @return a new graph containing the existing vertices and edges plus the newly added edges. + */ + @SuppressWarnings("unchecked") + public Graph addEdges(DataSet> newEdges) { + return new Graph(vertices, edges.union(newEdges), context); + } + /** * Removes the given vertex and its edges from the graph. * @@ -1100,6 +1145,72 @@ public boolean filter(Edge edge) throws Exception { } /** + * Removes the given data set of vertices and its edges from the graph. + * + * @param verticesToBeRemoved the data set of vertices to be removed + * @return the resulted graph containing the initial vertices and edges minus the vertices + * and edges removed. + */ + public Graph removeVertices(DataSet> verticesToBeRemoved) { + + // determine whether the vertex existed in the initial graph + DataSet>> flaggedVertices = getVertices() + .map(new MapFunction, Vertex>>() { + + @Override + public Vertex> map(Vertex vertex) throws Exception { + return new Vertex>(vertex.getId(), new Tuple2(vertex.getValue(), + true)); + } + }).withForwardedFields("f0"); + DataSet>> flaggedVerticesToBeRemoved = verticesToBeRemoved + .map(new MapFunction, Vertex>>() { + + @Override + public Vertex> map(Vertex vertex) throws Exception { + return new Vertex>(vertex.getId(), new Tuple2(vertex.getValue(), + false)); + } + }).withForwardedFields("f0"); + + DataSet> newVertices = flaggedVertices.union(flaggedVerticesToBeRemoved) + .groupBy(0).reduceGroup(new VerticesRemovalGroupReduce()); + DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0) + // if the edge source was removed, the edge will also be removed + .with(new ProjectEdgeToBeRemoved()) + // if the edge target was removed, the edge will also be removed + .join(newVertices).where(1).equalTo(0) + .with(new ProjectEdge()); + + return new Graph(newVertices, newEdges, context); + } + + private static final class VerticesRemovalGroupReduce implements GroupReduceFunction>, Vertex> { + + @Override + public void reduce(Iterable>> vertices, + Collector> out) throws Exception { + + Iterator>> vertexIterator = vertices.iterator(); + + if(vertexIterator.hasNext()) { + Vertex> vertex = vertexIterator.next(); + if(!vertexIterator.hasNext() && vertex.getValue().f1) { + out.collect(new Vertex(vertex.getId(), vertex.getValue().f0)); + } + } + } + } + + @ForwardedFieldsSecond("f0; f1; f2") + private static final class ProjectEdgeToBeRemoved implements JoinFunction, Edge, Edge> { + @Override + public Edge join(Vertex vertex, Edge edge) throws Exception { + return edge; + } + } + + /** * Removes all edges that match the given edge from the graph. * * @param edge the edge to remove @@ -1126,6 +1237,56 @@ public boolean filter(Edge edge) { } } + /** + * Removes all the edges that match the edges in the given data set from the graph. + * + * @param edgesToBeRemoved the data set of edges to be removed + * @return a new graph where the edges have been removed and in which the vertices remained intact + */ + public Graph removeEdges(DataSet> edgesToBeRemoved) { + + // determine whether the edge existed in the initial graph + DataSet>> flaggedEdges = getEdges() + .map(new MapFunction, Edge>>() { + + @Override + public Edge> map(Edge edge) throws Exception { + return new Edge>(edge.getSource(), edge.getTarget(), + new Tuple2(edge.getValue(), true)); + } + }).withForwardedFields("f0;f1"); + DataSet>> flaggedEdgesToBeRemoved = edgesToBeRemoved + .map(new MapFunction, Edge>>() { + + @Override + public Edge> map(Edge edge) throws Exception { + return new Edge>(edge.getSource(), edge.getTarget(), + new Tuple2(edge.getValue(), false)); + } + }).withForwardedFields("f0;f1"); + + DataSet> newEdges = flaggedEdges.union(flaggedEdgesToBeRemoved) + .groupBy(0,1).reduceGroup(new EdgeRemovalGroupReduce()); + + return new Graph(this.vertices, newEdges, context); + } + + private static final class EdgeRemovalGroupReduce implements GroupReduceFunction>, Edge> { + + @Override + public void reduce(Iterable>> edges, Collector> out) throws Exception { + + Iterator>> edgesIterator = edges.iterator(); + + if(edgesIterator.hasNext()) { + Edge> edge = edgesIterator.next(); + if(!edgesIterator.hasNext() && edge.getValue().f1) { + out.collect(new Edge(edge.getSource(), edge.getTarget(), edge.getValue().f0)); + } + } + } + } + /** * Performs union on the vertices and edges sets of the input graphs * removing duplicate vertices but maintaining duplicate edges. diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index dce34a80bcc43..cc9fcc877f78e 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -85,6 +87,39 @@ public void testAddVertex() throws Exception { "6,1,61\n"; } + @Test + public void testAddVertices() throws Exception { + /* + * Test addVertices() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(6L, 1L, 61L)); + edges.add(new Edge(7L, 1L, 71L)); + + DataSet> newVertices = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + graph = graph.addVertices(newVertices, edges); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n" + + "7,1,71\n"; + } + @Test public void testAddVertexExisting() throws Exception { /* @@ -111,6 +146,72 @@ public void testAddVertexExisting() throws Exception { "5,1,51\n"; } + @Test + public void testAddVerticesBothExisting() throws Exception { + /* + * Test addVertices() -- add two existing vertices + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(1L, 5L, 15L)); + edges.add(new Edge(3L, 1L, 31L)); + + DataSet> newVertices = env.fromElements(new Vertex(1L, 1L), + new Vertex(3L, 3L)); + graph = graph.addVertices(newVertices, edges); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "1,5,15\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "3,1,31\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testAddVerticesOneExisting() throws Exception { + /* + * Test addVertices() -- add an existing vertex + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(1L, 5L, 15L)); + edges.add(new Edge(6L, 1L, 61L)); + + DataSet> newVertices = env.fromElements(new Vertex(1L, 1L), + new Vertex(6L, 6L)); + graph = graph.addVertices(newVertices, edges); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "1,5,15\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } + @Test public void testAddVertexNoEdges() throws Exception { /* @@ -134,6 +235,34 @@ public void testAddVertexNoEdges() throws Exception { "6,6\n"; } + @Test + public void testAddVerticesNoEdges() throws Exception { + /* + * Test addVertices() -- add vertices with empty edge set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + List> edges = new ArrayList>(); + + DataSet> newVertices = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + graph = graph.addVertices(newVertices, edges); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n" + + "6,6\n" + + "7,7\n"; + } + @Test public void testRemoveVertex() throws Exception { /* @@ -154,6 +283,28 @@ public void testRemoveVertex() throws Exception { "3,4,34\n"; } + @Test + public void testRemoveVertices() throws Exception { + /* + * Test removeVertices() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = env.fromElements(new Vertex(1L, 1L), + new Vertex(2L, 2L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + @Test public void testRemoveInvalidVertex() throws Exception { /* @@ -176,6 +327,79 @@ public void testRemoveInvalidVertex() throws Exception { "4,5,45\n" + "5,1,51\n"; } + + @Test + public void testRemoveOneValidOneInvalidVertex() throws Exception { + /* + * Test removeVertices() -- remove one invalid vertex and a valid one + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = env.fromElements(new Vertex(1L, 1L), + new Vertex(7L, 7L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @Test + public void testRemoveBothInvalidVertices() throws Exception { + /* + * Test removeVertices() -- remove two invalid vertices + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testRemoveBothInvalidVerticesVertexResult() throws Exception { + /* + * Test removeVertices() -- remove two invalid vertices and verify the data set of vertices + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> verticesToBeRemoved = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + + graph = graph.removeVertices(verticesToBeRemoved); + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; + } @Test public void testAddEdge() throws Exception { @@ -201,7 +425,39 @@ public void testAddEdge() throws Exception { "5,1,51\n" + "6,1,61\n"; } - + + @Test + public void testAddEdges() throws Exception { + /* + * Test addEdges() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> edgesToBeAdded = env.fromElements(new Edge(6L, 1L, 61L), + new Edge(7L, 1L, 71L)); + DataSet> verticesToBeAdded = env.fromElements(new Vertex(6L, 6L), + new Vertex(7L, 7L)); + + graph = graph.addEdges(edgesToBeAdded, verticesToBeAdded); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n" + + "7,1,71\n"; + } + @Test public void testAddExistingEdge() throws Exception { /* @@ -226,9 +482,38 @@ public void testAddExistingEdge() throws Exception { "4,5,45\n" + "5,1,51\n"; } + + @Test + public void testAddExistingEdges() throws Exception { + /* + * Test addEdges() -- add already existing edges + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet> edgesToBeAdded = env.fromElements(new Edge(5L, 1L, 51L), + new Edge(2L, 3L, 23L)); + + graph = graph.addEdges(edgesToBeAdded); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "5,1,51\n"; + } @Test - public void testRemoveVEdge() throws Exception { + public void testRemoveEdge() throws Exception { /* * Test removeEdge() -- simple case */ @@ -248,7 +533,56 @@ public void testRemoveVEdge() throws Exception { "3,5,35\n" + "4,5,45\n"; } - + + @Test + public void testRemoveEdges() throws Exception { + /* + * Test removeEdges() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> edgesToBeRemoved = env.fromElements(new Edge(5L, 1L, 51L), + new Edge(2L, 3L, 23L)); + + graph = graph.removeEdges(edgesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @Test + public void testRemoveSameEdgeTwice() throws Exception { + /* + * Test removeEdges() -- try to remove the same edge twice + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> edgesToBeRemoved = env.fromElements(new Edge(5L, 1L, 51L), + new Edge(5L, 1L, 51L)); + + graph = graph.removeEdges(edgesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + @Test public void testRemoveInvalidEdge() throws Exception { /* @@ -271,4 +605,31 @@ public void testRemoveInvalidEdge() throws Exception { "4,5,45\n" + "5,1,51\n"; } + + //!!!!!BUG!!!! Daca ii dai invalid edge, ii face union... + @Test + public void testRemoveOneValidOneInvalidEdge() throws Exception { + /* + * Test removeEdges() -- one edge is valid, the other is invalid + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + DataSet> edgesToBeRemoved = env.fromElements(new Edge(1L, 1L, 51L), + new Edge(6L, 1L, 61L)); + + graph = graph.removeEdges(edgesToBeRemoved); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } } \ No newline at end of file From 5af324280cd25a7cda9df46ffe56df6248b26bde Mon Sep 17 00:00:00 2001 From: andralungu Date: Fri, 15 May 2015 09:38:34 +0200 Subject: [PATCH 2/4] [FLINK-2012][gelly] Removed trailing comment --- .../apache/flink/graph/test/operations/GraphMutationsITCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index cc9fcc877f78e..c680c37ee1576 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -606,7 +606,6 @@ public void testRemoveInvalidEdge() throws Exception { "5,1,51\n"; } - //!!!!!BUG!!!! Daca ii dai invalid edge, ii face union... @Test public void testRemoveOneValidOneInvalidEdge() throws Exception { /* From cd7905373e0b0f5e6548f686bcd673bda5677b47 Mon Sep 17 00:00:00 2001 From: andralungu Date: Wed, 20 May 2015 14:12:54 +0200 Subject: [PATCH 3/4] [FLINK-2012][gelly] Made remove methods use a coGroup fun --- docs/libs/gelly_guide.md | 3 - .../java/org/apache/flink/graph/Graph.java | 105 +++++------------- .../test/operations/GraphMutationsITCase.java | 29 ----- 3 files changed, 30 insertions(+), 107 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 26d99b3a52290..19d442aafb5a6 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -260,9 +260,6 @@ Graph addEdge(Vertex source, Vertex target, EV edgeValu // adds a data set of edges to the Graph. If the vertices already exist in the graph, they will not be added, however the edges will. Graph addEdges(DataSet> newEdges, DataSet> newVertices) -// adds a data set of existing edges to the Graph -Graph addEdges(DataSet> newEdges) - // removes the given Vertex and its edges from the Graph. Graph removeVertex(Vertex vertex) diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 153acf2e6f92d..151450de60a29 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1082,17 +1082,6 @@ public Graph addEdges(DataSet> newEdges, DataSet addEdges(DataSet> newEdges) { - return new Graph(vertices, edges.union(newEdges), context); - } - /** * Removes the given vertex and its edges from the graph. * @@ -1153,50 +1142,33 @@ public boolean filter(Edge edge) throws Exception { */ public Graph removeVertices(DataSet> verticesToBeRemoved) { - // determine whether the vertex existed in the initial graph - DataSet>> flaggedVertices = getVertices() - .map(new MapFunction, Vertex>>() { + DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0) + .with(new VerticesRemovalCoGroup()); - @Override - public Vertex> map(Vertex vertex) throws Exception { - return new Vertex>(vertex.getId(), new Tuple2(vertex.getValue(), - true)); - } - }).withForwardedFields("f0"); - DataSet>> flaggedVerticesToBeRemoved = verticesToBeRemoved - .map(new MapFunction, Vertex>>() { - - @Override - public Vertex> map(Vertex vertex) throws Exception { - return new Vertex>(vertex.getId(), new Tuple2(vertex.getValue(), - false)); - } - }).withForwardedFields("f0"); - - DataSet> newVertices = flaggedVertices.union(flaggedVerticesToBeRemoved) - .groupBy(0).reduceGroup(new VerticesRemovalGroupReduce()); - DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0) - // if the edge source was removed, the edge will also be removed - .with(new ProjectEdgeToBeRemoved()) - // if the edge target was removed, the edge will also be removed - .join(newVertices).where(1).equalTo(0) - .with(new ProjectEdge()); + DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0) + // if the edge source was removed, the edge will also be removed + .with(new ProjectEdgeToBeRemoved()) + // if the edge target was removed, the edge will also be removed + .join(newVertices).where(1).equalTo(0) + .with(new ProjectEdge()); return new Graph(newVertices, newEdges, context); } - private static final class VerticesRemovalGroupReduce implements GroupReduceFunction>, Vertex> { + private static final class VerticesRemovalCoGroup implements CoGroupFunction, Vertex, Vertex> { @Override - public void reduce(Iterable>> vertices, - Collector> out) throws Exception { + public void coGroup(Iterable> vertex, Iterable> vertexToBeRemoved, + Collector> out) throws Exception { - Iterator>> vertexIterator = vertices.iterator(); + final Iterator> vertexIterator = vertex.iterator(); + final Iterator> vertexToBeRemovedIterator = vertexToBeRemoved.iterator(); + Vertex next; - if(vertexIterator.hasNext()) { - Vertex> vertex = vertexIterator.next(); - if(!vertexIterator.hasNext() && vertex.getValue().f1) { - out.collect(new Vertex(vertex.getId(), vertex.getValue().f0)); + if (vertexIterator.hasNext()) { + if (!vertexToBeRemovedIterator.hasNext()) { + next = vertexIterator.next(); + out.collect(next); } } } @@ -1245,43 +1217,26 @@ public boolean filter(Edge edge) { */ public Graph removeEdges(DataSet> edgesToBeRemoved) { - // determine whether the edge existed in the initial graph - DataSet>> flaggedEdges = getEdges() - .map(new MapFunction, Edge>>() { - - @Override - public Edge> map(Edge edge) throws Exception { - return new Edge>(edge.getSource(), edge.getTarget(), - new Tuple2(edge.getValue(), true)); - } - }).withForwardedFields("f0;f1"); - DataSet>> flaggedEdgesToBeRemoved = edgesToBeRemoved - .map(new MapFunction, Edge>>() { - - @Override - public Edge> map(Edge edge) throws Exception { - return new Edge>(edge.getSource(), edge.getTarget(), - new Tuple2(edge.getValue(), false)); - } - }).withForwardedFields("f0;f1"); - - DataSet> newEdges = flaggedEdges.union(flaggedEdgesToBeRemoved) - .groupBy(0,1).reduceGroup(new EdgeRemovalGroupReduce()); + DataSet> newEdges = getEdges().coGroup(edgesToBeRemoved).where(0,1).equalTo(0,1) + .with(new EdgeRemovalCoGroup()); return new Graph(this.vertices, newEdges, context); } - private static final class EdgeRemovalGroupReduce implements GroupReduceFunction>, Edge> { + private static final class EdgeRemovalCoGroup implements CoGroupFunction, Edge, Edge> { @Override - public void reduce(Iterable>> edges, Collector> out) throws Exception { + public void coGroup(Iterable> edge, Iterable> edgeToBeRemoved, + Collector> out) throws Exception { - Iterator>> edgesIterator = edges.iterator(); + final Iterator> edgeIterator = edge.iterator(); + final Iterator> edgeToBeRemovedIterator = edgeToBeRemoved.iterator(); + Edge next; - if(edgesIterator.hasNext()) { - Edge> edge = edgesIterator.next(); - if(!edgesIterator.hasNext() && edge.getValue().f1) { - out.collect(new Edge(edge.getSource(), edge.getTarget(), edge.getValue().f0)); + if (edgeIterator.hasNext()) { + if (!edgeToBeRemovedIterator.hasNext()) { + next = edgeIterator.next(); + out.collect(next); } } } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index c680c37ee1576..6aa4ef3fc4c2d 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -483,35 +483,6 @@ public void testAddExistingEdge() throws Exception { "5,1,51\n"; } - @Test - public void testAddExistingEdges() throws Exception { - /* - * Test addEdges() -- add already existing edges - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> edgesToBeAdded = env.fromElements(new Edge(5L, 1L, 51L), - new Edge(2L, 3L, 23L)); - - graph = graph.addEdges(edgesToBeAdded); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "5,1,51\n"; - } - @Test public void testRemoveEdge() throws Exception { /* From 94efe0212fbb37db527cf3d4e5ae9de19fa598ac Mon Sep 17 00:00:00 2001 From: andralungu Date: Mon, 25 May 2015 19:29:24 +0200 Subject: [PATCH 4/4] [FLINK-2012][gelly] Changed addVertices to operate on vertices only --- docs/libs/gelly_guide.md | 20 +- .../java/org/apache/flink/graph/Graph.java | 143 +++++----- .../test/operations/GraphMutationsITCase.java | 245 ++++++++---------- 3 files changed, 172 insertions(+), 236 deletions(-) diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 19d442aafb5a6..87466d27894f5 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -248,29 +248,29 @@ Graph Mutations Gelly includes the following methods for adding and removing vertices and edges from an input `Graph`: {% highlight java %} -// adds a Vertex and the given edges to the Graph. If the Vertex already exists, it will not be added again, but the given edges will. -Graph addVertex(final Vertex vertex, List> edges) +// adds a Vertex to the Graph. If the Vertex already exists, it will not be added again. +Graph addVertex(final Vertex vertex) -// adds a data set of vertices and a list of edges to the Graph. If the vertices already exist in the graph, they will not be added once more, however the edges will. -Graph addVertices(DataSet> verticesToAdd, List> edges) +// adds a list of vertices to the Graph. If the vertices already exist in the graph, they will not be added once more. +Graph addVertices(List> verticesToAdd) // adds an Edge to the Graph. If the source and target vertices do not exist in the graph, they will also be added. Graph addEdge(Vertex source, Vertex target, EV edgeValue) -// adds a data set of edges to the Graph. If the vertices already exist in the graph, they will not be added, however the edges will. -Graph addEdges(DataSet> newEdges, DataSet> newVertices) +// adds a list of edges to the Graph. When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored. +Graph addEdges(List> newEdges) // removes the given Vertex and its edges from the Graph. Graph removeVertex(Vertex vertex) -// removes the given data set of Vertices and their edges from the Graph -Graph removeVertices(DataSet> verticesToBeRemoved) +// removes the given list of vertices and their edges from the Graph +Graph removeVertices(List> verticesToBeRemoved) // removes *all* edges that match the given Edge from the Graph. Graph removeEdge(Edge edge) -// removes *all* edges that match the edges in the given data set -Graph removeEdges(DataSet> edgesToBeRemoved) +// removes *all* edges that match the edges in the given list +Graph removeEdges(List> edgesToBeRemoved) {% endhighlight %} Neighborhood Methods diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 151450de60a29..dab1a8f69771c 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -18,6 +18,7 @@ package org.apache.flink.graph; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; @@ -1005,46 +1006,33 @@ public Tuple2 map(Edge edge) throws Exception { } /** - * Adds the input vertex and edges to the graph. If the vertex already - * exists in the graph, it will not be added again, but the given edges - * will. + * Adds the input vertex to the graph. If the vertex already + * exists in the graph, it will not be added again. * - * @param vertex the vertex to add to the graph - * @param edges a list of edges to add to the graph - * @return the new graph containing the existing and newly added vertex - * and edges + * @param vertex the vertex to be added + * @return the new graph containing the existing vertices as well as the one just added */ @SuppressWarnings("unchecked") - public Graph addVertex(final Vertex vertex, List> edges) { - DataSet> newVertex = this.context.fromElements(vertex); + public Graph addVertex(final Vertex vertex) { + List> newVertex = new ArrayList>(); + newVertex.add(vertex); - return addVertices(newVertex, edges); + return addVertices(newVertex); } /** - * Adds the data set of vertices and the edges, passed as input, to the graph. - * If the vertices already exist in the graph, they will not be added once more, - * however the edges will. + * Adds the list of vertices, passed as input, to the graph. + * If the vertices already exist in the graph, they will not be added once more. * - * @param verticesToAdd the data set of vertices to add - * @param edges a list of edges to add to the graph + * @param verticesToAdd the list of vertices to add * @return the new graph containing the existing and newly added vertices - * and edges */ @SuppressWarnings("unchecked") - public Graph addVertices(DataSet> verticesToAdd, List> edges) { + public Graph addVertices(List> verticesToAdd) { + // Add the vertices + DataSet> newVertices = this.vertices.union(this.context.fromCollection(verticesToAdd)).distinct(); - // Consider empty edge set - if (edges.isEmpty()) { - return new Graph(this.vertices.union(verticesToAdd) - .distinct(), this.edges, this.context); - } - - // Add the vertex and its edges - DataSet> newVertices = this.vertices.union(verticesToAdd).distinct(); - DataSet> newEdges = this.edges.union(context.fromCollection(edges)); - - return new Graph(newVertices, newEdges, this.context); + return new Graph(newVertices, this.edges, this.context); } /** @@ -1066,83 +1054,72 @@ public Graph addEdge(Vertex source, Vertex target, EV e } /** - * Adds the given data set of edges to the graph. if the vertices do not already exist in the - * graph, they will also be added. + * Adds the given list edges to the graph. * - * If the vertex values are not required during the computation, it is recommended to use - * addEdges(edges) + * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored. * * @param newEdges the data set of edges to be added - * @param newVertices their corresponding vertices and vertexValues - * @return a new graph containing the existing vertices and edges plus the newly added edges and vertices. + * @return a new graph containing the existing edges plus the newly added edges. */ @SuppressWarnings("unchecked") - public Graph addEdges(DataSet> newEdges, DataSet> newVertices) { - Graph partialGraph = fromDataSet(newVertices, newEdges, context); - return this.union(partialGraph); - } + public Graph addEdges(List> newEdges) { - /** - * Removes the given vertex and its edges from the graph. - * - * @param vertex the vertex to remove - * @return the new graph containing the existing vertices and edges without - * the removed vertex and its edges - */ - public Graph removeVertex(Vertex vertex) { - - DataSet> newVertices = getVertices().filter(new RemoveVertexFilter(vertex)); - DataSet> newEdges = getEdges().filter(new VertexRemovalEdgeFilter(vertex)); - return new Graph(newVertices, newEdges, this.context); - } + DataSet> newEdgesDataSet = this.context.fromCollection(newEdges); - private static final class RemoveVertexFilter - implements FilterFunction> { + DataSet> validNewEdges = this.getVertices().join(newEdgesDataSet) + .where(0).equalTo(0) + .with(new JoinVerticesWithEdgesOnSrc()) + .join(this.getVertices()).where(1).equalTo(0) + .with(new JoinWithVerticesOnTrg()); - private Vertex vertexToRemove; + return Graph.fromDataSet(this.vertices, this.edges.union(validNewEdges), this.context); + } - public RemoveVertexFilter(Vertex vertex) { - vertexToRemove = vertex; - } + @ForwardedFieldsSecond("f0; f1; f2") + private static final class JoinVerticesWithEdgesOnSrc implements + JoinFunction, Edge, Edge> { @Override - public boolean filter(Vertex vertex) throws Exception { - return !vertex.f0.equals(vertexToRemove.f0); + public Edge join(Vertex vertex, Edge edge) throws Exception { + return edge; } } - private static final class VertexRemovalEdgeFilter - implements FilterFunction> { - - private Vertex vertexToRemove; + @ForwardedFieldsFirst("f0; f1; f2") + private static final class JoinWithVerticesOnTrg implements + JoinFunction, Vertex, Edge> { - public VertexRemovalEdgeFilter(Vertex vertex) { - vertexToRemove = vertex; + @Override + public Edge join(Edge edge, Vertex vertex) throws Exception { + return edge; } + } - @Override - public boolean filter(Edge edge) throws Exception { + /** + * Removes the given vertex and its edges from the graph. + * + * @param vertex the vertex to remove + * @return the new graph containing the existing vertices and edges without + * the removed vertex and its edges + */ + public Graph removeVertex(Vertex vertex) { - if (edge.f0.equals(vertexToRemove.f0)) { - return false; - } - if (edge.f1.equals(vertexToRemove.f0)) { - return false; - } - return true; - } + List> vertexToBeRemoved = new ArrayList>(); + vertexToBeRemoved.add(vertex); + + return removeVertices(vertexToBeRemoved); } /** - * Removes the given data set of vertices and its edges from the graph. + * Removes the given list of vertices and its edges from the graph. * - * @param verticesToBeRemoved the data set of vertices to be removed + * @param verticesToBeRemoved the list of vertices to be removed * @return the resulted graph containing the initial vertices and edges minus the vertices * and edges removed. */ - public Graph removeVertices(DataSet> verticesToBeRemoved) { + public Graph removeVertices(List> verticesToBeRemoved) { - DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0) + DataSet> newVertices = getVertices().coGroup(this.context.fromCollection(verticesToBeRemoved)).where(0).equalTo(0) .with(new VerticesRemovalCoGroup()); DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0) @@ -1212,13 +1189,13 @@ public boolean filter(Edge edge) { /** * Removes all the edges that match the edges in the given data set from the graph. * - * @param edgesToBeRemoved the data set of edges to be removed + * @param edgesToBeRemoved the list of edges to be removed * @return a new graph where the edges have been removed and in which the vertices remained intact */ - public Graph removeEdges(DataSet> edgesToBeRemoved) { + public Graph removeEdges(List> edgesToBeRemoved) { - DataSet> newEdges = getEdges().coGroup(edgesToBeRemoved).where(0,1).equalTo(0,1) - .with(new EdgeRemovalCoGroup()); + DataSet> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved)) + .where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup()); return new Graph(this.vertices, newEdges, context); } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java index 6aa4ef3fc4c2d..0d71b97d6db3c 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -21,14 +21,12 @@ import java.util.ArrayList; import java.util.List; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -71,20 +69,16 @@ public void testAddVertex() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edges = new ArrayList>(); - edges.add(new Edge(6L, 1L, 61L)); - graph = graph.addVertex(new Vertex(6L, 6L), edges); - graph.getEdges().writeAsCsv(resultPath); + graph = graph.addVertex(new Vertex(6L, 6L)); + graph.getVertices().writeAsCsv(resultPath); env.execute(); - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n" + + "6,6\n"; } @Test @@ -98,26 +92,22 @@ public void testAddVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edges = new ArrayList>(); - edges.add(new Edge(6L, 1L, 61L)); - edges.add(new Edge(7L, 1L, 71L)); + List> vertices = new ArrayList>(); + vertices.add(new Vertex(6L, 6L)); + vertices.add(new Vertex(7L, 7L)); - DataSet> newVertices = env.fromElements(new Vertex(6L, 6L), - new Vertex(7L, 7L)); - graph = graph.addVertices(newVertices, edges); + graph = graph.addVertices(vertices); - graph.getEdges().writeAsCsv(resultPath); + graph.getVertices().writeAsCsv(resultPath); env.execute(); - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n" + - "7,1,71\n"; + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n" + + "6,6\n" + + "7,7\n"; } @Test @@ -126,24 +116,19 @@ public void testAddVertexExisting() throws Exception { * Test addVertex() -- add an existing vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 5L, 15L)); - graph = graph.addVertex(new Vertex(1L, 1L), edges); - graph.getEdges().writeAsCsv(resultPath); + + graph = graph.addVertex(new Vertex(1L, 1L)); + graph.getVertices().writeAsCsv(resultPath); env.execute(); - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "1,5,15\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; } @Test @@ -157,26 +142,20 @@ public void testAddVerticesBothExisting() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 5L, 15L)); - edges.add(new Edge(3L, 1L, 31L)); + List> vertices = new ArrayList>(); + vertices.add(new Vertex(1L, 1L)); + vertices.add(new Vertex(3L, 3L)); - DataSet> newVertices = env.fromElements(new Vertex(1L, 1L), - new Vertex(3L, 3L)); - graph = graph.addVertices(newVertices, edges); + graph = graph.addVertices(vertices); - graph.getEdges().writeAsCsv(resultPath); + graph.getVertices().writeAsCsv(resultPath); env.execute(); - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "1,5,15\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "3,1,31\n" + - "4,5,45\n" + - "5,1,51\n"; + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; } @Test @@ -190,66 +169,11 @@ public void testAddVerticesOneExisting() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 5L, 15L)); - edges.add(new Edge(6L, 1L, 61L)); + List> vertices = new ArrayList>(); + vertices.add(new Vertex(1L, 1L)); + vertices.add(new Vertex(6L, 6L)); - DataSet> newVertices = env.fromElements(new Vertex(1L, 1L), - new Vertex(6L, 6L)); - graph = graph.addVertices(newVertices, edges); - - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "1,5,15\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } - - @Test - public void testAddVertexNoEdges() throws Exception { - /* - * Test addVertex() -- add vertex with empty edge set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - List> edges = new ArrayList>(); - graph = graph.addVertex(new Vertex(6L, 6L), edges); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,1\n" + - "2,2\n" + - "3,3\n" + - "4,4\n" + - "5,5\n" + - "6,6\n"; - } - - @Test - public void testAddVerticesNoEdges() throws Exception { - /* - * Test addVertices() -- add vertices with empty edge set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - List> edges = new ArrayList>(); - - DataSet> newVertices = env.fromElements(new Vertex(6L, 6L), - new Vertex(7L, 7L)); - graph = graph.addVertices(newVertices, edges); + graph = graph.addVertices(vertices); graph.getVertices().writeAsCsv(resultPath); env.execute(); @@ -259,8 +183,7 @@ public void testAddVerticesNoEdges() throws Exception { "3,3\n" + "4,4\n" + "5,5\n" + - "6,6\n" + - "7,7\n"; + "6,6\n"; } @Test @@ -293,8 +216,10 @@ public void testRemoveVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> verticesToBeRemoved = env.fromElements(new Vertex(1L, 1L), - new Vertex(2L, 2L)); + + List> verticesToBeRemoved = new ArrayList>(); + verticesToBeRemoved.add(new Vertex(1L, 1L)); + verticesToBeRemoved.add(new Vertex(2L, 2L)); graph = graph.removeVertices(verticesToBeRemoved); graph.getEdges().writeAsCsv(resultPath); @@ -338,8 +263,9 @@ public void testRemoveOneValidOneInvalidVertex() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> verticesToBeRemoved = env.fromElements(new Vertex(1L, 1L), - new Vertex(7L, 7L)); + List> verticesToBeRemoved = new ArrayList>(); + verticesToBeRemoved.add(new Vertex(1L, 1L)); + verticesToBeRemoved.add(new Vertex(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); graph.getEdges().writeAsCsv(resultPath); @@ -361,8 +287,9 @@ public void testRemoveBothInvalidVertices() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> verticesToBeRemoved = env.fromElements(new Vertex(6L, 6L), - new Vertex(7L, 7L)); + List> verticesToBeRemoved = new ArrayList>(); + verticesToBeRemoved.add(new Vertex(6L, 6L)); + verticesToBeRemoved.add(new Vertex(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); graph.getEdges().writeAsCsv(resultPath); @@ -387,8 +314,9 @@ public void testRemoveBothInvalidVerticesVertexResult() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> verticesToBeRemoved = env.fromElements(new Vertex(6L, 6L), - new Vertex(7L, 7L)); + List> verticesToBeRemoved = new ArrayList>(); + verticesToBeRemoved.add(new Vertex(6L, 6L)); + verticesToBeRemoved.add(new Vertex(7L, 7L)); graph = graph.removeVertices(verticesToBeRemoved); graph.getVertices().writeAsCsv(resultPath); @@ -437,12 +365,11 @@ public void testAddEdges() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> edgesToBeAdded = env.fromElements(new Edge(6L, 1L, 61L), - new Edge(7L, 1L, 71L)); - DataSet> verticesToBeAdded = env.fromElements(new Vertex(6L, 6L), - new Vertex(7L, 7L)); + List> edgesToBeAdded = new ArrayList>(); + edgesToBeAdded.add(new Edge(2L, 4L, 24L)); + edgesToBeAdded.add(new Edge(4L, 1L, 41L)); - graph = graph.addEdges(edgesToBeAdded, verticesToBeAdded); + graph = graph.addEdges(edgesToBeAdded); graph.getEdges().writeAsCsv(resultPath); env.execute(); @@ -450,12 +377,41 @@ public void testAddEdges() throws Exception { expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + + "2,4,24\n" + "3,4,34\n" + "3,5,35\n" + + "4,1,41\n" + "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n" + - "7,1,71\n"; + "5,1,51\n"; + } + + @Test + public void testAddEdgesInvalidVertices() throws Exception { + /* + * Test addEdges() -- the source and target vertices do not exist in the graph + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edgesToBeAdded = new ArrayList>(); + edgesToBeAdded.add(new Edge(6L, 1L, 61L)); + edgesToBeAdded.add(new Edge(7L, 1L, 71L)); + + graph = graph.addEdges(edgesToBeAdded); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; } @Test @@ -515,8 +471,9 @@ public void testRemoveEdges() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> edgesToBeRemoved = env.fromElements(new Edge(5L, 1L, 51L), - new Edge(2L, 3L, 23L)); + List> edgesToBeRemoved = new ArrayList>(); + edgesToBeRemoved.add(new Edge(5L, 1L, 51L)); + edgesToBeRemoved.add(new Edge(2L, 3L, 23L)); graph = graph.removeEdges(edgesToBeRemoved); graph.getEdges().writeAsCsv(resultPath); @@ -539,8 +496,9 @@ public void testRemoveSameEdgeTwice() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> edgesToBeRemoved = env.fromElements(new Edge(5L, 1L, 51L), - new Edge(5L, 1L, 51L)); + List> edgesToBeRemoved = new ArrayList>(); + edgesToBeRemoved.add(new Edge(5L, 1L, 51L)); + edgesToBeRemoved.add(new Edge(5L, 1L, 51L)); graph = graph.removeEdges(edgesToBeRemoved); graph.getEdges().writeAsCsv(resultPath); @@ -587,8 +545,9 @@ public void testRemoveOneValidOneInvalidEdge() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env); - DataSet> edgesToBeRemoved = env.fromElements(new Edge(1L, 1L, 51L), - new Edge(6L, 1L, 61L)); + List> edgesToBeRemoved = new ArrayList>(); + edgesToBeRemoved.add(new Edge(1L, 1L, 51L)); + edgesToBeRemoved.add(new Edge(6L, 1L, 61L)); graph = graph.removeEdges(edgesToBeRemoved); graph.getEdges().writeAsCsv(resultPath);