From 75403a672ffd2ffda842f56073f643d4b61a33e5 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Sat, 16 Apr 2016 06:04:07 -0400 Subject: [PATCH 1/2] [FLINK-3770] [gelly] Fix TriangleEnumerator performance Implement optimization of ordering edges by degree and a JoinHint for the joining of edges and vertices. --- .../graph/examples/data/TriangleCountData.java | 2 +- .../graph/library/TriangleEnumeratorITCase.java | 5 +++-- .../flink/graph/library/TriangleEnumerator.java | 14 ++++++++------ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java index 71b874c061f59..a14010f530074 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java @@ -57,7 +57,7 @@ public static List> getListOfTriangles() { ArrayList> ret = new ArrayList<>(3); ret.add(new Tuple3<>(1L,2L,3L)); ret.add(new Tuple3<>(2L,3L,6L)); - ret.add(new Tuple3<>(3L,4L,5L)); + ret.add(new Tuple3<>(4L,3L,5L)); return ret; } diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java index 56b3289cd48bc..3890bb076caaf 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java @@ -47,10 +47,11 @@ public void testTriangleEnumerator() throws Exception { env); List> actualOutput = graph.run(new TriangleEnumerator()).collect(); - List> expectedResult = TriangleCountData.getListOfTriangles(); + List> expectedResult = TriangleCountData.getListOfTriangles(); - Assert.assertEquals(actualOutput.size(), expectedResult.size()); + Assert.assertEquals(expectedResult.size(), actualOutput.size()); for(Tuple3 resultTriangle:actualOutput) { + System.out.println(resultTriangle); Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java index 3842e6cfc9bc6..8272d8f613b33 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation; import org.apache.flink.api.java.tuple.Tuple3; @@ -81,7 +82,7 @@ public DataSet> run(Graph input) throws Exception { .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING) .reduceGroup(new TriadBuilder()) // filter triads - .join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter()); + .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter()); return triangles; } @@ -165,7 +166,7 @@ private static final class DegreeJoiner implements ReduceFunction reduce(EdgeWithDegrees edge1, EdgeWithDegrees edge2) throws Exception { // copy first edge - /*\t*/outEdge.copyFrom(edge1); + outEdge.copyFrom(edge1); // set missing degree if (edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) { @@ -173,6 +174,7 @@ public EdgeWithDegrees reduce(EdgeWithDegrees edge1, EdgeWithDegrees ed } else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) { outEdge.setSecondDegree(edge2.getSecondDegree()); } + return outEdge; } } @@ -183,7 +185,7 @@ public EdgeWithDegrees reduce(EdgeWithDegrees edge1, EdgeWithDegrees ed @SuppressWarnings("serial") private static final class EdgeByDegreeProjector implements MapFunction, Edge> { - private final Edge outEdge = new Edge<>(); + private Edge outEdge = new Edge<>(); @Override public Edge map(EdgeWithDegrees inEdge) throws Exception { @@ -195,7 +197,7 @@ public Edge map(EdgeWithDegrees inEdge) throws Exception { // flip vertices if first degree is larger than second degree. if (inEdge.getFirstDegree() > inEdge.getSecondDegree()) { - outEdge.reverse(); + outEdge = outEdge.reverse(); } // return edge @@ -214,8 +216,8 @@ private static final class EdgeByIdProjector> public Edge map(Edge inEdge) throws Exception { // flip vertices if necessary - if (inEdge.getSource().compareTo(inEdge.getTarget()) < 0) { - inEdge.reverse(); + if (inEdge.getSource().compareTo(inEdge.getTarget()) > 0) { + inEdge = inEdge.reverse(); } return inEdge; From f7962ee71f6b3e3b433c41b21b2fd093093c24bd Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Mon, 18 Apr 2016 09:16:30 -0400 Subject: [PATCH 2/2] Remove vestigial print. --- .../org/apache/flink/graph/library/TriangleEnumeratorITCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java index 3890bb076caaf..95504053af5bd 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java @@ -51,7 +51,6 @@ public void testTriangleEnumerator() throws Exception { Assert.assertEquals(expectedResult.size(), actualOutput.size()); for(Tuple3 resultTriangle:actualOutput) { - System.out.println(resultTriangle); Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0); } }