From a4002651a3ea673cf3eff7927531c1659663d194 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 11 Dec 2017 16:33:06 -0800
Subject: [PATCH] [SPARK-20557][SQL] Only support TIMESTAMP WITH TIME ZONE for Oracle Dialect

## What changes were proposed in this pull request?

In the previous PRs, https://github.com/apache/spark/pull/17832 and https://github.com/apache/spark/pull/17835, we converted `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` to `TIMESTAMP` for all JDBC sources. However, this conversion is risky because it does not respect the SQL configuration `spark.sql.session.timeZone`.

In addition, each vendor may have different semantics for these two types. For example, Postgres simply returns `TIMESTAMP` for `TIMESTAMP WITH TIME ZONE` columns. Support for these types should therefore be added case by case.

This PR reverts the general support of `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` for JDBC sources, except for the Oracle dialect.

Oracle's `TIMESTAMP WITH TIME ZONE` is supported only when the JVM default time zone is the same as the user-specified configuration `spark.sql.session.timeZone` (whose default is the JVM default time zone). We still treat `TIMESTAMP WITH TIME ZONE` as `TIMESTAMP` when fetching values through the Oracle JDBC connector, whose client converts timestamp-with-time-zone values to timestamps in the local JVM default time zone (a test case is added to `OracleIntegrationSuite.scala` in this PR to show this behavior). Thus, to avoid any future behavior change, we do not support it when the JVM default time zone differs from `spark.sql.session.timeZone`.

There is no regression because the previous two PRs were only merged to the unreleased master branch.

## How was this patch tested?

Added test cases.

Author: gatorsmile

Closes #19939 from gatorsmile/timezoneUpdate.
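For readers of this description, a minimal sketch of the resulting behavior, assuming a local Oracle instance and an existing `spark` session. The JDBC URL below is hypothetical; only the `ts_with_timezone` table name, the `TIMESTAMPTZ` mapping, and the "Unrecognized SQL type -101" error come from this patch:

```scala
import java.util.{Properties, TimeZone}

// Hypothetical connection string; substitute your own Oracle instance.
val jdbcUrl = "jdbc:oracle:thin:system/oracle@//localhost:1521/xe"
val props = new Properties()

// Session time zone matches the JVM default: OracleDialect maps the Oracle
// TIMESTAMPTZ column to Catalyst TimestampType, and the Oracle JDBC client
// returns values converted to the JVM default time zone.
spark.conf.set("spark.sql.session.timeZone", TimeZone.getDefault.getID)
spark.read.jdbc(jdbcUrl, "ts_with_timezone", props).show()

// Session time zone differs from the JVM default: the dialect no longer maps
// TIMESTAMPTZ, so the read fails with "Unrecognized SQL type -101".
spark.conf.set("spark.sql.session.timeZone", "Asia/Shanghai") // assumes the JVM default is not Asia/Shanghai
spark.read.jdbc(jdbcUrl, "ts_with_timezone", props).collect() // throws java.sql.SQLException
```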
---
 .../sql/jdbc/OracleIntegrationSuite.scala          | 67 ++++++++++++++++++-
 .../sql/jdbc/PostgresIntegrationSuite.scala        |  2 +
 .../datasources/jdbc/JdbcUtils.scala               |  4 +-
 .../apache/spark/sql/jdbc/OracleDialect.scala      | 13 +++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  4 +-
 5 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 90343182712ed..8512496e5fe52 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -18,11 +18,12 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, Date, Timestamp}
-import java.util.Properties
+import java.util.{Properties, TimeZone}
 import java.math.BigDecimal
 
-import org.apache.spark.sql.{DataFrame, Row, SaveMode}
-import org.apache.spark.sql.execution.{WholeStageCodegenExec, RowDataSourceScanExec}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
@@ -77,6 +78,9 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     conn.prepareStatement(
       "INSERT INTO ts_with_timezone VALUES " +
         "(1, to_timestamp_tz('1999-12-01 11:00:00 UTC','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO ts_with_timezone VALUES " +
+        "(2, to_timestamp_tz('1999-12-01 12:00:00 PST','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
     conn.commit()
 
     conn.prepareStatement(
@@ -235,6 +239,63 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(types(1).equals("class java.sql.Timestamp"))
   }
 
+  test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from default") {
+    val defaultJVMTimeZone = TimeZone.getDefault
+    // Pick the timezone different from the current default time zone of JVM
+    val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
+    val shanghaiTimeZone = TimeZone.getTimeZone("Asia/Shanghai")
+    val localSessionTimeZone =
+      if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) {
+      val e = intercept[java.sql.SQLException] {
+        val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
+        dfRead.collect()
+      }.getMessage
+      assert(e.contains("Unrecognized SQL type -101"))
+    }
+  }
+
+  /**
+   * Change the Time Zone `timeZoneId` of JVM before executing `f`, then switches back to the
+   * original after `f` returns.
+   * @param timeZoneId the ID for a TimeZone, either an abbreviation such as "PST", a full name such
+   * as "America/Los_Angeles", or a custom ID such as "GMT-8:00".
+   */
+  private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = {
+    val originalLocale = TimeZone.getDefault
+    try {
+      // Add Locale setting
+      TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
+      f
+    } finally {
+      TimeZone.setDefault(originalLocale)
+    }
+  }
+
+  test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
+    def checkRow(row: Row, ts: String): Unit = {
+      assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
+    }
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> TimeZone.getDefault.getID) {
+      val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
+      withTimeZone("PST") {
+        assert(dfRead.collect().toSet ===
+          Set(
+            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 03:00:00")),
+            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 12:00:00"))))
+      }
+
+      withTimeZone("UTC") {
+        assert(dfRead.collect().toSet ===
+          Set(
+            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 11:00:00")),
+            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 20:00:00"))))
+      }
+    }
+  }
+
   test("SPARK-18004: Make sure date or timestamp related predicate is pushed down correctly") {
     val props = new Properties()
     props.put("oracle.jdbc.mapDateToTimestamp", "false")
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 48aba90afc787..be32cb89f4886 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -151,6 +151,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
 
   test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE " +
     "should be recognized") {
+    // When using JDBC to read the columns of TIMESTAMP with TIME ZONE and TIME with TIME ZONE
+    // the actual types are java.sql.Types.TIMESTAMP and java.sql.Types.TIME
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 75c94fc486493..bbc95df4d9dc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -226,10 +226,10 @@ object JdbcUtils extends Logging {
       case java.sql.Types.STRUCT        => StringType
       case java.sql.Types.TIME          => TimestampType
       case java.sql.Types.TIME_WITH_TIMEZONE
-                                        => TimestampType
+                                        => null
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
-                                        => TimestampType
+                                        => null
       case java.sql.Types.TINYINT       => IntegerType
       case java.sql.Types.VARBINARY     => BinaryType
       case java.sql.Types.VARCHAR       => StringType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index e3f106c41c7ff..6ef77f24460be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -18,7 +18,10 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Date, Timestamp, Types}
+import java.util.TimeZone
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -29,6 +32,13 @@ private case object OracleDialect extends JdbcDialect {
 
   override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
 
+  private def supportTimeZoneTypes: Boolean = {
+    val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
+    // TODO: support timezone types when users are not using the JVM timezone, which
+    // is the default value of SESSION_LOCAL_TIMEZONE
+    timeZone == TimeZone.getDefault
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
@@ -49,7 +59,8 @@ private case object OracleDialect extends JdbcDialect {
           case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
           case _ => None
         }
-      case TIMESTAMPTZ => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
+      case TIMESTAMPTZ if supportTimeZoneTypes
+        => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
       case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
       case _ => None
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 61571bccdcb51..0767ca1573a7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1064,10 +1064,10 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("unsupported types") {
-    var e = intercept[SparkException] {
+    var e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
     }.getMessage
-    assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
+    assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
     e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
     }.getMessage