[SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source

### What changes were proposed in this pull request?

Similar to #39777 and #39812, this PR proposes to use `spark.sql.inferTimestampNTZInDataSources.enabled` to control the behavior of timestamp type inference on the JDBC data source.

### Why are the changes needed?

This unifies the TimestampNTZ type inference behavior across data sources. In the JDBC/JSON/CSV data sources, a column can be inferred as either Timestamp type or TimestampNTZ type, so we need a lightweight configuration to control that choice.
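
For illustration, a minimal usage sketch of the unified behavior (hedged: the H2 URL and table name below are placeholders, not taken from this PR):

```scala
// Assumed setup: a JDBC table "ts_table" with one timestamp column.
// With the conf disabled (the default), timestamps infer as TimestampType (LTZ).
spark.conf.set("spark.sql.inferTimestampNTZInDataSources.enabled", "false")
val ltz = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb;user=testUser")  // placeholder URL
  .option("dbtable", "ts_table")                      // placeholder table
  .load()
// ltz.schema: the timestamp column is TimestampType

// With the conf enabled, the same read infers TimestampNTZType instead.
spark.conf.set("spark.sql.inferTimestampNTZInDataSources.enabled", "true")
val ntz = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb;user=testUser")
  .option("dbtable", "ts_table")
  .load()
// ntz.schema: the timestamp column is TimestampNTZType
```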

### Does this PR introduce _any_ user-facing change?

No, TimestampNTZ is not released yet.

### How was this patch tested?

Unit tests.

Closes #39868 from gengliangwang/jdbcNTZ.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
gengliangwang authored and dongjoon-hyun committed Feb 3, 2023
1 parent 4ebfc0e commit 4760a8b
Showing 3 changed files with 42 additions and 17 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -3520,11 +3520,10 @@ object SQLConf {
 
   val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
     buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
-      .doc("When true, the TimestampNTZ type is the prior choice of the schema inference " +
-        "over built-in data sources. Otherwise, the inference result will be TimestampLTZ for " +
-        "backward compatibility. As a result, for JSON/CSV files and partition directories " +
-        "written with TimestampNTZ columns, the inference results will still be of TimestampLTZ " +
-        "types.")
+      .doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " +
+        "this config determines whether to choose the TimestampNTZ type if a column can be " +
+        "either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " +
+        "the column will be TimestampNTZ type. Otherwise, the result will be TimestampLTZ type.")
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

@@ -26,6 +26,7 @@
 import org.apache.spark.SparkFiles
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the JDBC data source.
@@ -232,7 +233,11 @@ class JDBCOptions(
   val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("")
 
   // Infers timestamp values as TimestampNTZ type when reading data.
-  val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean
+  val inferTimestampNTZType =
+    parameters
+      .get(JDBC_INFER_TIMESTAMP_NTZ)
+      .map(_.toBoolean)
+      .getOrElse(SQLConf.get.inferTimestampNTZInDataSources)
 }
 
 class JdbcOptionsInWrite(
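
The new resolution logic follows a standard option-over-conf pattern. A self-contained sketch (plain Scala with names simplified from the diff; not Spark's actual code):

```scala
// Sketch of the fallback chain in JDBCOptions above: an explicit per-source
// option wins; otherwise the session-wide SQL conf supplies the default.
def resolveInferNTZ(parameters: Map[String, String], sessionDefault: Boolean): Boolean =
  parameters
    .get("inferTimestampNTZType") // the JDBC_INFER_TIMESTAMP_NTZ option key
    .map(_.toBoolean)             // the user set "true"/"false" explicitly
    .getOrElse(sessionDefault)    // fall back to the SQLConf value

// An explicit option beats the session default; the default applies otherwise.
assert(!resolveInferNTZ(Map("inferTimestampNTZType" -> "false"), sessionDefault = true))
assert(resolveInferNTZ(Map.empty, sessionDefault = true))
```

Note the subtle behavior change: previously a missing option always meant `false`; now it defers to the session-wide default, which is what keeps JDBC consistent with the JSON/CSV sources.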
43 changes: 32 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -23,6 +23,7 @@
 import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
@@ -1935,13 +1936,26 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName).save()
 
-    val res = spark.read.format("jdbc")
-      .option("inferTimestampNTZType", "true")
-      .option("url", urlWithUserAndPass)
-      .option("dbtable", tableName)
-      .load()
-
-    checkAnswer(res, Seq(Row(null)))
+    val readDf = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", tableName)
+
+    Seq(true, false).foreach { inferTimestampNTZ =>
+      val tsType = if (inferTimestampNTZ) {
+        TimestampNTZType
+      } else {
+        TimestampType
+      }
+      val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load()
+      checkAnswer(res, Seq(Row(null)))
+      assert(res.schema.fields.head.dataType == tsType)
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+        val res2 = readDf.load()
+        checkAnswer(res2, Seq(Row(null)))
+        assert(res2.schema.fields.head.dataType == tsType)
+      }
+    }
   }
 
   test("SPARK-39339: TimestampNTZType with different local time zones") {
@@ -1961,16 +1975,23 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName)
       .save()
 
-    DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
-      DateTimeTestUtils.withDefaultTimeZone(zoneId) {
-        val res = spark.read.format("jdbc")
-          .option("inferTimestampNTZType", "true")
-          .option("url", urlWithUserAndPass)
-          .option("dbtable", tableName)
-          .load()
-
-        checkAnswer(res, df)
+    val zoneId = DateTimeTestUtils.outstandingZoneIds(
+      Random.nextInt(DateTimeTestUtils.outstandingZoneIds.length))
+    DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+      // Infer the TimestampNTZ column with the data source option
+      val res = spark.read.format("jdbc")
+        .option("inferTimestampNTZType", "true")
+        .option("url", urlWithUserAndPass)
+        .option("dbtable", tableName)
+        .load()
+      checkAnswer(res, df)
+
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+        val res2 = spark.read.format("jdbc")
+          .option("url", urlWithUserAndPass)
+          .option("dbtable", tableName)
+          .load()
+
+        checkAnswer(res2, df)
       }
     }
   }
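
Taken together, the tests pin down the precedence between the two knobs. A hedged sketch of the user-visible consequence (placeholder URL and table; only the conf and option names come from the diff):

```scala
// The per-read option should override the session conf.
spark.conf.set("spark.sql.inferTimestampNTZInDataSources.enabled", "true")

val df = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb;user=testUser")  // placeholder URL
  .option("dbtable", "ts_table")                      // placeholder table
  .option("inferTimestampNTZType", "false")           // explicit option wins
  .load()
// Expected: the timestamp column infers as TimestampType (LTZ), despite the conf.
```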
