Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE #35146

Closed
wants to merge 4 commits into from

Conversation

pan3793
Copy link
Member

@pan3793 pan3793 commented Jan 9, 2022

What changes were proposed in this pull request?

Check null in isStale to avoid NPE.

Why are the changes needed?

There is a chance that late push shuffle block request invokes PushBlockStreamCallback#onData after the merged partition finalized, which causes NPE.

2022-01-07 21:06:14,464 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0149_-1 102 6922, chunk_size=1, meta_length=138, data_length=112632
2022-01-07 21:06:14,615 ERROR shuffle.RemoteBlockPushResolver: Encountered issue when merging shufflePush_102_0_279_6922
java.lang.NullPointerException
        at org.apache.spark.network.shuffle.RemoteBlockPushResolver$AppShuffleMergePartitionsInfo.access$200(RemoteBlockPushResolver.java:1017)
        at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.isStale(RemoteBlockPushResolver.java:806)
        at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.onData(RemoteBlockPushResolver.java:840)
        at org.apache.spark.network.server.TransportRequestHandler$3.onData(TransportRequestHandler.java:209)
        at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
        at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

isTooLate checks null but isStale does not, so check isTooLate first to avoid NPE

   private boolean isTooLate(
        AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
        int reduceId) {
      return null == appShuffleMergePartitionsInfo ||
        INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
        !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
    }

Does this PR introduce any user-facing change?

Bugfix, to avoid NPE in Yarn ESS.

How was this patch tested?

I don't think it's easy to write a unit test for this issue based on current code, since it's a minor change, use exsiting ut to ensue the change doesn't break the current functionalities.

@github-actions github-actions bot added the CORE label Jan 9, 2022
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@pan3793
Copy link
Member Author

pan3793 commented Jan 9, 2022

cc @mridulm @otterc

@mridulm
Copy link
Contributor

mridulm commented Jan 9, 2022

Given that isStale is used in a few places, it would be better to fix the null issue there itself.
Can you try with:
return null == appShuffleMergePartitionsInfo || appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId;
for isStale @pan3793 ?
That should work, we can merge this PR after you have validated that.

We also need to ensure we have tests which check for stale but not too late as well - this specific codepath is an excellent test case for it.

+CC @otterc, @Ngone51

@pan3793
Copy link
Member Author

pan3793 commented Jan 10, 2022

Given that isStale is used in a few places, it would be better to fix the null issue there itself.

I'm not sure if such change will break the semantic of isStale? Other calls of isStale always after isLate, so it's safe to skip null check

@mridulm
Copy link
Contributor

mridulm commented Jan 10, 2022

There is no issues with making the change, please go ahead.

@pan3793 pan3793 changed the title [SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback should check isTooLate first to avoid NPE [SPARK-37847][CORE][SHUFFLE] PushBlockStreamCallback#isStale should check null to avoid NPE Jan 10, 2022
@mridulm
Copy link
Contributor

mridulm commented Jan 10, 2022

+CC @otterc, @Ngone51 Please take a look. Will merge it later tomorrow if no concerns.

@venkata91
Copy link
Contributor

LGTM, but this comment is not yet addressed right?

@zhouyejoe
Copy link
Contributor

LGTM. @venkata91 I think that comment has been reverted.

Copy link
Contributor

@otterc otterc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a minor nit. Otherwise looks good to me

@asfgit asfgit closed this in 7463564 Jan 11, 2022
@mridulm
Copy link
Contributor

mridulm commented Jan 11, 2022

Merged to master.
Thanks @pan3793 for fixing this !
Thanks @venkata91, @zhouyejoe, @otterc for the review :-)

dchvn pushed a commit to dchvn/spark that referenced this pull request Jan 19, 2022
…heck null to avoid NPE

### What changes were proposed in this pull request?

Check `null` in `isStale` to avoid NPE.

### Why are the changes needed?

There is a chance that late push shuffle block request invokes `PushBlockStreamCallback#onData` after the merged partition finalized, which causes NPE.

```
2022-01-07 21:06:14,464 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0149_-1 102 6922, chunk_size=1, meta_length=138, data_length=112632
2022-01-07 21:06:14,615 ERROR shuffle.RemoteBlockPushResolver: Encountered issue when merging shufflePush_102_0_279_6922
java.lang.NullPointerException
        at org.apache.spark.network.shuffle.RemoteBlockPushResolver$AppShuffleMergePartitionsInfo.access$200(RemoteBlockPushResolver.java:1017)
        at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.isStale(RemoteBlockPushResolver.java:806)
        at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.onData(RemoteBlockPushResolver.java:840)
        at org.apache.spark.network.server.TransportRequestHandler$3.onData(TransportRequestHandler.java:209)
        at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
        at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
```

`isTooLate` checks null but `isStale` does not, so check `isTooLate` first to avoid NPE
```java
   private boolean isTooLate(
        AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
        int reduceId) {
      return null == appShuffleMergePartitionsInfo ||
        INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
        !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
    }
```

### Does this PR introduce _any_ user-facing change?
Bugfix, to avoid NPE in Yarn ESS.

### How was this patch tested?
I don't think it's easy to write a unit test for this issue based on current code, since it's a minor change, use exsiting ut to ensue the change doesn't break the current functionalities.

Closes apache#35146 from pan3793/SPARK-37847.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
6 participants