Skip to content

Commit

Permalink
[SPARK-21519][SQL] Add an option to the JDBC data source to initializ…
Browse files Browse the repository at this point in the history
…e the target DB environment

Add an option to the JDBC data source to initialize the environment of the remote database session

## What changes were proposed in this pull request?

This proposes an option to the JDBC data source, tentatively called "sessionInitStatement", to implement the functionality of session initialization present, for example, in the Sqoop connector for Oracle (see https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_oraoop_oracle_session_initialization_statements). After each database session is opened to the remote DB, and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block in the case of Oracle).

See also https://issues.apache.org/jira/browse/SPARK-21519

## How was this patch tested?

Manually tested using Spark SQL data source and Oracle JDBC

Author: LucaCanali <luca.canali@cern.ch>

Closes apache#18724 from LucaCanali/JDBC_datasource_sessionInitStatement.
  • Loading branch information
LucaCanali authored and gatorsmile committed Aug 11, 2017
1 parent 2387f1e commit 0377338
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 0 deletions.
7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,13 @@ the following case-insensitive options:
</td>
</tr>

<tr>
<td><code>sessionInitStatement</code></td>
<td>
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
</td>
</tr>

<tr>
<td><code>truncate</code></td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class JDBCOptions(
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
}
// Optional custom SQL statement (or PL/SQL block) to execute once per JDBC
// session, after the connection is opened and before any data is fetched.
// Sourced from the "sessionInitStatement" option; None when the option is unset.
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
}

object JDBCOptions {
Expand All @@ -161,4 +163,5 @@ object JDBCOptions {
val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,21 @@ private[jdbc] class JDBCRDD(
import scala.collection.JavaConverters._
dialect.beforeFetch(conn, options.asProperties.asScala.toMap)

// This executes a generic SQL statement (or PL/SQL block) before reading
// the table/query via JDBC. Use this feature to initialize the database
// session environment, e.g. for optimizations and/or troubleshooting.
// Idiom: `foreach` replaces an Option match whose None branch was empty.
options.sessionInitStatement.foreach { sql =>
  val statement = conn.prepareStatement(sql)
  logInfo(s"Executing sessionInitStatement: $sql")
  try {
    statement.execute()
  } finally {
    // Always release the statement, even when execution throws.
    statement.close()
  }
}

// H2's JDBC driver does not support the setSchema() method. We pass a
// fully-qualified table name in the SELECT statement. I don't know how to
// talk about a table in a completely portable way.
Expand Down
31 changes: 31 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1044,4 +1044,35 @@ class JDBCSuite extends SparkFunSuite
assert(sql("select * from people_view").count() == 3)
}
}

test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
  // Builds a JDBC DataFrame over `table` whose session runs `initSQL` on open.
  def jdbcDF(table: String, initSQL: String) =
    spark.read.format("jdbc")
      .option("url", urlWithUserAndPass)
      .option("dbtable", table)
      .option("sessionInitStatement", initSQL)
      .load()

  // A session variable set by the init statement is visible to the query.
  val withSessionVar = jdbcDF("(SELECT NVL(@MYTESTVAR, -1))", "SET @MYTESTVAR 21519")
  assert(withSessionVar.collect() === Array(Row(21519)))

  // A failing init statement surfaces as a SparkException at read time.
  val badSchemaDF = jdbcDF("TEST.PEOPLE", "SET SCHEMA DUMMY")
  val errorMessage = intercept[SparkException] { badSchemaDF.collect() }.getMessage
  assert(errorMessage.contains("""Schema "DUMMY" not found"""))

  // The option also works through the SQL/DDL path, including multiple
  // semicolon-separated init statements.
  sql(
    s"""
      |CREATE OR REPLACE TEMPORARY VIEW test_sessionInitStatement
      |USING org.apache.spark.sql.jdbc
      |OPTIONS (url '$urlWithUserAndPass',
      |dbtable '(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))',
      |sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
      """.stripMargin)

  val viaView = sql("SELECT * FROM test_sessionInitStatement")
  assert(viaView.collect() === Array(Row(21519, 1234)))
}
}

0 comments on commit 0377338

Please sign in to comment.