
[SPARK-27805][PYTHON] Propagate SparkExceptions during toPandas with arrow enabled #24677

Closed · wants to merge 5 commits into apache:master from dvogelbacher:dv/betterErrorMsgWhenUsingArrow

Conversation

@dvogelbacher (Contributor)

What changes were proposed in this pull request?

Similar to #24070, we now propagate SparkExceptions that are encountered during the collect in the Java process to the Python process.

Fixes https://jira.apache.org/jira/browse/SPARK-27805

How was this patch tested?

Added a new unit test

@dvogelbacher (Contributor, Author)

@BryanCutler @HyukjinKwon could you please take a look at this one as well?

@dvogelbacher dvogelbacher changed the title [SPARK-27805][PYTHON] Propagate SparkExceptions for toPandas with arrow enabled [SPARK-27805][PYTHON] Propagate SparkExceptions during toPandas with arrow enabled May 22, 2019
@BryanCutler (Member)

We should definitely have had a test for this, but I would think the error would occur in the call to self._jdf.collectAsArrowToPython() and then get propagated through Py4J, so we shouldn't need to do any special handling. I'll have to look into what's going on a little later.

@dvogelbacher (Contributor, Author)

collectAsArrowToPython will just return the socket info from PythonRDD.serveToStream("serve-Arrow"). The exception occurs during the runJob inside serveToStream, which is executed in a background thread. When the background thread encounters an exception, it closes the OutputStream.
The ArrowStreamSerializer in the Python process will then think that it has read all the batches, after which the ArrowCollectSerializer will try to read the batch order indices and throw an EOFError, because those were never written.

Also note that before #22275 (which introduced the batch order indices), this would not have resulted in any error on the Python side: we would just have dropped some partitions without throwing an error. Now we at least get an error, but not a very helpful one.
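
To make the failure mode concrete, here is a simplified, illustrative sketch of the serveToStream pattern described above (not the actual PythonRDD code; authentication and error logging are omitted, and all names here are assumptions). The key point is that the writer runs on a background thread and the stream is closed on success and failure alike, so the Python reader cannot tell a crashed job apart from a normal end of data:

```scala
import java.io.{BufferedOutputStream, OutputStream}
import java.net.ServerSocket

// Simplified sketch: serve the bytes produced by writeFunc over a local socket.
def serveToStream(threadName: String)(writeFunc: OutputStream => Unit): Int = {
  val serverSocket = new ServerSocket(0) // the port is handed to the Python process
  val thread = new Thread(threadName) {
    override def run(): Unit = {
      val socket = serverSocket.accept()
      val out = new BufferedOutputStream(socket.getOutputStream)
      try {
        writeFunc(out) // runJob happens in here; a failing job throws out of writeFunc
      } finally {
        out.close()    // closed on success *and* failure, so Python just sees EOF
        socket.close()
        serverSocket.close()
      }
    }
  }
  thread.setDaemon(true)
  thread.start()
  serverSocket.getLocalPort
}
```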

@dvogelbacher (Contributor, Author)

Do you have any more thoughts on this, @BryanCutler?

@BryanCutler (Member)

Yes, you are right, this is the same issue as toLocalIterator in #24070 and needs to be fixed. This is a real problem for branch-2.4, which, like you said, could cause toPandas to return a partial result without raising the error. @HyukjinKwon do you think it would make sense to patch branch-2.4 with a manual fix?

@BryanCutler (Member) left a review comment

Thanks for the fix @dvogelbacher, I had a few comments but it looks pretty good.

Review threads (outdated, resolved):
  • sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (3 threads)
  • python/pyspark/sql/tests/test_arrow.py
  • python/pyspark/serializers.py
@BryanCutler (Member)

ok to test

@SparkQA

SparkQA commented May 29, 2019

Test build #105882 has finished for PR 24677 at commit 4f57b7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 29, 2019

Test build #105883 has finished for PR 24677 at commit 4f57b7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member)

> Yes, you are right, this is the same issue as toLocalIterator in #24070 and needs to be fixed. This is a real problem for branch-2.4, which, like you said, could cause toPandas to return a partial result without raising the error. @HyukjinKwon do you think it would make sense to patch branch-2.4 with a manual fix?

this sounds important...

@SparkQA

SparkQA commented May 29, 2019

Test build #105915 has finished for PR 24677 at commit d9936d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 29, 2019

Test build #105916 has finished for PR 24677 at commit ccfeb9e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) left a review comment

This looks pretty good now @dvogelbacher, I'm just not sure whether it should write the error in a finally block and possibly re-throw the exception. Let me look into it a little more.


```diff
-      // After processing all partitions, end the stream and write batch order indices
+      // After processing all partitions, end the batch stream
       batchWriter.end()
```
@BryanCutler (Member)

I'm wondering if this and the code below should be in a finally block?

@dvogelbacher (Contributor, Author)

If we put it into a finally block but only catch SparkException, that would be wrong: if a different exception gets thrown, we would fall into case None, end the stream as if nothing happened, and only get partial, incorrect data on the Python side.
If we want to put this into a finally block, we should catch all exceptions, but I figured I'd do the same as in https://github.com/apache/spark/pull/24070/files#r279589039

It should be fine as is: if any exception that isn't a SparkException gets thrown, we will never reach this code. Instead the OutputStream just gets closed and we get an EOFError on the Python side (as we currently do for all exceptions).
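
For reference, a hedged sketch of the control flow being defended here, written as a free-standing function (runArrowJob and endBatchStream stand in for the runJob call and batchWriter.end() in the surrounding Dataset.scala; PythonRDD.writeUTF is Spark-internal, so this sketch only compiles inside Spark's own source tree). Only SparkException is caught; anything else escapes, the background thread closes the socket, and Python fails with an EOFError exactly as it did before this patch. The success branch, which writes the batch order indices, is sketched after the review snippet further down.

```scala
import java.io.DataOutputStream
import org.apache.spark.SparkException
import org.apache.spark.api.python.PythonRDD

def streamWithErrorPropagation(
    out: DataOutputStream,
    runArrowJob: () => Unit,
    endBatchStream: () => Unit): Unit = {
  var error: Option[SparkException] = None
  try {
    runArrowJob() // streams Arrow batches to Python as partitions complete
  } catch {
    // Deliberately narrow: a non-SparkException propagates out of this method,
    // the stream is closed unceremoniously, and Python raises EOFError as before.
    case e: SparkException => error = Some(e)
  }
  endBatchStream() // end the Arrow batch stream before the trailing sentinel
  error.foreach { e =>
    out.writeInt(-1)                      // failure sentinel instead of a batch count
    PythonRDD.writeUTF(e.getMessage, out) // message for Python to re-raise
  }
}
```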

@dvogelbacher (Contributor, Author)

Any more thoughts on this, @BryanCutler?

@BryanCutler (Member)

Yeah, I think this is fine

@BryanCutler (Member)

BryanCutler commented May 30, 2019

> this sounds important...

Yeah @felixcheung, it could be a nasty problem. I think the logs will show that the job had an error, but the application's Python script would continue to run with partial results. Let me verify this in branch-2.4. What are your thoughts on possibly backporting a fix?

After checking it out with branch-2.4, it is possible to get partial results in Python, but the JVM error is shown and is pretty obvious. It's unfortunate that this wasn't caught earlier, but I don't think it's worth the risk to backport right now.

@BryanCutler (Member) left a review comment

LGTM

@BryanCutler (Member)

Merged to master, thanks @dvogelbacher!

@felixcheung (Member)

> After checking it out with branch-2.4, it is possible to get partial results in Python, but the JVM error is shown and is pretty obvious. It's unfortunate that this wasn't caught earlier, but I don't think it's worth the risk to backport right now.

ah ok

```scala
      case Some(exception) =>
        // Signal failure and write error message
        out.writeInt(-1)
        PythonRDD.writeUTF(exception.getMessage, out)
```
@HyukjinKwon (Member)

Sorry for the late response, @BryanCutler.

Yea, I had the same question (#24677 (comment)). Thanks for the details at #24677 (comment). It would have been better if those were explained in a code comment, since at least two committers raised the same question :-).

> @HyukjinKwon do you think it would make sense to patch branch-2.4 with a manual fix?

Yea, I don't mind backporting it (though I don't feel strongly that we should).
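
For contrast with the failure branch quoted above, here is a sketch of what the success branch plausibly looks like (assumed shape, simplified: the actual change keeps a buffer of (partition index, batch index) pairs and derives the ordering from it). A non-negative count signals success and is followed by that many batch order indices, which the Python serializer uses to put the transferred batches back in order:

```scala
import java.io.DataOutputStream

// Hypothetical helper, not Spark code: write the success trailer.
def signalSuccess(out: DataOutputStream, batchOrder: Seq[Int]): Unit = {
  out.writeInt(batchOrder.length)  // count >= 0 distinguishes success from the -1 above
  batchOrder.foreach(out.writeInt) // one index per transferred Arrow batch
}
```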

@HyukjinKwon (Member) left a review comment

LGTM too, but I wonder if this is the only way.

@dvogelbacher would you mind if I ask you to check the normal collect() code path too? Dataset.collectToPython() looks like the same kind of issue.

@BryanCutler (Member)

> LGTM too, but I wonder if this is the only way.
> @dvogelbacher would you mind if I ask you to check the normal collect() code path too? Dataset.collectToPython() looks like the same kind of issue.

The collect() code path is different and doesn't have this exact problem, but it could possibly fail in another way. @HyukjinKwon @felixcheung @dvogelbacher let's continue the discussion in #24834, where I'm trying out another way to do this.

emanuelebardelli pushed a commit to emanuelebardelli/spark that referenced this pull request Jun 15, 2019
…arrow enabled

## What changes were proposed in this pull request?
Similar to apache#24070, we now propagate SparkExceptions that are encountered during the collect in the java process to the python process.

Fixes https://jira.apache.org/jira/browse/SPARK-27805

## How was this patch tested?
Added a new unit test

Closes apache#24677 from dvogelbacher/dv/betterErrorMsgWhenUsingArrow.

Authored-by: David Vogelbacher <dvogelbacher@palantir.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
dvogelbacher added a commit to palantir/spark that referenced this pull request Nov 25, 2019
…arrow enabled
bulldozer-bot bot pushed a commit to palantir/spark that referenced this pull request Nov 25, 2019