feature: support input from hdfs and support hdfs kerberos (#172)
* Support input from local/HDFS files (based on hugegraph-loader)
* Support HDFS Kerberos authentication for input/output
* Support filtering vertices in HDFS output
coderzc committed Jan 17, 2022
1 parent 0ae7fea commit d890a24c137fb02ecc1252a1da34c617858d39a6
Showing 32 changed files with 1,380 additions and 60 deletions.
@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
-<version>0.1.2</version>
+<version>0.1.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
-<version>0.1.2</version>
+<version>0.1.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
-<version>0.1.2</version>
+<version>0.1.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -56,5 +56,9 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-loader</artifactId>
</dependency>
</dependencies>
</project>
@@ -90,8 +90,8 @@ public static synchronized ComputerOptions instance() {
new ConfigOption<>(
"input.source_type",
"The source type to load input data",
allowValues("hugegraph"),
"hugegraph"
allowValues("hugegraph-server", "hugegraph-loader"),
"hugegraph-server"
);

public static final ConfigOption<Integer> INPUT_SPLIT_FETCH_TIMEOUT =
@@ -175,6 +175,24 @@ public static synchronized ComputerOptions instance() {
200
);

public static final ConfigOption<String> INPUT_LOADER_STRUCT_PATH =
new ConfigOption<>(
"input.loader_struct_path",
"The struct path of loader input, only takes effect when " +
"the input.source_type=loader is enabled",
null,
""
);

public static final ConfigOption<String> INPUT_LOADER_SCHEMA_PATH =
new ConfigOption<>(
"input.loader_schema_path",
"The schema path of loader input, only takes effect when " +
"the input.source_type=loader is enabled",
null,
""
);

public static final ConfigOption<Integer> SORT_THREAD_NUMS =
new ConfigOption<>(
"sort.thread_nums",
@@ -300,6 +318,22 @@ public static synchronized ComputerOptions instance() {
"hadoop"
);

public static final ConfigOption<String> OUTPUT_HDFS_CORE_SITE_PATH =
new ConfigOption<>(
"output.hdfs_core_site_path",
"The hdfs core site path.",
null,
""
);

public static final ConfigOption<String> OUTPUT_HDFS_SITE_PATH =
new ConfigOption<>(
"output.hdfs_site_path",
"The hdfs site path.",
null,
""
);

public static final ConfigOption<Short> OUTPUT_HDFS_REPLICATION =
new ConfigOption<>(
"output.hdfs_replication",
@@ -321,7 +355,7 @@ public static synchronized ComputerOptions instance() {
"output.hdfs_delimiter",
"The delimiter of hdfs output.",
disallowEmpty(),
-String.valueOf((char) 27)
+","
);

public static final ConfigOption<Boolean> OUTPUT_HDFS_MERGE =
@@ -332,6 +366,38 @@ public static synchronized ComputerOptions instance() {
true
);

public static final ConfigOption<Boolean> OUTPUT_HDFS_KERBEROS_ENABLE =
new ConfigOption<>(
"output.hdfs_kerberos_enable",
"Is Kerberos authentication enabled for Hdfs.",
allowValues(true, false),
false
);

public static final ConfigOption<String> OUTPUT_HDFS_KRB5_CONF =
new ConfigOption<>(
"output.hdfs_krb5_conf",
"Kerberos configuration file.",
disallowEmpty(),
"/etc/krb5.conf"
);

public static final ConfigOption<String> OUTPUT_HDFS_KERBEROS_PRINCIPAL =
new ConfigOption<>(
"output.hdfs_kerberos_principal",
"The Hdfs's principal for kerberos authentication.",
null,
""
);

public static final ConfigOption<String> OUTPUT_HDFS_KERBEROS_KEYTAB =
new ConfigOption<>(
"output.hdfs_kerberos_keytab",
"The Hdfs's key tab file for kerberos authentication.",
null,
""
);

public static final ConfigOption<Integer>
ALLOCATOR_MAX_VERTICES_PER_THREAD = new ConfigOption<>(
"allocator.max_vertices_per_thread",
@@ -0,0 +1,67 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import java.util.ArrayList;
import java.util.List;

import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.id.IdType;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.schema.EdgeLabel;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.SplicingIdGenerator;
import com.google.common.collect.ImmutableList;

public class IdUtil {

public static final String NUMBER_ID_PREFIX = "L";
public static final String STRING_ID_PREFIX = "S";

private static String writeString(Object rawId) {
Id id = HugeConverter.convertId(rawId);
String idString = id.toString();
return (id.idType() == IdType.LONG ?
NUMBER_ID_PREFIX : STRING_ID_PREFIX).concat(idString);
}

private static List<Object> sortValues(Edge edge, EdgeLabel edgeLabel) {
List<String> sortKeys = edgeLabel.sortKeys();
if (sortKeys.isEmpty()) {
return ImmutableList.of();
}
List<Object> propValues = new ArrayList<>(sortKeys.size());
for (String sk : sortKeys) {
Object property = edge.property(sk);
E.checkState(property != null,
"The value of sort key '%s' can't be null", sk);
propValues.add(property);
}
return propValues;
}

public static String assignEdgeId(Edge edge, EdgeLabel edgeLabel) {
return SplicingIdGenerator.concat(
writeString(edge.sourceId()),
String.valueOf(edgeLabel.id()),
SplicingIdGenerator.concatValues(sortValues(edge, edgeLabel)),
writeString(edge.targetId()));
}
}
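
For intuition, here is a standalone sketch (not part of the commit) of the id layout assembled above: long ids get the "L" prefix, string ids "S", and the parts are concatenated in source / label-id / sort-values / target order. The '>' separator below is an assumption about SplicingIdGenerator and is used for illustration only.

    // Standalone illustration of the edge-id layout produced by IdUtil.assignEdgeId().
    public class EdgeIdSketch {
        static String writeString(Object rawId) {
            // Mirrors IdUtil.writeString(): "L" for long ids, "S" for string ids
            return (rawId instanceof Number ? "L" : "S") + rawId;
        }

        public static void main(String[] args) {
            // Assumed '>' join character; treat the exact joined form as illustrative
            String edgeId = String.join(">",
                    writeString(123L),      // source vertex id
                    "2",                    // edge label id
                    "",                     // sort-key values (none here)
                    writeString("marko"));  // target vertex id
            System.out.println(edgeId);     // L123>2>>Smarko
        }
    }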
@@ -24,15 +24,19 @@
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.input.hg.HugeGraphFetcher;
import com.baidu.hugegraph.computer.core.input.hg.HugeInputSplitFetcher;
import com.baidu.hugegraph.computer.core.input.loader.LoaderFileInputSplitFetcher;
import com.baidu.hugegraph.computer.core.input.loader.LoaderGraphFetcher;
import com.baidu.hugegraph.computer.core.rpc.InputSplitRpcService;

public class InputSourceFactory {

public static InputSplitFetcher createInputSplitFetcher(Config config) {
String type = config.get(ComputerOptions.INPUT_SOURCE_TYPE);
switch (type) {
case "hugegraph":
case "hugegraph-server":
return new HugeInputSplitFetcher(config);
case "hugegraph-loader":
return new LoaderFileInputSplitFetcher(config);
default:
throw new ComputerException("Unexpected source type %s", type);
}
@@ -42,8 +46,10 @@ public static GraphFetcher createGraphFetcher(Config config,
InputSplitRpcService srv) {
String type = config.get(ComputerOptions.INPUT_SOURCE_TYPE);
switch (type) {
case "hugegraph":
case "hugegraph-server":
return new HugeGraphFetcher(config, srv);
case "hugegraph-loader":
return new LoaderGraphFetcher(config, srv);
default:
throw new ComputerException("Unexpected source type %s", type);
}
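
A brief wiring sketch follows; it is hypothetical, with the surrounding objects assumed to be supplied by the framework and the import locations inferred from the imports shown in the diff. Only the two factory calls themselves come from this commit.

    import com.baidu.hugegraph.computer.core.config.Config;
    import com.baidu.hugegraph.computer.core.input.GraphFetcher;
    import com.baidu.hugegraph.computer.core.input.InputSourceFactory;
    import com.baidu.hugegraph.computer.core.input.InputSplitFetcher;
    import com.baidu.hugegraph.computer.core.rpc.InputSplitRpcService;

    // Hypothetical wiring: `config` and `rpcService` stand in for objects the
    // framework provides at runtime.
    public class InputWiringSketch {
        public static void wire(Config config, InputSplitRpcService rpcService) {
            InputSplitFetcher splitFetcher = InputSourceFactory.createInputSplitFetcher(config);
            GraphFetcher graphFetcher = InputSourceFactory.createGraphFetcher(config, rpcService);
            // ... hand the fetchers to the input-loading stage ...
        }
    }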
@@ -0,0 +1,73 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input.loader;

import java.util.ArrayList;
import java.util.List;

import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.input.EdgeFetcher;
import com.baidu.hugegraph.computer.core.input.IdUtil;
import com.baidu.hugegraph.loader.builder.EdgeBuilder;
import com.baidu.hugegraph.loader.builder.ElementBuilder;
import com.baidu.hugegraph.loader.executor.LoadContext;
import com.baidu.hugegraph.loader.mapping.EdgeMapping;
import com.baidu.hugegraph.loader.mapping.InputStruct;
import com.baidu.hugegraph.loader.reader.line.Line;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.schema.EdgeLabel;

public class FileEdgeFetcher extends FileElementFetcher<Edge>
implements EdgeFetcher {

public FileEdgeFetcher(Config config) {
super(config);
}

@Override
protected List<ElementBuilder<Edge>> elementBuilders(LoadContext context,
InputStruct struct) {
List<ElementBuilder<Edge>> builders = new ArrayList<>();
for (EdgeMapping mapping : struct.edges()) {
if (mapping.skip()) {
continue;
}
builders.add(new EdgeBuilder(context, struct, mapping));
}
return builders;
}

@Override
protected List<Edge> buildElement(Line line, ElementBuilder<Edge> builder) {
List<Edge> edges = super.buildElement(line, builder);
for (Edge edge : edges) {
// Generate the edge id locally, since edges built from file input have no server-assigned id
EdgeLabel edgeLabel = (EdgeLabel) builder.schemaLabel();
String edgeId = IdUtil.assignEdgeId(edge, edgeLabel);
edge.id(edgeId);
}
return edges;
}

@Override
public void close() {
super.close();
}
}
