[SPARK-46769][SQL] Refine timestamp related schema inference
### What changes were proposed in this pull request?

This is a refinement of #43243. This PR enforces one rule: we only infer the TIMESTAMP NTZ type using the NTZ parser, and only infer the TIMESTAMP LTZ type using the LTZ parser. This consistency is important for avoiding non-deterministic behavior.
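For illustration, a minimal sketch of the resulting behavior (assuming a local `SparkSession` and a scratch path; this snippet is not part of the PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// A zone-less timestamp string is ambiguous: it could be LTZ or NTZ.
Seq("2020-12-12 12:12:12").toDF("col0")
  .write.option("header", "true").mode("overwrite").csv("/tmp/ts-infer")

// NTZ is inferred only when the session default is NTZ (NTZ parser only)...
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
spark.read.option("header", "true").option("inferSchema", "true")
  .csv("/tmp/ts-infer").printSchema()  // col0: timestamp_ntz

// ...and LTZ only when the default is LTZ (LTZ parser only).
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
spark.read.option("header", "true").option("inferSchema", "true")
  .csv("/tmp/ts-infer").printSchema()  // col0: timestamp
```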

### Why are the changes needed?

It avoids non-deterministic behavior. After #43243, we can still get inconsistent results when the LEGACY time parser policy is enabled.

### Does this PR introduce _any_ user-facing change?

Yes, for the legacy parser: it is now more likely to infer the string type instead of inferring a timestamp type "by luck".
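A hypothetical repro of the difference (the config names are real; the path and data are invented, and the expected outcome is taken from the test changes below):

```scala
// CSV previously written from a TIMESTAMP_NTZ column, e.g. "2020-12-12T12:12:12.000".
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/ntz-strings")

// The legacy LTZ parser cannot parse the NTZ-formatted string, and the NTZ
// parser is no longer consulted for LTZ inference, so the column stays string.
df.printSchema()  // col0: string (previously inferred as timestamp "by luck")
```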

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44789

Closes #44800 from cloud-fan/infer.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Jan 20, 2024
1 parent b7356b6 commit e4e4076
Showing 3 changed files with 54 additions and 37 deletions.
CSVInferSchema.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -66,6 +66,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
"yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy")

+private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+
/**
* Similar to the JSON schema inference
* 1. Infer type of each row
@@ -199,14 +201,12 @@
}

private def tryParseTimestampNTZ(field: String): DataType = {
-// We can only parse the value as TimestampNTZType if it does not have zone-offset or
-// time-zone component and can be parsed with the timestamp formatter.
-// Otherwise, it is likely to be a timestamp with timezone.
-val timestampType = SQLConf.get.timestampType
-if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
-timestampType == TimestampNTZType) &&
-timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-timestampType
+// For text-based format, it's ambiguous to infer a timestamp string without timezone, as it can
+// be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support of NTZ, here
+// we only try to infer NTZ if the config is set to use NTZ by default.
+if (isDefaultNTZ &&
+timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
+TimestampNTZType
} else {
tryParseTimestamp(field)
}
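To make the enforced rule concrete, here is a self-contained sketch (invented names, with `java.time` stand-ins for Spark's internal formatters) of the decision this hunk implements:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

// Stand-in for timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false):
// it succeeds only for zone-less values.
def parseNtz(s: String): Option[LocalDateTime] =
  Try(LocalDateTime.parse(s, DateTimeFormatter.ISO_LOCAL_DATE_TIME)).toOption

// NTZ may only come from the NTZ parser; everything else falls through to
// the LTZ path (tryParseTimestamp), which may still yield string.
def infer(field: String, isDefaultNTZ: Boolean): String =
  if (isDefaultNTZ && parseNtz(field).isDefined) "TIMESTAMP_NTZ"
  else "TIMESTAMP or STRING via the LTZ parser"

infer("2020-12-12T12:12:12", isDefaultNTZ = true)   // TIMESTAMP_NTZ
infer("2020-12-12T12:12:12", isDefaultNTZ = false)  // LTZ path only
```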
JsonInferSchema.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

@@ -57,6 +58,8 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {

private val ignoreCorruptFiles = options.ignoreCorruptFiles
private val ignoreMissingFiles = options.ignoreMissingFiles
+private val isDefaultNTZ = SQLConf.get.timestampType == TimestampNTZType
+private val legacyMode = SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY

private def handleJsonErrorsByParseMode(parseMode: ParseMode,
columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
@@ -159,16 +162,30 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
val bigDecimal = decimalParser(field)
DecimalType(bigDecimal.precision, bigDecimal.scale)
}
-val timestampType = SQLConf.get.timestampType
if (options.prefersDecimal && decimalTry.isDefined) {
decimalTry.get
-} else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy ==
-LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
+} else if (options.inferTimestamp) {
+// For text-based format, it's ambiguous to infer a timestamp string without timezone, as
+// it can be both TIMESTAMP LTZ and NTZ. To avoid behavior changes with the new support
+// of NTZ, here we only try to infer NTZ if the config is set to use NTZ by default.
+if (isDefaultNTZ &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-timestampType
-} else if (options.inferTimestamp &&
-timestampFormatter.parseOptional(field).isDefined) {
-TimestampType
+TimestampNTZType
+} else if (timestampFormatter.parseOptional(field).isDefined) {
+TimestampType
+} else if (legacyMode) {
+val utf8Value = UTF8String.fromString(field)
+// There was a mistake that we use TIMESTAMP NTZ parser to infer LTZ type with legacy
+// mode. The mistake makes it easier to infer TIMESTAMP LTZ type and we have to keep
+// this behavior now. See SPARK-46769 for more details.
+if (SparkDateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, false).isDefined) {
+TimestampType
+} else {
+StringType
+}
+} else {
+StringType
+}
} else {
StringType
}
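The `legacyMode` branch above deliberately preserves the old quirk described in the comment. A hedged spark-shell illustration (invented path and data; the exact outcome depends on the session's timestamp format):

```scala
// JSON lines such as: {"t": "2020-12-12T12:12:12"}  (zone-less)
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

val df = spark.read
  .option("inferTimestamp", "true")
  .json("/tmp/ntz-json")

// Even if the legacy LTZ formatter rejects the value, the preserved fallback
// through stringToTimestampWithoutTimeZone can still infer TIMESTAMP (LTZ),
// keeping the historical behavior that SPARK-46769 documents.
df.printSchema()  // t: timestamp
```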
CSVSuite.scala
@@ -1105,10 +1105,12 @@ abstract class CSVSuite

test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
withTempPath { path =>
-val exp = spark.sql("""
-select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
-select timestamp_ntz'2020-12-12 12:12:12' as col0
-""")
+val exp = spark.sql(
+"""
+|select *
+|from values (timestamp_ntz'2020-12-12 12:12:12'), (timestamp_ntz'2020-12-12 12:12:12')
+|as t(col0)
+|""".stripMargin)

exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)

@@ -1126,6 +1128,15 @@

if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
checkAnswer(res, exp)
+} else if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+// When legacy parser is enabled, we can't parse the NTZ string to LTZ, and eventually
+// infer string type.
+val expected = spark.read
+.format("csv")
+.option("inferSchema", "false")
+.option("header", "true")
+.load(path.getAbsolutePath)
+checkAnswer(res, expected)
} else {
checkAnswer(
res,
@@ -2874,13 +2885,12 @@ abstract class CSVSuite

test("SPARK-40474: Infer schema for columns with a mix of dates and timestamp") {
withTempPath { path =>
-Seq(
-"1765-03-28",
+val input = Seq(
"1423-11-12T23:41:00",
"1765-03-28",
"2016-01-28T20:00:00"
-).toDF()
-.repartition(1)
-.write.text(path.getAbsolutePath)
+).toDF().repartition(1)
+input.write.text(path.getAbsolutePath)

if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
val options = Map(
@@ -2891,12 +2901,7 @@
.format("csv")
.options(options)
.load(path.getAbsolutePath)
-val expected = Seq(
-Row(Timestamp.valueOf("1765-03-28 00:00:00.0")),
-Row(Timestamp.valueOf("1423-11-12 23:41:00.0")),
-Row(Timestamp.valueOf("2016-01-28 20:00:00.0"))
-)
-checkAnswer(df, expected)
+checkAnswer(df, input)
} else {
// When timestampFormat is specified, infer and parse the column as strings
val options1 = Map(
@@ -2907,12 +2912,7 @@
.format("csv")
.options(options1)
.load(path.getAbsolutePath)
-val expected1 = Seq(
-Row("1765-03-28"),
-Row("1423-11-12T23:41:00"),
-Row("2016-01-28T20:00:00")
-)
-checkAnswer(df1, expected1)
+checkAnswer(df1, input)

// When timestampFormat is not specified, infer and parse the column as
// timestamp type if possible
