
[SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt #44445

Closed

Conversation

ulysses-you
Contributor

What changes were proposed in this pull request?

This PR adds a check: the cached partition is marked as materialized only if the task has neither failed nor been interrupted. It also adds a new method isFailed to TaskContext.
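The guard described above can be sketched as a tiny model. Python is used here purely for illustration (Spark's actual code is Scala), and `TaskContext`, `maybe_mark_materialized`, and the partition dict are hypothetical stand-ins, not Spark's real API:

```python
# Hypothetical model of the fix: mark a cached partition as materialized
# only when the task finished cleanly, so other tasks never read a
# partially written cache entry.

class TaskContext:
    """Minimal stand-in for Spark's TaskContext status flags."""
    def __init__(self, failed=False, interrupted=False):
        self._failed = failed
        self._interrupted = interrupted

    def is_failed(self):       # mirrors the new TaskContext.isFailed()
        return self._failed

    def is_interrupted(self):  # mirrors TaskContext.isInterrupted()
        return self._interrupted

def maybe_mark_materialized(ctx, partition):
    # The check added by this PR: skip marking on failure or interruption.
    if not ctx.is_failed() and not ctx.is_interrupted():
        partition["materialized"] = True
    return partition["materialized"]

# A failed task must not publish its partially built cache entry.
print(maybe_mark_materialized(TaskContext(failed=True), {"materialized": False}))  # False
print(maybe_mark_materialized(TaskContext(), {"materialized": False}))             # True
```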

Why are the changes needed?

Before this PR, when caching a table, a task failure could cause an NPE in other tasks that read the partially materialized cache:

java.lang.NullPointerException
	at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)

Does this PR introduce any user-facing change?

yes, it's a bug fix

How was this patch tested?

add test

Was this patch authored or co-authored using generative AI tooling?

no

@ulysses-you
Contributor Author

cc @cloud-fan @yaooqinn

Member

@yaooqinn yaooqinn left a comment


LGTM

@cloud-fan
Contributor

which variable can be null?

@ulysses-you
Contributor Author

@cloud-fan according to the stack trace, DefaultCachedBatch.buffers[i] is null.

Contributor

@mridulm mridulm left a comment


Looks reasonable to me.

QQ: Do we want to do this only if the entire iterator has been consumed (and so fully materialized)?

/**
* Returns true if the task has failed.
*/
def isFailed(): Boolean
Member


It would be great if we could add this on the PySpark side as well ...

Contributor Author


It seems the Python TaskContext does not have the isCompleted and isInterrupted methods. Is there a reason for that?

Member


We should have them all :-).

Member


I don't mind doing that separately in a different PR.

Contributor Author


I will create a new PR for it later.

Contributor Author


@HyukjinKwon does PySpark support updating the status from the JVM side? It seems we can only send a snapshot of the status to the Python side.

@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
@GuardedBy("this")
override def isCompleted(): Boolean = synchronized(completed)

override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)

override def isInterrupted(): Boolean = reasonIfKilled.isDefined
Contributor


I'm trying to reason about the relationship between these 3 flags:

  • isCompleted is false only when the task is interrupted? Or when the task is still running?
  • isFailed is true only if the task completes and fails?
  • isInterrupted can be true before the task completes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isCompleted covers all terminal states: success, failure, and cancellation. If isCompleted is true, the task is no longer running.
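To make the relationship between the three flags concrete, here is a small state model consistent with the explanation above. This is an illustration only (Python, not Spark source); the state names and helper functions are hypothetical:

```python
# Hypothetical model of the three TaskContext flags discussed above:
# isCompleted is true for any terminal state, isFailed only for the
# failure state, and isInterrupted can become true while still running.

RUNNING, SUCCEEDED, FAILED, CANCELLED = "running", "succeeded", "failed", "cancelled"

def is_completed(state):
    # True for every terminal state: success, failure, or cancellation.
    return state in (SUCCEEDED, FAILED, CANCELLED)

def is_failed(state):
    # True only for the failure terminal state.
    return state == FAILED

def is_interrupted(state, kill_requested):
    # A kill can be requested while the task is still running, so this
    # may already be True before is_completed(state) becomes True.
    return kill_requested

for state in (RUNNING, SUCCEEDED, FAILED, CANCELLED):
    print(state, is_completed(state), is_failed(state))
```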

@ulysses-you
Contributor Author

@mridulm yes, to make sure the cached RDD is materialized only after all the tasks have succeeded.

@yaooqinn yaooqinn closed this in 43f7932 Dec 22, 2023
@yaooqinn
Member

Thanks, merged to master according to the affected versions in JIRA.

ulysses-you added a commit to ulysses-you/spark that referenced this pull request Dec 22, 2023

Closes apache#44445 from ulysses-you/fix-cache.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
@ulysses-you
Contributor Author

@yaooqinn There are some conflicts. I created #44457 for branch-3.5

@ulysses-you ulysses-you deleted the fix-cache branch December 22, 2023 05:44
ulysses-you added a commit that referenced this pull request Dec 22, 2023
This PR backports #44445 to branch-3.5.


Closes #44457 from ulysses-you/fix-cache-3.5.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>