Group exception messages in sql/streaming
beliefer committed May 7, 2021
1 parent 94bbca3 commit 23c7f91
Showing 5 changed files with 86 additions and 28 deletions.
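
The change follows one pattern throughout: each inline `new AnalysisException(...)` (or other exception) in the streaming reader/writer/query-manager code moves into a named factory method on QueryCompilationErrors or QueryExecutionErrors, and the call site shrinks to a single method call. A condensed sketch of the before/after shape, using one of the helpers added in this commit (illustrative only, not a compilable excerpt — the real object declares many more methods):

// Before: the message text is assembled inline at each call site
throw new AnalysisException(s"'$operation' does not support partitioning")

// After: the message lives in a central, named error method...
private[spark] object QueryCompilationErrors {
  def operationNotSupportPartitioningError(operation: String): Throwable = {
    new AnalysisException(s"'$operation' does not support partitioning")
  }
}

// ...and the call site only names the error
throw QueryCompilationErrors.operationNotSupportPartitioningError(operation)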
QueryCompilationErrors.scala
@@ -1351,4 +1351,58 @@ private[spark] object QueryCompilationErrors {
new AnalysisException(
s"Ambiguous field name: $fieldName. Found multiple columns that can match: $names")
}

def operateHiveDataSourceDirectlyError(operation: String): Throwable = {
new AnalysisException("Hive data source can only be used with tables, you can not " +
s"$operation files of Hive data source directly.")
}

def setPathOptionAndCallWithPathParameterError(method: String): Throwable = {
new AnalysisException(
s"""
|There is a 'path' option set and $method() is called with a path
|parameter. Either remove the path option, or call $method() without the parameter.
|To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.
""".stripMargin.replaceAll("\n", " "))
}

def userSpecifiedSchemaWithTextFileError(): Throwable = {
new AnalysisException("User specified schema not supported with `textFile`")
}

def tempViewNotSupportStreamingWriteError(viewName: String): Throwable = {
new AnalysisException(s"Temporary view $viewName doesn't support streaming write")
}

def streamingIntoViewNotSupportedError(viewName: String): Throwable = {
new AnalysisException(s"Streaming into views $viewName is not supported.")
}

def inputSourceDiffersFromDataSourceProviderError(
source: String, tableName: String, table: CatalogTable): Throwable = {
new AnalysisException(s"The input source($source) is different from the table " +
s"$tableName's data source provider(${table.provider.get}).")
}

def tableNotSupportStreamingWriteError(tableName: String, t: Table): Throwable = {
new AnalysisException(s"Table $tableName doesn't support streaming write - $t")
}

def queryNameNotSpecifiedForMemorySinkError(): Throwable = {
new AnalysisException("queryName must be specified for memory sink")
}

def sourceNotSupportedWithContinuousTriggerError(source: String): Throwable = {
new AnalysisException(s"'$source' is not supported with continuous trigger")
}

def columnNotFoundInExistingColumnsError(
columnType: String, columnName: String, validColumnNames: Seq[String]): Throwable = {
new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})")
}

def operationNotSupportPartitioningError(operation: String): Throwable = {
new AnalysisException(s"'$operation' does not support partitioning")
}
}
QueryExecutionErrors.scala
@@ -21,6 +21,7 @@ import java.io.{FileNotFoundException, IOException}
import java.net.URISyntaxException
import java.sql.{SQLException, SQLFeatureNotSupportedException}
import java.time.DateTimeException
import java.util.ConcurrentModificationException

import org.apache.hadoop.fs.{FileStatus, Path}
import org.codehaus.commons.compiler.CompileException
@@ -823,4 +824,13 @@ object QueryExecutionErrors {
new SparkException(s"Failed to merge incompatible data types ${left.catalogString}" +
s" and ${right.catalogString}")
}

def registeringStreamingQueryListenerError(e: Exception): Throwable = {
new SparkException("Exception when registering StreamingQueryListener", e)
}

def concurrentQueryInstanceError(): Throwable = {
new ConcurrentModificationException(
"Another instance of this query was just started by a concurrent session.")
}
}
DataStreamReader.scala
@@ -23,12 +23,13 @@ import scala.collection.JavaConverters._

import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
@@ -195,8 +196,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo

private def loadInternal(path: Option[String]): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
throw QueryCompilationErrors.operateHiveDataSourceDirectlyError("read")
}

val optionsWithPath = if (path.isEmpty) {
@@ -256,9 +256,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
def load(path: String): DataFrame = {
if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path")) {
throw new AnalysisException("There is a 'path' option set and load() is called with a path" +
"parameter. Either remove the path option, or call load() without the parameter. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
throw QueryCompilationErrors.setPathOptionAndCallWithPathParameterError("load")
}
loadInternal(Some(path))
}
@@ -597,7 +595,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*/
def textFile(path: String): Dataset[String] = {
if (userSpecifiedSchema.nonEmpty) {
throw new AnalysisException("User specified schema not supported with `textFile`")
throw QueryCompilationErrors.userSpecifiedSchemaWithTextFileError()
}
text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
DataStreamWriter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
@@ -276,9 +277,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
def start(path: String): StreamingQuery = {
if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path")) {
throw new AnalysisException("There is a 'path' option set and start() is called with a " +
"path parameter. Either remove the path option, or call start() without the parameter. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
throw QueryCompilationErrors.setPathOptionAndCallWithPathParameterError("start")
}
startInternal(Some(path))
}
@@ -332,7 +331,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
// on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
throw QueryCompilationErrors.tempViewNotSupportStreamingWriteError(tableName)
}

if (!catalog.asTableCatalog.tableExists(identifier)) {
@@ -361,12 +360,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

def writeToV1Table(table: CatalogTable): StreamingQuery = {
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(s"Streaming into views $tableName is not supported.")
throw QueryCompilationErrors.streamingIntoViewNotSupportedError(tableName)
}
require(table.provider.isDefined)
if (source != table.provider.get) {
throw new AnalysisException(s"The input source($source) is different from the table " +
s"$tableName's data source provider(${table.provider.get}).")
throw QueryCompilationErrors.inputSourceDiffersFromDataSourceProviderError(
source, tableName, table)
}
format(table.provider.get)
.option("path", new Path(table.location).toString).start()
@@ -380,21 +379,19 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
writeToV1Table(t.v1Table)
case t: V1Table =>
writeToV1Table(t.v1Table)
case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
s"write - $t")
case t => throw QueryCompilationErrors.tableNotSupportStreamingWriteError(tableName, t)
}
}

private def startInternal(path: Option[String]): StreamingQuery = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
throw QueryCompilationErrors.operateHiveDataSourceDirectlyError("write")
}

if (source == SOURCE_NAME_MEMORY) {
assertNotPartitioned(SOURCE_NAME_MEMORY)
if (extraOptions.get("queryName").isEmpty) {
throw new AnalysisException("queryName must be specified for memory sink")
throw QueryCompilationErrors.queryNameNotSpecifiedForMemorySinkError()
}
val sink = new MemorySink()
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes))
@@ -409,7 +406,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
} else if (source == SOURCE_NAME_FOREACH_BATCH) {
assertNotPartitioned(SOURCE_NAME_FOREACH_BATCH)
if (trigger.isInstanceOf[ContinuousTrigger]) {
throw new AnalysisException(s"'$source' is not supported with continuous trigger")
throw QueryCompilationErrors.sourceNotSupportedWithContinuousTriggerError(source)
}
val sink = new ForeachBatchSink[T](foreachBatchWriter, ds.exprEnc)
startQuery(sink, extraOptions)
@@ -556,13 +553,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private def normalize(columnName: String, columnType: String): String = {
val validColumnNames = df.logicalPlan.output.map(_.name)
validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName))
.getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})"))
.getOrElse(throw QueryCompilationErrors.columnNotFoundInExistingColumnsError(
columnType, columnName, validColumnNames))
}

private def assertNotPartitioned(operation: String): Unit = {
if (partitioningColumns.isDefined) {
throw new AnalysisException(s"'$operation' does not support partitioning")
throw QueryCompilationErrors.operationNotSupportPartitioningError(operation)
}
}

StreamingQueryManager.scala
@@ -17,19 +17,19 @@

package org.apache.spark.sql.streaming

import java.util.{ConcurrentModificationException, UUID}
import java.util.UUID
import java.util.concurrent.{TimeoutException, TimeUnit}
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
@@ -81,7 +81,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
} catch {
case e: Exception =>
throw new SparkException("Exception when registering StreamingQueryListener", e)
throw QueryExecutionErrors.registeringStreamingQueryListenerError(e)
}

/**
@@ -371,8 +371,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
if (oldActiveQuery != null) {
throw new ConcurrentModificationException(
"Another instance of this query was just started by a concurrent session.")
throw QueryExecutionErrors.concurrentQueryInstanceError()
}
activeQueries.put(query.id, query)
}