Skip to content

Commit

Permalink
add page-rank algorithm (#70)
Browse files Browse the repository at this point in the history
* add page-rank algorithm
* Set algorithm's parameters in ComputerContextUtil#initContext
* register ComputerOptions to overwrite the config in driver.config.ComputerOptions
  • Loading branch information
houzhizhen committed Aug 5, 2021
1 parent 966beb3 commit f4babda
Show file tree
Hide file tree
Showing 18 changed files with 712 additions and 25 deletions.
11 changes: 10 additions & 1 deletion computer-algorithm/pom.xml
Expand Up @@ -12,6 +12,15 @@
<artifactId>computer-algorithm</artifactId>

<dependencies>

<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>computer-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>computer-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,154 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm.rank.pagerank;

import java.util.Iterator;

import com.baidu.hugegraph.computer.core.aggregator.Aggregator;
import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.worker.Computation;
import com.baidu.hugegraph.computer.core.worker.ComputationContext;
import com.baidu.hugegraph.computer.core.worker.WorkerContext;

public class PageRank implements Computation<DoubleValue> {

public static final String CONF_ALPHA_KEY = "pagerank.alpha";

public static final double CONF_ALPHA_DEFAULT = 0.15D;

private double alpha;
private double rankFromDangling;
private double initialRankInSuperstep;
private double cumulativeValue;

private Aggregator l1DiffAggr;
private Aggregator cumulativeRankAggr;
private Aggregator danglingVertexNumAggr;
private Aggregator danglingCumulativeAggr;

// Initial value in superstep 0.
private DoubleValue initialValue;
private DoubleValue contribValue;

@Override
public String name() {
return "pageRank";
}

@Override
public String category() {
return "rank";
}

@Override
public void compute0(ComputationContext context, Vertex vertex) {
vertex.value(this.initialValue);
this.cumulativeRankAggr.aggregateValue(this.initialValue.value());
int edgeCount = vertex.numEdges();
if (edgeCount == 0) {
this.danglingVertexNumAggr.aggregateValue(1L);
this.danglingCumulativeAggr.aggregateValue(
this.initialValue.value());
} else {
this.contribValue.value(this.initialValue.value() / edgeCount);
context.sendMessageToAllEdges(vertex, this.contribValue);
}
}

@Override
public void compute(ComputationContext context, Vertex vertex,
Iterator<DoubleValue> messages) {
DoubleValue message = Combiner.combineAll(context.combiner(), messages);
double rankFromNeighbors = 0.0D;
if (message != null) {
rankFromNeighbors = message.value();
}
double rank = (this.rankFromDangling + rankFromNeighbors) *
(1.0D - this.alpha) + this.initialRankInSuperstep;
rank /= this.cumulativeValue;
DoubleValue oldRank = vertex.value();
vertex.value(new DoubleValue(rank));
this.l1DiffAggr.aggregateValue(Math.abs(oldRank.value() - rank));
this.cumulativeRankAggr.aggregateValue(rank);
int edgeCount = vertex.numEdges();
if (edgeCount == 0) {
this.danglingVertexNumAggr.aggregateValue(1L);
this.danglingCumulativeAggr.aggregateValue(rank);
} else {
DoubleValue contribValue = new DoubleValue(rank / edgeCount);
context.sendMessageToAllEdges(vertex, contribValue);
}
}

@Override
public void init(Config config) {
this.alpha = config.getDouble(CONF_ALPHA_KEY, CONF_ALPHA_DEFAULT);
this.contribValue = new DoubleValue();
}

@Override
public void close(Config config) {
// pass
}

@Override
public void beforeSuperstep(WorkerContext context) {
// Get aggregator values for computation
DoubleValue danglingContribution = context.aggregatedValue(
PageRank4Master.AGGR_COMULATIVE_DANGLING_PROBABILITY);

this.rankFromDangling = danglingContribution.value() /
context.totalVertexCount();
this.initialRankInSuperstep = this.alpha / context.totalVertexCount();
DoubleValue cumulativeProbability = context.aggregatedValue(
PageRank4Master.AGGR_COMULATIVE_PROBABILITY);
this.cumulativeValue = cumulativeProbability.value();
this.initialValue = new DoubleValue(1.0D / context.totalVertexCount());

// Create aggregators
this.l1DiffAggr = context.createAggregator(
PageRank4Master.AGGR_L1_NORM_DIFFERENCE_KEY);
this.cumulativeRankAggr = context.createAggregator(
PageRank4Master.AGGR_COMULATIVE_PROBABILITY);
this.danglingVertexNumAggr = context.createAggregator(
PageRank4Master.AGGR_DANGLING_VERTICES_NUM);
this.danglingCumulativeAggr = context.createAggregator(
PageRank4Master.AGGR_COMULATIVE_DANGLING_PROBABILITY);
}

@Override
public void afterSuperstep(WorkerContext context) {
context.aggregateValue(
PageRank4Master.AGGR_COMULATIVE_PROBABILITY,
this.cumulativeRankAggr.aggregatedValue());
context.aggregateValue(
PageRank4Master.AGGR_L1_NORM_DIFFERENCE_KEY,
this.l1DiffAggr.aggregatedValue());
context.aggregateValue(
PageRank4Master.AGGR_DANGLING_VERTICES_NUM,
this.danglingVertexNumAggr.aggregatedValue());
context.aggregateValue(
PageRank4Master.AGGR_COMULATIVE_DANGLING_PROBABILITY,
this.danglingCumulativeAggr.aggregatedValue());
}
}
@@ -0,0 +1,105 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm.rank.pagerank;

import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner;
import com.baidu.hugegraph.computer.core.combiner.LongValueSumCombiner;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.computer.core.master.MasterComputation;
import com.baidu.hugegraph.computer.core.master.MasterComputationContext;
import com.baidu.hugegraph.computer.core.master.MasterContext;
import com.baidu.hugegraph.util.Log;

public class PageRank4Master implements MasterComputation {

private static final Logger LOG = Log.logger(PageRank4Master.class);

public static final String CONF_L1_NORM_DIFFERENCE_THRESHOLD_KEY =
"pagerank.l1DiffThreshold";
public static final double CONF_L1_DIFF_THRESHOLD_DEFAULT = 0.00001D;

public static final String AGGR_L1_NORM_DIFFERENCE_KEY =
"pagerank.aggr_l1_norm_difference";
public static final String AGGR_DANGLING_VERTICES_NUM =
"pagerank.dangling_vertices_num";
public static final String AGGR_COMULATIVE_DANGLING_PROBABILITY =
"pagerank.comulative_dangling_probability";
public static final String AGGR_COMULATIVE_PROBABILITY =
"pagerank.comulative_probability";

private double l1DiffThreshold;

@Override
public void init(MasterContext context) {
this.l1DiffThreshold = context.config().getDouble(
CONF_L1_NORM_DIFFERENCE_THRESHOLD_KEY,
CONF_L1_DIFF_THRESHOLD_DEFAULT);
context.registerAggregator(AGGR_DANGLING_VERTICES_NUM,
ValueType.LONG,
LongValueSumCombiner.class);
context.registerAggregator(AGGR_COMULATIVE_DANGLING_PROBABILITY,
ValueType.DOUBLE,
DoubleValueSumCombiner.class);
context.registerAggregator(AGGR_COMULATIVE_PROBABILITY,
ValueType.DOUBLE,
DoubleValueSumCombiner.class);
context.registerAggregator(AGGR_L1_NORM_DIFFERENCE_KEY,
ValueType.DOUBLE,
DoubleValueSumCombiner.class);
}

@Override
public void close(MasterContext context) {
// pass
}

@Override
public boolean compute(MasterComputationContext context) {

LongValue danglingVerticesNum = context.aggregatedValue(
AGGR_DANGLING_VERTICES_NUM);
DoubleValue danglingProbability = context.aggregatedValue(
AGGR_COMULATIVE_DANGLING_PROBABILITY);
DoubleValue cumulativeProbability = context.aggregatedValue(
AGGR_COMULATIVE_PROBABILITY);
DoubleValue l1NormDifference = context.aggregatedValue(
AGGR_L1_NORM_DIFFERENCE_KEY);

StringBuilder sb = new StringBuilder();
sb.append("[Superstep ").append(context.superstep()).append("]")
.append(", dangling vertices num = ").append(danglingVerticesNum)
.append(", cumulative dangling probability = ")
.append(danglingProbability.value())
.append(", cumulative probability = ").append(cumulativeProbability)
.append(", l1 norm difference = ").append(l1NormDifference.value());

LOG.info("PageRank running status: {}", sb);
double l1Diff = l1NormDifference.value();
if (context.superstep() > 1 && l1Diff <= this.l1DiffThreshold) {
return false;
} else {
return true;
}
}
}
@@ -0,0 +1,47 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm.rank.pagerank;

import java.util.Map;

import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.output.LimitedLogOutput;

public class PageRankParams implements AlgorithmParams {

@Override
public void setAlgorithmParameters(Map<String, String> params) {
this.setIfAbsent(params, ComputerOptions.MASTER_COMPUTATION_CLASS,
PageRank4Master.class.getName());
this.setIfAbsent(params, ComputerOptions.WORKER_COMPUTATION_CLASS,
PageRank.class.getName());
this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS,
DoubleValue.class.getName());
this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS,
DoubleValue.class.getName());
this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS,
DoubleValueSumCombiner.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
LimitedLogOutput.class.getName());
}
}
@@ -0,0 +1,52 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm;

import java.util.Map;

import org.slf4j.Logger;

import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.util.Log;

public interface AlgorithmParams {

Logger LOG = Log.logger(AlgorithmParams.class);

/**
* set algorithm's specific configuration
* @param params
*/
void setAlgorithmParameters(Map<String, String> params);

default void setIfAbsent(Map<String, String> params, String key,
String value) {
if (!params.keySet().contains(key)) {
LOG.debug("Put parameters key={}, value={}", key, value);
params.put(key, value);
}
}

default void setIfAbsent(Map<String, String> params,
ConfigOption<?> keyOption,
String value) {
this.setIfAbsent(params, keyOption.name(), value);
}
}
Expand Up @@ -39,7 +39,7 @@ default String name() {
T combine(T v1, T v2);


public static <T> T combineAll(Combiner<T> combiner, Iterator<T> values) {
static <T> T combineAll(Combiner<T> combiner, Iterator<T> values) {
if (!values.hasNext()) {
return null;
}
Expand Down

0 comments on commit f4babda

Please sign in to comment.