Skip to content

Commit

Permalink
[SPARK-20557][SQL] Only support TIMESTAMP WITH TIME ZONE for Oracle D…
Browse files Browse the repository at this point in the history
…ialect

## What changes were proposed in this pull request?
In the previous PRs, apache#17832 and apache#17835 , we convert `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` to `TIMESTAMP` for all the JDBC sources. However, this conversion could be risky since it does not respect our SQL configuration `spark.sql.session.timeZone`.

In addition, each vendor might have different semantics for these two types. For example, Postgres simply returns `TIMESTAMP` types for `TIMESTAMP WITH TIME ZONE`. For such supports, we should do it case by case. This PR reverts the general support of `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` for JDBC sources, except ORACLE Dialect.

When supporting the ORACLE's `TIMESTAMP WITH TIME ZONE`, we only support it when the JVM default timezone is the same as the user-specified configuration `spark.sql.session.timeZone` (whose default is the JVM default timezone). Now, we still treat `TIMESTAMP WITH TIME ZONE` as `TIMESTAMP` when fetching the values via the Oracle JDBC connector, whose client converts the timestamp values with time zone to the timestamp values using the local JVM default timezone (a test case is added to `OracleIntegrationSuite.scala` in this PR for showing the behavior). Thus, to avoid any future behavior change, we will not support it if JVM default timezone is different from `spark.sql.session.timeZone`

No regression because the previous two PRs were just merged to be unreleased master branch.

## How was this patch tested?
Added the test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#19939 from gatorsmile/timezoneUpdate.
  • Loading branch information
gatorsmile committed Dec 12, 2017
1 parent 3d82f6e commit a400265
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package org.apache.spark.sql.jdbc

import java.sql.{Connection, Date, Timestamp}
import java.util.Properties
import java.util.{Properties, TimeZone}
import java.math.BigDecimal

import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.execution.{WholeStageCodegenExec, RowDataSourceScanExec}
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.tags.DockerTest
Expand Down Expand Up @@ -77,6 +78,9 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
conn.prepareStatement(
"INSERT INTO ts_with_timezone VALUES " +
"(1, to_timestamp_tz('1999-12-01 11:00:00 UTC','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
conn.prepareStatement(
"INSERT INTO ts_with_timezone VALUES " +
"(2, to_timestamp_tz('1999-12-01 12:00:00 PST','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
conn.commit()

conn.prepareStatement(
Expand Down Expand Up @@ -235,6 +239,63 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
assert(types(1).equals("class java.sql.Timestamp"))
}

test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from default") {
val defaultJVMTimeZone = TimeZone.getDefault
// Pick the timezone different from the current default time zone of JVM
val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
val shanghaiTimeZone = TimeZone.getTimeZone("Asia/Shanghai")
val localSessionTimeZone =
if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone

withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) {
val e = intercept[java.sql.SQLException] {
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
dfRead.collect()
}.getMessage
assert(e.contains("Unrecognized SQL type -101"))
}
}

/**
* Change the Time Zone `timeZoneId` of JVM before executing `f`, then switches back to the
* original after `f` returns.
* @param timeZoneId the ID for a TimeZone, either an abbreviation such as "PST", a full name such
* as "America/Los_Angeles", or a custom ID such as "GMT-8:00".
*/
private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = {
val originalLocale = TimeZone.getDefault
try {
// Add Locale setting
TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
f
} finally {
TimeZone.setDefault(originalLocale)
}
}

test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
def checkRow(row: Row, ts: String): Unit = {
assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
}

withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> TimeZone.getDefault.getID) {
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
withTimeZone("PST") {
assert(dfRead.collect().toSet ===
Set(
Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 03:00:00")),
Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 12:00:00"))))
}

withTimeZone("UTC") {
assert(dfRead.collect().toSet ===
Set(
Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 11:00:00")),
Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 20:00:00"))))
}
}
}

test("SPARK-18004: Make sure date or timestamp related predicate is pushed down correctly") {
val props = new Properties()
props.put("oracle.jdbc.mapDateToTimestamp", "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {

test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE " +
"should be recognized") {
// When using JDBC to read the columns of TIMESTAMP with TIME ZONE and TIME with TIME ZONE
// the actual types are java.sql.Types.TIMESTAMP and java.sql.Types.TIME
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
val rows = dfRead.collect()
val types = rows(0).toSeq.map(x => x.getClass.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ object JdbcUtils extends Logging {
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIME_WITH_TIMEZONE
=> TimestampType
=> null
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
=> TimestampType
=> null
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.spark.sql.jdbc

import java.sql.{Date, Timestamp, Types}
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


Expand All @@ -29,6 +32,13 @@ private case object OracleDialect extends JdbcDialect {

override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

private def supportTimeZoneTypes: Boolean = {
val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
// TODO: support timezone types when users are not using the JVM timezone, which
// is the default value of SESSION_LOCAL_TIMEZONE
timeZone == TimeZone.getDefault
}

override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
Expand All @@ -49,7 +59,8 @@ private case object OracleDialect extends JdbcDialect {
case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
case _ => None
}
case TIMESTAMPTZ => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
case TIMESTAMPTZ if supportTimeZoneTypes
=> Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
case _ => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,10 +1064,10 @@ class JDBCSuite extends SparkFunSuite
}

test("unsupported types") {
var e = intercept[SparkException] {
var e = intercept[SQLException] {
spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
}.getMessage
assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
e = intercept[SQLException] {
spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
}.getMessage
Expand Down

0 comments on commit a400265

Please sign in to comment.