Skip to content
Closed
40 changes: 25 additions & 15 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@
],
"sqlState" : "22008"
},
"DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION" : {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the error class have at least one test? If not, please add one. The same question about other new error classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure there is one already: _LEGACY_ERROR_TEMP_2118.

Please double check before adding more new error messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amaliujia Apart from INTEGER_OVERFLOW, the rest of the changes merely replace the names of existing legacy error classes with meaningful ones.
@MaxGekk All of these are already unit-tested in the relevant use cases via intercept[], but I can work on changing those cases to additionally assert the class name/message using checkError(..., errorClass = "CLASS_NAME").

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can work on changing those cases to assert additionally the class name/msg using checkError ...

Please, do that. The purpose is to make the tests independent from error messages (only valuable message parameters), so, in this way tech editors could edit error message in error-classes.json and don't worry about internal Spark tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, working on it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk Can you please check if all good or not?

"message" : [
"Decimal precision <precision> exceeds max precision <maxPrecision>."
]
},
"DEFAULT_DATABASE_NOT_EXISTS" : {
"message" : [
"Default database <defaultDatabase> does not exist, please create it first or change default database to 'default'."
Expand Down Expand Up @@ -416,6 +421,16 @@
}
}
},
"INCORRECT_END_OFFSET" : {
"message" : [
"Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but it's <endSeconds> now."
]
},
"INCORRECT_RUMP_UP_RATE" : {
"message" : [
"Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but 'rampUpTimeSeconds' is <rampUpTimeSeconds>."
]
},
"INDEX_ALREADY_EXISTS" : {
"message" : [
"Cannot create the index because it already exists. <message>."
Expand Down Expand Up @@ -605,6 +620,11 @@
],
"sqlState" : "22005"
},
"OUT_OF_DECIMAL_TYPE_RANGE" : {
"message" : [
"Out of decimal type range: <value>."
]
},
"PARSE_CHAR_MISSING_LENGTH" : {
"message" : [
"DataType <type> requires a length parameter, for example <type>(10). Please specify the length."
Expand Down Expand Up @@ -814,6 +834,11 @@
},
"sqlState" : "42000"
},
"UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION" : {
"message" : [
"Unscaled value too large for precision. If necessary set <ansiConfig> to false to bypass this error."
]
},
"UNSUPPORTED_DATATYPE" : {
"message" : [
"Unsupported data type <typeName>"
Expand Down Expand Up @@ -3707,21 +3732,6 @@
"Unexpected: <o>"
]
},
"_LEGACY_ERROR_TEMP_2117" : {
"message" : [
"Unscaled value too large for precision. If necessary set <ansiConfig> to false to bypass this error."
]
},
"_LEGACY_ERROR_TEMP_2118" : {
"message" : [
"Decimal precision <precision> exceeds max precision <maxPrecision>"
]
},
"_LEGACY_ERROR_TEMP_2119" : {
"message" : [
"out of decimal type range: <str>"
]
},
"_LEGACY_ERROR_TEMP_2120" : {
"message" : [
"Do not support array of type <clazz>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1260,27 +1260,30 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {

/**
 * Error raised when a Decimal's unscaled value does not fit in the requested
 * precision. The message points the user at the ANSI flag as an escape hatch.
 *
 * @return a [[SparkArithmeticException]] with error class
 *         UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION
 */
def unscaledValueTooLargeForPrecisionError(): SparkArithmeticException = {
  new SparkArithmeticException(
    errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
    // Quote the config key in SQL style so tech editors can rephrase the
    // surrounding message without touching the parameter.
    messageParameters = Map("ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
    context = Array.empty,
    summary = "")
}

/**
 * Error raised when a Decimal literal's precision exceeds the maximum the
 * target DecimalType allows.
 *
 * @param precision    the precision required by the value
 * @param maxPrecision the maximum precision supported
 * @return a [[SparkArithmeticException]] with error class
 *         DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION
 */
def decimalPrecisionExceedsMaxPrecisionError(
    precision: Int, maxPrecision: Int): SparkArithmeticException = {
  new SparkArithmeticException(
    errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
    messageParameters = Map(
      "precision" -> precision.toString,
      "maxPrecision" -> maxPrecision.toString),
    context = Array.empty,
    summary = "")
}

/**
 * Error raised when a string literal parses to a value outside the range
 * representable by any decimal type.
 *
 * @param str the offending input, echoed back in the "value" parameter
 * @return a [[SparkArithmeticException]] with error class
 *         OUT_OF_DECIMAL_TYPE_RANGE
 */
def outOfDecimalTypeRangeError(str: UTF8String): SparkArithmeticException = {
  new SparkArithmeticException(
    errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
    messageParameters = Map("value" -> str.toString),
    context = Array.empty,
    summary = "")
}
Expand Down Expand Up @@ -2384,8 +2387,28 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
new SparkException("Foreach writer has been aborted due to a task failure")
}

// NOTE(review): "Rump" looks like a typo for "Ramp" (cf. rampUpTimeSeconds).
// The method name and the "INCORRECT_RUMP_UP_RATE" error class must be renamed
// together with error-classes.json and the tests that assert the class name,
// so it is deliberately left unchanged here.
/**
 * Error raised by the rate stream source when 'rampUpTimeSeconds' exceeds the
 * largest offset reachable without overflowing at the configured rate.
 *
 * @param rowsPerSecond     configured row production rate
 * @param maxSeconds        largest representable offset at that rate
 * @param rampUpTimeSeconds the offending ramp-up setting
 */
def incorrectRumpUpRate(rowsPerSecond: Long,
    maxSeconds: Long,
    rampUpTimeSeconds: Long): Throwable = {
  new SparkRuntimeException(
    errorClass = "INCORRECT_RUMP_UP_RATE",
    messageParameters = Map(
      "rowsPerSecond" -> rowsPerSecond.toString,
      "maxSeconds" -> maxSeconds.toString,
      "rampUpTimeSeconds" -> rampUpTimeSeconds.toString))
}

/**
 * Error raised by the rate stream source when a requested end offset is past
 * the largest offset reachable without overflow at the configured rate.
 *
 * @param rowsPerSecond configured row production rate
 * @param maxSeconds    largest representable offset at that rate
 * @param endSeconds    the offending end offset
 */
def incorrectEndOffset(rowsPerSecond: Long,
    maxSeconds: Long,
    endSeconds: Long): Throwable = {
  // Surface all three values so the user can see how far past the limit
  // the requested offset is.
  val params = Map(
    "rowsPerSecond" -> rowsPerSecond.toString,
    "maxSeconds" -> maxSeconds.toString,
    "endSeconds" -> endSeconds.toString)
  new SparkRuntimeException(
    errorClass = "INCORRECT_END_OFFSET",
    messageParameters = params)
}

def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class CastWithAnsiOnSuite extends CastSuiteBase with QueryErrorsBase {
Decimal("12345678901234567890123456789012345678"))
checkExceptionInExpression[ArithmeticException](
cast("123456789012345678901234567890123456789", DecimalType(38, 0)),
"out of decimal type range")
"Out of decimal type range")
checkExceptionInExpression[ArithmeticException](
cast("12345678901234567890123456789012345678", DecimalType(38, 1)),
"cannot be represented as Decimal(38, 1)")
Expand All @@ -262,7 +262,7 @@ class CastWithAnsiOnSuite extends CastSuiteBase with QueryErrorsBase {
Decimal("60000000000000000000000000000000000000"))
checkExceptionInExpression[ArithmeticException](
cast("6E+38", DecimalType(38, 0)),
"out of decimal type range")
"Out of decimal type range")
checkExceptionInExpression[ArithmeticException](
cast("6E+37", DecimalType(38, 1)),
"cannot be represented as Decimal(38, 1)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.types

import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkArithmeticException, SparkFunSuite, SparkNumberFormatException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -60,11 +60,27 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester with SQLHelper
checkDecimal(Decimal(1000000000000000000L, 20, 2), "10000000000000000.00", 20, 2)
checkDecimal(Decimal(Long.MaxValue), Long.MaxValue.toString, 20, 0)
checkDecimal(Decimal(Long.MinValue), Long.MinValue.toString, 20, 0)
intercept[ArithmeticException](Decimal(170L, 2, 1))
intercept[ArithmeticException](Decimal(170L, 2, 0))
intercept[ArithmeticException](Decimal(BigDecimal("10.030"), 2, 1))
intercept[ArithmeticException](Decimal(BigDecimal("-9.95"), 2, 1))
intercept[ArithmeticException](Decimal(1e17.toLong, 17, 0))

checkError(
exception = intercept[SparkArithmeticException](Decimal(170L, 2, 1)),
errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
checkError(
exception = intercept[SparkArithmeticException](Decimal(170L, 2, 0)),
errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
checkError(
exception = intercept[SparkArithmeticException](Decimal(BigDecimal("10.030"), 2, 1)),
errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
parameters = Map("precision" -> "3", "maxPrecision" -> "2"))
checkError(
exception = intercept[SparkArithmeticException](Decimal(BigDecimal("-9.95"), 2, 1)),
errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
parameters = Map("precision" -> "3", "maxPrecision" -> "2"))
checkError(
exception = intercept[SparkArithmeticException](Decimal(1e17.toLong, 17, 0)),
errorClass = "UNSCALED_VALUE_TOO_LARGE_FOR_PRECISION",
parameters = Map("ansiConfig" -> "\"spark.sql.ansi.enabled\""))
}

test("creating decimals with negative scale under legacy mode") {
Expand Down Expand Up @@ -294,8 +310,11 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester with SQLHelper

// Verifies both overflow behaviors for one out-of-range decimal literal:
// non-ANSI parsing must return null, while ANSI parsing must raise
// OUT_OF_DECIMAL_TYPE_RANGE echoing the offending literal in "value".
def checkOutOfRangeFromString(string: String): Unit = {
  assert(Decimal.fromString(UTF8String.fromString(string)) === null)
  checkError(
    exception = intercept[SparkArithmeticException](
      Decimal.fromStringANSI(UTF8String.fromString(string))),
    errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
    parameters = Map("value" -> string))
}

checkFromString("12345678901234567890123456789012345678")
Expand All @@ -311,9 +330,15 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester with SQLHelper
checkOutOfRangeFromString("6.0790316E+25569151")

assert(Decimal.fromString(UTF8String.fromString("str")) === null)
val e = intercept[NumberFormatException](Decimal.fromStringANSI(UTF8String.fromString("str")))
assert(e.getMessage.contains(
"""The value 'str' of the type "STRING" cannot be cast to "DECIMAL(10,0)""""))
checkError(
exception = intercept[SparkNumberFormatException](
Decimal.fromStringANSI(UTF8String.fromString("str"))),
errorClass = "CAST_INVALID_INPUT",
parameters = Map(
"expression" -> "'str'",
"sourceType" -> "\"STRING\"",
"targetType" -> "\"DECIMAL(10,0)\"",
"ansiConfig" -> "\"spark.sql.ansi.enabled\""))
}

test("SPARK-35841: Casting string to decimal type doesn't work " +
Expand All @@ -333,7 +358,11 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester with SQLHelper
val values = Array("7.836725755512218E38")
for (string <- values) {
assert(Decimal.fromString(UTF8String.fromString(string)) === null)
intercept[ArithmeticException](Decimal.fromStringANSI(UTF8String.fromString(string)))
checkError(
exception = intercept[SparkArithmeticException](
Decimal.fromStringANSI(UTF8String.fromString(string))),
errorClass = "OUT_OF_DECIMAL_TYPE_RANGE",
parameters = Map("value" -> string))
}

withSQLConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED.key -> "true") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ class RateStreamMicroBatchStream(
private val maxSeconds = Long.MaxValue / rowsPerSecond

if (rampUpTimeSeconds > maxSeconds) {
throw QueryExecutionErrors.integerOverflowError(
s"Max offset with $rowsPerSecond rowsPerSecond" +
s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
throw QueryExecutionErrors.incorrectRumpUpRate(
rowsPerSecond, maxSeconds, rampUpTimeSeconds)
}

private[sources] val creationTimeMs = {
Expand Down Expand Up @@ -120,8 +119,7 @@ class RateStreamMicroBatchStream(
val endSeconds = end.asInstanceOf[LongOffset].offset
assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
if (endSeconds > maxSeconds) {
throw QueryExecutionErrors.integerOverflowError("Max offset with " +
s"$rowsPerSecond rowsPerSecond is $maxSeconds, but it's $endSeconds now.")
throw QueryExecutionErrors.incorrectEndOffset(rowsPerSecond, maxSeconds, endSeconds)
}
// Fix "lastTimeMs" for recovery
if (lastTimeMs < TimeUnit.SECONDS.toMillis(endSeconds) + creationTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
Expand Down Expand Up @@ -196,6 +197,45 @@ class RateStreamProviderSuite extends StreamTest {
}
}

// With rowsPerSecond = Long.MaxValue, maxSeconds = Long.MaxValue / rowsPerSecond
// evaluates to 1, so rampUpTimeSeconds = 2 exceeds it and the constructor must
// fail with error class INCORRECT_RUMP_UP_RATE (note: "RUMP" matches the
// current class name in error-classes.json; looks like a typo for "RAMP").
testQuietly("microbatch - rump up error") {
val e = intercept[SparkRuntimeException](
new RateStreamMicroBatchStream(
rowsPerSecond = Long.MaxValue,
rampUpTimeSeconds = 2,
options = CaseInsensitiveStringMap.empty(),
checkpointLocation = ""))

// Assert class name and parameters only, not the rendered message text.
checkError(
exception = e,
errorClass = "INCORRECT_RUMP_UP_RATE",
parameters = Map(
"rowsPerSecond" -> Long.MaxValue.toString,
"maxSeconds" -> "1",
"rampUpTimeSeconds" -> "2"))
}

// planInputPartitions must reject an end offset beyond
// maxSeconds = Long.MaxValue / rowsPerSecond with error class
// INCORRECT_END_OFFSET, echoing the rate, the limit, and the bad offset.
testQuietly("microbatch - end offset error") {
withTempDir { temp =>
val maxSeconds = (Long.MaxValue / 100)
val endSeconds = Long.MaxValue
val e = intercept[SparkRuntimeException](
new RateStreamMicroBatchStream(
rowsPerSecond = 100,
rampUpTimeSeconds = 2,
options = CaseInsensitiveStringMap.empty(),
checkpointLocation = temp.getCanonicalPath)
.planInputPartitions(LongOffset(1), LongOffset(endSeconds)))

// Assert class name and parameters only, not the rendered message text.
checkError(
exception = e,
errorClass = "INCORRECT_END_OFFSET",
parameters = Map(
"rowsPerSecond" -> "100",
"maxSeconds" -> maxSeconds.toString,
"endSeconds" -> endSeconds.toString))
}
}

test("valueAtSecond") {
import RateStreamProvider._

Expand Down Expand Up @@ -270,8 +310,8 @@ class RateStreamProviderSuite extends StreamTest {
.distinct()
testStream(input)(
AdvanceRateManualClock(2),
ExpectFailure[ArithmeticException](t => {
Seq("overflow", "rowsPerSecond").foreach { msg =>
ExpectFailure[SparkRuntimeException](t => {
Seq("INCORRECT_END_OFFSET", "rowsPerSecond").foreach { msg =>
assert(t.getMessage.contains(msg))
}
})
Expand Down