Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source #39868

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3520,11 +3520,10 @@ object SQLConf {

val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
.doc("When true, the TimestampNTZ type is the prior choice of the schema inference " +
"over built-in data sources. Otherwise, the inference result will be TimestampLTZ for " +
"backward compatibility. As a result, for JSON/CSV files and partition directories " +
"written with TimestampNTZ columns, the inference results will still be of TimestampLTZ " +
"types.")
.doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " +
"this config determines whether to choose the TimestampNTZ type if a column can be " +
"either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " +
"the column will be TimestampNTZ type. Otherwise, the result will be TimestampLTZ type.")
.version("3.4.0")
.booleanConf
.createWithDefault(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.SparkFiles
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf

/**
* Options for the JDBC data source.
Expand Down Expand Up @@ -232,7 +233,11 @@ class JDBCOptions(
val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("")

// Infers timestamp values as TimestampNTZ type when reading data.
val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean
val inferTimestampNTZType =
parameters
.get(JDBC_INFER_TIMESTAMP_NTZ)
.map(_.toBoolean)
.getOrElse(SQLConf.get.inferTimestampNTZInDataSources)
}

class JdbcOptionsInWrite(
Expand Down
43 changes: 32 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime}
import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}

import scala.collection.JavaConverters._
import scala.util.Random

import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
Expand Down Expand Up @@ -1935,13 +1936,26 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
.option("url", urlWithUserAndPass)
.option("dbtable", tableName).save()

val res = spark.read.format("jdbc")
.option("inferTimestampNTZType", "true")
val readDf = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.load()

checkAnswer(res, Seq(Row(null)))
Seq(true, false).foreach { inferTimestampNTZ =>
val tsType = if (inferTimestampNTZ) {
TimestampNTZType
} else {
TimestampType
}
val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load()
checkAnswer(res, Seq(Row(null)))
assert(res.schema.fields.head.dataType == tsType)
withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
val res2 = readDf.load()
checkAnswer(res2, Seq(Row(null)))
assert(res2.schema.fields.head.dataType == tsType)
}
}

}

test("SPARK-39339: TimestampNTZType with different local time zones") {
Expand All @@ -1961,16 +1975,23 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.save()

DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this test case takes 17 seconds on my M1 Max MBP, and it can take even longer on the GitHub Actions runners. I suggest using a randomly chosen time zone to reduce the execution time to about 4 seconds.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

DateTimeTestUtils.withDefaultTimeZone(zoneId) {
val res = spark.read.format("jdbc")
.option("inferTimestampNTZType", "true")
val zoneId = DateTimeTestUtils.outstandingZoneIds(
Random.nextInt(DateTimeTestUtils.outstandingZoneIds.length))
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
// Infer TimestampNTZ column with data source option
val res = spark.read.format("jdbc")
.option("inferTimestampNTZType", "true")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.load()
checkAnswer(res, df)

withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
val res2 = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.load()

checkAnswer(res, df)
checkAnswer(res2, df)
}
}
}
Expand Down