From 473f0308a7072321d166d228e5839e62a9ff3bcb Mon Sep 17 00:00:00 2001 From: "yangping.wu" Date: Fri, 28 Apr 2017 17:23:46 +0800 Subject: [PATCH 1/2] [BEAM-1491]Identify HADOOP_CONF_DIR(or YARN_CONF_DIR) environment variables --- .../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 7 +- .../beam/sdk/io/hdfs/HDFSFileSource.java | 7 +- .../apache/beam/sdk/io/hdfs/HadoopUtils.java | 70 +++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java index aa9e41ebcae3..902f656030b0 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java @@ -211,7 +211,12 @@ public Builder setConfiguration(@Nullable Configuration configuration) if (configuration == null) { configuration = new Configuration(false); } - return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); + + SerializableConfiguration serializableConfiguration = + new SerializableConfiguration(configuration); + HadoopUtils.readConfigurationFromHadoopYarnConfigDirs(serializableConfiguration); + + return this.setSerializableConfiguration(serializableConfiguration); } public abstract Builder setUsername(String username); public abstract Builder setValidate(boolean validate); diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 5cc20978628a..9942fcdebe73 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -244,7 +244,12 @@ public Builder 
setConfiguration(Configuration configuration) { if (configuration == null) { configuration = new Configuration(false); } - return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); + + SerializableConfiguration serializableConfiguration = + new SerializableConfiguration(configuration); + HadoopUtils.readConfigurationFromHadoopYarnConfigDirs(serializableConfiguration); + + return this.setSerializableConfiguration(serializableConfiguration); } public abstract Builder setSerializableSplit(SerializableSplit serializableSplit); public abstract Builder setUsername(@Nullable String username); diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java new file mode 100644 index 000000000000..6d002fffd60a --- /dev/null +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to work with Apache Hadoop.
+ */
+public class HadoopUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+
+  /**
+   * Adds core-site.xml and hdfs-site.xml from HADOOP_CONF_DIR and YARN_CONF_DIR to {@code conf}.
+   *
+   * <p>Unset or empty variables, missing directories and missing files are skipped.
+   *
+   * @param conf configuration to update in place; {@code null} makes this call a no-op
+   */
+  public static void readConfigurationFromHadoopYarnConfigDirs(
+      @Nullable SerializableConfiguration conf) {
+    if (conf == null) {
+      return;
+    }
+
+    List<String> envNames = Lists.newArrayList("HADOOP_CONF_DIR", "YARN_CONF_DIR");
+    // We just need to load both core-site.xml and hdfs-site.xml to determine the
+    // default fs path and the hdfs configuration.
+    List<String> fileNames = Lists.newArrayList("core-site.xml", "hdfs-site.xml");
+    for (String envName : envNames) {
+      String confDir = System.getenv(envName);
+      if (Strings.isNullOrEmpty(confDir) || !new File(confDir).isDirectory()) {
+        continue;
+      }
+      for (String fileName : fileNames) {
+        String resourcePath = confDir + "/" + fileName;
+        // Only regular files are usable as resources; skip e.g. a directory with that name.
+        if (!new File(resourcePath).isFile()) {
+          continue;
+        }
+        conf.get().addResource(new Path(resourcePath));
+        // Parameterized logging avoids building the message unless debug is enabled.
+        LOG.debug("Adding {} to hadoop configuration", resourcePath);
+      }
+    }
+  }
+}

From b29c8c60dcd87f965cea9f85494f4bf79c7282b9 Mon Sep 17 00:00:00 2001
From: "yangping.wu"
Date: Fri, 28 Apr 2017 17:25:27 +0800
Subject: [PATCH 2/2] [BEAM-1491]Identify HADOOP_CONF_DIR(or YARN_CONF_DIR) environment variables

---
 .../src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java | 1 +
1 file changed, 1 insertion(+) diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java index 6d002fffd60a..f17d5c902f3a 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopUtils.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.beam.sdk.io.hdfs; import com.google.common.base.Strings;