Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47002][Python] Return better error message if UDTF 'analyze' method 'orderBy' field accidentally returns a list of strings #45062

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions python/pyspark/sql/worker/analyze_udtf.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
write_with_length,
SpecialLengths,
)
from pyspark.sql.functions import PartitioningColumn, SelectedColumn
from pyspark.sql.functions import OrderingColumn, PartitioningColumn, SelectedColumn
from pyspark.sql.types import _parse_datatype_json_string, StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
from pyspark.util import handle_worker_exception
Expand Down Expand Up @@ -163,6 +163,18 @@ def format_error(msg: str) -> str:
but the 'schema' field had the wrong type: {type(result.schema)}"""
)
)

def invalid_analyze_result_field(field_name: str, expected_field: str) -> PySparkValueError:
    """Build the error for an 'AnalyzeResult' field that holds the wrong kind of value.

    This helper only constructs the exception; the validation code that calls it
    is responsible for raising the returned object.

    :param field_name: name of the offending 'AnalyzeResult' field, interpolated
        into the message.
    :param expected_field: name of the class that the elements of the field are
        expected to be instances of, interpolated into the message.
    :return: a PySparkValueError wrapping the message after it has been passed
        through ``format_error``.
    """
    # The continuation lines of the literal are deliberately kept flush-left so
    # that the text of the message is exactly the same as before.
    raw_message = f"""
{error_prefix} because the static 'analyze' method returned an
'AnalyzeResult' object with the '{field_name}' field set to a value besides a
list or tuple of '{expected_field}' objects. Please update the table function
and then try the query again."""
    return PySparkValueError(format_error(raw_message))

has_table_arg = any(arg.isTable for arg in args) or any(
arg.isTable for arg in kwargs.values()
)
Expand Down Expand Up @@ -190,32 +202,18 @@ def format_error(msg: str) -> str:
set to empty, and then try the query again."""
)
)
elif isinstance(result.partitionBy, (list, tuple)) and (
len(result.partitionBy) > 0
and not all([isinstance(val, PartitioningColumn) for val in result.partitionBy])
elif not isinstance(result.partitionBy, (list, tuple)) or not all(
isinstance(val, PartitioningColumn) for val in result.partitionBy
):
raise PySparkValueError(
format_error(
f"""
{error_prefix} because the static 'analyze' method returned an
'AnalyzeResult' object with the 'partitionBy' field set to a value besides a
list or tuple of 'PartitioningColumn' objects. Please update the table function
and then try the query again."""
)
)
elif isinstance(result.select, (list, tuple)) and (
len(result.select) > 0
and not all([isinstance(val, SelectedColumn) for val in result.select])
raise invalid_analyze_result_field("partitionBy", "PartitioningColumn")
elif not isinstance(result.orderBy, (list, tuple)) or not all(
isinstance(val, OrderingColumn) for val in result.orderBy
):
raise PySparkValueError(
format_error(
f"""
{error_prefix} because the static 'analyze' method returned an
'AnalyzeResult' object with the 'select' field set to a value besides a
list or tuple of 'SelectedColumn' objects. Please update the table function
and then try the query again."""
)
)
raise invalid_analyze_result_field("orderBy", "OrderingColumn")
elif not isinstance(result.select, (list, tuple)) or not all(
isinstance(val, SelectedColumn) for val in result.select
):
raise invalid_analyze_result_field("select", "SelectedColumn")

# Return the analyzed schema.
write_with_length(result.schema.json().encode("utf-8"), outfile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,26 @@ org.apache.spark.sql.AnalysisException
}


-- !query
SELECT * FROM UDTFInvalidOrderByStringList(TABLE(t2))
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON",
"sqlState" : "38000",
"messageParameters" : {
"msg" : "Failed to evaluate the user-defined table function 'UDTFInvalidOrderByStringList' because the static 'analyze' method returned an 'AnalyzeResult' object with the 'orderBy' field set to a value besides a list or tuple of 'OrderingColumn' objects. Please update the table function and then try the query again."
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 15,
"stopIndex" : 53,
"fragment" : "UDTFInvalidOrderByStringList(TABLE(t2))"
} ]
}


-- !query
SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2))
-- !query analysis
Expand Down
1 change: 1 addition & 0 deletions sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ SELECT * FROM UDTFInvalidSelectExprParseError(TABLE(t2));
SELECT * FROM UDTFInvalidSelectExprStringValue(TABLE(t2));
SELECT * FROM UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2));
SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2));
SELECT * FROM UDTFInvalidOrderByStringList(TABLE(t2));
-- As a reminder, UDTFInvalidPartitionByAndWithSinglePartition returns this analyze result:
-- AnalyzeResult(
-- schema=StructType()
Expand Down
22 changes: 22 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,28 @@ org.apache.spark.sql.AnalysisException
}


-- !query
SELECT * FROM UDTFInvalidOrderByStringList(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
{
"errorClass" : "TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON",
"sqlState" : "38000",
"messageParameters" : {
"msg" : "Failed to evaluate the user-defined table function 'UDTFInvalidOrderByStringList' because the static 'analyze' method returned an 'AnalyzeResult' object with the 'orderBy' field set to a value besides a list or tuple of 'OrderingColumn' objects. Please update the table function and then try the query again."
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 15,
"stopIndex" : 53,
"fragment" : "UDTFInvalidOrderByStringList(TABLE(t2))"
} ]
}


-- !query
SELECT * FROM UDTFInvalidPartitionByAndWithSinglePartition(TABLE(t2))
-- !query schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,10 @@ object IntegratedUDFTestUtils extends SQLHelper {
| .add("total", IntegerType())
| .add("last", IntegerType()),
| partitionBy=[
| PartitioningColumn("$partitionBy")
| $partitionBy
| ],
| orderBy=[
| OrderingColumn("$orderBy")
| $orderBy
| ],
| select=[
| $select
Expand All @@ -631,65 +631,71 @@ object IntegratedUDFTestUtils extends SQLHelper {

object UDTFPartitionByOrderBy
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
orderBy = "input",
partitionBy = "PartitioningColumn(\"partition_col\")",
orderBy = "OrderingColumn(\"input\")",
select = "")

object UDTFPartitionByOrderByComplexExpr
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col + 1",
orderBy = "RANDOM(42)",
partitionBy = "PartitioningColumn(\"partition_col + 1\")",
orderBy = "OrderingColumn(\"RANDOM(42)\")",
select = "")

object UDTFPartitionByOrderBySelectExpr
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
orderBy = "input",
partitionBy = "PartitioningColumn(\"partition_col\")",
orderBy = "OrderingColumn(\"input\")",
select = "SelectedColumn(\"partition_col\"), SelectedColumn(\"input\")")

object UDTFPartitionByOrderBySelectComplexExpr
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col + 1",
orderBy = "RANDOM(42)",
partitionBy = "PartitioningColumn(\"partition_col + 1\")",
orderBy = "OrderingColumn(\"RANDOM(42)\")",
select = "SelectedColumn(\"partition_col\"), " +
"SelectedColumn(name=\"input + 1\", alias=\"input\")")

object UDTFPartitionByOrderBySelectExprOnlyPartitionColumn
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
orderBy = "input",
partitionBy = "PartitioningColumn(\"partition_col\")",
orderBy = "OrderingColumn(\"input\")",
select = "SelectedColumn(\"partition_col\")")

object UDTFInvalidPartitionByOrderByParseError
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "unparsable",
orderBy = "input",
partitionBy = "PartitioningColumn(\"unparsable\")",
orderBy = "OrderingColumn(\"input\")",
select = "")

object UDTFInvalidOrderByAscKeyword
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
orderBy = "partition_col ASC",
partitionBy = "PartitioningColumn(\"partition_col\")",
orderBy = "OrderingColumn(\"partition_col ASC\")",
select = "")

object UDTFInvalidSelectExprParseError
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
orderBy = "input",
partitionBy = "PartitioningColumn(\"partition_col\")",
orderBy = "OrderingColumn(\"input\")",
select = "SelectedColumn(\"unparsable\")")

object UDTFInvalidSelectExprStringValue
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
orderBy = "input",
partitionBy = "PartitioningColumn(\"partition_col\")",
orderBy = "OrderingColumn(\"input\")",
select = "\"partition_cll\"")

object UDTFInvalidComplexSelectExprMissingAlias
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col + 1",
orderBy = "RANDOM(42)",
partitionBy = "PartitioningColumn(\"partition_col + 1\")",
orderBy = "OrderingColumn(\"RANDOM(42)\")",
select = "SelectedColumn(name=\"input + 1\")")

// Negative test case: the 'orderBy' entry is passed as a bare Python string literal
// ("partition_col") instead of an OrderingColumn(...) expression, so the generated
// AnalyzeResult carries a list of strings in its 'orderBy' field. The 'partitionBy'
// entry is a well-formed PartitioningColumn and 'select' is left empty.
object UDTFInvalidOrderByStringList
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "PartitioningColumn(\"partition_col\")",
orderBy = "\"partition_col\"",
select = "")

object UDTFInvalidPartitionByAndWithSinglePartition extends TestUDTF {
val pythonScript: String =
s"""
Expand Down Expand Up @@ -1197,6 +1203,7 @@ object IntegratedUDFTestUtils extends SQLHelper {
UDTFWithSinglePartition,
UDTFPartitionByOrderBy,
UDTFInvalidOrderByAscKeyword,
UDTFInvalidOrderByStringList,
UDTFInvalidSelectExprParseError,
UDTFInvalidSelectExprStringValue,
UDTFInvalidComplexSelectExprMissingAlias,
Expand Down