From a480e2102781268c1ba1c1443d8a5b6edf577954 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 21 Jun 2024 16:46:49 -0700 Subject: [PATCH 1/2] commit --- .../resources/error/error-conditions.json | 7 + .../expressions/toFromAvroSqlFunctions.scala | 13 +- .../sql/errors/QueryCompilationErrors.scala | 7 + .../analyzer-results/to_from_avro.sql.out | 132 ++++++++++++++++ .../sql-tests/inputs/to_from_avro.sql | 21 +++ .../sql-tests/results/to_from_avro.sql.out | 144 ++++++++++++++++++ 6 files changed, 322 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/resources/sql-tests/analyzer-results/to_from_avro.sql.out create mode 100644 sql/core/src/test/resources/sql-tests/inputs/to_from_avro.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/to_from_avro.sql.out diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index db8300c18c943..2e012d67d58ba 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -106,6 +106,13 @@ ], "sqlState" : "22KD3" }, + "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE" : { + "message" : [ + "Cannot call the <functionName> SQL function because the Avro data source is not loaded.", + "Please restart your job or session with the 'spark-avro' package loaded, such as by using the --packages argument on the command line, and then retry your query or command again." + ], + "sqlState" : "22KD3" + }, "BATCH_METADATA_NOT_FOUND" : { "message" : [ "Unable to find batch <batchMetadataFile>." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala index 507511a360071..ca53058230fb8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.util.ArrayBasedMapData +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.{MapType, NullType, StringType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -107,8 +108,12 @@ case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Ex case _ => Map.empty } - val constructor = + val constructor = try { Utils.classForName("org.apache.spark.sql.avro.AvroDataToCatalyst").getConstructors().head + } catch { + case _: java.lang.ClassNotFoundException => + throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(functionName = "FROM_AVRO") + } val expr = constructor.newInstance(child, schemaValue, optionsValue) expr.asInstanceOf[Expression] } @@ -167,8 +172,12 @@ case class ToAvro(child: Expression, jsonFormatSchema: Expression) case s: UTF8String => Some(s.toString) } - val constructor = + val constructor = try { Utils.classForName("org.apache.spark.sql.avro.CatalystDataToAvro").getConstructors().head + } catch { + case _: java.lang.ClassNotFoundException => + throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(functionName = "TO_AVRO") + } val expr = constructor.newInstance(child, schemaValue) expr.asInstanceOf[Expression] } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 81e6ad027a15f..d3bd265d0459e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -4090,4 +4090,11 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map() ) } + + def avroNotLoadedSqlFunctionsUnusable(functionName: String): Throwable = { + new AnalysisException( + errorClass = "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE", + messageParameters = Map("functionName" -> functionName) + ) + } } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/to_from_avro.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/to_from_avro.sql.out new file mode 100644 index 0000000000000..951a4025d5fb2 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/to_from_avro.sql.out @@ -0,0 +1,132 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +create table t as + select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s + from values (1, null), (null, 'a') tab(member0, member1) +-- !query analysis +CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t`, ErrorIfExists, [s] + +- Project [named_struct(u, named_struct(member0, member0#x, member1, member1#x)) AS s#x] + +- SubqueryAlias tab + +- LocalRelation [member0#x, member1#x] + + +-- !query +declare avro_schema string +-- !query analysis +CreateVariable defaultvalueexpression(null, null), false ++- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.avro_schema + + +-- !query +set variable avro_schema = + '{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }' +-- !query analysis +SetVariable [variablereference(system.session.avro_schema=CAST(NULL AS STRING))] ++- Project [{ "type": 
"record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] } AS avro_schema#x] + +- OneRowRelation + + +-- !query +select from_avro(s, 42, map()) from t +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + "sqlState" : "42K09", + "messageParameters" : { + "hint" : "", + "msg" : "The second argument of the FROM_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value from AVRO format", + "sqlExpr" : "\"fromavro(s, 42, map())\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "from_avro(s, 42, map())" + } ] +} + + +-- !query +select from_avro(s, avro_schema, 42) from t +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + "sqlState" : "42K09", + "messageParameters" : { + "hint" : "", + "msg" : "The third argument of the FROM_AVRO SQL function must be a constant map of strings to strings containing the options to use for converting the value from AVRO format", + "sqlExpr" : "\"fromavro(s, variablereference(system.session.avro_schema='{ \"type\": \"record\", \"name\": \"struct\", \"fields\": [{ \"name\": \"u\", \"type\": [\"int\",\"string\"] }] }'), 42)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 36, + "fragment" : "from_avro(s, avro_schema, 42)" + } ] +} + + +-- !query +select to_avro(s, 42) from t +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + "sqlState" : "42K09", + "messageParameters" : { + "hint" : "", + "msg" : "The second argument of the TO_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for 
converting the value to AVRO format", + "sqlExpr" : "\"toavro(s, 42)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 21, + "fragment" : "to_avro(s, 42)" + } ] +} + + +-- !query +select to_avro(s, avro_schema) as result from t +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE", + "sqlState" : "22KD3", + "messageParameters" : { + "functionName" : "TO_AVRO" + } +} + + +-- !query +select from_avro(result, avro_schema, map()).u from (select null as result) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE", + "sqlState" : "22KD3", + "messageParameters" : { + "functionName" : "FROM_AVRO" + } +} + + +-- !query +drop temporary variable avro_schema +-- !query analysis +DropVariable false ++- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.avro_schema + + +-- !query +drop table t +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t diff --git a/sql/core/src/test/resources/sql-tests/inputs/to_from_avro.sql b/sql/core/src/test/resources/sql-tests/inputs/to_from_avro.sql new file mode 100644 index 0000000000000..12541ff26e24e --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/to_from_avro.sql @@ -0,0 +1,21 @@ +-- Create some temporary test data. +create table t as + select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s + from values (1, null), (null, 'a') tab(member0, member1); +declare avro_schema string; +set variable avro_schema = + '{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }'; + +-- Exercise invalid argument types when calling the 'from_avro' and 'to_avro' functions. 
+select from_avro(s, 42, map()) from t; +select from_avro(s, avro_schema, 42) from t; +select to_avro(s, 42) from t; + +-- Avro is not loaded in this testing environment, so queries calling the 'from_avro' or 'to_avro' +-- SQL functions that otherwise pass analysis return appropriate "Avro not loaded" errors here. +select to_avro(s, avro_schema) as result from t; +select from_avro(result, avro_schema, map()).u from (select null as result); + +-- Clean up. +drop temporary variable avro_schema; +drop table t; diff --git a/sql/core/src/test/resources/sql-tests/results/to_from_avro.sql.out b/sql/core/src/test/resources/sql-tests/results/to_from_avro.sql.out new file mode 100644 index 0000000000000..f9f491bd70fd1 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/to_from_avro.sql.out @@ -0,0 +1,144 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +create table t as + select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s + from values (1, null), (null, 'a') tab(member0, member1) +-- !query schema +struct<> +-- !query output + + + +-- !query +declare avro_schema string +-- !query schema +struct<> +-- !query output + + + +-- !query +set variable avro_schema = + '{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }' +-- !query schema +struct<> +-- !query output + + + +-- !query +select from_avro(s, 42, map()) from t +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + "sqlState" : "42K09", + "messageParameters" : { + "hint" : "", + "msg" : "The second argument of the FROM_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value from AVRO format", + "sqlExpr" : "\"fromavro(s, 42, map())\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + 
"stopIndex" : 30, + "fragment" : "from_avro(s, 42, map())" + } ] +} + + +-- !query +select from_avro(s, avro_schema, 42) from t +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + "sqlState" : "42K09", + "messageParameters" : { + "hint" : "", + "msg" : "The third argument of the FROM_AVRO SQL function must be a constant map of strings to strings containing the options to use for converting the value from AVRO format", + "sqlExpr" : "\"fromavro(s, variablereference(system.session.avro_schema='{ \"type\": \"record\", \"name\": \"struct\", \"fields\": [{ \"name\": \"u\", \"type\": [\"int\",\"string\"] }] }'), 42)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 36, + "fragment" : "from_avro(s, avro_schema, 42)" + } ] +} + + +-- !query +select to_avro(s, 42) from t +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + "sqlState" : "42K09", + "messageParameters" : { + "hint" : "", + "msg" : "The second argument of the TO_AVRO SQL function must be a constant string containing the JSON representation of the schema to use for converting the value to AVRO format", + "sqlExpr" : "\"toavro(s, 42)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 21, + "fragment" : "to_avro(s, 42)" + } ] +} + + +-- !query +select to_avro(s, avro_schema) as result from t +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE", + "sqlState" : "22KD3", + "messageParameters" : { + "functionName" : "TO_AVRO" + } +} + + +-- !query +select from_avro(result, avro_schema, map()).u from (select null as result) +-- !query schema +struct<> +-- !query output 
+org.apache.spark.sql.AnalysisException +{ + "errorClass" : "AVRO_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE", + "sqlState" : "22KD3", + "messageParameters" : { + "functionName" : "FROM_AVRO" + } +} + + +-- !query +drop temporary variable avro_schema +-- !query schema +struct<> +-- !query output + + + +-- !query +drop table t +-- !query schema +struct<> +-- !query output + From 84ac6fb1fbe79a7d7667264808b9976ea4d811b6 Mon Sep 17 00:00:00 2001 From: Daniel Tenedorio Date: Fri, 21 Jun 2024 17:03:35 -0700 Subject: [PATCH 2/2] add docs --- docs/sql-data-sources-avro.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index d717899564299..3721f92d93266 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -225,6 +225,24 @@ write.stream( {% endhighlight %} +
+{% highlight sql %} +CREATE TABLE t AS + SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s + FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1); +DECLARE avro_schema STRING; +SET VARIABLE avro_schema = + '{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }'; + +SELECT TO_AVRO(s, avro_schema) AS result FROM t; + +SELECT FROM_AVRO(result, avro_schema, MAP()).u FROM ( + SELECT TO_AVRO(s, avro_schema) AS result FROM t); + +DROP TEMPORARY VARIABLE avro_schema; +DROP TABLE t; +{% endhighlight %} +
## Data Source Option