From 8a3253d31b364ea9a6f6c51f5bfdbb6ccd36c311 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Mon, 9 May 2016 14:45:15 -0400 Subject: [PATCH 1/6] [FLINK-3780] [gelly] Jaccard Similarity The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common). --- docs/apis/batch/libs/gelly.md | 20 + .../java/org/apache/flink/util/MathUtils.java | 30 ++ .../graph/examples/JaccardSimilarity.java | 131 ++++++ .../examples/JaccardSimilarityMeasure.java | 214 ---------- .../data/JaccardSimilarityMeasureData.java | 58 --- .../JaccardSimilarityMeasureITCase.java | 73 ---- .../TranslateEdgeDegreeToIntValue.java | 51 +++ .../flink/graph/asm/translate/Translate.java | 12 +- .../graph/library/asm/JaccardSimilarity.java | 374 ++++++++++++++++++ .../library/asm/JaccardSimilarityTest.java | 101 +++++ 10 files changed, 715 insertions(+), 349 deletions(-) create mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java delete mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java delete mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java delete mode 100644 flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/asm/JaccardSimilarity.java create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/asm/JaccardSimilarityTest.java diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index dd0b4c185cc9e..5ddea07ce16fe 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1831,6 +1831,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * [GSA Triangle Count](#gsa-triangle-count) * [Triangle Enumerator](#triangle-enumerator) * [Summarization](#summarization) +* [Jaccard Similarity](#jaccard-similarity) * [Local Clustering Coefficient](#local-clustering-coefficient) Gelly's library methods can be used by simply calling the `run()` method on the input graph: @@ -2051,6 +2052,25 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Similarity + +#### Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + +#### Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges +connecting the two vertices to the common neighbor. The number of distinct neighbors for pairs of vertices is computed +by storing the sum of degrees of the vertex pair and subtracting the count of common neighbors, which are double-counted +in the sum of degrees. + +The algorithm first annotates each edge with the endpoint degree. Grouping on the midpoint vertex, each pair of +neighbors is emitted with the endpoint degree sum. Grouping on two-paths, the common neighbors are counted. + +#### Usage +The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs, +the number of common neighbors, and the number of distinct neighbors. The vertex ID must be `Comparable` and `Copyable`. + ### Local Clustering Coefficient #### Overview diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java index 2bdddc8cccd9f..d1fe277e471b4 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java @@ -166,6 +166,36 @@ else if (code != Integer.MIN_VALUE) { } } + /** + * This function hashes a series of integer values. + * + * @param seed initialization + * @param codes one or more integer values + * @return The hash code for the integer series + */ + public static int murmurHash(int seed, int... codes) { + int hash = seed; + + for (int code : codes) { + code *= 0xcc9e2d51; + code = code << 15; + code *= 0x1b873593; + + hash ^= code; + hash = hash << 13; + hash = hash * 5 + 0xe6546b64; + } + + hash ^= 4 * codes.length; + hash ^= hash >>> 16; + hash *= 0x85ebca6b; + hash ^= hash >>> 13; + hash *= 0xc2b2ae35; + hash ^= hash >>> 16; + + return hash; + } + // ============================================================================================ /** diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java new file mode 100644 index 0000000000000..b297edbf3b3a0 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of Jaccard Similarity. + * + * This example generates an undirected RMat graph with the given scale and + * edge factor then calculates all non-zero Jaccard Similarity scores + * between vertices. + * + * @see org.apache.flink.graph.library.asm.JaccardSimilarity + */ +public class JaccardSimilarity { + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + public static final boolean DEFAULT_CLIP_AND_FLIP = true; + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + ParameterTool parameters = ParameterTool.fromArgs(args); + + // Generate RMat graph + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1 << scale; + long edgeCount = vertexCount * edgeFactor; + + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setSimpleGraph(true, clipAndFlip) + .generate(); + + DataSet js; + + if (scale > 32) { + js = graph + .run(new org.apache.flink.graph.library.asm.JaccardSimilarity()); + } else { + js = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new org.apache.flink.graph.library.asm.JaccardSimilarity()); + } + + switch (parameters.get("output", "")) { + case "print": + js.print(); + break; + + case "hash": + System.out.println(DataSetUtils.checksumHashCode(js)); + break; + + case "csv": + String filename = parameters.get("filename"); + + String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER); + String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + + js.writeAsCsv(filename, row_delimiter, field_delimiter); + + env.execute(); + break; + default: + System.out.println("The Jaccard Index measures the similarity between vertex neighborhoods."); + System.out.println("Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common)."); + System.out.println(""); + System.out.println("This algorithm returns 4-tuples containing two vertex IDs, the number of"); + System.out.println("common neighbors, and the number of distinct neighbors. The Jaccard Index"); + System.out.println("is the number of common neighbors divided by the number of distinct neighbors."); + System.out.println(""); + System.out.println("usage:"); + System.out.println(" JaccardSimilarity [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); + System.out.println(" JaccardSimilarity [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); + System.out.println(" JaccardSimilarity [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + + " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]"); + + return; + } + + JobExecutionResult result = env.getLastJobExecutionResult(); + + NumberFormat nf = NumberFormat.getInstance(); + System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + } +} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java deleted file mode 100644 index fbd735baa060d..0000000000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarityMeasure.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.ReduceNeighborsFunction; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.Triplet; -import org.apache.flink.graph.VertexJoinFunction; -import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData; - -import java.util.HashSet; - -/** - * This example shows how to use - *
    - *
  • neighborhood methods - *
  • join with vertices - *
  • triplets - *
- * - * Given a directed, unweighted graph, return a weighted graph where the edge values are equal - * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size - * of the union of neighbor sets - for the src and target vertices. - * - *

- * Input files are plain text files and must be formatted as follows: - *
- * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs. - * Edges themselves are separated by newlines. - * For example: 1 2\n1 3\n defines two edges 1-2 and 1-3. - *

- * - * Usage JaccardSimilarityMeasure <edge path> <result path>
- * If no parameters are provided, the program is run with default data from - * {@link JaccardSimilarityMeasureData} - */ -@SuppressWarnings("serial") -public class JaccardSimilarityMeasure implements ProgramDescription { - - public static void main(String [] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet> edges = getEdgesDataSet(env); - - Graph, Double> graph = Graph.fromDataSet(edges, - new MapFunction>() { - - @Override - public HashSet map(Long id) throws Exception { - HashSet neighbors = new HashSet(); - neighbors.add(id); - - return new HashSet(neighbors); - } - }, env); - - // create the set of neighbors - DataSet>> computedNeighbors = - graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL); - - // join with the vertices to update the node values - Graph, Double> graphWithVertexValues = - graph.joinWithVertices(computedNeighbors, new VertexJoinFunction, - HashSet>() { - - public HashSet vertexJoin(HashSet vertexValue, HashSet inputValue) { - return inputValue; - } - }); - - // compare neighbors, compute Jaccard - DataSet> edgesWithJaccardValues = - graphWithVertexValues.getTriplets().map(new ComputeJaccard()); - - // emit result - if (fileOutput) { - edgesWithJaccardValues.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Executing Jaccard Similarity Measure"); - } else { - edgesWithJaccardValues.print(); - } - - } - - @Override - public String getDescription() { - return "Vertex Jaccard Similarity Measure"; - } - - /** - * Each vertex will have a HashSet containing its neighbor ids as value. - */ - private static final class GatherNeighbors implements ReduceNeighborsFunction> { - - @Override - public HashSet reduceNeighbors(HashSet first, HashSet second) { - first.addAll(second); - return new HashSet(first); - } - } - - /** - * The edge weight will be the Jaccard coefficient, which is computed as follows: - * - * Consider the edge x-y - * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively. - * sizeX+sizeY = union + intersection of neighborhoods - * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods - * The intersection can then be deduced. - * - * The Jaccard similarity coefficient is then, the intersection/union. - */ - private static final class ComputeJaccard implements - MapFunction, Double>, Edge> { - - @Override - public Edge map(Triplet, Double> triplet) throws Exception { - - Vertex> srcVertex = triplet.getSrcVertex(); - Vertex> trgVertex = triplet.getTrgVertex(); - - Long x = srcVertex.getId(); - Long y = trgVertex.getId(); - HashSet neighborSetY = trgVertex.getValue(); - - double unionPlusIntersection = srcVertex.getValue().size() + neighborSetY.size(); - // within a HashSet, all elements are distinct - HashSet unionSet = new HashSet(); - unionSet.addAll(srcVertex.getValue()); - unionSet.addAll(neighborSetY); - double union = unionSet.size(); - double intersection = unionPlusIntersection - union; - - return new Edge(x, y, intersection/union); - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - - private static boolean parseParameters(String [] args) { - if(args.length > 0) { - if(args.length != 2) { - System.err.println("Usage JaccardSimilarityMeasure "); - return false; - } - - fileOutput = true; - edgeInputPath = args[0]; - outputPath = args[1]; - } else { - System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("Usage JaccardSimilarityMeasure "); - } - - return true; - } - - private static DataSet> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction, Edge>() { - @Override - public Edge map(Tuple2 tuple2) throws Exception { - return new Edge(tuple2.f0, tuple2.f1, new Double(0)); - } - }); - } else { - return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env); - } - } -} diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java deleted file mode 100644 index 054f041a59039..0000000000000 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.examples.data; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; - -import java.util.ArrayList; -import java.util.List; - -/** - * Provides the default data sets used for the Jaccard Similarity Measure example program. - * If no parameters are given to the program, the default data sets are used. - */ -public class JaccardSimilarityMeasureData { - - public static final String EDGES = "1 2\n" + "1 3\n" + "1 4\n" + "1 5\n" + "2 3\n" + "2 4\n" + - "2 5\n" + "3 4\n" + "3 5\n" + "4 5"; - - public static DataSet> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 2L, new Double(0))); - edges.add(new Edge(1L, 3L, new Double(0))); - edges.add(new Edge(1L, 4L, new Double(0))); - edges.add(new Edge(1L, 5L, new Double(0))); - edges.add(new Edge(2L, 3L, new Double(0))); - edges.add(new Edge(2L, 4L, new Double(0))); - edges.add(new Edge(2L, 5L, new Double(0))); - edges.add(new Edge(3L, 4L, new Double(0))); - edges.add(new Edge(3L, 5L, new Double(0))); - edges.add(new Edge(4L, 5L, new Double(0))); - - return env.fromCollection(edges); - } - - public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" + - "2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6"; - - private JaccardSimilarityMeasureData() {} -} diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java deleted file mode 100644 index 92cca86f1b441..0000000000000 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.examples; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.graph.examples.JaccardSimilarityMeasure; -import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.test.util.TestBaseUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase { - - private String edgesPath; - - private String resultPath; - - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public JaccardSimilarityMeasureITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - - File edgesFile = tempFolder.newFile(); - Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8); - - edgesPath = edgesFile.toURI().toString(); - } - - @Test - public void testJaccardSimilarityMeasureExample() throws Exception { - JaccardSimilarityMeasure.main(new String[]{edgesPath, resultPath}); - expected = JaccardSimilarityMeasureData.JACCARD_EDGES; - } - - @After - public void after() throws Exception { - TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); - } -} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java new file mode 100644 index 0000000000000..0abb6fc9ddbdc --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.asm.translate.TranslateFunction; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; + +/** + * Translate the edge degree returned by the degree annotation functions from + * {@link LongValue} to {@link IntValue}. + * + * @param edge value type + */ +public class TranslateEdgeDegreeToIntValue +implements TranslateFunction, Tuple2> { + + @Override + public Tuple2 translate(Tuple2 value, Tuple2 reuse) throws Exception { + long val = value.f1.getValue(); + + if (val > Integer.MAX_VALUE) { + throw new RuntimeException("LongValue input overflows IntValue output"); + } + + if (reuse == null) { + reuse = new Tuple2<>(null, new IntValue()); + } + + reuse.f0 = value.f0; + reuse.f1.setValue((int) val); + return reuse; + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java index d87c949a5029e..70c20f8e1ea6e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java @@ -75,7 +75,8 @@ public static DataSet> translateVertexIds(DataSet "The parallelism must be greater than zero."); Class> vertexClass = (Class>)(Class) Vertex.class; - TypeInformation newType = TypeExtractor.createTypeInfo(TranslateFunction.class, translator.getClass(), 1, null, null); + TypeInformation oldType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(0); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); @@ -151,7 +152,8 @@ public static DataSet> translateEdgeIds(DataSet> edgeClass = (Class>)(Class) Edge.class; - TypeInformation newType = TypeExtractor.createTypeInfo(TranslateFunction.class, translator.getClass(), 1, null, null); + TypeInformation oldType = ((TupleTypeInfo>) edges.getType()).getTypeAt(0); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); TypeInformation edgeValueType = ((TupleTypeInfo>) edges.getType()).getTypeAt(2); TupleTypeInfo> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType); @@ -229,7 +231,8 @@ public static DataSet> translateVertexValues(DataSe Class> vertexClass = (Class>)(Class) Vertex.class; TypeInformation idType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(0); - TypeInformation newType = TypeExtractor.createTypeInfo(TranslateFunction.class, translator.getClass(), 1, null, null); + TypeInformation oldType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, idType, newType); @@ -305,7 +308,8 @@ public static DataSet> translateEdgeValues(DataSet> edgeClass = (Class>)(Class) Edge.class; TypeInformation idType = ((TupleTypeInfo>) edges.getType()).getTypeAt(0); - TypeInformation newType = TypeExtractor.createTypeInfo(TranslateFunction.class, translator.getClass(), 1, null, null); + TypeInformation oldType = ((TupleTypeInfo>) edges.getType()).getTypeAt(2); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); TupleTypeInfo> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/asm/JaccardSimilarity.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/asm/JaccardSimilarity.java new file mode 100644 index 0000000000000..146e169f7d7e0 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/asm/JaccardSimilarity.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.asm; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; +import org.apache.flink.graph.library.asm.JaccardSimilarity.Result; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MathUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * The Jaccard Index measures the similarity between vertex neighborhoods. + * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common). + *
+ * This implementation produces similarity scores for each pair of vertices + * in the graph with at least one common neighbor; equivalently, this is the + * set of all non-zero Jaccard Similarity coefficients. + *
+ * The input graph must be a simple, undirected graph containing no duplicate + * edges or self-loops. + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class JaccardSimilarity, VV, EV> +implements GraphAlgorithm>> { + + public static final int DEFAULT_GROUP_SIZE = 64; + + // Optional configuration + private int groupSize = DEFAULT_GROUP_SIZE; + + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** + * Override the default group size for the quadratic expansion of neighbor + * pairs. Small groups generate more data whereas large groups distribute + * computation less evenly among tasks. + * + * @param groupSize the group size for the quadratic expansion of neighbor pairs + * @return this + */ + public JaccardSimilarity setGroupSize(int groupSize) { + this.groupSize = groupSize; + + return this; + } + + /** + * Override the parallelism of operators processing small amounts of data. + * + * @param littleParallelism operator parallelism + * @return this + */ + public JaccardSimilarity setLittleParallelism(int littleParallelism) { + this.littleParallelism = littleParallelism; + + return this; + } + + /* + * Implementation notes: + * + * The requirement that "K extends CopyableValue" can be removed when + * Flink has a self-join which performs the skew distribution handled by + * GenerateGroupSpans / GenerateGroups / GenerateGroupPairs. + */ + + @Override + public DataSet> run(Graph input) + throws Exception { + // s, t, d(t) + DataSet>> neighborDegree = input + .run(new EdgeTargetDegree() + .setParallelism(littleParallelism)); + + // group span, s, t, d(t) + DataSet> groupSpans = neighborDegree + .groupBy(0) + .sortGroup(1, Order.ASCENDING) + .reduceGroup(new GenerateGroupSpans(groupSize)) + .setParallelism(littleParallelism) + .name("Generate group spans"); + + // group, s, t, d(t) + DataSet> groups = groupSpans + .rebalance() + .setParallelism(littleParallelism) + .name("Rebalance") + .flatMap(new GenerateGroups()) + .setParallelism(littleParallelism) + .name("Generate groups"); + + // t, u, d(t)+d(u) + DataSet> twoPaths = groups + .groupBy(0, 1) + .sortGroup(2, Order.ASCENDING) + .reduceGroup(new GenerateGroupPairs(groupSize)) + .name("Generate group pairs"); + + // t, u, intersection, union + return twoPaths + .groupBy(0, 1) + .reduceGroup(new ComputeScores()) + .name("Compute scores"); + } + + /** + * This is the first of three operations implementing a self-join to generate + * the full neighbor pairing for each vertex. The number of neighbor pairs + * is (n choose 2) which is quadratic in the vertex degree. + *
+ * The third operation, {@link GenerateGroupPairs}, processes groups of size + * {@link #groupSize} and emits {@code O(groupSize * deg(vertex))} pairs. + *
+ * This input to the third operation is still quadratic in the vertex degree. + * Two prior operations, {@link GenerateGroupSpans} and {@link GenerateGroups}, + * each emit datasets linear in the vertex degree, with a forced rebalance + * in between. {@link GenerateGroupSpans} first annotates each edge with the + * number of groups and {@link GenerateGroups} emits each edge into each group. + * + * @param ID type + */ + @FunctionAnnotation.ForwardedFields("0->1; 1->2") + private static class GenerateGroupSpans + implements GroupReduceFunction>, Tuple4> { + private final int groupSize; + + private IntValue groupSpansValue = new IntValue(); + + private Tuple4 output = new Tuple4<>(groupSpansValue, null, null, new IntValue()); + + public GenerateGroupSpans(int groupSize) { + this.groupSize = groupSize; + } + + @Override + public void reduce(Iterable>> values, Collector> out) + throws Exception { + int groupCount = 0; + int groupSpans = 1; + + groupSpansValue.setValue(groupSpans); + + for (Edge> edge : values) { + long degree = edge.f2.f1.getValue(); + if (degree > Integer.MAX_VALUE) { + throw new RuntimeException("Degree overflows IntValue"); + } + + // group span, u, v, d(u) + output.f1 = edge.f0; + output.f2 = edge.f1; + output.f3.setValue((int)degree); + + out.collect(output); + + if (++groupCount == groupSize) { + groupCount = 0; + groupSpansValue.setValue(++groupSpans); + } + } + } + } + + /** + * Emits the input tuple into each group within its group span. + * + * @param ID type + * + * @see GenerateGroupSpans + */ + @FunctionAnnotation.ForwardedFields("1; 2; 3") + private static class GenerateGroups + implements FlatMapFunction, Tuple4> { + @Override + public void flatMap(Tuple4 value, Collector> out) + throws Exception { + int spans = value.f0.getValue(); + + for (int idx = 0 ; idx < spans ; idx++ ) { + value.f0.setValue(idx); + out.collect(value); + } + } + } + + /** + * Emits the two-path for all neighbor pairs in this group. + *
+ * The first {@link #groupSize} vertices are emitted pairwise. Following + * vertices are only paired with vertices from this initial group. + * + * @param ID type + * + * @see GenerateGroupSpans + */ + private static class GenerateGroupPairs> + implements GroupReduceFunction, Tuple3> { + private final int groupSize; + + private boolean initialized = false; + + private List> visited; + + public GenerateGroupPairs(int groupSize) { + this.groupSize = groupSize; + this.visited = new ArrayList<>(groupSize); + } + + @Override + public void reduce(Iterable> values, Collector> out) + throws Exception { + int visitedCount = 0; + + for (Tuple4 edge : values) { + for (int i = 0 ; i < visitedCount ; i++) { + Tuple3 prior = visited.get(i); + + prior.f1 = edge.f2; + + int oldValue = prior.f2.getValue(); + + long degreeSum = oldValue + edge.f3.getValue(); + if (degreeSum > Integer.MAX_VALUE) { + throw new RuntimeException("Degree sum overflows IntValue"); + } + prior.f2.setValue((int)degreeSum); + + // v, w, d(v) + d(w) + out.collect(prior); + + prior.f2.setValue(oldValue); + } + + if (visitedCount < groupSize) { + if (! initialized) { + initialized = true; + + for (int i = 0 ; i < groupSize ; i++) { + Tuple3 tuple = new Tuple3<>(); + + tuple.f0 = edge.f2.copy(); + tuple.f2 = edge.f3.copy(); + + visited.add(tuple); + } + } else { + Tuple3 copy = visited.get(visitedCount); + + edge.f2.copyTo(copy.f0); + edge.f3.copyTo(copy.f2); + } + + visitedCount += 1; + } + } + } + } + + /** + * Compute the counts of common and distinct neighbors. A two-path connecting + * the vertices is emitted for each common neighbor. The number of distinct + * neighbors is equal to the sum of degrees of the vertices minus the count + * of common numbers, which are double-counted in the degree sum. + * + * @param ID type + */ + @FunctionAnnotation.ForwardedFields("0; 1") + private static class ComputeScores + implements GroupReduceFunction, Result> { + private Result output = new Result<>(); + + @Override + public void reduce(Iterable> values, Collector> out) + throws Exception { + int count = 0; + Tuple3 edge = null; + + for (Tuple3 next : values) { + edge = next; + count += 1; + } + + output.f0 = edge.f0; + output.f1 = edge.f1; + output.f2.f0.setValue(count); + output.f2.f1.setValue(edge.f2.getValue() - count); + out.collect(output); + } + } + + /** + * Wraps the vertex type to encapsulate results from the Jaccard Similarity algorithm. + * + * @param ID type + */ + public static class Result + extends Edge> { + public static final int HASH_SEED = 0x731f73e7; + + public Result() { + f2 = new Tuple2<>(new IntValue(), new IntValue()); + } + + /** + * Get the common neighbor count. + * + * @return common neighbor count + */ + public IntValue getCommonNeighborCount() { + return f2.f0; + } + + /** + * Get the distinct neighbor count. + * + * @return distinct neighbor count + */ + public IntValue getDistinctNeighborCount() { + return f2.f1; + } + + /** + * Get the Jaccard Similarity score, equal to the number of common + * neighbors shared by the source and target vertices divided by the + * number of distinct neighbors. + * + * @return Jaccard Similarity score + */ + public double getJaccardSimilarityScore() { + return getCommonNeighborCount().getValue() / (double) getDistinctNeighborCount().getValue(); + } + + @Override + public int hashCode() { + return MathUtils.murmurHash(HASH_SEED, f0.hashCode(), f1.hashCode(), f2.f0.hashCode(), f2.f1.hashCode()); + } + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/asm/JaccardSimilarityTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/asm/JaccardSimilarityTest.java new file mode 100644 index 0000000000000..9e867ef7a2818 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/asm/JaccardSimilarityTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.asm; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.library.asm.JaccardSimilarity.Result; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class JaccardSimilarityTest +extends AsmTestBase { + + @Test + public void testSimpleGraph() + throws Exception { + DataSet> js = undirectedSimpleGraph + .run(new JaccardSimilarity()); + + String expectedResult = + "(0,1,(1,4))\n" + + "(0,2,(1,4))\n" + + "(0,3,(2,4))\n" + + "(1,2,(2,4))\n" + + "(1,3,(1,6))\n" + + "(1,4,(1,3))\n" + + "(1,5,(1,3))\n" + + "(2,3,(1,6))\n" + + "(2,4,(1,3))\n" + + "(2,5,(1,3))\n" + + "(4,5,(1,1))\n"; + + TestBaseUtils.compareResultAsText(js.collect(), expectedResult); + } + + @Test + public void testCompleteGraph() + throws Exception { + DataSet> js = completeGraph + .run(new JaccardSimilarity() + .setGroupSize(4)); + + for (Result result : js.collect()) { + // the intersection includes every vertex + assertEquals(completeGraphVertexCount, result.getDistinctNeighborCount().getValue()); + + // the union only excludes the two vertices from the similarity score + assertEquals(completeGraphVertexCount - 2, result.getCommonNeighborCount().getValue()); + } + } + + @Test + public void testRMatGraph() + throws Exception { + long vertexCount = 1 << 8; + long edgeCount = 8 * vertexCount; + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setSimpleGraph(true, false) + .generate(); + + DataSet> js = graph + .run(new JaccardSimilarity() + .setGroupSize(4)); + + ChecksumHashCode checksum = DataSetUtils.checksumHashCode(js); + + assertEquals(13954, checksum.getCount()); + assertEquals(0x0000179f83a2a873L, checksum.getChecksum()); + } +} From 11b62db62952717b40a762ebbb6ac775a2c5cbc3 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 17 May 2016 14:31:30 -0400 Subject: [PATCH 2/6] Renamed "Jaccard Similarity" -> "Jaccard Index" Added min/max scores and max degree --- docs/apis/batch/libs/gelly.md | 11 +- .../java/org/apache/flink/util/MathUtils.java | 30 ---- .../graph/examples/JaccardSimilarity.java | 7 +- .../annotate/DegreeAnnotationFunctions.java | 21 +++ .../annotate/undirected/EdgeDegreePair.java | 35 +++-- .../annotate/undirected/EdgeSourceDegree.java | 34 +++-- .../annotate/undirected/EdgeTargetDegree.java | 34 +++-- .../annotate/undirected/VertexDegree.java | 22 +++ .../JaccardIndex.java} | 128 +++++++++++++++--- .../JaccardIndexTest.java} | 24 ++-- 10 files changed, 238 insertions(+), 108 deletions(-) rename flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/{asm/JaccardSimilarity.java => similarity/JaccardIndex.java} (75%) rename flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/{asm/JaccardSimilarityTest.java => similarity/JaccardIndexTest.java} (80%) diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 5ddea07ce16fe..7cc302485c27d 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -1831,7 +1831,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * [GSA Triangle Count](#gsa-triangle-count) * [Triangle Enumerator](#triangle-enumerator) * [Summarization](#summarization) -* [Jaccard Similarity](#jaccard-similarity) +* [Jaccard Index](#jaccard-index) * [Local Clustering Coefficient](#local-clustering-coefficient) Gelly's library methods can be used by simply calling the `run()` method on the input graph: @@ -2052,7 +2052,7 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. -### Jaccard Similarity +### Jaccard Index #### Overview The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to @@ -2069,7 +2069,8 @@ neighbors is emitted with the endpoint degree sum. Grouping on two-paths, the co #### Usage The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs, -the number of common neighbors, and the number of distinct neighbors. The vertex ID must be `Comparable` and `Copyable`. +the number of common neighbors, and the number of distinct neighbors. The graph ID type must be `Comparable` and +`Copyable`. ### Local Clustering Coefficient @@ -2171,6 +2172,7 @@ DataSet> degree = graph

Optional configuration:

  • setIncludeZeroDegreeVertices: by default only the edge set is processed for the computation of degree; when this flag is set an additional join is performed against the vertex set in order to output vertices with a degree of zero

  • +
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnTargetId: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.

@@ -2188,6 +2190,7 @@ DataSet>> sourceDegree = graph {% endhighlight %}

Optional configuration:

    +
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnTargetId: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.

@@ -2205,6 +2208,7 @@ DataSet>> targetDegree = graph {% endhighlight %}

Optional configuration:

    +
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnSourceId: the degree can be counted from either the edge source or target IDs. By default the target IDs are counted. Reducing on source IDs may optimize the algorithm if the input edge list is sorted by source ID.

@@ -2222,6 +2226,7 @@ DataSet>> pairDegree = graph {% endhighlight %}

Optional configuration:

    +
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnTargetId: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.

diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java index d1fe277e471b4..2bdddc8cccd9f 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java @@ -166,36 +166,6 @@ else if (code != Integer.MIN_VALUE) { } } - /** - * This function hashes a series of integer values. - * - * @param seed initialization - * @param codes one or more integer values - * @return The hash code for the integer series - */ - public static int murmurHash(int seed, int... codes) { - int hash = seed; - - for (int code : codes) { - code *= 0xcc9e2d51; - code = code << 15; - code *= 0x1b873593; - - hash ^= code; - hash = hash << 13; - hash = hash * 5 + 0xe6546b64; - } - - hash ^= 4 * codes.length; - hash ^= hash >>> 16; - hash *= 0x85ebca6b; - hash ^= hash >>> 13; - hash *= 0xc2b2ae35; - hash ^= hash >>> 16; - - return hash; - } - // ============================================================================================ /** diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java index b297edbf3b3a0..17bb073a01313 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java @@ -31,6 +31,7 @@ import org.apache.flink.graph.generator.RMatGraph; import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.library.similarity.JaccardIndex; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; @@ -44,7 +45,7 @@ * edge factor then calculates all non-zero Jaccard Similarity scores * between vertices. * - * @see org.apache.flink.graph.library.asm.JaccardSimilarity + * @see JaccardIndex */ public class JaccardSimilarity { @@ -80,11 +81,11 @@ public static void main(String[] args) throws Exception { if (scale > 32) { js = graph - .run(new org.apache.flink.graph.library.asm.JaccardSimilarity()); + .run(new JaccardIndex()); } else { js = graph .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new org.apache.flink.graph.library.asm.JaccardSimilarity()); + .run(new JaccardIndex()); } switch (parameters.get("output", "")) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java index 098e9fe05facd..a33c0f6b2dd53 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java @@ -18,6 +18,7 @@ package org.apache.flink.graph.asm.degree.annotate; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -89,6 +90,26 @@ public Vertex reduce(Vertex left, Vertex ID type + */ + public static class DegreeFilter + implements FilterFunction> { + private long maximumDegree; + + public DegreeFilter(long maximumDegree) { + this.maximumDegree = maximumDegree; + } + + @Override + public boolean filter(Vertex value) + throws Exception { + return value.f1.getValue() <= maximumDegree; + } + } + /** * Performs a left outer join to apply a zero count for vertices with * out- or in-degree of zero. diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index d85b4fab722d7..84d2d68cb84b0 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -43,6 +43,8 @@ public class EdgeDegreePair // Optional configuration protected boolean reduceOnTargetId = false; + private long maximumDegree = Long.MAX_VALUE; + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -60,6 +62,18 @@ public EdgeDegreePair setReduceOnTargetId(boolean reduceOnTargetId) { return this; } + /** + * Filter out vertices with degree than the given maximum. + * + * @param maximumDegree maximum degree + * @return this + */ + public EdgeDegreePair setMaximumDegree(long maximumDegree) { + this.maximumDegree = maximumDegree; + + return this; + } + /** * Override the operator parallelism. * @@ -79,22 +93,15 @@ public DataSet>> run(Graph i DataSet>> edgeSourceDegrees = input .run(new EdgeSourceDegree() .setReduceOnTargetId(reduceOnTargetId) + .setMaximumDegree(maximumDegree) .setParallelism(parallelism)); - DataSet> vertexDegrees; - - if (reduceOnTargetId) { - // t, d(t) - vertexDegrees = input - .run(new VertexDegree() - .setReduceOnTargetId(true) - .setParallelism(parallelism)); - } else { - // s, d(s) - vertexDegrees = input - .run(new VertexDegree() - .setParallelism(parallelism)); - } + // t, d(t) + DataSet> vertexDegrees = input + .run(new VertexDegree() + .setReduceOnTargetId(reduceOnTargetId) + .setMaximumDegree(maximumDegree) + .setParallelism(parallelism)); // s, t, (deg(s), deg(t)) return edgeSourceDegrees diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index d200dd38c55a2..1932dc35ef30c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -42,6 +42,8 @@ public class EdgeSourceDegree // Optional configuration private boolean reduceOnTargetId = false; + private long maximumDegree = Long.MAX_VALUE; + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -59,6 +61,18 @@ public EdgeSourceDegree setReduceOnTargetId(boolean reduceOnTargetId) return this; } + /** + * Filter out vertices with degree than the given maximum. + * + * @param maximumDegree maximum degree + * @return this + */ + public EdgeSourceDegree setMaximumDegree(long maximumDegree) { + this.maximumDegree = maximumDegree; + + return this; + } + /** * Override the operator parallelism. * @@ -74,20 +88,12 @@ public EdgeSourceDegree setParallelism(int parallelism) { @Override public DataSet>> run(Graph input) throws Exception { - DataSet> vertexDegrees; - - if (reduceOnTargetId) { - // t, d(t) - vertexDegrees = input - .run(new VertexDegree() - .setReduceOnTargetId(true) - .setParallelism(parallelism)); - } else { - // s, d(s) - vertexDegrees = input - .run(new VertexDegree() - .setParallelism(parallelism)); - } + // s, d(s) + DataSet> vertexDegrees = input + .run(new VertexDegree() + .setReduceOnTargetId(reduceOnTargetId) + .setMaximumDegree(maximumDegree) + .setParallelism(parallelism)); // s, t, d(s) return input.getEdges() diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index 8716f74d97158..5a6a52e71ce58 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -42,6 +42,8 @@ public class EdgeTargetDegree // Optional configuration private boolean reduceOnSourceId = false; + private long maximumDegree = Long.MAX_VALUE; + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -59,6 +61,18 @@ public EdgeTargetDegree setReduceOnSourceId(boolean reduceOnSourceId) return this; } + /** + * Filter out vertices with degree than the given maximum. + * + * @param maximumDegree maximum degree + * @return this + */ + public EdgeTargetDegree setMaximumDegree(long maximumDegree) { + this.maximumDegree = maximumDegree; + + return this; + } + /** * Override the operator parallelism. * @@ -74,20 +88,12 @@ public EdgeTargetDegree setParallelism(int parallelism) { @Override public DataSet>> run(Graph input) throws Exception { - DataSet> vertexDegrees; - - if (reduceOnSourceId) { - // s, d(s) - vertexDegrees = input - .run(new VertexDegree() - .setParallelism(parallelism)); - } else { - // t, d(t) - vertexDegrees = input - .run(new VertexDegree() - .setReduceOnTargetId(true) - .setParallelism(parallelism)); - } + // t, d(t) + DataSet> vertexDegrees = input + .run(new VertexDegree() + .setReduceOnTargetId(!reduceOnSourceId) + .setMaximumDegree(maximumDegree) + .setParallelism(parallelism)); // s, t, d(t) return input.getEdges() diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index 518afaa973182..c1fb55690c4b9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -26,6 +26,7 @@ import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; +import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeFilter; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId; @@ -46,6 +47,8 @@ public class VertexDegree private boolean reduceOnTargetId = false; + private long maximumDegree = Long.MAX_VALUE; + private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -78,6 +81,18 @@ public VertexDegree setReduceOnTargetId(boolean reduceOnTargetId) { return this; } + /** + * Filter out vertices with degree than the given maximum. + * + * @param maximumDegree maximum degree + * @return this + */ + public VertexDegree setMaximumDegree(long maximumDegree) { + this.maximumDegree = maximumDegree; + + return this; + } + /** * Override the operator parallelism. * @@ -110,6 +125,13 @@ public DataSet> run(Graph input) .setParallelism(parallelism) .name("Degree count"); + if (maximumDegree < Long.MAX_VALUE) { + degree = degree + .filter(new DegreeFilter(maximumDegree)) + .setParallelism(parallelism) + .name("Degree filter"); + } + if (includeZeroDegreeVertices) { degree = input .getVertices() diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/asm/JaccardSimilarity.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java similarity index 75% rename from flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/asm/JaccardSimilarity.java rename to flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 146e169f7d7e0..e801ea646b653 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/asm/JaccardSimilarity.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.graph.library.asm; +package org.apache.flink.graph.library.similarity; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -31,12 +31,12 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; -import org.apache.flink.graph.library.asm.JaccardSimilarity.Result; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; +import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; -import org.apache.flink.util.MathUtils; import java.util.ArrayList; import java.util.List; @@ -56,7 +56,7 @@ * @param vertex value type * @param edge value type */ -public class JaccardSimilarity, VV, EV> +public class JaccardIndex, VV, EV> implements GraphAlgorithm>> { public static final int DEFAULT_GROUP_SIZE = 64; @@ -64,6 +64,18 @@ public class JaccardSimilarity, VV, EV> // Optional configuration private int groupSize = DEFAULT_GROUP_SIZE; + private long maximumDegree = Long.MAX_VALUE; + + private boolean unboundedScores = true; + + private int minimumScoreNumerator = -1; + + private int minimumScoreDenominator = 1; + + private int maximumScoreNumerator = 1; + + private int maximumScoreDenominator = -1; + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -74,19 +86,63 @@ public class JaccardSimilarity, VV, EV> * @param groupSize the group size for the quadratic expansion of neighbor pairs * @return this */ - public JaccardSimilarity setGroupSize(int groupSize) { + public JaccardIndex setGroupSize(int groupSize) { this.groupSize = groupSize; return this; } + /** + * Filter out vertices with degree than the given maximum. + * + * @param maximumDegree maximum degree + * @return this + */ + public JaccardIndex setMaximumDegree(long maximumDegree) { + this.maximumDegree = maximumDegree; + + return this; + } + + /** + * Filter out Jaccard Index scores less than the given minimum fraction. + * + * @param numerator numerator of the minimum score + * @param denominator denominator of the minimum score + * @return this + * @see #setMaximumScore(int, int) + */ + public JaccardIndex setMinimumScore(int numerator, int denominator) { + this.unboundedScores = false; + this.minimumScoreNumerator = numerator; + this.minimumScoreDenominator = denominator; + + return this; + } + + /** + * Filter out Jaccard Index scores greater than the given maximum fraction. + * + * @param numerator numerator of the maximum score + * @param denominator denominator of the maximum score + * @return this + * @see #setMinimumScore(int, int) + */ + public JaccardIndex setMaximumScore(int numerator, int denominator) { + this.unboundedScores = false; + this.maximumScoreNumerator = numerator; + this.maximumScoreDenominator = denominator; + + return this; + } + /** * Override the parallelism of operators processing small amounts of data. * * @param littleParallelism operator parallelism * @return this */ - public JaccardSimilarity setLittleParallelism(int littleParallelism) { + public JaccardIndex setLittleParallelism(int littleParallelism) { this.littleParallelism = littleParallelism; return this; @@ -106,6 +162,7 @@ public DataSet> run(Graph input) // s, t, d(t) DataSet>> neighborDegree = input .run(new EdgeTargetDegree() + .setMaximumDegree(maximumDegree) .setParallelism(littleParallelism)); // group span, s, t, d(t) @@ -135,7 +192,9 @@ public DataSet> run(Graph input) // t, u, intersection, union return twoPaths .groupBy(0, 1) - .reduceGroup(new ComputeScores()) + .reduceGroup(new ComputeScores(unboundedScores, + minimumScoreNumerator, minimumScoreDenominator, + maximumScoreNumerator, maximumScoreDenominator)) .name("Compute scores"); } @@ -301,10 +360,30 @@ public void reduce(Iterable> values, Collector< * @param ID type */ @FunctionAnnotation.ForwardedFields("0; 1") - private static class ComputeScores + private class ComputeScores implements GroupReduceFunction, Result> { + private boolean unboundedScores; + + private long minimumScoreNumerator; + + private long minimumScoreDenominator; + + private long maximumScoreNumerator; + + private long maximumScoreDenominator; + private Result output = new Result<>(); + public ComputeScores(boolean unboundedScores, + int minimumScoreNumerator, int minimumScoreDenominator, + int maximumScoreNumerator, int maximumScoreDenominator) { + this.unboundedScores = unboundedScores; + this.minimumScoreNumerator = minimumScoreNumerator; + this.minimumScoreDenominator = minimumScoreDenominator; + this.maximumScoreNumerator = maximumScoreNumerator; + this.maximumScoreDenominator = maximumScoreDenominator; + } + @Override public void reduce(Iterable> values, Collector> out) throws Exception { @@ -316,16 +395,22 @@ public void reduce(Iterable> values, Collector> count += 1; } - output.f0 = edge.f0; - output.f1 = edge.f1; - output.f2.f0.setValue(count); - output.f2.f1.setValue(edge.f2.getValue() - count); - out.collect(output); + int distinctNeighbors = edge.f2.getValue() - count; + + if (unboundedScores || + (count * minimumScoreDenominator >= distinctNeighbors * minimumScoreNumerator + && count * maximumScoreDenominator <= distinctNeighbors * maximumScoreNumerator)) { + output.f0 = edge.f0; + output.f1 = edge.f1; + output.f2.f0.setValue(count); + output.f2.f1.setValue(distinctNeighbors); + out.collect(output); + } } } /** - * Wraps the vertex type to encapsulate results from the Jaccard Similarity algorithm. + * Wraps the vertex type to encapsulate results from the jaccard index algorithm. * * @param ID type */ @@ -333,6 +418,8 @@ public static class Result extends Edge> { public static final int HASH_SEED = 0x731f73e7; + private Murmur3_32 hasher = new Murmur3_32(HASH_SEED); + public Result() { f2 = new Tuple2<>(new IntValue(), new IntValue()); } @@ -356,19 +443,24 @@ public IntValue getDistinctNeighborCount() { } /** - * Get the Jaccard Similarity score, equal to the number of common + * Get the Jaccard Index score, equal to the number of common * neighbors shared by the source and target vertices divided by the * number of distinct neighbors. * - * @return Jaccard Similarity score + * @return Jaccard Index score */ - public double getJaccardSimilarityScore() { + public double getJaccardIndexScore() { return getCommonNeighborCount().getValue() / (double) getDistinctNeighborCount().getValue(); } @Override public int hashCode() { - return MathUtils.murmurHash(HASH_SEED, f0.hashCode(), f1.hashCode(), f2.f0.hashCode(), f2.f1.hashCode()); + return hasher.reset() + .hash(f0.hashCode()) + .hash(f1.hashCode()) + .hash(f2.f0.getValue()) + .hash(f2.f1.getValue()) + .hash(); } } } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/asm/JaccardSimilarityTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java similarity index 80% rename from flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/asm/JaccardSimilarityTest.java rename to flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java index 9e867ef7a2818..760e70e940ee4 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/asm/JaccardSimilarityTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.graph.library.asm; +package org.apache.flink.graph.library.similarity; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.flink.api.java.DataSet; @@ -27,7 +27,7 @@ import org.apache.flink.graph.generator.RMatGraph; import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.graph.library.asm.JaccardSimilarity.Result; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -36,14 +36,14 @@ import static org.junit.Assert.assertEquals; -public class JaccardSimilarityTest +public class JaccardIndexTest extends AsmTestBase { @Test public void testSimpleGraph() throws Exception { - DataSet> js = undirectedSimpleGraph - .run(new JaccardSimilarity()); + DataSet> ji = undirectedSimpleGraph + .run(new JaccardIndex()); String expectedResult = "(0,1,(1,4))\n" + @@ -58,17 +58,17 @@ public void testSimpleGraph() "(2,5,(1,3))\n" + "(4,5,(1,1))\n"; - TestBaseUtils.compareResultAsText(js.collect(), expectedResult); + TestBaseUtils.compareResultAsText(ji.collect(), expectedResult); } @Test public void testCompleteGraph() throws Exception { - DataSet> js = completeGraph - .run(new JaccardSimilarity() + DataSet> ji = completeGraph + .run(new JaccardIndex() .setGroupSize(4)); - for (Result result : js.collect()) { + for (Result result : ji.collect()) { // the intersection includes every vertex assertEquals(completeGraphVertexCount, result.getDistinctNeighborCount().getValue()); @@ -89,11 +89,11 @@ public void testRMatGraph() .setSimpleGraph(true, false) .generate(); - DataSet> js = graph - .run(new JaccardSimilarity() + DataSet> ji = graph + .run(new JaccardIndex() .setGroupSize(4)); - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(js); + ChecksumHashCode checksum = DataSetUtils.checksumHashCode(ji); assertEquals(13954, checksum.getCount()); assertEquals(0x0000179f83a2a873L, checksum.getChecksum()); From 00a2036fbf99c5726beab1197379d915e6fef94b Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Wed, 18 May 2016 10:27:23 -0400 Subject: [PATCH 3/6] Removed maximum degree ... this should and needs to be a separate algorithm which returns a new graph --- docs/apis/batch/libs/gelly.md | 4 --- ...ccardSimilarity.java => JaccardIndex.java} | 19 +++++----- .../annotate/DegreeAnnotationFunctions.java | 21 ----------- .../annotate/undirected/EdgeDegreePair.java | 16 --------- .../annotate/undirected/EdgeSourceDegree.java | 15 -------- .../annotate/undirected/EdgeTargetDegree.java | 15 -------- .../annotate/undirected/VertexDegree.java | 22 ------------ .../library/similarity/JaccardIndex.java | 34 ++++++++---------- .../library/similarity/JaccardIndexTest.java | 35 +++++++++++++++++++ 9 files changed, 59 insertions(+), 122 deletions(-) rename flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/{JaccardSimilarity.java => JaccardIndex.java} (86%) diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 7cc302485c27d..242a95e589f04 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2172,7 +2172,6 @@ DataSet> degree = graph

Optional configuration:

  • setIncludeZeroDegreeVertices: by default only the edge set is processed for the computation of degree; when this flag is set an additional join is performed against the vertex set in order to output vertices with a degree of zero

  • -
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnTargetId: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.

@@ -2190,7 +2189,6 @@ DataSet>> sourceDegree = graph {% endhighlight %}

Optional configuration:

    -
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnTargetId: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.

@@ -2208,7 +2206,6 @@ DataSet>> targetDegree = graph {% endhighlight %}

Optional configuration:

    -
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnSourceId: the degree can be counted from either the edge source or target IDs. By default the target IDs are counted. Reducing on source IDs may optimize the algorithm if the input edge list is sorted by source ID.

@@ -2226,7 +2223,6 @@ DataSet>> pairDegree = graph {% endhighlight %}

Optional configuration:

    -
  • setMaximumDegree: filter out vertices with degree than the given maximum

  • setParallelism: override the operator parallelism

  • setReduceOnTargetId: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.

diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java similarity index 86% rename from flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java rename to flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 17bb073a01313..8917f6b0580f0 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardSimilarity.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -31,7 +31,6 @@ import org.apache.flink.graph.generator.RMatGraph; import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; import org.apache.flink.graph.generator.random.RandomGenerableFactory; -import org.apache.flink.graph.library.similarity.JaccardIndex; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; @@ -39,15 +38,15 @@ import java.text.NumberFormat; /** - * Driver for the library implementation of Jaccard Similarity. + * Driver for the library implementation of Jaccard Index. * * This example generates an undirected RMat graph with the given scale and - * edge factor then calculates all non-zero Jaccard Similarity scores + * edge factor then calculates all non-zero Jaccard Index scores * between vertices. * - * @see JaccardIndex + * @see org.apache.flink.graph.library.similarity.JaccardIndex */ -public class JaccardSimilarity { +public class JaccardIndex { public static final int DEFAULT_SCALE = 10; @@ -81,11 +80,11 @@ public static void main(String[] args) throws Exception { if (scale > 32) { js = graph - .run(new JaccardIndex()); + .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); } else { js = graph .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new JaccardIndex()); + .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); } switch (parameters.get("output", "")) { @@ -116,9 +115,9 @@ public static void main(String[] args) throws Exception { System.out.println("is the number of common neighbors divided by the number of distinct neighbors."); System.out.println(""); System.out.println("usage:"); - System.out.println(" JaccardSimilarity [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); - System.out.println(" JaccardSimilarity [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); - System.out.println(" JaccardSimilarity [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + + System.out.println(" JaccardIndex [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); + System.out.println(" JaccardIndex [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); + System.out.println(" JaccardIndex [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]"); return; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java index a33c0f6b2dd53..098e9fe05facd 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java @@ -18,7 +18,6 @@ package org.apache.flink.graph.asm.degree.annotate; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -90,26 +89,6 @@ public Vertex reduce(Vertex left, Vertex ID type - */ - public static class DegreeFilter - implements FilterFunction> { - private long maximumDegree; - - public DegreeFilter(long maximumDegree) { - this.maximumDegree = maximumDegree; - } - - @Override - public boolean filter(Vertex value) - throws Exception { - return value.f1.getValue() <= maximumDegree; - } - } - /** * Performs a left outer join to apply a zero count for vertices with * out- or in-degree of zero. diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index 84d2d68cb84b0..698a46adbf851 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -43,8 +43,6 @@ public class EdgeDegreePair // Optional configuration protected boolean reduceOnTargetId = false; - private long maximumDegree = Long.MAX_VALUE; - private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -62,18 +60,6 @@ public EdgeDegreePair setReduceOnTargetId(boolean reduceOnTargetId) { return this; } - /** - * Filter out vertices with degree than the given maximum. - * - * @param maximumDegree maximum degree - * @return this - */ - public EdgeDegreePair setMaximumDegree(long maximumDegree) { - this.maximumDegree = maximumDegree; - - return this; - } - /** * Override the operator parallelism. * @@ -93,14 +79,12 @@ public DataSet>> run(Graph i DataSet>> edgeSourceDegrees = input .run(new EdgeSourceDegree() .setReduceOnTargetId(reduceOnTargetId) - .setMaximumDegree(maximumDegree) .setParallelism(parallelism)); // t, d(t) DataSet> vertexDegrees = input .run(new VertexDegree() .setReduceOnTargetId(reduceOnTargetId) - .setMaximumDegree(maximumDegree) .setParallelism(parallelism)); // s, t, (deg(s), deg(t)) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index 1932dc35ef30c..6f64bc1443c34 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -42,8 +42,6 @@ public class EdgeSourceDegree // Optional configuration private boolean reduceOnTargetId = false; - private long maximumDegree = Long.MAX_VALUE; - private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -61,18 +59,6 @@ public EdgeSourceDegree setReduceOnTargetId(boolean reduceOnTargetId) return this; } - /** - * Filter out vertices with degree than the given maximum. - * - * @param maximumDegree maximum degree - * @return this - */ - public EdgeSourceDegree setMaximumDegree(long maximumDegree) { - this.maximumDegree = maximumDegree; - - return this; - } - /** * Override the operator parallelism. * @@ -92,7 +78,6 @@ public DataSet>> run(Graph input) DataSet> vertexDegrees = input .run(new VertexDegree() .setReduceOnTargetId(reduceOnTargetId) - .setMaximumDegree(maximumDegree) .setParallelism(parallelism)); // s, t, d(s) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index 5a6a52e71ce58..e9505bcff4dcc 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -42,8 +42,6 @@ public class EdgeTargetDegree // Optional configuration private boolean reduceOnSourceId = false; - private long maximumDegree = Long.MAX_VALUE; - private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -61,18 +59,6 @@ public EdgeTargetDegree setReduceOnSourceId(boolean reduceOnSourceId) return this; } - /** - * Filter out vertices with degree than the given maximum. - * - * @param maximumDegree maximum degree - * @return this - */ - public EdgeTargetDegree setMaximumDegree(long maximumDegree) { - this.maximumDegree = maximumDegree; - - return this; - } - /** * Override the operator parallelism. * @@ -92,7 +78,6 @@ public DataSet>> run(Graph input) DataSet> vertexDegrees = input .run(new VertexDegree() .setReduceOnTargetId(!reduceOnSourceId) - .setMaximumDegree(maximumDegree) .setParallelism(parallelism)); // s, t, d(t) diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index c1fb55690c4b9..518afaa973182 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -26,7 +26,6 @@ import org.apache.flink.graph.GraphAlgorithm; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; -import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeFilter; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId; @@ -47,8 +46,6 @@ public class VertexDegree private boolean reduceOnTargetId = false; - private long maximumDegree = Long.MAX_VALUE; - private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN; /** @@ -81,18 +78,6 @@ public VertexDegree setReduceOnTargetId(boolean reduceOnTargetId) { return this; } - /** - * Filter out vertices with degree than the given maximum. - * - * @param maximumDegree maximum degree - * @return this - */ - public VertexDegree setMaximumDegree(long maximumDegree) { - this.maximumDegree = maximumDegree; - - return this; - } - /** * Override the operator parallelism. * @@ -125,13 +110,6 @@ public DataSet> run(Graph input) .setParallelism(parallelism) .name("Degree count"); - if (maximumDegree < Long.MAX_VALUE) { - degree = degree - .filter(new DegreeFilter(maximumDegree)) - .setParallelism(parallelism) - .name("Degree filter"); - } - if (includeZeroDegreeVertices) { degree = input .getVertices() diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index e801ea646b653..975e7c6578cbc 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -37,6 +37,7 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; import java.util.ArrayList; import java.util.List; @@ -64,17 +65,15 @@ public class JaccardIndex, VV, EV> // Optional configuration private int groupSize = DEFAULT_GROUP_SIZE; - private long maximumDegree = Long.MAX_VALUE; - private boolean unboundedScores = true; - private int minimumScoreNumerator = -1; + private int minimumScoreNumerator = 0; private int minimumScoreDenominator = 1; private int maximumScoreNumerator = 1; - private int maximumScoreDenominator = -1; + private int maximumScoreDenominator = 0; private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; @@ -87,19 +86,9 @@ public class JaccardIndex, VV, EV> * @return this */ public JaccardIndex setGroupSize(int groupSize) { - this.groupSize = groupSize; - - return this; - } + Preconditions.checkArgument(groupSize > 0, "Group size must be greater than zero"); - /** - * Filter out vertices with degree than the given maximum. - * - * @param maximumDegree maximum degree - * @return this - */ - public JaccardIndex setMaximumDegree(long maximumDegree) { - this.maximumDegree = maximumDegree; + this.groupSize = groupSize; return this; } @@ -113,6 +102,10 @@ public JaccardIndex setMaximumDegree(long maximumDegree) { * @see #setMaximumScore(int, int) */ public JaccardIndex setMinimumScore(int numerator, int denominator) { + Preconditions.checkArgument(numerator >= 0, "Minimum score numerator must be non-negative"); + Preconditions.checkArgument(denominator > 0, "Minimum score denominator must be greater than zero"); + Preconditions.checkArgument(numerator <= denominator, "Minimum score fraction must be less than or equal to one"); + this.unboundedScores = false; this.minimumScoreNumerator = numerator; this.minimumScoreDenominator = denominator; @@ -121,7 +114,7 @@ public JaccardIndex setMinimumScore(int numerator, int denominator) { } /** - * Filter out Jaccard Index scores greater than the given maximum fraction. + * Filter out Jaccard Index scores greater than or equal to the given maximum fraction. * * @param numerator numerator of the maximum score * @param denominator denominator of the maximum score @@ -129,6 +122,10 @@ public JaccardIndex setMinimumScore(int numerator, int denominator) { * @see #setMinimumScore(int, int) */ public JaccardIndex setMaximumScore(int numerator, int denominator) { + Preconditions.checkArgument(numerator >= 0, "Maximum score numerator must be non-negative"); + Preconditions.checkArgument(denominator > 0, "Maximum score denominator must be greater than zero"); + Preconditions.checkArgument(numerator <= denominator, "Maximum score fraction must be less than or equal to one"); + this.unboundedScores = false; this.maximumScoreNumerator = numerator; this.maximumScoreDenominator = denominator; @@ -162,7 +159,6 @@ public DataSet> run(Graph input) // s, t, d(t) DataSet>> neighborDegree = input .run(new EdgeTargetDegree() - .setMaximumDegree(maximumDegree) .setParallelism(littleParallelism)); // group span, s, t, d(t) @@ -399,7 +395,7 @@ public void reduce(Iterable> values, Collector> if (unboundedScores || (count * minimumScoreDenominator >= distinctNeighbors * minimumScoreNumerator - && count * maximumScoreDenominator <= distinctNeighbors * maximumScoreNumerator)) { + && count * maximumScoreDenominator < distinctNeighbors * maximumScoreNumerator)) { output.f0 = edge.f0; output.f1 = edge.f1; output.f2.f0.setValue(count); diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java index 760e70e940ee4..4f894883b929f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java @@ -61,6 +61,41 @@ public void testSimpleGraph() TestBaseUtils.compareResultAsText(ji.collect(), expectedResult); } + @Test + public void testSimpleGraphWithMinimumScore() + throws Exception { + DataSet> ji = undirectedSimpleGraph + .run(new JaccardIndex() + .setMinimumScore(1, 2)); + + String expectedResult = + "(0,3,(2,4))\n" + + "(1,2,(2,4))\n" + + "(4,5,(1,1))\n"; + + TestBaseUtils.compareResultAsText(ji.collect(), expectedResult); + } + + @Test + public void testSimpleGraphWithMaximumScore() + throws Exception { + DataSet> ji = undirectedSimpleGraph + .run(new JaccardIndex() + .setMaximumScore(1, 2)); + + String expectedResult = + "(0,1,(1,4))\n" + + "(0,2,(1,4))\n" + + "(1,3,(1,6))\n" + + "(1,4,(1,3))\n" + + "(1,5,(1,3))\n" + + "(2,3,(1,6))\n" + + "(2,4,(1,3))\n" + + "(2,5,(1,3))\n"; + + TestBaseUtils.compareResultAsText(ji.collect(), expectedResult); + } + @Test public void testCompleteGraph() throws Exception { From 804b3abcdb6b855b5592a76eace0cceb286b3b61 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Wed, 18 May 2016 14:19:51 -0400 Subject: [PATCH 4/6] Fix precision of shift in example --- .../java/org/apache/flink/graph/examples/JaccardIndex.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 8917f6b0580f0..44de8bbfe0c11 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -41,7 +41,7 @@ * Driver for the library implementation of Jaccard Index. * * This example generates an undirected RMat graph with the given scale and - * edge factor then calculates all non-zero Jaccard Index scores + * edge factor then calculates all non-zero Jaccard Index similarity scores * between vertices. * * @see org.apache.flink.graph.library.similarity.JaccardIndex @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception { RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); - long vertexCount = 1 << scale; + long vertexCount = 1L << scale; long edgeCount = vertexCount * edgeFactor; boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); @@ -112,7 +112,8 @@ public static void main(String[] args) throws Exception { System.out.println(""); System.out.println("This algorithm returns 4-tuples containing two vertex IDs, the number of"); System.out.println("common neighbors, and the number of distinct neighbors. The Jaccard Index"); - System.out.println("is the number of common neighbors divided by the number of distinct neighbors."); + System.out.println("similarity score is the number of common neighbors divided by the number"); + System.out.println("of distinct neighbors."); System.out.println(""); System.out.println("usage:"); System.out.println(" JaccardIndex [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); From 094c3e29fd3d0a55169a355256df32d5c78367e1 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 19 May 2016 17:09:29 -0400 Subject: [PATCH 5/6] PR updates: CSV input now supported in JaccardIndex example improved documentation --- docs/apis/batch/libs/gelly.md | 61 ++++++--- .../flink/graph/examples/JaccardIndex.java | 124 ++++++++++++------ .../examples/LocalClusteringCoefficient.java | 16 +-- .../flink/graph/examples/TriangleListing.java | 2 +- .../library/similarity/JaccardIndex.java | 37 ++++-- .../library/similarity/JaccardIndexTest.java | 2 +- 6 files changed, 157 insertions(+), 85 deletions(-) diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 242a95e589f04..344dfd7f03dfc 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2055,22 +2055,22 @@ vertex and edge in the output graph stores the common group value and the number ### Jaccard Index #### Overview -The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to -1.0 (all neighbors are common). +The Jaccard Index measures the similarity between vertex neighborhoods and is computed as the number of shared numbers +divided by the number of distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are +shared). #### Details -Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges -connecting the two vertices to the common neighbor. The number of distinct neighbors for pairs of vertices is computed -by storing the sum of degrees of the vertex pair and subtracting the count of common neighbors, which are double-counted -in the sum of degrees. +Counting shared neighbors for pairs of vertices is equivalent to counting connecting paths of length two. The number of +distinct neighbors is computed by storing the sum of degrees of the vertex pair and subtracting the count of shared +neighbors, which are double-counted in the sum of degrees. -The algorithm first annotates each edge with the endpoint degree. Grouping on the midpoint vertex, each pair of -neighbors is emitted with the endpoint degree sum. Grouping on two-paths, the common neighbors are counted. +The algorithm first annotates each edge with the target vertex's degree. Grouping on the source vertex, each pair of +neighbors is emitted with the degree sum. Grouping on two-paths, the shared neighbors are counted. #### Usage The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs, -the number of common neighbors, and the number of distinct neighbors. The graph ID type must be `Comparable` and -`Copyable`. +the number of shared neighbors, and the number of distinct neighbors. The result class provides a method to compute the +Jaccard Index score. The graph ID type must be `Comparable` and `Copyable`. ### Local Clustering Coefficient @@ -2095,7 +2095,7 @@ Graph Algorithms ----------- The logic blocks with which the `Graph` API and top-level algorithms are assembled are accessible in Gelly as graph -algorithms in the `org.apache.flink.graph.asm` package. These algorithms provide optimization and tuning through +algorithms in the `org.apache.flink.graph` package. These algorithms provide optimization and tuning through configuration parameters and may provide implicit runtime reuse when processing the same input with a similar configuration. @@ -2109,7 +2109,7 @@ configuration. - degree.annotate.directed.
VertexInDegree + asm.degree.annotate.directed.
VertexInDegree

Annotate vertices of a directed graph with the in-degree.

{% highlight java %} @@ -2126,7 +2126,7 @@ DataSet> inDegree = graph - degree.annotate.directed.
VertexOutDegree + asm.degree.annotate.directed.
VertexOutDegree

Annotate vertices of a directed graph with the out-degree.

{% highlight java %} @@ -2143,7 +2143,7 @@ DataSet> outDegree = graph - degree.annotate.directed.
VertexDegreePair + asm.degree.annotate.directed.
VertexDegreePair

Annotate vertices of a directed graph with both the out-degree and in-degree.

{% highlight java %} @@ -2160,7 +2160,7 @@ DataSet>> pairDegree = graph - degree.annotate.undirected.
VertexDegree + asm.degree.annotate.undirected.
VertexDegree

Annotate vertices of an undirected graph with the degree.

{% highlight java %} @@ -2179,7 +2179,7 @@ DataSet> degree = graph - degree.annotate.undirected.
EdgeSourceDegree + asm.degree.annotate.undirected.
EdgeSourceDegree

Annotate edges of an undirected graph with degree of the source ID.

{% highlight java %} @@ -2196,7 +2196,7 @@ DataSet>> sourceDegree = graph - degree.annotate.undirected.
EdgeTargetDegree + asm.degree.annotate.undirected.
EdgeTargetDegree

Annotate edges of an undirected graph with degree of the target ID.

{% highlight java %} @@ -2213,7 +2213,7 @@ DataSet>> targetDegree = graph - degree.annotate.undirected.
EdgeDegreePair + asm.degree.annotate.undirected.
EdgeDegreePair

Annotate edges of an undirected graph with the degree of both the source and target degree ID.

{% highlight java %} @@ -2230,7 +2230,7 @@ DataSet>> pairDegree = graph - translate.
TranslateGraphIds + asm.translate.
TranslateGraphIds

Translate vertex and edge IDs using the given TranslateFunction.

{% highlight java %} @@ -2240,7 +2240,7 @@ graph.run(new TranslateGraphIds(new LongValueToStringValue())); - translate.
TranslateVertexValues + asm.translate.
TranslateVertexValues

Translate vertex values using the given TranslateFunction.

{% highlight java %} @@ -2250,7 +2250,7 @@ graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount))); - translate.
TranslateEdgeValues + asm.translate.
TranslateEdgeValues

Translate edge values using the given TranslateFunction.

{% highlight java %} @@ -2258,6 +2258,25 @@ graph.run(new TranslateEdgeValues(new Nullify())); {% endhighlight %} + + + library.similarity.
JaccardIndex + +

Measures the similarity between vertex neighborhoods. The Jaccard Index score is computed as the number of shared numbers divided by the number of distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are shared).

+{% highlight java %} +DataSet ji = graph + .run(new JaccardIndex() + .setMinimumScore(2, 5) + .setMaximumScore(7, 10)); +{% endhighlight %} +

Optional configuration:

+
    +
  • setLittleParallelism: override the parallelism of operators processing small amounts of data

  • +
  • setMaximumScore: filter out Jaccard Index scores greater than or equal to the given maximum fraction

  • +
  • setMinimumScore: filter out Jaccard Index scores less than the given minimum fraction

  • +
+ + diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 44de8bbfe0c11..573f3b3555c4f 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -18,6 +18,8 @@ package org.apache.flink.graph.examples; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.text.WordUtils; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.DataSet; @@ -31,6 +33,7 @@ import org.apache.flink.graph.generator.RMatGraph; import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; @@ -40,9 +43,9 @@ /** * Driver for the library implementation of Jaccard Index. * - * This example generates an undirected RMat graph with the given scale and - * edge factor then calculates all non-zero Jaccard Index similarity scores - * between vertices. + * This example reads a simple, undirected graph from a CSV file or generates + * generates an undirected RMat graph with the given scale and edge factor + * then calculates all non-zero Jaccard Index similarity scores between vertices. * * @see org.apache.flink.graph.library.similarity.JaccardIndex */ @@ -54,6 +57,26 @@ public class JaccardIndex { public static final boolean DEFAULT_CLIP_AND_FLIP = true; + private static void printUsage() { + System.out.println(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" + + " neighborhoods and is computed as the number of shared numbers divided by the number of" + + " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" + + " shared).", 80)); + System.out.println(); + System.out.println(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + + " number of shared neighbors, and the number of distinct neighbors.", 80)); + System.out.println(); + System.out.println("usage: JaccardIndex --input --output graph = Graph + .fromCsvReader(parameters.get("input_filename"), env) + .ignoreCommentsEdges("#") + .lineDelimiterEdges(lineDelimiter) + .fieldDelimiterEdges(fieldDelimiter) + .keyType(LongValue.class); + + ji = graph + .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); - RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + } break; - long vertexCount = 1L << scale; - long edgeCount = vertexCount * edgeFactor; + case "rmat": { + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); - boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); - Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .setSimpleGraph(true, clipAndFlip) - .generate(); + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; - DataSet js; + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); - if (scale > 32) { - js = graph - .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); - } else { - js = graph - .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setSimpleGraph(true, clipAndFlip) + .generate(); + + if (scale > 32) { + ji = graph + .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + } else { + ji = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + } + } break; + + default: + printUsage(); + return; } switch (parameters.get("output", "")) { case "print": - js.print(); + for (Object e: ji.collect()) { + Result result = (Result)e; + System.out.println(result.toVerboseString()); + } break; case "hash": - System.out.println(DataSetUtils.checksumHashCode(js)); + System.out.println(DataSetUtils.checksumHashCode(ji)); break; case "csv": - String filename = parameters.get("filename"); + String filename = parameters.get("output_filename"); + + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); - String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER); - String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER); + String fieldDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); - js.writeAsCsv(filename, row_delimiter, field_delimiter); + ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter); env.execute(); break; default: - System.out.println("The Jaccard Index measures the similarity between vertex neighborhoods."); - System.out.println("Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common)."); - System.out.println(""); - System.out.println("This algorithm returns 4-tuples containing two vertex IDs, the number of"); - System.out.println("common neighbors, and the number of distinct neighbors. The Jaccard Index"); - System.out.println("similarity score is the number of common neighbors divided by the number"); - System.out.println("of distinct neighbors."); - System.out.println(""); - System.out.println("usage:"); - System.out.println(" JaccardIndex [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); - System.out.println(" JaccardIndex [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); - System.out.println(" JaccardIndex [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" + - " --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]"); - + printUsage(); return; } diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java index 2465da8128306..16d79c21480e1 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java @@ -18,6 +18,7 @@ package org.apache.flink.graph.examples; +import org.apache.commons.lang3.text.WordUtils; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.DataSet; @@ -110,14 +111,13 @@ public static void main(String[] args) throws Exception { env.execute(); break; default: - System.out.println("The local clustering coefficient measures the connectedness of each vertex's"); - System.out.println("neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0"); - System.out.println("(neighborhood is a clique)"); - System.out.println(""); - System.out.println("This algorithm returns tuples containing the vertex ID, the degree of"); - System.out.println("the vertex, the number of edges between vertex neighbors, and the local"); - System.out.println("clustering coefficient."); - System.out.println(""); + System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" + + " vertex's neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0 (neighborhood" + + " is a clique).", 80)); + System.out.println(); + System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" + + " the vertex, the number of edges between vertex neighbors, and the local clustering coefficient.", 80)); + System.out.println(); System.out.println("usage:"); System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); System.out.println(" LocalClusteringCoefficient [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java index f5f232d888020..1aff090f6c2d0 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java @@ -107,7 +107,7 @@ public static void main(String[] args) throws Exception { break; default: System.out.println("Lists all distinct triangles in the generated RMat graph."); - System.out.println(""); + System.out.println(); System.out.println("usage:"); System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print"); System.out.println(" TriangleListing [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash"); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 975e7c6578cbc..c487bd0048ed9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -43,11 +43,13 @@ import java.util.List; /** - * The Jaccard Index measures the similarity between vertex neighborhoods. - * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common). + * The Jaccard Index measures the similarity between vertex neighborhoods and + * is computed as the number of shared numbers divided by the number of + * distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all + * neighbors are shared). *
* This implementation produces similarity scores for each pair of vertices - * in the graph with at least one common neighbor; equivalently, this is the + * in the graph with at least one shared neighbor; equivalently, this is the * set of all non-zero Jaccard Similarity coefficients. *
* The input graph must be a simple, undirected graph containing no duplicate @@ -82,6 +84,8 @@ public class JaccardIndex, VV, EV> * pairs. Small groups generate more data whereas large groups distribute * computation less evenly among tasks. * + * The default value should be near-optimal for all use cases. + * * @param groupSize the group size for the quadratic expansion of neighbor pairs * @return this */ @@ -348,10 +352,10 @@ public void reduce(Iterable> values, Collector< } /** - * Compute the counts of common and distinct neighbors. A two-path connecting - * the vertices is emitted for each common neighbor. The number of distinct + * Compute the counts of shared and distinct neighbors. A two-path connecting + * the vertices is emitted for each shared neighbor. The number of distinct * neighbors is equal to the sum of degrees of the vertices minus the count - * of common numbers, which are double-counted in the degree sum. + * of shared numbers, which are double-counted in the degree sum. * * @param ID type */ @@ -421,11 +425,11 @@ public Result() { } /** - * Get the common neighbor count. + * Get the shared neighbor count. * - * @return common neighbor count + * @return shared neighbor count */ - public IntValue getCommonNeighborCount() { + public IntValue getSharedNeighborCount() { return f2.f0; } @@ -439,14 +443,21 @@ public IntValue getDistinctNeighborCount() { } /** - * Get the Jaccard Index score, equal to the number of common - * neighbors shared by the source and target vertices divided by the - * number of distinct neighbors. + * Get the Jaccard Index score, equal to the number of shared neighbors + * of the source and target vertices divided by the number of distinct + * neighbors. * * @return Jaccard Index score */ public double getJaccardIndexScore() { - return getCommonNeighborCount().getValue() / (double) getDistinctNeighborCount().getValue(); + return getSharedNeighborCount().getValue() / (double) getDistinctNeighborCount().getValue(); + } + + public String toVerboseString() { + return "Vertex IDs: (" + f0 + ", " + f1 + + "), number of shared neighbors: " + getSharedNeighborCount() + + ", number of distinct neighbors: " + getDistinctNeighborCount() + + ", jaccard index score: " + getJaccardIndexScore(); } @Override diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java index 4f894883b929f..2241dc9624532 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java @@ -108,7 +108,7 @@ public void testCompleteGraph() assertEquals(completeGraphVertexCount, result.getDistinctNeighborCount().getValue()); // the union only excludes the two vertices from the similarity score - assertEquals(completeGraphVertexCount - 2, result.getCommonNeighborCount().getValue()); + assertEquals(completeGraphVertexCount - 2, result.getSharedNeighborCount().getValue()); } } From 5e0800f99ef7e314dc56cd5a4526422b8d5f071f Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 20 May 2016 09:06:24 -0400 Subject: [PATCH 6/6] Remove unused translator --- docs/apis/batch/libs/gelly.md | 55 +++++++++---------- .../flink/graph/examples/JaccardIndex.java | 6 +- .../TranslateEdgeDegreeToIntValue.java | 51 ----------------- .../library/similarity/JaccardIndex.java | 2 +- 4 files changed, 30 insertions(+), 84 deletions(-) delete mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 344dfd7f03dfc..8a0b308b2c56e 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2055,7 +2055,7 @@ vertex and edge in the output graph stores the common group value and the number ### Jaccard Index #### Overview -The Jaccard Index measures the similarity between vertex neighborhoods and is computed as the number of shared numbers +The Jaccard Index measures the similarity between vertex neighborhoods and is computed as the number of shared neighbors divided by the number of distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are shared). @@ -2072,6 +2072,10 @@ The algorithm takes a simple, undirected graph as input and outputs a `DataSet` the number of shared neighbors, and the number of distinct neighbors. The result class provides a method to compute the Jaccard Index score. The graph ID type must be `Comparable` and `Copyable`. +* `setLittleParallelism`: override the parallelism of operators processing small amounts of data +* `setMaximumScore`: filter out Jaccard Index scores greater than or equal to the given maximum fraction +* `setMinimumScore`: filter out Jaccard Index scores less than the given minimum fraction + ### Local Clustering Coefficient #### Overview @@ -2095,7 +2099,7 @@ Graph Algorithms ----------- The logic blocks with which the `Graph` API and top-level algorithms are assembled are accessible in Gelly as graph -algorithms in the `org.apache.flink.graph` package. These algorithms provide optimization and tuning through +algorithms in the `org.apache.flink.graph.asm` package. These algorithms provide optimization and tuning through configuration parameters and may provide implicit runtime reuse when processing the same input with a similar configuration. @@ -2109,7 +2113,7 @@ configuration. - asm.degree.annotate.directed.
VertexInDegree + degree.annotate.directed.
VertexInDegree

Annotate vertices of a directed graph with the in-degree.

{% highlight java %} @@ -2126,7 +2130,7 @@ DataSet> inDegree = graph - asm.degree.annotate.directed.
VertexOutDegree + degree.annotate.directed.
VertexOutDegree

Annotate vertices of a directed graph with the out-degree.

{% highlight java %} @@ -2143,7 +2147,7 @@ DataSet> outDegree = graph - asm.degree.annotate.directed.
VertexDegreePair + degree.annotate.directed.
VertexDegreePair

Annotate vertices of a directed graph with both the out-degree and in-degree.

{% highlight java %} @@ -2160,7 +2164,7 @@ DataSet>> pairDegree = graph - asm.degree.annotate.undirected.
VertexDegree + degree.annotate.undirected.
VertexDegree

Annotate vertices of an undirected graph with the degree.

{% highlight java %} @@ -2179,7 +2183,7 @@ DataSet> degree = graph - asm.degree.annotate.undirected.
EdgeSourceDegree + degree.annotate.undirected.
EdgeSourceDegree

Annotate edges of an undirected graph with degree of the source ID.

{% highlight java %} @@ -2196,7 +2200,7 @@ DataSet>> sourceDegree = graph - asm.degree.annotate.undirected.
EdgeTargetDegree + degree.annotate.undirected.
EdgeTargetDegree

Annotate edges of an undirected graph with degree of the target ID.

{% highlight java %} @@ -2213,7 +2217,7 @@ DataSet>> targetDegree = graph - asm.degree.annotate.undirected.
EdgeDegreePair + degree.annotate.undirected.
EdgeDegreePair

Annotate edges of an undirected graph with the degree of both the source and target degree ID.

{% highlight java %} @@ -2230,50 +2234,43 @@ DataSet>> pairDegree = graph - asm.translate.
TranslateGraphIds + translate.
TranslateGraphIds

Translate vertex and edge IDs using the given TranslateFunction.

{% highlight java %} graph.run(new TranslateGraphIds(new LongValueToStringValue())); {% endhighlight %} +

Required configuration:

+
    +
  • translator: implements type or value conversion

  • +
- asm.translate.
TranslateVertexValues + translate.
TranslateVertexValues

Translate vertex values using the given TranslateFunction.

{% highlight java %} graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount))); {% endhighlight %} +

Required configuration:

+
    +
  • translator: implements type or value conversion

  • +
- asm.translate.
TranslateEdgeValues + translate.
TranslateEdgeValues

Translate edge values using the given TranslateFunction.

{% highlight java %} graph.run(new TranslateEdgeValues(new Nullify())); {% endhighlight %} - - - - - library.similarity.
JaccardIndex - -

Measures the similarity between vertex neighborhoods. The Jaccard Index score is computed as the number of shared numbers divided by the number of distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are shared).

-{% highlight java %} -DataSet ji = graph - .run(new JaccardIndex() - .setMinimumScore(2, 5) - .setMaximumScore(7, 10)); -{% endhighlight %} -

Optional configuration:

+

Required configuration:

    -
  • setLittleParallelism: override the parallelism of operators processing small amounts of data

  • -
  • setMaximumScore: filter out Jaccard Index scores greater than or equal to the given maximum fraction

  • -
  • setMinimumScore: filter out Jaccard Index scores less than the given minimum fraction

  • +
  • translator: implements type or value conversion

diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 573f3b3555c4f..f8707d61bc446 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -44,8 +44,8 @@ * Driver for the library implementation of Jaccard Index. * * This example reads a simple, undirected graph from a CSV file or generates - * generates an undirected RMat graph with the given scale and edge factor - * then calculates all non-zero Jaccard Index similarity scores between vertices. + * an undirected RMat graph with the given scale and edge factor then calculates + * all non-zero Jaccard Index similarity scores between vertices. * * @see org.apache.flink.graph.library.similarity.JaccardIndex */ @@ -59,7 +59,7 @@ public class JaccardIndex { private static void printUsage() { System.out.println(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" + - " neighborhoods and is computed as the number of shared numbers divided by the number of" + + " neighborhoods and is computed as the number of shared neighbors divided by the number of" + " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" + " shared).", 80)); System.out.println(); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java deleted file mode 100644 index 0abb6fc9ddbdc..0000000000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.asm.degree.annotate; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.asm.translate.TranslateFunction; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; - -/** - * Translate the edge degree returned by the degree annotation functions from - * {@link LongValue} to {@link IntValue}. - * - * @param edge value type - */ -public class TranslateEdgeDegreeToIntValue -implements TranslateFunction, Tuple2> { - - @Override - public Tuple2 translate(Tuple2 value, Tuple2 reuse) throws Exception { - long val = value.f1.getValue(); - - if (val > Integer.MAX_VALUE) { - throw new RuntimeException("LongValue input overflows IntValue output"); - } - - if (reuse == null) { - reuse = new Tuple2<>(null, new IntValue()); - } - - reuse.f0 = value.f0; - reuse.f1.setValue((int) val); - return reuse; - } -} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index c487bd0048ed9..a9ea60e3cc745 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -44,7 +44,7 @@ /** * The Jaccard Index measures the similarity between vertex neighborhoods and - * is computed as the number of shared numbers divided by the number of + * is computed as the number of shared neighbors divided by the number of * distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all * neighbors are shared). *