add hdfs output (#114)
* add hdfs output
* add statistics of execute step cost
* fix problems with using the same Config object for SenderIntegrateTest
coderzc committed Oct 18, 2021
1 parent dec7e29 commit 847abed47562e466d13c1969dd7e2b1a4b2f7d3e
Showing 15 changed files with 367 additions and 27 deletions.
@@ -24,13 +24,26 @@ jobs:
java-version: '8'
distribution: 'adopt'

- name: Setup Python3
uses: actions/setup-python@v2
with:
python-version: '3.8'
- name: Setup Hdfs
uses: beyondstorage/setup-hdfs@master
with:
hdfs-version: '3.3.1'

- name: Setup Minikube-Kubernetes
uses: manusa/actions-setup-minikube@v2.4.2
with:
minikube version: v1.21.0
kubernetes version: v1.20.1
- name: Interact with the cluster
run: kubectl get nodes

- name: Check Component
run: |
sleep 5
curl ${{ env.HDFS_NAMENODE_ADDR }}
kubectl get nodes
- name: Cache Maven packages
uses: actions/cache@v2
@@ -17,17 +17,18 @@
* under the License.
*/

package com.baidu.hugegraph.computer.core.output.hg;
package com.baidu.hugegraph.computer.algorithm.centrality.pagerank;

import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.output.hg.HugeOutput;
import com.baidu.hugegraph.structure.constant.WriteType;

public class PageRankOutput extends HugeOutput {

@Override
public String name() {
return "pagerank";
return "page_rank";
}

@Override
@@ -25,7 +25,6 @@
import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.output.hg.PageRankOutput;

public class PageRankParams implements AlgorithmParams {

@@ -23,8 +23,6 @@

import javax.ws.rs.NotSupportedException;

import org.apache.commons.lang3.builder.ToStringBuilder;

import com.baidu.hugegraph.computer.core.graph.value.IdList;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
import com.baidu.hugegraph.computer.core.graph.value.Value;
@@ -91,10 +89,7 @@ public int compareTo(TriangleCountValue other) {

@Override
public String toString() {
return new ToStringBuilder(this)
.append("idList", this.idList)
.append("count", this.count)
.toString();
return String.valueOf(this.count);
}

@Override
@@ -39,6 +39,13 @@ public interface ComputerOutput {
*/
void write(Vertex vertex);

/**
* Merge output files of multiple partitions if applicable.
*/
default void mergePartitions(Config config) {
// pass
}

/**
* Close the connection to target output system. Commit if the target output
* requires it.
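
Because mergePartitions() is a default method, existing ComputerOutput implementations keep compiling without changes; only outputs that produce per-partition files, like the new HdfsOutput below, need to override it. A minimal sketch of the master-side call that drives this hook, mirroring the MasterService change later in this commit:

ComputerOutput output = config.createObject(ComputerOptions.OUTPUT_CLASS);
// The default implementation is a no-op, so outputs without partition files are unaffected
output.mergePartitions(config);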
@@ -42,5 +42,19 @@
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
</dependencies>
</project>
@@ -266,6 +266,54 @@ public static synchronized ComputerOptions instance() {
10
);

public static final ConfigOption<String> OUTPUT_HDFS_URL =
new ConfigOption<>(
"output.hdfs_url",
"The hdfs url of output.",
disallowEmpty(),
"hdfs://127.0.0.1:9000"
);

public static final ConfigOption<String> OUTPUT_HDFS_USER =
new ConfigOption<>(
"output.hdfs_user",
"The hdfs user of output.",
disallowEmpty(),
"hadoop"
);

public static final ConfigOption<Short> OUTPUT_HDFS_REPLICATION =
new ConfigOption<>(
"output.hdfs_replication",
"The replication number of hdfs.",
positiveInt(),
(short) 3
);

public static final ConfigOption<String> OUTPUT_HDFS_DIR =
new ConfigOption<>(
"output.hdfs_path_prefix",
"The directory of hdfs output result.",
disallowEmpty(),
"/hugegraph-computer/results"
);

public static final ConfigOption<String> OUTPUT_HDFS_DELIMITER =
new ConfigOption<>(
"output.hdfs_delimiter",
"The delimiter of hdfs output.",
disallowEmpty(),
String.valueOf((char) 27)
);

public static final ConfigOption<Boolean> OUTPUT_HDFS_MERGE =
new ConfigOption<>(
"output.hdfs_merge_partitions",
"Whether merge output files of multiple partitions.",
allowValues(true, false),
true
);

public static final ConfigOption<Integer> VERTEX_AVERAGE_DEGREE =
new ConfigOption<>(
"computer.vertex_average_degree",
@@ -23,6 +23,7 @@
import java.net.InetSocketAddress;
import java.util.List;

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.aggregator.Aggregator;
@@ -42,11 +43,13 @@
import com.baidu.hugegraph.computer.core.input.MasterInputManager;
import com.baidu.hugegraph.computer.core.manager.Managers;
import com.baidu.hugegraph.computer.core.network.TransportUtil;
import com.baidu.hugegraph.computer.core.output.ComputerOutput;
import com.baidu.hugegraph.computer.core.rpc.MasterRpcManager;
import com.baidu.hugegraph.computer.core.util.ShutdownHook;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.baidu.hugegraph.util.TimeUtil;

/**
* Master service is job's controller. It controls the superstep iteration of
@@ -178,6 +181,7 @@ private void cleanAndCloseBsp() {
* After the superstep iteration, output the result.
*/
public void execute() {
StopWatch watcher = new StopWatch();
this.checkInited();

LOG.info("{} MasterService execute", this);
@@ -201,16 +205,23 @@ public void execute() {
* Constants.INPUT_SUPERSTEP.
*/
SuperstepStat superstepStat;
watcher.start();
if (superstep == Constants.INPUT_SUPERSTEP) {
superstepStat = this.inputstep();
superstep++;
} else {
// TODO: Get superstepStat from bsp service.
superstepStat = null;
}
watcher.stop();
LOG.info("{} MasterService input step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));
E.checkState(superstep <= this.maxSuperStep,
"The superstep {} can't be > maxSuperStep {}",
superstep, this.maxSuperStep);

watcher.reset();
watcher.start();
// Step 3: Iteration computation of all supersteps.
for (; superstepStat.active(); superstep++) {
LOG.info("{} MasterService superstep {} started",
@@ -254,9 +265,17 @@ public void execute() {
LOG.info("{} MasterService superstep {} finished",
this, superstep);
}
watcher.stop();
LOG.info("{} MasterService compute step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));

watcher.reset();
watcher.start();
// Step 4: Output superstep for outputting results.
this.outputstep();
watcher.stop();
LOG.info("{} MasterService output step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));
}

@Override
@@ -351,6 +370,10 @@ private SuperstepStat inputstep() {
private void outputstep() {
LOG.info("{} MasterService outputstep started", this);
this.bsp4Master.waitWorkersOutputDone();
// Merge output files of multiple partitions
ComputerOutput output = this.config.createObject(
ComputerOptions.OUTPUT_CLASS);
output.mergePartitions(this.config);
LOG.info("{} MasterService outputstep finished", this);
}

@@ -0,0 +1,139 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.output.hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.output.ComputerOutput;
import com.baidu.hugegraph.computer.core.util.StringEncoding;
import com.baidu.hugegraph.util.Log;

public class HdfsOutput implements ComputerOutput {

private static final Logger LOG = Log.logger(HdfsOutput.class);

private FileSystem fs;
private FSDataOutputStream fileOutputStream;
private String delimiter;
private static final String REPLICATION_KEY = "dfs.replication";
private static final String FILE_PREFIX = "partition_";
private static final String FILE_SUFFIX = ".csv";

@Override
public void init(Config config, int partition) {
try {
this.delimiter = config.get(ComputerOptions.OUTPUT_HDFS_DELIMITER);
this.openHDFS(config, partition);
} catch (IOException | URISyntaxException | InterruptedException e) {
throw new ComputerException("Failed to init hdfs output on " +
"partition [%s]", e, partition);
}
}

private void openHDFS(Config config, int partition) throws
IOException,
URISyntaxException,
InterruptedException {
Configuration configuration = new Configuration();
Short replication = config.get(ComputerOptions.OUTPUT_HDFS_REPLICATION);
configuration.set(REPLICATION_KEY, String.valueOf(replication));
String url = config.get(ComputerOptions.OUTPUT_HDFS_URL);
String user = config.get(ComputerOptions.OUTPUT_HDFS_USER);
this.fs = FileSystem.get(new URI(url), configuration, user);

String dir = config.get(ComputerOptions.OUTPUT_HDFS_DIR);
String jobId = config.get(ComputerOptions.JOB_ID);
Path hdfsPath = buildPath(dir, jobId, partition);
this.fileOutputStream = this.fs.create(hdfsPath, true);
}

@Override
public void write(Vertex vertex) {
try {
this.writeString(vertex.id().toString());
this.writeString(this.delimiter);
this.writeString(this.constructValueString(vertex));
this.writeString(System.lineSeparator());
} catch (IOException e) {
throw new ComputerException("Failed to write vertex: {}",
vertex.toString(), e);
}
}

protected void writeBytes(byte[] bytes) throws IOException {
this.fileOutputStream.write(bytes);
}

protected void writeString(String string) throws IOException {
this.writeBytes(StringEncoding.encode(string));
}

protected String constructValueString(Vertex vertex) {
return vertex.value().toString();
}

public static Path buildPath(String dir, String jobId, int partition) {
Path dirPath = new Path(dir, jobId);
return new Path(dirPath, FILE_PREFIX + partition + FILE_SUFFIX);
}

@Override
public void mergePartitions(Config config) {
Boolean merge = config.get(ComputerOptions.OUTPUT_HDFS_MERGE);
if (merge) {
LOG.info("Merge hdfs output partitions started");
HdfsOutputMerger hdfsOutputMerger = new HdfsOutputMerger();
try {
hdfsOutputMerger.init(config);
hdfsOutputMerger.merge();
} finally {
hdfsOutputMerger.close();
}
LOG.info("Merge hdfs output partitions finished");
}
}

@Override
public void close() {
try {
if (this.fileOutputStream != null) {
this.fileOutputStream.close();
}
if (this.fs != null) {
this.fs.close();
}
} catch (IOException e) {
throw new ComputerException("Failed to close hdfs", e);
}
}
}
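
With the defaults above, each partition is written to a path like /hugegraph-computer/results/&lt;job_id&gt;/partition_&lt;n&gt;.csv (see buildPath()). HdfsOutputMerger is among the 15 changed files but its source is not expanded in this view; below is a minimal sketch of what a merger driven by mergePartitions() might look like. The init/merge/close method names match the calls above, while the merged file name and the concatenation strategy are assumptions, not the code from this commit.

package com.baidu.hugegraph.computer.core.output.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;

/* Illustrative sketch only: concatenates partition_*.csv files of one job
 * into a single merged file. Not the HdfsOutputMerger shipped in this commit. */
public class HdfsOutputMerger {

    private FileSystem fs;
    private Path jobDir;
    private Path mergedFile;

    protected void init(Config config) {
        try {
            Configuration configuration = new Configuration();
            String url = config.get(ComputerOptions.OUTPUT_HDFS_URL);
            String user = config.get(ComputerOptions.OUTPUT_HDFS_USER);
            this.fs = FileSystem.get(new URI(url), configuration, user);

            String dir = config.get(ComputerOptions.OUTPUT_HDFS_DIR);
            String jobId = config.get(ComputerOptions.JOB_ID);
            this.jobDir = new Path(dir, jobId);
            // The merged file name is an assumption for this sketch
            this.mergedFile = new Path(this.jobDir, "merged.csv");
        } catch (Exception e) {
            throw new ComputerException("Failed to init hdfs output merger", e);
        }
    }

    protected void merge() {
        try (FSDataOutputStream out = this.fs.create(this.mergedFile, true)) {
            // Append every partition file of this job to the merged file
            for (FileStatus status : this.fs.listStatus(this.jobDir)) {
                String name = status.getPath().getName();
                if (name.startsWith("partition_") && name.endsWith(".csv")) {
                    try (FSDataInputStream in = this.fs.open(status.getPath())) {
                        IOUtils.copyBytes(in, out, 4096, false);
                    }
                }
            }
        } catch (IOException e) {
            throw new ComputerException("Failed to merge hdfs output files", e);
        }
    }

    protected void close() {
        try {
            if (this.fs != null) {
                this.fs.close();
            }
        } catch (IOException e) {
            throw new ComputerException("Failed to close hdfs output merger", e);
        }
    }
}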
