
Commit f620996

[SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default pattern

### What changes were proposed in this pull request?
In this PR, I propose to use the `CAST` logic when the pattern is not specified in `DateFormatter` or `TimestampFormatter`. In particular, invoke `DateTimeUtils.stringToTimestampAnsi()` or `stringToDateAnsi()` in that case.
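
For reference, here is a minimal sketch of the CAST helpers the default formatters fall back to; the call shapes mirror the usages in the diff below, and the input strings are purely illustrative:

```scala
import java.time.ZoneOffset

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

// Parses with the CAST logic and returns days since the epoch 1970-01-01.
val days: Int = DateTimeUtils.stringToDateAnsi(UTF8String.fromString("2021-8-12"))

// Parses with the CAST logic and returns microseconds since the epoch,
// resolving the wall-clock time in the given zone.
val micros: Long = DateTimeUtils.stringToTimestampAnsi(
  UTF8String.fromString("2021-8-12T18:31:50"), ZoneOffset.UTC)
```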

### Why are the changes needed?
1. This improves the user experience with Spark SQL by making the default date/timestamp parsers more flexible and more tolerant of their inputs.
2. It makes the default case consistent with the behavior of the `CAST` expression, which makes the implementation more uniform.

### Does this PR introduce _any_ user-facing change?
The changes shouldn't change behavior in regular cases, but they can affect corner cases: the new implementation can parse more dates/timestamps by default. For instance, the old (current) date parser recognizes dates only in the format **yyyy-MM-dd**, but the new one can handle the following (see the sketch after this list):
   * `[+-]yyyy*`
   * `[+-]yyyy*-[m]m`
   * `[+-]yyyy*-[m]m-[d]d`
   * `[+-]yyyy*-[m]m-[d]d `
   * `[+-]yyyy*-[m]m-[d]d *`
   * `[+-]yyyy*-[m]m-[d]dT*`
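
A minimal sketch of the new default date parsing, mirroring the construction in the `DateFormatterSuite` test added below (the imports assume the `org.apache.spark.sql.catalyst.util` package these classes live in):

```scala
import java.time.LocalDate

import org.apache.spark.sql.catalyst.util.{DateFormatter, DefaultDateFormatter, LegacyDateFormats}

val dateFormatter = new DefaultDateFormatter(
  locale = DateFormatter.defaultLocale,
  legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
  isParsing = true)

// `parse` returns days since the epoch 1970-01-01.
assert(dateFormatter.parse("1000") == LocalDate.of(1000, 1, 1).toEpochDay)
assert(dateFormatter.parse("2021-8-12T18:31:50") == LocalDate.of(2021, 8, 12).toEpochDay)
```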

Similarly for timestamps: the old (current) timestamp formatter parses timestamps only in the format **yyyy-MM-dd HH:mm:ss** by default, but the new implementation can handle the following (see the sketch after this list):
   * `[+-]yyyy*`
   * `[+-]yyyy*-[m]m`
   * `[+-]yyyy*-[m]m-[d]d`
   * `[+-]yyyy*-[m]m-[d]d `
   * `[+-]yyyy*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `[+-]yyyy*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
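
And a matching sketch for timestamps, mirroring the `TimestampFormatterSuite` test added below; the expected epoch-micros value assumes the UTC zone:

```scala
import java.time.ZoneOffset

import org.apache.spark.sql.catalyst.util.{DateFormatter, DefaultTimestampFormatter, LegacyDateFormats}

val tsFormatter = new DefaultTimestampFormatter(
  ZoneOffset.UTC,
  locale = DateFormatter.defaultLocale,
  legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
  isParsing = true)

// `parse` returns microseconds since the epoch:
// 2021-08-12 18:31:50 UTC == 1628793110000000L.
assert(tsFormatter.parse("2021-8-12T18:31:50") == 1628793110000000L)
```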

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *ImageFileFormatSuite"
$ build/sbt "test:testOnly *ParquetV2PartitionDiscoverySuite"
```

Closes #33709 from MaxGekk/datetime-cast-default-pattern.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
MaxGekk authored and cloud-fan committed Aug 16, 2021
1 parent 2270ecf commit f620996
Showing 6 changed files with 116 additions and 17 deletions.
@@ -19,6 +19,7 @@ package org.apache.spark.ml.source.image

import java.net.URI
import java.nio.file.Paths
import java.sql.Date

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.image.ImageSchema._
@@ -95,14 +96,14 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
.collect()

assert(Set(result: _*) === Set(
Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
Row("54893.jpg", "kittens", "2018-02"),
Row("DP153539.jpg", "kittens", "2018-02"),
Row("DP802813.jpg", "kittens", "2018-02"),
Row("BGRA.png", "multichannel", "2018-01"),
Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
Row("chr30.4.184.jpg", "multichannel", "2018-02"),
Row("grayscale.jpg", "multichannel", "2018-02")
Row("29.5.a_b_EGDP022204.jpg", "kittens", Date.valueOf("2018-01-01")),
Row("54893.jpg", "kittens", Date.valueOf("2018-02-01")),
Row("DP153539.jpg", "kittens", Date.valueOf("2018-02-01")),
Row("DP802813.jpg", "kittens", Date.valueOf("2018-02-01")),
Row("BGRA.png", "multichannel", Date.valueOf("2018-01-01")),
Row("BGRA_alpha_60.png", "multichannel", Date.valueOf("2018-01-01")),
Row("chr30.4.184.jpg", "multichannel", Date.valueOf("2018-02-01")),
Row("grayscale.jpg", "multichannel", Date.valueOf("2018-02-01"))
))
}

@@ -26,6 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
import org.apache.spark.unsafe.types.UTF8String

sealed trait DateFormatter extends Serializable {
def parse(s: String): Int // returns days since epoch
@@ -48,7 +49,8 @@ class Iso8601DateFormatter(
private lazy val formatter = getOrCreateFormatter(pattern, locale, isParsing)

@transient
private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(pattern, locale, legacyFormat)
protected lazy val legacyFormatter =
DateFormatter.getLegacyFormatter(pattern, locale, legacyFormat)

override def parse(s: String): Int = {
try {
@@ -79,6 +81,28 @@ }
}
}

/**
* The formatter for dates which doesn't require users to specify a pattern. While formatting,
* it uses the default pattern [[DateFormatter.defaultPattern]]. In parsing, it follows the CAST
* logic in conversion of strings to Catalyst's DateType.
*
* @param locale The locale overrides the system locale and is used in formatting.
* @param legacyFormat Defines the formatter used for legacy dates.
* @param isParsing Whether the formatter is used for parsing (`true`) or for formatting (`false`).
*/
class DefaultDateFormatter(
locale: Locale,
legacyFormat: LegacyDateFormats.LegacyDateFormat,
isParsing: Boolean)
extends Iso8601DateFormatter(DateFormatter.defaultPattern, locale, legacyFormat, isParsing) {

override def parse(s: String): Int = {
try {
DateTimeUtils.stringToDateAnsi(UTF8String.fromString(s))
} catch checkParsedDiff(s, legacyFormatter.parse)
}
}

trait LegacyDateFormatter extends DateFormatter {
def parseToDate(s: String): Date

@@ -151,11 +175,12 @@ object DateFormatter {
locale: Locale = defaultLocale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
isParsing: Boolean): DateFormatter = {
val pattern = format.getOrElse(defaultPattern)
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
getLegacyFormatter(pattern, locale, legacyFormat)
getLegacyFormatter(format.getOrElse(defaultPattern), locale, legacyFormat)
} else {
val df = new Iso8601DateFormatter(pattern, locale, legacyFormat, isParsing)
val df = format
.map(new Iso8601DateFormatter(_, locale, legacyFormat, isParsing))
.getOrElse(new DefaultDateFormatter(locale, legacyFormat, isParsing))
df.validatePatternString()
df
}
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.UTF8String

sealed trait TimestampFormatter extends Serializable {
/**
@@ -160,6 +161,31 @@ class Iso8601TimestampFormatter(
}
}

/**
* The formatter for timestamps which doesn't require users to specify a pattern. While formatting,
* it uses the default pattern [[TimestampFormatter.defaultPattern()]]. In parsing, it follows
* the CAST logic in conversion of strings to Catalyst's TimestampType.
*
* @param zoneId The time zone ID in which timestamps should be formatted or parsed.
* @param locale The locale overrides the system locale and is used in formatting.
* @param legacyFormat Defines the formatter used for legacy timestamps.
* @param isParsing Whether the formatter is used for parsing (`true`) or for formatting (`false`).
*/
class DefaultTimestampFormatter(
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
isParsing: Boolean)
extends Iso8601TimestampFormatter(
TimestampFormatter.defaultPattern(), zoneId, locale, legacyFormat, isParsing) {

override def parse(s: String): Long = {
try {
DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString(s), zoneId)
} catch checkParsedDiff(s, legacyFormatter.parse)
}
}

/**
* The formatter parses/formats timestamps according to the pattern `yyyy-MM-dd HH:mm:ss.[..fff..]`
* where `[..fff..]` is a fraction of second up to microsecond resolution. The formatter does not
@@ -341,12 +367,12 @@ object TimestampFormatter {
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
isParsing: Boolean,
forTimestampNTZ: Boolean = false): TimestampFormatter = {
val pattern = format.getOrElse(defaultPattern)
val formatter = if (SQLConf.get.legacyTimeParserPolicy == LEGACY && !forTimestampNTZ) {
getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
getLegacyFormatter(format.getOrElse(defaultPattern), zoneId, locale, legacyFormat)
} else {
new Iso8601TimestampFormatter(
pattern, zoneId, locale, legacyFormat, isParsing)
format
.map(new Iso8601TimestampFormatter(_, zoneId, locale, legacyFormat, isParsing))
.getOrElse(new DefaultTimestampFormatter(zoneId, locale, legacyFormat, isParsing))
}
formatter.validatePatternString(checkLegacy = !forTimestampNTZ)
formatter
@@ -188,4 +188,26 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
// SparkUpgradeException here.
intercept[SparkUpgradeException](formatter.parse("02-29"))
}

test("SPARK-36418: default parsing w/o pattern") {
val formatter = new DefaultDateFormatter(
locale = DateFormatter.defaultLocale,
legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
isParsing = true)
Seq(
"-0042-3-4" -> LocalDate.of(-42, 3, 4),
"1000" -> LocalDate.of(1000, 1, 1),
"1582-10-4" -> LocalDate.of(1582, 10, 4),
"1583-1-1 " -> LocalDate.of(1583, 1, 1),
"1970-01-1 00:00" -> LocalDate.of(1970, 1, 1),
"2021-8-12T18:31:50" -> LocalDate.of(2021, 8, 12)
).foreach { case (inputStr, ld) =>
assert(formatter.parse(inputStr) === ld.toEpochDay)
}

val errMsg = intercept[DateTimeException] {
formatter.parse("x123")
}.getMessage
assert(errMsg.contains("Cannot cast x123 to DateType"))
}
}
@@ -431,4 +431,29 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite {
val e2 = intercept[ArithmeticException](formatter.parse("-290308"))
assert(e2.getMessage === "long overflow")
}

test("SPARK-36418: default parsing w/o pattern") {
outstandingZoneIds.foreach { zoneId =>
val formatter = new DefaultTimestampFormatter(
zoneId,
locale = DateFormatter.defaultLocale,
legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
isParsing = true)
Seq(
"-0042-3-4" -> LocalDateTime.of(-42, 3, 4, 0, 0, 0),
"1000" -> LocalDateTime.of(1000, 1, 1, 0, 0, 0),
"1582-10-4" -> LocalDateTime.of(1582, 10, 4, 0, 0, 0),
"1583-1-1 " -> LocalDateTime.of(1583, 1, 1, 0, 0, 0),
"1970-01-1 01:02:3" -> LocalDateTime.of(1970, 1, 1, 1, 2, 3),
"2021-8-12T18:31:50" -> LocalDateTime.of(2021, 8, 12, 18, 31, 50)
).foreach { case (inputStr, ldt) =>
assert(formatter.parse(inputStr) === DateTimeTestUtils.localDateTimeToMicros(ldt, zoneId))
}

val errMsg = intercept[DateTimeException] {
formatter.parse("x123")
}.getMessage
assert(errMsg.contains("Cannot cast x123 to TimestampType"))
}
}
}
@@ -1067,7 +1067,7 @@ abstract class ParquetPartitionDiscoverySuite

test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
withTempPath { path =>
val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
val data = Seq(("1", "2018-41", "2018-01-01-04", "test"))
.toDF("id", "date_month", "date_hour", "data")

data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)
