From ada1bdb6af60ceee0a80f32501b34441fa98c3cb Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 14 May 2026 22:18:55 +0000 Subject: [PATCH 01/18] Implement deduplicateMicrobatch --- .../autocdc/OutOfOrderCdcMergeUtils.scala | 33 +++ .../autocdc/Scd1BatchProcessor.scala | 57 +++++ .../autocdc/Scd1BatchProcessorSuite.scala | 232 ++++++++++++++++++ 3 files changed, 322 insertions(+) create mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala create mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala create mode 100644 sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala new file mode 100644 index 0000000000000..50b635c4ba2b8 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +/** Shared helpers for the out-of-order CDC merge implementations (SCD Type 1 and Type 2). */ +private[autocdc] object OutOfOrderCdcMergeUtils { + + /** + * Build a synthetic column name with a UUID suffix so it cannot collide with any user + * column. Intended for transient columns attached during merge processing (e.g. holding + * intermediate aggregation outputs, carrying per-key state through a join, etc.). + * + * Each invocation produces a fresh name, so callers should remember the returned string if + * they need to reference the same column from multiple sites within a single merge plan. + */ + def tempColName(prefix: String): String = + s"${prefix}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala new file mode 100644 index 0000000000000..c8f10bb96938b --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F} +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.util.ArrayImplicits._ + +/** + * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the specified [[changeArgs]] + * configuration. + */ +case class Scd1BatchProcessor(changeArgs: ChangeArgs) { + + /** + * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key + * as ordered by [[ChangeArgs.sequencing]]. + * + * For SCD1 we only care about the most recent (by sequence value) event per key. When + * multiple events share the same key and the same sequence value, the row selected is + * non-deterministic and undefined. + */ + def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { + // The `max_by` API can only return a single column, so pack/unpack the entire row into a + // temporary column before and after the `max_by` operation. + val WinningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + + val allMicrobatchColumns = + microbatchDf.columns + .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) + .toImmutableArraySeq + + microbatchDf + .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) + .agg( + F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) + .as(WinningRowCol) + ) + .select(F.col(s"$WinningRowCol.*")) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala new file mode 100644 index 0000000000000..0545946935ed6 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{functions => F, Row} +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + /** + * Returns the `(name, dataType)` pairs of `schema`'s fields. Used to compare two schemas for + * structural equivalence while deliberately ignoring nullability and metadata, which can shift + * benignly when columns are unpacked from a struct. + */ + private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] = + schema.fields.map(f => (f.name, f.dataType)).toSeq + + test("deduplicateMicrobatch keeps only the row with the largest sequence value per key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first"), + Row(1, 30L, "winner"), + Row(1, 20L, "middle") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 30L, "winner") + ) + } + + test("deduplicateMicrobatch processes multiple keys independently") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a1"), + Row(2, 50L, "b1-winner"), + Row(1, 20L, "a2-winner"), + Row(2, 40L, "b2-loser"), + Row(3, 1L, "c1-only") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 20L, "a2-winner"), + Row(2, 50L, "b1-winner"), + Row(3, 1L, "c1-only") + ) + ) + } + + test("deduplicateMicrobatch carries non-key, non-sequence columns from the winning row") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("name", StringType) + .add("amount", IntegerType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old-name", 100), + Row(1, 20L, "winning-name", 200) + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // All non-key columns must come from the row with the largest sequence value, never + // a mix of values from multiple rows. + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "winning-name", 200) + ) + } + + test("deduplicateMicrobatch supports composite (multi-column) keys") { + val schema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("US", 1, 10L, "us1-old"), + Row("US", 1, 20L, "us1-new"), + // Same customer_id as above but different region: independent group. + Row("EU", 1, 5L, "eu1-only"), + // Same region as above but different customer_id: independent group. + Row("US", 2, 99L, "us2-only") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Seq( + Row("US", 1, 20L, "us1-new"), + Row("EU", 1, 5L, "eu1-only"), + Row("US", 2, 99L, "us2-only") + ) + ) + } + + test("deduplicateMicrobatch supports literal-dot column names") { + val schema = new StructType() + .add("user.id", IntegerType) + .add("seq", LongType) + .add("event.value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "old"), + Row(1, 20L, "new") + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("`user.id`")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, "new") + ) + } + + test("deduplicateMicrobatch preserves the input column names, types, and ordering") { + val schema = new StructType() + .add("a", StringType) + .add("id", IntegerType) + .add("z", DoubleType) + .add("seq", LongType) + .add("flag", BooleanType) + + val batch = microbatchOf(schema)( + Row("a1", 1, 1.5, 10L, true), + Row("a2", 1, 2.5, 20L, false) + ) + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // Field names and dataTypes must match the input exactly, in the original order. + assert( + columnNamesAndDataTypes(processor.deduplicateMicrobatch(batch).schema) == + columnNamesAndDataTypes(schema)) + } + + test("deduplicateMicrobatch returns an empty DataFrame with preserved schema") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)() + + val processor = Scd1BatchProcessor( + ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + val result = processor.deduplicateMicrobatch(batch) + assert(result.collect().isEmpty) + assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) + } +} From a1a0e7b3407158ed6ffbe94569ab08e494799dff Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 00:38:05 +0000 Subject: [PATCH 02/18] indenting cleanup --- .../autocdc/Scd1BatchProcessorSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 0545946935ed6..cb4df5e0d6c1f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -50,7 +50,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -78,7 +78,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -108,7 +108,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -140,7 +140,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -169,7 +169,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("`user.id`")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -196,7 +196,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 @@ -218,7 +218,7 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { val batch = microbatchOf(schema)() val processor = Scd1BatchProcessor( - ChangeArgs( + changeArgs = ChangeArgs( keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 From 434f6ad4a7a4b8a2a702e02542a772e7ddee161c Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 00:40:40 +0000 Subject: [PATCH 03/18] schema comment --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index c8f10bb96938b..024e46d585e39 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -35,6 +35,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * For SCD1 we only care about the most recent (by sequence value) event per key. When * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. + * + * The schema of the returned dataframe matches the schema of the microbatch exactly. */ def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a From 022a95c4270db207b52e66ebca383366dca8a6c4 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 20:14:14 +0000 Subject: [PATCH 04/18] casing --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 024e46d585e39..340ca087c6bac 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -41,7 +41,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. - val WinningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") val allMicrobatchColumns = microbatchDf.columns @@ -52,8 +52,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) .agg( F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) - .as(WinningRowCol) + .as(winningRowCol) ) - .select(F.col(s"$WinningRowCol.*")) + .select(F.col(s"$winningRowCol.*")) } } From f92f1e327e12918bb20917f81f9c213cc017624a Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:08:16 +0000 Subject: [PATCH 05/18] linting --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 340ca087c6bac..9beea0508b911 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -35,7 +35,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * For SCD1 we only care about the most recent (by sequence value) event per key. When * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. - * + * * The schema of the returned dataframe matches the schema of the microbatch exactly. */ def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { From 04a38f247ace2ad9530fd935d153043522b726b5 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 21:37:54 +0000 Subject: [PATCH 06/18] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 11 +- .../autocdc/Scd1BatchProcessorSuite.scala | 110 +++++++++++++++++- 2 files changed, 115 insertions(+), 6 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 9beea0508b911..edeca022a27a3 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -27,7 +27,6 @@ import org.apache.spark.util.ArrayImplicits._ * configuration. */ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { - /** * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key * as ordered by [[ChangeArgs.sequencing]]. @@ -36,19 +35,23 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { * multiple events share the same key and the same sequence value, the row selected is * non-deterministic and undefined. * + * @param validatedMicrobatch A microbatch that has already been validated such that the + * sequencing column should not contain null values, and its data type + * should support ordering. + * * The schema of the returned dataframe matches the schema of the microbatch exactly. */ - def deduplicateMicrobatch(microbatchDf: DataFrame): DataFrame = { + def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") val allMicrobatchColumns = - microbatchDf.columns + validatedMicrobatch.columns .map(colName => F.col(QuotingUtils.quoteIdentifier(colName))) .toImmutableArraySeq - microbatchDf + validatedMicrobatch .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*) .agg( F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index cb4df5e0d6c1f..1125440f42bbf 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.{functions => F, Row} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ -class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { +class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = @@ -63,6 +63,112 @@ class Scd1BatchProcessorSuite extends SparkFunSuite with SharedSparkSession { ) } + test("deduplicateMicrobatch is no-op if there's a single event for a key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "only-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "only-row") + ) + } + + test("deduplicateMicrobatch handles equal sequencing values for the same key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "first-tied-row"), + Row(1, 10L, "second-tied-row") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + // On equal sequence number events for the same key we provide no guarantee on which event will + // survive, but the contract is _one_ event will survive - assert that below. + val result = processor.deduplicateMicrobatch(batch).collect() + assert(result.length == 1) + assert(result.head.getInt(0) == 1) + assert(result.head.getLong(1) == 10L) + assert(Set("first-tied-row", "second-tied-row").contains(result.head.getString(2))) + } + + test("deduplicateMicrobatch ignores rows with null sequencing when a non-null value exists") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that null sequence rows are de-prioritized in + // deduplication. + Row(1, null, "null-sequence"), + Row(1, 10L, "non-null-sequence") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, "non-null-sequence") + ) + } + + test( + "deduplicateMicrobatch returns a null row when all sequencing values for a key are null" + ) { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + val batch = microbatchOf(schema)( + // In production the expectation is the microbatch will have been validated to not contain + // any null sequence values, but demonstrate that a null row will be returned by + // deduplication if all rows contain a null sequence in the microbatch. + Row(1, null, "null-sequence") + ) + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(null, null, null) + ) + } + test("deduplicateMicrobatch processes multiple keys independently") { val schema = new StructType() .add("id", IntegerType) From 19d9040b6e796e8e8dd59024bfc51ae3dd2d6b48 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 21:49:54 +0000 Subject: [PATCH 07/18] use reserved __spark_autocdc* prefix --- .../autocdc/OutOfOrderCdcMergeUtils.scala | 33 ------------------- .../autocdc/Scd1BatchProcessor.scala | 7 +++- 2 files changed, 6 insertions(+), 34 deletions(-) delete mode 100644 sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala deleted file mode 100644 index 50b635c4ba2b8..0000000000000 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/OutOfOrderCdcMergeUtils.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.pipelines.autocdc - -/** Shared helpers for the out-of-order CDC merge implementations (SCD Type 1 and Type 2). */ -private[autocdc] object OutOfOrderCdcMergeUtils { - - /** - * Build a synthetic column name with a UUID suffix so it cannot collide with any user - * column. Intended for transient columns attached during merge processing (e.g. holding - * intermediate aggregation outputs, carrying per-key state through a join, etc.). - * - * Each invocation produces a fresh name, so callers should remember the returned string if - * they need to reference the same column from multiple sites within a single merge plan. - */ - def tempColName(prefix: String): String = - s"${prefix}_${java.util.UUID.randomUUID().toString.replace("-", "_")}" -} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index edeca022a27a3..732930c93630d 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -44,7 +44,7 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = { // The `max_by` API can only return a single column, so pack/unpack the entire row into a // temporary column before and after the `max_by` operation. - val winningRowCol = OutOfOrderCdcMergeUtils.tempColName("__winning_row") + val winningRowCol = Scd1BatchProcessor.winningRowColName val allMicrobatchColumns = validatedMicrobatch.columns @@ -60,3 +60,8 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { .select(F.col(s"$winningRowCol.*")) } } + +object Scd1BatchProcessor { + // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. + private val winningRowColName = "__spark_autocdc_winning_row" +} From 1e8b86c0e33859c28a4cc5333ded52e6c77d72f0 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 23:05:58 +0000 Subject: [PATCH 08/18] Add deduplicate test when row contains nested columns --- .../autocdc/Scd1BatchProcessorSuite.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 1125440f42bbf..a82323a842deb 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -229,6 +229,34 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test("deduplicateMicrobatch carries nested columns correctly from the winning row") { + val payloadType = new StructType() + .add("name", StringType) + .add("amount", IntegerType) + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("payload", payloadType) + + val batch = microbatchOf(schema)( + Row(1, 10L, Row("old", 100)), + Row(1, 20L, Row("new", 200)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 20L, Row("new", 200)) + ) + } + test("deduplicateMicrobatch supports composite (multi-column) keys") { val schema = new StructType() .add("region", StringType) From 0b498a084f64af6c89f273b4dcc91ae3e5cae749 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 20 May 2026 20:22:27 +0000 Subject: [PATCH 09/18] PR feedback --- .../resources/error/error-conditions.json | 6 ++ .../sql/pipelines/autocdc/ChangeArgs.scala | 20 +++++- .../autocdc/Scd1BatchProcessor.scala | 2 +- .../pipelines/autocdc/ChangeArgsSuite.scala | 15 ++++ .../autocdc/Scd1BatchProcessorSuite.scala | 72 ++++++++++++++++++- 5 files changed, 111 insertions(+), 4 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 81ef749e19f81..0c8bfcc11b25c 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -197,6 +197,12 @@ ], "sqlState" : "42703" }, + "AUTOCDC_EMPTY_KEYS" : { + "message" : [ + "AutoCDC requires at least one key column to identify rows, but received an empty key set." + ], + "sqlState" : "22023" + }, "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : { "message" : [ "Expected a single column identifier; got the multi-part identifier (parts: )." diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index 5774781b8ab9f..c17c89967baa5 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -156,4 +156,22 @@ case class ChangeArgs( storedAsScdType: ScdType, deleteCondition: Option[Column] = None, columnSelection: Option[ColumnSelection] = None -) +) { + ChangeArgs.validateNonEmptyKeys(keys) +} + +object ChangeArgs { + /** + * Validates that [[ChangeArgs.keys]] is non-empty. Both SCD1 and SCD2 semantics require at + * least one key column to identify rows; rejecting empty key sets at construction lets + * downstream consumers rely on `keys.nonEmpty` without re-validating. + */ + private def validateNonEmptyKeys(keys: Seq[UnqualifiedColumnName]): Unit = { + if (keys.isEmpty) { + throw new AnalysisException( + errorClass = "AUTOCDC_EMPTY_KEYS", + messageParameters = Map.empty + ) + } + } +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 732930c93630d..f87a4a1da53d4 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -63,5 +63,5 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { object Scd1BatchProcessor { // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. - private val winningRowColName = "__spark_autocdc_winning_row" + private[autocdc] val winningRowColName = "__spark_autocdc_winning_row" } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 816338cb677e8..1de2120a8f915 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -362,6 +362,21 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { ) } + test("ChangeArgs rejects an empty key list") { + checkError( + exception = intercept[AnalysisException] { + ChangeArgs( + keys = Seq.empty, + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + }, + condition = "AUTOCDC_EMPTY_KEYS", + sqlState = "22023", + parameters = Map.empty + ) + } + test("UnqualifiedColumnName lets a ParseException from the SQL parser propagate") { checkError( exception = intercept[ParseException] { diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index a82323a842deb..208c0aa1e4c59 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.{functions => F, Row} +import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -291,6 +290,38 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test("deduplicateMicrobatch supports an arbitrary sequencing expression") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("alt_seq", LongType) + .add("value", StringType) + + // The sequencing expression is a function call referencing multiple columns, not a bare + // identifier. Locks in that `max_by(..., changeArgs.sequencing)` evaluates the full + // expression per-row rather than treating `sequencing` as a single column reference. + val batch = microbatchOf(schema)( + // greatest(10, 30) = 30 - winner under the expression. + Row(1, 10L, 30L, "winner"), + // greatest(25, 20) = 25 - would win under `seq` alone, but loses under `greatest`. + Row(1, 25L, 20L, "would-win-on-seq-alone"), + Row(1, 15L, 15L, "always-loses") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.greatest(F.col("seq"), F.col("alt_seq")), + storedAsScdType = ScdType.Type1 + ) + ) + + checkAnswer( + df = processor.deduplicateMicrobatch(batch), + expectedAnswer = Row(1, 10L, 30L, "winner") + ) + } + test("deduplicateMicrobatch supports literal-dot column names") { val schema = new StructType() .add("user.id", IntegerType) @@ -316,6 +347,43 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test( + "deduplicateMicrobatch fails when a key column collides with the reserved name" + ) { + val reservedColName = Scd1BatchProcessor.winningRowColName + + val schema = new StructType() + .add(reservedColName, StringType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row("k1", 10L, "loser"), + Row("k1", 20L, "winner") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName(reservedColName)), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + ) + + checkError( + exception = intercept[AnalysisException] { + processor.deduplicateMicrobatch(batch).collect() + }, + condition = "AMBIGUOUS_REFERENCE", + sqlState = "42704", + parameters = Map( + "name" -> s"`$reservedColName`", + "referenceNames" -> s"[`$reservedColName`, `$reservedColName`]" + ), + context = ExpectedContext(fragment = "col", callSitePattern = "") + ) + } + test("deduplicateMicrobatch preserves the input column names, types, and ordering") { val schema = new StructType() .add("a", StringType) From a0d119864eb497648dabc69e8ec1a3b54503703f Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 20:15:09 +0000 Subject: [PATCH 10/18] validation --- .../resources/error/error-conditions.json | 6 + .../autocdc/Scd1BatchProcessor.scala | 109 +++++++- .../autocdc/Scd1BatchProcessorSuite.scala | 237 +++++++++++++++++- 3 files changed, 338 insertions(+), 14 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 0c8bfcc11b25c..3bbdcc0a281b0 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -209,6 +209,12 @@ ], "sqlState" : "42703" }, + "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : { + "message" : [ + "Using column name comparison, the column `` in the schema conflicts with the reserved AutoCDC column name ``. Rename or remove the column." + ], + "sqlState" : "42710" + }, "AVRO_CANNOT_WRITE_NULL_FIELD" : { "message" : [ "Cannot write null value for field defined as non-null Avro data type .", diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index f87a4a1da53d4..d3fcc2b8e7e0f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -17,16 +17,26 @@ package org.apache.spark.sql.pipelines.autocdc -import org.apache.spark.sql.{functions => F} +import org.apache.spark.SparkException +import org.apache.spark.sql.{functions => F, AnalysisException} +import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.util.QuotingUtils import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.ArrayImplicits._ /** * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the specified [[changeArgs]] * configuration. + * + * @param changeArgs The CDC flow configuration. + * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived + * from the flow's resolved DataFrame at flow setup time. */ -case class Scd1BatchProcessor(changeArgs: ChangeArgs) { +case class Scd1BatchProcessor( + changeArgs: ChangeArgs, + resolvedSequencingType: DataType) { + /** * Deduplicate the incoming CDC microbatch by key, keeping the most recent event per key * as ordered by [[ChangeArgs.sequencing]]. @@ -59,9 +69,102 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) { ) .select(F.col(s"$winningRowCol.*")) } + + /** + * Project the CDC metadata column onto the microbatch. + */ + def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + // Proactively validate the reserved CDC metadata column does not exist in the microbatch. + validateCdcMetadataColumnNotPresent(microbatchDf) + + val rowDeleteSequence: Column = changeArgs.deleteCondition match { + case Some(deleteCondition) => + F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null)) + case None => + F.lit(null) + } + + val rowUpsertSequence: Column = + // A row that is not a delete must be an upsert, these are mutually exclusive and a complete + // set of CDC event types. + F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null)) + + microbatchDf.withColumn( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.constructCdcMetadataCol( + deleteSequence = rowDeleteSequence, + upsertSequence = rowUpsertSequence, + sequencingType = resolvedSequencingType + ) + ) + } + + private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { + val ignoreColumnNameCase = + !microbatchDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + + microbatchDf.schema.fieldNames + .find { fieldName => + if (ignoreColumnNameCase) { + fieldName.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) + } else { + fieldName.equals(Scd1BatchProcessor.cdcMetadataColName) + } + } + .foreach { conflictingColumnName => + throw new AnalysisException( + errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of(!ignoreColumnNameCase), + "columnName" -> conflictingColumnName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } object Scd1BatchProcessor { // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. - private[autocdc] val winningRowColName = "__spark_autocdc_winning_row" + private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row" + private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata" + + private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence" + private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence" + + /** + * Schema of the CDC metadata struct column for SCD1. + */ + private def cdcMetadataColSchema(sequencingType: DataType): StructType = + StructType( + Seq( + // The sequencing of the event if it represents a delete, null otherwise. + StructField(cdcDeleteSequenceFieldName, sequencingType, nullable = true), + // The sequencing of the event if it represents an upsert, null otherwise. + StructField(cdcUpsertSequenceFieldName, sequencingType, nullable = true) + ) + ) + + /** + * Construct the CDC metadata struct column for SCD1, following the exact schema and field + * ordering defined by [[cdcMetadataColSchema]]. + */ + private[autocdc] def constructCdcMetadataCol( + deleteSequence: Column, + upsertSequence: Column, + sequencingType: DataType): Column = { + val cdcMetadataFieldsInOrder = cdcMetadataColSchema(sequencingType).fields.map { field => + val value = field.name match { + case `cdcDeleteSequenceFieldName` => deleteSequence + case `cdcUpsertSequenceFieldName` => upsertSequence + case other => + throw SparkException.internalError( + s"Unable to construct SCD1 CDC metadata column due to unknown `${other}` field." + ) + } + value.cast(field.dataType).as(field.name) + } + F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*) + } } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 208c0aa1e4c59..3975f11f9b68f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row} import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -53,7 +54,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -77,7 +79,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -102,7 +105,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // On equal sequence number events for the same key we provide no guarantee on which event will @@ -133,7 +137,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -160,7 +165,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( df = processor.deduplicateMicrobatch(batch), @@ -187,7 +193,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -217,7 +224,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // All non-key columns must come from the row with the largest sequence value, never @@ -277,7 +285,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -338,7 +347,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("`user.id`")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -402,7 +412,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) // Field names and dataTypes must match the input exactly, in the original order. @@ -424,11 +435,215 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) val result = processor.deduplicateMicrobatch(batch) assert(result.collect().isEmpty) assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) } + + test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or an upsert " + + "per deleteCondition") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, 10L, false), + Row(2, 20L, true), + Row(3, 30L, false), + Row(4, 40L, true) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = Some(F.col("is_delete") === true) + ), + resolvedSequencingType = LongType + ) + + // Mutual-exclusivity invariant: each row's _cdc_metadata struct has exactly one of + // (deleteSequence, upsertSequence) non-null, and the non-null side carries the row's + // sequence value. + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, false, Row(null, 10L)), + Row(2, 20L, true, Row(20L, null)), + Row(3, 30L, false, Row(null, 30L)), + Row(4, 40L, true, Row(40L, null)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " + + "when deleteCondition is None") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a"), + Row(2, 20L, "b") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = None + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Seq( + Row(1, 10L, "a", Row(null, 10L)), + Row(2, 20L, "b", Row(null, 20L)) + ) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata appends CDC metadata as the last column") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + // Original columns are preserved in their original order, with CDC metadata appended at + // the very end. + assert(result.schema.fieldNames.toSeq == + schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName) + } + + test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence fields to " + + "resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is IntegerType, but the flow's resolved sequencing type + // will be LongType. This should be upcasted in the projected CDC metadata column. + .add("seq", IntegerType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10, "a") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch) + + val cdcMetadataDataType = + resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq( + Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType, + Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType)) + + // The cast must also succeed at runtime: upsertSequence is materialized as a Long value, not + // an Int. + checkAnswer( + df = resultDf, + expectedAnswer = Row(1, 10, "a", Row(null, 10L)) + ) + } + + test("extendMicrobatchRowsWithCdcMetadata fails fast when the microbatch's sequencing column " + + "is incompatible with resolvedSequencingType") { + val schema = new StructType() + .add("id", IntegerType) + // Microbatch's sequencing column is a struct, whereas the flow's resolved sequencing type + // will be LongType. These are incompatible and should throw. + .add( + "seq", + new StructType() + .add("major", LongType) + .add("minor", LongType)) + + val batch = microbatchOf(schema)( + Row(1, Row(1L, 0L)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val ex = intercept[AnalysisException] { + // .schema forces analysis of the underlying logical plan, surfacing the invalid cast. + processor.extendMicrobatchRowsWithCdcMetadata(batch).schema + } + assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION") + } + + test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already contains the " + + "reserved CDC metadata column") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add(Scd1BatchProcessor.cdcMetadataColName, StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "user-supplied") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkError( + exception = intercept[AnalysisException] { + processor.extendMicrobatchRowsWithCdcMetadata(batch) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, + "columnName" -> Scd1BatchProcessor.cdcMetadataColName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } From 9a2c28fd7b5522160a61274917f5a57769a099ef Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Fri, 15 May 2026 21:31:20 +0000 Subject: [PATCH 11/18] buff scaladoc --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index d3fcc2b8e7e0f..bdc8bf1f9683b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -72,6 +72,9 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. + * + * The returned dataframe has all of the columns in the input microbatch + the CDC metadata + * column. */ def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { // Proactively validate the reserved CDC metadata column does not exist in the microbatch. From c95144d3f6390ea393e014644f004ba67bb81029 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 01:29:47 +0000 Subject: [PATCH 12/18] use spark resolver --- .../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index bdc8bf1f9683b..953f963efed4b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -103,22 +103,16 @@ case class Scd1BatchProcessor( } private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val ignoreColumnNameCase = - !microbatchDf.sparkSession.sessionState.conf.caseSensitiveAnalysis + val sqlConf = microbatchDf.sparkSession.sessionState.conf + val resolver = sqlConf.resolver microbatchDf.schema.fieldNames - .find { fieldName => - if (ignoreColumnNameCase) { - fieldName.equalsIgnoreCase(Scd1BatchProcessor.cdcMetadataColName) - } else { - fieldName.equals(Scd1BatchProcessor.cdcMetadataColName) - } - } + .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) .foreach { conflictingColumnName => throw new AnalysisException( errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(!ignoreColumnNameCase), + "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis), "columnName" -> conflictingColumnName, "schemaName" -> "microbatch", "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName From 415f90be87fa156c0adb1862ec42cce683f8f23d Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 17:10:09 +0000 Subject: [PATCH 13/18] lingint --- .../apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 953f963efed4b..1e35678a7200e 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -72,7 +72,7 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. - * + * * The returned dataframe has all of the columns in the input microbatch + the CDC metadata * column. */ From c1da259ec29c6d3cffc0d58f0bf6b990ffd5c6ae Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 18:22:41 +0000 Subject: [PATCH 14/18] rebase conflict --- .../spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 1e35678a7200e..00450bee6984b 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -103,8 +103,8 @@ case class Scd1BatchProcessor( } private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val sqlConf = microbatchDf.sparkSession.sessionState.conf - val resolver = sqlConf.resolver + val microbatchSqlConf = microbatchDf.sparkSession.sessionState.conf + val resolver = microbatchSqlConf.resolver microbatchDf.schema.fieldNames .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) @@ -112,7 +112,7 @@ case class Scd1BatchProcessor( throw new AnalysisException( errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis), + "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis), "columnName" -> conflictingColumnName, "schemaName" -> "microbatch", "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName From 53382058a282fd780ce6b5da0deef06da1f2aab4 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 19 May 2026 23:01:00 +0000 Subject: [PATCH 15/18] PR feedback --- .../autocdc/Scd1BatchProcessor.scala | 8 +++ .../autocdc/Scd1BatchProcessorSuite.scala | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 00450bee6984b..5554efdafd68c 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -73,6 +73,14 @@ case class Scd1BatchProcessor( /** * Project the CDC metadata column onto the microbatch. * + * This must run before any column selection is applied to the microbatch. The + * [[ChangeArgs.deleteCondition]] and [[ChangeArgs.sequencing]] expressions are evaluated against + * the current microbatch schema, and column selection may drop inputs required by those + * expressions. + * + * Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] evaluates to true. A + * false or null delete condition classifies the row as an upsert. + * * The returned dataframe has all of the columns in the input microbatch + the CDC metadata * column. */ diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 3975f11f9b68f..3042e61838de9 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -482,6 +482,32 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } + test("extendMicrobatchRowsWithCdcMetadata treats null deleteCondition results as upserts") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, 10L, null) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.extendMicrobatchRowsWithCdcMetadata(batch), + expectedAnswer = Row(1, 10L, null, Row(null, 10L)) + ) + } + test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " + "when deleteCondition is None") { val schema = new StructType() @@ -646,4 +672,42 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } } + + test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata column " + + "case-insensitively") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val conflictingColumnName = Scd1BatchProcessor.cdcMetadataColName.toUpperCase + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add(conflictingColumnName, StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "user-supplied") + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkError( + exception = intercept[AnalysisException] { + processor.extendMicrobatchRowsWithCdcMetadata(batch) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> conflictingColumnName, + "schemaName" -> "microbatch", + "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName + ) + ) + } + } } From 02473ba9dabbbaa1633f0c2d42cc9cf18fd41e73 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 20 May 2026 21:08:50 +0000 Subject: [PATCH 16/18] rebase conflicts --- .../sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 3042e61838de9..9368205807cd4 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -255,7 +255,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -322,7 +323,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName("id")), sequencing = F.greatest(F.col("seq"), F.col("alt_seq")), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkAnswer( @@ -377,7 +379,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { keys = Seq(UnqualifiedColumnName(reservedColName)), sequencing = F.col("seq"), storedAsScdType = ScdType.Type1 - ) + ), + resolvedSequencingType = LongType ) checkError( From d018f891d4099d8fef61ee72c9a8239c31d0e09c Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 21 May 2026 01:26:59 +0000 Subject: [PATCH 17/18] Introduce AutoCdcFlow and AutoCdcMergeFlow --- .../resources/error/error-conditions.json | 10 +- .../sql/pipelines/autocdc/ChangeArgs.scala | 2 +- .../autocdc/Scd1BatchProcessor.scala | 38 +- .../spark/sql/pipelines/graph/Flow.scala | 160 ++++- .../graph/GraphRegistrationContext.scala | 2 +- .../graph/SqlGraphRegistrationContext.scala | 10 +- .../pipelines/autocdc/AutoCdcFlowSuite.scala | 550 ++++++++++++++++++ .../autocdc/Scd1BatchProcessorSuite.scala | 76 --- .../utils/TestGraphRegistrationContext.scala | 8 +- 9 files changed, 737 insertions(+), 119 deletions(-) create mode 100644 sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 3bbdcc0a281b0..92ff59753526c 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -209,9 +209,15 @@ ], "sqlState" : "42703" }, - "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : { + "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : { "message" : [ - "Using column name comparison, the column `` in the schema conflicts with the reserved AutoCDC column name ``. Rename or remove the column." + "Using column name comparison, the AutoCDC key column `` is not present in the flow's selected source schema. AutoCDC requires every key column to be present in the source change-data feed and retained by any configured column selection." + ], + "sqlState" : "22023" + }, + "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : { + "message" : [ + "The column `` in the schema collides with the reserved AutoCDC column name prefix `` (using column name comparison). Rename or remove the column." ], "sqlState" : "42710" }, diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index c17c89967baa5..b975e06807f57 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -120,7 +120,7 @@ object ColumnSelection { } /** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */ -private[autocdc] object CaseSensitivityLabels { +private[pipelines] object CaseSensitivityLabels { val CaseSensitive: String = "case-sensitive" val CaseInsensitive: String = "case-insensitive" diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 5554efdafd68c..a72bb6c0ca1ce 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.SparkException -import org.apache.spark.sql.{functions => F, AnalysisException} +import org.apache.spark.sql.{functions => F} import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.util.QuotingUtils import org.apache.spark.sql.classic.DataFrame @@ -85,9 +85,6 @@ case class Scd1BatchProcessor( * column. */ def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { - // Proactively validate the reserved CDC metadata column does not exist in the microbatch. - validateCdcMetadataColumnNotPresent(microbatchDf) - val rowDeleteSequence: Column = changeArgs.deleteCondition match { case Some(deleteCondition) => F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null)) @@ -109,31 +106,18 @@ case class Scd1BatchProcessor( ) ) } - - private def validateCdcMetadataColumnNotPresent(microbatchDf: DataFrame): Unit = { - val microbatchSqlConf = microbatchDf.sparkSession.sessionState.conf - val resolver = microbatchSqlConf.resolver - - microbatchDf.schema.fieldNames - .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName)) - .foreach { conflictingColumnName => - throw new AnalysisException( - errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", - messageParameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis), - "columnName" -> conflictingColumnName, - "schemaName" -> "microbatch", - "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName - ) - ) - } - } } object Scd1BatchProcessor { - // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing. - private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row" - private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata" + /** + * Reserved column-name prefix for internal SDP AutoCDC processing. Source change-data-feed + * dataframes must not contain any columns starting with this prefix; the invariant is + * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction. + */ + private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_" + + private[autocdc] val winningRowColName: String = s"${reservedColumnNamePrefix}winning_row" + private[pipelines] val cdcMetadataColName: String = s"${reservedColumnNamePrefix}metadata" private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence" private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence" @@ -141,7 +125,7 @@ object Scd1BatchProcessor { /** * Schema of the CDC metadata struct column for SCD1. */ - private def cdcMetadataColSchema(sequencingType: DataType): StructType = + private[pipelines] def cdcMetadataColSchema(sequencingType: DataType): StructType = StructType( Seq( // The sequencing of the event if it represents a delete, null otherwise. diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index e329308502f0d..9cc8817e70891 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -20,11 +20,19 @@ package org.apache.spark.sql.pipelines.graph import scala.util.Try import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.pipelines.AnalysisWarning +import org.apache.spark.sql.pipelines.autocdc.{ + CaseSensitivityLabels, + ChangeArgs, + ColumnSelection, + Scd1BatchProcessor, + ScdType +} import org.apache.spark.sql.pipelines.util.InputReadOptions -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} /** * Contains the catalog and database context information for query execution. @@ -121,7 +129,21 @@ case class FlowFunctionResult( } /** A [[Flow]] whose output schema and dependencies aren't known. */ -case class UnresolvedFlow( +sealed trait UnresolvedFlow extends Flow { + /** Returns a copy of this flow with the given SQL confs overriding the existing ones. */ + def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow +} + +/** + * An [[UnresolvedFlow]] whose execution-type has not yet been determined. + * + * In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis + * and resolution. For example an AutoCDCFlow is a special unresolved-but-typed flow; we know a + * flow will be an AutoCDC flow immediately on construction, because it has its own special + * registration API. Such flows are considered "typed flows", but there isn't any semantic reason + * yet to explicitly introduce a `TypedFlow` trait/class. + */ +case class UntypedFlow( identifier: TableIdentifier, destinationIdentifier: TableIdentifier, func: FlowFunction, @@ -129,7 +151,34 @@ case class UnresolvedFlow( sqlConf: Map[String, String], override val once: Boolean, override val origin: QueryOrigin -) extends Flow +) extends UnresolvedFlow { + override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow = + copy(sqlConf = newSqlConf) +} + +/** + * An unresolved but typed that applies a CDC event stream to a target table via MERGE. + * + * [[AutoCdcFlow]] is a typed flow because it is only supported for streaming, and not as a once + * flow. Therefore by definition it is a streaming-type flow. + * + * In the future once-support for [[AutoCdcFlow]] may be added. + */ +case class AutoCdcFlow( + identifier: TableIdentifier, + destinationIdentifier: TableIdentifier, + func: FlowFunction, + queryContext: QueryContext, + sqlConf: Map[String, String] = Map.empty, + comment: Option[String] = None, + override val origin: QueryOrigin, + changeArgs: ChangeArgs +) extends UnresolvedFlow { + override val once: Boolean = false + + override def withSqlConf(newSqlConf: Map[String, String]): AutoCdcFlow = + copy(sqlConf = newSqlConf) +} /** * A [[Flow]] whose flow function has been invoked, meaning either: @@ -194,3 +243,108 @@ class AppendOnceFlow( override val once = true } + +/** + * A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance to + * the configured [[flow.changeArgs]]. + */ +class AutoCdcMergeFlow( + val flow: AutoCdcFlow, + val funcResult: FlowFunctionResult +) extends ResolvedFlow { + requireReservedPrefixAbsentInSourceColumns() + + def changeArgs: ChangeArgs = flow.changeArgs + + /** + * Returns the augmented output schema of this flow, which can differ from the schema of the + * source change-data-feed dataframe. + * + * The source dataframe's schema describes the incoming CDC events; the augmented schema here + * applies the user-specified [[ColumnSelection]] and appends the SCD-specific metadata + * columns that the AutoCDC MERGE engine projects onto the target table. Downstream + * dependencies in the pipeline see this augmented schema. + */ + override val schema: StructType = { + val userSelectedSchema = ColumnSelection.applyToSchema( + schemaName = "changeDataFeed", + schema = df.schema, + columnSelection = changeArgs.columnSelection, + caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis + ) + + // AutoCDC flows require all key columns to be present in the target table, to adhere to SCD + // semantics. + requireKeysPresentInSelectedSchema(userSelectedSchema) + + changeArgs.storedAsScdType match { + case ScdType.Type1 => + // SCD1 produces a target table with all the user-selected output columns and a projected + // CDC operational metadata column at the end. + StructType( + userSelectedSchema.fields :+ StructField( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.cdcMetadataColSchema( + sequencingType = df.select(changeArgs.sequencing).schema.head.dataType + ), + nullable = false + ) + ) + case ScdType.Type2 => + throw new UnsupportedOperationException( + "AutoCDC flows do not currently support SCD Type 2 transformations." + ) + } + } + + /** + * Validate that the resolved source dataframe for the AutoCDC flow does not contain any column + * names that use the reserved Spark AutoCDC prefix. + */ + private def requireReservedPrefixAbsentInSourceColumns(): Unit = { + val resolver = spark.sessionState.conf.resolver + val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix + + def nameContainsReservedPrefix(name: String): Boolean = { + name.length >= reservedPrefix.length && resolver( + name.substring(0, reservedPrefix.length), + reservedPrefix + ) + } + + df.schema.fieldNames.find(nameContainsReservedPrefix).foreach { conflictingColumnName => + throw new AnalysisException( + errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT", + messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of( + spark.sessionState.conf.caseSensitiveAnalysis + ), + "columnName" -> conflictingColumnName, + "schemaName" -> "changeDataFeed", + "reservedColumnNamePrefix" -> reservedPrefix + ) + ) + } + } + + /** + * Validate all keys specified in changeArgs are actually present in the user-selected schema. + */ + private def requireKeysPresentInSelectedSchema(selectedSchema: StructType): Unit = { + val resolver = spark.sessionState.conf.resolver + + changeArgs.keys + .find(key => !selectedSchema.fieldNames.exists(name => resolver(name, key.name))) + .foreach { missingKey => + throw new AnalysisException( + errorClass = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA", + messageParameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.of( + spark.sessionState.conf.caseSensitiveAnalysis + ), + "keyColumnName" -> missingKey.name + ) + ) + } + } +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala index dadda0561b19f..970fdb4b70e94 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala @@ -59,7 +59,7 @@ class GraphRegistrationContext( } def registerFlow(flowDef: UnresolvedFlow): Unit = { - flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf) + flows += flowDef.withSqlConf(defaultSqlConf ++ flowDef.sqlConf) } private def isEmpty: Boolean = { diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala index 829179142dc5c..4dfd096935781 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala @@ -237,7 +237,7 @@ class SqlGraphRegistrationContext( // Register flow that backs this streaming table. graphRegistrationContext.registerFlow( - UnresolvedFlow( + UntypedFlow( identifier = stIdentifier, destinationIdentifier = stIdentifier, func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cst.query), @@ -288,7 +288,7 @@ class SqlGraphRegistrationContext( // Register flow that backs this materialized view. graphRegistrationContext.registerFlow( - UnresolvedFlow( + UntypedFlow( identifier = mvIdentifier, destinationIdentifier = mvIdentifier, func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cmv.query), @@ -331,7 +331,7 @@ class SqlGraphRegistrationContext( // Register flow that backs this persisted view. graphRegistrationContext.registerFlow( - UnresolvedFlow( + UntypedFlow( identifier = viewIdentifier, destinationIdentifier = viewIdentifier, func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cv.query), @@ -375,7 +375,7 @@ class SqlGraphRegistrationContext( // Register flow definition that backs this temporary view. graphRegistrationContext.registerFlow( - UnresolvedFlow( + UntypedFlow( identifier = viewIdentifier, destinationIdentifier = viewIdentifier, func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cvc.plan), @@ -451,7 +451,7 @@ class SqlGraphRegistrationContext( .identifier graphRegistrationContext.registerFlow( - UnresolvedFlow( + UntypedFlow( identifier = flowIdentifier, destinationIdentifier = qualifiedDestinationIdentifier, func = FlowAnalysis.createFlowFunctionFromLogicalPlan(flowQueryLogicalPlan), diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala new file mode 100644 index 0000000000000..c741062c1b6c6 --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import java.util.Locale + +import scala.util.Success + +import org.apache.spark.sql.{functions => F, AnalysisException, Column, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.pipelines.graph.{ + AutoCdcFlow, + AutoCdcMergeFlow, + FlowFunction, + FlowFunctionResult, + Input, + QueryContext, + QueryOrigin +} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructField, StructType} + +/** + * Unit tests for the [[AutoCdcFlow]] data class and the augmented schema computed by + * [[AutoCdcMergeFlow]]. The tests stop at the data-class / schema surface; they do not + * exercise the full pipeline-graph resolution machinery (which is not yet wired up to AutoCDC + * flows). + */ +class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { + + private val testIdentifier = TableIdentifier("cdc_target", Some("db")) + + /** A no-op [[FlowFunction]] that throws if invoked; AutoCdcFlow tests should never call it. */ + private val noOpFlowFunction: FlowFunction = new FlowFunction { + override def call( + allInputs: Set[TableIdentifier], + availableInputs: Seq[Input], + configuration: Map[String, String], + queryContext: QueryContext, + queryOrigin: QueryOrigin): FlowFunctionResult = + throw new UnsupportedOperationException( + "noOpFlowFunction.call should not be invoked from AutoCdcFlowSuite tests" + ) + } + + private val testQueryContext = + QueryContext(currentCatalog = Some("test_catalog"), currentDatabase = Some("test_db")) + + private val testChangeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type1 + ) + + private def newAutoCdcFlow( + identifier: TableIdentifier = testIdentifier, + destinationIdentifier: TableIdentifier = testIdentifier, + func: FlowFunction = noOpFlowFunction, + queryContext: QueryContext = testQueryContext, + sqlConf: Map[String, String] = Map.empty, + comment: Option[String] = None, + origin: QueryOrigin = QueryOrigin.empty, + changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = { + AutoCdcFlow( + identifier = identifier, + destinationIdentifier = destinationIdentifier, + func = func, + queryContext = queryContext, + sqlConf = sqlConf, + comment = comment, + origin = origin, + changeArgs = changeArgs + ) + } + + test("AutoCdcFlow exposes its constructor fields") { + val flow = newAutoCdcFlow( + sqlConf = Map("spark.sql.shuffle.partitions" -> "8"), + comment = Some("my CDC flow") + ) + + assert(flow.identifier == testIdentifier) + assert(flow.destinationIdentifier == testIdentifier) + assert(flow.func eq noOpFlowFunction) + assert(flow.queryContext == testQueryContext) + assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8")) + assert(flow.comment.contains("my CDC flow")) + assert(flow.origin == QueryOrigin.empty) + assert(flow.changeArgs == testChangeArgs) + } + + test("AutoCdcFlow defaults sqlConf to empty and comment to None") { + // Confirms the case-class default values match the documented contract; downstream + // registration code relies on `sqlConf` being a non-null empty map by default so that + // `defaultSqlConf ++ flowDef.sqlConf` is well-defined in [[GraphRegistrationContext]]. + val flow = AutoCdcFlow( + identifier = testIdentifier, + destinationIdentifier = testIdentifier, + func = noOpFlowFunction, + queryContext = testQueryContext, + origin = QueryOrigin.empty, + changeArgs = testChangeArgs + ) + + assert(flow.sqlConf.isEmpty) + assert(flow.comment.isEmpty) + } + + test("AutoCdcFlow.once is always false") { + // AutoCDC flows are streaming-only and must run on every batch trigger, never as a + // one-shot full-refresh-style flow. Locking this in so a future refactor doesn't + // accidentally make `once` configurable. + + // In the future we may intentionally add [[once]] support for AutoCDC flows, at which point + // this test can safely be removed. + val flow = newAutoCdcFlow() + assert(!flow.once) + } + + test("AutoCdcFlow.withSqlConf returns a new instance with the updated sqlConf") { + val original = newAutoCdcFlow(sqlConf = Map("a" -> "1")) + val updated = original.withSqlConf(Map("b" -> "2")) + + assert(updated.sqlConf == Map("b" -> "2")) + // All other fields should be preserved verbatim. + assert(updated.identifier == original.identifier) + assert(updated.destinationIdentifier == original.destinationIdentifier) + assert(updated.func eq original.func) + assert(updated.queryContext == original.queryContext) + assert(updated.comment == original.comment) + assert(updated.origin == original.origin) + assert(updated.changeArgs == original.changeArgs) + // The original must not be mutated. + assert(original.sqlConf == Map("a" -> "1")) + } + + // =========================================================================================== + // AutoCdcMergeFlow.schema tests + // =========================================================================================== + + /** Materializes a successful [[FlowFunctionResult]] backed by the given source dataframe. */ + private def successfulFuncResult(sourceDf: DataFrame): FlowFunctionResult = + FlowFunctionResult( + requestedInputs = Set.empty, + batchInputs = Set.empty, + streamingInputs = Set.empty, + usedExternalInputs = Set.empty, + dataFrame = Success(sourceDf), + sqlConf = Map.empty + ) + + /** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change args. */ + private def newAutoCdcMergeFlow( + sourceDf: DataFrame, + keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")), + sequencing: Column = F.col("seq"), + storedAsScdType: ScdType = ScdType.Type1, + columnSelection: Option[ColumnSelection] = None): AutoCdcMergeFlow = { + val flow = newAutoCdcFlow( + changeArgs = ChangeArgs( + keys = keys, + sequencing = sequencing, + storedAsScdType = storedAsScdType, + columnSelection = columnSelection + ) + ) + new AutoCdcMergeFlow(flow, successfulFuncResult(sourceDf)) + } + + /** A stable 3-column source CDF schema used across most schema tests. */ + private def threeColumnSourceDf(): DataFrame = { + val schema = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType) + .add("seq", LongType) + spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema) + } + + /** Convenience to extract the [[StructType]] of the projected `_cdc_metadata` column. */ + private def cdcMetadataStruct(schema: StructType): StructType = + schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + + test( + "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when no " + + "columnSelection is set" + ) { + val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) + + val expected = new StructType() + .add("id", IntegerType, nullable = false) + .add("name", StringType) + .add("seq", LongType) + .add( + StructField( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.cdcMetadataColSchema(LongType), + nullable = false + ) + ) + assert(resolvedFlow.schema == expected) + } + + test("AutoCdcMergeFlow.schema applies an IncludeColumns selection") { + val resolvedFlow = newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq")) + ) + ) + ) + + val expected = new StructType() + .add("id", IntegerType, nullable = false) + .add("seq", LongType) + .add( + StructField( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.cdcMetadataColSchema(LongType), + nullable = false + ) + ) + assert(resolvedFlow.schema == expected) + } + + test("AutoCdcMergeFlow.schema applies an ExcludeColumns selection") { + val resolvedFlow = newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name"))) + ) + ) + + val expected = new StructType() + .add("id", IntegerType, nullable = false) + .add("seq", LongType) + .add( + StructField( + Scd1BatchProcessor.cdcMetadataColName, + Scd1BatchProcessor.cdcMetadataColSchema(LongType), + nullable = false + ) + ) + assert(resolvedFlow.schema == expected) + } + + test( + "AutoCdcMergeFlow.schema's _cdc_metadata struct uses the resolved sequencing data type" + ) { + // Source has a Long `seq` column; sequencing is `cast(seq as int)`, so the projected + // `_cdc_metadata` fields should be Int (not Long), demonstrating that the sequencing + // expression's *resolved* type drives the metadata schema. + val resolvedFlow = newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + sequencing = F.col("seq").cast(IntegerType) + ) + + val metaStruct = cdcMetadataStruct(resolvedFlow.schema) + assert(metaStruct == Scd1BatchProcessor.cdcMetadataColSchema(IntegerType)) + } + + test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with nullable inner fields") { + val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) + + val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName) + assert(!metaField.nullable, "_cdc_metadata column itself must be non-null") + + val metaStruct = metaField.dataType.asInstanceOf[StructType] + assert(metaStruct(Scd1BatchProcessor.cdcDeleteSequenceFieldName).nullable) + assert(metaStruct(Scd1BatchProcessor.cdcUpsertSequenceFieldName).nullable) + } + + test("AutoCdcMergeFlow.schema is stable across reads") { + // The schema computation calls `df.select(sequencing).schema`, which triggers Spark + // analysis. The eagerly-initialized `val` caches the result so downstream consumers get + // a stable schema instance across reads. + val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) + val first = resolvedFlow.schema + val second = resolvedFlow.schema + assert(first eq second, "schema should be cached as a val and return the same instance") + } + + test("AutoCdcMergeFlow rejects SCD2 at construction with UnsupportedOperationException") { + // Constructing the flow forces the resolved schema, which is unsupported for SCD2 today. + // Failing eagerly (rather than deferring to the first downstream `schema` read) is the + // intended UX -- pipeline graph analysis should not be able to register an SCD2 AutoCDC + // flow at all. + val ex = intercept[UnsupportedOperationException] { + newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + storedAsScdType = ScdType.Type2 + ) + } + assert( + ex.getMessage.contains("AutoCDC flows do not currently support SCD Type 2 transformations.") + ) + } + + // =========================================================================================== + // AutoCdcMergeFlow reserved-prefix validation tests + // + // The two "contract:" tests below lock in the high-level invariant that no reserved-prefix + // column name can be referenced anywhere -- not in the source change-data feed schema, and + // not in user-supplied [[ChangeArgs]] (keys or columnSelection). Together they ensure that + // (a) users cannot opt out of the reserved CDC metadata column by omitting it from the + // selected schema, and (b) users cannot opt in to (or out of) any other reserved-prefix + // name we may reserve in the future for an internal CDC concern. + // + // The remaining tests pin down case-sensitivity nuances of the source-schema validator. + // =========================================================================================== + + /** Builds an empty source df with `id` + `seq` + the supplied extra columns. */ + private def sourceDfWithExtraColumns(extraColumns: (String, DataType)*): DataFrame = { + val schema = extraColumns.foldLeft( + new StructType().add("id", IntegerType, nullable = false).add("seq", LongType) + ) { case (acc, (name, dt)) => acc.add(name, dt) } + spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema) + } + + test( + "Contract: a source df column with the reserved AutoCDC prefix is rejected at flow " + + "construction" + ) { + val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo" + val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType) + + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow(sourceDf) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> conflictingName, + "schemaName" -> "changeDataFeed", + "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + ) + ) + } + + test( + "Contract: ChangeArgs referencing a reserved-prefix column is rejected even when the " + + "source df is clean" + ) { + // The source df has no reserved-prefix columns, but referencing a reserved-prefix column + // from any ChangeArgs path still fails at construction with a different error. The + // reservation is on the name itself, not on its presence in the source feed. + val cleanSourceDf = threeColumnSourceDf() + val reservedName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo" + + val keysEx = intercept[AnalysisException] { + newAutoCdcMergeFlow( + sourceDf = cleanSourceDf, + keys = Seq(UnqualifiedColumnName(reservedName)) + ) + } + assert(keysEx.getCondition == "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA") + + val includeEx = intercept[AnalysisException] { + newAutoCdcMergeFlow( + sourceDf = cleanSourceDf, + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName(reservedName)) + ) + ) + ) + } + assert(includeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA") + + val excludeEx = intercept[AnalysisException] { + newAutoCdcMergeFlow( + sourceDf = cleanSourceDf, + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName(reservedName))) + ) + ) + } + assert(excludeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA") + } + + test("AutoCdcMergeFlow rejects a source df column starting with the reserved prefix") { + // Default Spark session is case-insensitive (`spark.sql.caseSensitive = false`). + val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo" + val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType) + + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow(sourceDf) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> conflictingName, + "schemaName" -> "changeDataFeed", + "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + ) + ) + } + + test( + "AutoCdcMergeFlow rejects a source df column whose name equals the reserved CDC " + + "metadata column" + ) { + // Locks in the previous engine-level guard (Scd1BatchProcessor.extendMicrobatchRowsWith + // CdcMetadata) at flow-construction time. Any future regression where a user-supplied + // CDC stream carries the reserved metadata column name should fail eagerly here. + val sourceDf = sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType) + + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow(sourceDf) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> Scd1BatchProcessor.cdcMetadataColName, + "schemaName" -> "changeDataFeed", + "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + ) + ) + } + + test( + "AutoCdcMergeFlow rejects an uppercase reserved-prefix column when caseSensitive=false" + ) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val conflictingName = + s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT) + val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType) + + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow(sourceDf) + }, + condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT", + sqlState = "42710", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "columnName" -> conflictingName, + "schemaName" -> "changeDataFeed", + "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix + ) + ) + } + } + + test( + "AutoCdcMergeFlow allows an uppercase reserved-prefix column when caseSensitive=true" + ) { + // Under case-sensitive analysis, the uppercase variant is a distinct identifier and does + // not collide with the lowercase reserved namespace. Locks in that the validation respects + // `spark.sql.caseSensitive`, consistent with the schema-augmentation logic in this class. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val nonConflictingName = + s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT) + val sourceDf = sourceDfWithExtraColumns(nonConflictingName -> StringType) + + // No exception expected: construction succeeds. + newAutoCdcMergeFlow(sourceDf) + } + } + + // =========================================================================================== + // AutoCdcMergeFlow keys-presence validation tests (requireKeysPresentInSelectedSchema) + // =========================================================================================== + + test("AutoCdcMergeFlow rejects a key that is not present in the source change-data feed") { + // No columnSelection: the post-selection schema equals the source schema. The key `id` + // is absent from the source df entirely, so the validator must surface a CDC-specific + // error rather than deferring to Spark's generic UNRESOLVED_COLUMN. + val schema = new StructType() + .add("name", StringType) + .add("seq", LongType) + val sourceDf = + spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema) + + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow(sourceDf) + }, + condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA", + sqlState = "22023", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "keyColumnName" -> "id" + ) + ) + } + + test("AutoCdcMergeFlow rejects a key dropped by an IncludeColumns selection") { + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + columnSelection = Some( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("seq")) + ) + ) + ) + }, + condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA", + sqlState = "22023", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "keyColumnName" -> "id" + ) + ) + } + + test("AutoCdcMergeFlow rejects a key dropped by an ExcludeColumns selection") { + checkError( + exception = intercept[AnalysisException] { + newAutoCdcMergeFlow( + sourceDf = threeColumnSourceDf(), + columnSelection = Some( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id"))) + ) + ) + }, + condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA", + sqlState = "22023", + parameters = Map( + "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, + "keyColumnName" -> "id" + ) + ) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 9368205807cd4..bc83a4fece65e 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.pipelines.autocdc import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row} import org.apache.spark.sql.classic.DataFrame -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -638,79 +637,4 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { } assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION") } - - test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already contains the " + - "reserved CDC metadata column") { - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val schema = new StructType() - .add("id", IntegerType) - .add("seq", LongType) - .add(Scd1BatchProcessor.cdcMetadataColName, StringType) - - val batch = microbatchOf(schema)( - Row(1, 10L, "user-supplied") - ) - - val processor = Scd1BatchProcessor( - changeArgs = ChangeArgs( - keys = Seq(UnqualifiedColumnName("id")), - sequencing = F.col("seq"), - storedAsScdType = ScdType.Type1 - ), - resolvedSequencingType = LongType - ) - - checkError( - exception = intercept[AnalysisException] { - processor.extendMicrobatchRowsWithCdcMetadata(batch) - }, - condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", - sqlState = "42710", - parameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive, - "columnName" -> Scd1BatchProcessor.cdcMetadataColName, - "schemaName" -> "microbatch", - "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName - ) - ) - } - } - - test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata column " + - "case-insensitively") { - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - val conflictingColumnName = Scd1BatchProcessor.cdcMetadataColName.toUpperCase - val schema = new StructType() - .add("id", IntegerType) - .add("seq", LongType) - .add(conflictingColumnName, StringType) - - val batch = microbatchOf(schema)( - Row(1, 10L, "user-supplied") - ) - - val processor = Scd1BatchProcessor( - changeArgs = ChangeArgs( - keys = Seq(UnqualifiedColumnName("id")), - sequencing = F.col("seq"), - storedAsScdType = ScdType.Type1 - ), - resolvedSequencingType = LongType - ) - - checkError( - exception = intercept[AnalysisException] { - processor.extendMicrobatchRowsWithCdcMetadata(batch) - }, - condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT", - sqlState = "42710", - parameters = Map( - "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, - "columnName" -> conflictingColumnName, - "schemaName" -> "microbatch", - "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName - ) - ) - } - } } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala index 9ff92ee895b1d..f5bdf87a6cc62 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{LocalTempView, PersistedView => PersistedViewType, UnresolvedRelation, ViewType} import org.apache.spark.sql.classic.{DataFrame, SparkSession} -import org.apache.spark.sql.pipelines.graph.{DataflowGraph, FlowAnalysis, FlowFunction, GraphIdentifierManager, GraphRegistrationContext, PersistedView, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, Table, TemporaryView, UnresolvedFlow} +import org.apache.spark.sql.pipelines.graph.{DataflowGraph, FlowAnalysis, FlowFunction, GraphIdentifierManager, GraphRegistrationContext, PersistedView, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, Table, TemporaryView, UntypedFlow} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -176,7 +176,7 @@ class TestGraphRegistrationContext( if (query.isDefined) { registerFlow( - new UnresolvedFlow( + UntypedFlow( identifier = qualifiedIdentifier, destinationIdentifier = qualifiedIdentifier, func = query.get, @@ -267,7 +267,7 @@ class TestGraphRegistrationContext( ) registerFlow( - new UnresolvedFlow( + UntypedFlow( identifier = viewIdentifier, destinationIdentifier = viewIdentifier, func = query, @@ -339,7 +339,7 @@ class TestGraphRegistrationContext( } registerFlow( - new UnresolvedFlow( + UntypedFlow( identifier = flowIdentifier, destinationIdentifier = flowDestinationIdentifier, func = query, From 02f656a1d9e49e80b15e7a11400579bf96a475da Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Thu, 21 May 2026 16:10:27 +0000 Subject: [PATCH 18/18] add AutoCdcFlow -> AutoCdcMergeFlow resolution --- .../graph/CoreDataflowNodeProcessor.scala | 15 +++++++-- .../spark/sql/pipelines/graph/Flow.scala | 2 +- .../graph/ConnectValidPipelineSuite.scala | 33 +++++++++++++++++++ 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala index 38fde0bfec4a1..1d2b4ef8a0be5 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala @@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) { } else { f } - convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult) + transformUnresolvedFlowToResolvedFlow(flowToResolve, maybeNewFuncResult) // If the flow failed due to an UnresolvedDatasetException, it means that one of the // flow's inputs wasn't available. After other flows are resolved, these inputs @@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) { } } - private def convertResolvedToTypedFlow( + private def transformUnresolvedFlowToResolvedFlow( flow: UnresolvedFlow, funcResult: FlowFunctionResult): ResolvedFlow = { + flow match { + case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult) + case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf, funcResult) + } + } + + private def transformUntypedFlowToResolvedFlow( + flow: UntypedFlow, + funcResult: FlowFunctionResult): ResolvedFlow = { flow match { case _ if flow.once => new AppendOnceFlow(flow, funcResult) case _ if funcResult.dataFrame.get.isStreaming => @@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) { // then get their results overwritten. val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size > 1 new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend) - case _: UnresolvedFlow => new CompleteFlow(flow, funcResult) + case _ => new CompleteFlow(flow, funcResult) } } } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index 9cc8817e70891..8cf436693c60a 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -136,7 +136,7 @@ sealed trait UnresolvedFlow extends Flow { /** * An [[UnresolvedFlow]] whose execution-type has not yet been determined. - * + * * In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis * and resolution. For example an AutoCDCFlow is a special unresolved-but-typed flow; we know a * flow will be an AutoCDC flow immediately on construction, because it has its own special diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala index 3ac3c09017506..3c7db2cca889e 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.Union import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ScdType, UnqualifiedColumnName} import org.apache.spark.sql.pipelines.utils.{PipelineTest, TestGraphRegistrationContext} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -509,6 +510,38 @@ class ConnectValidPipelineSuite extends PipelineTest with SharedSparkSession { assert(g.flow(TableIdentifier("sink_flow")).isInstanceOf[StreamingFlow]) } + test("AutoCdcFlow registers and resolves to AutoCdcMergeFlow") { + val session = spark + import session.implicits._ + + val P = new TestGraphRegistrationContext(spark) { + val mem = MemoryStream[Int] + val cdcEvents = mem.toDF().select($"value" as "id", $"value" as "seq") + registerTable("target") + registerFlow( + AutoCdcFlow( + identifier = fullyQualifiedIdentifier("auto_cdc_flow"), + destinationIdentifier = fullyQualifiedIdentifier("target"), + func = dfFlowFunc(cdcEvents), + queryContext = QueryContext( + currentCatalog = Some(TestGraphRegistrationContext.DEFAULT_CATALOG), + currentDatabase = Some(TestGraphRegistrationContext.DEFAULT_DATABASE) + ), + origin = QueryOrigin.empty, + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = $"seq", + storedAsScdType = ScdType.Type1 + ) + ) + ) + } + val g = P.resolveToDataflowGraph() + assert( + g.flow(fullyQualifiedIdentifier("auto_cdc_flow")).isInstanceOf[AutoCdcMergeFlow] + ) + } + /** Verifies the [[DataflowGraph]] has the specified [[Flow]] with the specified schema. */ private def verifyFlowSchema( pipeline: DataflowGraph,