[SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl #44903

Closed
wants to merge 20 commits into from

@ericm-db ericm-db commented Jan 26, 2024

What changes were proposed in this pull request?

Adds the deleteIfExists method to StatefulProcessorHandle so that state variables can be removed from the state store. It is implemented only for RocksDBStateStoreProvider, because the HDFS-backed state store does not currently support multiple column families.
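
As a rough illustration of the intended semantics (not the code from this PR), the sketch below models a state store as a map of named column families. `ToyStateStore`, `ToyHandle`, and every method on them are hypothetical stand-ins; only the name `deleteIfExists` comes from the description above, and the one-column-family-per-state-variable mapping is an assumption.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a state store with column families.
final class ToyStateStore {
  private val colFamilies = mutable.Map.empty[String, mutable.Map[String, Any]]

  def createColFamilyIfAbsent(name: String): mutable.Map[String, Any] =
    colFamilies.getOrElseUpdate(name, mutable.Map.empty[String, Any])

  // Returns true only if a column family was actually dropped.
  def removeColFamilyIfExists(name: String): Boolean =
    colFamilies.remove(name).isDefined

  def hasColFamily(name: String): Boolean = colFamilies.contains(name)
}

// Hypothetical handle; assumes each state variable maps to one column family.
final class ToyHandle(store: ToyStateStore) {
  def getValueState(stateName: String): mutable.Map[String, Any] =
    store.createColFamilyIfAbsent(stateName)

  // Deleting a state variable that does not exist is a no-op, not an error.
  def deleteIfExists(stateName: String): Unit = {
    store.removeColFamilyIfExists(stateName)
    ()
  }
}

object DeleteIfExistsSketch {
  def main(args: Array[String]): Unit = {
    val store = new ToyStateStore
    val handle = new ToyHandle(store)
    handle.getValueState("countState").update("key1", 1L)
    handle.deleteIfExists("countState")   // drops the state variable
    handle.deleteIfExists("neverCreated") // nothing to drop, no exception
    println(store.hasColFamily("countState"))
  }
}
```

The "if exists" behavior is the point of the sketch: a processor can call the method unconditionally during initialization without first checking whether an earlier run created the state.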

Why are the changes needed?

This functionality is needed so that users can remove state from the state store through the StatefulProcessorHandleImpl.

Does this PR introduce any user-facing change?

Yes. Removing column families was previously not supported by our RocksDB client.

How was this patch tested?

Added a unit test that creates two streams with the same checkpoint directory. The second stream removes state that was created in the first stream upon initialization. We ensure that the state from the previous stream isn't kept.
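
The shape of that test can be sketched with a toy model, under the assumption that a shared mutable map can stand in for the checkpoint directory. `SharedCheckpointSketch`, `firstRun`, and `secondRunInit` are hypothetical names; the state names mirror the `_countState` and `_mostRecent` fields in the test processor, and this is not the actual Spark test.

```scala
import scala.collection.mutable

object SharedCheckpointSketch {
  // Hypothetical stand-in for a checkpoint directory: state name -> (key -> value).
  type Checkpoint = mutable.Map[String, mutable.Map[String, String]]

  // First run: creates two state variables and writes to both.
  def firstRun(checkpoint: Checkpoint): Unit = {
    val counts = checkpoint.getOrElseUpdate("countState", mutable.Map.empty[String, String])
    val recent = checkpoint.getOrElseUpdate("mostRecent", mutable.Map.empty[String, String])
    counts("a") = "2"
    recent("a") = "str1"
  }

  // Second run: during initialization, drops a state variable left by the first run.
  // The remove call plays the role of deleteIfExists in this toy model.
  def secondRunInit(checkpoint: Checkpoint): Unit = {
    checkpoint.remove("countState")
    ()
  }

  def main(args: Array[String]): Unit = {
    val checkpoint: Checkpoint = mutable.Map.empty
    firstRun(checkpoint)
    secondRunInit(checkpoint)
    // State removed by the second run must not survive from the first run.
    assert(!checkpoint.contains("countState"))
  }
}
```

In this sketch the state that the second run does not delete stays in the map; whether the real test also checks that is not stated in the description.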

Was this patch authored or co-authored using generative AI tooling?

@ericm-db ericm-db changed the title [WIP] Adding deleteIfExists operator to StatefulProcessorHandleImpl [SPARK-46911] Adding deleteIfExists operator to StatefulProcessorHandleImpl Jan 30, 2024
@ericm-db ericm-db changed the title [SPARK-46911] Adding deleteIfExists operator to StatefulProcessorHandleImpl [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl Jan 30, 2024
@ericm-db

cc @HeartSaVioR

@HeartSaVioR HeartSaVioR left a comment

Only nits. I haven't looked at the test changes in detail and may revisit the code later, but I can sign off without revisiting since @anishshri-db has already reviewed that part.

@transient private var _countState: ValueState[Long] = _
@transient private var _mostRecent: ValueState[String] = _
@transient var _processorHandle: StatefulProcessorHandle = _

override def init(
handle: StatefulProcessorHandle,
outputMode: OutputMode) : Unit = {
handle: StatefulProcessorHandle,

Contributor Author

fixed

@ericm-db

ericm-db commented Feb 1, 2024

cc @HeartSaVioR, addressed all comments

@HeartSaVioR HeartSaVioR left a comment

+1

@HeartSaVioR

The GitHub Actions failure comes only from the Docker integration test, which is unrelated.

@HeartSaVioR

Thanks! Merging to master.

3 participants