[SPARK-35057][SQL] Group exception messages in hive/thriftserver
### What changes were proposed in this pull request?
This PR groups exception messages in `sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver`.

### Why are the changes needed?
It will greatly help to standardize error messages and simplify their maintenance.
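
For illustration, a minimal sketch of the pattern this PR applies (simplified from the actual changes shown below; the real methods live in `HiveThriftServerErrors` and `QueryExecutionErrors`): call sites stop constructing exception messages inline and instead delegate to a dedicated errors object, so each message is defined in exactly one place.

```scala
import org.apache.hive.service.cli.HiveSQLException

// Central object that owns the message text; callers never inline it.
object HiveThriftServerErrors {
  def runningQueryError(e: Throwable): Throwable =
    new HiveSQLException(s"Error running query: ${e.toString}", e)
}

// Before: case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
// After:  case _ => throw HiveThriftServerErrors.runningQueryError(e)
```

With construction centralized, a future change to wording or exception class touches one file instead of every call site.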

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests; all existing tests pass, confirming the change doesn't break any existing behavior.

Closes #32646 from beliefer/SPARK-35057.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
beliefer authored and cloud-fan committed May 27, 2021
1 parent 5cc17ba commit 3e19080
Showing 8 changed files with 92 additions and 17 deletions.
@@ -1217,4 +1217,17 @@ object QueryExecutionErrors {
       s"Parse Mode: ${FailFastMode.name}. To process malformed records as null " +
         "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
   }
+
+  def remoteOperationsUnsupportedError(): Throwable = {
+    new RuntimeException("Remote operations not supported")
+  }
+
+  def invalidKerberosConfigForHiveServer2Error(): Throwable = {
+    new IOException(
+      "HiveServer2 Kerberos principal or keytab is not correctly configured")
+  }
+
+  def parentSparkUIToAttachTabNotFoundError(): Throwable = {
+    new SparkException("Parent SparkUI to attach this tab to not found!")
+  }
 }
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.io.IOException
+import java.util.concurrent.RejectedExecutionException
+
+import org.apache.hive.service.ServiceException
+import org.apache.hive.service.cli.{HiveSQLException, OperationType}
+
+/**
+ * Object for grouping error messages from (most) exceptions thrown during
+ * hive execution with thrift server.
+ */
+object HiveThriftServerErrors {
+
+  def taskExecutionRejectedError(rejected: RejectedExecutionException): Throwable = {
+    new HiveSQLException("The background threadpool cannot accept" +
+      " new task for execution, please retry the operation", rejected)
+  }
+
+  def runningQueryError(e: Throwable): Throwable = {
+    new HiveSQLException(s"Error running query: ${e.toString}", e)
+  }
+
+  def hiveOperatingError(operationType: OperationType, e: Throwable): Throwable = {
+    new HiveSQLException(s"Error operating $operationType ${e.getMessage}", e)
+  }
+
+  def failedToOpenNewSessionError(e: Throwable): Throwable = {
+    new HiveSQLException(s"Failed to open new session: $e", e)
+  }
+
+  def cannotLoginToKerberosError(e: Throwable): Throwable = {
+    new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+  }
+
+  def cannotLoginToSpnegoError(
+      principal: String, keyTabFile: String, e: IOException): Throwable = {
+    new ServiceException("Unable to login to spnego with given principal " +
+      s"$principal and keytab $keyTabFile: $e", e)
+  }
+
+  def failedToStartServiceError(serviceName: String, e: Throwable): Throwable = {
+    new ServiceException(s"Failed to Start $serviceName", e)
+  }
+}
@@ -255,8 +255,7 @@ private[hive] class SparkExecuteStatementOperation(
           setState(OperationState.ERROR)
           HiveThriftServer2.eventManager.onStatementError(
             statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
-          throw new HiveSQLException("The background threadpool cannot accept" +
-            " new task for execution, please retry the operation", rejected)
+          throw HiveThriftServerErrors.taskExecutionRejectedError(rejected)
         case NonFatal(e) =>
           logError(s"Error executing query in background", e)
           setState(OperationState.ERROR)
@@ -322,7 +321,7 @@ private[hive] class SparkExecuteStatementOperation(
             statementId, e.getMessage, SparkUtils.exceptionString(e))
           e match {
             case _: HiveSQLException => throw e
-            case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
+            case _ => throw HiveThriftServerErrors.runningQueryError(e)
           }
       }
     } finally {
@@ -102,7 +102,7 @@ private[hive] trait SparkOperation extends Operation with Logging {
         statementId, e.getMessage, Utils.exceptionString(e))
       e match {
         case _: HiveSQLException => throw e
-        case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
+        case _ => throw HiveThriftServerErrors.hiveOperatingError(getType, e)
       }
   }
 }
@@ -44,6 +44,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
@@ -143,7 +144,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 
     if (isRemoteMode(sessionState)) {
       // Hive 1.2 + not supported in CLI
-      throw new RuntimeException("Remote operations not supported")
+      throw QueryExecutionErrors.remoteOperationsUnsupportedError()
     }
     // Respect the configurations set by --hiveconf from the command line
     // (based on Hive's CliDriver).
@@ -330,7 +331,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       }
     } else {
       // Hive 1.2 + not supported in CLI
-      throw new RuntimeException("Remote operations not supported")
+      throw QueryExecutionErrors.remoteOperationsUnsupportedError()
     }
 
   override def setHiveVariables(hiveVariables: java.util.Map[String, String]): Unit = {
@@ -28,14 +28,15 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.shims.Utils
 import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
-import org.apache.hive.service.{AbstractService, CompositeService, Service, ServiceException}
+import org.apache.hive.service.{AbstractService, CompositeService, Service}
 import org.apache.hive.service.Service.STATE
 import org.apache.hive.service.auth.HiveAuthFactory
 import org.apache.hive.service.cli._
 import org.apache.hive.service.server.HiveServer2
 import org.slf4j.Logger
 
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 
 private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
@@ -56,8 +57,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
     val principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)
     val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB)
     if (principal.isEmpty || keyTabFile.isEmpty) {
-      throw new IOException(
-        "HiveServer2 Kerberos principal or keytab is not correctly configured")
+      throw QueryExecutionErrors.invalidKerberosConfigForHiveServer2Error()
     }
 
     val originalUgi = UserGroupInformation.getCurrentUser
@@ -72,7 +72,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
         setSuperField(this, "serviceUGI", sparkServiceUGI)
       } catch {
         case e @ (_: IOException | _: LoginException) =>
          throw HiveThriftServerErrors.cannotLoginToKerberosError(e)
       }
 
       // Try creating spnego UGI if it is configured.
@@ -84,8 +84,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
           setSuperField(this, "httpUGI", httpUGI)
         } catch {
           case e: IOException =>
-            throw new ServiceException("Unable to login to spnego with given principal " +
-              s"$principal and keytab $keyTabFile: $e", e)
+            throw HiveThriftServerErrors.cannotLoginToSpnegoError(principal, keyTabFile, e)
         }
       }
     }
@@ -149,7 +148,7 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
       logError(s"Error starting services $getName", e)
       invoke(classOf[CompositeService], this, "stop",
         classOf[Int] -> new Integer(serviceStartCount))
-      throw new ServiceException("Failed to Start " + getName, e)
+      throw HiveThriftServerErrors.failedToStartServiceError(getName, e)
     }
   }
 }
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
+import org.apache.hive.service.cli.SessionHandle
 import org.apache.hive.service.cli.session.SessionManager
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.hive.service.server.HiveServer2
@@ -80,7 +80,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
         case NonFatal(inner) =>
           logWarning("Error closing session", inner)
       }
-      throw new HiveSQLException("Failed to open new session: " + e, e)
+      throw HiveThriftServerErrors.failedToOpenNewSessionError(e)
     }
   }
 
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.hive.thriftserver.ui
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 /**
@@ -44,7 +45,7 @@ private[thriftserver] class ThriftServerTab(
 private[thriftserver] object ThriftServerTab {
   def getSparkUI(sparkContext: SparkContext): SparkUI = {
     sparkContext.ui.getOrElse {
-      throw new SparkException("Parent SparkUI to attach this tab to not found!")
+      throw QueryExecutionErrors.parentSparkUIToAttachTabNotFoundError()
     }
   }
 }
