
Commit 8f2b532

kamcheungting-db authored and allisonport-db committed
Introduce Delta Statistics Columns dataSkippingStatsColumns
Allow users to specify the list of Delta data skipping statistics columns.

Closes #1763

GitOrigin-RevId: b20c801057431ca0ba2a3494de49c24c5812434d
1 parent b99d700 commit 8f2b532

12 files changed (+1295, -108 lines)

core/src/main/resources/error/delta-error-classes.json (+18)

@@ -322,6 +322,18 @@
     ],
     "sqlState" : "0AKDC"
   },
+  "DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_PARTITIONED_COLUMN" : {
+    "message" : [
+      "Data skipping is not supported for partition column '<column>'."
+    ],
+    "sqlState" : "0AKDC"
+  },
+  "DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_TYPE" : {
+    "message" : [
+      "Data skipping is not supported for column '<column>' of type <type>."
+    ],
+    "sqlState" : "0AKDC"
+  },
   "DELTA_COLUMN_MAPPING_MAX_COLUMN_ID_NOT_SET" : {
     "message" : [
       "The max column id property (<prop>) is not set on a column mapping enabled table."
@@ -579,6 +591,12 @@
     ],
     "sqlState" : "42701"
   },
+  "DELTA_DUPLICATE_DATA_SKIPPING_COLUMNS" : {
+    "message" : [
+      "Duplicated data skipping columns found: <columns>."
+    ],
+    "sqlState" : "42701"
+  },
   "DELTA_DUPLICATE_DOMAIN_METADATA_INTERNAL_ERROR" : {
     "message" : [
       "Internal error: two DomainMetadata actions within the same transaction have the same domain <domainName>"

core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala (+30, -1)

@@ -21,7 +21,7 @@ import java.util.{HashMap, Locale}
 import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.stats.DataSkippingReader
+import org.apache.spark.sql.delta.stats.{DataSkippingReader, StatisticsCollection}

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util.{DateTimeConstants, IntervalUtils}
@@ -485,6 +485,35 @@ trait DeltaConfigsBase extends DeltaLogging {
     a => a >= -1,
     "needs to be larger than or equal to -1.")

+  /**
+   * The names of specific columns to collect stats on for data skipping. If present, it takes
+   * precedence over the dataSkippingNumIndexedCols config, and the system will only collect stats
+   * for columns that exactly match those specified. If a nested column is specified, the system
+   * will collect stats for all leaf fields of that column. If a non-existent column is specified,
+   * it will be ignored. Updating this conf does not trigger stats re-collection, but redefines the
+   * stats schema of the table, i.e., it will change the behavior of future stats collection (e.g.,
+   * in append and OPTIMIZE) as well as data skipping (e.g., the column stats not mentioned by this
+   * config will be ignored even if they exist).
+   */
+  val DATA_SKIPPING_STATS_COLUMNS = buildConfig[Option[String]](
+    "dataSkippingStatsColumns",
+    null,
+    v => Option(v),
+    vOpt => vOpt.forall(v => StatisticsCollection.parseDeltaStatsColumnNames(v).isDefined),
+    """
+      |The dataSkippingStatsColumns parameter is a comma-separated list of case-insensitive column
+      |identifiers. Each column identifier can consist of letters, digits, and underscores.
+      |Multiple column identifiers can be listed, separated by commas.
+      |
+      |If a column identifier includes special characters such as !@#$%^&*()_+-={}|[]:";'<>,.?/,
+      |the column name should be enclosed in backticks (`) to escape the special characters.
+      |
+      |A column identifier can refer to one of the following: the name of a non-struct column, the
+      |leaf field's name of a struct column, or the name of a struct column. When a struct column's
+      |name is specified in dataSkippingStatsColumns, statistics for all its leaf fields will be
+      |collected.
+      |""".stripMargin)
+
   val SYMLINK_FORMAT_MANIFEST_ENABLED = buildConfig[Boolean](
     s"${hooks.GenerateSymlinkManifest.CONFIG_NAME_ROOT}.enabled",
     "false",

core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala (+2)

@@ -397,6 +397,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       proposedNewMetadata: Metadata,
       ignoreDefaultProperties: Boolean = false): Unit = {
     var newMetadataTmp = proposedNewMetadata
+    // Validate all indexed columns are inside the table's schema.
+    StatisticsCollection.validateDeltaStatsColumns(newMetadataTmp)
     if (readVersion == -1 || isCreatingNewTable) {
       // We need to ignore the default properties when trying to create an exact copy of a table
       // (as in CLONE and SHALLOW CLONE).
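
The validation hook itself is small, but the error classes added in delta-error-classes.json indicate what it enforces: duplicates, partition columns, and unsupported column types are rejected. A rough, self-contained sketch of those checks (not the actual StatisticsCollection code; nested-field resolution, the exact set of supported types, and Delta's error-class framework are all simplified away):

import org.apache.spark.sql.types._

// Illustrative only: mirrors the error cases above, with plain exceptions instead of
// Delta's error classes, and no handling of nested or backticked column paths.
def validateStatsColumnsSketch(
    schema: StructType,
    partitionColumns: Seq[String],
    statsColumns: Seq[String]): Unit = {
  val duplicates = statsColumns.groupBy(_.toLowerCase).collect { case (c, vs) if vs.size > 1 => c }
  require(duplicates.isEmpty,
    s"Duplicated data skipping columns found: ${duplicates.mkString(", ")}.")

  val partitionSet = partitionColumns.map(_.toLowerCase).toSet
  statsColumns.foreach { col =>
    require(!partitionSet.contains(col.toLowerCase),
      s"Data skipping is not supported for partition column '$col'.")
    schema.fields.find(_.name.equalsIgnoreCase(col)).foreach { field =>
      field.dataType match {
        // Which types are rejected is decided elsewhere; BinaryType stands in as an example here.
        case BinaryType =>
          throw new IllegalArgumentException(
            s"Data skipping is not supported for column '$col' of type ${field.dataType}.")
        case _ => // supported scalar, or a struct whose leaf fields would each be checked
      }
    }
  }
}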

core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala (+10, -2)

@@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.DataSkippingReader
+import org.apache.spark.sql.delta.stats.DeltaStatsColumnSpec
 import org.apache.spark.sql.delta.stats.StatisticsCollection
 import org.apache.spark.sql.delta.util.StateCache
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -87,6 +88,8 @@ class Snapshot(
   /** Snapshot to scan by the DeltaScanGenerator for metadata query optimizations */
   override val snapshotToScan: Snapshot = this

+  override def columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
+
   @volatile private[delta] var stateReconstructionTriggered = false

@@ -152,7 +155,8 @@
   }

   /** Number of columns to collect stats on for data skipping */
-  lazy val numIndexedCols: Int = DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)
+  override lazy val statsColumnSpec: DeltaStatsColumnSpec =
+    StatisticsCollection.configuredDeltaStatsColumnSpec(metadata)

   /** Performs validations during initialization */
   protected def init(): Unit = {
@@ -329,7 +333,11 @@
     allFiles = checksumOpt.flatMap(_.allFiles))

   /** Returns the data schema of the table, used for reading stats */
-  def tableDataSchema: StructType = metadata.dataSchema
+  def tableSchema: StructType = metadata.dataSchema
+
+  def outputTableStatsSchema: StructType = metadata.dataSchema
+
+  def outputAttributeSchema: StructType = metadata.dataSchema

   /** Returns the schema of the columns written out to file (overridden in write path) */
   def dataSchema: StructType = metadata.dataSchema

core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala (+4, -1)

@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.commands._
 import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
+import org.apache.spark.sql.delta.stats.StatisticsCollection
 import org.apache.hadoop.fs.Path

 import org.apache.spark.internal.Logging
@@ -104,7 +105,9 @@ class DeltaCatalog extends DelegatingCatalogExtension
     var newPartitionColumns = partitionColumns
     var newBucketSpec = maybeBucketSpec
     val conf = spark.sessionState.conf
-
+    allTableProperties.asScala
+      .get(DeltaConfigs.DATA_SKIPPING_STATS_COLUMNS.key)
+      .foreach(StatisticsCollection.validateDeltaStatsColumns(schema, partitionColumns, _))
     val isByPath = isPathIdentifier(ident)
     if (isByPath && !conf.getConf(DeltaSQLConf.DELTA_LEGACY_ALLOW_AMBIGUOUS_PATHS)
       && allTableProperties.containsKey("location")
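
Because the catalog now validates the property against the schema and partitioning, an invalid definition fails at CREATE/REPLACE time rather than at first write. A hypothetical example (names invented) that the check above should reject with DELTA_COLUMN_DATA_SKIPPING_NOT_SUPPORTED_PARTITIONED_COLUMN:

// Hypothetical: naming a partition column in delta.dataSkippingStatsColumns is expected to fail.
spark.sql("""
  CREATE TABLE bad_events (id BIGINT, event_date DATE)
  USING delta
  PARTITIONED BY (event_date)
  TBLPROPERTIES ('delta.dataSkippingStatsColumns' = 'event_date')
""")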

core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala (+1, -1)

@@ -71,7 +71,7 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaComman
     val df = spark.createDataFrame(new java.util.ArrayList[Row](), dataSchema)
     val checkColStat = spark.sessionState.conf.getConf(
       DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK)
-    val statCollectionSchema = txn.snapshot.statCollectionSchema
+    val statCollectionSchema = txn.snapshot.statCollectionLogicalSchema
     val colsWithoutStats = ArrayBuffer[String]()

     unresolvedZOrderByCols.foreach { colAttribute =>

core/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala (+20, -7)

@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraint
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.schema.SchemaUtils.transformColumnsStructs
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.StatisticsCollection

 import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
@@ -119,6 +120,7 @@ case class AlterTableSetPropertiesDeltaCommand(
       case _ =>
         true
     }
+
     val newMetadata = metadata.copy(
       description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
       configuration = metadata.configuration ++ filteredConfs)
@@ -310,9 +312,13 @@ case class AlterTableDropColumnsDeltaCommand(
     if (droppingPartitionCols.nonEmpty) {
       throw DeltaErrors.dropPartitionColumnNotSupported(droppingPartitionCols)
     }
-
-    val newMetadata = metadata.copy(schemaString = newSchema.json)
-
+    // Update the delta statistics column list by removing the dropped columns from it.
+    val newConfiguration = metadata.configuration ++
+      StatisticsCollection.dropDeltaStatsColumns(metadata, columnsToDrop)
+    val newMetadata = metadata.copy(
+      schemaString = newSchema.json,
+      configuration = newConfiguration
+    )
     columnsToDrop.foreach { columnParts =>
       checkDependentExpressions(sparkSession, columnParts, newMetadata, txn.protocol, "drop")
     }
@@ -397,8 +403,17 @@ case class AlterTableChangeColumnDeltaCommand(
       }
     } else metadata.partitionColumns

+    val oldColumnPath = columnPath :+ columnName
+    val newColumnPath = columnPath :+ newColumn.name
+    // Rename the column in the delta statistics columns configuration, if present.
+    val newConfiguration = metadata.configuration ++
+      StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)
+
     val newMetadata = metadata.copy(
-      schemaString = newSchema.json, partitionColumns = newPartitionColumns)
+      schemaString = newSchema.json,
+      partitionColumns = newPartitionColumns,
+      configuration = newConfiguration
+    )

     if (newColumn.name != columnName) {
       // need to validate the changes if the column is renamed
@@ -411,9 +426,7 @@

     if (newColumn.name != columnName) {
       // record column rename separately
-      txn.commit(Nil, DeltaOperations.RenameColumn(
-        columnPath :+ columnName,
-        columnPath :+ newColumn.name))
+      txn.commit(Nil, DeltaOperations.RenameColumn(oldColumnPath, newColumnPath))
     } else {
       txn.commit(Nil, DeltaOperations.ChangeColumn(
         columnPath, columnName, newColumn, colPosition.map(_.toString)))
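
The two new helpers keep the property in step with schema evolution: dropping a column removes its entry from the list, and renaming a column rewrites it. A toy sketch of that bookkeeping (hypothetical helper names; the real StatisticsCollection helpers additionally handle backticked identifiers, nested paths, and case-insensitive matching):

// Toy versions operating directly on the comma-separated property value.
def dropStatsColumnsSketch(statsColumns: String, dropped: Set[String]): String =
  statsColumns.split(",").map(_.trim).filterNot(dropped.contains).mkString(",")

def renameStatsColumnSketch(statsColumns: String, from: String, to: String): String =
  statsColumns.split(",").map(_.trim).map(c => if (c == from) to else c).mkString(",")

// dropStatsColumnsSketch("id,address.city,payload", Set("payload"))  // "id,address.city"
// renameStatsColumnSketch("id,address.city", "id", "event_id")       // "event_id,address.city"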

core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala (+27, -25)

@@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInv
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.sources.DeltaSQLConf.DELTA_COLLECT_STATS_USING_TABLE_SCHEMA
 import org.apache.spark.sql.delta.stats.{DeltaJobStatisticsTracker, StatisticsCollection}
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs.Path
@@ -241,24 +242,25 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
   }

   /**
-   * Return a tuple of (statsDataSchema, statsCollectionSchema).
-   * statsDataSchema is the data source schema from DataFrame used for stats collection. It
-   * contains the columns in the DataFrame output, excluding the partition columns.
-   * statsCollectionSchema is the schema to collect stats for. It contains the columns in the
+   * Return a tuple of (outputStatsCollectionSchema, tableStatsCollectionSchema).
+   * outputStatsCollectionSchema is the data source schema from the DataFrame used for stats
+   * collection. It contains the columns in the DataFrame output, excluding the partition columns.
+   * tableStatsCollectionSchema is the schema to collect stats for. It contains the columns in the
    * table schema, excluding the partition columns.
    * Note: We only collect NULL_COUNT stats (as the number of rows) for the columns in
-   * statsCollectionSchema but missing in statsDataSchema
+   * tableStatsCollectionSchema but missing in outputStatsCollectionSchema.
    */
   protected def getStatsSchema(
       dataFrameOutput: Seq[Attribute],
       partitionSchema: StructType): (Seq[Attribute], Seq[Attribute]) = {
     val partitionColNames = partitionSchema.map(_.name).toSet

-    // statsDataSchema comes from DataFrame output
+    // The outputStatsCollectionSchema comes from DataFrame output
     // schema should be normalized, therefore we can do an equality check
-    val statsDataSchema = dataFrameOutput.filterNot(c => partitionColNames.contains(c.name))
+    val outputStatsCollectionSchema = dataFrameOutput
+      .filterNot(c => partitionColNames.contains(c.name))

-    // statsCollectionSchema comes from table schema
+    // The tableStatsCollectionSchema comes from table schema
     val statsTableSchema = metadata.schema.toAttributes
     val mappedStatsTableSchema = if (metadata.columnMappingMode == NoMapping) {
       statsTableSchema
@@ -267,10 +269,10 @@
     }

     // It's important to first do the column mapping and then drop the partition columns
-    val filteredStatsTableSchema = mappedStatsTableSchema
+    val tableStatsCollectionSchema = mappedStatsTableSchema
       .filterNot(c => partitionColNames.contains(c.name))

-    (statsDataSchema, filteredStatsTableSchema)
+    (outputStatsCollectionSchema, tableStatsCollectionSchema)
   }

   protected def getStatsColExpr(
@@ -291,33 +293,33 @@
       Option[StatisticsCollection]) = {
     if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_STATS)) {

-      val (statsDataSchema, statsCollectionSchema) = getStatsSchema(output, partitionSchema)
-
-      val indexedCols = DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)
+      val (outputStatsCollectionSchema, tableStatsCollectionSchema) =
+        getStatsSchema(output, partitionSchema)

       val statsCollection = new StatisticsCollection {
-        override def tableDataSchema = {
-          // If collecting stats using the table schema, then pass in statsCollectionSchema.
-          // Otherwise pass in statsDataSchema to collect stats using the DataFrame schema.
-          if (spark.sessionState.conf.getConf(DeltaSQLConf
-            .DELTA_COLLECT_STATS_USING_TABLE_SCHEMA)) {
-            statsCollectionSchema.toStructType
+        override val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
+        override def tableSchema: StructType = metadata.schema
+        override def outputTableStatsSchema: StructType = {
+          // If collecting stats uses the table schema, then we pass in tableStatsCollectionSchema;
+          // otherwise, pass in outputStatsCollectionSchema to collect stats using the DataFrame
+          // schema.
+          if (spark.sessionState.conf.getConf(DELTA_COLLECT_STATS_USING_TABLE_SCHEMA)) {
+            tableStatsCollectionSchema.toStructType
           } else {
-            statsDataSchema.toStructType
+            outputStatsCollectionSchema.toStructType
           }
         }
-        override def dataSchema = statsDataSchema.toStructType
+        override def outputAttributeSchema: StructType = outputStatsCollectionSchema.toStructType
         override val spark: SparkSession = data.sparkSession
-        override val numIndexedCols = indexedCols
+        override val statsColumnSpec = StatisticsCollection.configuredDeltaStatsColumnSpec(metadata)
         override val protocol: Protocol = newProtocol.getOrElse(snapshot.protocol)
       }
-
-      val statsColExpr = getStatsColExpr(statsDataSchema, statsCollection)
+      val statsColExpr = getStatsColExpr(outputStatsCollectionSchema, statsCollection)

       (Some(new DeltaJobStatisticsTracker(
         deltaLog.newDeltaHadoopConf(),
         outputPath,
-        statsDataSchema,
+        outputStatsCollectionSchema,
         statsColExpr)), Some(statsCollection))
     } else {
       (None, None)
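
The renamed schemas make the write path's intent clearer: one schema comes from the DataFrame output and one from the table schema, and both have partition columns removed, since partition values already drive pruning without file-level stats. A simplified sketch of that filtering (not the Delta code path, which works on Attributes and applies column mapping before dropping partition columns):

import org.apache.spark.sql.types.StructType

// Simplified: drops partition columns from both schemas by (case-insensitive) name.
def statsSchemasSketch(
    dataFrameSchema: StructType,
    tableSchema: StructType,
    partitionColumns: Seq[String]): (StructType, StructType) = {
  val partitionColNames = partitionColumns.map(_.toLowerCase).toSet
  def strip(schema: StructType): StructType =
    StructType(schema.fields.filterNot(f => partitionColNames.contains(f.name.toLowerCase)))
  (strip(dataFrameSchema), strip(tableSchema))
}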
