Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> WRITE_RECORD_POSITIONS = ConfigProperty
.key("hoodie.write.record.positions")
.defaultValue(false)
.defaultValue(true)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Whether to write record positions to the block header for data blocks containing updates and delete blocks. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, Strin
assert snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0;

incrementalQuery(spark, tablePath, tableName);
pointInTimeQuery(spark, tablePath, tableName);
// pointInTimeQuery(spark, tablePath, tableName);

Dataset<Row> snapshotBeforeDelete = snapshotAfterUpdate;
Dataset<Row> deleteDf = delete(spark, tablePath, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object DataSourceReadOptions {

val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.use.new.parquet.file.format")
.defaultValue("false")
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Read using the new Hudi parquet file format. The new Hudi parquet file format is " +
Expand Down Expand Up @@ -555,7 +555,7 @@ object DataSourceWriteOptions {

val ENABLE_MERGE_INTO_PARTIAL_UPDATES: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.spark.sql.merge.into.partial.updates")
.defaultValue(false)
.defaultValue(true)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Whether to write partial updates to the data blocks containing updates "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

class BatchAdjustableParquetFileFormat extends ParquetFileFormat {


var supportBatchToggle = false

def buildReaderWithPartitionValues(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
canSupportBatch: Boolean): PartitionedFile => Iterator[InternalRow] = {
supportBatchToggle = canSupportBatch
super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
}

@Override
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if (supportBatchToggle) {
super.supportBatch(sparkSession, schema)
} else {
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,15 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
//Used so that the planner only projects once and does not stack overflow
var isProjected = false

protected val readerFormat: BatchAdjustableParquetFileFormat = new BatchAdjustableParquetFileFormat

/**
* Support batch needs to remain consistent, even if one side of a bootstrap merge can support
* while the other side can't
*/
private var supportBatchCalled = false
private var supportBatchResult = false
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if (!supportBatchCalled) {
supportBatchCalled = true
supportBatchResult = !isIncremental && !isMOR && super.supportBatch(sparkSession, schema)
}
supportBatchResult = !isIncremental && !isMOR && super.supportBatch(sparkSession, schema)
supportBatchResult
}

Expand Down Expand Up @@ -193,18 +191,17 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],

//file reader when you just read a hudi parquet file and don't do any merging
val baseFileReader = if (isIncremental) {
super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchemaWithMandatory,
filters ++ requiredFilters, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchemaWithMandatory,
filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult)
} else {
super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema,
filters ++ requiredFilters, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema,
filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult)
}

//file reader for reading a hudi base file that needs to be merged with log files
val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters, tableState.value.recordKeyField)
val preMergeBaseFileReader = if (isMOR) {
super.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty),
requiredSchemaWithMandatory, requiredFilters ++ recordKeyRelatedFilters, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty),
requiredSchemaWithMandatory, requiredFilters, options, new Configuration(hadoopConf))
} else {
_: PartitionedFile => Iterator.empty
}
Expand All @@ -222,12 +219,12 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
val skeletonReader = if (needMetaCols && isBootstrap) {
if (needDataCols || isMOR) {
// no filter and no append
super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
requiredMeta, Seq.empty, options, new Configuration(hadoopConf), supportBatchResult)
} else {
// filter and append
super.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, partitionSchema,
requiredMeta, filters, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, HoodieSparkUtils.getMetaSchema, partitionSchema,
requiredMeta, filters, options, new Configuration(hadoopConf), supportBatchResult)
}
} else {
_: PartitionedFile => Iterator.empty
Expand All @@ -238,16 +235,16 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => isMetaField(sf.name)))
if (isMOR) {
// no filter and no append
super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
Seq.empty, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
Seq.empty, options, new Configuration(hadoopConf), supportBatchResult)
} else if (needMetaCols) {
// no filter but append
super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
Seq.empty, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
Seq.empty, options, new Configuration(hadoopConf), supportBatchResult)
} else {
// filter and append
super.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
filters ++ requiredFilters, options, new Configuration(hadoopConf))
readerFormat.buildReaderWithPartitionValues(sparkSession, dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
filters ++ requiredFilters, options, new Configuration(hadoopConf), supportBatchResult)
}
} else {
_: PartitionedFile => Iterator.empty
Expand Down Expand Up @@ -372,8 +369,4 @@ class NewHoodieParquetFileFormat(tableState: Broadcast[HoodieTableState],
protected def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = {
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
}

protected def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: String): Seq[Filter] = {
filters.filter(f => f.references.exists(c => c.equalsIgnoreCase(recordKeyColumn)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.ScalaAssertionSupport
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, IsNotNull, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.execution.datasources.LogicalRelation
Expand Down Expand Up @@ -130,11 +129,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal

if (partitioned) {
val executionPlan = df.queryExecution.executedPlan
val expectedPhysicalPlanPartitionFiltersClause = tableType match {
case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
}

val expectedPhysicalPlanPartitionFiltersClause = s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
}

Expand Down