From e8a5250b4588326b606b2f29d2f2c2f6e4554925 Mon Sep 17 00:00:00 2001
From: Shivani
Date: Wed, 10 Jun 2015 13:22:37 +0200
Subject: [PATCH 1/9] [FLINK-2093][gelly] Added difference Method
---
docs/libs/gelly_guide.md | 1 +
.../java/org/apache/flink/graph/Graph.java | 13 ++++
.../flink/graph/test/TestGraphUtils.java | 44 +++++++++++
.../operations/GraphOperationsITCase.java | 75 +++++++++++++++++++
4 files changed, 133 insertions(+)
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 804efabf9f307..c16274939cd83 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -240,6 +240,7 @@ Graph networkWithWeights = network.joinWithEdgesOnSource(v
+* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the input graphs. The resultant graph is formed by removing the vertices and edges from the graph that are common with the second graph.
[Back to top](#top)
Graph Mutations
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index dab1a8f69771c..4ae0f90d3e54a 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1233,6 +1233,19 @@ public Graph union(Graph graph) {
return new Graph(unionedVertices, unionedEdges, this.context);
}
+ /**
+ * Performs Difference on the vertex and edge sets of the input graphs
+ * removes both vertices and edges with the vertex as a source/target
+ * @param graph the graph to perform difference with
+ * @return a new graph where the common vertices and edges have been removed
+ */
+ public Graph difference(Graph graph) throws java.lang.Exception{
+ DataSet> removeVerticesData = graph.getVertices();
+ final List> removeVerticesList = removeVerticesData.collect();
+ Graph G3= this.removeVertices(removeVerticesList);
+ return G3;
+ }
+
/**
* Runs a Vertex-Centric iteration on the graph.
* No configuration options are provided.
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
index 1769a2609414d..51b5b7f14ce4c 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -381,4 +381,48 @@ private static final class BlackholeOutputSteam extends java.io.OutputStream {
@Override
public void write(int b){}
}
+
+ /**
+ * utils for getting the second graph for the test of method difference();
+ * @param env
+ */
+ public static final DataSet> getLongLongEdgeDataDifference(
+ ExecutionEnvironment env){
+ return env.fromCollection(getLongLongEdgesForDifference());
+ }
+
+ public static final DataSet> getLongLongEdgeDataDifference2(
+ ExecutionEnvironment env){
+ return env.fromCollection(getLongLongEdgesForDifference2());
+ }
+
+ public static final DataSet> getLongLongVertexDataDifference(
+ ExecutionEnvironment env)
+ {
+ return env.fromCollection(getVerticesForDifference());
+ }
+
+ public static final List> getVerticesForDifference(){
+ List> vertices = new ArrayList>();
+ vertices.add(new Vertex(1L, 1L));
+ vertices.add(new Vertex(3L, 3L));
+ vertices.add(new Vertex(6L, 6L));
+
+ return vertices;
+
+ }
+
+ public static final List> getLongLongEdgesForDifference() {
+ List> edges = new ArrayList>();
+ edges.add(new Edge(1L, 3L, 13L));
+ edges.add(new Edge(1L, 6L, 26L));
+ edges.add(new Edge(6L, 3L, 63L));
+ return edges;
+ }
+
+ public static final List> getLongLongEdgesForDifference2() {
+ List> edges = new ArrayList>();
+ edges.add(new Edge(6L, 6L, 66L));
+ return edges;
+ }
}
\ No newline at end of file
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index 1b9d5ac835d59..99d8f6685a507 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -28,6 +28,7 @@
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.api.java.DataSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -266,6 +267,80 @@ public void testUnion() throws Exception {
"6,1,61\n";
}
+ @Test
+ public void testDifference() throws Exception {
+ /*Test difference() method by checking the output for getEdges() on the resultant graph
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph graph2 = Graph.fromDataSet(TestGraphUtils.getLongLongVertexDataDifference(env),
+ TestGraphUtils.getLongLongEdgeDataDifference(env), env);
+
+ graph = graph.difference(graph2);
+
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "4,5,45\n";
+
+
+ }
+
+
+ @Test
+ public void testDifferenceVertices() throws Exception{
+ /*Test difference() method by checking the output for getVertices() on the resultant graph
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ Graph graph2 = Graph.fromDataSet(TestGraphUtils.getLongLongVertexDataDifference(env),
+ TestGraphUtils.getLongLongEdgeDataDifference(env), env);
+
+ graph = graph.difference(graph2);
+
+ graph.getVertices().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "2,2\n" +
+ "4,4\n" +
+ "5,5\n" ;
+ }
+
+ @Test
+ public void testDifference2() throws Exception {
+ /*
+ * Test difference() such that no common vertices are there
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet> vertex = env.fromElements(new Vertex(6L,6L));
+
+ Graph graph2 = Graph.fromDataSet(vertex,TestGraphUtils.getLongLongEdgeDataDifference2(env),env);
+
+ graph = graph.difference(graph2);
+
+ graph.getEdges().writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2,12\n" +
+ "1,3,13\n" +
+ "2,3,23\n" +
+ "3,4,34\n" +
+ "3,5,35\n" +
+ "4,5,45\n" +
+ "5,1,51\n" ;
+
+ }
+
@Test
public void testTriplets() throws Exception {
/*
From b0a9228540fbafe819d1883e07087c4f59f4e4bb Mon Sep 17 00:00:00 2001
From: Shivani
Date: Wed, 10 Jun 2015 15:04:48 +0200
Subject: [PATCH 2/9] [FLINK-2093][gelly] Minor Changes in the Graph.java file
---
.../src/main/java/org/apache/flink/graph/Graph.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 4ae0f90d3e54a..3bba97b852467 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1235,15 +1235,14 @@ public Graph union(Graph graph) {
/**
* Performs Difference on the vertex and edge sets of the input graphs
- * removes both vertices and edges with the vertex as a source/target
+ * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed
* @param graph the graph to perform difference with
* @return a new graph where the common vertices and edges have been removed
*/
public Graph difference(Graph graph) throws java.lang.Exception{
DataSet> removeVerticesData = graph.getVertices();
final List> removeVerticesList = removeVerticesData.collect();
- Graph G3= this.removeVertices(removeVerticesList);
- return G3;
+ return this.removeVertices(removeVerticesList);
}
/**
From e726507d41a588888abbc9a6abe2ead4e9e83b09 Mon Sep 17 00:00:00 2001
From: Shivani
Date: Mon, 15 Jun 2015 14:13:58 +0200
Subject: [PATCH 3/9] [FLINK-2093][gelly]Added difference method
---
docs/libs/gelly_guide.md | 2 +-
.../java/org/apache/flink/graph/Graph.java | 20 +++++++++++++++++--
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index c16274939cd83..11eb6f898e32f 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -240,7 +240,7 @@ Graph networkWithWeights = network.joinWithEdgesOnSource(v
-* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the input graphs. The resultant graph is formed by removing the vertices and edges from the graph that are common with the second graph.
+* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the input graphs. The resultant graph is formed by removing the common vertices and edges from the graph.
[Back to top](#top)
Graph Mutations
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 3bba97b852467..bf6ba5bb9cc0d 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1151,6 +1151,23 @@ public void coGroup(Iterable> vertex, Iterable> vert
}
}
+
+ public Graph removeVertices(DataSet> verticesToBeRemoved){
+ DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0).with(new VerticesRemovalCoGroup());
+ DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
+ // if the edge source was removed, the edge will also be removed
+ .with(new ProjectEdgeToBeRemoved())
+ // if the edge target was removed, the edge will also be removed
+ .join(newVertices).where(1).equalTo(0)
+ .with(new ProjectEdge());
+
+ return new Graph(newVertices, newEdges, context);
+ }
+
+
+
+
+
@ForwardedFieldsSecond("f0; f1; f2")
private static final class ProjectEdgeToBeRemoved implements JoinFunction, Edge, Edge> {
@Override
@@ -1241,8 +1258,7 @@ public Graph union(Graph graph) {
*/
public Graph difference(Graph graph) throws java.lang.Exception{
DataSet> removeVerticesData = graph.getVertices();
- final List> removeVerticesList = removeVerticesData.collect();
- return this.removeVertices(removeVerticesList);
+ return this.removeVertices(removeVerticesData);
}
/**
From 760047dc78739b9eb750757aea442aa947c2fc34 Mon Sep 17 00:00:00 2001
From: Shivani
Date: Mon, 15 Jun 2015 15:01:32 +0200
Subject: [PATCH 4/9] [FLINK-2093][gelly]Added difference method
---
.../java/org/apache/flink/graph/Graph.java | 33 +++++++++----------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index bf6ba5bb9cc0d..034223711077a 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1109,7 +1109,6 @@ public Graph removeVertex(Vertex vertex) {
return removeVertices(vertexToBeRemoved);
}
-
/**
* Removes the given list of vertices and its edges from the graph.
*
@@ -1117,9 +1116,22 @@ public Graph removeVertex(Vertex vertex) {
* @return the resulted graph containing the initial vertices and edges minus the vertices
* and edges removed.
*/
- public Graph removeVertices(List> verticesToBeRemoved) {
- DataSet> newVertices = getVertices().coGroup(this.context.fromCollection(verticesToBeRemoved)).where(0).equalTo(0)
+ public Graph removeVertices(List> verticesToBeRemoved)
+ {
+ return removeVertices(this.context.fromCollection(verticesToBeRemoved));
+ }
+
+ /**
+ * Removes the given list of vertices and its edges from the graph.
+ *
+ * @param verticesToBeRemoved the DataSet of vertices to be removed
+ * @return the resulted graph containing the initial vertices and edges minus the vertices
+ * and edges removed.
+ */
+ public Graph removeVertices(DataSet> verticesToBeRemoved) {
+
+ DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
.with(new VerticesRemovalCoGroup());
DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
@@ -1152,21 +1164,6 @@ public void coGroup(Iterable> vertex, Iterable> vert
}
- public Graph removeVertices(DataSet> verticesToBeRemoved){
- DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0).with(new VerticesRemovalCoGroup());
- DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
- // if the edge source was removed, the edge will also be removed
- .with(new ProjectEdgeToBeRemoved())
- // if the edge target was removed, the edge will also be removed
- .join(newVertices).where(1).equalTo(0)
- .with(new ProjectEdge());
-
- return new Graph(newVertices, newEdges, context);
- }
-
-
-
-
@ForwardedFieldsSecond("f0; f1; f2")
private static final class ProjectEdgeToBeRemoved implements JoinFunction, Edge, Edge> {
From 57f1b315f7fbb87c74085c9a68108fcd3ff58440 Mon Sep 17 00:00:00 2001
From: Shivani
Date: Mon, 15 Jun 2015 16:09:29 +0200
Subject: [PATCH 5/9] [FLINK-2093][gelly]Added difference method
---
docs/libs/gelly_guide.md | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 11eb6f898e32f..ac60d6a3b6c2a 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -234,14 +234,13 @@ Graph networkWithWeights = network.joinWithEdgesOnSource(v
* Undirected: In Gelly, a `Graph` is always directed. Undirected graphs can be represented by adding all opposite-direction edges to a graph. For this purpose, Gelly provides the `getUndirected()` method.
-* Union: Gelly's `union()` method performs a union on the vertex and edges sets of the input graphs. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained.
+* Union: Gelly's `union()` method performs a union operation on the vertex and edge sets of the specified graph and current graph. Duplicate vertices are removed from the resulting `Graph`, while if duplicate edges exists, these will be maintained.
-* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the input graphs. The resultant graph is formed by removing the common vertices and edges from the graph.
-[Back to top](#top)
+* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and specified graph.
Graph Mutations
-----------
@@ -267,11 +266,16 @@ Graph removeVertex(Vertex vertex)
// removes the given list of vertices and their edges from the Graph
Graph removeVertices(List> verticesToBeRemoved)
+//removes the given DataSet of vertices and their edges from the Graph.
+Graph removeVertices(DataSet> verticesToBeRemoved)
+
// removes *all* edges that match the given Edge from the Graph.
Graph removeEdge(Edge edge)
// removes *all* edges that match the edges in the given list
Graph removeEdges(List> edgesToBeRemoved)
+
+
{% endhighlight %}
Neighborhood Methods
From 9ca5d7485708c3dca7e41bf6d19b5bd9d492125f Mon Sep 17 00:00:00 2001
From: Shivani
Date: Mon, 15 Jun 2015 16:12:50 +0200
Subject: [PATCH 6/9] [FLINK-2093][gelly]Added difference method
---
docs/libs/gelly_guide.md | 2 ++
1 file changed, 2 insertions(+)
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index ac60d6a3b6c2a..f2f7079783cb9 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -242,6 +242,8 @@ Graph networkWithWeights = network.joinWithEdgesOnSource(v
* Difference: Gelly's `difference()` method performs a difference on the vertex and edge sets of the current graph and specified graph.
+-[Back to top](#top)
+
Graph Mutations
-----------
From 04eb03a9615e513f55c9325acdfd9f8f0d55c3a1 Mon Sep 17 00:00:00 2001
From: Shivani
Date: Thu, 18 Jun 2015 14:51:22 +0200
Subject: [PATCH 7/9] [FLINK-2093]Made the remove vertices method private
---
.../flink-gelly/src/main/java/org/apache/flink/graph/Graph.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 034223711077a..9355b6003010d 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1129,7 +1129,7 @@ public Graph removeVertices(List> verticesToBeRemoved)
* @return the resulted graph containing the initial vertices and edges minus the vertices
* and edges removed.
*/
- public Graph removeVertices(DataSet> verticesToBeRemoved) {
+ private Graph removeVertices(DataSet> verticesToBeRemoved) {
DataSet> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
.with(new VerticesRemovalCoGroup());
From 3943b316023a80b5e04e43d2177344f364dd9dec Mon Sep 17 00:00:00 2001
From: Shivani
Date: Thu, 18 Jun 2015 15:11:26 +0200
Subject: [PATCH 8/9] Updated the docs
---
docs/libs/gelly_guide.md | 3 ---
1 file changed, 3 deletions(-)
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index f2f7079783cb9..7db896171a72e 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -268,9 +268,6 @@ Graph removeVertex(Vertex vertex)
// removes the given list of vertices and their edges from the Graph
Graph removeVertices(List> verticesToBeRemoved)
-//removes the given DataSet of vertices and their edges from the Graph.
-Graph removeVertices(DataSet> verticesToBeRemoved)
-
// removes *all* edges that match the given Edge from the Graph.
Graph removeEdge(Edge edge)
From d9c8e0fc2d14d81983264f82d6bccc6bfdcec7f9 Mon Sep 17 00:00:00 2001
From: Shivani
Date: Wed, 24 Jun 2015 13:41:30 +0200
Subject: [PATCH 9/9] [FLINK-2093][gelly] Removes throws exception
---
.../flink-gelly/src/main/java/org/apache/flink/graph/Graph.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 9355b6003010d..6b0b8bea8c3af 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1253,7 +1253,7 @@ public Graph union(Graph graph) {
* @param graph the graph to perform difference with
* @return a new graph where the common vertices and edges have been removed
*/
- public Graph difference(Graph graph) throws java.lang.Exception{
+ public Graph difference(Graph graph) {
DataSet> removeVerticesData = graph.getVertices();
return this.removeVertices(removeVerticesData);
}