[SPARK-18234][SS] Made update mode public #16360
Conversation
@marmbrus Can you take a look?
keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey))

val optionalPredicate = optionalWatermarkAttribute.map { watermarkAttribute =>
  // If we are evicting based on a window, use the end of the window. Otherwise just
Any reason why you wouldn't want to make this a method? It seems exactly the same as the code above under append mode.
sure.
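For illustration only, here is a minimal sketch of the kind of shared helper the comment is asking for. It uses simplified stand-in types rather than Spark's internal `Attribute`/`Predicate` classes, and the helper name `buildWatermarkPredicate` is hypothetical, not something the patch necessarily introduces:

```scala
// Simplified stand-ins; not Spark's catalyst classes.
case class Attribute(name: String, metadata: Map[String, String])

object WatermarkHelperSketch {
  // Stand-in for EventTimeWatermark.delayKey.
  val delayKey = "spark.watermarkDelayMs"

  // Shared by both the Append and Update branches: find the key column carrying
  // watermark metadata and, if present, build a row-level "is not late" predicate.
  def buildWatermarkPredicate(
      keyExpressions: Seq[Attribute],
      currentWatermarkMs: Long): Option[Map[String, Long] => Boolean] = {
    keyExpressions
      .find(_.metadata.contains(delayKey))
      .map { watermarkAttribute => (row: Map[String, Long]) =>
        // Keep the row only if its event time is at or past the current watermark.
        row.getOrElse(watermarkAttribute.name, Long.MaxValue) >= currentWatermarkMs
      }
  }
}
```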
Test build #70428 has finished for PR 16360 at commit

@tdas Can you also please update pyspark docs:
private[this] val baseIterator = iter

// Filter late date using watermark if specified
private[this] val baseIterator = watermarkPredicate match {
Is it a bug that append mode doesn't do similar filtering at the moment?
This is a very good question. I had to go back and double-check this with an improved test, but the behavior is correct in append mode even without the filter. This is because append mode and update mode are implemented differently.
- Append mode: consume the iterator, push all changes into the state store, remove the keys that should be removed, and then create a new iterator from the removed elements. Late data (whose state was already dropped in the past) gets added and then immediately removed within the same batch, and the new iterator is smart enough to filter out exactly those rows that were added and immediately removed. Note that this is blocking: the parent iterator is fully consumed first, and only then is the new iterator created.
- Update mode: in contrast, update mode is not really blocking. A new iterator is immediately created that wraps the parent iterator; it consumes from the parent one row at a time, updates the state store, and returns the updated rows. So we need to filter out rows carrying late data so that we don't emit anything for them (see the sketch below).
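To make the difference concrete, here is a small self-contained sketch using plain Scala collections as a stand-in for the state store. It is not the actual `StateStoreSaveExec` code, just the shape of the two strategies described above:

```scala
import scala.collection.mutable

object OutputModeSketch {
  // Toy "state store": event-time key -> running count.
  type State = mutable.Map[Long, Int]

  // Append mode (blocking): consume the whole input and update the store first,
  // then evict everything below the watermark and emit only the evicted keys that
  // already had state before this batch. Late rows are added and immediately
  // evicted within the same batch and are skipped here, so no up-front filter on
  // the input is needed.
  def appendMode(input: Iterator[Long], state: State, watermark: Long): Iterator[Long] = {
    val preExisting = state.keySet.toSet
    input.foreach(t => state(t) = state.getOrElse(t, 0) + 1)
    val evicted = state.keys.filter(_ < watermark).toList
    evicted.foreach(state.remove)
    evicted.iterator.filter(preExisting.contains)
  }

  // Update mode (non-blocking): wrap the parent iterator and emit each updated key
  // as it is consumed. Late rows must be filtered out up front, otherwise their
  // already-dropped state would be re-created and emitted again.
  def updateMode(input: Iterator[Long], state: State, watermark: Long): Iterator[Long] = {
    input.filter(_ >= watermark).map { t =>
      state(t) = state.getOrElse(t, 0) + 1
      t
    }
  }
}
```

In the real operator the store is the per-partition StateStore and the predicate comes from the watermark expression, but the blocking-versus-wrapping distinction is the same.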
Test build #70429 has finished for PR 16360 at commit
Test build #70430 has finished for PR 16360 at commit
Test build #70441 has finished for PR 16360 at commit
Test build #70450 has finished for PR 16360 at commit
Generally LGTM!
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst
maybe in catalyst.streaming or something?
outputMode match {
  case Append | Complete => // allowed
  case Update =>
    throw new AnalysisException("Update ouptut mode is not supported for memory format")
"memory sink" for consistency with the above error
also "output". Thinking a little more, I wonder if it would be better to say what is supported?
Test build #70492 has finished for PR 16360 at commit
## What changes were proposed in this pull request?

Made update mode public. As part of that, here are the changes:
- Updated DataStreamWriter to accept "update"
- Changed the package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added update-mode state removal with watermark to StateStoreSaveExec

## How was this patch tested?

Added new tests in the changed modules.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16360 from tdas/SPARK-18234.

(cherry picked from commit 83a6ace)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
What changes were proposed in this pull request?
Made update mode public. As part of that, here are the changes:
- Updated DataStreamWriter to accept "update"
- Changed the package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst
- Added update-mode state removal with watermark to StateStoreSaveExec

How was this patch tested?
Added new tests in the changed modules.
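For reference, a minimal end-user sketch of what the now-public mode looks like from `DataStreamWriter`; the socket source and console sink here are just placeholders for illustration:

```scala
import org.apache.spark.sql.SparkSession

object UpdateModeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UpdateModeExample").getOrCreate()
    import spark.implicits._

    // Placeholder streaming source for the sake of a runnable example.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Running word count. In "update" mode each trigger writes only the rows whose
    // counts changed, instead of appending finalized rows or rewriting the complete
    // result table.
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("update")   // accepted by DataStreamWriter as of this change
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```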