Commit 791d5ce

[SPARK-56686][SQL][FOLLOWUP] Fail fast on NULL _commit_timestamp in streaming row-level rewrite
Address @zikangh's review on #55637 -- the streaming row-level rewrite should enforce a non-NULL `_commit_timestamp`, mirroring the runtime guard in CdcNetChangesStatefulProcessor.

A NULL `_commit_timestamp` on a streaming read is a connector contract violation that would silently stall the row's group: the downstream streaming Aggregate uses `_commit_timestamp` as both an event-time watermark column and a grouping key, and Spark's eviction predicate is LessThanOrEqual(eventTime, watermark). A NULL group key never satisfies that predicate, so the group sits in state until end of stream, producing no output and no error.

Add a Filter at the top of the streaming row-level rewrite that raises CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP via the same RaiseError pattern used for the multiple-changes-per-row-version guard in the batch path. Also add the new error class to error-conditions.json.

Tests:
- Plan-shape tests: assert the guard Filter is present and sits directly above the streaming relation (so it runs before any downstream operator sees the NULL).
- End-to-end test: feeding a row with a NULL `_commit_timestamp` surfaces CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP at the streaming query level rather than producing no output.
- Existing carry-over / update-detection plan-shape tests updated for the extra guard Filter (1 -> 2 Filters in the carry-over and combined paths; 0 -> 1 in update-detection-only).

Also refreshed the addStreamingRowLevelPostProcessing Scaladoc to add a step 0 (the guard) and a step 7 (the watermark-metadata strip), keeping the per-operator detail aligned with the rewrite's actual shape. Doc-only side effect: scalafmt reflowed the watermark-metadata bullet in DataStreamReader.changes() Scaladoc (no semantic change).
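The silent stall described above comes from SQL three-valued logic: a comparison against NULL is never true, so the eviction predicate never fires for a NULL-keyed group. A toy model (plain Python, not Spark code; `None` stands in for SQL NULL, and the dict stands in for the state store) illustrates why such a group is retained forever:

```python
def can_evict(event_time, watermark):
    """Mirrors LessThanOrEqual(eventTime, watermark): a None (NULL)
    event time never satisfies the predicate."""
    return event_time is not None and event_time <= watermark

def advance_watermark(groups, watermark):
    """Split buffered groups into (emitted keys, retained state)."""
    emitted = [k for k, ts in groups.items() if can_evict(ts, watermark)]
    retained = {k: ts for k, ts in groups.items() if not can_evict(ts, watermark)}
    return emitted, retained
```

Even with the watermark pushed arbitrarily high, the NULL-keyed group survives every micro-batch -- which is exactly why the rewrite now fails fast instead.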
1 parent 7f471f2 commit 791d5ce

5 files changed

Lines changed: 145 additions & 23 deletions

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
@@ -666,6 +666,11 @@
       "The Change Data Capture (CDC) connector violated the `Changelog` contract at runtime."
     ],
     "subClass" : {
+      "NULL_COMMIT_TIMESTAMP" : {
+        "message" : [
+          "Connector emitted a row with a NULL `_commit_timestamp` on a streaming read engaging post-processing. The `Changelog` contract requires `_commit_timestamp` to be non-NULL for streaming reads, since post-processing uses it as event time to advance the watermark."
+        ]
+      },
       "UNEXPECTED_CHANGE_TYPE" : {
         "message" : [
           "Connector emitted a row with a `_change_type` value that is not one of the four supported types (`insert`, `delete`, `update_preimage`, `update_postimage`). The `Changelog` contract requires every emitted row to carry one of these four values."

sql/api/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 5 additions & 6 deletions
@@ -145,12 +145,11 @@ abstract class DataStreamReader {
    *   to the max `_commit_timestamp` observed in the previous batch). A stream that reads its
    *   last commit and stops will keep that commit's events in state until a subsequent
    *   (no-data) micro-batch fires.
-   * - The query is constrained to `Append` output mode; `Update` and `Complete` are
-   *   rejected at writer-start time with
-   *   `STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION`. The internal watermark metadata is
-   *   stripped from the user-visible `_commit_timestamp` output, so downstream
-   *   user-supplied watermarks on other columns do not interact with it via the global
-   *   multi-watermark policy.
+   * - The query is constrained to `Append` output mode; `Update` and `Complete` are rejected at
+   *   writer-start time with `STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION`. The internal
+   *   watermark metadata is stripped from the user-visible `_commit_timestamp` output, so
+   *   downstream user-supplied watermarks on other columns do not interact with it via the
+   *   global multi-watermark policy.
    *
    * @param tableName
    *   a qualified or unqualified name that designates a table.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala

Lines changed: 45 additions & 10 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.{ChangelogTable, DataSourceV2Relation}
-import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType}
+import org.apache.spark.sql.types.{BooleanType, IntegerType, MetadataBuilder, StringType}
 import org.apache.spark.unsafe.types.CalendarInterval

 /**
@@ -244,6 +244,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
    * the aggregate so no rows are lost.
    * {{{
    *   DataSourceV2Relation
+   *     -> Filter (RaiseError on NULL _commit_timestamp)
    *     -> EventTimeWatermark(_commit_timestamp, 0s)
    *     -> Aggregate
    *          group by (rowId..., _commit_version, _commit_timestamp)
@@ -255,6 +256,7 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
    *     -> Generate(Inline(__spark_cdc_events)) // re-emit one row per buffered input
    *     -> [Project (update relabel)]
    *     -> Project (drop helper columns)
+   *     -> Project (strip internal EventTimeWatermark metadata)
    * }}}
    *
    * ==Runtime walkthrough==
@@ -278,15 +280,16 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
    *
    * ==Per-operator detail==
    *
+   *  0. [[Filter]] guarding against NULL `_commit_timestamp` -- raises
+   *     `CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` for any row that
+   *     violates the contract. A NULL would never satisfy the downstream Aggregate's
+   *     `eventTime <= watermark` eviction predicate (NULL is silent in MAX, never
+   *     compares less-than-or-equal), so its group would be held in state forever.
+   *     Failing fast surfaces the connector bug instead of producing no output.
    *  1. [[EventTimeWatermark]] on `_commit_timestamp` (zero delay) -- required so the
    *     downstream stateful aggregate can emit groups in append output mode. By CDC
    *     contract every row in a single commit shares `_commit_timestamp`, so taking it
-   *     as event time is safe. Note: this is currently the only analyzer rule that
-   *     auto-injects an [[EventTimeWatermark]] (others resolve user-supplied watermarks).
-   *     The watermark metadata is preserved on the user-visible `_commit_timestamp`
-   *     output (since [[Generate]]'s `generatorOutput` copies attribute metadata), so a
-   *     downstream user-supplied `withWatermark` on a different column will interact
-   *     with this internal watermark under the global multi-watermark policy.
+   *     as event time is safe.
    *  2. [[Aggregate]] keyed by `(rowId..., _commit_version, _commit_timestamp)`. Computes
    *     the same `_del_cnt` / `_ins_cnt` / (`_min_rv` / `_max_rv` / `_rv_cnt`) helpers as
    *     the batch path, plus an `__spark_cdc_events` array-of-struct buffering every
@@ -304,17 +307,29 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
    *  5. [[Project]] (only when update detection is requested) applying the same
    *     `CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION`
    *     guard and `_change_type` relabel as the batch path.
-   *  6. Final [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` helpers so
+   *  6. [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` helpers so
    *     the output schema matches the connector's declared schema.
+   *  7. Final [[Project]] (via [[stripCommitTimestampWatermarkMetadata]]) clears the
+   *     `EventTimeWatermark.delayKey` from the user-visible `_commit_timestamp`
+   *     attribute so a downstream user-supplied `withWatermark` on a different column
+   *     does not interact with our internal watermark via the global multi-watermark
+   *     policy.
    */
  private def addStreamingRowLevelPostProcessing(
      plan: LogicalPlan,
      cl: Changelog,
      requiresCarryOverRemoval: Boolean,
      requiresUpdateDetection: Boolean): LogicalPlan = {
-    val rawCommitTsAttr = getAttribute(plan, "_commit_timestamp")
+    // Fail fast on a NULL `_commit_timestamp`. The downstream Aggregate uses it as
+    // both an event-time watermark column and a grouping key; a NULL group-key value
+    // would never satisfy the `eventTime <= watermark` eviction predicate, so the
+    // group would silently stall (held in state until end of stream). Mirrors the
+    // runtime check in [[CdcNetChangesStatefulProcessor]] -- fail fast at the
+    // contract violation rather than producing no output.
+    val plan1 = addNullCommitTimestampGuard(plan)
+    val rawCommitTsAttr = getAttribute(plan1, "_commit_timestamp")
    val watermarked = EventTimeWatermark(
-      UUID.randomUUID(), rawCommitTsAttr, new CalendarInterval(0, 0, 0L), plan)
+      UUID.randomUUID(), rawCommitTsAttr, new CalendarInterval(0, 0, 0L), plan1)

    val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](
      cl.rowId().toSeq, watermarked)
@@ -404,6 +419,26 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
    removeHelperColumns(cleaned)
  }

+  /**
+   * Adds a `Filter` that raises
+   * `CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP` for any input row whose
+   * `_commit_timestamp` is `NULL`. Used as the first step of the streaming row-level
+   * rewrite so a contract-violating connector fails fast instead of silently stalling
+   * the downstream stateful aggregate's group.
+   */
+  private def addNullCommitTimestampGuard(input: LogicalPlan): LogicalPlan = {
+    val commitTsAttr = getAttribute(input, "_commit_timestamp")
+    val raise = RaiseError(
+      Literal("CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP"),
+      CreateMap(Nil),
+      BooleanType)
+    // CaseWhen returns the default branch (true) for non-null timestamps and
+    // evaluates the side-effecting RaiseError for nulls; the row never passes the
+    // filter on a contract violation.
+    val checkExpr = CaseWhen(Seq(IsNull(commitTsAttr) -> raise), Literal(true))
+    Filter(checkExpr, input)
+  }
+
  /**
   * Final boundary for the streaming row-level rewrite: rebuilds the user-visible
   * `_commit_timestamp` attribute with empty watermark-related metadata. Other
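The guard's CaseWhen-over-RaiseError shape can be modeled outside Catalyst. The sketch below (plain Python, not Spark code; the class and function names are illustrative only) captures the semantics the Filter relies on: a non-NULL timestamp takes the default branch and passes, a NULL timestamp hits the error branch and aborts rather than being dropped:

```python
class ContractViolation(RuntimeError):
    """Stands in for the CHANGELOG_CONTRACT_VIOLATION runtime error."""

def guard_predicate(commit_ts):
    # Default CaseWhen branch: non-NULL timestamp, the row passes the Filter.
    if commit_ts is not None:
        return True
    # Error branch: RaiseError aborts the query instead of filtering the row out,
    # so a contract-violating connector fails loudly.
    raise ContractViolation("CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP")

def apply_guard(rows):
    return [r for r in rows if guard_predicate(r)]
```

The key design point mirrored here is that the predicate never evaluates to false: it either admits the row or raises, so no data is silently lost at the guard.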

sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala

Lines changed: 35 additions & 0 deletions
@@ -857,4 +857,39 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
    assert(e.getMessage.contains("Change Data Capture"),
      s"Error should mention CDC: ${e.getMessage}")
  }
+
+  test("streaming row-level rewrite raises on NULL _commit_timestamp") {
+    val id = recreateWithRowVersion()
+    catalog.setChangelogProperties(id, ChangelogProperties(
+      containsCarryoverRows = true,
+      rowIdNames = Seq("id"),
+      rowVersionName = Some("row_commit_version")))
+
+    // Insert a row with NULL _commit_timestamp (last column).
+    val row = InternalRow(
+      1L, UTF8String.fromString("Alice"), 1L,
+      UTF8String.fromString(CHANGE_TYPE_INSERT), 1L, null)
+    catalog.addChangeRows(id, Seq(row))
+
+    val q = spark.readStream
+      .option("startingVersion", "1")
+      .changes(fullTableName)
+      .writeStream
+      .format("memory")
+      .queryName("cdc_stream_null_ts")
+      .outputMode("append")
+      .start()
+    try {
+      val e = intercept[org.apache.spark.sql.streaming.StreamingQueryException] {
+        q.processAllAvailable()
+      }
+      // The CHANGELOG_CONTRACT_VIOLATION runtime error wraps the message; it should
+      // mention NULL_COMMIT_TIMESTAMP somewhere in the chain.
+      assert(e.getMessage.contains("NULL_COMMIT_TIMESTAMP") ||
+        Option(e.getCause).map(_.getMessage).getOrElse("").contains("NULL_COMMIT_TIMESTAMP"),
+        s"Expected NULL_COMMIT_TIMESTAMP in the error chain. Got: ${e.getMessage}")
+    } finally {
+      q.stop()
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala

Lines changed: 55 additions & 7 deletions
@@ -133,6 +133,15 @@ class ResolveChangelogTableStreamingPostProcessingSuite
      s"Generate must use Inline. Plan:\n$plan")
  }

+  private def assertContainsNullCommitTimestampGuard(plan: LogicalPlan): Unit = {
+    val nullGuards = plan.collect {
+      case f: Filter
+          if f.condition.toString.contains("NULL_COMMIT_TIMESTAMP") => f
+    }
+    assert(nullGuards.size == 1,
+      s"Expected exactly one NULL_COMMIT_TIMESTAMP guard Filter. Plan:\n$plan")
+  }
+
  // ===========================================================================
  // Carry-over removal only
  // ===========================================================================
@@ -154,8 +163,11 @@ class ResolveChangelogTableStreamingPostProcessingSuite
    assert(groupingNames.toSet == Set("id", "_commit_version", "_commit_timestamp"),
      s"Expected grouping by (id, _commit_version, _commit_timestamp); got $groupingNames")

+    // Two Filters: the NULL `_commit_timestamp` guard + the carry-over predicate.
    val filters = analyzed.collect { case f: Filter => f }
-    assert(filters.size == 1, s"Expected one Filter for carry-over removal. Plan:\n$analyzed")
+    assert(filters.size == 2,
+      s"Expected NULL guard + carry-over Filter. Plan:\n$analyzed")
+    assertContainsNullCommitTimestampGuard(analyzed)

    assertInlineGenerate(analyzed)
    assertHelperColumnsRemoved(analyzed)
@@ -177,10 +189,13 @@ class ResolveChangelogTableStreamingPostProcessingSuite
        "deduplicationMode" -> "none").queryExecution.analyzed
    assertWatermarkOnCommitTimestamp(analyzed)

-    // No carry-over Filter when only update detection runs.
+    // No carry-over Filter when only update detection runs -- but the NULL
+    // `_commit_timestamp` guard Filter is always present.
    val filters = analyzed.collect { case f: Filter => f }
-    assert(filters.isEmpty,
-      s"No Filter expected for update-detection-only path. Plan:\n$analyzed")
+    assert(filters.size == 1,
+      s"Only the NULL guard Filter is expected for update-detection-only path. " +
+        s"Plan:\n$analyzed")
+    assertContainsNullCommitTimestampGuard(analyzed)

    assertInlineGenerate(analyzed)

@@ -211,10 +226,11 @@ class ResolveChangelogTableStreamingPostProcessingSuite
    val aggs = analyzed.collect { case a: Aggregate => a }
    assert(aggs.size == 1, s"Should fuse both passes into a single Aggregate. Plan:\n$analyzed")

-    // Filter for carry-over removal AND a Project for relabeling.
+    // Two Filters: NULL guard + carry-over removal.
    val filters = analyzed.collect { case f: Filter => f }
-    assert(filters.size == 1,
-      s"Exactly one Filter expected for combined path. Plan:\n$analyzed")
+    assert(filters.size == 2,
+      s"Expected NULL guard + carry-over Filter for combined path. Plan:\n$analyzed")
+    assertContainsNullCommitTimestampGuard(analyzed)

    assertInlineGenerate(analyzed)
    assertHelperColumnsRemoved(analyzed)
@@ -277,4 +293,36 @@ class ResolveChangelogTableStreamingPostProcessingSuite
    assert(!ts.get.metadata.contains(EventTimeWatermark.delayKey),
      s"Watermark metadata leaked to user-visible `_commit_timestamp`. Plan:\n$analyzed")
  }
+
+  // ===========================================================================
+  // NULL _commit_timestamp guard
+  // ===========================================================================
+
+  test("NULL _commit_timestamp guard Filter is the first operator after the source") {
+    catalog.setChangelogProperties(identifier, ChangelogProperties(
+      containsCarryoverRows = true,
+      rowIdNames = Seq("id"),
+      rowVersionName = Some("row_commit_version")))
+
+    val analyzed = streamingDf().queryExecution.analyzed
+    // The guard must sit BELOW the EventTimeWatermark (we don't want a NULL row to
+    // be considered for watermark advancement at all). Verify by walking the plan
+    // top-down and finding the guard before any Aggregate.
+    val guards = analyzed.collect {
+      case f: Filter if f.condition.toString.contains("NULL_COMMIT_TIMESTAMP") => f
+    }
+    assert(guards.size == 1, s"Expected exactly one guard. Plan:\n$analyzed")
+    val guard = guards.head
+    val guardChild = guard.child
+    // The guard's child should be the bare relation (or a SubqueryAlias wrapping it),
+    // not the EventTimeWatermark.
+    val isSourceBelowGuard = guardChild match {
+      case _: org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 => true
+      case org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias(_,
+          _: org.apache.spark.sql.catalyst.streaming.StreamingRelationV2) => true
+      case _ => false
+    }
+    assert(isSourceBelowGuard,
+      s"NULL guard Filter should sit directly above the streaming relation. Plan:\n$analyzed")
+  }
 }
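The plan-shape check in the suite above boils down to a small tree-walking pattern: collect all guard nodes, then assert there is exactly one and that its child is the source. A minimal model of that pattern (plain Python, not Catalyst; node kinds and helper names are invented for illustration):

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    kind: str                      # e.g. "source", "guard_filter", "watermark"
    children: list = field(default_factory=list)

def collect(node, kind):
    """Depth-first collect of all nodes of the given kind (like plan.collect)."""
    found = [node] if node.kind == kind else []
    for child in node.children:
        found += collect(child, kind)
    return found

def guard_directly_above_source(root):
    guards = collect(root, "guard_filter")
    return (len(guards) == 1
            and len(guards[0].children) == 1
            and guards[0].children[0].kind == "source")
```

A plan shaped `watermark -> guard_filter -> source` passes the check; one shaped `guard_filter -> watermark -> source` fails it, mirroring the suite's requirement that the guard sit below the EventTimeWatermark, directly above the relation.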
