From e6314cf5031d20c39fcc729c0fcd1f0cec2e4e6f Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 6 May 2016 11:13:26 -0400 Subject: [PATCH 1/5] [FLINK-3879] [gelly] Native implementation of HITS algorithm This closes #1967 --- docs/apis/batch/libs/gelly.md | 15 +- .../org/apache/flink/graph/examples/HITS.java | 188 ++++++ .../flink/graph/library/HITSAlgorithm.java | 4 +- .../graph/library/link_analysis/HITS.java | 560 ++++++++++++++++++ 4 files changed, 757 insertions(+), 10 deletions(-) create mode 100644 flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index 1b925e0b854b3..0a4af5d3c4bb2 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2038,19 +2038,18 @@ Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of t #### Overview [Hyperlink-Induced Topic Search](http://www.cs.cornell.edu/home/kleinber/auth.pdf) (HITS, or "Hubs and Authorities") -computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many +computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many good authorities and good authorities are those pointed to by many good hubs. #### Details -HITS ranking relies on an iterative method converging to a stationary solution. Each vertex in the directed graph is assigned same non-negative -hub and authority scores. Then the algorithm iteratively updates the scores until termination. Current implementation divides the iteration -into two phases, authority scores can be computed until hub scores updating and normalising finished, hub scores can be computed until -authority scores updating and normalising finished. +Every vertex is assigned the same initial hub and authority scores. The algorithm then iteratively updates the scores +until termination. During each iteration new hub scores are computed from the authority scores, then new authority +scores are computed from the new hub scores. The scores are then normalized and optionally tested for convergence. #### Usage -The algorithm takes a directed graph as input and outputs a `DataSet` of vertices, where the vertex value is a `Tuple2` -containing the hub and authority score after maximum iterations. - +The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3` containing the vertex ID, hub score, +and authority score. + ### Summarization #### Overview diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java new file mode 100644 index 0000000000000..07c876a83e4c0 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java @@ -0,0 +1,188 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.text.WordUtils; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.simple.directed.Simplify; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.library.link_analysis.HITS.Result; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of HITS (Hubs and Authorities). + * + * This example reads a simple, undirected graph from a CSV file or generates + * an undirected RMat graph with the given scale and edge factor then calculates + * hub and authority scores for each vertex. + * + * @see org.apache.flink.graph.library.link_analysis.HITS + */ +public class HITS { + + public static final int DEFAULT_ITERATIONS = 10; + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + private static void printUsage() { + System.out.println(WordUtils.wrap("", 80)); + System.out.println(); + System.out.println(WordUtils.wrap("", 80)); + System.out.println(); + System.out.println("usage: HITS --input --output graph = Graph + .fromCsvReader(parameters.get("input_filename"), env) + .ignoreCommentsEdges("#") + .lineDelimiterEdges(lineDelimiter) + .fieldDelimiterEdges(fieldDelimiter) + .keyType(LongValue.class); + + hits = graph + .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); + } break; + + case "rmat": { + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; + + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate() + .run(new Simplify()); + + if (parameters.get("algorithm").equals("HITS")) { + if (scale > 32) { + hits = graph + .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); + } else { + hits = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); + } + } else if (parameters.get("algorithm").equals("HITSAlgorithm")) { + if (scale > 32) { + hits = graph + .run(new org.apache.flink.graph.library.HITSAlgorithm(iterations)); + } else { + hits = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new org.apache.flink.graph.library.HITSAlgorithm(iterations)); + } + } else { + return; + } + } break; + + default: + printUsage(); + return; + } + + switch (parameters.get("output", "")) { + case "print": + for (Object e: hits.collect()) { + if (parameters.get("algorithm").equals("HITS")) { + Result result = (Result)e; + System.out.println(result.toVerboseString()); + } else { + System.out.println(e); + } + } + break; + + case "hash": + System.out.println(DataSetUtils.checksumHashCode(hits)); + break; + + case "csv": + String filename = parameters.get("output_filename"); + + String lineDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER)); + + String fieldDelimiter = StringEscapeUtils.unescapeJava( + parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER)); + + hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter); + + env.execute(); + break; + default: + printUsage(); + return; + } + + JobExecutionResult result = env.getLastJobExecutionResult(); + + NumberFormat nf = NumberFormat.getInstance(); + System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms"); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java index 39e9487e3a9ac..8669060188582 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java @@ -49,7 +49,7 @@ public class HITSAlgorithm implements GraphAlgorithm>>> { private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2; - private final static double MINIMUMTHRESHOLD = 1e-9; + private final static double MINIMUMTHRESHOLD = Double.MIN_VALUE; private int maxIterations; private double convergeThreshold; @@ -179,7 +179,7 @@ public void updateVertex(Vertex> vertex, Mes double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices(); // count the diff value of sum of authority scores - diffSumAggregator.aggregate((previousAuthAverage - newAuthorityValue.getValue())); + diffSumAggregator.aggregate(Math.abs(previousAuthAverage - newAuthorityValue.getValue())); } setNewVertexValue(new Tuple2<>(newHubValue, newAuthorityValue)); } else if (getSuperstepNumber() == maxIteration) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java new file mode 100644 index 0000000000000..39e4654371df3 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -0,0 +1,560 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.graph.library.link_analysis; + +import org.apache.flink.api.common.aggregators.ConvergenceCriterion; +import org.apache.flink.api.common.aggregators.DoubleSumAggregator; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichJoinFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; + +/** + * http://www.cs.cornell.edu/home/kleinber/auth.pdf + * + * Hyperlink-Induced Topic Search computes two interdependent scores for every + * vertex in a directed graph. A good "hub" links to good "authorities" and + * good "authorities" are linked from good "hubs". + * + * This algorithm can be configured to terminate either by a limit on the number + * of iterations, a convergence threshold, or both. + * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class HITS +implements GraphAlgorithm>> { + + private static final String CHANGE_IN_SCORES = "change in scores"; + + private static final String HUBBINESS_SUM_SQUARED = "hubbiness sum squared"; + + private static final String AUTHORITY_SUM_SQUARED = "authority sum squared"; + + // Required configuration + private int maxIterations = Integer.MAX_VALUE; + + private double convergenceThreshold; + + // Optional configuration + private int parallelism = PARALLELISM_DEFAULT; + + /** + * Hyperlink-Induced Topic Search with a fixed number of iterations. + * + * @param iterations fixed number of iterations + */ + public HITS(int iterations) { + this(iterations, Double.MAX_VALUE); + } + + /** + * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm + * terminates When the total change in hub and authority scores over all + * vertices falls below the given threshold value. + * + * @param convergenceThreshold convergence threshold for sum of scores + */ + public HITS(double convergenceThreshold) { + this(Integer.MAX_VALUE, convergenceThreshold); + } + + /** + * Hyperlink-Induced Topic Search with a convergence threshold and a maximum + * iteration count. The algorithm terminates after either the given number + * of iterations or when the total change in hub and authority scores over all + * vertices falls below the given threshold value. + * + * @param maxIterations maximum number of iterations + * @param convergenceThreshold convergence threshold for sum of scores + */ + public HITS(int maxIterations, double convergenceThreshold) { + Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero"); + Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero"); + + this.maxIterations = maxIterations; + this.convergenceThreshold = convergenceThreshold; + } + + /** + * Override the operator parallelism. + * + * @param parallelism operator parallelism + * @return this + */ + public HITS setParallelism(int parallelism) { + this.parallelism = parallelism; + + return this; + } + + @Override + public DataSet> run(Graph input) + throws Exception { + DataSet> edges = input + .getEdges() + .flatMap(new ExtractEdgeIDs()) + .setParallelism(parallelism) + .name("Extract edge IDs"); + + // ID, hub, authority + DataSet> initialScores = edges + .map(new InitializeScores()) + .setParallelism(parallelism) + .name("Initial scores") + .groupBy(0) + .reduce(new SumScores()) + .setParallelism(parallelism) + .name("Sum"); + + IterativeDataSet> iterative = initialScores + .iterate(maxIterations); + + // ID, hubbiness + DataSet> hubbiness = iterative + .coGroup(edges) + .where(0) + .equalTo(1) + .with(new Hubbiness()) + .setParallelism(parallelism) + .name("Hub") + .groupBy(0) + .reduce(new SumScore()) + .setParallelism(parallelism) + .name("Sum"); + + // sum-of-hubbiness-squared + DataSet hubbinessSumSquared = hubbiness + .map(new Square()) + .setParallelism(parallelism) + .name("Square") + .reduce(new Sum()) + .setParallelism(parallelism) + .name("Sum"); + + // ID, new authority + DataSet> authority = hubbiness + .coGroup(edges) + .where(0) + .equalTo(0) + .with(new Authority()) + .setParallelism(parallelism) + .name("Authority") + .groupBy(0) + .reduce(new SumScore()) + .setParallelism(parallelism) + .name("Sum"); + + // sum-of-authority-squared + DataSet authoritySumSquared = authority + .map(new Square()) + .setParallelism(parallelism) + .name("Square") + .reduce(new Sum()) + .setParallelism(parallelism) + .name("Sum"); + + // ID, normalized hubbiness, normalized authority + DataSet> scores = hubbiness + .fullOuterJoin(authority, JoinHint.REPARTITION_SORT_MERGE) + .where(0) + .equalTo(0) + .with(new JoinAndNormalizeHubAndAuthority()) + .withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED) + .withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED) + .setParallelism(parallelism) + .name("Join scores"); + + DataSet> passThrough; + + if (convergenceThreshold < Double.MAX_VALUE) { + passThrough = iterative + .fullOuterJoin(scores, JoinHint.REPARTITION_SORT_MERGE) + .where(0) + .equalTo(0) + .with(new ChangeInScores()) + .setParallelism(parallelism) + .name("Change in scores"); + + iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold)); + } else { + passThrough = scores; + } + + return iterative + .closeWith(passThrough) + .map(new TranslateResult()) + .setParallelism(parallelism) + .name("Map result"); + } + + /** + * Map edges and remove the edge value. + * + * @param ID type + * @param edge value type + * + * @see Graph.ExtractEdgeIDsMapper + */ + @ForwardedFields("0; 1") + private static class ExtractEdgeIDs + implements FlatMapFunction, Tuple2> { + private Tuple2 output = new Tuple2<>(); + + @Override + public void flatMap(Edge value, Collector> out) + throws Exception { + output.f0 = value.f0; + output.f1 = value.f1; + out.collect(output); + } + } + + /** + * Initialize vertices' authority scores by assigning each vertex with an + * initial hub score of 1.0. The hub scores are initialized to zero since + * these will be computed based on the initial authority scores. + * + * The initial scores are non-normalized. + * + * @param ID type + */ + @ForwardedFields("1->0") + private static class InitializeScores + implements MapFunction, Tuple3> { + private Tuple3 output = new Tuple3<>(null, new DoubleValue(0.0), new DoubleValue(1.0)); + + @Override + public Tuple3 map(Tuple2 value) throws Exception { + output.f0 = value.f1; + return output; + } + } + + /** + * Sum vertices' hub and authority scores. + * + * @param ID type + */ + @ForwardedFieldsFirst("0") + @ForwardedFieldsSecond("0") + private static class SumScores + implements ReduceFunction> { + @Override + public Tuple3 reduce(Tuple3 left, Tuple3 right) + throws Exception { + left.f1.setValue(left.f1.getValue() + right.f1.getValue()); + left.f2.setValue(left.f2.getValue() + right.f2.getValue()); + return left; + } + } + + /** + * The hub score is the sum of authority scores of vertices on out-edges. + * + * @param ID type + */ + @ForwardedFieldsFirst("2->1") + @ForwardedFieldsSecond("0") + private static class Hubbiness + implements CoGroupFunction, Tuple2, Tuple2> { + private Tuple2 output = new Tuple2<>(); + + @Override + public void coGroup(Iterable> vertex, Iterable> edges, Collector> out) + throws Exception { + output.f1 = vertex.iterator().next().f2; + + for (Tuple2 edge : edges) { + output.f0 = edge.f0; + out.collect(output); + } + } + } + + /** + * Sum vertices' scores. + * + * @param ID type + */ + @ForwardedFieldsFirst("0") + @ForwardedFieldsSecond("0") + private static class SumScore + implements ReduceFunction> { + @Override + public Tuple2 reduce(Tuple2 left, Tuple2 right) + throws Exception { + left.f1.setValue(left.f1.getValue() + right.f1.getValue()); + return left; + } + } + + /** + * The authority score is the sum of hub scores of vertices on in-edges. + * + * @param ID type + */ + @ForwardedFieldsFirst("1") + @ForwardedFieldsSecond("1->0") + private static class Authority + implements CoGroupFunction, Tuple2, Tuple2> { + private Tuple2 output = new Tuple2<>(); + + @Override + public void coGroup(Iterable> vertex, Iterable> edges, Collector> out) + throws Exception { + output.f1 = vertex.iterator().next().f1; + + for (Tuple2 edge : edges) { + output.f0 = edge.f1; + out.collect(output); + } + } + } + + /** + * Compute the square of each score. + * + * @param ID type + */ + private static class Square + implements MapFunction, DoubleValue> { + private DoubleValue output = new DoubleValue(); + + @Override + public DoubleValue map(Tuple2 value) + throws Exception { + double val = value.f1.getValue(); + output.setValue(val * val); + + return output; + } + } + + /** + * Sum over values. This specialized function is used in place of generic aggregation. + */ + private static class Sum + implements ReduceFunction { + @Override + public DoubleValue reduce(DoubleValue first, DoubleValue second) + throws Exception { + first.setValue(first.getValue() + second.getValue()); + return first; + } + } + + /** + * Join and normalize the hub and authority scores. + * + * @param ID type + */ + @ForwardedFieldsFirst("0") + @ForwardedFieldsSecond("0") + private static class JoinAndNormalizeHubAndAuthority + extends RichJoinFunction, Tuple2, Tuple3> { + private Tuple3 output = new Tuple3<>(null, new DoubleValue(), new DoubleValue()); + + private double hubbinessRootSumSquared; + + private double authorityRootSumSquared; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + Collection var; + var = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED); + hubbinessRootSumSquared = Math.sqrt(var.iterator().next().getValue()); + + var = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED); + authorityRootSumSquared = Math.sqrt(var.iterator().next().getValue()); + } + + @Override + public Tuple3 join(Tuple2 hubbiness, Tuple2 authority) + throws Exception { + output.f0 = (authority == null) ? hubbiness.f0 : authority.f0; + output.f1.setValue(hubbiness == null ? 0.0 : hubbiness.f1.getValue() / hubbinessRootSumSquared); + output.f2.setValue(authority == null ? 0.0 : authority.f1.getValue() / authorityRootSumSquared); + return output; + } + } + + /** + * Computes the total sum of the change in hub and authority scores over + * all vertices between iterations. A negative score is emitted after the + * first iteration as an optimization to not normalize the initial scores. + * + * @param ID type + */ + @ForwardedFieldsFirst("0") + @ForwardedFieldsSecond("*") + private static class ChangeInScores + extends RichJoinFunction, Tuple3, Tuple3> { + private boolean isInitialSuperstep; + + private double changeInScores; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1); + changeInScores = (isInitialSuperstep) ? -1.0 : 0.0; + } + + @Override + public void close() throws Exception { + super.close(); + + DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES); + agg.aggregate(changeInScores); + } + + @Override + public Tuple3 join(Tuple3 first, Tuple3 second) + throws Exception { + if (! isInitialSuperstep) { + changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue()); + changeInScores += Math.abs(second.f2.getValue() - first.f2.getValue()); + } + + return second; + } + } + + /** + * Monitors the total change in hub and authority scores over all vertices. + * The iteration terminates when the change in scores compared against the + * prior iteration falls below the given convergence threshold. + * + * An optimization of this implementation of HITS is to leave the initial + * scores non-normalized; therefore, the change in scores after the first + * superstep cannot be measured and a negative value is emitted to signal + * that the iteration should continue. + */ + private static class ScoreConvergence + implements ConvergenceCriterion { + private double convergenceThreshold; + + public ScoreConvergence(double convergenceThreshold) { + this.convergenceThreshold = convergenceThreshold; + } + + @Override + public boolean isConverged(int iteration, DoubleValue value) { + double val = value.getValue(); + return (0 <= val && val <= convergenceThreshold); + } + } + + /** + * Map the Tuple result to the return type. + * + * @param ID type + */ + @ForwardedFields("0") + private static class TranslateResult + implements MapFunction, Result> { + private Result output = new Result<>(); + + @Override + public Result map(Tuple3 value) throws Exception { + output.f0 = value.f0; + output.f1.f0 = value.f1; + output.f1.f1 = value.f2; + return output; + } + } + + /** + * Wraps the vertex type to encapsulate results from the HITS algorithm. + * + * @param ID type + */ + public static class Result + extends Vertex> { + public static final int HASH_SEED = 0xc7e39a63; + + private Murmur3_32 hasher = new Murmur3_32(HASH_SEED); + + public Result() { + f1 = new Tuple2<>(); + } + + /** + * Get the hub score. Good hubs link to good authorities. + * + * @return the hub score + */ + public DoubleValue getHubScore() { + return f1.f0; + } + + /** + * Get the authority score. Good authorities link to good hubs. + * + * @return the authority score + */ + public DoubleValue getAuthorityScore() { + return f1.f1; + } + + public String toVerboseString() { + return "Vertex ID: " + f0 + + ", hub score: " + getHubScore() + + ", authority score: " + getAuthorityScore(); + } + + @Override + public int hashCode() { + return hasher.reset() + .hash(f0.hashCode()) + .hash(f1.f0.getValue()) + .hash(f1.f1.getValue()) + .hash(); + } + } +} From 21e4d2da3aea6fba5c4ce67c50aec9518860ce0c Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Fri, 24 Jun 2016 09:46:07 -0400 Subject: [PATCH 2/5] Simplify graphs with smaller types in examples --- .../main/java/org/apache/flink/graph/examples/HITS.java | 7 +++++-- .../java/org/apache/flink/graph/examples/JaccardIndex.java | 5 +++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java index 07c876a83e4c0..0ed99ddbd891b 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java @@ -115,25 +115,28 @@ public static void main(String[] args) throws Exception { long edgeCount = vertexCount * edgeFactor; Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate() - .run(new Simplify()); + .generate(); if (parameters.get("algorithm").equals("HITS")) { if (scale > 32) { hits = graph + .run(new Simplify()) .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); } else { hits = graph .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new Simplify()) .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); } } else if (parameters.get("algorithm").equals("HITSAlgorithm")) { if (scale > 32) { hits = graph + .run(new Simplify()) .run(new org.apache.flink.graph.library.HITSAlgorithm(iterations)); } else { hits = graph .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new Simplify()) .run(new org.apache.flink.graph.library.HITSAlgorithm(iterations)); } } else { diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 46a296a8f4676..454afacf53ea8 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -119,15 +119,16 @@ public static void main(String[] args) throws Exception { boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) - .generate() - .run(new Simplify(clipAndFlip)); + .generate(); if (scale > 32) { ji = graph + .run(new Simplify(clipAndFlip)) .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); } else { ji = graph .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new Simplify(clipAndFlip)) .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); } } break; From f5aca527304f2cd01c0712af7eec9ab1fa2c375b Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Mon, 27 Jun 2016 09:17:10 -0400 Subject: [PATCH 3/5] Move HITSAlgorithm to gelly examples --- .../org/apache/flink/graph/examples/HITS.java | 74 ++++++----------- .../flink/graph/examples}/HITSAlgorithm.java | 2 +- .../graph/library/HITSAlgorithmITCase.java | 1 + .../graph/library/link_analysis/HITS.java | 36 ++++---- .../LocalClusteringCoefficientTest.java | 6 +- .../LocalClusteringCoefficientTest.java | 6 +- .../graph/library/link_analysis/HITSTest.java | 83 +++++++++++++++++++ 7 files changed, 134 insertions(+), 74 deletions(-) rename flink-libraries/{flink-gelly/src/main/java/org/apache/flink/graph/library => flink-gelly-examples/src/main/java/org/apache/flink/graph/examples}/HITSAlgorithm.java (99%) create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java index 0ed99ddbd891b..c67fa9c57b538 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java @@ -1,21 +1,19 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.flink.graph.examples; @@ -96,10 +94,10 @@ public static void main(String[] args) throws Exception { Graph graph = Graph .fromCsvReader(parameters.get("input_filename"), env) - .ignoreCommentsEdges("#") - .lineDelimiterEdges(lineDelimiter) - .fieldDelimiterEdges(fieldDelimiter) - .keyType(LongValue.class); + .ignoreCommentsEdges("#") + .lineDelimiterEdges(lineDelimiter) + .fieldDelimiterEdges(fieldDelimiter) + .keyType(LongValue.class); hits = graph .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); @@ -117,30 +115,15 @@ public static void main(String[] args) throws Exception { Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) .generate(); - if (parameters.get("algorithm").equals("HITS")) { - if (scale > 32) { - hits = graph - .run(new Simplify()) - .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); - } else { - hits = graph - .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new Simplify()) - .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); - } - } else if (parameters.get("algorithm").equals("HITSAlgorithm")) { - if (scale > 32) { - hits = graph - .run(new Simplify()) - .run(new org.apache.flink.graph.library.HITSAlgorithm(iterations)); - } else { - hits = graph - .run(new TranslateGraphIds(new LongValueToIntValue())) - .run(new Simplify()) - .run(new org.apache.flink.graph.library.HITSAlgorithm(iterations)); - } + if (scale > 32) { + hits = graph + .run(new Simplify()) + .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); } else { - return; + hits = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new Simplify()) + .run(new org.apache.flink.graph.library.link_analysis.HITS(iterations)); } } break; @@ -152,12 +135,7 @@ public static void main(String[] args) throws Exception { switch (parameters.get("output", "")) { case "print": for (Object e: hits.collect()) { - if (parameters.get("algorithm").equals("HITS")) { - Result result = (Result)e; - System.out.println(result.toVerboseString()); - } else { - System.out.println(e); - } + System.out.println(((Result)e).toVerboseString()); } break; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java similarity index 99% rename from flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java rename to flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java index 8669060188582..4a39a03505238 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.graph.library; +package org.apache.flink.graph.examples; import org.apache.flink.api.common.aggregators.DoubleSumAggregator; import org.apache.flink.api.common.functions.MapFunction; diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java index 188772522d312..b2526ae4b87a2 100644 --- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.examples.HITSAlgorithm; import org.apache.flink.graph.examples.data.HITSData; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.DoubleValue; diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 39e4654371df3..00035e4ef00f7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -1,21 +1,19 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.flink.graph.library.link_analysis; @@ -73,7 +71,7 @@ public class HITS private static final String AUTHORITY_SUM_SQUARED = "authority sum squared"; // Required configuration - private int maxIterations = Integer.MAX_VALUE; + private int maxIterations; private double convergenceThreshold; @@ -92,7 +90,7 @@ public HITS(int iterations) { /** * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm * terminates When the total change in hub and authority scores over all - * vertices falls below the given threshold value. + * vertices falls to or below the given threshold value. * * @param convergenceThreshold convergence threshold for sum of scores */ @@ -104,7 +102,7 @@ public HITS(double convergenceThreshold) { * Hyperlink-Induced Topic Search with a convergence threshold and a maximum * iteration count. The algorithm terminates after either the given number * of iterations or when the total change in hub and authority scores over all - * vertices falls below the given threshold value. + * vertices falls to or below the given threshold value. * * @param maxIterations maximum number of iterations * @param convergenceThreshold convergence threshold for sum of scores @@ -425,7 +423,7 @@ public Tuple3 join(Tuple2 hubbiness /** * Computes the total sum of the change in hub and authority scores over * all vertices between iterations. A negative score is emitted after the - * first iteration as an optimization to not normalize the initial scores. + * first iteration to prevent premature convergence. * * @param ID type */ diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java index 1790ecc4de76b..d8d93adcb50a8 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java @@ -40,9 +40,6 @@ public class LocalClusteringCoefficientTest @Test public void testSimpleGraph() throws Exception { - DataSet> cc = directedSimpleGraph - .run(new LocalClusteringCoefficient()); - String expectedResult = "(0,(2,1))\n" + "(1,(3,2))\n" + @@ -51,6 +48,9 @@ public void testSimpleGraph() "(4,(1,0))\n" + "(5,(1,0))"; + DataSet> cc = directedSimpleGraph + .run(new LocalClusteringCoefficient()); + TestBaseUtils.compareResultAsText(cc.collect(), expectedResult); } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java index a940ab0de730a..f5416fb3fe8f4 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java @@ -40,9 +40,6 @@ public class LocalClusteringCoefficientTest @Test public void testSimpleGraph() throws Exception { - DataSet> cc = undirectedSimpleGraph - .run(new LocalClusteringCoefficient()); - String expectedResult = "(0,(2,1))\n" + "(1,(3,2))\n" + @@ -51,6 +48,9 @@ public void testSimpleGraph() "(4,(1,0))\n" + "(5,(1,0))"; + DataSet> cc = undirectedSimpleGraph + .run(new LocalClusteringCoefficient()); + TestBaseUtils.compareResultAsText(cc.collect(), expectedResult); } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java new file mode 100644 index 0000000000000..4ba3c174749b0 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library.link_analysis; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.library.link_analysis.HITS.Result; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HITSTest +extends AsmTestBase { + + @Test + public void testWithSimpleGraph() + throws Exception { + DataSet> hits = new HITS(10) + .run(directedSimpleGraph); + + String expectedResult = + "(0,(0.5446287864731747,0.0))\n" + + "(1,(0.0,0.8363240238999012))\n" + + "(2,(0.6072453524686667,0.26848532437604833))\n" + + "(3,(0.5446287864731747,0.39546603929699625))\n" + + "(4,(0.0,0.26848532437604833))\n" + + "(5,(0.194966796646811,0.0))"; + + TestBaseUtils.compareResultAsText(hits.collect(), expectedResult); + } + + @Test + public void testWithCompleteGraph() + throws Exception { + double expectedScore = 1.0 / Math.sqrt(completeGraphVertexCount); + + DataSet> hits = new HITS(0.000001) + .run(completeGraph); + + List> results = hits.collect(); + + assertEquals(completeGraphVertexCount, results.size()); + + for (Result result : results) { + assertEquals(expectedScore, result.getHubScore().getValue(), 0.000001); + assertEquals(expectedScore, result.getAuthorityScore().getValue(), 0.000001); + } + } + + @Test + public void testWithRMatGraph() + throws Exception { + ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph + .run(new HITS(0.000001))); + + assertEquals(902, checksum.getCount()); + assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum()); + } +} From cb57970ae547ba7749e1809320b70fa155c5e781 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 28 Jun 2016 08:50:36 -0400 Subject: [PATCH 4/5] Ensure all tests check double deltas --- .../graph/library/link_analysis/HITSTest.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java index 4ba3c174749b0..1551d842f0328 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java @@ -20,15 +20,16 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.asm.AsmTestBase; import org.apache.flink.graph.library.link_analysis.HITS.Result; -import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; import org.apache.flink.types.NullValue; import org.junit.Test; +import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; @@ -42,15 +43,19 @@ public void testWithSimpleGraph() DataSet> hits = new HITS(10) .run(directedSimpleGraph); - String expectedResult = - "(0,(0.5446287864731747,0.0))\n" + - "(1,(0.0,0.8363240238999012))\n" + - "(2,(0.6072453524686667,0.26848532437604833))\n" + - "(3,(0.5446287864731747,0.39546603929699625))\n" + - "(4,(0.0,0.26848532437604833))\n" + - "(5,(0.194966796646811,0.0))"; + List> expectedResults = new ArrayList<>(); + expectedResults.add(Tuple2.of(0.5446287864731747, 0.0)); + expectedResults.add(Tuple2.of(0.0, 0.8363240238999012)); + expectedResults.add(Tuple2.of(0.6072453524686667,0.26848532437604833)); + expectedResults.add(Tuple2.of(0.5446287864731747,0.39546603929699625)); + expectedResults.add(Tuple2.of(0.0, 0.26848532437604833)); + expectedResults.add(Tuple2.of(0.194966796646811, 0.0)); - TestBaseUtils.compareResultAsText(hits.collect(), expectedResult); + for (Result result : hits.collect()) { + int id = result.f0.getValue(); + assertEquals(expectedResults.get(id).f0, result.getHubScore().getValue(), 0.000001); + assertEquals(expectedResults.get(id).f1, result.getAuthorityScore().getValue(), 0.000001); + } } @Test From 27fe8f2198bed2b02171e9087f21e3f54f280f00 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Tue, 28 Jun 2016 15:17:14 -0400 Subject: [PATCH 5/5] Convergence looks to be broken in HITSAlgorithm but this was not the fix --- .../java/org/apache/flink/graph/examples/HITSAlgorithm.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java index 4a39a03505238..129d2a61c6ad1 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITSAlgorithm.java @@ -179,7 +179,7 @@ public void updateVertex(Vertex> vertex, Mes double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices(); // count the diff value of sum of authority scores - diffSumAggregator.aggregate(Math.abs(previousAuthAverage - newAuthorityValue.getValue())); + diffSumAggregator.aggregate(previousAuthAverage - newAuthorityValue.getValue()); } setNewVertexValue(new Tuple2<>(newHubValue, newAuthorityValue)); } else if (getSuperstepNumber() == maxIteration) {