[SPARK-35062][SQL] Group exception messages in sql/streaming #32464

Closed
wants to merge 7 commits

Changes from 6 commits
@@ -1391,4 +1391,58 @@ private[spark] object QueryCompilationErrors {
def functionUnsupportedInV2CatalogError(): Throwable = {
new AnalysisException("function is only supported in v1 catalog")
}

def operateHiveDataSourceDirectlyError(operation: String): Throwable = {
Contributor: How about cannotOperateOnHiveDataSourceError
Contributor Author: How about cannotOperateOnHiveDataSourceFilesError ?

new AnalysisException("Hive data source can only be used with tables, you can not " +
s"$operation files of Hive data source directly.")
}

def setPathOptionAndCallWithPathParameterError(method: String): Throwable = {
new AnalysisException(
s"""
|There is a 'path' option set and $method() is called with a path
|parameter. Either remove the path option, or call $method() without the parameter.
|To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.
""".stripMargin.replaceAll("\n", " "))
}

def userSpecifiedSchemaWithTextFileError(): Throwable = {
new AnalysisException("User specified schema not supported with `textFile`")
}

def tempViewNotSupportStreamingWriteError(viewName: String): Throwable = {
new AnalysisException(s"Temporary view $viewName doesn't support streaming write")
}

def streamingIntoViewNotSupportedError(viewName: String): Throwable = {
new AnalysisException(s"Streaming into views $viewName is not supported.")
}

def inputSourceDiffersFromDataSourceProviderError(
source: String, tableName: String, table: CatalogTable): Throwable = {
new AnalysisException(s"The input source($source) is different from the table " +
s"$tableName's data source provider(${table.provider.get}).")
}

def tableNotSupportStreamingWriteError(tableName: String, t: Table): Throwable = {
new AnalysisException(s"Table $tableName doesn't support streaming write - $t")
}

def queryNameNotSpecifiedForMemorySinkError(): Throwable = {
new AnalysisException("queryName must be specified for memory sink")
}

def sourceNotSupportedWithContinuousTriggerError(source: String): Throwable = {
new AnalysisException(s"'$source' is not supported with continuous trigger")
}

def columnNotFoundInExistingColumnsError(
columnType: String, columnName: String, validColumnNames: Seq[String]): Throwable = {
new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})")
}

def operationNotSupportPartitioningError(operation: String): Throwable = {
new AnalysisException(s"'$operation' does not support partitioning")
}
}
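
For context, a minimal sketch of the user-facing call that ends up in the new setPathOptionAndCallWithPathParameterError helper; the paths and local session below are hypothetical and not part of this change:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object PathOptionConflictSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val schema = StructType(Seq(StructField("value", StringType)))
    // Setting a 'path' option and also passing a path to load() trips the conflict
    // check, now reported via setPathOptionAndCallWithPathParameterError("load").
    spark.readStream
      .format("text")
      .schema(schema)
      .option("path", "/tmp/in")  // hypothetical input path
      .load("/tmp/other")         // throws AnalysisException unless the legacy flag is set
    spark.stop()
  }
}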
@@ -22,6 +22,7 @@ import java.net.URISyntaxException
import java.sql.{SQLException, SQLFeatureNotSupportedException}
import java.time.{DateTimeException, LocalDate}
import java.time.temporal.ChronoField
import java.util.ConcurrentModificationException

import org.apache.hadoop.fs.{FileStatus, Path}
import org.codehaus.commons.compiler.CompileException
@@ -875,4 +876,13 @@ object QueryExecutionErrors {
def cannotCastUTF8StringToDataTypeError(s: UTF8String, to: DataType): Throwable = {
new DateTimeException(s"Cannot cast $s to $to.")
}

def registeringStreamingQueryListenerError(e: Exception): Throwable = {
new SparkException("Exception when registering StreamingQueryListener", e)
}

def concurrentQueryInstanceError(): Throwable = {
new ConcurrentModificationException(
"Another instance of this query was just started by a concurrent session.")
}
}
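
A hedged sketch of one way registeringStreamingQueryListenerError can surface: listener classes named in spark.sql.streaming.streamingQueryListeners are instantiated when the session's StreamingQueryManager is created, and a failure there is wrapped in the SparkException above. The listener class name below is hypothetical:

import org.apache.spark.sql.SparkSession

object ListenerRegistrationSketch {
  def main(args: Array[String]): Unit = {
    // If com.example.BrokenListener cannot be loaded or constructed, registration of
    // conf-specified listeners fails and is rethrown via registeringStreamingQueryListenerError.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sketch")
      .config("spark.sql.streaming.streamingQueryListeners", "com.example.BrokenListener")
      .getOrCreate()
    spark.streams  // touching the manager forces listener registration
    spark.stop()
  }
}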
@@ -23,17 +23,17 @@ import scala.collection.JavaConverters._

import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -195,8 +195,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo

private def loadInternal(path: Option[String]): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
throw QueryCompilationErrors.operateHiveDataSourceDirectlyError("read")
}

val optionsWithPath = if (path.isEmpty) {
@@ -256,9 +255,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
def load(path: String): DataFrame = {
if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path")) {
throw new AnalysisException("There is a 'path' option set and load() is called with a path" +
"parameter. Either remove the path option, or call load() without the parameter. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
throw QueryCompilationErrors.setPathOptionAndCallWithPathParameterError("load")
}
loadInternal(Some(path))
}
@@ -597,7 +594,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*/
def textFile(path: String): Dataset[String] = {
if (userSpecifiedSchema.nonEmpty) {
throw new AnalysisException("User specified schema not supported with `textFile`")
throw QueryCompilationErrors.userSpecifiedSchemaWithTextFileError()
}
text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
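
A small hedged sketch of the call that now goes through userSpecifiedSchemaWithTextFileError (the path and local session are hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object TextFileSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    // textFile() always yields a single string column, so an explicit user schema is
    // rejected -- now via QueryCompilationErrors.userSpecifiedSchemaWithTextFileError().
    spark.readStream
      .schema(StructType(Seq(StructField("value", StringType))))
      .textFile("/tmp/in")  // throws AnalysisException
    spark.stop()
  }
}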
@@ -33,12 +33,12 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

@@ -276,9 +276,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
def start(path: String): StreamingQuery = {
if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path")) {
throw new AnalysisException("There is a 'path' option set and start() is called with a " +
"path parameter. Either remove the path option, or call start() without the parameter. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
throw QueryCompilationErrors.setPathOptionAndCallWithPathParameterError("start")
}
startInternal(Some(path))
}
@@ -332,7 +330,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
// on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
throw QueryCompilationErrors.tempViewNotSupportStreamingWriteError(tableName)
}

if (!catalog.asTableCatalog.tableExists(identifier)) {
@@ -361,12 +359,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

def writeToV1Table(table: CatalogTable): StreamingQuery = {
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(s"Streaming into views $tableName is not supported.")
throw QueryCompilationErrors.streamingIntoViewNotSupportedError(tableName)
}
require(table.provider.isDefined)
if (source != table.provider.get) {
throw new AnalysisException(s"The input source($source) is different from the table " +
s"$tableName's data source provider(${table.provider.get}).")
throw QueryCompilationErrors.inputSourceDiffersFromDataSourceProviderError(
source, tableName, table)
}
format(table.provider.get)
.option("path", new Path(table.location).toString).start()
@@ -380,21 +378,19 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
writeToV1Table(t.v1Table)
case t: V1Table =>
writeToV1Table(t.v1Table)
case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
s"write - $t")
case t => throw QueryCompilationErrors.tableNotSupportStreamingWriteError(tableName, t)
}
}

private def startInternal(path: Option[String]): StreamingQuery = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
throw QueryCompilationErrors.operateHiveDataSourceDirectlyError("write")
}

if (source == SOURCE_NAME_MEMORY) {
assertNotPartitioned(SOURCE_NAME_MEMORY)
if (extraOptions.get("queryName").isEmpty) {
throw new AnalysisException("queryName must be specified for memory sink")
throw QueryCompilationErrors.queryNameNotSpecifiedForMemorySinkError()
}
val sink = new MemorySink()
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes))
@@ -409,7 +405,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
} else if (source == SOURCE_NAME_FOREACH_BATCH) {
assertNotPartitioned(SOURCE_NAME_FOREACH_BATCH)
if (trigger.isInstanceOf[ContinuousTrigger]) {
throw new AnalysisException(s"'$source' is not supported with continuous trigger")
throw QueryCompilationErrors.sourceNotSupportedWithContinuousTriggerError(source)
}
val sink = new ForeachBatchSink[T](foreachBatchWriter, ds.exprEnc)
startQuery(sink, extraOptions)
@@ -556,13 +552,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private def normalize(columnName: String, columnType: String): String = {
val validColumnNames = df.logicalPlan.output.map(_.name)
validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName))
.getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})"))
.getOrElse(throw QueryCompilationErrors.columnNotFoundInExistingColumnsError(
columnType, columnName, validColumnNames))
}

private def assertNotPartitioned(operation: String): Unit = {
if (partitioningColumns.isDefined) {
throw new AnalysisException(s"'$operation' does not support partitioning")
throw QueryCompilationErrors.operationNotSupportPartitioningError(operation)
}
}
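
On the writer side, a hedged sketch of the memory-sink check that now uses queryNameNotSpecifiedForMemorySinkError (the rate source here is only a convenient way to get a streaming DataFrame):

import org.apache.spark.sql.SparkSession

object MemorySinkNameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val df = spark.readStream.format("rate").load()
    // The memory sink requires queryName(); omitting it now fails via
    // QueryCompilationErrors.queryNameNotSpecifiedForMemorySinkError().
    df.writeStream.format("memory").start()  // throws AnalysisException
    // Naming the query works: df.writeStream.format("memory").queryName("t").start()
    spark.stop()
  }
}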

@@ -17,19 +17,19 @@

package org.apache.spark.sql.streaming

import java.util.{ConcurrentModificationException, UUID}
import java.util.UUID
import java.util.concurrent.{TimeoutException, TimeUnit}
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
@@ -81,7 +81,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
} catch {
case e: Exception =>
throw new SparkException("Exception when registering StreamingQueryListener", e)
throw QueryExecutionErrors.registeringStreamingQueryListenerError(e)
}

/**
@@ -371,8 +371,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
if (oldActiveQuery != null) {
throw new ConcurrentModificationException(
"Another instance of this query was just started by a concurrent session.")
throw QueryExecutionErrors.concurrentQueryInstanceError()
}
activeQueries.put(query.id, query)
}