[SPARK-45523][Python] Return useful error message if UDTF returns None for any non-nullable column #43356
Conversation
cc @ueshin
Force-pushed from 8e9d028 to c6769d5.
Thanks @ueshin for your reviews, please take another look!
respond to code review comments
Force-pushed from c6769d5 to d0f16f1.
python/pyspark/worker.py (Outdated)
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
        "the query again."
    )

# Compares each UDTF output row against the output schema for this particular UDTF call,
# raising an error if the two are incompatible.
def check_output_row_against_schema(row: Any) -> None:
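For readers following along, here is a minimal illustrative sketch of the kind of per-row recursive check under discussion (the helper name and error text are hypothetical, not the PR's exact code):

```python
from typing import Any

from pyspark.errors import PySparkRuntimeError
from pyspark.sql.types import ArrayType, DataType, MapType, StructType


def check_value_against_type(value: Any, data_type: DataType, nullable: bool) -> None:
    # A None value is acceptable only where the schema declares nullability.
    if value is None:
        if not nullable:
            raise PySparkRuntimeError(
                "the UDTF returned None for a non-nullable column or subfield"
            )
        return
    # Recurse into container types so Nones nested inside arrays, maps,
    # and structs are caught too.
    if isinstance(data_type, ArrayType):
        for element in value:
            check_value_against_type(element, data_type.elementType, data_type.containsNull)
    elif isinstance(data_type, MapType):
        # Map keys are never nullable in Spark SQL; only values can be.
        for map_value in value.values():
            check_value_against_type(map_value, data_type.valueType, data_type.valueContainsNull)
    elif isinstance(data_type, StructType):
        # Access struct fields by position so duplicate field names are handled.
        for i, field in enumerate(data_type.fields):
            check_value_against_type(value[i], field.dataType, field.nullable)
```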
@ueshin do you think this will add extra performance overhead if we check this for each output row?
Note: In a previous iteration of this PR, I had a check to see if the schema contained any non-nullable columns in order to enable this. However, I would like to later extend these checks to compare returned row values against the expected output schema column types; mismatches there currently produce internal exceptions instead of good error messages. We would need to check every value in every row in that case, so I figured it was OK to just do that here as well.
Yes, this will add huge performance overhead.
@dtenedor Could we at least build the check function based on the data type in advance?
check_output_row_against_schema = _build_null_checker(return_type)
Checking the data type and nullability for each row would be too expensive.
The builder should be placed somewhere reusable.
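A hedged sketch of the suggested approach, assuming a `_build_null_checker` helper that walks the return type once and composes a checker closure so the per-row hot path does no type dispatch (names and structure are illustrative, not the code that actually landed):

```python
from typing import Any, Callable, Optional

from pyspark.sql.types import ArrayType, DataType, MapType, StructType

# A checker raises on a nullability violation; None means "nothing can fail
# in this subtree", letting the per-row hot path skip the work entirely.
NullChecker = Optional[Callable[[Any], None]]


def _build_null_checker(data_type: DataType, nullable: bool = True) -> NullChecker:
    # Build checkers for nested values ahead of time, once per UDTF call.
    nested: NullChecker = None
    if isinstance(data_type, ArrayType):
        element_checker = _build_null_checker(data_type.elementType, data_type.containsNull)
        if element_checker is not None:
            def nested(value: Any) -> None:
                for element in value:
                    element_checker(element)
    elif isinstance(data_type, MapType):
        # Map keys cannot be null in Spark SQL, so only values need checking.
        value_checker = _build_null_checker(data_type.valueType, data_type.valueContainsNull)
        if value_checker is not None:
            def nested(value: Any) -> None:
                for map_value in value.values():
                    value_checker(map_value)
    elif isinstance(data_type, StructType):
        field_checkers = [
            (i, _build_null_checker(f.dataType, f.nullable))
            for i, f in enumerate(data_type.fields)
        ]
        field_checkers = [(i, c) for i, c in field_checkers if c is not None]
        if field_checkers:
            def nested(value: Any) -> None:
                # Positional access keeps duplicate field names safe.
                for i, field_checker in field_checkers:
                    field_checker(value[i])

    if nullable and nested is None:
        return None  # Shortcut: a fully nullable subtree needs no check at all.

    def check(value: Any) -> None:
        if value is None:
            if not nullable:
                raise ValueError("None in a non-nullable column or subfield")
        elif nested is not None:
            nested(value)

    return check
```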
Sure, I added this check back for now.
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala (Outdated; review thread resolved)
respond to code review comments
python/pyspark/worker.py (Outdated)
@@ -879,6 +940,8 @@ def verify_result(result):
    verify_pandas_result(
        result, return_type, assign_cols_by_name=False, truncate_return_schema=False
    )
    for result_tuple in result.itertuples():
        check_output_row_against_schema(list(result_tuple))
Shall we move this to before the pandas DataFrame is created?
I tried that originally, but the UDTF result is an Iterable, and iterating through it consumes the values, making it impossible to create the DataFrame afterward because the iterator is empty :)
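As a standalone illustration of the pitfall (plain Python, independent of the PR): a generator can be consumed only once, so validating first would leave nothing to build the DataFrame from.

```python
# A UDTF's eval() results arrive as a one-shot iterable, e.g. a generator.
rows = ((i,) for i in range(3))

# Pass 1: "validate" every row -- this exhausts the generator.
for row in rows:
    pass

# Pass 2: nothing is left to construct a DataFrame from.
assert list(rows) == []
```

Buffering the iterator (e.g. with itertools.tee) would work but holds every unconsumed row in memory, so checking the rows after the pandas DataFrame is materialized, as done above, sidesteps both problems.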
python/pyspark/worker.py (Outdated)
        sub_value, data_type.elementType, data_type.containsNull
    )
elif isinstance(data_type, StructType) and isinstance(value, Row):
    for field_name, field_value in value.asDict().items():
asDict will break when there are duplicated field names.
Good point; I switched this to iterate through the row using column indexes instead.
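A quick demonstration of the hazard and the fix (illustrative; runnable without a SparkSession):

```python
from pyspark.sql import Row

# Two distinct columns that happen to share a field name.
row = Row("a", "a")(1, None)

# asDict() keys by name, so one of the two values is silently dropped.
assert row.asDict() == {"a": None}

# Positional iteration still sees both values, so each column's
# nullability can be checked against its index in the schema.
assert [row[i] for i in range(len(row))] == [1, None]
```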
python/pyspark/worker.py (Outdated)
elif isinstance(value, Row):
    items = value.asDict().items()
In what case does this happen?
Turns out, never :) I removed this check; it is simpler now.
The failed tests seem unrelated to this PR.
Thanks! Merging to master.
### What changes were proposed in this pull request?
This is a follow-up of #43356. Refactor the null-checking to have shortcuts.

### Why are the changes needed?
The null-check can have shortcuts for some cases.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43492 from ueshin/issues/SPARK-45523/nullcheck.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
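Under the same assumptions as the `_build_null_checker` sketch earlier in the thread (that definition is assumed to be in scope), the shortcut behavior might look like this in use:

```python
from pyspark.sql.types import IntegerType, StructField, StructType

# Fully nullable schema: the builder returns None, so the per-row loop
# can skip null-checking entirely.
relaxed = StructType([StructField("x", IntegerType(), nullable=True)])
assert _build_null_checker(relaxed) is None

# A non-nullable column yields a real checker that rejects None values.
strict = StructType([StructField("x", IntegerType(), nullable=False)])
check = _build_null_checker(strict)
check((1,))  # passes silently
try:
    check((None,))
except ValueError as e:
    print(e)  # None in a non-nullable column or subfield
```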
What changes were proposed in this pull request?
This PR updates Python UDTF evaluation to return a useful error message if a UDTF returns None for any non-nullable column.
The implementation also checks recursively for None values within subfields of array/struct/map columns.
For example:
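The inline example originally attached here did not survive extraction; what follows is a hedged reconstruction (the exact error text may differ from what the PR actually emits):

```python
from pyspark.sql.functions import udtf
from pyspark.sql.types import IntegerType, StructField, StructType

# Assumes an active SparkSession.
@udtf(returnType=StructType([StructField("x", IntegerType(), nullable=False)]))
class TestUDTF:
    def eval(self):
        yield (None,)  # None for column "x", which is declared non-nullable

# Before this change: an opaque NullPointerException from the JVM.
# After this change: a Python-side error explaining that the UDTF returned
# None for a non-nullable column, identifying the offending column.
TestUDTF().show()
```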
Why are the changes needed?
Previously, this case resulted in a NullPointerException.
Does this PR introduce any user-facing change?
Yes, see above.
How was this patch tested?
This PR adds new test coverage.
Was this patch authored or co-authored using generative AI tooling?
No.