[HUDI-4936] Fix as.of.instant not recognized as hoodie config (apache#5616)

Co-authored-by: leon <leon@leondeMacBook-Pro.local>
Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
3 people authored and voonhous committed Oct 7, 2022
1 parent a1c1d59 commit 09c9e15
Showing 5 changed files with 22 additions and 10 deletions.
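
For context, "as.of.instant" is the option users pass for a Spark time-travel read; before this fix it was dropped by the "hoodie."-prefix filter on the Spark SQL path. A minimal read sketch, assuming an active SparkSession (the table path and instant below are placeholders):

  // Time-travel read; after this fix the option is recognized as a Hoodie config.
  val asOf = spark.read.format("hudi")
    .option("as.of.instant", "20221007091234") // placeholder instant
    .load("/tmp/hudi/sample_table")            // placeholder path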
@@ -90,6 +90,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 
@@ -375,7 +376,7 @@ private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
 
     HashMap<String, String> params = new HashMap<>();
     params.put("hoodie.datasource.query.type", "snapshot");
-    params.put("as.of.instant", instantTime);
+    params.put(TIMESTAMP_AS_OF.key(), instantTime);
 
     Path[] paths;
     if (hasLogFiles) {
@@ -36,6 +36,11 @@ public class HoodieCommonConfig extends HoodieConfig {
       .defaultValue(false)
       .withDocumentation("Enables support for Schema Evolution feature");
 
+  public static final ConfigProperty<String> TIMESTAMP_AS_OF = ConfigProperty
+      .key("as.of.instant")
+      .noDefaultValue()
+      .withDocumentation("The query instant for time travel. Without specified this option, we query the latest snapshot.");
+
   public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
       .key("hoodie.datasource.write.reconcile.schema")
       .defaultValue(false)
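
With the property hoisted into hudi-common, Java and Scala call sites can share one constant instead of the raw "as.of.instant" string. A small sketch, where `options` is an assumed java.util.Map[String, String] of read options:

  import org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF

  // Resolve the time-travel instant; None means query the latest snapshot.
  val instant: Option[String] = Option(options.get(TIMESTAMP_AS_OF.key()))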
@@ -48,6 +48,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
@@ -183,13 +185,13 @@ public boolean accept(Path path) {
           metaClientCache.put(baseDir.toString(), metaClient);
         }
 
-        if (getConf().get("as.of.instant") != null) {
+        if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
           // Build FileSystemViewManager with specified time, it's necessary to set this config when you may
           // access old version files. For example, in spark side, using "hoodie.datasource.read.paths"
           // which contains old version files, if not specify this value, these files will be filtered.
           fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
               metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()),
-              metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant")));
+              metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
         } else {
           fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
               metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()));
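
The path filter reads the instant from the Hadoop configuration, so a caller that needs pre-instant file versions to survive filtering sets the same key there. A sketch with a placeholder instant value:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF

  // Pass this configuration to the input format so that file versions
  // up to the given instant are kept by the filter.
  val conf = new Configuration()
  conf.set(TIMESTAMP_AS_OF.key(), "20221007091234")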
@@ -116,11 +116,7 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
       + "instead of the full table. This option allows using glob pattern to directly filter on path.")
 
-  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
-    .key("as.of.instant")
-    .noDefaultValue()
-    .withDocumentation("The query instant for time travel. Without specified this option," +
-      " we query the latest snapshot.")
+  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
     .key("hoodie.enable.data.skipping")
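
Since TIME_TRAVEL_AS_OF_INSTANT now aliases the common property, both constants resolve to the same key:

  import org.apache.hudi.DataSourceReadOptions
  import org.apache.hudi.common.config.HoodieCommonConfig

  // Both spellings carry the key "as.of.instant".
  assert(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key() ==
    HoodieCommonConfig.TIMESTAMP_AS_OF.key())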
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -250,9 +250,17 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
                      (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
       (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
-        .filterKeys(_.startsWith("hoodie."))
+        .filterKeys(isHoodieConfigKey)
   }
 
+  /**
+   * Check if Sql options are Hoodie Config keys.
+   *
+   * TODO: standardize the key prefix so that we don't need this helper (HUDI-4935)
+   */
+  def isHoodieConfigKey(key: String): Boolean =
+    key.startsWith("hoodie.") || key == DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key
+
   /**
    * Checks whether Spark is using Hive as Session's Catalog
    */
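
The new helper changes which SQL options survive the Hoodie-config filter; a sketch with an illustrative option map, assuming the object lives in the org.apache.spark.sql.hudi package:

  import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isHoodieConfigKey

  val opts = Map(
    "hoodie.datasource.query.type" -> "snapshot",
    "as.of.instant" -> "20221007091234",       // now kept
    "spark.sql.shuffle.partitions" -> "200")   // still filtered out
  val hudiOpts = opts.filterKeys(isHoodieConfigKey).toMap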
