Skip to content

Commit

Permalink
[SPARK-45433][SQL] Fix CSV/JSON schema inference when timestamps do n…
Browse files Browse the repository at this point in the history
…ot match specified timestampFormat

### What changes were proposed in this pull request?
This PR fixes a bug where CSV/JSON schema inference reports an error when timestamps do not match the specified timestampFormat.
```scala
//eg
val csv = spark.read.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
csv.show()
//error
Caused by: java.time.format.DateTimeParseException: Text '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 19
```
This bug only happened when a partition had exactly one row. The inferred data type should be `StringType`, not `TimestampType`, because the value does not match `timestampFormat`.

Using CSV as an example: in `CSVInferSchema::tryParseTimestampNTZ`, inference first calls `timestampNTZFormatter.parseWithoutTimeZoneOptional`, which returns `TimestampType`. If the same partition has another row, `tryParseTimestamp` is then used to parse that row with the user-defined `timestampFormat`; it finds the value cannot be converted to a timestamp with `timestampFormat` and finally returns `StringType`. But when there is only one row, parsing it with `timestampNTZFormatter.parseWithoutTimeZoneOptional` is not correct. We should only parse with that formatter when `spark.sql.timestampType` is `TIMESTAMP_NTZ`. If `spark.sql.timestampType` is `TIMESTAMP_LTZ`, we should parse directly with `tryParseTimestamp`, so that `TimestampType` is not returned when timestamps do not match the specified timestampFormat.

### Why are the changes needed?
Fix schema inference bug.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43243 from Hisoka-X/SPARK-45433-inference-mismatch-timestamp-one-row.

Authored-by: Jia Fan <fanjiaeminem@qq.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit eae5c0e)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
  • Loading branch information
Hisoka-X authored and MaxGekk committed Oct 11, 2023
1 parent 04e6b71 commit 7e3ddc1
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._

class CSVInferSchema(val options: CSVOptions) extends Serializable {
Expand Down Expand Up @@ -202,8 +202,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
// We can only parse the value as TimestampNTZType if it does not have zone-offset or
// time-zone component and can be parsed with the timestamp formatter.
// Otherwise, it is likely to be a timestamp with timezone.
if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
SQLConf.get.timestampType
val timestampType = SQLConf.get.timestampType
if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
timestampType == TimestampNTZType) &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
timestampType
} else {
tryParseTimestamp(field)
}
Expand Down
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -148,11 +148,13 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
val bigDecimal = decimalParser(field)
DecimalType(bigDecimal.precision, bigDecimal.scale)
}
val timestampType = SQLConf.get.timestampType
if (options.prefersDecimal && decimalTry.isDefined) {
decimalTry.get
} else if (options.inferTimestamp &&
} else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy ==
LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
SQLConf.get.timestampType
timestampType
} else if (options.inferTimestamp &&
timestampFormatter.parseOptional(field).isDefined) {
TimestampType
Expand Down
Expand Up @@ -263,4 +263,14 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
inferSchema = new CSVInferSchema(options)
assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
}

test("SPARK-45433: inferring the schema when timestamps do not match specified timestampFormat" +
  " with only one row") {
  // The configured format has no fractional-second pattern, so a value with
  // millisecond precision must not be inferred as a timestamp type and has to
  // fall back to StringType (regression test for SPARK-45433).
  val csvOptions = new CSVOptions(
    parameters = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
    columnPruning = false,
    defaultTimeZoneId = "UTC")
  val schemaInferrer = new CSVInferSchema(csvOptions)
  val inferred = schemaInferrer.inferField(NullType, "2884-06-24T02:45:51.138")
  assert(inferred == StringType)
}
}
Expand Up @@ -112,4 +112,12 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
checkType(Map("inferTimestamp" -> "true"), json, TimestampType)
checkType(Map("inferTimestamp" -> "false"), json, StringType)
}

test("SPARK-45433: inferring the schema when timestamps do not match specified timestampFormat" +
  " with only one row") {
  // The value carries fractional seconds that the configured format cannot
  // parse, so inference must fall back to StringType rather than a timestamp
  // type (regression test for SPARK-45433).
  val readerOptions =
    Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "inferTimestamp" -> "true")
  val singleRowJson = """{"a": "2884-06-24T02:45:51.138"}"""
  checkType(readerOptions, singleRowJson, StringType)
}
}

0 comments on commit 7e3ddc1

Please sign in to comment.