Skip to content

Commit

Permalink
[SPARK-48688][SQL] Return reasonable error when calling SQL to_avro a…
Browse files Browse the repository at this point in the history
…nd from_avro functions but Avro is not loaded by default

### What changes were proposed in this pull request?

This PR updates the new `to_avro` and `from_avro` SQL functions added in #46977 to return reasonable errors when Avro is not loaded by default.

### Why are the changes needed?

According to the [Apache Spark Avro Data Source Guide](https://spark.apache.org/docs/latest/sql-data-sources-avro.html), Avro is not loaded into Spark by default. With this change, users get reasonable error messages if they try to call the `to_avro` or `from_avro` SQL functions in this case with instructions telling them what to do, rather than obscure Java `ClassNotFoundException`s.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds golden file based test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No GitHub copilot this time.

Closes #47063 from dtenedor/to-from-avro-error-not-loaded.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
dtenedor authored and HyukjinKwon committed Jun 23, 2024
1 parent 4b37eb8 commit e972dae
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 2 deletions.
7 changes: 7 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@
],
"sqlState" : "22KD3"
},
"AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE" : {
"message" : [
"Cannot call the <functionName> SQL function because the Avro data source is not loaded.",
"Please restart your job or session with the 'spark-avro' package loaded, such as by using the --packages argument on the command line, and then retry your query or command again."
],
"sqlState" : "22KD3"
},
"BATCH_METADATA_NOT_FOUND" : {
"message" : [
"Unable to find batch <batchMetadataFile>."
Expand Down
18 changes: 18 additions & 0 deletions docs/sql-data-sources-avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,24 @@ write.stream(
{% endhighlight %}
</div>

<div data-lang="sql" markdown="1">
{% highlight sql %}
CREATE TABLE t AS
SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s
FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1);
DECLARE avro_schema STRING;
SET VARIABLE avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';

SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t;

SELECT FROM_AVRO(result, avro_schema, MAP()).u FROM (
SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t);

DROP TEMPORARY VARIABLE avro_schema;
DROP TABLE t;
{% endhighlight %}
</div>
</div>

## Data Source Option
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{MapType, NullType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -107,8 +108,12 @@ case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Ex
case _ =>
Map.empty
}
val constructor =
val constructor = try {
Utils.classForName("org.apache.spark.sql.avro.AvroDataToCatalyst").getConstructors().head
} catch {
case _: java.lang.ClassNotFoundException =>
throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(functionName = "FROM_AVRO")
}
val expr = constructor.newInstance(child, schemaValue, optionsValue)
expr.asInstanceOf[Expression]
}
Expand Down Expand Up @@ -167,8 +172,12 @@ case class ToAvro(child: Expression, jsonFormatSchema: Expression)
case s: UTF8String =>
Some(s.toString)
}
val constructor =
val constructor = try {
Utils.classForName("org.apache.spark.sql.avro.CatalystDataToAvro").getConstructors().head
} catch {
case _: java.lang.ClassNotFoundException =>
throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(functionName = "TO_AVRO")
}
val expr = constructor.newInstance(child, schemaValue)
expr.asInstanceOf[Expression]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4090,4 +4090,11 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map()
)
}

def avroNotLoadedSqlFunctionsUnusable(functionName: String): Throwable = {
new AnalysisException(
errorClass = "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
messageParameters = Map("functionName" -> functionName)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
create table t as
select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
from values (1, null), (null, 'a') tab(member0, member1)
-- !query analysis
CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t`, ErrorIfExists, [s]
+- Project [named_struct(u, named_struct(member0, member0#x, member1, member1#x)) AS s#x]
+- SubqueryAlias tab
+- LocalRelation [member0#x, member1#x]


-- !query
declare avro_schema string
-- !query analysis
CreateVariable defaultvalueexpression(null, null), false
+- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.avro_schema


-- !query
set variable avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }'
-- !query analysis
SetVariable [variablereference(system.session.avro_schema=CAST(NULL AS STRING))]
+- Project [{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] } AS avro_schema#x]
+- OneRowRelation


-- !query
select from_avro(s, 42, map()) from t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the FROM_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, 42, map())\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 30,
"fragment" : "from_avro(s, 42, map())"
} ]
}


-- !query
select from_avro(s, avro_schema, 42) from t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The third argument of the FROM_AVRO SQL function must be a constant map of strings to strings containing the options to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, variablereference(system.session.avro_schema='{ \"type\": \"record\", \"name\": \"struct\", \"fields\": [{ \"name\": \"u\", \"type\": [\"int\",\"string\"] }] }'), 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 36,
"fragment" : "from_avro(s, avro_schema, 42)"
} ]
}


-- !query
select to_avro(s, 42) from t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the TO_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value to AVRO format",
"sqlExpr" : "\"toavro(s, 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 21,
"fragment" : "to_avro(s, 42)"
} ]
}


-- !query
select to_avro(s, avro_schema) as result from t
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "TO_AVRO"
}
}


-- !query
select from_avro(result, avro_schema, map()).u from (select null as result)
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "FROM_AVRO"
}
}


-- !query
drop temporary variable avro_schema
-- !query analysis
DropVariable false
+- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.avro_schema


-- !query
drop table t
-- !query analysis
DropTable false, false
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t
21 changes: 21 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/to_from_avro.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Create some temporary test data.
create table t as
select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
from values (1, null), (null, 'a') tab(member0, member1);
declare avro_schema string;
set variable avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';

-- Exercise invalid SQL syntax when calling the 'from_avro' and 'to_avro' functions.
select from_avro(s, 42, map()) from t;
select from_avro(s, avro_schema, 42) from t;
select to_avro(s, 42) from t;

-- Avro is not loaded in this testing environment, so queries calling the 'from_avro' or 'to_avro'
-- SQL functions that otherwise pass analysis return appropriate "Avro not loaded" errors here.
select to_avro(s, avro_schema) as result from t;
select from_avro(result, avro_schema, map()).u from (select null as result);

-- Clean up.
drop temporary variable avro_schema;
drop table t;
144 changes: 144 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/to_from_avro.sql.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
create table t as
select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
from values (1, null), (null, 'a') tab(member0, member1)
-- !query schema
struct<>
-- !query output



-- !query
declare avro_schema string
-- !query schema
struct<>
-- !query output



-- !query
set variable avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }'
-- !query schema
struct<>
-- !query output



-- !query
select from_avro(s, 42, map()) from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the FROM_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, 42, map())\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 30,
"fragment" : "from_avro(s, 42, map())"
} ]
}


-- !query
select from_avro(s, avro_schema, 42) from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The third argument of the FROM_AVRO SQL function must be a constant map of strings to strings containing the options to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, variablereference(system.session.avro_schema='{ \"type\": \"record\", \"name\": \"struct\", \"fields\": [{ \"name\": \"u\", \"type\": [\"int\",\"string\"] }] }'), 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 36,
"fragment" : "from_avro(s, avro_schema, 42)"
} ]
}


-- !query
select to_avro(s, 42) from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the TO_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value to AVRO format",
"sqlExpr" : "\"toavro(s, 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 21,
"fragment" : "to_avro(s, 42)"
} ]
}


-- !query
select to_avro(s, avro_schema) as result from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "TO_AVRO"
}
}


-- !query
select from_avro(result, avro_schema, map()).u from (select null as result)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "FROM_AVRO"
}
}


-- !query
drop temporary variable avro_schema
-- !query schema
struct<>
-- !query output



-- !query
drop table t
-- !query schema
struct<>
-- !query output

0 comments on commit e972dae

Please sign in to comment.