Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2840] Fixed DeltaStreamer to properly respect configuration passed through properties file #4090

Merged
merged 15 commits into from
Nov 25, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {

TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getProps(true);
: UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);

properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
Expand Down Expand Up @@ -334,11 +335,24 @@ public class HoodieClusteringConfig extends HoodieConfig {
/** @deprecated Use {@link #ASYNC_CLUSTERING_ENABLE} and its methods instead */
@Deprecated
public static final String DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = ASYNC_CLUSTERING_ENABLE.defaultValue();


// NOTE: This no-arg ctor is required for appropriate deserialization
// (frameworks instantiate the config reflectively and then populate it).
public HoodieClusteringConfig() {
super();
}

/**
 * Whether asynchronous clustering is turned on, falling back to the
 * property's declared default when the key is absent.
 */
public boolean isAsyncClusteringEnabled() {
  return getBooleanOrDefault(ASYNC_CLUSTERING_ENABLE);
}

/**
 * Whether inline (synchronous) clustering is turned on, falling back to the
 * property's declared default when the key is absent.
 */
public boolean isInlineClusteringEnabled() {
  return getBooleanOrDefault(INLINE_CLUSTERING);
}

public static HoodieClusteringConfig from(TypedProperties props) {
return HoodieClusteringConfig.newBuilder().fromProperties(props).build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidental

}

/** Creates a fresh {@link Builder} for assembling a clustering config. */
public static Builder newBuilder() {
  Builder builder = new Builder();
  return builder;
}
Expand Down Expand Up @@ -421,6 +435,7 @@ public Builder withAsyncClusteringMaxCommits(int numCommits) {
}

/**
 * Copies every entry of {@code props} into the config under construction.
 * Later {@code with*} calls can still override individual keys.
 */
public Builder fromProperties(Properties props) {
  // TODO this should cherry-pick only clustering properties
  Properties target = this.clusteringConfig.getProps();
  target.putAll(props);
  return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ public static HoodieMetricsPrometheusConfig.Builder newBuilder() {
return new HoodieMetricsPrometheusConfig.Builder();
}

@Override
public Properties getProps() {
return super.getProps();
}

public static class Builder {

private HoodieMetricsPrometheusConfig hoodieMetricsPrometheusConfig = new HoodieMetricsPrometheusConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hudi.metrics;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;

import com.codahale.metrics.MetricRegistry;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void metricsReporterFactoryShouldReturnReporter() {
public void metricsReporterFactoryShouldReturnUserDefinedReporter() {
when(config.getMetricReporterClassName()).thenReturn(DummyMetricsReporter.class.getName());

Properties props = new Properties();
TypedProperties props = new TypedProperties();
props.setProperty("testKey", "testValue");

when(config.getProps()).thenReturn(props);
Expand All @@ -70,7 +71,7 @@ public void metricsReporterFactoryShouldReturnUserDefinedReporter() {
@Test
public void metricsReporterFactoryShouldThrowExceptionWhenMetricsReporterClassIsIllegal() {
  // Point the config at a reporter type that cannot be used as a MetricsReporter;
  // the factory is expected to surface this as a HoodieException.
  String illegalReporterClass = IllegalTestMetricsReporter.class.getName();
  when(config.getMetricReporterClassName()).thenReturn(illegalReporterClass);
  when(config.getProps()).thenReturn(new TypedProperties());
  assertThrows(HoodieException.class, () -> MetricsReporterFactory.createReporter(config, registry));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -58,7 +61,8 @@ public class DFSPropertiesConfiguration {
// props read from hudi-defaults.conf
private static TypedProperties GLOBAL_PROPS = loadGlobalProps();

private final FileSystem fs;
@Nullable
private final Configuration hadoopConfig;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general we can keep SerializableConfiguration; that way, even if it serializes from driver to executor, it may be ok. But I think we don't do this, otherwise we'll face errors with fs to begin with.


private Path currentFilePath;

Expand All @@ -68,16 +72,16 @@ public class DFSPropertiesConfiguration {
// Keep track of files visited, to detect loops
private final Set<String> visitedFilePaths;

public DFSPropertiesConfiguration(FileSystem fs, Path filePath) {
this.fs = fs;
/**
 * Creates a configuration and eagerly loads the properties found at {@code filePath}.
 *
 * @param hadoopConf Hadoop configuration used to resolve the file system for the path
 * @param filePath   properties file to read (may transitively include other files)
 */
public DFSPropertiesConfiguration(@Nonnull Configuration hadoopConf, @Nonnull Path filePath) {
  this.hadoopConfig = hadoopConf;
  this.currentFilePath = filePath;
  this.visitedFilePaths = new HashSet<>();
  this.hoodieConfig = new HoodieConfig();
  // Load immediately so the instance is fully populated after construction.
  addPropsFromFile(filePath);
}

public DFSPropertiesConfiguration() {
this.fs = null;
this.hadoopConfig = null;
this.currentFilePath = null;
this.hoodieConfig = new HoodieConfig();
this.visitedFilePaths = new HashSet<>();
Expand Down Expand Up @@ -119,13 +123,13 @@ public void addPropsFromFile(Path filePath) {
if (visitedFilePaths.contains(filePath.toString())) {
throw new IllegalStateException("Loop detected; file " + filePath + " already referenced");
}
FileSystem fileSystem;
try {
fileSystem = fs != null ? fs : filePath.getFileSystem(new Configuration());
} catch (IOException e) {
throw new IllegalArgumentException("Cannot get the file system from file path", e);
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(filePath)))) {

FileSystem fs = FSUtils.getFs(
filePath.toString(),
Option.ofNullable(hadoopConfig).orElseGet(Configuration::new)
);
Comment on lines +127 to +130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan This is changed to get the FileSystem instance each time for loading a props file (fs is passed in from the constructor before). Since it's only for configuration, it shouldn't incur too much overhead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most changes in this PR are related to this so just to make sure there is no hidden implication.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is called just once to load props from external file right? I mean, once per write operation. looks ok to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FileSystem instance is also cached at the hadoop client level. so this itself may be ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to fix local file-system access (for properties), since it got broken after #3416 (we're passing a Hadoop FS in the ctor, and we're using it to try to access local files)


try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath)))) {
visitedFilePaths.add(filePath.toString());
currentFilePath = filePath;
addPropsFromStream(reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ public static HoodieConfig create(FSDataInputStream inputStream) throws IOExcept
return config;
}

protected Properties props;
protected TypedProperties props;

/** Creates an empty config backed by a fresh {@link TypedProperties} instance. */
public HoodieConfig() {
  this.props = new TypedProperties();
}

/**
 * Creates a config wrapping the given properties.
 * NOTE(review): the entries are rewrapped via {@code new TypedProperties(props)};
 * confirm that this copies entries rather than installing them as Properties
 * defaults (defaults are invisible to {@code keySet()}/{@code size()}).
 */
public HoodieConfig(Properties props) {
  this.props = new TypedProperties(props);
}

public <T> void setValue(ConfigProperty<T> cfg, String val) {
Expand Down Expand Up @@ -147,7 +147,7 @@ public <T> Boolean getBoolean(ConfigProperty<T> configProperty) {
public <T> boolean getBooleanOrDefault(ConfigProperty<T> configProperty) {
Option<Object> rawValue = getRawValue(configProperty);
return rawValue.map(v -> Boolean.parseBoolean(v.toString()))
.orElse(Boolean.parseBoolean(configProperty.defaultValue().toString()));
.orElseGet(() -> Boolean.parseBoolean(configProperty.defaultValue().toString()));
}

public <T> Long getLong(ConfigProperty<T> configProperty) {
Expand All @@ -174,13 +174,13 @@ public <T> String getStringOrDefault(ConfigProperty<T> configProperty, String de
return rawValue.map(Object::toString).orElse(defaultVal);
}

public Properties getProps() {
public TypedProperties getProps() {
return getProps(false);
}

public Properties getProps(boolean includeGlobalProps) {
public TypedProperties getProps(boolean includeGlobalProps) {
if (includeGlobalProps) {
Properties mergedProps = DFSPropertiesConfiguration.getGlobalProps();
TypedProperties mergedProps = DFSPropertiesConfiguration.getGlobalProps();
mergedProps.putAll(props);
return mergedProps;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private static void writePropertiesFile(Path path, String[] lines) throws IOExce

@Test
public void testParsing() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props"));
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/t1.props"));
TypedProperties props = cfg.getProps();
assertEquals(5, props.size());
assertThrows(IllegalArgumentException.class, () -> {
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testParsing() {

@Test
public void testIncludes() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t3.props"));
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/t3.props"));
TypedProperties props = cfg.getProps();

assertEquals(123, props.getInteger("int.prop"));
Expand All @@ -144,6 +144,31 @@ public void testIncludes() {
}, "Should error out on a self-included file.");
}

@Test
public void testLocalFileSystemLoading() {
  DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/t1.props"));

  // Resolve the bundled test resource and load it through the local "file:" scheme,
  // even though the configuration was constructed against the DFS conf.
  String resourcePath = getClass().getClassLoader()
      .getResource("props/test.properties")
      .getPath();
  cfg.addPropsFromFile(new Path(String.format("file:%s", resourcePath)));

  TypedProperties props = cfg.getProps();

  // Values from t1.props ...
  assertEquals(123, props.getInteger("int.prop"));
  assertEquals(113.4, props.getDouble("double.prop"), 0.001);
  assertTrue(props.getBoolean("boolean.prop"));
  assertEquals("str", props.getString("string.prop"));
  assertEquals(1354354354, props.getLong("long.prop"));
  // ... plus the key contributed by the locally-loaded resource file.
  assertEquals(123, props.getInteger("some.random.prop"));
}

@Test
public void testNoGlobalConfFileConfigured() {
ENVIRONMENT_VARIABLES.clear(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME);
Expand Down
18 changes: 18 additions & 0 deletions hudi-common/src/test/resources/props/test.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

some.random.prop=123
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) {
return new TypedProperties();
}
return readConfig(
FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
getHadoopConf(),
new Path(cfg.propsFilePath), cfg.configs).getProps();
}

Expand All @@ -127,8 +127,8 @@ public static Schema getSourceSchema(org.apache.flink.configuration.Configuratio
/**
* Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option).
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath);
public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throw
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps();
this.props = UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps();
log.info("Creating workload generator with configs : {}", props.toString());
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private boolean isUpsert() {
public int dataImport(JavaSparkContext jsc, int retry) {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
: UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
LOG.info("Starting data import with configs : " + props.toString());
int ret = -1;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@

package org.apache.hudi.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down Expand Up @@ -61,9 +58,8 @@ public HoodieCleaner(Config cfg, JavaSparkContext jssc) {
/*
* Filesystem used.
*/
FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
: UtilHelpers.readConfig(jssc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
LOG.info("Creating Cleaner with configs : " + props.toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
}

private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());

return UtilHelpers
.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
}

private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());

return UtilHelpers
.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.utilities;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
Expand Down Expand Up @@ -157,11 +158,8 @@ public static InitialCheckPointProvider createInitialCheckpointProvider(
}
}

/**
*
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath);
public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
Expand Down
Loading