Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1727,6 +1727,11 @@
"Alter the table <tableName>."
]
},
"CONNECTION" : {
"message" : [
"Couldn't connect to the database."
]
},
"CREATE_INDEX" : {
"message" : [
"Create the index <indexName> in the <tableName> table."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ class ClientE2ETestSuite
assert(result.length == 10)
} finally {
// clean up
assertThrows[SparkException] {
assertThrows[AnalysisException] {
spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,17 @@ object JdbcUtils extends Logging with SQLConfHelper {

def withConnection[T](options: JDBCOptions)(f: Connection => T): T = {
val dialect = JdbcDialects.get(options.url)
val conn = dialect.createConnectionFactory(options)(-1)

var conn : Connection = null
classifyException(
condition = "FAILED_JDBC.CONNECTION",
messageParameters = Map("url" -> options.getRedactUrl()),
dialect,
description = "Failed to connect",
isRuntime = false
) {
conn = dialect.createConnectionFactory(options)(-1)
}
try {
f(conn)
} finally {
Expand Down
54 changes: 43 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1939,17 +1939,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
assert(getRowCount(df2) < df3.count())
}

test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
val e = intercept[IllegalArgumentException] {
val opts = Map(
"url" -> "jdbc:mysql://localhost/db",
"dbtable" -> "table",
"driver" -> "org.postgresql.Driver"
)
spark.read.format("jdbc").options(opts).load()
}.getMessage
assert(e.contains("The driver could not open a JDBC connection. " +
"Check the URL: jdbc:mysql://localhost/db"))
test("SPARK-26383 throw FAILED_JDBC.CONNECTION if wrong kind of driver to the given url") {
  // Using a PostgreSQL driver against a MySQL URL must fail, and the failure
  // must surface as the FAILED_JDBC.CONNECTION error condition carrying the URL.
  val mysqlUrl = "jdbc:mysql://localhost/db"
  val readOptions = Map(
    "url" -> mysqlUrl,
    "dbtable" -> "table",
    "driver" -> "org.postgresql.Driver"
  )
  checkError(
    exception = intercept[AnalysisException] {
      spark.read.format("jdbc").options(readOptions).load()
    },
    condition = "FAILED_JDBC.CONNECTION",
    parameters = Map("url" -> mysqlUrl)
  )
}

test("support casting patterns for lower/upper bounds of TimestampType") {
Expand Down Expand Up @@ -2273,4 +2276,33 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
parameters = Map("jdbcDialect" -> dialect.getClass.getSimpleName))
}
}

test("FAILED_JDBC.CONNECTION") {
  // Unreachable hosts across several JDBC dialects; every connection attempt
  // must be reported as the FAILED_JDBC.CONNECTION error condition with the
  // offending (redacted) URL as a parameter.
  val dialectPrefixes = Seq(
    "jdbc:mysql",
    "jdbc:postgresql",
    "jdbc:sqlserver",
    "jdbc:db2",
    "jdbc:h2",
    "jdbc:teradata",
    "jdbc:databricks"
  )

  for (prefix <- dialectPrefixes) {
    val badUrl = s"$prefix://invalid_url/"
    val jdbcOptions = new JDBCOptions(Map(
      "url" -> badUrl,
      "dbtable" -> "invalid_table"
    ))
    checkError(
      exception = intercept[AnalysisException] {
        // Touching the metadata forces an actual connection attempt.
        JdbcUtils.withConnection(jdbcOptions)(_.getMetaData)
      },
      condition = "FAILED_JDBC.CONNECTION",
      parameters = Map("url" -> badUrl)
    )
  }
}
}