Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47570][SS] Integrate range scan encoder changes with timer implementation #45709

Closed
wants to merge 5 commits into from

Conversation

jingz-db
Copy link
Contributor

@jingz-db jingz-db commented Mar 25, 2024

What changes were proposed in this pull request?

Previously timer state implementation was using No prefix rocksdb state encoder. When doing iterator() or prefix(), the returned iterator is not sorted on timestamp value. After Anish's PR for supporting range scan encoder, we could integrate it with TimerStateImpl such that we will use range scan encoder on timer to key.

Why are the changes needed?

The changes are part of the work around adding new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests in TimerSuite

Was this patch authored or co-authored using generative AI tooling?

No

@jingz-db jingz-db marked this pull request as ready for review March 25, 2024 22:24
@HyukjinKwon
Copy link
Member

@jingz-db mind liking the JIRA into the PR title? See also https://spark.apache.org/contributing.html

@github-actions github-actions bot removed the DOCS label Mar 26, 2024
@jingz-db jingz-db changed the title Integrate range scan encoder changes with timer implementation [SPARK-47570][SS] Integrate range scan encoder changes with timer implementation Mar 26, 2024
@jingz-db
Copy link
Contributor Author

@jingz-db mind liking the JIRA into the PR title? See also https://spark.apache.org/contributing.html

Just did. Thanks!

@@ -161,25 +161,23 @@ case class TransformWithStateExec(
assert(batchTimestampMs.isDefined)
val batchTimestamp = batchTimestampMs.get
val procTimeIter = processorHandle.getExpiredTimers()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we pass the timeout to the iterator and just return the expired rows instead ? i.e. move the filtering logic within the TimerStateImpl ?

val rowPair = iter.next()
val keyRow = rowPair.key
val result = getTimerRowFromSecIndex(keyRow)
val rowPair = if (iter.hasNext) iter.next() else null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets keep the original if condition as it is ? and add a condition to return null or result within this case ?

@@ -164,11 +164,13 @@ class StatefulProcessorHandleImpl(

/**
* Function to retrieve all registered timers for all grouping keys
* @param expiryTimestampMs Threshold for expired timestamp in milliseconds, this function
* will return every timers that has (strictly) smaller timestamp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: will return all timers that have timestamp less than passed threshold

timerTimerStamps3.foreach(timerState3.registerTimer)
ImplicitGroupingKeyTracker.removeImplicitKey()

ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove setting the implicit key just for this section (for the calls to getExpiredTimers) would the test still work ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it works. Will removed them in the next commit.

Copy link
Contributor

@anishshri-db anishshri-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@@ -188,9 +187,12 @@ class TimerStateImpl(

/**
* Function to get all the registered timers for all grouping keys
* @param expiryTimestampMs Threshold for expired timestamp in milliseconds, this function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe add a small comment here mentioning that we perform a range scan and stop iterating once the key row timestamp exceeds the threshold

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is not user-facing, right? If it is, I'd suggest avoiding implementation detail. Looks like as it doesn't seem to be an user facing, but just to remind.

Copy link
Contributor Author

@jingz-db jingz-db Mar 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out (I did not realize this before but looks like we did the right thing)!

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@HeartSaVioR
Copy link
Contributor

Thanks! Merging to master.

sweisdb pushed a commit to sweisdb/spark that referenced this pull request Apr 1, 2024
…lementation

### What changes were proposed in this pull request?

Previously timer state implementation was using No prefix rocksdb state encoder. When doing `iterator()` or `prefix()`, the returned iterator is not sorted on timestamp value. After Anish's PR for supporting range scan encoder, we could integrate it with `TimerStateImpl` such that we will use range scan encoder on `timer to key`.

### Why are the changes needed?

The changes are part of the work around adding new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests in `TimerSuite`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45709 from jingz-db/integrate-range-scan.

Authored-by: jingz-db <jing.zhan@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants