
Commit cb352c2

[Spark] Support partition-like data filters (#3831)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Given an arbitrary data skipping expression, we can skip a file if:

1. For all of the referenced attributes in the expression, the collected min and max values are equal AND there are 0 nulls on that column, AND
2. The data skipping expression, evaluated on the collected min (== max) values of all referenced attributes, evaluates to false (see the sketch at the end of this commit message).

This PR adds support for some of these expressions, with the following limitations:

1. The table must have >= 100 files. This ensures the added data skipping expressions don't regress performance for small tables, which won't have many files with identical min-max values.
2. The table must be a clustered table and all referenced attributes must be clustering columns. We use this heuristic to avoid adding extra complexity to data skipping for expressions that won't be able to filter out many files.
3. The expression must not reference a Timestamp-type column. Because stats on timestamp columns are truncated to millisecond precision, we can't safely assume that the min and max values of a timestamp column are identical even when the collected stats are equal. Since timestamps are generally high cardinality, it should in any case be rare for a file's min and max timestamp values to be equal.

One more minor nuance: there is one other case where the collected stats differ from the behavior of partitioned tables - truncated strings. However, if a string value is truncated to its first 32 characters, the collected max value will not equal the collected min value, because one or more tiebreaker characters are appended to the collected max. As a result, validating equality is sufficient: for any truncated string, the min and max values will not be equal.

## How was this patch tested?

See new test.

## Does this PR introduce _any_ user-facing changes?

No.
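
A minimal sketch of the per-file skip decision described above, assuming illustrative stats fields (`min`, `max`, `nullCount`) rather than the actual Delta stats schema:

```scala
// Illustrative only: simplified per-file stats for one referenced column.
case class ColumnStats(min: Any, max: Any, nullCount: Long, numRecords: Long)

// A file is "partition-like" on a column when min == max and there are no nulls,
// i.e. every row in the file carries the same value for that column.
def isPartitionLike(stats: ColumnStats): Boolean =
  stats.min == stats.max && stats.nullCount == 0L

// The file can be skipped only if it is partition-like on every referenced column AND
// the data filter, evaluated on those single (min == max) values, returns false.
def canSkipFile(
    statsByColumn: Map[String, ColumnStats],
    filter: Map[String, Any] => Boolean): Boolean = {
  statsByColumn.values.forall(isPartitionLike) &&
    !filter(statsByColumn.map { case (name, s) => name -> s.min })
}
```
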
1 parent 235ce96 commit cb352c2

7 files changed: +657 -1 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/metering/ScanReport.scala

Lines changed: 10 additions & 0 deletions
@@ -26,6 +26,8 @@ case class ScanReport(
     deltaDataSkippingType: String,
     partitionFilters: Seq[String],
     dataFilters: Seq[String],
+    partitionLikeDataFilters: Seq[String],
+    rewrittenPartitionLikeDataFilters: Seq[String],
     unusedFilters: Seq[String],
     size: Map[String, DataSize],
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
@@ -51,6 +53,8 @@ object ScanReport {
     path: String,
     scanType: String,
     partitionFilters: Seq[String],
+    partitionLikeDataFilters: Seq[String],
+    rewrittenPartitionLikeDataFilters: Seq[String],
     dataFilters: Seq[String],
     unusedFilters: Seq[String],
     size: Map[String, DataSize],
@@ -67,6 +71,8 @@
     deltaDataSkippingType = "",
     partitionFilters = partitionFilters,
     dataFilters = dataFilters,
+    partitionLikeDataFilters = partitionLikeDataFilters,
+    rewrittenPartitionLikeDataFilters = rewrittenPartitionLikeDataFilters,
     unusedFilters = unusedFilters,
     size = size,
     metrics = metrics,
@@ -86,6 +92,8 @@
     scanType: String,
     partitionFilters: Seq[String],
     dataFilters: Seq[String],
+    partitionLikeDataFilters: Seq[String],
+    rewrittenPartitionLikeDataFilters: Seq[String],
     unusedFilters: Seq[String],
     size: Map[String, DataSize],
     metrics: Map[String, Long],
@@ -97,6 +105,8 @@
     scanType = scanType,
     partitionFilters = partitionFilters,
     dataFilters = dataFilters,
+    partitionLikeDataFilters = partitionLikeDataFilters,
+    rewrittenPartitionLikeDataFilters = rewrittenPartitionLikeDataFilters,
     unusedFilters = unusedFilters,
     size = size,
     metrics = metrics,

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala

Lines changed: 6 additions & 0 deletions
@@ -1442,6 +1442,12 @@ def normalizeColumnNamesInDataType(
         log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)}", e)
     }
   }
+
+  // Helper method to validate that two logical column names are equal using the Delta column
+  // resolver (case insensitive comparison).
+  def areLogicalNamesEqual(col1: Seq[String], col2: Seq[String]): Boolean = {
+    col1.length == col2.length && col1.zip(col2).forall(DELTA_COL_RESOLVER.tupled)
+  }
 }

 /**

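For context, the new helper compares multi-part logical column names case-insensitively via the Delta column resolver. A hypothetical usage (column names are made up, and the helper is assumed to be reachable on `SchemaUtils`):

```scala
// Paths that differ only in case are considered the same logical column.
SchemaUtils.areLogicalNamesEqual(Seq("Address", "City"), Seq("address", "CITY")) // true
// Paths of different lengths can never match.
SchemaUtils.areLogicalNamesEqual(Seq("a", "b"), Seq("a"))                        // false
```
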
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 19 additions & 0 deletions
@@ -1231,6 +1231,25 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)

+  val DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_ENABLED =
+    buildConf("skipping.partitionLikeFilters.enabled")
+      .doc(
+        """
+          |If true, during data skipping, apply arbitrary data filters to "partition-like"
+          |files (files with the same min-max values and no nulls on all referenced attributes).
+          |""".stripMargin)
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
+  val DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_THRESHOLD =
+    buildConf("skipping.partitionLikeDataSkippingFilesThreshold")
+      .internal()
+      .doc("Partition-like data skipping on files with the same min-max values will only be " +
+        "attempted when a Delta table has a number of files larger than this threshold.")
+      .intConf
+      .createWithDefault(100)
+
   /**
    * The below confs have a special prefix `spark.databricks.io` because this is the conf value
    * already used by Databricks' data skipping implementation. There's no benefit to making OSS

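Both confs are internal and the feature is off by default. Assuming the usual `spark.databricks.delta.` prefix applied by `DeltaSQLConf.buildConf` (the prefix is not shown in this diff), enabling the feature for a session would look roughly like:

```scala
// Turn on partition-like data skipping for this Spark session.
spark.conf.set("spark.databricks.delta.skipping.partitionLikeFilters.enabled", "true")
// Optionally lower the file-count threshold (default 100) when experimenting on small tables.
spark.conf.set("spark.databricks.delta.skipping.partitionLikeDataSkippingFilesThreshold", "10")
```
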
spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala

Lines changed: 234 additions & 1 deletion
@@ -21,20 +21,25 @@ import java.io.Closeable

 import scala.collection.mutable.ArrayBuffer

+import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo}
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaLog, DeltaTableUtils}
 import org.apache.spark.sql.delta.actions.{AddFile, Metadata}
 import org.apache.spark.sql.delta.implicits._
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.DeltaDataSkippingType.DeltaDataSkippingType
 import org.apache.spark.sql.delta.stats.DeltaStatistics._
 import org.apache.spark.sql.delta.util.StateCache
+import org.apache.spark.sql.util.ScalaExtensions._
 import org.apache.hadoop.fs.Path

 import org.apache.spark.sql.{DataFrame, _}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
+import org.apache.spark.sql.catalyst.expressions.objects.InvokeLike
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.execution.InSubqueryExec
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
@@ -583,6 +588,192 @@ trait DataSkippingReaderBase
     case _ => None
   }

+  // Lightweight wrapper to represent a fully resolved reference to an attribute for
+  // partition-like data filters. Contains the min/max/null count stats column expressions and
+  // the referenced stats column for the attribute.
+  private case class ResolvedPartitionLikeReference(
+      referencedStatsCols: Seq[StatsColumn],
+      minExpr: Expression,
+      maxExpr: Expression,
+      nullCountExpr: Expression)
+
+  /**
+   * Rewrites the references in an expression to point to the collected stats over that column
+   * (if possible).
+   *
+   * This is generally equivalent to [[DeltaLog.rewritePartitionFilters]], with a few differences:
+   * 1. This method checks the eligibility of the column datatype before rewriting it to point to
+   *    the stats column (which isn't needed for partition columns).
+   * 2. There's no need to handle scalar subqueries (other than InSubqueryExec) here - subqueries
+   *    other than InSubqueryExec aren't eligible for data filtering.
+   * 3. AND expressions may be partially rewritten as partition-like data filters if one branch
+   *    is eligible but the other is not.
+   *
+   * For example:
+   * CAST(a AS DATE) = '2024-09-11' -> CAST(parsed_stats[minValues][a] AS DATE) = '2024-09-11'
+   *
+   * @param expr The expression to rewrite.
+   * @return If the expression is safe to rewrite, return the rewritten expression and a
+   *         set of referenced attributes (with both the logical path to the column and the
+   *         column type).
+   */
+  private def rewriteDataFiltersAsPartitionLikeInternal(
+      expr: Expression,
+      clusteringColumnPaths: Set[Seq[String]])
+      : Option[(Expression, Set[ResolvedPartitionLikeReference])] = expr match {
+    // The expression is an eligible reference to an attribute.
+    // Do NOT allow partition-like filtering on timestamp columns because timestamps are truncated
+    // to millisecond precision, meaning that we can't guarantee that the collected minVal and
+    // maxVal are the same.
+    // Applying these partition-like filters will generally only be beneficial if a large
+    // percentage of files have the same min-max value. As a rough heuristic, only allow rewriting
+    // expressions that reference only the clustering columns (since these columns are more likely
+    // to have the same min-max values).
+    case SkippingEligibleColumn(c, SkippingEligibleDataType(dt))
+        if dt != TimestampType && dt != TimestampNTZType &&
+          clusteringColumnPaths.exists(SchemaUtils.areLogicalNamesEqual(_, c.reverse)) =>
+      // Only rewrite the expression if all stats are collected for this column.
+      val minStatsCol = StatsColumn(MIN, c, dt)
+      val maxStatsCol = StatsColumn(MAX, c, dt)
+      val nullCountStatsCol = StatsColumn(NULL_COUNT, c, dt)
+      for {
+        minCol <- getStatsColumnOpt(minStatsCol);
+        maxCol <- getStatsColumnOpt(maxStatsCol);
+        nullCol <- getStatsColumnOpt(nullCountStatsCol)
+      } yield {
+        val resolvedAttribute = ResolvedPartitionLikeReference(
+          Seq(minStatsCol, maxStatsCol, nullCountStatsCol),
+          minCol.expr,
+          maxCol.expr,
+          nullCol.expr)
+        (minCol.expr, Set(resolvedAttribute))
+      }
+    // For other attribute references, we can't safely rewrite the expression.
+    case SkippingEligibleColumn(_, _) => None
+    // Don't attempt data skipping on a nondeterministic expression, since the value returned
+    // might be different when executed twice on the same input.
+    // For example, rand() > 0.5 would return ~25% of records if used in data skipping, while the
+    // user would expect ~50% of records to be returned.
+    case other if !other.deterministic => None
+    // Inline subquery results to support InSet. The subquery should generally have already been
+    // evaluated.
+    case in: InSubqueryExec =>
+      // Values may not be defined if the subquery has been skipped - we can't apply this filter.
+      in.values().flatMap { possiblyNullValues =>
+        // Rewrite the children of InSubqueryExec, then replace the subquery with an InSet
+        // containing the materialized values.
+        rewriteDataFiltersAsPartitionLikeInternal(in.child, clusteringColumnPaths).flatMap {
+          case (rewrittenChildren, referencedStats) =>
+            Some(InSet(rewrittenChildren, possiblyNullValues.toSet), referencedStats)
+        }
+      }
+    // Don't allow rewriting UDFs - even if deterministic, UDFs might have some unexpected
+    // side effects when executed twice.
+    case _: UserDefinedExpression => None
+    // Don't attempt to rewrite expressions that might be extremely expensive to invoke twice.
+    case _: RegExpReplace | _: RegExpExtractBase | _: Like | _: MultiLikeBase => None
+    case _: InvokeLike => None
+    case _: JsonToStructs => None
+    // Push NOT through OR - we prefer AND to OR because AND can tolerate one branch not being
+    // rewriteable.
+    case Not(Or(e1, e2)) =>
+      rewriteDataFiltersAsPartitionLikeInternal(And(Not(e1), Not(e2)), clusteringColumnPaths)
+    // For AND expressions, we can tolerate one side not being eligible for partition-like
+    // data skipping - simply remove the ineligible side.
+    case And(left, right) =>
+      val leftResult = rewriteDataFiltersAsPartitionLikeInternal(left, clusteringColumnPaths)
+      val rightResult = rewriteDataFiltersAsPartitionLikeInternal(right, clusteringColumnPaths)
+      (leftResult, rightResult) match {
+        case (Some((newLeft, statsLeft)), Some((newRight, statsRight))) =>
+          Some((And(newLeft, newRight), statsLeft ++ statsRight))
+        case _ => leftResult.orElse(rightResult)
+      }
+    // For all other expressions, recursively rewrite the children.
+    case other =>
+      val childResults = other.children.map(
+        rewriteDataFiltersAsPartitionLikeInternal(_, clusteringColumnPaths))
+      Option.whenNot(childResults.exists(_.isEmpty)) {
+        val (children, stats) = childResults.map(_.get).unzip
+        (other.withNewChildren(children), stats.flatten.toSet)
+      }
+  }
+
+  /**
+   * Returns an expression that returns true if a file must be read because of a mismatched
+   * min-max value or partial nulls on a given column. For these files, it's not safe to apply
+   * arbitrary partition-like filters.
+   */
+  private def fileMustBeScanned(
+      resolvedPartitionLikeReference: ResolvedPartitionLikeReference,
+      numRecordsColOpt: Option[Column]): Expression = {
+    // Construct an expression to determine if all records in the file are null.
+    val nullCountExpr = resolvedPartitionLikeReference.nullCountExpr
+    val allNulls = numRecordsColOpt match {
+      case Some(physicalNumRecords) => EqualTo(nullCountExpr, physicalNumRecords.expr)
+      case _ => Literal(false)
+    }
+
+    // Note that there are 2 other differences in behavior between unpartitioned and partitioned
+    // tables:
+    // 1. If the column is a timestamp, the min-max stats are truncated to millisecond precision.
+    //    We shouldn't apply partition-like filters in this case, but
+    //    rewriteDataFiltersAsPartitionLikeInternal validates the column is not a Timestamp,
+    //    so we don't have to check here.
+    // 2. The min-max stats on a string column might be truncated for an unpartitioned table.
+    //    Note that just validating that the min and max are equal is enough to prevent this case
+    //    - if the string is truncated, the collected max value is guaranteed to be longer than
+    //    the min value due to the tiebreaker character(s) appended at the end of the max.
+    Not(
+      Or(
+        allNulls,
+        And(
+          EqualTo(
+            resolvedPartitionLikeReference.minExpr, resolvedPartitionLikeReference.maxExpr),
+          EqualTo(resolvedPartitionLikeReference.nullCountExpr, Literal(0L))
+        )
+      )
+    )
+  }
+
+  /**
+   * Rewrites the given expression as a partition-like expression if possible:
+   * 1. Rewrite the attribute references in the expression to reference the collected min stats
+   *    on the attribute reference's column.
+   * 2. Construct an expression that returns true if any of the referenced columns are not
+   *    partition-like on a given file.
+   * The rewritten expression is a union of the above expressions: a file is read if it's either
+   * not partition-like on any of the columns or if the rewritten expression evaluates to true.
+   *
+   * @param clusteringColumns The columns that are used for clustering.
+   * @param expr The data filtering expression to rewrite.
+   * @return If the expression is safe to rewrite, return the rewritten
+   *         expression. Otherwise, return None.
+   */
+  def rewriteDataFiltersAsPartitionLike(
+      clusteringColumns: Seq[String], expr: Expression): Option[DataSkippingPredicate] = {
+    val clusteringColumnPaths =
+      clusteringColumns.map(UnresolvedAttribute.quotedString(_).nameParts).toSet
+    rewriteDataFiltersAsPartitionLikeInternal(expr, clusteringColumnPaths).map {
+      case (newExpr, referencedStats) =>
+        // Create an expression that returns true if a file must be read because it has mismatched
+        // min-max values or partial nulls on any of the referenced columns.
+        val numRecordsStatsCol = StatsColumn(NUM_RECORDS, pathToColumn = Nil, LongType)
+        val numRecordsColOpt = getStatsColumnOpt(numRecordsStatsCol)
+        val statsCols = ArrayBuffer(numRecordsStatsCol)
+        val finalExpr = referencedStats.foldLeft(newExpr) {
+          case (oldExpr, resolvedReference) =>
+            val updatedExpr = Or(
+              oldExpr, fileMustBeScanned(resolvedReference, numRecordsColOpt))
+            statsCols ++= resolvedReference.referencedStatsCols
+            updatedExpr
+        }
+        // Create the final data skipping expression - read a file either if it has nulls on any
+        // referenced column, has mismatched stats on any referenced column, or the filter
+        // expression evaluates to `true`.
+        DataSkippingPredicate(Column(finalExpr), statsCols.toSet)
+    }
+  }
+
   private def areAllLeavesLiteral(e: Expression): Boolean = e match {
     case _: Literal => true
     case _ if e.children.nonEmpty => e.children.forall(areAllLeavesLiteral)
@@ -927,6 +1118,8 @@ trait DataSkippingReaderBase
       scannedSnapshot = snapshotToScan,
       partitionFilters = ExpressionSet(Nil),
      dataFilters = ExpressionSet(Nil),
+      partitionLikeDataFilters = ExpressionSet(Nil),
+      rewrittenPartitionLikeDataFilters = Set.empty,
       unusedFilters = ExpressionSet(Nil),
       scanDurationMs = System.currentTimeMillis() - startTime,
       dataSkippingType = getCorrectDataSkippingType(DeltaDataSkippingType.noSkippingV1)
@@ -959,6 +1152,8 @@
       scannedSnapshot = snapshotToScan,
       partitionFilters = ExpressionSet(partitionFilters),
       dataFilters = ExpressionSet(Nil),
+      partitionLikeDataFilters = ExpressionSet(Nil),
+      rewrittenPartitionLikeDataFilters = Set.empty,
       unusedFilters = ExpressionSet(subqueryFilters),
       scanDurationMs = System.currentTimeMillis() - startTime,
       dataSkippingType =
@@ -973,13 +1168,47 @@
       DeltaDataSkippingType.dataSkippingAndPartitionFilteringV1
     }

-    val (skippingFilters, unusedFilters) = if (useStats) {
+    var (skippingFilters, unusedFilters) = if (useStats) {
       val constructDataFilters = new DataFiltersBuilder(spark, dataSkippingType)
       dataFilters.map(f => (f, constructDataFilters(f))).partition(f => f._2.isDefined)
     } else {
       (Nil, dataFilters.map(f => (f, None)))
     }

+    // If enabled, rewrite unused data filters to use partition-like data skipping for clustered
+    // tables. Only rewrite filters if the table is expected to benefit from partition-like
+    // data skipping:
+    // 1. The table should have a large portion of files with the same min-max values on the
+    //    referenced columns - as a rough heuristic, require the table to be a clustered table, as
+    //    many files often have the same min-max on the clustering columns.
+    // 2. The table should be large enough to benefit from partition-like data skipping - as a
+    //    rough heuristic, require the table to no longer be considered a "small delta table."
+    // 3. At least 1 data filter was not already used for data skipping.
+    val shouldRewriteDataFiltersAsPartitionLike =
+      spark.conf.get(DeltaSQLConf.DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_ENABLED) &&
+        ClusteredTableUtils.isSupported(snapshotToScan.protocol) &&
+        snapshotToScan.numOfFilesIfKnown.exists(_ >=
+          spark.conf.get(DeltaSQLConf.DELTA_DATASKIPPING_PARTITION_LIKE_FILTERS_THRESHOLD)) &&
+        unusedFilters.nonEmpty
+    val partitionLikeFilters = if (shouldRewriteDataFiltersAsPartitionLike) {
+      val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshotToScan)
+      val (rewrittenUsedFilters, rewrittenUnusedFilters) = {
+        val constructDataFilters = new DataFiltersBuilder(spark, dataSkippingType)
+        unusedFilters
+          .map { case (expr, _) =>
+            val rewrittenExprOpt = constructDataFilters.rewriteDataFiltersAsPartitionLike(
+              clusteringColumns, expr)
+            (expr, rewrittenExprOpt)
+          }
+          .partition(_._2.isDefined)
+      }
+      skippingFilters = skippingFilters ++ rewrittenUsedFilters
+      unusedFilters = rewrittenUnusedFilters
+      rewrittenUsedFilters.map { case (orig, rewrittenOpt) => (orig, rewrittenOpt.get) }
+    } else {
+      Nil
+    }
+
     val finalSkippingFilters = skippingFilters
       .map(_._2.get)
       .reduceOption((skip1, skip2) => DataSkippingPredicate(
@@ -1000,6 +1229,8 @@
       scannedSnapshot = snapshotToScan,
       partitionFilters = ExpressionSet(partitionFilters),
       dataFilters = ExpressionSet(skippingFilters.map(_._1)),
+      partitionLikeDataFilters = ExpressionSet(partitionLikeFilters.map(_._1)),
+      rewrittenPartitionLikeDataFilters = partitionLikeFilters.map(_._2.expr.expr).toSet,
       unusedFilters = ExpressionSet(unusedFilters.map(_._1) ++ subqueryFilters),
       scanDurationMs = System.currentTimeMillis() - startTime,
       dataSkippingType = getCorrectDataSkippingType(dataSkippingType)
@@ -1044,6 +1275,8 @@
       scannedSnapshot = snapshotToScan,
       partitionFilters = ExpressionSet(partitionFilters),
       dataFilters = ExpressionSet(Nil),
+      partitionLikeDataFilters = ExpressionSet(Nil),
+      rewrittenPartitionLikeDataFilters = Set.empty,
       unusedFilters = ExpressionSet(Nil),
       scanDurationMs = System.currentTimeMillis() - startTime,
       dataSkippingType = DeltaDataSkippingType.filteredLimit

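To illustrate the shape of the predicate produced by `rewriteDataFiltersAsPartitionLike`, here is a rough Column-API sketch for a filter like `CAST(c AS DATE) = '2024-09-11'` on a clustering column `c`; the `stats.*` paths are hypothetical stand-ins for the collected stats columns:

```scala
import org.apache.spark.sql.functions.{col, expr, lit, not}

// The original filter, with the attribute reference replaced by the column's min stat.
val rewrittenFilter = expr("CAST(stats.minValues.c AS DATE) = DATE'2024-09-11'")

// A file must still be read when it is not partition-like on `c`:
// it is not all-null, and either min != max or the column has some nulls.
val allNulls = col("stats.nullCount.c") === col("stats.numRecords")
val partitionLike = col("stats.minValues.c") === col("stats.maxValues.c") &&
  col("stats.nullCount.c") === lit(0L)
val mustScan = not(allNulls || partitionLike)

// Final skipping predicate: read the file if the rewritten filter matches OR the
// partition-like assumption cannot be trusted for this file.
val readFile = rewrittenFilter || mustScan
```
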