From 0333e3a0ad9184aaabc2305b3afc9ef90d82ee25 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 9 Nov 2023 13:20:07 -0500 Subject: [PATCH] test changes --- .../apache/hudi/config/HoodieWriteConfig.java | 2 +- .../quickstart/HoodieSparkQuickstart.java | 2 +- .../org/apache/hudi/DataSourceOptions.scala | 4 +- .../scala/org/apache/hudi/DefaultSource.scala | 41 ++------------ .../hudi/HoodieHadoopFsRelationFactory.scala | 5 +- .../BatchAdjustableParquetFileFormat.scala | 54 +++++++++++++++++++ .../parquet/NewHoodieParquetFileFormat.scala | 38 +++++++------ .../TestHoodiePruneFileSourcePartitions.scala | 7 +-- 8 files changed, 86 insertions(+), 67 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/BatchAdjustableParquetFileFormat.scala diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index cd8f9f6b6291e..6a36e5025bcc4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -751,7 +751,7 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty WRITE_RECORD_POSITIONS = ConfigProperty .key("hoodie.write.record.positions") - .defaultValue(false) + .defaultValue(true) .markAdvanced() .sinceVersion("1.0.0") .withDocumentation("Whether to write record positions to the block header for data blocks containing updates and delete blocks. 
" diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java index d723921148d85..b9fbbae10698e 100644 --- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java @@ -80,7 +80,7 @@ public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, Strin assert snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0; incrementalQuery(spark, tablePath, tableName); - pointInTimeQuery(spark, tablePath, tableName); + // pointInTimeQuery(spark, tablePath, tableName); Dataset snapshotBeforeDelete = snapshotAfterUpdate; Dataset deleteDf = delete(spark, tablePath, tableName); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index e7364316205f8..f4b26152bbe4f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -88,7 +88,7 @@ object DataSourceReadOptions { val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.use.new.parquet.file.format") - .defaultValue("false") + .defaultValue("true") .markAdvanced() .sinceVersion("0.14.0") .withDocumentation("Read using the new Hudi parquet file format. 
The new Hudi parquet file format is " + @@ -555,7 +555,7 @@ object DataSourceWriteOptions { val ENABLE_MERGE_INTO_PARTIAL_UPDATES: ConfigProperty[Boolean] = ConfigProperty .key("hoodie.spark.sql.merge.into.partial.updates") - .defaultValue(false) + .defaultValue(true) .markAdvanced() .sinceVersion("1.0.0") .withDocumentation("Whether to write partial updates to the data blocks containing updates " diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 7bdaa9609ef59..8491ae46d34df 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -240,12 +240,7 @@ object DefaultSource { if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema))) } else if (isCdcQuery) { - if (useNewPaquetFileFormat) { - new HoodieMergeOnReadCDCHadoopFsRelationFactory( - sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build() - } else { CDCRelation.getCDCRelation(sqlContext, metaClient, parameters) - } } else { lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled && !isBootstrappedTable) || (useNewPaquetFileFormat @@ -276,21 +271,8 @@ object DefaultSource { resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters) } - case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) => - if (fileFormatUtils.isDefined) { - new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory( - sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build() - } else { - new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) - } - - case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) => - if (fileFormatUtils.isDefined) { - 
new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory( - sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build() - } else { - new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) - } + case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => if (fileFormatUtils.isDefined) { @@ -305,24 +287,11 @@ object DefaultSource { new HoodieMergeOnReadSnapshotHadoopFsRelationFactory( sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build() } else { - new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters) - } - - case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, true) => - if (fileFormatUtils.isDefined) { - new HoodieMergeOnReadIncrementalHadoopFsRelationFactory( - sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build() - } else { - MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema) + HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters) } - case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, false) => - if (fileFormatUtils.isDefined) { - new HoodieMergeOnReadIncrementalHadoopFsRelationFactory( - sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build() - } else { - MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema) - } + case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema) case (_, _, true) => if (fileFormatUtils.isDefined) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala index a49fee2b7400d..908193a2b7567 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala @@ -22,6 +22,7 @@ import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf +import org.apache.hudi.DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, isSchemaEvolutionEnabledOnRead} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.HoodieFileIndex.getConfigProperties @@ -244,10 +245,12 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext: sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)), metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, false, Seq.empty) + private val isTimeTravelQuery: Boolean = options.contains(TIME_TRAVEL_AS_OF_INSTANT.key()) + override def buildFileIndex(): FileIndex = fileIndex override def buildFileFormat(): FileFormat = { - if (fileGroupReaderEnabled) { + if (fileGroupReaderEnabled && !isTimeTravelQuery && !isBootstrap) { fileGroupReaderBasedFileFormat } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) { multipleBaseFileFormat diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/BatchAdjustableParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/BatchAdjustableParquetFileFormat.scala new file mode 100644 index 0000000000000..e3ecb9e83d71d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/BatchAdjustableParquetFileFormat.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +class BatchAdjustableParquetFileFormat extends ParquetFileFormat { + + /** Gate for batch reads; overwritten by every call to the 8-argument buildReaderWithPartitionValues below. */ + var supportBatchToggle = false + + def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration, + canSupportBatch: Boolean): PartitionedFile => Iterator[InternalRow] = { + supportBatchToggle = canSupportBatch + super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) + } + + /** Defers to ParquetFileFormat.supportBatch only while supportBatchToggle is true; otherwise reports no batch support. */ + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + if (supportBatchToggle) { + super.supportBatch(sparkSession, schema) + } else { + false + } + } +} diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala index 4fe921bc4407a..2065ea23927d9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala @@ -65,17 +65,15 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], //Used so that the planner only projects once and does not stack overflow var isProjected = false + protected val readerFormat: BatchAdjustableParquetFileFormat = new BatchAdjustableParquetFileFormat + /** * Support batch needs to remain consistent, even if one side of a bootstrap merge can support * while the other side can't */ - private var supportBatchCalled = false private var supportBatchResult = false override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if (!supportBatchCalled) { - supportBatchCalled = true - supportBatchResult = !isIncremental && !isMOR && super.supportBatch(sparkSession, schema) - } + supportBatchResult = !isIncremental && !isMOR && super.supportBatch(sparkSession, schema) supportBatchResult } @@ -193,16 +191,16 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], //file reader when you just read a hudi parquet file and don't do any merging val baseFileReader = if (isIncremental) { - super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchemaWithMandatory, - filters ++ requiredFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchemaWithMandatory, + filters 
++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult) } else { - super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, - filters ++ requiredFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, + filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult) } //file reader for reading a hudi base file that needs to be merged with log files val preMergeBaseFileReader = if (isMOR) { - super.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty), - requiredSchemaWithMandatory, requiredFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty), + requiredSchemaWithMandatory, requiredFilters, options, new Configuration(hadoopConf), supportBatchResult) } else { _: PartitionedFile => Iterator.empty @@ -221,12 +219,12 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], val skeletonReader = if (needMetaCols && isBootstrap) { if (needDataCols || isMOR) { // no filter and no append - super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), - requiredMeta, Seq.empty, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), + requiredMeta, Seq.empty, options, new Configuration(hadoopConf), supportBatchResult) } else { // filter and append - super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, partitionSchema, - requiredMeta, filters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, partitionSchema, + requiredMeta, filters, options, new Configuration(hadoopConf), supportBatchResult) } } else { _: PartitionedFile => Iterator.empty @@ -237,16 +235,16 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState], val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => isMetaField(sf.name))) if (isMOR) { // no filter and no append - super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta, - Seq.empty, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta, + Seq.empty, options, new Configuration(hadoopConf), supportBatchResult) } else if (needMetaCols) { // no filter but append - super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, - Seq.empty, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, + Seq.empty, options, new Configuration(hadoopConf), supportBatchResult) } else { // filter and append - super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, - filters ++ requiredFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, + filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult) } } else { _: PartitionedFile => Iterator.empty diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala index aac2a4027a29e..82f85fafa61ac 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala @@ 
-21,7 +21,6 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.ScalaAssertionSupport import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.util.JFunction -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, IsNotNull, Literal} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -130,11 +129,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal if (partitioned) { val executionPlan = df.queryExecution.executedPlan - val expectedPhysicalPlanPartitionFiltersClause = tableType match { - case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]" - case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]" - } - + val expectedPhysicalPlanPartitionFiltersClause = s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]" Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause)) }