-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years #16304
Conversation
@@ -252,6 +252,9 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce | |||
public final int months; | |||
public final long microseconds; | |||
|
|||
/** | |||
* Return the interval in miliseconds, not including the months in interval. | |||
*/ | |||
public final long milliseconds() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rxin Documented as per your request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm why don't you remove this and i will send a patch for docs.
event time? |
@@ -572,6 +572,10 @@ class Dataset[T] private[sql]( | |||
val parsedDelay = | |||
Option(CalendarInterval.fromString("interval " + delayThreshold)) | |||
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) | |||
// Threshold specified in months/years is non-deterministic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think waiting 1 month for late data is a reasonable use case. Based on the definition of the watermark, its actually okay for us to over estimate this delay too. Why not take take the max (31 days, leap year)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i worry users think it will align with month boundary. It is safer to just say "31 days".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does "safe" mean in this context? users must not rely on watermarks for correctness as they can be arbitrarily delayed based on batch boundaries. I think this error actually confuses the point as its is enforcing precision when this API cannot provide that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when i have a record that's 29 days late in feb what should I expect? If we want to just change month to a fix number of days, I'd say just use 30, and then document it clearly in the API (e.g. "if month is specified, 1 month = 30 days").
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when i have a record that's 29 days late in feb what should I expect?
If I set the watermark to 1 month I would expect that in no case will a record that arrives on the 1st (of any month) be dropped before that month is over. This expectation should hold true for February as well as January. As such, I would pick 31 days (and document it as you suggest).
You should never expect data to be dropped by a watermark. If you want to ensure that data will be dropped, you should use a filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sgtm
Test build #70233 has finished for PR 16304 at commit
|
fyi the title has a typo |
val input = MemoryStream[Long] | ||
val aggWithWatermark = input.toDF() | ||
.withColumn("eventTime", $"value".cast("timestamp")) | ||
.withWatermark("eventTime", "1 month") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be testing months, but the title says years?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. sorry .. i pushed too early. i updated the test now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wanted to make sure I wasn't missing something :)
LGTM |
Test build #70282 has finished for PR 16304 at commit
|
Test build #70284 has finished for PR 16304 at commit
|
@@ -387,7 +387,7 @@ class StreamExecution( | |||
lastExecution.executedPlan.collect { | |||
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => | |||
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") | |||
e.eventTimeStats.value.max - e.delay.milliseconds | |||
math.max(0, e.eventTimeStats.value.max - e.delayMs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newWatermarkMs
can be negative. Image the user is processing some very old data which is before 1970.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think a lot of things are going to break for that usecase. I dont think our sql functions, Java time format, etc are even designed to handle negative millis. The way I found this issue is that when i tried for convert -ve watermark to formatted string, it gave a very weird date. So I dont think we should add complexity for that use case.
@marmbrus any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im not totally sure, but it does seem likely that we don't handle negative dates (and that is probably okay).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sounds like a SQL bug. IMO, a negative timestamp is valid, it's not a negative date, just a date before 1970.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I think I was doing something to goof up the conversion using SimpleDateFormat. I am fixing that.
Test build #70420 has finished for PR 16304 at commit
|
Test build #70449 has finished for PR 16304 at commit
|
LGTM |
Thanks! Merging to master and 2.1. |
…in months or years ## What changes were proposed in this pull request? Two changes - Fix how delays specified in months and years are translated to milliseconds - Following up on #16258, not show watermark when there is no watermarking in the query ## How was this patch tested? Updated and new unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16304 from tdas/SPARK-18834-1. (cherry picked from commit 607a1e6) Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
assertEventStats { e => | ||
assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000) | ||
val watermarkTime = timestampFormat.parse(e.get("watermark")) | ||
assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the recent builds failed in this check.
[info] - delay in months and years handled correctly *** FAILED *** (325 milliseconds)
[info] 30 did not equal 29 (EventTimeWatermarkSuite.scala:193)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for reporting it. Working on a PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixing in #16449
…in months or years ## What changes were proposed in this pull request? Two changes - Fix how delays specified in months and years are translated to milliseconds - Following up on apache#16258, not show watermark when there is no watermarking in the query ## How was this patch tested? Updated and new unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes apache#16304 from tdas/SPARK-18834-1.
…onth ### What changes were proposed in this pull request? Revert this commit 18b7ad2. ### Why are the changes needed? See #16304 (comment) ### Does this PR introduce any user-facing change? Yes ### How was this patch tested? There is no test for that. Closes #26101 from MaxGekk/revert-mean-seconds-per-month. Authored-by: Maxim Gekk <max.gekk@gmail.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
What changes were proposed in this pull request?
Two changes
How was this patch tested?
Updated and new unit tests