diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index ef6416c0ca68..f100658bce8c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -360,7 +360,7 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
                                  String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
 
     TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
-        : UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getProps(true);
+        : UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);
 
     properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 4476e4fc28d4..9a10965427e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -334,11 +335,24 @@ public class HoodieClusteringConfig extends HoodieConfig {
   /** @deprecated Use {@link #ASYNC_CLUSTERING_ENABLE} and its methods instead */
   @Deprecated
   public static final String DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = ASYNC_CLUSTERING_ENABLE.defaultValue();
-
+
+  // NOTE: This ctor is required for appropriate deserialization
   public HoodieClusteringConfig() {
     super();
   }
 
+  public boolean isAsyncClusteringEnabled() {
+    return getBooleanOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
+  }
+
+  public boolean isInlineClusteringEnabled() {
+    return getBooleanOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING);
+  }
+
+  public static HoodieClusteringConfig from(TypedProperties props) {
+    return HoodieClusteringConfig.newBuilder().fromProperties(props).build();
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
@@ -421,6 +435,7 @@ public Builder withAsyncClusteringMaxCommits(int numCommits) {
     }
 
     public Builder fromProperties(Properties props) {
+      // TODO this should cherry-pick only clustering properties
       this.clusteringConfig.getProps().putAll(props);
       return this;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsPrometheusConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsPrometheusConfig.java
index c04e8aa1e980..e27ff1bcb089 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsPrometheusConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsPrometheusConfig.java
@@ -166,11 +166,6 @@ public static HoodieMetricsPrometheusConfig.Builder newBuilder() {
     return new HoodieMetricsPrometheusConfig.Builder();
   }
 
-  @Override
-  public Properties getProps() {
-    return super.getProps();
-  }
-
   public static class Builder {
 
     private HoodieMetricsPrometheusConfig hoodieMetricsPrometheusConfig = new HoodieMetricsPrometheusConfig();
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
index 317f15230c3c..edd2302a6ecc 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestMetricsReporterFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.MetricRegistry;
@@ -57,7 +58,7 @@ public void metricsReporterFactoryShouldReturnReporter() {
   public void metricsReporterFactoryShouldReturnUserDefinedReporter() {
     when(config.getMetricReporterClassName()).thenReturn(DummyMetricsReporter.class.getName());
 
-    Properties props = new Properties();
+    TypedProperties props = new TypedProperties();
     props.setProperty("testKey", "testValue");
     when(config.getProps()).thenReturn(props);
 
@@ -70,7 +71,7 @@ public void metricsReporterFactoryShouldReturnUserDefinedReporter() {
   @Test
   public void metricsReporterFactoryShouldThrowExceptionWhenMetricsReporterClassIsIllegal() {
     when(config.getMetricReporterClassName()).thenReturn(IllegalTestMetricsReporter.class.getName());
-    when(config.getProps()).thenReturn(new Properties());
+    when(config.getProps()).thenReturn(new TypedProperties());
     assertThrows(HoodieException.class, () -> MetricsReporterFactory.createReporter(config, registry));
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
index a4161cf1e375..01585f9259cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
@@ -21,12 +21,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -58,7 +61,8 @@ public class DFSPropertiesConfiguration {
   // props read from hudi-defaults.conf
   private static TypedProperties GLOBAL_PROPS = loadGlobalProps();
 
-  private final FileSystem fs;
+  @Nullable
+  private final Configuration hadoopConfig;
 
   private Path currentFilePath;
 
@@ -68,8 +72,8 @@ public class DFSPropertiesConfiguration {
   // Keep track of files visited, to detect loops
   private final Set<String> visitedFilePaths;
 
-  public DFSPropertiesConfiguration(FileSystem fs, Path filePath) {
-    this.fs = fs;
+  public DFSPropertiesConfiguration(@Nonnull Configuration hadoopConf, @Nonnull Path filePath) {
+    this.hadoopConfig = hadoopConf;
     this.currentFilePath = filePath;
     this.hoodieConfig = new HoodieConfig();
     this.visitedFilePaths = new HashSet<>();
@@ -77,7 +81,7 @@ public DFSPropertiesConfiguration(FileSystem fs, Path filePath) {
   }
 
   public DFSPropertiesConfiguration() {
-    this.fs = null;
+    this.hadoopConfig = null;
     this.currentFilePath = null;
     this.hoodieConfig = new HoodieConfig();
     this.visitedFilePaths = new HashSet<>();
@@ -119,13 +123,13 @@ public void addPropsFromFile(Path filePath) {
     if (visitedFilePaths.contains(filePath.toString())) {
       throw new IllegalStateException("Loop detected; file " + filePath + " already referenced");
     }
-    FileSystem fileSystem;
-    try {
-      fileSystem = fs != null ? fs : filePath.getFileSystem(new Configuration());
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Cannot get the file system from file path", e);
-    }
-    try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(filePath)))) {
+
+    FileSystem fs = FSUtils.getFs(
+        filePath.toString(),
+        Option.ofNullable(hadoopConfig).orElseGet(Configuration::new)
+    );
+
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath)))) {
       visitedFilePaths.add(filePath.toString());
       currentFilePath = filePath;
       addPropsFromStream(reader);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index 31e43d773dff..9c142ee3aba7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -44,14 +44,14 @@ public static HoodieConfig create(FSDataInputStream inputStream) throws IOException {
     return config;
   }
 
-  protected Properties props;
+  protected TypedProperties props;
 
   public HoodieConfig() {
-    this.props = new Properties();
+    this.props = new TypedProperties();
   }
 
   public HoodieConfig(Properties props) {
-    this.props = props;
+    this.props = new TypedProperties(props);
   }
 
   public <T> void setValue(ConfigProperty<T> cfg, String val) {
@@ -147,7 +147,7 @@ public Boolean getBoolean(ConfigProperty<?> configProperty) {
   public boolean getBooleanOrDefault(ConfigProperty<?> configProperty) {
     Option<Object> rawValue = getRawValue(configProperty);
     return rawValue.map(v -> Boolean.parseBoolean(v.toString()))
-        .orElse(Boolean.parseBoolean(configProperty.defaultValue().toString()));
+        .orElseGet(() -> Boolean.parseBoolean(configProperty.defaultValue().toString()));
   }
 
   public Long getLong(ConfigProperty<?> configProperty) {
@@ -174,13 +174,13 @@ public String getStringOrDefault(ConfigProperty<?> configProperty, String defaultVal) {
     return rawValue.map(Object::toString).orElse(defaultVal);
   }
 
-  public Properties getProps() {
+  public TypedProperties getProps() {
     return getProps(false);
   }
 
-  public Properties getProps(boolean includeGlobalProps) {
+  public TypedProperties getProps(boolean includeGlobalProps) {
     if (includeGlobalProps) {
-      Properties mergedProps = DFSPropertiesConfiguration.getGlobalProps();
+      TypedProperties mergedProps = DFSPropertiesConfiguration.getGlobalProps();
       mergedProps.putAll(props);
       return mergedProps;
     } else {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
index b7ea8bff57c5..6a7075f551c6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java
@@ -103,7 +103,7 @@ private static void writePropertiesFile(Path path, String[] lines) throws IOException {
 
   @Test
   public void testParsing() {
-    DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props"));
+    DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/t1.props"));
     TypedProperties props = cfg.getProps();
     assertEquals(5, props.size());
     assertThrows(IllegalArgumentException.class, () -> {
@@ -131,7 +131,7 @@ public void testParsing() {
 
   @Test
   public void testIncludes() {
-    DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t3.props"));
+    DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/t3.props"));
     TypedProperties props = cfg.getProps();
 
     assertEquals(123, props.getInteger("int.prop"));
@@ -144,6 +144,31 @@ public void testIncludes() {
     }, "Should error out on a self-included file.");
   }
 
+  @Test
+  public void testLocalFileSystemLoading() {
+    DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/t1.props"));
+
+    cfg.addPropsFromFile(
+        new Path(
+            String.format(
+                "file:%s",
+                getClass().getClassLoader()
+                    .getResource("props/test.properties")
+                    .getPath()
+            )
+        )
+    );
+
+    TypedProperties props = cfg.getProps();
+
+    assertEquals(123, props.getInteger("int.prop"));
+    assertEquals(113.4, props.getDouble("double.prop"), 0.001);
+    assertTrue(props.getBoolean("boolean.prop"));
+    assertEquals("str", props.getString("string.prop"));
+    assertEquals(1354354354, props.getLong("long.prop"));
+    assertEquals(123, props.getInteger("some.random.prop"));
+  }
+
   @Test
   public void testNoGlobalConfFileConfigured() {
     ENVIRONMENT_VARIABLES.clear(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME);
diff --git a/hudi-common/src/test/resources/props/test.properties b/hudi-common/src/test/resources/props/test.properties
new file mode 100644
index 000000000000..8e848aff79d3
--- /dev/null
+++ b/hudi-common/src/test/resources/props/test.properties
@@ -0,0 +1,18 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+some.random.prop=123
\ No newline at end of file
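The net effect of the `DFSPropertiesConfiguration` rework above: callers hand over a Hadoop `Configuration` instead of a pre-resolved `FileSystem`, and the filesystem is now resolved per file from each path's scheme, so DFS-hosted and local `file:` properties can be layered in one instance (which is exactly what the new `testLocalFileSystemLoading` test exercises). A minimal sketch of the intended call pattern; the paths below are hypothetical placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;

public class PropsLoadingSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();

    // The ctor now takes the Hadoop conf; the FileSystem is resolved internally,
    // per file, from the path's scheme (hdfs://, file:, ...)
    DFSPropertiesConfiguration cfg =
        new DFSPropertiesConfiguration(hadoopConf, new Path("hdfs:///configs/base.properties"));

    // Layering a local file on top of props already read from DFS within the
    // same instance, which the old FileSystem-based ctor could not do
    cfg.addPropsFromFile(new Path("file:/etc/hudi/overrides.properties"));

    // Typed accessors come for free now that getProps() returns TypedProperties
    TypedProperties props = cfg.getProps();
    int parallelism = props.getInteger("hoodie.upsert.shuffle.parallelism", 2);
  }
}
```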
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 8954a8b2db53..e7cc0d525f65 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -102,7 +102,7 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) {
       return new TypedProperties();
     }
     return readConfig(
-        FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
+        getHadoopConf(),
         new Path(cfg.propsFilePath),
         cfg.configs).getProps();
   }
@@ -127,8 +127,8 @@ public static Schema getSourceSchema(org.apache.flink.configuration.Configuration
   /**
    * Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option).
    */
-  public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
-    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath);
+  public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
+    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
     try {
       if (!overriddenProps.isEmpty()) {
         LOG.info("Adding overridden properties to file properties.");
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 8e0e3eb5d01e..fe81f0c075c9 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -103,7 +103,7 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws
     cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
     this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
     this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
-    this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps();
+    this.props = UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps();
     log.info("Creating workload generator with configs : {}", props.toString());
     this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
     this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 21393a01b5ed..8651e30c044c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -112,7 +112,7 @@ private boolean isUpsert() {
   public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
+        : UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
     LOG.info("Starting data import with configs : " + props.toString());
     int ret = -1;
     try {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index 28d484c56589..39d16e2f11b7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -18,16 +18,13 @@
 
 package org.apache.hudi.utilities;
 
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -61,9 +58,8 @@ public HoodieCleaner(Config cfg, JavaSparkContext jssc) {
     /*
      * Filesystem used.
      */
-    FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
-        : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
+        : UtilHelpers.readConfig(jssc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
 
     LOG.info("Creating Cleaner with configs : " + props.toString());
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index bfcbe3cfb808..4a590fcebd08 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -73,10 +73,7 @@ public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
-
-    return UtilHelpers
-        .readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index ce69eff04321..706d1d9df4b9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -62,10 +62,7 @@ public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
-
-    return UtilHelpers
-        .readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
         .getProps(true);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 8b0778b07c88..81c5caf82142 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -157,11 +158,8 @@ public static InitialCheckPointProvider createInitialCheckpointProvider(
     }
   }
 
-  /**
-   *
-   */
-  public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
-    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath);
+  public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
+    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
     try {
       if (!overriddenProps.isEmpty()) {
         LOG.info("Adding overridden properties to file properties.");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index b5239e929359..7b29107e24b7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -172,6 +172,8 @@ public class DeltaSync implements Serializable {
 
   /**
    * Bag of properties with source, hoodie client, key generator etc.
+   *
+   * NOTE: These properties are already consolidated w/ CLI provided config-overrides
    */
   private final TypedProperties props;
 
@@ -680,22 +682,33 @@ private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
   private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
     final boolean combineBeforeUpsert = true;
     final boolean autoCommit = false;
+
+    // NOTE: Provided that we're injecting combined properties
+    //       (from {@code props}, including CLI overrides), there's no
+    //       need to explicitly set up some configuration aspects that
+    //       are based on these (for example, Clustering configuration)
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, combineBeforeUpsert)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
-                // Inline compaction is disabled for continuous mode. otherwise enabled for MOR
-                .withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
-            .withClusteringConfig(HoodieClusteringConfig.newBuilder()
-                .withInlineClustering(cfg.isInlineClusteringEnabled())
-                .withAsyncClustering(cfg.isAsyncClusteringEnabled()).build())
-            .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
-                .build())
+        HoodieWriteConfig.newBuilder()
+            .withPath(cfg.targetBasePath)
+            .combineInput(cfg.filterDupes, combineBeforeUpsert)
+            .withCompactionConfig(
+                HoodieCompactionConfig.newBuilder()
+                    .withPayloadClass(cfg.payloadClassName)
+                    .withInlineCompaction(cfg.isInlineCompactionEnabled())
+                    .build()
+            )
+            .withPayloadConfig(
+                HoodiePayloadConfig.newBuilder()
+                    .withPayloadOrderingField(cfg.sourceOrderingField)
+                    .build())
             .forTable(cfg.targetTableName)
-            .withAutoCommit(autoCommit).withProps(props);
+            .withAutoCommit(autoCommit)
+            .withProps(props);
 
-    if (null != schema) {
-      builder = builder.withSchema(schema.toString());
+    if (schema != null) {
+      builder.withSchema(schema.toString());
     }
+
     HoodieWriteConfig config = builder.build();
 
     // set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
@@ -703,13 +716,15 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
       HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
     }
 
+    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
+
     // Validate what deltastreamer assumes of write-config to be really safe
     ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
         String.format("%s should be set to %s", INLINE_COMPACT.key(), cfg.isInlineCompactionEnabled()));
-    ValidationUtils.checkArgument(config.inlineClusteringEnabled() == cfg.isInlineClusteringEnabled(),
-        String.format("%s should be set to %s", INLINE_CLUSTERING.key(), cfg.isInlineClusteringEnabled()));
-    ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == cfg.isAsyncClusteringEnabled(),
-        String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE.key(), cfg.isAsyncClusteringEnabled()));
+    ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
+        String.format("%s should be set to %s", INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
+    ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(),
+        String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
     ValidationUtils.checkArgument(!config.shouldAutoCommit(),
         String.format("%s should be set to %s", AUTO_COMMIT_ENABLE.key(), autoCommit));
     ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
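Since clustering flags are now sourced from the consolidated properties via `HoodieClusteringConfig.from(props)` rather than from `Config`-level CLI parsing, the same flag is honored regardless of whether it arrived through the props file, a `--hoodie-conf` override, or an explicit properties override. A small sketch of the resulting behavior; the key string is assumed to be the standard `hoodie.clustering.async.enabled` config:

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieClusteringConfig;

public class ClusteringFlagsSketch {
  public static void main(String[] args) {
    // Stand-in for the consolidated DeltaSync props (props file + CLI overrides)
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.clustering.async.enabled", "true");

    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);

    // Both accessors fall back to the ConfigProperty defaults when unset
    boolean async = clusteringConfig.isAsyncClusteringEnabled();   // true
    boolean inline = clusteringConfig.isInlineClusteringEnabled(); // false unless set
  }
}
```

This is also what lets the `ValidationUtils.checkArgument` calls above compare the built write-config against props-derived flags instead of the old CLI-derived ones.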
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 408a71484d8d..7a0c73319a82 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -97,6 +97,9 @@ public class HoodieDeltaStreamer implements Serializable {
 
   protected final transient Config cfg;
 
+  /**
+   * NOTE: These properties are already consolidated w/ CLI provided config-overrides.
+   */
   private final TypedProperties properties;
 
   protected transient Option<DeltaSyncService> deltaSyncService;
@@ -120,20 +123,8 @@ public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
   }
 
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
-                             Option<TypedProperties> props) throws IOException {
-    // Resolving the properties first in a consistent way
-    HoodieConfig hoodieConfig = new HoodieConfig();
-    if (props.isPresent()) {
-      hoodieConfig.setAll(props.get());
-    } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
-      hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
-    } else {
-      hoodieConfig.setAll(UtilHelpers.readConfig(
-          FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
-          new Path(cfg.propsFilePath), cfg.configs).getProps());
-    }
-    hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
-    this.properties = (TypedProperties) hoodieConfig.getProps(true);
+                             Option<TypedProperties> propsOverride) throws IOException {
+    this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
 
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
       InitialCheckPointProvider checkPointProvider =
@@ -141,6 +132,7 @@ public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
       checkPointProvider.init(conf);
       cfg.checkpoint = checkPointProvider.getCheckpoint();
     }
+
     this.cfg = cfg;
     this.bootstrapExecutor = Option.ofNullable(
         cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
@@ -148,6 +140,25 @@ public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
         cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, Option.ofNullable(this.properties)));
   }
 
+  private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
+    HoodieConfig hoodieConfig = new HoodieConfig();
+    // Resolving the properties in a consistent way:
+    //   1. Explicit properties override always takes precedence
+    //   2. Otherwise, if no props file is specified, use just the CLI overrides
+    //   3. Otherwise, parse the specified props file (merging in CLI overrides)
+    if (propsOverride.isPresent()) {
+      hoodieConfig.setAll(propsOverride.get());
+    } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
+      hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
+    } else {
+      hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps());
+    }
+
+    hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
+
+    return hoodieConfig.getProps(true);
+  }
+
   public void shutdownGracefully() {
     deltaSyncService.ifPresent(ds -> ds.shutdown(false));
   }
@@ -362,20 +373,11 @@ public boolean isAsyncCompactionEnabled() {
     }
 
     public boolean isInlineCompactionEnabled() {
+      // Inline compaction is disabled for continuous mode, otherwise enabled for MOR
      return !continuousMode && !forceDisableCompaction
          && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
    }
 
-    public boolean isAsyncClusteringEnabled() {
-      return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps()
-          .getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), false)));
-    }
-
-    public boolean isInlineClusteringEnabled() {
-      return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps()
-          .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING.key(), false)));
-    }
-
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -624,6 +626,8 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
         LOG.info("Setting Spark Pool name for delta-sync to " + DELTASYNC_POOL_NAME);
         jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME);
       }
+
+      HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
       try {
         while (!isShutdownRequested()) {
           try {
@@ -635,7 +639,7 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
                   HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get()));
               asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
             }
-            if (cfg.isAsyncClusteringEnabled()) {
+            if (clusteringConfig.isAsyncClusteringEnabled()) {
               Option<String> clusteringInstant = deltaSync.getClusteringInstantOpt();
               if (clusteringInstant.isPresent()) {
                 LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get());
@@ -708,7 +712,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
         }
       }
       // start async clustering if required
-      if (cfg.isAsyncClusteringEnabled()) {
+      if (HoodieClusteringConfig.from(props).isAsyncClusteringEnabled()) {
         if (asyncClusteringService.isPresent()) {
           asyncClusteringService.get().updateWriteClient(writeClient);
         } else {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 2b788dc6d43d..dc150803e8b3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -77,7 +77,7 @@ public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) throws
     FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
     configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
     checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
-    TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getProps();
+    TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), new Path(commonPropsFile), new ArrayList<>()).getProps();
     //get the tables to be ingested and their corresponding config files from this properties instance
     populateTableExecutionContextList(commonProperties, configFolder, fs, config);
   }
@@ -116,7 +116,7 @@ private void populateTableExecutionContextList(TypedProperties properties, String
     String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
     String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
     checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-    TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getProps();
+    TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<>()).getProps();
     properties.forEach((k, v) -> {
       if (tableProperties.get(k) == null) {
         tableProperties.setProperty(k.toString(), v.toString());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 227623eeb5a1..eeb8362ec975 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -371,7 +371,7 @@ static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem
   @Test
   public void testProps() {
     TypedProperties props =
-        new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
+        new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
     assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
     assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
     assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
@@ -491,7 +491,7 @@ public void testKafkaConnectCheckpointProvider() throws IOException {
     String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
     TypedProperties props =
-        new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
+        new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
     props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
     cfg.initialCheckpointProvider = checkpointProviderClass;
     // create regular kafka connect hdfs dirs
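To close the loop on the `combineProperties` refactor above: the precedence it establishes is the explicit properties override first, then the `--props` file (or the default source-properties path) merged with `--hoodie-conf` overrides, with `hudi-defaults.conf` globals filled in last by `getProps(true)`. A sketch of that layering; the key and value are hypothetical:

```java
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;

public class PrecedenceSketch {
  public static void main(String[] args) {
    // Stand-in for props parsed from the --props file plus --hoodie-conf overrides
    TypedProperties jobProps = new TypedProperties();
    jobProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");

    HoodieConfig hoodieConfig = new HoodieConfig();
    hoodieConfig.setAll(jobProps);

    // getProps(true) starts from the global props (hudi-defaults.conf, if any)
    // and overlays everything set above, so job-level settings win over globals
    TypedProperties combined = hoodieConfig.getProps(true);
  }
}
```

Note that `getProps(true)` can return this directly only because `HoodieConfig` now stores `TypedProperties` internally; the old version needed the `(TypedProperties)` cast that the constructor refactor removes.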