Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@
*/
package org.apache.beam.sdk.io.hdfs;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.List;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
Expand All @@ -39,11 +45,37 @@ public interface HadoopFileSystemOptions extends PipelineOptions {
void setHdfsConfiguration(List<Configuration> value);

/** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. */
class ConfigurationLocator implements DefaultValueFactory<Configuration> {
class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationLocator.class);
@Override
public Configuration create(PipelineOptions options) {
// TODO: Find default configuration to use
return null;
public List<Configuration> create(PipelineOptions options) {
// Find default configuration when HADOOP_CONF_DIR or YARN_CONF_DIR is set.
Configuration conf = new Configuration(false);
List<String> hadoopEnvList = Lists.newArrayList("HADOOP_CONF_DIR", "YARN_CONF_DIR");
for (String env : hadoopEnvList) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we find a configuration in HADOOP_CONF_DIR and YARN_CONF_DIR, we should be returning them both separately and not having the YARN_CONF_DIR overwriting the properties found in HADOOP_CONF_DIR.

Also, ensure that we only load one configuration if both HADOOP_CONF_DIR and YARN_CONF_DIR point to the same location.

String hadoopConfPath = System.getenv(env);
Copy link
Member

@lukecwik lukecwik May 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add unit tests.

Note that System.getenv isn't mockable so the best bet is to make a method on ConfigurationLocator like:

@VisibleForTesting
Map<String, String> getEnvironment() {
  return System.getenv(key);
}

and spy it in your unit tests.

See GcpOptions and GcpOptionsTest for an example of how this kind of interaction can be tested

if (!Strings.isNullOrEmpty(hadoopConfPath) && new File(hadoopConfPath).exists()) {

// We just need to load both core-site.xml and hdfs-site.xml to determine the
// default fs path and the hdfs configuration
if (new File(hadoopConfPath + "/core-site.xml").exists()) {
Copy link
Member

@lukecwik lukecwik May 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the two argument constructor for File/Path here and below so we aren't assuming how path resolution works ("/" is common but not for every file system)

new File(hadoopConfPath, "core-site.xml");
new Path(hadoopConfPath, "core-site.xml");

conf.addResource(new Path(hadoopConfPath + "/core-site.xml"));

if (LOG.isDebugEnabled()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use parameterized messages

if (LOG.isDebugEnabled()) {
  LOG.debug("Log " + x);
}

becomes

LOG.debug("Log {}", x);

See:
https://www.slf4j.org/faq.html#logging_performance

LOG.debug("Adding " + hadoopConfPath + "/core-site.xml to hadoop configuration");
}
}

if (new File(hadoopConfPath + "/hdfs-site.xml").exists()) {
conf.addResource(new Path(hadoopConfPath + "/hdfs-site.xml"));

if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + hadoopConfPath + "/hdfs-site.xml to hadoop configuration");
}
}
}
}
return Lists.<Configuration>newArrayList(conf);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be returning a configuration if we didn't load one from one of the paths.

}
}
}