Skip to content

Commit

Permalink
[SPARK-18419][SQL] JDBCRelation.insert should not remove Spark options
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Currently, `JDBCRelation.insert` removes Spark options too early by mistakenly using `asConnectionProperties`. Spark options like `numPartitions` should be passed into `DataFrameWriter.jdbc` correctly. This bug have been **hidden** because `JDBCOptions.asConnectionProperties` fails to filter out the mixed-case options. This PR aims to fix both.

**JDBCRelation.insert**
```scala
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
  val url = jdbcOptions.url
  val table = jdbcOptions.table
- val properties = jdbcOptions.asConnectionProperties
+ val properties = jdbcOptions.asProperties
  data.write
    .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
    .jdbc(url, table, properties)
```

**JDBCOptions.asConnectionProperties**
```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {numpartitions=10}
scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10"))).asConnectionProperties
res1: java.util.Properties = {numpartitions=10}
```

## How was this patch tested?

Pass the Jenkins with a new testcase.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15863 from dongjoon-hyun/SPARK-18419.
  • Loading branch information
dongjoon-hyun authored and cloud-fan committed Dec 2, 2016
1 parent 294163e commit 55d528f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
Expand Up @@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, DriverManager}
import java.util.Properties

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

/**
Expand All @@ -41,10 +39,23 @@ class JDBCOptions(
JDBCOptions.JDBC_TABLE_NAME -> table)))
}

/**
* Returns a property with all options.
*/
val asProperties: Properties = {
val properties = new Properties()
parameters.foreach { case (k, v) => properties.setProperty(k, v) }
properties
}

/**
* Returns a property with all options except Spark internal data source options like `url`,
* `dbtable`, and `numPartition`. This should be used when invoking JDBC API like `Driver.connect`
* because each DBMS vendor has its own property list for JDBC driver. See SPARK-17776.
*/
val asConnectionProperties: Properties = {
val properties = new Properties()
// We should avoid to pass the options into properties. See SPARK-17776.
parameters.filterKeys(!jdbcOptionNames.contains(_))
parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
.foreach { case (k, v) => properties.setProperty(k, v) }
properties
}
Expand Down Expand Up @@ -126,10 +137,10 @@ class JDBCOptions(
}

object JDBCOptions {
private val jdbcOptionNames = ArrayBuffer.empty[String]
private val jdbcOptionNames = collection.mutable.Set[String]()

private def newOption(name: String): String = {
jdbcOptionNames += name
jdbcOptionNames += name.toLowerCase
name
}

Expand Down
Expand Up @@ -54,7 +54,6 @@ object JDBCRDD extends Logging {
def resolveTable(options: JDBCOptions): StructType = {
val url = options.url
val table = options.table
val properties = options.asConnectionProperties
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(options)()
try {
Expand Down
Expand Up @@ -131,7 +131,7 @@ private[sql] case class JDBCRelation(
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
val url = jdbcOptions.url
val table = jdbcOptions.table
val properties = jdbcOptions.asConnectionProperties
val properties = jdbcOptions.asProperties
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(url, table, properties)
Expand Down
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Expand Up @@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
Expand Down Expand Up @@ -890,4 +891,13 @@ class JDBCSuite extends SparkFunSuite
assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1 OR Name = 'mary'").collect().size == 2)
assert(sql("SELECT * FROM mixedCaseCols WHERE Name = 'mary' AND Id = 2").collect().size == 1)
}

test("SPARK-18419: Fix `asConnectionProperties` to filter case-insensitively") {
val parameters = Map(
"url" -> "jdbc:mysql://localhost:3306/temp",
"dbtable" -> "t1",
"numPartitions" -> "10")
assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
}
}

0 comments on commit 55d528f

Please sign in to comment.