[SPARK-33498][SQL] Datetime parsing should fail if the input string can't be parsed, or the pattern string is invalid

### What changes were proposed in this pull request?

Datetime parsing should fail if the input string can't be parsed, or the pattern string is invalid, when ANSI mode is enabled. This patch updates `GetTimestamp`, `UnixTimestamp`, `ToUnixTimestamp` and `Cast` accordingly.
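For illustration, a minimal sketch of the behavior change, assuming a local `SparkSession` named `spark` (the exact exception type depends on `spark.sql.legacy.timeParserPolicy`, as the tests below show):

```scala
// Sketch only; assumes a SparkSession `spark`.
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("""SELECT to_timestamp("Unparseable", "yyyy-MM-dd HH:mm:ss.SSS")""").show()
// Default mode: unparseable input silently becomes NULL.

spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("""SELECT to_timestamp("Unparseable", "yyyy-MM-dd HH:mm:ss.SSS")""").show()
// ANSI mode: the same query now throws at execution time instead of returning NULL.
```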

### Why are the changes needed?

For ANSI mode compliance: under ANSI SQL semantics, an unparseable datetime input or an invalid pattern should raise an error rather than silently produce NULL.

### Does this PR introduce any user-facing change?

No. The new failure behavior applies only when ANSI mode (`spark.sql.ansi.enabled`) is explicitly enabled, which is off by default.

### How was this patch tested?

Added new unit tests and ran existing unit tests.

Closes #30442 from leanken/leanken-SPARK-33498.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
leanken-zz authored and cloud-fan committed Nov 27, 2020
1 parent e432550 commit b9f2f78
Showing 11 changed files with 424 additions and 59 deletions.
5 changes: 5 additions & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -136,12 +136,17 @@ The behavior of some SQL functions can be different under ANSI mode (`spark.sql.ansi.enabled=true`):
- `element_at`: This function throws `NoSuchElementException` if key does not exist in map.
- `elt`: This function throws `ArrayIndexOutOfBoundsException` if using invalid indices.
- `parse_url`: This function throws `IllegalArgumentException` if an input string is not a valid url.
- `to_date`: This function fails with an exception if the input string can't be parsed, or the pattern string is invalid.
- `to_timestamp`: This function fails with an exception if the input string can't be parsed, or the pattern string is invalid.
- `unix_timestamp`: This function fails with an exception if the input string can't be parsed, or the pattern string is invalid.
- `to_unix_timestamp`: This function fails with an exception if the input string can't be parsed, or the pattern string is invalid.
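A minimal usage sketch of the functions above, assuming a `SparkSession` named `spark` running with `spark.sql.ansi.enabled=true`:

```scala
// Sketch only; assumes spark.sql.ansi.enabled=true on session `spark`.
// Unparseable input now raises an error instead of returning NULL.
spark.sql("""SELECT unix_timestamp("Unparseable", "yyyy-MM-dd HH:mm:ss.SSS")""").collect()
// => throws a parse exception (exact type depends on the time parser policy)

// Valid input is unaffected.
spark.sql("""SELECT to_date("2020-01-27", "yyyy-MM-dd")""").collect()
// => Array([2020-01-27])
```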

### SQL Operators

The behavior of some SQL operators can be different under ANSI mode (`spark.sql.ansi.enabled=true`).
- `array_col[index]`: This operator throws `ArrayIndexOutOfBoundsException` if using invalid indices.
- `map_col[key]`: This operator throws `NoSuchElementException` if key does not exist in map.
- `CAST(string_col AS TIMESTAMP)`: This operator fails with an exception if the input string can't be parsed.
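A corresponding sketch for the cast operator; the error message comes from the new `stringToTimestampAnsi` helper shown later in this diff:

```scala
// Sketch only; assumes spark.sql.ansi.enabled=true on session `spark`.
spark.sql("SELECT CAST('abdef' AS TIMESTAMP)").collect()
// => java.time.DateTimeException: Cannot cast abdef to TimestampType.
```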

### SQL Keywords

@@ -448,7 +448,13 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
// TimestampConverter
private[this] def castToTimestamp(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, zoneId).orNull)
buildCast[UTF8String](_, utfs => {
if (ansiEnabled) {
DateTimeUtils.stringToTimestampAnsi(utfs, zoneId)
} else {
DateTimeUtils.stringToTimestamp(utfs, zoneId).orNull
}
})
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1L else 0)
case LongType =>
@@ -1250,15 +1256,22 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
zoneIdClass)
val longOpt = ctx.freshVariable("longOpt", classOf[Option[Long]])
(c, evPrim, evNull) =>
code"""
scala.Option<Long> $longOpt =
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $zid);
if ($longOpt.isDefined()) {
$evPrim = ((Long) $longOpt.get()).longValue();
} else {
$evNull = true;
}
"""
if (ansiEnabled) {
code"""
$evPrim =
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestampAnsi($c, $zid);
"""
} else {
code"""
scala.Option<Long> $longOpt =
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $zid);
if ($longOpt.isDefined()) {
$evPrim = ((Long) $longOpt.get()).longValue();
} else {
$evNull = true;
}
"""
}
case BooleanType =>
(c, evPrim, evNull) => code"$evPrim = $c ? 1L : 0L;"
case _: IntegralType =>
@@ -720,10 +720,12 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
case class ToUnixTimestamp(
timeExp: Expression,
format: Expression,
timeZoneId: Option[String] = None)
timeZoneId: Option[String] = None,
failOnError: Boolean = SQLConf.get.ansiEnabled)
extends UnixTime {

def this(timeExp: Expression, format: Expression) = this(timeExp, format, None)
def this(timeExp: Expression, format: Expression) =
this(timeExp, format, None, SQLConf.get.ansiEnabled)

override def left: Expression = timeExp
override def right: Expression = format
@@ -767,10 +769,15 @@ case class ToUnixTimestamp(
group = "datetime_funcs",
since = "1.5.0")
// scalastyle:on line.size.limit
case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Option[String] = None)
case class UnixTimestamp(
timeExp: Expression,
format: Expression,
timeZoneId: Option[String] = None,
failOnError: Boolean = SQLConf.get.ansiEnabled)
extends UnixTime {

def this(timeExp: Expression, format: Expression) = this(timeExp, format, None)
def this(timeExp: Expression, format: Expression) =
this(timeExp, format, None, SQLConf.get.ansiEnabled)

override def left: Expression = timeExp
override def right: Expression = format
@@ -792,6 +799,8 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op
abstract class ToTimestamp
extends BinaryExpression with TimestampFormatterHelper with ExpectsInputTypes {

def failOnError: Boolean

// The result of the conversion to timestamp is microseconds divided by this factor.
// For example if the factor is 1000000, the result of the expression is in seconds.
protected def downScaleFactor: Long
@@ -803,7 +812,14 @@ abstract class ToTimestamp
Seq(TypeCollection(StringType, DateType, TimestampType), StringType)

override def dataType: DataType = LongType
override def nullable: Boolean = true
override def nullable: Boolean = if (failOnError) children.exists(_.nullable) else true

private def isParseError(e: Throwable): Boolean = e match {
case _: DateTimeParseException |
_: DateTimeException |
_: ParseException => true
case _ => false
}

override def eval(input: InternalRow): Any = {
val t = left.eval(input)
@@ -824,9 +840,12 @@
try {
formatter.parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor
} catch {
case _: DateTimeParseException |
_: DateTimeException |
_: ParseException => null
case e if isParseError(e) =>
if (failOnError) {
throw e
} else {
null
}
}
}
}
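As a standalone model of the branching above (hypothetical helper, not Spark code):

```scala
import java.text.ParseException
import java.time.DateTimeException
import java.time.format.DateTimeParseException

// Hypothetical model of ToTimestamp's error handling: a recognized parse
// error either propagates (failOnError, i.e. ANSI mode) or degrades to null.
def evalOrNull(parse: () => Long, failOnError: Boolean): Any =
  try parse() catch {
    case e @ (_: DateTimeParseException | _: DateTimeException | _: ParseException) =>
      if (failOnError) throw e else null
  }
```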
@@ -835,6 +854,7 @@

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val javaType = CodeGenerator.javaType(dataType)
val parseErrorBranch = if (failOnError) "throw e;" else s"${ev.isNull} = true;"
left.dataType match {
case StringType => formatterOption.map { fmt =>
val df = classOf[TimestampFormatter].getName
Expand All @@ -844,11 +864,11 @@ abstract class ToTimestamp
|try {
| ${ev.value} = $formatterName.parse($datetimeStr.toString()) / $downScaleFactor;
|} catch (java.time.DateTimeException e) {
| ${ev.isNull} = true;
| $parseErrorBranch
|} catch (java.time.format.DateTimeParseException e) {
| ${ev.isNull} = true;
| $parseErrorBranch
|} catch (java.text.ParseException e) {
| ${ev.isNull} = true;
| $parseErrorBranch
|}
|""".stripMargin)
}.getOrElse {
Expand All @@ -866,11 +886,11 @@ abstract class ToTimestamp
|try {
| ${ev.value} = $timestampFormatter.parse($string.toString()) / $downScaleFactor;
|} catch (java.time.format.DateTimeParseException e) {
| ${ev.isNull} = true;
| $parseErrorBranch
|} catch (java.time.DateTimeException e) {
| ${ev.isNull} = true;
| $parseErrorBranch
|} catch (java.text.ParseException e) {
| ${ev.isNull} = true;
| $parseErrorBranch
|}
|""".stripMargin)
}
@@ -1737,7 +1757,8 @@ case class DateDiff(endDate: Expression, startDate: Expression)
private case class GetTimestamp(
left: Expression,
right: Expression,
timeZoneId: Option[String] = None)
timeZoneId: Option[String] = None,
failOnError: Boolean = SQLConf.get.ansiEnabled)
extends ToTimestamp {

override val downScaleFactor = 1
@@ -364,6 +364,15 @@ object DateTimeUtils {
}
}

def stringToTimestampAnsi(s: UTF8String, timeZoneId: ZoneId): Long = {
val timestamp = stringToTimestamp(s, timeZoneId)
if (timestamp.isEmpty) {
throw new DateTimeException(s"Cannot cast $s to TimestampType.")
} else {
timestamp.get
}
}

/**
* Gets the number of microseconds since the epoch of 1970-01-01 00:00:00Z from the given
* instance of `java.time.Instant`. The epoch microsecond count is a simple incrementing count of
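A hedged usage sketch of the two `DateTimeUtils` entry points (assumes the catalyst module on the classpath; micros values depend on the zone):

```scala
import java.time.ZoneOffset
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

// Legacy path: Option-returning, never throws on unparseable input.
DateTimeUtils.stringToTimestamp(UTF8String.fromString("abdef"), ZoneOffset.UTC)
// => None

// ANSI path: unwraps the Option or throws DateTimeException.
DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString("abdef"), ZoneOffset.UTC)
// => java.time.DateTimeException: Cannot cast abdef to TimestampType.
```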
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}
import java.time.DateTimeException
import java.util.{Calendar, TimeZone}

import scala.collection.parallel.immutable.ParVector
@@ -106,8 +107,6 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(Literal(str), TimestampType, Option(zid.getId)), expected)
}

checkCastStringToTimestamp("123", null)

val tz = TimeZone.getTimeZone(zid)
var c = Calendar.getInstance(tz)
c.set(2015, 0, 1, 0, 0, 0)
@@ -184,15 +183,6 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
c.set(2015, 2, 18, 12, 3, 17)
c.set(Calendar.MILLISECOND, 123)
checkCastStringToTimestamp("2015-03-18T12:03:17.123+7:3", new Timestamp(c.getTimeInMillis))

checkCastStringToTimestamp("2015-03-18 123142", null)
checkCastStringToTimestamp("2015-03-18T123123", null)
checkCastStringToTimestamp("2015-03-18X", null)
checkCastStringToTimestamp("2015/03/18", null)
checkCastStringToTimestamp("2015.03.18", null)
checkCastStringToTimestamp("20150318", null)
checkCastStringToTimestamp("2015-031-8", null)
checkCastStringToTimestamp("2015-03-18T12:03:17-0:70", null)
}
}

@@ -302,7 +292,6 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
}

checkEvaluation(cast("abdef", StringType), "abdef")
checkEvaluation(cast("abdef", TimestampType, UTC_OPT), null)
checkEvaluation(cast("12.65", DecimalType.SYSTEM_DEFAULT), Decimal(12.65))

checkEvaluation(cast(cast(sd, DateType), StringType), sd)
@@ -962,6 +951,34 @@ abstract class AnsiCastSuiteBase extends CastSuiteBase {
cast("abcd", DecimalType(38, 1)),
"invalid input syntax for type numeric")
}

test("ANSI mode: cast string to timestamp with parse error") {
val activeConf = conf
new ParVector(ALL_TIMEZONES.toVector).foreach { zid =>
def checkCastWithParseError(str: String): Unit = {
checkExceptionInExpression[DateTimeException](
cast(Literal(str), TimestampType, Option(zid.getId)),
s"Cannot cast $str to TimestampType.")
}

SQLConf.withExistingConf(activeConf) {
checkCastWithParseError("123")
checkCastWithParseError("2015-03-18 123142")
checkCastWithParseError("2015-03-18T123123")
checkCastWithParseError("2015-03-18X")
checkCastWithParseError("2015/03/18")
checkCastWithParseError("2015.03.18")
checkCastWithParseError("20150318")
checkCastWithParseError("2015-031-8")
checkCastWithParseError("2015-03-18T12:03:17-0:70")

val input = "abdef"
checkExceptionInExpression[DateTimeException](
cast(input, TimestampType, Option(zid.getId)),
s"Cannot cast $input to TimestampType.")
}
}
}
}

/**
@@ -18,8 +18,9 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.text.{ParseException, SimpleDateFormat}
import java.time.{Instant, LocalDate, ZoneId}
import java.time.format.DateTimeParseException
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit._

@@ -1286,4 +1287,58 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
testIntegralFunc(Long.MaxValue)
testIntegralFunc(Long.MinValue)
}
}

test("SPARK-33498: GetTimestamp,UnixTimestamp,ToUnixTimestamp with parseError") {
Seq(true, false).foreach { ansiEnabled =>
Seq("LEGACY", "CORRECTED", "EXCEPTION").foreach { policy =>
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy,
SQLConf.ANSI_ENABLED.key -> ansiEnabled.toString) {

val exprSeq = Seq[Expression](
GetTimestamp(Literal("2020-01-27T20:06:11.847"), Literal("yyyy-MM-dd HH:mm:ss.SSS")),
GetTimestamp(Literal("Unparseable"), Literal("yyyy-MM-dd HH:mm:ss.SSS")),
UnixTimestamp(Literal("2020-01-27T20:06:11.847"), Literal("yyyy-MM-dd HH:mm:ss.SSS")),
UnixTimestamp(Literal("Unparseable"), Literal("yyyy-MM-dd HH:mm:ss.SSS")),
ToUnixTimestamp(Literal("2020-01-27T20:06:11.847"), Literal("yyyy-MM-dd HH:mm:ss.SSS")),
ToUnixTimestamp(Literal("Unparseable"), Literal("yyyy-MM-dd HH:mm:ss.SSS"))
)

if (!ansiEnabled) {
exprSeq.foreach(checkEvaluation(_, null))
} else if (policy == "LEGACY") {
exprSeq.foreach(checkExceptionInExpression[ParseException](_, "Unparseable"))
} else {
exprSeq.foreach(
checkExceptionInExpression[DateTimeParseException](_, "could not be parsed"))
}

// LEGACY succeeds, CORRECTED fails, EXCEPTION throws SparkUpgradeException
val exprSeq2 = Seq[(Expression, Long)](
(GetTimestamp(Literal("2020-01-27T20:06:11.847!!!"),
Literal("yyyy-MM-dd'T'HH:mm:ss.SSS")), 1580184371847000L),
(UnixTimestamp(Literal("2020-01-27T20:06:11.847!!!"),
Literal("yyyy-MM-dd'T'HH:mm:ss.SSS")), 1580184371L),
(ToUnixTimestamp(Literal("2020-01-27T20:06:11.847!!!"),
Literal("yyyy-MM-dd'T'HH:mm:ss.SSS")), 1580184371L)
)

if (policy == "LEGACY") {
exprSeq2.foreach(pair => checkEvaluation(pair._1, pair._2))
} else if (policy == "EXCEPTION") {
exprSeq2.foreach(pair =>
checkExceptionInExpression[SparkUpgradeException](
pair._1,
"You may get a different result due to the upgrading of Spark 3.0"))
} else {
if (ansiEnabled) {
exprSeq2.foreach(pair =>
checkExceptionInExpression[DateTimeParseException](pair._1, "could not be parsed"))
} else {
exprSeq2.foreach(pair => checkEvaluation(pair._1, null))
}
}
}
}
}
}
}
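A hedged sketch of the parser-policy interplay the test above exercises, assuming a `SparkSession` named `spark`:

```scala
// Sketch only; this input parses under the legacy SimpleDateFormat but not
// under the Java 8 DateTimeFormatter (trailing "!!!").
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.sql(
  """SELECT to_unix_timestamp("2020-01-27T20:06:11.847!!!", "yyyy-MM-dd'T'HH:mm:ss.SSS")"""
).collect()
// => Array([1580184371]) (SimpleDateFormat stops at the prefix it can parse)

spark.conf.set("spark.sql.legacy.timeParserPolicy", "EXCEPTION")
// The same query now throws SparkUpgradeException, pointing users at the
// Spark 3.0 datetime parsing changes.
```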
11 changes: 11 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/datetime.sql
@@ -153,3 +153,14 @@ select from_json('{"t":"26/October/2015"}', 't Timestamp', map('timestampFormat'
select from_json('{"d":"26/October/2015"}', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy'));
select from_csv('26/October/2015', 't Timestamp', map('timestampFormat', 'dd/MMMMM/yyyy'));
select from_csv('26/October/2015', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy'));

-- Datetime parse errors
select to_date("2020-01-27T20:06:11.847", "yyyy-MM-dd HH:mm:ss.SSS");
select to_date("Unparseable", "yyyy-MM-dd HH:mm:ss.SSS");
select to_timestamp("2020-01-27T20:06:11.847", "yyyy-MM-dd HH:mm:ss.SSS");
select to_timestamp("Unparseable", "yyyy-MM-dd HH:mm:ss.SSS");
select unix_timestamp("2020-01-27T20:06:11.847", "yyyy-MM-dd HH:mm:ss.SSS");
select unix_timestamp("Unparseable", "yyyy-MM-dd HH:mm:ss.SSS");
select to_unix_timestamp("2020-01-27T20:06:11.847", "yyyy-MM-dd HH:mm:ss.SSS");
select to_unix_timestamp("Unparseable", "yyyy-MM-dd HH:mm:ss.SSS");
select cast("Unparseable" as timestamp);
