Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33600][SQL] Group exception messages in execution/datasources/v2 #31619

Closed
wants to merge 15 commits into from
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.StructType

/**
Expand All @@ -33,7 +33,7 @@ class DatabaseAlreadyExistsException(db: String)

class NamespaceAlreadyExistsException(message: String) extends AnalysisException(message) {
def this(namespace: Array[String]) = {
this(s"Namespace '${namespace.quoted}' already exists")
this(QueryCompilationErrors.namespaceAlreadyExistsExceptionMessage(namespace))
karenfeng marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -44,7 +44,7 @@ class TableAlreadyExistsException(message: String, cause: Option[Throwable] = No
}

def this(tableIdent: Identifier) = {
this(s"Table ${tableIdent.quoted} already exists")
this(QueryCompilationErrors.tableAlreadyExistsExceptionMessage(tableIdent))
}
}

Expand Down
Expand Up @@ -19,11 +19,8 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.Identifier

class CannotReplaceMissingTableException(
tableIdentifier: Identifier,
message: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might not want to change the parameters for the class here.

def cannotReplaceMissingTableError(
    tableIdentifier: Identifier, cause: Option[Throwable] = None): Throwable = {
  new CannotReplaceMissingTableException(tableIdentifier, cause)
}

cause: Option[Throwable] = None)
extends AnalysisException(
s"Table $tableIdentifier cannot be replaced as it did not exist." +
s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause)
extends AnalysisException(message, cause = cause)
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.StructType


Expand All @@ -35,7 +35,7 @@ class NoSuchDatabaseException(
class NoSuchNamespaceException(message: String, cause: Option[Throwable] = None)
extends AnalysisException(message, cause = cause) {
def this(namespace: Array[String]) = {
this(s"Namespace '${namespace.quoted}' not found")
this(QueryCompilationErrors.noSuchNamespaceExceptionMessage(namespace))
}
}

Expand All @@ -46,7 +46,7 @@ class NoSuchTableException(message: String, cause: Option[Throwable] = None)
}

def this(tableIdent: Identifier) = {
this(s"Table ${tableIdent.quoted} not found")
this(QueryCompilationErrors.noSuchTableExceptionMessage(tableIdent))
}
}

Expand Down
Expand Up @@ -21,20 +21,22 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable, ResolvedView}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.InvalidUDFClassException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SerdeInfo}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.{toPrettySQL, FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.connector.catalog.{TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, Table, TableCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType}

/**
* Object for grouping all error messages of the query compilation.
* Currently it includes all AnalysisExceptions.
* Object for grouping error messages from exceptions thrown during query compilation.
* As commands are executed eagerly, this also includes errors thrown during the execution of
* commands, which users can see immediately.
*/
private[spark] object QueryCompilationErrors {

Expand Down Expand Up @@ -742,6 +744,234 @@ private[spark] object QueryCompilationErrors {
"Use sparkSession.udf.register(...) instead.")
}

def batchWriteCapabilityError(
table: Table, v2WriteClassName: String, v1WriteClassName: String): Throwable = {
new AnalysisException(
s"Table ${table.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
s"$v2WriteClassName is not an instance of $v1WriteClassName")
}

def unsupportedDeleteByConditionWithSubqueryError(condition: Option[Expression]): Throwable = {
new AnalysisException(
s"Delete by condition with subquery is not supported: $condition")
}

def cannotTranslateExpressionToSourceFilterError(f: Expression): Throwable = {
new AnalysisException("Exec update failed:" +
s" cannot translate expression to source filter: $f")
}

def cannotDeleteTableWhereFiltersError(table: Table, filters: Array[Filter]): Throwable = {
new AnalysisException(
s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
}

def deleteOnlySupportedWithV2TablesError(): Throwable = {
new AnalysisException("DELETE is only supported with v2 tables.")
}

def describeDoesNotSupportPartitionForV2TablesError(): Throwable = {
new AnalysisException("DESCRIBE does not support partition for v2 tables.")
}

def unsupportedUserSpecifiedSchemaError(): Throwable = {
new UnsupportedOperationException("user-specified schema")
karenfeng marked this conversation as resolved.
Show resolved Hide resolved
}

def cannotReplaceMissingTableExceptionMessage(tableIdentifier: Identifier): String = {
s"Table $tableIdentifier cannot be replaced as it did not exist." +
s" Use CREATE OR REPLACE TABLE to create the table."
}

def cannotReplaceMissingTableError(
tableIdentifier: Identifier): Throwable = {
new CannotReplaceMissingTableException(
cannotReplaceMissingTableExceptionMessage(tableIdentifier)
)
}

def cannotReplaceMissingTableError(
tableIdentifier: Identifier, cause: Option[Throwable]): Throwable = {
new CannotReplaceMissingTableException(
cannotReplaceMissingTableExceptionMessage(tableIdentifier), cause
)
}

def tableDoesNotSupportError(table: Table, cmd: String): Throwable = {
new AnalysisException(s"Table ${table.name} does not support $cmd.")
}

def unsupportedBatchReadError(table: Table): Throwable = {
tableDoesNotSupportError(table, "batch scan")
}

def unsupportedMicroBatchOrContinuousScanError(table: Table): Throwable = {
tableDoesNotSupportError(table, "either micro-batch or continuous scan")
}

def unsupportedAppendInBatchModeError(table: Table): Throwable = {
tableDoesNotSupportError(table, "append in batch mode")
}

def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
tableDoesNotSupportError(table, "dynamic overwrite in batch mode")
}

def unsupportedTruncateInBatchModeError(table: Table): Throwable = {
tableDoesNotSupportError(table, "truncate in batch mode")
}

def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
tableDoesNotSupportError(table, "overwrite by filter in batch mode")
}

def streamingSourcesDoNotSupportCommonExecutionModeError(
microBatchSources: Seq[String],
continuousSources: Seq[String]): Throwable = {
new AnalysisException(
"The streaming sources in a query do not have a common supported execution mode.\n" +
"Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
"Sources support continuous: " + continuousSources.mkString(", "))
}

def noSuchTableExceptionMessage(ident: Identifier): String = {
s"Table ${ident.quoted} not found"
}

def noSuchTableError(ident: Identifier): Throwable = {
new NoSuchTableException(noSuchTableExceptionMessage(ident))
}

def noSuchNamespaceExceptionMessage(namespace: Array[String]): String = {
s"Namespace '${namespace.quoted}' not found"
}

def noSuchNamespaceError(namespace: Array[String]): Throwable = {
new NoSuchNamespaceException(noSuchNamespaceExceptionMessage(namespace))
}

def tableAlreadyExistsExceptionMessage(ident: Identifier): String = {
s"Table ${ident.quoted} already exists"
}

def tableAlreadyExistsError(ident: Identifier): Throwable = {
new TableAlreadyExistsException(tableAlreadyExistsExceptionMessage(ident))
}

def requiresSinglePartNamespaceError(ident: Identifier): Throwable = {
new NoSuchTableException(
s"V2 session catalog requires a single-part namespace: ${ident.quoted}")
}

def namespaceAlreadyExistsExceptionMessage(namespace: Array[String]): String = {
s"Namespace '${namespace.quoted}' already exists"
}

def namespaceAlreadyExistsError(namespace: Array[String]): Throwable = {
new NamespaceAlreadyExistsException(namespaceAlreadyExistsExceptionMessage(namespace))
}

private def notSupportedInJDBCCatalog(cmd: String): Throwable = {
new AnalysisException(s"$cmd is not supported in JDBC catalog.")
}

def cannotCreateJDBCTableUsingProviderError(): Throwable = {
notSupportedInJDBCCatalog("CREATE TABLE ... USING ...")
}

def cannotCreateJDBCTableUsingLocationError(): Throwable = {
notSupportedInJDBCCatalog("CREATE TABLE ... LOCATION ...")
}

def cannotCreateJDBCNamespaceUsingProviderError(): Throwable = {
notSupportedInJDBCCatalog("CREATE NAMESPACE ... LOCATION ...")
}

def cannotCreateJDBCNamespaceWithPropertyError(k: String): Throwable = {
notSupportedInJDBCCatalog(s"CREATE NAMESPACE with property $k")
}

def cannotSetJDBCNamespaceWithPropertyError(k: String): Throwable = {
notSupportedInJDBCCatalog(s"SET NAMESPACE with property $k")
}

def cannotUnsetJDBCNamespaceWithPropertyError(k: String): Throwable = {
notSupportedInJDBCCatalog(s"Remove NAMESPACE property $k")
}

def unsupportedJDBCNamespaceChangeInCatalogError(changes: Seq[NamespaceChange]): Throwable = {
new AnalysisException(s"Unsupported NamespaceChange $changes in JDBC catalog.")
}

private def tableDoesNotSupportError(cmd: String, table: Table): Throwable = {
karenfeng marked this conversation as resolved.
Show resolved Hide resolved
new AnalysisException(s"Table does not support $cmd: ${table.name}")
}

def tableDoesNotSupportReadsError(table: Table): Throwable = {
tableDoesNotSupportError("reads", table)
}

def tableDoesNotSupportWritesError(table: Table): Throwable = {
tableDoesNotSupportError("writes", table)
}

def tableDoesNotSupportDeletesError(table: Table): Throwable = {
tableDoesNotSupportError("deletes", table)
}

def tableDoesNotSupportTruncatesError(table: Table): Throwable = {
tableDoesNotSupportError("truncates", table)
}

def tableDoesNotSupportPartitionManagementError(table: Table): Throwable = {
tableDoesNotSupportError("partition management", table)
}

def tableDoesNotSupportAtomicPartitionManagementError(table: Table): Throwable = {
tableDoesNotSupportError("atomic partition management", table)
}

def cannotRenameTableWithAlterViewError(): Throwable = {
new AnalysisException(
"Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead.")
}

private def notSupportedForV2TablesError(cmd: String): Throwable = {
new AnalysisException(s"$cmd is not supported for v2 tables.")
}

def analyzeTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("ANALYZE TABLE")
}

def alterTableRecoverPartitionsNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("ALTER TABLE ... RECOVER PARTITIONS")
}

def alterTableSerDePropertiesNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]")
}

def loadDataNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("LOAD DATA")
}

def showCreateTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("SHOW CREATE TABLE")
}

def truncateTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("TRUNCATE TABLE")
}

def showColumnsNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("SHOW COLUMNS")
}

def repairTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("MSCK REPAIR TABLE")
}

def databaseFromV1SessionCatalogNotSpecifiedError(): Throwable = {
new AnalysisException("Database from v1 session catalog is not specified")
}
Expand Down