From 6f24c6918d7de4cd5c8fdced7248836a4704ceb3 Mon Sep 17 00:00:00 2001 From: Andra Lungu Date: Tue, 8 Sep 2015 14:15:23 +0200 Subject: [PATCH] [FLINK-2634] [gelly] Added a vertex-centric Triangle Count Library Method --- docs/libs/gelly_guide.md | 1 + .../flink/graph/library/TriangleCount.java | 208 ++++++++++++++++++ .../test/library/TriangleCountITCase.java | 15 ++ 3 files changed, 224 insertions(+) create mode 100644 flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleCount.java diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 562df31165941..056efac071983 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -800,6 +800,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * Label Propagation * Simple Community Detection * Connected Components +* Triangle Count * GSA PageRank * GSA Connected Components * GSA Single-Source Shortest Paths diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleCount.java new file mode 100644 index 0000000000000..07acdf9f0a25a --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleCount.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.NeighborsFunctionWithVertexValue; +import org.apache.flink.graph.utils.VertexToTuple2Map; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +import java.util.Iterator; +import java.util.TreeMap; + +/** + * Triangle Count Algorithm. + * + * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs + * and send messages to them. Each received message is then propagated to neighbors with higher id. + * Finally, if a node encounters the target id in the list of received messages, it increments the number + * of triangles found. + * + * For skewed graphs, we recommend calling the GSATriangleCount library method as it uses the more restrictive + * `reduceOnNeighbors` function which internally makes use of combiners to speed up computation. + * + * This implementation is non - iterative. + * + * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of + * Tuple1 which contains a single integer representing the number of triangles. + */ +public class TriangleCount implements + GraphAlgorithm>> { + + @Override + public DataSet> run(Graph graph) throws Exception { + + // simulate the first superstep + // select the neighbors with id greater than the current vertex's id + DataSet> verticesWithHigherNeighbors = + graph.groupReduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN); + + // then group them by id to attach the resulting sets to the vertices + DataSet>> verticesWithNeighborTreeMaps = + verticesWithHigherNeighbors.groupBy(0).reduceGroup(new AttachNeighborIdsAsVertexValues()); + + // assign a value to the vertices with no neighbors as well + Graph, NullValue> graphWithInitializedVertexNeighbors = + graph.mapVertices(new InitializeTreeMaps()); + + Graph, NullValue> graphWithVertexNeighbors = graphWithInitializedVertexNeighbors. + joinWithVertices(verticesWithNeighborTreeMaps.map(new VertexToTuple2Map>()), + new RetrieveValueMapper()); + + // simulate the second superstep + // propagate each received "message" to neighbors with higher id + DataSet> verticesWithPropagatedValues = + graphWithVertexNeighbors.groupReduceOnNeighbors(new PropagateNeighborValues(), EdgeDirection.IN); + + DataSet>> verticesWithPropagatedTreeMaps = + verticesWithPropagatedValues.groupBy(0).reduceGroup(new AttachNeighborIdsAsVertexValues()); + + DataSet> numberOfTriangles = verticesWithPropagatedTreeMaps + .join(graph.getEdges()) + .where(0).equalTo(0).with(new CountTriangles()).reduce(new ReduceFunction>() { + + @Override + public Tuple1 reduce(Tuple1 firstTuple, Tuple1 secondTuple) throws Exception { + return new Tuple1(firstTuple.f0 + secondTuple.f0); + } + }); + + return numberOfTriangles; + } + + @SuppressWarnings("serial") + private static final class GatherHigherIdNeighbors implements + NeighborsFunctionWithVertexValue> { + + @Override + public void iterateNeighbors(Vertex vertex, + Iterable, Vertex>> neighbors, + Collector> collector) throws Exception { + + Tuple2, Vertex> next = null; + Iterator, Vertex>> neighborsIterator = + neighbors.iterator(); + + while (neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + if(next.f1.getId() > vertex.getId()) { + collector.collect(new Vertex(next.f1.getId(), vertex.getId())); + } + } + } + } + + @SuppressWarnings("serial") + private static final class AttachNeighborIdsAsVertexValues implements GroupReduceFunction, + Vertex>> { + + @Override + public void reduce(Iterable> vertices, + Collector>> collector) throws Exception { + + Iterator> vertexIertator = vertices.iterator(); + Vertex next = null; + TreeMap neighbors = new TreeMap(); + Long id = null; + + while (vertexIertator.hasNext()) { + next = vertexIertator.next(); + id = next.getId(); + + Integer value = neighbors.get(next.getValue()); + if (value != null) { + neighbors.put(next.getValue(), value + 1); + } else { + neighbors.put(next.getValue(), 1); + } + } + + collector.collect(new Vertex>(id, neighbors)); + } + } + + @SuppressWarnings("serial") + private static final class InitializeTreeMaps implements MapFunction, TreeMap> { + + @Override + public TreeMap map(Vertex vertex) throws Exception { + return new TreeMap(); + } + } + + @SuppressWarnings("serial") + private static final class RetrieveValueMapper implements MapFunction, + TreeMap>, TreeMap> { + + @Override + public TreeMap map(Tuple2, TreeMap> value) throws Exception { + return value.f1; + } + } + + @SuppressWarnings("serial") + private static final class PropagateNeighborValues implements + NeighborsFunctionWithVertexValue, NullValue, Vertex> { + + @Override + public void iterateNeighbors(Vertex> vertex, + Iterable, Vertex>>> neighbors, + Collector> collector) throws Exception { + + Tuple2, Vertex>> next = null; + Iterator, Vertex>>> neighborsIterator = neighbors.iterator(); + TreeMap vertexSet = vertex.getValue(); + + while (neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + if(next.f1.getId() > vertex.getId()) { + for(Long key: vertexSet.keySet()) { + collector.collect(new Vertex(next.f1.getId(), key)); + } + } + } + } + } + + @SuppressWarnings("serial") + private static final class CountTriangles implements + FlatJoinFunction>, Edge, Tuple1> { + + @Override + public void join(Vertex> vertex, + Edge edge, Collector> collector) throws Exception { + + if (vertex.getValue().get(edge.getTarget()) != null) { + collector.collect(new Tuple1(vertex.getValue().get(edge.getTarget()))); + } + } + } +} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java index 047bbf7459e1c..f15dfdc7c0132 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.example.utils.TriangleCountData; import org.apache.flink.graph.library.GSATriangleCount; +import org.apache.flink.graph.library.TriangleCount; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.NullValue; import org.junit.Test; @@ -53,4 +54,18 @@ public void testGSATriangleCount() throws Exception { compareResultAsTuples(numberOfTriangles, expectedResult); } + + @Test + public void testTriangleCount() throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), + env).getUndirected(); + + List> numberOfTriangles = graph.run(new TriangleCount()).collect(); + expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; + + compareResultAsTuples(numberOfTriangles, expectedResult); + } }