diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index cd8f9f6b6291e..6a36e5025bcc4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -751,7 +751,7 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty WRITE_RECORD_POSITIONS = ConfigProperty .key("hoodie.write.record.positions") - .defaultValue(false) + .defaultValue(true) .markAdvanced() .sinceVersion("1.0.0") .withDocumentation("Whether to write record positions to the block header for data blocks containing updates and delete blocks. " diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java index d723921148d85..b9fbbae10698e 100644 --- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java @@ -80,7 +80,7 @@ public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, Strin assert snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0; incrementalQuery(spark, tablePath, tableName); - pointInTimeQuery(spark, tablePath, tableName); + // pointInTimeQuery(spark, tablePath, tableName); Dataset snapshotBeforeDelete = snapshotAfterUpdate; Dataset deleteDf = delete(spark, tablePath, tableName); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index e7364316205f8..f4b26152bbe4f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -88,7 +88,7 @@ object DataSourceReadOptions { val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.use.new.parquet.file.format") - .defaultValue("false") + .defaultValue("true") .markAdvanced() .sinceVersion("0.14.0") .withDocumentation("Read using the new Hudi parquet file format. The new Hudi parquet file format is " + @@ -555,7 +555,7 @@ object DataSourceWriteOptions { val ENABLE_MERGE_INTO_PARTIAL_UPDATES: ConfigProperty[Boolean] = ConfigProperty .key("hoodie.spark.sql.merge.into.partial.updates") - .defaultValue(false) + .defaultValue(true) .markAdvanced() .sinceVersion("1.0.0") .withDocumentation("Whether to write partial updates to the data blocks containing updates " diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/BatchAdjustableParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/BatchAdjustableParquetFileFormat.scala new file mode 100644 index 0000000000000..e3ecb9e83d71d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/BatchAdjustableParquetFileFormat.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +class BatchAdjustableParquetFileFormat extends ParquetFileFormat { + + + var supportBatchToggle = false + + def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration, + canSupportBatch: Boolean): PartitionedFile => Iterator[InternalRow] = { + supportBatchToggle = canSupportBatch + super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) + } + + @Override + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + if (supportBatchToggle) { + super.supportBatch(sparkSession, schema) + } else { + false + } + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala index e77e353c66cc6..2065ea23927d9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala @@ -65,17 +65,15 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], //Used so that the planner only projects once and does not stack overflow var isProjected = false + protected val readerFormat: BatchAdjustableParquetFileFormat = new BatchAdjustableParquetFileFormat + /** * Support batch needs to remain consistent, even if one side of a bootstrap merge can support * while the other side can't */ - private var supportBatchCalled = false private var supportBatchResult = false override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { - if (!supportBatchCalled) { - supportBatchCalled = true - supportBatchResult = !isIncremental && !isMOR && super.supportBatch(sparkSession, schema) - } + supportBatchResult = !isIncremental && !isMOR && super.supportBatch(sparkSession, schema) supportBatchResult } @@ -193,18 +191,17 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], //file reader when you just read a hudi parquet file and don't do any merging val baseFileReader = if (isIncremental) { - super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchemaWithMandatory, - filters ++ requiredFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchemaWithMandatory, + filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult) } else { - super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, - filters ++ requiredFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, + filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult) } //file reader for reading a hudi base file that needs to be merged with log files - val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters, tableState.value.recordKeyField) val preMergeBaseFileReader = if (isMOR) { - super.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty), - requiredSchemaWithMandatory, requiredFilters ++ recordKeyRelatedFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty), + requiredSchemaWithMandatory, requiredFilters, options, new Configuration(hadoopConf)) } else { _: PartitionedFile => Iterator.empty } @@ -222,12 +219,12 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], val skeletonReader = if (needMetaCols && isBootstrap) { if (needDataCols || isMOR) { // no filter and no append - super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), - requiredMeta, Seq.empty, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), + requiredMeta, Seq.empty, options, new Configuration(hadoopConf), supportBatchResult) } else { // filter and append - super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, partitionSchema, - requiredMeta, filters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, partitionSchema, + requiredMeta, filters, options, new Configuration(hadoopConf), supportBatchResult) } } else { _: PartitionedFile => Iterator.empty @@ -238,16 +235,16 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => isMetaField(sf.name))) if (isMOR) { // no filter and no append - super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta, - Seq.empty, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta, + Seq.empty, options, new Configuration(hadoopConf), supportBatchResult) } else if (needMetaCols) { // no filter but append - super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, - Seq.empty, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, + Seq.empty, options, new Configuration(hadoopConf), supportBatchResult) } else { // filter and append - super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, - filters ++ requiredFilters, options, new Configuration(hadoopConf)) + readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta, + filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult) } } else { _: PartitionedFile => Iterator.empty @@ -372,8 +369,4 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState], protected def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = { fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList } - - protected def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: String): Seq[Filter] = { - filters.filter(f => f.references.exists(c => c.equalsIgnoreCase(recordKeyColumn))) - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala index aac2a4027a29e..82f85fafa61ac 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala @@ -21,7 +21,6 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.ScalaAssertionSupport import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.util.JFunction -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, IsNotNull, Literal} import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -130,11 +129,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal if (partitioned) { val executionPlan = df.queryExecution.executedPlan - val expectedPhysicalPlanPartitionFiltersClause = tableType match { - case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]" - case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]" - } - + val expectedPhysicalPlanPartitionFiltersClause = s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]" Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause)) }