ImageFileFormatSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ml.source.image

import java.net.URI
import java.nio.file.Paths
+import java.sql.Date

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.image.ImageSchema._
@@ -95,14 +96,14 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
.collect()

assert(Set(result: _*) === Set(
Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
Member:
why do we need the changes here?

Member (Author):
After the changes, the partition value 2018-01 becomes a valid date value. The new partition formatter can parse it as 2018-01-01; see the patterns supported by the CAST expression. As a consequence, Spark infers DateType as the type of the partition values and, finally, converts all the strings to that type.

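A minimal sketch of the inference described above (assuming a SparkSession named `spark`; the path and directory layout are illustrative, mirroring the test data in this suite):

// Layout: /tmp/images/cls=kittens/date=2018-01/29.5.a_b_EGDP022204.jpg
val images = spark.read.format("image").load("/tmp/images")
images.printSchema()
// Before this change, the `date` partition column was inferred as StringType
// with values like "2018-01". After it, "2018-01" matches the CAST patterns,
// so `date` is inferred as DateType and the value becomes 2018-01-01.
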
Row("54893.jpg", "kittens", "2018-02"),
Row("DP153539.jpg", "kittens", "2018-02"),
Row("DP802813.jpg", "kittens", "2018-02"),
Row("BGRA.png", "multichannel", "2018-01"),
Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
Row("chr30.4.184.jpg", "multichannel", "2018-02"),
Row("grayscale.jpg", "multichannel", "2018-02")
Row("29.5.a_b_EGDP022204.jpg", "kittens", Date.valueOf("2018-01-01")),
Row("54893.jpg", "kittens", Date.valueOf("2018-02-01")),
Row("DP153539.jpg", "kittens", Date.valueOf("2018-02-01")),
Row("DP802813.jpg", "kittens", Date.valueOf("2018-02-01")),
Row("BGRA.png", "multichannel", Date.valueOf("2018-01-01")),
Row("BGRA_alpha_60.png", "multichannel", Date.valueOf("2018-01-01")),
Row("chr30.4.184.jpg", "multichannel", Date.valueOf("2018-02-01")),
Row("grayscale.jpg", "multichannel", Date.valueOf("2018-02-01"))
))
}

DateFormatter.scala
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
+import org.apache.spark.unsafe.types.UTF8String

sealed trait DateFormatter extends Serializable {
def parse(s: String): Int // returns days since epoch
@@ -48,7 +49,8 @@ class Iso8601DateFormatter(
private lazy val formatter = getOrCreateFormatter(pattern, locale, isParsing)

@transient
-  private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(pattern, locale, legacyFormat)
+  protected lazy val legacyFormatter =
+    DateFormatter.getLegacyFormatter(pattern, locale, legacyFormat)

override def parse(s: String): Int = {
try {
@@ -79,6 +81,28 @@
}
}

+/**
+ * The formatter for dates which doesn't require users to specify a pattern. While formatting,
+ * it uses the default pattern [[DateFormatter.defaultPattern]]. In parsing, it follows the CAST
+ * logic in conversion of strings to Catalyst's DateType.
+ *
+ * @param locale The locale overrides the system locale and is used in formatting.
+ * @param legacyFormat Defines the formatter used for legacy dates.
+ * @param isParsing Whether the formatter is used for parsing (`true`) or for formatting (`false`).
+ */
+class DefaultDateFormatter(
+    locale: Locale,
+    legacyFormat: LegacyDateFormats.LegacyDateFormat,
+    isParsing: Boolean)
+  extends Iso8601DateFormatter(DateFormatter.defaultPattern, locale, legacyFormat, isParsing) {
+
+  override def parse(s: String): Int = {
+    try {
+      DateTimeUtils.stringToDateAnsi(UTF8String.fromString(s))
+    } catch checkParsedDiff(s, legacyFormatter.parse)
+  }
+}

trait LegacyDateFormatter extends DateFormatter {
def parseToDate(s: String): Date

@@ -151,11 +175,12 @@ object DateFormatter {
locale: Locale = defaultLocale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
isParsing: Boolean): DateFormatter = {
-    val pattern = format.getOrElse(defaultPattern)
     if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
-      getLegacyFormatter(pattern, locale, legacyFormat)
+      getLegacyFormatter(format.getOrElse(defaultPattern), locale, legacyFormat)
     } else {
-      val df = new Iso8601DateFormatter(pattern, locale, legacyFormat, isParsing)
+      val df = format
+        .map(new Iso8601DateFormatter(_, locale, legacyFormat, isParsing))
+        .getOrElse(new DefaultDateFormatter(locale, legacyFormat, isParsing))
df.validatePatternString()
df
}
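
A short usage sketch of the new no-pattern path (not part of the diff; the locale and the expected result are assumptions based on the test suite further down):

// With format = None, DateFormatter.apply now returns a DefaultDateFormatter,
// which parses strings the way CAST does, so partial dates are accepted.
val fmt = new DefaultDateFormatter(
  locale = Locale.US,
  legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
  isParsing = true)
fmt.parse("2018-01") // 17532 days since the epoch, i.e. 2018-01-01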
TimestampFormatter.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String

sealed trait TimestampFormatter extends Serializable {
/**
@@ -160,6 +161,31 @@
}
}

+/**
+ * The formatter for timestamps which doesn't require users to specify a pattern. While formatting,
+ * it uses the default pattern [[TimestampFormatter.defaultPattern()]]. In parsing, it follows
+ * the CAST logic in conversion of strings to Catalyst's TimestampType.
+ *
+ * @param zoneId The time zone ID in which timestamps should be formatted or parsed.
+ * @param locale The locale overrides the system locale and is used in formatting.
+ * @param legacyFormat Defines the formatter used for legacy timestamps.
+ * @param isParsing Whether the formatter is used for parsing (`true`) or for formatting (`false`).
+ */
+class DefaultTimestampFormatter(
+    zoneId: ZoneId,
+    locale: Locale,
+    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
+    isParsing: Boolean)
+  extends Iso8601TimestampFormatter(
+    TimestampFormatter.defaultPattern(), zoneId, locale, legacyFormat, isParsing) {
+
+  override def parse(s: String): Long = {
+    try {
+      DateTimeUtils.stringToTimestampAnsi(UTF8String.fromString(s), zoneId)
+    } catch checkParsedDiff(s, legacyFormatter.parse)
+  }
+}

/**
* The formatter parses/formats timestamps according to the pattern `yyyy-MM-dd HH:mm:ss.[..fff..]`
* where `[..fff..]` is a fraction of second up to microsecond resolution. The formatter does not
@@ -341,12 +367,12 @@ object TimestampFormatter {
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT,
isParsing: Boolean,
forTimestampNTZ: Boolean = false): TimestampFormatter = {
-    val pattern = format.getOrElse(defaultPattern)
     val formatter = if (SQLConf.get.legacyTimeParserPolicy == LEGACY && !forTimestampNTZ) {
-      getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
+      getLegacyFormatter(format.getOrElse(defaultPattern), zoneId, locale, legacyFormat)
     } else {
-      new Iso8601TimestampFormatter(
-        pattern, zoneId, locale, legacyFormat, isParsing)
+      format
+        .map(new Iso8601TimestampFormatter(_, zoneId, locale, legacyFormat, isParsing))
+        .getOrElse(new DefaultTimestampFormatter(zoneId, locale, legacyFormat, isParsing))
}
formatter.validatePatternString(checkLegacy = !forTimestampNTZ)
formatter
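
The timestamp path works analogously (a sketch, not part of the diff; the zone and locale are illustrative):

// Without a pattern, parsing follows the CAST rules for TimestampType
// and returns microseconds since the epoch.
val tsFmt = new DefaultTimestampFormatter(
  ZoneId.of("UTC"),
  locale = Locale.US,
  legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
  isParsing = true)
tsFmt.parse("2021-8-12T18:31:50") // micros for 2021-08-12 18:31:50 UTC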
DateFormatterSuite.scala
@@ -188,4 +188,26 @@ class DateFormatterSuite extends DatetimeFormatterSuite {
// SparkUpgradeException here.
intercept[SparkUpgradeException](formatter.parse("02-29"))
}

test("SPARK-36418: default parsing w/o pattern") {
val formatter = new DefaultDateFormatter(
locale = DateFormatter.defaultLocale,
legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
isParsing = true)
Seq(
"-0042-3-4" -> LocalDate.of(-42, 3, 4),
"1000" -> LocalDate.of(1000, 1, 1),
"1582-10-4" -> LocalDate.of(1582, 10, 4),
"1583-1-1 " -> LocalDate.of(1583, 1, 1),
"1970-01-1 00:00" -> LocalDate.of(1970, 1, 1),
"2021-8-12T18:31:50" -> LocalDate.of(2021, 8, 12)
).foreach { case (inputStr, ld) =>
assert(formatter.parse(inputStr) === ld.toEpochDay)
}

val errMsg = intercept[DateTimeException] {
formatter.parse("x123")
}.getMessage
assert(errMsg.contains("Cannot cast x123 to DateType"))
}
}
TimestampFormatterSuite.scala
@@ -431,4 +431,29 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite {
val e2 = intercept[ArithmeticException](formatter.parse("-290308"))
assert(e2.getMessage === "long overflow")
}

test("SPARK-36418: default parsing w/o pattern") {
outstandingZoneIds.foreach { zoneId =>
val formatter = new DefaultTimestampFormatter(
zoneId,
locale = DateFormatter.defaultLocale,
legacyFormat = LegacyDateFormats.SIMPLE_DATE_FORMAT,
isParsing = true)
Seq(
"-0042-3-4" -> LocalDateTime.of(-42, 3, 4, 0, 0, 0),
"1000" -> LocalDateTime.of(1000, 1, 1, 0, 0, 0),
"1582-10-4" -> LocalDateTime.of(1582, 10, 4, 0, 0, 0),
"1583-1-1 " -> LocalDateTime.of(1583, 1, 1, 0, 0, 0),
"1970-01-1 01:02:3" -> LocalDateTime.of(1970, 1, 1, 1, 2, 3),
"2021-8-12T18:31:50" -> LocalDateTime.of(2021, 8, 12, 18, 31, 50)
).foreach { case (inputStr, ldt) =>
assert(formatter.parse(inputStr) === DateTimeTestUtils.localDateTimeToMicros(ldt, zoneId))
}

val errMsg = intercept[DateTimeException] {
formatter.parse("x123")
}.getMessage
assert(errMsg.contains("Cannot cast x123 to TimestampType"))
}
}
}
ParquetPartitionDiscoverySuite.scala
@@ -1067,7 +1067,7 @@ abstract class ParquetPartitionDiscoverySuite

test("SPARK-23436: invalid Dates should be inferred as String in partition inference") {
withTempPath { path =>
val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
val data = Seq(("1", "2018-41", "2018-01-01-04", "test"))
Contributor:
@MaxGekk
Hello,
It's late, but I'd like to ask about my test result related to this test code.

In Spark 3.3.0, I tested what type the partition columns are read back as.
When the date_month column value was '2018-01-01', I got DateType, but I could not get it when the value was '2018-01', as shown below.

val data = Seq(("1", "2018-01", "2018-01-01-04", "test"))
  .toDF("id", "date_month", "date_hour", "data")
val path = "some_path"
data.write.partitionBy("date_month", "date_hour").parquet(path)
val input = spark.read.parquet(path).select("id", "date_month", "date_hour", "data")
println(input.schema)
println(data.schema)
StructType(StructField(id,StringType,true), StructField(date_month,StringType,true), StructField(date_hour,StringType,true), StructField(data,StringType,true))
StructType(StructField(id,StringType,true), StructField(date_month,StringType,true), StructField(date_hour,StringType,true), StructField(data,StringType,true))

So, I would like to ask what it means to change the date_month value from '2018-01' to '2018-41' in this test code.
Is there anything I might be misunderstanding, or any mistake in my testing?

Thanks.🙏

Contributor (@wayne-kyungwonpark, Jan 17, 2025):
Additionally, I tested this code using sbt after replacing the value of the date_month column as shown below (2018-41 -> 2018-01).

test(
  "SPARK-23436, SPARK-36861: invalid Dates should be inferred as String in partition inference") {
  withTempPath { path =>
    val data = Seq(("1", "2018-01", "2018-01-01-04", "2021-01-01T00", "test"))
      .toDF("id", "date_month", "date_hour", "date_t_hour", "data")

    data.write.partitionBy("date_month", "date_hour", "date_t_hour").parquet(path.getAbsolutePath)
    val input = spark.read.parquet(path.getAbsolutePath).select("id",
      "date_month", "date_hour", "date_t_hour", "data")

    assert(data.schema.sameType(input.schema))
    checkAnswer(input, data)
  }
}
sbt:spark-sql> testOnly *ParquetV1PartitionDiscoverySuite -- -z "SPARK-23436"
sbt:spark-sql> testOnly *ParquetV2PartitionDiscoverySuite -- -z "SPARK-23436"

=> All tests succeeded!

However, after changing the value to 2018-01-01 with the DateType format 'yyyy-MM-dd', all sbt tests failed as expected.

.toDF("id", "date_month", "date_hour", "data")

data.write.partitionBy("date_month", "date_hour").parquet(path.getAbsolutePath)