[SPARK-46769][SQL] Refine timestamp related schema inference
What changes were proposed in this pull request?

This is a refinement of #43243. This PR enforces one rule: we only infer the TIMESTAMP NTZ type using the NTZ parser, and only infer the TIMESTAMP LTZ type using the LTZ parser. This consistency is important to avoid non-deterministic behavior.

Why are the changes needed?

To avoid non-deterministic behavior. After #43243, we can still get inconsistent results when the LEGACY time parser mode is enabled.

Does this PR introduce any user-facing change?

Yes, for the legacy parser: it is now more likely to infer the string type instead of inferring a timestamp type "by luck".

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.
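To illustrate the user-facing behavior described above, here is a hedged spark-shell sketch; it is not part of this commit. It assumes a running SparkSession named spark and a throwaway CSV file, and uses the config keys spark.sql.timestampType and spark.sql.legacy.timeParserPolicy that the inference code consults; the path and data are made up. Under the LEGACY policy, a zone-less value the legacy parser rejects now falls back to string rather than being inferred as a timestamp "by luck".

import java.nio.file.Files

// Write a one-column CSV whose value has no zone offset.
val dir = Files.createTempDirectory("ts-infer")
val file = dir.resolve("data.csv")
Files.write(file, "col0\n2020-12-12 12:12:12\n".getBytes)

def inferredSchema(): Unit =
  spark.read.option("header", "true").option("inferSchema", "true")
    .csv(file.toString).printSchema()

// Session default TIMESTAMP_LTZ: only the LTZ parser is tried.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
inferredSchema() // col0: timestamp

// Session default TIMESTAMP_NTZ: only the NTZ parser may produce NTZ.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
inferredSchema() // col0: timestamp_ntz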

Closes #44789

Closes #44800 from cloud-fan/infer.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e4e4076)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Jan 20, 2024
1 parent fa6bf22 commit c19bf01
Showing 3 changed files with 55 additions and 37 deletions.
CSVInferSchema.scala

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
     "yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy")

+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
   /**
    * Similar to the JSON schema inference
    * 1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   }

   private def tryParseTimestampNTZ(field: String): DataType = {
-    // We can only parse the value as TimestampNTZType if it does not have zone-offset or
-    // time-zone component and can be parsed with the timestamp formatter.
-    // Otherwise, it is likely to be a timestamp with timezone.
-    val timestampType = SQLConf.get.timestampType
-    if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-        timestampType == TimestampNTZType) &&
-        timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-      timestampType
+    // For text-based format, it's ambiguous to infer a timestamp string without timezone, as it can
+    // be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support of NTZ, here
+    // we only try to infer NTZ if the config is set to use NTZ by default.
+    if (isDefaultNTZ &&
+      timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+      TimestampNTZType
     } else {
       tryParseTimestamp(field)
     }
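The refined CSV logic is small but order-sensitive; the following standalone Scala sketch restates it outside of Spark. The names parseNTZ and parseLTZ are placeholders for timestampNTZFormatter.parseWithoutTimeZoneOptional and the LTZ path inside tryParseTimestamp, and the whole object is an illustration under those assumptions, not code from this commit.

object CsvInferenceSketch {
  sealed trait Inferred
  case object TimestampNtz extends Inferred
  case object TimestampLtz extends Inferred
  case object Str extends Inferred

  // Mirrors tryParseTimestampNTZ -> tryParseTimestamp: NTZ is only ever
  // produced by the NTZ parser, and only when NTZ is the session default.
  def infer(
      field: String,
      isDefaultNTZ: Boolean,
      parseNTZ: String => Boolean,
      parseLTZ: String => Boolean): Inferred = {
    if (isDefaultNTZ && parseNTZ(field)) TimestampNtz
    else if (parseLTZ(field)) TimestampLtz
    else Str
  }
}

For example, CsvInferenceSketch.infer("2020-12-12 12:12:12", isDefaultNTZ = false, _ => true, _ => true) yields TimestampLtz even though the NTZ parser would accept the value; that is exactly the consistency this commit enforces.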
JsonInferSchema.scala

@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

 private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
@@ -53,6 +54,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)

+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+  private val legacyMode = SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY
+
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
     parseMode match {
@@ -148,16 +152,30 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
           val bigDecimal = decimalParser(field)
           DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        val timestampType = SQLConf.get.timestampType
         if (options.prefersDecimal && decimalTry.isDefined) {
           decimalTry.get
-        } else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy ==
-            LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
-            timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-          timestampType
-        } else if (options.inferTimestamp &&
-            timestampFormatter.parseOptional(field).isDefined) {
-          TimestampType
+        } else if (options.inferTimestamp) {
+          // For text-based format, it's ambiguous to infer a timestamp string without timezone, as
+          // it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support
+          // of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default.
+          if (isDefaultNTZ &&
+            timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+            TimestampNTZType
+          } else if (timestampFormatter.parseOptional(field).isDefined) {
+            TimestampType
+          } else if (legacyMode) {
+            val utf8Value = UTF8String.fromString(field)
+            // There was a mistake that we use TIMESTAMP NTZ parser to infer LTZ type with legacy
+            // mode. The mistake makes it easier to infer TIMESTAMP LTZ type and we have to keep
+            // this behavior now. See SPARK-46769 for more details.
+            if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false).isDefined) {
+              TimestampType
+            } else {
+              StringType
+            }
+          } else {
+            StringType
+          }
         } else {
           StringType
         }
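The JSON path now has a four-way precedence, including the deliberately preserved legacy quirk. A minimal sketch of that precedence follows, with boolean parameters standing in for the real formatter calls named in the comments; it is an illustration under those assumptions, not the commit's code.

object JsonInferenceSketch {
  // Precedence sketch for JSON timestamp inference after this commit.
  def inferJsonTimestamp(
      isDefaultNTZ: Boolean,   // SQLConf.get.timestampType == TimestampNTZType
      legacyMode: Boolean,     // legacyTimeParserPolicy == LEGACY
      ntzParses: Boolean,      // timestampNTZFormatter.parseWithoutTimeZoneOptional(...)
      ltzParses: Boolean,      // timestampFormatter.parseOptional(...)
      legacyZoneless: Boolean  // SparkDateTimeUtils.stringToTimestampWithoutTimeZone(...)
  ): String = {
    if (isDefaultNTZ && ntzParses) "timestamp_ntz"
    else if (ltzParses) "timestamp"
    // Kept quirk: under LEGACY mode a zone-less parse still yields LTZ
    // (TimestampType), because dropping it would change old behavior.
    else if (legacyMode && legacyZoneless) "timestamp"
    else "string"
  }
}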
CSVSuite.scala

@@ -1105,10 +1105,12 @@ abstract class CSVSuite

   test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
     withTempPath { path =>
-      val exp = spark.sql("""
-        select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
-        select timestamp_ntz'2020-12-12 12:12:12' as col0
-        """)
+      val exp = spark.sql(
+        """
+          |select *
+          |from values (timestamp_ntz'2020-12-12 12:12:12'), (timestamp_ntz'2020-12-12 12:12:12')
+          |as t(col0)
+          |""".stripMargin)

       exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)

@@ -1126,6 +1128,15 @@ abstract class CSVSuite

       if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
         checkAnswer(res, exp)
+      } else if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+        // When legacy parser is enabled, we can't parse the NTZ string to LTZ, and eventually
+        // infer string type.
+        val expected = spark.read
+          .format("csv")
+          .option("inferSchema", "false")
+          .option("header", "true")
+          .load(path.getAbsolutePath)
+        checkAnswer(res, expected)
       } else {
         checkAnswer(
           res,
@@ -2862,13 +2873,12 @@ abstract class CSVSuite

   test("SPARK-40474: Infer schema for columns with a mix of dates and timestamp") {
     withTempPath { path =>
-      Seq(
-        "1765-03-28",
+      val input = Seq(
         "1423-11-12T23:41:00",
+        "1765-03-28",
         "2016-01-28T20:00:00"
-      ).toDF()
-        .repartition(1)
-        .write.text(path.getAbsolutePath)
+      ).toDF().repartition(1)
+      input.write.text(path.getAbsolutePath)

       if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
         val options = Map(
@@ -2879,12 +2889,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options)
           .load(path.getAbsolutePath)
-        val expected = Seq(
-          Row(Timestamp.valueOf("1765-03-28 00:00:00.0")),
-          Row(Timestamp.valueOf("1423-11-12 23:41:00.0")),
-          Row(Timestamp.valueOf("2016-01-28 20:00:00.0"))
-        )
-        checkAnswer(df, expected)
+        checkAnswer(df, input)
       } else {
         // When timestampFormat is specified, infer and parse the column as strings
         val options1 = Map(
@@ -2895,12 +2900,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options1)
           .load(path.getAbsolutePath)
-        val expected1 = Seq(
-          Row("1765-03-28"),
-          Row("1423-11-12T23:41:00"),
-          Row("2016-01-28T20:00:00")
-        )
-        checkAnswer(df1, expected1)
+        checkAnswer(df1, input)

         // When timestampFormat is not specified, infer and parse the column as
         // timestamp type if possible
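As a usage note, the adjusted SPARK-40474 scenario can be reproduced interactively. A hedged sketch, assuming a running SparkSession named spark, a non-LEGACY parser policy, and that the elided options map contains a timestampFormat like the one below (that value is an assumption, not taken from this diff):

import java.nio.file.Files

val dir = Files.createTempDirectory("mixed")
Files.write(dir.resolve("data.txt"),
  "1423-11-12T23:41:00\n1765-03-28\n2016-01-28T20:00:00\n".getBytes)

// With an explicit timestampFormat that only some rows match, the mixed
// column cannot be a single timestamp type and is inferred as string.
spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .load(dir.toString)
  .printSchema() // _c0: string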
