Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48688][SQL] Return reasonable error when calling SQL to_avro and from_avro functions but Avro is not loaded by default #47063

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@
],
"sqlState" : "22KD3"
},
"AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE" : {
"message" : [
"Cannot call the <functionName> SQL function because the Avro data source is not loaded.",
"Please restart your job or session with the 'spark-avro' package loaded, such as by using the --packages argument on the command line, and then retry your query or command again."
],
"sqlState" : "22KD3"
},
"BATCH_METADATA_NOT_FOUND" : {
"message" : [
"Unable to find batch <batchMetadataFile>."
Expand Down
18 changes: 18 additions & 0 deletions docs/sql-data-sources-avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,24 @@ write.stream(
{% endhighlight %}
</div>

<div data-lang="sql" markdown="1">
{% highlight sql %}
-- Sample data: a single struct column `s` whose field `u` has two members;
-- the first row populates member0 (an int), the second populates member1 (a string).
CREATE TABLE t AS
  SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s
  FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1);

-- Keep the Avro schema (a JSON string) in a session variable so that both
-- TO_AVRO and FROM_AVRO below are called with exactly the same schema.
DECLARE avro_schema STRING;
SET VARIABLE avro_schema =
  '{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';

-- Convert each row of `t` to Avro using the schema above.
SELECT TO_AVRO(s, avro_schema) AS result FROM t;

-- Round trip: convert to Avro in the subquery, then convert back and extract field `u`.
-- The third argument of FROM_AVRO is a map of options; MAP() passes none.
-- The alias is spelled `result` in both places so that the reference still resolves
-- when identifier resolution is case-sensitive.
SELECT FROM_AVRO(result, avro_schema, MAP()).u FROM (
  SELECT TO_AVRO(s, avro_schema) AS result FROM t);

-- Clean up the session variable and the sample table.
DROP TEMPORARY VARIABLE avro_schema;
DROP TABLE t;
{% endhighlight %}
</div>
</div>

## Data Source Option
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{MapType, NullType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -107,8 +108,12 @@ case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Ex
case _ =>
Map.empty
}
val constructor =
val constructor = try {
Utils.classForName("org.apache.spark.sql.avro.AvroDataToCatalyst").getConstructors().head
} catch {
case _: java.lang.ClassNotFoundException =>
throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(functionName = "FROM_AVRO")
}
val expr = constructor.newInstance(child, schemaValue, optionsValue)
expr.asInstanceOf[Expression]
}
Expand Down Expand Up @@ -167,8 +172,12 @@ case class ToAvro(child: Expression, jsonFormatSchema: Expression)
case s: UTF8String =>
Some(s.toString)
}
val constructor =
val constructor = try {
Utils.classForName("org.apache.spark.sql.avro.CatalystDataToAvro").getConstructors().head
} catch {
case _: java.lang.ClassNotFoundException =>
throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(functionName = "TO_AVRO")
}
val expr = constructor.newInstance(child, schemaValue)
expr.asInstanceOf[Expression]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4090,4 +4090,11 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map()
)
}

/**
 * Creates the error for a SQL Avro function that cannot be used because the Avro
 * data source is not loaded.
 *
 * @param functionName name of the SQL function, substituted into the
 *                     `functionName` message parameter.
 * @return an [[AnalysisException]] with error class
 *         `AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE`; the caller is expected to throw it.
 */
def avroNotLoadedSqlFunctionsUnusable(functionName: String): Throwable = {
  val errorParameters = Map("functionName" -> functionName)
  new AnalysisException(
    errorClass = "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
    messageParameters = errorParameters)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
create table t as
select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
from values (1, null), (null, 'a') tab(member0, member1)
-- !query analysis
CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t`, ErrorIfExists, [s]
+- Project [named_struct(u, named_struct(member0, member0#x, member1, member1#x)) AS s#x]
+- SubqueryAlias tab
+- LocalRelation [member0#x, member1#x]


-- !query
declare avro_schema string
-- !query analysis
CreateVariable defaultvalueexpression(null, null), false
+- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.avro_schema


-- !query
set variable avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }'
-- !query analysis
SetVariable [variablereference(system.session.avro_schema=CAST(NULL AS STRING))]
+- Project [{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] } AS avro_schema#x]
+- OneRowRelation


-- !query
select from_avro(s, 42, map()) from t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the FROM_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, 42, map())\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 30,
"fragment" : "from_avro(s, 42, map())"
} ]
}


-- !query
select from_avro(s, avro_schema, 42) from t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The third argument of the FROM_AVRO SQL function must be a constant map of strings to strings containing the options to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, variablereference(system.session.avro_schema='{ \"type\": \"record\", \"name\": \"struct\", \"fields\": [{ \"name\": \"u\", \"type\": [\"int\",\"string\"] }] }'), 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 36,
"fragment" : "from_avro(s, avro_schema, 42)"
} ]
}


-- !query
select to_avro(s, 42) from t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the TO_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value to AVRO format",
"sqlExpr" : "\"toavro(s, 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 21,
"fragment" : "to_avro(s, 42)"
} ]
}


-- !query
select to_avro(s, avro_schema) as result from t
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "TO_AVRO"
}
}


-- !query
select from_avro(result, avro_schema, map()).u from (select null as result)
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "FROM_AVRO"
}
}


-- !query
drop temporary variable avro_schema
-- !query analysis
DropVariable false
+- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.avro_schema


-- !query
drop table t
-- !query analysis
DropTable false, false
+- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t
21 changes: 21 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/to_from_avro.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Set up the test fixtures: a table 't' with a single struct column 's', and a session variable
-- 'avro_schema' holding the Avro schema as a JSON string. Both are dropped at the end of this file.
create table t as
select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
from values (1, null), (null, 'a') tab(member0, member1);
declare avro_schema string;
set variable avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';

-- Exercise invalid argument types when calling the 'from_avro' and 'to_avro' functions: an integer
-- in place of the schema string, and an integer in place of the options map. Each query is expected
-- to fail analysis with a DATATYPE_MISMATCH type-check error rather than reach the Avro classes.
select from_avro(s, 42, map()) from t;
select from_avro(s, avro_schema, 42) from t;
select to_avro(s, 42) from t;

-- Avro is not loaded in this testing environment, so queries calling the 'from_avro' or 'to_avro'
-- SQL functions that otherwise pass analysis return appropriate "Avro not loaded" errors here.
-- The 'from_avro' query reads from a one-row subquery with a NULL 'result' column instead of 't'.
select to_avro(s, avro_schema) as result from t;
select from_avro(result, avro_schema, map()).u from (select null as result);

-- Clean up.
drop temporary variable avro_schema;
drop table t;
144 changes: 144 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/to_from_avro.sql.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
create table t as
select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
from values (1, null), (null, 'a') tab(member0, member1)
-- !query schema
struct<>
-- !query output



-- !query
declare avro_schema string
-- !query schema
struct<>
-- !query output



-- !query
set variable avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }'
-- !query schema
struct<>
-- !query output



-- !query
select from_avro(s, 42, map()) from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the FROM_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, 42, map())\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 30,
"fragment" : "from_avro(s, 42, map())"
} ]
}


-- !query
select from_avro(s, avro_schema, 42) from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The third argument of the FROM_AVRO SQL function must be a constant map of strings to strings containing the options to use for converting the value from AVRO format",
"sqlExpr" : "\"fromavro(s, variablereference(system.session.avro_schema='{ \"type\": \"record\", \"name\": \"struct\", \"fields\": [{ \"name\": \"u\", \"type\": [\"int\",\"string\"] }] }'), 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 36,
"fragment" : "from_avro(s, avro_schema, 42)"
} ]
}


-- !query
select to_avro(s, 42) from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
"sqlState" : "42K09",
"messageParameters" : {
"hint" : "",
"msg" : "The second argument of the TO_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value to AVRO format",
"sqlExpr" : "\"toavro(s, 42)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 21,
"fragment" : "to_avro(s, 42)"
} ]
}


-- !query
select to_avro(s, avro_schema) as result from t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "TO_AVRO"
}
}


-- !query
select from_avro(result, avro_schema, map()).u from (select null as result)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
"errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE",
"sqlState" : "22KD3",
"messageParameters" : {
"functionName" : "FROM_AVRO"
}
}


-- !query
drop temporary variable avro_schema
-- !query schema
struct<>
-- !query output



-- !query
drop table t
-- !query schema
struct<>
-- !query output