diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index fcb9ec249dbb7..1b75f89cc1076 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -187,6 +187,16 @@
     ],
     "sqlState" : "22003"
   },
+  "CODEC_NOT_AVAILABLE" : {
+    "message" : [
+      "The codec <codecName> is not available. Consider setting the config <configKey> to <configVal>."
+    ]
+  },
+  "CODEC_SHORT_NAME_NOT_FOUND" : {
+    "message" : [
+      "Cannot find a short name for the codec <codecName>."
+    ]
+  },
   "COLUMN_ALIASES_IS_NOT_ALLOWED" : {
     "message" : [
       "Columns aliases are not allowed in <op>."
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index 8abb256432804..f8e7f2db259d9 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -466,4 +466,16 @@ private[spark] object SparkCoreErrors {
       "requestedBytes" -> requestedBytes.toString,
       "receivedBytes" -> receivedBytes.toString).asJava)
   }
+
+  private def quoteByDefault(elem: String): String = {
+    "\"" + elem + "\""
+  }
+
+  def toConf(conf: String): String = {
+    quoteByDefault(conf)
+  }
+
+  def toConfVal(conf: String): String = {
+    quoteByDefault(conf)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index eb3dc938d4dc1..0bb392deb3923 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -26,8 +26,9 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
 import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkIllegalArgumentException}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.errors.SparkCoreErrors.{toConf, toConfVal}
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
@@ -88,8 +89,12 @@ private[spark] object CompressionCodec {
     } catch {
       case _: ClassNotFoundException | _: IllegalArgumentException => None
     }
-    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
-      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
+    codec.getOrElse(throw new SparkIllegalArgumentException(
+      errorClass = "CODEC_NOT_AVAILABLE",
+      messageParameters = Map(
+        "codecName" -> codecName,
+        "configKey" -> toConf(configKey),
+        "configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC))))
   }
 
   /**
@@ -102,7 +107,9 @@ private[spark] object CompressionCodec {
     } else {
       shortCompressionCodecNames
         .collectFirst { case (k, v) if v == codecName => k }
-        .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+        .getOrElse { throw new SparkIllegalArgumentException(
+          errorClass = "CODEC_SHORT_NAME_NOT_FOUND",
+          messageParameters = Map("codecName" -> codecName))}
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 18520ff96a599..244c007f53925 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite, SparkIllegalArgumentException}
 import org.apache.spark.internal.config.IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED
 
 class CompressionCodecSuite extends SparkFunSuite {
@@ -127,9 +127,17 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("bad compression codec") {
-    intercept[IllegalArgumentException] {
-      CompressionCodec.createCodec(conf, "foobar")
-    }
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CompressionCodec.createCodec(conf, "foobar")
+      },
+      errorClass = "CODEC_NOT_AVAILABLE",
+      parameters = Map(
+        "codecName" -> "foobar",
+        "configKey" -> "\"spark.io.compression.codec\"",
+        "configVal" -> "\"snappy\""
+      )
+    )
   }
 
   private def testConcatenationOfSerializedStreams(codec: CompressionCodec): Unit = {