Skip to content

Commit 270327d

Browse files
mrk-andreev authored and MaxGekk committed
[SPARK-49044][SQL] ValidateExternalType should return child in error
### What changes were proposed in this pull request? This PR extends the error message for checkType in ValidateExternalType with information about the child nodes. ### Why are the changes needed? When we have mixed schema rows, the error message "{actual} is not a valid external type for schema of {expected}" doesn't help identify the column with the problem. I suggest adding information about the source column. After the fix, the error message contains extra info ``` The external type [B is not valid for the type "STRING" at the expression "getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, f3)". ``` #### Example ```scala class ErrorMsgSuite extends AnyFunSuite with SharedSparkContext { test("shouldThrowSchemaError") { val seq: Seq[Row] = Seq( Row( toBytes("0"), toBytes(""), 1L, ), Row( toBytes("0"), toBytes(""), 1L, ), ) val schema: StructType = new StructType() .add("f1", BinaryType) .add("f3", StringType) .add("f2", LongType) val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(seq), schema) val exception = intercept[RuntimeException] { df.show() } assert( exception.getCause.getMessage .contains("[B is not a valid external type for schema of string") ) assertResult( "[B is not a valid external type for schema of string" )(exception.getCause.getMessage) } def toBytes(x: String): Array[Byte] = x.toCharArray.map(_.toByte) } ``` After the fix, the error message contains extra info ``` The external type [B is not valid for the type "STRING" at the expression "getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, f3)". ``` Example: https://github.com/mrk-andreev/example-spark-schema/blob/main/spark_4.0.0/src/test/scala/ErrorMsgSuite.scala ### Does this PR introduce _any_ user-facing change? This change extends the error message in the case of "not a valid external type". 
#### Example before ``` java.lang.Integer is not a valid external type for schema of double ``` #### Example after ``` The external type [B is not valid for the type "STRING" at the expression "getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, c0)". ``` ### How was this patch tested? These changes are covered by the new unit test `test("SPARK-49044 ValidateExternalType should return child in error")` and by the new integration test `test("SPARK-49044 ValidateExternalType should be user visible")` ### Was this patch authored or co-authored using generative AI tooling? No Closes #47522 from mrk-andreev/improve-error-for-cast-v2. Authored-by: Mark Andreev <mark.andreev@gmail.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent 2a6080c commit 270327d

File tree

7 files changed

+133
-13
lines changed

7 files changed

+133
-13
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2177,6 +2177,12 @@
21772177
],
21782178
"sqlState" : "42001"
21792179
},
2180+
"INVALID_EXTERNAL_TYPE" : {
2181+
"message" : [
2182+
"The external type <externalType> is not valid for the type <type> at the expression <expr>."
2183+
],
2184+
"sqlState" : "42K0N"
2185+
},
21802186
"INVALID_EXTRACT_BASE_FIELD_TYPE" : {
21812187
"message" : [
21822188
"Can't extract a value from <base>. Need a complex type [STRUCT, ARRAY, MAP] but got <other>."

common/utils/src/main/resources/error/error-states.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4625,6 +4625,12 @@
46254625
"standard": "N",
46264626
"usedBy": ["Spark"]
46274627
},
4628+
"42K0N": {
4629+
"description": "Invalid external type.",
4630+
"origin": "Spark",
4631+
"standard": "N",
4632+
"usedBy": ["Spark"]
4633+
},
46284634
"42KD0": {
46294635
"description": "Ambiguous name reference.",
46304636
"origin": "Databricks",

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2023,8 +2023,6 @@ case class ValidateExternalType(child: Expression, expected: DataType, externalD
20232023

20242024
override val dataType: DataType = externalDataType
20252025

2026-
private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"
2027-
20282026
private lazy val checkType: (Any) => Boolean = expected match {
20292027
case _: DecimalType =>
20302028
(value: Any) => {
@@ -2057,14 +2055,12 @@ case class ValidateExternalType(child: Expression, expected: DataType, externalD
20572055
if (checkType(input)) {
20582056
input
20592057
} else {
2060-
throw new RuntimeException(s"${input.getClass.getName}$errMsg")
2058+
throw QueryExecutionErrors.invalidExternalTypeError(
2059+
input.getClass.getName, expected, child)
20612060
}
20622061
}
20632062

20642063
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
2065-
// Use unnamed reference that doesn't create a local field here to reduce the number of fields
2066-
// because errMsgField is used only when the type doesn't match.
2067-
val errMsgField = ctx.addReferenceObj("errMsg", errMsg)
20682064
val input = child.genCode(ctx)
20692065
val obj = input.value
20702066
def genCheckTypes(classes: Seq[Class[_]]): String = {
@@ -2090,14 +2086,22 @@ case class ValidateExternalType(child: Expression, expected: DataType, externalD
20902086
s"$obj instanceof ${CodeGenerator.boxedType(dataType)}"
20912087
}
20922088

2089+
// Use unnamed reference that doesn't create a local field here to reduce the number of fields
2090+
// because errMsgField is used only when the type doesn't match.
2091+
val expectedTypeField = ctx.addReferenceObj(
2092+
"expectedTypeField", expected)
2093+
val childExpressionMsgField = ctx.addReferenceObj(
2094+
"childExpressionMsgField", child)
2095+
20932096
val code = code"""
20942097
${input.code}
20952098
${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
20962099
if (!${input.isNull}) {
20972100
if ($typeCheck) {
20982101
${ev.value} = (${CodeGenerator.boxedType(dataType)}) $obj;
20992102
} else {
2100-
throw new RuntimeException($obj.getClass().getName() + $errMsgField);
2103+
throw QueryExecutionErrors.invalidExternalTypeError(
2104+
$obj.getClass().getName(), $expectedTypeField, $childExpressionMsgField);
21012105
}
21022106
}
21032107

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
478478
)
479479
}
480480

481+
def invalidExternalTypeError(
482+
actualType: String,
483+
expectedType: DataType,
484+
childExpression: Expression): SparkRuntimeException = {
485+
new SparkRuntimeException(
486+
errorClass = "INVALID_EXTERNAL_TYPE",
487+
messageParameters = Map(
488+
"externalType" -> actualType,
489+
"type" -> toSQLType(expectedType),
490+
"expr" -> toSQLExpr(childExpression)
491+
)
492+
)
493+
}
494+
481495
def notOverrideExpectedMethodsError(
482496
className: String, m1: String, m2: String): SparkRuntimeException = {
483497
new SparkRuntimeException(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -288,29 +288,33 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
288288
val encoder = ExpressionEncoder(schema)
289289
toRow(encoder, Row(1.toShort))
290290
}
291-
assert(e1.getCause.getMessage.contains("java.lang.Short is not a valid external type"))
291+
assert(e1.getCause.getMessage.contains("The external type java.lang.Short " +
292+
"is not valid for the type \"INT\""))
292293

293294
val e2 = intercept[RuntimeException] {
294295
val schema = new StructType().add("a", StringType)
295296
val encoder = ExpressionEncoder(schema)
296297
toRow(encoder, Row(1))
297298
}
298-
assert(e2.getCause.getMessage.contains("java.lang.Integer is not a valid external type"))
299+
assert(e2.getCause.getMessage.contains("The external type java.lang.Integer " +
300+
"is not valid for the type \"STRING\""))
299301

300302
val e3 = intercept[RuntimeException] {
301303
val schema = new StructType().add("a",
302304
new StructType().add("b", IntegerType).add("c", StringType))
303305
val encoder = ExpressionEncoder(schema)
304306
toRow(encoder, Row(1 -> "a"))
305307
}
306-
assert(e3.getCause.getMessage.contains("scala.Tuple2 is not a valid external type"))
308+
assert(e3.getCause.getMessage.contains("The external type scala.Tuple2 is not valid " +
309+
"for the type \"STRUCT<b: INT, c: STRING>\""))
307310

308311
val e4 = intercept[RuntimeException] {
309312
val schema = new StructType().add("a", ArrayType(TimestampType))
310313
val encoder = ExpressionEncoder(schema)
311314
toRow(encoder, Row(Array("a")))
312315
}
313-
assert(e4.getCause.getMessage.contains("java.lang.String is not a valid external type"))
316+
assert(e4.getCause.getMessage.contains("The external type java.lang.String is not valid " +
317+
"for the type \"TIMESTAMP\""))
314318
}
315319

316320
private def roundTripArray[T](dt: DataType, nullable: Boolean, data: Array[T]): Unit = {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -547,13 +547,55 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
547547
checkObjectExprEvaluation(validateType, input, InternalRow.fromSeq(Seq(Row(input))))
548548
}
549549

550-
checkExceptionInExpression[RuntimeException](
550+
checkExceptionInExpression[SparkRuntimeException](
551551
ValidateExternalType(
552552
GetExternalRowField(inputObject, index = 0, fieldName = "c0"),
553553
DoubleType,
554554
DoubleType),
555555
InternalRow.fromSeq(Seq(Row(1))),
556-
"java.lang.Integer is not a valid external type for schema of double")
556+
"The external type java.lang.Integer is not valid for the type \"DOUBLE\"")
557+
}
558+
559+
test("SPARK-49044 ValidateExternalType should return child in error") {
560+
val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true)
561+
Seq(
562+
(true, BooleanType),
563+
(2.toByte, ByteType),
564+
(5.toShort, ShortType),
565+
(23, IntegerType),
566+
(61L, LongType),
567+
(1.0f, FloatType),
568+
(10.0, DoubleType),
569+
("abcd".getBytes, BinaryType),
570+
("abcd", StringType),
571+
(BigDecimal.valueOf(10), DecimalType.IntDecimal),
572+
(IntervalUtils.stringToInterval(UTF8String.fromString("interval 3 day")),
573+
CalendarIntervalType),
574+
(java.math.BigDecimal.valueOf(10), DecimalType.BigIntDecimal),
575+
(Array(3, 2, 1), ArrayType(IntegerType))
576+
).foreach { case (input, dt) =>
577+
val enc = RowEncoder.encoderForDataType(dt, lenient = false)
578+
val validateType = ValidateExternalType(
579+
GetExternalRowField(inputObject, index = 0, fieldName = "c0"),
580+
dt,
581+
EncoderUtils.lenientExternalDataTypeFor(enc))
582+
checkObjectExprEvaluation(validateType, input, InternalRow.fromSeq(Seq(Row(input))))
583+
}
584+
585+
checkErrorInExpression[SparkRuntimeException](
586+
expression = ValidateExternalType(
587+
GetExternalRowField(inputObject, index = 0, fieldName = "c0"),
588+
DoubleType,
589+
DoubleType),
590+
inputRow = InternalRow.fromSeq(Seq(Row(1))),
591+
errorClass = "INVALID_EXTERNAL_TYPE",
592+
parameters = Map[String, String](
593+
"externalType" -> "java.lang.Integer",
594+
"type" -> "\"DOUBLE\"",
595+
"expr" -> ("\"getexternalrowfield(input[0, org.apache.spark.sql.Row, true], " +
596+
"0, c0)\"")
597+
)
598+
)
557599
}
558600

559601
private def javaMapSerializerFor(
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import org.apache.spark.SparkRuntimeException
21+
import org.apache.spark.sql.{QueryTest, Row}
22+
import org.apache.spark.sql.test.SharedSparkSession
23+
import org.apache.spark.sql.types.{StringType, StructType}
24+
25+
class ValidateExternalTypeSuite extends QueryTest with SharedSparkSession {
26+
test("SPARK-49044 ValidateExternalType should be user visible") {
27+
checkError(
28+
exception = intercept[SparkRuntimeException] {
29+
spark.createDataFrame(spark.sparkContext.parallelize(Seq(
30+
Row(
31+
"".toCharArray.map(_.toByte)
32+
)
33+
)), new StructType().add("f3", StringType)).show()
34+
}.getCause.asInstanceOf[SparkRuntimeException],
35+
errorClass = "INVALID_EXTERNAL_TYPE",
36+
parameters = Map(
37+
("externalType", "[B"),
38+
("type", "\"STRING\""),
39+
("expr", "\"getexternalrowfield(assertnotnull(" +
40+
"input[0, org.apache.spark.sql.Row, true]), 0, f3)\"")
41+
)
42+
)
43+
}
44+
}

0 commit comments

Comments
 (0)