Skip to content

[SPARK-56566][SS] Stop applying prev lower bound to transformWithState timer scans#55474

Closed
HeartSaVioR wants to merge 1 commit into
apache:masterfrom
HeartSaVioR:SPARK-56566
Closed

[SPARK-56566][SS] Stop applying prev lower bound to transformWithState timer scans#55474
HeartSaVioR wants to merge 1 commit into
apache:masterfrom
HeartSaVioR:SPARK-56566

Conversation

@HeartSaVioR
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This PR changes TransformWithStateExec.processTimers to stop passing prevBatchTimestampMs / eventTimeWatermarkForLateEvents as the exclusive lower bound to processorHandle.getExpiredTimers(...). Timer scans now always run without a lower bound (full scan up to the current batch timestamp / eviction watermark).

Concretely:

  • ProcessingTime branch: getExpiredTimers(batchTimestamp, prevBatchTimestampMs) -> getExpiredTimers(batchTimestamp).
  • EventTime branch: the STATEFUL_OPERATOR_ALLOW_MULTIPLE conditional that computed prevWatermark is removed; getExpiredTimers(watermark, prevWatermark) -> getExpiredTimers(watermark).

The TimerStateImpl.getExpiredTimers signature is unchanged — its prevExpiryTimestampMs parameter (added in SPARK-56400) is retained as a library primitive so that we can re-enable the optimization in the future once registerTimer enforces ts > currentBatchTimestamp / watermark server-side.

TTLState.ttlEvictionIterator is also unchanged. TTL expirations are always strictly above prevBatchTimestampMs by construction (TTL is processing-time-only, ttlDuration is validated > 0, and batchTimestampMs is monotonically non-decreasing), so the TTL path is not affected by this bug.

Why are the changes needed?

SPARK-56400 introduced a bounded rangeScan in TimerStateImpl.getExpiredTimers whose exclusive lower bound is prevBatchTimestampMs (ProcessingTime) or eventTimeWatermarkForLateEvents (EventTime, multi-op — the default). registerTimer has guard on the registered expiry, so a user can legally call registerTimer(ts) with ts at or below that lower bound. In that case the bounded scan excludes the entry, and because the lower bound is monotonically non-decreasing across batches, the timer is never fired in any subsequent batch either — silently lost.

Reproduction:

  • ProcessingTime: from batch 2 onwards (prevBatchTimestampMs is non-None), any registerTimer(ts <= prevBatchTimestampMs) never fires.
  • EventTime (default STATEFUL_OPERATOR_ALLOW_MULTIPLE=true): any registerTimer(ts <= eventTimeWatermarkForLateEvents) never fires.
  • EventTime legacy single-op (STATEFUL_OPERATOR_ALLOW_MULTIPLE=false): unaffected — processTimers already falls back to a full scan in that mode.
    Affected idioms the bug silently breaks:
  • registerTimer(0L) as a "fire on the next batch" pattern.
  • Event-time timers derived from a record's event_time in multi-op chains where a downstream operator's late-events watermark is looser than the upstream.
  • Any registerTimer(ts) that hits ts <= prev even once — the timer is dead forever unless the user also calls deleteTimer.

Reverting the lower bound is the most conservative fix. A complementary follow-up (tracked separately) should restrict the valid timestamp range on registerTimer so that ts > currentBatchTimestamp / watermark is enforced; after that lands we can re-enable the bounded scan.

Does this PR introduce any user-facing change?

No, since the bug wasn't released yet.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

Thanks! Merging to master.

longvu-db pushed a commit to longvu-db/spark that referenced this pull request Apr 22, 2026
…e timer scans

### What changes were proposed in this pull request?

This PR changes `TransformWithStateExec.processTimers` to stop passing `prevBatchTimestampMs` / `eventTimeWatermarkForLateEvents` as the exclusive lower bound to `processorHandle.getExpiredTimers(...)`. Timer scans now always run without a lower bound (full scan up to the current batch timestamp / eviction watermark).

Concretely:
- ProcessingTime branch: `getExpiredTimers(batchTimestamp, prevBatchTimestampMs)` -> `getExpiredTimers(batchTimestamp)`.
- EventTime branch: the `STATEFUL_OPERATOR_ALLOW_MULTIPLE` conditional that computed `prevWatermark` is removed; `getExpiredTimers(watermark, prevWatermark)` -> `getExpiredTimers(watermark)`.

The `TimerStateImpl.getExpiredTimers` signature is unchanged — its `prevExpiryTimestampMs` parameter (added in SPARK-56400) is retained as a library primitive so that we can re-enable the optimization in the future once `registerTimer` enforces `ts > currentBatchTimestamp / watermark` server-side.

`TTLState.ttlEvictionIterator` is also unchanged. TTL expirations are always strictly above `prevBatchTimestampMs` by construction (TTL is processing-time-only, `ttlDuration` is validated `> 0`, and `batchTimestampMs` is monotonically non-decreasing), so the TTL path is not affected by this bug.

### Why are the changes needed?

SPARK-56400 introduced a bounded `rangeScan` in `TimerStateImpl.getExpiredTimers` whose exclusive lower bound is `prevBatchTimestampMs` (ProcessingTime) or `eventTimeWatermarkForLateEvents` (EventTime, multi-op — the default). `registerTimer` has guard on the registered expiry, so a user can legally call `registerTimer(ts)` with `ts` at or below that lower bound. In that case the bounded scan excludes the entry, and because the lower bound is monotonically non-decreasing across batches, the timer is never fired in any subsequent batch either — silently lost.

Reproduction:

- ProcessingTime: from batch 2 onwards (`prevBatchTimestampMs` is non-`None`), any `registerTimer(ts <= prevBatchTimestampMs)` never fires.
- EventTime (default `STATEFUL_OPERATOR_ALLOW_MULTIPLE=true`): any `registerTimer(ts <= eventTimeWatermarkForLateEvents)` never fires.
- EventTime legacy single-op (`STATEFUL_OPERATOR_ALLOW_MULTIPLE=false`): unaffected — `processTimers` already falls back to a full scan in that mode.
Affected idioms the bug silently breaks:
- `registerTimer(0L)` as a "fire on the next batch" pattern.
- Event-time timers derived from a record's `event_time` in multi-op chains where a downstream operator's late-events watermark is looser than the upstream.
- Any `registerTimer(ts)` that hits `ts <= prev` even once — the timer is dead forever unless the user also calls `deleteTimer`.

Reverting the lower bound is the most conservative fix. A complementary follow-up (tracked separately) should restrict the valid timestamp range on `registerTimer` so that `ts > currentBatchTimestamp / watermark` is enforced; after that lands we can re-enable the bounded scan.

### Does this PR introduce _any_ user-facing change?

No, since the bug wasn't released yet.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

Closes apache#55474 from HeartSaVioR/SPARK-56566.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants