[SPARK-27596] The JDBC 'query' option doesn't work for Oracle database
dilipbiswal committed May 6, 2019
1 parent 6c2d351 commit da136ed
Showing 5 changed files with 108 additions and 1 deletion.
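Summary of the change: when the JDBC `query` option is used, Spark wraps the user's statement in a generated subquery alias before sending it to the database. The previous alias, `__SPARK_GEN_JDBC_SUBQUERY_NAME_<id>`, exceeds the 30-character identifier limit enforced by older Oracle versions and starts with underscores, which Oracle does not accept in unquoted identifiers, so the generated statement failed on Oracle. This commit shortens the alias to `SPARK_GEN_SUBQ_<id>` and adds a "query JDBC option" test to the DB2, MySQL, Oracle, and Postgres Docker integration suites.

For reference, a minimal usage sketch of the option being exercised (not part of this commit; the connection URL and credentials below are placeholders, and `query` is given instead of `dbtable`):

  val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/XEPDB1")   // placeholder connection URL
    .option("user", "system")                                  // placeholder credentials
    .option("password", "oracle")
    .option("query", "SELECT id, d, t FROM datetime WHERE id = 1")
    .load()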
@@ -158,4 +158,30 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
    assert(rows(0).getInt(1) == 20)
    assert(rows(0).getString(2) == "1")
  }

  test("query JDBC option") {
    val expectedResult = Set(
      (42, "fred"),
      (17, "dave")
    ).map { case (x, y) =>
      Row(Integer.valueOf(x), String.valueOf(y))
    }

    val query = "SELECT x, y FROM tbl WHERE x > 10"
    // query option to pass on the query string.
    val df = spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", query)
      .load()
    assert(df.collect.toSet === expectedResult)

    // query option in the create table path.
    sql(
      s"""
         |CREATE OR REPLACE TEMPORARY VIEW queryOption
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$jdbcUrl', query '$query')
       """.stripMargin.replaceAll("\n", " "))
    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
  }
}
@@ -21,6 +21,7 @@ import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
import java.util.Properties

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.tags.DockerTest

@DockerTest
@@ -152,4 +153,30 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
    df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
    df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
  }

  test("query JDBC option") {
    val expectedResult = Set(
      (42, "fred"),
      (17, "dave")
    ).map { case (x, y) =>
      Row(Integer.valueOf(x), String.valueOf(y))
    }

    val query = "SELECT x, y FROM tbl WHERE x > 10"
    // query option to pass on the query string.
    val df = spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", query)
      .load()
    assert(df.collect.toSet === expectedResult)

    // query option in the create table path.
    sql(
      s"""
         |CREATE OR REPLACE TEMPORARY VIEW queryOption
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$jdbcUrl', query '$query')
       """.stripMargin.replaceAll("\n", " "))
    assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
  }
}
@@ -485,4 +485,32 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLContext {
    }
    assert(df2.collect.toSet === expectedResult)
  }

  test("query JDBC option") {
    val expectedResult = Set(
      (1, "1991-11-09", "1996-01-01 01:23:45")
    ).map { case (id, date, timestamp) =>
      Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp))
    }

    val query = "SELECT id, d, t FROM datetime WHERE id = 1"
    // query option to pass on the query string.
    val df = spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", query)
      .option("oracle.jdbc.mapDateToTimestamp", "false")
      .load()
    assert(df.collect.toSet === expectedResult)

    // query option in the create table path.
    sql(
      s"""
         |CREATE OR REPLACE TEMPORARY VIEW queryOption
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$jdbcUrl',
         |   query '$query',
         |   oracle.jdbc.mapDateToTimestamp false)
       """.stripMargin.replaceAll("\n", " "))
    assert(sql("select id, d, t from queryOption").collect.toSet == expectedResult)
  }
}
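A note on the extra option in the Oracle test: keys that Spark's JDBC source does not recognize (here oracle.jdbc.mapDateToTimestamp) are forwarded to the driver as connection properties. The Oracle driver maps DATE columns to timestamps by default, so setting the property to false keeps column d as a java.sql.Date and the collected rows match expectedResult. A sketch of the same pass-through using the Properties-based reader API (jdbcUrl and the datetime table are the ones this suite defines):

  val props = new java.util.Properties()
  props.setProperty("oracle.jdbc.mapDateToTimestamp", "false")  // driver-level property, forwarded as-is
  val byTable = spark.read.jdbc(jdbcUrl, "datetime", props)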
@@ -21,6 +21,7 @@ import java.sql.Connection
import java.util.Properties

import org.apache.spark.sql.Column
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
import org.apache.spark.tags.DockerTest
@@ -180,4 +181,29 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
    assert(rows(0).getSeq(8) == Seq("""{"a": "foo", "b": "bar"}""", """{"a": 1, "b": 2}"""))
    assert(rows(0).getSeq(9) == Seq("""{"a": 1, "b": 2, "c": 3}"""))
  }

  test("query JDBC option") {
    val expectedResult = Set(
      (42, 123456789012345L)
    ).map { case (c1, c3) =>
      Row(Integer.valueOf(c1), java.lang.Long.valueOf(c3))
    }

    val query = "SELECT c1, c3 FROM bar WHERE c1 IS NOT NULL"
    // query option to pass on the query string.
    val df = spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", query)
      .load()
    assert(df.collect.toSet === expectedResult)

    // query option in the create table path.
    sql(
      s"""
         |CREATE OR REPLACE TEMPORARY VIEW queryOption
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$jdbcUrl', query '$query')
       """.stripMargin.replaceAll("\n", " "))
    assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
  }
}
@@ -87,7 +87,7 @@ class JDBCOptions(
      if (subquery.isEmpty) {
        throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
      } else {
-       s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
+       s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
      }
    }
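This one-line change in JDBCOptions is the actual fix: the alias Spark generates when it turns the query option into a derived table. __SPARK_GEN_JDBC_SUBQUERY_NAME_0 is longer than 30 characters and begins with underscores, so Oracle rejects it as an identifier; SPARK_GEN_SUBQ_0 stays well under the limit and starts with a letter. Roughly what reaches the database once the option is wrapped (a sketch only; the schema-probe SQL comes from the dialect's getSchemaQuery and may differ per dialect):

  // Sketch of the rewriting, using the query from the Oracle test above.
  val userQuery    = "SELECT id, d, t FROM datetime WHERE id = 1"
  val tableOrQuery = s"(${userQuery}) SPARK_GEN_SUBQ_0"         // new, Oracle-safe alias
  val schemaProbe  = s"SELECT * FROM $tableOrQuery WHERE 1=0"   // issued once to resolve the schema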

