From 6e73cc5f41ca16c06b65c5b2aa31296a4df00e29 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Thu, 23 Apr 2026 10:04:33 +0000 Subject: [PATCH 1/6] [SPARK-55668][SQL] Add ResolveChangelogTable analyzer rule for batch CDC post-processing Post-process a resolved `DataSourceV2Relation(ChangelogTable)` to inject carry-over removal and/or update detection, fused into a single pass over a `(rowId, _commit_version)`-partitioned Window. Also reject streaming CDC reads that would require post-processing, to prevent silent wrong results. - `ResolveChangelogTable` analyzer rule: - Batch: carry-over removal is a `Filter` on the Window (drops CoW pairs where `min(rowVersion) == max(rowVersion)`); update detection is a `CASE WHEN` over delete/insert counts (relabels pairs as `update_preimage` / `update_postimage`). Fused into a single Window. - Streaming: throws `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` when the requested options would need post-processing. Streams that don't need post-processing pass through unchanged. - Net changes: throws `INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED` for both batch and streaming. - Option validation: throws `INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL` when `computeUpdates = true` is combined with a carry-over-surfacing connector and `deduplicationMode = none`, which would silently misclassify carry-overs as updates. - Runtime guard: the generated plan raises `INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION` when the connector emits more than one delete or insert for the same `(rowId, _commit_version)` partition. - `Analyzer`: register the rule after `ResolveRelations`. - `InMemoryChangelogCatalog`: `ChangelogProperties` extension so tests can configure post-processing scenarios. Actual streaming post-processing implementation and net change computation are scoped to follow-up PRs. --- .../resources/error/error-conditions.json | 20 + .../sql/catalyst/analysis/Analyzer.scala | 1 + .../analysis/ResolveChangelogTable.scala | 315 ++++++ .../sql/errors/QueryCompilationErrors.scala | 19 + .../catalog/InMemoryChangelogCatalog.scala | 71 +- .../connector/ChangelogResolutionSuite.scala | 76 +- ...lveChangelogTablePostProcessingSuite.scala | 945 ++++++++++++++++++ 7 files changed, 1436 insertions(+), 11 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index e13e8104dd539..464fab3e940e0 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3278,6 +3278,26 @@ "message" : [ "`startingVersion` is required when `endingVersion` is specified for CDC queries." ] + }, + "NET_CHANGES_NOT_YET_SUPPORTED" : { + "message" : [ + "The `deduplicationMode = netChanges` option on connector `` is not yet supported. Use `deduplicationMode = dropCarryovers` (default) or `deduplicationMode = none` instead." + ] + }, + "STREAMING_POST_PROCESSING_NOT_SUPPORTED" : { + "message" : [ + "Change Data Capture (CDC) streaming reads on connector `` do not yet support post-processing (carry-over removal, update detection, or net change computation). The requested combination of options would require post-processing, which is currently only available for batch reads. Use a batch read, or set `deduplicationMode = none` and `computeUpdates = false` to receive raw change rows in streaming." + ] + }, + "UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL" : { + "message" : [ + "`computeUpdates` cannot be used with `deduplicationMode=none` on connector `` because the connector emits copy-on-write carry-over pairs (`containsCarryoverRows()` returns true) that would be silently mislabeled as updates. Set `deduplicationMode` to `dropCarryovers` or `netChanges`." + ] + }, + "UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION" : { + "message" : [ + "Connector emitted multiple delete or insert rows for the same `(rowId, _commit_version)` partition. The `Changelog` contract requires at most one logical change per row identity per commit when `containsIntermediateChanges() = false`. Either fix the connector to deduplicate intermediate states, or set `containsIntermediateChanges() = true` and use `deduplicationMode = netChanges`." + ] } }, "sqlState" : "42K03" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c09361969a9e4..00a47a7341e53 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -445,6 +445,7 @@ class Analyzer( new ResolveCatalogs(catalogManager) :: ResolveInsertInto :: ResolveRelations :: + ResolveChangelogTable :: ResolvePartitionSpec :: ResolveFieldNameAndPosition :: AddMetadataColumns :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala new file mode 100644 index 0000000000000..9cc74b6dc8f3f --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Max, Min} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} +import org.apache.spark.sql.types.{IntegerType, StringType} + +/** + * Post-processes a resolved [[ChangelogTable]] read to apply CDC option semantics + * (carry-over removal, update detection) and to enforce supported option combinations. + * + * Fires after [[ResolveRelations]] has wrapped the connector's [[Changelog]] in a + * [[ChangelogTable]]. Both batch ([[DataSourceV2Relation]]) and streaming + * ([[StreamingRelationV2]]) reads are handled: + * - Batch: the requested post-processing passes are injected as logical operators on top + * of the relation. Carry-over removal and update detection are fused into a single + * pass over a (rowId, _commit_version)-partitioned Window: the Filter drops CoW + * carry-over pairs (same rowVersion on both sides) and the subsequent Project relabels + * real delete+insert pairs as update_preimage / update_postimage. + * - Streaming: post-processing is not yet supported. If the requested options would + * require any post-processing, the rule throws an explicit [[AnalysisException]] to + * prevent silent wrong results. Streams that don't require post-processing pass + * through unchanged. + * + * Net change computation (`deduplicationMode = netChanges`) is not yet implemented and + * is rejected up-front for both batch and streaming. + */ +object ResolveChangelogTable extends Rule[LogicalPlan] { + + private val CHANGELOG_TRANSFORMED_TAG = + TreeNodeTag[Boolean]("changelog_transformed") + + private object HelperColumn { + final val DelCnt = "_del_cnt" + final val InsCnt = "_ins_cnt" + final val MinRv = "_min_rv" + final val MaxRv = "_max_rv" + + val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv) + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + if (isAlreadyTransformed(plan)) return plan + var updatedPlan = plan + updatedPlan = plan.resolveOperatorsUp { + case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) => + val changelog = table.changelog + val req = evaluateRequirements(changelog, table.changelogInfo) + + var updatedRel: LogicalPlan = rel + if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { + updatedRel = addRowLevelPostProcessing( + rel, changelog, req.requiresCarryOverRemoval, req.requiresUpdateDetection) + } + if (req.requiresNetChanges) { + updatedRel = injectNetChangeComputation(updatedRel, changelog) + } + updatedRel + + case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, _) => + // Streaming CDC reads do not yet apply post-processing. Run the same option / + // capability validation as the batch path so silent wrong results are impossible: + // either no post-processing would be required (fall through, return raw stream), + // or we throw an explicit AnalysisException. + val changelog = table.changelog + val req = evaluateRequirements(changelog, table.changelogInfo) + if (req.needsAny) { + throw QueryCompilationErrors.cdcStreamingPostProcessingNotSupported(changelog.name()) + } + rel + } + if (updatedPlan ne plan) { + updatedPlan.setTagValue(CHANGELOG_TRANSFORMED_TAG, true) + } + updatedPlan + } + + // --------------------------------------------------------------------------- + // Option validation & Requirement Computation + // --------------------------------------------------------------------------- + + /** + * Captures which post-processing passes a CDC query requires, derived from the + * user-provided [[ChangelogInfo]] options and the connector-declared [[Changelog]] + * capability flags. + */ + private case class PostProcessingRequirements( + requiresCarryOverRemoval: Boolean, + requiresUpdateDetection: Boolean, + requiresNetChanges: Boolean) { + def needsAny: Boolean = + requiresCarryOverRemoval || requiresUpdateDetection || requiresNetChanges + } + + /** + * Validates CDC option/capability combinations and computes which post-processing + * passes are required. Throws an [[org.apache.spark.sql.AnalysisException]] for + * unsupported or contradictory combinations (currently: `netChanges` deduplication, + * and `computeUpdates` with surfaced carry-overs but no carry-over removal). + */ + private def evaluateRequirements( + changelog: Changelog, + options: ChangelogInfo): PostProcessingRequirements = { + // Net change computation is not yet implemented. + if (options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES) { + throw QueryCompilationErrors.cdcNetChangesNotYetSupported(changelog.name()) + } + + val requiresCarryOverRemoval = + options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE && + changelog.containsCarryoverRows() + val requiresUpdateDetection = + options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert() + val requiresNetChanges = + options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES && + changelog.containsIntermediateChanges() + + // If carry-overs are surfaced and update detection is enabled without carry-over + // removal, carry-overs would be falsely classified as updates, leading to wrong + // results. Hence we throw. + if (requiresUpdateDetection && + changelog.containsCarryoverRows() && + options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) { + throw QueryCompilationErrors.cdcUpdateDetectionRequiresCarryOverRemoval( + changelog.name()) + } + + PostProcessingRequirements( + requiresCarryOverRemoval, requiresUpdateDetection, requiresNetChanges) + } + + // --------------------------------------------------------------------------- + // Row Level Post Processing (Update Detection & Carry-over Removal) + // --------------------------------------------------------------------------- + + /** + * Adds row-level post-processing (carry-over removal and/or update detection) on top of + * the given plan: + * - both active -> Window(counts + rv bounds) -> Filter -> Project(relabel) -> Drop helpers + * - carry-over only -> Window(counts + rv bounds) -> Filter -> Drop helpers + * - update only -> Window(counts only) -> Project(relabel) -> Drop helpers + * - neither -> not invoked (caller guards this case) + */ + private def addRowLevelPostProcessing( + plan: LogicalPlan, + cl: Changelog, + requiresCarryOverRemoval: Boolean, + requiresUpdateDetection: Boolean): LogicalPlan = { + // Row-version bounds in the Window are needed iff we filter carry-over pairs. + var modifiedPlan = addPostProcessingWindow(plan, cl, + includeRowVersionBounds = requiresCarryOverRemoval) + if (requiresCarryOverRemoval) modifiedPlan = addCarryOverPairFilter(modifiedPlan) + if (requiresUpdateDetection) modifiedPlan = addUpdateRelabelProjection(modifiedPlan) + removeHelperColumns(modifiedPlan) + } + + /** + * Adds a Window node partitioned by (rowId, _commit_version) that computes + * `_del_cnt` and `_ins_cnt` per partition, and, when `includeRowVersionBounds` + * is true, additionally `_min_rv` / `_max_rv` (min/max of `Changelog.rowVersion()`). + * + * `_del_cnt` / `_ins_cnt` drive update detection (1 each -> relabel as + * update_preimage / update_postimage). `_min_rv` / `_max_rv` drive carry-over + * detection (within a delete+insert pair, equal bounds signal a CoW carry-over). + */ + private def addPostProcessingWindow( + plan: LogicalPlan, + cl: Changelog, + includeRowVersionBounds: Boolean): LogicalPlan = { + val changeTypeAttr = getAttribute(plan, "_change_type") + val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan) + val commitVersionAttr = getAttribute(plan, "_commit_version") + val partitionByCols = rowIdExprs ++ Seq(commitVersionAttr) + val windowSpec = WindowSpecDefinition(partitionByCols, Nil, UnspecifiedFrame) + + val insertIf = If(EqualTo(changeTypeAttr, Literal("insert")), + Literal(1), Literal(null, IntegerType)) + val deleteIf = If(EqualTo(changeTypeAttr, Literal("delete")), + Literal(1), Literal(null, IntegerType)) + + val insCntAlias = Alias(WindowExpression( + Count(Seq(insertIf)).toAggregateExpression(), windowSpec), HelperColumn.InsCnt)() + val delCntAlias = Alias(WindowExpression( + Count(Seq(deleteIf)).toAggregateExpression(), windowSpec), HelperColumn.DelCnt)() + val baseAliases = Seq(delCntAlias, insCntAlias) + val rowVersionAliases = if (includeRowVersionBounds) { + val rowVersionExpr = + V2ExpressionUtils.resolveRef[NamedExpression](cl.rowVersion(), plan) + Seq( + Alias(WindowExpression( + Min(rowVersionExpr).toAggregateExpression(), windowSpec), HelperColumn.MinRv)(), + Alias(WindowExpression( + Max(rowVersionExpr).toAggregateExpression(), windowSpec), HelperColumn.MaxRv)()) + } else { + Seq.empty + } + Window(baseAliases ++ rowVersionAliases, partitionByCols, Nil, plan) + } + + /** + * Adds a Filter node that drops rows belonging to a CoW carry-over pair. + * Expects the input to expose `_del_cnt`, `_ins_cnt`, `_min_rv`, `_max_rv`. + * A pair is a carry-over iff `_del_cnt = 1 AND _ins_cnt = 1 AND _min_rv = _max_rv`. + */ + private def addCarryOverPairFilter(input: LogicalPlan): LogicalPlan = { + val delCnt = getAttribute(input, HelperColumn.DelCnt) + val insCnt = getAttribute(input, HelperColumn.InsCnt) + val minRv = getAttribute(input, HelperColumn.MinRv) + val maxRv = getAttribute(input, HelperColumn.MaxRv) + + val isCarryoverPair = And( + And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))), + EqualTo(minRv, maxRv)) + Filter(Not(isCarryoverPair), input) + } + + /** + * Adds a Project node that rewrites `_change_type` to `update_preimage` / + * `update_postimage` whenever a delete+insert pair is present in the partition. + * Expects the input to expose `_del_cnt` and `_ins_cnt`. + */ + private def addUpdateRelabelProjection(input: LogicalPlan): LogicalPlan = { + val changeTypeAttr = getAttribute(input, "_change_type") + val delCnt = getAttribute(input, HelperColumn.DelCnt) + val insCnt = getAttribute(input, HelperColumn.InsCnt) + + val isUpdate = And( + EqualTo(delCnt, Literal(1L)), + EqualTo(insCnt, Literal(1L))) + val isInvalid = Or(GreaterThan(delCnt, Literal(1L)), GreaterThan(insCnt, Literal(1L))) + val updateType = If(EqualTo(changeTypeAttr, Literal("insert")), + Literal("update_postimage"), Literal("update_preimage")) + + val raiseInvalid = RaiseError( + Literal("INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION"), + CreateMap(Nil), + StringType) + val caseExpr = CaseWhen(Seq(isInvalid -> raiseInvalid, isUpdate -> updateType), changeTypeAttr) + + val projectList = input.output.map { attr => + if (attr.name == "_change_type") Alias(caseExpr, "_change_type")() + else attr + } + Project(projectList, input) + } + + // --------------------------------------------------------------------------- + // Net Change Computation + // --------------------------------------------------------------------------- + + /** + * Collapses multiple changes per row identity into the net effect. + * Not yet implemented. + */ + private def injectNetChangeComputation( + plan: LogicalPlan, + cl: Changelog): LogicalPlan = { + plan + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + /** + * Returns true if this plan has already been transformed by this rule. + * Uses a plan-level tag to prevent re-processing on subsequent rule executor iterations. + */ + private def isAlreadyTransformed(plan: LogicalPlan): Boolean = { + plan.getTagValue(CHANGELOG_TRANSFORMED_TAG).getOrElse(false) + } + + /** + * Removes any helper columns (see [[HelperColumn]]) that earlier steps added to the + * plan. Helper columns not present in the input are silently ignored, so this method + * can be applied unconditionally regardless of which post-processing steps ran. + */ + private def removeHelperColumns(input: LogicalPlan): LogicalPlan = { + Project(input.output.filterNot(a => HelperColumn.all.contains(a.name)), input) + } + + /** + * Looks up an attribute by name in a plan's output. Throws a clear error if missing -- + * used for required columns like `_change_type` / `_commit_version` / helper columns + * added by earlier steps; a missing column is always a programming error. + */ + private def getAttribute(plan: LogicalPlan, name: String): Attribute = + plan.output.find(_.name == name).getOrElse( + throw new IllegalStateException( + s"Required column '$name' not found in plan output: " + + plan.output.map(_.name).mkString(", "))) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 446c6f2a4784a..daed30a1c9774 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3868,6 +3868,25 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("catalogName" -> catalogName)) } + def cdcUpdateDetectionRequiresCarryOverRemoval( + changelogName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL", + messageParameters = Map("changelogName" -> changelogName)) + } + + def cdcNetChangesNotYetSupported(changelogName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED", + messageParameters = Map("changelogName" -> changelogName)) + } + + def cdcStreamingPostProcessingNotSupported(changelogName: String): AnalysisException = { + new AnalysisException( + errorClass = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", + messageParameters = Map("changelogName" -> changelogName)) + } + def invalidCdcOptionConflictingRangeTypes(): Throwable = { new AnalysisException( errorClass = "INVALID_CDC_OPTION.CONFLICTING_RANGE_TYPES", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala index c47ed2668e3b4..3a37b0a84fa26 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryChangelogCatalog.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange} +import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} import org.apache.spark.sql.types._ @@ -44,6 +45,22 @@ class InMemoryChangelogCatalog extends InMemoryCatalog { private var _lastChangelogInfo: Option[ChangelogInfo] = None def lastChangelogInfo: Option[ChangelogInfo] = _lastChangelogInfo + // Per-table overrides for Changelog properties (carry-over rows, intermediate changes, + // update representation, row identity). Tests can set these to exercise post-processing. + private val changelogProperties: mutable.Map[String, ChangelogProperties] = + mutable.Map.empty + + /** + * Override the [[Changelog]] properties returned for a given table. + * Defaults are: containsCarryoverRows=false, containsIntermediateChanges=false, + * representsUpdateAsDeleteAndInsert=false, no rowId, no rowVersion. + */ + def setChangelogProperties( + ident: Identifier, + properties: ChangelogProperties): Unit = { + changelogProperties(ident.toString) = properties + } + override def loadChangelog( ident: Identifier, changelogInfo: ChangelogInfo): Changelog = { @@ -58,8 +75,9 @@ class InMemoryChangelogCatalog extends InMemoryCatalog { // _commit_version is at index numDataCols + 1 (after _change_type) val commitVersionIdx = numDataCols + 1 val filtered = filterByRange(allRows.toSeq, commitVersionIdx, changelogInfo.range()) + val props = changelogProperties.getOrElse(ident.toString, ChangelogProperties()) new InMemoryChangelog( - table.name + "_changelog", table.columns, filtered) + table.name + "_changelog", table.columns, filtered, props) } /** @@ -109,15 +127,42 @@ class InMemoryChangelogCatalog extends InMemoryCatalog { } } +/** + * Configurable properties for [[InMemoryChangelog]] that test cases can use to exercise + * Spark's post-processing (carry-over removal, update detection, net changes). + * + * @param containsCarryoverRows whether the change stream may contain identical CoW pairs + * @param containsIntermediateChanges whether multiple changes per row may exist + * @param representsUpdateAsDeleteAndInsert whether updates appear as raw delete+insert + * @param rowIdNames optional row identity columns as top-level names (e.g. Seq("id")) + * @param rowIdPaths optional row identity paths for nested struct fields + * (e.g. Seq(Seq("payload", "id"))); takes precedence over rowIdNames + * @param rowVersionName optional row version column (e.g. Some("row_commit_version")); + * must be a per-row version that distinguishes carry-overs from + * real updates. Do NOT pass the commit version, which is constant + * within a partition and would cause every delete+insert pair to + * look like a carry-over + */ +case class ChangelogProperties( + containsCarryoverRows: Boolean = false, + containsIntermediateChanges: Boolean = false, + representsUpdateAsDeleteAndInsert: Boolean = false, + rowIdNames: Seq[String] = Seq.empty, + rowIdPaths: Seq[Seq[String]] = Seq.empty, + rowVersionName: Option[String] = None) + /** * A test [[Changelog]] that returns pre-populated change rows. * - * Reports `containsCarryoverRows = false` so Spark skips carry-over removal. + * Properties (carry-over presence, update representation, row identity) are configurable + * via the [[ChangelogProperties]] parameter so tests can exercise different code paths + * in Spark's post-processing analyzer rule. */ class InMemoryChangelog( tableName: String, dataColumns: Array[Column], - changeRows: Seq[InternalRow]) extends Changelog { + changeRows: Seq[InternalRow], + properties: ChangelogProperties = ChangelogProperties()) extends Changelog { private val cdcColumns: Array[Column] = dataColumns ++ Array( Column.create("_change_type", StringType), @@ -128,11 +173,25 @@ class InMemoryChangelog( override def columns(): Array[Column] = cdcColumns - override def containsCarryoverRows(): Boolean = false + override def containsCarryoverRows(): Boolean = properties.containsCarryoverRows + + override def containsIntermediateChanges(): Boolean = properties.containsIntermediateChanges - override def containsIntermediateChanges(): Boolean = false + override def representsUpdateAsDeleteAndInsert(): Boolean = + properties.representsUpdateAsDeleteAndInsert - override def representsUpdateAsDeleteAndInsert(): Boolean = false + override def rowId(): Array[NamedReference] = { + if (properties.rowIdPaths.nonEmpty) { + properties.rowIdPaths.map(parts => FieldReference(parts): NamedReference).toArray + } else { + properties.rowIdNames.map(name => FieldReference.column(name): NamedReference).toArray + } + } + + override def rowVersion(): NamedReference = properties.rowVersionName match { + case Some(name) => FieldReference.column(name) + case None => super.rowVersion() + } override def newScanBuilder( options: CaseInsensitiveStringMap): ScanBuilder = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala index 143ca3fe1a09e..f0f727e6d2e7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.connector -import java.util +import java.util.Collections import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.ChangelogRange +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{LongType, StringType} @@ -63,8 +64,8 @@ class ChangelogResolutionSuite extends QueryTest with SharedSparkSession { Array( Column.create("id", LongType), Column.create("data", StringType)), - Array.empty, - new util.HashMap[String, String]()) + Array.empty[Transform], + Collections.emptyMap[String, String]()) val noCdcCat = spark.sessionState.catalogManager.catalog(noCdcCatalogName).asTableCatalog val ident2 = Identifier.of(Array.empty, "test_table") @@ -76,8 +77,8 @@ class ChangelogResolutionSuite extends QueryTest with SharedSparkSession { Array( Column.create("id", LongType), Column.create("data", StringType)), - Array.empty, - new util.HashMap[String, String]()) + Array.empty[Transform], + Collections.emptyMap[String, String]()) } test("CHANGES clause resolves to DataSourceV2Relation with ChangelogTable") { @@ -203,4 +204,69 @@ class ChangelogResolutionSuite extends QueryTest with SharedSparkSession { assert(range.startingVersion() == "1") assert(range.endingVersion().get() == "5") } + + // =========================================================================== + // Streaming post-processing rejection + // =========================================================================== + // + // Streaming CDC reads bypass the post-processing analyzer rule's transformation + // path. To prevent silent wrong results when the requested options would require + // post-processing, the rule throws an explicit AnalysisException for streaming. + + /** Re-creates the test table with non-nullable columns suitable as rowId / rowVersion. */ + private def recreatePostProcessingTable(): Identifier = { + val cat = spark.sessionState.catalogManager.catalog(cdcCatalogName).asTableCatalog + val ident = Identifier.of(Array.empty, "test_table") + if (cat.tableExists(ident)) cat.dropTable(ident) + cat.createTable( + ident, + Array( + Column.create("id", LongType, false), + Column.create("row_commit_version", LongType, false)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + ident + } + + test("DataStreamReader - changes() with carry-over capability throws") { + val ident = recreatePostProcessingTable() + val cat = spark.sessionState.catalogManager + .catalog(cdcCatalogName) + .asInstanceOf[InMemoryChangelogCatalog] + cat.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + checkError( + intercept[AnalysisException] { + spark.readStream + .changes(s"$cdcCatalogName.test_table") + .queryExecution.analyzed + }, + condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", + parameters = Map("changelogName" -> s"$cdcCatalogName.test_table_changelog")) + } + + test("DataStreamReader - changes() with computeUpdates throws") { + val ident = recreatePostProcessingTable() + val cat = spark.sessionState.catalogManager + .catalog(cdcCatalogName) + .asInstanceOf[InMemoryChangelogCatalog] + cat.setChangelogProperties(ident, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + checkError( + intercept[AnalysisException] { + spark.readStream + .option("computeUpdates", "true") + .option("deduplicationMode", "none") + .changes(s"$cdcCatalogName.test_table") + .queryExecution.analyzed + }, + condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", + parameters = Map("changelogName" -> s"$cdcCatalogName.test_table_changelog")) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala new file mode 100644 index 0000000000000..be97ceb04fe3e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -0,0 +1,945 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util.Collections + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.SparkRuntimeException +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{ + ChangelogProperties, Column, Identifier, InMemoryChangelogCatalog} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{ + BinaryType, BooleanType, DoubleType, LongType, StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String + +/** + * Tests for [[org.apache.spark.sql.catalyst.analysis.ResolveChangelogTable]] using the + * in-memory changelog catalog. These tests don't depend on Delta or any specific connector; + * they directly control what the connector "returns" by populating the in-memory changelog + * with hand-crafted change rows. + * + * Each test sets up [[ChangelogProperties]] on the catalog to enable specific post-processing + * paths (carry-over removal, update detection) and then verifies that Spark's analyzer rule + * correctly transforms the plan and produces the expected output. + */ +class ResolveChangelogTablePostProcessingSuite + extends QueryTest + with SharedSparkSession + with BeforeAndAfterEach { + + private val catalogName = "cdc_test_catalog" + private val testTableName = "events" + + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set( + s"spark.sql.catalog.$catalogName", + classOf[InMemoryChangelogCatalog].getName) + } + + override def beforeEach(): Unit = { + super.beforeEach() + val cat = catalog + val ident = Identifier.of(Array.empty, testTableName) + if (cat.tableExists(ident)) cat.dropTable(ident) + cat.clearChangeRows(ident) + cat.setChangelogProperties(ident, ChangelogProperties()) + cat.createTable( + ident, + Array( + Column.create("id", LongType), + Column.create("name", StringType), + Column.create("row_commit_version", LongType, false)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + } + + private def catalog: InMemoryChangelogCatalog = { + spark.sessionState.catalogManager + .catalog(catalogName) + .asInstanceOf[InMemoryChangelogCatalog] + } + + private def ident = Identifier.of(Array.empty, testTableName) + + /** + * Helper to create a change row matching schema + * (id, name, row_commit_version, _change_type, _commit_version, _commit_timestamp). + * + * `rowCommitVersion` follows Delta row-tracking semantics: carry-over pairs (CoW-rewritten + * unchanged rows) share the same value on both sides; real updates carry the OLD value on + * the delete side and the NEW value on the insert side. Defaults to `commitVersion` for + * tests that don't exercise carry-over removal. + */ + private def changeRow( + id: Long, + name: String, + changeType: String, + commitVersion: Long, + rowCommitVersion: Long = -1L, + commitTimestamp: Long = 0L): InternalRow = { + val rcv = if (rowCommitVersion == -1L) commitVersion else rowCommitVersion + InternalRow( + id, + UTF8String.fromString(name), + rcv, + UTF8String.fromString(changeType), + commitVersion, + commitTimestamp) + } + + // =========================================================================== + // Carry-Over Removal + // =========================================================================== + + test("carry-over removal drops identical delete+insert pairs") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // v1: insert Alice and Bob (rcv=1 each) + // v2: real delete Alice (preimage carries old rcv=1); + // carry-over for Bob (CoW, rcv unchanged on both sides) + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), // carry-over + changeRow(2L, "Bob", "insert", 2L, rowCommitVersion = 1L))) // carry-over (same rcv) + + val rows = sql( + s"SELECT id, name, _change_type, _commit_version " + + s"FROM $catalogName.$testTableName CHANGES FROM VERSION 1 TO VERSION 2") + .orderBy("_commit_version", "id", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}:v${r.getLong(3)}") + + // v1 inserts kept + assert(descs.contains("1:Alice:insert:v1")) + assert(descs.contains("2:Bob:insert:v1")) + // Real Alice delete kept + assert(descs.contains("1:Alice:delete:v2")) + // Bob carry-over pair removed + assert(!descs.contains("2:Bob:delete:v2"), + s"Bob delete should be dropped. Got: ${descs.mkString(",")}") + assert(!descs.contains("2:Bob:insert:v2"), + s"Bob insert should be dropped. Got: ${descs.mkString(",")}") + } + + test("deduplicationMode=none keeps all carry-over rows") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 2L, rowCommitVersion = 1L))) + + val rows = sql( + s"SELECT id FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (deduplicationMode = 'none')") + .collect() + + assert(rows.length == 3, "Without dedup, all 3 raw rows should be returned") + } + + // =========================================================================== + // Update Detection + // =========================================================================== + + test("update detection relabels delete+insert with different data as update") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = false, // no carry-overs in this test + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + // v2: Alice -> Robert (delete old, insert new) + changeRow(1L, "Alice", "delete", 2L), + changeRow(1L, "Robert", "insert", 2L))) + + val rows = sql( + s"SELECT id, name, _change_type, _commit_version " + + s"FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") + .orderBy("_commit_version", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}") + + assert(descs.contains("1:Alice:insert"), s"v1 insert. Got: ${descs.mkString(",")}") + assert(descs.contains("1:Alice:update_preimage")) + assert(descs.contains("1:Robert:update_postimage")) + // No raw delete/insert at v2 + assert(!descs.contains("1:Alice:delete")) + assert(!descs.contains("1:Robert:insert")) + } + + test("delete and insert in different versions are NOT labeled as update") { + catalog.setChangelogProperties(ident, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + changeRow(1L, "Alice", "delete", 2L), + changeRow(1L, "Alice", "insert", 3L))) + + val rows = sql( + s"SELECT _change_type, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 3 " + + s"WITH (computeUpdates = 'true', deduplicationMode = 'none')") + .collect() + + assert(!rows.exists(_.getString(0).contains("update_")), + "Delete and insert in different versions should not be labeled as update") + } + + // =========================================================================== + // Composite rowId: partitioning uses every rowId column + // =========================================================================== + // + // With a composite rowId such as Seq("id", "name"), the (rowId, _commit_version) + // window partition must include BOTH columns. A regression that drops one of the + // rowId columns would either falsely merge two different row identities into one + // partition (silently mislabeling unrelated delete/insert pairs as updates) or + // trip the UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION runtime guard. + + test("update detection with composite rowId keeps different (id, name) tuples raw") { + catalog.setChangelogProperties(ident, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id", "name"), + rowVersionName = Some("row_commit_version"))) + + // delete (1, Alice) and insert (1, Bob) at v2. These are DIFFERENT composite + // rowIds; they must NOT be relabeled as update. + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "delete", 2L), + changeRow(1L, "Bob", "insert", 2L))) + + val rows = sql( + s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (computeUpdates = 'true')") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}").toSet + + assert(descs == Set("1:Alice:delete", "1:Bob:insert"), + s"Composite rowId must keep different (id, name) tuples raw. Got: $descs") + } + + test("carry-over removal with composite rowId removes pairs per (id, name) tuple") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id", "name"), + rowVersionName = Some("row_commit_version"))) + + // Two independent carry-over pairs at v2, both with id=1 but different names. + // With correct composite-rowId partitioning, each pair lives in its own + // (id, name, _commit_version) partition, has _del_cnt=1 / _ins_cnt=1 and equal + // _min_rv / _max_rv, and gets dropped. With broken (id-only) partitioning, the + // four rows would collapse into one partition with _del_cnt=2 / _ins_cnt=2 and + // the carry-over filter (which requires =1) would keep them all. + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Bob", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 1L), + changeRow(1L, "Bob", "delete", 2L, rowCommitVersion = 1L), + changeRow(1L, "Bob", "insert", 2L, rowCommitVersion = 1L))) + + val rows = sql( + s"SELECT id, name, _change_type, _commit_version " + + s"FROM $catalogName.$testTableName CHANGES FROM VERSION 2 TO VERSION 2") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}") + assert(rows.isEmpty, + s"Both Alice and Bob carry-over pairs at v2 should be removed. Got: ${descs.mkString(",")}") + } + + // =========================================================================== + // No row identity: post-processing skipped + // =========================================================================== + + test("empty rowId skips post-processing in plan") { + // Default ChangelogProperties has no rowId; post-processing must not be injected + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + changeRow(2L, "Bob", "delete", 2L), + changeRow(2L, "Bob", "insert", 2L))) + + val df = sql( + s"SELECT * FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") + + val plan = df.queryExecution.analyzed.treeString + assert(!plan.contains("_del_cnt"), + s"Plan must not contain post-processing window helpers without rowId. Plan:\n$plan") + assert(!plan.contains("_ins_cnt"), + s"Plan must not contain post-processing window helpers without rowId. Plan:\n$plan") + } + + // =========================================================================== + // Combined + // =========================================================================== + + test("carry-over removal and update detection combined") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // v1: insert Alice (rcv=1), Bob (rcv=1) + // v2: Alice carry-over (CoW, rcv unchanged), Bob real update (old rcv=1, new rcv=2) + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), // carry-over + changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 1L), // carry-over + changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), // update preimage (old rcv) + changeRow(2L, "Robert", "insert", 2L, rowCommitVersion = 2L))) // update postimage (new rcv) + + val rows = sql( + s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") + .orderBy("_commit_version", "id", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}").toSet + + // v1 inserts + assert(descs.contains("1:Alice:insert")) + assert(descs.contains("2:Bob:insert")) + // Alice carry-over dropped + assert(!descs.contains("1:Alice:delete")) + // Bob -> Robert as update + assert(descs.contains("2:Bob:update_preimage")) + assert(descs.contains("2:Robert:update_postimage")) + // Should be exactly 4 rows + assert(rows.length == 4, s"Expected 4 rows, got ${rows.length}: ${descs.mkString(",")}") + } + + // =========================================================================== + // computeUpdates default (false) keeps raw delete+insert + // =========================================================================== + + test("without computeUpdates, delete+insert with different data stays raw") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), + // Alice: carry-over (CoW, rcv unchanged on both sides) + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 1L), + // Bob -> Robert: real change (old rcv on pre, new rcv on post) + changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), + changeRow(2L, "Robert", "insert", 2L, rowCommitVersion = 2L))) + + // Default computeUpdates=false: do NOT relabel, but DO drop carry-overs + val rows = sql( + s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2") + .orderBy("_commit_version", "id", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}") + + assert(descs.contains("2:Bob:delete"), s"Bob delete remains raw. Got: ${descs.mkString(",")}") + assert(descs.contains("2:Robert:insert"), "Robert insert remains raw") + assert(!descs.exists(_.contains("update_")), "No update_* without computeUpdates") + assert(!descs.contains("1:Alice:delete"), "Alice carry-over removed") + } + + test("update detection on pure inserts leaves them as inserts") { + catalog.setChangelogProperties(ident, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + changeRow(2L, "Bob", "insert", 2L))) + + val rows = sql( + s"SELECT id, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") + .collect() + + assert(rows.length == 2) + assert(rows.forall(_.getString(1) == "insert"), + s"Pure inserts must stay 'insert'. Got: ${rows.map(_.getString(1)).mkString(",")}") + } + + // =========================================================================== + // Keep Carry-over Rows and deduplication flag tests + // =========================================================================== + + test("computeUpdates with deduplicationMode=none is rejected on COW connector") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + checkError( + intercept[AnalysisException] { + sql(s"SELECT * FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 " + + s"WITH (computeUpdates = 'true', deduplicationMode = 'none')") + }, + condition = "INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL", + parameters = Map("changelogName" -> s"$catalogName.${testTableName}_changelog")) + } + + test("computeUpdates with deduplicationMode=none is allowed on non-COW connector") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = false, // MOR-style: no carry-overs possible + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + // v2: Alice -> Robert (delete old, insert new) + changeRow(1L, "Alice", "delete", 2L), + changeRow(1L, "Robert", "insert", 2L))) + + val rows = sql( + s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 " + + s"WITH (computeUpdates = 'true', deduplicationMode = 'none')") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}") + assert(descs.contains("1:Alice:update_preimage"), + s"Expected Alice update_preimage. Got: ${descs.mkString(",")}") + assert(descs.contains("1:Robert:update_postimage"), + s"Expected Robert update_postimage. Got: ${descs.mkString(",")}") + } + + // =========================================================================== + // Contract enforcement: at most one delete + one insert per (rowId, version) + // =========================================================================== + // + // With `representsUpdateAsDeleteAndInsert = true` and `containsIntermediateChanges = false`, + // the `Changelog` contract guarantees at most one logical change per (rowId, _commit_version) + // partition. The update-relabel projection enforces this at runtime: if it sees more than one + // delete or more than one insert in a partition, it raises + // INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION instead of silently + // mislabeling extra rows as updates. + + test("update detection raises on multiple inserts for same (rowId, _commit_version)") { + catalog.setChangelogProperties(ident, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // Contract violation: 2 inserts for id=1 at v2. + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "delete", 2L), + changeRow(1L, "Alice2", "insert", 2L), + changeRow(1L, "Alice3", "insert", 2L))) + + checkError( + intercept[SparkRuntimeException] { + sql(s"SELECT * FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (computeUpdates = 'true')") + .collect() + }, + condition = "INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION", + parameters = Map.empty) + } + + test("update detection raises on multiple deletes for same (rowId, _commit_version)") { + catalog.setChangelogProperties(ident, ChangelogProperties( + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // Contract violation: 2 deletes for id=1 at v2. + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "delete", 2L), + changeRow(1L, "Alice2", "delete", 2L), + changeRow(1L, "Alice3", "insert", 2L))) + + checkError( + intercept[SparkRuntimeException] { + sql(s"SELECT * FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (computeUpdates = 'true')") + .collect() + }, + condition = "INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION", + parameters = Map.empty) + } + + // =========================================================================== + // Net changes deduplication: not yet supported + // =========================================================================== + // + // `deduplicationMode = netChanges` collapses multiple changes per row identity into the + // net effect. It is not yet implemented in [[ResolveChangelogTable]]. + + test("deduplicationMode=netChanges is rejected when connector emits intermediate changes") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsIntermediateChanges = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + checkError( + intercept[AnalysisException] { + sql(s"SELECT * FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 " + + s"WITH (deduplicationMode = 'netChanges')") + }, + condition = "INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED", + parameters = Map("changelogName" -> s"$catalogName.${testTableName}_changelog")) + } + + test("deduplicationMode=netChanges is rejected even when connector has no intermediate changes") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsIntermediateChanges = false, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + checkError( + intercept[AnalysisException] { + sql(s"SELECT * FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 " + + s"WITH (deduplicationMode = 'netChanges')") + }, + condition = "INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED", + parameters = Map("changelogName" -> s"$catalogName.${testTableName}_changelog")) + } + + // =========================================================================== + // Range edge cases + // =========================================================================== + + test("single-version range FROM VERSION X TO VERSION X") { + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + changeRow(2L, "Bob", "insert", 1L), + changeRow(3L, "Charlie", "insert", 2L))) + + val rows = sql( + s"SELECT id, _change_type, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (deduplicationMode = 'none')") + .collect() + + assert(rows.length == 1, s"Single version: 1 row. Got ${rows.length}") + assert(rows(0).getLong(0) == 3L) + assert(rows(0).getString(1) == "insert") + } + + test("multiple operations across versions") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + // v1: insert 3 rows (rcv=1 each) + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", "insert", 1L, rowCommitVersion = 1L), + // v2: delete Alice (preimage carries old rcv=1); CoW carry-overs for Bob/Charlie + // keep rcv=1 on both sides (row unchanged). + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 2L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", "delete", 2L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", "insert", 2L, rowCommitVersion = 1L), + // v3: update Bob -> Robert (old rcv=1, new rcv=3); CoW carry-over for Charlie (rcv=1) + changeRow(2L, "Bob", "delete", 3L, rowCommitVersion = 1L), + changeRow(2L, "Robert", "insert", 3L, rowCommitVersion = 3L), + changeRow(3L, "Charlie", "delete", 3L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", "insert", 3L, rowCommitVersion = 1L), + // v4: insert Diana (rcv=4) + changeRow(4L, "Diana", "insert", 4L, rowCommitVersion = 4L))) + + val rows = sql( + s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 4 WITH (computeUpdates = 'true')") + .orderBy("_commit_version", "id", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}:v${r.getLong(3)}").toSet + + // v1 + assert(descs.contains("1:Alice:insert:v1")) + assert(descs.contains("2:Bob:insert:v1")) + assert(descs.contains("3:Charlie:insert:v1")) + // v2 + assert(descs.contains("1:Alice:delete:v2")) + assert(!descs.contains("2:Bob:delete:v2"), "Bob carry-over dropped") + assert(!descs.contains("3:Charlie:delete:v2"), "Charlie carry-over dropped") + // v3 + assert(descs.contains("2:Bob:update_preimage:v3")) + assert(descs.contains("2:Robert:update_postimage:v3")) + assert(!descs.contains("3:Charlie:delete:v3"), "Charlie carry-over dropped in v3") + // v4 + assert(descs.contains("4:Diana:insert:v4")) + } + + test("larger insert batch returns all rows") { + catalog.addChangeRows(ident, (1 to 5).map(i => + changeRow(i.toLong, ('A' + i - 1).toChar.toString, "insert", 1L))) + + val rows = sql( + s"SELECT id, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 1 WITH (deduplicationMode = 'none')") + .collect() + + assert(rows.length == 5) + assert(rows.forall(_.getString(1) == "insert")) + } + + test("EXCLUSIVE start bound skips the start version") { + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + changeRow(2L, "Bob", "insert", 2L), + changeRow(3L, "Charlie", "insert", 3L))) + + val rows = sql( + s"SELECT id, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 EXCLUSIVE TO VERSION 3 " + + s"WITH (deduplicationMode = 'none')") + .orderBy("_commit_version") + .collect() + + assert(!rows.exists(_.getLong(1) == 1L), "v1 must be excluded") + assert(rows.exists(_.getLong(0) == 2L), "Bob (v2) included") + assert(rows.exists(_.getLong(0) == 3L), "Charlie (v3) included") + } + + test("open-ended range (no TO clause) reads to latest") { + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + changeRow(2L, "Bob", "insert", 2L), + changeRow(3L, "Charlie", "insert", 3L))) + + val rows = sql( + s"SELECT id, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 WITH (deduplicationMode = 'none')") + .orderBy("_commit_version", "id") + .collect() + + assert(rows.length == 3, s"Open-ended range should see all 3. Got ${rows.length}") + assert(rows.exists(r => r.getLong(0) == 3L && r.getLong(1) == 3L)) + } + + test("DELETE all rows: no carry-over inserts at v2") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // v1 inserts carry rcv=1; v2 deletes carry the old rcv=1 (rcv tracks last modification) + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L))) + + val rows = sql( + s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2") + .orderBy("_commit_version", "id") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}:v${r.getLong(3)}") + + assert(descs.contains("1:Alice:insert:v1")) + assert(descs.contains("2:Bob:insert:v1")) + assert(descs.contains("1:Alice:delete:v2")) + assert(descs.contains("2:Bob:delete:v2")) + assert(!descs.exists(_.contains("insert:v2")), "No inserts at v2") + } + + test("UPDATE all rows: every row gets update_pre/postimage, no carry-overs") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // Every v2 row is a real update: delete side carries old rcv=1, insert side new rcv=2. + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice_updated", "insert", 2L, rowCommitVersion = 2L), + changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob_updated", "insert", 2L, rowCommitVersion = 2L))) + + val rows = sql( + s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") + .orderBy("_commit_version", "id", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}:v${r.getLong(3)}").toSet + + assert(descs.contains("1:Alice:update_preimage:v2")) + assert(descs.contains("1:Alice_updated:update_postimage:v2")) + assert(descs.contains("2:Bob:update_preimage:v2")) + assert(descs.contains("2:Bob_updated:update_postimage:v2")) + assert(rows.length == 6, s"Expected 2 inserts + 2 pre + 2 post. Got ${rows.length}") + } + + test("append-only workload: all inserts, no carry-over needed") { + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L), + changeRow(2L, "Bob", "insert", 2L), + changeRow(3L, "Charlie", "insert", 3L))) + + val rows = sql( + s"SELECT id, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 3") + .collect() + + assert(rows.length == 3) + assert(rows.forall(_.getString(1) == "insert")) + } + + test("carry-over removal with many rows: only real change remains") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + // 10 inserts at v1 (rcv=1 each). At v2: delete row 5; CoW writes 9 carry-over pairs + // (rcv unchanged since v1, i.e. rcv=1 on both sides) plus 1 real delete (rcv=1, old). + val v1Inserts = (1 to 10).map(i => + changeRow(i.toLong, ('A' + i - 1).toChar.toString, "insert", 1L, rowCommitVersion = 1L)) + val v2Carryovers = (1 to 10).filter(_ != 5).flatMap { i => + val name = ('A' + i - 1).toChar.toString + Seq( + changeRow(i.toLong, name, "delete", 2L, rowCommitVersion = 1L), + changeRow(i.toLong, name, "insert", 2L, rowCommitVersion = 1L)) + } + val v2RealDelete = Seq(changeRow(5L, "E", "delete", 2L, rowCommitVersion = 1L)) + catalog.addChangeRows(ident, v1Inserts ++ v2Carryovers ++ v2RealDelete) + + val rows = sql( + s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 2 TO VERSION 2") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}") + assert(rows.length == 1, + s"Only 1 real change should remain (9 carry-overs dropped). Got: ${descs.mkString(",")}") + assert(descs.contains("5:E:delete")) + } + + test("carry-over removal with mixed types (DOUBLE, BOOLEAN, BINARY)") { + val mixedTable = "events_mixed" + val mixedIdent = Identifier.of(Array.empty, mixedTable) + val cat = catalog + if (cat.tableExists(mixedIdent)) cat.dropTable(mixedIdent) + cat.clearChangeRows(mixedIdent) + cat.createTable( + mixedIdent, + Array( + Column.create("id", LongType), + Column.create("name", StringType), + Column.create("score", DoubleType), + Column.create("active", BooleanType), + Column.create("payload", BinaryType), + Column.create("row_commit_version", LongType, false)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + cat.setChangelogProperties(mixedIdent, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + def mixedRow( + id: Long, name: String, score: Double, active: Boolean, payload: Array[Byte], + ct: String, v: Long, rcv: Long): InternalRow = { + InternalRow( + id, UTF8String.fromString(name), score, active, payload, rcv, + UTF8String.fromString(ct), v, 0L) + } + + val alicePayload = Array[Byte](1, 2, 3) + val bobPayload = Array[Byte](4, 5, 6) + + cat.addChangeRows(mixedIdent, Seq( + mixedRow(1L, "Alice", 95.5, true, alicePayload, "insert", 1L, rcv = 1L), + mixedRow(2L, "Bob", 87.3, false, bobPayload, "insert", 1L, rcv = 1L), + // v2: update Alice's score (old rcv=1, new rcv=2); Bob is carry-over (rcv unchanged) + mixedRow(1L, "Alice", 95.5, true, alicePayload, "delete", 2L, rcv = 1L), + mixedRow(1L, "Alice", 99.0, true, alicePayload, "insert", 2L, rcv = 2L), + mixedRow(2L, "Bob", 87.3, false, bobPayload, "delete", 2L, rcv = 1L), // carry-over + mixedRow(2L, "Bob", 87.3, false, bobPayload, "insert", 2L, rcv = 1L))) // carry-over + + val rows = sql( + s"SELECT id, name, score, active, _change_type FROM $catalogName.$mixedTable " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") + .orderBy("_commit_version", "id", "_change_type") + .collect() + + val descs = rows.map(r => s"${r.getLong(0)}:${r.getString(4)}") + assert(descs.contains("1:update_preimage")) + assert(descs.contains("1:update_postimage")) + assert(!descs.contains("2:delete"), + s"Bob carry-over must be dropped despite DOUBLE/BOOLEAN/BINARY. Got: " + + descs.mkString(",")) + + val pre = rows.find(r => r.getLong(0) == 1L && r.getString(4) == "update_preimage").get + val post = rows.find(r => r.getLong(0) == 1L && r.getString(4) == "update_postimage").get + assert(pre.getDouble(2) == 95.5) + assert(post.getDouble(2) == 99.0) + } + + // =========================================================================== + // Regression: nested rowId + nested rowVersion end-to-end + // =========================================================================== + + // rowId is payload.id (nested); rowVersion is also row-level. A delete+insert pair with + // the same payload.id but different row_commit_version is a real update and must survive. + // A pair with matching row_commit_version would be a CoW carry-over and would be dropped. + test("nested rowId must not hide sibling-field changes") { + val nestedTable = "events_nested" + val nestedIdent = Identifier.of(Array.empty, nestedTable) + val cat = catalog + if (cat.tableExists(nestedIdent)) cat.dropTable(nestedIdent) + cat.clearChangeRows(nestedIdent) + + val payloadType = StructType(Seq( + StructField("id", LongType), + StructField("value", StringType))) + + cat.createTable( + nestedIdent, + Array( + Column.create("payload", payloadType), + Column.create("row_commit_version", LongType, false)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + + cat.setChangelogProperties(nestedIdent, ChangelogProperties( + containsCarryoverRows = true, + rowIdPaths = Seq(Seq("payload", "id")), + rowVersionName = Some("row_commit_version"))) + + def nestedRow(id: Long, value: String, ct: String, v: Long, rcv: Long): InternalRow = { + InternalRow( + InternalRow(id, UTF8String.fromString(value)), + rcv, + UTF8String.fromString(ct), v, 0L) + } + + cat.addChangeRows(nestedIdent, Seq( + nestedRow(1L, "original", "insert", 1L, rcv = 1L), + // v2 update: rowId same, rowVersion differs (old rcv=1 on preimage, new rcv=2 on postimage) + nestedRow(1L, "original", "delete", 2L, rcv = 1L), + nestedRow(1L, "CHANGED", "insert", 2L, rcv = 2L))) + + val rows = sql( + s"SELECT payload.id AS id, payload.value AS value, _change_type, _commit_version " + + s"FROM $catalogName.$nestedTable CHANGES FROM VERSION 1 TO VERSION 2") + .orderBy("_commit_version", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}:v${r.getLong(3)}") + + assert(descs.contains("1:original:insert:v1"), + s"v1 insert must survive. Got: ${descs.mkString(",")}") + assert(descs.contains("1:original:delete:v2"), + s"v2 delete must survive (payload.value differs from insert). Got: ${descs.mkString(",")}") + assert(descs.contains("1:CHANGED:insert:v2"), + s"v2 insert must survive (payload.value differs from delete). Got: ${descs.mkString(",")}") + assert(rows.length == 3, + s"Expected 3 rows (v1 insert + v2 delete + v2 insert). Got ${rows.length}: " + + descs.mkString(",")) + } + + // =========================================================================== + // No-op UPDATE is correctly preserved as update_preimage/postimage + // =========================================================================== + + test("no-op UPDATE is labeled as update (row_commit_version differs on pre/post)") { + // A no-op UPDATE bumps row_commit_version even when data is byte-identical, so the + // delete side carries the OLD rcv and the insert side the NEW rcv. Window post-processing + // sees different rowVersions, treats this as a real change, and labels both rows as + // update_preimage / update_postimage. + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + representsUpdateAsDeleteAndInsert = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + // v2 no-op update: identical data, but rcv differs (Delta bumps it on any UPDATE) + changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 2L))) + + val rows = sql( + s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") + .orderBy("_commit_version", "_change_type") + .collect() + + val descs = rows.map(r => + s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}:v${r.getLong(3)}") + + assert(descs.contains("1:Alice:insert:v1")) + assert(descs.contains("1:Alice:update_preimage:v2"), + s"No-op UPDATE preimage must be labeled. Got: ${descs.mkString(",")}") + assert(descs.contains("1:Alice:update_postimage:v2"), + s"No-op UPDATE postimage must be labeled. Got: ${descs.mkString(",")}") + assert(rows.length == 3, + s"Expected v1 insert + v2 update pre/post = 3 rows. Got ${rows.length}") + } +} From c4dd34669f9047ccca077c95165f5d800cf07c05 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Fri, 24 Apr 2026 09:36:25 +0000 Subject: [PATCH 2/6] PR feedback --- .../analysis/ResolveChangelogTable.scala | 27 ++++++---- ...lveChangelogTablePostProcessingSuite.scala | 52 +++++++++---------- 2 files changed, 42 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index 9cc74b6dc8f3f..ae9cad0d40440 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -53,19 +53,22 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { private val CHANGELOG_TRANSFORMED_TAG = TreeNodeTag[Boolean]("changelog_transformed") - private object HelperColumn { - final val DelCnt = "_del_cnt" - final val InsCnt = "_ins_cnt" - final val MinRv = "_min_rv" - final val MaxRv = "_max_rv" + /** + * Reserved (`__spark_cdc_*`) column names used internally by post-processing; + * connectors must not emit columns with these names. + */ + object HelperColumn { + final val DelCnt = "__spark_cdc_del_cnt" + final val InsCnt = "__spark_cdc_ins_cnt" + final val MinRv = "__spark_cdc_min_rv" + final val MaxRv = "__spark_cdc_max_rv" val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv) } override def apply(plan: LogicalPlan): LogicalPlan = { if (isAlreadyTransformed(plan)) return plan - var updatedPlan = plan - updatedPlan = plan.resolveOperatorsUp { + val updatedPlan = plan.resolveOperatorsUp { case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) => val changelog = table.changelog val req = evaluateRequirements(changelog, table.changelogInfo) @@ -158,11 +161,13 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { /** * Adds row-level post-processing (carry-over removal and/or update detection) on top of - * the given plan: - * - both active -> Window(counts + rv bounds) -> Filter -> Project(relabel) -> Drop helpers + * the given plan. `counts` = per-partition delete and insert change_type row counts over + * `(rowId, _commit_version)`. `rv bounds` = per-partition min/max of `rowVersion` — + * equal bounds signal a copy-on-write carry-over. + * - both active -> Window(counts + rv bounds) -> Filter -> Project(relabel) -> Drop helpers * - carry-over only -> Window(counts + rv bounds) -> Filter -> Drop helpers - * - update only -> Window(counts only) -> Project(relabel) -> Drop helpers - * - neither -> not invoked (caller guards this case) + * - update only -> Window(counts only) -> Project(relabel) -> Drop helpers + * - neither -> not invoked (caller guards this case) */ private def addRowLevelPostProcessing( plan: LogicalPlan, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala index be97ceb04fe3e..f3ccf0fc92e2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -22,7 +22,7 @@ import java.util.Collections import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkRuntimeException -import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{ ChangelogProperties, Column, Identifier, InMemoryChangelogCatalog} @@ -128,25 +128,14 @@ class ResolveChangelogTablePostProcessingSuite changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), // carry-over changeRow(2L, "Bob", "insert", 2L, rowCommitVersion = 1L))) // carry-over (same rcv) - val rows = sql( - s"SELECT id, name, _change_type, _commit_version " + - s"FROM $catalogName.$testTableName CHANGES FROM VERSION 1 TO VERSION 2") - .orderBy("_commit_version", "id", "_change_type") - .collect() - - val descs = rows.map(r => - s"${r.getLong(0)}:${r.getString(1)}:${r.getString(2)}:v${r.getLong(3)}") - - // v1 inserts kept - assert(descs.contains("1:Alice:insert:v1")) - assert(descs.contains("2:Bob:insert:v1")) - // Real Alice delete kept - assert(descs.contains("1:Alice:delete:v2")) - // Bob carry-over pair removed - assert(!descs.contains("2:Bob:delete:v2"), - s"Bob delete should be dropped. Got: ${descs.mkString(",")}") - assert(!descs.contains("2:Bob:insert:v2"), - s"Bob insert should be dropped. Got: ${descs.mkString(",")}") + checkAnswer( + sql( + s"SELECT id, name, _change_type, _commit_version " + + s"FROM $catalogName.$testTableName CHANGES FROM VERSION 1 TO VERSION 2"), + Seq( + Row(1L, "Alice", "insert", 1L), + Row(2L, "Bob", "insert", 1L), + Row(1L, "Alice", "delete", 2L))) } test("deduplicationMode=none keeps all carry-over rows") { @@ -293,8 +282,8 @@ class ResolveChangelogTablePostProcessingSuite // No row identity: post-processing skipped // =========================================================================== - test("empty rowId skips post-processing in plan") { - // Default ChangelogProperties has no rowId; post-processing must not be injected + test("no capability flags -> post-processing not injected in plan") { + // Default ChangelogProperties has no capability flags set; the rule sees nothing to do. catalog.addChangeRows(ident, Seq( changeRow(1L, "Alice", "insert", 1L), changeRow(2L, "Bob", "delete", 2L), @@ -305,10 +294,21 @@ class ResolveChangelogTablePostProcessingSuite s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") val plan = df.queryExecution.analyzed.treeString - assert(!plan.contains("_del_cnt"), - s"Plan must not contain post-processing window helpers without rowId. Plan:\n$plan") - assert(!plan.contains("_ins_cnt"), - s"Plan must not contain post-processing window helpers without rowId. Plan:\n$plan") + assert(!plan.contains("__spark_cdc_del_cnt"), + s"Plan must not contain post-processing window helpers. Plan:\n$plan") + assert(!plan.contains("__spark_cdc_ins_cnt"), + s"Plan must not contain post-processing window helpers. Plan:\n$plan") + } + + test("streaming without post-processing options passes through") { + // Streaming reads with no capability flags on the connector and no + // post-processing options must resolve without the rule throwing. + val df = spark.readStream + .option("startingVersion", "1") + .changes(s"$catalogName.$testTableName") + val plan = df.queryExecution.analyzed.treeString + assert(!plan.contains("__spark_cdc_del_cnt"), + s"Streaming plan must not contain post-processing helpers. Plan:\n$plan") } // =========================================================================== From 231579a4095519c6269c6afa6be382ea2c281cf2 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Fri, 24 Apr 2026 10:02:09 +0000 Subject: [PATCH 3/6] PR Feedback --- .../resources/error/error-conditions.json | 18 +++-- .../analysis/ResolveChangelogTable.scala | 79 ++++++++----------- .../datasources/v2/ChangelogTable.scala | 3 +- ...lveChangelogTablePostProcessingSuite.scala | 4 +- 4 files changed, 48 insertions(+), 56 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 464fab3e940e0..428b34b1d13d5 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -661,6 +661,19 @@ ], "sqlState" : "42P08" }, + "CHANGELOG_CONTRACT_VIOLATION" : { + "message" : [ + "The Change Data Capture (CDC) connector violated the `Changelog` contract at runtime." + ], + "subClass" : { + "UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION" : { + "message" : [ + "Connector emitted multiple delete or insert rows for the same `(rowId, _commit_version)` partition. The `Changelog` contract requires at most one logical change per row identity per commit when `containsIntermediateChanges() = false`. Either fix the connector to deduplicate intermediate states, or set `containsIntermediateChanges() = true` and use `deduplicationMode = netChanges`." + ] + } + }, + "sqlState" : "XX000" + }, "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED" : { "message" : [ "Checksum verification failed, the file may be corrupted. File: ", @@ -3293,11 +3306,6 @@ "message" : [ "`computeUpdates` cannot be used with `deduplicationMode=none` on connector `` because the connector emits copy-on-write carry-over pairs (`containsCarryoverRows()` returns true) that would be silently mislabeled as updates. Set `deduplicationMode` to `dropCarryovers` or `netChanges`." ] - }, - "UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION" : { - "message" : [ - "Connector emitted multiple delete or insert rows for the same `(rowId, _commit_version)` partition. The `Changelog` contract requires at most one logical change per row identity per commit when `containsIntermediateChanges() = false`. Either fix the connector to deduplicate intermediate states, or set `containsIntermediateChanges() = true` and use `deduplicationMode = netChanges`." - ] } }, "sqlState" : "42K03" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index ae9cad0d40440..e459025103ba5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Max, Min} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 -import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation} @@ -50,9 +49,6 @@ import org.apache.spark.sql.types.{IntegerType, StringType} */ object ResolveChangelogTable extends Rule[LogicalPlan] { - private val CHANGELOG_TRANSFORMED_TAG = - TreeNodeTag[Boolean]("changelog_transformed") - /** * Reserved (`__spark_cdc_*`) column names used internally by post-processing; * connectors must not emit columns with these names. @@ -66,39 +62,34 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv) } - override def apply(plan: LogicalPlan): LogicalPlan = { - if (isAlreadyTransformed(plan)) return plan - val updatedPlan = plan.resolveOperatorsUp { - case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) => - val changelog = table.changelog - val req = evaluateRequirements(changelog, table.changelogInfo) - - var updatedRel: LogicalPlan = rel - if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { - updatedRel = addRowLevelPostProcessing( - rel, changelog, req.requiresCarryOverRemoval, req.requiresUpdateDetection) - } - if (req.requiresNetChanges) { - updatedRel = injectNetChangeComputation(updatedRel, changelog) - } - updatedRel - - case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, _) => - // Streaming CDC reads do not yet apply post-processing. Run the same option / - // capability validation as the batch path so silent wrong results are impossible: - // either no post-processing would be required (fall through, return raw stream), - // or we throw an explicit AnalysisException. - val changelog = table.changelog - val req = evaluateRequirements(changelog, table.changelogInfo) - if (req.needsAny) { - throw QueryCompilationErrors.cdcStreamingPostProcessingNotSupported(changelog.name()) - } - rel - } - if (updatedPlan ne plan) { - updatedPlan.setTagValue(CHANGELOG_TRANSFORMED_TAG, true) - } - updatedPlan + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) if !table.resolved => + val changelog = table.changelog + val req = evaluateRequirements(changelog, table.changelogInfo) + + val resolvedRel = rel.copy(table = table.copy(resolved = true)) + var updatedRel: LogicalPlan = resolvedRel + if (req.requiresCarryOverRemoval || req.requiresUpdateDetection) { + updatedRel = addRowLevelPostProcessing( + resolvedRel, changelog, req.requiresCarryOverRemoval, req.requiresUpdateDetection) + } + if (req.requiresNetChanges) { + updatedRel = injectNetChangeComputation(updatedRel, changelog) + } + updatedRel + + case rel @ StreamingRelationV2(_, _, table: ChangelogTable, _, _, _, _, _, _) + if !table.resolved => + // Streaming CDC reads do not yet apply post-processing. Run the same option / + // capability validation as the batch path so silent wrong results are impossible: + // either no post-processing would be required (fall through, return raw stream), + // or we throw an explicit AnalysisException. + val changelog = table.changelog + val req = evaluateRequirements(changelog, table.changelogInfo) + if (req.needsAny) { + throw QueryCompilationErrors.cdcStreamingPostProcessingNotSupported(changelog.name()) + } + rel.copy(table = table.copy(resolved = true)) } // --------------------------------------------------------------------------- @@ -162,8 +153,8 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { /** * Adds row-level post-processing (carry-over removal and/or update detection) on top of * the given plan. `counts` = per-partition delete and insert change_type row counts over - * `(rowId, _commit_version)`. `rv bounds` = per-partition min/max of `rowVersion` — - * equal bounds signal a copy-on-write carry-over. + * `(rowId, _commit_version)`. `rv bounds` = per-partition min/max of `rowVersion`. + * Equal bounds signal a copy-on-write carry-over. * - both active -> Window(counts + rv bounds) -> Filter -> Project(relabel) -> Drop helpers * - carry-over only -> Window(counts + rv bounds) -> Filter -> Drop helpers * - update only -> Window(counts only) -> Project(relabel) -> Drop helpers @@ -260,7 +251,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { Literal("update_postimage"), Literal("update_preimage")) val raiseInvalid = RaiseError( - Literal("INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION"), + Literal("CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION"), CreateMap(Nil), StringType) val caseExpr = CaseWhen(Seq(isInvalid -> raiseInvalid, isUpdate -> updateType), changeTypeAttr) @@ -290,14 +281,6 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { // Helpers // --------------------------------------------------------------------------- - /** - * Returns true if this plan has already been transformed by this rule. - * Uses a plan-level tag to prevent re-processing on subsequent rule executor iterations. - */ - private def isAlreadyTransformed(plan: LogicalPlan): Boolean = { - plan.getTagValue(CHANGELOG_TRANSFORMED_TAG).getOrElse(false) - } - /** * Removes any helper columns (see [[HelperColumn]]) that earlier steps added to the * plan. Helper columns not present in the input are silently ignored, so this method diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala index 8521df3db2ff0..bb5a03f64990d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ChangelogTable.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap */ case class ChangelogTable( changelog: Changelog, - changelogInfo: ChangelogInfo) extends Table with SupportsRead { + changelogInfo: ChangelogInfo, + resolved: Boolean = false) extends Table with SupportsRead { override def name: String = changelog.name diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala index f3ccf0fc92e2f..e3c584fb2ecf6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -487,7 +487,7 @@ class ResolveChangelogTablePostProcessingSuite s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (computeUpdates = 'true')") .collect() }, - condition = "INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION", + condition = "CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION", parameters = Map.empty) } @@ -509,7 +509,7 @@ class ResolveChangelogTablePostProcessingSuite s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (computeUpdates = 'true')") .collect() }, - condition = "INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION", + condition = "CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION", parameters = Map.empty) } From 6bc9e6027d8d76e864ec320d49ed78b8bb7d17d4 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Fri, 24 Apr 2026 12:32:51 +0000 Subject: [PATCH 4/6] Null rowVersion safety addition using rvCnt --- .../analysis/ResolveChangelogTable.scala | 24 ++++++++++++------ ...lveChangelogTablePostProcessingSuite.scala | 25 +++++++++++++++++++ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index e459025103ba5..0fe01f0833bbf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -58,8 +58,9 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { final val InsCnt = "__spark_cdc_ins_cnt" final val MinRv = "__spark_cdc_min_rv" final val MaxRv = "__spark_cdc_max_rv" + final val RvCnt = "__spark_cdc_rv_cnt" - val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv) + val all: Set[String] = Set(DelCnt, InsCnt, MinRv, MaxRv, RvCnt) } override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { @@ -176,11 +177,13 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { /** * Adds a Window node partitioned by (rowId, _commit_version) that computes * `_del_cnt` and `_ins_cnt` per partition, and, when `includeRowVersionBounds` - * is true, additionally `_min_rv` / `_max_rv` (min/max of `Changelog.rowVersion()`). + * is true, additionally `_min_rv` / `_max_rv` / `_rv_cnt` (min, max and non-null + * count of `Changelog.rowVersion()`). * * `_del_cnt` / `_ins_cnt` drive update detection (1 each -> relabel as - * update_preimage / update_postimage). `_min_rv` / `_max_rv` drive carry-over - * detection (within a delete+insert pair, equal bounds signal a CoW carry-over). + * update_preimage / update_postimage). `_min_rv` / `_max_rv` / `_rv_cnt` drive + * carry-over detection (within a delete+insert pair, `_rv_cnt = 2` AND equal + * bounds signal a CoW carry-over). */ private def addPostProcessingWindow( plan: LogicalPlan, @@ -209,7 +212,9 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { Alias(WindowExpression( Min(rowVersionExpr).toAggregateExpression(), windowSpec), HelperColumn.MinRv)(), Alias(WindowExpression( - Max(rowVersionExpr).toAggregateExpression(), windowSpec), HelperColumn.MaxRv)()) + Max(rowVersionExpr).toAggregateExpression(), windowSpec), HelperColumn.MaxRv)(), + Alias(WindowExpression( + Count(Seq(rowVersionExpr)).toAggregateExpression(), windowSpec), HelperColumn.RvCnt)()) } else { Seq.empty } @@ -218,18 +223,21 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { /** * Adds a Filter node that drops rows belonging to a CoW carry-over pair. - * Expects the input to expose `_del_cnt`, `_ins_cnt`, `_min_rv`, `_max_rv`. - * A pair is a carry-over iff `_del_cnt = 1 AND _ins_cnt = 1 AND _min_rv = _max_rv`. + * A pair is a carry-over iff + * `_del_cnt = 1 AND _ins_cnt = 1 AND _rv_cnt = 2 AND _min_rv = _max_rv`. + * The `_rv_cnt = 2` clause guards against a NULL rowVersion silently matching + * `_min_rv = _max_rv` (Spark's min/max skip NULLs). */ private def addCarryOverPairFilter(input: LogicalPlan): LogicalPlan = { val delCnt = getAttribute(input, HelperColumn.DelCnt) val insCnt = getAttribute(input, HelperColumn.InsCnt) val minRv = getAttribute(input, HelperColumn.MinRv) val maxRv = getAttribute(input, HelperColumn.MaxRv) + val rvCnt = getAttribute(input, HelperColumn.RvCnt) val isCarryoverPair = And( And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))), - EqualTo(minRv, maxRv)) + And(EqualTo(rvCnt, Literal(2L)), EqualTo(minRv, maxRv))) Filter(Not(isCarryoverPair), input) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala index e3c584fb2ecf6..d6c2675bde874 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -157,6 +157,31 @@ class ResolveChangelogTablePostProcessingSuite assert(rows.length == 3, "Without dedup, all 3 raw rows should be returned") } + test("NULL rowVersion on one side is NOT silently dropped as carry-over") { + // Regression for a NULL-safety hole: min/max skip NULLs, so _min_rv = _max_rv alone + // would match a pair with one NULL and one non-null rowVersion. The _rv_cnt = 2 + // clause in the carry-over filter prevents that. + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + // v2: one side has NULL rowVersion (buggy connector), the other has a real value. + InternalRow(1L, UTF8String.fromString("Alice"), null, + UTF8String.fromString("delete"), 2L, 0L), + changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 5L))) + + checkAnswer( + sql(s"SELECT id, name, _change_type, _commit_version " + + s"FROM $catalogName.$testTableName CHANGES FROM VERSION 1 TO VERSION 2"), + Seq( + Row(1L, "Alice", "insert", 1L), + Row(1L, "Alice", "delete", 2L), + Row(1L, "Alice", "insert", 2L))) + } + // =========================================================================== // Update Detection // =========================================================================== From fa4be9577191205b77ae67cbf6da083493aa35a5 Mon Sep 17 00:00:00 2001 From: Sandro Speh Date: Mon, 27 Apr 2026 08:36:56 +0000 Subject: [PATCH 5/6] PR Feedback --- .../sql/connector/catalog/Changelog.java | 9 + .../analysis/ResolveChangelogTable.scala | 9 +- ...lveChangelogTablePostProcessingSuite.scala | 410 ++++++++++-------- 3 files changed, 251 insertions(+), 177 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java index 0a811aa0ae4d7..5f2203aa1c379 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Changelog.java @@ -43,6 +43,15 @@ @Evolving public interface Changelog { + /** Constant for the {@code _change_type} value of a row inserted into the table. */ + String CHANGE_TYPE_INSERT = "insert"; + /** Constant for the {@code _change_type} value of a row deleted from the table. */ + String CHANGE_TYPE_DELETE = "delete"; + /** Constant for the {@code _change_type} value of an update's pre-image row. */ + String CHANGE_TYPE_UPDATE_PREIMAGE = "update_preimage"; + /** Constant for the {@code _change_type} value of an update's post-image row. */ + String CHANGE_TYPE_UPDATE_POSTIMAGE = "update_postimage"; + /** A name to identify this changelog. */ String name(); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala index 0fe01f0833bbf..bdf9b9fed09cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala @@ -195,9 +195,9 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { val partitionByCols = rowIdExprs ++ Seq(commitVersionAttr) val windowSpec = WindowSpecDefinition(partitionByCols, Nil, UnspecifiedFrame) - val insertIf = If(EqualTo(changeTypeAttr, Literal("insert")), + val insertIf = If(EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)), Literal(1), Literal(null, IntegerType)) - val deleteIf = If(EqualTo(changeTypeAttr, Literal("delete")), + val deleteIf = If(EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)), Literal(1), Literal(null, IntegerType)) val insCntAlias = Alias(WindowExpression( @@ -255,8 +255,9 @@ object ResolveChangelogTable extends Rule[LogicalPlan] { EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))) val isInvalid = Or(GreaterThan(delCnt, Literal(1L)), GreaterThan(insCnt, Literal(1L))) - val updateType = If(EqualTo(changeTypeAttr, Literal("insert")), - Literal("update_postimage"), Literal("update_preimage")) + val updateType = If(EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)), + Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE), + Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)) val raiseInvalid = RaiseError( Literal("CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala index d6c2675bde874..353472a035f91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -24,9 +24,14 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.catalog.{ ChangelogProperties, Column, Identifier, InMemoryChangelogCatalog} +import org.apache.spark.sql.connector.catalog.Changelog.{ + CHANGE_TYPE_DELETE, CHANGE_TYPE_INSERT, CHANGE_TYPE_UPDATE_POSTIMAGE, + CHANGE_TYPE_UPDATE_PREIMAGE} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.datasources.v2.ChangelogTable import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{ BinaryType, BooleanType, DoubleType, LongType, StringType, StructField, StructType} @@ -122,20 +127,20 @@ class ResolveChangelogTablePostProcessingSuite // v2: real delete Alice (preimage carries old rcv=1); // carry-over for Bob (CoW, rcv unchanged on both sides) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), // carry-over - changeRow(2L, "Bob", "insert", 2L, rowCommitVersion = 1L))) // carry-over (same rcv) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), // carry-over + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L))) // carry-over (same rcv) checkAnswer( sql( s"SELECT id, name, _change_type, _commit_version " + s"FROM $catalogName.$testTableName CHANGES FROM VERSION 1 TO VERSION 2"), Seq( - Row(1L, "Alice", "insert", 1L), - Row(2L, "Bob", "insert", 1L), - Row(1L, "Alice", "delete", 2L))) + Row(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + Row(2L, "Bob", CHANGE_TYPE_INSERT, 1L), + Row(1L, "Alice", CHANGE_TYPE_DELETE, 2L))) } test("deduplicationMode=none keeps all carry-over rows") { @@ -145,41 +150,57 @@ class ResolveChangelogTablePostProcessingSuite rowVersionName = Some("row_commit_version"))) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 2L, rowCommitVersion = 1L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L))) - val rows = sql( - s"SELECT id FROM $catalogName.$testTableName " + - s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (deduplicationMode = 'none')") - .collect() - - assert(rows.length == 3, "Without dedup, all 3 raw rows should be returned") + checkAnswer( + sql( + s"SELECT id FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (deduplicationMode = 'none')"), + Seq(Row(1L), Row(2L), Row(2L))) } test("NULL rowVersion on one side is NOT silently dropped as carry-over") { // Regression for a NULL-safety hole: min/max skip NULLs, so _min_rv = _max_rv alone // would match a pair with one NULL and one non-null rowVersion. The _rv_cnt = 2 // clause in the carry-over filter prevents that. - catalog.setChangelogProperties(ident, ChangelogProperties( + // + // The fixture table here declares `row_commit_version` as nullable so the optimizer + // is not allowed to fold IsNull(non-nullable-col) to false; the NULL is a legitimate + // value the guard must defend against. + val nullableRcvTable = "events_nullable_rcv" + val nullableIdent = Identifier.of(Array.empty, nullableRcvTable) + val cat = catalog + if (cat.tableExists(nullableIdent)) cat.dropTable(nullableIdent) + cat.clearChangeRows(nullableIdent) + cat.createTable( + nullableIdent, + Array( + Column.create("id", LongType), + Column.create("name", StringType), + Column.create("row_commit_version", LongType, true)), + Array.empty[Transform], + Collections.emptyMap[String, String]()) + cat.setChangelogProperties(nullableIdent, ChangelogProperties( containsCarryoverRows = true, rowIdNames = Seq("id"), rowVersionName = Some("row_commit_version"))) - catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + cat.addChangeRows(nullableIdent, Seq( + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), // v2: one side has NULL rowVersion (buggy connector), the other has a real value. InternalRow(1L, UTF8String.fromString("Alice"), null, - UTF8String.fromString("delete"), 2L, 0L), - changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 5L))) + UTF8String.fromString(CHANGE_TYPE_DELETE), 2L, 0L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 5L))) checkAnswer( sql(s"SELECT id, name, _change_type, _commit_version " + - s"FROM $catalogName.$testTableName CHANGES FROM VERSION 1 TO VERSION 2"), + s"FROM $catalogName.$nullableRcvTable CHANGES FROM VERSION 1 TO VERSION 2"), Seq( - Row(1L, "Alice", "insert", 1L), - Row(1L, "Alice", "delete", 2L), - Row(1L, "Alice", "insert", 2L))) + Row(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + Row(1L, "Alice", CHANGE_TYPE_DELETE, 2L), + Row(1L, "Alice", CHANGE_TYPE_INSERT, 2L))) } // =========================================================================== @@ -194,10 +215,10 @@ class ResolveChangelogTablePostProcessingSuite rowVersionName = Some("row_commit_version"))) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), // v2: Alice -> Robert (delete old, insert new) - changeRow(1L, "Alice", "delete", 2L), - changeRow(1L, "Robert", "insert", 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L), + changeRow(1L, "Robert", CHANGE_TYPE_INSERT, 2L))) val rows = sql( s"SELECT id, name, _change_type, _commit_version " + @@ -224,9 +245,9 @@ class ResolveChangelogTablePostProcessingSuite rowVersionName = Some("row_commit_version"))) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), - changeRow(1L, "Alice", "delete", 2L), - changeRow(1L, "Alice", "insert", 3L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 3L))) val rows = sql( s"SELECT _change_type, _commit_version FROM $catalogName.$testTableName " + @@ -257,8 +278,8 @@ class ResolveChangelogTablePostProcessingSuite // delete (1, Alice) and insert (1, Bob) at v2. These are DIFFERENT composite // rowIds; they must NOT be relabeled as update. catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "delete", 2L), - changeRow(1L, "Bob", "insert", 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L), + changeRow(1L, "Bob", CHANGE_TYPE_INSERT, 2L))) val rows = sql( s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + @@ -285,12 +306,12 @@ class ResolveChangelogTablePostProcessingSuite // four rows would collapse into one partition with _del_cnt=2 / _ins_cnt=2 and // the carry-over filter (which requires =1) would keep them all. catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(1L, "Bob", "insert", 1L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 1L), - changeRow(1L, "Bob", "delete", 2L, rowCommitVersion = 1L), - changeRow(1L, "Bob", "insert", 2L, rowCommitVersion = 1L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(1L, "Bob", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L), + changeRow(1L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(1L, "Bob", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L))) val rows = sql( s"SELECT id, name, _change_type, _commit_version " + @@ -310,9 +331,9 @@ class ResolveChangelogTablePostProcessingSuite test("no capability flags -> post-processing not injected in plan") { // Default ChangelogProperties has no capability flags set; the rule sees nothing to do. catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), - changeRow(2L, "Bob", "delete", 2L), - changeRow(2L, "Bob", "insert", 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L))) val df = sql( s"SELECT * FROM $catalogName.$testTableName " + @@ -331,9 +352,37 @@ class ResolveChangelogTablePostProcessingSuite val df = spark.readStream .option("startingVersion", "1") .changes(s"$catalogName.$testTableName") - val plan = df.queryExecution.analyzed.treeString + val analyzed = df.queryExecution.analyzed + val plan = analyzed.treeString assert(!plan.contains("__spark_cdc_del_cnt"), s"Streaming plan must not contain post-processing helpers. Plan:\n$plan") + + // Positive assertion: the rule actually fired on the streaming relation. Without this, + // a regression that deletes the streaming arm of `ResolveChangelogTable.apply` would + // also pass the absence-of-helpers check above. + val tableResolved = analyzed.collectFirst { + case rel: StreamingRelationV2 if rel.table.isInstanceOf[ChangelogTable] => + rel.table.asInstanceOf[ChangelogTable].resolved + } + assert(tableResolved.contains(true), + s"Expected ChangelogTable to be marked resolved by the rule. Plan:\n$plan") + } + + test("streaming with post-processing options is rejected") { + catalog.setChangelogProperties(ident, ChangelogProperties( + containsCarryoverRows = true, + rowIdNames = Seq("id"), + rowVersionName = Some("row_commit_version"))) + + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .option("startingVersion", "1") + .changes(s"$catalogName.$testTableName") + .queryExecution.analyzed + }, + condition = "INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED", + parameters = Map("changelogName" -> s"$catalogName.${testTableName}_changelog")) } // =========================================================================== @@ -350,12 +399,12 @@ class ResolveChangelogTablePostProcessingSuite // v1: insert Alice (rcv=1), Bob (rcv=1) // v2: Alice carry-over (CoW, rcv unchanged), Bob real update (old rcv=1, new rcv=2) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), // carry-over - changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 1L), // carry-over - changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), // update preimage (old rcv) - changeRow(2L, "Robert", "insert", 2L, rowCommitVersion = 2L))) // update postimage (new rcv) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), // carry-over + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L), // carry-over + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), // update preimage + changeRow(2L, "Robert", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 2L))) // update postimage val rows = sql( s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + @@ -390,14 +439,14 @@ class ResolveChangelogTablePostProcessingSuite rowVersionName = Some("row_commit_version"))) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), // Alice: carry-over (CoW, rcv unchanged on both sides) - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L), // Bob -> Robert: real change (old rcv on pre, new rcv on post) - changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), - changeRow(2L, "Robert", "insert", 2L, rowCommitVersion = 2L))) + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(2L, "Robert", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 2L))) // Default computeUpdates=false: do NOT relabel, but DO drop carry-overs val rows = sql( @@ -422,8 +471,8 @@ class ResolveChangelogTablePostProcessingSuite rowVersionName = Some("row_commit_version"))) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), - changeRow(2L, "Bob", "insert", 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L))) val rows = sql( s"SELECT id, _change_type FROM $catalogName.$testTableName " + @@ -431,7 +480,7 @@ class ResolveChangelogTablePostProcessingSuite .collect() assert(rows.length == 2) - assert(rows.forall(_.getString(1) == "insert"), + assert(rows.forall(_.getString(1) == CHANGE_TYPE_INSERT), s"Pure inserts must stay 'insert'. Got: ${rows.map(_.getString(1)).mkString(",")}") } @@ -464,10 +513,10 @@ class ResolveChangelogTablePostProcessingSuite rowVersionName = Some("row_commit_version"))) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), // v2: Alice -> Robert (delete old, insert new) - changeRow(1L, "Alice", "delete", 2L), - changeRow(1L, "Robert", "insert", 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L), + changeRow(1L, "Robert", CHANGE_TYPE_INSERT, 2L))) val rows = sql( s"SELECT id, name, _change_type FROM $catalogName.$testTableName " + @@ -502,9 +551,9 @@ class ResolveChangelogTablePostProcessingSuite // Contract violation: 2 inserts for id=1 at v2. catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "delete", 2L), - changeRow(1L, "Alice2", "insert", 2L), - changeRow(1L, "Alice3", "insert", 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L), + changeRow(1L, "Alice2", CHANGE_TYPE_INSERT, 2L), + changeRow(1L, "Alice3", CHANGE_TYPE_INSERT, 2L))) checkError( intercept[SparkRuntimeException] { @@ -524,9 +573,9 @@ class ResolveChangelogTablePostProcessingSuite // Contract violation: 2 deletes for id=1 at v2. catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "delete", 2L), - changeRow(1L, "Alice2", "delete", 2L), - changeRow(1L, "Alice3", "insert", 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L), + changeRow(1L, "Alice2", CHANGE_TYPE_DELETE, 2L), + changeRow(1L, "Alice3", CHANGE_TYPE_INSERT, 2L))) checkError( intercept[SparkRuntimeException] { @@ -581,22 +630,6 @@ class ResolveChangelogTablePostProcessingSuite // Range edge cases // =========================================================================== - test("single-version range FROM VERSION X TO VERSION X") { - catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), - changeRow(2L, "Bob", "insert", 1L), - changeRow(3L, "Charlie", "insert", 2L))) - - val rows = sql( - s"SELECT id, _change_type, _commit_version FROM $catalogName.$testTableName " + - s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (deduplicationMode = 'none')") - .collect() - - assert(rows.length == 1, s"Single version: 1 row. Got ${rows.length}") - assert(rows(0).getLong(0) == 3L) - assert(rows(0).getString(1) == "insert") - } - test("multiple operations across versions") { catalog.setChangelogProperties(ident, ChangelogProperties( containsCarryoverRows = true, @@ -606,23 +639,23 @@ class ResolveChangelogTablePostProcessingSuite catalog.addChangeRows(ident, Seq( // v1: insert 3 rows (rcv=1 each) - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), - changeRow(3L, "Charlie", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), // v2: delete Alice (preimage carries old rcv=1); CoW carry-overs for Bob/Charlie // keep rcv=1 on both sides (row unchanged). - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 2L, rowCommitVersion = 1L), - changeRow(3L, "Charlie", "delete", 2L, rowCommitVersion = 1L), - changeRow(3L, "Charlie", "insert", 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L), // v3: update Bob -> Robert (old rcv=1, new rcv=3); CoW carry-over for Charlie (rcv=1) - changeRow(2L, "Bob", "delete", 3L, rowCommitVersion = 1L), - changeRow(2L, "Robert", "insert", 3L, rowCommitVersion = 3L), - changeRow(3L, "Charlie", "delete", 3L, rowCommitVersion = 1L), - changeRow(3L, "Charlie", "insert", 3L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 3L, rowCommitVersion = 1L), + changeRow(2L, "Robert", CHANGE_TYPE_INSERT, 3L, rowCommitVersion = 3L), + changeRow(3L, "Charlie", CHANGE_TYPE_DELETE, 3L, rowCommitVersion = 1L), + changeRow(3L, "Charlie", CHANGE_TYPE_INSERT, 3L, rowCommitVersion = 1L), // v4: insert Diana (rcv=4) - changeRow(4L, "Diana", "insert", 4L, rowCommitVersion = 4L))) + changeRow(4L, "Diana", CHANGE_TYPE_INSERT, 4L, rowCommitVersion = 4L))) val rows = sql( s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + @@ -651,7 +684,7 @@ class ResolveChangelogTablePostProcessingSuite test("larger insert batch returns all rows") { catalog.addChangeRows(ident, (1 to 5).map(i => - changeRow(i.toLong, ('A' + i - 1).toChar.toString, "insert", 1L))) + changeRow(i.toLong, ('A' + i - 1).toChar.toString, CHANGE_TYPE_INSERT, 1L))) val rows = sql( s"SELECT id, _change_type FROM $catalogName.$testTableName " + @@ -659,41 +692,7 @@ class ResolveChangelogTablePostProcessingSuite .collect() assert(rows.length == 5) - assert(rows.forall(_.getString(1) == "insert")) - } - - test("EXCLUSIVE start bound skips the start version") { - catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), - changeRow(2L, "Bob", "insert", 2L), - changeRow(3L, "Charlie", "insert", 3L))) - - val rows = sql( - s"SELECT id, _commit_version FROM $catalogName.$testTableName " + - s"CHANGES FROM VERSION 1 EXCLUSIVE TO VERSION 3 " + - s"WITH (deduplicationMode = 'none')") - .orderBy("_commit_version") - .collect() - - assert(!rows.exists(_.getLong(1) == 1L), "v1 must be excluded") - assert(rows.exists(_.getLong(0) == 2L), "Bob (v2) included") - assert(rows.exists(_.getLong(0) == 3L), "Charlie (v3) included") - } - - test("open-ended range (no TO clause) reads to latest") { - catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), - changeRow(2L, "Bob", "insert", 2L), - changeRow(3L, "Charlie", "insert", 3L))) - - val rows = sql( - s"SELECT id, _commit_version FROM $catalogName.$testTableName " + - s"CHANGES FROM VERSION 1 WITH (deduplicationMode = 'none')") - .orderBy("_commit_version", "id") - .collect() - - assert(rows.length == 3, s"Open-ended range should see all 3. Got ${rows.length}") - assert(rows.exists(r => r.getLong(0) == 3L && r.getLong(1) == 3L)) + assert(rows.forall(_.getString(1) == CHANGE_TYPE_INSERT)) } test("DELETE all rows: no carry-over inserts at v2") { @@ -704,10 +703,10 @@ class ResolveChangelogTablePostProcessingSuite // v1 inserts carry rcv=1; v2 deletes carry the old rcv=1 (rcv tracks last modification) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L))) val rows = sql( s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + @@ -734,12 +733,12 @@ class ResolveChangelogTablePostProcessingSuite // Every v2 row is a real update: delete side carries old rcv=1, insert side new rcv=2. catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), - changeRow(2L, "Bob", "insert", 1L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), - changeRow(1L, "Alice_updated", "insert", 2L, rowCommitVersion = 2L), - changeRow(2L, "Bob", "delete", 2L, rowCommitVersion = 1L), - changeRow(2L, "Bob_updated", "insert", 2L, rowCommitVersion = 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice_updated", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 2L), + changeRow(2L, "Bob", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(2L, "Bob_updated", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 2L))) val rows = sql( s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + @@ -759,9 +758,9 @@ class ResolveChangelogTablePostProcessingSuite test("append-only workload: all inserts, no carry-over needed") { catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L), - changeRow(2L, "Bob", "insert", 2L), - changeRow(3L, "Charlie", "insert", 3L))) + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L), + changeRow(3L, "Charlie", CHANGE_TYPE_INSERT, 3L))) val rows = sql( s"SELECT id, _change_type FROM $catalogName.$testTableName " + @@ -769,7 +768,7 @@ class ResolveChangelogTablePostProcessingSuite .collect() assert(rows.length == 3) - assert(rows.forall(_.getString(1) == "insert")) + assert(rows.forall(_.getString(1) == CHANGE_TYPE_INSERT)) } test("carry-over removal with many rows: only real change remains") { @@ -781,14 +780,15 @@ class ResolveChangelogTablePostProcessingSuite // 10 inserts at v1 (rcv=1 each). At v2: delete row 5; CoW writes 9 carry-over pairs // (rcv unchanged since v1, i.e. rcv=1 on both sides) plus 1 real delete (rcv=1, old). val v1Inserts = (1 to 10).map(i => - changeRow(i.toLong, ('A' + i - 1).toChar.toString, "insert", 1L, rowCommitVersion = 1L)) + changeRow( + i.toLong, ('A' + i - 1).toChar.toString, CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L)) val v2Carryovers = (1 to 10).filter(_ != 5).flatMap { i => val name = ('A' + i - 1).toChar.toString Seq( - changeRow(i.toLong, name, "delete", 2L, rowCommitVersion = 1L), - changeRow(i.toLong, name, "insert", 2L, rowCommitVersion = 1L)) + changeRow(i.toLong, name, CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(i.toLong, name, CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L)) } - val v2RealDelete = Seq(changeRow(5L, "E", "delete", 2L, rowCommitVersion = 1L)) + val v2RealDelete = Seq(changeRow(5L, "E", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L)) catalog.addChangeRows(ident, v1Inserts ++ v2Carryovers ++ v2RealDelete) val rows = sql( @@ -828,9 +828,9 @@ class ResolveChangelogTablePostProcessingSuite def mixedRow( id: Long, name: String, score: Double, active: Boolean, payload: Array[Byte], - ct: String, v: Long, rcv: Long): InternalRow = { + ct: String, v: Long, rowCommitVersion: Long): InternalRow = { InternalRow( - id, UTF8String.fromString(name), score, active, payload, rcv, + id, UTF8String.fromString(name), score, active, payload, rowCommitVersion, UTF8String.fromString(ct), v, 0L) } @@ -838,13 +838,19 @@ class ResolveChangelogTablePostProcessingSuite val bobPayload = Array[Byte](4, 5, 6) cat.addChangeRows(mixedIdent, Seq( - mixedRow(1L, "Alice", 95.5, true, alicePayload, "insert", 1L, rcv = 1L), - mixedRow(2L, "Bob", 87.3, false, bobPayload, "insert", 1L, rcv = 1L), + mixedRow( + 1L, "Alice", 95.5, true, alicePayload, CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), + mixedRow( + 2L, "Bob", 87.3, false, bobPayload, CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), // v2: update Alice's score (old rcv=1, new rcv=2); Bob is carry-over (rcv unchanged) - mixedRow(1L, "Alice", 95.5, true, alicePayload, "delete", 2L, rcv = 1L), - mixedRow(1L, "Alice", 99.0, true, alicePayload, "insert", 2L, rcv = 2L), - mixedRow(2L, "Bob", 87.3, false, bobPayload, "delete", 2L, rcv = 1L), // carry-over - mixedRow(2L, "Bob", 87.3, false, bobPayload, "insert", 2L, rcv = 1L))) // carry-over + mixedRow( + 1L, "Alice", 95.5, true, alicePayload, CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + mixedRow( + 1L, "Alice", 99.0, true, alicePayload, CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 2L), + mixedRow( + 2L, "Bob", 87.3, false, bobPayload, CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + mixedRow( + 2L, "Bob", 87.3, false, bobPayload, CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 1L))) val rows = sql( s"SELECT id, name, score, active, _change_type FROM $catalogName.$mixedTable " + @@ -859,8 +865,10 @@ class ResolveChangelogTablePostProcessingSuite s"Bob carry-over must be dropped despite DOUBLE/BOOLEAN/BINARY. Got: " + descs.mkString(",")) - val pre = rows.find(r => r.getLong(0) == 1L && r.getString(4) == "update_preimage").get - val post = rows.find(r => r.getLong(0) == 1L && r.getString(4) == "update_postimage").get + val pre = rows.find(r => + r.getLong(0) == 1L && r.getString(4) == CHANGE_TYPE_UPDATE_PREIMAGE).get + val post = rows.find(r => + r.getLong(0) == 1L && r.getString(4) == CHANGE_TYPE_UPDATE_POSTIMAGE).get assert(pre.getDouble(2) == 95.5) assert(post.getDouble(2) == 99.0) } @@ -869,10 +877,10 @@ class ResolveChangelogTablePostProcessingSuite // Regression: nested rowId + nested rowVersion end-to-end // =========================================================================== - // rowId is payload.id (nested); rowVersion is also row-level. A delete+insert pair with - // the same payload.id but different row_commit_version is a real update and must survive. - // A pair with matching row_commit_version would be a CoW carry-over and would be dropped. - test("nested rowId must not hide sibling-field changes") { + // End-to-end check that nested rowId paths (e.g. `payload.id`) are resolved on the plan + // and threaded through carry-over detection. The pair survives the filter because the + // row_commit_version differs across delete/insert, not because of any sibling-field data. + test("nested rowId path resolves correctly through carry-over filter") { val nestedTable = "events_nested" val nestedIdent = Identifier.of(Array.empty, nestedTable) val cat = catalog @@ -896,18 +904,19 @@ class ResolveChangelogTablePostProcessingSuite rowIdPaths = Seq(Seq("payload", "id")), rowVersionName = Some("row_commit_version"))) - def nestedRow(id: Long, value: String, ct: String, v: Long, rcv: Long): InternalRow = { + def nestedRow( + id: Long, value: String, ct: String, v: Long, rowCommitVersion: Long): InternalRow = { InternalRow( InternalRow(id, UTF8String.fromString(value)), - rcv, + rowCommitVersion, UTF8String.fromString(ct), v, 0L) } cat.addChangeRows(nestedIdent, Seq( - nestedRow(1L, "original", "insert", 1L, rcv = 1L), + nestedRow(1L, "original", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), // v2 update: rowId same, rowVersion differs (old rcv=1 on preimage, new rcv=2 on postimage) - nestedRow(1L, "original", "delete", 2L, rcv = 1L), - nestedRow(1L, "CHANGED", "insert", 2L, rcv = 2L))) + nestedRow(1L, "original", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + nestedRow(1L, "CHANGED", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 2L))) val rows = sql( s"SELECT payload.id AS id, payload.value AS value, _change_type, _commit_version " + @@ -945,10 +954,10 @@ class ResolveChangelogTablePostProcessingSuite rowVersionName = Some("row_commit_version"))) catalog.addChangeRows(ident, Seq( - changeRow(1L, "Alice", "insert", 1L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L, rowCommitVersion = 1L), // v2 no-op update: identical data, but rcv differs (Delta bumps it on any UPDATE) - changeRow(1L, "Alice", "delete", 2L, rowCommitVersion = 1L), - changeRow(1L, "Alice", "insert", 2L, rowCommitVersion = 2L))) + changeRow(1L, "Alice", CHANGE_TYPE_DELETE, 2L, rowCommitVersion = 1L), + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 2L, rowCommitVersion = 2L))) val rows = sql( s"SELECT id, name, _change_type, _commit_version FROM $catalogName.$testTableName " + @@ -967,4 +976,59 @@ class ResolveChangelogTablePostProcessingSuite assert(rows.length == 3, s"Expected v1 insert + v2 update pre/post = 3 rows. Got ${rows.length}") } + + // =========================================================================== + // Baseline (range syntax / connector range filtering -- rule bypassed via + // deduplicationMode = 'none'; included as smoke tests for the SQL surface). + // =========================================================================== + + test("baseline: single-version range FROM VERSION X TO VERSION X") { + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 1L), + changeRow(3L, "Charlie", CHANGE_TYPE_INSERT, 2L))) + + val rows = sql( + s"SELECT id, _change_type, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 2 TO VERSION 2 WITH (deduplicationMode = 'none')") + .collect() + + assert(rows.length == 1, s"Single version: 1 row. Got ${rows.length}") + assert(rows(0).getLong(0) == 3L) + assert(rows(0).getString(1) == CHANGE_TYPE_INSERT) + } + + test("baseline: EXCLUSIVE start bound skips the start version") { + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L), + changeRow(3L, "Charlie", CHANGE_TYPE_INSERT, 3L))) + + val rows = sql( + s"SELECT id, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 EXCLUSIVE TO VERSION 3 " + + s"WITH (deduplicationMode = 'none')") + .orderBy("_commit_version") + .collect() + + assert(!rows.exists(_.getLong(1) == 1L), "v1 must be excluded") + assert(rows.exists(_.getLong(0) == 2L), "Bob (v2) included") + assert(rows.exists(_.getLong(0) == 3L), "Charlie (v3) included") + } + + test("baseline: open-ended range (no TO clause) reads to latest") { + catalog.addChangeRows(ident, Seq( + changeRow(1L, "Alice", CHANGE_TYPE_INSERT, 1L), + changeRow(2L, "Bob", CHANGE_TYPE_INSERT, 2L), + changeRow(3L, "Charlie", CHANGE_TYPE_INSERT, 3L))) + + val rows = sql( + s"SELECT id, _commit_version FROM $catalogName.$testTableName " + + s"CHANGES FROM VERSION 1 WITH (deduplicationMode = 'none')") + .orderBy("_commit_version", "id") + .collect() + + assert(rows.length == 3, s"Open-ended range should see all 3. Got ${rows.length}") + assert(rows.exists(r => r.getLong(0) == 3L && r.getLong(1) == 3L)) + } } From f9fe11351d6cdcea250957998cb81d4b6e8fa574 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 27 Apr 2026 21:03:38 -0700 Subject: [PATCH 6/6] fix test failures --- .../connector/ChangelogEndToEndSuite.scala | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala index 58a2c7591dedc..bbb656083e064 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala @@ -418,27 +418,22 @@ class ChangelogEndToEndSuite extends QueryTest with SharedSparkSession { ChangelogInfo.DeduplicationMode.NONE) } - test("changes() passes deduplicationMode and computeUpdates to catalog") { + test("changes() passes computeUpdates to catalog") { catalog.addChangeRows(ident, Seq( makeChangeRow(1L, "a", "insert", 1L, 1000000L))) // DataFrame API spark.read .option("startingVersion", "1") - .option("deduplicationMode", "netChanges") .option("computeUpdates", "true") .changes(fullTableName) .collect() - val info1 = catalog.lastChangelogInfo.get - assert(info1.deduplicationMode() === ChangelogInfo.DeduplicationMode.NET_CHANGES) - assert(info1.computeUpdates() === true) + assert(catalog.lastChangelogInfo.get.computeUpdates() === true) // SQL sql(s"SELECT * FROM $fullTableName CHANGES FROM VERSION 1 " + - "WITH (deduplicationMode = 'netChanges', computeUpdates = 'true')").collect() - val info2 = catalog.lastChangelogInfo.get - assert(info2.deduplicationMode() === ChangelogInfo.DeduplicationMode.NET_CHANGES) - assert(info2.computeUpdates() === true) + "WITH (computeUpdates = 'true')").collect() + assert(catalog.lastChangelogInfo.get.computeUpdates() === true) } // ---------- Batch: timestamp range ---------- @@ -589,23 +584,20 @@ class ChangelogEndToEndSuite extends QueryTest with SharedSparkSession { // ---------- Streaming: CDC options ---------- - test("streaming changes() passes deduplicationMode and computeUpdates to catalog") { + test("streaming changes() passes computeUpdates to catalog") { catalog.addChangeRows(ident, Seq( makeChangeRow(1L, "a", "insert", 1L, 1000000L))) // DataFrame API val dfApiStream = spark.readStream .option("startingVersion", "1") - .option("deduplicationMode", "netChanges") .option("computeUpdates", "true") .changes(fullTableName) val q1 = dfApiStream.writeStream .format("memory").queryName("cdc_stream_opts_df").start() try { q1.processAllAvailable() - val info1 = catalog.lastChangelogInfo.get - assert(info1.deduplicationMode() === ChangelogInfo.DeduplicationMode.NET_CHANGES) - assert(info1.computeUpdates() === true) + assert(catalog.lastChangelogInfo.get.computeUpdates() === true) } finally { q1.stop() } @@ -613,14 +605,12 @@ class ChangelogEndToEndSuite extends QueryTest with SharedSparkSession { // SQL val sqlStream = sql( s"SELECT * FROM STREAM $fullTableName CHANGES FROM VERSION 1 " + - "WITH (deduplicationMode = 'netChanges', computeUpdates = 'true')") + "WITH (computeUpdates = 'true')") val q2 = sqlStream.writeStream .format("memory").queryName("cdc_stream_opts_sql").start() try { q2.processAllAvailable() - val info2 = catalog.lastChangelogInfo.get - assert(info2.deduplicationMode() === ChangelogInfo.DeduplicationMode.NET_CHANGES) - assert(info2.computeUpdates() === true) + assert(catalog.lastChangelogInfo.get.computeUpdates() === true) } finally { q2.stop() }