From 00fee7f5655f0741622a38f13e3347c1714b6934 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Wed, 20 Jul 2016 15:44:54 -0400 Subject: [PATCH] [FLINK-4264] [gelly] New GraphMetrics driver Updates VertexMetrics analytic, adds directed and undirected EdgeMetric analytics, and includes a new GraphMetrics driver. --- .../flink/graph/driver/GraphMetrics.java | 232 ++++++++ .../graph/examples/ClusteringCoefficient.java | 14 +- .../org/apache/flink/graph/examples/HITS.java | 2 +- .../flink/graph/examples/JaccardIndex.java | 2 +- .../flink/graph/examples/TriangleListing.java | 2 +- .../annotate/directed/VertexDegrees.java | 2 +- .../directed/LocalClusteringCoefficient.java | 32 +- .../LocalClusteringCoefficient.java | 31 +- .../library/metric/directed/EdgeMetrics.java | 507 ++++++++++++++++++ .../metric/directed/VertexMetrics.java | 102 +++- .../metric/undirected/EdgeMetrics.java | 445 +++++++++++++++ .../metric/undirected/VertexMetrics.java | 63 ++- .../metric/directed/EdgeMetricsTest.java | 90 ++++ .../metric/directed/VertexMetricsTest.java | 19 +- .../metric/undirected/EdgeMetricsTest.java | 89 +++ .../metric/undirected/VertexMetricsTest.java | 14 +- 16 files changed, 1600 insertions(+), 46 deletions(-) create mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java new file mode 100644 index 0000000000000..cc265bba82fbe --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.driver; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.text.WordUtils; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAnalytic; +import org.apache.flink.graph.GraphCsvReader; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.types.StringValue; + +import java.text.NumberFormat; + +/** + * Computes vertex and edge metrics on a directed or undirected graph. + * + * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics + * @see org.apache.flink.graph.library.metric.directed.VertexMetrics + * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics + * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics + */ +public class GraphMetrics { + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + public static final boolean DEFAULT_CLIP_AND_FLIP = true; + + private static void printUsage() { + System.out.println(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80)); + System.out.println(); + System.out.println("usage: GraphMetrics --directed --input "); + System.out.println(); + System.out.println("options:"); + System.out.println(" --input csv --type [--simplify ] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); + System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]"); + } + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + ParameterTool parameters = ParameterTool.fromArgs(args); + if (! parameters.has("directed")) { + printUsage(); + return; + } + boolean directedAlgorithm = parameters.getBoolean("directed"); + + GraphAnalytic vm; + GraphAnalytic em; + + switch (parameters.get("input", "")) { + case "csv": { + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + + String fieldDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + + GraphCsvReader reader = Graph + .fromCsvReader(parameters.get("input_filename"), env) + .ignoreCommentsEdges("#") + .lineDelimiterEdges(lineDelimiter) + .fieldDelimiterEdges(fieldDelimiter); + + switch (parameters.get("type", "")) { + case "integer": { + Graph graph = reader + .keyType(LongValue.class); + + if (directedAlgorithm) { + if (parameters.getBoolean("simplify", false)) { + graph = graph + .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); + } + + vm = graph + .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); + em = graph + .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); + } else { + if (parameters.getBoolean("simplify", false)) { + graph = graph + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false)); + } + + vm = graph + .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); + em = graph + .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); + } + } break; + + case "string": { + Graph graph = reader + .keyType(StringValue.class); + + if (directedAlgorithm) { + if (parameters.getBoolean("simplify", false)) { + graph = graph + .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); + } + + vm = graph + .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); + em = graph + .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); + } else { + if (parameters.getBoolean("simplify", false)) { + graph = graph + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(false)); + } + + vm = graph + .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); + em = graph + .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); + } + } break; + + default: + printUsage(); + return; + } + } break; + + case "rmat": { + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; + + + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate(); + + if (directedAlgorithm) { + if (scale > 32) { + Graph newGraph = graph + .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); + + vm = newGraph + .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); + em = newGraph + .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); + } else { + Graph newGraph = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new org.apache.flink.graph.asm.simple.directed.Simplify()); + + vm = newGraph + .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics()); + em = newGraph + .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics()); + } + } else { + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + + if (scale > 32) { + Graph newGraph = graph + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip)); + + vm = newGraph + .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); + em = newGraph + .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); + } else { + Graph newGraph = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new org.apache.flink.graph.asm.simple.undirected.Simplify(clipAndFlip)); + + vm = newGraph + .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics()); + em = newGraph + .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics()); + } + } + } break; + + default: + printUsage(); + return; + } + + env.execute("Graph Metrics"); + + System.out.print("Vertex metrics:\n "); + System.out.println(vm.getResult().toString().replace(";", "\n ")); + System.out.print("\nEdge metrics:\n "); + System.out.println(em.getResult().toString().replace(";", "\n ")); + + JobExecutionResult result = env.getLastJobExecutionResult(); + + NumberFormat nf = NumberFormat.getInstance(); + System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + } +} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java index 8641428fa1d0d..e099e2b203d32 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java @@ -72,7 +72,7 @@ private static void printUsage() { System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" + " the vertex, and the number of edges between vertex neighbors.", 80)); System.out.println(); - System.out.println("usage: ClusteringCoefficient --directed --input --output --input --output "); System.out.println(); System.out.println("options:"); System.out.println(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); @@ -174,7 +174,8 @@ public static void main(String[] args) throws Exception { gcc = newGraph .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient()); lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() + .setIncludeZeroDegreeVertices(false)); } else { Graph newGraph = graph .run(new TranslateGraphIds(new LongValueToIntValue())) @@ -183,7 +184,8 @@ public static void main(String[] args) throws Exception { gcc = newGraph .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient()); lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient() + .setIncludeZeroDegreeVertices(false)); } } else { boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); @@ -195,7 +197,8 @@ public static void main(String[] args) throws Exception { gcc = newGraph .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient()); lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() + .setIncludeZeroDegreeVertices(false)); } else { Graph newGraph = graph .run(new TranslateGraphIds(new LongValueToIntValue())) @@ -204,7 +207,8 @@ public static void main(String[] args) throws Exception { gcc = newGraph .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient()); lcc = newGraph - .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient()); + .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient() + .setIncludeZeroDegreeVertices(false)); } } } break; diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java index c772a3a9f3b5d..59612d9c2ace5 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java @@ -65,7 +65,7 @@ private static void printUsage() { " scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" + " and good \"authorities\" are linked from good \"hubs\".", 80)); System.out.println(); - System.out.println("usage: HITS --input --output --output "); System.out.println(); System.out.println("options:"); System.out.println(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 2158fa2e534c3..824aab776e5f2 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -69,7 +69,7 @@ private static void printUsage() { System.out.println(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + " number of shared neighbors, and the number of distinct neighbors.", 80)); System.out.println(); - System.out.println("usage: JaccardIndex --input --output --output "); System.out.println(); System.out.println("options:"); System.out.println(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java index cd06dde38af0a..f3ce708f66c35 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java @@ -66,7 +66,7 @@ private static void printUsage() { System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" + " for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80)); System.out.println(); - System.out.println("usage: TriangleListing --directed --input --output --input --output "); System.out.println(); System.out.println("options:"); System.out.println(" --input csv --type --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 9fef221b17b2a..84873bc9c126a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -86,7 +86,7 @@ public VertexDegrees setParallelism(int parallelism) { @Override protected String getAlgorithmName() { - return VertexOutDegree.class.getName(); + return VertexDegrees.class.getName(); } @Override diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index e0defcd585de1..22c8b41d940da 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -32,6 +32,7 @@ import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -59,8 +60,25 @@ public class LocalClusteringCoefficient & CopyableValue< extends GraphAlgorithmDelegatingDataSet> { // Optional configuration + private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true); + private int littleParallelism = PARALLELISM_DEFAULT; + /** + * By default the vertex set is checked for zero degree vertices. When this + * flag is disabled only clustering coefficient scores for vertices with + * a degree of a least one will be produced. + * + * @param includeZeroDegreeVertices whether to output scores for vertices + * with a degree of zero + * @return this + */ + public LocalClusteringCoefficient setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { + this.includeZeroDegreeVertices.set(includeZeroDegreeVertices); + + return this; + } + /** * Override the parallelism of operators processing small amounts of data. * @@ -90,6 +108,16 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other; + // verify that configurations can be merged + + if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) { + return false; + } + + // merge configurations + + includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); + littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); return true; @@ -128,8 +156,8 @@ public DataSet> runInternal(Graph input) // u, deg(u) DataSet> vertexDegree = input .run(new VertexDegrees() - .setParallelism(littleParallelism) - .setIncludeZeroDegreeVertices(true)); + .setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get()) + .setParallelism(littleParallelism)); // u, deg(u), triangle count return vertexDegree diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index cd859d965406d..4b4bf07866ff7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -32,6 +32,7 @@ import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -59,8 +60,25 @@ public class LocalClusteringCoefficient & CopyableValue< extends GraphAlgorithmDelegatingDataSet> { // Optional configuration + private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true); + private int littleParallelism = PARALLELISM_DEFAULT; + /** + * By default the vertex set is checked for zero degree vertices. When this + * flag is disabled only clustering coefficient scores for vertices with + * a degree of a least one will be produced. + * + * @param includeZeroDegreeVertices whether to output scores for vertices + * with a degree of zero + * @return this + */ + public LocalClusteringCoefficient setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) { + this.includeZeroDegreeVertices.set(includeZeroDegreeVertices); + + return this; + } + /** * Override the parallelism of operators processing small amounts of data. * @@ -91,6 +109,15 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other; + // verify that configurations can be merged + + if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) { + return false; + } + + // merge configurations + + includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices); littleParallelism = Math.min(littleParallelism, rhs.littleParallelism); return true; @@ -129,8 +156,8 @@ public DataSet> runInternal(Graph input) // u, deg(u) DataSet> vertexDegree = input .run(new VertexDegree() - .setParallelism(littleParallelism) - .setIncludeZeroDegreeVertices(true)); + .setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get()) + .setParallelism(littleParallelism)); // u, deg(u), triangle count return vertexDegree diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java new file mode 100644 index 0000000000000..167e31c1882dd --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.metric.directed; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.accumulators.LongMaximum; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair; +import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; +import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.text.NumberFormat; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Compute the following edge metrics in a directed graph: + * - number of vertices + * - number of edges + * - number of triangle triplets + * - number of rectangle triplets + * - number of triplets + * - maximum degree + * - maximum out degree + * - maximum in degree + * - maximum number of triangle triplets + * - maximum number of rectangle triplets + * - maximum number of triplets + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class EdgeMetrics & CopyableValue, VV, EV> +extends AbstractGraphAnalytic { + + private String id = new AbstractID().toString(); + + private int parallelism = PARALLELISM_DEFAULT; + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public EdgeMetrics setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + /* + * Implementation notes: + * + * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use + * a hash-combineable hashable-reduce. + * + * Use distinct to replace ReduceEdgeStats when the combiner can be disabled + * with a sorted-reduce forced. + */ + + @Override + public EdgeMetrics run(Graph input) + throws Exception { + super.run(input); + + // s, t, (d(s), d(t)) + DataSet>> edgeDegreesPair = input + .run(new EdgeDegreesPair() + .setParallelism(parallelism)); + + // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v) + DataSet> edgeStats = edgeDegreesPair + .flatMap(new EdgeStats()) + .setParallelism(parallelism) + .name("Edge stats") + .groupBy(0, 1) + .reduceGroup(new ReduceEdgeStats()) + .setParallelism(parallelism) + .name("Reduce edge stats") + .groupBy(0) + .reduce(new SumEdgeStats()) + .setCombineHint(CombineHint.HASH) + .setParallelism(parallelism) + .name("Sum edge stats"); + + edgeStats + .output(new EdgeMetricsHelper(id)) + .setParallelism(parallelism) + .name("Edge metrics"); + + return this; + } + + @Override + public Result getResult() { + JobExecutionResult res = env.getLastJobExecutionResult(); + + long vertexCount = res.getAccumulatorResult(id + "-0"); + long edgeCount = res.getAccumulatorResult(id + "-1"); + long triangleTripletCount = res.getAccumulatorResult(id + "-2"); + long rectangleTripletCount = res.getAccumulatorResult(id + "-3"); + long tripletCount = res.getAccumulatorResult(id + "-4"); + long maximumDegree = res.getAccumulatorResult(id + "-5"); + long maximumOutDegree = res.getAccumulatorResult(id + "-6"); + long maximumInDegree = res.getAccumulatorResult(id + "-7"); + long maximumTriangleTriplets = res.getAccumulatorResult(id + "-8"); + long maximumRectangleTriplets = res.getAccumulatorResult(id + "-9"); + long maximumTriplets = res.getAccumulatorResult(id + "-a"); + + return new Result(vertexCount, edgeCount, triangleTripletCount, rectangleTripletCount, tripletCount, + maximumDegree, maximumOutDegree, maximumInDegree, + maximumTriangleTriplets, maximumRectangleTriplets, maximumTriplets); + } + + /** + * Produces a pair of tuples. The first tuple contains the source vertex ID, + * the target vertex ID, the source degrees, and the low-order count. The + * second tuple is the same with the source and target roles reversed. + * + * The low-order count is one if the source vertex degree is less than the + * target vertex degree or if the degrees are equal and the source vertex + * ID compares lower than the target vertex ID; otherwise the low-order + * count is zero. + * + * @param ID type + * @param edge value type + */ + private static final class EdgeStats, ET> + implements FlatMapFunction>, Tuple4> { + private LongValue zero = new LongValue(0); + + private LongValue one = new LongValue(1); + + private Tuple4 output = new Tuple4<>(); + + @Override + public void flatMap(Edge> edge, Collector> out) + throws Exception { + Tuple3 degrees = edge.f2; + long sourceDegree = degrees.f1.getDegree().getValue(); + long targetDegree = degrees.f2.getDegree().getValue(); + + boolean ordered = (sourceDegree < targetDegree + || (sourceDegree == targetDegree && edge.f0.compareTo(edge.f1) < 0)); + + output.f0 = edge.f0; + output.f1 = edge.f1; + output.f2 = edge.f2.f1; + output.f3 = ordered ? one : zero; + out.collect(output); + + output.f0 = edge.f1; + output.f1 = edge.f0; + output.f2 = edge.f2.f2; + output.f3 = ordered ? zero : one; + out.collect(output); + } + } + + /** + * Produces a distinct value for each edge. + * + * @param ID type + */ + @ForwardedFields("0") + private static final class ReduceEdgeStats + implements GroupReduceFunction, Tuple3> { + Tuple3 output = new Tuple3<>(); + + @Override + public void reduce(Iterable> values, Collector> out) + throws Exception { + Tuple4 value = values.iterator().next(); + + output.f0 = value.f0; + output.f1 = value.f2; + output.f2 = value.f3; + + out.collect(output); + } + } + + /** + * Sums the low-order counts. + * + * @param ID type + */ + private static class SumEdgeStats + implements ReduceFunction> { + @Override + public Tuple3 reduce(Tuple3 value1, Tuple3 value2) + throws Exception { + value1.f2.setValue(value1.f2.getValue() + value2.f2.getValue()); + return value1; + } + } + + /** + * Helper class to collect edge metrics. + * + * @param ID type + */ + private static class EdgeMetricsHelper, ET> + extends RichOutputFormat> { + private final String id; + + private long vertexCount; + private long edgeCount; + private long triangleTripletCount; + private long rectangleTripletCount; + private long tripletCount; + private long maximumDegree; + private long maximumOutDegree; + private long maximumInDegree; + private long maximumTriangleTriplets; + private long maximumRectangleTriplets; + private long maximumTriplets; + + /** + * This helper class collects edge metrics by scanning over and + * discarding elements from the given DataSet. + * + * The unique id is required because Flink's accumulator namespace is + * among all operators. + * + * @param id unique string used for accumulator names + */ + public EdgeMetricsHelper(String id) { + this.id = id; + } + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(int taskNumber, int numTasks) throws IOException {} + + @Override + public void writeRecord(Tuple3 record) throws IOException { + Degrees degrees = record.f1; + long degree = degrees.getDegree().getValue(); + long outDegree = degrees.getOutDegree().getValue(); + long inDegree = degrees.getInDegree().getValue(); + + long lowDegree = record.f2.getValue(); + long highDegree = degree - lowDegree; + + long triangleTriplets = lowDegree * (lowDegree - 1) / 2; + long rectangleTriplets = triangleTriplets + lowDegree * highDegree; + long triplets = degree * (degree - 1) / 2; + + vertexCount++; + edgeCount += outDegree; + triangleTripletCount += triangleTriplets; + rectangleTripletCount += rectangleTriplets; + tripletCount += triplets; + maximumDegree = Math.max(maximumDegree, degree); + maximumOutDegree = Math.max(maximumOutDegree, outDegree); + maximumInDegree = Math.max(maximumInDegree, inDegree); + maximumTriangleTriplets = Math.max(maximumTriangleTriplets, triangleTriplets); + maximumRectangleTriplets = Math.max(maximumRectangleTriplets, rectangleTriplets); + maximumTriplets = Math.max(maximumTriplets, triplets); + } + + @Override + public void close() throws IOException { + getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); + getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); + getRuntimeContext().addAccumulator(id + "-2", new LongCounter(triangleTripletCount)); + getRuntimeContext().addAccumulator(id + "-3", new LongCounter(rectangleTripletCount)); + getRuntimeContext().addAccumulator(id + "-4", new LongCounter(tripletCount)); + getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumDegree)); + getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumOutDegree)); + getRuntimeContext().addAccumulator(id + "-7", new LongMaximum(maximumInDegree)); + getRuntimeContext().addAccumulator(id + "-8", new LongMaximum(maximumTriangleTriplets)); + getRuntimeContext().addAccumulator(id + "-9", new LongMaximum(maximumRectangleTriplets)); + getRuntimeContext().addAccumulator(id + "-a", new LongMaximum(maximumTriplets)); + } + } + + /** + * Wraps edge metrics. + */ + public static class Result { + private long vertexCount; + private long edgeCount; + private long triangleTripletCount; + private long rectangleTripletCount; + private long tripletCount; + private long maximumDegree; + private long maximumOutDegree; + private long maximumInDegree; + private long maximumTriangleTriplets; + private long maximumRectangleTriplets; + private long maximumTriplets; + + public Result(long vertexCount, long edgeCount, long triangleTripletCount, long rectangleTripletCount, long tripletCount, + long maximumDegree, long maximumOutDegree, long maximumInDegree, + long maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) { + this.vertexCount = vertexCount; + this.edgeCount = edgeCount; + this.triangleTripletCount = triangleTripletCount; + this.rectangleTripletCount = rectangleTripletCount; + this.tripletCount = tripletCount; + this.maximumDegree = maximumDegree; + this.maximumOutDegree = maximumOutDegree; + this.maximumInDegree = maximumInDegree; + this.maximumTriangleTriplets = maximumTriangleTriplets; + this.maximumRectangleTriplets = maximumRectangleTriplets; + this.maximumTriplets = maximumTriplets; + } + + /** + * Get the number of vertices. + * + * @return number of vertices + */ + public long getNumberOfVertices() { + return vertexCount; + } + + /** + * Get the number of edges. + * + * @return number of edges + */ + public long getNumberOfEdges() { + return edgeCount; + } + + /** + * Get the number of triangle triplets. + * + * @return number of triangle triplets + */ + public long getNumberOfTriangleTriplets() { + return triangleTripletCount; + } + + /** + * Get the number of rectangle triplets. + * + * @return number of rectangle triplets + */ + public long getNumberOfRectangleTriplets() { + return rectangleTripletCount; + } + + /** + * Get the number of triplets. + * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + /** + * Get the maximum degree. + * + * @return maximum degree + */ + public long getMaximumDegree() { + return maximumDegree; + } + + /** + * Get the maximum out degree. + * + * @return maximum out degree + */ + public long getMaximumOutDegree() { + return maximumOutDegree; + } + + /** + * Get the maximum in degree. + * + * @return maximum in degree + */ + public long getMaximumInDegree() { + return maximumInDegree; + } + + /** + * Get the maximum triangle triplets. + * + * @return maximum triangle triplets + */ + public long getMaximumTriangleTriplets() { + return maximumTriangleTriplets; + } + + /** + * Get the maximum rectangle triplets. + * + * @return maximum rectangle triplets + */ + public long getMaximumRectangleTriplets() { + return maximumRectangleTriplets; + } + + /** + * Get the maximum triplets. + * + * @return maximum triplets + */ + public long getMaximumTriplets() { + return maximumTriplets; + } + + @Override + public String toString() { + NumberFormat nf = NumberFormat.getInstance(); + + return "vertex count: " + nf.format(vertexCount) + + "; edge count: " + nf.format(edgeCount) + + "; triangle triplet count: " + nf.format(triangleTripletCount) + + "; rectangle triplet count: " + nf.format(rectangleTripletCount) + + "; triplet count: " + nf.format(tripletCount) + + "; maximum degree: " + nf.format(maximumDegree) + + "; maximum out degree: " + nf.format(maximumOutDegree) + + "; maximum in degree: " + nf.format(maximumInDegree) + + "; maximum triangle triplets: " + nf.format(maximumTriangleTriplets) + + "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets) + + "; maximum triplets: " + nf.format(maximumTriplets); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(vertexCount) + .append(edgeCount) + .append(triangleTripletCount) + .append(rectangleTripletCount) + .append(tripletCount) + .append(maximumDegree) + .append(maximumOutDegree) + .append(maximumInDegree) + .append(maximumTriangleTriplets) + .append(maximumRectangleTriplets) + .append(maximumTriplets) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(vertexCount, rhs.vertexCount) + .append(edgeCount, rhs.edgeCount) + .append(triangleTripletCount, rhs.triangleTripletCount) + .append(rectangleTripletCount, rhs.rectangleTripletCount) + .append(tripletCount, rhs.tripletCount) + .append(maximumDegree, rhs.maximumDegree) + .append(maximumOutDegree, rhs.maximumOutDegree) + .append(maximumInDegree, rhs.maximumInDegree) + .append(maximumTriangleTriplets, rhs.maximumTriangleTriplets) + .append(maximumRectangleTriplets, rhs.maximumRectangleTriplets) + .append(maximumTriplets, rhs.maximumTriplets) + .isEquals(); + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java index 434bd286f3b8f..22f7733909706 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.accumulators.LongMaximum; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.configuration.Configuration; @@ -35,12 +36,19 @@ import org.apache.flink.util.AbstractID; import java.io.IOException; +import java.text.NumberFormat; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * Compute the number of vertices, number of edges, and number of triplets in - * a directed graph. + * Compute the following vertex metrics in a directed graph: + * - number of vertices + * - number of edges + * - number of triplets + * - maximum degree + * - maximum out degree + * - maximum in degree + * - maximum number of triplets * * @param graph ID type * @param vertex value type @@ -107,8 +115,12 @@ public Result getResult() { long vertexCount = res.getAccumulatorResult(id + "-0"); long edgeCount = res.getAccumulatorResult(id + "-1"); long tripletCount = res.getAccumulatorResult(id + "-2"); + long maximumDegree = res.getAccumulatorResult(id + "-3"); + long maximumOutDegree = res.getAccumulatorResult(id + "-4"); + long maximumInDegree = res.getAccumulatorResult(id + "-5"); + long maximumTriplets = res.getAccumulatorResult(id + "-6"); - return new Result(vertexCount, edgeCount / 2, tripletCount); + return new Result(vertexCount, edgeCount, tripletCount, maximumDegree, maximumOutDegree, maximumInDegree, maximumTriplets); } /** @@ -123,13 +135,17 @@ private static class VertexMetricsHelper private long vertexCount; private long edgeCount; private long tripletCount; + private long maximumDegree; + private long maximumOutDegree; + private long maximumInDegree; + private long maximumTriplets; /** * This helper class collects vertex metrics by scanning over and * discarding elements from the given DataSet. * * The unique id is required because Flink's accumulator namespace is - * among all operators. + * shared among all operators. * * @param id unique string used for accumulator names */ @@ -147,10 +163,16 @@ public void open(int taskNumber, int numTasks) throws IOException {} public void writeRecord(Vertex record) throws IOException { long degree = record.f1.getDegree().getValue(); long outDegree = record.f1.getOutDegree().getValue(); + long inDegree = record.f1.getInDegree().getValue(); + long triplets = degree * (degree - 1) / 2; vertexCount++; edgeCount += outDegree; - tripletCount += degree * (degree - 1) / 2; + tripletCount += triplets; + maximumDegree = Math.max(maximumDegree, degree); + maximumOutDegree = Math.max(maximumOutDegree, outDegree); + maximumInDegree = Math.max(maximumInDegree, inDegree); + maximumTriplets = Math.max(maximumTriplets, triplets); } @Override @@ -158,6 +180,10 @@ public void close() throws IOException { getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount)); + getRuntimeContext().addAccumulator(id + "-3", new LongMaximum(maximumDegree)); + getRuntimeContext().addAccumulator(id + "-4", new LongMaximum(maximumOutDegree)); + getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumInDegree)); + getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumTriplets)); } } @@ -168,11 +194,19 @@ public static class Result { private long vertexCount; private long edgeCount; private long tripletCount; + private long maximumDegree; + private long maximumOutDegree; + private long maximumInDegree; + private long maximumTriplets; - public Result(long vertexCount, long edgeCount, long tripletCount) { + public Result(long vertexCount, long edgeCount, long tripletCount, long maximumDegree, long maximumOutDegree, long maximumInDegree, long maximumTriplets) { this.vertexCount = vertexCount; this.edgeCount = edgeCount; this.tripletCount = tripletCount; + this.maximumDegree = maximumDegree; + this.maximumOutDegree = maximumOutDegree; + this.maximumInDegree = maximumInDegree; + this.maximumTriplets = maximumTriplets; } /** @@ -202,11 +236,53 @@ public long getNumberOfTriplets() { return tripletCount; } + /** + * Get the maximum degree. + * + * @return maximum degree + */ + public long getMaximumDegree() { + return maximumDegree; + } + + /** + * Get the maximum out degree. + * + * @return maximum out degree + */ + public long getMaximumOutDegree() { + return maximumOutDegree; + } + + /** + * Get the maximum in degree. + * + * @return maximum in degree + */ + public long getMaximumInDegree() { + return maximumInDegree; + } + + /** + * Get the maximum triplets. + * + * @return maximum triplets + */ + public long getMaximumTriplets() { + return maximumTriplets; + } + @Override public String toString() { - return "vertex count: " + vertexCount - + ", edge count:" + edgeCount - + ", triplet count: " + tripletCount; + NumberFormat nf = NumberFormat.getInstance(); + + return "vertex count: " + nf.format(vertexCount) + + "; edge count: " + nf.format(edgeCount) + + "; triplet count: " + nf.format(tripletCount) + + "; maximum degree: " + nf.format(maximumDegree) + + "; maximum out degree: " + nf.format(maximumOutDegree) + + "; maximum in degree: " + nf.format(maximumInDegree) + + "; maximum triplets: " + nf.format(maximumTriplets); } @Override @@ -215,6 +291,10 @@ public int hashCode() { .append(vertexCount) .append(edgeCount) .append(tripletCount) + .append(maximumDegree) + .append(maximumOutDegree) + .append(maximumInDegree) + .append(maximumTriplets) .hashCode(); } @@ -230,6 +310,10 @@ public boolean equals(Object obj) { .append(vertexCount, rhs.vertexCount) .append(edgeCount, rhs.edgeCount) .append(tripletCount, rhs.tripletCount) + .append(maximumDegree, rhs.maximumDegree) + .append(maximumOutDegree, rhs.maximumOutDegree) + .append(maximumInDegree, rhs.maximumInDegree) + .append(maximumTriplets, rhs.maximumTriplets) .isEquals(); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java new file mode 100644 index 0000000000000..1d5b66402337e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.metric.undirected; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.accumulators.LongMaximum; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.AbstractGraphAnalytic; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair; +import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.AbstractID; + +import java.io.IOException; +import java.text.NumberFormat; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * Compute the following edge metrics in an undirected graph: + * - number of vertices + * - number of edges + * - number of triangle triplets + * - number of rectangle triplets + * - number of triplets + * - maximum degree + * - maximum number of triangle triplets + * - maximum number of rectangle triplets + * - maximum number of triplets + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class EdgeMetrics & CopyableValue, VV, EV> +extends AbstractGraphAnalytic { + + private String id = new AbstractID().toString(); + + // Optional configuration + private boolean reduceOnTargetId = false; + + private int parallelism = PARALLELISM_DEFAULT; + + /** + * The degree can be counted from either the edge source or target IDs. + * By default the source IDs are counted. Reducing on target IDs may + * optimize the algorithm if the input edge list is sorted by target ID. + * + * @param reduceOnTargetId set to {@code true} if the input edge list + * is sorted by target ID + * @return this + */ + public EdgeMetrics setReduceOnTargetId(boolean reduceOnTargetId) { + this.reduceOnTargetId = reduceOnTargetId; + + return this; + } + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public EdgeMetrics setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + /* + * Implementation notes: + * + * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use + * a hash-combineable hashed-reduce. + */ + + @Override + public EdgeMetrics run(Graph input) + throws Exception { + super.run(input); + + // s, t, (d(s), d(t)) + DataSet>> edgeDegreePair = input + .run(new EdgeDegreePair() + .setReduceOnTargetId(reduceOnTargetId) + .setParallelism(parallelism)); + + // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v) + DataSet> edgeStats = edgeDegreePair + .map(new EdgeStats()) + .setParallelism(parallelism) + .name("Edge stats") + .groupBy(0) + .reduce(new SumEdgeStats()) + .setCombineHint(CombineHint.HASH) + .setParallelism(parallelism) + .name("Sum edge stats"); + + edgeStats + .output(new EdgeMetricsHelper(id)) + .setParallelism(parallelism) + .name("Edge metrics"); + + return this; + } + + @Override + public Result getResult() { + JobExecutionResult res = env.getLastJobExecutionResult(); + + long vertexCount = res.getAccumulatorResult(id + "-0"); + long edgeCount = res.getAccumulatorResult(id + "-1"); + long triangleTripletCount = res.getAccumulatorResult(id + "-2"); + long rectangleTripletCount = res.getAccumulatorResult(id + "-3"); + long tripletCount = res.getAccumulatorResult(id + "-4"); + long maximumDegree = res.getAccumulatorResult(id + "-5"); + long maximumTriangleTriplets = res.getAccumulatorResult(id + "-6"); + long maximumRectangleTriplets = res.getAccumulatorResult(id + "-7"); + long maximumTriplets = res.getAccumulatorResult(id + "-8"); + + return new Result(vertexCount, edgeCount / 2, triangleTripletCount, rectangleTripletCount, tripletCount, + maximumDegree, maximumTriangleTriplets, maximumRectangleTriplets, maximumTriplets); + } + + /** + * Evaluates each edge and emits a tuple containing the source vertex ID, + * the source vertex degree, and a value of zero or one indicating the + * low-order count. The low-order count is one if the source vertex degree + * is less than the target vertex degree or if the degrees are equal and + * the source vertex ID compares lower than the target vertex ID; otherwise + * the low-order count is zero. + * + * @param ID type + * @param edge value type + */ + @FunctionAnnotation.ForwardedFields("0; 2.1->1") + private static class EdgeStats, ET> + implements MapFunction>, Tuple3> { + private LongValue zero = new LongValue(0); + + private LongValue one = new LongValue(1); + + private Tuple3 output = new Tuple3<>(); + + @Override + public Tuple3 map(Edge> edge) + throws Exception { + Tuple3 degrees = edge.f2; + + output.f0 = edge.f0; + output.f1 = degrees.f1; + + long sourceDegree = degrees.f1.getValue(); + long targetDegree = degrees.f2.getValue(); + + if (sourceDegree < targetDegree || + (sourceDegree == targetDegree && edge.f0.compareTo(edge.f1) < 0)) { + output.f2 = one; + } else { + output.f2 = zero; + } + + return output; + } + } + + /** + * Sums the low-order counts. + * + * @param ID type + */ + private static class SumEdgeStats + implements ReduceFunction> { + @Override + public Tuple3 reduce(Tuple3 value1, Tuple3 value2) + throws Exception { + value1.f2.setValue(value1.f2.getValue() + value2.f2.getValue()); + return value1; + } + } + + /** + * Helper class to collect edge metrics. + * + * @param ID type + */ + private static class EdgeMetricsHelper, ET> + extends RichOutputFormat> { + private final String id; + + private long vertexCount; + private long edgeCount; + private long triangleTripletCount; + private long rectangleTripletCount; + private long tripletCount; + private long maximumDegree; + private long maximumTriangleTriplets; + private long maximumRectangleTriplets; + private long maximumTriplets; + + /** + * This helper class collects edge metrics by scanning over and + * discarding elements from the given DataSet. + * + * The unique id is required because Flink's accumulator namespace is + * among all operators. + * + * @param id unique string used for accumulator names + */ + public EdgeMetricsHelper(String id) { + this.id = id; + } + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(int taskNumber, int numTasks) throws IOException {} + + @Override + public void writeRecord(Tuple3 record) throws IOException { + long degree = record.f1.getValue(); + long lowDegree = record.f2.getValue(); + long highDegree = degree - lowDegree; + + long triangleTriplets = lowDegree * (lowDegree - 1) / 2; + long rectangleTriplets = triangleTriplets + lowDegree * highDegree; + long triplets = degree * (degree - 1) / 2; + + vertexCount++; + edgeCount += degree; + triangleTripletCount += triangleTriplets; + rectangleTripletCount += rectangleTriplets; + tripletCount += triplets; + maximumDegree = Math.max(maximumDegree, degree); + maximumTriangleTriplets = Math.max(maximumTriangleTriplets, triangleTriplets); + maximumRectangleTriplets = Math.max(maximumRectangleTriplets, rectangleTriplets); + maximumTriplets = Math.max(maximumTriplets, triplets); + } + + @Override + public void close() throws IOException { + getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); + getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); + getRuntimeContext().addAccumulator(id + "-2", new LongCounter(triangleTripletCount)); + getRuntimeContext().addAccumulator(id + "-3", new LongCounter(rectangleTripletCount)); + getRuntimeContext().addAccumulator(id + "-4", new LongCounter(tripletCount)); + getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumDegree)); + getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumTriangleTriplets)); + getRuntimeContext().addAccumulator(id + "-7", new LongMaximum(maximumRectangleTriplets)); + getRuntimeContext().addAccumulator(id + "-8", new LongMaximum(maximumTriplets)); + } + } + + /** + * Wraps edge metrics. + */ + public static class Result { + private long vertexCount; + private long edgeCount; + private long triangleTripletCount; + private long rectangleTripletCount; + private long tripletCount; + private long maximumDegree; + private long maximumTriangleTriplets; + private long maximumRectangleTriplets; + private long maximumTriplets; + + public Result(long vertexCount, long edgeCount, long triangleTripletCount, long rectangleTripletCount, long tripletCount, + long maximumDegree, long maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) { + this.vertexCount = vertexCount; + this.edgeCount = edgeCount; + this.triangleTripletCount = triangleTripletCount; + this.rectangleTripletCount = rectangleTripletCount; + this.tripletCount = tripletCount; + this.maximumDegree = maximumDegree; + this.maximumTriangleTriplets = maximumTriangleTriplets; + this.maximumRectangleTriplets = maximumRectangleTriplets; + this.maximumTriplets = maximumTriplets; + } + + /** + * Get the number of vertices. + * + * @return number of vertices + */ + public long getNumberOfVertices() { + return vertexCount; + } + + /** + * Get the number of edges. + * + * @return number of edges + */ + public long getNumberOfEdges() { + return edgeCount; + } + + /** + * Get the number of triangle triplets. + * + * @return number of triangle triplets + */ + public long getNumberOfTriangleTriplets() { + return triangleTripletCount; + } + + /** + * Get the number of rectangle triplets. + * + * @return number of rectangle triplets + */ + public long getNumberOfRectangleTriplets() { + return rectangleTripletCount; + } + + /** + * Get the number of triplets. + * + * @return number of triplets + */ + public long getNumberOfTriplets() { + return tripletCount; + } + + /** + * Get the maximum degree. + * + * @return maximum degree + */ + public long getMaximumDegree() { + return maximumDegree; + } + + /** + * Get the maximum triangle triplets. + * + * @return maximum triangle triplets + */ + public long getMaximumTriangleTriplets() { + return maximumTriangleTriplets; + } + + /** + * Get the maximum rectangle triplets. + * + * @return maximum rectangle triplets + */ + public long getMaximumRectangleTriplets() { + return maximumRectangleTriplets; + } + + /** + * Get the maximum triplets. + * + * @return maximum triplets + */ + public long getMaximumTriplets() { + return maximumTriplets; + } + + @Override + public String toString() { + NumberFormat nf = NumberFormat.getInstance(); + + return "vertex count: " + nf.format(vertexCount) + + "; edge count: " + nf.format(edgeCount) + + "; triangle triplet count: " + nf.format(triangleTripletCount) + + "; rectangle triplet count: " + nf.format(rectangleTripletCount) + + "; triplet count: " + nf.format(tripletCount) + + "; maximum degree: " + nf.format(maximumDegree) + + "; maximum triangle triplets: " + nf.format(maximumTriangleTriplets) + + "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets) + + "; maximum triplets: " + nf.format(maximumTriplets); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(vertexCount) + .append(edgeCount) + .append(triangleTripletCount) + .append(rectangleTripletCount) + .append(tripletCount) + .append(maximumDegree) + .append(maximumTriangleTriplets) + .append(maximumRectangleTriplets) + .append(maximumTriplets) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Result rhs = (Result)obj; + + return new EqualsBuilder() + .append(vertexCount, rhs.vertexCount) + .append(edgeCount, rhs.edgeCount) + .append(triangleTripletCount, rhs.triangleTripletCount) + .append(rectangleTripletCount, rhs.rectangleTripletCount) + .append(tripletCount, rhs.tripletCount) + .append(maximumDegree, rhs.maximumDegree) + .append(maximumTriangleTriplets, rhs.maximumTriangleTriplets) + .append(maximumRectangleTriplets, rhs.maximumRectangleTriplets) + .append(maximumTriplets, rhs.maximumTriplets) + .isEquals(); + } + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java index 3c26e432920fa..d04fa7bfe1af5 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.accumulators.LongMaximum; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.configuration.Configuration; @@ -35,12 +36,17 @@ import org.apache.flink.util.AbstractID; import java.io.IOException; +import java.text.NumberFormat; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; /** - * Compute the number of vertices, number of edges, and number of triplets in - * an undirected graph. + * Compute the following vertex metrics in an undirected graph: + * - number of vertices + * - number of edges + * - number of triplets + * - maximum degree + * - maximum number of triplets * * @param graph ID type * @param vertex value type @@ -125,8 +131,10 @@ public Result getResult() { long vertexCount = res.getAccumulatorResult(id + "-0"); long edgeCount = res.getAccumulatorResult(id + "-1"); long tripletCount = res.getAccumulatorResult(id + "-2"); + long maximumDegree = res.getAccumulatorResult(id + "-3"); + long maximumTriplets = res.getAccumulatorResult(id + "-4"); - return new Result(vertexCount, edgeCount / 2, tripletCount); + return new Result(vertexCount, edgeCount / 2, tripletCount, maximumDegree, maximumTriplets); } /** @@ -141,13 +149,15 @@ private static class VertexMetricsHelper private long vertexCount; private long edgeCount; private long tripletCount; + private long maximumDegree; + private long maximumTriplets; /** * This helper class collects vertex metrics by scanning over and * discarding elements from the given DataSet. * * The unique id is required because Flink's accumulator namespace is - * among all operators. + * shared among all operators. * * @param id unique string used for accumulator names */ @@ -164,10 +174,13 @@ public void open(int taskNumber, int numTasks) throws IOException {} @Override public void writeRecord(Vertex record) throws IOException { long degree = record.f1.getValue(); + long triplets = degree * (degree - 1) / 2; vertexCount++; edgeCount += degree; - tripletCount += degree * (degree - 1) / 2; + tripletCount += triplets; + maximumDegree = Math.max(maximumDegree, degree); + maximumTriplets = Math.max(maximumTriplets, triplets); } @Override @@ -175,6 +188,8 @@ public void close() throws IOException { getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount)); getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount)); getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount)); + getRuntimeContext().addAccumulator(id + "-3", new LongMaximum(maximumDegree)); + getRuntimeContext().addAccumulator(id + "-4", new LongMaximum(maximumTriplets)); } } @@ -185,11 +200,15 @@ public static class Result { private long vertexCount; private long edgeCount; private long tripletCount; + private long maximumDegree; + private long maximumTriplets; - public Result(long vertexCount, long edgeCount, long tripletCount) { + public Result(long vertexCount, long edgeCount, long tripletCount, long maximumDegree, long maximumTriplets) { this.vertexCount = vertexCount; this.edgeCount = edgeCount; this.tripletCount = tripletCount; + this.maximumDegree = maximumDegree; + this.maximumTriplets = maximumTriplets; } /** @@ -219,11 +238,33 @@ public long getNumberOfTriplets() { return tripletCount; } + /** + * Get the maximum degree. + * + * @return maximum degree + */ + public long getMaximumDegree() { + return maximumDegree; + } + + /** + * Get the maximum triplets. + * + * @return maximum triplets + */ + public long getMaximumTriplets() { + return maximumTriplets; + } + @Override public String toString() { - return "vertex count: " + vertexCount - + ", edge count:" + edgeCount - + ", triplet count: " + tripletCount; + NumberFormat nf = NumberFormat.getInstance(); + + return "vertex count: " + nf.format(vertexCount) + + "; edge count: " + nf.format(edgeCount) + + "; triplet count: " + nf.format(tripletCount) + + "; maximum degree: " + nf.format(maximumDegree) + + "; maximum triplets: " + nf.format(maximumTriplets); } @Override @@ -232,6 +273,8 @@ public int hashCode() { .append(vertexCount) .append(edgeCount) .append(tripletCount) + .append(maximumDegree) + .append(maximumTriplets) .hashCode(); } @@ -247,6 +290,8 @@ public boolean equals(Object obj) { .append(vertexCount, rhs.vertexCount) .append(edgeCount, rhs.edgeCount) .append(tripletCount, rhs.tripletCount) + .append(maximumDegree, rhs.maximumDegree) + .append(maximumTriplets, rhs.maximumTriplets) .isEquals(); } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java new file mode 100644 index 0000000000000..af5a154b4ff1d --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.metric.directed; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EdgeMetricsTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 2, 3, 1, 3, 6); + + Result edgeMetrics = new EdgeMetrics() + .run(directedSimpleGraph) + .execute(); + + assertEquals(expectedResult, edgeMetrics); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedEdges = completeGraphVertexCount * expectedDegree; + long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets; + + Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets, + expectedDegree, expectedDegree, expectedDegree, + expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets); + + Result edgeMetrics = new EdgeMetrics() + .run(completeGraph) + .execute(); + + assertEquals(expectedResult, edgeMetrics); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + Result expectedResult; + + expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + + Result withoutZeroDegreeVertices = new EdgeMetrics() + .run(emptyGraph) + .execute(); + + assertEquals(withoutZeroDegreeVertices, expectedResult); + } + + @Test + public void testWithRMatGraph() + throws Exception { + Result expectedResult = new Result(902, 12009, 107817, 315537, 1003442, 463, 334, 342, 820, 3822, 106953); + + Result withoutZeroDegreeVertices = new EdgeMetrics() + .run(directedRMatGraph) + .execute(); + + assertEquals(expectedResult, withoutZeroDegreeVertices); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java index 8145abdfee0d4..e4362c0f6c8bc 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java @@ -34,10 +34,10 @@ public class VertexMetricsTest @Test public void testWithSimpleGraph() throws Exception { - Result expectedResult = new Result(6, 7, 13); + Result expectedResult = new Result(6, 7, 13, 4, 2, 3, 6); Result vertexMetrics = new VertexMetrics() - .run(undirectedSimpleGraph) + .run(directedSimpleGraph) .execute(); assertEquals(expectedResult, vertexMetrics); @@ -47,10 +47,11 @@ public void testWithSimpleGraph() public void testWithCompleteGraph() throws Exception { long expectedDegree = completeGraphVertexCount - 1; - long expectedEdges = completeGraphVertexCount * expectedDegree / 2; - long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + long expectedEdges = completeGraphVertexCount * expectedDegree; + long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets; - Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets); + Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets, expectedDegree, expectedDegree, expectedDegree, expectedMaximumTriplets); Result vertexMetrics = new VertexMetrics() .run(completeGraph) @@ -64,7 +65,7 @@ public void testWithEmptyGraph() throws Exception { Result expectedResult; - expectedResult = new Result(0, 0, 0); + expectedResult = new Result(0, 0, 0, 0, 0, 0, 0); Result withoutZeroDegreeVertices = new VertexMetrics() .setIncludeZeroDegreeVertices(false) @@ -73,7 +74,7 @@ public void testWithEmptyGraph() assertEquals(withoutZeroDegreeVertices, expectedResult); - expectedResult = new Result(3, 0, 0); + expectedResult = new Result(3, 0, 0, 0, 0, 0, 0); Result withZeroDegreeVertices = new VertexMetrics() .setIncludeZeroDegreeVertices(true) @@ -86,10 +87,10 @@ public void testWithEmptyGraph() @Test public void testWithRMatGraph() throws Exception { - Result expectedResult = new Result(902, 10442, 1003442); + Result expectedResult = new Result(902, 12009, 1003442, 463, 334, 342, 106953); Result withoutZeroDegreeVertices = new VertexMetrics() - .run(undirectedRMatGraph) + .run(directedRMatGraph) .execute(); assertEquals(expectedResult, withoutZeroDegreeVertices); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java new file mode 100644 index 0000000000000..b300d667bf28c --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.metric.undirected; + +import org.apache.commons.math3.util.CombinatoricsUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EdgeMetricsTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 1, 3, 6); + + Result edgeMetrics = new EdgeMetrics() + .run(undirectedSimpleGraph) + .execute(); + + assertEquals(expectedResult, edgeMetrics); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + long expectedDegree = completeGraphVertexCount - 1; + long expectedEdges = completeGraphVertexCount * expectedDegree / 2; + long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets; + + Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets, + expectedDegree, expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets); + + Result edgeMetrics = new EdgeMetrics() + .run(completeGraph) + .execute(); + + assertEquals(expectedResult, edgeMetrics); + } + + @Test + public void testWithEmptyGraph() + throws Exception { + Result expectedResult; + + expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0); + + Result withoutZeroDegreeVertices = new EdgeMetrics() + .run(emptyGraph) + .execute(); + + assertEquals(withoutZeroDegreeVertices, expectedResult); + } + + @Test + public void testWithRMatGraph() + throws Exception { + Result expectedResult = new Result(902, 10442, 107817, 315537, 1003442, 463, 820, 3822, 106953); + + Result withoutZeroDegreeVertices = new EdgeMetrics() + .run(undirectedRMatGraph) + .execute(); + + assertEquals(expectedResult, withoutZeroDegreeVertices); + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java index a36ca94f667e3..8f7e1da784e2a 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java @@ -34,7 +34,7 @@ public class VertexMetricsTest @Test public void testWithSimpleGraph() throws Exception { - Result expectedResult = new Result(6, 7, 13); + Result expectedResult = new Result(6, 7, 13, 4, 6); Result vertexMetrics = new VertexMetrics() .run(undirectedSimpleGraph) @@ -48,9 +48,11 @@ public void testWithCompleteGraph() throws Exception { long expectedDegree = completeGraphVertexCount - 1; long expectedEdges = completeGraphVertexCount * expectedDegree / 2; - long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2); + long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets; - Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets); + Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets, + expectedDegree, expectedMaximumTriplets); Result vertexMetrics = new VertexMetrics() .run(completeGraph) @@ -64,7 +66,7 @@ public void testWithEmptyGraph() throws Exception { Result expectedResult; - expectedResult = new Result(0, 0, 0); + expectedResult = new Result(0, 0, 0, 0, 0); Result withoutZeroDegreeVertices = new VertexMetrics() .setIncludeZeroDegreeVertices(false) @@ -73,7 +75,7 @@ public void testWithEmptyGraph() assertEquals(withoutZeroDegreeVertices, expectedResult); - expectedResult = new Result(3, 0, 0); + expectedResult = new Result(3, 0, 0, 0, 0); Result withZeroDegreeVertices = new VertexMetrics() .setIncludeZeroDegreeVertices(true) @@ -86,7 +88,7 @@ public void testWithEmptyGraph() @Test public void testWithRMatGraph() throws Exception { - Result expectedResult = new Result(902, 10442, 1003442); + Result expectedResult = new Result(902, 10442, 1003442, 463, 106953); Result withoutZeroDegreeVertices = new VertexMetrics() .run(undirectedRMatGraph)