diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c52a7cc49b7..eab98f2f1907 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -90,6 +90,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 
@@ -375,7 +376,7 @@ private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
     HashMap<String, String> params = new HashMap<>();
     params.put("hoodie.datasource.query.type", "snapshot");
-    params.put("as.of.instant", instantTime);
+    params.put(TIMESTAMP_AS_OF.key(), instantTime);
 
     Path[] paths;
     if (hasLogFiles) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 917cfe621f11..00ff7e568330 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -36,6 +36,11 @@ public class HoodieCommonConfig extends HoodieConfig {
       .defaultValue(false)
       .withDocumentation("Enables support for Schema Evolution feature");
 
+  public static final ConfigProperty<String> TIMESTAMP_AS_OF = ConfigProperty
+      .key("as.of.instant")
+      .noDefaultValue()
+      .withDocumentation("The query instant for time travel. If this option is not specified, we query the latest snapshot.");
+
   public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
       .key("hoodie.datasource.write.reconcile.schema")
       .defaultValue(false)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 44844a8d475e..de1fd0055dc2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -48,6 +48,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
@@ -183,13 +185,13 @@ public boolean accept(Path path) {
           metaClientCache.put(baseDir.toString(), metaClient);
         }
 
-        if (getConf().get("as.of.instant") != null) {
+        if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
          // Build FileSystemViewManager with specified time, it's necessary to set this config when you may
          // access old version files. For example, in spark side, using "hoodie.datasource.read.paths"
          // which contains old version files, if not specify this value, these files will be filtered.
          fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, metaClient,
              HoodieInputFormatUtils.buildMetadataConfig(getConf()),
-             metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant")));
+             metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
         } else {
           fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient,
               HoodieInputFormatUtils.buildMetadataConfig(getConf()));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 1dd339b975cf..92c8f2a23609 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -126,11 +126,7 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " +
       "instead of the full table. This option allows using glob pattern to directly filter on path.")
 
-  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
-    .key("as.of.instant")
-    .noDefaultValue()
-    .withDocumentation("The query instant for time travel. Without specified this option," +
-      " we query the latest snapshot.")
+  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
     .key("hoodie.enable.data.skipping")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 63186c075941..025a224373ae 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -250,9 +250,17 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
                          (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
       (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
-      .filterKeys(_.startsWith("hoodie."))
+      .filterKeys(isHoodieConfigKey)
   }
 
+  /**
+   * Check if a SQL option key is a Hoodie config key.
+   *
+   * TODO: standardize the key prefix so that we don't need this helper (HUDI-4935)
+   */
+  def isHoodieConfigKey(key: String): Boolean =
+    key.startsWith("hoodie.") || key == DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key
+
   /**
    * Checks whether Spark is using Hive as Session's Catalog
    */
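
Reviewer note (not part of the diff): the isHoodieConfigKey helper is needed because "as.of.instant" lacks the "hoodie." prefix, so the previous startsWith("hoodie.") filter silently dropped it from the SQL options. A minimal standalone sketch of that before/after behavior, re-implemented here for illustration only (FilterSketch and the option values are made up, not the PR's classes):

    object FilterSketch extends App {
      // Mirrors the new predicate; the key literal matches HoodieCommonConfig.TIMESTAMP_AS_OF.key()
      def isHoodieConfigKey(key: String): Boolean =
        key.startsWith("hoodie.") || key == "as.of.instant"

      val options = Map(
        "hoodie.datasource.query.type" -> "snapshot",
        "as.of.instant" -> "20220101123045",    // kept only by the new predicate
        "spark.sql.shuffle.partitions" -> "8"   // unrelated conf, filtered out either way
      )

      // Old filter loses the time-travel instant; the new one retains it.
      println(options.filterKeys(_.startsWith("hoodie.")).keys.toList.sorted)
      // -> List(hoodie.datasource.query.type)
      println(options.filterKeys(isHoodieConfigKey).keys.toList.sorted)
      // -> List(as.of.instant, hoodie.datasource.query.type)
    }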
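End-user behavior is unchanged: DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT now aliases HoodieCommonConfig.TIMESTAMP_AS_OF, and both resolve to the same "as.of.instant" key. A minimal time-travel read sketch, assuming a Hudi table already exists at the made-up path /tmp/hudi_trips and that 20220101123045 is a completed commit time (both values illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hudi-time-travel-sketch") // illustrative app name
      .master("local[*]")                 // illustrative; use your real master
      .getOrCreate()

    // "as.of.instant" is the key behind both TIMESTAMP_AS_OF and
    // TIME_TRAVEL_AS_OF_INSTANT after this change.
    val asOf = spark.read
      .format("hudi")
      .option("as.of.instant", "20220101123045") // made-up commit time to travel to
      .load("/tmp/hudi_trips")                   // made-up table path

    asOf.show(false)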