From 8bb8be4b620ab2f4068d41c6e0af9640d8ebdb15 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 17 May 2016 10:02:47 -0400 Subject: [PATCH 1/5] [FLINK-3906] [gelly] Global Clustering Coefficient The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph). --- docs/apis/batch/libs/gelly.md | 18 +- .../graph/library/TriangleCountITCase.java | 53 ---- .../flink/graph/AbstractGraphAnalytic.java | 52 ++++ .../java/org/apache/flink/graph/Graph.java | 27 +- .../org/apache/flink/graph/GraphAnalytic.java | 67 +++++ .../flink/graph/library/GSATriangleCount.java | 192 -------------- .../GlobalClusteringCoefficient.java | 154 +++++++++++ .../LocalClusteringCoefficient.java | 2 +- .../clustering/undirected/TriangleCount.java | 77 ++++++ .../undirected/TriangleListing.java | 4 +- .../metric/undirected/VertexMetrics.java | 250 ++++++++++++++++++ .../GlobalClusteringCoefficientTest.java | 84 ++++++ .../undirected/TriangleCountTest.java | 75 ++++++ .../metric/undirected/VertexMetricsTest.java | 97 +++++++ 14 files changed, 893 insertions(+), 259 deletions(-) delete mode 100644 flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java delete mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 7adff0499c9e8..a2daa0df03e9b 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1834,6 +1834,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * [Summarization](#summarization) * [Jaccard Index](#jaccard-index) * [Local Clustering Coefficient](#local-clustering-coefficient) +* [Global Clustering Coefficient](#global-clustering-coefficient) Gelly's library methods can be used by simply calling the `run()` method on the input graph: @@ -2108,7 +2109,22 @@ See the [Triangle Enumeration](#triangle-enumeration) library method for a detai #### Usage The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID, -vertex degree, and number of triangles containing the vertex. The vertex ID must be `Comparable` and `Copyable`. +vertex degree, and number of triangles containing the vertex. The graph ID type must be `Comparable` and `Copyable`. + +### Global Clustering Coefficient + +#### Overview +The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no edges between +neighbors) to 1.0 (complete graph). + +#### Details +See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of +clustering coefficient. + +#### Usage +The algorithm takes a simple, undirected graph as input and outputs a result containing the total number of triplets and +triangles in the graph. The graph ID type must be `Comparable` and `Copyable`. + {% top %} diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java deleted file mode 100644 index aaada8f975bc0..0000000000000 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.examples.data.TriangleCountData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class TriangleCountITCase extends MultipleProgramsTestBase { - - public TriangleCountITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testGSATriangleCount() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), - env).getUndirected(); - - List numberOfTriangles = graph.run(new GSATriangleCount()).collect(); - String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; - - Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult)); - } -} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java new file mode 100644 index 0000000000000..84099e8ff2790 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.util.Preconditions; + +/** + * Base class for {@link GraphAnalytic}. + * + * @param key type + * @param vertex value type + * @param edge value type + * @param the return type + */ +public abstract class AbstractGraphAnalytic +implements GraphAnalytic { + + protected ExecutionEnvironment env; + + @Override + public GraphAnalytic run(Graph input) + throws Exception { + env = input.getContext(); + return null; + } + + @Override + public T execute() + throws Exception { + Preconditions.checkNotNull(env); + + env.execute(); + return getResult(); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index b17f7a587c701..26def97d4e863 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -18,20 +18,13 @@ package org.apache.flink.graph; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.List; -import java.util.Arrays; - import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -71,6 +64,13 @@ import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + /** * Represents a Graph consisting of {@link Edge edges} and {@link Vertex * vertices}. @@ -1787,6 +1787,15 @@ public T run(GraphAlgorithm algorithm) throws Exception { return algorithm.run(this); } + /** + * @param analytic the algorithm to run on the Graph + * @param the result type + * @throws Exception + */ + public void run(GraphAnalytic analytic) throws Exception { + analytic.run(this); + } + /** * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices) * of each vertex. The neighborsFunction applied on the neighbors only has access to both the vertex id diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java new file mode 100644 index 0000000000000..23f60c0c5df8e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; + +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.DataSet; + +/** + * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is + * terminal and results are retrieved via accumulators. + *
+ * A Flink program has a single point of execution. A {@code GraphAnalytic} + * defers execution to the user to allow composing multiple analytics and + * algorithms into a single program. + * + * @param key type + * @param vertex value type + * @param edge value type + * @param the return type + */ +public interface GraphAnalytic { + + /** + * This method must be called after the program has executed: + * 1) "run" analytics and algorithms + * 2) call ExecutionEnvironment.execute() + * 3) get analytics results + * + * @return the result + */ + T getResult(); + + /** + * Execute the program and return the result. + * + * @return the result + * @throws Exception + */ + T execute() throws Exception; + + /** + * All {@code GraphAnalytic} processing must be terminated by an + * {@link OutputFormat}. Rather than obtained via accumulators rather than + * returned by a {@link DataSet}. + * + * @param input input graph + * @return this + * @throws Exception + */ + GraphAnalytic run(Graph input) throws Exception; +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java deleted file mode 100644 index 1eafce202a070..0000000000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.library; - - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.GraphAlgorithm; -import org.apache.flink.graph.ReduceNeighborsFunction; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Triplet; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.VertexJoinFunction; -import org.apache.flink.types.NullValue; - -import java.util.TreeMap; - -/** - * Triangle Count Algorithm. - * - * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs - * and send messages to them. Each received message is then propagated to neighbors with higher id. - * Finally, if a node encounters the target id in the list of received messages, it increments the number - * of triangles found. - * - * This implementation is non - iterative. - * - * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet - * which contains a single integer representing the number of triangles. - */ -public class GSATriangleCount, VV, EV> implements - GraphAlgorithm> { - - @SuppressWarnings("serial") - @Override - public DataSet run(Graph input) throws Exception { - - ExecutionEnvironment env = input.getContext(); - - // order the edges so that src is always higher than trg - DataSet> edges = input.getEdges().map(new OrderEdges()).distinct(); - - Graph, NullValue> graph = Graph.fromDataSet(edges, - new VertexInitializer(), env); - - // select neighbors with ids higher than the current vertex id - // Gather: a no-op in this case - // Sum: create the set of neighbors - DataSet>> higherIdNeighbors = - graph.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN); - - Graph, NullValue> graphWithReinitializedVertexValues = - graph.mapVertices(new VertexInitializerEmptyTreeMap()); - - // Apply: attach the computed values to the vertices - // joinWithVertices to update the node values - DataSet>> verticesWithHigherIdNeighbors = - graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues()).getVertices(); - - Graph, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors, - edges, env); - - // propagate each received value to neighbors with higher id - // Gather: a no-op in this case - // Sum: propagate values - DataSet>> propagatedValues = graphWithNeighbors - .reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN); - - // Apply: attach propagated values to vertices - DataSet>> verticesWithPropagatedValues = - graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues()).getVertices(); - - Graph, NullValue> graphWithPropagatedNeighbors = - Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env); - - // Scatter: compute the number of triangles - DataSet numberOfTriangles = graphWithPropagatedNeighbors.getTriplets() - .map(new ComputeTriangles()).reduce(new ReduceFunction() { - - @Override - public Integer reduce(Integer first, Integer second) throws Exception { - return first + second; - } - }); - - return numberOfTriangles; - } - - @SuppressWarnings("serial") - private static final class OrderEdges, EV> implements - MapFunction, Edge> { - - @Override - public Edge map(Edge edge) throws Exception { - if (edge.getSource().compareTo(edge.getTarget()) < 0) { - return new Edge(edge.getTarget(), edge.getSource(), NullValue.getInstance()); - } else { - return new Edge(edge.getSource(), edge.getTarget(), NullValue.getInstance()); - } - } - } - - @SuppressWarnings("serial") - private static final class VertexInitializer implements MapFunction> { - - @Override - public TreeMap map(K value) throws Exception { - TreeMap neighbors = new TreeMap(); - neighbors.put(value, 1); - - return neighbors; - } - } - - @SuppressWarnings("serial") - private static final class VertexInitializerEmptyTreeMap implements - MapFunction>, TreeMap> { - - @Override - public TreeMap map(Vertex> vertex) throws Exception { - return new TreeMap(); - } - } - - @SuppressWarnings("serial") - private static final class AttachValues implements VertexJoinFunction, - TreeMap> { - - @Override - public TreeMap vertexJoin(TreeMap vertexValue, - TreeMap inputValue) { - return inputValue; - } - } - - @SuppressWarnings("serial") - private static final class GatherHigherIdNeighbors implements - ReduceNeighborsFunction> { - - @Override - public TreeMap reduceNeighbors(TreeMap first, TreeMap second) { - for (K key : second.keySet()) { - Integer value = first.get(key); - if (value != null) { - first.put(key, value + second.get(key)); - } else { - first.put(key, second.get(key)); - } - } - return first; - } - } - - @SuppressWarnings("serial") - private static final class ComputeTriangles implements MapFunction, NullValue>, - Integer> { - - @Override - public Integer map(Triplet, NullValue> triplet) throws Exception { - - Vertex> srcVertex = triplet.getSrcVertex(); - Vertex> trgVertex = triplet.getTrgVertex(); - int triangles = 0; - - if(trgVertex.getValue().get(srcVertex.getId()) != null) { - triangles = trgVertex.getValue().get(srcVertex.getId()); - } - return triangles; - } - } -} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java new file mode 100644 index 0000000000000..c92b4cb58b5d3 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result; +import org.apache.flink.graph.library.metric.undirected.VertexMetrics; +import org.apache.flink.types.CopyableValue; + +/** + * The global clustering coefficient measures the connectedness of a graph. + * Scores range from 0.0 (no triangles) to 1.0 (complete graph). + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class GlobalClusteringCoefficient & CopyableValue, VV, EV> +extends AbstractGraphAnalytic { + + private TriangleCount triangleCount; + + private VertexMetrics vertexMetrics; + + // Optional configuration + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * Override the parallelism of operators processing small amounts of data. + * + * @param littleParallelism operator parallelism + * @return this + */ + public GlobalClusteringCoefficient setLittleParallelism(int littleParallelism) { + this.littleParallelism = littleParallelism; + + return this; + } + + @Override + public GlobalClusteringCoefficient run(Graph input) + throws Exception { + super.run(input); + + triangleCount = new TriangleCount() + .setLittleParallelism(littleParallelism); + + input.run(triangleCount); + + vertexMetrics = new VertexMetrics() + .setParallelism(littleParallelism); + + input.run(vertexMetrics); + + return this; + } + + @Override + public Result getResult() { + return new Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * triangleCount.getResult()); + } + + /** + * Wraps global clustering coefficient metrics. + */ + public static class Result { + private long tripletCount; + private long triangleCount; + + public Result(long tripletCount, long triangleCount) { + this.tripletCount = tripletCount; + this.triangleCount = triangleCount; + } + + /** + * Get the number of triplets. + * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + /** + * Get the number of triangles. + * + * @return number of triangles + */ + public long getNumberOfTriangles() { + return triangleCount; + } + + /** + * Get the global clustering coefficient score. This is computed as the + * number of closed triplets (triangles) divided by the total number of + * triplets. + * + * A score of {@code Double.NaN} is returned for a graph of isolated vertices + * for which both the triangle count and number of neighbors are zero. + * + * @return global clustering coefficient score + */ + public double getLocalClusteringCoefficientScore() { + return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount; + } + + @Override + public String toString() { + return "triplet count: " + tripletCount + ", triangle count:" + triangleCount; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(tripletCount) + .append(triangleCount) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(tripletCount, rhs.tripletCount) + .append(triangleCount, rhs.triangleCount) + .isEquals(); + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index d1618d14e643d..bc62d36a934aa 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -185,7 +185,7 @@ public Result join(Vertex vertexAndDegree, Tuple2 } /** - * Wraps the vertex type to encapsulate results from the Clustering Coefficient algorithm. + * Wraps the vertex type to encapsulate results from the local clustering coefficient algorithm. * * @param ID type */ diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java new file mode 100644 index 0000000000000..9f35c11206f85 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.CountHelper; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.util.AbstractID; + +/** + * Count the number of distinct triangles in an undirected graph. + * + * @param graph ID type + * @param vertex value type + * @param edge value type + * @see TriangleListing + */ +public class TriangleCount & CopyableValue, VV, EV> +extends AbstractGraphAnalytic { + + private String id = new AbstractID().toString(); + + // Optional configuration + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * Override the parallelism of operators processing small amounts of data. + * + * @param littleParallelism operator parallelism + * @return this + */ + public TriangleCount setLittleParallelism(int littleParallelism) { + this.littleParallelism = littleParallelism; + + return this; + } + + @Override + public TriangleCount run(Graph input) + throws Exception { + super.run(input); + + DataSet> triangles = input + .run(new TriangleListing() + .setSortTriangleVertices(false) + .setLittleParallelism(littleParallelism)); + + triangles.output(new CountHelper>(id)).name("Count triangles"); + + return this; + } + + @Override + public Long getResult() { + return env.getLastJobExecutionResult(). getAccumulatorResult(id); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 1319d02cd905a..62454330b02be 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -286,11 +286,9 @@ private static final class SortTriangleVertices> @Override public Tuple3 map(Tuple3 value) throws Exception { - T temp_val; - // by the triangle listing algorithm we know f1 < f2 if (value.f0.compareTo(value.f1) > 0) { - temp_val = value.f0; + T temp_val = value.f0; value.f0 = value.f1; if (temp_val.compareTo(value.f2) <= 0) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java new file mode 100644 index 0000000000000..428ce191bf6d8 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.metric.undirected; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; +import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.AbstractID; + +import java.io.IOException; + +/** + * Compute the number of vertices, number of edges, and number of triplets in + * an undirected graph. + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class VertexMetrics & CopyableValue, VV, EV> +extends AbstractGraphAnalytic { + + private String id = new AbstractID().toString(); + + // Optional configuration + private boolean includeZeroDegreeVertices = false; + + private boolean reduceOnTargetId = false; + + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * By default only the edge set is processed for the computation of degree. + * When this flag is set an additional join is performed against the vertex + * set in order to output vertices with a degree of zero. + * + * @param includeZeroDegreeVertices whether to output vertices with a + * degree of zero + * @return this + */ + public VertexMetrics setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { + this.includeZeroDegreeVertices = includeZeroDegreeVertices; + + return this; + } + + /** + * The degree can be counted from either the edge source or target IDs. + * By default the source IDs are counted. Reducing on target IDs may + * optimize the algorithm if the input edge list is sorted by target ID. + * + * @param reduceOnTargetId set to {@code true} if the input edge list + * is sorted by target ID + * @return this + */ + public VertexMetrics setReduceOnTargetId(boolean reduceOnTargetId) { + this.reduceOnTargetId = reduceOnTargetId; + + return this; + } + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public VertexMetrics setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public VertexMetrics run(Graph input) + throws Exception { + super.run(input); + + DataSet> vertexDegree = input + .run(new VertexDegree() + .setIncludeZeroDegreeVertices(includeZeroDegreeVertices) + .setReduceOnTargetId(reduceOnTargetId) + .setParallelism(parallelism)); + + vertexDegree.output(new VertexMetricsHelper(id)).name("Vertex metrics"); + + return this; + } + + @Override + public Result getResult() { + JobExecutionResult res = env.getLastJobExecutionResult(); + + long vertexCount = res.getAccumulatorResult(id + "-0"); + long edgeCount = res.getAccumulatorResult(id + "-1"); + long tripletCount = res.getAccumulatorResult(id + "-2"); + + return new Result(vertexCount, edgeCount / 2, tripletCount); + } + + /** + * Helper class to collect vertex metrics. + * + * @param ID type + */ + private static class VertexMetricsHelper + extends RichOutputFormat> { + private final String id; + + private long vertexCount; + private long edgeCount; + private long tripletCount; + + /** + * This helper class collects vertex metrics by scanning over and + * discarding elements from the given DataSet. + * + * The unique id is required because Flink's accumulator namespace is + * among all operators. + * + * @param id unique string used for accumulator names + */ + public VertexMetricsHelper(String id) { + this.id = id; + } + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(int taskNumber, int numTasks) throws IOException {} + + @Override + public void writeRecord(Vertex record) throws IOException { + long degree = record.f1.getValue(); + + vertexCount++; + edgeCount += degree; + tripletCount += degree * (degree - 1) / 2; + } + + @Override + public void close() throws IOException { + getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); + getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); + getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount)); + } + } + + /** + * Wraps vertex metrics. + */ + public static class Result { + private long vertexCount; + private long edgeCount; + private long tripletCount; + + public Result(long vertexCount, long edgeCount, long tripletCount) { + this.vertexCount = vertexCount; + this.edgeCount = edgeCount; + this.tripletCount = tripletCount; + } + + /** + * Get the number of vertices. + * + * @return number of vertices + */ + public long getNumberOfVertices() { + return vertexCount; + } + + /** + * Get the number of edges. + * + * @return number of edges + */ + public long getNumberOfEdges() { + return edgeCount; + } + + /** + * Get the number of triplets. + * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + @Override + public String toString() { + return "vertex count: " + vertexCount + + ", edge count:" + edgeCount + + ", triplet count: " + tripletCount; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(vertexCount) + .append(edgeCount) + .append(tripletCount) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(vertexCount, rhs.vertexCount) + .append(edgeCount, rhs.edgeCount) + .append(tripletCount, rhs.tripletCount) + .isEquals(); + } + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java new file mode 100644 index 0000000000000..71ec2a6c1a4ef --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class GlobalClusteringCoefficientTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + Result expectedResult = new Result(13, 6); + + Result globalClusteringCoefficient = new GlobalClusteringCoefficient() + .run(undirectedSimpleGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + + Result expectedResult = new Result(expectedCount, expectedCount); + + Result globalClusteringCoefficient = new GlobalClusteringCoefficient() + .run(completeGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + Result expectedResult = new Result(0, 0); + + Result globalClusteringCoefficient = new GlobalClusteringCoefficient() + .run(emptyGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } + + @Test + public void testWithRMatGraph() + throws Exception { + Result expectedResult = new Result(1003442, 225147); + + Result globalClusteringCoefficient = new GlobalClusteringCoefficient() + .run(undirectedRMatGraph) + .execute(); + + assertEquals(expectedResult, globalClusteringCoefficient); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java new file mode 100644 index 0000000000000..6bf9b0dcf65fa --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.clustering.undirected; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TriangleCountTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + long triangleCount = new TriangleCount() + .run(undirectedSimpleGraph) + .execute(); + + assertEquals(2, triangleCount); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3; + + long triangleCount = new TriangleCount() + .run(completeGraph) + .execute(); + + assertEquals(expectedCount, triangleCount); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + long triangleCount = new TriangleCount() + .run(emptyGraph) + .execute(); + + assertEquals(0, triangleCount); + } + + @Test + public void testWithRMatGraph() + throws Exception { + long triangleCount = new TriangleCount() + .run(undirectedRMatGraph) + .execute(); + + assertEquals(75049, triangleCount); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java new file mode 100644 index 0000000000000..a36ca94f667e3 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.metric.undirected; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class VertexMetricsTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + Result expectedResult = new Result(6, 7, 13); + + Result vertexMetrics = new VertexMetrics() + .run(undirectedSimpleGraph) + .execute(); + + assertEquals(expectedResult, vertexMetrics); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedEdges = completeGraphVertexCount * expectedDegree / 2; + long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + + Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets); + + Result vertexMetrics = new VertexMetrics() + .run(completeGraph) + .execute(); + + assertEquals(expectedResult, vertexMetrics); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + Result expectedResult; + + expectedResult = new Result(0, 0, 0); + + Result withoutZeroDegreeVertices = new VertexMetrics() + .setIncludeZeroDegreeVertices(false) + .run(emptyGraph) + .execute(); + + assertEquals(withoutZeroDegreeVertices, expectedResult); + + expectedResult = new Result(3, 0, 0); + + Result withZeroDegreeVertices = new VertexMetrics() + .setIncludeZeroDegreeVertices(true) + .run(emptyGraph) + .execute(); + + assertEquals(expectedResult, withZeroDegreeVertices); + } + + @Test + public void testWithRMatGraph() + throws Exception { + Result expectedResult = new Result(902, 10442, 1003442); + + Result withoutZeroDegreeVertices = new VertexMetrics() + .run(undirectedRMatGraph) + .execute(); + + assertEquals(expectedResult, withoutZeroDegreeVertices); + } +} From 361e3e13f7367bb5754326fcba6e6816b4b3f9ba Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 19 May 2016 08:16:07 -0400 Subject: [PATCH 2/5] Update gelly docs --- docs/apis/batch/libs/gelly.md | 20 +++++++++++-------- .../LocalClusteringCoefficientTest.java | 8 +++++++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index a2daa0df03e9b..45fdbe581fc80 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1828,7 +1828,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * [GSA PageRank](#gsa-pagerank) * [Single Source Shortest Paths](#single-source-shortest-paths) * [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths) -* [GSA Triangle Count](#gsa-triangle-count) +* [Triangle Count](#triangle-count) * [Triangle Enumerator](#triangle-enumerator) * [Hyperlink-Induced Topic Search](#hyperlink-induced-topic-search) * [Summarization](#summarization) @@ -1998,19 +1998,23 @@ The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-app See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information. -### GSA Triangle Count +### Triangle Count #### Overview -An implementation of the Triangle Count algorithm. Given an input graph, it returns the number of unique triangles in it. +An analytic for counting the number of unique triangles in a graph. #### Details -This algorithm operates in three phases. First, vertices select neighbors with IDs greater than theirs -and send messages to them. Each received message is then propagated to neighbors with higher IDs. -Finally, if a node encounters the target ID in the list of received messages, it increments the number of discovered triangles. +Counts the triangles generated by [Triangle Listing](#triangle-listing). #### Usage -The algorithm takes an undirected, unweighted graph as input and outputs a `DataSet` which contains a single integer corresponding to the number of triangles -in the graph. The algorithm constructor takes no arguments. +The analytic takes an undirected graph as input and returns as a result a `Long` corresponding to the number of triangles +in the graph. The graph ID type must be `Comparable` and `Copyable`. + +### Triangle Listing + +This algorithm supports object reuse. The graph ID type must be `Comparable` and `Copyable`. + +See the [Triangle Enumerator](#triangle-enumerator) library method for implementation details. ### Triangle Enumerator diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java index 414f200001fce..3455df4b98b9f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java @@ -30,6 +30,8 @@ import org.apache.flink.types.NullValue; import org.junit.Test; +import java.util.List; + import static org.junit.Assert.assertEquals; public class LocalClusteringCoefficientTest @@ -61,7 +63,11 @@ public void testCompleteGraph() DataSet> cc = completeGraph .run(new LocalClusteringCoefficient()); - for (Result result : cc.collect()) { + List> results = cc.collect(); + + assertEquals(completeGraphVertexCount, results.size()); + + for (Result result : results) { assertEquals(expectedDegree, result.getDegree().getValue()); assertEquals(expectedTriangleCount, result.getTriangleCount().getValue()); } From 9448ede4f13aff3cbea5ff0cbf198205e65d64c9 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 20 May 2016 15:17:32 -0400 Subject: [PATCH 3/5] Allow setting job name in GraphAnalytic --- .../org/apache/flink/graph/AbstractGraphAnalytic.java | 10 ++++++++++ .../java/org/apache/flink/graph/GraphAnalytic.java | 9 +++++++++ 2 files changed, 19 insertions(+) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java index 84099e8ff2790..b13e82e81aa20 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java @@ -49,4 +49,14 @@ public T execute() env.execute(); return getResult(); } + + @Override + public T execute(String jobName) + throws Exception { + Preconditions.checkNotNull(jobName); + Preconditions.checkNotNull(env); + + env.execute(jobName); + return getResult(); + } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java index 23f60c0c5df8e..fe115bfd9fbeb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java @@ -54,6 +54,15 @@ public interface GraphAnalytic { */ T execute() throws Exception; + /** + * Execute the program and return the result. + * + * @param jobName the name to assign to the job + * @return the result + * @throws Exception + */ + T execute(String jobName) throws Exception; + /** * All {@code GraphAnalytic} processing must be terminated by an * {@link OutputFormat}. Rather than obtained via accumulators rather than From 0cb3c5150966b0e51d2d68dc520f565dc00c16dc Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 20 May 2016 17:02:26 -0400 Subject: [PATCH 4/5] Add run(GraphAnalytic) to gelly's scala API --- .../org/apache/flink/graph/scala/Graph.scala | 18 +++++++++++++++++- .../java/org/apache/flink/graph/Graph.java | 7 ++++++- .../org/apache/flink/graph/GraphAnalytic.java | 10 ++++------ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index f31619decccbd..3881aaedacc0b 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -1103,18 +1103,34 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * @return a Dataset of Tuple2, with one tuple per vertex. * The first field of the Tuple2 is the vertex ID and the second field * is the aggregate value computed by the provided [[ReduceNeighborsFunction]]. - */ + */ def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)] = { wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0, jtuple.f1)) } + /** + * @param algorithm the algorithm to run on the Graph + * @return the result of the graph algorithm + */ def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T = { jgraph.run(algorithm) } + /** + * A GraphAnalytic is similar to a GraphAlgorithm but is terminal and results + * are retrieved via accumulators. A Flink program has a single point of + * execution. A GraphAnalytic defers execution to the user to allow composing + * multiple analytics and algorithms into a single program. + * + * @param analytic the analytic to run on the Graph + */ + def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T])= { + jgraph.run(analytic) + } + /** * Runs a scatter-gather iteration on the graph. * No configuration options are provided. diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 26def97d4e863..dd25cfde55c1e 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1788,7 +1788,12 @@ public T run(GraphAlgorithm algorithm) throws Exception { } /** - * @param analytic the algorithm to run on the Graph + * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal + * and results are retrieved via accumulators. A Flink program has a single + * point of execution. A {@code GraphAnalytic} defers execution to the user to + * allow composing multiple analytics and algorithms into a single program. + * + * @param analytic the analytic to run on the Graph * @param the result type * @throws Exception */ diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java index fe115bfd9fbeb..dd221dc4ef06d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java @@ -22,12 +22,10 @@ import org.apache.flink.api.java.DataSet; /** - * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is - * terminal and results are retrieved via accumulators. - *
- * A Flink program has a single point of execution. A {@code GraphAnalytic} - * defers execution to the user to allow composing multiple analytics and - * algorithms into a single program. + * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal + * and results are retrieved via accumulators. A Flink program has a single + * point of execution. A {@code GraphAnalytic} defers execution to the user to + * allow composing multiple analytics and algorithms into a single program. * * @param key type * @param vertex value type From 1a6935ade9a75bc54af5bf50b23a00cfe773d156 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 7 Jun 2016 11:09:03 -0400 Subject: [PATCH 5/5] rebase with master --- .../clustering/undirected/GlobalClusteringCoefficient.java | 5 +++-- .../graph/library/clustering/undirected/TriangleCount.java | 5 +++-- .../flink/graph/library/metric/undirected/VertexMetrics.java | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java index c92b4cb58b5d3..fc89e43ca14a1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java @@ -20,13 +20,14 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Graph; import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result; import org.apache.flink.graph.library.metric.undirected.VertexMetrics; import org.apache.flink.types.CopyableValue; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + /** * The global clustering coefficient measures the connectedness of a graph. * Scores range from 0.0 (no triangles) to 1.0 (complete graph). @@ -43,7 +44,7 @@ public class GlobalClusteringCoefficient & CopyableValue private VertexMetrics vertexMetrics; // Optional configuration - private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + private int littleParallelism = PARALLELISM_DEFAULT; /** * Override the parallelism of operators processing small amounts of data. diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java index 9f35c11206f85..bc43725e234a8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java @@ -18,7 +18,6 @@ package org.apache.flink.graph.library.clustering.undirected; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils.CountHelper; import org.apache.flink.api.java.tuple.Tuple3; @@ -27,6 +26,8 @@ import org.apache.flink.types.CopyableValue; import org.apache.flink.util.AbstractID; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + /** * Count the number of distinct triangles in an undirected graph. * @@ -41,7 +42,7 @@ public class TriangleCount & CopyableValue, VV, EV> private String id = new AbstractID().toString(); // Optional configuration - private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + private int littleParallelism = PARALLELISM_DEFAULT; /** * Override the parallelism of operators processing small amounts of data. diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java index 428ce191bf6d8..41ae27a601295 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -20,7 +20,6 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.io.RichOutputFormat; @@ -37,6 +36,8 @@ import java.io.IOException; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + /** * Compute the number of vertices, number of edges, and number of triplets in * an undirected graph. @@ -55,7 +56,7 @@ public class VertexMetrics & CopyableValue, VV, EV> private boolean reduceOnTargetId = false; - private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + private int parallelism = PARALLELISM_DEFAULT; /** * By default only the edge set is processed for the computation of degree.