Skip to content

Commit

Permalink
[SPARK-20557][SQL] Support JDBC data type Time with Time Zone
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR adds support for the JDBC data type TIME WITH TIME ZONE. It is converted to TIMESTAMP.

In addition, before this PR, for unsupported data types, we simply output the type number instead of the type name.

```
java.sql.SQLException: Unsupported type 2014
```
After this PR, the message looks like:
```
java.sql.SQLException: Unsupported type TIMESTAMP_WITH_TIMEZONE
```

- Also upgrade the H2 version to `1.4.195`, which has the type fix for "TIMESTAMP WITH TIME ZONE". However, H2 does not fully support this type yet. Thus, the new test captures the resulting exception; we still need the upgrade to partially test the support of "TIMESTAMP WITH TIME ZONE", because the Docker tests are not run regularly.

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17835 from gatorsmile/h2.
  • Loading branch information
gatorsmile committed May 7, 2017
1 parent b433aca commit cafca54
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
checkRow(sql("SELECT * FROM datetime1 where id = 1").head())
}

test("SPARK-20557: column type TIMEZONE with TIME STAMP should be recognized") {
test("SPARK-20557: column type TIMESTAMP with TIME ZONE should be recognized") {
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
val rows = dfRead.collect()
val types = rows(0).toSeq.map(x => x.getClass.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
+ "null, null, null, null, null, "
+ "null, null, null, null, null, null, null)"
).executeUpdate()

conn.prepareStatement("CREATE TABLE ts_with_timezone " +
"(id integer, tstz TIMESTAMP WITH TIME ZONE, ttz TIME WITH TIME ZONE)")
.executeUpdate()
conn.prepareStatement("INSERT INTO ts_with_timezone VALUES " +
"(1, TIMESTAMP WITH TIME ZONE '2016-08-12 10:22:31.949271-07', TIME WITH TIME ZONE '17:22:31.949271+00')")
.executeUpdate()
}

test("Type mapping for various types") {
Expand Down Expand Up @@ -126,4 +133,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(schema(0).dataType == FloatType)
assert(schema(1).dataType == ShortType)
}

test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE should be recognized") {
  // Read the table through the JDBC source and inspect only its first row.
  val firstRow = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties).collect()(0)
  // Render the runtime class of every column value as a string for comparison.
  val classNames = firstRow.toSeq.map(_.getClass.toString)
  // Column 1 (tstz) and column 2 (ttz) must both come back as java.sql.Timestamp.
  assert(classNames(1).equals("class java.sql.Timestamp"))
  assert(classNames(2).equals("class java.sql.Timestamp"))
}
}
2 changes: 1 addition & 1 deletion sql/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.183</version>
<version>1.4.195</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
import java.util.Locale

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -217,23 +217,29 @@ object JdbcUtils extends Logging {
case java.sql.Types.OTHER => null
case java.sql.Types.REAL => DoubleType
case java.sql.Types.REF => StringType
case java.sql.Types.REF_CURSOR => null
case java.sql.Types.ROWID => LongType
case java.sql.Types.SMALLINT => IntegerType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIME_WITH_TIMEZONE
=> TimestampType
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
=> TimestampType
case -101 => TimestampType // Value for Timestamp with Time Zone in Oracle
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ => null
case _ =>
throw new SQLException("Unrecognized SQL type " + sqlType)
// scalastyle:on
}

if (answer == null) throw new SQLException("Unsupported type " + sqlType)
if (answer == null) {
throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
}
answer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel



/**
* Internal implementation of the user-facing `Catalog`.
*/
Expand Down
24 changes: 22 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package org.apache.spark.sql.jdbc

import java.math.BigDecimal
import java.sql.{Date, DriverManager, Timestamp}
import java.sql.{Date, DriverManager, SQLException, Timestamp}
import java.util.{Calendar, GregorianCalendar, Properties}

import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
Expand Down Expand Up @@ -141,6 +141,15 @@ class JDBCSuite extends SparkFunSuite
|OPTIONS (url '$url', dbtable 'TEST.TIMETYPES', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))

conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME ZONE) " +
"AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'")
.executeUpdate()
conn.commit()

conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " +
"AS SELECT '(1, 2, 3)'")
.executeUpdate()
conn.commit()

conn.prepareStatement("create table test.flttypes (a DOUBLE, b REAL, c DECIMAL(38, 18))"
).executeUpdate()
Expand Down Expand Up @@ -919,6 +928,17 @@ class JDBCSuite extends SparkFunSuite
assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
}

test("unsupported types") {
  // Reading TEST.TIMEZONE is expected to surface a SparkException whose message
  // carries the "unimplemented" UnsupportedOperationException.
  val tzError = intercept[SparkException] {
    spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
  }
  assert(tzError.getMessage.contains("java.lang.UnsupportedOperationException: unimplemented"))

  // Reading TEST.ARRAY is expected to be rejected with a SQLException that names
  // the type (ARRAY) in its message.
  val arrayError = intercept[SQLException] {
    spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
  }
  assert(arrayError.getMessage.contains("Unsupported type ARRAY"))
}

test("SPARK-19318: Connection properties keys should be case-sensitive.") {
def testJdbcOptions(options: JDBCOptions): Unit = {
// Spark JDBC data source options are case-insensitive
Expand Down

0 comments on commit cafca54

Please sign in to comment.