[Spark] DV Reads Performance Improvement in Delta by removing Broadcasting DV Information (#2888)

<!--
Thanks for sending a pull request! Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines:
     https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
  2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
  5. If possible, provide a concise example to reproduce the issue for a faster review.
  6. If applicable, include the corresponding issue number in the PR title and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
Previously, we relied on an [expensive broadcast of DV files](#1542) to pass the DV files to their associated Parquet files. With the ability to [add custom metadata to files](apache/spark#40677) introduced in Spark 3.5, we can instead pass the DV descriptor through each file's custom metadata field, which is expected to improve the performance of DV reads in Delta.
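
For context, the new flow attaches the serialized `DeletionVectorDescriptor` and the row-index filter type to each file's `otherConstantMetadataColumnValues` map in `TahoeFileIndex`, and `DeltaParquetFileFormat` reads them back per `PartitionedFile` to build the row-index filter, so the broadcast variables are no longer needed. The sketch below is a minimal, self-contained illustration of that hand-off; `PartitionedFileStub`, `DvDescriptorStub`, and the stringified filter names are hypothetical stand-ins, while the two metadata keys mirror the `FILE_ROW_INDEX_FILTER_ID_ENCODED` and `FILE_ROW_INDEX_FILTER_TYPE` constants added in this PR.

```scala
// Minimal sketch of the per-file metadata hand-off (no Spark/Delta dependency).
// PartitionedFileStub and DvDescriptorStub are hypothetical stand-ins for
// Spark's PartitionedFile and Delta's DeletionVectorDescriptor.
object DvMetadataSketch {

  sealed trait RowIndexFilterType
  case object IfContained extends RowIndexFilterType    // drop rows marked in the DV (normal reads)
  case object IfNotContained extends RowIndexFilterType // keep only marked rows (e.g. CDC deleted rows)

  // Keys mirroring FILE_ROW_INDEX_FILTER_ID_ENCODED / FILE_ROW_INDEX_FILTER_TYPE.
  val FilterIdEncodedKey = "row_index_filter_id_encoded"
  val FilterTypeKey = "row_index_filter_type"

  // Stand-in for a DV descriptor serialized to JSON.
  final case class DvDescriptorStub(json: String)

  // Stand-in for Spark's PartitionedFile and its constant-metadata map.
  final case class PartitionedFileStub(
      path: String,
      otherConstantMetadataColumnValues: Map[String, Any])

  // Planning side (TahoeFileIndex in the real code): attach the DV info of each
  // file as constant file metadata instead of broadcasting a file -> DV map.
  def attachDvMetadata(
      path: String,
      dv: Option[DvDescriptorStub],
      filterType: RowIndexFilterType): PartitionedFileStub = {
    val metadata: Map[String, Any] = dv match {
      case Some(d) => Map(FilterIdEncodedKey -> d.json, FilterTypeKey -> filterType)
      case None    => Map.empty
    }
    PartitionedFileStub(path, metadata)
  }

  // Reader side (DeltaParquetFileFormat in the real code): rebuild the row-index
  // filter purely from the file's own metadata.
  def chooseFilter(file: PartitionedFileStub): String = {
    val dvOpt = file.otherConstantMetadataColumnValues.get(FilterIdEncodedKey)
    val typeOpt = file.otherConstantMetadataColumnValues.get(FilterTypeKey)
    (dvOpt, typeOpt) match {
      case (Some(dv), Some(IfContained))    => s"DropMarkedRowsFilter($dv)"
      case (Some(dv), Some(IfNotContained)) => s"KeepMarkedRowsFilter($dv)"
      case (None, None)                     => "KeepAllRowsFilter"
      case _ => throw new IllegalStateException(
        "DV descriptor and filter type must be set together")
    }
  }

  def main(args: Array[String]): Unit = {
    val withDv    = attachDvMetadata("part-0.parquet", Some(DvDescriptorStub("{...dv json...}")), IfContained)
    val withoutDv = attachDvMetadata("part-1.parquet", None, IfContained)
    println(chooseFilter(withDv))    // DropMarkedRowsFilter({...dv json...})
    println(chooseFilter(withoutDv)) // KeepAllRowsFilter
  }
}
```
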
## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->
Adjusted the existing unit tests that cover these changes.

## Does this PR introduce _any_ user-facing changes?
No.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
longvu-db committed Apr 23, 2024
1 parent cb53c9a commit be7183b
Showing 4 changed files with 167 additions and 212 deletions.
@@ -16,8 +16,6 @@

package org.apache.spark.sql.delta

import java.net.URI

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

@@ -32,17 +30,17 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow, ColumnVector}
import org.apache.spark.util.SerializableConfiguration

@@ -59,13 +57,11 @@ case class DeltaParquetFileFormat(
nullableRowTrackingFields: Boolean = false,
isSplittable: Boolean = true,
disablePushDowns: Boolean = false,
tablePath: Option[String] = None,
broadcastDvMap: Option[Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]]] = None,
broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None)
tablePath: Option[String] = None)
extends ParquetFileFormat {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasDeletionVectorMap) {
require(tablePath.isDefined && !isSplittable && disablePushDowns,
if (hasTablePath) {
require(!isSplittable && disablePushDowns,
"Wrong arguments for Delta table scan with deletion vectors")
}

@@ -124,7 +120,7 @@ case class DeltaParquetFileFormat(
override def isSplitable(
sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = isSplittable

def hasDeletionVectorMap: Boolean = broadcastDvMap.isDefined && broadcastHadoopConf.isDefined
def hasTablePath: Boolean = tablePath.isDefined

/**
* We sometimes need to replace FileFormat within LogicalPlans, so we have to override
@@ -182,11 +178,13 @@ case class DeltaParquetFileFormat(
require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown")
}

if (hasDeletionVectorMap && isRowDeletedColumn.isEmpty) {
if (hasTablePath && isRowDeletedColumn.isEmpty) {
throw new IllegalArgumentException(
s"Expected a column $IS_ROW_DELETED_COLUMN_NAME in the schema")
}

val serializableHadoopConf = new SerializableConfiguration(hadoopConf)

val useOffHeapBuffers = sparkSession.sessionState.conf.offHeapColumnVectorEnabled
(partitionedFile: PartitionedFile) => {
val rowIteratorFromParquet = parquetDataReader(partitionedFile)
@@ -196,8 +194,9 @@ case class DeltaParquetFileFormat(
partitionedFile,
rowIteratorFromParquet,
isRowDeletedColumn,
useOffHeapBuffers = useOffHeapBuffers,
rowIndexColumn = rowIndexColumn)
rowIndexColumn,
useOffHeapBuffers,
serializableHadoopConf)
iterToReturn.asInstanceOf[Iterator[InternalRow]]
} catch {
case NonFatal(e) =>
@@ -276,16 +275,11 @@ case class DeltaParquetFileFormat(
.updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion)
}

def copyWithDVInfo(
tablePath: String,
broadcastDvMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]],
broadcastHadoopConf: Broadcast[SerializableConfiguration]): DeltaParquetFileFormat = {
def disableSplittingAndPushdown(tablePath: String): DeltaParquetFileFormat = {
this.copy(
isSplittable = false,
disablePushDowns = true,
tablePath = Some(tablePath),
broadcastDvMap = Some(broadcastDvMap),
broadcastHadoopConf = Some(broadcastHadoopConf))
tablePath = Some(tablePath))
}

/**
@@ -300,28 +294,32 @@ case class DeltaParquetFileFormat(
iterator: Iterator[Object],
isRowDeletedColumn: Option[ColumnMetadata],
rowIndexColumn: Option[ColumnMetadata],
useOffHeapBuffers: Boolean): Iterator[Object] = {
val pathUri = partitionedFile.pathUri

useOffHeapBuffers: Boolean,
serializableHadoopConf: SerializableConfiguration): Iterator[Object] = {
val rowIndexFilter = isRowDeletedColumn.map { col =>
// Fetch the DV descriptor from the broadcast map and create a row index filter
broadcastDvMap.get.value
.get(pathUri)
.map { case DeletionVectorDescriptorWithFilterType(dvDescriptor, filterType) =>
filterType match {
case i if i == RowIndexFilterType.IF_CONTAINED =>
DropMarkedRowsFilter.createInstance(
dvDescriptor,
broadcastHadoopConf.get.value.value,
tablePath.map(new Path(_)))
case i if i == RowIndexFilterType.IF_NOT_CONTAINED =>
KeepMarkedRowsFilter.createInstance(
dvDescriptor,
broadcastHadoopConf.get.value.value,
tablePath.map(new Path(_)))
}
val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
.get(FILE_ROW_INDEX_FILTER_ID_ENCODED)
val filterTypeOpt = partitionedFile.otherConstantMetadataColumnValues
.get(FILE_ROW_INDEX_FILTER_TYPE)
if (dvDescriptorOpt.isDefined && filterTypeOpt.isDefined) {
val rowIndexFilter = filterTypeOpt.get match {
case RowIndexFilterType.IF_CONTAINED => DropMarkedRowsFilter
case RowIndexFilterType.IF_NOT_CONTAINED => KeepMarkedRowsFilter
case unexpectedFilterType => throw new IllegalStateException(
s"Unexpected row index filter type: ${unexpectedFilterType}")
}
.getOrElse(KeepAllRowsFilter)
rowIndexFilter.createInstance(
DeletionVectorDescriptor.fromJson(dvDescriptorOpt.get.asInstanceOf[String]),
serializableHadoopConf.value,
tablePath.map(new Path(_)))
} else if (dvDescriptorOpt.isDefined || filterTypeOpt.isDefined) {
throw new IllegalStateException(
s"Both ${FILE_ROW_INDEX_FILTER_ID_ENCODED} and ${FILE_ROW_INDEX_FILTER_TYPE} " +
"should either both have values or no values at all.")
} else {
KeepAllRowsFilter
}
}

val metadataColumns = Seq(isRowDeletedColumn, rowIndexColumn).filter(_.nonEmpty).map(_.get)
@@ -417,6 +415,14 @@ object DeltaParquetFileFormat {
val ROW_INDEX_COLUMN_NAME = "__delta_internal_row_index"
val ROW_INDEX_STRUCT_FIELD = StructField(ROW_INDEX_COLUMN_NAME, LongType)

/** The key to the encoded row index filter identifier value of the
* [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"

/** The key to the row index filter type value of the
* [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"

/** Utility method to create a new writable vector */
private def newVector(
useOffHeapBuffers: Boolean, size: Int, dataType: StructField): WritableColumnVector = {
@@ -484,11 +490,6 @@ object DeltaParquetFileFormat {
/** Helper class to encapsulate column info */
case class ColumnMetadata(index: Int, structField: StructField)

/** Helper class that encapsulate an [[RowIndexFilterType]]. */
case class DeletionVectorDescriptorWithFilterType(
descriptor: DeletionVectorDescriptor,
filterType: RowIndexFilterType)

/**
* Translates the filter to use physical column names instead of logical column names.
* This is needed when the column mapping mode is set to `NameMapping` or `IdMapping`
@@ -16,23 +16,16 @@

package org.apache.spark.sql.delta

import java.net.URI

import org.apache.spark.sql.delta.{RowIndexFilter, RowIndexFilterType}
import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

/**
* Plan transformer to inject a filter that removes the rows marked as deleted according to
@@ -54,8 +47,6 @@ import org.apache.spark.util.SerializableConfiguration
* to generate the row index. This is a cost we need to pay until we upgrade to latest
* Apache Spark which contains Parquet reader changes that automatically generate the
* row_index irrespective of the file splitting and filter pushdowns.
* - The scan created also contains a broadcast variable of Parquet File -> DV File map.
* The Parquet reader created uses this map to find the DV file corresponding to the data file.
* - Filter created filters out rows with __skip_row equals to 0
* - And at the end we have a Project to keep the plan node output same as before the rule is
* applied.
@@ -93,20 +84,20 @@ object ScanWithDeletionVectors {

// See if the relation is already modified to include DV reads as part of
// a previous invocation of this rule on this table
if (fileFormat.hasDeletionVectorMap) return None
if (fileFormat.hasTablePath) return None

// See if any files actually have a DV
val spark = SparkSession.getActiveSession.get
val filePathToDVBroadcastMap = createBroadcastDVMap(spark, index)
if (filePathToDVBroadcastMap.value.isEmpty) return None
val filesWithDVs = index
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
.filter(_.deletionVector != null)
if (filesWithDVs.isEmpty) return None

// Get the list of columns in the output of the `LogicalRelation` we are
// trying to modify. At the end of the plan, we need to return a
// `LogicalRelation` that has the same output as this `LogicalRelation`
val planOutput = scan.output

val newScan = createScanWithSkipRowColumn(
spark, scan, fileFormat, index, filePathToDVBroadcastMap, hadoopRelation)
val newScan = createScanWithSkipRowColumn(scan, fileFormat, index, hadoopRelation)

// On top of the scan add a filter that filters out the rows which have
// skip row column value non-zero
@@ -121,11 +112,9 @@
* an extra column which indicates whether the row needs to be skipped or not.
*/
private def createScanWithSkipRowColumn(
spark: SparkSession,
inputScan: LogicalRelation,
fileFormat: DeltaParquetFileFormat,
tahoeFileIndex: TahoeFileIndex,
filePathToDVBroadcastMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]],
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
// Create a new `LogicalRelation` that has modified `DeltaFileFormat` and output with an extra
// column to indicate whether to skip the row or not
@@ -141,11 +130,7 @@
// operator after the data is read from the underlying file reader.
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)

val hadoopConfBroadcast = spark.sparkContext.broadcast(
new SerializableConfiguration(tahoeFileIndex.deltaLog.newDeltaHadoopConf()))

val newFileFormat = fileFormat.copyWithDVInfo(
tahoeFileIndex.path.toString, filePathToDVBroadcastMap, hadoopConfBroadcast)
val newFileFormat = fileFormat.disableSplittingAndPushdown(tahoeFileIndex.path.toString)
val newRelation = hadoopFsRelation.copy(
fileFormat = newFileFormat,
dataSchema = newDataSchema)(hadoopFsRelation.sparkSession)
@@ -166,33 +151,4 @@
val filterExp = keepRow(new Column(skipRowColumnRef)).expr
Filter(filterExp, newScan)
}

private def createBroadcastDVMap(
spark: SparkSession,
tahoeFileIndex: TahoeFileIndex)
: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]] = {
val filterTypes = tahoeFileIndex.rowIndexFilters.getOrElse(Map.empty)

// Given there is no way to find the final filters, just select all files in the
// file index and create the DV map.
val filesWithDVs = tahoeFileIndex
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
.filter(_.deletionVector != null)
// Attach filter types to FileActions, so that later [[DeltaParquetFileFormat]] could pick it up
// to decide which kind of rows should be filtered out. This info is necessary for reading CDC
// rows that have been deleted (marked in DV), in which case marked rows must be kept rather
// than filtered out. In such a case, the `filterTypes` map will be populated by [[CDCReader]]
// to indicate IF_NOT_CONTAINED filter should be used. In other cases, `filterTypes` will be
// empty, so we generate IF_CONTAINED as the default DV behavior.
val filePathToDVMap = filesWithDVs.map { addFile =>
val key = absolutePath(tahoeFileIndex.path.toString, addFile.path).toUri
val filterType =
filterTypes.getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED)
val value =
DeletionVectorDescriptorWithFilterType(addFile.deletionVector, filterType)
key -> value
}.toMap

spark.sparkContext.broadcast(filePathToDVMap)
}
}
@@ -22,13 +22,14 @@ import java.util.Objects

import scala.collection.mutable
import org.apache.spark.sql.delta.RowIndexFilterType
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, NoMapping, Snapshot, SnapshotDescriptor}
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaParquetFileFormat, Snapshot, SnapshotDescriptor}
import org.apache.spark.sql.delta.DefaultRowCommitVersion
import org.apache.spark.sql.delta.RowId
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}
import org.apache.spark.sql.delta.implicits._
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path

@@ -126,6 +127,17 @@ abstract class TahoeFileIndex(
addFile.defaultRowCommitVersion.foreach(defaultRowCommitVersion =>
metadata.put(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, defaultRowCommitVersion))

if (addFile.deletionVector != null) {
metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED,
JsonUtils.toJson(addFile.deletionVector))

// Set the filter type to IF_CONTAINED by default to let [[DeltaParquetFileFormat]] filter
// out rows unless a filter type was explicitly provided in rowIndexFilters. This can happen
// e.g. when reading CDC data to keep deleted rows instead of filtering them out.
val filterType = rowIndexFilters.getOrElse(Map.empty)
.getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED)
metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE, filterType)
}
FileStatusWithMetadata(fs, metadata.toMap)
}

