diff --git a/computer-algorithm/pom.xml b/computer-algorithm/pom.xml index 69c83b987..499bb11f0 100644 --- a/computer-algorithm/pom.xml +++ b/computer-algorithm/pom.xml @@ -12,6 +12,15 @@ computer-algorithm - + + com.baidu.hugegraph + computer-api + ${project.version} + + + com.baidu.hugegraph + computer-core + ${project.version} + diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRank.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRank.java new file mode 100644 index 000000000..d17eccfb5 --- /dev/null +++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRank.java @@ -0,0 +1,154 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.algorithm.rank.pagerank; + +import java.util.Iterator; + +import com.baidu.hugegraph.computer.core.aggregator.Aggregator; +import com.baidu.hugegraph.computer.core.combiner.Combiner; +import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.computer.core.worker.Computation; +import com.baidu.hugegraph.computer.core.worker.ComputationContext; +import com.baidu.hugegraph.computer.core.worker.WorkerContext; + +public class PageRank implements Computation { + + public static final String CONF_ALPHA_KEY = "pagerank.alpha"; + + public static final double CONF_ALPHA_DEFAULT = 0.15D; + + private double alpha; + private double rankFromDangling; + private double initialRankInSuperstep; + private double cumulativeValue; + + private Aggregator l1DiffAggr; + private Aggregator cumulativeRankAggr; + private Aggregator danglingVertexNumAggr; + private Aggregator danglingCumulativeAggr; + + // Initial value in superstep 0. + private DoubleValue initialValue; + private DoubleValue contribValue; + + @Override + public String name() { + return "pageRank"; + } + + @Override + public String category() { + return "rank"; + } + + @Override + public void compute0(ComputationContext context, Vertex vertex) { + vertex.value(this.initialValue); + this.cumulativeRankAggr.aggregateValue(this.initialValue.value()); + int edgeCount = vertex.numEdges(); + if (edgeCount == 0) { + this.danglingVertexNumAggr.aggregateValue(1L); + this.danglingCumulativeAggr.aggregateValue( + this.initialValue.value()); + } else { + this.contribValue.value(this.initialValue.value() / edgeCount); + context.sendMessageToAllEdges(vertex, this.contribValue); + } + } + + @Override + public void compute(ComputationContext context, Vertex vertex, + Iterator messages) { + DoubleValue message = Combiner.combineAll(context.combiner(), messages); + double rankFromNeighbors = 0.0D; + if (message != null) { + rankFromNeighbors = message.value(); + } + double rank = (this.rankFromDangling + rankFromNeighbors) * + (1.0D - this.alpha) + this.initialRankInSuperstep; + rank /= this.cumulativeValue; + DoubleValue oldRank = vertex.value(); + vertex.value(new DoubleValue(rank)); + this.l1DiffAggr.aggregateValue(Math.abs(oldRank.value() - rank)); + this.cumulativeRankAggr.aggregateValue(rank); + int edgeCount = vertex.numEdges(); + if (edgeCount == 0) { + this.danglingVertexNumAggr.aggregateValue(1L); + this.danglingCumulativeAggr.aggregateValue(rank); + } else { + DoubleValue contribValue = new DoubleValue(rank / edgeCount); + context.sendMessageToAllEdges(vertex, contribValue); + } + } + + @Override + public void init(Config config) { + this.alpha = config.getDouble(CONF_ALPHA_KEY, CONF_ALPHA_DEFAULT); + this.contribValue = new DoubleValue(); + } + + @Override + public void close(Config config) { + // pass + } + + @Override + public void beforeSuperstep(WorkerContext context) { + // Get aggregator values for computation + DoubleValue danglingContribution = context.aggregatedValue( + PageRank4Master.AGGR_COMULATIVE_DANGLING_PROBABILITY); + + this.rankFromDangling = danglingContribution.value() / + context.totalVertexCount(); + this.initialRankInSuperstep = this.alpha / context.totalVertexCount(); + DoubleValue cumulativeProbability = context.aggregatedValue( + PageRank4Master.AGGR_COMULATIVE_PROBABILITY); + this.cumulativeValue = cumulativeProbability.value(); + this.initialValue = new DoubleValue(1.0D / context.totalVertexCount()); + + // Create aggregators + this.l1DiffAggr = context.createAggregator( + PageRank4Master.AGGR_L1_NORM_DIFFERENCE_KEY); + this.cumulativeRankAggr = context.createAggregator( + PageRank4Master.AGGR_COMULATIVE_PROBABILITY); + this.danglingVertexNumAggr = context.createAggregator( + PageRank4Master.AGGR_DANGLING_VERTICES_NUM); + this.danglingCumulativeAggr = context.createAggregator( + PageRank4Master.AGGR_COMULATIVE_DANGLING_PROBABILITY); + } + + @Override + public void afterSuperstep(WorkerContext context) { + context.aggregateValue( + PageRank4Master.AGGR_COMULATIVE_PROBABILITY, + this.cumulativeRankAggr.aggregatedValue()); + context.aggregateValue( + PageRank4Master.AGGR_L1_NORM_DIFFERENCE_KEY, + this.l1DiffAggr.aggregatedValue()); + context.aggregateValue( + PageRank4Master.AGGR_DANGLING_VERTICES_NUM, + this.danglingVertexNumAggr.aggregatedValue()); + context.aggregateValue( + PageRank4Master.AGGR_COMULATIVE_DANGLING_PROBABILITY, + this.danglingCumulativeAggr.aggregatedValue()); + } +} diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRank4Master.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRank4Master.java new file mode 100644 index 000000000..540a74ad4 --- /dev/null +++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRank4Master.java @@ -0,0 +1,105 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.algorithm.rank.pagerank; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner; +import com.baidu.hugegraph.computer.core.combiner.LongValueSumCombiner; +import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; +import com.baidu.hugegraph.computer.core.graph.value.LongValue; +import com.baidu.hugegraph.computer.core.graph.value.ValueType; +import com.baidu.hugegraph.computer.core.master.MasterComputation; +import com.baidu.hugegraph.computer.core.master.MasterComputationContext; +import com.baidu.hugegraph.computer.core.master.MasterContext; +import com.baidu.hugegraph.util.Log; + +public class PageRank4Master implements MasterComputation { + + private static final Logger LOG = Log.logger(PageRank4Master.class); + + public static final String CONF_L1_NORM_DIFFERENCE_THRESHOLD_KEY = + "pagerank.l1DiffThreshold"; + public static final double CONF_L1_DIFF_THRESHOLD_DEFAULT = 0.00001D; + + public static final String AGGR_L1_NORM_DIFFERENCE_KEY = + "pagerank.aggr_l1_norm_difference"; + public static final String AGGR_DANGLING_VERTICES_NUM = + "pagerank.dangling_vertices_num"; + public static final String AGGR_COMULATIVE_DANGLING_PROBABILITY = + "pagerank.comulative_dangling_probability"; + public static final String AGGR_COMULATIVE_PROBABILITY = + "pagerank.comulative_probability"; + + private double l1DiffThreshold; + + @Override + public void init(MasterContext context) { + this.l1DiffThreshold = context.config().getDouble( + CONF_L1_NORM_DIFFERENCE_THRESHOLD_KEY, + CONF_L1_DIFF_THRESHOLD_DEFAULT); + context.registerAggregator(AGGR_DANGLING_VERTICES_NUM, + ValueType.LONG, + LongValueSumCombiner.class); + context.registerAggregator(AGGR_COMULATIVE_DANGLING_PROBABILITY, + ValueType.DOUBLE, + DoubleValueSumCombiner.class); + context.registerAggregator(AGGR_COMULATIVE_PROBABILITY, + ValueType.DOUBLE, + DoubleValueSumCombiner.class); + context.registerAggregator(AGGR_L1_NORM_DIFFERENCE_KEY, + ValueType.DOUBLE, + DoubleValueSumCombiner.class); + } + + @Override + public void close(MasterContext context) { + // pass + } + + @Override + public boolean compute(MasterComputationContext context) { + + LongValue danglingVerticesNum = context.aggregatedValue( + AGGR_DANGLING_VERTICES_NUM); + DoubleValue danglingProbability = context.aggregatedValue( + AGGR_COMULATIVE_DANGLING_PROBABILITY); + DoubleValue cumulativeProbability = context.aggregatedValue( + AGGR_COMULATIVE_PROBABILITY); + DoubleValue l1NormDifference = context.aggregatedValue( + AGGR_L1_NORM_DIFFERENCE_KEY); + + StringBuilder sb = new StringBuilder(); + sb.append("[Superstep ").append(context.superstep()).append("]") + .append(", dangling vertices num = ").append(danglingVerticesNum) + .append(", cumulative dangling probability = ") + .append(danglingProbability.value()) + .append(", cumulative probability = ").append(cumulativeProbability) + .append(", l1 norm difference = ").append(l1NormDifference.value()); + + LOG.info("PageRank running status: {}", sb); + double l1Diff = l1NormDifference.value(); + if (context.superstep() > 1 && l1Diff <= this.l1DiffThreshold) { + return false; + } else { + return true; + } + } +} diff --git a/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRankParams.java b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRankParams.java new file mode 100644 index 000000000..e5ac392c8 --- /dev/null +++ b/computer-algorithm/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRankParams.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.algorithm.rank.pagerank; + +import java.util.Map; + +import com.baidu.hugegraph.computer.algorithm.AlgorithmParams; +import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; +import com.baidu.hugegraph.computer.core.output.LimitedLogOutput; + +public class PageRankParams implements AlgorithmParams { + + @Override + public void setAlgorithmParameters(Map params) { + this.setIfAbsent(params, ComputerOptions.MASTER_COMPUTATION_CLASS, + PageRank4Master.class.getName()); + this.setIfAbsent(params, ComputerOptions.WORKER_COMPUTATION_CLASS, + PageRank.class.getName()); + this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName()); + this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName()); + this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS, + DoubleValueSumCombiner.class.getName()); + this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS, + LimitedLogOutput.class.getName()); + } +} diff --git a/computer-api/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmParams.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmParams.java new file mode 100644 index 000000000..3302f9396 --- /dev/null +++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/algorithm/AlgorithmParams.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.algorithm; + +import java.util.Map; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.config.ConfigOption; +import com.baidu.hugegraph.util.Log; + +public interface AlgorithmParams { + + Logger LOG = Log.logger(AlgorithmParams.class); + + /** + * set algorithm's specific configuration + * @param params + */ + void setAlgorithmParameters(Map params); + + default void setIfAbsent(Map params, String key, + String value) { + if (!params.keySet().contains(key)) { + LOG.debug("Put parameters key={}, value={}", key, value); + params.put(key, value); + } + } + + default void setIfAbsent(Map params, + ConfigOption keyOption, + String value) { + this.setIfAbsent(params, keyOption.name(), value); + } +} diff --git a/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/Combiner.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/Combiner.java index 7e6e64c0f..b9ab102bd 100644 --- a/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/Combiner.java +++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/Combiner.java @@ -39,7 +39,7 @@ default String name() { T combine(T v1, T v2); - public static T combineAll(Combiner combiner, Iterator values) { + static T combineAll(Combiner combiner, Iterator values) { if (!values.hasNext()) { return null; } diff --git a/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/ValueMinCombiner.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/ValueMinCombiner.java index 521cc1a4f..539dd0cf3 100644 --- a/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/ValueMinCombiner.java +++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/combiner/ValueMinCombiner.java @@ -22,14 +22,14 @@ import com.baidu.hugegraph.computer.core.graph.value.Value; import com.baidu.hugegraph.util.E; -public class ValueMinCombiner> implements Combiner { +public class ValueMinCombiner> implements Combiner { @Override @SuppressWarnings("unchecked") public T combine(T v1, T v2) { E.checkArgumentNotNull(v1, "The combine parameter v1 can't be null"); E.checkArgumentNotNull(v2, "The combine parameter v2 can't be null"); - if (((Value) v1).compareTo(v2) <= 0) { + if (v1.compareTo(v2) <= 0) { return v1; } else { return v2; diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInput.java index cdab2fee7..1e31f00f8 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInput.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInput.java @@ -27,6 +27,7 @@ import com.baidu.hugegraph.computer.core.common.ComputerContext; import com.baidu.hugegraph.computer.core.common.exception.ComputerException; import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.config.Config; import com.baidu.hugegraph.computer.core.graph.value.Value; import com.baidu.hugegraph.computer.core.io.BytesInput; import com.baidu.hugegraph.computer.core.io.IOFactory; @@ -36,8 +37,9 @@ public class MessageInput> { + private final Config config; private final PeekableIterator messages; - private final T value; + private T value; public MessageInput(ComputerContext context, PeekableIterator messages) { @@ -46,8 +48,9 @@ public MessageInput(ComputerContext context, } else { this.messages = messages; } + this.config = context.config(); - this.value = context.config().createObject( + this.value = this.config.createObject( ComputerOptions.ALGORITHM_MESSAGE_CLASS); } @@ -94,6 +97,8 @@ public boolean hasNext() { try { BytesInput in = IOFactory.createBytesInput( entry.value().bytes()); + MessageInput.this.value = config.createObject( + ComputerOptions.ALGORITHM_MESSAGE_CLASS); MessageInput.this.value.read(in); } catch (IOException e) { throw new ComputerException("Can't read value", e); diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java index b271d4869..fddd98f85 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java @@ -60,6 +60,15 @@ public static synchronized ComputerOptions instance() { return INSTANCE; } + public static final ConfigOption> ALGORITHM_PARAMS_CLASS = + new ConfigOption<>( + "algorithm.params_class", + "The class used to transfer algorithms's parameters " + + "before algorithm been run.", + disallowEmpty(), + Null.class + ); + public static final ConfigOption> ALGORITHM_RESULT_CLASS = new ConfigOption<>( "algorithm.result_class", diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/LimitedLogOutput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/LimitedLogOutput.java new file mode 100644 index 000000000..8b0f376f3 --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/LimitedLogOutput.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.output; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.util.Log; + +public class LimitedLogOutput implements ComputerOutput { + + private static final Logger LOG = Log.logger(LimitedLogOutput.class); + private static final String CONF_LIMIT_OUTPUT_PER_PARTITION_KEY = + "output.limit_logt_output"; + private static final int CONF_LIMIT_OUTPUT_PER_PARTITION_DEFAULT = 10; + + private int partition; + private int limit; + private int logged; + + @Override + public void init(Config config, int partition) { + this.partition = partition; + this.limit = config.getInt(CONF_LIMIT_OUTPUT_PER_PARTITION_KEY, + CONF_LIMIT_OUTPUT_PER_PARTITION_DEFAULT); + this.logged = 0; + LOG.info("Start write back partition {}", this.partition); + } + + @Override + public void write(Vertex vertex) { + if (this.logged < this.limit) { + LOG.info("id='{}', result='{}'", vertex.id(), vertex.value()); + this.logged++; + } + } + + @Override + public void close() { + LOG.info("End write back partition {}", this.partition); + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/util/ComputerContextUtil.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/util/ComputerContextUtil.java index 2fc5a3ccc..46992b6f9 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/util/ComputerContextUtil.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/util/ComputerContextUtil.java @@ -22,10 +22,12 @@ import java.util.HashMap; import java.util.Map; +import com.baidu.hugegraph.computer.algorithm.AlgorithmParams; import com.baidu.hugegraph.computer.core.allocator.Allocator; import com.baidu.hugegraph.computer.core.allocator.DefaultAllocator; import com.baidu.hugegraph.computer.core.common.ComputerContext; import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; import com.baidu.hugegraph.computer.core.config.Config; import com.baidu.hugegraph.computer.core.config.DefaultConfig; import com.baidu.hugegraph.computer.core.graph.BuiltinGraphFactory; @@ -37,11 +39,26 @@ public static void initContext(String... params) { initContext(convertToMap(params)); } - public static void initContext(Map params) { + public static Config initContext(Map params) { + // Set algorithm's parameters + String algorithmParamsName = params.get( + ComputerOptions.ALGORITHM_PARAMS_CLASS.name()); + AlgorithmParams algorithmParams; + try { + algorithmParams = (AlgorithmParams) Class.forName( + algorithmParamsName).newInstance(); + } catch (Exception e) { + throw new ComputerException("Can't create algorithmParams, " + + "algorithmParamsName = {}", + algorithmParamsName); + } + algorithmParams.setAlgorithmParameters(params); + Config config = new DefaultConfig(params); GraphFactory graphFactory = new BuiltinGraphFactory(config); Allocator allocator = new DefaultAllocator(config, graphFactory); ComputerContext.initContext(config, graphFactory, allocator); + return config; } public static Map convertToMap(String... options) { diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/AlgorithmTestSuite.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/AlgorithmTestSuite.java new file mode 100644 index 000000000..6af275c3e --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/AlgorithmTestSuite.java @@ -0,0 +1,40 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.algorithm.rank.pagerank; + +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +import com.baidu.hugegraph.config.OptionSpace; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + PageRankTest.class, +}) +public class AlgorithmTestSuite { + @BeforeClass + public static void setup() throws ClassNotFoundException { + // Don't forget to register options + OptionSpace.register("computer", + "com.baidu.hugegraph.computer.core.config." + + "ComputerOptions"); + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRankTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRankTest.java new file mode 100644 index 000000000..340a162a8 --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/algorithm/rank/pagerank/PageRankTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.algorithm.rank.pagerank; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.junit.Test; +import org.slf4j.Logger; + +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.master.MasterService; +import com.baidu.hugegraph.computer.core.util.ComputerContextUtil; +import com.baidu.hugegraph.computer.core.worker.MockWorkerService; +import com.baidu.hugegraph.computer.core.worker.WorkerService; +import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; +import com.baidu.hugegraph.config.RpcOptions; +import com.baidu.hugegraph.testutil.Assert; +import com.baidu.hugegraph.util.Log; + +public class PageRankTest extends UnitTestBase { + + private static final Logger LOG = Log.logger(PageRankTest.class); + + @Test + public void testServiceWith1Worker() throws InterruptedException { + ExecutorService pool = Executors.newFixedThreadPool(2); + CountDownLatch countDownLatch = new CountDownLatch(2); + Throwable[] exceptions = new Throwable[2]; + + pool.submit(() -> { + Map params = new HashMap<>(); + params.put(RpcOptions.RPC_REMOTE_URL.name(), "127.0.0.1:8090"); + params.put(ComputerOptions.JOB_ID.name(), "local_002"); + params.put(ComputerOptions.JOB_WORKERS_COUNT.name(), "1"); + params.put(ComputerOptions.TRANSPORT_SERVER_PORT.name(), "8086"); + params.put(ComputerOptions.BSP_REGISTER_TIMEOUT.name(), "100000"); + params.put(ComputerOptions.BSP_LOG_INTERVAL.name(), "30000"); + params.put(ComputerOptions.BSP_MAX_SUPER_STEP.name(), "10"); + params.put(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(), + PageRankParams.class.getName()); + + Config config = ComputerContextUtil.initContext(params); + + WorkerService workerService = new MockWorkerService(); + try { + Thread.sleep(2000L); + workerService.init(config); + workerService.execute(); + } catch (Throwable e) { + LOG.error("Failed to start worker", e); + exceptions[0] = e; + } finally { + workerService.close(); + countDownLatch.countDown(); + } + }); + + pool.submit(() -> { + Map params = new HashMap<>(); + params.put(RpcOptions.RPC_SERVER_HOST.name(), "localhost"); + params.put(RpcOptions.RPC_SERVER_PORT.name(), "8090"); + params.put(ComputerOptions.JOB_ID.name(), "local_002"); + params.put(ComputerOptions.JOB_WORKERS_COUNT.name(), "1"); + params.put(ComputerOptions.BSP_REGISTER_TIMEOUT.name(), "100000"); + params.put(ComputerOptions.BSP_LOG_INTERVAL.name(), "30000"); + params.put(ComputerOptions.BSP_MAX_SUPER_STEP.name(), "10"); + params.put(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(), + PageRankParams.class.getName()); + + Config config = ComputerContextUtil.initContext(params); + + MasterService masterService = new MasterService(); + try { + masterService.init(config); + masterService.execute(); + } catch (Throwable e) { + LOG.error("Failed to start master", e); + exceptions[1] = e; + } finally { + /* + * It must close the service first. The pool will be shutdown + * if count down is executed first, and the server thread in + * master service will not be closed. + */ + masterService.close(); + countDownLatch.countDown(); + } + }); + + countDownLatch.await(); + pool.shutdownNow(); + + Assert.assertFalse(Arrays.asList(exceptions).toString(), + existError(exceptions)); + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfigTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfigTest.java index d2f2906c0..f6900ea91 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfigTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/config/DefaultConfigTest.java @@ -140,7 +140,10 @@ public void testString() throws IOException { @Test public void testCreateObject() { - Config config = context().config(); + Config config = UnitTestBase.updateWithRequiredOptions( + ComputerOptions.MASTER_COMPUTATION_CLASS, + DefaultMasterComputation.class.getName() + ); MasterComputation masterComputation = config.createObject( ComputerOptions.MASTER_COMPUTATION_CLASS); Assert.assertEquals(DefaultMasterComputation.class, @@ -165,7 +168,10 @@ public void testCreateObjectFail() { @Test public void testNullClass() { - Config config = context().config(); + Config config = UnitTestBase.updateWithRequiredOptions( + ComputerOptions.WORKER_COMBINER_CLASS, + Null.class.getName() + ); Object combiner = config.createObject( ComputerOptions.WORKER_COMBINER_CLASS, false); Assert.assertNull(combiner); diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputationParams.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputationParams.java new file mode 100644 index 000000000..5c560b41f --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputationParams.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.worker; + +import java.util.Map; + +import com.baidu.hugegraph.computer.algorithm.AlgorithmParams; +import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; +import com.baidu.hugegraph.computer.core.output.LimitedLogOutput; + +public class MockComputationParams implements AlgorithmParams { + + @Override + public void setAlgorithmParameters(Map params) { + this.setIfAbsent(params, ComputerOptions.MASTER_COMPUTATION_CLASS, + MockMasterComputation.class.getName()); + this.setIfAbsent(params, ComputerOptions.WORKER_COMPUTATION_CLASS, + MockComputation.class.getName()); + this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName()); + this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName()); + this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS, + DoubleValueSumCombiner.class.getName()); + this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS, + LimitedLogOutput.class.getName()); + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java index a216ca0ca..a18fcfdf5 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java @@ -32,6 +32,7 @@ import com.baidu.hugegraph.computer.core.config.Config; import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; import com.baidu.hugegraph.computer.core.master.MasterService; +import com.baidu.hugegraph.computer.core.output.LimitedLogOutput; import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; import com.baidu.hugegraph.config.RpcOptions; import com.baidu.hugegraph.testutil.Assert; @@ -61,7 +62,9 @@ public void testServiceWith1Worker() throws InterruptedException { ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, - DoubleValue.class.getName() + DoubleValue.class.getName(), + ComputerOptions.OUTPUT_CLASS, + LimitedLogOutput.class.getName() ); WorkerService workerService = new MockWorkerService(); try { @@ -115,7 +118,7 @@ public void testServiceWith1Worker() throws InterruptedException { pool.shutdownNow(); Assert.assertFalse(Arrays.asList(exceptions).toString(), - this.existError(exceptions)); + existError(exceptions)); } @Test @@ -220,19 +223,7 @@ public void testServiceWith2Workers() throws InterruptedException { pool.shutdownNow(); Assert.assertFalse(Arrays.asList(exceptions).toString(), - this.existError(exceptions)); - } - - private boolean existError(Throwable[] exceptions) { - boolean error = false; - for (Throwable e : exceptions) { - if (e != null) { - error = true; - LOG.warn("There exist error:", e); - break; - } - } - return error; + existError(exceptions)); } @Test diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java index 68875813b..c80bf1fa6 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestBase.java @@ -24,9 +24,12 @@ import java.util.Map; import java.util.Random; +import org.slf4j.Logger; + import com.baidu.hugegraph.computer.core.common.ComputerContext; import com.baidu.hugegraph.computer.core.common.Constants; import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; import com.baidu.hugegraph.computer.core.config.Config; import com.baidu.hugegraph.computer.core.graph.GraphFactory; import com.baidu.hugegraph.computer.core.graph.id.Id; @@ -45,12 +48,16 @@ import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryInput; import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryInputImpl; import com.baidu.hugegraph.computer.core.util.ComputerContextUtil; +import com.baidu.hugegraph.computer.core.worker.MockComputationParams; import com.baidu.hugegraph.config.TypedOption; import com.baidu.hugegraph.testutil.Assert; import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.Log; public class UnitTestBase { + private static final Logger LOG = Log.logger(UnitTestBase.class); + private static final String CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "0123456789" + "abcdefghijklmnopqrstuvxyz"; @@ -104,6 +111,11 @@ public static void updateOptions(Object... optionKeyValues) { "The option value must be String class"); map.put(((TypedOption) key).name(), (String) value); } + if (!map.keySet().contains( + ComputerOptions.ALGORITHM_PARAMS_CLASS.name())) { + map.put(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(), + MockComputationParams.class.getName()); + } ComputerContextUtil.initContext(map); } @@ -183,4 +195,16 @@ protected static StreamGraphOutput newStreamGraphOutput( OutputFormat.BIN, output); } + + public static boolean existError(Throwable[] exceptions) { + boolean error = false; + for (Throwable e : exceptions) { + if (e != null) { + error = true; + LOG.warn("There exist error:", e); + break; + } + } + return error; + } } diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java index 5376a55ae..e152cee18 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java @@ -24,6 +24,7 @@ import org.junit.runners.Suite; import org.slf4j.Logger; +import com.baidu.hugegraph.computer.algorithm.rank.pagerank.AlgorithmTestSuite; import com.baidu.hugegraph.computer.core.allocator.AllocatorTestSuite; import com.baidu.hugegraph.computer.core.bsp.BspTestSuite; import com.baidu.hugegraph.computer.core.combiner.CombinerTestSuite; @@ -69,8 +70,9 @@ SenderTestSuite.class, ReceiverTestSuite.class, ComputeTestSuite.class, + AlgorithmTestSuite.class, DriverTestSuite.class, - K8sTestSuite.class + K8sTestSuite.class, }) public class UnitTestSuite {