From e4e40762ca41931646b8f201028b1f2298252d96 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sat, 20 Jan 2024 20:57:09 +0800
Subject: [PATCH] [SPARK-46769][SQL] Refine timestamp related schema inference

### What changes were proposed in this pull request?

This is a refinement of https://github.com/apache/spark/pull/43243 . This PR enforces one thing: we only infer the TIMESTAMP NTZ type with the NTZ parser, and only infer the LTZ type with the LTZ parser. This consistency is important to avoid nondeterministic behavior.

### Why are the changes needed?

To avoid nondeterministic behavior. After https://github.com/apache/spark/pull/43243 , we can still have inconsistency if the LEGACY parser mode is enabled.

### Does this PR introduce _any_ user-facing change?

Yes, for the legacy parser: it is now more likely to infer the string type instead of inferring a timestamp type "by luck".

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes https://github.com/apache/spark/pull/44789

Closes #44800 from cloud-fan/infer.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/csv/CSVInferSchema.scala     | 18 ++++----
 .../sql/catalyst/json/JsonInferSchema.scala   | 31 ++++++++++----
 .../execution/datasources/csv/CSVSuite.scala  | 42 +++++++++----------
 3 files changed, 54 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index ec01b56f9eb7c..2c27da3cf6e15 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
     "yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy")
 
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
   /**
    * Similar to the JSON schema inference
    * 1. Infer type of each row
@@ -199,14 +201,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   }
 
   private def tryParseTimestampNTZ(field: String): DataType = {
-    // We can only parse the value as TimestampNTZType if it does not have zone-offset or
-    // time-zone component and can be parsed with the timestamp formatter.
-    // Otherwise, it is likely to be a timestamp with timezone.
-    val timestampType = SQLConf.get.timestampType
-    if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-        timestampType == TimestampNTZType) &&
-        timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-      timestampType
+    // For text-based formats, it's ambiguous to infer a timestamp string without a timezone,
+    // as it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support
+    // of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default.
+    if (isDefaultNTZ &&
+        timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+      TimestampNTZType
     } else {
       tryParseTimestamp(field)
     }
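With the hunk above, the outcome of CSV inference for a zone-less string depends only on `spark.sql.timestampType`. An illustrative spark-shell sketch of the effect (the path `/tmp/ntz.csv` and its contents are assumptions for this example, not part of the patch):

```scala
// Assume /tmp/ntz.csv contains a "col0" header and rows like "2020-12-12 12:12:12".
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/ntz.csv")
  .printSchema()
// expected: col0: timestamp_ntz -- the NTZ formatter infers TimestampNTZType

spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/ntz.csv")
  .printSchema()
// expected: col0: timestamp -- the value goes through tryParseTimestamp instead
```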
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index cfc9e5520e53b..bc7038fc71d48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
 
@@ -57,6 +58,8 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
   private val ignoreCorruptFiles = options.ignoreCorruptFiles
   private val ignoreMissingFiles = options.ignoreMissingFiles
 
+  private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+  private val legacyMode = SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY
+
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
@@ -159,16 +162,30 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
           val bigDecimal = decimalParser(field)
           DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        val timestampType = SQLConf.get.timestampType
         if (options.prefersDecimal && decimalTry.isDefined) {
           decimalTry.get
-        } else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy ==
-            LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
+        } else if (options.inferTimestamp) {
+          // For text-based formats, it's ambiguous to infer a timestamp string without a
+          // timezone, as it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes
+          // with the new support of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default.
+          if (isDefaultNTZ &&
             timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-          timestampType
-        } else if (options.inferTimestamp &&
-            timestampFormatter.parseOptional(field).isDefined) {
-          TimestampType
+            TimestampNTZType
+          } else if (timestampFormatter.parseOptional(field).isDefined) {
+            TimestampType
+          } else if (legacyMode) {
+            val utf8Value = UTF8String.fromString(field)
+            // There was a mistake where we used the TIMESTAMP NTZ parser to infer the LTZ type
+            // in legacy mode. The mistake makes it easier to infer the TIMESTAMP LTZ type, and
+            // we have to keep this behavior now. See SPARK-46769 for more details.
+            if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false).isDefined) {
+              TimestampType
+            } else {
+              StringType
+            }
+          } else {
+            StringType
+          }
         } else {
           StringType
         }
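The branch order above is easy to lose inside the diff, so here is a distilled restatement as a self-contained Scala sketch: the boolean parameters stand in for the formatter calls in the hunk, and the string results stand in for the Catalyst types; none of these names are real Spark APIs.

```scala
// Sketch only: mirrors the decision chain for a JSON string field when
// options.inferTimestamp is enabled.
def inferJsonTimestamp(
    isDefaultNTZ: Boolean,   // SQLConf.get.timestampType == TimestampNTZType
    legacyMode: Boolean,     // legacyTimeParserPolicy == LEGACY
    ntzParses: Boolean,      // timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false)
    ltzParses: Boolean,      // timestampFormatter.parseOptional(field)
    legacyNtzParses: Boolean // SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false)
): String = {
  if (isDefaultNTZ && ntzParses) "TimestampNTZType"       // NTZ parser only ever yields NTZ
  else if (ltzParses) "TimestampType"                     // LTZ parser only ever yields LTZ
  else if (legacyMode && legacyNtzParses) "TimestampType" // preserved quirk, see SPARK-46769
  else "StringType"
}
```

The third branch is the deliberately preserved mistake: in legacy mode an NTZ-style parse still yields the LTZ type, so schemas inferred by existing pipelines do not change.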
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 1ded117318624..e1e39ac1590f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1105,10 +1105,12 @@ abstract class CSVSuite
   test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
     withTempPath { path =>
-      val exp = spark.sql("""
-        select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
-        select timestamp_ntz'2020-12-12 12:12:12' as col0
-        """)
+      val exp = spark.sql(
+        """
+          |select *
+          |from values (timestamp_ntz'2020-12-12 12:12:12'), (timestamp_ntz'2020-12-12 12:12:12')
+          |as t(col0)
+          |""".stripMargin)
 
       exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)
 
@@ -1126,6 +1128,15 @@ abstract class CSVSuite
 
       if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
         checkAnswer(res, exp)
+      } else if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+        // When the legacy parser is enabled, we can't parse the NTZ string as LTZ, and
+        // eventually infer the string type.
+        val expected = spark.read
+          .format("csv")
+          .option("inferSchema", "false")
+          .option("header", "true")
+          .load(path.getAbsolutePath)
+        checkAnswer(res, expected)
       } else {
         checkAnswer(
           res,
@@ -2874,13 +2885,12 @@ abstract class CSVSuite
 
   test("SPARK-40474: Infer schema for columns with a mix of dates and timestamp") {
     withTempPath { path =>
-      Seq(
-        "1765-03-28",
+      val input = Seq(
         "1423-11-12T23:41:00",
+        "1765-03-28",
         "2016-01-28T20:00:00"
-      ).toDF()
-        .repartition(1)
-        .write.text(path.getAbsolutePath)
+      ).toDF().repartition(1)
+      input.write.text(path.getAbsolutePath)
 
       if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
         val options = Map(
@@ -2891,12 +2901,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options)
           .load(path.getAbsolutePath)
-        val expected = Seq(
-          Row(Timestamp.valueOf("1765-03-28 00:00:00.0")),
-          Row(Timestamp.valueOf("1423-11-12 23:41:00.0")),
-          Row(Timestamp.valueOf("2016-01-28 20:00:00.0"))
-        )
-        checkAnswer(df, expected)
+        checkAnswer(df, input)
       } else {
         // When timestampFormat is specified, infer and parse the column as strings
         val options1 = Map(
@@ -2907,12 +2912,7 @@ abstract class CSVSuite
           .format("csv")
           .options(options1)
           .load(path.getAbsolutePath)
-        val expected1 = Seq(
-          Row("1765-03-28"),
-          Row("1423-11-12T23:41:00"),
-          Row("2016-01-28T20:00:00")
-        )
-        checkAnswer(df1, expected1)
+        checkAnswer(df1, input)

         // When timestampFormat is not specified, infer and parse the column as
         // timestamp type if possible
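To reproduce the updated legacy-mode expectation of the SPARK-40474 test by hand, a spark-shell sketch follows; the path, the exact option values, and the inferred column name are assumptions based on the test context, since the options map is not fully shown in the hunk above:

```scala
// Assume /tmp/mixed.csv has one value per line, no header:
//   1423-11-12T23:41:00
//   1765-03-28
//   2016-01-28T20:00:00
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
val df = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss") // assumed, per the test's options
  .csv("/tmp/mixed.csv")
df.printSchema()
// expected after this patch: _c0: string -- "1765-03-28" no longer parses as a
// timestamp "by luck" via the NTZ parser, so the column falls back to string,
// which is why the test now compares against the original string input.
```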