Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3879] [gelly] Native implementation of HITS algorithm #1967

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 7 additions & 8 deletions docs/apis/batch/libs/gelly.md
Original file line number Diff line number Diff line change
Expand Up @@ -2038,19 +2038,18 @@ Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of t

#### Overview
[Hyperlink-Induced Topic Search](http://www.cs.cornell.edu/home/kleinber/auth.pdf) (HITS, or "Hubs and Authorities")
computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many
computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many
good authorities and good authorities are those pointed to by many good hubs.

#### Details
HITS ranking relies on an iterative method converging to a stationary solution. Each vertex in the directed graph is assigned same non-negative
hub and authority scores. Then the algorithm iteratively updates the scores until termination. Current implementation divides the iteration
into two phases, authority scores can be computed until hub scores updating and normalising finished, hub scores can be computed until
authority scores updating and normalising finished.
Every vertex is assigned the same initial hub and authority scores. The algorithm then iteratively updates the scores
until termination. During each iteration new hub scores are computed from the authority scores, then new authority
scores are computed from the new hub scores. The scores are then normalized and optionally tested for convergence.

#### Usage
The algorithm takes a directed graph as input and outputs a `DataSet` of vertices, where the vertex value is a `Tuple2`
containing the hub and authority score after maximum iterations.
The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3` containing the vertex ID, hub score,
and authority score.

### Summarization

#### Overview
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.examples;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.simple.directed.Simplify;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

import java.text.NumberFormat;

/**
* Driver for the library implementation of HITS (Hubs and Authorities).
*
* This example reads a simple, undirected graph from a CSV file or generates
* an undirected RMat graph with the given scale and edge factor then calculates
* hub and authority scores for each vertex.
*
* @see org.apache.flink.graph.library.link_analysis.HITS
*/
public class HITS {

public static final int DEFAULT_ITERATIONS = 10;

public static final int DEFAULT_SCALE = 10;

public static final int DEFAULT_EDGE_FACTOR = 16;

private static void printUsage() {
System.out.println(WordUtils.wrap("", 80));
System.out.println();
System.out.println(WordUtils.wrap("", 80));
System.out.println();
System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
System.out.println();
System.out.println(" --output print");
System.out.println(" --output hash");
System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
}

public static void main(String[] args) throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

ParameterTool parameters = ParameterTool.fromArgs(args);
int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);

DataSet hits;

switch (parameters.get("input", "")) {
case "csv": {
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));

String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));

Graph<LongValue, NullValue, NullValue> graph = Graph
.fromCsvReader(parameters.get("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);

hits = graph
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
} break;

case "rmat": {
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);

RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();

long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;

Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();

if (scale > 32) {
hits = graph
.run(new Simplify<LongValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
} else {
hits = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new Simplify<IntValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
}
} break;

default:
printUsage();
return;
}

switch (parameters.get("output", "")) {
case "print":
for (Object e: hits.collect()) {
System.out.println(((Result)e).toVerboseString());
}
break;

case "hash":
System.out.println(DataSetUtils.checksumHashCode(hits));
break;

case "csv":
String filename = parameters.get("output_filename");

String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));

String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));

hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);

env.execute();
break;
default:
printUsage();
return;
}

JobExecutionResult result = env.getLastJobExecutionResult();

NumberFormat nf = NumberFormat.getInstance();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.graph.library;
package org.apache.flink.graph.examples;

import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -49,7 +49,7 @@
public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> {

private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2;
private final static double MINIMUMTHRESHOLD = 1e-9;
private final static double MINIMUMTHRESHOLD = Double.MIN_VALUE;

private int maxIterations;
private double convergeThreshold;
Expand Down Expand Up @@ -179,7 +179,7 @@ public void updateVertex(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex, Mes
double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices();

// count the diff value of sum of authority scores
diffSumAggregator.aggregate((previousAuthAverage - newAuthorityValue.getValue()));
diffSumAggregator.aggregate(previousAuthAverage - newAuthorityValue.getValue());
}
setNewVertexValue(new Tuple2<>(newHubValue, newAuthorityValue));
} else if (getSuperstepNumber() == maxIteration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,16 @@ public static void main(String[] args) throws Exception {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);

Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate()
.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
.generate();

if (scale > 32) {
ji = graph
.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>());
} else {
ji = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>());
}
} break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.HITSAlgorithm;
import org.apache.flink.graph.examples.data.HITSData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.DoubleValue;
Expand Down