
[WIP][SPARK-38965][SHUFFLE] Optimize RemoteBlockPushResolver with a memory pool #36279

Closed
wankunde wants to merge 7 commits into apache:master from wankunde:retry_transfer_block

Conversation

@wankunde
Contributor

@wankunde wankunde commented Apr 20, 2022

What changes were proposed in this pull request?

  • Optimize RemoteBlockPushResolver
    • Pushed blocks are cached in a memory pool before being flushed to the final data file, so we can control the memory used by RemoteBlockPushResolver.
    • Acquire the lock only when merging pushed block data into the final data file. This is very useful when a reduce partition receives many small pushed blocks at the same time.
    • Remove the BLOCK_APPEND_COLLISION_DETECTED failure, because only one pushed map output can write to the data file at a time.
  • Change BlockPushNonFatalFailure to BlockPushResponse, so the returnCode can be SUCCESS. We can also encode and decode it to a ByteBuffer, so the network RPC does not need to rely on BlockTransferMessage (core module). Also remove the PUSH_BLOCK_RETURN_CODE type from BlockTransferMessage.
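The memory-pool idea in the first bullet can be sketched roughly as follows. This is an illustrative sketch only, not the actual RemoteBlockPushResolver code: the class and method names are hypothetical, and a `java.util.concurrent.Semaphore` stands in for byte accounting.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: pushed-block buffers must reserve space from a bounded
// pool before being cached, so the total memory held while receiving pushed
// blocks is capped instead of growing without limit.
public class PushBlockMemoryPool {
    private final Semaphore availableBytes;

    public PushBlockMemoryPool(int poolSizeBytes) {
        this.availableBytes = new Semaphore(poolSizeBytes);
    }

    // Try to reserve space for an incoming pushed block. If the pool is
    // exhausted, the caller can back-pressure the sender instead of OOMing.
    public boolean tryReserve(int blockSizeBytes) {
        return availableBytes.tryAcquire(blockSizeBytes);
    }

    // Return the space once the block has been merged into the data file.
    public void release(int blockSizeBytes) {
        availableBytes.release(blockSizeBytes);
    }

    public int remainingBytes() {
        return availableBytes.availablePermits();
    }
}
```

With such a pool, a reservation larger than the remaining capacity simply fails and the caller can wait, spill, or reject the push rather than allocate an unbounded heap buffer.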

Why are the changes needed?

For the push-based shuffle service, there are many BLOCK_APPEND_COLLISION_DETECTED failures when there are many small map task outputs. In RemoteBlockPushResolver, while one map task's pushed block is being written, the other map tasks' pushed blocks fail in the onComplete() method.
RemoteBlockPushResolver also has no memory limit, so the shuffle service may OOM when many small pushed blocks are waiting to be written to the final data file.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UTs

@wankunde wankunde force-pushed the retry_transfer_block branch 2 times, most recently from f85ea3a to 2ad98ea Compare April 20, 2022 08:41
@weixiuli
Contributor

weixiuli commented Apr 20, 2022

Good catch!
It is necessary to distinguish a plain IOException from the cases covered by errorHandler.shouldRetryError. errorHandler.shouldRetryError has two cases that need a retry: one is the BLOCK_APPEND_COLLISION_MSG_SUFFIX for pushing described in this PR, and the other is STALE_SHUFFLE_BLOCK_FETCH for fetching. Please add some unit tests for them.
also cc @otterc @mridulm
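The two retryable cases described above can be modeled in a minimal sketch like the one below. The message strings are placeholders, not the actual Spark constants, and the class name is hypothetical.

```java
// Illustrative only: models the two retryable cases named above — a push-side
// block-append collision and a fetch-side stale shuffle block.
public class RetryDecisionSketch {
    static final String BLOCK_APPEND_COLLISION_MSG_SUFFIX =
        "Block has been appended by a different attempt";  // placeholder text
    static final String STALE_SHUFFLE_BLOCK_FETCH_MSG =
        "stale shuffle block fetch";                       // placeholder text

    static boolean shouldRetryError(Throwable t) {
        String msg = t.getMessage();
        if (msg == null) {
            // e.g. a bare connection failure: retry by default
            return true;
        }
        // Case 1: a push collided with another writer on the server side.
        // Case 2: a fetch hit a shuffle block that has since been merged.
        return msg.endsWith(BLOCK_APPEND_COLLISION_MSG_SUFFIX)
            || msg.contains(STALE_SHUFFLE_BLOCK_FETCH_MSG);
    }
}
```

Unit tests would then cover each branch separately, as suggested above.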

@otterc
Contributor

otterc commented Apr 20, 2022

@wankunde

  • On the client side, all exceptions from the server are wrapped once in a RuntimeException and then in an IOException. Take a look at TransportClient and TransportResponseHandler.

  • RetryingBlockTransferor is used for both fetch and push, so it should not be changed if you just want to address some gaps in push.

  • Look at ExternalBlockStoreClient to see how we pass different error handlers for the push and fetch sides.

Did you see a specific issue that you are trying to address here?
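The double wrapping described in the first bullet means client code cannot match on the server-side exception type directly; it has to walk the cause chain. A small, self-contained illustration (the helper name and message text are made up):

```java
import java.io.IOException;

// Walks Throwable.getCause() to recover the innermost error, mirroring the
// RuntimeException-inside-IOException wrapping described above.
public class CauseChain {
    static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null) {
            cur = cur.getCause();
        }
        return cur;
    }
}
```

For example, `rootCause(new IOException(new RuntimeException("collision")))` returns the inner RuntimeException, whose message is "collision".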

@wankunde
Contributor Author

wankunde commented Apr 21, 2022

Thanks @otterc for your review.

  • ExternalBlockHandler uses a StreamInterceptor to receive the data buffer, and BlockPushNonFatalFailure will not be wrapped in an IOException or RuntimeException.

  • I am not sure whether BlockPushNonFatalFailure should be handled in RetryingBlockTransferor, but currently the client retries pushing blocks both in RetryingBlockTransferor and ShuffleBlockPusher. Unfortunately, they only handle IOException now.

Could we handle BlockPushNonFatalFailure in ShuffleBlockPusher?

override def onBlockPushFailure(blockId: String, exception: Throwable): Unit = {

@wankunde
Contributor Author

wankunde commented Apr 21, 2022

Hi @otterc @weixiuli, there are many BlockPushNonFatalFailure.BLOCK_APPEND_COLLISION_DETECTED exceptions when some small map outputs call the onComplete() method while another large map output is writing to the merged data file.
Could we optimize the StreamInterceptor onComplete() method?
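One possible shape for that optimization, sketched here with hypothetical names (this is not the actual resolver code): instead of failing a pushed block with BLOCK_APPEND_COLLISION_DETECTED when the partition file is busy, queue the block and let whichever caller currently holds the lock drain the queue.

```java
import java.io.ByteArrayOutputStream;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: completed pushed blocks are queued, and a caller only
// writes to the "data file" (a ByteArrayOutputStream stand-in) when it can
// take the lock without blocking, so no push ever fails with a collision.
public class CollisionFreeMerger {
    private final ReentrantLock fileLock = new ReentrantLock();
    private final Queue<byte[]> pending = new ConcurrentLinkedQueue<>();
    private final ByteArrayOutputStream dataFile = new ByteArrayOutputStream();

    public void onComplete(byte[] block) {
        pending.add(block);
        // Re-check the queue after each drain: another thread may have queued
        // a block between our poll loop ending and the unlock.
        while (!pending.isEmpty() && fileLock.tryLock()) {
            try {
                byte[] next;
                while ((next = pending.poll()) != null) {
                    dataFile.write(next, 0, next.length);
                }
            } finally {
                fileLock.unlock();
            }
        }
    }

    public byte[] contents() {
        return dataFile.toByteArray();
    }
}
```

With tryLock(), a small pushed block arriving while a large merge is in progress is buffered rather than rejected, which is the collision scenario described above.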

@wankunde
Contributor Author

Currently, RemoteBlockPushResolver does not limit the memory used to receive pushed blocks, so an OOM may happen in the NodeManager.

2022-04-23 20:47:25,142 ERROR org.apache.spark.network.shuffle.RemoteBlockPushResolver: Encountered issue when merging shufflePush_0_0_7336_8567
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.onData(RemoteBlockPushResolver.java:921)
        at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:84)
        at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

@wankunde wankunde force-pushed the retry_transfer_block branch from a7be9d1 to d741acb Compare April 25, 2022 08:05
@wankunde wankunde force-pushed the retry_transfer_block branch from d741acb to 3c16a73 Compare April 25, 2022 08:52
@wankunde wankunde changed the title [SPARK-38965][SHUFFLE]Retry transfer blocks for exceptions listed in the error handler [SPARK-38965][SHUFFLE]Optimize RemoteBlockPushResolver with a memory pool Apr 25, 2022
@wankunde
Contributor Author

Hi @otterc @Ngone51,
We encountered many BLOCK_APPEND_COLLISION failures in our Spark cluster, and some NodeManagers hit OOM for a simple query.
Could you help review this PR?

@wankunde
Contributor Author

retest this please

@wankunde wankunde changed the title [SPARK-38965][SHUFFLE]Optimize RemoteBlockPushResolver with a memory pool [WIP][SPARK-38965][SHUFFLE]Optimize RemoteBlockPushResolver with a memory pool Apr 27, 2022
@wankunde wankunde changed the title [WIP][SPARK-38965][SHUFFLE]Optimize RemoteBlockPushResolver with a memory pool [SPARK-38965][SHUFFLE]Optimize RemoteBlockPushResolver with a memory pool Apr 30, 2022
@wankunde wankunde changed the title [SPARK-38965][SHUFFLE]Optimize RemoteBlockPushResolver with a memory pool [WIP][SPARK-38965][SHUFFLE]Optimize RemoteBlockPushResolver with a memory pool May 5, 2022
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 23, 2022
@github-actions github-actions bot closed this Sep 24, 2022