103 changes: 103 additions & 0 deletions core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -19,12 +19,17 @@ package org.apache.spark.errors

import java.io.IOException
import java.util.concurrent.TimeoutException
import javax.ws.rs.WebApplicationException
import javax.ws.rs.core.Response
import javax.ws.rs.core.Response.Status

import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkException, TaskNotSerializableException}
import org.apache.spark.scheduler.{BarrierJobRunWithDynamicAllocationException, BarrierJobSlotsNumberCheckFailed, BarrierJobUnsupportedRDDChainException}
import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager}
import org.apache.spark.status.KVUtils.MetadataMismatchException
import org.apache.spark.status.api.v1.{BadParameterException, ForbiddenException, NotFoundException, ServiceUnavailable}
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockNotFoundException, BlockSavedOnDecommissionedBlockManagerException, RDDBlockId, UnrecognizedBlockId}

/**
@@ -145,6 +150,104 @@ object SparkCoreErrors {
new SparkException("Checkpoint dir must be specified.")
}

def failToGetApplicationInfoError(): Throwable = {
new NoSuchElementException("Failed to get the application information. " +
"If you are starting up Spark, please wait a while until it's ready.")
}

def noStageWithIdError(stageId: Int): Throwable = {
new NoSuchElementException(s"No stage with id $stageId")
}

def failToGetApplicationSummaryError(): Throwable = {
new NoSuchElementException("Failed to get the application summary. " +
"If you are starting up Spark, please wait a while until it's ready.")
}

def metadataMismatchError(): Throwable = {
new MetadataMismatchException()
}

def indexOutOfBoundError(idx: Int): Throwable = {
new IndexOutOfBoundsException(idx.toString)
}

def notAuthorizedUserError(user: String): Throwable = {
new ForbiddenException(s"""user "$user" is not authorized""")
}

def appNotFoundError(appKey: String): Throwable = {
new NotFoundException(s"no such app: $appKey")
}

def unknownJobError(jobId: Int): Throwable = {
new NotFoundException(s"unknown job: $jobId")
}

def invalidExecutorIdError(driverId: String): Throwable = {
new BadParameterException(s"Invalid executorId: neither '$driverId' nor number.")
}

def threadDumpsNotAvailableError(): Throwable = {
new ServiceUnavailable("Thread dumps not available through the history server.")
}

def noThreadDumpAvailableError(): Throwable = {
new NotFoundException("No thread dump is available.")
}

def uriNotFoundError(uri: String): Throwable = {
new NotFoundException(uri)
}

def executorNotExistError(): Throwable = {
new NotFoundException("Executor does not exist.")
}

def executorIsNotActiveError(): Throwable = {
new BadParameterException("Executor is not active.")
}

def noRddFoundError(rddId: Int): Throwable = {
new NotFoundException(s"no rdd found w/ id $rddId")
}

def eventLogsNotAvailableError(appId: String): Throwable = {
new ServiceUnavailable(s"Event logs are not available for app: $appId.")
}

def unknownAppError(appId: String): Throwable = {
new NotFoundException(s"unknown app: $appId")
}

def unknownAppWithAttemptError(appId: String, attemptId: String): Throwable = {
new NotFoundException(s"unknown app $appId, attempt $attemptId")
}

def unknownStageError(stageId: Int): Throwable = {
new NotFoundException(s"unknown stage: $stageId")
}

def unknownAttemptForStageError(stageId: Int, msg: String): Throwable = {
new NotFoundException(s"unknown attempt for stage $stageId. Found attempts: [$msg]")
}

def noTaskReportedMetricsError(stageId: Int, stageAttemptId: Int): Throwable = {
new NotFoundException(s"No tasks reported metrics for $stageId / $stageAttemptId yet.")
}

def badParameterError(param: String, exp: String, actual: String): Throwable = {
new BadParameterException(param, exp, actual)
}

def webApplicationError(originalValue: String): Throwable = {
new WebApplicationException(
Response.status(Status.BAD_REQUEST)
.entity("Couldn't parse date: " + originalValue)
.build()
)
}

def askStandaloneSchedulerToShutDownExecutorsError(e: Exception): Throwable = {
new SparkException("Error asking standalone scheduler to shut down executors", e)
}
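All of the helpers above follow one shape: a def per error that builds and returns the Throwable, so call sites throw SparkCoreErrors.xxxError(...) instead of constructing exceptions inline. A minimal sketch of the idiom outside Spark (the names here are illustrative, not from the PR):

// Central factory: one place to audit, test, and later migrate messages.
object MyServiceErrors {
  def unknownJobError(jobId: Int): Throwable =
    new NoSuchElementException(s"unknown job: $jobId")
}

class MyService(jobs: Map[Int, String]) {
  // getOrElse's default is by-name, so the throw only fires on a miss.
  def job(jobId: Int): String =
    jobs.getOrElse(jobId, throw MyServiceErrors.unknownJobError(jobId))
}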
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
import org.apache.spark.ui.scope._
@@ -48,8 +49,7 @@ private[spark] class AppStatusStore(
}
} catch {
case _: NoSuchElementException =>
throw new NoSuchElementException("Failed to get the application information. " +
"If you are starting up Spark, please wait a while until it's ready.")
throw SparkCoreErrors.failToGetApplicationInfoError()
}
}

@@ -157,7 +157,7 @@
if (it.hasNext()) {
it.next().info
} else {
throw new NoSuchElementException(s"No stage with id $stageId")
throw SparkCoreErrors.noStageWithIdError(stageId)
}
} finally {
it.close()
@@ -659,8 +659,7 @@
store.read(classOf[AppSummary], classOf[AppSummary].getName())
} catch {
case _: NoSuchElementException =>
throw new NoSuchElementException("Failed to get the application summary. " +
"If you are starting up Spark, please wait a while until it's ready.")
throw SparkCoreErrors.failToGetApplicationSummaryError()
}
}

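A side note on the stage-lookup hunk above: the error path changes, but the try/finally that closes the KVStore view iterator stays intact. A hedged, generic sketch of that discipline (only java.util.Iterator plus AutoCloseable is assumed, not the real KVStore types):

// Return the first element of a closeable iterator or throw the supplied
// error; the iterator is closed on every path, including the throwing one.
def firstOrThrow[T](it: java.util.Iterator[T] with AutoCloseable, err: => Throwable): T =
  try {
    if (it.hasNext) it.next() else throw err
  } finally {
    it.close()
  }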
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -26,6 +26,7 @@ import scala.reflect.{classTag, ClassTag}
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.util.kvstore._

@@ -62,7 +63,7 @@ private[spark] object KVUtils extends Logging {
db.setMetadata(metadata)
} else if (dbMeta != metadata) {
db.close()
throw new MetadataMismatchException()
throw SparkCoreErrors.metadataMismatchError()
}

db
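For orientation, the guard this hunk touches stamps fresh metadata on first open and refuses to reuse a store whose stored metadata differs. A simplified sketch of that open-or-fail flow (the store trait below approximates the calls visible in the hunk; it is not the real kvstore API):

// Minimal stand-in for the store operations used above.
trait MetaStore[M <: AnyRef] extends AutoCloseable {
  def getMetadata(): M
  def setMetadata(m: M): Unit
}

def openChecked[M <: AnyRef](db: MetaStore[M], metadata: M): MetaStore[M] = {
  val dbMeta = db.getMetadata()
  if (dbMeta == null) {
    db.setMetadata(metadata) // first open: record the expected metadata
  } else if (dbMeta != metadata) {
    db.close() // existing store disagrees: close it before failing
    throw new IllegalStateException("store metadata does not match")
  }
  db
}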
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -25,6 +25,7 @@ import scala.collection.immutable.{HashSet, TreeSet}
import scala.collection.mutable.HashMap

import org.apache.spark.JobExecutionStatus
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
@@ -846,7 +847,7 @@ private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {
curr += 1
e = e.next
}
if (e != null) e.value else throw new IndexOutOfBoundsException(idx.toString)
if (e != null) e.value else throw SparkCoreErrors.indexOutOfBoundError(idx)
}

override def iterator: Iterator[v1.RDDPartitionInfo] = {
@@ -861,7 +862,7 @@ private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {
current = tmp.next
tmp.value
} else {
throw new NoSuchElementException()
throw SparkCoreErrors.noSuchElementException()
}
}
}
core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -28,6 +28,7 @@ import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer

import org.apache.spark.SecurityManager
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.ui.{SparkUI, UIUtils}

/**
@@ -137,41 +138,41 @@ private[v1] trait BaseAppResource extends ApiRequestContext {
uiRoot.withSparkUI(appId, Option(attemptId)) { ui =>
val user = httpRequest.getRemoteUser()
if (!ui.securityManager.checkUIViewPermissions(user)) {
throw new ForbiddenException(raw"""user "$user" is not authorized""")
throw SparkCoreErrors.notAuthorizedUserError(user)
}
fn(ui)
}
} catch {
case _: NoSuchElementException =>
val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId)
throw new NotFoundException(s"no such app: $appKey")
throw SparkCoreErrors.appNotFoundError(appKey)
}
}

protected def checkUIViewPermissions(): Unit = {
try {
val user = httpRequest.getRemoteUser()
if (!uiRoot.checkUIViewPermissions(appId, Option(attemptId), user)) {
throw new ForbiddenException(raw"""user "$user" is not authorized""")
throw SparkCoreErrors.notAuthorizedUserError(user)
}
} catch {
case _: NoSuchElementException =>
val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId)
throw new NotFoundException(s"no such app: $appKey")
throw SparkCoreErrors.appNotFoundError(appKey)
}
}
}

private[v1] class ForbiddenException(msg: String) extends WebApplicationException(
private[spark] class ForbiddenException(msg: String) extends WebApplicationException(
UIUtils.buildErrorResponse(Response.Status.FORBIDDEN, msg))

private[v1] class NotFoundException(msg: String) extends WebApplicationException(
private[spark] class NotFoundException(msg: String) extends WebApplicationException(
UIUtils.buildErrorResponse(Response.Status.NOT_FOUND, msg))

private[v1] class ServiceUnavailable(msg: String) extends WebApplicationException(
private[spark] class ServiceUnavailable(msg: String) extends WebApplicationException(
UIUtils.buildErrorResponse(Response.Status.SERVICE_UNAVAILABLE, msg))

private[v1] class BadParameterException(msg: String) extends WebApplicationException(
private[spark] class BadParameterException(msg: String) extends WebApplicationException(
UIUtils.buildErrorResponse(Response.Status.BAD_REQUEST, msg)) {
def this(param: String, exp: String, actual: String) = {
this(raw"""Bad value for parameter "$param". Expected a $exp, got "$actual"""")
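The visibility widening from private[v1] to private[spark] on the four wrappers is what lets SparkCoreErrors, which lives in org.apache.spark.errors, construct them. Each wrapper simply binds a fixed HTTP status to a message through a prebuilt Response; a hedged sketch of that mechanism (buildErrorResponse below approximates UIUtils.buildErrorResponse, and GoneException is an invented example, not part of the PR):

import javax.ws.rs.WebApplicationException
import javax.ws.rs.core.{MediaType, Response}

object ErrorResponses {
  // Wrap a status and message in a plain-text JAX-RS Response.
  def buildErrorResponse(status: Response.Status, msg: String): Response =
    Response.status(status).entity(msg).`type`(MediaType.TEXT_PLAIN).build()
}

// Subclasses fix the status; callers only supply the message.
class GoneException(msg: String) extends WebApplicationException(
  ErrorResponses.buildErrorResponse(Response.Status.GONE, msg))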
core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -25,6 +25,7 @@ import javax.ws.rs.core.{MediaType, Response, StreamingOutput}
import scala.util.control.NonFatal

import org.apache.spark.{JobExecutionStatus, SparkContext}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.status.api.v1
import org.apache.spark.util.Utils

@@ -44,7 +45,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
ui.store.job(jobId)
} catch {
case _: NoSuchElementException =>
throw new NotFoundException("unknown job: " + jobId)
throw SparkCoreErrors.unknownJobError(jobId)
}
}

@@ -56,22 +57,21 @@
@Path("executors/{executorId}/threads")
def threadDump(@PathParam("executorId") execId: String): Array[ThreadStackTrace] = withUI { ui =>
if (execId != SparkContext.DRIVER_IDENTIFIER && !execId.forall(Character.isDigit)) {
throw new BadParameterException(
s"Invalid executorId: neither '${SparkContext.DRIVER_IDENTIFIER}' nor number.")
throw SparkCoreErrors.invalidExecutorIdError(SparkContext.DRIVER_IDENTIFIER)
}

val safeSparkContext = ui.sc.getOrElse {
throw new ServiceUnavailable("Thread dumps not available through the history server.")
throw SparkCoreErrors.threadDumpsNotAvailableError()
}

ui.store.asOption(ui.store.executorSummary(execId)) match {
case Some(executorSummary) if executorSummary.isActive =>
val safeThreadDump = safeSparkContext.getExecutorThreadDump(execId).getOrElse {
throw new NotFoundException("No thread dump is available.")
throw SparkCoreErrors.noThreadDumpAvailableError()
}
safeThreadDump
case Some(_) => throw new BadParameterException("Executor is not active.")
case _ => throw new NotFoundException("Executor does not exist.")
case Some(_) => throw SparkCoreErrors.executorIsNotActiveError()
case _ => throw SparkCoreErrors.executorNotExistError()
}
}

@@ -97,7 +97,7 @@
ui.store.rdd(rddId)
} catch {
case _: NoSuchElementException =>
throw new NotFoundException(s"no rdd found w/ id $rddId")
throw SparkCoreErrors.noRddFoundError(rddId)
}
}

@@ -155,7 +155,7 @@
.build()
} catch {
case NonFatal(_) =>
throw new ServiceUnavailable(s"Event logs are not available for app: $appId.")
throw SparkCoreErrors.eventLogsNotAvailableError(appId)
}
}

@@ -166,7 +166,7 @@
@Path("{attemptId}")
def applicationAttempt(): Class[OneApplicationAttemptResource] = {
if (attemptId != null) {
throw new NotFoundException(httpRequest.getRequestURI())
throw SparkCoreErrors.uriNotFoundError(httpRequest.getRequestURI())
}
classOf[OneApplicationAttemptResource]
}
@@ -178,7 +178,7 @@ private[v1] class OneApplicationResource extends AbstractApplicationResource {
@GET
def getApp(): ApplicationInfo = {
val app = uiRoot.getApplicationInfo(appId)
app.getOrElse(throw new NotFoundException("unknown app: " + appId))
app.getOrElse(throw SparkCoreErrors.unknownAppError(appId))
}

}
@@ -192,7 +192,7 @@ private[v1] class OneApplicationAttemptResource extends AbstractApplicationResource {
app.attempts.find(_.attemptId.contains(attemptId))
}
.getOrElse {
throw new NotFoundException(s"unknown app $appId, attempt $attemptId")
throw SparkCoreErrors.unknownAppWithAttemptError(appId, attemptId)
}
}

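The threadDump hunk is the densest call-site change: a single match distinguishes an active executor, a known-but-inactive one, and an unknown one, and routes each to a different helper (400 vs. 404 in the PR). A small hedged sketch of that three-way dispatch, with illustrative types:

final case class ExecSummary(id: String, isActive: Boolean)

def threadDumpFor(summary: Option[ExecSummary]): String =
  summary match {
    case Some(s) if s.isActive => s"thread dump for executor ${s.id}"
    case Some(_) => throw new IllegalStateException("Executor is not active.") // 400 in the PR
    case None => throw new NoSuchElementException("Executor does not exist.") // 404 in the PR
  }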
core/src/main/scala/org/apache/spark/status/api/v1/SimpleDateParam.scala
@@ -18,9 +18,8 @@ package org.apache.spark.status.api.v1

import java.text.{ParseException, SimpleDateFormat}
import java.util.{Locale, TimeZone}
import javax.ws.rs.WebApplicationException
import javax.ws.rs.core.Response
import javax.ws.rs.core.Response.Status

import org.apache.spark.errors.SparkCoreErrors

private[v1] class SimpleDateParam(val originalValue: String) {

@@ -36,12 +35,7 @@ private[v1] class SimpleDateParam(val originalValue: String) {
gmtDay.parse(originalValue).getTime()
} catch {
case _: ParseException =>
throw new WebApplicationException(
Response
.status(Status.BAD_REQUEST)
.entity("Couldn't parse date: " + originalValue)
.build()
)
throw SparkCoreErrors.webApplicationError(originalValue)
}
}
}
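SimpleDateParam's conversion keeps its two-format fallback and now only delegates the failure case to SparkCoreErrors. A hedged, self-contained sketch of that parse-with-fallback flow (the format strings are illustrative, not copied from the file):

import java.text.{ParseException, SimpleDateFormat}
import java.util.{Locale, TimeZone}

def parseDateParam(value: String): Long = {
  // Try a full timestamp first, then fall back to a date-only form pinned to GMT.
  val full = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz", Locale.ENGLISH)
  try {
    full.parse(value).getTime()
  } catch {
    case _: ParseException =>
      val gmtDay = new SimpleDateFormat("yyyy-MM-dd", Locale.ENGLISH)
      gmtDay.setTimeZone(TimeZone.getTimeZone("GMT"))
      try {
        gmtDay.parse(value).getTime()
      } catch {
        case _: ParseException =>
          throw new IllegalArgumentException(s"Couldn't parse date: $value")
      }
  }
}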