[SPARK-48228][PYTHON][CONNECT] Implement the missing function validation in ApplyInXXX #46519

Closed

zhengruifeng (Contributor) commented May 10, 2024

What changes were proposed in this pull request?

Implement the missing function validation in ApplyInXXX

#46397 fixed this issue for `Cogrouped.ApplyInPandas`; this PR fixes the remaining methods.

Why are the changes needed?

For a better error message:

```
In [11]: from pyspark.sql.types import StructType, StructField, DoubleType

In [12]: df1 = spark.range(11)

In [13]: df2 = df1.groupby("id").applyInPandas(lambda: 1, StructType([StructField("d", DoubleType())]))

In [14]: df2.show()
```
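For contrast, and per the error text quoted below, a valid `applyInPandas` function must take either one argument (data) or two arguments (key, data). A minimal sketch continuing the session above (the function names are illustrative, not part of the PR):

```python
import pandas as pd
from pyspark.sql.types import StructType, StructField, DoubleType

# One-argument form: receives each group's data as a pandas DataFrame.
def center(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"d": pdf["id"] - pdf["id"].mean()})

# Two-argument form: additionally receives the grouping key as a tuple.
def center_with_key(key, pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"d": pdf["id"] - pdf["id"].mean()})

df_ok = df1.groupby("id").applyInPandas(
    center, StructType([StructField("d", DoubleType())])
)
```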

Before this PR, an invalid function caused confusing execution errors:

```

24/05/10 11:37:36 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 36)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 104, in dump_stream
    for batch in iterator:
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 524, in init_stream_yield_batches
    for series in iterator:
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1610, in mapper
    return f(keys, vals)
           ^^^^^^^^^^^^^
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 488, in <lambda>
    return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
                          ^^^^^^^^^^^^^
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 483, in wrapped
    result, return_type, _assign_cols_by_name, truncate_return_schema=False
    ^^^^^^
UnboundLocalError: cannot access local variable 'result' where it is not associated with a value

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:523)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:117)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:479)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)

	...
```

After this PR, the error is raised before execution, which is consistent with Spark Classic and much clearer:

```
PySparkValueError: [INVALID_PANDAS_UDF] Invalid function: pandas_udf with function type GROUPED_MAP or the function in groupby.applyInPandas must take either one argument (data) or two arguments (key, data).
```
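With the validation in place, the failure can be caught eagerly on the client. A small illustration continuing the same session (that the exception surfaces at the `applyInPandas` call, before any job runs, is an inference from the PR description above):

```python
from pyspark.errors import PySparkValueError
from pyspark.sql.types import StructType, StructField, DoubleType

try:
    # The invalid zero-argument lambda is rejected during plan
    # construction, not during execution of df2.show().
    df1.groupby("id").applyInPandas(
        lambda: 1, StructType([StructField("d", DoubleType())])
    )
except PySparkValueError as e:
    print(e.getErrorClass())  # INVALID_PANDAS_UDF
```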

Does this PR introduce any user-facing change?

Yes, the error message changes.

How was this patch tested?

Added tests.

Was this patch authored or co-authored using generative AI tooling?

No.

```
@@ -34,6 +34,7 @@
 from pyspark.util import PythonEvalType
 from pyspark.sql.group import GroupedData as PySparkGroupedData
 from pyspark.sql.pandas.group_ops import PandasCogroupedOps as PySparkPandasCogroupedOps
+from pyspark.sql.pandas.functions import _validate_pandas_udf  # type: ignore[attr-defined]
```
zhengruifeng (Contributor, Author) commented:

Spark Classic invokes `pandas_udf` in the Pandas Functions (ApplyInXXX), and `pandas_udf` includes the function validation. In Spark Connect, we cannot use `pandas_udf` because of differences in the underlying implementations: `pandas_udf` returns a wrapper, while Spark Connect requires a `UserDefinedFunction` object.
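A minimal sketch of that asymmetry (the helper names and call shapes here are simplified assumptions for illustration, not Spark's actual internals):

```python
from inspect import signature

class ConnectUserDefinedFunction:
    """Illustrative stand-in for Spark Connect's UDF object."""
    def __init__(self, func, return_type):
        self.func = func
        self.return_type = return_type

def validate_group_map_func(func):
    # The kind of check pandas_udf performs for Spark Classic: a
    # GROUPED_MAP function must take one argument (data) or two (key, data).
    if len(signature(func).parameters) not in (1, 2):
        raise ValueError(
            "the function in groupby.applyInPandas must take either one "
            "argument (data) or two arguments (key, data)"
        )

def connect_apply_in_pandas(func, return_type):
    # Spark Connect cannot route through pandas_udf (which would return a
    # wrapper), so the validation has to be invoked explicitly before the
    # UserDefinedFunction object is built.
    validate_group_map_func(func)
    return ConnectUserDefinedFunction(func, return_type)
```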

zhengruifeng requested review from HyukjinKwon and xinrong-meng and removed the request for HyukjinKwon on May 10, 2024 03:48
dongjoon-hyun (Member) left a comment:

+1, LGTM (Pending CIs).

dongjoon-hyun (Member) commented:

Merged to master for Apache Spark 4.0.0.

zhengruifeng deleted the missing_check_in_group branch on May 10, 2024 05:11
zhengruifeng (Contributor, Author) commented:

@dongjoon-hyun and @HyukjinKwon, thanks for the reviews!

JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
[SPARK-48228][PYTHON][CONNECT] Implement the missing function validation in ApplyInXXX
Closes apache#46519 from zhengruifeng/missing_check_in_group.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
xinrong-meng (Member) commented:

Late LGTM, thank you!
