diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 36c13e72993b3..1f4522aef2bb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -454,9 +454,8 @@ private[parquet] class ParquetRowConverter(
         }
 
       // Allow upcasting INT32 date to timestampNTZ.
-      case TimestampNTZType if schemaConverter.isTimestampNTZEnabled() &&
-        parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 &&
-        parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
+      case TimestampNTZType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 &&
+        parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
         new ParquetPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit = {
             this.updater.set(DateTimeUtils.daysToMicros(dateRebaseFunc(value), ZoneOffset.UTC))
@@ -537,11 +536,10 @@ private[parquet] class ParquetRowConverter(
   // can be read as Spark's TimestampNTZ type. This is to avoid mistakes in reading the timestamp
   // values.
   private def canReadAsTimestampNTZ(parquetType: Type): Boolean =
-    schemaConverter.isTimestampNTZEnabled() &&
-      parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
-      parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
-      !parquetType.getLogicalTypeAnnotation
-        .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
+    parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
+      parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
+      !parquetType.getLogicalTypeAnnotation
+        .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
 
   /**
    * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 963b1520b3c0c..5a1da83912b4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -72,13 +72,6 @@ class ParquetToSparkSchemaConverter(
     inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
     nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
-  /**
-   * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
-   */
-  def isTimestampNTZEnabled(): Boolean = {
-    inferTimestampNTZ
-  }
-
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
   */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index d8fad664e74c5..02937ae0fea89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -160,21 +160,27 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
-  test("SPARK-36182: writing and reading TimestampNTZType column") {
-    withTable("ts") {
-      sql("create table ts (c1 timestamp_ntz) using parquet")
-      sql("insert into ts values (timestamp_ntz'2016-01-01 10:11:12.123456')")
-      sql("insert into ts values (null)")
-      sql("insert into ts values (timestamp_ntz'1965-01-01 10:11:12.123456')")
-      val expectedSchema = new StructType().add(StructField("c1", TimestampNTZType))
-      assert(spark.table("ts").schema == expectedSchema)
-      val expected = Seq(
-        ("2016-01-01 10:11:12.123456"),
-        (null),
-        ("1965-01-01 10:11:12.123456"))
-        .toDS().select($"value".cast("timestamp_ntz"))
-      withAllParquetReaders {
-        checkAnswer(sql("select * from ts"), expected)
+  test("SPARK-36182, SPARK-47368: writing and reading TimestampNTZType column") {
+    Seq("true", "false").foreach { inferNTZ =>
+      // The SQL Conf PARQUET_INFER_TIMESTAMP_NTZ_ENABLED should not affect the file written
+      // by Spark.
+      withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> inferNTZ) {
+        withTable("ts") {
+          sql("create table ts (c1 timestamp_ntz) using parquet")
+          sql("insert into ts values (timestamp_ntz'2016-01-01 10:11:12.123456')")
+          sql("insert into ts values (null)")
+          sql("insert into ts values (timestamp_ntz'1965-01-01 10:11:12.123456')")
+          val expectedSchema = new StructType().add(StructField("c1", TimestampNTZType))
+          assert(spark.table("ts").schema == expectedSchema)
+          val expected = Seq(
+            ("2016-01-01 10:11:12.123456"),
+            (null),
+            ("1965-01-01 10:11:12.123456"))
+            .toDS().select($"value".cast("timestamp_ntz"))
+          withAllParquetReaders {
+            checkAnswer(sql("select * from ts"), expected)
+          }
+        }
       }
     }
   }
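The net effect of the patch: `PARQUET_INFER_TIMESTAMP_NTZ_ENABLED` now only governs schema *inference* (whether an unannotated-to-UTC Parquet `TIMESTAMP` column is inferred as `TIMESTAMP_NTZ`), while `ParquetRowConverter` no longer consults it when the Catalyst schema explicitly requests `TimestampNTZType`. Below is a minimal, hedged sketch of the behavior the new test asserts, written as a standalone driver rather than the suite's `withSQLConf`/`withTable` helpers; the object name and the `/tmp` scratch path are hypothetical, and only the config key taken from `SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED` comes from the diff itself.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType, TimestampNTZType}

// Hypothetical demo object; not part of the Spark test suite.
object TimestampNtzRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-47368 sketch")
      .getOrCreate()

    val path = "/tmp/ts_ntz_demo" // hypothetical scratch location

    // Writing a TIMESTAMP_NTZ column is unaffected by the inference conf.
    spark.sql("select timestamp_ntz'2016-01-01 10:11:12.123456' as c1")
      .write.mode("overwrite").parquet(path)

    // Even with inference disabled, reading back with an explicit
    // TimestampNTZ schema must still yield TIMESTAMP_NTZ values --
    // that is the point of dropping the isTimestampNTZEnabled() check
    // from ParquetRowConverter.
    spark.conf.set(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, "false")
    val explicitSchema = new StructType().add(StructField("c1", TimestampNTZType))
    spark.read.schema(explicitSchema).parquet(path).show(truncate = false)

    spark.stop()
  }
}
```

With the old converter guard in place, the explicit-schema read above would have refused to treat the column as `TIMESTAMP_NTZ` whenever the conf was `false`, even though the file was written by Spark with an NTZ annotation; the test's `Seq("true", "false").foreach` loop pins down that both conf values now round-trip identically.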