-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19861][SS] watermark should not be a negative time. #17202
Conversation
Test build #74172 has finished for PR 17202 at commit
|
cc @zsxwing |
Test build #74182 has started for PR 17202 at commit |
@@ -576,6 +576,8 @@ class Dataset[T] private[sql]( | |||
val parsedDelay = | |||
Option(CalendarInterval.fromString("interval " + delayThreshold)) | |||
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) | |||
assert(parsedDelay.microseconds >= 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens when it is 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set 0 means event time should be not less than max event time in last batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other words, 0 means the events always arrive in order
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 | ||
parsedDelay.milliseconds + parsedDelay.months * millisPerMonth | ||
} | ||
assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why compute all this -- don't you just mean to assert about delayThreshold
? this derived value can only be negative if the input is right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srowen Thanks for you review!
Why compute all this -- don't you just mean to assert about delayThreshold?
I do mean to check the delayThreshold
. delayThreshold
is converted from String
to CalendarInterval
. CalendarInterval
divides the delayThreshold
into two parts, i.e. month (contain year and month) and microseconds of rest. (https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java#L86)
(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala#L87)
this derived value can only be negative if the input is right?
Sorry, I dont get what you mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't you just assert(delayThreshold >= 0, ...)
? Why does that check require computing delayMs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delayThreshold: String
can not be used to assert directly, like:
inputData.withWatermark("value", "1 month -40 days")
inputData.withWatermark("value", "-10 seconds")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah right. Can you not assert about parsedDelay then? it seems like that's always negative when the result of this conversion, copied from somewhere else? is negative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you misunderstand my cases above. Those cases are invalid, i.e. the parsedDelay
are negative.
case | parsedDelay | validity |
---|---|---|
inputData.withWatermark("value", "1 month -40 days") | negative | invalid |
inputData.withWatermark("value", "-10 seconds") | negative | invalid |
inputData.withWatermark("value", "10 seconds") | positive | valid |
inputData.withWatermark("value", "1 day -10 seconds") | positive | valid |
inputData.withWatermark("value", "0 second") | - | valid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So can you require(parsedDelay >= 0, ...)
to detect invalid cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry that last example didn't make sense, it's not a single number. It just doesn't seem like you need to reproduce this conversion to check what you want to, which is that the two fields aren't negative. require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0)
In fact, I wonder if this should be some kind of method of CalendarInterval, like isNegative
? it's not that trivial to decide if an interval is negative, and maybe other places do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srowen +1 to your comments. We should make it more significative but not just valid.
Test build #74203 has finished for PR 17202 at commit
|
Test build #74204 has finished for PR 17202 at commit
|
Test build #74211 has finished for PR 17202 at commit
|
@@ -563,7 +563,7 @@ class Dataset[T] private[sql]( | |||
* @param eventTime the name of the column that contains the event time of the row. | |||
* @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest | |||
* record that has been processed in the form of an interval | |||
* (e.g. "1 minute" or "5 hours"). | |||
* (e.g. "1 minute" or "5 hours"). NOTE: This should not be a negative time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd just say "This should not be negative."
val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 | ||
parsedDelay.milliseconds + parsedDelay.months * millisPerMonth | ||
} | ||
require(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"delay threshold ($delayThreshold) should not be negative"
@@ -576,6 +576,11 @@ class Dataset[T] private[sql]( | |||
val parsedDelay = | |||
Option(CalendarInterval.fromString("interval " + delayThreshold)) | |||
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) | |||
val delayMs = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you copied these codes from EventTimeWatermarkExec
. Could you extract them as a new method in object EventTimeWatermark
and reuse it?
Since you are touching these files, could you also fix the following two places as well?
Line 42 in d8830c5
.putLong(EventTimeWatermark.delayKey, delay.milliseconds) delayMs
. Right now we don't use this value in the logical plan, but it's better to make it consistent.- I forgot to update
EventTimeWatermarkExec
in [SPARK-19859][SS]The new watermark should override the old one #17199. Could you help me fix it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@@ -576,6 +576,8 @@ class Dataset[T] private[sql]( | |||
val parsedDelay = | |||
Option(CalendarInterval.fromString("interval " + delayThreshold)) | |||
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) | |||
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, | |||
s"delay threshold ($delayThreshold) should not be negative.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0)
to make delayThreshold
more reasonable and significative.
Test build #74232 has finished for PR 17202 at commit
|
LGTM. Merging to master and 2.1. |
## What changes were proposed in this pull request? `watermark` should not be negative. This behavior is invalid, check it before real run. ## How was this patch tested? add new unit test. Author: uncleGen <hustyugm@gmail.com> Author: dylon <hustyugm@gmail.com> Closes #17202 from uncleGen/SPARK-19861. (cherry picked from commit 30b18e6) Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
What changes were proposed in this pull request?
watermark
should not be negative. This behavior is invalid, check it before real run.How was this patch tested?
add new unit test.