[SPARK-38464][CORE] Use error classes in org.apache.spark.io
### What changes were proposed in this pull request?
This PR changes the exceptions created in the `org.apache.spark.io` package to use error classes.

This PR also adds `toConf` and `toConfVal` in `SparkCoreErrors`.
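
At a glance, the migration pattern (condensed from the `CompressionCodec` diff below) swaps a free-form `IllegalArgumentException` for a `SparkIllegalArgumentException` keyed by an error class:

```scala
// Before: message text assembled ad hoc at the throw site
throw new IllegalArgumentException(
  s"Codec [$codecName] is not available. Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")

// After: a stable error class plus named parameters; the message template
// lives in error-classes.json and is rendered by the error framework
throw new SparkIllegalArgumentException(
  errorClass = "CODEC_NOT_AVAILABLE",
  messageParameters = Map(
    "codecName" -> codecName,
    "configKey" -> toConf(configKey),
    "configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC)))
```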

### Why are the changes needed?
This migrates the exceptions created in the `org.apache.spark.io` package onto the error-class framework, so they carry a stable error class and structured message parameters.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Updated existing tests.

Closes #41277 from bozhang2820/spark-38464.

Authored-by: Bo Zhang <bo.zhang@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
bozhang2820 authored and MaxGekk committed May 24, 2023
1 parent 0e8e4ae commit 76f82bd
Showing 4 changed files with 45 additions and 8 deletions.
10 changes: 10 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -187,6 +187,16 @@
     ],
     "sqlState" : "22003"
   },
+  "CODEC_NOT_AVAILABLE" : {
+    "message" : [
+      "The codec <codecName> is not available. Consider to set the config <configKey> to <configVal>."
+    ]
+  },
+  "CODEC_SHORT_NAME_NOT_FOUND" : {
+    "message" : [
+      "Cannot find a short name for the codec <codecName>."
+    ]
+  },
   "COLUMN_ALIASES_IS_NOT_ALLOWED" : {
     "message" : [
       "Columns aliases are not allowed in <op>."
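
For illustration, with the parameter values exercised in the updated test below, the `CODEC_NOT_AVAILABLE` template renders along these lines (the `[ERROR_CLASS]` prefix is how Spark's error framework typically formats these messages; treat the exact shape as an assumption):

```
[CODEC_NOT_AVAILABLE] The codec foobar is not available. Consider to set the config "spark.io.compression.codec" to "snappy".
```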
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -466,4 +466,16 @@ private[spark] object SparkCoreErrors {
         "requestedBytes" -> requestedBytes.toString,
         "receivedBytes" -> receivedBytes.toString).asJava)
   }
+
+  private def quoteByDefault(elem: String): String = {
+    "\"" + elem + "\""
+  }
+
+  def toConf(conf: String): String = {
+    quoteByDefault(conf)
+  }
+
+  def toConfVal(conf: String): String = {
+    quoteByDefault(conf)
+  }
 }
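
Both helpers delegate to `quoteByDefault`, so they behave identically; keeping two names makes call sites self-documenting (config key vs. config value). A minimal sketch of their output, assuming code running inside `org.apache.spark` (the object is `private[spark]`, so the helpers are not visible to user code):

```scala
import org.apache.spark.errors.SparkCoreErrors.{toConf, toConfVal}

toConf("spark.io.compression.codec")  // returns "\"spark.io.compression.codec\"" (quotes become part of the string)
toConfVal("snappy")                   // returns "\"snappy\""
```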
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -26,8 +26,9 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
 import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkIllegalArgumentException}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.errors.SparkCoreErrors.{toConf, toConfVal}
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
@@ -88,8 +89,12 @@ private[spark] object CompressionCodec {
     } catch {
       case _: ClassNotFoundException | _: IllegalArgumentException => None
     }
-    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
-      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
+    codec.getOrElse(throw new SparkIllegalArgumentException(
+      errorClass = "CODEC_NOT_AVAILABLE",
+      messageParameters = Map(
+        "codecName" -> codecName,
+        "configKey" -> toConf(configKey),
+        "configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC))))
   }
 
   /**
@@ -102,7 +107,9 @@
     } else {
       shortCompressionCodecNames
         .collectFirst { case (k, v) if v == codecName => k }
-        .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+        .getOrElse { throw new SparkIllegalArgumentException(
+          errorClass = "CODEC_SHORT_NAME_NOT_FOUND",
+          messageParameters = Map("codecName" -> codecName))}
     }
   }
 
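
A hedged sketch of what a caller now observes (assuming `getErrorClass` from Spark's `SparkThrowable` interface; `CompressionCodec` is `private[spark]`, so this only compiles inside Spark itself):

```scala
import org.apache.spark.{SparkConf, SparkIllegalArgumentException}
import org.apache.spark.io.CompressionCodec

val conf = new SparkConf()
try {
  CompressionCodec.createCodec(conf, "foobar")
} catch {
  case e: SparkIllegalArgumentException =>
    // Branch on the stable identifier rather than parsing message text.
    assert(e.getErrorClass == "CODEC_NOT_AVAILABLE")
}
```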
16 changes: 12 additions & 4 deletions core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite, SparkIllegalArgumentException}
 import org.apache.spark.internal.config.IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED
 
 class CompressionCodecSuite extends SparkFunSuite {
@@ -127,9 +127,17 @@
   }
 
   test("bad compression codec") {
-    intercept[IllegalArgumentException] {
-      CompressionCodec.createCodec(conf, "foobar")
-    }
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CompressionCodec.createCodec(conf, "foobar")
+      },
+      errorClass = "CODEC_NOT_AVAILABLE",
+      parameters = Map(
+        "codecName" -> "foobar",
+        "configKey" -> "\"spark.io.compression.codec\"",
+        "configVal" -> "\"snappy\""
+      )
+    )
   }
 
   private def testConcatenationOfSerializedStreams(codec: CompressionCodec): Unit = {
