Skip to content

Commit

Permalink
Move HITSAlgorithm to gelly examples
Browse files Browse the repository at this point in the history
  • Loading branch information
greghogan committed Jun 27, 2016
1 parent 950cab3 commit b546ebe
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.examples;
Expand Down Expand Up @@ -96,10 +94,10 @@ public static void main(String[] args) throws Exception {

Graph<LongValue, NullValue, NullValue> graph = Graph
.fromCsvReader(parameters.get("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter)
.keyType(LongValue.class);

hits = graph
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
Expand All @@ -117,30 +115,15 @@ public static void main(String[] args) throws Exception {
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();

if (parameters.get("algorithm").equals("HITS")) {
if (scale > 32) {
hits = graph
.run(new Simplify<LongValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
} else {
hits = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new Simplify<IntValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
}
} else if (parameters.get("algorithm").equals("HITSAlgorithm")) {
if (scale > 32) {
hits = graph
.run(new Simplify<LongValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.HITSAlgorithm<LongValue, NullValue, NullValue>(iterations));
} else {
hits = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new Simplify<IntValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.HITSAlgorithm<IntValue, NullValue, NullValue>(iterations));
}
if (scale > 32) {
hits = graph
.run(new Simplify<LongValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
} else {
return;
hits = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new Simplify<IntValue, NullValue, NullValue>())
.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
}
} break;

Expand All @@ -152,12 +135,7 @@ public static void main(String[] args) throws Exception {
switch (parameters.get("output", "")) {
case "print":
for (Object e: hits.collect()) {
if (parameters.get("algorithm").equals("HITS")) {
Result result = (Result)e;
System.out.println(result.toVerboseString());
} else {
System.out.println(e);
}
System.out.println(((Result)e).toVerboseString());
}
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.graph.library;
package org.apache.flink.graph.examples;

import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.HITSAlgorithm;
import org.apache.flink.graph.examples.data.HITSData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.DoubleValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.library.link_analysis;
Expand Down Expand Up @@ -73,7 +71,7 @@ public class HITS<K, VV, EV>
private static final String AUTHORITY_SUM_SQUARED = "authority sum squared";

// Required configuration
private int maxIterations = Integer.MAX_VALUE;
private int maxIterations;

private double convergenceThreshold;

Expand All @@ -92,7 +90,7 @@ public HITS(int iterations) {
/**
* Hyperlink-Induced Topic Search with a convergence threshold. The algorithm
* terminates when the total change in hub and authority scores over all
* vertices falls below the given threshold value.
* vertices falls to or below the given threshold value.
*
* @param convergenceThreshold convergence threshold for sum of scores
*/
Expand All @@ -104,7 +102,7 @@ public HITS(double convergenceThreshold) {
* Hyperlink-Induced Topic Search with a convergence threshold and a maximum
* iteration count. The algorithm terminates after either the given number
* of iterations or when the total change in hub and authority scores over all
* vertices falls below the given threshold value.
* vertices falls to or below the given threshold value.
*
* @param maxIterations maximum number of iterations
* @param convergenceThreshold convergence threshold for sum of scores
Expand Down Expand Up @@ -425,7 +423,7 @@ public Tuple3<T, DoubleValue, DoubleValue> join(Tuple2<T, DoubleValue> hubbiness
/**
* Computes the total sum of the change in hub and authority scores over
* all vertices between iterations. A negative score is emitted after the
* first iteration as an optimization to not normalize the initial scores.
* first iteration to prevent premature convergence.
*
* @param <T> ID type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ public class LocalClusteringCoefficientTest
@Test
public void testSimpleGraph()
throws Exception {
DataSet<Result<IntValue>> cc = directedSimpleGraph
.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());

String expectedResult =
"(0,(2,1))\n" +
"(1,(3,2))\n" +
Expand All @@ -51,6 +48,9 @@ public void testSimpleGraph()
"(4,(1,0))\n" +
"(5,(1,0))";

DataSet<Result<IntValue>> cc = directedSimpleGraph
.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());

TestBaseUtils.compareResultAsText(cc.collect(), expectedResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ public class LocalClusteringCoefficientTest
@Test
public void testSimpleGraph()
throws Exception {
DataSet<Result<IntValue>> cc = undirectedSimpleGraph
.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());

String expectedResult =
"(0,(2,1))\n" +
"(1,(3,2))\n" +
Expand All @@ -51,6 +48,9 @@ public void testSimpleGraph()
"(4,(1,0))\n" +
"(5,(1,0))";

DataSet<Result<IntValue>> cc = undirectedSimpleGraph
.run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());

TestBaseUtils.compareResultAsText(cc.collect(), expectedResult);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.library.link_analysis;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;

/**
 * Tests for {@link HITS} run against the shared graphs provided by
 * {@link AsmTestBase}.
 */
public class HITSTest
extends AsmTestBase {

	/** Convergence threshold handed to the threshold-based {@link HITS} constructor. */
	private static final double CONVERGENCE_THRESHOLD = 0.000001;

	/** Permitted absolute error when comparing computed scores to the expected score. */
	private static final double SCORE_TOLERANCE = 0.000001;

	/**
	 * Runs HITS with an iteration count of 10 on the directed simple graph and
	 * compares the textual form of the results with the expected per-vertex
	 * scores.
	 */
	@Test
	public void testWithSimpleGraph()
			throws Exception {
		String expectedResult =
			"(0,(0.5446287864731747,0.0))\n" +
			"(1,(0.0,0.8363240238999012))\n" +
			"(2,(0.6072453524686667,0.26848532437604833))\n" +
			"(3,(0.5446287864731747,0.39546603929699625))\n" +
			"(4,(0.0,0.26848532437604833))\n" +
			"(5,(0.194966796646811,0.0))";

		DataSet<Result<IntValue>> scores = new HITS<IntValue, NullValue, NullValue>(10)
			.run(directedSimpleGraph);

		TestBaseUtils.compareResultAsText(scores.collect(), expectedResult);
	}

	/**
	 * Runs HITS with a convergence threshold on the complete graph and checks
	 * that one result is produced per vertex, each with hub and authority
	 * scores equal to {@code 1 / sqrt(vertexCount)} within the tolerance.
	 */
	@Test
	public void testWithCompleteGraph()
			throws Exception {
		double expectedScore = 1.0 / Math.sqrt(completeGraphVertexCount);

		List<Result<LongValue>> results =
			new HITS<LongValue, NullValue, NullValue>(CONVERGENCE_THRESHOLD)
				.run(completeGraph)
				.collect();

		assertEquals(completeGraphVertexCount, results.size());

		for (Result<LongValue> vertexScores : results) {
			double hubScore = vertexScores.getHubScore().getValue();
			assertEquals(expectedScore, hubScore, SCORE_TOLERANCE);

			double authorityScore = vertexScores.getAuthorityScore().getValue();
			assertEquals(expectedScore, authorityScore, SCORE_TOLERANCE);
		}
	}

	/**
	 * Runs HITS with a convergence threshold on the directed RMat graph and
	 * verifies the element count and checksum of the result set against
	 * previously recorded values.
	 */
	@Test
	public void testWithRMatGraph()
			throws Exception {
		DataSet<Result<LongValue>> scores = directedRMatGraph
			.run(new HITS<LongValue, NullValue, NullValue>(CONVERGENCE_THRESHOLD));

		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(scores);

		assertEquals(902, checksum.getCount());
		assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());
	}
}

0 comments on commit b546ebe

Please sign in to comment.