
[SPARK-55147][SS] Scope timestamp range for time-interval join retrieval in V4 state format#54879

Closed
nicholaschew11 wants to merge 2 commits into apache:master from nicholaschew11:SPARK-55147-range-scan-v4

Conversation


@nicholaschew11 commented Mar 18, 2026

What changes were proposed in this pull request?

This PR improves the retrieval operation in the V4 stream-stream join state manager to scope the timestamp range for time-interval joins. Instead of scanning all timestamps for a given key during prefix scan, V4 now extracts constant interval offsets from the join condition and computes a `(minTs, maxTs)` range per input row, enabling the prefix scan to skip entries before `minTs` and terminate early past `maxTs`.

  • Add `scanRangeOffsets` and `computeTimestampRange` to `OneSideHashJoiner`, using `StreamingJoinHelper.getStateValueWatermark(eventWatermark=0)` to extract interval bounds from the join condition
  • Add `timestampRange` parameter to `getJoinedRows` in the state manager trait, the V4 implementation, and the V1-V3 base class (ignored by V1-V3)
  • Add `getValuesInRange` to `KeyWithTsToValuesStore` that filters by range and stops early past the upper bound
  • `getValues` now delegates to `getValuesInRange(Long.MinValue, Long.MaxValue)`
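The skip/stop-early behavior described above can be modeled in isolation. The sketch below is illustrative only, not Spark's actual code: it stands in for `getValuesInRange` over the (timestamp, value) pairs a prefix scan would yield for one join key, already sorted by timestamp. The object name and the simplified entry type are assumptions for the example; bounds are treated as inclusive on both sides here.

```scala
// Illustrative model of a range-bounded scan over timestamp-ordered entries.
object RangeScanSketch {
  // `entries` stands in for the values stored under one join key, sorted by
  // timestamp as the state store's prefix scan yields them.
  def getValuesInRange(
      entries: Seq[(Long, String)],
      minTs: Long,
      maxTs: Long): Seq[(Long, String)] = {
    val out = scala.collection.mutable.ArrayBuffer[(Long, String)]()
    val it = entries.iterator
    var pastUpperBound = false
    while (!pastUpperBound && it.hasNext) {
      val (ts, v) = it.next()
      if (ts > maxTs) pastUpperBound = true    // terminate early past maxTs
      else if (ts >= minTs) out += ((ts, v))   // skip entries before minTs
    }
    out.toSeq
  }
}
```

In this model, the unbounded `getValues` corresponds to calling `getValuesInRange(entries, Long.MinValue, Long.MaxValue)`, which matches the delegation described in the last bullet above.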

Why are the changes needed?

For time-interval joins, the V4 state format stores values indexed by (key, timestamp). Without range scoping, retrieving matches requires scanning all timestamps for a key via prefix scan, even though the join condition constrains matching to a specific time window. With this change, the scan is bounded to only the relevant timestamp range, reducing I/O proportionally to the ratio of the interval width to the total timestamp span in state.

Does this PR introduce any user-facing change?

No. V4 state format is experimental and gated behind `spark.sql.streaming.join.stateFormatV4.enabled`.

How was this patch tested?

New unit tests in `SymmetricHashJoinStateManagerEventTimeInValueSuite`:

  • `getJoinedRows with timestampRange`: boundary conditions, exact matches, empty ranges, full range
  • `timestampRange with multiple values per timestamp`: multiple values at the same timestamp

Existing V4 join suites (Inner, Outer, FullOuter, LeftSemi) all pass.

Was this patch authored or co-authored using generative AI tooling?

Yes. (Claude Opus 4.6)

@nicholaschew11 force-pushed the SPARK-55147-range-scan-v4 branch from 7935015 to bde2f50 on March 18, 2026 05:49

@HeartSaVioR left a comment


First pass.

*
* It is caller's responsibility to consume the whole iterator.
*
* For V4 time-interval joins, timestampRange may be provided to skip/stop-early during

Let's keep the documentation of the interface playing the role of an "interface". We can just talk about the contract/requirement, and derived classes will have separate method docs to mention the specifics.

I'd rather document the parameter timestampRange as a "hint" for the optimization of reducing the scope of the scan. A derived class can make an optimization with that hint, but it's still OK for a derived class to ignore it if it cannot leverage the hint.

We should also clarify that, given timestampRange is a hint and a derived class can decide not to leverage it, timestampRange is expected to be a subset of the predicate condition in practice. That means the parameter predicate has to be provided in a way that produces the correct output whether or not the parameter timestampRange is leveraged as a hint.

(The reason I said "in practice" is that you leverage this in the test, and I admit there is no easy way to test this except by breaking the above.)


Also let's clarify the boundary of inclusive vs exclusive on both sides. This should be described in the interface method doc.

private val iter = stateStore.prefixScanWithMultiValues(key, colFamilyName)

private var currentTs = -1L
private var currentTsInRange = false

Honestly, the new logic is overcomplicated. Technically, the entire logic should be the same as before, except for handling the lower bound and upper bound, which can be treated as exceptional cases rather than part of processing the data within the timestamp boundary.

Below is the simplified logic (DISCLAIMER: Claude 4.6 opus) with my guidance on the direction for the above simplification:

        private var currentTs = -1L
        private var pastUpperBound = false
        private val valueAndMatchPairs = scala.collection.mutable.ArrayBuffer[ValueAndMatchPair]()

        private def flushAccumulated(): GetValuesResult = {
          if (valueAndMatchPairs.nonEmpty) {
            val result = reusableGetValuesResult.withNew(
              currentTs, valueAndMatchPairs.toList)
            currentTs = -1L
            valueAndMatchPairs.clear()
            result
          } else {
            finished = true
            null
          }
        }

        @tailrec
        override protected def getNext(): GetValuesResult = {
          if (pastUpperBound || !iter.hasNext) {
            flushAccumulated()
          } else {
            val unsafeRowPair = iter.next()
            val ts = TimestampKeyStateEncoder.extractTimestamp(unsafeRowPair.key)

            if (ts > maxTs) {
              pastUpperBound = true
              getNext()
            } else if (ts < minTs) {
              getNext()
            } else if (currentTs == -1L || currentTs == ts) {
              currentTs = ts
              valueAndMatchPairs += valueRowConverter.convertValue(unsafeRowPair.value)
              getNext()
            } else {
              // Timestamp changed -- flush previous group before starting new one
              val prevTs = currentTs
              val prevValues = valueAndMatchPairs.toList

              currentTs = ts
              valueAndMatchPairs.clear()
              valueAndMatchPairs += valueRowConverter.convertValue(unsafeRowPair.value)

              reusableGetValuesResult.withNew(prevTs, prevValues)
            }
          }
        }

Would you take a look and apply the change if you think it's good?

} else if (iter.hasNext) {
val unsafeRowPair = iter.next()

val ts = TimestampKeyStateEncoder.extractTimestamp(unsafeRowPair.key)

I think this can be considered "beyond the scope of this PR" (since it requires some code change to the StateStore API), but just for the record:

We might get better outcome if we can specify the start boundary and the end boundary when scanning in general. This would be useful for range scan and prefix scan with timestamp (timestamp as postfix).

Seeking to a specific position directly is better than seeking to the first position and doing a sequential scan to find the specific position. (RocksDB won't find the position via sequential scan.)

Scoping the iterator closely to the upper bound would also help RocksDB avoid unnecessary scanning (especially of tombstones), although it's probably not very different given the pattern in which we remove state: we remove state in timestamp order, so it's unlikely to have tombstones beyond the upper bound if we ever have a valid entry within the timestamp boundary.
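The seek-versus-sequential-scan point above can be illustrated with a plain sorted map. This is only an analogy: the bounded StateStore scan discussed here does not exist in this PR, and the object and data below are made up for the example. A structure with a range query seeks directly to the lower bound and stops at the upper bound, instead of iterating from the first key and filtering.

```scala
import scala.collection.immutable.TreeMap

// Analogy for a bounded scan: TreeMap.range(from, until) jumps to `from`
// and never visits keys at or past `until`.
object BoundedScanSketch {
  // Stand-in for state entries keyed by timestamp.
  val state: TreeMap[Long, String] =
    TreeMap(10L -> "a", 20L -> "b", 30L -> "c", 40L -> "d")

  // Note the boundary contract: `from` is inclusive, `until` is exclusive --
  // exactly the kind of inclusive/exclusive detail the interface doc for
  // timestampRange should spell out.
  def scoped(from: Long, until: Long): TreeMap[Long, String] =
    state.range(from, until)
}
```

Here `scoped(20L, 40L)` visits only the keys 20 and 30, never touching 10 or 40.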

key: Int,
range: Option[(Long, Long)])(implicit manager: SymmetricHashJoinStateManager): Seq[Int] = {
val dummyRow = new GenericInternalRow(0)
manager.getJoinedRows(

Let's briefly leave a code comment that the predicate should contain the condition of timestampRange in practice, but that we intentionally don't do it here, to test the functionality.

inputValueAttributes, joinKeyExpressions, stateFormatVersion = 4) { manager =>
implicit val mgr = manager

append(40, 20)

Why not add a bit more to show that the method works correctly with multiple values across multiple timestamp buckets?

@nicholaschew11 force-pushed the SPARK-55147-range-scan-v4 branch from bde2f50 to 1115098 on March 19, 2026 17:14

@HeartSaVioR left a comment


+1

@HeartSaVioR

Thanks! Merging to master.

terana pushed a commit to terana/spark that referenced this pull request Mar 23, 2026
…val in V4 state format

Closes apache#54879 from nicholaschew11/SPARK-55147-range-scan-v4.

Authored-by: Nicholas Chew <chew.nicky@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
