From 8b08cbe4d0b8e917d9cf026f4d5c2180a15253c0 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 12 May 2026 20:55:10 +0000 Subject: [PATCH 01/25] Introduce ChangeArgs --- .../resources/error/error-conditions.json | 13 +++ .../sql/pipelines/autocdc/ChangeArgs.scala | 90 +++++++++++++++++++ .../pipelines/autocdc/ChangeArgsSuite.scala | 90 +++++++++++++++++++ 3 files changed, 193 insertions(+) create mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala create mode 100644 sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 889ecf9f7b08a..322a97a007f5b 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3374,6 +3374,19 @@ ], "sqlState" : "42000" }, + "AUTOCDC_INVALID_COLUMN_SELECTION" : { + "message" : [ + "Invalid column selection." + ], + "subClass" : { + "COLUMNS_NOT_FOUND" : { + "message" : [ + "The following columns are not present in the schema: . Available columns: ." + ] + } + }, + "sqlState" : "42703" + }, "INVALID_CONF_VALUE" : { "message" : [ "The value '' in the config \"\" is invalid." diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala new file mode 100644 index 0000000000000..72559d36a31e1 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.types.StructType + +sealed trait ColumnSelection +object ColumnSelection { + type ColumnList = Seq[String] + case class IncludeColumns(columns: ColumnList) extends ColumnSelection + case class ExcludeColumns(columns: ColumnList) extends ColumnSelection + + /** + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. + * Field names are matched exactly. Field order follows the original schema (filtered in place). + */ + def applyToSchema(schema: StructType, columnSelection: Option[ColumnSelection]): StructType = + columnSelection match { + case None => + // A none column selection is interpreted as a no-op (select all columns existing in the schema). + schema + case Some(IncludeColumns(includeColumns)) => + validateColumnsExistInSchema(columns = includeColumns, schema = schema) + + val includeColumnSet = includeColumns.toSet + StructType(schema.fields.filter(f => includeColumnSet.contains(f.name))) + case Some(ExcludeColumns(excludeColumns)) => + validateColumnsExistInSchema(columns = excludeColumns, schema = schema) + + val excludeColumnSet = excludeColumns.toSet + StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name))) + } + + private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = { + val schemaColumns = schema.fieldNames.toSet + val missingColumns = columns.filterNot(schemaColumns.contains).distinct + if (missingColumns.nonEmpty) { + throw new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + messageParameters = Map( + "missingColumns" -> missingColumns.mkString(", "), + "availableColumns" -> schema.fieldNames.mkString(", ") + )) + } + } +} + +/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ +sealed trait ScdType + +object ScdType { + case object Type1 extends ScdType + case object Type2 extends ScdType +} + +/** + * Configuration for an AutoCDC flow. + * + * @param keys The column(s) that uniquely identify a row in the source data. + * @param sequencing Expression ordering CDC events to correctly resolve out-of-order + * arrivals. Must be a sortable type. + * @param deleteCondition Expression that marks a source row as a DELETE. When None, all + * rows are treated as upserts. + * @param storedAsScdType The SCD strategy these args should be applied to. + * @param columnSelection Which source columns to include in the target table. None means + * all columns. + */ +case class ChangeArgs( + keys: Seq[String], + sequencing: Column, + deleteCondition: Option[Column] = None, + storedAsScdType: ScdType, + columnSelection: Option[ColumnSelection] = None +) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala new file mode 100644 index 0000000000000..5c43d4a4e11ab --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} + +class ChangeArgsSuite extends SparkFunSuite { + + private val sourceSchema = new StructType() + .add("id", IntegerType, nullable = false) + .add("Name", StringType) + .add("age", IntegerType) + + test("ColumnSelection None leaves schema unchanged") { + assert(ColumnSelection.applyToSchema(sourceSchema, None) == sourceSchema) + } + + test("ColumnSelection IncludeColumns filters by exact name in schema order") { + val filteredSchema = ColumnSelection.applyToSchema( + sourceSchema, + Some(ColumnSelection.IncludeColumns(Seq("age", "Name")))) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection ExcludeColumns filters by exact name") { + val filteredSchema = ColumnSelection.applyToSchema( + sourceSchema, + Some(ColumnSelection.ExcludeColumns(Seq("id")))) + + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection IncludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + sourceSchema, + // Column inclusion is case-sensitive; "name" will not match against "Name". + Some(ColumnSelection.IncludeColumns(Seq("name", "missing"))) + ) + }, + condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + sqlState = "42703", + parameters = Map( + "missingColumns" -> "name, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } + + test("ColumnSelection ExcludeColumns fails for columns not present in schema") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + sourceSchema, + // Column exclusion is case-sensitive; "NAME" will not match against "Name". + Some(ColumnSelection.ExcludeColumns(Seq("NAME", "missing"))) + ) + }, + condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + sqlState = "42703", + parameters = Map( + "missingColumns" -> "NAME, missing", + "availableColumns" -> "id, Name, age" + ) + ) + } +} From 202f3a58eaaab5fa08574a46815d96d7ac7cf48e Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 12 May 2026 22:55:00 +0000 Subject: [PATCH 02/25] linting --- .../resources/error/error-conditions.json | 26 +++++++++---------- .../sql/pipelines/autocdc/ChangeArgs.scala | 6 ++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 322a97a007f5b..4fdd1ef399ff6 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -82,6 +82,19 @@ ], "sqlState" : "XX000" }, + "AUTOCDC_INVALID_COLUMN_SELECTION" : { + "message" : [ + "Invalid column selection." + ], + "subClass" : { + "COLUMNS_NOT_FOUND" : { + "message" : [ + "The following columns are not present in the schema: . Available columns: ." + ] + } + }, + "sqlState" : "42703" + }, "APPEND_ONCE_FROM_BATCH_QUERY" : { "message" : [ "Creating a streaming table from a batch query prevents incremental loading of new data from source. Offending table: ''.", @@ -3374,19 +3387,6 @@ ], "sqlState" : "42000" }, - "AUTOCDC_INVALID_COLUMN_SELECTION" : { - "message" : [ - "Invalid column selection." - ], - "subClass" : { - "COLUMNS_NOT_FOUND" : { - "message" : [ - "The following columns are not present in the schema: . Available columns: ." - ] - } - }, - "sqlState" : "42703" - }, "INVALID_CONF_VALUE" : { "message" : [ "The value '' in the config \"\" is invalid." diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 72559d36a31e1..c02fe86a8273e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -33,16 +33,16 @@ object ColumnSelection { def applyToSchema(schema: StructType, columnSelection: Option[ColumnSelection]): StructType = columnSelection match { case None => - // A none column selection is interpreted as a no-op (select all columns existing in the schema). + // A none column selection is interpreted as a no-op. schema case Some(IncludeColumns(includeColumns)) => validateColumnsExistInSchema(columns = includeColumns, schema = schema) - + val includeColumnSet = includeColumns.toSet StructType(schema.fields.filter(f => includeColumnSet.contains(f.name))) case Some(ExcludeColumns(excludeColumns)) => validateColumnsExistInSchema(columns = excludeColumns, schema = schema) - + val excludeColumnSet = excludeColumns.toSet StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name))) } From 4ac75e7c5010eba6eb3a7e6ceb5c0556025874ce Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 13 May 2026 01:37:12 +0000 Subject: [PATCH 03/25] reorder error condition --- .../resources/error/error-conditions.json | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 4fdd1ef399ff6..3d98bae966953 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -82,19 +82,6 @@ ], "sqlState" : "XX000" }, - "AUTOCDC_INVALID_COLUMN_SELECTION" : { - "message" : [ - "Invalid column selection." - ], - "subClass" : { - "COLUMNS_NOT_FOUND" : { - "message" : [ - "The following columns are not present in the schema: . Available columns: ." - ] - } - }, - "sqlState" : "42703" - }, "APPEND_ONCE_FROM_BATCH_QUERY" : { "message" : [ "Creating a streaming table from a batch query prevents incremental loading of new data from source. Offending table: '
'.", @@ -204,6 +191,19 @@ ], "sqlState" : "0A000" }, + "AUTOCDC_INVALID_COLUMN_SELECTION" : { + "message" : [ + "Invalid column selection." + ], + "subClass" : { + "COLUMNS_NOT_FOUND" : { + "message" : [ + "The following columns are not present in the schema: . Available columns: ." + ] + } + }, + "sqlState" : "42703" + }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { "message" : [ "Cannot write null value for field defined as non-null Avro data type .", From 11606c5676c7e7ce598be72786fc01b79f63e464 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 13 May 2026 16:58:44 +0000 Subject: [PATCH 04/25] PR feedback --- .../resources/error/error-conditions.json | 5 ++ .../sql/pipelines/autocdc/ChangeArgs.scala | 50 +++++++++++-- .../pipelines/autocdc/ChangeArgsSuite.scala | 73 ++++++++++++++++++- 3 files changed, 116 insertions(+), 12 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 3d98bae966953..617924ce50626 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -200,6 +200,11 @@ "message" : [ "The following columns are not present in the schema: . Available columns: ." ] + }, + "MULTIPART_COLUMN_IDENTIFIER" : { + "message" : [ + "Column selection entries must be a single column identifier; got the multi-part identifier (parts: )." + ] } }, "sqlState" : "42703" diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index c02fe86a8273e..65a41521686e7 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -18,11 +18,43 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.sql.{AnalysisException, Column} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types.StructType +/** A column reference that must be a single, unqualified identifier (no nested + * field path and no table/alias qualifier). The constructor parses + * [[columnName]] with the Spark SQL parser and throws an [[AnalysisException]] + * if it does not resolve to exactly one name part. + */ +case class UnqualifiedColumnName(name: String) { + UnqualifiedColumnName.validate(name) +} + +object UnqualifiedColumnName { + private def validate(columnName: String): Unit = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName) + if (nameParts.length != 1) { + throw multipartColumnIdentifierError(columnName, nameParts) + } + } + + private def multipartColumnIdentifierError( + columnName: String, + nameParts: Seq[String] + ): AnalysisException = + new AnalysisException( + errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + messageParameters = Map( + "columnName" -> columnName, + "nameParts" -> nameParts.mkString(", ") + ) + ) +} + sealed trait ColumnSelection object ColumnSelection { - type ColumnList = Seq[String] + type ColumnList = Seq[UnqualifiedColumnName] + case class IncludeColumns(columns: ColumnList) extends ColumnSelection case class ExcludeColumns(columns: ColumnList) extends ColumnSelection @@ -36,20 +68,20 @@ object ColumnSelection { // A none column selection is interpreted as a no-op. schema case Some(IncludeColumns(includeColumns)) => - validateColumnsExistInSchema(columns = includeColumns, schema = schema) + validateColumnsExistInSchema(includeColumns, schema) - val includeColumnSet = includeColumns.toSet + val includeColumnSet = includeColumns.map(_.name).toSet StructType(schema.fields.filter(f => includeColumnSet.contains(f.name))) case Some(ExcludeColumns(excludeColumns)) => - validateColumnsExistInSchema(columns = excludeColumns, schema = schema) + validateColumnsExistInSchema(excludeColumns, schema) - val excludeColumnSet = excludeColumns.toSet + val excludeColumnSet = excludeColumns.map(_.name).toSet StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name))) } private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = { val schemaColumns = schema.fieldNames.toSet - val missingColumns = columns.filterNot(schemaColumns.contains).distinct + val missingColumns = columns.map(_.name).filterNot(schemaColumns.contains).distinct if (missingColumns.nonEmpty) { throw new AnalysisException( errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", @@ -65,7 +97,9 @@ object ColumnSelection { sealed trait ScdType object ScdType { + /** Representation for the standard SCD1 strategy. */ case object Type1 extends ScdType + /** Representation for the standard SCD2 strategy. */ case object Type2 extends ScdType } @@ -78,13 +112,13 @@ object ScdType { * @param deleteCondition Expression that marks a source row as a DELETE. When None, all * rows are treated as upserts. * @param storedAsScdType The SCD strategy these args should be applied to. - * @param columnSelection Which source columns to include in the target table. None means + * @param columnSelection Which source columns to select in the target table. None means * all columns. */ case class ChangeArgs( keys: Seq[String], sequencing: Column, - deleteCondition: Option[Column] = None, storedAsScdType: ScdType, + deleteCondition: Option[Column] = None, columnSelection: Option[ColumnSelection] = None ) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 5c43d4a4e11ab..25efccdba41f0 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.types.{IntegerType, StringType, StructType} class ChangeArgsSuite extends SparkFunSuite { @@ -35,7 +36,8 @@ class ChangeArgsSuite extends SparkFunSuite { test("ColumnSelection IncludeColumns filters by exact name in schema order") { val filteredSchema = ColumnSelection.applyToSchema( sourceSchema, - Some(ColumnSelection.IncludeColumns(Seq("age", "Name")))) + Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name"))))) assert(filteredSchema == new StructType() .add("Name", StringType) @@ -45,7 +47,7 @@ class ChangeArgsSuite extends SparkFunSuite { test("ColumnSelection ExcludeColumns filters by exact name") { val filteredSchema = ColumnSelection.applyToSchema( sourceSchema, - Some(ColumnSelection.ExcludeColumns(Seq("id")))) + Some(ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))))) assert(filteredSchema == new StructType() .add("Name", StringType) @@ -58,7 +60,8 @@ class ChangeArgsSuite extends SparkFunSuite { ColumnSelection.applyToSchema( sourceSchema, // Column inclusion is case-sensitive; "name" will not match against "Name". - Some(ColumnSelection.IncludeColumns(Seq("name", "missing"))) + Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing")))) ) }, condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", @@ -76,7 +79,8 @@ class ChangeArgsSuite extends SparkFunSuite { ColumnSelection.applyToSchema( sourceSchema, // Column exclusion is case-sensitive; "NAME" will not match against "Name". - Some(ColumnSelection.ExcludeColumns(Seq("NAME", "missing"))) + Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing")))) ) }, condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", @@ -87,4 +91,65 @@ class ChangeArgsSuite extends SparkFunSuite { ) ) } + + test("UnqualifiedColumnName accepts a simple single-part identifier") { + assert(UnqualifiedColumnName("col").name == "col") + } + + test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") { + // Backticks make the dot part of a single name part, so this passes validation. + assert(UnqualifiedColumnName("`a.b`").name == "`a.b`") + } + + test("UnqualifiedColumnName rejects a dotted (multi-part) identifier") { + checkError( + exception = intercept[AnalysisException] { + UnqualifiedColumnName("a.b") + }, + condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + sqlState = "42703", + parameters = Map( + "columnName" -> "a.b", + "nameParts" -> "a, b" + ) + ) + } + + test("UnqualifiedColumnName rejects a qualified column reference") { + checkError( + exception = intercept[AnalysisException] { + UnqualifiedColumnName("src.x") + }, + condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + sqlState = "42703", + parameters = Map( + "columnName" -> "src.x", + "nameParts" -> "src, x" + ) + ) + } + + test("UnqualifiedColumnName rejects an identifier with three or more parts") { + checkError( + exception = intercept[AnalysisException] { + UnqualifiedColumnName("a.b.c") + }, + condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + sqlState = "42703", + parameters = Map( + "columnName" -> "a.b.c", + "nameParts" -> "a, b, c" + ) + ) + } + + test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") { + checkError( + exception = intercept[ParseException] { + UnqualifiedColumnName("") + }, + condition = "PARSE_EMPTY_STATEMENT", + sqlState = Some("42617") + ) + } } From d1a38e6da1c72477c0d315d7ea1a81b2bd620ef3 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 13 May 2026 19:59:25 +0000 Subject: [PATCH 05/25] linting --- .../spark/sql/pipelines/autocdc/ChangeArgs.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 65a41521686e7..f735c30155059 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -21,11 +21,11 @@ import org.apache.spark.sql.{AnalysisException, Column} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.types.StructType -/** A column reference that must be a single, unqualified identifier (no nested - * field path and no table/alias qualifier). The constructor parses - * [[columnName]] with the Spark SQL parser and throws an [[AnalysisException]] - * if it does not resolve to exactly one name part. - */ +/** + * A column reference that must be a single, unqualified identifier (no nested field path and + * no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and + * throws an [[AnalysisException]] if it does not resolve to exactly one name part. + */ case class UnqualifiedColumnName(name: String) { UnqualifiedColumnName.validate(name) } From bbe5335983db90d86370c2df6a4b2222fb4e8bd5 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 14 May 2026 17:16:38 +0000 Subject: [PATCH 06/25] PR feedback --- .../resources/error/error-conditions.json | 22 +- .../sql/pipelines/autocdc/ChangeArgs.scala | 131 +++++++---- .../pipelines/autocdc/ChangeArgsSuite.scala | 207 +++++++++++++++--- 3 files changed, 283 insertions(+), 77 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 617924ce50626..ddd6b678fecc7 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -191,22 +191,16 @@ ], "sqlState" : "0A000" }, - "AUTOCDC_INVALID_COLUMN_SELECTION" : { + "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : { "message" : [ - "Invalid column selection." + "The following columns are not present in the schema: . Available columns: . Matching was ." + ], + "sqlState" : "42703" + }, + "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : { + "message" : [ + "Expected a single column identifier; got the multi-part identifier (parts: )." ], - "subClass" : { - "COLUMNS_NOT_FOUND" : { - "message" : [ - "The following columns are not present in the schema: . Available columns: ." - ] - }, - "MULTIPART_COLUMN_IDENTIFIER" : { - "message" : [ - "Column selection entries must be a single column identifier; got the multi-part identifier (parts: )." - ] - } - }, "sqlState" : "42703" }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index f735c30155059..4ca27dd872c12 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -17,25 +17,40 @@ package org.apache.spark.sql.pipelines.autocdc +import java.util.Locale + import org.apache.spark.sql.{AnalysisException, Column} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.util.QuotingUtils import org.apache.spark.sql.types.StructType /** - * A column reference that must be a single, unqualified identifier (no nested field path and - * no table/alias qualifier). The constructor parses [[name]] with the Spark SQL parser and - * throws an [[AnalysisException]] if it does not resolve to exactly one name part. + * A single, unqualified column identifier (no nested path or table/alias qualifier). Backticks + * are consumed: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for direct schema-fieldName + * comparison and [[quoted]] for APIs that re-parse identifier strings. + * + * Declared `final class` so the smart constructor is the only path to construction (no synthesized + * `copy` can bypass it). */ -case class UnqualifiedColumnName(name: String) { - UnqualifiedColumnName.validate(name) +final class UnqualifiedColumnName private (val name: String) extends Serializable { + + def quoted: String = QuotingUtils.quoteIdentifier(name) + + override def equals(other: Any): Boolean = other match { + case that: UnqualifiedColumnName => name == that.name + case _ => false + } + + override def hashCode(): Int = name.hashCode } object UnqualifiedColumnName { - private def validate(columnName: String): Unit = { - val nameParts = CatalystSqlParser.parseMultipartIdentifier(columnName) + def apply(input: String): UnqualifiedColumnName = { + val nameParts = CatalystSqlParser.parseMultipartIdentifier(input) if (nameParts.length != 1) { - throw multipartColumnIdentifierError(columnName, nameParts) + throw multipartColumnIdentifierError(input, nameParts) } + new UnqualifiedColumnName(nameParts.head) } private def multipartColumnIdentifierError( @@ -43,7 +58,7 @@ object UnqualifiedColumnName { nameParts: Seq[String] ): AnalysisException = new AnalysisException( - errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", messageParameters = Map( "columnName" -> columnName, "nameParts" -> nameParts.mkString(", ") @@ -53,44 +68,84 @@ object UnqualifiedColumnName { sealed trait ColumnSelection object ColumnSelection { - type ColumnList = Seq[UnqualifiedColumnName] - case class IncludeColumns(columns: ColumnList) extends ColumnSelection - case class ExcludeColumns(columns: ColumnList) extends ColumnSelection + case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends ColumnSelection + case class ExcludeColumns(columns: Seq[UnqualifiedColumnName]) + extends ColumnSelection /** - * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. - * Field names are matched exactly. Field order follows the original schema (filtered in place). + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. Field + * order follows the original schema; filtering happens in place. */ - def applyToSchema(schema: StructType, columnSelection: Option[ColumnSelection]): StructType = - columnSelection match { - case None => - // A none column selection is interpreted as a no-op. - schema - case Some(IncludeColumns(includeColumns)) => - validateColumnsExistInSchema(includeColumns, schema) - - val includeColumnSet = includeColumns.map(_.name).toSet - StructType(schema.fields.filter(f => includeColumnSet.contains(f.name))) - case Some(ExcludeColumns(excludeColumns)) => - validateColumnsExistInSchema(excludeColumns, schema) - - val excludeColumnSet = excludeColumns.map(_.name).toSet - StructType(schema.fields.filterNot(f => excludeColumnSet.contains(f.name))) - } + def applyToSchema( + schema: StructType, + columnSelection: Option[ColumnSelection], + ignoreCase: Boolean): StructType = columnSelection match { + case None => + // A none column selection is interpreted as a no-op. + schema + case Some(IncludeColumns(cols)) => + val includeColumnNames = cols.map(_.name) + validateColumnsExistInSchema(includeColumnNames, schema, ignoreCase) + + val caseNormalizedIncludeColumnNames = + includeColumnNames.map(normalizeCase(_, ignoreCase)).toSet + + StructType( + schema.fields.filter(schemaField => + caseNormalizedIncludeColumnNames.contains(normalizeCase(schemaField.name, ignoreCase)) + ) + ) + case Some(ExcludeColumns(cols)) => + val excludeColumnNames = cols.map(_.name) + validateColumnsExistInSchema(excludeColumnNames, schema, ignoreCase) + + val caseNormalizedExcludeColumnNames = + excludeColumnNames.map(normalizeCase(_, ignoreCase)).toSet - private def validateColumnsExistInSchema(columns: ColumnList, schema: StructType): Unit = { - val schemaColumns = schema.fieldNames.toSet - val missingColumns = columns.map(_.name).filterNot(schemaColumns.contains).distinct - if (missingColumns.nonEmpty) { + StructType( + schema.fields.filterNot(schemaField => + caseNormalizedExcludeColumnNames.contains(normalizeCase(schemaField.name, ignoreCase)) + ) + ) + } + + private def validateColumnsExistInSchema( + columnNames: Seq[String], + schema: StructType, + ignoreCase: Boolean): Unit = { + val caseNormalizedSchemaColumns = + schema.fieldNames.map(normalizeCase(_, ignoreCase)).toSet + + // Compare folded forms but report the missing and available columns using their original + // casing so error messages reflect what the user actually wrote and what the schema holds. + val columnsMissingInSchema = columnNames + .filterNot(columnName => + caseNormalizedSchemaColumns.contains(normalizeCase(columnName, ignoreCase)) + ) + .distinct + + if (columnsMissingInSchema.nonEmpty) { throw new AnalysisException( - errorClass = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + errorClass = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", messageParameters = Map( - "missingColumns" -> missingColumns.mkString(", "), - "availableColumns" -> schema.fieldNames.mkString(", ") + "missingColumns" -> columnsMissingInSchema.mkString(", "), + "availableColumns" -> schema.fieldNames.mkString(", "), + "matching" -> (if (ignoreCase) "case-insensitive" else "case-sensitive") )) } } + + /** + * If ignoreCase, normalize all strings to lowercase for stable comparison. + */ + private def normalizeCase(name: String, ignoreCase: Boolean): String = { + if (ignoreCase) { + name.toLowerCase(Locale.ROOT) + } else { + name + } + } } /** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ @@ -116,7 +171,7 @@ object ScdType { * all columns. */ case class ChangeArgs( - keys: Seq[String], + keys: Seq[UnqualifiedColumnName], sequencing: Column, storedAsScdType: ScdType, deleteCondition: Option[Column] = None, diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 25efccdba41f0..50b0a517d8982 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -18,11 +18,12 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{functions => F, AnalysisException, Row} import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StringType, StructType} -class ChangeArgsSuite extends SparkFunSuite { +class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { private val sourceSchema = new StructType() .add("id", IntegerType, nullable = false) @@ -30,14 +31,24 @@ class ChangeArgsSuite extends SparkFunSuite { .add("age", IntegerType) test("ColumnSelection None leaves schema unchanged") { - assert(ColumnSelection.applyToSchema(sourceSchema, None) == sourceSchema) + assert( + ColumnSelection.applyToSchema( + schema = sourceSchema, + columnSelection = None, + ignoreCase = false + ) == sourceSchema) } test("ColumnSelection IncludeColumns filters by exact name in schema order") { val filteredSchema = ColumnSelection.applyToSchema( - sourceSchema, - Some(ColumnSelection.IncludeColumns( - Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name"))))) + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name")) + ) + ), + ignoreCase = false + ) assert(filteredSchema == new StructType() .add("Name", StringType) @@ -46,8 +57,12 @@ class ChangeArgsSuite extends SparkFunSuite { test("ColumnSelection ExcludeColumns filters by exact name") { val filteredSchema = ColumnSelection.applyToSchema( - sourceSchema, - Some(ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))))) + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))) + ), + ignoreCase = false + ) assert(filteredSchema == new StructType() .add("Name", StringType) @@ -58,17 +73,22 @@ class ChangeArgsSuite extends SparkFunSuite { checkError( exception = intercept[AnalysisException] { ColumnSelection.applyToSchema( - sourceSchema, - // Column inclusion is case-sensitive; "name" will not match against "Name". - Some(ColumnSelection.IncludeColumns( - Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing")))) + schema = sourceSchema, + // Under ignoreCase = false, "name" will not match the schema field "Name". + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing")) + ) + ), + ignoreCase = false ) }, - condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", sqlState = "42703", parameters = Map( "missingColumns" -> "name, missing", - "availableColumns" -> "id, Name, age" + "availableColumns" -> "id, Name, age", + "matching" -> "case-sensitive" ) ) } @@ -77,17 +97,98 @@ class ChangeArgsSuite extends SparkFunSuite { checkError( exception = intercept[AnalysisException] { ColumnSelection.applyToSchema( - sourceSchema, - // Column exclusion is case-sensitive; "NAME" will not match against "Name". - Some(ColumnSelection.ExcludeColumns( - Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing")))) + schema = sourceSchema, + // Under ignoreCase = false, "NAME" will not match the schema field "Name". + columnSelection = Some( + ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing")) + ) + ), + ignoreCase = false ) }, - condition = "AUTOCDC_INVALID_COLUMN_SELECTION.COLUMNS_NOT_FOUND", + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", sqlState = "42703", parameters = Map( "missingColumns" -> "NAME, missing", - "availableColumns" -> "id, Name, age" + "availableColumns" -> "id, Name, age", + "matching" -> "case-sensitive" + ) + ) + } + + test("ColumnSelection IncludeColumns matches case-insensitively under ignoreCase=true") { + // "NAME" and "AGE" do not exactly match the schema fields "Name" and "age", but + // ignoreCase = true folds both sides to lowercase before comparing. + val filteredSchema = ColumnSelection.applyToSchema( + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("AGE"), UnqualifiedColumnName("NAME")) + ) + ), + ignoreCase = true + ) + + // The retained fields keep their original casing from the schema, not the user's input. + assert(filteredSchema == new StructType() + .add("Name", StringType) + .add("age", IntegerType)) + } + + test("ColumnSelection deduplicates user-provided columns that normalize to the same name") { + // Under ignoreCase = true, "name" and "NAME" both fold to "name" and refer to the same + // schema field. The returned schema must include "Name" once, not twice. Output ordering + // and casing follow the schema, not the user's input. + val filteredSchema = ColumnSelection.applyToSchema( + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("NAME")) + ) + ), + ignoreCase = true + ) + + assert(filteredSchema == new StructType().add("Name", StringType)) + } + + test("ColumnSelection ExcludeColumns matches case-insensitively under ignoreCase=true") { + val filteredSchema = ColumnSelection.applyToSchema( + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name"))) + ), + ignoreCase = true + ) + + assert(filteredSchema == new StructType() + .add("id", IntegerType, nullable = false) + .add("age", IntegerType)) + } + + test("ColumnSelection missing-column error under ignoreCase=true preserves user casing") { + checkError( + exception = intercept[AnalysisException] { + ColumnSelection.applyToSchema( + schema = sourceSchema, + // "NAME" matches "Name" under ignoreCase=true, but "Missing" has no schema match. + // The error message reports the user's original casing for the missing column and + // the schema's original casing for the available columns. + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("Missing")) + ) + ), + ignoreCase = true + ) + }, + condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", + sqlState = "42703", + parameters = Map( + "missingColumns" -> "Missing", + "availableColumns" -> "id, Name, age", + "matching" -> "case-insensitive" ) ) } @@ -97,8 +198,64 @@ class ChangeArgsSuite extends SparkFunSuite { } test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") { - // Backticks make the dot part of a single name part, so this passes validation. - assert(UnqualifiedColumnName("`a.b`").name == "`a.b`") + // Backticks make the dot part of a single name part, so this passes validation. The + // stored name is the parsed (unquoted) form so it matches the actual schema field name. + assert(UnqualifiedColumnName("`a.b`").name == "a.b") + } + + test("UnqualifiedColumnName.quoted is safe to pass to functions.col for literal-dot names") { + val schema = new StructType() + .add("a.b", IntegerType) + .add("c", IntegerType) + + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, 2), Row(3, 4))), + schema + ) + + val key = UnqualifiedColumnName("`a.b`") + + // Sanity-check: the unquoted `name` is not safe to pass to `functions.col`. The string is + // re-parsed and the literal dot is interpreted as a nested-field path separator, so the + // analyzer fails to resolve `a`.`b` against the available top-level columns. + checkError( + exception = intercept[AnalysisException] { + df.select(F.col(key.name)).collect() + }, + condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + sqlState = "42703", + parameters = Map( + "objectName" -> "`a`.`b`", + "proposal" -> "`a.b`, `c`" + ), + context = ExpectedContext( + fragment = "col", + callSitePattern = "" + ) + ) + + // The `quoted` form wraps the name in back-ticks so the re-parser treats the whole thing + // as a single identifier, resolving to the top-level "a.b" column. + assert(df.select(F.col(key.quoted)).collect().toSeq == Seq(Row(1), Row(3))) + } + + test("IncludeColumns correctly matches a backtick-quoted literal-dot column") { + val schema = new StructType() + .add("a.b", IntegerType) + .add("c", StringType) + + // The user writes `a.b` to refer to the literal-dot column "a.b" in the schema. After + // construction, the [[UnqualifiedColumnName]] holds "a.b", which matches the field name + // exactly and the column is included in the filtered schema. + val filteredSchema = ColumnSelection.applyToSchema( + schema = schema, + columnSelection = Some( + ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`a.b`"))) + ), + ignoreCase = false + ) + + assert(filteredSchema == new StructType().add("a.b", IntegerType)) } test("UnqualifiedColumnName rejects a dotted (multi-part) identifier") { @@ -106,7 +263,7 @@ class ChangeArgsSuite extends SparkFunSuite { exception = intercept[AnalysisException] { UnqualifiedColumnName("a.b") }, - condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", sqlState = "42703", parameters = Map( "columnName" -> "a.b", @@ -120,7 +277,7 @@ class ChangeArgsSuite extends SparkFunSuite { exception = intercept[AnalysisException] { UnqualifiedColumnName("src.x") }, - condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", sqlState = "42703", parameters = Map( "columnName" -> "src.x", @@ -134,7 +291,7 @@ class ChangeArgsSuite extends SparkFunSuite { exception = intercept[AnalysisException] { UnqualifiedColumnName("a.b.c") }, - condition = "AUTOCDC_INVALID_COLUMN_SELECTION.MULTIPART_COLUMN_IDENTIFIER", + condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", sqlState = "42703", parameters = Map( "columnName" -> "a.b.c", From 95ca0e1eb057b664880936e525ba615e6a1fb0f9 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 17:32:58 +0000 Subject: [PATCH 07/25] buff error message and revert to case class --- .../resources/error/error-conditions.json | 2 +- .../sql/pipelines/autocdc/ChangeArgs.scala | 35 ++++++++++--------- .../pipelines/autocdc/ChangeArgsSuite.scala | 25 +++++++++---- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index ddd6b678fecc7..f009f947248b5 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -193,7 +193,7 @@ }, "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : { "message" : [ - "The following columns are not present in the schema: . Available columns: . Matching was ." + "Using column name comparison, the following columns are not present in the schema: . Available columns: ." ], "sqlState" : "42703" }, diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 4ca27dd872c12..0d912c3102b0e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -28,20 +28,9 @@ import org.apache.spark.sql.types.StructType * A single, unqualified column identifier (no nested path or table/alias qualifier). Backticks * are consumed: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for direct schema-fieldName * comparison and [[quoted]] for APIs that re-parse identifier strings. - * - * Declared `final class` so the smart constructor is the only path to construction (no synthesized - * `copy` can bypass it). */ -final class UnqualifiedColumnName private (val name: String) extends Serializable { - +case class UnqualifiedColumnName private (name: String) { def quoted: String = QuotingUtils.quoteIdentifier(name) - - override def equals(other: Any): Boolean = other match { - case that: UnqualifiedColumnName => name == that.name - case _ => false - } - - override def hashCode(): Int = name.hashCode } object UnqualifiedColumnName { @@ -78,6 +67,7 @@ object ColumnSelection { * order follows the original schema; filtering happens in place. */ def applyToSchema( + schemaName: String, schema: StructType, columnSelection: Option[ColumnSelection], ignoreCase: Boolean): StructType = columnSelection match { @@ -86,7 +76,7 @@ object ColumnSelection { schema case Some(IncludeColumns(cols)) => val includeColumnNames = cols.map(_.name) - validateColumnsExistInSchema(includeColumnNames, schema, ignoreCase) + validateColumnsExistInSchema(schemaName, schema, includeColumnNames, ignoreCase) val caseNormalizedIncludeColumnNames = includeColumnNames.map(normalizeCase(_, ignoreCase)).toSet @@ -98,7 +88,7 @@ object ColumnSelection { ) case Some(ExcludeColumns(cols)) => val excludeColumnNames = cols.map(_.name) - validateColumnsExistInSchema(excludeColumnNames, schema, ignoreCase) + validateColumnsExistInSchema(schemaName, schema, excludeColumnNames, ignoreCase) val caseNormalizedExcludeColumnNames = excludeColumnNames.map(normalizeCase(_, ignoreCase)).toSet @@ -111,8 +101,9 @@ object ColumnSelection { } private def validateColumnsExistInSchema( - columnNames: Seq[String], + schemaName: String, schema: StructType, + columnNames: Seq[String], ignoreCase: Boolean): Unit = { val caseNormalizedSchemaColumns = schema.fieldNames.map(normalizeCase(_, ignoreCase)).toSet @@ -129,9 +120,10 @@ object ColumnSelection { throw new AnalysisException( errorClass = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of(ignoreCase), + "schemaName" -> schemaName, "missingColumns" -> columnsMissingInSchema.mkString(", "), - "availableColumns" -> schema.fieldNames.mkString(", "), - "matching" -> (if (ignoreCase) "case-insensitive" else "case-sensitive") + "availableColumns" -> schema.fieldNames.mkString(", ") )) } } @@ -148,6 +140,15 @@ object ColumnSelection { } } +/** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */ +private[autocdc] object CaseSensitivityLabels { + val CaseSensitive: String = "case-sensitive" + val CaseInsensitive: String = "case-insensitive" + + def of(ignoreCase: Boolean): String = + if (ignoreCase) CaseInsensitive else CaseSensitive +} + /** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ sealed trait ScdType diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 50b0a517d8982..8f0e23b15d446 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -33,6 +33,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { test("ColumnSelection None leaves schema unchanged") { assert( ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, columnSelection = None, ignoreCase = false @@ -41,6 +42,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { test("ColumnSelection IncludeColumns filters by exact name in schema order") { val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, columnSelection = Some( ColumnSelection.IncludeColumns( @@ -57,6 +59,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { test("ColumnSelection ExcludeColumns filters by exact name") { val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, columnSelection = Some( ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))) @@ -73,6 +76,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { checkError( exception = intercept[AnalysisException] { ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, // Under ignoreCase = false, "name" will not match the schema field "Name". columnSelection = Some( @@ -86,9 +90,10 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", sqlState = "42703", parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "schemaName" -> "test", "missingColumns" -> "name, missing", - "availableColumns" -> "id, Name, age", - "matching" -> "case-sensitive" + "availableColumns" -> "id, Name, age" ) ) } @@ -97,6 +102,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { checkError( exception = intercept[AnalysisException] { ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, // Under ignoreCase = false, "NAME" will not match the schema field "Name". columnSelection = Some( @@ -110,9 +116,10 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", sqlState = "42703", parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "schemaName" -> "test", "missingColumns" -> "NAME, missing", - "availableColumns" -> "id, Name, age", - "matching" -> "case-sensitive" + "availableColumns" -> "id, Name, age" ) ) } @@ -121,6 +128,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { // "NAME" and "AGE" do not exactly match the schema fields "Name" and "age", but // ignoreCase = true folds both sides to lowercase before comparing. val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, columnSelection = Some( ColumnSelection.IncludeColumns( @@ -141,6 +149,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { // schema field. The returned schema must include "Name" once, not twice. Output ordering // and casing follow the schema, not the user's input. val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, columnSelection = Some( ColumnSelection.IncludeColumns( @@ -155,6 +164,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { test("ColumnSelection ExcludeColumns matches case-insensitively under ignoreCase=true") { val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, columnSelection = Some( ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name"))) @@ -171,6 +181,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { checkError( exception = intercept[AnalysisException] { ColumnSelection.applyToSchema( + schemaName = "test", schema = sourceSchema, // "NAME" matches "Name" under ignoreCase=true, but "Missing" has no schema match. // The error message reports the user's original casing for the missing column and @@ -186,9 +197,10 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", sqlState = "42703", parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "schemaName" -> "test", "missingColumns" -> "Missing", - "availableColumns" -> "id, Name, age", - "matching" -> "case-insensitive" + "availableColumns" -> "id, Name, age" ) ) } @@ -248,6 +260,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { // construction, the [[UnqualifiedColumnName]] holds "a.b", which matches the field name // exactly and the column is included in the filtered schema. val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", schema = schema, columnSelection = Some( ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`a.b`"))) From 481ca9f83522155acc8904055de5e91d83b4f7b2 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 17:42:31 +0000 Subject: [PATCH 08/25] test UnqualifiedColumnName('`col`') --- .../spark/sql/pipelines/autocdc/ChangeArgsSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 8f0e23b15d446..5004d3407336f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -215,6 +215,12 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { assert(UnqualifiedColumnName("`a.b`").name == "a.b") } + test("UnqualifiedColumnName accepts redundant backticks around a single-part name") { + // Backticks around an already-single-part identifier are decorative; the parser strips them + // so the stored name has no surrounding back-ticks. + assert(UnqualifiedColumnName("`col`").name == "col") + } + test("UnqualifiedColumnName.quoted is safe to pass to functions.col for literal-dot names") { val schema = new StructType() .add("a.b", IntegerType) From 0126659c5203b7d512ea80d3ecb3cfa83debc0b2 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 17:45:58 +0000 Subject: [PATCH 09/25] minor test buff --- .../spark/sql/pipelines/autocdc/ChangeArgsSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 5004d3407336f..709b7cffe3f50 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -207,18 +207,24 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { test("UnqualifiedColumnName accepts a simple single-part identifier") { assert(UnqualifiedColumnName("col").name == "col") + // .quoted always wraps in back-ticks, even when the input had none. + assert(UnqualifiedColumnName("col").quoted == "`col`") } test("UnqualifiedColumnName accepts a backtick-quoted name containing a literal dot") { // Backticks make the dot part of a single name part, so this passes validation. The // stored name is the parsed (unquoted) form so it matches the actual schema field name. assert(UnqualifiedColumnName("`a.b`").name == "a.b") + // .quoted re-wraps the parsed name in back-ticks, round-tripping back to the input form. + assert(UnqualifiedColumnName("`a.b`").quoted == "`a.b`") } test("UnqualifiedColumnName accepts redundant backticks around a single-part name") { // Backticks around an already-single-part identifier are decorative; the parser strips them // so the stored name has no surrounding back-ticks. assert(UnqualifiedColumnName("`col`").name == "col") + // .quoted re-wraps the parsed name in back-ticks, round-tripping back to the input form. + assert(UnqualifiedColumnName("`col`").quoted == "`col`") } test("UnqualifiedColumnName.quoted is safe to pass to functions.col for literal-dot names") { From ac15be5f3544c2774f0539b8bceaaef70e0f4700 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 02:06:45 +0000 Subject: [PATCH 10/25] address PR feedbak --- .../sql/pipelines/autocdc/ChangeArgs.scala | 75 ++++++------------- .../pipelines/autocdc/ChangeArgsSuite.scala | 24 ++++++ 2 files changed, 46 insertions(+), 53 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 0d912c3102b0e..1c87068fca291 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.pipelines.autocdc -import java.util.Locale - import org.apache.spark.sql.{AnalysisException, Column} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.QuotingUtils @@ -72,71 +70,42 @@ object ColumnSelection { columnSelection: Option[ColumnSelection], ignoreCase: Boolean): StructType = columnSelection match { case None => - // A none column selection is interpreted as a no-op. + // A None column selection is interpreted as a no-op. schema case Some(IncludeColumns(cols)) => - val includeColumnNames = cols.map(_.name) - validateColumnsExistInSchema(schemaName, schema, includeColumnNames, ignoreCase) - - val caseNormalizedIncludeColumnNames = - includeColumnNames.map(normalizeCase(_, ignoreCase)).toSet - - StructType( - schema.fields.filter(schemaField => - caseNormalizedIncludeColumnNames.contains(normalizeCase(schemaField.name, ignoreCase)) - ) - ) + val keepIndices = lookupFieldIndices(schemaName, schema, cols, ignoreCase) + StructType(schema.fields.zipWithIndex.collect { + case (field, idx) if keepIndices.contains(idx) => field + }) case Some(ExcludeColumns(cols)) => - val excludeColumnNames = cols.map(_.name) - validateColumnsExistInSchema(schemaName, schema, excludeColumnNames, ignoreCase) - - val caseNormalizedExcludeColumnNames = - excludeColumnNames.map(normalizeCase(_, ignoreCase)).toSet - - StructType( - schema.fields.filterNot(schemaField => - caseNormalizedExcludeColumnNames.contains(normalizeCase(schemaField.name, ignoreCase)) - ) - ) + val dropIndices = lookupFieldIndices(schemaName, schema, cols, ignoreCase) + StructType(schema.fields.zipWithIndex.collect { + case (field, idx) if !dropIndices.contains(idx) => field + }) } - private def validateColumnsExistInSchema( + private def lookupFieldIndices( schemaName: String, schema: StructType, - columnNames: Seq[String], - ignoreCase: Boolean): Unit = { - val caseNormalizedSchemaColumns = - schema.fieldNames.map(normalizeCase(_, ignoreCase)).toSet - - // Compare folded forms but report the missing and available columns using their original - // casing so error messages reflect what the user actually wrote and what the schema holds. - val columnsMissingInSchema = columnNames - .filterNot(columnName => - caseNormalizedSchemaColumns.contains(normalizeCase(columnName, ignoreCase)) - ) - .distinct - - if (columnsMissingInSchema.nonEmpty) { + fields: Seq[UnqualifiedColumnName], + ignoreCase: Boolean): Set[Int] = { + val caseAwareGetFieldIndex: String => Option[Int] = + if (ignoreCase) schema.getFieldIndexCaseInsensitive else schema.getFieldIndex + + val fieldIndexResolutions = fields.map(f => f -> caseAwareGetFieldIndex(f.name)) + val missingFieldNames = fieldIndexResolutions.collect { case (f, None) => f.name }.distinct + if (missingFieldNames.nonEmpty) { throw new AnalysisException( errorClass = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", messageParameters = Map( "caseSensitivity" -> CaseSensitivityLabels.of(ignoreCase), "schemaName" -> schemaName, - "missingColumns" -> columnsMissingInSchema.mkString(", "), + "missingColumns" -> missingFieldNames.mkString(", "), "availableColumns" -> schema.fieldNames.mkString(", ") - )) - } - } - - /** - * If ignoreCase, normalize all strings to lowercase for stable comparison. - */ - private def normalizeCase(name: String, ignoreCase: Boolean): String = { - if (ignoreCase) { - name.toLowerCase(Locale.ROOT) - } else { - name + ) + ) } + fieldIndexResolutions.flatMap { case (_, idx) => idx }.toSet } } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 709b7cffe3f50..e5a602b5e84e4 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -40,6 +40,30 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { ) == sourceSchema) } + test("ColumnSelection IncludeColumns(Seq()) returns an empty schema") { + // An explicit empty include-list is semantically distinct from None: it means "select + // no columns" and produces an empty StructType, not the original schema. + assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some(ColumnSelection.IncludeColumns(Seq.empty)), + ignoreCase = false + ) == new StructType()) + } + + test("ColumnSelection ExcludeColumns(Seq()) leaves schema unchanged") { + // An empty exclude-list is a no-op: nothing to remove, so the original schema is + // returned unchanged (same observable behavior as None for this case). + assert( + ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some(ColumnSelection.ExcludeColumns(Seq.empty)), + ignoreCase = false + ) == sourceSchema) + } + test("ColumnSelection IncludeColumns filters by exact name in schema order") { val filteredSchema = ColumnSelection.applyToSchema( schemaName = "test", From 436ff0ad7865f19e234346f18bbf6fad7adc3077 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 18:10:04 +0000 Subject: [PATCH 11/25] PR feedback --- .../sql/pipelines/autocdc/ChangeArgs.scala | 30 +++++++---- .../pipelines/autocdc/ChangeArgsSuite.scala | 53 ++++++++++++------- 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 1c87068fca291..5774781b8ab9f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -61,24 +61,34 @@ object ColumnSelection { extends ColumnSelection /** - * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. Field - * order follows the original schema; filtering happens in place. + * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered schema. Field order + * follows the original schema; only matching fields are retained in the returned schema. + * + * @param schemaName Logical name of the schema being filtered, surfaced in error messages + * when columns are not found (e.g. "microbatch", "target"). + * @param schema The schema to filter. + * @param columnSelection The user-provided selection. `None` is a no-op and returns `schema` + * unchanged. + * @param caseSensitive Whether to match column names case-sensitively against the schema. + * Callers should derive this from the session, e.g. + * `session.sessionState.conf.caseSensitiveAnalysis`, so column matching + * stays consistent with `spark.sql.caseSensitive`. */ def applyToSchema( schemaName: String, schema: StructType, columnSelection: Option[ColumnSelection], - ignoreCase: Boolean): StructType = columnSelection match { + caseSensitive: Boolean): StructType = columnSelection match { case None => // A None column selection is interpreted as a no-op. schema case Some(IncludeColumns(cols)) => - val keepIndices = lookupFieldIndices(schemaName, schema, cols, ignoreCase) + val keepIndices = lookupFieldIndices(schemaName, schema, cols, caseSensitive) StructType(schema.fields.zipWithIndex.collect { case (field, idx) if keepIndices.contains(idx) => field }) case Some(ExcludeColumns(cols)) => - val dropIndices = lookupFieldIndices(schemaName, schema, cols, ignoreCase) + val dropIndices = lookupFieldIndices(schemaName, schema, cols, caseSensitive) StructType(schema.fields.zipWithIndex.collect { case (field, idx) if !dropIndices.contains(idx) => field }) @@ -88,9 +98,9 @@ object ColumnSelection { schemaName: String, schema: StructType, fields: Seq[UnqualifiedColumnName], - ignoreCase: Boolean): Set[Int] = { + caseSensitive: Boolean): Set[Int] = { val caseAwareGetFieldIndex: String => Option[Int] = - if (ignoreCase) schema.getFieldIndexCaseInsensitive else schema.getFieldIndex + if (caseSensitive) schema.getFieldIndex else schema.getFieldIndexCaseInsensitive val fieldIndexResolutions = fields.map(f => f -> caseAwareGetFieldIndex(f.name)) val missingFieldNames = fieldIndexResolutions.collect { case (f, None) => f.name }.distinct @@ -98,7 +108,7 @@ object ColumnSelection { throw new AnalysisException( errorClass = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(ignoreCase), + "caseSensitivity" -> CaseSensitivityLabels.of(caseSensitive), "schemaName" -> schemaName, "missingColumns" -> missingFieldNames.mkString(", "), "availableColumns" -> schema.fieldNames.mkString(", ") @@ -114,8 +124,8 @@ private[autocdc] object CaseSensitivityLabels { val CaseSensitive: String = "case-sensitive" val CaseInsensitive: String = "case-insensitive" - def of(ignoreCase: Boolean): String = - if (ignoreCase) CaseInsensitive else CaseSensitive + def of(caseSensitive: Boolean): String = + if (caseSensitive) CaseSensitive else CaseInsensitive } /** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */ diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index e5a602b5e84e4..816338cb677e8 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -36,7 +36,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { schemaName = "test", schema = sourceSchema, columnSelection = None, - ignoreCase = false + caseSensitive = true ) == sourceSchema) } @@ -48,7 +48,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { schemaName = "test", schema = sourceSchema, columnSelection = Some(ColumnSelection.IncludeColumns(Seq.empty)), - ignoreCase = false + caseSensitive = true ) == new StructType()) } @@ -60,7 +60,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { schemaName = "test", schema = sourceSchema, columnSelection = Some(ColumnSelection.ExcludeColumns(Seq.empty)), - ignoreCase = false + caseSensitive = true ) == sourceSchema) } @@ -73,7 +73,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name")) ) ), - ignoreCase = false + caseSensitive = true ) assert(filteredSchema == new StructType() @@ -88,7 +88,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { columnSelection = Some( ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))) ), - ignoreCase = false + caseSensitive = true ) assert(filteredSchema == new StructType() @@ -102,13 +102,13 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { ColumnSelection.applyToSchema( schemaName = "test", schema = sourceSchema, - // Under ignoreCase = false, "name" will not match the schema field "Name". + // Under caseSensitive = true, "name" will not match the schema field "Name". columnSelection = Some( ColumnSelection.IncludeColumns( Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("missing")) ) ), - ignoreCase = false + caseSensitive = true ) }, condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", @@ -128,13 +128,13 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { ColumnSelection.applyToSchema( schemaName = "test", schema = sourceSchema, - // Under ignoreCase = false, "NAME" will not match the schema field "Name". + // Under caseSensitive = true, "NAME" will not match the schema field "Name". columnSelection = Some( ColumnSelection.ExcludeColumns( Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("missing")) ) ), - ignoreCase = false + caseSensitive = true ) }, condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", @@ -148,9 +148,9 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { ) } - test("ColumnSelection IncludeColumns matches case-insensitively under ignoreCase=true") { + test("ColumnSelection IncludeColumns matches case-insensitively under caseSensitive=false") { // "NAME" and "AGE" do not exactly match the schema fields "Name" and "age", but - // ignoreCase = true folds both sides to lowercase before comparing. + // caseSensitive = false folds both sides to lowercase before comparing. val filteredSchema = ColumnSelection.applyToSchema( schemaName = "test", schema = sourceSchema, @@ -159,7 +159,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { Seq(UnqualifiedColumnName("AGE"), UnqualifiedColumnName("NAME")) ) ), - ignoreCase = true + caseSensitive = false ) // The retained fields keep their original casing from the schema, not the user's input. @@ -169,7 +169,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { } test("ColumnSelection deduplicates user-provided columns that normalize to the same name") { - // Under ignoreCase = true, "name" and "NAME" both fold to "name" and refer to the same + // Under caseSensitive = false, "name" and "NAME" both fold to "name" and refer to the same // schema field. The returned schema must include "Name" once, not twice. Output ordering // and casing follow the schema, not the user's input. val filteredSchema = ColumnSelection.applyToSchema( @@ -180,20 +180,20 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("NAME")) ) ), - ignoreCase = true + caseSensitive = false ) assert(filteredSchema == new StructType().add("Name", StringType)) } - test("ColumnSelection ExcludeColumns matches case-insensitively under ignoreCase=true") { + test("ColumnSelection ExcludeColumns matches case-insensitively under caseSensitive=false") { val filteredSchema = ColumnSelection.applyToSchema( schemaName = "test", schema = sourceSchema, columnSelection = Some( ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name"))) ), - ignoreCase = true + caseSensitive = false ) assert(filteredSchema == new StructType() @@ -201,13 +201,13 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { .add("age", IntegerType)) } - test("ColumnSelection missing-column error under ignoreCase=true preserves user casing") { + test("ColumnSelection missing-column error under caseSensitive=false preserves user casing") { checkError( exception = intercept[AnalysisException] { ColumnSelection.applyToSchema( schemaName = "test", schema = sourceSchema, - // "NAME" matches "Name" under ignoreCase=true, but "Missing" has no schema match. + // "NAME" matches "Name" under caseSensitive=false, but "Missing" has no schema match. // The error message reports the user's original casing for the missing column and // the schema's original casing for the available columns. columnSelection = Some( @@ -215,7 +215,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { Seq(UnqualifiedColumnName("NAME"), UnqualifiedColumnName("Missing")) ) ), - ignoreCase = true + caseSensitive = false ) }, condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA", @@ -301,12 +301,25 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { columnSelection = Some( ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`a.b`"))) ), - ignoreCase = false + caseSensitive = true ) assert(filteredSchema == new StructType().add("a.b", IntegerType)) } + test("IncludeColumns correctly matches a backtick-quoted mixed-case column") { + val filteredSchema = ColumnSelection.applyToSchema( + schemaName = "test", + schema = sourceSchema, + columnSelection = Some( + ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`Name`"))) + ), + caseSensitive = true + ) + + assert(filteredSchema == new StructType().add("Name", StringType)) + } + test("UnqualifiedColumnName rejects a dotted (multi-part) identifier") { checkError( exception = intercept[AnalysisException] { From 875f0b19345431054b8afc657a6c4d086ac1c57d Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 14 May 2026 22:18:55 +0000 Subject: [PATCH 12/25] Implement deduplicateMicrobatch --- .../autocdc/OutOfOrderCdcMergeUtils.scala | 33 +++ .../autocdc/Scd1BatchProcessor.scala | 57 +++++ .../autocdc/Scd1BatchProcessorSuite.scala | 232 ++++++++++++++++++ 3 files changed, 322 insertions(+) create mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala create mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala create mode 100644 sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala new file mode 100644 index 0000000000000..50b635c4ba2b8 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +/** Shared helpers for the out-of-order CDC merge implementations (SCD Type 1 and Type 2). */ +private[autocdc] object OutOfOrderCdcMergeUtils { + + /** + * Build a synthetic column name with a UUID suffix so it cannot collide with any user + * column. Intended for transient columns attached during merge processing (e.g. holding + * intermediate aggregation outputs, carrying per-key state through a join, etc.). + * + * Each invocation produces a fresh name, so callers should remember the returned string if + * they need to reference the same column from multiple sites within a single merge plan. + */ + def tempColName(prefix: String): String = + s"${prefix}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala new file mode 100644 index 0000000000000..c8f10bb96938b --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F} +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.util.ArrayImplicits._ + +/** + * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the specified [[changeArgs]] + * configuration. + */ +case class Scd1BatchProcessor(changeArgs: ChangeArgs) { + + /** + * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key + * as ordered by [[ChangeArgs.sequencing]]. + * + * For SCD1 we only care about the most recent (by sequence value) event per key. When + * multiple events share the same key and the same sequence value, the row selected is + * non-deterministic and undefined. + */ + def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { + // The `max_by` API can only return a single column, so pack/unpack the entire row into a + // temporary column before and after the `max_by` operation. + val WinningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + + val allMicrobatchColumns = + microbatchDf.columns + .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) + .toImmutableArraySeq + + microbatchDf + .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) + .agg( + F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) + .as(WinningRowCol) + ) + .select(F.col(s"$WinningRowCol.*")) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala new file mode 100644 index 0000000000000..0545946935ed6 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{functions => F, Row} +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + /** + * Returns the `(name, dataType)` pairs of `schema`'s fields. Used to compare two schemas for + * structural equivalence while deliberately ignoring nullability and metadata, which can shift + * benignly when columns are unpacked from a struct. + */ + private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] = + schema.fields.map(f => (f.name, f.dataType)).toSeq + + test("deduplicateMicrobatch keeps only the row with the largest sequence value per key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first"), + Row(1, 30L, "winner"), + Row(1, 20L, "middle") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 30L, "winner") + ) + } + + test("deduplicateMicrobatch processes multiple keys independently") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a1"), + Row(2, 50L, "b1-winner"), + Row(1, 20L, "a2-winner"), + Row(2, 40L, "b2-loser"), + Row(3, 1L, "c1-only") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 20L, "a2-winner"), + Row(2, 50L, "b1-winner"), + Row(3, 1L, "c1-only") + ) + ) + } + + test("deduplicateMicrobatch carries non-key, non-sequence columns from the winning row") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("name", StringType) + .add("amount", IntegerType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old-name", 100), + Row(1, 20L, "winning-name", 200) + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // All non-key columns must come from the row with the largest sequence value, never + // a mix of values from multiple rows. + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "winning-name", 200) + ) + } + + test("deduplicateMicrobatch supports composite (multi-column) keys") { + val schema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("US", 1, 10L, "us1-old"), + Row("US", 1, 20L, "us1-new"), + // Same customer_id as above but different region: independent group. + Row("EU", 1, 5L, "eu1-only"), + // Same region as above but different customer_id: independent group. + Row("US", 2, 99L, "us2-only") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row("US", 1, 20L, "us1-new"), + Row("EU", 1, 5L, "eu1-only"), + Row("US", 2, 99L, "us2-only") + ) + ) + } + + test("deduplicateMicrobatch supports literal-dot column names") { + val schema = new StructType() + .add("user.id", IntegerType) + .add("seq", LongType) + .add("event.value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old"), + Row(1, 20L, "new") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("`user.id`")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "new") + ) + } + + test("deduplicateMicrobatch preserves the input column names, types, and ordering") { + val schema = new StructType() + .add("a", StringType) + .add("id", IntegerType) + .add("z", DoubleType) + .add("seq", LongType) + .add("flag", BooleanType) + + val batch = microbatchOf(schema)( + Row("a1", 1, 1.5, 10L, true), + Row("a2", 1, 2.5, 20L, false) + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // Field names and dataTypes must match the input exactly, in the original order. + assert( + columnNamesAndDataTypes(processor.deduplicateMicrobatch(batch).schema) == + columnNamesAndDataTypes(schema)) + } + + test("deduplicateMicrobatch returns an empty DataFrame with preserved schema") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)() + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + val result = processor.deduplicateMicrobatch(batch) + assert(result.collect().isEmpty) + assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) + } +} From 08ea9f477c645a18917979e348182fb2185c19f2 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 00:38:05 +0000 Subject: [PATCH 13/25] indenting cleanup --- .../autocdc/Scd1BatchProcessorSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 0545946935ed6..cb4df5e0d6c1f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -50,7 +50,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -78,7 +78,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -108,7 +108,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -140,7 +140,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -169,7 +169,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("`user.id`")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -196,7 +196,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -218,7 +218,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { val batch = microbatchOf(schema)() val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 From cf3ec828163ac12ef7a7c853121f42ca30c1a114 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 00:40:40 +0000 Subject: [PATCH 14/25] schema comment --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index c8f10bb96938b..024e46d585e39 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -35,6 +35,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * For SCD1 we only care about the most recent (by sequence value) event per key. When * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. + * + * The schema of the returned dataframe matches the schema of the microbatch exactly. */ def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a From 21d4ffe8632bff9a3d873ef5821b4f8cf9073dd3 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 20:14:14 +0000 Subject: [PATCH 15/25] casing --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 024e46d585e39..340ca087c6bac 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -41,7 +41,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. - val WinningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") val allMicrobatchColumns = microbatchDf.columns @@ -52,8 +52,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) .agg( F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) - .as(WinningRowCol) + .as(winningRowCol) ) - .select(F.col(s"$WinningRowCol.*")) + .select(F.col(s"$winningRowCol.*")) } } From 2ff07f4d0d67d229765a56ade91848a46d1e5419 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:08:16 +0000 Subject: [PATCH 16/25] linting --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 340ca087c6bac..9beea0508b911 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -35,7 +35,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * For SCD1 we only care about the most recent (by sequence value) event per key. When * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. - * + * * The schema of the returned dataframe matches the schema of the microbatch exactly. */ def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { From 76d775d3a2cd4f7fcead026554d59e1e1bb74896 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 21:37:54 +0000 Subject: [PATCH 17/25] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 11 +- .../autocdc/Scd1BatchProcessorSuite.scala | 110 +++++++++++++++++- 2 files changed, 115 insertions(+), 6 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 9beea0508b911..edeca022a27a3 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -27,7 +27,6 @@ import org.apache.spark.util.ArrayImplicits._ * configuration. */ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { - /** * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key * as ordered by [[ChangeArgs.sequencing]]. @@ -36,19 +35,23 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. * + * @param validatedMicrobatch A microbatch that has already been validated such that the + * sequencing column should not contain null values, and its data type + * should support ordering. + * * The schema of the returned dataframe matches the schema of the microbatch exactly. */ - def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { + def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") val allMicrobatchColumns = - microbatchDf.columns + validatedMicrobatch.columns .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) .toImmutableArraySeq - microbatchDf + validatedMicrobatch .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) .agg( F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index cb4df5e0d6c1f..1125440f42bbf 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.{functions => F, Row} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ -class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { +class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = @@ -63,6 +63,112 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) } + test("deduplicateMicrobatch is no-op if there's a single event for a key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "only-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "only-row") + ) + } + + test("deduplicateMicrobatch handles equal sequencing values for the same key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first-tied-row"), + Row(1, 10L, "second-tied-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // On equal sequence number events for the same key we provide no guarantee on which event will + // survive, but the contract is _one_ event will survive - assert that below. + val result = processor.deduplicateMicrobatch(batch).collect() + assert(result.length == 1) + assert(result.head.getInt(0) == 1) + assert(result.head.getLong(1) == 10L) + assert(Set("first-tied-row", "second-tied-row").contains(result.head.getString(2))) + } + + test("deduplicateMicrobatch ignores rows with null sequencing when a non-null value exists") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that null sequence rows are de-prioritized in + // deduplication. + Row(1, null, "null-sequence"), + Row(1, 10L, "non-null-sequence") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "non-null-sequence") + ) + } + + test( + "deduplicateMicrobatch returns a null row when all sequencing values for a key are null" + ) { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that a null row will be returned by + // deduplication if all rows contain a null sequence in the microbatch. + Row(1, null, "null-sequence") + ) + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(null, null, null) + ) + } + test("deduplicateMicrobatch processes multiple keys independently") { val schema = new StructType() .add("id", IntegerType) From 8790a2d89dff49f5b65615214dcd5a4142b5e682 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 21:49:54 +0000 Subject: [PATCH 18/25] use reserved __spark_autocdc* prefix --- .../autocdc/OutOfOrderCdcMergeUtils.scala | 33 ------------------- .../autocdc/Scd1BatchProcessor.scala | 7 +++- 2 files changed, 6 insertions(+), 34 deletions(-) delete mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala deleted file mode 100644 index 50b635c4ba2b8..0000000000000 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.pipelines.autocdc - -/** Shared helpers for the out-of-order CDC merge implementations (SCD Type 1 and Type 2). */ -private[autocdc] object OutOfOrderCdcMergeUtils { - - /** - * Build a synthetic column name with a UUID suffix so it cannot collide with any user - * column. Intended for transient columns attached during merge processing (e.g. holding - * intermediate aggregation outputs, carrying per-key state through a join, etc.). - * - * Each invocation produces a fresh name, so callers should remember the returned string if - * they need to reference the same column from multiple sites within a single merge plan. - */ - def tempColName(prefix: String): String = - s"${prefix}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" -} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index edeca022a27a3..732930c93630d 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -44,7 +44,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. - val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + val winningRowCol = Scd1BatchProcessor.winningRowColName val allMicrobatchColumns = validatedMicrobatch.columns @@ -60,3 +60,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { .select(F.col(s"$winningRowCol.*")) } } + +object Scd1BatchProcessor { + // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. + private val winningRowColName = "__spark_autocdc_winning_row" +} From 5c0c0f8b6e18b403a07134efe6491b456bdb33c5 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 23:05:58 +0000 Subject: [PATCH 19/25] Add deduplicate test when row contains nested columns --- .../autocdc/Scd1BatchProcessorSuite.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 1125440f42bbf..a82323a842deb 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -229,6 +229,34 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test("deduplicateMicrobatch carries nested columns correctly from the winning row") { + val payloadType = new StructType() + .add("name", StringType) + .add("amount", IntegerType) + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("payload", payloadType) + + val batch = microbatchOf(schema)( + Row(1, 10L, Row("old", 100)), + Row(1, 20L, Row("new", 200)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, Row("new", 200)) + ) + } + test("deduplicateMicrobatch supports composite (multi-column) keys") { val schema = new StructType() .add("region", StringType) From 1a640d17e3df85d28c18ca15e1b43bf6c1274a64 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 20:15:09 +0000 Subject: [PATCH 20/25] validation --- .../resources/error/error-conditions.json | 6 + .../autocdc/Scd1BatchProcessor.scala | 109 +++++++- .../autocdc/Scd1BatchProcessorSuite.scala | 239 +++++++++++++++++- 3 files changed, 339 insertions(+), 15 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index f009f947248b5..8536c6385f2bd 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -203,6 +203,12 @@ ], "sqlState" : "42703" }, + "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : { + "message" : [ + "Using column name comparison, the column `` in the schema conflicts with the reserved AutoCDC column name ``. Rename or remove the column." + ], + "sqlState" : "42710" + }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { "message" : [ "Cannot write null value for field defined as non-null Avro data type .", diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 732930c93630d..d3fcc2b8e7e0f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -17,16 +17,26 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.sql.{functions => F} +import org.apache.spark.SparkException +import org.apache.spark.sql.{functions => F, AnalysisException} +import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.util.QuotingUtils import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.ArrayImplicits._ /** * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the specified [[changeArgs]] * configuration. + * + * @param changeArgs The CDC flow configuration. + * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived + * from the flow's resolved DataFrame at flow setup time. */ -case class Scd1BatchProcessor(changeArgs: ChangeArgs) { +case class Scd1BatchProcessor( + changeArgs: ChangeArgs, + resolvedSequencingType: DataType) { + /** * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key * as ordered by [[ChangeArgs.sequencing]]. @@ -59,9 +69,102 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { ) .select(F.col(s"$winningRowCol.*")) } + + /** + * Project the CDC metadata column onto the microbatch. + */ + def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + // Proactively validate the reserved CDC metadata column does not exist in the microbatch. + validateCdcMetadataColumnNotPresent(microbatchDf) + + val rowDeleteSequence: Column = changeArgs.deleteCondition match { + case Some(deleteCondition) => + F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null)) + case None => + F.lit(null) + } + + val rowUpsertSequence: Column = + // A row that is not a delete must be an upsert, these are mutually exclusive and a complete + // set of CDC event types. + F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null)) + + microbatchDf.withColumn( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.constructCdcMetadataCol( + deleteSequence = rowDeleteSequence, + upsertSequence = rowUpsertSequence, + sequencingType = resolvedSequencingType + ) + ) + } + + private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { + val ignoreColumnNameCase = + !microbatchDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + + microbatchDf.schema.fieldNames + .find { fieldName => + if (ignoreColumnNameCase) { + fieldName.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) + } else { + fieldName.equals(Scd1BatchProcessor.cdcMetadataColName) + } + } + .foreach { conflictingColumnName => + throw new AnalysisException( + errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of(!ignoreColumnNameCase), + "columnName" -> conflictingColumnName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } object Scd1BatchProcessor { // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. - private val winningRowColName = "__spark_autocdc_winning_row" + private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row" + private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata" + + private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence" + private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence" + + /** + * Schema of the CDC metadata struct column for SCD1. + */ + private def cdcMetadataColSchema(sequencingType: DataType): StructType = + StructType( + Seq( + // The sequencing of the event if it represents a delete, null otherwise. + StructField(cdcDeleteSequenceFieldName, sequencingType, nullable = true), + // The sequencing of the event if it represents an upsert, null otherwise. + StructField(cdcUpsertSequenceFieldName, sequencingType, nullable = true) + ) + ) + + /** + * Construct the CDC metadata struct column for SCD1, following the exact schema and field + * ordering defined by [[cdcMetadataColSchema]]. + */ + private[autocdc] def constructCdcMetadataCol( + deleteSequence: Column, + upsertSequence: Column, + sequencingType: DataType): Column = { + val cdcMetadataFieldsInOrder = cdcMetadataColSchema(sequencingType).fields.map { field => + val value = field.name match { + case `cdcDeleteSequenceFieldName` => deleteSequence + case `cdcUpsertSequenceFieldName` => upsertSequence + case other => + throw SparkException.internalError( + s"Unable to construct SCD1 CDC metadata column due to unknown `${other}` field." + ) + } + value.cast(field.dataType).as(field.name) + } + F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*) + } } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index a82323a842deb..97d5ce8b281b9 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -18,8 +18,9 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.{functions => F, Row} +import org.apache.spark.sql.{functions => F, AnalysisException, Row} import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -54,7 +55,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -78,7 +80,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -103,7 +106,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // On equal sequence number events for the same key we provide no guarantee on which event will @@ -134,7 +138,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -161,7 +166,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( df = processor.deduplicateMicrobatch(batch), @@ -188,7 +194,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -218,7 +225,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // All non-key columns must come from the row with the largest sequence value, never @@ -278,7 +286,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -307,7 +316,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("`user.id`")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -334,7 +344,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // Field names and dataTypes must match the input exactly, in the original order. @@ -356,11 +367,215 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) val result = processor.deduplicateMicrobatch(batch) assert(result.collect().isEmpty) assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) } + + test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or an upsert " + + "per deleteCondition") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, 10L, false), + Row(2, 20L, true), + Row(3, 30L, false), + Row(4, 40L, true) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = Some(F.col("is_delete") === true) + ), + resolvedSequencingType = LongType + ) + + // Mutual-exclusivity invariant: each row's _cdc_metadata struct has exactly one of + // (deleteSequence, upsertSequence) non-null, and the non-null side carries the row's + // sequence value. + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, false, Row(null, 10L)), + Row(2, 20L, true, Row(20L, null)), + Row(3, 30L, false, Row(null, 30L)), + Row(4, 40L, true, Row(40L, null)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " + + "when deleteCondition is None") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a"), + Row(2, 20L, "b") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = None + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, "a", Row(null, 10L)), + Row(2, 20L, "b", Row(null, 20L)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata appends CDC metadata as the last column") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + // Original columns are preserved in their original order, with CDC metadata appended at + // the very end. + assert(result.schema.fieldNames.toSeq == + schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName) + } + + test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence fields to " + + "resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is IntegerType, but the flow's resolved sequencing type + // will be LongType. This should be upcasted in the projected CDC metadata column. + .add("seq", IntegerType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + val cdcMetadataDataType = + resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq( + Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType, + Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType)) + + // The cast must also succeed at runtime: upsertSequence is materialized as a Long value, not + // an Int. + checkAnswer( + df = resultDf, + expectedAnswer = Row(1, 10, "a", Row(null, 10L)) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata fails fast when the microbatch's sequencing column " + + "is incompatible with resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is a struct, whereas the flow's resolved sequencing type + // will be LongType. These are incompatible and should throw. + .add( + "seq", + new StructType() + .add("major", LongType) + .add("minor", LongType)) + + val batch = microbatchOf(schema)( + Row(1, Row(1L, 0L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val ex = intercept[AnalysisException] { + // .schema forces analysis of the underlying logical plan, surfacing the invalid cast. + processor.extendMicrobatchRowsWithCdcMetadata(batch).schema + } + assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION") + } + + test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already contains the " + + "reserved CDC metadata column") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add(Scd1BatchProcessor.cdcMetadataColName, StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "user-supplied") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkError( + exception = intercept[AnalysisException] { + processor.extendMicrobatchRowsWithCdcMetadata(batch) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "columnName" -> Scd1BatchProcessor.cdcMetadataColName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } From 88e9c1d47ba23cff95f8de604ef3d81e799dd89e Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 21:31:20 +0000 Subject: [PATCH 21/25] buff scaladoc --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index d3fcc2b8e7e0f..bdc8bf1f9683b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -72,6 +72,9 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. + * + * The returned dataframe has all of the columns in the input microbatch + the CDC metadata + * column. */ def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { // Proactively validate the reserved CDC metadata column does not exist in the microbatch. From fd631adabedbbcb57ac1c71eef4e1d1ce30f6c22 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 01:29:47 +0000 Subject: [PATCH 22/25] use spark resolver --- .../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index bdc8bf1f9683b..953f963efed4b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -103,22 +103,16 @@ case class Scd1BatchProcessor( } private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val ignoreColumnNameCase = - !microbatchDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + val sqlConf = microbatchDf.sparkSession.sessionState.conf + val resolver = sqlConf.resolver microbatchDf.schema.fieldNames - .find { fieldName => - if (ignoreColumnNameCase) { - fieldName.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) - } else { - fieldName.equals(Scd1BatchProcessor.cdcMetadataColName) - } - } + .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) .foreach { conflictingColumnName => throw new AnalysisException( errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(!ignoreColumnNameCase), + "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis), "columnName" -> conflictingColumnName, "schemaName" -> "microbatch", "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName From 2a886afc3bcd07d3d4e4d1ee05d0dd9ddc926566 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:10:09 +0000 Subject: [PATCH 23/25] lingint --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 953f963efed4b..1e35678a7200e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -72,7 +72,7 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. - * + * * The returned dataframe has all of the columns in the input microbatch + the CDC metadata * column. */ From e8df6f02e00d2565411b009b8ca7a20438a17f5f Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 18:22:41 +0000 Subject: [PATCH 24/25] rebase conflict --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 1e35678a7200e..00450bee6984b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -103,8 +103,8 @@ case class Scd1BatchProcessor( } private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val sqlConf = microbatchDf.sparkSession.sessionState.conf - val resolver = sqlConf.resolver + val microbatchSqlConf = microbatchDf.sparkSession.sessionState.conf + val resolver = microbatchSqlConf.resolver microbatchDf.schema.fieldNames .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) @@ -112,7 +112,7 @@ case class Scd1BatchProcessor( throw new AnalysisException( errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis), + "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis), "columnName" -> conflictingColumnName, "schemaName" -> "microbatch", "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName From f9c2aed3c9a413c9e74f9093863f0e948c7612ed Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 23:01:00 +0000 Subject: [PATCH 25/25] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 8 +++ .../autocdc/Scd1BatchProcessorSuite.scala | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 00450bee6984b..5554efdafd68c 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -73,6 +73,14 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. * + * This must run before any column selection is applied to the microbatch. The + * [[ChangeArgs.deleteCondition]] and [[ChangeArgs.sequencing]] expressions are evaluated against + * the current microbatch schema, and column selection may drop inputs required by those + * expressions. + * + * Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] evaluates to true. A + * false or null delete condition classifies the row as an upsert. + * * The returned dataframe has all of the columns in the input microbatch + the CDC metadata * column. */ diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 97d5ce8b281b9..01d0fcc9c7107 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -414,6 +414,32 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test("extendMicrobatchRowsWithCdcMetadata treats null deleteCondition results as upserts") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, 10L, null) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Row(1, 10L, null, Row(null, 10L)) + ) + } + test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " + "when deleteCondition is None") { val schema = new StructType() @@ -578,4 +604,42 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } } + + test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata column " + + "case-insensitively") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val conflictingColumnName = Scd1BatchProcessor.cdcMetadataColName.toUpperCase + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add(conflictingColumnName, StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "user-supplied") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkError( + exception = intercept[AnalysisException] { + processor.extendMicrobatchRowsWithCdcMetadata(batch) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> conflictingColumnName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } }