[SPARK-28141][SQL] Support special date values #25708

Closed · wants to merge 28 commits

Changes shown below are from 7 of the 28 commits.

Commits:
c5034a8  Support special date values (MaxGekk, Sep 5, 2019)
695a6c6  Use special values in stringToDate (MaxGekk, Sep 6, 2019)
ebc5b02  Tests for stringToDate (MaxGekk, Sep 6, 2019)
2663542  Use map with UTF8String keys (MaxGekk, Sep 6, 2019)
edda1f2  Bug fix: pass zoneId in generated code (MaxGekk, Sep 6, 2019)
8d023ec  Uncomment tests in date.sql (MaxGekk, Sep 6, 2019)
775c7ec  Change ticket for the special values: infinity and -infinity (MaxGekk, Sep 6, 2019)
be787f4  Bug fix: pass zoneId to Cast (MaxGekk, Sep 6, 2019)
f356b72  Bug fix: use passed zoneId parameter (MaxGekk, Sep 6, 2019)
864e456  Remove an unused import (MaxGekk, Sep 6, 2019)
e237b27  Use outstandingZoneIds in the test (MaxGekk, Sep 6, 2019)
0055a8a  Merge remote-tracking branch 'remotes/origin/master' into datetime-sp… (MaxGekk, Sep 6, 2019)
4fb8834  Re-get date.sql.out (MaxGekk, Sep 6, 2019)
9a2349b  Create zoneId when it is needed (MaxGekk, Sep 6, 2019)
6da7905  Bug fix: require time zone in the string to date conversion (MaxGekk, Sep 6, 2019)
d57e11e  Use regex (MaxGekk, Sep 7, 2019)
33befed  Add a blank line after specialDate (MaxGekk, Sep 8, 2019)
e2275d4  specialDate -> specialValue (MaxGekk, Sep 8, 2019)
dabb6ec  Cut off blanks at the end (MaxGekk, Sep 8, 2019)
ce7e04d  Revert "Cut off blanks at the end" (MaxGekk, Sep 8, 2019)
0fd86a0  Require trimmed string (MaxGekk, Sep 8, 2019)
92c5509  Add tests for from_csv and from_json (MaxGekk, Sep 8, 2019)
254567d  Keep case of time zone as is (MaxGekk, Sep 9, 2019)
d61fdc7  Extract common parsing code to extractSpecialValue (MaxGekk, Sep 10, 2019)
03d3126  Add an assert (MaxGekk, Sep 10, 2019)
106524b  Import only needed things (MaxGekk, Sep 10, 2019)
0b0e5d4  Merge remote-tracking branch 'remotes/origin/master' into datetime-sp… (MaxGekk, Sep 18, 2019)
ff92531  Remove an unused import (MaxGekk, Sep 18, 2019)
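
To see what the change enables end to end, here is a minimal, hypothetical driver sketch (the object name, app name and master are illustrative, not from this PR). After this patch, PostgreSQL-style special date strings are recognized when casting strings to DATE:

import org.apache.spark.sql.SparkSession

object SpecialDatesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("special-dates-demo")
      .getOrCreate()
    // 'epoch' resolves to 1970-01-01; 'now', 'today', 'yesterday' and
    // 'tomorrow' are resolved against the session time zone at cast time.
    spark.sql("SELECT CAST('epoch' AS DATE), CAST('tomorrow' AS DATE)").show()
    spark.stop()
  }
}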
Files changed:
@@ -488,7 +488,7 @@ object CatalogColumnStat extends Logging {
    dataType match {
      case BooleanType => s.toBoolean
      case DateType if version == 1 => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
-      case DateType => DateFormatter().parse(s)
+      case DateType => DateFormatter(ZoneOffset.UTC).parse(s)
      case TimestampType if version == 1 =>
        DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
      case TimestampType => getTimestampFormatter().parse(s)

@@ -513,7 +513,7 @@ object CatalogColumnStat extends Logging {
   */
  def toExternalString(v: Any, colName: String, dataType: DataType): String = {
    val externalValue = dataType match {
-      case DateType => DateFormatter().format(v.asInstanceOf[Int])
+      case DateType => DateFormatter(ZoneOffset.UTC).format(v.asInstanceOf[Int])
      case TimestampType => getTimestampFormatter().format(v.asInstanceOf[Long])
      case BooleanType | _: IntegralType | FloatType | DoubleType => v
      case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal

@@ -45,7 +45,10 @@ class UnivocityGenerator(
    options.timestampFormat,
    options.zoneId,
    options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+  private val dateFormatter = DateFormatter(
+    options.dateFormat,
+    options.zoneId,
+    options.locale)

  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
    case DateType =>

@@ -78,7 +78,10 @@ class UnivocityParser(
    options.timestampFormat,
    options.zoneId,
    options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+  private val dateFormatter = DateFormatter(
+    options.dateFormat,
+    options.zoneId,
+    options.locale)

  // Retrieve the raw record string.
  private def getCurrentInput: UTF8String = {

@@ -287,7 +287,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String]
  // [[func]] assumes the input is no longer null because eval already does the null check.
  @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])

-  private lazy val dateFormatter = DateFormatter()
+  private lazy val dateFormatter = DateFormatter(zoneId)
  private lazy val timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId)
  private val failOnIntegralTypeOverflow = SQLConf.get.failOnIntegralTypeOverflow

@@ -469,7 +469,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String]
  // DateConverter
  private[this] def castToDate(from: DataType): Any => Any = from match {
    case StringType =>
-      buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).orNull)
+      buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s, zoneId).orNull)
    case TimestampType =>
      // throw valid precision more than seconds, according to Hive.
      // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
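
A note on why `zoneId` has to reach the interpreted path at all: the special values 'now', 'today', 'yesterday' and 'tomorrow' are zone-dependent, and two sessions can legitimately disagree on the current date. A tiny standalone sketch (zone choices are illustrative):

import java.time.{LocalDate, ZoneId}

val utcToday  = LocalDate.now(ZoneId.of("UTC"))
val apiaToday = LocalDate.now(ZoneId.of("Pacific/Apia")) // UTC+13/+14
// Near the UTC day boundary these two differ by one calendar day.
println(s"UTC: $utcToday, Apia: $apiaToday")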

@@ -1056,28 +1056,31 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String]

  private[this] def castToDateCode(
      from: DataType,
-      ctx: CodegenContext): CastFunction = from match {
-    case StringType =>
-      val intOpt = ctx.freshVariable("intOpt", classOf[Option[Integer]])
-      (c, evPrim, evNull) => code"""
-        scala.Option<Integer> $intOpt =
-          org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate($c);
-        if ($intOpt.isDefined()) {
-          $evPrim = ((Integer) $intOpt.get()).intValue();
-        } else {
-          $evNull = true;
-        }
-      """
-    case TimestampType =>
-      val zoneIdClass = classOf[ZoneId]
-      val zid = JavaCode.global(
-        ctx.addReferenceObj("zoneId", zoneId, zoneIdClass.getName),
-        zoneIdClass)
-      (c, evPrim, evNull) =>
-        code"""$evPrim =
-          org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToEpochDays($c, $zid);"""
-    case _ =>
-      (c, evPrim, evNull) => code"$evNull = true;"
+      ctx: CodegenContext): CastFunction = {
+    val zoneIdClass = classOf[ZoneId]
+    val zid = JavaCode.global(
+      ctx.addReferenceObj("zoneId", zoneId, zoneIdClass.getName),
+      zoneIdClass)
+    from match {
+      case StringType =>
+        val intOpt = ctx.freshVariable("intOpt", classOf[Option[Integer]])
+        (c, evPrim, evNull) =>
+          code"""
+            scala.Option<Integer> $intOpt =
+              org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate($c, $zid);
+            if ($intOpt.isDefined()) {
+              $evPrim = ((Integer) $intOpt.get()).intValue();
+            } else {
+              $evNull = true;
+            }
+          """
+      case TimestampType =>
+        (c, evPrim, evNull) =>
+          code"""$evPrim =
+            org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToEpochDays($c, $zid);"""
+      case _ =>
+        (c, evPrim, evNull) => code"$evNull = true;"
+    }
+  }

Review comment (Member), on the `scala.Option<Integer>` line: btw (this is not related to this PR, though), is `scala.Option<Integer>` OK for Java compilers/the JVM? @rednaxelafx I remember that JDK compilers cannot compile this statement: #21770 (comment). It seems those compilers erase the return type from `scala.Option<Integer>` to `scala.Option<Object>`. I'm not sure why Janino accepts it, though...
Review comment (Member): Just putting the gen'd code here for other reviewers:

/* 051 */         boolean project_isNull_0 = columnartorow_isNull_0;
/* 052 */         int project_value_0 = -1;
/* 053 */         if (!columnartorow_isNull_0) {
/* 054 */           scala.Option<Integer> project_intOpt_0 =
/* 055 */           org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate(columnartorow_value_0, ((java.time.ZoneId) references[2] /* zoneId */));
/* 056 */           if (project_intOpt_0.isDefined()) {
/* 057 */             project_value_0 = ((Integer) project_intOpt_0.get()).intValue();
/* 058 */           } else {
/* 059 */             project_isNull_0 = true;
/* 060 */           }
/* 061 */         }
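
For the erasure question above, one quick standalone check (a sketch, not part of the PR) is to print the generic signature the JVM actually sees for `stringToDate`. Since `SQLDate` is `Int`, the Scala compiler emits `scala.Option<java.lang.Object>` at the bytecode level, which is exactly what a javac-style compiler would trip over:

import org.apache.spark.sql.catalyst.util.DateTimeUtils

DateTimeUtils.getClass.getMethods
  .filter(_.getName == "stringToDate")
  .foreach(m => println(m.toGenericString))
// Expected to print something like:
// public scala.Option<java.lang.Object> ...DateTimeUtils$.stringToDate(...)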


private[this] def changePrecision(d: ExprValue, decimalType: DecimalType,

@@ -76,9 +76,7 @@ case class CurrentDate(timeZoneId: Option[String] = None)
  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
    copy(timeZoneId = Option(timeZoneId))

-  override def eval(input: InternalRow): Any = {
-    localDateToDays(LocalDate.now(zoneId))
-  }
+  override def eval(input: InternalRow): Any = currentDate(zoneId)

  override def prettyName: String = "current_date"
}

@@ -371,7 +371,9 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
        case _ => v + "D"
      }
    case (v: Decimal, t: DecimalType) => v + "BD"
-    case (v: Int, DateType) => s"DATE '${DateFormatter().format(v)}'"
+    case (v: Int, DateType) =>
+      val formatter = DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+      s"DATE '${formatter.format(v)}'"
    case (v: Long, TimestampType) =>
      val formatter = TimestampFormatter.getFractionFormatter(
        DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))

@@ -81,7 +81,10 @@ private[sql] class JacksonGenerator(
    options.timestampFormat,
    options.zoneId,
    options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+  private val dateFormatter = DateFormatter(
+    options.dateFormat,
+    options.zoneId,
+    options.locale)

  private def makeWriter(dataType: DataType): ValueWriter = dataType match {
    case NullType =>

@@ -59,7 +59,10 @@ class JacksonParser(
    options.timestampFormat,
    options.zoneId,
    options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+  private val dateFormatter = DateFormatter(
+    options.dateFormat,
+    options.zoneId,
+    options.locale)

  /**
   * Create a converter which converts the JSON documents held by the `JsonParser`
Expand Up @@ -1761,7 +1761,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
try {
valueType match {
case "DATE" => toLiteral(stringToDate, DateType)
case "DATE" =>
toLiteral(stringToDate(_, getZoneId(SQLConf.get.sessionLocalTimeZone)), DateType)
case "TIMESTAMP" =>
val zoneId = getZoneId(SQLConf.get.sessionLocalTimeZone)
toLiteral(stringToTimestamp(_, zoneId), TimestampType)
Expand Down

@@ -17,24 +17,31 @@

package org.apache.spark.sql.catalyst.util

-import java.time.LocalDate
+import java.time.{LocalDate, ZoneId}
import java.util.Locale

+import DateTimeUtils._
+
sealed trait DateFormatter extends Serializable {
  def parse(s: String): Int // returns days since epoch
  def format(days: Int): String
}

class Iso8601DateFormatter(
    pattern: String,
+    zoneId: ZoneId,
    locale: Locale) extends DateFormatter with DateTimeFormatterHelper {

  @transient
  private lazy val formatter = getOrCreateFormatter(pattern, locale)

  override def parse(s: String): Int = {
-    val localDate = LocalDate.parse(s, formatter)
-    DateTimeUtils.localDateToDays(localDate)
+    if (specialDateKeys.contains(s)) {
+      specialDates(s)(zoneId)
+    } else {
+      val localDate = LocalDate.parse(s, formatter)
+      localDateToDays(localDate)
+    }
  }

  override def format(days: Int): String = {

@@ -46,11 +53,13 @@ object DateFormatter {
  val defaultPattern: String = "uuuu-MM-dd"
  val defaultLocale: Locale = Locale.US

-  def apply(format: String, locale: Locale): DateFormatter = {
-    new Iso8601DateFormatter(format, locale)
+  def apply(format: String, zoneId: ZoneId, locale: Locale): DateFormatter = {
+    new Iso8601DateFormatter(format, zoneId, locale)
  }

-  def apply(format: String): DateFormatter = apply(format, defaultLocale)
+  def apply(format: String, zoneId: ZoneId): DateFormatter = {
+    apply(format, zoneId, defaultLocale)
+  }

-  def apply(): DateFormatter = apply(defaultPattern)
+  def apply(zoneId: ZoneId): DateFormatter = apply(defaultPattern, zoneId)
}
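
A usage sketch of the reworked factory, against this PR's API (assertions assume UTC and the default pattern):

import java.time.ZoneOffset
import org.apache.spark.sql.catalyst.util.DateFormatter

val formatter = DateFormatter(ZoneOffset.UTC)  // default pattern "uuuu-MM-dd", Locale.US
assert(formatter.parse("1970-01-02") == 1)     // days since the epoch
assert(formatter.parse("epoch") == 0)          // special value, zone-independent
println(formatter.parse("today"))              // special value, resolved in UTC at call time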

@@ -375,9 +375,11 @@ object DateTimeUtils {
   * `yyyy-[m]m-[d]d *`
   * `yyyy-[m]m-[d]dT*`
   */
-  def stringToDate(s: UTF8String): Option[SQLDate] = {
+  def stringToDate(s: UTF8String, zoneId: ZoneId): Option[SQLDate] = {
    if (s == null) {
      return None
+    } else if (specialUTF8DateKeys.contains(s)) {
+      return Some(specialUTF8Dates(s)(zoneId))
    }
    val segments: Array[Int] = Array[Int](1, 1, 1)
    var i = 0

@@ -848,4 +850,19 @@ object DateTimeUtils {
    val sinceEpoch = BigDecimal(timestamp) / MICROS_PER_SECOND + offset
    new Decimal().set(sinceEpoch, 20, 6)
  }
+
+  def currentDate(zoneId: ZoneId): SQLDate = localDateToDays(LocalDate.now(zoneId))
+
+  /** Notational shorthands that are converted to ordinary dates. */
+  val specialDates: Map[String, ZoneId => SQLDate] = Map(
+    ("epoch", (_: ZoneId) => 0),
+    ("now", currentDate),
+    ("today", currentDate),
+    ("tomorrow", (z: ZoneId) => Math.addExact(currentDate(z), 1)),
+    ("yesterday", (z: ZoneId) => Math.subtractExact(currentDate(z), 1)))
+  val specialDateKeys: Set[String] = specialDates.keySet
+
+  val specialUTF8Dates: Map[UTF8String, ZoneId => SQLDate] =
+    specialDates.map { case (key, value) => UTF8String.fromString(key) -> value }
+  val specialUTF8DateKeys: Set[UTF8String] = specialDateKeys.map(UTF8String.fromString)
}
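
The dispatch above can be read in isolation. Below is a self-contained re-creation (a sketch: the names shadow the real ones, and `localDateToDays` is approximated with `LocalDate.toEpochDay`). Each special value maps to a function of the zone, so 'now' and 'today' are evaluated at parse time, not when the map is built:

import java.time.{LocalDate, ZoneId, ZoneOffset}

def currentDate(zoneId: ZoneId): Int = LocalDate.now(zoneId).toEpochDay.toInt

val specialDates: Map[String, ZoneId => Int] = Map(
  "epoch"     -> ((_: ZoneId) => 0),
  "now"       -> currentDate,
  "today"     -> currentDate,
  "tomorrow"  -> ((z: ZoneId) => Math.addExact(currentDate(z), 1)),
  "yesterday" -> ((z: ZoneId) => Math.subtractExact(currentDate(z), 1)))

// Days since the epoch for tomorrow, in UTC.
println(specialDates("tomorrow")(ZoneOffset.UTC))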

@@ -174,7 +174,7 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
  test("hive-hash for date type") {
    def checkHiveHashForDateType(dateString: String, expected: Long): Unit = {
      checkHiveHash(
-        DateTimeUtils.stringToDate(UTF8String.fromString(dateString)).get,
+        DateTimeUtils.stringToDate(UTF8String.fromString(dateString), ZoneOffset.UTC).get,
        DateType,
        expected)
    }

@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.util

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
-import java.time.ZoneId
+import java.time.{LocalDate, ZoneId, ZoneOffset}
import java.util.{Locale, TimeZone}
import java.util.concurrent.TimeUnit

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.unsafe.types.UTF8String

class DateTimeUtilsSuite extends SparkFunSuite {

@@ -118,28 +119,32 @@ class DateTimeUtilsSuite extends SparkFunSuite {
    checkFromToJavaDate(new Date(df2.parse("1776-07-04 18:30:00 UTC").getTime))
  }

+  private def toDate(s: String, zoneId: ZoneId = ZoneOffset.UTC): Option[SQLDate] = {
+    stringToDate(UTF8String.fromString(s), zoneId)
+  }
+
  test("string to date") {
-    assert(stringToDate(UTF8String.fromString("2015-01-28")).get === days(2015, 1, 28))
-    assert(stringToDate(UTF8String.fromString("2015")).get === days(2015, 1, 1))
-    assert(stringToDate(UTF8String.fromString("0001")).get === days(1, 1, 1))
-    assert(stringToDate(UTF8String.fromString("2015-03")).get === days(2015, 3, 1))
+    assert(toDate("2015-01-28").get === days(2015, 1, 28))
+    assert(toDate("2015").get === days(2015, 1, 1))
+    assert(toDate("0001").get === days(1, 1, 1))
+    assert(toDate("2015-03").get === days(2015, 3, 1))
    Seq("2015-03-18", "2015-03-18 ", " 2015-03-18", " 2015-03-18 ", "2015-03-18 123142",
      "2015-03-18T123123", "2015-03-18T").foreach { s =>
-      assert(stringToDate(UTF8String.fromString(s)).get === days(2015, 3, 18))
+      assert(toDate(s).get === days(2015, 3, 18))
    }

-    assert(stringToDate(UTF8String.fromString("2015-03-18X")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("2015/03/18")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("2015.03.18")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("20150318")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("2015-031-8")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("02015-03-18")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("015-03-18")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("015")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("02015")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("1999 08 01")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("1999-08 01")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("1999 08")).isEmpty)
+    assert(toDate("2015-03-18X").isEmpty)
+    assert(toDate("2015/03/18").isEmpty)
+    assert(toDate("2015.03.18").isEmpty)
+    assert(toDate("20150318").isEmpty)
+    assert(toDate("2015-031-8").isEmpty)
+    assert(toDate("02015-03-18").isEmpty)
+    assert(toDate("015-03-18").isEmpty)
+    assert(toDate("015").isEmpty)
+    assert(toDate("02015").isEmpty)
+    assert(toDate("1999 08 01").isEmpty)
+    assert(toDate("1999-08 01").isEmpty)
+    assert(toDate("1999 08").isEmpty)
  }

test("string to timestamp") {

@@ -258,12 +263,10 @@ class DateTimeUtilsSuite extends SparkFunSuite {

  test("SPARK-15379: special invalid date string") {
    // Test stringToDate
-    assert(stringToDate(
-      UTF8String.fromString("2015-02-29 00:00:00")).isEmpty)
-    assert(stringToDate(
-      UTF8String.fromString("2015-04-31 00:00:00")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("2015-02-29")).isEmpty)
-    assert(stringToDate(UTF8String.fromString("2015-04-31")).isEmpty)
+    assert(toDate("2015-02-29 00:00:00").isEmpty)
+    assert(toDate("2015-04-31 00:00:00").isEmpty)
+    assert(toDate("2015-02-29").isEmpty)
+    assert(toDate("2015-04-31").isEmpty)


    // Test stringToTimestamp

@@ -564,4 +567,17 @@ class DateTimeUtilsSuite extends SparkFunSuite {
    assert(DateTimeUtils.toMillis(-9223372036844776001L) === -9223372036844777L)
    assert(DateTimeUtils.toMillis(-157700927876544L) === -157700927877L)
  }
+
+  test("special date values") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val zoneId = getZoneId(timeZone)
+
+      assert(toDate("epoch", zoneId).get === 0)
+      val today = localDateToDays(LocalDate.now(zoneId))
+      assert(toDate("yesterday", zoneId).get === today - 1)
+      assert(toDate("now", zoneId).get === today)
+      assert(toDate("today", zoneId).get === today)
+      assert(toDate("tomorrow", zoneId).get === today + 1)
+    }
+  }
}

Review comment (MaxGekk, author, Sep 6, 2019), on the `val today` line: Here there is a risk that `today` holds the previous day if we are unlucky and the test is executed right at the end of a day. Maybe we need to introduce some tolerance.
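
One possible shape for the tolerance mentioned above (a sketch against this test's helpers, not part of the PR): sample the current date before and after the call, and accept any value inside that window:

      val before = localDateToDays(LocalDate.now(zoneId))
      val now = toDate("now", zoneId).get
      val after = localDateToDays(LocalDate.now(zoneId))
      // 'now' was computed between the two samples, so it must lie in [before, after]
      // even if the date rolled over mid-test.
      assert(before <= now && now <= after)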

@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.util

-import java.time.ZoneId
+import java.time.{ZoneId, ZoneOffset}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

@@ -38,8 +38,8 @@ class UnsafeArraySuite extends SparkFunSuite {
  val doubleArray = Array(1.1, 2.2, 3.3)
  val stringArray = Array("1", "10", "100")
  val dateArray = Array(
-    DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1")).get,
-    DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26")).get)
+    DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1"), ZoneOffset.UTC).get,
+    DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26"), ZoneOffset.UTC).get)
  private def defaultZoneId = ZoneId.systemDefault()
  val timestampArray = Array(
    DateTimeUtils.stringToTimestamp(