[SPARK-18419][SQL] JDBCRelation.insert should not remove Spark options #15863

Conversation
```diff
@@ -54,7 +54,6 @@ object JDBCRDD extends Logging {
   def resolveTable(options: JDBCOptions): StructType = {
     val url = options.url
     val table = options.table
-    val properties = options.asConnectionProperties
```
This is unused.
Thank you for the review, @HyukjinKwon. Let me look into that.

Oh, it seems not. I just removed my suggestion. I will take a look at this again to see if it is possible in a similar way. Thanks!

Yes. It seems not to solve this problem. The problem is that the following is case-sensitive.
Test build #68563 has finished for PR 15863 at commit
Our user-specified JDBC options/parameters are case-sensitive, right?

Thank you for the review, @gatorsmile. The purpose of
Not always. Try this code path:

```scala
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
df.write.format("jdbc")
  .option("URL", url1)
  .option("dbtable", "TEST.SAVETEST")
  .options(properties.asScala)
  .save()
```

I think we should make them consistent. Can you fix it in this PR?

Oh, sure. I'll investigate it.
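The underlying problem can be reproduced without Spark at all. A minimal sketch (plain Scala, with a toy `LowercaseKeyMap` standing in for Spark's internal `CaseInsensitiveMap`) of why option keys written as `"URL"` are invisible to code that looks up `"url"`:

```scala
// A regular Map treats "URL" and "url" as different keys, so an option
// written with mixed case is missed by a lowercase lookup.
val options = Map("URL" -> "jdbc:h2:mem:testdb", "dbtable" -> "TEST.SAVETEST")
println(options.get("url")) // None: the "URL" entry is missed

// A case-insensitive wrapper normalizes keys on both insert and lookup.
class LowercaseKeyMap(base: Map[String, String]) {
  private val lowered = base.map { case (k, v) => (k.toLowerCase, v) }
  def get(key: String): Option[String] = lowered.get(key.toLowerCase)
}

val ci = new LowercaseKeyMap(options)
println(ci.get("url")) // Some(jdbc:h2:mem:testdb)
```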
I updated
```diff
@@ -314,8 +311,7 @@ case class DataSource(
       catalogTable.get,
       catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
   } else {
-    new InMemoryFileIndex(
-      sparkSession, globbedPaths, options, partitionSchema)
+    new InMemoryFileIndex(sparkSession, globbedPaths, options, partitionSchema)
```
`InMemoryFileIndex` should use the original map because it extends `PartitioningAwareFileIndex`, which uses the following line:

```scala
protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
```

Eventually, this creates another, case-sensitive map from the `CaseInsensitiveMap`. In that case, it will fail to retrieve data from that map because the keys have already been lowercased inside `CaseInsensitiveMap`.

```scala
def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
  val hadoopConf = newHadoopConf()
  options.foreach { case (k, v) =>
    if ((v ne null) && k != "path" && k != "paths") {
      hadoopConf.set(k, v)
    }
  }
  hadoopConf
}
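This failure mode can be sketched with plain Scala collections (a toy reproduction; no Spark classes, and the key names are made up for illustration): once the case-insensitive map has lowercased its keys, copying them verbatim into a case-sensitive store loses the original casing.

```scala
import scala.collection.mutable

// The case-insensitive map stores keys lowercased.
val original = Map("fs.custom.Access.Key" -> "secret", "path" -> "/tmp/x")
val caseInsensitive = original.map { case (k, v) => (k.toLowerCase, v) }

// Copy into a case-sensitive store, standing in for hadoopConf.
val hadoopConfLike = mutable.Map.empty[String, String]
caseInsensitive.foreach { case (k, v) =>
  if ((v ne null) && k != "path" && k != "paths") hadoopConfLike.put(k, v)
}

println(hadoopConfLike.get("fs.custom.Access.Key")) // None: key was stored lowercased
println(hadoopConfLike.get("fs.custom.access.key")) // Some(secret)
```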
JDBCOptions.asConnectionProperties to be case-insensitive
Test build #68572 has finished for PR 15863 at commit
```diff
+  test("SPARK-18419 JDBCOption keys should be case-insensitive") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    df.write.format("jdbc")
+      .option("URL", url1)
```
nit: Use "Url" or "uRL" to show the case-insensitivity more explicitly.
Thank you for the review, @viirya! I updated it, too.
Test build #68574 has finished for PR 15863 at commit
```diff
-    s.createSource(
-      sparkSession.sqlContext, metadataPath, userSpecifiedSchema, className, options)
+    s.createSource(sparkSession.sqlContext, metadataPath, userSpecifiedSchema, className,
+      caseInsensitiveOptions)
```
Code style issue:

```scala
s.createSource(
  sparkSession.sqlContext,
  metadataPath,
  userSpecifiedSchema,
  className,
  caseInsensitiveOptions)
```
Sure!
@rxin @cloud-fan Could you take a look at this PR, especially the external behavior impact? Thanks!
Test build #68575 has finished for PR 15863 at commit
```diff
@@ -303,4 +303,13 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
       " and 'numPartitions' are required."))
   }
+
+  test("SPARK-18419 JDBCOption keys should be case-insensitive") {
```
Is this consistent with other options like `JSONOptions`?

Thank you for the review, @cloud-fan. Actually, this PR includes two different changes: the first commit is a bug fix, and the rest (from the second commit on) are a potential improvement. First, I'll separate them to make each PR clearer.
JDBCOptions.asConnectionProperties to be case-insensitive

@cloud-fan and @gatorsmile.
Test build #68606 has finished for PR 15863 at commit
I'm closing this PR since this is already fixed.

Oh, it's still there:

```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {numpartitions=10}

scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10"))).asConnectionProperties
res1: java.util.Properties = {numpartitions=10}
```
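A fix along the lines of this PR can be sketched as follows (a simplified stand-in, not the actual Spark code; `sparkOptionNames` is an illustrative subset of the Spark-specific option names): the filter must compare option names case-insensitively so that mixed-case spellings like `numPartitions` are also stripped from the driver-facing properties.

```scala
import java.util.Properties

// Lowercased names of Spark-specific options that must not be
// forwarded to the JDBC driver (illustrative subset).
val sparkOptionNames = Set("url", "dbtable", "numpartitions")

def connectionProperties(parameters: Map[String, String]): Properties = {
  val properties = new Properties()
  parameters
    .filter { case (k, _) => !sparkOptionNames.contains(k.toLowerCase) }
    .foreach { case (k, v) => properties.setProperty(k, v) }
  properties
}

val opts = Map(
  "url" -> "jdbc:mysql://localhost:3306/temp",
  "dbtable" -> "t1",
  "numPartitions" -> "10", // mixed case: now filtered out as well
  "user" -> "root")
println(connectionProperties(opts)) // {user=root}
```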
```diff
@@ -129,7 +129,7 @@ object JDBCOptions {
   private val jdbcOptionNames = ArrayBuffer.empty[String]
```
`jdbcOptionNames` is the root cause.
Shall we make it a `Set`? It seems we only use it for `contains` checks.
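The suggested change is straightforward (a sketch, assuming the buffer really is queried only via `contains`): a `Set` expresses the membership-check intent directly and looks up in O(1) instead of the `ArrayBuffer`'s linear scan.

```scala
import scala.collection.mutable

// Before: a buffer used only for membership checks (linear-time contains).
val asBuffer = mutable.ArrayBuffer("url", "dbtable", "numpartitions")

// After: a set with the same contents and constant-time contains.
val asSet = asBuffer.toSet

println(asSet.contains("numpartitions")) // true
println(asSet.contains("user"))          // false
```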
Right. I'll update it like that.
@cloud-fan and @gatorsmile.

Also, cc @srowen.
```diff
@@ -130,7 +130,7 @@ private[sql] case class JDBCRelation(
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     val url = jdbcOptions.url
     val table = jdbcOptions.table
-    val properties = jdbcOptions.asConnectionProperties
```
Also, this should not use `asConnectionProperties`: the writer needs options like `numPartitions`.
JDBCOptions.asConnectionProperties to be case-insensitive

JDBCRelation.insert should not remove Spark options
I updated the PR description and focus.
Test build #69494 has finished for PR 15863 at commit

Test build #69493 has finished for PR 15863 at commit
The only failure seems to be irrelevant. Also, I'm waiting for the last running test.
Test build #69498 has finished for PR 15863 at commit
```diff
     parameters.foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
+
+  val asConnectionProperties: Properties = {
```
Let's add some documentation to explain when we should use `asConnectionProperties` and when `asProperties`.
Sure.
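The requested documentation could look roughly like this (an illustrative sketch, not the actual Spark source; `JdbcOptionsSketch` and `sparkOnly` are hypothetical names, and `sparkOnly` is only a subset of the real Spark-specific option names):

```scala
import java.util.Properties

class JdbcOptionsSketch(parameters: Map[String, String]) {
  // Lowercased Spark-specific option names (illustrative subset).
  private val sparkOnly = Set("url", "dbtable", "numpartitions")

  /** All options, Spark-specific ones included. Use on the Spark side,
   *  e.g. where the writer still needs options like numPartitions. */
  val asProperties: Properties = {
    val p = new Properties()
    parameters.foreach { case (k, v) => p.setProperty(k, v) }
    p
  }

  /** Driver-facing options only. Use when creating a raw JDBC
   *  connection, so Spark-specific keys never reach the driver. */
  val asConnectionProperties: Properties = {
    val p = new Properties()
    parameters
      .filter { case (k, _) => !sparkOnly.contains(k.toLowerCase) }
      .foreach { case (k, v) => p.setProperty(k, v) }
    p
  }
}
```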
LGTM except 2 comments, including https://github.com/apache/spark/pull/15863/files#r90584811

Test build #69544 has finished for PR 15863 at commit

Thanks, merging to master/2.1!
## What changes were proposed in this pull request?

Currently, `JDBCRelation.insert` removes Spark options too early by mistakenly using `asConnectionProperties`. Spark options like `numPartitions` should be passed into `DataFrameWriter.jdbc` correctly. This bug has been **hidden** because `JDBCOptions.asConnectionProperties` fails to filter out the mixed-case options. This PR aims to fix both.

**JDBCRelation.insert**

```scala
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
  val url = jdbcOptions.url
  val table = jdbcOptions.table
- val properties = jdbcOptions.asConnectionProperties
+ val properties = jdbcOptions.asProperties
  data.write
    .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
    .jdbc(url, table, properties)
```

**JDBCOptions.asConnectionProperties**

```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {numpartitions=10}
scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10"))).asConnectionProperties
res1: java.util.Properties = {numpartitions=10}
```

## How was this patch tested?

Pass the Jenkins with a new testcase.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15863 from dongjoon-hyun/SPARK-18419.

(cherry picked from commit 55d528f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thank you for merging, @cloud-fan. Thank you for the reviews, @gatorsmile, @viirya, @HyukjinKwon!