Skip to content

Commit

Permalink
[FLINK-3879] [gelly] Native implementation of HITS algorithm
Browse files Browse the repository at this point in the history
This closes #1967
  • Loading branch information
greghogan committed Jun 23, 2016
1 parent 08b075a commit 4fc2d24
Show file tree
Hide file tree
Showing 6 changed files with 780 additions and 11 deletions.
15 changes: 7 additions & 8 deletions docs/apis/batch/libs/gelly.md
Original file line number Diff line number Diff line change
Expand Up @@ -2037,19 +2037,18 @@ Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of t

#### Overview
[Hyperlink-Induced Topic Search](http://www.cs.cornell.edu/home/kleinber/auth.pdf) (HITS, or "Hubs and Authorities")
computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many
computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many
good authorities and good authorities are those pointed to by many good hubs.

#### Details
HITS ranking relies on an iterative method converging to a stationary solution. Each vertex in the directed graph is assigned same non-negative
hub and authority scores. Then the algorithm iteratively updates the scores until termination. Current implementation divides the iteration
into two phases, authority scores can be computed until hub scores updating and normalising finished, hub scores can be computed until
authority scores updating and normalising finished.
Every vertex is assigned the same initial hub and authority scores. The algorithm then iteratively updates the scores
until termination. During each iteration new hub scores are computed from the authority scores, then new authority
scores are computed from the new hub scores. The scores are then normalized and optionally tested for convergence.

#### Usage
The algorithm takes a directed graph as input and outputs a `DataSet` of vertices, where the vertex value is a `Tuple2`
containing the hub and authority score after maximum iterations.
The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3` containing the vertex ID, hub score,
and authority score.

### Summarization

#### Overview
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package org.apache.flink.graph.examples;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.simple.directed.Simplify;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

import java.text.NumberFormat;

/**
* Driver for the library implementation of HITS (Hubs and Authorities).
*
* This example reads a simple, undirected graph from a CSV file or generates
* an undirected RMat graph with the given scale and edge factor then calculates
* hub and authority scores for each vertex.
*
* @see org.apache.flink.graph.library.link_analysis.HITS
*/
public class HITS {

public static final int DEFAULT_ITERATIONS = 10;

public static final int DEFAULT_SCALE = 10;

public static final int DEFAULT_EDGE_FACTOR = 16;

private static void printUsage() {
System.out.println(WordUtils.wrap("", 80));
System.out.println();
System.out.println(WordUtils.wrap("", 80));
System.out.println();
System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
System.out.println();
System.out.println(" --output print");
System.out.println(" --output hash");
System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
}

public static void main(String[] args) throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

ParameterTool parameters = ParameterTool.fromArgs(args);
int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);

DataSet hits;

switch (parameters.get("input", "")) {
case "csv": {
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));

String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));

Graph<LongValue, NullValue, NullValue> graph = Graph
.fromCsvReader(parameters.get("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);

hits = graph
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
} break;

case "rmat": {
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);

RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();

long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;

Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate()
.run(new Simplify<LongValue, NullValue, NullValue>());

if (parameters.get("algorithm").equals("HITS")) {
if (scale > 32) {
hits = graph
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
} else {
hits = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
}
} else if (parameters.get("algorithm").equals("HITSAlgorithm")) {
if (scale > 32) {
hits = graph
.run(new org.apache.flink.graph.library.HITSAlgorithm<LongValue, NullValue, NullValue>(iterations));
} else {
hits = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.library.HITSAlgorithm<IntValue, NullValue, NullValue>(iterations));
}
} else {
return;
}
} break;

default:
printUsage();
return;
}

switch (parameters.get("output", "")) {
case "print":
for (Object e: hits.collect()) {
if (parameters.get("algorithm").equals("HITS")) {
Result result = (Result)e;
System.out.println(result.toVerboseString());
} else {
System.out.println(e);
}
}
break;

case "hash":
System.out.println(DataSetUtils.checksumHashCode(hits));
break;

case "csv":
String filename = parameters.get("output_filename");

String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));

String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));

hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);

env.execute();
break;
default:
printUsage();
return;
}

JobExecutionResult result = env.getLastJobExecutionResult();

NumberFormat nf = NumberFormat.getInstance();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>>> {

private final static int MAXIMUMITERATION = (Integer.MAX_VALUE - 1) / 2;
private final static double MINIMUMTHRESHOLD = 1e-9;
private final static double MINIMUMTHRESHOLD = Double.MIN_VALUE;

private int maxIterations;
private double convergeThreshold;
Expand Down Expand Up @@ -179,7 +179,7 @@ public void updateVertex(Vertex<K, Tuple2<DoubleValue, DoubleValue>> vertex, Mes
double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices();

// count the diff value of sum of authority scores
diffSumAggregator.aggregate((previousAuthAverage - newAuthorityValue.getValue()));
diffSumAggregator.aggregate(Math.abs(previousAuthAverage - newAuthorityValue.getValue()));
}
setNewVertexValue(new Tuple2<>(newHubValue, newAuthorityValue));
} else if (getSuperstepNumber() == maxIteration) {
Expand Down

0 comments on commit 4fc2d24

Please sign in to comment.