[SPARK-26424][SQL] Use java.time API in date/timestamp expressions
## What changes were proposed in this pull request?

In this PR, I propose switching `DateFormatClass`, `ToUnixTimestamp`, `FromUnixTime`, and `UnixTime` to the java.time API for parsing and formatting dates and timestamps. The API is already wrapped by the `TimestampFormatter` and `DateFormatter` classes. One benefit is that these classes support parsing timestamps with microsecond precision. The old behaviour can be switched on via the SQL config `spark.sql.legacy.timeParser.enabled` (`false` by default).
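
To illustrate the microsecond-precision point, here is a minimal sketch in plain Java that calls `java.time` directly rather than going through Spark's `TimestampFormatter`. The class `MicrosParsingSketch`, the helper `parseToMicros`, the sample input, and the choice of UTC are assumptions made for illustration; none of them are part of this patch.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

// Hypothetical helper class, not part of Spark.
final class MicrosParsingSketch {

    // Parses `text` with `pattern` and returns microseconds since the epoch.
    // Assumption: the input is interpreted as a UTC wall-clock time.
    static long parseToMicros(String text, String pattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern, Locale.US);
        LocalDateTime local = LocalDateTime.parse(text, formatter);
        Instant instant = local.toInstant(ZoneOffset.UTC);
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    public static void main(String[] args) {
        // Six fraction digits (SSSSSS) carry the microsecond part.
        long micros = parseToMicros(
            "1970-01-01 00:00:01.123456", "yyyy-MM-dd HH:mm:ss.SSSSSS");
        System.out.println(micros); // prints 1123456
    }
}
```

By contrast, `java.util.Date`, which `java.text.SimpleDateFormat` produces, stores time with millisecond resolution only.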

## How was this patch tested?

It was tested by the existing test suites (`DateFunctionsSuite`, `DateExpressionsSuite`, `JsonSuite`, `CsvSuite`, `SQLQueryTestSuite`) as well as the PySpark tests.

Closes #23358 from MaxGekk/new-time-cast.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and cloud-fan committed Dec 27, 2018
1 parent 827383a commit 7c7fccf
Showing 15 changed files with 122 additions and 75 deletions.
8 changes: 4 additions & 4 deletions R/pkg/R/functions.R
@@ -1723,7 +1723,7 @@ setMethod("radians",
#' @details
#' \code{to_date}: Converts the column into a DateType. You may optionally specify
#' a format according to the rules in:
#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
#' \url{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}.
#' If the string cannot be parsed according to the specified format (or default),
#' the value of the column will be null.
#' By default, it follows casting rules to a DateType if the format is omitted
@@ -1819,7 +1819,7 @@ setMethod("to_csv", signature(x = "Column"),
#' @details
#' \code{to_timestamp}: Converts the column into a TimestampType. You may optionally specify
#' a format according to the rules in:
#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
#' \url{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}.
#' If the string cannot be parsed according to the specified format (or default),
#' the value of the column will be null.
#' By default, it follows casting rules to a TimestampType if the format is omitted
@@ -2240,7 +2240,7 @@ setMethod("n", signature(x = "Column"),
#' \code{date_format}: Converts a date/timestamp/string to a value of string in the format
#' specified by the date format given by the second argument. A pattern could be for instance
#' \code{dd.MM.yyyy} and could return a string like '18.03.1993'. All
#' pattern letters of \code{java.text.SimpleDateFormat} can be used.
#' pattern letters of \code{java.time.format.DateTimeFormatter} can be used.
#' Note: Use when ever possible specialized functions like \code{year}. These benefit from a
#' specialized implementation.
#'
@@ -2666,7 +2666,7 @@ setMethod("format_string", signature(format = "character", x = "Column"),
#' \code{from_unixtime}: Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC)
#' to a string representing the timestamp of that moment in the current system time zone in the JVM
#' in the given format.
#' See \href{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}{
#' See \href{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}{
#' Customizing Formats} for available options.
#'
#' @rdname column_datetime_functions
1 change: 1 addition & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -39,6 +39,7 @@ displayTitle: Spark SQL Upgrading Guide

- In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.

- Since Spark 3.0, the `unix_timestamp`, `date_format`, `to_unix_timestamp`, `from_unixtime`, `to_date`, `to_timestamp` functions use the java.time API for parsing and formatting dates/timestamps from/to strings, using the ISO chronology (https://docs.oracle.com/javase/8/docs/api/java/time/chrono/IsoChronology.html), which is based on the Proleptic Gregorian calendar. In Spark version 2.4 and earlier, java.text.SimpleDateFormat and java.util.GregorianCalendar (a hybrid calendar that supports both the Julian and Gregorian calendar systems, see https://docs.oracle.com/javase/7/docs/api/java/util/GregorianCalendar.html) are used for the same purpose. The new implementation supports the pattern formats described at https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html and performs strict checking of its input. For example, the `2015-07-22 10:00:00` timestamp cannot be parsed if the pattern is `yyyy-MM-dd` because the parser does not consume the whole input. Another example is that the `31/01/2015 00:00` input cannot be parsed by the `dd/MM/yyyy hh:mm` pattern because `hh` expects hours in the range `1-12`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
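
The two strict-parsing examples above can be reproduced outside Spark with a small Java sketch. It builds a formatter that loosely follows the strict ISO configuration in this commit's `DateTimeFormatterHelper`, reduced to the era default needed for `yyyy` under strict resolution. The class `StrictParsingSketch` and its helpers `strictFormatter` and `canParse` are hypothetical names for illustration, not Spark APIs.

```java
import java.time.DateTimeException;
import java.time.chrono.IsoChronology;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.ResolverStyle;
import java.time.temporal.ChronoField;
import java.util.Locale;

// Hypothetical helper class, not part of Spark.
final class StrictParsingSketch {

    // Strict ISO formatter; the era default lets `yyyy` (year-of-era)
    // resolve to a year under ResolverStyle.STRICT.
    static DateTimeFormatter strictFormatter(String pattern) {
        return new DateTimeFormatterBuilder()
            .appendPattern(pattern)
            .parseDefaulting(ChronoField.ERA, 1)
            .toFormatter(Locale.US)
            .withChronology(IsoChronology.INSTANCE)
            .withResolverStyle(ResolverStyle.STRICT);
    }

    // Returns true only if the whole input matches the pattern and resolves.
    static boolean canParse(String text, String pattern) {
        try {
            strictFormatter(pattern).parse(text);
            return true;
        } catch (DateTimeException e) {
            // DateTimeParseException is a subclass of DateTimeException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canParse("2015-07-22", "yyyy-MM-dd"));             // true
        System.out.println(canParse("2015-07-22 10:00:00", "yyyy-MM-dd"));    // false: trailing text is left unparsed
        System.out.println(canParse("31/01/2015 00:00", "dd/MM/yyyy hh:mm")); // false: hh accepts 1-12 only
        System.out.println(canParse("31/01/2015 00:00", "dd/MM/yyyy HH:mm")); // true: HH accepts 0-23
    }
}
```

In both failing cases, switching to a pattern that covers the full input (`yyyy-MM-dd HH:mm:ss`) or to the 24-hour field `HH` lets the parse succeed.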
## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
6 changes: 3 additions & 3 deletions python/pyspark/sql/functions.py
@@ -874,7 +874,7 @@ def date_format(date, format):
format given by the second argument.
A pattern could be for instance `dd.MM.yyyy` and could return a string like '18.03.1993'. All
pattern letters of the Java class `java.text.SimpleDateFormat` can be used.
pattern letters of the Java class `java.time.format.DateTimeFormatter` can be used.
.. note:: Use when ever possible specialized functions like `year`. These benefit from a
specialized implementation.
@@ -1094,7 +1094,7 @@ def to_date(col, format=None):
"""Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
:class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`
using the optionally specified format. Specify formats according to
`SimpleDateFormats <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
`DateTimeFormatter <https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html>`_. # noqa
By default, it follows casting rules to :class:`pyspark.sql.types.DateType` if the format
is omitted (equivalent to ``col.cast("date")``).
@@ -1119,7 +1119,7 @@ def to_timestamp(col, format=None):
"""Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
:class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.TimestampType`
using the optionally specified format. Specify formats according to
`SimpleDateFormats <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
`DateTimeFormatter <https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html>`_. # noqa
By default, it follows casting rules to :class:`pyspark.sql.types.TimestampType` if the format
is omitted (equivalent to ``col.cast("timestamp")``).
@@ -27,8 +27,7 @@ import org.apache.spark.sql.types._

class CSVInferSchema(val options: CSVOptions) extends Serializable {

@transient
private lazy val timestampParser = TimestampFormatter(
private val timestampParser = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
@@ -18,8 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp
import java.text.DateFormat
import java.util.{Calendar, TimeZone}
import java.util.{Calendar, Locale, TimeZone}

import scala.util.control.NonFatal

@@ -28,7 +27,8 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@@ -562,16 +562,17 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
copy(timeZoneId = Option(timeZoneId))

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
val df = DateTimeUtils.newDateFormat(format.toString, timeZone)
UTF8String.fromString(df.format(new java.util.Date(timestamp.asInstanceOf[Long] / 1000)))
val df = TimestampFormatter(format.toString, timeZone, Locale.US)
UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
val tz = ctx.addReferenceObj("timeZone", timeZone)
val locale = ctx.addReferenceObj("locale", Locale.US)
defineCodeGen(ctx, ev, (timestamp, format) => {
s"""UTF8String.fromString($dtu.newDateFormat($format.toString(), $tz)
.format(new java.util.Date($timestamp / 1000)))"""
s"""UTF8String.fromString($tf.apply($format.toString(), $tz, $locale)
.format($timestamp))"""
})
}

@@ -612,9 +613,10 @@ case class ToUnixTimestamp(
}

/**
* Converts time string with given pattern.
* (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html])
* to Unix time stamp (in seconds), returns null if fail.
* Converts time string with given pattern to Unix time stamp (in seconds), returns null if fail.
* See [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html]
* if SQL config spark.sql.legacy.timeParser.enabled is set to true otherwise
* [https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html].
* Note that hive Language Manual says it returns 0 if fail, but in fact it returns null.
* If the second parameter is missing, use "yyyy-MM-dd HH:mm:ss".
* If no parameters provided, the first parameter will be current_timestamp.
@@ -663,9 +665,9 @@ abstract class UnixTime
override def nullable: Boolean = true

private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: DateFormat =
private lazy val formatter: TimestampFormatter =
try {
DateTimeUtils.newDateFormat(constFormat.toString, timeZone)
TimestampFormatter(constFormat.toString, timeZone, Locale.US)
} catch {
case NonFatal(_) => null
}
@@ -677,16 +679,16 @@ abstract class UnixTime
} else {
left.dataType match {
case DateType =>
DateTimeUtils.daysToMillis(t.asInstanceOf[Int], timeZone) / 1000L
DateTimeUtils.daysToMillis(t.asInstanceOf[Int], timeZone) / MILLIS_PER_SECOND
case TimestampType =>
t.asInstanceOf[Long] / 1000000L
t.asInstanceOf[Long] / MICROS_PER_SECOND
case StringType if right.foldable =>
if (constFormat == null || formatter == null) {
null
} else {
try {
formatter.parse(
t.asInstanceOf[UTF8String].toString).getTime / 1000L
t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
} catch {
case NonFatal(_) => null
}
@@ -698,8 +700,8 @@ abstract class UnixTime
} else {
val formatString = f.asInstanceOf[UTF8String].toString
try {
DateTimeUtils.newDateFormat(formatString, timeZone).parse(
t.asInstanceOf[UTF8String].toString).getTime / 1000L
TimestampFormatter(formatString, timeZone, Locale.US).parse(
t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
} catch {
case NonFatal(_) => null
}
@@ -712,7 +714,7 @@ abstract class UnixTime
val javaType = CodeGenerator.javaType(dataType)
left.dataType match {
case StringType if right.foldable =>
val df = classOf[DateFormat].getName
val df = classOf[TimestampFormatter].getName
if (formatter == null) {
ExprCode.forNullValue(dataType)
} else {
@@ -724,24 +726,35 @@ abstract class UnixTime
$javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
if (!${ev.isNull}) {
try {
${ev.value} = $formatterName.parse(${eval1.value}.toString()).getTime() / 1000L;
${ev.value} = $formatterName.parse(${eval1.value}.toString()) / 1000000L;
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
} catch (java.text.ParseException e) {
${ev.isNull} = true;
} catch (java.time.format.DateTimeParseException e) {
${ev.isNull} = true;
} catch (java.time.DateTimeException e) {
${ev.isNull} = true;
}
}""")
}
case StringType =>
val tz = ctx.addReferenceObj("timeZone", timeZone)
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val locale = ctx.addReferenceObj("locale", Locale.US)
val dtu = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (string, format) => {
s"""
try {
${ev.value} = $dtu.newDateFormat($format.toString(), $tz)
.parse($string.toString()).getTime() / 1000L;
${ev.value} = $dtu.apply($format.toString(), $tz, $locale)
.parse($string.toString()) / 1000000L;
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
} catch (java.text.ParseException e) {
${ev.isNull} = true;
} catch (java.time.format.DateTimeParseException e) {
${ev.isNull} = true;
} catch (java.time.DateTimeException e) {
${ev.isNull} = true;
}
"""
})
@@ -806,9 +819,9 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
copy(timeZoneId = Option(timeZoneId))

private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: DateFormat =
private lazy val formatter: TimestampFormatter =
try {
DateTimeUtils.newDateFormat(constFormat.toString, timeZone)
TimestampFormatter(constFormat.toString, timeZone, Locale.US)
} catch {
case NonFatal(_) => null
}
@@ -823,8 +836,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
null
} else {
try {
UTF8String.fromString(formatter.format(
new java.util.Date(time.asInstanceOf[Long] * 1000L)))
UTF8String.fromString(formatter.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
}
@@ -835,8 +847,8 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
null
} else {
try {
UTF8String.fromString(DateTimeUtils.newDateFormat(f.toString, timeZone)
.format(new java.util.Date(time.asInstanceOf[Long] * 1000L)))
UTF8String.fromString(TimestampFormatter(f.toString, timeZone, Locale.US)
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
}
@@ -846,7 +858,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val df = classOf[DateFormat].getName
val df = classOf[TimestampFormatter].getName
if (format.foldable) {
if (formatter == null) {
ExprCode.forNullValue(StringType)
@@ -859,21 +871,21 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
if (!${ev.isNull}) {
try {
${ev.value} = UTF8String.fromString($formatterName.format(
new java.util.Date(${t.value} * 1000L)));
${ev.value} = UTF8String.fromString($formatterName.format(${t.value} * 1000000L));
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
}
}""")
}
} else {
val tz = ctx.addReferenceObj("timeZone", timeZone)
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val locale = ctx.addReferenceObj("locale", Locale.US)
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (seconds, f) => {
s"""
try {
${ev.value} = UTF8String.fromString($dtu.newDateFormat($f.toString(), $tz).format(
new java.util.Date($seconds * 1000L)));
${ev.value} = UTF8String.fromString($tf.apply($f.toString(), $tz, $locale).
format($seconds * 1000000L));
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
}"""
@@ -37,8 +37,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {

private val decimalParser = ExprUtils.getDecimalParser(options.locale)

@transient
private lazy val timestampFormatter = TimestampFormatter(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.internal.SQLConf

sealed trait DateFormatter {
sealed trait DateFormatter extends Serializable {
def parse(s: String): Int // returns days since epoch
def format(days: Int): String
}
@@ -35,7 +35,8 @@ class Iso8601DateFormatter(
pattern: String,
locale: Locale) extends DateFormatter with DateTimeFormatterHelper {

private val formatter = buildFormatter(pattern, locale)
@transient
private lazy val formatter = buildFormatter(pattern, locale)
private val UTC = ZoneId.of("UTC")

private def toInstant(s: String): Instant = {
@@ -56,7 +57,8 @@ class Iso8601DateFormatter(
}

class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
private val format = FastDateFormat.getInstance(pattern, locale)
@transient
private lazy val format = FastDateFormat.getInstance(pattern, locale)

override def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
@@ -17,27 +17,36 @@

package org.apache.spark.sql.catalyst.util

import java.time.{Instant, LocalDateTime, ZonedDateTime, ZoneId}
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.{ChronoField, TemporalAccessor}
import java.time._
import java.time.chrono.IsoChronology
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.Locale

trait DateTimeFormatterHelper {

protected def buildFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.ERA, 1)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)
.withChronology(IsoChronology.INSTANCE)
.withResolverStyle(ResolverStyle.STRICT)
}

protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor, zoneId: ZoneId): Instant = {
val localDateTime = LocalDateTime.from(temporalAccessor)
val localTime = if (temporalAccessor.query(TemporalQueries.localTime) == null) {
LocalTime.ofNanoOfDay(0)
} else {
LocalTime.from(temporalAccessor)
}
val localDate = LocalDate.from(temporalAccessor)
val localDateTime = LocalDateTime.of(localDate, localTime)
val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
Instant.from(zonedDateTime)
}
