
[SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion #36425

Closed
wants to merge 2 commits

Conversation

sadikovi
Contributor

@sadikovi sadikovi commented May 2, 2022

What changes were proposed in this pull request?

This PR fixes the issue described in https://issues.apache.org/jira/browse/SPARK-39084 where calling df.rdd.isEmpty() on a particular dataset could result in a JVM crash and/or executor failure.

The issue was that the Python iterator is not synchronised with the Java iterator, so when the task completes, the Python iterator continues to process data. We introduced ContextAwareIterator as part of https://issues.apache.org/jira/browse/SPARK-33277, but we did not fix all of the places where it should be used.
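The idea behind ContextAwareIterator can be sketched in plain Python (illustrative only; Spark's actual class is written in Scala and consults TaskContext, whereas this sketch uses a hypothetical `threading.Event` flag):

```python
import threading

class ContextAwareIterator:
    """Sketch: wraps an iterator and stops producing elements once the
    associated task is marked completed, mirroring the idea behind
    Spark's Scala ContextAwareIterator. Names are simplified and are
    not Spark's actual API."""

    def __init__(self, task_completed: threading.Event, delegate):
        self.task_completed = task_completed
        self.delegate = delegate

    def __iter__(self):
        return self

    def __next__(self):
        # Stop immediately if the task finished, even if the underlying
        # iterator still has buffered elements to hand out.
        if self.task_completed.is_set():
            raise StopIteration
        return next(self.delegate)

done = threading.Event()
it = ContextAwareIterator(done, iter(range(10)))
first = next(it)     # consume one element, as limit(1)/isEmpty() would
done.set()           # task completes after the first row
remaining = list(it) # iteration stops instead of draining the source
```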

Why are the changes needed?

Fixes the JVM crash when checking isEmpty() on a dataset.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

I added a test case that reproduces the issue 100%. I confirmed that the test fails without the fix and passes with the fix.

@sadikovi sadikovi changed the title [SPARK-39084] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion [SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion May 2, 2022
@sadikovi
Contributor Author

sadikovi commented May 2, 2022

@HyukjinKwon and @ueshin Can you review the PR? Thanks.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

I'll leave it to @ueshin, though.

@ueshin
Member

ueshin commented May 2, 2022

I remember there were some more fixes after mine.
@ankurdave Could you also take a look at this?

@ankurdave
Contributor

As @HyukjinKwon noted in October 2020, the ContextAwareIterator approach didn't fully solve the problem, because the root issue was a race between (1) the main task thread freeing an off-heap buffer, and (2) the Python WriterThread reading from that off-heap buffer. Fully solving it required adding synchronization between the two threads.

If this issue is caused by a race between the main thread and some other thread, then it seems like the same reasoning applies. Do you have a crash file indicating which thread is causing the JVM crash?
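The race described above can be sketched with a toy buffer (illustrative only; `OffHeapBuffer` and its methods are hypothetical stand-ins, not Spark's memory-management API — the point is that reads and frees must be serialized so a completed task fails safely instead of reading freed memory):

```python
import threading

class OffHeapBuffer:
    """Toy stand-in for a buffer shared between the main task thread
    (which frees it on task completion) and a writer thread (which
    reads from it)."""

    def __init__(self, data):
        self._data = data
        self._lock = threading.Lock()
        self._freed = False

    def free(self):
        # Called by the main task thread when the task completes.
        with self._lock:
            self._freed = True
            self._data = None

    def read(self, i):
        # Synchronized read: raises instead of a use-after-free.
        with self._lock:
            if self._freed:
                raise RuntimeError("buffer already freed")
            return self._data[i]

buf = OffHeapBuffer([10, 20, 30])
x = buf.read(0)   # writer thread reads while the buffer is live
buf.free()        # main task thread frees on task completion
try:
    buf.read(1)   # late read is rejected safely
    safe = False
except RuntimeError:
    safe = True
```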

@HyukjinKwon
Member

BTW, I'll merge this in a few days and backport it to the other branches as a minimal fix if there are no objections.

@sadikovi
Contributor Author

sadikovi commented May 2, 2022

@ankurdave This PR solves the problem for df.rdd.isEmpty(); I am not familiar with the UDF failures. My test reproduces the issue, and I confirm it is fixed with the patch.

@sadikovi
Contributor Author

sadikovi commented May 2, 2022

The reason the bug existed is that the code was trying to control completion from Python, which is not going to work in most cases. The issue is not a race condition per se: one of the iterators for reading serialised Python output implements hasNext and next incorrectly, so it reads past EOF and does not terminate when it is supposed to, which causes the crash.
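A minimal sketch of that failure mode, assuming a simple length-prefixed framing (`END_OF_DATA` and the frame layout here are hypothetical, not Spark's actual serialisation protocol):

```python
import io
import struct

END_OF_DATA = -1  # hypothetical end-of-stream marker

def frames_buggy(stream):
    """Reader with the described bug: hasNext/next never check the end
    marker, so iteration runs past EOF on the serialised output."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            raise EOFError("read past end of stream")
        (length,) = struct.unpack(">i", header)
        yield stream.read(length)  # a control marker is misread as a frame

def frames_fixed(stream):
    """Fixed reader: terminates as soon as the end marker is seen."""
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return
        (length,) = struct.unpack(">i", header)
        if length < 0:  # END_OF_DATA (or any control marker)
            return
        yield stream.read(length)

payload = struct.pack(">i", 2) + b"ab" + struct.pack(">i", END_OF_DATA)
ok = list(frames_fixed(io.BytesIO(payload)))  # terminates cleanly
```

Draining `frames_buggy` on the same payload misinterprets the end marker as a frame length and then fails with `EOFError`, analogous to the iterator reading past EOF instead of stopping.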

@sadikovi
Contributor Author

sadikovi commented May 2, 2022

I do have a crash file, but unfortunately I cannot share it.

@ankurdave
Contributor

I see, so the use-after-free that caused the crash is occurring in the main task thread? In that case this fix LGTM.

@sadikovi
Contributor Author

sadikovi commented May 2, 2022

Thanks, @ankurdave.

@HyukjinKwon
Member

HyukjinKwon commented May 2, 2022

Thanks guys!!

Merged to master, branch-3.3, branch-3.2 and branch-3.1.

HyukjinKwon pushed a commit that referenced this pull request May 2, 2022
…top iterator on task completion

### What changes were proposed in this pull request?

This PR fixes the issue described in https://issues.apache.org/jira/browse/SPARK-39084 where calling `df.rdd.isEmpty()` on a particular dataset could result in a JVM crash and/or executor failure.

The issue was that the Python iterator is not synchronised with the Java iterator, so when the task completes, the Python iterator continues to process data. We introduced ContextAwareIterator as part of https://issues.apache.org/jira/browse/SPARK-33277, but we did not fix all of the places where it should be used.

### Why are the changes needed?

Fixes the JVM crash when checking isEmpty() on a dataset.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a test case that reproduces the issue 100%. I confirmed that the test fails without the fix and passes with the fix.

Closes #36425 from sadikovi/fix-pyspark-iter-2.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 9305cc7)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@ankurdave
Contributor

From talking to @sadikovi, it sounds like the use-after-free that caused this crash does in fact occur in the Python writer thread, not the main task thread. And since RDD#isEmpty() is implemented using limit(1), this is a very similar situation to the one described in #34245. The main difference appears to be the presence of a group-by with codegen enabled.

Given that, the question is why #34245 was not sufficient to fix this. I'm guessing the task completion listener that frees the off-heap memory is being registered in the wrong order relative to the BasePythonRunner task completion listener.

Anyway, even if that were fixed I think we would still need the fix in this PR for performance reasons. Otherwise the writer thread could read an arbitrary amount of data before checking the interrupt status.
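The performance point can be sketched as follows (illustrative only; the names are hypothetical, not Spark's writer-thread implementation): checking the completion flag once per element bounds how far the writer can run past task completion, instead of letting it read an arbitrary amount of data.

```python
import threading

def writer_loop(rows, task_completed: threading.Event, sink):
    """Sketch of the fix's effect: the writer checks the task-completion
    flag before every element, so at most one in-flight row is processed
    after the task finishes."""
    for row in rows:
        if task_completed.is_set():
            break
        sink.append(row)

done = threading.Event()
out = []

def rows():
    for i in range(1000):
        if i == 3:
            done.set()  # simulate limit(1) completing the task early
        yield i

writer_loop(rows(), done, out)
# out stops at [0, 1, 2]: no unbounded read-ahead past task completion
```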

kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
…top iterator on task completion

Closes apache#36425 from sadikovi/fix-pyspark-iter-2.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 9305cc7)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 744b5f4)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>