Skip to content

Commit

Permalink
[SPARK-30760][SQL] Port millisToDays and daysToMillis on Java 8 t…
Browse files Browse the repository at this point in the history
…ime API

### What changes were proposed in this pull request?
In this PR, I propose to rewrite the `millisToDays` and `daysToMillis` methods of `DateTimeUtils` using the Java 8 time API.

I removed `getOffsetFromLocalMillis` from `DateTimeUtils` because it is a private method and is no longer used in Spark SQL.

### Why are the changes needed?
The new implementation is based on the Proleptic Gregorian calendar, which is already used by other date-time functions. This change makes `millisToDays` and `daysToMillis` consistent with the rest of the Spark SQL API related to date and time operations.

### Does this PR introduce any user-facing change?
Yes, this might affect the behavior for old dates before the year 1582.

### How was this patch tested?
By the existing test suites `DateTimeUtilsSuite`, `DateFunctionsSuite`, `DateExpressionsSuite`, `SQLQuerySuite` and `HiveResultSuite`.

Closes apache#27494 from MaxGekk/millis-2-days-java8-api.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
MaxGekk authored and cloud-fan committed Feb 12, 2020
1 parent 5919bd3 commit aa0d136
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ case class CurrentBatchTimestamp(
def toLiteral: Literal = dataType match {
case _: TimestampType =>
Literal(DateTimeUtils.fromJavaTimestamp(new Timestamp(timestampMs)), TimestampType)
case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs, timeZone), DateType)
case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs, zoneId), DateType)
}
}

Expand Down Expand Up @@ -1332,14 +1332,14 @@ case class MonthsBetween(

override def nullSafeEval(t1: Any, t2: Any, roundOff: Any): Any = {
DateTimeUtils.monthsBetween(
t1.asInstanceOf[Long], t2.asInstanceOf[Long], roundOff.asInstanceOf[Boolean], timeZone)
t1.asInstanceOf[Long], t2.asInstanceOf[Long], roundOff.asInstanceOf[Boolean], zoneId)
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val tz = ctx.addReferenceObj("timeZone", timeZone)
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (d1, d2, roundOff) => {
s"""$dtu.monthsBetween($d1, $d2, $roundOff, $tz)"""
s"""$dtu.monthsBetween($d1, $d2, $roundOff, $zid)"""
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,22 @@ object DateTimeUtils {

// we should use the exact day as Int, for example, (year, month, day) -> day
def millisToDays(millisUtc: Long): SQLDate = {
millisToDays(millisUtc, defaultTimeZone())
millisToDays(millisUtc, defaultTimeZone().toZoneId)
}

def millisToDays(millisUtc: Long, timeZone: TimeZone): SQLDate = {
// SPARK-6785: use Math.floorDiv so negative number of days (dates before 1970)
// will correctly work as input for function toJavaDate(Int)
val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
Math.floorDiv(millisLocal, MILLIS_PER_DAY).toInt
def millisToDays(millisUtc: Long, zoneId: ZoneId): SQLDate = {
val instant = microsToInstant(Math.multiplyExact(millisUtc, MICROS_PER_MILLIS))
localDateToDays(LocalDateTime.ofInstant(instant, zoneId).toLocalDate)
}

// reverse of millisToDays
def daysToMillis(days: SQLDate): Long = {
daysToMillis(days, defaultTimeZone())
daysToMillis(days, defaultTimeZone().toZoneId)
}

def daysToMillis(days: SQLDate, timeZone: TimeZone): Long = {
val millisLocal = days.toLong * MILLIS_PER_DAY
millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
def daysToMillis(days: SQLDate, zoneId: ZoneId): Long = {
val instant = daysToLocalDate(days).atStartOfDay(zoneId).toInstant
instantToMicros(instant) / MICROS_PER_MILLIS
}

// Converts Timestamp to string according to Hive TimestampWritable convention.
Expand Down Expand Up @@ -589,11 +587,11 @@ object DateTimeUtils {
time1: SQLTimestamp,
time2: SQLTimestamp,
roundOff: Boolean,
timeZone: TimeZone): Double = {
zoneId: ZoneId): Double = {
val millis1 = MICROSECONDS.toMillis(time1)
val millis2 = MICROSECONDS.toMillis(time2)
val date1 = millisToDays(millis1, timeZone)
val date2 = millisToDays(millis2, timeZone)
val date1 = millisToDays(millis1, zoneId)
val date2 = millisToDays(millis2, zoneId)
val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1)
val (year2, monthInYear2, dayInMonth2, daysToMonthEnd2) = splitDate(date2)

Expand All @@ -607,8 +605,8 @@ object DateTimeUtils {
}
// using milliseconds can cause precision loss with more than 8 digits
// we follow Hive's implementation which uses seconds
val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, timeZone))
val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, timeZone))
val secondsInDay1 = MILLISECONDS.toSeconds(millis1 - daysToMillis(date1, zoneId))
val secondsInDay2 = MILLISECONDS.toSeconds(millis2 - daysToMillis(date2, zoneId))
val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + secondsInDay1 - secondsInDay2
val secondsInMonth = DAYS.toSeconds(31)
val diff = monthDiff + secondsDiff / secondsInMonth.toDouble
Expand Down Expand Up @@ -737,8 +735,8 @@ object DateTimeUtils {
millis += offset
millis - millis % MILLIS_PER_DAY - offset
case _ => // Try to truncate date levels
val dDays = millisToDays(millis, timeZone)
daysToMillis(truncDate(dDays, level), timeZone)
val dDays = millisToDays(millis, timeZone.toZoneId)
daysToMillis(truncDate(dDays, level), timeZone.toZoneId)
}
truncated * MICROS_PER_MILLIS
}
Expand Down Expand Up @@ -770,32 +768,6 @@ object DateTimeUtils {
}
}

/**
* Look up the offset for the given milliseconds since 1970-01-01 00:00:00 in the given timezone.
* TODO: Improve handling of normalization differences.
* TODO: Replace with JSR-310 or similar system - see SPARK-16788
*/
private[sql] def getOffsetFromLocalMillis(millisLocal: Long, tz: TimeZone): Long = {
var guess = tz.getRawOffset
// the actual offset should be calculated based on milliseconds in UTC
val offset = tz.getOffset(millisLocal - guess)
if (offset != guess) {
guess = tz.getOffset(millisLocal - offset)
if (guess != offset) {
// fallback to do the reverse lookup using java.time.LocalDateTime
// this should only happen near the start or end of DST
val localDate = LocalDate.ofEpochDay(MILLISECONDS.toDays(millisLocal))
val localTime = LocalTime.ofNanoOfDay(MILLISECONDS.toNanos(
Math.floorMod(millisLocal, MILLIS_PER_DAY)))
val localDateTime = LocalDateTime.of(localDate, localTime)
val millisEpoch = localDateTime.atZone(tz.toZoneId).toInstant.toEpochMilli

guess = (millisLocal - millisEpoch).toInt
}
}
guess
}

/**
* Convert the timestamp `ts` from one timezone to another.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv

import java.math.BigDecimal
import java.text.{DecimalFormat, DecimalFormatSymbols}
import java.time.ZoneOffset
import java.util.{Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
Expand Down Expand Up @@ -137,7 +138,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
val expectedDate = format.parse(customDate).getTime
val castedDate = parser.makeConverter("_1", DateType, nullable = true)
.apply(customDate)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate, TimeZone.getTimeZone("GMT")))
assert(castedDate == DateTimeUtils.millisToDays(expectedDate, ZoneOffset.UTC))

val timestamp = "2015-01-01 00:00:00"
timestampsOptions = new CSVOptions(Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val ts = new Timestamp(toMillis(time))

test("datetime function current_date") {
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), ZoneOffset.UTC)
assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)

val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
Expand Down Expand Up @@ -499,7 +499,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// Valid range of DateType is [0001-01-01, 9999-12-31]
val maxMonthInterval = 10000 * 12
checkEvaluation(
AddMonths(Literal(Date.valueOf("0001-01-01")), Literal(maxMonthInterval)), 2933261)
AddMonths(Literal(LocalDate.parse("0001-01-01")), Literal(maxMonthInterval)),
LocalDate.of(10001, 1, 1).toEpochDay.toInt)
checkEvaluation(
AddMonths(Literal(Date.valueOf("9999-12-31")), Literal(-1 * maxMonthInterval)), -719529)
// Test evaluation results between Interpreted mode and Codegen mode
Expand Down Expand Up @@ -788,15 +789,15 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))),
Literal(fmt2), timeZoneId),
-1000L)
checkEvaluation(UnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
val t1 = UnixTimestamp(
CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
val t2 = UnixTimestamp(
Expand All @@ -814,7 +815,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
}
Expand Down Expand Up @@ -852,7 +853,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(
ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
ToUnixTimestamp(
Literal(sdf2.format(new Timestamp(-1000000))),
Expand All @@ -861,7 +862,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(ToUnixTimestamp(
Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz)))
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz.toZoneId)))
val t1 = ToUnixTimestamp(
CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long]
val t2 = ToUnixTimestamp(
Expand All @@ -876,7 +877,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(ToUnixTimestamp(
Literal(date1), Literal.create(null, StringType), timeZoneId),
MILLISECONDS.toSeconds(
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz)))
DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz.toZoneId)))
checkEvaluation(
ToUnixTimestamp(
Literal("2015-07-24"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,13 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
}

test("SPARK-6785: java date conversion before and after epoch") {
def format(d: Date): String = {
TimestampFormatter("uuuu-MM-dd", defaultTimeZone().toZoneId)
.format(d.getTime * MICROS_PER_MILLIS)
}
def checkFromToJavaDate(d1: Date): Unit = {
val d2 = toJavaDate(fromJavaDate(d1))
assert(d2.toString === d1.toString)
assert(format(d2) === format(d1))
}

val df1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
Expand Down Expand Up @@ -413,22 +417,22 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
test("monthsBetween") {
val date1 = date(1997, 2, 28, 10, 30, 0)
var date2 = date(1996, 10, 30)
assert(monthsBetween(date1, date2, true, TimeZoneUTC) === 3.94959677)
assert(monthsBetween(date1, date2, false, TimeZoneUTC) === 3.9495967741935485)
assert(monthsBetween(date1, date2, true, ZoneOffset.UTC) === 3.94959677)
assert(monthsBetween(date1, date2, false, ZoneOffset.UTC) === 3.9495967741935485)
Seq(true, false).foreach { roundOff =>
date2 = date(2000, 2, 28)
assert(monthsBetween(date1, date2, roundOff, TimeZoneUTC) === -36)
assert(monthsBetween(date1, date2, roundOff, ZoneOffset.UTC) === -36)
date2 = date(2000, 2, 29)
assert(monthsBetween(date1, date2, roundOff, TimeZoneUTC) === -36)
assert(monthsBetween(date1, date2, roundOff, ZoneOffset.UTC) === -36)
date2 = date(1996, 3, 31)
assert(monthsBetween(date1, date2, roundOff, TimeZoneUTC) === 11)
assert(monthsBetween(date1, date2, roundOff, ZoneOffset.UTC) === 11)
}

val date3 = date(2000, 2, 28, 16, tz = TimeZonePST)
val date4 = date(1997, 2, 28, 16, tz = TimeZonePST)
assert(monthsBetween(date3, date4, true, TimeZonePST) === 36.0)
assert(monthsBetween(date3, date4, true, TimeZoneGMT) === 35.90322581)
assert(monthsBetween(date3, date4, false, TimeZoneGMT) === 35.903225806451616)
assert(monthsBetween(date3, date4, true, TimeZonePST.toZoneId) === 36.0)
assert(monthsBetween(date3, date4, true, ZoneOffset.UTC) === 35.90322581)
assert(monthsBetween(date3, date4, false, ZoneOffset.UTC) === 35.903225806451616)
}

test("from UTC timestamp") {
Expand Down Expand Up @@ -571,15 +575,15 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {

test("daysToMillis and millisToDays") {
val input = TimeUnit.MICROSECONDS.toMillis(date(2015, 12, 31, 16, tz = TimeZonePST))
assert(millisToDays(input, TimeZonePST) === 16800)
assert(millisToDays(input, TimeZoneGMT) === 16801)
assert(millisToDays(-1 * MILLIS_PER_DAY + 1, TimeZoneGMT) == -1)
assert(millisToDays(input, TimeZonePST.toZoneId) === 16800)
assert(millisToDays(input, ZoneOffset.UTC) === 16801)
assert(millisToDays(-1 * MILLIS_PER_DAY + 1, ZoneOffset.UTC) == -1)

var expected = TimeUnit.MICROSECONDS.toMillis(date(2015, 12, 31, tz = TimeZonePST))
assert(daysToMillis(16800, TimeZonePST) === expected)
assert(daysToMillis(16800, TimeZonePST.toZoneId) === expected)

expected = TimeUnit.MICROSECONDS.toMillis(date(2015, 12, 31, tz = TimeZoneGMT))
assert(daysToMillis(16800, TimeZoneGMT) === expected)
assert(daysToMillis(16800, ZoneOffset.UTC) === expected)

// Some days are skipped entirely in some time zones; skip them here.
val skipped_days = Map[String, Set[Int]](
Expand All @@ -594,7 +598,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
val skipped = skipped_days.getOrElse(tz.getID, Set.empty)
(-20000 to 20000).foreach { d =>
if (!skipped.contains(d)) {
assert(millisToDays(daysToMillis(d, tz), tz) === d,
assert(millisToDays(daysToMillis(d, tz.toZoneId), tz.toZoneId) === d,
s"Round trip of ${d} did not work in tz ${tz}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
Expand Down Expand Up @@ -67,8 +68,12 @@ object HiveResult {
case (null, _) => if (nested) "null" else "NULL"
case (b, BooleanType) => b.toString
case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
case (ld: LocalDate, DateType) =>
dateFormatter.format(DateTimeUtils.localDateToDays(ld))
case (t: Timestamp, TimestampType) =>
timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t))
case (i: Instant, TimestampType) =>
timestampFormatter.format(DateTimeUtils.instantToMicros(i))
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
case (decimal: java.math.BigDecimal, DecimalType()) => decimal.toPlainString
case (n, _: NumericType) => n.toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,15 +800,15 @@ SELECT DATE_TRUNC('MILLENNIUM', TIMESTAMP '1970-03-20 04:30:00.00000')
-- !query schema
struct<date_trunc(MILLENNIUM, TIMESTAMP '1970-03-20 04:30:00'):timestamp>
-- !query output
1001-01-01 00:07:02
1001-01-01 00:00:00


-- !query
SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20')
-- !query schema
struct<date_trunc(MILLENNIUM, CAST(DATE '1970-03-20' AS TIMESTAMP)):timestamp>
-- !query output
1001-01-01 00:07:02
1001-01-01 00:00:00


-- !query
Expand Down Expand Up @@ -840,15 +840,15 @@ SELECT DATE_TRUNC('CENTURY', DATE '0002-02-04')
-- !query schema
struct<date_trunc(CENTURY, CAST(DATE '0002-02-04' AS TIMESTAMP)):timestamp>
-- !query output
0001-01-01 00:07:02
0001-01-01 00:00:00


-- !query
SELECT DATE_TRUNC('CENTURY', TO_DATE('0055-08-10 BC', 'yyyy-MM-dd G'))
-- !query schema
struct<date_trunc(CENTURY, CAST(to_date('0055-08-10 BC', 'yyyy-MM-dd G') AS TIMESTAMP)):timestamp>
-- !query output
-0099-01-01 00:07:02
-0099-01-01 00:00:00


-- !query
Expand All @@ -864,15 +864,15 @@ SELECT DATE_TRUNC('DECADE', DATE '0004-12-25')
-- !query schema
struct<date_trunc(DECADE, CAST(DATE '0004-12-25' AS TIMESTAMP)):timestamp>
-- !query output
0000-01-01 00:07:02
0000-01-01 00:00:00


-- !query
SELECT DATE_TRUNC('DECADE', TO_DATE('0002-12-31 BC', 'yyyy-MM-dd G'))
-- !query schema
struct<date_trunc(DECADE, CAST(to_date('0002-12-31 BC', 'yyyy-MM-dd G') AS TIMESTAMP)):timestamp>
-- !query output
-0010-01-01 00:07:02
-0010-01-01 00:00:00


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
localSparkSession.conf.set(SQLConf.ANSI_ENABLED.key, true)
case _ =>
}
localSparkSession.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)

if (configSet.nonEmpty) {
// Execute the list of set operation in order to add the desired configs
Expand Down

0 comments on commit aa0d136

Please sign in to comment.