Skip to content
Permalink
Browse files
[HUDI-2362] Add external config file support (#3416)
Co-authored-by: Wenning Ding <wenningd@amazon.com>
  • Loading branch information
zhedoubushishi and Wenning Ding committed Nov 18, 2021
1 parent 8772cec commit 24def0b30dabd3c447fbfbde636868f1913e9074
Show file tree
Hide file tree
Showing 25 changed files with 424 additions and 100 deletions.
@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.

# Example:
# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
# hoodie.datasource.hive_sync.use_jdbc true
# hoodie.datasource.hive_sync.support_timestamp false
# hoodie.index.type BLOOM
# hoodie.metadata.enable false
@@ -360,7 +360,7 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {

TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getProps(true);

properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

@@ -18,14 +18,20 @@

package org.apache.hudi.common.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

@@ -43,72 +49,110 @@ public class DFSPropertiesConfiguration {

private static final Logger LOG = LogManager.getLogger(DFSPropertiesConfiguration.class);

public static final String DEFAULT_PROPERTIES_FILE = "hudi-defaults.conf";

public static final String CONF_FILE_DIR_ENV_NAME = "HUDI_CONF_DIR";

public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf";

// props read from hudi-defaults.conf
private static TypedProperties GLOBAL_PROPS = loadGlobalProps();

private final FileSystem fs;

private final Path rootFile;
private Path currentFilePath;

private final TypedProperties props;
// props read from user defined configuration file or input stream
private final HoodieConfig hoodieConfig;

// Keep track of files visited, to detect loops
private final Set<String> visitedFiles;
private final Set<String> visitedFilePaths;

public DFSPropertiesConfiguration(FileSystem fs, Path rootFile, TypedProperties defaults) {
public DFSPropertiesConfiguration(FileSystem fs, Path filePath) {
this.fs = fs;
this.rootFile = rootFile;
this.props = defaults;
this.visitedFiles = new HashSet<>();
visitFile(rootFile);
}

public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
this(fs, rootFile, new TypedProperties());
this.currentFilePath = filePath;
this.hoodieConfig = new HoodieConfig();
this.visitedFilePaths = new HashSet<>();
addPropsFromFile(filePath);
}

public DFSPropertiesConfiguration() {
this.fs = null;
this.rootFile = null;
this.props = new TypedProperties();
this.visitedFiles = new HashSet<>();
this.currentFilePath = null;
this.hoodieConfig = new HoodieConfig();
this.visitedFilePaths = new HashSet<>();
}

private String[] splitProperty(String line) {
int ind = line.indexOf('=');
String k = line.substring(0, ind).trim();
String v = line.substring(ind + 1).trim();
return new String[] {k, v};
/**
* Load global props from hudi-defaults.conf which is under CONF_FILE_DIR_ENV_NAME.
* @return Typed Properties
*/
public static TypedProperties loadGlobalProps() {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration();
Option<Path> defaultConfPath = getConfPathFromEnv();
if (defaultConfPath.isPresent()) {
conf.addPropsFromFile(defaultConfPath.get());
} else {
try {
conf.addPropsFromFile(new Path(DEFAULT_CONF_FILE_DIR));
} catch (Exception ignored) {
LOG.debug("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR);
}
}
return conf.getProps();
}

public static void refreshGlobalProps() {
GLOBAL_PROPS = loadGlobalProps();
}

public static void clearGlobalProps() {
GLOBAL_PROPS = new TypedProperties();
}

private void visitFile(Path file) {
/**
* Add properties from external configuration files.
*
* @param filePath File path for configuration file
*/
public void addPropsFromFile(Path filePath) {
if (visitedFilePaths.contains(filePath.toString())) {
throw new IllegalStateException("Loop detected; file " + filePath + " already referenced");
}
FileSystem fileSystem;
try {
if (visitedFiles.contains(file.getName())) {
throw new IllegalStateException("Loop detected; file " + file + " already referenced");
}
visitedFiles.add(file.getName());
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
addProperties(reader);
fileSystem = fs != null ? fs : filePath.getFileSystem(new Configuration());
} catch (IOException e) {
throw new IllegalArgumentException("Cannot get the file system from file path", e);
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(filePath)))) {
visitedFilePaths.add(filePath.toString());
currentFilePath = filePath;
addPropsFromStream(reader);
} catch (IOException ioe) {
LOG.error("Error reading in properies from dfs", ioe);
LOG.error("Error reading in properties from dfs");
throw new IllegalArgumentException("Cannot read properties from dfs", ioe);
}
}

/**
* Add properties from input stream.
*
* Add properties from buffered reader.
*
* @param reader Buffered Reader
* @throws IOException
*/
public void addProperties(BufferedReader reader) throws IOException {
public void addPropsFromStream(BufferedReader reader) throws IOException {
try {
reader.lines().forEach(line -> {
if (line.startsWith("#") || line.equals("") || !line.contains("=")) {
if (!isValidLine(line)) {
return;
}
String[] split = splitProperty(line);
if (line.startsWith("include=") || line.startsWith("include =")) {
visitFile(new Path(rootFile.getParent(), split[1]));
Path includeFilePath = new Path(currentFilePath.getParent(), split[1]);
addPropsFromFile(includeFilePath);
} else {
props.setProperty(split[0], split[1]);
hoodieConfig.setValue(split[0], split[1]);
}
});

@@ -117,7 +161,46 @@ public void addProperties(BufferedReader reader) throws IOException {
}
}

public TypedProperties getConfig() {
return props;
public static TypedProperties getGlobalProps() {
final TypedProperties globalProps = new TypedProperties();
globalProps.putAll(GLOBAL_PROPS);
return globalProps;
}

public TypedProperties getProps() {
return new TypedProperties(hoodieConfig.getProps());
}

public TypedProperties getProps(boolean includeGlobalProps) {
return new TypedProperties(hoodieConfig.getProps(includeGlobalProps));
}

private static Option<Path> getConfPathFromEnv() {
String confDir = System.getenv(CONF_FILE_DIR_ENV_NAME);
if (confDir == null) {
LOG.warn("Cannot find " + CONF_FILE_DIR_ENV_NAME + ", please set it as the dir of " + DEFAULT_PROPERTIES_FILE);
return Option.empty();
}
if (StringUtils.isNullOrEmpty(URI.create(confDir).getScheme())) {
confDir = "file://" + confDir;
}
return Option.of(new Path(confDir + File.separator + DEFAULT_PROPERTIES_FILE));
}

private String[] splitProperty(String line) {
line = line.replaceAll("\\s+"," ");
String delimiter = line.contains("=") ? "=" : " ";
int ind = line.indexOf(delimiter);
String k = line.substring(0, ind).trim();
String v = line.substring(ind + 1).trim();
return new String[] {k, v};
}

private boolean isValidLine(String line) {
ValidationUtils.checkArgument(line != null, "passed line is null");
if (line.startsWith("#") || line.equals("")) {
return false;
}
return line.contains("=") || line.matches(".*\\s.*");
}
}
@@ -58,6 +58,14 @@ public <T> void setValue(ConfigProperty<T> cfg, String val) {
props.setProperty(cfg.key(), val);
}

public <T> void setValue(String key, String val) {
props.setProperty(key, val);
}

public void setAll(Properties properties) {
props.putAll(properties);
}

public <T> void setDefaultValue(ConfigProperty<T> configProperty) {
if (!contains(configProperty)) {
Option<T> inferValue = Option.empty();
@@ -167,7 +175,17 @@ public <T> String getStringOrDefault(ConfigProperty<T> configProperty, String de
}

public Properties getProps() {
return props;
return getProps(false);
}

public Properties getProps(boolean includeGlobalProps) {
if (includeGlobalProps) {
Properties mergedProps = DFSPropertiesConfiguration.getGlobalProps();
mergedProps.putAll(props);
return mergedProps;
} else {
return props;
}
}

public void setDefaultOnCondition(boolean condition, HoodieConfig config) {
@@ -25,10 +25,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Rule;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;

@@ -47,6 +51,10 @@ public class TestDFSPropertiesConfiguration {
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;

@Rule
public static final EnvironmentVariables ENVIRONMENT_VARIABLES
= new EnvironmentVariables();

@BeforeAll
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
@@ -73,12 +81,17 @@ public static void initClass() throws Exception {
}

@AfterAll
public static void cleanupClass() throws Exception {
public static void cleanupClass() {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
}

@AfterEach
public void cleanupGlobalConfig() {
DFSPropertiesConfiguration.clearGlobalProps();
}

private static void writePropertiesFile(Path path, String[] lines) throws IOException {
PrintStream out = new PrintStream(dfs.create(path, true));
for (String line : lines) {
@@ -91,7 +104,7 @@ private static void writePropertiesFile(Path path, String[] lines) throws IOExce
@Test
public void testParsing() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props"));
TypedProperties props = cfg.getConfig();
TypedProperties props = cfg.getProps();
assertEquals(5, props.size());
assertThrows(IllegalArgumentException.class, () -> {
props.getString("invalid.key");
@@ -119,15 +132,38 @@ public void testParsing() {
@Test
public void testIncludes() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t3.props"));
TypedProperties props = cfg.getConfig();
TypedProperties props = cfg.getProps();

assertEquals(123, props.getInteger("int.prop"));
assertEquals(243.4, props.getDouble("double.prop"), 0.001);
assertTrue(props.getBoolean("boolean.prop"));
assertEquals("t3.value", props.getString("string.prop"));
assertEquals(1354354354, props.getLong("long.prop"));
assertThrows(IllegalStateException.class, () -> {
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t4.props"));
cfg.addPropsFromFile(new Path(dfsBasePath + "/t4.props"));
}, "Should error out on a self-included file.");
}

@Test
public void testNoGlobalConfFileConfigured() {
ENVIRONMENT_VARIABLES.clear(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME);
// Should not throw any exception when no external configuration file configured
DFSPropertiesConfiguration.refreshGlobalProps();
assertEquals(0, DFSPropertiesConfiguration.getGlobalProps().size());
}

@Test
public void testLoadGlobalConfFile() {
// set HUDI_CONF_DIR
String testPropsFilePath = new File("src/test/resources/external-config").getAbsolutePath();
ENVIRONMENT_VARIABLES.set(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME, testPropsFilePath);

DFSPropertiesConfiguration.refreshGlobalProps();
assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size());
assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc"));
assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp"));
assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable"));
}
}

0 comments on commit 24def0b

Please sign in to comment.