New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation #29623
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM as a quick fix.
cc @HeartSaVioR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM assuming the new test would fail in master branch.
Ideally the fundamental fix is to ensure state stores to write delta files for all state stores in the query per batch, instead of caring all of stateful operations in complicated rules. That sounds a bit hard to achieve, so let's see who brings a good idea to deal with.
Btw, looks like we are missing to update the guide doc once we address limitation. According to the guide, limit is not supported in streaming dataset. |
Test build #128174 has finished for PR 29623 at commit
|
@@ -257,4 +257,10 @@ class PropagateEmptyRelationSuite extends PlanTest { | |||
val optimized = Optimize.execute(query.analyze) | |||
assert(optimized.resolved) | |||
} | |||
|
|||
test("should not optimize away limit if streaming") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we add a JIRA prefix?
test("should not optimize away limit if streaming") { | |
test("SPARK-32776: should not optimize away limit if streaming") { |
@@ -1141,6 +1143,42 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi | |||
} | |||
} | |||
|
|||
testQuietly("limit on empty batch should not cause state store error") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testQuietly("limit on empty batch should not cause state store error") { | |
testQuietly("SPARK-32776: limit on empty batch should not cause state store error") { |
I am just going to merge since the PR is ready to go. |
Merged to master and branch-3.0. |
…PropagateEmptyRelation PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries. Right now, the limit operator in a streaming query may get optimized away when the relation is empty. This can be problematic for stateful streaming, as this empty batch will not write any state store files, and the next batch will fail when trying to read these state store files and throw a file not found error. We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries. This PR is intended as a small and safe fix for PropagateEmptyRelation. A fundamental fix that can prevent this from happening again in the future and in other optimizer rules is more desirable, but that's a much larger task. No unit tests. Closes #29623 from liwensun/spark-32776. Authored-by: liwensun <liwen.sun@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit f0851e9) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…PropagateEmptyRelation PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries. Right now, the limit operator in a streaming query may get optimized away when the relation is empty. This can be problematic for stateful streaming, as this empty batch will not write any state store files, and the next batch will fail when trying to read these state store files and throw a file not found error. We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries. This PR is intended as a small and safe fix for PropagateEmptyRelation. A fundamental fix that can prevent this from happening again in the future and in other optimizer rules is more desirable, but that's a much larger task. No unit tests. Closes apache#29623 from liwensun/spark-32776. Authored-by: liwensun <liwen.sun@databricks.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org> (cherry picked from commit f0851e9) Signed-off-by: HyukjinKwon <gurwls223@apache.org>
What changes were proposed in this pull request?
PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries.
Why are the changes needed?
Right now, the limit operator in a streaming query may get optimized away when the relation is empty. This can be problematic for stateful streaming, as this empty batch will not write any state store files, and the next batch will fail when trying to read these state store files and throw a file not found error.
We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries.
This PR is intended as a small and safe fix for PropagateEmptyRelation. A fundamental fix that can prevent this from happening again in the future and in other optimizer rules is more desirable, but that's a much larger task.
Does this PR introduce any user-facing change?
No
How was this patch tested?
unit tests.