diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 2017f1fc331e..418c5176d9a7 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1727,6 +1727,11 @@
           "Alter the table <tableName>."
         ]
       },
+      "CONNECTION" : {
+        "message" : [
+          "Couldn't connect to the database."
+        ]
+      },
       "CREATE_INDEX" : {
         "message" : [
           "Create the index <indexName> in the table <tableName>."
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
index 33524c8df269..b9f72badd45f 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
@@ -461,7 +461,7 @@ class ClientE2ETestSuite
       assert(result.length == 10)
     } finally {
       // clean up
-      assertThrows[SparkException] {
+      assertThrows[AnalysisException] {
         spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 926c133d2875..3766cd1ad641 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1315,7 +1315,17 @@ object JdbcUtils extends Logging with SQLConfHelper {
 
   def withConnection[T](options: JDBCOptions)(f: Connection => T): T = {
     val dialect = JdbcDialects.get(options.url)
-    val conn = dialect.createConnectionFactory(options)(-1)
+
+    var conn: Connection = null
+    classifyException(
+      condition = "FAILED_JDBC.CONNECTION",
+      messageParameters = Map("url" -> options.getRedactUrl()),
+      dialect,
+      description = "Failed to connect",
+      isRuntime = false
+    ) {
+      conn = dialect.createConnectionFactory(options)(-1)
+    }
     try {
       f(conn)
     } finally {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 276062fb1daa..e38648e9468c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1939,17 +1939,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     assert(getRowCount(df2) < df3.count())
   }
 
-  test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
-    val e = intercept[IllegalArgumentException] {
-      val opts = Map(
-        "url" -> "jdbc:mysql://localhost/db",
-        "dbtable" -> "table",
-        "driver" -> "org.postgresql.Driver"
-      )
-      spark.read.format("jdbc").options(opts).load()
-    }.getMessage
-    assert(e.contains("The driver could not open a JDBC connection. " +
-      "Check the URL: jdbc:mysql://localhost/db"))
+  test("SPARK-26383 throw FAILED_JDBC.CONNECTION if wrong kind of driver to the given url") {
+    val url = "jdbc:mysql://localhost/db"
+    checkError(
+      exception = intercept[AnalysisException] {
+        val opts = Map(
+          "url" -> url,
+          "dbtable" -> "table",
+          "driver" -> "org.postgresql.Driver"
+        )
+        spark.read.format("jdbc").options(opts).load()
+      },
+      condition = "FAILED_JDBC.CONNECTION",
+      parameters = Map("url" -> url)
+    )
   }
 
   test("support casting patterns for lower/upper bounds of TimestampType") {
@@ -2273,4 +2276,33 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
         parameters = Map("jdbcDialect" -> dialect.getClass.getSimpleName))
     }
   }
+
+  test("FAILED_JDBC.CONNECTION") {
+    val testUrls = Seq(
+      "jdbc:mysql",
+      "jdbc:postgresql",
+      "jdbc:sqlserver",
+      "jdbc:db2",
+      "jdbc:h2",
+      "jdbc:teradata",
+      "jdbc:databricks"
+    )
+
+    testUrls.foreach { connectionUrl =>
+      val url = s"$connectionUrl://invalid_url/"
+      val options = new JDBCOptions(Map(
+        "url" -> url,
+        "dbtable" -> "invalid_table"
+      ))
+      checkError(
+        exception = intercept[AnalysisException] {
+          JdbcUtils.withConnection(options) { conn =>
+            conn.getMetaData
+          }
+        },
+        condition = "FAILED_JDBC.CONNECTION",
+        parameters = Map("url" -> url)
+      )
+    }
+  }
 }