Skip to content

Commit

Permalink
[SPARK-33240][SQL] Fail fast when fails to instantiate configured v2 …
Browse files Browse the repository at this point in the history
…session catalog

### What changes were proposed in this pull request?

This patch proposes to change the behavior so that Spark fails fast when it fails to instantiate the configured v2 session catalog.

### Why are the changes needed?

The current Spark behavior is against the intention of end users - if an end user configures a session catalog which Spark fails to initialize, Spark swallows the error, merely logging the error message, and silently falls back to the default catalog implementation.

This follows the voices on [discussion thread](https://lists.apache.org/thread.html/rdfa22a5ebdc4ac66e2c5c8ff0cd9d750e8a1690cd6fb456d119c2400%40%3Cdev.spark.apache.org%3E) in dev mailing list.

### Does this PR introduce _any_ user-facing change?

Yes. After this PR, Spark fails immediately if it cannot instantiate the configured v2 session catalog, instead of silently falling back to the default implementation.

### How was this patch tested?

New UT added.

Closes #30147 from HeartSaVioR/SPARK-33240.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
HeartSaVioR committed Oct 28, 2020
1 parent ba2a113 commit f6550d0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.sql.connector.catalog

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
Expand Down Expand Up @@ -81,15 +80,8 @@ class CatalogManager(
* in the fallback configuration, spark.sql.sources.write.useV1SourceList
*/
private[sql] def v2SessionCatalog: CatalogPlugin = {
conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog =>
try {
catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
} catch {
case NonFatal(_) =>
logError(
"Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
defaultSessionCatalog
}
conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { _ =>
catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
}.getOrElse(defaultSessionCatalog)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.util.Try

import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
Expand Down Expand Up @@ -254,6 +255,22 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
}
}

// Verifies the fail-fast behavior introduced by SPARK-33240: configuring an
// unresolvable v2 session catalog class must surface a SparkException to the
// query, rather than being swallowed with a silent fallback to the default
// session catalog.
test("SPARK-33240: fail the query when instantiation on session catalog fails") {
try {
// Reset first so a previously cached session catalog instance cannot mask
// the instantiation failure we are trying to trigger.
spark.sessionState.catalogManager.reset()
// "InvalidCatalogClass" is deliberately not a real class name, so plugin
// loading is guaranteed to fail.
spark.conf.set(
V2_SESSION_CATALOG_IMPLEMENTATION.key, "InvalidCatalogClass")
val e = intercept[SparkException] {
sql(s"create table t1 (id bigint) using $format")
}

// The error must identify both the failure mode and the offending class name.
assert(e.getMessage.contains("Cannot find catalog plugin class"))
assert(e.getMessage.contains("InvalidCatalogClass"))
} finally {
// Always reset so the bogus catalog config does not leak into other tests
// sharing this SparkSession.
spark.sessionState.catalogManager.reset()
}
}

private def checkV2Identifiers(
plan: LogicalPlan,
identifier: String = "t1",
Expand Down

0 comments on commit f6550d0

Please sign in to comment.