[SPARK-39346][SQL][3.3] Convert asserts/illegal state exception to internal errors on each phase #36742

Closed
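This change makes failures caused by internal asserts, illegal-state checks and (on the execution path) null pointers surface through the error framework as INTERNAL_ERROR instead of leaking raw JVM exceptions to the caller. A minimal sketch of the caller-visible difference, assuming a hypothetical Dataset `ds` whose execution trips such a check (illustration only, not code from this PR):

```scala
import org.apache.spark.{SparkException, SparkThrowable}

// Before this change the action threw java.lang.IllegalStateException or
// java.lang.AssertionError directly. After it, the failure arrives wrapped.
try {
  ds.collect() // hypothetical action hitting an internal assert
} catch {
  case e: SparkException =>
    // The wrapper carries the INTERNAL_ERROR error class ...
    assert(e.asInstanceOf[SparkThrowable].getErrorClass == "INTERNAL_ERROR")
    // ... and keeps the original exception as the cause.
    println(e.getCause)
}
```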
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

+ import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testUtils.sendMessages(topic2, Array("6"))
},
StartStream(),
- ExpectFailure[IllegalStateException](e => {
+ ExpectFailure[SparkException](e => {
+   assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
// The offset of `topic2` should be changed from 2 to 1
- assert(e.getMessage.contains("was changed from 2 to 1"))
+ assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
})
)
}
@@ -764,12 +766,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {

testStream(df)(
StartStream(checkpointLocation = metadataPath.getAbsolutePath),
- ExpectFailure[IllegalStateException](e => {
+ ExpectFailure[SparkException](e => {
+   assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
Seq(
s"maximum supported log version is v1, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
- assert(e.toString.contains(message))
+ assert(e.getCause.toString.contains(message))
}
}))
}
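The same assertion pattern recurs across the updated suites: expect a SparkException, check its error class, and look for the original message on the cause rather than on the wrapper. A small standalone sketch of that pattern (the helper name is hypothetical, not part of this PR):

```scala
import org.apache.spark.{SparkException, SparkThrowable}

// Hypothetical helper mirroring the ExpectFailure checks above: the thrown
// exception must be the INTERNAL_ERROR wrapper, and the original message is
// now found on the cause rather than on the wrapper itself.
def assertInternalError(e: Throwable, expectedFragment: String): Unit = {
  assert(e.isInstanceOf[SparkException])
  assert(e.asInstanceOf[SparkThrowable].getErrorClass == "INTERNAL_ERROR")
  assert(e.getCause != null && e.getCause.getMessage.contains(expectedFragment))
}
```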
14 changes: 3 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal

import org.apache.commons.lang3.StringUtils

- import org.apache.spark.{SparkException, SparkThrowable, TaskContext}
+ import org.apache.spark.TaskContext
import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function._
@@ -3852,19 +3852,11 @@ class Dataset[T] private[sql](
* the internal error exception.
*/
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
- try {
-   SQLExecution.withNewExecutionId(qe, Some(name)) {
+ SQLExecution.withNewExecutionId(qe, Some(name)) {
+   QueryExecution.withInternalError(s"""The "$name" action failed.""") {
qe.executedPlan.resetMetrics()
action(qe.executedPlan)
}
- } catch {
-   case e: SparkThrowable => throw e
-   case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) =>
-     throw new SparkException(
-       errorClass = "INTERNAL_ERROR",
-       messageParameters = Array(s"""The "$name" action failed."""),
-       cause = e)
-   case e: Throwable => throw e
}
}
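The try/catch is gone from withAction itself; the wrapping now happens in QueryExecution.withInternalError (added later in this diff), so every action gets the same treatment. A sketch of what a caller might observe, assuming a hypothetical Dataset `ds` whose physical plan throws an AssertionError:

```scala
import org.apache.spark.SparkException

try {
  ds.collect() // hypothetical action; "collect" is the action name passed to withAction
} catch {
  case e: SparkException if e.getErrorClass == "INTERNAL_ERROR" =>
    // The wrapper's message names the failed action, per the template above.
    assert(e.getMessage.contains("""The "collect" action failed."""))
    // The original engine-side error is preserved as the cause.
    assert(e.getCause.isInstanceOf[AssertionError])
}
```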

@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.hadoop.fs.Path

+ import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -180,7 +181,9 @@ class QueryExecution(
}

protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
- tracker.measurePhase(phase)(block)
+ QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {
+   tracker.measurePhase(phase)(block)
+ }
}

def simpleString: String = {
@@ -484,4 +487,30 @@ object QueryExecution {
val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
prepareForExecution(preparationRules, sparkPlan.clone())
}

+ /**
+  * Converts asserts, null pointer, illegal state exceptions to internal errors.
+  */
+ private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {
+   case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException |
+       _: java.lang.AssertionError) =>
+     new SparkException(
+       errorClass = "INTERNAL_ERROR",
+       messageParameters = Array(msg +
+         " Please, fill a bug report in, and provide the full stack trace."),
+       cause = e)
+   case e: Throwable =>
+     e
+ }
+
+ /**
+  * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors.
+  */
+ private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+   try {
+     block
+   } catch {
+     case e: Throwable => throw toInternalError(msg, e)
+   }
+ }
}
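toInternalError and withInternalError are private[sql], so they are not callable from user code; the following standalone sketch restates the logic they add, to show how an IllegalStateException raised inside a wrapped phase comes back out (illustration only, using the same SparkException(errorClass, messageParameters, cause) constructor as the diff above):

```scala
import org.apache.spark.SparkException

// Restatement of the wrapping logic for illustration; not the private[sql] helper itself.
def withInternalErrorSketch[T](msg: String)(block: => T): T =
  try block catch {
    case e @ (_: IllegalStateException | _: NullPointerException | _: AssertionError) =>
      throw new SparkException(
        errorClass = "INTERNAL_ERROR",
        messageParameters = Array(msg),
        cause = e)
  }

// An illegal-state failure inside the block surfaces as an INTERNAL_ERROR wrapper:
try {
  withInternalErrorSketch("The Spark SQL phase analysis failed with an internal error.") {
    throw new IllegalStateException("boom")
  }
} catch {
  case e: SparkException =>
    println(e.getErrorClass)       // INTERNAL_ERROR
    println(e.getCause.getMessage) // boom
}
```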
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
+ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
@@ -319,7 +320,8 @@ abstract class StreamExecution(
// This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
// to `new IOException(ie.toString())` before Hadoop 2.8.
updateStatusMessage("Stopped")
- case e: Throwable =>
+ case t: Throwable =>
+   val e = QueryExecution.toInternalError(msg = s"Execution of the stream $name failed.", t)
streamDeathCause = new StreamingQueryException(
toDebugString(includeLogicalPlan = isInitialized),
s"Query $prettyIdString terminated with exception: ${e.getMessage}",
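On the streaming side the conversion happens before the throwable is recorded as the stream death cause, so the chain a caller should see is StreamingQueryException -> SparkException (INTERNAL_ERROR) -> original exception. A sketch of inspecting that chain on a hypothetical query that has already failed this way:

```scala
import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical: `query` terminated because the engine hit an assert or
// illegal-state check while the stream was running.
def inspectFailure(query: StreamingQuery): Unit =
  query.exception.foreach { sqe =>
    val wrapper = sqe.getCause // the INTERNAL_ERROR SparkException
    assert(wrapper.isInstanceOf[SparkException])
    assert(wrapper.asInstanceOf[SparkThrowable].getErrorClass == "INTERNAL_ERROR")
    println(wrapper.getCause)  // the original exception
  }
```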
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter

+ import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.connector.read.streaming
@@ -93,8 +94,9 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
testStream(streamEvent) (
AddData(inputData, 1, 2, 3, 4, 5, 6),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
- ExpectFailure[IllegalStateException] { e =>
-   assert(e.getMessage.contains("batch 3 doesn't exist"))
+ ExpectFailure[SparkException] { e =>
+   assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+   assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
}
)
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous

import java.sql.Timestamp

- import org.apache.spark.{SparkContext, SparkException}
+ import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
@@ -440,8 +440,9 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {

testStream(df)(
StartStream(Trigger.Continuous(1)),
- ExpectFailure[IllegalStateException] { e =>
-   e.getMessage.contains("queue has exceeded its maximum")
+ ExpectFailure[SparkException] { e =>
+   assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+   e.getCause.getMessage.contains("queue has exceeded its maximum")
}
)
}