[SPARK-31982][SQL] Function sequence doesn't handle date increments that cross DST #28856
Conversation
...catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2623,8 +2623,16 @@ object Sequence {
      // about a month length in days and a day length in microseconds
      val intervalStepInMicros =
It looks like the step should not be physical seconds, but a logical interval. cc @MaxGekk
The current implementation seems to be a strange mix. There are the following options:
- The step is an interval of (months, days, micros):
  - If the start point is TimestampType, we should convert it to a local date-time in the session time zone and add the interval by time components. The intermediate local timestamps should be converted back to micros using the session time zone, but we should keep adding the interval to the local timestamp "accumulator".
  - The same for dates: convert start to a local date. The time zone shouldn't be involved here.
- The step is a duration in micros or days (this is not our case):
  - If start is TimestampType, we shouldn't convert it to a local timestamp; we just add micros to instants, so the time zone is not involved here.
  - If start is DateType, we just add the number of days. The same as for timestamps: the time zone is not involved here.
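The first option above can be sketched roughly as follows (a hedged Java illustration using java.time; the class and method names are hypothetical, not Spark's actual implementation). The interval is added on local time components via an "accumulator", and each step is converted back to an instant through the session time zone:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class IntervalStepSketch {
    // Hypothetical helper: step a local date-time "accumulator" by a
    // (months, days, micros) interval. The addition happens on local time
    // components; only the conversion back to an instant uses the zone.
    static Instant[] sequence(Instant start, int months, int days, long micros,
                              int steps, ZoneId zone) {
        Instant[] out = new Instant[steps];
        LocalDateTime acc = LocalDateTime.ofInstant(start, zone);
        for (int i = 0; i < steps; i++) {
            out[i] = acc.atZone(zone).toInstant();
            acc = acc.plusMonths(months).plusDays(days).plusNanos(micros * 1000L);
        }
        return out;
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("America/Los_Angeles");
        // 2011-03-01 00:00 local time in Los_Angeles; stepping by 1 month
        // crosses the US spring-forward change but still lands on the 1st
        Instant start = Instant.parse("2011-03-01T08:00:00Z");
        for (Instant t : sequence(start, 1, 0, 0, 3, zone)) {
            System.out.println(LocalDateTime.ofInstant(t, zone).toLocalDate());
        }
        // prints 2011-03-01, 2011-04-01, 2011-05-01
    }
}
```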
Hi @cloud-fan, as @MaxGekk explained here, I am not sure this patch looks OK. I am willing to add more documentation to TemporalSequenceImpl,
but I am not sure whether we should follow this approach or refactor a little.
          Option(Literal(stringToInterval("interval 1 month"))),
          Option(tz)),
        Seq(
          Date.valueOf("2011-03-01"), Date.valueOf("2011-04-01")))
The guys are still in America/Los_Angeles, right?
Yes, America/Los_Angeles can pass the test.
I mean Date.valueOf always uses America/Los_Angeles, independently of what you test.
Sure, the result could be tz-independent.
Please wrap the code with withDefaultTimeZone, otherwise your expected dates are wrong.
@@ -2698,7 +2717,7 @@ object Sequence {
   | int $i = 0;
   |
   | while ($t < $exclusiveItem ^ $stepSign < 0) {
-  |   $arr[$i] = ($elemType) ($t / ${scale}L);
+  |   $arr[$i] = ($elemType) (Math.round($t / (float)${scale}L));
Floating-point ops look dangerous here. Can you avoid them?
@MaxGekk We may need the Math.round; without the float ops it seems hard to avoid the gap of about one day between the output and the expected result.
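For reference, the rounding can also be done in pure integer arithmetic (a hedged sketch in Java; roundDiv is a hypothetical helper, not Spark code). Across a spring-forward transition a "day" step covers only 23 physical hours, so a plain truncating division lands on the previous day; rounding to the nearest multiple fixes that without touching floating point:

```java
public class RoundDivSketch {
    // Round t to the nearest multiple of scale using integer math only.
    // Math.floorDiv keeps the rounding direction consistent for negative t,
    // unlike plain integer '/' which truncates toward zero.
    static long roundDiv(long t, long scale) {
        return Math.floorDiv(t + scale / 2, scale);
    }

    public static void main(String[] args) {
        long MICROS_PER_DAY = 86_400_000_000L;
        long dstDay = MICROS_PER_DAY - 3_600_000_000L; // a 23-hour "day" across spring-forward
        System.out.println(dstDay / MICROS_PER_DAY);          // truncation: 0 (off by a day)
        System.out.println(roundDiv(dstDay, MICROS_PER_DAY)); // rounds to 1
    }
}
```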
How do you get the result?
It seems to me that both
ok to test
Test build #124246 has finished for PR 28856 at commit
Test build #124254 has finished for PR 28856 at commit
Test build #124273 has finished for PR 28856 at commit
082f9c0 to 871867b
Test build #124275 has finished for PR 28856 at commit
Test build #124281 has finished for PR 28856 at commit
The function
Test build #124285 has finished for PR 28856 at commit
Test build #124286 has finished for PR 28856 at commit
347fa9d to 19d8c48
Test build #124321 has finished for PR 28856 at commit
Test build #124322 has finished for PR 28856 at commit
      }
      else {
        (daysToMicros(num.toInt(start), zoneId),
          daysToMicros(num.toInt(stop), zoneId))
Can you explain a bit more? It's hard to understand this change without any comment.
Yeah, please explain why, when scale != 1, start and stop contain days.
Because when scale != 1 the value is a day count, so we need to use the zone info to translate it into microseconds to get a correct result, rather than just multiplying by MICROS_PER_DAY, which ignores the time zone.
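To illustrate the point above (a sketch in Java; daysToMicros here is a hypothetical stand-in for Spark's helper of the same name, not its actual code): once the zone's offset changes across DST, interpreting a day count as local midnight in the session zone gives a different microsecond value than a plain multiply by MICROS_PER_DAY:

```java
import java.time.LocalDate;
import java.time.ZoneId;

public class DaysToMicrosSketch {
    static final long MICROS_PER_DAY = 86_400_000_000L;

    // Hypothetical stand-in: interpret a day count (days since the epoch)
    // as local midnight in the given zone, returned as micros since epoch.
    static long daysToMicros(long days, ZoneId zone) {
        return LocalDate.ofEpochDay(days)
            .atStartOfDay(zone)
            .toInstant()
            .getEpochSecond() * 1_000_000L;
    }

    public static void main(String[] args) {
        ZoneId chicago = ZoneId.of("America/Chicago");
        long before = LocalDate.of(2011, 3, 1).toEpochDay(); // before US spring-forward
        long after = LocalDate.of(2011, 4, 1).toEpochDay();  // after it
        // the gap between zone-aware and naive conversion shrinks by one hour
        // once DST starts: 6h (CST, -6) before vs 5h (CDT, -5) after
        System.out.println(daysToMicros(before, chicago) - before * MICROS_PER_DAY);
        System.out.println(daysToMicros(after, chicago) - after * MICROS_PER_DAY);
    }
}
```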
"Because when scale != 1, it is converted to a day count"
How can we tell?
Maybe we should add more documentation to TemporalSequenceImpl first, to understand what it is doing.
Done. Maybe we can pass the scale through the constructor:
private class TemporalSequenceImpl[T: ClassTag]
    (dt: IntegralType, scale: Long, fromLong: Long => T, zoneId: ZoneId)
      // Date to timestamp is not equal from GMT and Chicago timezones
      val (startMicros, stopMicros) = if (scale == 1) {
        (num.toLong(start), num.toLong(stop))
I don't think this is correct, see my comment https://github.com/apache/spark/pull/28856/files#r442366706, but it is at least backward compatible?
Maybe we could separate this into different methods?
Test build #124366 has finished for PR 28856 at commit
@@ -2589,6 +2589,8 @@ object Sequence {
     }
   }

+  // To generate time sequences, we use scale 1 in TemporalSequenceImpl
+  // for `TimestampType`, while MICROS_PER_DAY for `DateType`
If start/end is a date, can the step be seconds/minutes/hours?
Yes, it seems we can; the results are as follows:

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 hour))").count
res19: Long = 1465
scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 minute))").count
res20: Long = 87841
scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 second))").count
res21: Long = 5270401
scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 minute))").head(3)
res25: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])
scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 second))").head(3)
res26: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])
scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 minute))").head(3)
res27: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])
scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 hour))").head(3)
res28: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])
does pgsql support it?
Seems pgsql can only support int, as follows:
postgres= create sequence seq_test;
CREATE SEQUENCE
postgres= select nextval('seq_test');
1
(1 row)
postgres= select nextval('seq_test');
2
(1 row)
Actually this function is from presto: https://prestodb.io/docs/current/functions/array.html
Can you check the behavior of presto? It looks confusing to use time fields as the step for date start/stop.
Based on presto-server-0.236:
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' hour);
Query 20200624_122744_00002_pehix failed: sequence step must be a day interval if start and end values are dates
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' day);
_col0
[2011-03-01, 2011-03-02]
(1 row)
Query 20200624_122757_00003_pehix, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' month);
_col0
[2011-03-01]
(1 row)
Query 20200624_122806_00004_pehix, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' year);
_col0
[2011-03-01]
(1 row)
Query 20200624_122810_00005_pehix, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
@cloud-fan Done. It seems the step can be day, month, or year when start and end are DateType in Presto.
I think the presto behavior makes sense. Can we send a PR to follow it first? e.g. throw an exception if the step is time fields while start/end is date. This can also simplify the implementation.
Ok, I will do it tomorrow.
      val startMicros: Long = num.toLong(start) * scale
      val stopMicros: Long = num.toLong(stop) * scale

      // Date to timestamp is not equal from GMT and Chicago timezones
Why does this code depend on a few specific time zones? Also, the other comments look valid here. We should take the time zone into account for timestamps too.
The date seems to differ from west to east. When the input is a date, we may need to consider the zone info to convert it to a timestamp; if it is already a timestamp rather than a date, we can ignore the zone because the timestamp already accounts for it.
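The timestamp case can be sketched as follows (a hedged Java illustration using java.time, not Spark code): adding a physical duration directly to an instant needs no zone at all, and the DST shift only shows up when the result is rendered as local time:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class InstantStepSketch {
    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("America/Los_Angeles");
        // 2011-03-13 00:00 local time, the day US DST starts
        Instant start = Instant.parse("2011-03-13T08:00:00Z");
        // adding 24 physical hours involves no time zone
        Instant next = start.plus(Duration.ofHours(24));
        // but the local wall clock has jumped an hour: 00:00 -> 01:00
        System.out.println(LocalDateTime.ofInstant(start, zone)); // 2011-03-13T00:00
        System.out.println(LocalDateTime.ofInstant(next, zone));  // 2011-03-14T01:00
    }
}
```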
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
Add a unit test.
Logical bug fix in org.apache.spark.sql.catalyst.expressions.Sequence.TemporalSequenceImpl.

Why are the changes needed?
Spark's sequence function doesn't handle date increments that cross DST.

Does this PR introduce any user-facing change?
Before the PR, people would not get a correct result: with spark.sql.session.timeZone set to Asia/Shanghai, America/Chicago, or GMT, executing
sql("select sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 month)").show(false)
would return [2011-03-01, 2011-04-01, 2011-05-01], [2011-03-01, 2011-03-28, 2011-04-28], and [2011-03-01, 2011-04-01, 2011-05-01] respectively. After the PR, the sequence date conversion is corrected: all three settings yield [2011-03-01, 2011-04-01, 2011-05-01].

How was this patch tested?
Unit test.