[SPARK-33600][SQL] Group exception messages in execution/datasources/v2 #31619

Closed · wants to merge 15 commits
@@ -26,4 +26,4 @@ class CannotReplaceMissingTableException(
cause: Option[Throwable] = None)
extends AnalysisException(
s"Table $tableIdentifier cannot be replaced as it did not exist." +
s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause)
s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause)
@@ -21,20 +21,22 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable, ResolvedView}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.InvalidUDFClassException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SerdeInfo}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.{toPrettySQL, FailFastMode, ParseMode, PermissiveMode}
-import org.apache.spark.sql.connector.catalog.{TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, Table, TableCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType}

/**
- * Object for grouping all error messages of the query compilation.
- * Currently it includes all AnalysisExceptions.
+ * Object for grouping error messages from exceptions thrown during query compilation.
+ * As commands are executed eagerly, this also includes errors thrown during the execution of
+ * commands, which users can see immediately.
*/
private[spark] object QueryCompilationErrors {

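A minimal sketch (not part of this diff) of the call-site pattern these factory methods enable: planner and analyzer code throws the grouped error instead of constructing an AnalysisException inline. The checkDeleteCondition helper below is hypothetical.

import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}

// Hypothetical call site: reject DELETE conditions that contain a subquery,
// delegating the message to the grouped factory method.
def checkDeleteCondition(condition: Option[Expression]): Unit = {
  if (condition.exists(SubqueryExpression.hasSubquery)) {
    throw QueryCompilationErrors.unsupportedDeleteByConditionWithSubqueryError(condition)
  }
}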
@@ -742,6 +744,205 @@ private[spark] object QueryCompilationErrors {
"Use sparkSession.udf.register(...) instead.")
}

def batchWriteCapabilityError(
table: Table, v2WriteClassName: String, v1WriteClassName: String): Throwable = {
new AnalysisException(
s"Table ${table.name} declares ${TableCapability.V1_BATCH_WRITE} capability but " +
s"$v2WriteClassName is not an instance of $v1WriteClassName")
}

def unsupportedDeleteByConditionWithSubqueryError(condition: Option[Expression]): Throwable = {
new AnalysisException(
s"Delete by condition with subquery is not supported: $condition")
}

def cannotTranslateExpressionToSourceFilterError(f: Expression): Throwable = {
new AnalysisException("Exec update failed:" +
s" cannot translate expression to source filter: $f")
}

def cannotDeleteTableWhereFiltersError(table: Table, filters: Array[Filter]): Throwable = {
new AnalysisException(
s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
}

def deleteOnlySupportedWithV2TablesError(): Throwable = {
new AnalysisException("DELETE is only supported with v2 tables.")
}

def describeDoesNotSupportPartitionForV2TablesError(): Throwable = {
new AnalysisException("DESCRIBE does not support partition for v2 tables.")
}

def cannotReplaceMissingTableError(
tableIdentifier: Identifier): Throwable = {
new CannotReplaceMissingTableException(tableIdentifier)
}

def cannotReplaceMissingTableError(
tableIdentifier: Identifier, cause: Option[Throwable]): Throwable = {
new CannotReplaceMissingTableException(tableIdentifier, cause)
}

def unsupportedTableOperationError(table: Table, cmd: String): Throwable = {
new AnalysisException(s"Table ${table.name} does not support $cmd.")
}

def unsupportedBatchReadError(table: Table): Throwable = {
unsupportedTableOperationError(table, "batch scan")
}

def unsupportedMicroBatchOrContinuousScanError(table: Table): Throwable = {
unsupportedTableOperationError(table, "either micro-batch or continuous scan")
}

def unsupportedAppendInBatchModeError(table: Table): Throwable = {
unsupportedTableOperationError(table, "append in batch mode")
}

def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
unsupportedTableOperationError(table, "dynamic overwrite in batch mode")
}

def unsupportedTruncateInBatchModeError(table: Table): Throwable = {
unsupportedTableOperationError(table, "truncate in batch mode")
}

def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
unsupportedTableOperationError(table, "overwrite by filter in batch mode")
}

def streamingSourcesDoNotSupportCommonExecutionModeError(
microBatchSources: Seq[String],
continuousSources: Seq[String]): Throwable = {
new AnalysisException(
"The streaming sources in a query do not have a common supported execution mode.\n" +
"Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
"Sources support continuous: " + continuousSources.mkString(", "))
}

def noSuchTableError(ident: Identifier): Throwable = {
new NoSuchTableException(ident)
}

def noSuchNamespaceError(namespace: Array[String]): Throwable = {
new NoSuchNamespaceException(namespace)
}

def tableAlreadyExistsError(ident: Identifier): Throwable = {
new TableAlreadyExistsException(ident)
}

def requiresSinglePartNamespaceError(ident: Identifier): Throwable = {
new NoSuchTableException(
s"V2 session catalog requires a single-part namespace: ${ident.quoted}")
}

def namespaceAlreadyExistsError(namespace: Array[String]): Throwable = {
new NamespaceAlreadyExistsException(namespace)
}

private def notSupportedInJDBCCatalog(cmd: String): Throwable = {
new AnalysisException(s"$cmd is not supported in JDBC catalog.")
}

def cannotCreateJDBCTableUsingProviderError(): Throwable = {
notSupportedInJDBCCatalog("CREATE TABLE ... USING ...")
}

def cannotCreateJDBCTableUsingLocationError(): Throwable = {
notSupportedInJDBCCatalog("CREATE TABLE ... LOCATION ...")
}

def cannotCreateJDBCNamespaceUsingProviderError(): Throwable = {
notSupportedInJDBCCatalog("CREATE NAMESPACE ... LOCATION ...")
}

def cannotCreateJDBCNamespaceWithPropertyError(k: String): Throwable = {
notSupportedInJDBCCatalog(s"CREATE NAMESPACE with property $k")
}

def cannotSetJDBCNamespaceWithPropertyError(k: String): Throwable = {
notSupportedInJDBCCatalog(s"SET NAMESPACE with property $k")
}

def cannotUnsetJDBCNamespaceWithPropertyError(k: String): Throwable = {
notSupportedInJDBCCatalog(s"Remove NAMESPACE property $k")
}

def unsupportedJDBCNamespaceChangeInCatalogError(changes: Seq[NamespaceChange]): Throwable = {
new AnalysisException(s"Unsupported NamespaceChange $changes in JDBC catalog.")
}

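A hedged sketch of how the JDBC catalog errors above are raised; the validateTableProperties helper and its wiring into table creation are assumptions, not part of this diff.

import org.apache.spark.sql.connector.catalog.TableCatalog

// Hypothetical pre-flight check in a JDBC catalog: reject table properties the
// catalog cannot honor before issuing any DDL.
def validateTableProperties(properties: java.util.Map[String, String]): Unit = {
  if (properties.containsKey(TableCatalog.PROP_PROVIDER)) {
    throw QueryCompilationErrors.cannotCreateJDBCTableUsingProviderError()
  }
  if (properties.containsKey(TableCatalog.PROP_LOCATION)) {
    throw QueryCompilationErrors.cannotCreateJDBCTableUsingLocationError()
  }
}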
private def tableDoesNotSupportError(cmd: String, table: Table): Throwable = {
new AnalysisException(s"Table does not support $cmd: ${table.name}")
}

def tableDoesNotSupportReadsError(table: Table): Throwable = {
tableDoesNotSupportError("reads", table)
}

def tableDoesNotSupportWritesError(table: Table): Throwable = {
tableDoesNotSupportError("writes", table)
}

def tableDoesNotSupportDeletesError(table: Table): Throwable = {
tableDoesNotSupportError("deletes", table)
}

def tableDoesNotSupportTruncatesError(table: Table): Throwable = {
tableDoesNotSupportError("truncates", table)
}

def tableDoesNotSupportPartitionManagementError(table: Table): Throwable = {
tableDoesNotSupportError("partition management", table)
}

def tableDoesNotSupportAtomicPartitionManagementError(table: Table): Throwable = {
tableDoesNotSupportError("atomic partition management", table)
}

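The tableDoesNotSupport*Error helpers above are typically surfaced through capability casts; a sketch under assumed method names:

import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table}

// Illustrative capability casts: narrow the table to the needed interface or
// fail with the corresponding grouped error.
def asReadable(table: Table): SupportsRead = table match {
  case r: SupportsRead => r
  case _ => throw QueryCompilationErrors.tableDoesNotSupportReadsError(table)
}

def asWritable(table: Table): SupportsWrite = table match {
  case w: SupportsWrite => w
  case _ => throw QueryCompilationErrors.tableDoesNotSupportWritesError(table)
}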
def cannotRenameTableWithAlterViewError(): Throwable = {
new AnalysisException(
"Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead.")
}

private def notSupportedForV2TablesError(cmd: String): Throwable = {
new AnalysisException(s"$cmd is not supported for v2 tables.")
}

def analyzeTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("ANALYZE TABLE")
}

def alterTableRecoverPartitionsNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("ALTER TABLE ... RECOVER PARTITIONS")
}

def alterTableSerDePropertiesNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]")
}

def loadDataNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("LOAD DATA")
}

def showCreateTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("SHOW CREATE TABLE")
}

def truncateTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("TRUNCATE TABLE")
}

def showColumnsNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("SHOW COLUMNS")
}

def repairTableNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("MSCK REPAIR TABLE")
}

def databaseFromV1SessionCatalogNotSpecifiedError(): Throwable = {
new AnalysisException("Database from v1 session catalog is not specified")
}
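A brief sketch of where the notSupportedForV2TablesError wrappers above fire; the rule fragment and its isV2Table flag are hypothetical:

// Hypothetical analyzer-rule fragment: a v1-only command is rejected once its
// target resolves to a v2 table.
def checkAnalyzeTable(isV2Table: Boolean): Unit = {
  if (isV2Table) {
    throw QueryCompilationErrors.analyzeTableNotSupportedForV2TablesError()
  }
}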
@@ -17,26 +17,31 @@

package org.apache.spark.sql.errors

-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
import java.net.URISyntaxException
import java.time.DateTimeException

import org.apache.hadoop.fs.Path
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException

-import org.apache.spark.SparkException
+import org.apache.spark.{Partition, SparkException}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.catalyst.expressions.{Expression, UnevaluableAggregate}
-import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.UTF8String

/**
- * Object for grouping all error messages of the query runtime.
- * Currently it includes all SparkExceptions and RuntimeExceptions(e.g.
- * UnsupportedOperationException, IllegalStateException).
+ * Object for grouping error messages from (most) exceptions thrown during query execution.
+ * This does not include exceptions thrown during the eager execution of commands, which are
+ * grouped into [[QueryCompilationErrors]].
*/
object QueryExecutionErrors {

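A minimal sketch (assumed, not from this diff) of the runtime call-site shape for the errors below; castPartition is a hypothetical helper:

import org.apache.spark.Partition
import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition

// Hypothetical helper: narrow an RDD partition to the expected type, or fail
// with the grouped error instead of an inline SparkException.
def castPartition(split: Partition): DataSourceRDDPartition = split match {
  case p: DataSourceRDDPartition => p
  case _ => throw QueryExecutionErrors.notADatasourceRDDPartitionError(split)
}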
@@ -322,4 +327,114 @@ object QueryExecutionErrors {
def compilerError(e: CompileException): Throwable = {
new CompileException(failedToCompileMsg(e), e.getLocation)
}

def unsupportedTableChangeError(e: IllegalArgumentException): Throwable = {
new SparkException(s"Unsupported table change: ${e.getMessage}", e)
}

def notADatasourceRDDPartitionError(split: Partition): Throwable = {
new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split")
}

def endOfStreamError(): Throwable = {
new NoSuchElementException("End of stream")
}

def fallbackV1RelationReportsInconsistentSchemaError(
v2Schema: StructType, v1Schema: StructType): Throwable = {
new IllegalArgumentException(
"The fallback v1 relation reports inconsistent schema:\n" +
"Schema of v2 scan: " + v2Schema + "\n" +
"Schema of v1 relation: " + v1Schema)
}

def cannotDropNonemptyNamespaceError(namespace: Seq[String]): Throwable = {
new SparkException(
s"Cannot drop a non-empty namespace: ${namespace.quoted}. " +
"Use CASCADE option to drop a non-empty namespace.")
}

def noRecordsFromEmptyDataReaderError(): Throwable = {
new IOException("No records should be returned from EmptyDataReader")
}

def fileNotFoundError(e: FileNotFoundException): Throwable = {
new FileNotFoundException(
e.getMessage + "\n" +
"It is possible the underlying files have been updated. " +
"You can explicitly invalidate the cache in Spark by " +
"recreating the Dataset/DataFrame involved.")
}

def unsupportedSchemaColumnConvertError(
filePath: String,
column: String,
logicalType: String,
physicalType: String,
e: Exception): Throwable = {
val message = "Parquet column cannot be converted in " +
s"file $filePath. Column: $column, " +
s"Expected: $logicalType, Found: $physicalType"
new QueryExecutionException(message, e)
}

def cannotReadParquetFilesError(e: Exception): Throwable = {
val message = "Encounter error while reading parquet files. " +
"One possible cause: Parquet column cannot be converted in the " +
"corresponding files. Details: "
new QueryExecutionException(message, e)
}

def cannotCreateColumnarReaderError(): Throwable = {
new UnsupportedOperationException("Cannot create columnar reader.")
}

def invalidNamespaceNameError(namespace: Array[String]): Throwable = {
new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
}

def unsupportedPartitionTransformError(transform: Transform): Throwable = {
new UnsupportedOperationException(
s"SessionCatalog does not support partition transform: $transform")
}

def missingDatabaseLocationError(): Throwable = {
new IllegalArgumentException("Missing database location")
}

def cannotRemoveReservedPropertyError(property: String): Throwable = {
new UnsupportedOperationException(s"Cannot remove reserved property: $property")
}

def namespaceNotEmptyError(namespace: Array[String]): Throwable = {
new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
}

def writingJobFailedError(cause: Throwable): Throwable = {
new SparkException("Writing job failed.", cause)
}

def writingJobAbortedError(e: Throwable): Throwable = {
new SparkException("Writing job aborted.", e)
}

def commitDeniedError(
partId: Int, taskId: Long, attemptId: Int, stageId: Int, stageAttempt: Int): Throwable = {
val message = s"Commit denied for partition $partId (task $taskId, attempt $attemptId, " +
s"stage $stageId.$stageAttempt)"
new CommitDeniedException(message, stageId, partId, attemptId)
}

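commitDeniedError mirrors the output-commit-coordinator handshake; a hedged sketch of the write-task side, with commitIfAllowed an assumed name:

import org.apache.spark.SparkEnv

// Illustrative: ask the coordinator for permission before committing a task's
// output, translating a refusal into the grouped CommitDeniedException.
def commitIfAllowed(
    stageId: Int, stageAttempt: Int, partId: Int, taskId: Long, attemptId: Int): Unit = {
  val coordinator = SparkEnv.get.outputCommitCoordinator
  if (!coordinator.canCommit(stageId, stageAttempt, partId, attemptId)) {
    throw QueryExecutionErrors.commitDeniedError(partId, taskId, attemptId, stageId, stageAttempt)
  }
  // commit the task's output here
}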
def unsupportedTableWritesError(ident: Identifier): Throwable = {
new SparkException(
s"Table implementation does not support writes: ${ident.quoted}")
}

def cannotCreateJDBCTableWithPartitionsError(): Throwable = {
new UnsupportedOperationException("Cannot create JDBC table with partition")
}

def unsupportedUserSpecifiedSchemaError(): Throwable = {
new UnsupportedOperationException("user-specified schema")
}
}