From 88aa0324e362c4098e5cd52a6aae67d9d813136c Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 20:15:09 +0000 Subject: [PATCH 01/13] validation --- .../resources/error/error-conditions.json | 6 + .../autocdc/Scd1BatchProcessor.scala | 109 +++++++- .../autocdc/Scd1BatchProcessorSuite.scala | 237 +++++++++++++++++- 3 files changed, 338 insertions(+), 14 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index fb0bb87172a8e..9274c17500a75 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -209,6 +209,12 @@ ], "sqlState" : "42703" }, + "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : { + "message" : [ + "Using <caseSensitivity> column name comparison, the column `<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC column name `<reservedColumnName>`. Rename or remove the column." + ], + "sqlState" : "42710" + }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { "message" : [ "Cannot write null value for field <name> defined as non-null Avro data type <dataType>.", diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index f87a4a1da53d4..d3fcc2b8e7e0f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -17,16 +17,26 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.sql.{functions => F} +import org.apache.spark.SparkException +import org.apache.spark.sql.{functions => F, AnalysisException} +import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.util.QuotingUtils import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.types.{DataType, StructField, StructType} import 
org.apache.spark.util.ArrayImplicits._ /** * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the specified [[changeArgs]] * configuration. + * + * @param changeArgs The CDC flow configuration. + * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived + * from the flow's resolved DataFrame at flow setup time. */ -case class Scd1BatchProcessor(changeArgs: ChangeArgs) { +case class Scd1BatchProcessor( + changeArgs: ChangeArgs, + resolvedSequencingType: DataType) { + /** * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key * as ordered by [[ChangeArgs.sequencing]]. @@ -59,9 +69,102 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { ) .select(F.col(s"$winningRowCol.*")) } + + /** + * Project the CDC metadata column onto the microbatch. + */ + def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + // Proactively validate the reserved CDC metadata column does not exist in the microbatch. + validateCdcMetadataColumnNotPresent(microbatchDf) + + val rowDeleteSequence: Column = changeArgs.deleteCondition match { + case Some(deleteCondition) => + F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null)) + case None => + F.lit(null) + } + + val rowUpsertSequence: Column = + // A row that is not a delete must be an upsert, these are mutually exclusive and a complete + // set of CDC event types. 
+ F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null)) + + microbatchDf.withColumn( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.constructCdcMetadataCol( + deleteSequence = rowDeleteSequence, + upsertSequence = rowUpsertSequence, + sequencingType = resolvedSequencingType + ) + ) + } + + private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { + val ignoreColumnNameCase = + !microbatchDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + + microbatchDf.schema.fieldNames + .find { fieldName => + if (ignoreColumnNameCase) { + fieldName.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) + } else { + fieldName.equals(Scd1BatchProcessor.cdcMetadataColName) + } + } + .foreach { conflictingColumnName => + throw new AnalysisException( + errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of(!ignoreColumnNameCase), + "columnName" -> conflictingColumnName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } object Scd1BatchProcessor { // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. - private[autocdc] val winningRowColName = "__spark_autocdc_winning_row" + private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row" + private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata" + + private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence" + private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence" + + /** + * Schema of the CDC metadata struct column for SCD1. + */ + private def cdcMetadataColSchema(sequencingType: DataType): StructType = + StructType( + Seq( + // The sequencing of the event if it represents a delete, null otherwise. 
+ StructField(cdcDeleteSequenceFieldName, sequencingType, nullable = true), + // The sequencing of the event if it represents an upsert, null otherwise. + StructField(cdcUpsertSequenceFieldName, sequencingType, nullable = true) + ) + ) + + /** + * Construct the CDC metadata struct column for SCD1, following the exact schema and field + * ordering defined by [[cdcMetadataColSchema]]. + */ + private[autocdc] def constructCdcMetadataCol( + deleteSequence: Column, + upsertSequence: Column, + sequencingType: DataType): Column = { + val cdcMetadataFieldsInOrder = cdcMetadataColSchema(sequencingType).fields.map { field => + val value = field.name match { + case `cdcDeleteSequenceFieldName` => deleteSequence + case `cdcUpsertSequenceFieldName` => upsertSequence + case other => + throw SparkException.internalError( + s"Unable to construct SCD1 CDC metadata column due to unknown `${other}` field." + ) + } + value.cast(field.dataType).as(field.name) + } + F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*) + } } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 208c0aa1e4c59..3975f11f9b68f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row} import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -53,7 +54,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = 
ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -77,7 +79,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -102,7 +105,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // On equal sequence number events for the same key we provide no guarantee on which event will @@ -133,7 +137,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -160,7 +165,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( df = processor.deduplicateMicrobatch(batch), @@ -187,7 +193,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -217,7 +224,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // All non-key columns must come from the row with the largest sequence value, never @@ -277,7 +285,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), sequencing = F.col("seq"), 
storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -338,7 +347,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("`user.id`")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -402,7 +412,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // Field names and dataTypes must match the input exactly, in the original order. @@ -424,11 +435,215 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) val result = processor.deduplicateMicrobatch(batch) assert(result.collect().isEmpty) assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) } + + test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or an upsert " + + "per deleteCondition") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, 10L, false), + Row(2, 20L, true), + Row(3, 30L, false), + Row(4, 40L, true) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = Some(F.col("is_delete") === true) + ), + resolvedSequencingType = LongType + ) + + // Mutual-exclusivity invariant: each row's _cdc_metadata struct has exactly one of + // (deleteSequence, upsertSequence) non-null, and the non-null side carries the row's + // sequence value. 
+ checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, false, Row(null, 10L)), + Row(2, 20L, true, Row(20L, null)), + Row(3, 30L, false, Row(null, 30L)), + Row(4, 40L, true, Row(40L, null)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " + + "when deleteCondition is None") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a"), + Row(2, 20L, "b") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = None + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, "a", Row(null, 10L)), + Row(2, 20L, "b", Row(null, 20L)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata appends CDC metadata as the last column") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + // Original columns are preserved in their original order, with CDC metadata appended at + // the very end. 
+ assert(result.schema.fieldNames.toSeq == + schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName) + } + + test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence fields to " + + "resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is IntegerType, but the flow's resolved sequencing type + // will be LongType. This should be upcasted in the projected CDC metadata column. + .add("seq", IntegerType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + val cdcMetadataDataType = + resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq( + Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType, + Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType)) + + // The cast must also succeed at runtime: upsertSequence is materialized as a Long value, not + // an Int. + checkAnswer( + df = resultDf, + expectedAnswer = Row(1, 10, "a", Row(null, 10L)) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata fails fast when the microbatch's sequencing column " + + "is incompatible with resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is a struct, whereas the flow's resolved sequencing type + // will be LongType. These are incompatible and should throw. 
+ .add( + "seq", + new StructType() + .add("major", LongType) + .add("minor", LongType)) + + val batch = microbatchOf(schema)( + Row(1, Row(1L, 0L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val ex = intercept[AnalysisException] { + // .schema forces analysis of the underlying logical plan, surfacing the invalid cast. + processor.extendMicrobatchRowsWithCdcMetadata(batch).schema + } + assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION") + } + + test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already contains the " + + "reserved CDC metadata column") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add(Scd1BatchProcessor.cdcMetadataColName, StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "user-supplied") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkError( + exception = intercept[AnalysisException] { + processor.extendMicrobatchRowsWithCdcMetadata(batch) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "columnName" -> Scd1BatchProcessor.cdcMetadataColName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } From f8d269092477657ef2a156b6e20e4fad1b50eca9 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 21:31:20 +0000 Subject: [PATCH 02/13] buff scaladoc --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index d3fcc2b8e7e0f..bdc8bf1f9683b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -72,6 +72,9 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. + * + * The returned dataframe has all of the columns in the input microbatch + the CDC metadata + * column. */ def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { // Proactively validate the reserved CDC metadata column does not exist in the microbatch. From 27aa75bbb9308e13bf508f00603b882648f8060b Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 01:29:47 +0000 Subject: [PATCH 03/13] use spark resolver --- .../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index bdc8bf1f9683b..953f963efed4b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -103,22 +103,16 @@ case class Scd1BatchProcessor( } private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val ignoreColumnNameCase = - !microbatchDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + val sqlConf = microbatchDf.sparkSession.sessionState.conf + val resolver = sqlConf.resolver microbatchDf.schema.fieldNames - .find { fieldName => - if (ignoreColumnNameCase) { - 
fieldName.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) - } else { - fieldName.equals(Scd1BatchProcessor.cdcMetadataColName) - } - } + .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) .foreach { conflictingColumnName => throw new AnalysisException( errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(!ignoreColumnNameCase), + "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis), "columnName" -> conflictingColumnName, "schemaName" -> "microbatch", "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName From c1552595119600bc2dfd0008b5b40b27f59cff3e Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:10:09 +0000 Subject: [PATCH 04/13] linting --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 953f963efed4b..1e35678a7200e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -72,7 +72,7 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. - * + * * The returned dataframe has all of the columns in the input microbatch + the CDC metadata * column. 
*/ From ca4e971c5a5b1d8f1659d9de8a87aa85820646ff Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 18:22:41 +0000 Subject: [PATCH 05/13] rebase conflict --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 1e35678a7200e..00450bee6984b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -103,8 +103,8 @@ case class Scd1BatchProcessor( } private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val sqlConf = microbatchDf.sparkSession.sessionState.conf - val resolver = sqlConf.resolver + val microbatchSqlConf = microbatchDf.sparkSession.sessionState.conf + val resolver = microbatchSqlConf.resolver microbatchDf.schema.fieldNames .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) @@ -112,7 +112,7 @@ case class Scd1BatchProcessor( throw new AnalysisException( errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis), + "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis), "columnName" -> conflictingColumnName, "schemaName" -> "microbatch", "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName From 801a8b71f7711598023ec5664608742dc9e34172 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 23:01:00 +0000 Subject: [PATCH 06/13] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 8 +++ .../autocdc/Scd1BatchProcessorSuite.scala | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 00450bee6984b..5554efdafd68c 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -73,6 +73,14 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. * + * This must run before any column selection is applied to the microbatch. The + * [[ChangeArgs.deleteCondition]] and [[ChangeArgs.sequencing]] expressions are evaluated against + * the current microbatch schema, and column selection may drop inputs required by those + * expressions. + * + * Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] evaluates to true. A + * false or null delete condition classifies the row as an upsert. + * * The returned dataframe has all of the columns in the input microbatch + the CDC metadata * column. 
*/ diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 3975f11f9b68f..3042e61838de9 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -482,6 +482,32 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test("extendMicrobatchRowsWithCdcMetadata treats null deleteCondition results as upserts") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, 10L, null) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Row(1, 10L, null, Row(null, 10L)) + ) + } + test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " + "when deleteCondition is None") { val schema = new StructType() @@ -646,4 +672,42 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } } + + test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata column " + + "case-insensitively") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val conflictingColumnName = Scd1BatchProcessor.cdcMetadataColName.toUpperCase + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add(conflictingColumnName, StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "user-supplied") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys 
= Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkError( + exception = intercept[AnalysisException] { + processor.extendMicrobatchRowsWithCdcMetadata(batch) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> conflictingColumnName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } From c36f91015ea8d314cbf5d9b06a1eb355e555b137 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 20 May 2026 21:08:50 +0000 Subject: [PATCH 07/13] rebase conflicts --- .../sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 3042e61838de9..9368205807cd4 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -255,7 +255,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -322,7 +323,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.greatest(F.col("seq"), F.col("alt_seq")), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -377,7 +379,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = 
Seq(UnqualifiedColumnName(reservedColName)), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkError( From 25387c3c77c6f6fe1a54de75673e2685969aeefa Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 21 May 2026 18:50:31 +0000 Subject: [PATCH 08/13] PR feedback --- .../pipelines/autocdc/Scd1BatchProcessor.scala | 16 ++++++++++------ .../autocdc/Scd1BatchProcessorSuite.scala | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 5554efdafd68c..d50c30919ba8d 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -81,12 +81,16 @@ case class Scd1BatchProcessor( * Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] evaluates to true. A * false or null delete condition classifies the row as an upsert. * + * @param validatedMicrobatch A microbatch that has already been validated such that the + * sequencing column should not contain null values, and its data type + * should support ordering. + * * The returned dataframe has all of the columns in the input microbatch + the CDC metadata * column. */ - def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): DataFrame = { // Proactively validate the reserved CDC metadata column does not exist in the microbatch. - validateCdcMetadataColumnNotPresent(microbatchDf) + validateCdcMetadataColumnNotPresent(validatedMicrobatch) val rowDeleteSequence: Column = changeArgs.deleteCondition match { case Some(deleteCondition) => @@ -100,7 +104,7 @@ case class Scd1BatchProcessor( // set of CDC event types. 
F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null)) - microbatchDf.withColumn( + validatedMicrobatch.withColumn( Scd1BatchProcessor.cdcMetadataColName, Scd1BatchProcessor.constructCdcMetadataCol( deleteSequence = rowDeleteSequence, @@ -110,11 +114,11 @@ case class Scd1BatchProcessor( ) } - private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val microbatchSqlConf = microbatchDf.sparkSession.sessionState.conf + private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = { + val microbatchSqlConf = microbatch.sparkSession.sessionState.conf val resolver = microbatchSqlConf.resolver - microbatchDf.schema.fieldNames + microbatch.schema.fieldNames .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) .foreach { conflictingColumnName => throw new AnalysisException( diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 9368205807cd4..babd425f23403 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -471,7 +471,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { resolvedSequencingType = LongType ) - // Mutual-exclusivity invariant: each row's _cdc_metadata struct has exactly one of + // Mutual-exclusivity invariant: each row's CDC metadata struct has exactly one of // (deleteSequence, upsertSequence) non-null, and the non-null side carries the row's // sequence value. 
checkAnswer( From be4b09c020d3e7311fea36b7ef62c73c87f8d988 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 21 May 2026 21:48:24 +0000 Subject: [PATCH 09/13] scalalint LOCALE --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index babd425f23403..1cb3483164368 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.pipelines.autocdc +import java.util.Locale + import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.internal.SQLConf @@ -679,7 +681,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata column " + "case-insensitively") { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - val conflictingColumnName = Scd1BatchProcessor.cdcMetadataColName.toUpperCase + val conflictingColumnName = Scd1BatchProcessor.cdcMetadataColName.toUpperCase(Locale.ROOT) val schema = new StructType() .add("id", IntegerType) .add("seq", LongType) From f7411d8d16821bc863d55cb91d1b75f1519c2eaf Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 21:30:02 +0000 Subject: [PATCH 10/13] project target columns onto microbatch --- .../autocdc/Scd1BatchProcessor.scala | 50 +++++ .../autocdc/Scd1BatchProcessorSuite.scala | 182 ++++++++++++++++++ 2 files changed, 232 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index d50c30919ba8d..6f6ca67dbf1ef 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -114,6 +114,56 @@ case class Scd1BatchProcessor( ) } + /** + * Project the user-defined column selection onto the microbatch. By this point the input + * microbatch should already have projected its CDC metadata, because it's possible that the + * user-defined column selection drops columns that are otherwise necessary to compute the + * CDC metadata. + * + * Returned dataframe's schema is: all of the user-selected columns in the input dataframe as per + * [[ChangeArgs.columnSelection]] + the CDC metadata column. + */ + def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = { + val ignoreColumnNameCase = + !microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + + // The schema of the microbatch less the system-projected CDC metadata column, i.e. the + // original microbatch schema. + val userColumnsInMicrobatchSchema = + StructType( + microbatchWithCdcMetadataDf.schema.fields.filterNot { field => + if (ignoreColumnNameCase) { + field.name.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) + } else { + field.name.equals(Scd1BatchProcessor.cdcMetadataColName) + } + } + ) + + val userSelectedColumnsInMicrobatchSchema = + ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = userColumnsInMicrobatchSchema, + columnSelection = changeArgs.columnSelection, + ignoreCase = ignoreColumnNameCase + ) + + // In addition to the explicit user-selected columns, re-project the operational CDC metadata + // column as the last column. 
+ val finalColumnsInMicrobatchToSelect = + userSelectedColumnsInMicrobatchSchema.fieldNames.map(colName => { + // Spark drops backticks in the schema, quote all identifiers for safety before executing + // select. Identifiers could have special characters such as '.'. + F.col(QuotingUtils.quoteIdentifier(colName)) + }) :+ F.col( + Scd1BatchProcessor.cdcMetadataColName + ) + + microbatchWithCdcMetadataDf.select( + finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _* + ) + } + private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = { val microbatchSqlConf = microbatch.sparkSession.sessionState.conf val resolver = microbatchSqlConf.resolver diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 1cb3483164368..2de9b2fdc4001 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -27,6 +27,20 @@ import org.apache.spark.sql.types._ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { + /** + * Test Schema for a microbatch that already has the SCD1 CDC metadata column projected. + */ + private val microbatchWithCdcMetadataSchema: StructType = new StructType() + .add("id", IntegerType) + .add("name", StringType) + .add("age", IntegerType) + .add( + Scd1BatchProcessor.cdcMetadataColName, + new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) + ) + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. 
*/ private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) @@ -715,4 +729,172 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } } + + test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC metadata column " + + "when columnSelection is None") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)), + Row(2, "bob", 25, Row(20L, null)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = None + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + // None selection is no-op on the user columns, and the CDC metadata column is unconditionally + // re-projected last, so the output shape exactly matches the input. 
+ assert(result.schema.fieldNames.toSeq == microbatchWithCdcMetadataSchema.fieldNames.toSeq) + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "alice", 30, Row(null, 10L)), + Row(2, "bob", 25, Row(20L, null)) + ) + ) + } + + test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column even when " + + "IncludeColumns does not contain it") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age")) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == + Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, Row(null, 10L)) + ) + } + + test("projectTargetColumnsOntoMicrobatch respects exclude column") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("age")) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + assert( + result.schema.fieldNames.toSeq == + Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName) + ) + checkAnswer( + df = result, + expectedAnswer = Row(1, "alice", Row(null, 10L)) + ) + } + + test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema order") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + 
Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + // User specifies (age, id) -- intentionally different from the schema order (id, age). + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + // Output column order follows the original microbatch schema (id before age), not the order + // in which the user listed columns in IncludeColumns. The CDC metadata column is appended + // last as always. + assert(result.schema.fieldNames.toSeq == + Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, Row(null, 10L)) + ) + } + + test("projectTargetColumnsOntoMicrobatch handles backticked column names containing a " + + "literal dot") { + val schema = new StructType() + .add("id", IntegerType) + // Even if a column is created with backticks via DDL, those backticks are consumed by Spark + // before resolving the schema; they won't show up in the schema field. 
+ .add("user.id", StringType) + .add( + Scd1BatchProcessor.cdcMetadataColName, + new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)) + + val batch = microbatchOf(schema)( + Row(1, "u-100", Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq( + UnqualifiedColumnName("id"), + UnqualifiedColumnName("`user.id`") + ) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == + Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName)) + checkAnswer( + df = result, + expectedAnswer = Row(1, "u-100", Row(null, 10L)) + ) + } } From d7500816183e7c7aa7bbbc196ab8b72b3ae73220 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:31:55 +0000 Subject: [PATCH 11/13] reuse applyToSchema --- .../autocdc/Scd1BatchProcessor.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 6f6ca67dbf1ef..0a28e1d39cf00 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -127,18 +127,23 @@ case class Scd1BatchProcessor( val ignoreColumnNameCase = !microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis - // The schema of the microbatch less the system-projected CDC metadata column, i.e. the - // original microbatch schema. 
- val userColumnsInMicrobatchSchema = - StructType( - microbatchWithCdcMetadataDf.schema.fields.filterNot { field => - if (ignoreColumnNameCase) { - field.name.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) - } else { - field.name.equals(Scd1BatchProcessor.cdcMetadataColName) - } - } - ) + // Calculate the schema of the microbatch less the system-projected CDC metadata column, i.e. + // the The user schema is the microbatch's schema after dropping the system columns - i.e the + // CDC metadata column. + + // We project out the system columns before applying user selection and project back in + // afterwards, so that users cannot control whether these [necessary] columns show up in the + // target table. + val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = microbatchWithCdcMetadataDf.schema, + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName)) + ) + ), + ignoreCase = ignoreColumnNameCase + ) val userSelectedColumnsInMicrobatchSchema = ColumnSelection.applyToSchema( From a42da1d9a883f957df25fa40642dc25496dd725b Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 18:18:50 +0000 Subject: [PATCH 12/13] rebase conflict --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 0a28e1d39cf00..122d6447df9e9 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -124,8 +124,8 @@ case class Scd1BatchProcessor( * [[ChangeArgs.columnSelection]] + the CDC metadata column. 
*/ def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: DataFrame): DataFrame = { - val ignoreColumnNameCase = - !microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + val caseSensitiveColumnComparison = + microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis // Calculate the schema of the microbatch less the system-projected CDC metadata column, i.e. // the The user schema is the microbatch's schema after dropping the system columns - i.e the @@ -142,7 +142,7 @@ case class Scd1BatchProcessor( Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName)) ) ), - ignoreCase = ignoreColumnNameCase + caseSensitive = caseSensitiveColumnComparison ) val userSelectedColumnsInMicrobatchSchema = @@ -150,7 +150,7 @@ case class Scd1BatchProcessor( schemaName = "microbatch", schema = userColumnsInMicrobatchSchema, columnSelection = changeArgs.columnSelection, - ignoreCase = ignoreColumnNameCase + caseSensitive = caseSensitiveColumnComparison ) // In addition to the explicit user-selected columns, re-project the operational CDC metadata From b71ec8e613032eb81a7b76f4be1c4ad8d60dbff1 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 21 May 2026 19:05:39 +0000 Subject: [PATCH 13/13] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 9 ++--- .../autocdc/Scd1BatchProcessorSuite.scala | 35 +++++++++++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 122d6447df9e9..03aaf284f070f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -127,12 +127,9 @@ case class Scd1BatchProcessor( val caseSensitiveColumnComparison = 
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis - // Calculate the schema of the microbatch less the system-projected CDC metadata column, i.e. - // the The user schema is the microbatch's schema after dropping the system columns - i.e the - // CDC metadata column. - - // We project out the system columns before applying user selection and project back in - // afterwards, so that users cannot control whether these [necessary] columns show up in the + // The user schema is the microbatch schema after dropping the system CDC metadata column. + // We project out the system column before applying user selection and project it back in + // afterwards, so that users cannot control whether this [necessary] column shows up in the // target table. val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema( schemaName = "microbatch", diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 2de9b2fdc4001..a49c89e357555 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -897,4 +897,39 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { expectedAnswer = Row(1, "u-100", Row(null, 10L)) ) } + + test("projectTargetColumnsOntoMicrobatch resolves columnSelection case-insensitively " + + "when SQLConf.CASE_SENSITIVE=false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val batch = microbatchOf(microbatchWithCdcMetadataSchema)( + Row(1, "alice", 30, Row(null, 10L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + // User columns intentionally use a different 
case than the schema (id, age). + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE")) + ) + ) + ), + resolvedSequencingType = LongType + ) + + val result = processor.projectTargetColumnsOntoMicrobatch(batch) + + // Output column names follow the microbatch schema's casing, not the casing in the user's + // columnSelection. The CDC metadata column is appended last as always. + assert(result.schema.fieldNames.toSeq == + Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, Row(null, 10L)) + ) + } + } }