From f620996142ba312f7e52f75476b1b18be667ffdf Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Mon, 16 Aug 2021 23:29:33 +0800
Subject: [PATCH] [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps
 with default pattern

### What changes were proposed in this pull request?
In the PR, I propose to use the `CAST` logic when the pattern is not specified in `DateFormatter` or `TimestampFormatter`. In particular, invoke `DateTimeUtils.stringToTimestampAnsi()` or `stringToDateAnsi()` in that case.

### Why are the changes needed?
1. This can improve the user experience with Spark SQL by making the default date/timestamp parsers more flexible and tolerant of their inputs.
2. It makes the default case consistent with the behavior of the `CAST` expression, which makes the implementation more uniform.

### Does this PR introduce _any_ user-facing change?
The changes shouldn't change behavior in regular cases, but they can affect corner cases: the new implementation can parse more dates/timestamps by default (illustrative sketches follow at the end of this message). For instance, the old (current) date parser recognizes dates only in the format **yyyy-MM-dd**, but the new one can handle:
* `[+-]yyyy*`
* `[+-]yyyy*-[m]m`
* `[+-]yyyy*-[m]m-[d]d`
* `[+-]yyyy*-[m]m-[d]d `
* `[+-]yyyy*-[m]m-[d]d *`
* `[+-]yyyy*-[m]m-[d]dT*`

Similarly for timestamps: the old (current) timestamp formatter parses timestamps only in the format **yyyy-MM-dd HH:mm:ss** by default, but the new implementation can handle:
* `[+-]yyyy*`
* `[+-]yyyy*-[m]m`
* `[+-]yyyy*-[m]m-[d]d`
* `[+-]yyyy*-[m]m-[d]d `
* `[+-]yyyy*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
* `[+-]yyyy*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
* `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
* `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *ImageFileFormatSuite"
$ build/sbt "test:testOnly *ParquetV2PartitionDiscoverySuite"
```

Closes #33709 from MaxGekk/datetime-cast-default-pattern.
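To illustrate the date case, here is a minimal sketch mirroring the new `DateFormatterSuite` test below; it is not part of the patch itself:
```
import org.apache.spark.sql.catalyst.util.{DateFormatter, DefaultDateFormatter, LegacyDateFormats}

// Without a user-specified pattern, date parsing now follows CAST semantics,
// so inputs the strict yyyy-MM-dd parser rejected are accepted.
val dateFormatter = new DefaultDateFormatter(
  locale = DateFormatter.defaultLocale,
  legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
  isParsing = true)
dateFormatter.parse("2021-8-12T18:31:50") // days since the epoch for 2021-08-12
```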
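And the timestamp case, again only a sketch of the new default behavior (`ZoneOffset.UTC` is an arbitrary choice here):
```
import java.time.ZoneOffset

import org.apache.spark.sql.catalyst.util.{DateFormatter, DefaultTimestampFormatter, LegacyDateFormats}

// A bare year is now accepted by the default timestamp parser;
// the zone ID resolves the local result to an instant.
val tsFormatter = new DefaultTimestampFormatter(
  ZoneOffset.UTC,
  locale = DateFormatter.defaultLocale,
  legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
  isParsing = true)
tsFormatter.parse("1000") // microseconds since the epoch for 1000-01-01 00:00:00Z
```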
Authored-by: Max Gekk
Signed-off-by: Wenchen Fan
---
 .../source/image/ImageFileFormatSuite.scala   | 17 +++++-----
 .../sql/catalyst/util/DateFormatter.scala     | 33 +++++++++++++++---
 .../catalyst/util/TimestampFormatter.scala    | 34 ++++++++++++++++---
 .../catalyst/util/DateFormatterSuite.scala    | 22 ++++++++++++
 .../util/TimestampFormatterSuite.scala        | 25 ++++++++++++++
 .../ParquetPartitionDiscoverySuite.scala      |  2 +-
 6 files changed, 116 insertions(+), 17 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
index 0ec2747be6585..7dca81ef40130 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ml.source.image
 
 import java.net.URI
 import java.nio.file.Paths
+import java.sql.Date
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.image.ImageSchema._
@@ -95,14 +96,14 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
       .collect()
 
     assert(Set(result: _*) === Set(
-      Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
-      Row("54893.jpg", "kittens", "2018-02"),
-      Row("DP153539.jpg", "kittens", "2018-02"),
-      Row("DP802813.jpg", "kittens", "2018-02"),
-      Row("BGRA.png", "multichannel", "2018-01"),
-      Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
-      Row("chr30.4.184.jpg", "multichannel", "2018-02"),
-      Row("grayscale.jpg", "multichannel", "2018-02")
+      Row("29.5.a_b_EGDP022204.jpg", "kittens", Date.valueOf("2018-01-01")),
+      Row("54893.jpg", "kittens", Date.valueOf("2018-02-01")),
+      Row("DP153539.jpg", "kittens", Date.valueOf("2018-02-01")),
+      Row("DP802813.jpg", "kittens", Date.valueOf("2018-02-01")),
+      Row("BGRA.png", "multichannel", Date.valueOf("2018-01-01")),
+      Row("BGRA_alpha_60.png", "multichannel", Date.valueOf("2018-01-01")),
+      Row("chr30.4.184.jpg", "multichannel", Date.valueOf("2018-02-01")),
+      Row("grayscale.jpg", "multichannel", Date.valueOf("2018-02-01"))
     ))
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 76bc196ddc209..d9ccf3091f839 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
+import org.apache.spark.unsafe.types.UTF8String
 
 sealed trait DateFormatter extends Serializable {
   def parse(s: String): Int // returns days since epoch
@@ -48,7 +49,8 @@ class Iso8601DateFormatter(
   private lazy val formatter = getOrCreateFormatter(pattern, locale, isParsing)
 
   @transient
-  private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(pattern, locale, legacyFormat)
+  protected lazy val legacyFormatter =
+    DateFormatter.getLegacyFormatter(pattern, locale, legacyFormat)
 
   override def parse(s: String): Int = {
     try {
@@ -79,6 +81,28 @@ class Iso8601DateFormatter(
   }
 }
 
+/**
+ * The formatter for dates which doesn't require users to specify a pattern. While formatting,
+ * it uses the default pattern [[DateFormatter.defaultPattern]]. In parsing, it follows the CAST
+ * logic in conversion of strings to Catalyst's DateType.
+ *
+ * @param locale The locale overrides the system locale and is used in formatting.
+ * @param legacyFormat Defines the formatter used for legacy dates.
+ * @param isParsing Whether the formatter is used for parsing (`true`) or for formatting (`false`).
+ */
+class DefaultDateFormatter(
+    locale: Locale,
+    legacyFormat: LegacyDateFormats.LegacyDateFormat,
+    isParsing: Boolean)
+  extends Iso8601DateFormatter(DateFormatter.defaultPattern, locale, legacyFormat, isParsing) {
+
+  override def parse(s: String): Int = {
+    try {
+      DateTimeUtils.stringToDateAnsi(UTF8String.fromString(s))
+    } catch checkParsedDiff(s, legacyFormatter.parse)
+  }
+}
+
 trait LegacyDateFormatter extends DateFormatter {
   def parseToDate(s: String): Date
 
@@ -151,11 +175,12 @@ object DateFormatter {
       locale: Locale = defaultLocale,
       legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
       isParsing: Boolean): DateFormatter = {
-    val pattern = format.getOrElse(defaultPattern)
     if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
-      getLegacyFormatter(pattern, locale, legacyFormat)
+      getLegacyFormatter(format.getOrElse(defaultPattern), locale, legacyFormat)
     } else {
-      val df = new Iso8601DateFormatter(pattern, locale, legacyFormat, isParsing)
+      val df = format
+        .map(new Iso8601DateFormatter(_, locale, legacyFormat, isParsing))
+        .getOrElse(new DefaultDateFormatter(locale, legacyFormat, isParsing))
       df.validatePatternString()
       df
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 35c91a62de551..fb8502adeae9e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String
 
 sealed trait TimestampFormatter extends Serializable {
   /**
@@ -160,6 +161,31 @@ class Iso8601TimestampFormatter(
   }
 }
 
+/**
+ * The formatter for timestamps which doesn't require users to specify a pattern. While formatting,
+ * it uses the default pattern [[TimestampFormatter.defaultPattern()]]. In parsing, it follows
+ * the CAST logic in conversion of strings to Catalyst's TimestampType.
+ *
+ * @param zoneId The time zone ID in which timestamps should be formatted or parsed.
+ * @param locale The locale overrides the system locale and is used in formatting.
+ * @param legacyFormat Defines the formatter used for legacy timestamps.
+ * @param isParsing Whether the formatter is used for parsing (`true`) or for formatting (`false`).
+ */
+class DefaultTimestampFormatter(
+    zoneId: ZoneId,
+    locale: Locale,
+    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
+    isParsing: Boolean)
+  extends Iso8601TimestampFormatter(
+    TimestampFormatter.defaultPattern(), zoneId, locale, legacyFormat, isParsing) {
+
+  override def parse(s: String): Long = {
+    try {
+      DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString(s), zoneId)
+    } catch checkParsedDiff(s, legacyFormatter.parse)
+  }
+}
+
 /**
  * The formatter parses/formats timestamps according to the pattern `yyyy-MM-dd HH:mm:ss.[..fff..]`
  * where `[..fff..]` is a fraction of second up to microsecond resolution. The formatter does not
@@ -341,12 +367,12 @@ object TimestampFormatter {
       legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
       isParsing: Boolean,
       forTimestampNTZ: Boolean = false): TimestampFormatter = {
-    val pattern = format.getOrElse(defaultPattern)
     val formatter = if (SQLConf.get.legacyTimeParserPolicy == LEGACY && !forTimestampNTZ) {
-      getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
+      getLegacyFormatter(format.getOrElse(defaultPattern), zoneId, locale, legacyFormat)
     } else {
-      new Iso8601TimestampFormatter(
-        pattern, zoneId, locale, legacyFormat, isParsing)
+      format
+        .map(new Iso8601TimestampFormatter(_, zoneId, locale, legacyFormat, isParsing))
+        .getOrElse(new DefaultTimestampFormatter(zoneId, locale, legacyFormat, isParsing))
     }
     formatter.validatePatternString(checkLegacy = !forTimestampNTZ)
     formatter
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala
index 4c22e671e300b..44c90db7630ac 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateFormatterSuite.scala
@@ -188,4 +188,26 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
     // SparkUpgradeException here.
     intercept[SparkUpgradeException](formatter.parse("02-29"))
   }
+
+  test("SPARK-36418: default parsing w/o pattern") {
+    val formatter = new DefaultDateFormatter(
+      locale = DateFormatter.defaultLocale,
+      legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
+      isParsing = true)
+    Seq(
+      "-0042-3-4" -> LocalDate.of(-42, 3, 4),
+      "1000" -> LocalDate.of(1000, 1, 1),
+      "1582-10-4" -> LocalDate.of(1582, 10, 4),
+      "1583-1-1 " -> LocalDate.of(1583, 1, 1),
+      "1970-01-1 00:00" -> LocalDate.of(1970, 1, 1),
+      "2021-8-12T18:31:50" -> LocalDate.of(2021, 8, 12)
+    ).foreach { case (inputStr, ld) =>
+      assert(formatter.parse(inputStr) === ld.toEpochDay)
+    }
+
+    val errMsg = intercept[DateTimeException] {
+      formatter.parse("x123")
+    }.getMessage
+    assert(errMsg.contains("Cannot cast x123 to DateType"))
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
index 6e9386615aa1c..661e624efa592 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
@@ -431,4 +431,29 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite {
     val e2 = intercept[ArithmeticException](formatter.parse("-290308"))
     assert(e2.getMessage === "long overflow")
   }
+
+  test("SPARK-36418: default parsing w/o pattern") {
+    outstandingZoneIds.foreach { zoneId =>
+      val formatter = new DefaultTimestampFormatter(
+        zoneId,
+        locale = DateFormatter.defaultLocale,
+        legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
+        isParsing = true)
+      Seq(
+        "-0042-3-4" -> LocalDateTime.of(-42, 3, 4, 0, 0, 0),
+        "1000" -> LocalDateTime.of(1000, 1, 1, 0, 0, 0),
+        "1582-10-4" -> LocalDateTime.of(1582, 10, 4, 0, 0, 0),
+        "1583-1-1 " -> LocalDateTime.of(1583, 1, 1, 0, 0, 0),
+        "1970-01-1 01:02:3" -> LocalDateTime.of(1970, 1, 1, 1, 2, 3),
+        "2021-8-12T18:31:50" -> LocalDateTime.of(2021, 8, 12, 18, 31, 50)
+      ).foreach { case (inputStr, ldt) =>
+        assert(formatter.parse(inputStr) === DateTimeTestUtils.localDateTimeToMicros(ldt, zoneId))
+      }
+
+      val errMsg = intercept[DateTimeException] {
+        formatter.parse("x123")
+      }.getMessage
+      assert(errMsg.contains("Cannot cast x123 to TimestampType"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index a8507b7dd97f3..a3aa74d9fc780 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1067,7 +1067,7 @@ abstract class ParquetPartitionDiscoverySuite
 
   test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
     withTempPath { path =>
-      val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
+      val data = Seq(("1", "2018-41", "2018-01-01-04", "test"))
        .toDF("id", "date_month", "date_hour", "data")
 
      data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)