Support read hadoop conf file (#105)
implement #104
Alwayswaiting authored and zhoney committed Nov 26, 2019
1 parent 63aea9c commit d8cf21f
Showing 79 changed files with 2,043 additions and 140 deletions.
pom.xml (5 changes: 3 additions & 2 deletions)
@@ -163,8 +163,9 @@
</activation>
<properties>
<source_type>hdfs</source_type>
<store_path>hdfs://localhost:8020/files</store_path>
<test-classes>**/FileLoadTest.java</test-classes>
<!-- same as hdfs://localhost:8020/files -->
<store_path>/files</store_path>
<test-classes>**/HDFSLoadTest.java</test-classes>
</properties>
</profile>
<profile>
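A note on the new relative store_path: a path without a scheme, such as "/files", is resolved against fs.defaultFS once that value is supplied by the Hadoop configuration files, which is why the comment above says it is the same as hdfs://localhost:8020/files. A minimal sketch of that resolution, with fs.defaultFS hard-coded purely for illustration (in the test setup it would come from core-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class StorePathResolution {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Illustrative only: the test profile reads this value from
            // core-site.xml instead of setting it in code.
            conf.set("fs.defaultFS", "hdfs://localhost:8020");
            FileSystem fs = FileSystem.get(conf);
            // A scheme-less path is qualified against fs.defaultFS,
            // so "/files" is equivalent to "hdfs://localhost:8020/files".
            System.out.println(fs.makeQualified(new Path("/files")));
        }
    }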
HDFSFileReader.java
@@ -21,8 +21,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

@@ -54,7 +52,7 @@ public HDFSFileReader(HDFSSource source) {
super(source);
Configuration config = this.loadConfiguration();
try {
this.hdfs = FileSystem.get(URI.create(source.path()), config);
this.hdfs = FileSystem.get(config);
} catch (IOException e) {
throw new LoadException("Failed to create HDFS file system", e);
}
@@ -106,23 +104,7 @@ protected Readers openReaders() throws IOException {

private Configuration loadConfiguration() {
Configuration conf = new Configuration();
String fsDefaultFS = this.source().fsDefaultFS();
// Remote hadoop
if (fsDefaultFS != null) {
// TODO: Support pass more params or specify config files
conf.set("fs.defaultFS", fsDefaultFS);
return conf;
}
// Local hadoop
String hadoopHome = System.getenv("HADOOP_HOME");
if (hadoopHome != null && !hadoopHome.isEmpty()) {
LOG.info("Get HADOOP_HOME {}", hadoopHome);
String path = Paths.get(hadoopHome, "etc", "hadoop").toString();
conf.addResource(path(path, "/core-site.xml"));
conf.addResource(path(path, "/hdfs-site.xml"));
conf.addResource(path(path, "/mapred-site.xml"));
conf.addResource(path(path, "/yarn-site.xml"));
}
conf.addResource(new Path(this.source().coreSitePath()));
return conf;
}

@@ -138,10 +120,6 @@ private static void checkExist(FileSystem fs, Path path) {
}
}

private static Path path(String configPath, String configFile) {
return new Path(Paths.get(configPath, configFile).toString());
}

protected static class HDFSFile implements Readable {

private final FileSystem hdfs;
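The heart of the change is in the constructor and loadConfiguration() above: instead of honoring a fs_default_fs option or probing HADOOP_HOME for config files, the reader now adds the user-supplied core-site.xml as a Configuration resource and lets FileSystem.get(config) pick up fs.defaultFS from it. A minimal sketch of that pattern outside the loader, with a hypothetical core-site.xml location:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CoreSiteConfigSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical location of the cluster's core-site.xml.
            conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
            // No explicit URI needed any more: fs.defaultFS is taken from
            // the resource added above.
            FileSystem hdfs = FileSystem.get(conf);
            System.out.println(hdfs.getUri());
        }
    }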
HDFSSource.java
@@ -21,20 +21,29 @@

import com.baidu.hugegraph.loader.source.SourceType;
import com.baidu.hugegraph.loader.source.file.FileSource;
import com.baidu.hugegraph.util.E;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.StringUtils;

public class HDFSSource extends FileSource {

@JsonProperty("fs_default_fs")
private String fsDefaultFS;
@JsonProperty("core_site_path")
private String coreSitePath;

@Override
public SourceType type() {
return SourceType.HDFS;
}

public String fsDefaultFS() {
return this.fsDefaultFS;
@Override
public void check() throws IllegalArgumentException {
super.check();
E.checkArgument(!StringUtils.isEmpty(this.coreSitePath),
"The core_site_path can't be empty");
}

public String coreSitePath() {
return this.coreSitePath;
}

@Override
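On the source side, fs_default_fs is replaced by a required core_site_path, and the new check() rejects missing values before any file system is opened. A minimal sketch of that validation, assuming hugegraph-common's E and commons-lang3's StringUtils are on the classpath; the value would normally be bound from the "core_site_path" key of the source JSON per the @JsonProperty annotation:

    import com.baidu.hugegraph.util.E;
    import org.apache.commons.lang3.StringUtils;

    public class CoreSitePathCheckSketch {
        public static void main(String[] args) {
            // In the loader this value is bound from the "core_site_path"
            // key of the HDFS source JSON.
            String coreSitePath = "";
            // Same validation style as HDFSSource.check(): an empty value
            // fails fast, before any file system is opened.
            E.checkArgument(!StringUtils.isEmpty(coreSitePath),
                            "The core_site_path can't be empty");
        }
    }

Running this with an empty string throws IllegalArgumentException with the message shown, which mirrors the failure an HDFS source without core_site_path would now produce.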
