
[SPARK-28881][PYTHON][TESTS] Add a test to make sure toPandas with Arrow optimization throws an exception per maxResultSize #25594

Closed

Conversation

HyukjinKwon
Member

What changes were proposed in this pull request?

This PR proposes to add a test case for:

```bash
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```

which can result in partial results (see #25593 (comment)). This regression appeared between Spark 2.3 and Spark 2.4 and was only fixed accidentally later.

Why are the changes needed?

To prevent the same regression in the future.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Test was added.
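For reference, a minimal sketch of what such a test can look like, pieced together from the class name and traceback shown in the CI output below; the exact session settings (e.g. the `maxResultSize` value used in the test) are assumptions:

```python
import unittest

from pyspark.sql import SparkSession


class MaxResultArrowTests(unittest.TestCase):
    # A dedicated session is used because spark.driver.maxResultSize must be
    # set before the driver starts.
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder \
            .master("local[4]") \
            .appName(cls.__name__) \
            .config("spark.driver.maxResultSize", "10k") \
            .getOrCreate()
        # Explicitly enable Arrow and disable fallback so the failure surfaces
        # in the Arrow-optimized code path only (see the review thread below).
        cls.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
        cls.spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")

    @classmethod
    def tearDownClass(cls):
        if hasattr(cls, "spark"):
            cls.spark.stop()

    def test_exception_by_max_results(self):
        with self.assertRaisesRegexp(Exception, "is bigger than"):
            self.spark.range(0, 10000, 1, 100).toPandas()
```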

@HyukjinKwon HyukjinKwon changed the title [SPARK-27992][PYTHON][TESTS] Add a test to make sure toPandas with Arrow optimization throws an exception per maxResultSize [SPARK-27881][PYTHON][TESTS] Add a test to make sure toPandas with Arrow optimization throws an exception per maxResultSize Aug 27, 2019
@HyukjinKwon HyukjinKwon changed the title [SPARK-27881][PYTHON][TESTS] Add a test to make sure toPandas with Arrow optimization throws an exception per maxResultSize [SPARK-28881][PYTHON][TESTS] Add a test to make sure toPandas with Arrow optimization throws an exception per maxResultSize Aug 27, 2019

@HyukjinKwon
Member Author

retest this please


@dongjoon-hyun
Member

Ur, one of the Python runs (python2/python3/pypy) seems to fail because Pandas is not installed:

```
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests/test_arrow.py", line 443, in test_exception_by_max_results
    self.spark.range(0, 10000, 1, 100).toPandas()
AssertionError: "is bigger than" does not match "Pandas >= 0.23.2 must be installed; however, it was not found."
```
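The usual remedy for this kind of environment-dependent failure is to skip the test when Pandas/PyArrow are not available. A sketch, assuming the skip helpers the PySpark test utilities provide (`have_pandas`, `pandas_requirement_message`, etc.):

```python
import unittest

# Assumed helpers from the PySpark testing utilities; they report whether the
# optional dependencies are importable and, if not, why.
from pyspark.testing.sqlutils import (
    have_pandas, have_pyarrow,
    pandas_requirement_message, pyarrow_requirement_message)


@unittest.skipIf(
    not have_pandas or not have_pyarrow,
    pandas_requirement_message or pyarrow_requirement_message)
class MaxResultArrowTests(unittest.TestCase):
    ...
```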


```python
# Explicitly enable Arrow and disable fallback.
cls.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
cls.spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
```
Member

@dongjoon-hyun dongjoon-hyun Aug 27, 2019

Oh, do we need these for the test cases? These are the opposite of the default values. In the reported JIRA's scenario, we didn't change the default configuration. Did we change the default values of those configurations in 3.0.0?

Member Author

`spark.sql.execution.arrow.enabled` now has an alias `spark.sql.execution.arrow.pyspark.enabled` as of d6632d1.

And `spark.sql.execution.arrow.pyspark.fallback.enabled` was just set to narrow down the test scope. The correct behaviour is to fail in the Arrow-optimized code path (although in the JIRA's case, it fails in the Arrow-optimized path first, and then fails again in the non-Arrow-optimized path).

Contributor

I think it's better to have a test that fails with default settings, for branch 2.4.

Member Author

(The issue SPARK-28881 happened because it passed in the Arrow-optimized code path and returned partial or empty data.)

Member Author

I am not going to use the aliases when I port this to branch-2.4, because technically `spark.sql.execution.arrow.enabled` and `spark.sql.execution.arrow.fallback.enabled` are supposed to be removed in the future.

Member Author

These configurations just narrow down the testing scope, and test_arrow.py is supposed to test this scope (as can be seen in ArrowTests).
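In concrete terms, the master-branch test uses the new PySpark-specific names, while a branch-2.4 port would set the original names directly. A sketch of the two variants, based on the alias relationship discussed above:

```python
# master (3.0): the older names below became aliases of these, per d6632d1.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")

# branch-2.4: only the original names exist, so the port sets these instead.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")
```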

@SparkQA

SparkQA commented Aug 27, 2019

Test build #109799 has finished for PR 25594 at commit 55bed86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 27, 2019

Test build #109800 has finished for PR 25594 at commit 87840b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class MaxResultArrowTests(unittest.TestCase):

@HyukjinKwon
Member Author

Merged to master.

It will be backported together with #25593.

@HyukjinKwon
Member Author

Thanks, @dongjoon-hyun and @cloud-fan

HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Aug 27, 2019
…row optimization throws an exception per maxResultSize

This PR proposes to add a test case for:

```bash
./bin/pyspark --conf spark.driver.maxResultSize=1m
```

```python
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.range(10000000).toPandas()
```

```
Empty DataFrame
Columns: [id]
Index: []
```

which can result in partial results (see apache#25593 (comment)). This regression appeared between Spark 2.3 and Spark 2.4 and was only fixed accidentally later.

To prevent the same regression in the future.

No.

Test was added.

Closes apache#25594 from HyukjinKwon/SPARK-28881.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Aug 27, 2019
…nection thread to propagate errors

### What changes were proposed in this pull request?

This PR proposes to backport #24834 with minimised changes, and the tests added at #25594.

#24834 was not backported before because it mainly targeted raising a better exception by propagating the exception from the JVM.

However, that PR actually fixed another problem accidentally (see #25594 and [SPARK-28881](https://issues.apache.org/jira/browse/SPARK-28881)). This regression seems to have been introduced by #21546.

The root cause seems to be that `runJob` with `resultHandler` can write partial output:

https://github.com/apache/spark/blob/23bed0d3c08e03085d3f0c3a7d457eedd30bd67f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3370-L3384

The JVM throws an exception, but since that exception is not propagated to the Python process, the Python process cannot tell whether the JVM threw an exception (it just sees the socket close), which results in the following:

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```
```python
spark.conf.set("spark.sql.execution.arrow.enabled",True)
spark.range(10000000).toPandas()
```
```
Empty DataFrame
Columns: [id]
Index: []
```

With this change, the Python process catches exceptions thrown on the JVM side.
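To illustrate the pattern in a self-contained way, here is a toy sketch (not PySpark internals): the consumer joins the serving thread and re-raises its exception instead of treating a closed connection as success, which is what the `jsocket_auth_server.getResult()` call in the traceback below does.

```python
import threading


class ServingThread(threading.Thread):
    """Toy stand-in for the JVM-side serving thread: it produces results and,
    on failure, keeps the exception so the consumer can re-raise it later."""

    def __init__(self, num_rows, fail_at):
        super().__init__()
        self.num_rows, self.fail_at = num_rows, fail_at
        self.partial_results, self._exc = [], None

    def run(self):
        try:
            for i in range(self.num_rows):
                if i == self.fail_at:
                    # Analogous to exceeding spark.driver.maxResultSize.
                    raise RuntimeError("serialized results are bigger than maxResultSize")
                self.partial_results.append(i)
        except Exception as exc:
            # Without propagation, the consumer only sees partial_results.
            self._exc = exc

    def get_result(self):
        # Analogous to jsocket_auth_server.getResult(): join the serving
        # thread and re-raise any exception instead of returning silently.
        self.join()
        if self._exc is not None:
            raise self._exc


serving = ServingThread(num_rows=10, fail_at=5)
serving.start()
serving.join()
print(serving.partial_results)   # [0, 1, 2, 3, 4] -- partial data, no error
serving.get_result()             # raises RuntimeError, as the fix does
```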

### Why are the changes needed?

It returns incorrect data, and potentially partial results, when an exception happens on the JVM side. This is a regression; the code works fine in Spark 2.3.3.

### Does this PR introduce any user-facing change?

Yes.

```
./bin/pyspark --conf spark.driver.maxResultSize=1m
```
```python
spark.conf.set("spark.sql.execution.arrow.enabled",True)
spark.range(10000000).toPandas()
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../pyspark/sql/dataframe.py", line 2122, in toPandas
    batches = self._collectAsArrow()
  File "/.../pyspark/sql/dataframe.py", line 2184, in _collectAsArrow
    jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
  File "/.../lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/.../pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/.../lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o42.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    ...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (6.5 MB) is bigger than spark.driver.maxResultSize (1024.0 KB)
```

now throws an exception as expected.

### How was this patch tested?

Manually tested as described above; a unit test was added.

Closes #25593 from HyukjinKwon/SPARK-27992.

Lead-authored-by: HyukjinKwon <gurwls223@apache.org>
Co-authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
…nection thread to propagate errors

kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019
…nection thread to propagate errors

@HyukjinKwon HyukjinKwon deleted the SPARK-28881 branch March 3, 2020 01:18
rshkv pushed a commit to palantir/spark that referenced this pull request Jun 4, 2020
…row optimization throws an exception per maxResultSize

rshkv pushed a commit to palantir/spark that referenced this pull request Jun 5, 2020
…row optimization throws an exception per maxResultSize
