Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 22 additions & 31 deletions sql/core/src/main/scala/org/apache/spark/sql/classic/Catalog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.spark.sql.classic
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import org.apache.spark.SparkThrowable
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.{CATALOG_NAME, DATABASE_NAME, TABLE_NAME}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog
import org.apache.spark.sql.catalog.{CatalogMetadata, Column, Database, Function, Table}
Expand Down Expand Up @@ -51,7 +52,7 @@ import org.apache.spark.util.ArrayImplicits._
/**
* Internal implementation of the user-facing `Catalog`.
*/
class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
class Catalog(sparkSession: SparkSession) extends catalog.Catalog with Logging {

private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog

Expand Down Expand Up @@ -185,20 +186,25 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
try {
Some(makeTable(nameParts))
} catch {
case e: SparkThrowable with Throwable =>
Catalog.ListTable.ERROR_HANDLING_RULES.get(e.getCondition) match {
case Some(Catalog.ListTable.Skip) => None
case Some(Catalog.ListTable.ReturnPartialResults) if !isTempView =>
Some(new Table(
name = tableName,
catalog = catalogName,
namespace = ns.toArray,
description = null,
tableType = null,
isTemporary = false
))
case _ => throw e
}
case e: AnalysisException if e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" => None
// Swallow non-fatal throwables when resolving a table or view and
// return a table or view with partial results to
// prevent listTables from breaking easily due to a broken table or view.
case NonFatal(e) if !isTempView =>
val table = new Table(
name = tableName,
catalog = catalogName,
namespace = ns.toArray,
description = null,
tableType = null,
isTemporary = false
)
logWarning(log"Unable to resolve the table or view [" +
log"catalog=${MDC(CATALOG_NAME, table.catalog)}, " +
log"database=${MDC(DATABASE_NAME, table.database)}, " +
log"name=${MDC(TABLE_NAME, table.name)}" +
log"]; partial results will be returned.", e)
Some(table)
}
}

Expand Down Expand Up @@ -968,19 +974,4 @@ private[sql] object Catalog {
}

private val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists"

private object ListTable {

sealed trait ErrorHandlingAction

case object Skip extends ErrorHandlingAction

case object ReturnPartialResults extends ErrorHandlingAction

val ERROR_HANDLING_RULES: Map[String, ErrorHandlingAction] = Map(
"UNSUPPORTED_FEATURE.HIVE_TABLE_TYPE" -> ReturnPartialResults,
"TABLE_OR_VIEW_NOT_FOUND" -> Skip,
"DATA_SOURCE_NOT_FOUND" -> ReturnPartialResults
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1136,26 +1136,6 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
}
}

test("SPARK-55957: listTables returns partial results when a table has DATA_SOURCE_NOT_FOUND") {
// Create a normal table (resolvable)
createTable("good_table")
// Create a table with a non-existent provider so resolution throws DATA_SOURCE_NOT_FOUND
val badTableMeta = utils.newTable("bad_table", None).copy(
provider = Some("non.existent.ProviderClass"))
sessionCatalog.createTable(badTableMeta, ignoreIfExists = false)
// Without ERROR_HANDLING_RULES for DATA_SOURCE_NOT_FOUND, listTables() would throw.
// With the fix, we get partial results: good_table fully resolved, bad_table as placeholder.
val tables = spark.catalog.listTables().collect()
assert(tables.length == 2, s"Expected 2 tables, got: ${tables.map(_.name).toList}")
val names = tables.map(_.name).toSet
assert(names == Set("good_table", "bad_table"),
s"Expected good_table and bad_table, got: $names")
val goodTable = tables.find(_.name == "good_table").get
val badTable = tables.find(_.name == "bad_table").get
assert(goodTable.tableType != null, "good_table should be fully resolved")
assert(badTable.tableType == null, "bad_table should be partial result (tableType null)")
}

private def getConstructorParameterValues(obj: DefinedByConstructorParams): Seq[AnyRef] = {
ScalaReflection.getConstructorParameterNames(obj.getClass).map { name =>
obj.getClass.getMethod(name).invoke(obj)
Expand Down