[SPARK-32412][SQL] Unify error handling for spark thrift server operations

### What changes were proposed in this pull request?

Log the error/warn message only once at the server side, for both sync and async modes.

### Why are the changes needed?

In b151194 we made the error logging for SparkExecuteStatementOperation with `runInBackground=true` non-duplicated, but operations with `runInBackground=false` and the other metadata operations are still logged twice: once in the operation's `runInternal` method and once in ThriftCLIService.

In this PR, I propose to refactor the logic into a unified error handling approach, sketched below.
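As an illustration, here is a minimal, self-contained sketch of that pattern (an editor's example; `UnifiedErrorHandling`, this `onError`, and `runStatement` are invented stand-ins, not the PR's code). Scala's `try`/`catch` accepts any expression of type `PartialFunction[Throwable, Unit]`, so one shared handler can own the logging and rethrowing for every operation:

```scala
object UnifiedErrorHandling {

  // Defined once: logs the failure exactly once, then rethrows a wrapped error.
  def onError(statementId: String): PartialFunction[Throwable, Unit] = {
    case e: Throwable =>
      Console.err.println(s"Error operating statement $statementId: $e") // single log site
      throw new RuntimeException(s"Error operating statement $statementId", e)
  }

  def runStatement(statementId: String): Unit =
    try {
      throw new IllegalStateException("boom") // stand-in for a failing query
    } catch onError(statementId) // `catch` takes the partial function directly

  def main(args: Array[String]): Unit =
    try runStatement("stmt-1")
    catch { case e: RuntimeException => println(s"client sees: ${e.getMessage}") }
}
```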

### Does this PR introduce _any_ user-facing change?

Yes. When `spark.sql.hive.thriftServer.async=false` and clients call the sync APIs, the error message is now logged only once at the server side, while still reaching the client, as in the sketch below.
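For example, a hypothetical client-side view (editor's sketch; it assumes a Thrift server already running on the default `localhost:10000` and the Hive JDBC driver on the classpath): the failing query from the new test below reaches the client as a `SQLException`, while the server logs it once.

```scala
import java.sql.{DriverManager, SQLException}

object ThriftClientExample {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark Thrift Server started with spark.sql.hive.thriftServer.async=false.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
    try {
      val stmt = conn.createStatement()
      // The same failing query the new test uses.
      stmt.executeQuery("select date_sub(date'2011-11-11', '1.2')")
    } catch {
      case e: SQLException =>
        // The client still receives the full error message.
        println(e.getMessage)
    } finally {
      conn.close()
    }
  }
}
```
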
### How was this patch tested?

Locally verified the result in `target/unit-test.log`.

Added unit tests.

Closes #29204 from yaooqinn/SPARK-32412.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
yaooqinn authored and cloud-fan committed Jul 30, 2020
1 parent e1d7321 commit 510a165
Showing 3 changed files with 76 additions and 75 deletions.

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

@@ -19,11 +19,9 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
-import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
@@ -38,7 +36,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.VariableSubstitution
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
     val sqlContext: SQLContext,
@@ -113,7 +110,7 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
-    log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
+    logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
@@ -182,7 +179,7 @@ private[hive] class SparkExecuteStatementOperation(
       resultOffset += 1
     }
     previousFetchEndOffset = resultOffset
-    log.info(s"Returning result set with ${curRow} rows from offsets " +
+    logInfo(s"Returning result set with ${curRow} rows from offsets " +
       s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
     resultRowSet
   }
@@ -219,7 +216,9 @@ private[hive] class SparkExecuteStatementOperation(
               execute()
             }
           } catch {
-            case e: HiveSQLException => setOperationException(e)
+            case e: HiveSQLException =>
+              setOperationException(e)
+              logError(s"Error executing query with $statementId,", e)
           }
         }
       }
@@ -239,21 +238,7 @@ private[hive] class SparkExecuteStatementOperation(
         val backgroundHandle =
           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
         setBackgroundHandle(backgroundHandle)
-      } catch {
-        case rejected: RejectedExecutionException =>
-          logError("Error submitting query in background, query rejected", rejected)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
-          throw new HiveSQLException("The background threadpool cannot accept" +
-            " new task for execution, please retry the operation", rejected)
-        case NonFatal(e) =>
-          logError(s"Error executing query in background", e)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          throw new HiveSQLException(e)
-      }
+      } catch onError()
     }
   }

@@ -294,30 +279,7 @@ private[hive] class SparkExecuteStatementOperation(
       }
       dataTypes = result.schema.fields.map(_.dataType)
     } catch {
-      // Actually do need to catch Throwable as some failures don't inherit from Exception and
-      // HiveServer will silently swallow them.
-      case e: Throwable =>
-        // When cancel() or close() is called very quickly after the query is started,
-        // then they may both call cleanup() before Spark Jobs are started. But before background
-        // task interrupted, it may have start some spark job, so we need to cancel again to
-        // make sure job was cancelled when background thread was interrupted
-        if (statementId != null) {
-          sqlContext.sparkContext.cancelJobGroup(statementId)
-        }
-        val currentState = getStatus().getState()
-        if (currentState.isTerminal) {
-          // This may happen if the execution was cancelled, and then closed from another thread.
-          logWarning(s"Ignore exception in terminal state with $statementId: $e")
-        } else {
-          logError(s"Error executing query with $statementId, currentState $currentState, ", e)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          e match {
-            case _: HiveSQLException => throw e
-            case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
-          }
-        }
+      onError(needCancel = true)
     } finally {
       synchronized {
         if (!getStatus.getState.isTerminal) {
@@ -348,9 +310,7 @@ private[hive] class SparkExecuteStatementOperation(
       }
     }
     // RDDs will be cleaned automatically upon garbage collection.
-    if (statementId != null) {
-      sqlContext.sparkContext.cancelJobGroup(statementId)
-    }
+    sqlContext.sparkContext.cancelJobGroup(statementId)
   }
 }

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.util.concurrent.RejectedExecutionException
+
 import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.Operation
 
@@ -94,15 +96,32 @@ private[hive] trait SparkOperation extends Operation with Logging {
       throw new IllegalArgumentException(s"Unknown table type is found: $t")
   }
 
-  protected def onError(): PartialFunction[Throwable, Unit] = {
+  protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
     // Actually do need to catch Throwable as some failures don't inherit from Exception and
     // HiveServer will silently swallow them.
     case e: Throwable =>
-      logError(s"Error operating $getType with $statementId", e)
-      super.setState(OperationState.ERROR)
-      HiveThriftServer2.eventManager.onStatementError(
-        statementId, e.getMessage, Utils.exceptionString(e))
-      e match {
-        case _: HiveSQLException => throw e
-        case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
-      }
+      // When cancel() or close() is called very quickly after the query is started,
+      // then they may both call cleanup() before Spark Jobs are started. But before background
+      // task interrupted, it may have start some spark job, so we need to cancel again to
+      // make sure job was cancelled when background thread was interrupted
+      if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
+      val currentState = getStatus.getState
+      if (currentState.isTerminal) {
+        // This may happen if the execution was cancelled, and then closed from another thread.
+        logWarning(s"Ignore exception in terminal state with $statementId: $e")
+      } else {
+        super.setState(OperationState.ERROR)
+        HiveThriftServer2.eventManager.onStatementError(
+          statementId, e.getMessage, Utils.exceptionString(e))
+        e match {
+          case _: HiveSQLException => throw e
+          case rejected: RejectedExecutionException =>
+            throw new HiveSQLException("The background threadpool cannot accept" +
+              " new task for execution, please retry the operation", rejected)
+          case _ =>
+            val tips = if (shouldRunAsync()) " in background" else ""
+            throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
+        }
+      }
   }
 }
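As a reading aid, the control flow of the new `onError` body can be modeled like this (editor's sketch with invented stand-ins such as `OnErrorModel` and a toy `State`; the real code uses `OperationState` and `SparkContext.cancelJobGroup`): cancel the job group when asked, warn once if the operation is already terminal, otherwise record the error state, log once, and rethrow.

```scala
object OnErrorModel {
  sealed trait State { def isTerminal: Boolean }
  case object Running extends State { val isTerminal = false }
  case object Error extends State { val isTerminal = true }

  private var state: State = Running

  def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
    case e: Throwable =>
      if (needCancel) println("cancelJobGroup(statementId)") // stand-in for the SparkContext call
      if (state.isTerminal) {
        // Race with cancel()/close(): already reported, so only warn.
        println(s"Ignore exception in terminal state: $e")
      } else {
        state = Error // the single place that flips to ERROR and logs
        println(s"Error operating statement: $e")
        throw new RuntimeException(e)
      }
  }

  def main(args: Array[String]): Unit =
    try {
      try throw new IllegalStateException("boom")
      catch onError(needCancel = true)
    } catch { case _: RuntimeException => () } // swallowed here only for the demo
}
```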
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala

@@ -21,6 +21,8 @@ import java.sql.SQLException
 
 import org.apache.hive.service.cli.HiveSQLException
 
+import org.apache.spark.sql.hive.HiveUtils
+
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -52,31 +54,51 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("Full stack traces as error message for jdbc or thrift client") {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
-    withCLIServiceClient { client =>
-      val sessionHandle = client.openSession(user, "")
-      val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+    val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
 
-      val e = intercept[HiveSQLException] {
-        client.executeStatement(
-          sessionHandle,
-          sql,
-          confOverlay)
-      }
-      assert(e.getMessage
-        .contains("The second argument of 'date_sub' function needs to be an integer."))
-      assert(!e.getMessage.contains("" +
-        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
+      withCLIServiceClient { client =>
+        val sessionHandle = client.openSession(user, "")
+        val e = intercept[HiveSQLException] {
+          client.executeStatement(sessionHandle, sql, confOverlay)
+        }
+        assert(e.getMessage
+          .contains("The second argument of 'date_sub' function needs to be an integer."))
+        assert(!e.getMessage
+          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+      }
     }
 
-    withJdbcStatement { statement =>
-      val e = intercept[SQLException] {
-        statement.executeQuery(sql)
+    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
+      withCLIServiceClient { client =>
+        val sessionHandle = client.openSession(user, "")
+        val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
+        var status = client.getOperationStatus(opHandle)
+        while (!status.getState.isTerminal) {
+          Thread.sleep(10)
+          status = client.getOperationStatus(opHandle)
+        }
+        val e = status.getOperationException
+
+        assert(e.getMessage
+          .contains("The second argument of 'date_sub' function needs to be an integer."))
+        assert(e.getMessage
+          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+      }
+    }
+
+    Seq("true", "false").foreach { value =>
+      withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
+        withJdbcStatement { statement =>
+          val e = intercept[SQLException] {
+            statement.executeQuery(sql)
+          }
+          assert(e.getMessage.contains(
+            "The second argument of 'date_sub' function needs to be an integer."))
+          assert(e.getMessage.contains(
+            "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+        }
       }
-      assert(e.getMessage
-        .contains("The second argument of 'date_sub' function needs to be an integer."))
-      assert(e.getMessage.contains("" +
-        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
     }
   }
 }
