diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index b17a81549cafc..1efc54cd4f006 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -19,12 +19,17 @@ package org.apache.spark.errors
 
 import java.io.IOException
 import java.util.concurrent.TimeoutException
+import javax.ws.rs.WebApplicationException
+import javax.ws.rs.core.Response
+import javax.ws.rs.core.Response.Status
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkException, TaskNotSerializableException}
 import org.apache.spark.scheduler.{BarrierJobRunWithDynamicAllocationException, BarrierJobSlotsNumberCheckFailed, BarrierJobUnsupportedRDDChainException}
 import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager}
+import org.apache.spark.status.KVUtils.MetadataMismatchException
+import org.apache.spark.status.api.v1.{BadParameterException, ForbiddenException, NotFoundException, ServiceUnavailable}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockNotFoundException, BlockSavedOnDecommissionedBlockManagerException, RDDBlockId, UnrecognizedBlockId}
 
 /**
@@ -145,6 +150,108 @@ object SparkCoreErrors {
     new SparkException("Checkpoint dir must be specified.")
   }
 
+  def failToGetApplicationInfoError(): Throwable = {
+    new NoSuchElementException("Failed to get the application information. " +
+      "If you are starting up Spark, please wait a while until it's ready.")
+  }
+
+  def noStageWithIdError(stageId: Int): Throwable = {
+    new NoSuchElementException(s"No stage with id $stageId")
+  }
+
+  def failToGetApplicationSummaryError(): Throwable = {
+    new NoSuchElementException("Failed to get the application summary. " +
+      "If you are starting up Spark, please wait a while until it's ready.")
+  }
+
+  def metadataMismatchError(): Throwable = {
+    new MetadataMismatchException()
+  }
+
+  def noSuchElementException(): Throwable = {
+    new NoSuchElementException()
+  }
+
+  def indexOutOfBoundError(idx: Int): Throwable = {
+    new IndexOutOfBoundsException(idx.toString)
+  }
+
+  def notAuthorizedUserError(user: String): Throwable = {
+    new ForbiddenException(s"""user "$user" is not authorized""")
+  }
+
+  def appNotFoundError(appKey: String): Throwable = {
+    new NotFoundException(s"no such app: $appKey")
+  }
+
+  def unknownJobError(jobId: Int): Throwable = {
+    new NotFoundException(s"unknown job: $jobId")
+  }
+
+  def invalidExecutorIdError(driverId: String): Throwable = {
+    new BadParameterException(s"Invalid executorId: neither '$driverId' nor number.")
+  }
+
+  def threadDumpsNotAvailableError(): Throwable = {
+    new ServiceUnavailable("Thread dumps not available through the history server.")
+  }
+
+  def noThreadDumpAvailableError(): Throwable = {
+    new NotFoundException("No thread dump is available.")
+  }
+
+  def uriNotFoundError(uri: String): Throwable = {
+    new NotFoundException(uri)
+  }
+
+  def executorNotExistError(): Throwable = {
+    new NotFoundException("Executor does not exist.")
+  }
+
+  def executorIsNotActiveError(): Throwable = {
+    new BadParameterException("Executor is not active.")
+  }
+
+  def noRddFoundError(rddId: Int): Throwable = {
+    new NotFoundException(s"no rdd found w/ id $rddId")
+  }
+
+  def eventLogsNotAvailableError(appId: String): Throwable = {
+    new ServiceUnavailable(s"Event logs are not available for app: $appId.")
+  }
+
+  def unknownAppError(appId: String): Throwable = {
+    new NotFoundException(s"unknown app: $appId")
+  }
+
+  def unknownAppWithAttemptError(appId: String, attemptId: String): Throwable = {
+    new NotFoundException(s"unknown app $appId, attempt $attemptId")
+  }
+
+  def unknownStageError(stageId: Int): Throwable = {
+    new NotFoundException(s"unknown stage: $stageId")
+  }
+
+  def unknownAttemptForStageError(stageId: Int, msg: String): Throwable = {
+    new NotFoundException(s"unknown attempt for stage $stageId. Found attempts: [$msg]")
+  }
+
+  def noTaskReportedMetricsError(stageId: Int, stageAttemptId: Int): Throwable = {
+    new NotFoundException(s"No tasks reported metrics for $stageId / $stageAttemptId yet.")
+  }
+
+  def badParameterError(param: String, exp: String, actual: String): Throwable = {
+    new BadParameterException(param, exp, actual)
+  }
+
+  def webApplicationError(originalValue: String): Throwable = {
+    new WebApplicationException(
+      Response.status(Status.BAD_REQUEST)
+        .entity("Couldn't parse date: " + originalValue)
+        .build()
+    )
+  }
+
   def askStandaloneSchedulerToShutDownExecutorsError(e: Exception): Throwable = {
     new SparkException("Error asking standalone scheduler to shut down executors", e)
   }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index a8a16cda96c87..3b882e8ab8cc7 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._
@@ -48,8 +49,7 @@ private[spark] class AppStatusStore(
       }
     } catch {
       case _: NoSuchElementException =>
-        throw new NoSuchElementException("Failed to get the application information. " +
-          "If you are starting up Spark, please wait a while until it's ready.")
+        throw SparkCoreErrors.failToGetApplicationInfoError()
     }
   }
 
@@ -157,7 +157,7 @@ private[spark] class AppStatusStore(
       if (it.hasNext()) {
         it.next().info
       } else {
-        throw new NoSuchElementException(s"No stage with id $stageId")
+        throw SparkCoreErrors.noStageWithIdError(stageId)
       }
     } finally {
       it.close()
@@ -659,8 +659,7 @@ private[spark] class AppStatusStore(
       store.read(classOf[AppSummary], classOf[AppSummary].getName())
     } catch {
       case _: NoSuchElementException =>
-        throw new NoSuchElementException("Failed to get the application summary. " +
-          "If you are starting up Spark, please wait a while until it's ready.")
+        throw SparkCoreErrors.failToGetApplicationSummaryError()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index c79f2dcd86533..40cef8f79e0dd 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -26,6 +26,7 @@ import scala.reflect.{classTag, ClassTag}
 import com.fasterxml.jackson.annotation.JsonInclude
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.kvstore._
 
@@ -62,7 +63,7 @@ private[spark] object KVUtils extends Logging {
         db.setMetadata(metadata)
       } else if (dbMeta != metadata) {
         db.close()
-        throw new MetadataMismatchException()
+        throw SparkCoreErrors.metadataMismatchError()
       }
 
       db
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index fc5fc32d64004..b2c01653aa42b 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -25,6 +25,7 @@ import scala.collection.immutable.{HashSet, TreeSet}
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
@@ -846,7 +847,7 @@ private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {
       curr += 1
       e = e.next
     }
-    if (e != null) e.value else throw new IndexOutOfBoundsException(idx.toString)
+    if (e != null) e.value else throw SparkCoreErrors.indexOutOfBoundError(idx)
   }
 
   override def iterator: Iterator[v1.RDDPartitionInfo] = {
@@ -861,7 +862,7 @@ private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {
         current = tmp.next
         tmp.value
       } else {
-        throw new NoSuchElementException()
+        throw SparkCoreErrors.noSuchElementException()
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index cc21c1488f67c..73b06852c98d0 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -28,6 +28,7 @@ import org.glassfish.jersey.server.ServerProperties
 import org.glassfish.jersey.servlet.ServletContainer
 
 import org.apache.spark.SecurityManager
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.ui.{SparkUI, UIUtils}
 
 /**
@@ -137,14 +138,14 @@ private[v1] trait BaseAppResource extends ApiRequestContext {
       uiRoot.withSparkUI(appId, Option(attemptId)) { ui =>
         val user = httpRequest.getRemoteUser()
         if (!ui.securityManager.checkUIViewPermissions(user)) {
-          throw new ForbiddenException(raw"""user "$user" is not authorized""")
+          throw SparkCoreErrors.notAuthorizedUserError(user)
         }
         fn(ui)
       }
     } catch {
       case _: NoSuchElementException =>
         val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId)
-        throw new NotFoundException(s"no such app: $appKey")
+        throw SparkCoreErrors.appNotFoundError(appKey)
     }
   }
 
@@ -152,26 +153,26 @@ private[v1] trait BaseAppResource extends ApiRequestContext {
     try {
       val user = httpRequest.getRemoteUser()
       if (!uiRoot.checkUIViewPermissions(appId, Option(attemptId), user)) {
-        throw new ForbiddenException(raw"""user "$user" is not authorized""")
+        throw SparkCoreErrors.notAuthorizedUserError(user)
       }
     } catch {
       case _: NoSuchElementException =>
         val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId)
-        throw new NotFoundException(s"no such app: $appKey")
+        throw SparkCoreErrors.appNotFoundError(appKey)
     }
   }
 }
 
-private[v1] class ForbiddenException(msg: String) extends WebApplicationException(
+private[spark] class ForbiddenException(msg: String) extends WebApplicationException(
   UIUtils.buildErrorResponse(Response.Status.FORBIDDEN, msg))
 
-private[v1] class NotFoundException(msg: String) extends WebApplicationException(
+private[spark] class NotFoundException(msg: String) extends WebApplicationException(
   UIUtils.buildErrorResponse(Response.Status.NOT_FOUND, msg))
 
-private[v1] class ServiceUnavailable(msg: String) extends WebApplicationException(
+private[spark] class ServiceUnavailable(msg: String) extends WebApplicationException(
   UIUtils.buildErrorResponse(Response.Status.SERVICE_UNAVAILABLE, msg))
 
-private[v1] class BadParameterException(msg: String) extends WebApplicationException(
+private[spark] class BadParameterException(msg: String) extends WebApplicationException(
   UIUtils.buildErrorResponse(Response.Status.BAD_REQUEST, msg)) {
   def this(param: String, exp: String, actual: String) = {
     this(raw"""Bad value for parameter "$param". Expected a $exp, got "$actual"""")
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
index ef17168ebce62..93e5477f6f80b 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -25,6 +25,7 @@ import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{JobExecutionStatus, SparkContext}
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.status.api.v1
 import org.apache.spark.util.Utils
 
@@ -44,7 +45,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
       ui.store.job(jobId)
     } catch {
       case _: NoSuchElementException =>
-        throw new NotFoundException("unknown job: " + jobId)
+        throw SparkCoreErrors.unknownJobError(jobId)
     }
   }
 
@@ -56,22 +57,21 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
   @Path("executors/{executorId}/threads")
   def threadDump(@PathParam("executorId") execId: String): Array[ThreadStackTrace] = withUI { ui =>
     if (execId != SparkContext.DRIVER_IDENTIFIER && !execId.forall(Character.isDigit)) {
-      throw new BadParameterException(
-        s"Invalid executorId: neither '${SparkContext.DRIVER_IDENTIFIER}' nor number.")
+      throw SparkCoreErrors.invalidExecutorIdError(SparkContext.DRIVER_IDENTIFIER)
     }
 
     val safeSparkContext = ui.sc.getOrElse {
-      throw new ServiceUnavailable("Thread dumps not available through the history server.")
+      throw SparkCoreErrors.threadDumpsNotAvailableError()
     }
 
     ui.store.asOption(ui.store.executorSummary(execId)) match {
       case Some(executorSummary) if executorSummary.isActive =>
         val safeThreadDump = safeSparkContext.getExecutorThreadDump(execId).getOrElse {
-          throw new NotFoundException("No thread dump is available.")
+          throw SparkCoreErrors.noThreadDumpAvailableError()
         }
         safeThreadDump
-      case Some(_) => throw new BadParameterException("Executor is not active.")
-      case _ => throw new NotFoundException("Executor does not exist.")
+      case Some(_) => throw SparkCoreErrors.executorIsNotActiveError()
+      case _ => throw SparkCoreErrors.executorNotExistError()
     }
   }
 
@@ -97,7 +97,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
       ui.store.rdd(rddId)
     } catch {
       case _: NoSuchElementException =>
-        throw new NotFoundException(s"no rdd found w/ id $rddId")
+        throw SparkCoreErrors.noRddFoundError(rddId)
     }
   }
 
@@ -155,7 +155,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
         .build()
     } catch {
       case NonFatal(_) =>
-        throw new ServiceUnavailable(s"Event logs are not available for app: $appId.")
+        throw SparkCoreErrors.eventLogsNotAvailableError(appId)
     }
   }
 
@@ -166,7 +166,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
   @Path("{attemptId}")
   def applicationAttempt(): Class[OneApplicationAttemptResource] = {
     if (attemptId != null) {
-      throw new NotFoundException(httpRequest.getRequestURI())
+      throw SparkCoreErrors.uriNotFoundError(httpRequest.getRequestURI())
     }
     classOf[OneApplicationAttemptResource]
   }
@@ -178,7 +178,7 @@ private[v1] class OneApplicationResource extends AbstractApplicationResource {
   @GET
   def getApp(): ApplicationInfo = {
     val app = uiRoot.getApplicationInfo(appId)
-    app.getOrElse(throw new NotFoundException("unknown app: " + appId))
+    app.getOrElse(throw SparkCoreErrors.unknownAppError(appId))
   }
 }
 
@@ -192,7 +192,7 @@ private[v1] class OneApplicationAttemptResource extends AbstractApplicationResou
       app.attempts.find(_.attemptId.contains(attemptId))
     }
       .getOrElse {
-        throw new NotFoundException(s"unknown app $appId, attempt $attemptId")
+        throw SparkCoreErrors.unknownAppWithAttemptError(appId, attemptId)
       }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala b/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala
index d8d5e8958b23c..d1f852dc7f2cc 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala
@@ -18,9 +18,8 @@ package org.apache.spark.status.api.v1
 
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Locale, TimeZone}
-import javax.ws.rs.WebApplicationException
-import javax.ws.rs.core.Response
-import javax.ws.rs.core.Response.Status
+
+import org.apache.spark.errors.SparkCoreErrors
 
 private[v1] class SimpleDateParam(val originalValue: String) {
 
@@ -36,12 +35,7 @@ private[v1] class SimpleDateParam(val originalValue: String) {
       gmtDay.parse(originalValue).getTime()
     } catch {
       case _: ParseException =>
-        throw new WebApplicationException(
-          Response
-            .status(Status.BAD_REQUEST)
-            .entity("Couldn't parse date: " + originalValue)
-            .build()
-        )
+        throw SparkCoreErrors.webApplicationError(originalValue)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
index 26dfa5af101e3..edc5f1d895f6c 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -22,6 +22,7 @@ import javax.ws.rs.core.{Context, MediaType, MultivaluedMap, UriInfo}
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.status.api.v1.TaskStatus._
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.ui.jobs.ApiHelper._
@@ -75,7 +76,7 @@ private[v1] class StagesResource extends BaseAppResource {
       if (ret.nonEmpty) {
         ret
       } else {
-        throw new NotFoundException(s"unknown stage: $stageId")
+        throw SparkCoreErrors.unknownStageError(stageId)
       }
     }
   }
@@ -98,13 +99,12 @@ private[v1] class StagesResource extends BaseAppResource {
       case _: NoSuchElementException =>
         // Change the message depending on whether there are any attempts for the requested stage.
         val all = ui.store.stageData(stageId, false, taskStatus)
-        val msg = if (all.nonEmpty) {
+        if (all.nonEmpty) {
           val ids = all.map(_.attemptId)
-          s"unknown attempt for stage $stageId. Found attempts: [${ids.mkString(",")}]"
+          throw SparkCoreErrors.unknownAttemptForStageError(stageId, ids.mkString(","))
         } else {
-          s"unknown stage: $stageId"
+          throw SparkCoreErrors.unknownStageError(stageId)
         }
-        throw new NotFoundException(msg)
     }
   }
@@ -117,7 +117,7 @@ private[v1] class StagesResource extends BaseAppResource {
     : TaskMetricDistributions = withUI { ui =>
     val quantiles = parseQuantileString(quantileString)
     ui.store.taskSummary(stageId, stageAttemptId, quantiles).getOrElse(
-      throw new NotFoundException(s"No tasks reported metrics for $stageId / $stageAttemptId yet."))
+      throw SparkCoreErrors.noTaskReportedMetricsError(stageId, stageAttemptId))
   }
 
   @GET
@@ -263,7 +263,7 @@ private[v1] class StagesResource extends BaseAppResource {
       s.toDouble
     } catch {
       case nfe: NumberFormatException =>
-        throw new BadParameterException("quantiles", "double", s)
+        throw SparkCoreErrors.badParameterError("quantiles", "double", s)
     }
   }
 }
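
Reviewer note on the pattern (not part of the patch): every throw site above now delegates to a single factory object, so user-facing messages are defined in one place instead of being scattered across the REST resources. A minimal, self-contained Scala sketch of the same idea follows; the names DemoErrors, Demo, and demoLookup are illustrative only and do not exist in Spark.

    // Factory object centralizes exception construction and message text,
    // mirroring how SparkCoreErrors is used by the resources in this patch.
    object DemoErrors {
      def unknownJobError(jobId: Int): Throwable =
        new NoSuchElementException(s"unknown job: $jobId")
    }

    object Demo {
      private val jobs = Map(1 -> "running")

      // Call sites stay one-liners; the message format lives in DemoErrors.
      def demoLookup(jobId: Int): String =
        jobs.getOrElse(jobId, throw DemoErrors.unknownJobError(jobId))

      def main(args: Array[String]): Unit = {
        println(demoLookup(1)) // prints: running
        try demoLookup(42)
        catch { case e: NoSuchElementException => println(e.getMessage) } // prints: unknown job: 42
      }
    }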