Skip to content

Commit

Permalink
[SPARK-10036] [SQL] Load JDBC driver in DataFrameReader.jdbc and Data…
Browse files Browse the repository at this point in the history
…FrameWriter.jdbc

This PR uses `JDBCRDD.getConnector` to load JDBC driver before creating connection in `DataFrameReader.jdbc` and `DataFrameWriter.jdbc`.

Author: zsxwing <zsxwing@gmail.com>

Closes #8232 from zsxwing/SPARK-10036 and squashes the following commits:

adf75de [zsxwing] Add extraOptions to the connection properties
57f59d4 [zsxwing] Load JDBC driver in DataFrameReader.jdbc and DataFrameWriter.jdbc
  • Loading branch information
zsxwing authored and marmbrus committed Aug 17, 2015
1 parent a4acdab commit f10660f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
table: String,
parts: Array[Partition],
connectionProperties: Properties): DataFrame = {
val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val relation = JDBCRelation(url, table, parts, props)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* should be included.
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
val conn = JdbcUtils.createConnection(url, connectionProperties)
val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val conn = JdbcUtils.createConnection(url, props)

try {
var tableExists = JdbcUtils.tableExists(conn, table)
Expand Down Expand Up @@ -272,7 +278,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
conn.close()
}

JdbcUtils.saveTable(df, url, table, connectionProperties)
JdbcUtils.saveTable(df, url, table, props)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private[sql] object JDBCRDD extends Logging {
*/
def resolveTable(url: String, table: String, properties: Properties): StructType = {
val dialect = JdbcDialects.get(url)
val conn: Connection = DriverManager.getConnection(url, properties)
val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
try {
val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
try {
Expand Down Expand Up @@ -171,7 +171,8 @@ private[sql] object JDBCRDD extends Logging {
* getConnector is run on the driver code, while the function it returns
* is run on the executor.
*
* @param driver - The class name of the JDBC driver for the given url.
* @param driver - The class name of the JDBC driver for the given url, or null if the class name
* is not necessary.
* @param url - The JDBC url to connect to.
*
* @return A function that loads the driver and connects to the url.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import scala.util.Try
Expand All @@ -36,7 +36,7 @@ object JdbcUtils extends Logging {
* Establishes a JDBC connection.
*/
def createConnection(url: String, connectionProperties: Properties): Connection = {
DriverManager.getConnection(url, connectionProperties)
JDBCRDD.getConnector(connectionProperties.getProperty("driver"), url, connectionProperties)()
}

/**
Expand Down

0 comments on commit f10660f

Please sign in to comment.