diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index f0bbc26aae156..1cad00ad417fb 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -5288,5 +5288,227 @@
     "message" : [
       "Operation not allowed: only works on table with location provided: "
     ]
+  },
+  "_LEGACY_ERROR_TEMP_3000" : {
+    "message" : [
+      "Unexpected Py4J server <class>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3001" : {
+    "message" : [
+      "EOFException occurred while reading the port number from <daemonModule>'s stdout<additionalMessage>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3002" : {
+    "message" : [
+      "Data of type <other> is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3003" : {
+    "message" : [
+      "Could not compute split, block <blockId> of RDD <id> not found"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3004" : {
+    "message" : [
+      "Attempted to use <string> after its blocks have been removed!"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3005" : {
+    "message" : [
+      "Histogram on either an empty RDD or RDD containing +/-infinity or NaN"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3006" : {
+    "message" : [
+      "empty RDD"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3007" : {
+    "message" : [
+      "Checkpoint block <rddBlockId> not found! Either the executor",
+      "that originally checkpointed this partition is no longer alive, or the original RDD is",
+      "unpersisted. If this problem persists, you may consider using `rdd.checkpoint()`",
+      "instead, which is slower than local checkpointing but more fault-tolerant."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3008" : {
+    "message" : [
+      "Cannot use map-side combining with array keys."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3009" : {
+    "message" : [
+      "HashPartitioner cannot partition array keys."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3010" : {
+    "message" : [
+      "reduceByKeyLocally() does not support array keys"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3011" : {
+    "message" : [
+      "This RDD lacks a SparkContext. It could happen in the following cases:",
+      "(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.",
+      "(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3012" : {
+    "message" : [
+      "Cannot change storage level of an RDD after it was already assigned a level"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3013" : {
+    "message" : [
+      "Can only zip RDDs with same number of elements in each partition"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3014" : {
+    "message" : [
+      "empty collection"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3015" : {
+    "message" : [
+      "countByValueApprox() does not support arrays"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3016" : {
+    "message" : [
+      "Checkpoint directory has not been set in the SparkContext"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3017" : {
+    "message" : [
+      "Invalid checkpoint file: <path>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3018" : {
+    "message" : [
+      "Failed to create checkpoint path <checkpointDirPath>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3019" : {
+    "message" : [
+      "Checkpoint RDD has a different number of partitions from original RDD. Original",
+      "RDD [ID: <originalRDDId>, num of partitions: <originalRDDLength>];",
+      "Checkpoint RDD [ID: <newRDDId>, num of partitions: <newRDDLength>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3020" : {
+    "message" : [
+      "Checkpoint dir must be specified."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3021" : {
+    "message" : [
+      "Error asking standalone scheduler to shut down executors"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3022" : {
+    "message" : [
+      "Error stopping standalone scheduler's driver endpoint"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3023" : {
+    "message" : [
+      "Can't run submitMapStage on RDD with 0 partitions"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3024" : {
+    "message" : [
+      "attempted to access non-existent accumulator <id>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3025" : {
+    "message" : [
+      "TaskSetManagers should only send Resubmitted task statuses for tasks in ShuffleMapStages."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3026" : {
+    "message" : [
+      "duration() called on unfinished task"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3027" : {
+    "message" : [
+      "Unrecognized <schedulerModeProperty>: <schedulingModeConf>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3028" : {
+    "message" : [
+      "<errorMsg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3029" : {
+    "message" : [
+      "Exiting due to error from cluster scheduler: <message>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3030" : {
+    "message" : [
+      "Task <currentTaskAttemptId> has not locked block <blockId> for writing"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3031" : {
+    "message" : [
+      "Block <blockId> does not exist"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3032" : {
+    "message" : [
+      "Error occurred while waiting for replication to finish"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3033" : {
+    "message" : [
+      "Unable to register with external shuffle server due to : <message>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3034" : {
+    "message" : [
+      "Error occurred while waiting for async. reregistration"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3035" : {
+    "message" : [
+      "Unexpected shuffle block <blockId> with unsupported shuffle resolver <shuffleBlockResolver>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3036" : {
+    "message" : [
+      "Failure while trying to store block <blockId> on <blockManagerId>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3037" : {
+    "message" : [
+      "Block <blockId> was not found even though it's read-locked"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3038" : {
+    "message" : [
+      "get() failed for block <blockId> even though we held a lock"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3039" : {
+    "message" : [
+      "BlockManager returned null for BlockStatus query: <blockId>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3040" : {
+    "message" : [
+      "BlockManagerMasterEndpoint returned false, expected true."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3041" : {
+    "message" : [
+      ""
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3042" : {
+    "message" : [
+      "Failed to get block <blockId>, which is not a shuffle block"
+    ]
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index 308ee003d5c2c..6c393e18bd8c3 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._

 import org.apache.hadoop.fs.Path

-import org.apache.spark.{SparkException, TaskNotSerializableException}
+import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskNotSerializableException}
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.scheduler.{BarrierJobRunWithDynamicAllocationException, BarrierJobSlotsNumberCheckFailed, BarrierJobUnsupportedRDDChainException}
 import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager}
@@ -35,36 +35,59 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockNotFoundException
  */
 private[spark] object SparkCoreErrors {
   def unexpectedPy4JServerError(other: Object): Throwable = {
-    new RuntimeException(s"Unexpected Py4J server ${other.getClass}")
+    new SparkRuntimeException(
+      errorClass = "_LEGACY_ERROR_TEMP_3000",
+      messageParameters = Map("class" -> s"${other.getClass}")
+    )
   }

   def eofExceptionWhileReadPortNumberError(
       daemonModule: String,
       daemonExitValue: Option[Int] = null): Throwable = {
-    val msg = s"EOFException occurred while reading the port number from $daemonModule's" +
-      s" stdout" + daemonExitValue.map(v => s" and terminated with code: $v.").getOrElse("")
-    new SparkException(msg)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3001",
+      messageParameters = Map(
+        "daemonModule" -> daemonModule,
+        "additionalMessage" ->
+          daemonExitValue.map(v => s" and terminated with code: $v.").getOrElse("")
+      ), cause = null
+    )
   }

   def unsupportedDataTypeError(other: Any): Throwable = {
-    new SparkException(s"Data of type $other is not supported")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3002",
+      messageParameters = Map("other" -> s"$other"),
+      cause = null
+    )
   }

   def rddBlockNotFoundError(blockId: BlockId, id: Int): Throwable = {
-    new Exception(s"Could not compute split, block $blockId of RDD $id not found")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3003",
+      messageParameters = Map("blockId" -> s"$blockId", "id" -> s"$id"),
+      cause = null
+    )
   }

   def blockHaveBeenRemovedError(string: String): Throwable = {
-    new SparkException(s"Attempted to use $string after its blocks have been removed!")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3004",
+      messageParameters = Map("string" -> string),
+      cause = null
+    )
   }

   def histogramOnEmptyRDDOrContainingInfinityOrNaNError(): Throwable = {
-    new UnsupportedOperationException(
-      "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3005", messageParameters = Map.empty
+    )
   }

   def emptyRDDError(): Throwable = {
-    new UnsupportedOperationException("empty RDD")
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3006", messageParameters = Map.empty
+    )
   }

   def pathNotSupportedError(path: String): Throwable = {
@@ -74,12 +97,10 @@ private[spark] object SparkCoreErrors {

   def checkpointRDDBlockIdNotFoundError(rddBlockId: RDDBlockId): Throwable = {
     new SparkException(
-      s"""
-        |Checkpoint block $rddBlockId not found! Either the executor
-        |that originally checkpointed this partition is no longer alive, or the original RDD is
-        |unpersisted. If this problem persists, you may consider using `rdd.checkpoint()`
-        |instead, which is slower than local checkpointing but more fault-tolerant.
-      """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_3007",
+      messageParameters = Map("rddBlockId" -> s"$rddBlockId"),
+      cause = null
+    )
   }

   def endOfStreamError(): Throwable = {
@@ -87,55 +108,73 @@ private[spark] object SparkCoreErrors {
   }

   def cannotUseMapSideCombiningWithArrayKeyError(): Throwable = {
-    new SparkException("Cannot use map-side combining with array keys.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3008", messageParameters = Map.empty, cause = null
+    )
   }

   def hashPartitionerCannotPartitionArrayKeyError(): Throwable = {
-    new SparkException("HashPartitioner cannot partition array keys.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3009", messageParameters = Map.empty, cause = null
+    )
   }

   def reduceByKeyLocallyNotSupportArrayKeysError(): Throwable = {
-    new SparkException("reduceByKeyLocally() does not support array keys")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3010", messageParameters = Map.empty, cause = null
+    )
   }

   def rddLacksSparkContextError(): Throwable = {
-    new SparkException("This RDD lacks a SparkContext. It could happen in the following cases: " +
-      "\n(1) RDD transformations and actions are NOT invoked by the driver, but inside of other " +
-      "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid " +
-      "because the values transformation and count action cannot be performed inside of the " +
-      "rdd1.map transformation. For more information, see SPARK-5063.\n(2) When a Spark " +
-      "Streaming job recovers from checkpoint, this exception will be hit if a reference to " +
-      "an RDD not defined by the streaming job is used in DStream operations. For more " +
For more " + - "information, See SPARK-13758.") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3011", messageParameters = Map.empty, cause = null + ) } def cannotChangeStorageLevelError(): Throwable = { - new UnsupportedOperationException( - "Cannot change storage level of an RDD after it was already assigned a level") + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3012", messageParameters = Map.empty + ) } def canOnlyZipRDDsWithSamePartitionSizeError(): Throwable = { - new SparkException("Can only zip RDDs with same number of elements in each partition") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3013", messageParameters = Map.empty, cause = null + ) } def emptyCollectionError(): Throwable = { - new UnsupportedOperationException("empty collection") + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3014", messageParameters = Map.empty + ) } def countByValueApproxNotSupportArraysError(): Throwable = { - new SparkException("countByValueApprox() does not support arrays") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3015", messageParameters = Map.empty, cause = null + ) } def checkpointDirectoryHasNotBeenSetInSparkContextError(): Throwable = { - new SparkException("Checkpoint directory has not been set in the SparkContext") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3016", messageParameters = Map.empty, cause = null + ) } def invalidCheckpointFileError(path: Path): Throwable = { - new SparkException(s"Invalid checkpoint file: $path") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3017", + messageParameters = Map("path" -> s"$path"), + cause = null + ) } def failToCreateCheckpointPathError(checkpointDirPath: Path): Throwable = { - new SparkException(s"Failed to create checkpoint path $checkpointDirPath") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3018", + messageParameters = Map("checkpointDirPath" -> s"$checkpointDirPath"), + cause = null + ) } def checkpointRDDHasDifferentNumberOfPartitionsFromOriginalRDDError( @@ -144,11 +183,15 @@ private[spark] object SparkCoreErrors { newRDDId: Int, newRDDLength: Int): Throwable = { new SparkException( - s""" - |Checkpoint RDD has a different number of partitions from original RDD. Original - |RDD [ID: $originalRDDId, num of partitions: $originalRDDLength]; - |Checkpoint RDD [ID: $newRDDId, num of partitions: $newRDDLength]. 
- """.stripMargin.replaceAll("\n", " ")) + errorClass = "_LEGACY_ERROR_TEMP_3019", + messageParameters = Map( + "originalRDDId" -> s"$originalRDDId", + "originalRDDLength" -> s"$originalRDDLength", + "newRDDId" -> s"$newRDDId", + "newRDDLength" -> s"$newRDDLength" + ), + cause = null + ) } def checkpointFailedToSaveError(task: Int, path: Path): Throwable = { @@ -157,15 +200,21 @@ private[spark] object SparkCoreErrors { } def mustSpecifyCheckpointDirError(): Throwable = { - new SparkException("Checkpoint dir must be specified.") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3020", messageParameters = Map.empty, cause = null + ) } def askStandaloneSchedulerToShutDownExecutorsError(e: Exception): Throwable = { - new SparkException("Error asking standalone scheduler to shut down executors", e) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3021", messageParameters = Map.empty, cause = e + ) } def stopStandaloneSchedulerDriverEndpointError(e: Exception): Throwable = { - new SparkException("Error stopping standalone scheduler's driver endpoint", e) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3022", messageParameters = Map.empty, cause = e + ) } def noExecutorIdleError(id: String): Throwable = { @@ -187,16 +236,21 @@ private[spark] object SparkCoreErrors { } def cannotRunSubmitMapStageOnZeroPartitionRDDError(): Throwable = { - new SparkException("Can't run submitMapStage on RDD with 0 partitions") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3023", messageParameters = Map.empty, cause = null + ) } def accessNonExistentAccumulatorError(id: Long): Throwable = { - new SparkException(s"attempted to access non-existent accumulator $id") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3024", messageParameters = Map("id" -> s"$id"), cause = null + ) } def sendResubmittedTaskStatusForShuffleMapStagesOnlyError(): Throwable = { - new SparkException("TaskSetManagers should only send Resubmitted task " + - "statuses for tasks in ShuffleMapStages.") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3025", messageParameters = Map.empty, cause = null + ) } def nonEmptyEventQueueAfterTimeoutError(timeoutMillis: Long): Throwable = { @@ -204,21 +258,38 @@ private[spark] object SparkCoreErrors { } def durationCalledOnUnfinishedTaskError(): Throwable = { - new UnsupportedOperationException("duration() called on unfinished task") + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3026", messageParameters = Map.empty + ) } def unrecognizedSchedulerModePropertyError( schedulerModeProperty: String, schedulingModeConf: String): Throwable = { - new SparkException(s"Unrecognized $schedulerModeProperty: $schedulingModeConf") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3027", + messageParameters = Map( + "schedulerModeProperty" -> schedulerModeProperty, + "schedulingModeConf" -> schedulingModeConf + ), + cause = null + ) } def sparkError(errorMsg: String): Throwable = { - new SparkException(errorMsg) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3028", + messageParameters = Map("errorMsg" -> errorMsg), + cause = null + ) } def clusterSchedulerError(message: String): Throwable = { - new SparkException(s"Exiting due to error from cluster scheduler: $message") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3029", + messageParameters = Map("message" -> message), + cause = null + ) } def failToSerializeTaskError(e: Throwable): Throwable = { @@ -230,11 +301,22 @@ private[spark] object SparkCoreErrors { } def 
   def taskHasNotLockedBlockError(currentTaskAttemptId: Long, blockId: BlockId): Throwable = {
-    new SparkException(s"Task $currentTaskAttemptId has not locked block $blockId for writing")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3030",
+      messageParameters = Map(
+        "currentTaskAttemptId" -> s"$currentTaskAttemptId",
+        "blockId" -> s"$blockId"
+      ),
+      cause = null
+    )
   }

   def blockDoesNotExistError(blockId: BlockId): Throwable = {
-    new SparkException(s"Block $blockId does not exist")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3031",
+      messageParameters = Map("blockId" -> s"$blockId"),
+      cause = null
+    )
   }

   def cannotSaveBlockOnDecommissionedExecutorError(blockId: BlockId): Throwable = {
@@ -242,37 +324,69 @@ private[spark] object SparkCoreErrors {
   }

   def waitingForReplicationToFinishError(e: Throwable): Throwable = {
-    new SparkException("Error occurred while waiting for replication to finish", e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3032", messageParameters = Map.empty, cause = e
+    )
   }

   def unableToRegisterWithExternalShuffleServerError(e: Throwable): Throwable = {
-    new SparkException(s"Unable to register with external shuffle server due to : ${e.getMessage}",
-      e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3033",
+      messageParameters = Map("message" -> e.getMessage),
+      cause = e
+    )
   }

   def waitingForAsyncReregistrationError(e: Throwable): Throwable = {
-    new SparkException("Error occurred while waiting for async. reregistration", e)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3034", messageParameters = Map.empty, cause = e
+    )
   }

   def unexpectedShuffleBlockWithUnsupportedResolverError(
       shuffleManager: ShuffleManager,
       blockId: BlockId): Throwable = {
-    new SparkException(s"Unexpected shuffle block ${blockId} with unsupported shuffle " +
-      s"resolver ${shuffleManager.shuffleBlockResolver}")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3035",
+      messageParameters = Map(
+        "blockId" -> s"$blockId",
+        "shuffleBlockResolver" -> s"${shuffleManager.shuffleBlockResolver}"
+      ),
+      cause = null
+    )
   }

   def failToStoreBlockOnBlockManagerError(
       blockManagerId: BlockManagerId,
       blockId: BlockId): Throwable = {
-    new SparkException(s"Failure while trying to store block $blockId on $blockManagerId.")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3036",
+      messageParameters = Map(
+        "blockId" -> s"$blockId",
+        "blockManagerId" -> s"$blockManagerId"
+      ),
+      cause = null
+    )
   }

   def readLockedBlockNotFoundError(blockId: BlockId): Throwable = {
-    new SparkException(s"Block $blockId was not found even though it's read-locked")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3037",
+      messageParameters = Map(
+        "blockId" -> s"$blockId"
+      ),
+      cause = null
+    )
   }

   def failToGetBlockWithLockError(blockId: BlockId): Throwable = {
-    new SparkException(s"get() failed for block $blockId even though we held a lock")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3038",
+      messageParameters = Map(
+        "blockId" -> s"$blockId"
+      ),
+      cause = null
+    )
   }

   def blockNotFoundError(blockId: BlockId): Throwable = {
@@ -284,11 +398,17 @@ private[spark] object SparkCoreErrors {
   }

   def blockStatusQueryReturnedNullError(blockId: BlockId): Throwable = {
-    new SparkException(s"BlockManager returned null for BlockStatus query: $blockId")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_3039",
+      messageParameters = Map("blockId" -> s"$blockId"),
+      cause = null
+    )
   }

   def unexpectedBlockManagerMasterEndpointResultError(): Throwable = {
SparkException("BlockManagerMasterEndpoint returned false, expected true.") + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3040", messageParameters = Map.empty, cause = null + ) } def failToCreateDirectoryError(path: String, maxAttempts: Int): Throwable = { @@ -297,7 +417,9 @@ private[spark] object SparkCoreErrors { } def unsupportedOperationError(): Throwable = { - new UnsupportedOperationException() + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3041", messageParameters = Map.empty + ) } def noSuchElementError(): Throwable = { @@ -316,7 +438,11 @@ private[spark] object SparkCoreErrors { } def failToGetNonShuffleBlockError(blockId: BlockId, e: Throwable): Throwable = { - new SparkException(s"Failed to get block $blockId, which is not a shuffle block", e) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_3042", + messageParameters = Map("blockId" -> s"$blockId"), + cause = e + ) } def graphiteSinkInvalidProtocolError(invalidProtocol: String): Throwable = {