From ecf0e3b852648f765e5c8cdd1301bbf632d7d089 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 2 Feb 2023 12:45:08 -0800
Subject: [PATCH] infer TimestampNTZ in JDBC

---
 .../apache/spark/sql/internal/SQLConf.scala   |  9 ++--
 .../datasources/jdbc/JDBCOptions.scala        |  7 ++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 43 ++++++++++++++-----
 3 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1cc3b61b83454..363e763be4fcf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3520,11 +3520,10 @@ object SQLConf {
 
   val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
     buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
-      .doc("When true, the TimestampNTZ type is the prior choice of the schema inference " +
-        "over built-in data sources. Otherwise, the inference result will be TimestampLTZ for " +
-        "backward compatibility. As a result, for JSON/CSV files and partition directories " +
-        "written with TimestampNTZ columns, the inference results will still be of TimestampLTZ " +
-        "types.")
+      .doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " +
+        "this config determines whether to choose the TimestampNTZ type if a column can be " +
+        "either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " +
+        "the column will be TimestampNTZ type. Otherwise, the result will be TimestampLTZ type.")
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index e725de95335dc..888951cf9a80f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkFiles
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the JDBC data source.
@@ -232,7 +233,11 @@ class JDBCOptions(
   val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("")
 
   // Infers timestamp values as TimestampNTZ type when reading data.
-  val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean
+  val inferTimestampNTZType =
+    parameters
+      .get(JDBC_INFER_TIMESTAMP_NTZ)
+      .map(_.toBoolean)
+      .getOrElse(SQLConf.get.inferTimestampNTZInDataSources)
 }
 
 class JdbcOptionsInWrite(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 3e317dc95476b..3b3b1bfdb60eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -23,6 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
@@ -1935,13 +1936,26 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName).save()
 
-    val res = spark.read.format("jdbc")
-      .option("inferTimestampNTZType", "true")
+    val readDf = spark.read.format("jdbc")
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName)
-      .load()
 
-    checkAnswer(res, Seq(Row(null)))
+    Seq(true, false).foreach { inferTimestampNTZ =>
+      val tsType = if (inferTimestampNTZ) {
+        TimestampNTZType
+      } else {
+        TimestampType
+      }
+      val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load()
+      checkAnswer(res, Seq(Row(null)))
+      assert(res.schema.fields.head.dataType == tsType)
+
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+        val res2 = readDf.load()
+        checkAnswer(res2, Seq(Row(null)))
+        assert(res2.schema.fields.head.dataType == tsType)
+      }
+    }
   }
 
   test("SPARK-39339: TimestampNTZType with different local time zones") {
@@ -1961,16 +1975,23 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName)
       .save()
-
-    DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
-      DateTimeTestUtils.withDefaultTimeZone(zoneId) {
-        val res = spark.read.format("jdbc")
-          .option("inferTimestampNTZType", "true")
+    val zoneId = DateTimeTestUtils.outstandingZoneIds(
+      Random.nextInt(DateTimeTestUtils.outstandingZoneIds.length))
+    DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+      // Infer TimestampNTZ column with data source option
+      val res = spark.read.format("jdbc")
+        .option("inferTimestampNTZType", "true")
+        .option("url", urlWithUserAndPass)
+        .option("dbtable", tableName)
+        .load()
+      checkAnswer(res, df)
+
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+        val res2 = spark.read.format("jdbc")
           .option("url", urlWithUserAndPass)
           .option("dbtable", tableName)
           .load()
-
-        checkAnswer(res, df)
+        checkAnswer(res2, df)
       }
     }
   }
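
Below is a minimal, self-contained sketch (not part of the patch) of how the new
"spark.sql.inferTimestampNTZInDataSources.enabled" conf and the pre-existing
"inferTimestampNTZType" data source option interact after this change: an explicit
per-source option wins, and the conf supplies the default otherwise. The in-memory
H2 URL and the table name "ts_table" are illustrative assumptions, not taken from
the patch; any JDBC-accessible table with a TIMESTAMP column behaves the same way.

import java.time.LocalDateTime

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{TimestampNTZType, TimestampType}

object InferTimestampNTZExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("InferTimestampNTZExample")
      // Session-wide default introduced by the SQLConf change above.
      .config("spark.sql.inferTimestampNTZInDataSources.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical in-memory H2 database (H2 driver assumed on the classpath).
    val url = "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"

    // Write a single TIMESTAMP value so there is a schema to infer.
    Seq(LocalDateTime.parse("2023-02-02T12:45:08")).toDF("ts")
      .write.format("jdbc")
      .option("driver", "org.h2.Driver")
      .option("url", url)
      .option("dbtable", "ts_table")
      .save()

    // No per-source option set: JDBCOptions now falls back to the SQL conf,
    // so the TIMESTAMP column is inferred as TimestampNTZType.
    val ntz = spark.read.format("jdbc")
      .option("driver", "org.h2.Driver")
      .option("url", url)
      .option("dbtable", "ts_table")
      .load()
    assert(ntz.schema.fields.head.dataType == TimestampNTZType)

    // An explicit data source option still takes precedence over the conf.
    val ltz = spark.read.format("jdbc")
      .option("driver", "org.h2.Driver")
      .option("url", url)
      .option("dbtable", "ts_table")
      .option("inferTimestampNTZType", "false")
      .load()
    assert(ltz.schema.fields.head.dataType == TimestampType)

    spark.stop()
  }
}

This mirrors the JDBCOptions change itself: parameters.get(JDBC_INFER_TIMESTAMP_NTZ)
is consulted first and SQLConf.get only supplies the fallback, so existing jobs that
set the option explicitly are unaffected by the new session-level default.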