
[SPARK-48142][PYTHON][CONNECT][TESTS] Enable CogroupedApplyInPandasTests.test_wrong_args #46397

Closed

Conversation

@zhengruifeng (Contributor) commented May 6, 2024

What changes were proposed in this pull request?

Enable `CogroupedApplyInPandasTests.test_wrong_args` by adding a missing function validation check
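
For illustration, a minimal sketch of what the re-enabled test asserts, assuming illustrative names (the exact test body lives in the PySpark test suite; `left`, `right`, and the schema below are assumptions):

```python
from pyspark.errors import PySparkValueError

def test_wrong_args(self):
    left = self.spark.range(10)
    right = self.spark.range(10)
    with self.assertRaises(PySparkValueError) as ctx:
        left.groupby("id").cogroup(right.groupby("id")).applyInPandas(
            lambda: 1,  # zero-argument function: invalid for cogrouped map
            schema="id long, v double",
        )
    self.assertIn("INVALID_PANDAS_UDF", str(ctx.exception))
```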

Why are the changes needed?

For test coverage.

Does this PR introduce any user-facing change?

no

How was this patch tested?

CI.

Was this patch authored or co-authored using generative AI tooling?

no

@zhengruifeng zhengruifeng changed the title [MINOR][DOCS] Make a dataframe.join doctest deterministic [WIP] Enable CogroupedApplyInPandasTests.test_wrong_args May 6, 2024
@zhengruifeng zhengruifeng changed the title [WIP] Enable CogroupedApplyInPandasTests.test_wrong_args [SPARK-48142][PYTHON][CONNECT][TESTS] Enable CogroupedApplyInPandasTests.test_wrong_args May 6, 2024
```python
# Diff excerpt from the Spark Connect code path; the opening line of the newly
# added validation call (the factored-out check) is truncated in this view:
    func,
    schema,
    PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
)

udf_obj = UserDefinedFunction(
```
@zhengruifeng (Contributor Author)
PySpark Classic directly uses the `pandas_udf` method, which includes this check:

```python
# The usage of the pandas_udf is internal so type checking is disabled.
udf = pandas_udf(
    func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF
)  # type: ignore[call-overload]
```

while in Spark Connect, the output of the `pandas_udf` method cannot be used here; a `UserDefinedFunction` object is required instead.

So I factored the check out into a separate method and invoke it here.
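
For context, a minimal sketch of what the factored-out check could look like; `validate_function_arity` and its signature are hypothetical (the real helper raises `PySparkValueError` with the `INVALID_PANDAS_UDF` error class):

```python
from inspect import getfullargspec

from pyspark.errors import PySparkValueError


def validate_function_arity(func, allowed_arities, detail):
    """Fail fast if `func` takes an unsupported number of arguments."""
    num_args = len(getfullargspec(func).args)
    if num_args not in allowed_arities:
        raise PySparkValueError(f"[INVALID_PANDAS_UDF] Invalid function: {detail}")


# Cogrouped applyInPandas accepts (left, right) or (key, left, right), so the
# same check can run in both PySpark Classic and Spark Connect:
validate_function_arity(
    lambda left, right: left,  # a valid two-argument function
    allowed_arities=(2, 3),
    detail="the function in cogroup.applyInPandas must take either two "
    "arguments (left, right) or three arguments (key, left, right).",
)
```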

@xinrong-meng (Member)

LGTM thank you!

@zhengruifeng (Contributor Author)

Thanks @xinrong-meng

Merged to master.

@zhengruifeng zhengruifeng deleted the fix_pandas_udf_check branch May 7, 2024 01:16
dongjoon-hyun pushed a commit that referenced this pull request May 10, 2024
…ion in ApplyInXXX

### What changes were proposed in this pull request?
Implement the missing function validation in ApplyInXXX

#46397 fixed this issue for `Cogrouped.ApplyInPandas`; this PR fixes the remaining methods.

### Why are the changes needed?
For a better error message:

```
In [12]: df1 = spark.range(11)

In [13]: df2 = df1.groupby("id").applyInPandas(lambda: 1, StructType([StructField("d", DoubleType())]))

In [14]: df2.show()
```

before this PR, an invalid function causes weird execution errors:
```
24/05/10 11:37:36 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 36)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 104, in dump_stream
    for batch in iterator:
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 524, in init_stream_yield_batches
    for series in iterator:
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1610, in mapper
    return f(keys, vals)
           ^^^^^^^^^^^^^
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 488, in <lambda>
    return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
                          ^^^^^^^^^^^^^
  File "/Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 483, in wrapped
    result, return_type, _assign_cols_by_name, truncate_return_schema=False
    ^^^^^^
UnboundLocalError: cannot access local variable 'result' where it is not associated with a value

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:523)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:117)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:479)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)

	...
```

After this PR, the error happens before execution, which is consistent with Spark Classic, and is much clearer:
```
PySparkValueError: [INVALID_PANDAS_UDF] Invalid function: pandas_udf with function type GROUPED_MAP or the function in groupby.applyInPandas must take either one argument (data) or two arguments (key, data).

```
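
For contrast, a hedged sketch of a function with a valid arity for `groupby.applyInPandas` (the helper and schema below are illustrative, continuing the `df1` example above):

```python
import pandas as pd
from pyspark.sql.types import StructType, StructField, DoubleType

# groupby.applyInPandas accepts either f(pdf) or f(key, pdf):
def as_double(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"d": pdf["id"].astype("float64")})

df2 = df1.groupby("id").applyInPandas(
    as_double, StructType([StructField("d", DoubleType())])
)
df2.show()
```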

### Does this PR introduce _any_ user-facing change?
Yes, the error message changes.

### How was this patch tested?
Added tests.

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46519 from zhengruifeng/missing_check_in_group.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
…ests.test_wrong_args`

Closes apache#46397 from zhengruifeng/fix_pandas_udf_check.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
…ion in ApplyInXXX

Closes apache#46519 from zhengruifeng/missing_check_in_group.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>