Skip to content
Permalink
Browse files
output demo(pagerank) (#72)
* implement hugegraph pagerank output class
* support multiple threads writeback
* add output configs to driver
  • Loading branch information
zhoney committed Aug 26, 2021
1 parent b6341ca commit 65d287b806cd13d4df24435421c6fd6a0469166c
Showing 15 changed files with 942 additions and 2 deletions.
@@ -25,7 +25,7 @@
import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.output.LimitedLogOutput;
import com.baidu.hugegraph.computer.core.output.hg.PageRankOutput;

public class PageRankParams implements AlgorithmParams {

@@ -42,6 +42,6 @@ public void setAlgorithmParameters(Map<String, String> params) {
this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS,
DoubleValueSumCombiner.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
LimitedLogOutput.class.getName());
PageRankOutput.class.getName());
}
}
@@ -37,5 +37,10 @@
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
</dependency>
<!-- Apache Commons Lang 3, with the version pinned inline in this module.
     NOTE(review): confirm whether a parent POM should manage this version. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
</project>
@@ -217,6 +217,55 @@ public static synchronized ComputerOptions instance() {
false
);

/**
 * Option "output.batch_size": default 500, validated by positiveInt().
 * HugeOutput reads it in init() and commits its buffered vertices once the
 * buffer holds at least this many entries.
 */
public static final ConfigOption<Integer> OUTPUT_BATCH_SIZE =
new ConfigOption<>(
"output.batch_size",
"The batch size of output",
positiveInt(),
500
);

/**
 * Option "output.batch_threads": number of threads used for batch output,
 * default 1, validated by positiveInt().
 * NOTE(review): the consumer is not visible here; presumably the output
 * TaskManager sizes its batch pool from this value - confirm.
 */
public static final ConfigOption<Integer> OUTPUT_BATCH_THREADS =
new ConfigOption<>(
"output.batch_threads",
"The threads number used to batch output",
positiveInt(),
1
);

/**
 * Option "output.single_threads": number of threads used for single
 * (one-by-one) output, default 1, validated by positiveInt().
 * NOTE(review): presumably used when a batch write fails and elements are
 * retried individually - confirm against the TaskManager.
 */
public static final ConfigOption<Integer> OUTPUT_SINGLE_THREADS =
new ConfigOption<>(
"output.single_threads",
"The threads number used to single output",
positiveInt(),
1
);

/**
 * Option "output.thread_pool_shutdown_timeout": how long, in seconds (per
 * the option description), to wait for the output thread pool to shut
 * down. Default 60, validated by positiveInt().
 */
public static final ConfigOption<Integer>
OUTPUT_THREAD_POOL_SHUTDOWN_TIMEOUT =
new ConfigOption<>(
"output.thread_pool_shutdown_timeout",
"The timeout seconds of output threads pool shutdown",
positiveInt(),
60
);

/**
 * Option "output.retry_times": how many times a failed output is retried,
 * default 3, validated by positiveInt().
 * NOTE(review): if positiveInt() rejects 0, retries cannot be disabled
 * through this option - confirm that is intended.
 */
public static final ConfigOption<Integer> OUTPUT_RETRY_TIMES =
new ConfigOption<>(
"output.retry_times",
"The retry times when output failed",
positiveInt(),
3
);

/**
 * Option "output.retry_interval": the pause between retries of a failed
 * output, default 10, validated by positiveInt().
 * NOTE(review): the time unit is not stated in the description; presumably
 * seconds, like the shutdown timeout - confirm against the consumer.
 */
public static final ConfigOption<Integer> OUTPUT_RETRY_INTERVAL =
new ConfigOption<>(
"output.retry_interval",
"The retry interval when output failed",
positiveInt(),
10
);

public static final ConfigOption<Integer> VERTEX_AVERAGE_DEGREE =
new ConfigOption<>(
"computer.vertex_average_degree",
@@ -0,0 +1,92 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.output.hg;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.output.ComputerOutput;
import com.baidu.hugegraph.computer.core.output.hg.task.TaskManager;
import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.util.Log;

/**
 * Base class for outputs that write computed vertex results back to
 * HugeGraph.
 *
 * <p>Vertices handed to {@link #write(Vertex)} are converted through
 * {@link #constructHugeVertex(Vertex)} and buffered; once the buffer holds
 * {@code output.batch_size} entries it is submitted to the
 * {@link TaskManager}. {@link #close()} flushes the remainder, waits for the
 * submitted tasks and shuts the task manager down.
 *
 * <p>Subclasses supply the result property name, the schema preparation and
 * the vertex conversion.
 */
public abstract class HugeOutput implements ComputerOutput {

    // Log under this class, not under one particular subclass
    private static final Logger LOG = Log.logger(HugeOutput.class);

    private int partition;
    private TaskManager taskManager;
    private List<com.baidu.hugegraph.structure.graph.Vertex> vertexBatch;
    private int batchSize;

    /**
     * Prepares this output for one partition: creates the task manager and
     * an empty batch, reads the batch size, then lets the subclass prepare
     * its schema.
     *
     * @param config    the job configuration
     * @param partition the id of the partition being written back
     */
    @Override
    public void init(Config config, int partition) {
        // Assign first, otherwise the log line always reports partition 0
        this.partition = partition;
        LOG.info("Start write back partition {}", this.partition);

        this.taskManager = new TaskManager(config);
        this.vertexBatch = new ArrayList<>();
        this.batchSize = config.get(ComputerOptions.OUTPUT_BATCH_SIZE);

        this.prepareSchema();
    }

    /**
     * @return the HugeGraph client owned by the task manager
     */
    public HugeClient client() {
        return this.taskManager.client();
    }

    /**
     * @return the name of the property the result is written to
     */
    public abstract String name();

    /**
     * Creates whatever schema the result property needs; called at the end
     * of {@link #init(Config, int)}.
     */
    public abstract void prepareSchema();

    /**
     * Buffers the converted vertex and submits the buffer as soon as it
     * reaches the configured batch size.
     *
     * @param vertex the computed vertex to write back
     */
    @Override
    public void write(Vertex vertex) {
        this.vertexBatch.add(this.constructHugeVertex(vertex));
        if (this.vertexBatch.size() >= this.batchSize) {
            this.commit();
        }
    }

    /**
     * Converts a computed vertex into the HugeGraph client vertex that
     * carries the result property.
     *
     * @param vertex the computed vertex
     * @return the vertex to send to HugeGraph
     */
    public abstract com.baidu.hugegraph.structure.graph.Vertex
                    constructHugeVertex(Vertex vertex);

    /**
     * Flushes the remaining buffered vertices, waits for all submitted
     * tasks to finish and shuts the task manager down.
     */
    @Override
    public void close() {
        try {
            if (!this.vertexBatch.isEmpty()) {
                this.commit();
            }
            this.taskManager.waitFinished();
        } finally {
            // Always release the task manager, even when the final commit
            // or the wait fails, so it is not leaked
            this.taskManager.shutdown();
        }
        LOG.info("End write back partition {}", this.partition);
    }

    /**
     * Hands the current batch to the task manager and starts a fresh one.
     * A new list is allocated instead of clearing the old one because the
     * submitted list is now owned by the task manager.
     */
    private void commit() {
        this.taskManager.submitBatch(this.vertexBatch);
        LOG.info("Write back {} vertices", this.vertexBatch.size());

        this.vertexBatch = new ArrayList<>();
    }
}
@@ -0,0 +1,52 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.output.hg;

import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.structure.constant.WriteType;

/**
 * Writes the PageRank result of each vertex back to HugeGraph as a double
 * property named by {@link #name()}.
 */
public class PageRankOutput extends HugeOutput {

    /**
     * @return the property name used for the result, "pagerank"
     */
    @Override
    public String name() {
        return "pagerank";
    }

    /**
     * Creates the result property key (double, OLAP_RANGE write type) if it
     * does not exist yet.
     */
    @Override
    public void prepareSchema() {
        String propertyName = this.name();
        this.client()
            .schema()
            .propertyKey(propertyName)
            .asDouble()
            .writeType(WriteType.OLAP_RANGE)
            .ifNotExist()
            .create();
    }

    /**
     * Builds a HugeGraph vertex without a label that carries the id of the
     * computed vertex and its rank value as the result property.
     *
     * @param vertex the computed vertex, whose value must be a DoubleValue
     * @return the vertex to send to HugeGraph
     */
    @Override
    public com.baidu.hugegraph.structure.graph.Vertex constructHugeVertex(
                                                      Vertex vertex) {
        com.baidu.hugegraph.structure.graph.Vertex target =
                new com.baidu.hugegraph.structure.graph.Vertex(null);
        target.id(vertex.id().asObject());

        DoubleValue rank = (DoubleValue) vertex.value();
        target.property(this.name(), rank.value());
        return target;
    }
}
@@ -0,0 +1,41 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.output.hg.exceptions;

/**
 * Unchecked exception raised when writing results back fails.
 *
 * <p>The varargs constructors build the message with
 * {@link String#format(String, Object...)}. A call with exactly a message
 * and a {@link Throwable} binds to the cause constructor, not to the
 * formatting one.
 */
public class WriteBackException extends RuntimeException {

    private static final long serialVersionUID = 5504623124963497613L;

    /**
     * @param message the detail message, used as is
     */
    public WriteBackException(String message) {
        super(message);
    }

    /**
     * @param message the detail message, used as is
     * @param cause   the underlying failure
     */
    public WriteBackException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message a format string
     * @param args    the arguments substituted into {@code message}
     */
    public WriteBackException(String message, Object... args) {
        this(String.format(message, args));
    }

    /**
     * @param message a format string
     * @param cause   the underlying failure
     * @param args    the arguments substituted into {@code message}
     */
    public WriteBackException(String message, Throwable cause, Object... args) {
        this(String.format(message, args), cause);
    }
}
@@ -0,0 +1,49 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.output.hg.metrics;

import java.util.concurrent.atomic.LongAdder;

/**
 * Counters for successful and failed inserts, backed by {@link LongAdder}.
 */
public final class LoadMetrics {

    private final LongAdder insertSuccess = new LongAdder();
    private final LongAdder insertFailure = new LongAdder();

    /** Creates metrics with both counters at zero. */
    public LoadMetrics() {
        // Counters are initialized at their declarations
    }

    /**
     * @return the current total of successful inserts
     */
    public long insertSuccess() {
        return this.insertSuccess.sum();
    }

    /**
     * Adds {@code count} to the successful-insert total.
     *
     * @param count the amount to add
     */
    public void plusInsertSuccess(long count) {
        this.insertSuccess.add(count);
    }

    /**
     * @return the current total of failed inserts
     */
    public long insertFailure() {
        return this.insertFailure.sum();
    }

    /** Adds one to the failed-insert total. */
    public void increaseInsertFailure() {
        this.insertFailure.add(1L);
    }
}
@@ -0,0 +1,42 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.output.hg.metrics;

/**
 * Snapshot of the vertex insert counters taken from a {@link LoadSummary}.
 */
public final class LoadReport {

    private long vertexInsertSuccess;
    private long vertexInsertFailure;

    /**
     * @return the number of successfully inserted vertices in this snapshot
     */
    public long vertexInsertSuccess() {
        return this.vertexInsertSuccess;
    }

    /**
     * @return the number of failed vertex inserts in this snapshot
     */
    public long vertexInsertFailure() {
        return this.vertexInsertFailure;
    }

    /**
     * Builds a report from the metrics currently held by the summary.
     *
     * @param summary the summary whose metrics are read
     * @return a new report holding the success and failure counts
     */
    public static LoadReport collect(LoadSummary summary) {
        LoadMetrics source = summary.metrics();
        LoadReport snapshot = new LoadReport();
        // The fields of a fresh report are zero, so assigning equals adding
        snapshot.vertexInsertSuccess = source.insertSuccess();
        snapshot.vertexInsertFailure = source.insertFailure();
        return snapshot;
    }
}

0 comments on commit 65d287b

Please sign in to comment.