
[SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized #35325

Closed. Wants to merge 5 commits from otterc/SPARK-37675.

Conversation

otterc (Contributor) commented Jan 25, 2022

What changes were proposed in this pull request?

This fixes the bugs that were reported in SPARK-37675 and SPARK-37793.

  • The shuffle server reported empty merged partitions to the driver.
  • Push-merged files were being overwritten after the shuffle merge had been finalized.
  • An exception was thrown during finalization of a shuffle for which the shuffle server had not received any blocks.
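
At a high level, the fix keeps a finalized marker per shuffle so that late block pushes and duplicate finalize requests are rejected instead of reopening the merged files. Below is a simplified, self-contained sketch of that idea only; the types and names are stand-ins, not the actual RemoteBlockPushResolver code.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified illustration only; the real logic lives in RemoteBlockPushResolver.
public class MergeFinalizationGuardSketch {
  // Per-shuffle state: the current shuffleMergeId and whether it has been finalized.
  static final class MergeInfo {
    final int shuffleMergeId;
    final boolean finalized;
    MergeInfo(int shuffleMergeId, boolean finalized) {
      this.shuffleMergeId = shuffleMergeId;
      this.finalized = finalized;
    }
  }

  private final Map<Integer, MergeInfo> shuffles = new ConcurrentHashMap<>();

  // Called when a block push arrives: reject pushes for a finalized merge so the
  // merged data/index/meta files are never reopened and overwritten.
  boolean acceptPush(int shuffleId, int shuffleMergeId) {
    MergeInfo info = shuffles.compute(shuffleId, (id, current) -> {
      if (current == null || shuffleMergeId > current.shuffleMergeId) {
        return new MergeInfo(shuffleMergeId, false); // first push, or a newer merge attempt
      }
      return current; // keep the existing state, including the finalized marker
    });
    return info.shuffleMergeId == shuffleMergeId && !info.finalized;
  }

  // Called on finalize: record the finalized marker even if no blocks were ever
  // pushed, instead of dropping the entry (which allowed later pushes to overwrite).
  void finalizeMerge(int shuffleId, int shuffleMergeId) {
    shuffles.put(shuffleId, new MergeInfo(shuffleMergeId, true));
  }

  public static void main(String[] args) {
    MergeFinalizationGuardSketch guard = new MergeFinalizationGuardSketch();
    System.out.println(guard.acceptPush(0, 0));  // true: merge in progress
    guard.finalizeMerge(0, 0);
    System.out.println(guard.acceptPush(0, 0));  // false: finalized, push rejected
  }
}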

Why are the changes needed?

Changes are needed to fix the bugs described above.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests.

pan3793 (Member) commented Jan 26, 2022

I tested your first commit 388559f; it does not solve the problem.
[screenshot attached]

otterc (Contributor, Author) commented Jan 26, 2022

@pan3793 Thanks for testing this. So with this patch, are you still seeing the executor request merged blocks that either had no data or were overwritten by the shuffle service?
Will you be able to share the logs with me again? It would be great if you could turn on debug-level logging for RemoteBlockPushResolver.

pan3793 (Member) commented Jan 26, 2022

@otterc Oops, sorry, I forgot to restart the node manager, so the patch did not take effect. Let me retry.

AmplabJenkins commented
Can one of the admins verify this patch?

mridulm (Contributor) left a comment

Looks good, just a couple of comments.

mridulm (Contributor) commented Jan 26, 2022

@pan3793 Functionally, @otterc's patch looks good (except for testing enhancements, renames, etc.).
Can you also confirm whether this works for you, given that your environment has been great at reproducing this issue consistently? Thanks!

… files once the corresponding shuffle is finalized
mridulm (Contributor) left a comment

Just some minor style comments, looks good to me.
+CC @Ngone51

if (null == mergePartitionsInfo) {
//If the mergePartitions was never created then it means that there weren't any push
//blocks that were ever received for this shuffle. This could be the case when the driver
//doesn't wait for enough time to start the stage which reads this shuffle data.
Review comment:

nit: space between // and text.

//doesn't wait for enough time to start the stage which reads this shuffle data.
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
} else if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId
|| mergePartitionsInfo.isFinalized()) {
Review comment:

nit: move || to prev line

// 2. finalization of indeterminate stage if the shuffleMergeId related to it is the one
// for which the message is received.
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
Review comment:

nit: All paths in this lambda that are not throwing exceptions do the same thing: new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true).
Make that common?
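
For illustration, a minimal self-contained sketch of the shape this nit suggests (simplified stand-in types, not the actual classes or branch bodies in the PR): each branch does only its specific work, and every non-throwing path falls through to one shared return of the finalized marker.

import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in used only to show the "single common return" shape.
public class CommonReturnSketch {
  static final class MergePartitionsInfo {
    final int shuffleMergeId;
    final boolean finalized;
    MergePartitionsInfo(int shuffleMergeId, boolean finalized) {
      this.shuffleMergeId = shuffleMergeId;
      this.finalized = finalized;
    }
  }

  private final ConcurrentHashMap<Integer, MergePartitionsInfo> mergeInfo = new ConcurrentHashMap<>();

  MergePartitionsInfo finalizeMerge(int shuffleId, int shuffleMergeId) {
    return mergeInfo.compute(shuffleId, (id, current) -> {
      if (current == null) {
        // No push blocks were ever received for this shuffle; nothing to collect.
      } else if (shuffleMergeId < current.shuffleMergeId || current.finalized) {
        throw new RuntimeException("Stale shuffle finalize request for shuffle " + shuffleId);
      } else {
        // Branch-specific work, e.g. capturing the partitions to be finalized.
      }
      // Every non-throwing path ends the same way: mark this merge id as finalized.
      return new MergePartitionsInfo(shuffleMergeId, true);
    });
  }
}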

reduceIds.add(partition.reduceId);
sizes.add(partition.getLastChunkOffset());
logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalization results added"
+ " for partition {} data size {} index size {} meta size {}",
Review comment:

nit: move the + to prev line (here and elsewhere in string concat for log msgs)

mridulm (Contributor) commented Jan 27, 2022

@otterc Can you remove the WIP tag, given the PR should be complete now? Thanks!

otterc changed the title from "[WIP][SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized" to "[SPARK-37675][SPARK-37793] Prevent overwriting of push shuffle merged files once the shuffle is finalized" on Jan 27, 2022
mridulm (Contributor) left a comment

Looks good to me.
Will wait for @pan3793 to confirm it works before merging (and test success, of course).

pan3793 (Member) commented Jan 27, 2022

Thanks @otterc and @mridulm, I verified this patch with two rounds of the 1TB TPC-DS queries, and the issue is gone.

One minor question: can we suppress this warning message? I think it's a little bit noisy for users.

01-27 16:18:35 WARN [shuffle-client-8-5] scheduler.DAGScheduler@94: Exception encountered when trying to finalize shuffle merge on beta-spark2 for shuffle 189
java.lang.RuntimeException: java.lang.RuntimeException: Shuffle merge finalize request for shuffle 189 with shuffleMergeId 0 is stale shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the shuffle is already being pushed
	at org.apache.spark.network.shuffle.RemoteBlockPushResolver.lambda$finalizeShuffleMerge$11(RemoteBlockPushResolver.java:523)
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
	at org.apache.spark.network.shuffle.RemoteBlockPushResolver.finalizeShuffleMerge(RemoteBlockPushResolver.java:519)
	at org.apache.spark.network.shuffle.ExternalBlockHandler.handleMessage(ExternalBlockHandler.java:218)
	at org.apache.spark.network.shuffle.ExternalBlockHandler.receive(ExternalBlockHandler.java:122)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:209) [spark-network-common_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142) [spark-network-common_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) [spark-network-common_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [netty-codec-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) [spark-network-common_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) [netty-transport-4.1.72.Final.jar:4.1.72.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]

otterc (Contributor, Author) commented Jan 27, 2022

> Thanks @otterc and @mridulm, I verified this patch with two rounds of the 1TB TPC-DS queries, and the issue is gone.
>
> One minor question: can we suppress this warning message? I think it's a little bit noisy for users.
>
> 01-27 16:18:35 WARN [shuffle-client-8-5] scheduler.DAGScheduler@94: Exception encountered when trying to finalize shuffle merge on beta-spark2 for shuffle 189
> java.lang.RuntimeException: java.lang.RuntimeException: Shuffle merge finalize request for shuffle 189 with shuffleMergeId 0 is stale shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the shuffle is already being pushed

Thanks @pan3793 for verifying this change. I saw those logs on the shuffle server as well, and commit c606185 already fixes that. This warning will now be logged only for a truly stale finalize message. I have added a UT for it as well.
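
As a rough, self-contained illustration of the behaviour described above (simplified stand-ins, not the exact code in c606185): finalizing a shuffle for which no blocks were ever pushed now succeeds with an empty result, and the stale-request exception is reserved for a finalize that targets an older shuffleMergeId or a merge that was already finalized.

import java.util.HashMap;
import java.util.Map;

// Simplified model of the finalize behaviour; the real logic is in
// RemoteBlockPushResolver.finalizeShuffleMerge.
public class FinalizeBehaviourSketch {
  static final class MergeState {
    final int shuffleMergeId;
    final boolean finalized;
    MergeState(int shuffleMergeId, boolean finalized) {
      this.shuffleMergeId = shuffleMergeId;
      this.finalized = finalized;
    }
  }

  private final Map<Integer, MergeState> shuffles = new HashMap<>();

  // Returns the number of merged partitions reported back to the driver.
  int finalizeShuffleMerge(int shuffleId, int shuffleMergeId) {
    MergeState current = shuffles.get(shuffleId);
    if (current != null && (shuffleMergeId < current.shuffleMergeId || current.finalized)) {
      // Only a genuinely stale or duplicate request is rejected; this is what
      // now surfaces as the warning logged on the driver side.
      throw new RuntimeException("Stale shuffle finalize request for shuffle " + shuffleId);
    }
    // No blocks pushed (current == null) or a normal finalize: record the
    // finalized marker and report a result instead of throwing.
    shuffles.put(shuffleId, new MergeState(shuffleMergeId, true));
    return 0; // empty result in this sketch; the real code returns actual merge statuses
  }
}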

pan3793 (Member) commented Jan 27, 2022

Thanks @otterc. Would you please add a description of how the corner case caused the files to be overwritten, to help others understand how the patch works?

mridulm (Contributor) commented Jan 27, 2022

Thanks for verifying, @pan3793!
I will leave the PR open until tomorrow, in case there are other comments/reviews, before merging.

mridulm (Contributor) commented Jan 27, 2022

@otterc, can you please update the JIRA with details of the root cause? That should help @pan3793 and future investigators better understand the issue fixed here. Thanks!

otterc (Contributor, Author) commented Jan 27, 2022

mridulm (Contributor) commented Jan 29, 2022

@otterc Can you take a look at the test failure? It is for YarnShuffleIntegrationSuite: "external shuffle service". Can you retrigger the tests in case it is not related?

It looks unrelated, based on logs.
Merging to master and branch-3.2.

asfgit closed this in 9afb407 on Jan 29, 2022
asfgit pushed a commit that referenced this pull request Jan 29, 2022
… files once the shuffle is finalized

### What changes were proposed in this pull request?
This fixes the bugs that were reported in SPARK-37675 and SPARK-37793.
- The shuffle server reported empty merged partitions to the driver.
- Push-merged files were being overwritten after the shuffle merge had been finalized.
- An exception was thrown during finalization of a shuffle for which the shuffle server had not received any blocks.

### Why are the changes needed?
Changes are needed to fix the bugs described above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests.

Closes #35325 from otterc/SPARK-37675.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 9afb407)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
mridulm (Contributor) commented Jan 29, 2022

Merged to master, branch-3.2
Thanks for working on this @otterc
Thanks for reporting and helping test this, @pan3793!

kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022