[SPARK-47368][SQL] Remove inferTimestampNTZ config check in ParquetRowConverter

### What changes were proposed in this pull request?

The configuration `spark.sql.parquet.inferTimestampNTZ.enabled` is not related to the Parquet row converter; it only affects schema inference. This PR removes the check of `spark.sql.parquet.inferTimestampNTZ.enabled` from `ParquetRowConverter`.

### Why are the changes needed?

Bug fix. Otherwise, reading TimestampNTZ columns may fail when `spark.sql.parquet.inferTimestampNTZ.enabled` is disabled.
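
For illustration, a minimal sketch of the failure mode (a hypothetical `spark-shell` session, not taken from the PR; the exact error depends on which Parquet reader is in use):

```scala
// Hypothetical repro sketch: write a TIMESTAMP_NTZ column, then read it
// back with NTZ schema inference disabled.
spark.sql("CREATE TABLE ts (c1 TIMESTAMP_NTZ) USING parquet")
spark.sql("INSERT INTO ts VALUES (TIMESTAMP_NTZ'2016-01-01 10:11:12.123456')")
spark.conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "false")
// Before this fix, ParquetRowConverter also consulted the inference flag,
// so this read could fail even though the catalog schema is already known.
spark.table("ts").show()
```
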
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test in `ParquetQuerySuite` (see the diff below).

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45480 from gengliangwang/ntzRowConverter.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
gengliangwang committed Mar 13, 2024
1 parent 5d32e62 commit 625589f
Showing 3 changed files with 27 additions and 30 deletions.
**ParquetRowConverter.scala**

```diff
@@ -454,9 +454,8 @@ private[parquet] class ParquetRowConverter(
         }
 
       // Allow upcasting INT32 date to timestampNTZ.
-      case TimestampNTZType if schemaConverter.isTimestampNTZEnabled() &&
-          parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 &&
-          parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
+      case TimestampNTZType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 &&
+          parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
         new ParquetPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit = {
             this.updater.set(DateTimeUtils.daysToMicros(dateRebaseFunc(value), ZoneOffset.UTC))
```
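
For context, a Parquet DATE is an INT32 count of days since the Unix epoch, and `DateTimeUtils.daysToMicros` scales it to microseconds at midnight UTC (after `dateRebaseFunc` handles the Julian/Gregorian rebase). A quick sketch of the arithmetic, with illustrative values only:

```scala
// Illustrative arithmetic only, not PR code.
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Days since the epoch, as a Parquet DATE would store for 2016-01-01.
val days = ChronoUnit.DAYS.between(LocalDate.of(1970, 1, 1), LocalDate.of(2016, 1, 1))
// days == 16801
val micros = days * 24L * 60 * 60 * 1000 * 1000
// micros == 1451606400000000L, i.e. 2016-01-01T00:00:00 as a TimestampNTZ value
```
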
Later in the same file:

```diff
@@ -537,11 +536,10 @@
   // can be read as Spark's TimestampNTZ type. This is to avoid mistakes in reading the timestamp
   // values.
   private def canReadAsTimestampNTZ(parquetType: Type): Boolean =
-    schemaConverter.isTimestampNTZEnabled() &&
-      parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
-      parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
-      !parquetType.getLogicalTypeAnnotation
-        .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
+    parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
+      parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
+      !parquetType.getLogicalTypeAnnotation
+        .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
 
   /**
    * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
```
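
After the change, `canReadAsTimestampNTZ` depends only on the file's physical and logical types. For reference, a sketch (assuming parquet-mr's `Types` builder API, which is not part of this PR) of the field shape the predicate accepts, an INT64 annotated as a timestamp not adjusted to UTC:

```scala
// Sketch of the Parquet field shape that canReadAsTimestampNTZ matches.
import org.apache.parquet.schema.{LogicalTypeAnnotation, Types}
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

val ntzField = Types.required(PrimitiveTypeName.INT64)
  .as(LogicalTypeAnnotation.timestampType(/* isAdjustedToUTC = */ false, TimeUnit.MICROS))
  .named("event_time")
// With isAdjustedToUTC = true, the column maps to Spark's TIMESTAMP (LTZ) instead.
```
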
**ParquetSchemaConverter.scala**

```diff
@@ -72,13 +72,6 @@ class ParquetToSparkSchemaConverter(
     inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
     nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
-  /**
-   * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
-   */
-  def isTimestampNTZEnabled(): Boolean = {
-    inferTimestampNTZ
-  }
-
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
    */
```
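
With the accessor gone, `inferTimestampNTZ` remains purely a schema-inference setting. A hedged sketch of where it still takes effect (hypothetical path; behavior as documented for the config, not shown in this commit):

```scala
// When Spark infers a schema from Parquet files directly (no catalog schema),
// the flag decides whether unadjusted timestamps become NTZ or LTZ.
spark.conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "false")
val df = spark.read.parquet("/tmp/ntz_data") // hypothetical path
df.printSchema() // columns written as TIMESTAMP_NTZ are inferred as TIMESTAMP (LTZ)
```
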
**ParquetQuerySuite.scala**

```diff
@@ -160,21 +160,27 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
-  test("SPARK-36182: writing and reading TimestampNTZType column") {
-    withTable("ts") {
-      sql("create table ts (c1 timestamp_ntz) using parquet")
-      sql("insert into ts values (timestamp_ntz'2016-01-01 10:11:12.123456')")
-      sql("insert into ts values (null)")
-      sql("insert into ts values (timestamp_ntz'1965-01-01 10:11:12.123456')")
-      val expectedSchema = new StructType().add(StructField("c1", TimestampNTZType))
-      assert(spark.table("ts").schema == expectedSchema)
-      val expected = Seq(
-        ("2016-01-01 10:11:12.123456"),
-        (null),
-        ("1965-01-01 10:11:12.123456"))
-        .toDS().select($"value".cast("timestamp_ntz"))
-      withAllParquetReaders {
-        checkAnswer(sql("select * from ts"), expected)
+  test("SPARK-36182, SPARK-47368: writing and reading TimestampNTZType column") {
+    Seq("true", "false").foreach { inferNTZ =>
+      // The SQL Conf PARQUET_INFER_TIMESTAMP_NTZ_ENABLED should not affect the file written
+      // by Spark.
+      withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> inferNTZ) {
+        withTable("ts") {
+          sql("create table ts (c1 timestamp_ntz) using parquet")
+          sql("insert into ts values (timestamp_ntz'2016-01-01 10:11:12.123456')")
+          sql("insert into ts values (null)")
+          sql("insert into ts values (timestamp_ntz'1965-01-01 10:11:12.123456')")
+          val expectedSchema = new StructType().add(StructField("c1", TimestampNTZType))
+          assert(spark.table("ts").schema == expectedSchema)
+          val expected = Seq(
+            ("2016-01-01 10:11:12.123456"),
+            (null),
+            ("1965-01-01 10:11:12.123456"))
+            .toDS().select($"value".cast("timestamp_ntz"))
+          withAllParquetReaders {
+            checkAnswer(sql("select * from ts"), expected)
+          }
+        }
       }
     }
   }
```
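
To exercise the updated test locally, the usual Spark dev workflow should apply, e.g. `build/sbt "sql/testOnly *ParquetQuerySuite"` (command assumed from upstream conventions, not stated in this commit).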
