[SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper#42940
Conversation
|
cc. @zsxwing @brkyvz @viirya @anishshri-db Mind taking a look? Thanks! |
5551506 to
dc38638
Compare
There was a problem hiding this comment.
Is there any way to expedite this ? Any way to force developers to upgrade their sources to be compatible with latest Spark requirements ?
There was a problem hiding this comment.
I'd say we are going to break a bunch of 3rd party data sources then, which is what we have been avoided so far.
There was a problem hiding this comment.
When would this cause data loss ? Given there are so many dangers, is it worthwhile to expose this if we cannot reason about correctness ? Can we limit the use only in certain situations perhaps to prevent users from shooting themselves in the foot ?
There was a problem hiding this comment.
This new config is marked as "internal", which is not going to be a part of public documentation. We expect very advanced users and/or operators only know about the existence of the config and use it as their own risk. We don't even expose this config in the warning message - that's the way I avoid users from shooting themselves on the foot.
There was a problem hiding this comment.
I'm enumerating up the possibility - what we actually observed is a duplication, but suppose the data source which does not rely on offset management from Spark and tries to maintain that from source itself, then additional call might end up skipping some data to be provided.
There was a problem hiding this comment.
Nit: to contact the Spark data source developer to support
There was a problem hiding this comment.
maybe I'd remove Spark here - they need to contact 3rd party data source developer, not Spark community.
There was a problem hiding this comment.
This is not strictly related to this change right ? basically we are trying to add support for AvailNow for memory source here ? Should we move that to a separate PR ?
There was a problem hiding this comment.
While I agree this is a side-effect, this heavily increases a test coverage as well. We test with three data sources in TriggerAvailableNowSuite, and none of three supports Trigger.AvailableNow, hence we are basically missing the case where data source supports Trigger.AvailableNow in that suite. We missed that for a long time, I'd like to handle this as well, as long as we are here.
There was a problem hiding this comment.
Can we log some more query specific info here ? or we don't have more info here?
There was a problem hiding this comment.
Yeah let's put queryID and runID at least to differentiate the query. Please note that this is an INFO log indicating that they turned on the flag (which is OK), and the WARN log will follow when they are falling back to use wrapper (which could be problematic).
There was a problem hiding this comment.
Nit: Typo ? Falling back to ?
There was a problem hiding this comment.
Not sure Im reading this correctly. If the flag is enabled, we use multi-batch directly ? Shouldn't this be the case if the flag is disabled ?
There was a problem hiding this comment.
The logic is correct, but I agree that it's very confusing. I'll add some comment to elaborate.
docs/ss-migration-guide.md
Outdated
There was a problem hiding this comment.
If this is about correctness, do you plan to backport this to 3.4/3.5 branch?
There was a problem hiding this comment.
Let's discuss about this after the fix is landed in master branch. It is technically a behavioral change, hence easier to introduce the change in new major/minor version but may need some discussion to introduce the change in bugfix version. Maybe I can send a mail in dev@ and try to gather voices on it.
.../src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
Outdated
Show resolved
Hide resolved
docs/ss-migration-guide.md
Outdated
There was a problem hiding this comment.
For "single batch execution", isn't it deprecated now? Do we guarantee that it could be used in future version?
There was a problem hiding this comment.
I intentionally avoid saying it is Trigger.Once. We deprecated it in good reason, and I'd say it is still worth saying users have to use Trigger.AvailableNow. We just have a fallback to Trigger.Once in technical reason, unfortunately.
Ideally, we still need to persuade 3rd party to implement Trigger.AvailableNow and remove Trigger.Once at all eventually (I know, it won't happen, we barely remove public API), but I also see that several data source projects having no update for a couple of years, which is unfortunate. Maybe we shouldn't introduce fallback logic and don't support the source so that 3rd party would indicate the necessity. My bad.
There was a problem hiding this comment.
Not sure why this cannot be called. Cannot MemoryStream be used as a MicroBatchStream after this change?
There was a problem hiding this comment.
When data source implements AdmissionControl, this method is guaranteed to be never called. latestOffset(start, limit) will supersede this.
viirya
left a comment
There was a problem hiding this comment.
As we are already in the status that sources not supporting Trigger.AvailableNow can work with the wrapper instead of simply failing since Trigger.Once is deprecated, it sounds reasonable to continue making it work under single batch execution.
One option is to stop support of it in 4.0 (as it is deprecated since 3.4), but add fallback logic for previous branches which provide the support for now (as it is related to correctness)?
|
(Looks like python side env issue. I'll rebase and see the chance to be already fixed in latest master.) |
viirya
left a comment
There was a problem hiding this comment.
I'm okay to continue the support with a fallback mode behind the scene. We probably can remove the support in future releases.
Removing API would trigger a long discussion and debate. From my experience of Spark community (you've experienced longer than me :) ), I don't believe the community wants to remove the API unless there is a clear proof that few users use it. For Trigger.Once, I think that's probably Spark 5.0, not 4.0. This is applied to the same for breaking existing query. That said, we couldn't apply more aggressive alternatives. I don't feel like we can persuade community to decide existing query to fail, which users cannot make any change to mitigate (it would require 3rd party data source engineer to work on). |
|
Ah I commented too late :) Thanks for your support! |
…ow with unsupported sources rather than using wrapper
…ng/AvailableNowDataStreamWrapper.scala Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
…ng/MicroBatchExecution.scala Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
00f606a to
015b2eb
Compare
|
https://github.com/HeartSaVioR/spark/runs/16947355981 https://github.com/HeartSaVioR/spark/actions/runs/6234669032 |
|
Thanks, merging to master. |
|
https://lists.apache.org/thread/ljronxf6bymvqjmlwpzy84gzgvnqrmoh |
What changes were proposed in this pull request?
This PR proposes to change the behavior when user runs streaming query with Trigger.AvailableNow, which query has any source which does not support Trigger.AvailableNow. Instead of using wrapper implementation, this PR proposes to fall back to execute a single batch (a.k.a Trigger.Once).
This PR introduces a new flag
spark.sql.streaming.triggerAvailableNowWrapper.enabledto retain the behavior for advanced and extreme users. The flag is marked as internal since it's really only for extreme users who are concerned about behavioral change.Minor details would be following:
Why are the changes needed?
We have observed a data duplication issue with 3rd party data source when it's used with Trigger.AvailableNow. The source didn't support Trigger.AvailableNow, and unfortunately is also not played well with wrapper implementation.
We care more about possible correctness issue than better coverage of Trigger.AvailableNow, hence want to stop using wrapper implementation by default. We also care about not breaking existing query, so fallback to single batch execution rather than failing the query.
Does this PR introduce any user-facing change?
Yes, this introduces a behavioral change for streaming query with Trigger.AvailableNow which contains any source not supporting Trigger.AvailableNow.
How was this patch tested?
Modified UT.
Was this patch authored or co-authored using generative AI tooling?
No.